Apache Airflow for Databricks
Currently, Gradient is optimized to work with predefined Databricks Jobs and Databricks Workflows. However, the Sync Python Library allows you to integrate your Databricks pipelines when using third-party tools like Airflow and Azure Data Factory. Typically, these tools use `run_submit()` or `DatabricksSubmitRunOperator()`, found in Databricks' APIs and Databricks' Airflow provider, to initiate runs on Databricks.
If your tool uses this job invocation method, you can follow the pattern below to submit your event logs to Gradient for evaluation, generate a recommendation, and apply that recommendation to your next job run.
These instructions guide you through the Gradient integration for Airflow DAGs containing `DatabricksSubmitRunOperator()` tasks, using a pre-execute hook.
The pre-execute hook for `DatabricksSubmitRunOperator()` creates/fetches the relevant project. It then retrieves a recommendation from Gradient for an optimized cluster config for this project. The recommendation overrides the cluster config being passed to `DatabricksSubmitRunOperator()`. The task then runs with this optimized cluster config instead of the original untuned cluster config.
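To make that flow concrete, here is a minimal sketch of what such a pre-execute hook does. The helper functions are hypothetical stand-ins for the `syncsparkpy` calls, and the detail of patching the operator's `json` payload is illustrative; the real hook is provided by the library and is wired in under step 2 below.

```python
from typing import Optional

# Illustrative sketch only: the real hook ships with syncsparkpy. The two
# helpers below are stand-ins for the library's project and recommendation calls.

def _get_or_create_project(task_id: str, gradient_app_id: str) -> str:
    """Placeholder: the real implementation calls the Gradient API."""
    return f"{task_id}:{gradient_app_id}"


def _get_latest_recommendation(project_name: str) -> Optional[dict]:
    """Placeholder: the real implementation fetches a recommended cluster config."""
    return None


def gradient_pre_execute_hook(context):
    """Runs just before DatabricksSubmitRunOperator submits the Databricks run."""
    task = context["task"]  # the DatabricksSubmitRunOperator instance
    app_id = context["dag"].params.get("gradient_app_id", "default")

    # 1. Create or fetch the Gradient project for this task
    #    (projects are named "<task_id>:<gradient_app_id>").
    project = _get_or_create_project(task.task_id, app_id)

    # 2. Ask Gradient for the latest recommended cluster config for the project.
    recommendation = _get_latest_recommendation(project)

    # 3. Overwrite the cluster config in the run-submit payload so the task
    #    launches with the optimized config instead of the original one.
    if recommendation:
        task.json["new_cluster"] = recommendation
```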
Databricks Workspace Integrated with Gradient - This process requires a Databricks Workspace Integration to be configured. Detailed instructions are available here.
Environment variables for `SYNC_API_KEY_ID` and `SYNC_API_KEY_SECRET`. The values for these variables can be found in Gradient under Org Settings -> API Keys. Instructions for managing environment variables in Airflow can be found here. (A quick verification snippet is shown after these prerequisites.)
The `syncsparkpy` library has been installed and configured in your Airflow instance - steps to configure the library can be found here.
Install `syncsparkpy` via:
pip install git+https://github.com/synccomputingcode/syncsparkpy@v2.0.1a0
Tested with Python 3.8+ and Airflow 2.0+
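As a quick sanity check, you can confirm from the Airflow environment that the library is installed and that the API key variables are set. The snippet below is a minimal sketch; it assumes the installed distribution is named `syncsparkpy`.

```python
import os
from importlib import metadata

# Confirm the Gradient API credentials are visible to the Airflow environment.
for var in ("SYNC_API_KEY_ID", "SYNC_API_KEY_SECRET"):
    if not os.environ.get(var):
        raise RuntimeError(f"{var} is not set in the Airflow environment")

# Confirm the library is installed (distribution name assumed to be "syncsparkpy").
print("syncsparkpy version:", metadata.version("syncsparkpy"))
```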
Decide DAG parameters (one way to declare them is sketched after this list):
Cluster Log Location - A Databricks-supported path for cluster log delivery (see the Databricks cluster documentation for details)
App ID - A human-readable unique identifier supplied with each Databricks Job run. Its purpose is to provide criteria by which to group execution metrics. `DatabricksSubmitRunOperator` tasks that utilize multiple clusters are not currently supported.
Auto apply - Whether or not you want recommendations automatically applied
Databricks Workspace ID
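One way to keep these parameters together is to declare them once at the top of your DAG file. The names and values below are illustrative placeholders, not anything required by the library.

```python
# Illustrative DAG-level parameters -- the names are placeholders, not values
# required by syncsparkpy.
GRADIENT_APP_ID = "gradient_databricks_multitask"  # groups task runs into projects
CLUSTER_LOG_URL = "dbfs:/cluster-logs"             # Databricks-supported log delivery path
DATABRICKS_WORKSPACE_ID = "1234567890123456"       # your Databricks workspace id
AUTO_APPLY = True                                  # apply recommendations automatically
```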
Step 1: Import the `syncsparkpy` library function.

In Gradient, a project is a collection of runs of a specific task, or tasks, that correspond to a single compute cluster. Projects are named using a `task_id:gradient_app_id` format.
The example below shows a DAG with three tasks, where each task is associated with its own compute. The `gradient_app_id` is set at the DAG level to `gradient_databricks_multitask`. In Gradient, a project is created for each task in the DAG:
bronze_task:gradient_databricks_multitask
silver_task:gradient_databricks_multitask
gold_task:gradient_databricks_multitask
Step 2: Add the `pre_execute` hook kwarg to the `DatabricksSubmitRunOperator` task and set it to the library function imported in step 1. Gradient additions are annotated below.
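Since the exact import path and function name come from the `syncsparkpy` library (see step 1 and the library documentation), the condensed DAG below uses a local stand-in for that hook purely to keep the sketch self-contained. The Gradient-specific additions are marked with comments, and the parameter names and cluster values are illustrative.

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator


def gradient_pre_execute_hook(context):
    """Stand-in for the syncsparkpy hook imported in step 1.

    Defined locally only so this sketch is self-contained; use the real
    library function (see the library documentation) in your DAG.
    """
    # The real hook fetches a Gradient recommendation and overrides the
    # operator's cluster config before the run is submitted.


with DAG(
    dag_id="gradient_databricks_multitask_example",
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    # Gradient addition: DAG-level settings read by the hook (names illustrative).
    params={
        "gradient_app_id": "gradient_databricks_multitask",
        "cluster_log_url": "dbfs:/cluster-logs",
        "databricks_workspace_id": "1234567890123456",
        "gradient_auto_apply": True,
    },
) as dag:
    bronze_task = DatabricksSubmitRunOperator(
        task_id="bronze_task",
        databricks_conn_id="databricks_default",
        new_cluster={
            "spark_version": "14.3.x-scala2.12",
            "node_type_id": "i3.xlarge",
            "num_workers": 4,
            # Deliver cluster logs so Gradient can ingest the event logs.
            "cluster_log_conf": {"dbfs": {"destination": "dbfs:/cluster-logs"}},
        },
        notebook_task={"notebook_path": "/pipelines/bronze"},
        # Gradient addition: the pre_execute hook swaps in the recommended config.
        pre_execute=gradient_pre_execute_hook,
    )

    # silver_task and gold_task follow the same pattern, each with its own
    # pre_execute hook, giving one Gradient project per task.
```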
Once the code above is implemented in your DAG, head over to the Projects dashboard in Gradient. There you can review recommendations and make changes to the cluster configuration as needed.