Apache Airflow for Databricks

How to submit a run from DatabricksSubmitRunOperator and request a recommendation from the Sync Library within a DAG.

Please configure your Airflow instance with the following in mind:
- Airflow (This tutorial uses 2.0+)
- Python 3.7+
- Sync Library installed and environment variables configured on the Airflow instance (details below)
- An S3 path you would like to use for cluster logs - your Databricks ARN will need access to this path so it can save the cluster logs there.
- An account with Sync and a Project created to track the task you would like to optimize.
Quick start instructions on how to create an account, create a project, and install the Sync Library can be found here. Please configure the CLI on your Airflow instance. When going through the configuration steps, be sure to choose yes when prompted to configure the Databricks variables.
Note: In the quick start above, there are instructions on using an init script. Copy the contents of the init script into a file in a shared or personal workspace that is accessible to the account the Databricks job will run as.
Certain variables are generated and stored during installation of the Sync Library. For transparency, they are:
- SYNC_API_KEY_ID - https://app.synccomputing.com/account
- SYNC_API_KEY_SECRET - https://app.synccomputing.com/account
- DATABRICKS_HOST - Databricks Host URL
- DATABRICKS_TOKEN - Databricks Personal Access Token
- AWS_REGION_NAME - AWS Region for your Databricks instance
Besides the variables generated by the library, you'll need the following environment variables. These are necessary to use the AWS API to retrieve cluster logs when requesting a prediction. DBFS is supported; however, it is not recommended as it goes against Databricks best practices. As mentioned in the quick start, it's best to set these via the AWS CLI. A quick way to verify the credentials from the Airflow host is sketched after the list below.
- AWS_ACCESS_KEY_ID
- AWS_SECRET_ACCESS_KEY
- AWS_DEFAULT_REGION
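To confirm that the Airflow host can actually reach the log location with those credentials, here is a minimal sanity check you could run, assuming boto3 is installed on the instance. The bucket name and prefix are placeholders for your own S3 log path.
# Hedged sketch: verify the AWS credentials on the Airflow host can read the cluster-log path.
# "your-log-bucket" and "cluster-logs/" are placeholders for your own S3 location.
import boto3

s3 = boto3.client("s3")  # picks up AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_DEFAULT_REGION
response = s3.list_objects_v2(
    Bucket="your-log-bucket",
    Prefix="cluster-logs/",
    MaxKeys=1,
)
print("Log path reachable, objects found:", response.get("KeyCount", 0))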
In preparation for the first run, some specific cluster details need to be configured:
- cluster_log_conf: an S3 path to send our cluster logs to. These logs will be used to generate an optimized recommendation.
- custom_tags: the sync:project-id tag is added so we can assign the run to a Sync project.
- init_scripts: identifies the path to the init script that we copied into our Databricks workspace during the quick start setup.
- spark_env_vars: environment variables passed to the cluster that the init script will use.
Note: the retrieval of tokens/keys in this tutorial is simplified to use the information configured during the sync-cli setup process. Passing them in this manner will leave the tokens visible in plaintext when viewing the cluster in Databricks. Please use Databricks Secrets when productionizing this code.
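For example, here is a minimal sketch of what that could look like, assuming a Databricks secret scope (called gradient here, a hypothetical name) that already holds the Databricks token, Sync API keys, and AWS keys. Databricks resolves {{secrets/<scope>/<key>}} references in cluster environment variables when the cluster starts.
# Hedged sketch: reference Databricks Secrets instead of plaintext values in spark_env_vars.
# The scope name "gradient" and the key names are hypothetical placeholders.
secure_spark_env_vars = {
    "DATABRICKS_TOKEN": "{{secrets/gradient/databricks-token}}",
    "SYNC_API_KEY_ID": "{{secrets/gradient/sync-api-key-id}}",
    "SYNC_API_KEY_SECRET": "{{secrets/gradient/sync-api-key-secret}}",
    "AWS_ACCESS_KEY_ID": "{{secrets/gradient/aws-access-key-id}}",
    "AWS_SECRET_ACCESS_KEY": "{{secrets/gradient/aws-secret-access-key}}",
}
Merge entries like these into the spark_env_vars section of the cluster configuration below when moving to production.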
The rest of the cluster configuration dictionary comprises the typical settings you normally pass into the DatabricksSubmitRunOperator.
import os

from sync.config import DatabricksConf as sync_databricks_conf
from sync.config import get_api_key

cluster_config = {
    "spark_version": "13.0.x-scala2.12",
    ...
    "cluster_log_conf": {
        "s3": {
            "destination": "",  # Add the S3 path for the cluster logs
            "enable_encryption": True,
            "region": "",  # Add your AWS region ie: us-east-1
            "canned_acl": "bucket-owner-full-control",
        }
    },
    "custom_tags": {"sync:project-id": ""},  # Add the project id from Gradient
    ...
    "init_scripts": [
        {
            "workspace": {
                "destination": ""  # Path to the init script in the workspace ie: Shared/init_scripts/init.sh
            }
        }
    ],
    "spark_env_vars": {
        "DATABRICKS_HOST": f"{sync_databricks_conf().host}",
        "DATABRICKS_TOKEN": f"{sync_databricks_conf().token}",
        "SYNC_API_KEY_ID": f"{get_api_key().id}",
        "SYNC_API_KEY_SECRET": f"{get_api_key().secret}",
        "AWS_DEFAULT_REGION": f"{os.environ['AWS_DEFAULT_REGION']}",
        "AWS_ACCESS_KEY_ID": f"{os.environ['AWS_ACCESS_KEY_ID']}",
        "AWS_SECRET_ACCESS_KEY": f"{os.environ['AWS_SECRET_ACCESS_KEY']}",
    },
}
Reminder: the Databricks ARN attached to the cluster will need access to the S3 path specified in the cluster_log_conf.
Next, we'll ensure the Databricks operator passes the run_id of the created job back to XCom. This is needed in the subsequent task to request a prediction for the run. Just enable the do_xcom_push parameter (a fuller sketch of the operator follows the snippet below).

# DAG code
...
# Submit the Databricks run
run_operator = DatabricksSubmitRunOperator(
    task_id=...,
    do_xcom_push=True,
)
...
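For context, here is a minimal sketch of what the elided pieces of that operator might look like. The notebook path and the databricks_conn_id value are illustrative placeholders; substitute your own task definition and Airflow connection.
# Hedged sketch: a fuller DatabricksSubmitRunOperator call using the cluster_config built above.
# The notebook path and connection id are placeholders.
run_operator = DatabricksSubmitRunOperator(
    task_id="dbx_operator",
    databricks_conn_id="databricks_default",  # Airflow connection to your Databricks workspace
    new_cluster=cluster_config,               # cluster configuration defined earlier
    notebook_task={"notebook_path": "/Shared/example_notebook"},
    do_xcom_push=True,                        # pushes the Databricks run_id to XCom
)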
Upon successful completion of the DatabricksSubmitRunOperator task, we'll have the run_id we need to create a recommendation for an optimal cluster configuration. We'll utilize the PythonOperator to call the create_prediction_for_run method from the Sync Library. Within the library, this method connects to the Databricks instance to gather the cluster log location, fetch the logs, and generate the recommendation. Below is an example of how to call create_prediction_for_run.
# DAG code
from sync.awsdatabricks import create_prediction_for_run
...
def submit_run_for_recommendation(task_to_submit: str, **kwargs):
    run_id = kwargs["ti"].xcom_pull(task_ids=task_to_submit, key="run_id")
    project_id = "Project_Id_Goes_Here"
    create_prediction_for_run(
        run_id=run_id,
        plan_type="Premium",
        project_id=project_id,
        compute_type="Jobs Compute",
    )
What this code block does:
- Wraps create_prediction_for_run in a function we can hand to the PythonOperator.
- Pulls the run_id of the previous task from XCom. We supply task_to_submit as the task_id we gave the DatabricksSubmitRunOperator.
- Assigns the project id for that task to the project_id variable.
- Passes our project id, found on the project details page in Gradient, to the Sync Library method.
Optionally, add parameters to submit_run_for_recommendation if you'd like to pass the project id in from the PythonOperator instead of hard-coding it; a sketch of this follows the next code block. Edit plan_type and compute_type as needed; these reference your Databricks plan and compute type. To call the submit_run_for_recommendation method we defined, implement the PythonOperator as follows:

submit_for_recommendation = PythonOperator(
    task_id="submit_for_recommendation",
    python_callable=submit_run_for_recommendation,
    op_kwargs={
        "task_to_submit": "Task_id of the DatabricksSubmitRunOperator to run a prediction for",
    },
    provide_context=True,
    retries=0,
)
...
run_operator >> submit_for_recommendation
...
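If you'd prefer to pass the project id, plan type, and compute type in from the operator rather than hard-coding them in the function, here is one hedged sketch of how that could look. The extra op_kwargs names are our own, not part of the Sync Library.
# Hedged sketch: parameterizing submit_run_for_recommendation via op_kwargs.
def submit_run_for_recommendation(
    task_to_submit: str, project_id: str, plan_type: str, compute_type: str, **kwargs
):
    run_id = kwargs["ti"].xcom_pull(task_ids=task_to_submit, key="run_id")
    create_prediction_for_run(
        run_id=run_id,
        plan_type=plan_type,
        project_id=project_id,
        compute_type=compute_type,
    )

submit_for_recommendation = PythonOperator(
    task_id="submit_for_recommendation",
    python_callable=submit_run_for_recommendation,
    op_kwargs={
        "task_to_submit": "dbx_operator",
        "project_id": "Project_Id_Goes_Here",
        "plan_type": "Premium",
        "compute_type": "Jobs Compute",
    },
    provide_context=True,
    retries=0,
)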
Let's combine all of the previous steps in a DAG. The DAG will submit a run to Databricks and then call the Sync Library to generate a recommendation for an optimized cluster for that task.
# DAG .py code
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

from sync.awsdatabricks import create_prediction_for_run
from sync.config import DatabricksConf as sync_databricks_conf
from sync.config import get_api_key
with DAG(
    dag_id="example_dag",
    default_args=default_args,
    tags=["example"],
    ...
) as dag:
    # define the cluster configuration
    cluster_config = {
        "spark_version": "13.0.x-scala2.12",
        ...
        "cluster_log_conf": {
            "s3": {
                "destination": "",  # Add the S3 path for the cluster logs
                "enable_encryption": True,
                "region": "",  # Add your AWS region ie: us-east-1
                "canned_acl": "bucket-owner-full-control",
            }
        },
        "custom_tags": {"sync:project-id": ""},  # Add the project id from Gradient
        ...
        "init_scripts": [
            {
                "workspace": {
                    "destination": ""  # Path to the init script in the workspace ie: Shared/init_scripts/init.sh
                }
            }
        ],
        "spark_env_vars": {
            "DATABRICKS_HOST": "",  # f"{sync_databricks_conf().host}"
            "DATABRICKS_TOKEN": "",  # f"{sync_databricks_conf().token}"
            "SYNC_API_KEY_ID": "",  # f"{get_api_key().id}"
            "SYNC_API_KEY_SECRET": "",  # f"{get_api_key().secret}"
            "AWS_DEFAULT_REGION": "",  # f"{os.environ['AWS_DEFAULT_REGION']}"
            "AWS_ACCESS_KEY_ID": "",  # f"{os.environ['AWS_ACCESS_KEY_ID']}"
            "AWS_SECRET_ACCESS_KEY": "",  # f"{os.environ['AWS_SECRET_ACCESS_KEY']}"
        },
    }
    # define your databricks operator
    dbx_operator = DatabricksSubmitRunOperator(
        task_id="dbx_operator",
        do_xcom_push=True,
        ...
        new_cluster=cluster_config,
        ...
    )

    # define the submit function to pass to the PythonOperator
    def submit_run_for_recommendation(task_to_submit: str, **kwargs):
        run_id = kwargs["ti"].xcom_pull(task_ids=task_to_submit, key="run_id")
        project_id = "Project_Id_Goes_Here"
        create_prediction_for_run(
            run_id=run_id,
            plan_type="Premium",
            project_id=project_id,
            compute_type="Jobs Compute",
        )

    # define the python operator
    submit_for_recommendation = PythonOperator(
        task_id="submit_for_recommendation",
        python_callable=submit_run_for_recommendation,
        op_kwargs={
            "task_to_submit": "dbx_operator",
        },
        provide_context=True,
        retries=0,
    )

    # define dag dependency
    dbx_operator >> submit_for_recommendation
To create the .sync configuration files that are normally generated by sync-cli configure, replace the values in the following snippet and run it as a bash script.

#!/bin/bash
# Create the .sync directory
mkdir -p ~/.sync
# Create and populate the config file
echo '{
"default_prediction_preference": "balanced",
"api_url": "https://api.synccomputing.com"
}' > ~/.sync/config
# Create and populate the sync credentials file
echo '{
"api_key_id": "",
"api_key_secret": ""
}' > ~/.sync/credentials
# Create and populate the sync databrickscfg file
echo '{
"host": "https://[id].cloud.databricks.com",
"token": "",
"aws_region_name": ""
}' > ~/.sync/databrickscfg
Once the code above is implemented into your DAG, head over to the Projects dashboard in Gradient. There you'll be able to easily review recommendations and can make changes to the cluster configuration as needed.