Apache Airflow for Databricks

Currently, Gradient is optimized to work with predefined Databricks Jobs and Databricks Workflows. However, the Sync Python Library lets you integrate Databricks pipelines that are orchestrated by third-party tools such as Airflow and Azure Data Factory. These tools typically initiate Databricks runs through the Jobs runs/submit API or the DatabricksSubmitRunOperator() from Databricks' Airflow provider.

If your tool uses this job invocation method, you can follow the pattern below to submit your event logs to Gradient for evaluation, generate a recommendation, and apply that recommendation to your next job run.
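
For context, a typical untuned task of this kind looks something like the minimal sketch below. The task ID, Databricks connection ID, cluster settings, and notebook path are placeholder values, not values from this guide; the point is simply that the cluster config is hard-coded in the operator and never revisited.

from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

# Hypothetical, untuned task: the cluster config is fixed in code.
bronze_task = DatabricksSubmitRunOperator(
    task_id="bronze_task",
    databricks_conn_id="databricks_default",
    json={
        "new_cluster": {
            "spark_version": "13.3.x-scala2.12",
            "node_type_id": "i3.xlarge",
            "num_workers": 4,
        },
        "notebook_task": {"notebook_path": "/pipelines/bronze"},
    },
)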

Process Overview

These instructions walk you through integrating Gradient with Airflow DAGs that contain DatabricksSubmitRunOperator() tasks, using a pre-execute hook.

The pre-execute hook for DatabricksSubmitRunOperator() creates or fetches the relevant Gradient project, then retrieves that project's recommendation for an optimized cluster config. The recommendation overrides the cluster config passed to DatabricksSubmitRunOperator(), so the task runs with the optimized cluster config instead of the original untuned one.
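
Conceptually, the hook's behavior can be pictured with the illustrative pseudocode below. This is not the library's actual implementation; the helpers get_or_create_project and get_latest_recommendation are hypothetical names used only to sketch the flow described above.

# Illustrative pseudocode only -- the real logic lives inside
# airflow_gradient_pre_execute_hook in the syncsparkpy library.
def gradient_pre_execute_sketch(context):
    task = context["task"]        # the DatabricksSubmitRunOperator instance
    params = context["params"]    # gradient_app_id, gradient_auto_apply, etc.

    # 1. Create or fetch the project for this task (named task_id:gradient_app_id).
    project = get_or_create_project(f"{task.task_id}:{params['gradient_app_id']}")  # hypothetical helper

    # 2. Ask Gradient for the latest cluster recommendation for this project.
    recommendation = get_latest_recommendation(project)  # hypothetical helper

    # 3. If auto-apply is enabled and a recommendation exists, override the
    #    cluster config that will be submitted to Databricks.
    if params.get("gradient_auto_apply") and recommendation:
        task.json["new_cluster"] = recommendation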

Prerequisites

  • Databricks Workspace Integrated with Gradient - This process requires a Databricks Workspace Integration to be configured. Detailed instructions are available here.

  • The syncsparkpy library is installed and configured in your Airflow instance - steps to configure the library can be found here.

Install syncsparkpy via:

pip install git+https://github.com/synccomputingcode/syncsparkpy@v2.0.1a0

Tested with Python 3.8+ and Airflow 2.0+

  • Decide DAG parameters:

    • Cluster Log Location - A Databricks-supported path for cluster log delivery (see Databricks cluster documentation for details)

    • App ID - An App ID is a human-readable unique identifier supplied with each Databricks Job run. It provides the criteria by which execution metrics are grouped. DatabricksSubmitRunOperator tasks that utilize multiple clusters are not currently supported.

    • Auto apply - Whether or not you want recommendations automatically applied

    • Databricks Workspace ID - The ID of the Databricks workspace where the job runs

Steps

1. Import syncsparkpy library function

from sync.databricks.integrations.airflow import airflow_gradient_pre_execute_hook

2. Add relevant params to DAG definition

In Gradient, a project is a collection of runs of a specific task, or tasks, that correspond to a single compute cluster. Projects are named using a task_id:gradient_app_id format.

The example below shows a DAG with three tasks, where each task is associated with its own compute. The gradient_app_id is set at the DAG level to gradient_databricks_multitask. In Gradient, a project is created for each task in the DAG:

  • bronze_task:gradient_databricks_multitask

  • silver_task:gradient_databricks_multitask

  • gold_task:gradient_databricks_multitask

params={
    'gradient_app_id': 'gradient_databricks_multitask',
    'gradient_auto_apply': True,
    'cluster_log_url': 'dbfs:/cluster-logs',
    'databricks_workspace_id': '10295812058'
}

3. Add the pre_execute kwarg to each DatabricksSubmitRunOperator task and set it to the library function imported in step 1

pre_execute=airflow_gradient_pre_execute_hook

Example Airflow DAG

Gradient additions are annotated below
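
The sketch below shows one way a complete DAG could look with the Gradient additions in place, assuming a three-task bronze/silver/gold pipeline like the one described in step 2. The notebook paths, cluster settings, schedule, and Databricks connection ID are placeholders; the lines commented with # Gradient addition are the only changes relative to a standard DAG.

from datetime import datetime

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

from sync.databricks.integrations.airflow import airflow_gradient_pre_execute_hook  # Gradient addition


def task_json(notebook_path):
    # Placeholder cluster and notebook settings -- replace with your own.
    return {
        "new_cluster": {
            "spark_version": "13.3.x-scala2.12",
            "node_type_id": "i3.xlarge",
            "num_workers": 4,
            "cluster_log_conf": {"dbfs": {"destination": "dbfs:/cluster-logs"}},
        },
        "notebook_task": {"notebook_path": notebook_path},
    }


with DAG(
    dag_id="gradient_databricks_multitask",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    params={  # Gradient addition
        "gradient_app_id": "gradient_databricks_multitask",
        "gradient_auto_apply": True,
        "cluster_log_url": "dbfs:/cluster-logs",
        "databricks_workspace_id": "10295812058",
    },
) as dag:
    bronze_task = DatabricksSubmitRunOperator(
        task_id="bronze_task",
        databricks_conn_id="databricks_default",
        json=task_json("/pipelines/bronze"),
        pre_execute=airflow_gradient_pre_execute_hook,  # Gradient addition
    )

    silver_task = DatabricksSubmitRunOperator(
        task_id="silver_task",
        databricks_conn_id="databricks_default",
        json=task_json("/pipelines/silver"),
        pre_execute=airflow_gradient_pre_execute_hook,  # Gradient addition
    )

    gold_task = DatabricksSubmitRunOperator(
        task_id="gold_task",
        databricks_conn_id="databricks_default",
        json=task_json("/pipelines/gold"),
        pre_execute=airflow_gradient_pre_execute_hook,  # Gradient addition
    )

    bronze_task >> silver_task >> gold_task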

Done!

Once the code above is implemented in your DAG, head over to the Projects dashboard in Gradient. There you can review recommendations and make changes to the cluster configuration as needed.
