Databricks comes with a seamless Apache Airflow integration for scheduling complex data pipelines.


Apache Airflow

Apache Airflow is a solution for managing and scheduling data pipelines. Airflow represents data pipelines as directed acyclic graphs (DAGs) of operations, where an edge represents a logical dependency between operations.
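To make the DAG idea concrete, here is a minimal sketch that uses only the Python standard library, not Airflow itself. The task names are made up for illustration. The point is simply that a task can run only after everything it depends on has finished.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical pipeline: each key maps a task to the set of tasks it depends on.
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
    "report": {"load"},
}

# static_order() yields tasks so that every dependency comes before its dependents.
run_order = list(TopologicalSorter(dependencies).static_order())
print(run_order)
```

Airflow does the same kind of ordering for us, and on top of that handles scheduling, retries and monitoring.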


Install the Airflow Databricks integration

To use Apache Airflow with Databricks, we need to install the Databricks Python package in our Airflow instance. The integration between Airflow and Databricks is available in Airflow version 1.9.0 and above. To install the Airflow Databricks integration, run:

pip install "apache-airflow[databricks]"

Configure a Databricks connection

To use the Databricks Airflow operator, you must provide credentials in the appropriate Airflow connection. By default, if you do not specify the databricks_conn_id parameter to DatabricksSubmitRunOperator, the operator looks for credentials in the connection whose ID is databricks_default.

We can set up the Databricks connection as follows:

Databricks_Connection
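As a rough guide to what goes into the connection form, the sketch below assembles the field values in plain Python. It assumes token authentication, where the workspace URL goes in the Host field and a personal access token goes in the Extra field as JSON. The helper name, the URL check and the placeholder values are my own illustration and are not part of Airflow.

```python
import json


def databricks_connection_fields(host, token, conn_id="databricks_default"):
    """Return the values to type into the Airflow connection form (hypothetical helper)."""
    # Illustrative sanity check: the host is expected to be the full workspace URL.
    if not host.startswith("https://"):
        raise ValueError("host should be the full workspace URL, starting with https://")
    return {
        "conn_id": conn_id,
        "conn_type": "databricks",
        "host": host,
        # With token authentication, the token is stored in Extra as a JSON string.
        "extra": json.dumps({"token": token}),
    }


fields = databricks_connection_fields(
    host="https://adb-0000000000000000.0.azuredatabricks.net",  # placeholder URL
    token="dapiXXXXXXXXXXXX",  # placeholder, never commit a real token
)
print(fields)
```

Keeping the conn_id as databricks_default means the operator picks the connection up without any extra parameter.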


Airflow Job

The Apache Airflow DAG definition looks like the one below. Here we're using Azure Databricks as the Databricks workspace.

We're also using a Docker image in our DAG, which is stored in ACR (Azure Container Registry). To pull the image from ACR, Databricks expects us to pass the Azure Service Principal client ID and password. I have stored these credentials as Airflow Variables.

Dockerfile

FROM databricksruntime/standard:latest

RUN /databricks/conda/envs/dcs-minimal/bin/pip install pandas awscli s3fs

DAG
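from datetime import datetime

from airflow import DAG
# Import path for Airflow 1.x. In Airflow 2.x the operator lives in
# airflow.providers.databricks.operators.databricks.
from airflow.contrib.operators.databricks_operator import DatabricksSubmitRunOperator
from airflow.models import Variable

# Default arguments applied to every task in the DAG.
args = {
    'owner': 'prateek.dubey',
    'email': ['dataengineeringe2e@gmail.com'],
    'depends_on_past': False,
    'start_date': datetime(2020, 6, 18)
}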

with DAG(dag_id='Airflow_Databricks_Integration', default_args=args, schedule_interval='@daily') as dag:

    new_cluster = {
        'name':'dataengineeringe2e',
        'spark_version': '7.0.x-scala2.12',
        "node_type_id": "Standard_DS12_v2",
        "driver_node_type_id": "Standard_F8s_v2",
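        # Worker count comes from 'autoscale' below; the cluster spec takes either
        # a fixed num_workers or an autoscale range, so num_workers is left out.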
        'autoscale': {
            'min_workers': 1,
            'max_workers': 2
        },
        'custom_tags': {
            'TeamName': 'DataEngineeringE2E'
        },
        'spark_conf': {
            'spark.databricks.cluster.profile': 'serverless',
            'hive.metastore.schema.verification.record.version': 'TRUE',
            'spark.databricks.repl.allowedLanguages': 'sql,python,r',
            'spark.databricks.delta.preview.enabled': 'TRUE'
        },
        "docker_image": {
            "url": "dataengineeringe2e.azurecr.io/dataengineeringe2e:latest",
            "basic_auth": {
                "username": Variable.get("dataengineeringe2e_sp_client_id"),
                "password": Variable.get("dataengineeringe2e_sp_password"),
            }
        },
    }
    notebook_task_params = {
        'new_cluster': new_cluster,
        'notebook_task': {
            'notebook_path': '/dataengineeringe2e@gmail.com/Airflow_Databricks_Integration',
        }
    }

    notebook_task = DatabricksSubmitRunOperator(
        task_id='Airflow_Databricks_Integration',
        databricks_conn_id='databricks_default',
        dag=dag,
        json=notebook_task_params)
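It helps to know roughly what happens when this task runs: the operator submits the JSON payload to Databricks as a one-time run, then keeps polling the run until it reaches a terminal state, and fails the Airflow task if the run did not succeed. The sketch below imitates that polling loop in plain Python. It is not the operator's actual code: get_run_state is a hypothetical callable standing in for the real API call, and the fake client exists only so the example runs on its own.

```python
import time

# Life cycle states after which a Databricks run will not change any more.
TERMINAL_STATES = {"TERMINATED", "SKIPPED", "INTERNAL_ERROR"}


def wait_for_run(get_run_state, run_id, poll_seconds=30, sleep=time.sleep):
    """Poll a run until it finishes; raise if it did not succeed.

    get_run_state is a hypothetical callable returning a tuple
    (life_cycle_state, result_state) for the given run_id.
    """
    while True:
        life_cycle_state, result_state = get_run_state(run_id)
        if life_cycle_state in TERMINAL_STATES:
            if result_state == "SUCCESS":
                return
            raise RuntimeError(
                f"run {run_id} ended in {life_cycle_state}/{result_state}"
            )
        sleep(poll_seconds)


# Fake client for illustration: the run is pending, then running, then done.
states = iter([("PENDING", None), ("RUNNING", None), ("TERMINATED", "SUCCESS")])
polls = []


def fake_get_run_state(run_id):
    state = next(states)
    polls.append(state[0])
    return state


# Pass a no-op sleep so the example finishes immediately.
wait_for_run(fake_get_run_state, run_id=42, sleep=lambda seconds: None)
print(polls)
```

In practice this means the Airflow task stays in the running state for as long as the Databricks run does, so task timeouts should allow for cluster start-up time as well.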

Test Airflow Job

We can now start scheduling our data pipelines via Airflow on the Databricks platform. For AWS Databricks, the DAG will look exactly the same except for the cluster definition JSON (new_cluster).
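To give an idea of what changes in the cluster definition on AWS, here is a small sketch that derives an AWS-flavoured definition from an Azure one. The helper name is hypothetical, and the instance type, availability setting and zone are example values I picked for illustration, so adjust them to your own workspace.

```python
import copy


def to_aws_cluster(azure_cluster, node_type_id="i3.xlarge", aws_attributes=None):
    """Return a copy of the cluster definition with AWS-specific settings (hypothetical helper)."""
    cluster = copy.deepcopy(azure_cluster)  # leave the original definition untouched
    # Azure VM sizes are replaced with an AWS instance type (example value).
    cluster["node_type_id"] = node_type_id
    cluster["driver_node_type_id"] = node_type_id
    # AWS-only block; these values are illustrative assumptions.
    cluster["aws_attributes"] = aws_attributes or {
        "availability": "SPOT_WITH_FALLBACK",
        "zone_id": "us-east-1a",
    }
    return cluster


# Trimmed-down version of the Azure cluster definition used in the DAG.
azure_cluster = {
    "spark_version": "7.0.x-scala2.12",
    "node_type_id": "Standard_DS12_v2",
    "driver_node_type_id": "Standard_F8s_v2",
    "autoscale": {"min_workers": 1, "max_workers": 2},
}

aws_cluster = to_aws_cluster(azure_cluster)
print(aws_cluster)
```

If the cluster uses a custom Docker image, the registry URL and credentials in docker_image will most likely need to change as well, since ACR is an Azure service.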

Have fun scheduling your jobs via Airflow.