Apache Airflow for Databricks

1. Prepare Your Environment

This tutorial shows how to submit a run with the DatabricksSubmitRunOperator and request a recommendation from the Sync Library within a DAG.

Pre-requisites

Please configure your Airflow instance with the following in mind:

- Airflow (this tutorial uses 2.0+)
- Python 3.7+
- Sync Library installed and environment variables configured on the Airflow instance (details below)
- An S3 path you would like to use for cluster logs. Your Databricks ARN will need access to this path so it can save the cluster logs there.
- An account with Sync and a Project created to track the task you would like to optimize.

Sync Account Setup and Library Installation

Quick start instructions on how to create an account, create a project, and install the Sync Library can be found here. Please configure the CLI on your Airflow instance. When going through the configuration steps, be sure to choose yes when prompted to configure the Databricks variables.

Note: In the quickstart above, there are instructions on using an init script. Copy the contents of the init script into a file on a shared or personal workspace accessible by the account the Databricks job will run as.

Variables

Certain variables are generated and stored during installation of the Sync Library. For transparency, they are:

- SYNC_API_KEY_ID - https://app.synccomputing.com/account
- SYNC_API_KEY_SECRET - https://app.synccomputing.com/account
- DATABRICKS_HOST - Databricks Host URL
- DATABRICKS_TOKEN - Databricks Personal Access Token
- AWS_REGION_NAME - AWS Region for your Databricks instance

Besides the variables generated by the library, you'll need the following environment variables. These are necessary to use the AWS API to retrieve cluster logs when requesting a recommendation. DBFS is supported; however, it is not recommended as it goes against Databricks' best practices. As mentioned in the quick start, it's best to set these via the AWS CLI.

- AWS_ACCESS_KEY_ID
- AWS_SECRET_ACCESS_KEY
- AWS_DEFAULT_REGION
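As a quick sanity check before running the DAG, a short snippet can confirm these variables are actually set on the Airflow instance. This is an illustrative helper, not part of the Sync Library:

```python
import os

# Illustrative helper (not part of the Sync Library): report which of the
# required environment variables are missing on this machine.
REQUIRED_VARS = [
    "AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY",
    "AWS_DEFAULT_REGION",
]


def missing_env_vars(names):
    """Return the subset of `names` not set (or empty) in the environment."""
    return [name for name in names if not os.environ.get(name)]


if __name__ == "__main__":
    missing = missing_env_vars(REQUIRED_VARS)
    if missing:
        print("Missing environment variables:", ", ".join(missing))
    else:
        print("All required environment variables are set.")
```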

2. Configure your Cluster

In preparation for the first run, some specific cluster details need to be configured.

What are we adding?

  • cluster_log_conf: an S3 path to send our cluster logs. These will be used to generate an optimized recommendation.

  • custom_tags: the sync:project-id tag is added so we can assign the run to a Sync project.

  • init_scripts: identifies the init script path that we copied into our Databricks workspace during the quick start setup.

  • spark_env_vars: environment variables passed to the cluster that the init script will use.

Note: the retrieval of tokens/keys in this tutorial is simplified to use the information configured during the sync-cli setup process. Passing them in this manner will result in tokens being visible in plaintext when viewing the cluster in Databricks. Please use Databricks Secrets when productionizing this code.
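For example, a hardened configuration might reference Databricks Secrets in spark_env_vars instead of raw values. Databricks resolves {{secrets/&lt;scope&gt;/&lt;key&gt;}} placeholders on the cluster at launch time, so the values never appear in plaintext in the cluster spec. This is a sketch; the "sync" secret scope and key names below are hypothetical, so create your own scope and adjust the paths:

```python
# Sketch: spark_env_vars referencing Databricks Secrets instead of raw values.
# The "sync" secret scope and the key names are hypothetical placeholders;
# create your own scope with the Databricks Secrets CLI/API.
spark_env_vars = {
    "DATABRICKS_HOST": "https://my-workspace.cloud.databricks.com",  # not a secret
    "DATABRICKS_TOKEN": "{{secrets/sync/databricks_token}}",
    "SYNC_API_KEY_ID": "{{secrets/sync/api_key_id}}",
    "SYNC_API_KEY_SECRET": "{{secrets/sync/api_key_secret}}",
    "AWS_DEFAULT_REGION": "us-east-1",  # not a secret
    "AWS_ACCESS_KEY_ID": "{{secrets/sync/aws_access_key_id}}",
    "AWS_SECRET_ACCESS_KEY": "{{secrets/sync/aws_secret_access_key}}",
}
```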

The rest of the cluster configuration dictionary comprises the typical settings you normally pass into the DatabricksSubmitRunOperator.

import os

from sync.config import DatabricksConf as sync_databricks_conf
from sync.config import get_api_key

{
    "spark_version": "13.0.x-scala2.12",
    ...
    "cluster_log_conf": {
        "s3": {
            "destination": "", # Add the s3 path for the cluster logs
            "enable_encryption": True,
            "region": "", # Add your aws region ie: us-east-1
            "canned_acl": "bucket-owner-full-control",
        }
    },
    "custom_tags": {"sync:project-id": "",}, # Add the project id from Gradient
    ...
    "init_scripts": [
        {"workspace": {
            "destination": "" # Path to the init script in the workspace ie: Shared/init_scripts/init.sh
            }
        }
    ],
    "spark_env_vars": {
        "DATABRICKS_HOST": f"{sync_databricks_conf().host}",
        "DATABRICKS_TOKEN": f"{sync_databricks_conf().token}",
        "SYNC_API_KEY_ID": f"{get_api_key().id}",
        "SYNC_API_KEY_SECRET": f"{get_api_key().secret}",
        "AWS_DEFAULT_REGION": f"{os.environ['AWS_DEFAULT_REGION']}",
        "AWS_ACCESS_KEY_ID": f"{os.environ['AWS_ACCESS_KEY_ID']}",
        "AWS_SECRET_ACCESS_KEY": f"{os.environ['AWS_SECRET_ACCESS_KEY']}",
    }
}

Reminder: the Databricks ARN attached to the cluster will need access to the S3 path specified in the cluster_log_conf.

3. Update Your Databricks Submit Run Operator

Next, we'll ensure the Databricks Operator passes the run_id of the created job back to xcom. This is needed in the subsequent task to request a prediction for the run. Just enable the do_xcom_push parameter.

# DAG code
    ...
    # Submit the Databricks run
    run_operator = DatabricksSubmitRunOperator(
        task_id=...,
        do_xcom_push=True,
    )
    ...

4. Create A Recommendation

Upon successful completion of the DatabricksSubmitRunOperator task, we'll have the run_id we need to create a recommendation for optimal cluster configuration. We'll utilize the PythonOperator to call the create_prediction_for_run method from the Sync Library. Within the library, this method will connect to the Databricks instance to gather the cluster log location, fetch the logs, and generate the recommendation.

Define submit function

Below is an example of how to call the create_prediction_for_run method from the Sync Library.

# DAG code
from sync.awsdatabricks import create_prediction_for_run
...

def submit_run_for_recommendation(task_to_submit: str, **kwargs):
    run_id = kwargs["ti"].xcom_pull(task_ids=task_to_submit, key="run_id")
    project_id = "Project_Id_Goes_Here"
    create_prediction_for_run(
        run_id=run_id,
        plan_type="Premium",
        project_id=project_id,
        compute_type="Jobs Compute",
    )

What this code block does:

  • wraps and implements create_prediction_for_run

  • pulls the run_id for the previous task from xcom, supplying task_to_submit as the task_id we gave the DatabricksSubmitRunOperator

  • assigns the project id for that task to the project_id variable

  • passes our project id, found on the project details page in Gradient, to the Sync Library method

Optionally, add a parameter to submit_run_for_recommendation if you'd like to extract this out to the PythonOperator. Edit plan_type and compute_type as needed; these reference your Databricks settings.
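That parameterization can be sketched as follows. This is a sketch, not the library's canonical form: plan_type and compute_type become keyword arguments so they can be supplied via op_kwargs, and predict_fn is a hypothetical injection point added here only so the function can be exercised without a live Sync connection (by default it lazily imports create_prediction_for_run):

```python
# Sketch: parameterized submit function. plan_type and compute_type can now be
# supplied from op_kwargs; predict_fn is a hypothetical test seam that defaults
# to the Sync Library's create_prediction_for_run.
def submit_run_for_recommendation(
    task_to_submit: str,
    plan_type: str = "Premium",
    compute_type: str = "Jobs Compute",
    predict_fn=None,
    **kwargs,
):
    if predict_fn is None:
        from sync.awsdatabricks import create_prediction_for_run as predict_fn
    run_id = kwargs["ti"].xcom_pull(task_ids=task_to_submit, key="run_id")
    return predict_fn(
        run_id=run_id,
        plan_type=plan_type,
        project_id="Project_Id_Goes_Here",
        compute_type=compute_type,
    )
```

In the PythonOperator, the extra values then ride along in op_kwargs, e.g. op_kwargs={"task_to_submit": "dbx_operator", "plan_type": "Premium"}.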

Implement PythonOperator

To call the submit_run_for_recommendation function we defined, implement the PythonOperator as follows:

    submit_for_recommendation = PythonOperator(
        task_id="submit_for_recommendation",
        python_callable=submit_run_for_recommendation,
        op_kwargs={
            "task_to_submit": "Task_id of the DatabricksSubmitRunOperator of which to run a prediction for",
        },
        provide_context=True,
        retries=0,
    )
    ...
    ...run_operator >> submit_for_recommendation...

5. Putting It All Together

Let’s combine all of the previous steps in a single DAG. The DAG will submit a run to Databricks and then call the Sync Library to generate a recommendation for an optimized cluster for that task.

# DAG .py code
import os

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from sync.awsdatabricks import create_prediction_for_run
from sync.config import DatabricksConf as sync_databricks_conf
from sync.config import get_api_key


with DAG(
    dag_id="example_dag",
    default_args=default_args,
    tags=["example"],
    ...
) as dag:

    # define the cluster configuration
    cluster_config = {
        "spark_version": "13.0.x-scala2.12",
        ...
        "cluster_log_conf": {
            "s3": {
                "destination": "", # Add the s3 path for the cluster logs
                "enable_encryption": True,
                "region": "", # Add your aws region ie: us-east-1
                "canned_acl": "bucket-owner-full-control",
            }
        },
        "custom_tags": {"sync:project-id": "",}, # Add the project id from Gradient
        ...
        "init_scripts": [
            {"workspace": {
                "destination": "" # Path to the init script in the workspace ie: Shared/init_scripts/init.sh
                }
            }
        ],
        "spark_env_vars": {
            "DATABRICKS_HOST": "", # f"{sync_databricks_conf().host}"
            "DATABRICKS_TOKEN": "", # f"{sync_databricks_conf().token}"
            "SYNC_API_KEY_ID": "", # f"{get_api_key().id}"
            "SYNC_API_KEY_SECRET": "", # f"{get_api_key().secret}"
            "AWS_DEFAULT_REGION": "", # f"{os.environ['AWS_DEFAULT_REGION']}"
            "AWS_ACCESS_KEY_ID": "", # f"{os.environ['AWS_ACCESS_KEY_ID']}"
            "AWS_SECRET_ACCESS_KEY": "", # f"{os.environ['AWS_SECRET_ACCESS_KEY']}",
        }
    }

    # define your databricks operator
    dbx_operator = DatabricksSubmitRunOperator(
        task_id="dbx_operator",
        do_xcom_push=True,
        ...
        new_cluster=cluster_config,
        ...
    )

    # define the submit function to pass to the PythonOperator
    def submit_run_for_recommendation(task_to_submit: str, **kwargs):
        run_id = kwargs["ti"].xcom_pull(task_ids=task_to_submit, key="run_id")
        project_id = "Project_Id_Goes_Here"
        create_prediction_for_run(
            run_id=run_id,
            plan_type="Premium",
            project_id=project_id,
            compute_type="Jobs Compute",
        )

    # define the python operator
    submit_for_recommendation = PythonOperator(
        task_id="submit_for_recommendation",
        python_callable=submit_run_for_recommendation,
        op_kwargs={
            "task_to_submit": "dbx_operator",
        },
        provide_context=True,
        retries=0,
    )

    # define dag dependency
    dbx_operator >> submit_for_recommendation

Helpful library config snippets

To create the .sync files normally generated via sync-cli configure, replace the values in the following snippet and run it as a bash script.

#!/bin/bash

# Create the .sync directory
mkdir -p ~/.sync

# Create and populate the config file
echo '{
  "default_prediction_preference": "balanced",
  "api_url": "https://api.synccomputing.com"
}' > ~/.sync/config

# Create and populate the sync credentials file
echo '{
  "api_key_id": "",
  "api_key_secret": ""
}' > ~/.sync/credentials

# Create and populate the sync databrickscfg file
echo '{
  "host": "https://[id].cloud.databricks.com",
  "token": "",
  "aws_region_name": ""
}' > ~/.sync/databrickscfg

Done!

Once the code above is implemented into your DAG, head over to the Projects dashboard in Gradient. There you'll be able to easily review recommendations and can make changes to the cluster configuration as needed.
