Apache Airflow for Databricks

Currently, Gradient is optimized to work with predefined Databricks Jobs and Databricks Workflows. However, the Sync Python Library lets you integrate Databricks pipelines that are driven by third-party tools such as Airflow and Azure Data Factory. These tools typically initiate Databricks runs with run_submit() from the Databricks APIs or DatabricksSubmitRunOperator() from the Databricks Airflow provider.

If your tool invokes jobs this way, you can follow the pattern below to submit your event logs to Gradient for evaluation, generate a recommendation, and apply that recommendation to your next job run.

Process Overview

These instructions walk through the Gradient integration for Airflow DAGs that contain DatabricksSubmitRunOperator() tasks, using a pre-execute hook.

The pre-execute hook for DatabricksSubmitRunOperator() creates or fetches the relevant project, then retrieves a recommendation from Gradient for an optimized cluster config for that project. The recommendation overrides the cluster config being passed to DatabricksSubmitRunOperator(), so the task runs with the optimized cluster config instead of the original, untuned one.
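
For orientation, the sketch below shows the general shape of an Airflow pre-execute hook that rewrites an operator's cluster config before the run is submitted. It is a simplified illustration only, not the implementation of airflow_gradient_pre_execute_hook, and fetch_recommendation is a hypothetical stand-in for the Gradient API call.

def fetch_recommendation(params):
    # Hypothetical placeholder for the call the Sync library makes to Gradient
    # to look up the project's latest recommended cluster config.
    return None


def example_pre_execute_hook(context):
    # Airflow passes the task context to pre_execute just before the operator runs.
    task = context["task"]        # the DatabricksSubmitRunOperator instance
    params = context["params"]    # DAG-level params (gradient_app_id, gradient_auto_apply, ...)

    recommended_cluster = fetch_recommendation(params)
    if recommended_cluster and params.get("gradient_auto_apply"):
        # Replace the untuned cluster config in the run-submit payload.
        task.json["new_cluster"] = recommended_cluster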

Prerequisites

Install syncsparkpy via:

pip install git+https://github.com/synccomputingcode/syncsparkpy@v2.0.1a0

Tested with Python 3.8+ and Airflow 2.0+.

  • Databricks Workspace integrated with Gradient - This process requires a Databricks Workspace Integration to be configured. Detailed instructions are available here.

  • Environment variables for SYNC_API_KEY_ID and SYNC_API_KEY_SECRET - The values for these variables can be found in Gradient under Org Settings -> API Keys. Guidance on managing environment variables in Airflow can be found here (a quick sanity check is sketched after this list).

  • syncsparkpy installed and configured in your Airflow instance - Steps to configure the library can be found here.

  • Cluster log location - A Databricks-supported path for cluster log delivery (see the Databricks cluster documentation for details).

  • Decide DAG parameters:

    • App ID - A human-readable unique identifier supplied with each Databricks Job run. Its purpose is to provide criteria by which to group execution metrics. DatabricksSubmitRunOperator tasks that utilize multiple clusters are not currently supported.

    • Auto apply - Whether or not you want recommendations applied automatically.

    • Databricks workspace ID
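
Because a missing credential only surfaces when the hook tries to call Gradient, it can help to verify the environment up front. The snippet below is an illustrative check, not part of the Sync library; run it (or an equivalent) wherever your Airflow workers execute tasks.

import os

# Illustrative check: the Sync credentials must be visible to the processes
# that execute DatabricksSubmitRunOperator tasks, otherwise the pre-execute
# hook cannot authenticate with Gradient.
missing = [v for v in ("SYNC_API_KEY_ID", "SYNC_API_KEY_SECRET") if not os.environ.get(v)]
if missing:
    raise RuntimeError(f"Missing Sync credentials in the Airflow environment: {missing}")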

Steps

1. Import syncsparkpy library function

from sync.databricks.integrations.airflow import airflow_gradient_pre_execute_hook

2. Add relevant params to DAG definition

In Gradient, a project is a collection of runs of a specific task, or tasks, that correspond to a single compute cluster. Projects are named using a task_id:gradient_app_id format.

The example below shows a DAG with three tasks, where each task is associated with its own compute cluster. The gradient_app_id is set at the DAG level to gradient_databricks_multitask. In Gradient, a project is created for each task in the DAG:

  • bronze_task:gradient_databricks_multitask

  • silver_task:gradient_databricks_multitask

  • gold_task:gradient_databricks_multitask

params={
    'gradient_app_id': 'gradient_databricks_multitask',
    'gradient_auto_apply': True,
    'cluster_log_url': 'dbfs:/cluster-logs',
    'databricks_workspace_id': '10295812058'
}

3. Add pre_execute hook kwarg to DatabricksSubmitRunOperator task and set it to the library function imported in step 1

pre_execute=airflow_gradient_pre_execute_hook
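
For example, on the notebook task from the full DAG below (task_id and the get_task_params() helper come from that example):

notebook_task = DatabricksSubmitRunOperator(
    pre_execute=airflow_gradient_pre_execute_hook,  # Gradient addition
    task_id="notebook_task",
    dag=dag,
    json=get_task_params(),
)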

Example Airflow DAG

Gradient additions are annotated below

from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import PythonVirtualenvOperator
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago
from airflow.models.variable import Variable
from airflow.models import TaskInstance

##################### Gradient addition #####################
from sync.databricks.integrations.airflow import airflow_gradient_pre_execute_hook
##############################################################


default_args = {
    'owner': 'airflow'
}

with DAG(
    dag_id='gradient_databricks_multitask',
    default_args=default_args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=['demo'],
    ##################### Gradient addition #####################
    params={
        'gradient_app_id': 'gradient_databricks_multitask',
        'gradient_auto_apply': True,
        'cluster_log_url': 'dbfs:/cluster-logs',
        'databricks_workspace_id': '10295812058'
    }
    ##############################################################
) as dag:

    def get_task_params():
        # Original (untuned) cluster config; Gradient's recommendation overrides
        # new_cluster when the pre-execute hook runs.
        task_params = {
            "new_cluster": {
                "node_type_id": "i3.xlarge",
                "driver_node_type_id": "i3.xlarge",
                "custom_tags": {},
                "num_workers": 4,
                "spark_version": "14.0.x-scala2.12",
                "runtime_engine": "STANDARD",
                "aws_attributes": {
                    "first_on_demand": 0,
                    "availability": "SPOT_WITH_FALLBACK",
                    "spot_bid_price_percent": 100
                }
            },
            "notebook_task": {
                "notebook_path": "/Users/pete.tamisin@synccomputing.com/gradient_databricks_multitask",
                "source": "WORKSPACE"
            }
        }

        return task_params

    notebook_task = DatabricksSubmitRunOperator(
        ##################### Gradient addition #####################
        pre_execute=airflow_gradient_pre_execute_hook,
        ##############################################################
        task_id="notebook_task",
        dag=dag,
        json=get_task_params(),
    )

    notebook_task

Done!

Figure: Airflow DAG with three tasks, each running on its own compute.
Figure: Gradient UI with one project per unique task-compute pairing.

Once the code above is implemented in your DAG, head over to the Projects dashboard in Gradient. There you can review recommendations and make changes to the cluster configuration as needed.