Airflow DAGs often become too big and complicated to understand. They get split between different teams within a company for implementation and support, and then the separate DAGs somehow have to be incorporated back into one pipeline.

I had exactly this problem: I had to connect two independent but logically connected DAGs. In this post, we are going to discuss what options are available in Airflow for connecting dependent DAGs with each other.

Example setup

Let's start by setting up an example. Let's imagine that we have an ETL process divided between 3 independent DAGs — extract, transform, and load.

For the example to be more illustrative, we need at least a Local executor so that more than one task can be run in parallel. To do this I will use this docker-compose file with Airflow, PostgreSQL pre-installed and LocalExecutor pre-configured.

$ docker-compose -f docker-compose.yml up -d

Extract dag:

# dags/extract.py

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

FREQUENCY = '*/10 * * * *'
DEFAULT_ARGS = {
    'depends_on_past': False,
    'start_date': datetime.today() - timedelta(1),
    'retries': 1,
    'catchup': False,
}

dag = DAG(
    'extract_dag',
    default_args=DEFAULT_ARGS,
    schedule_interval=FREQUENCY,
    catchup=False,
)
# Print a message to the logs and sleep for 2 seconds
t1 = BashOperator(
    task_id='task_1',
    bash_command='echo "Extracting stuff from s3";sleep 2;',
    dag=dag)
# Print a message to the logs and sleep for 2 seconds
t2 = BashOperator(
    task_id='task_2',
    bash_command='echo "Extracting stuff from jdbc";sleep 2;',
    dag=dag)

t1.set_downstream(t2)

Transform dag:

# dags/transform.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

FREQUENCY = '*/10 * * * *'
DEFAULT_ARGS = {
    'depends_on_past': False,
    'start_date': datetime.today() - timedelta(1),
    'retries': 1,
    'catchup': False,
}

dag = DAG(
    'transform_dag',
    default_args=DEFAULT_ARGS,
    schedule_interval=FREQUENCY,
    catchup=False,
)
# Print a message to the logs and sleep for 2 seconds
t1 = BashOperator(
    task_id='task_1',
    bash_command='echo "Transforming stuff from s3";sleep 2;',
    dag=dag)
# Print a message to the logs and sleep for 2 seconds
t2 = BashOperator(
    task_id='task_2',
    bash_command='echo "Transforming stuff from jdbc";sleep 2;',
    dag=dag)

t1.set_downstream(t2)

Load dag:

# dags/load.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

FREQUENCY = '*/10 * * * *'
DEFAULT_ARGS = {
    'depends_on_past': False,
    'start_date': datetime.today() - timedelta(1),
    'retries': 1,
    'catchup': False,
}

dag = DAG(
    'load_dag',
    default_args=DEFAULT_ARGS,
    schedule_interval=FREQUENCY,
    catchup=False,
)
t1 = BashOperator(
    task_id='task_1',
    bash_command='echo "Loading stuff to s3";sleep 2;',
    dag=dag)

t2 = BashOperator(
    task_id='task_2',
    bash_command='echo "Loading stuff to hive";sleep 2;',
    dag=dag)

t1.set_downstream(t2)

If these DAGs were tasks in the same DAG, we could just add the following lines to the DAG file:

extract.set_downstream(transform)
transform.set_downstream(load)

However, since they are not in the same DAG, we cannot do this. But there are ways to achieve the same in Airflow.

TriggerDagRunOperator

TriggerDagRunOperator is an operator that can call external DAGs. With this operator and external DAG identifiers, we can easily trigger them.

The external DAGs must be turned on (unpaused). Otherwise a DAG run is still created for them, but it never executes and just stays stuck in the running state.

A nice thing about this operator is that the triggered runs, along with their logs, are saved in the history of the target DAGs themselves. You can see all runs of a DAG on one page instead of digging through the Airflow UI, which I find very convenient.

For TriggerDagRunOperator we need a controller, a function that controls the start of the target DAG based on some condition. This condition can use the execution context passed to the function and can be quite complex.

def conditionally_trigger(context, dag_run_obj):
    if context['params']['condition']:
        return dag_run_obj

In the controller function, if the dag_run_obj object is returned, the DAG is triggered. In the example above, the function returns this object whenever params['condition'] is truthy. With the condition set to True, as in the DAG further down, it always triggers.
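As a slightly richer illustration of a condition that uses the execution context, here is a minimal sketch of a controller that triggers only on weekdays. The function name trigger_on_weekdays and the payload contents are made up for this example, and it assumes the Airflow 1.10-style controller signature used above, where the object passed in has a payload attribute. The stand-in objects at the bottom only exist so the function can be tried outside Airflow.

```python
from datetime import datetime
from types import SimpleNamespace


def trigger_on_weekdays(context, dag_run_obj):
    """Return dag_run_obj (trigger) only if enabled and on a weekday."""
    enabled = context['params'].get('condition', False)
    is_weekday = context['execution_date'].weekday() < 5  # Mon=0 ... Fri=4
    if enabled and is_weekday:
        # Assumption: the payload ends up as the conf of the triggered run.
        dag_run_obj.payload = {'source_ds': context['ds']}
        return dag_run_obj
    return None  # nothing returned, so the target DAG is not triggered


# Hypothetical stand-ins for the context and the run object,
# used only to exercise the function outside Airflow.
monday_context = {
    'params': {'condition': True},
    'execution_date': datetime(2020, 1, 6, 12, 0),  # a Monday
    'ds': '2020-01-06',
}
order = SimpleNamespace(run_id='trig__example', payload=None)
result = trigger_on_weekdays(monday_context, order)
```

In a real DAG the function would be passed as python_callable, exactly like conditionally_trigger.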

To look closer at the context object, we can print it out. In the output we see a huge dictionary with a lot of information about the current run:

{
'conf': <airflow.configuration.AirflowConfigParser object at 0x7fb1a8c722d0>, 
'dag': <DAG: etl_with_trigger>, 
'ds': ..., 'next_ds': ..., 'next_ds_nodash': ..., 'prev_ds': ..., 
'prev_ds_nodash': ..., 'ds_nodash': ..., 'ts': ..., 'ts_nodash': ..., 
'ts_nodash_with_tz': ..., 'yesterday_ds': ..., 'yesterday_ds_nodash': ...,
'tomorrow_ds': ..., 'tomorrow_ds_nodash': ..., 'END_DATE': ...,
'end_date': ..., 'dag_run': ..., 'run_id': ..., 'execution_date': ...,
'prev_execution_date': ..., 'prev_execution_date_success': ...,
'prev_start_date_success': ..., 'next_execution_date': ...,
'latest_date': ..., 'macros': ...,
'params': {'condition': True}, 
'tables': ...,  'task': ..., 'task_instance': ..., 'ti': ...,
'task_instance_key_str': ..., 'test_mode': False,  
'var': {'value': None, 'json': None}, 'inlets': ..., 
'outlets': ...
}

Below is an example of a DAG that will run every 5 minutes and trigger three more DAGs using TriggerDagRunOperator.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator

FREQUENCY = '*/5 * * * *'
DEFAULT_ARGS = {
    'depends_on_past': False,
    'start_date': datetime.today() - timedelta(1),
    'retries': 1,
    'catchup': False,
}

dag = DAG(
    'etl_with_trigger',
    description='DAG with trigger',
    default_args=DEFAULT_ARGS,
    schedule_interval=FREQUENCY,
    catchup=False,
)
trigger_extract_dag = TriggerDagRunOperator(
    task_id='trigger_extract_dag',
    trigger_dag_id='extract_dag',
    python_callable=conditionally_trigger,
    params={'condition': True},
    dag=dag,
)
trigger_transform_dag = TriggerDagRunOperator(
    task_id='trigger_transform_dag',
    trigger_dag_id='transform_dag',
    python_callable=conditionally_trigger,
    params={'condition': True},
    dag=dag,
)
trigger_load_dag = TriggerDagRunOperator(
    task_id='trigger_load_dag',
    trigger_dag_id='load_dag',
    python_callable=conditionally_trigger,
    params={'condition': True},
    dag=dag,
)

trigger_extract_dag.set_downstream(trigger_transform_dag)
trigger_transform_dag.set_downstream(trigger_load_dag)

The trigger_dag_id here is simply the identifier of the external DAG you want to trigger.

Summing up, TriggerDagRunOperator can be used to run heavy or costly DAGs that need to run only when certain conditions are met. But TriggerDagRunOperator works in a fire-and-forget way: the parent DAG doesn't wait until the triggered DAG is complete before starting the next task. In the DAG above, trigger_transform_dag starts as soon as extract_dag has been triggered, not when it has finished.

ExternalTaskSensor

One way of signaling task completion between DAGs is to use sensors. A sensor in Airflow is a special type of task: it checks whether certain criteria are met, and only then completes and lets its downstream tasks execute.

This is a great way to create a connection between the DAG and the external system. This external system can be another DAG when using ExternalTaskSensor. ExternalTaskSensor regularly pokes the execution state of child DAGs and waits till they get to the desired state, described in the allowed_states parameter. By default, the desired state is success.

There are two things that the ExternalTaskSensor assumes:

  • Child DAGs shouldn't be manually triggered in order to get the sensor working;
  • Child DAGs should run on the same execution date as the parent DAG, meaning they should have the same schedule interval.

To configure the sensor, we need the identifier of the other DAG, passed as external_dag_id. Additionally, we can specify external_task_id, the identifier of a task within that DAG, if we want to wait for a particular task to finish. To wait for the whole DAG, we must set it to None.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.sensors.external_task_sensor import ExternalTaskSensor

FREQUENCY = '*/5 * * * *'
DEFAULT_ARGS = {
    'depends_on_past': False,
    'start_date': datetime.today() - timedelta(1),
    'retries': 1,
    'catchup': False,
}

dag = DAG(
    'etl_with_sensor',
    description='DAG with sensor',
    default_args=DEFAULT_ARGS,
    schedule_interval=FREQUENCY,
    catchup=False,
)
waiting_extract_dag = ExternalTaskSensor(
    task_id='waiting_extract_dag',
    execution_delta=None,  # look at the run with the same execution date
    external_dag_id='extract_dag',  # id of the DAG to wait for
    external_task_id=None,  # None means: wait for the whole DAG
    dag=dag,
    timeout=60,  # give up after 60 seconds
)
waiting_transform_dag = ExternalTaskSensor(
    task_id='waiting_transform_dag',
    execution_delta=None,  # look at the run with the same execution date
    external_dag_id='transform_dag',  # id of the DAG to wait for
    external_task_id=None,  # None means: wait for the whole DAG
    dag=dag,
    timeout=60,  # give up after 60 seconds
)
waiting_load_dag = ExternalTaskSensor(
    task_id='waiting_load_dag',
    execution_delta=None,  # look at the run with the same execution date
    external_dag_id='load_dag',  # id of the DAG to wait for
    external_task_id=None,  # None means: wait for the whole DAG
    dag=dag,
    timeout=60,  # give up after 60 seconds
)

waiting_extract_dag.set_downstream(waiting_transform_dag)
waiting_transform_dag.set_downstream(waiting_load_dag)

I have also set the timeout parameter here, as I think it is necessary. If the timeout is not set and some of our DAGs are not working, the sensors will be stuck in a running state. Each stuck sensor keeps occupying a task slot, so once the maximum number of concurrently running tasks is reached, the whole Airflow instance can hang.

ExternalTaskSensor assumes that it depends on a task in a DAG run with the same execution date. Otherwise, you need to use execution_delta or execution_date_fn when you instantiate the sensor.
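To make the role of the timeout more concrete, here is a rough sketch of the kind of poke-and-wait loop a sensor runs. It is a simplified stand-in written for this post, not Airflow's actual implementation; wait_until and its parameters are invented names.

```python
import time


def wait_until(poke, poke_interval=1.0, timeout=60.0):
    """Call poke() repeatedly until it returns True or the timeout expires."""
    started = time.monotonic()
    while not poke():
        # Without this check the loop would block forever
        # whenever the criteria are never met.
        if time.monotonic() - started > timeout:
            raise TimeoutError(
                'criteria not met within {} seconds'.format(timeout))
        time.sleep(poke_interval)
    return True


# Hypothetical check that succeeds on the third attempt.
attempts = {'count': 0}


def third_time_lucky():
    attempts['count'] += 1
    return attempts['count'] >= 3


done = wait_until(third_time_lucky, poke_interval=0, timeout=5)
```

The loop occupies its caller for the whole waiting time, which is why a sensor without a timeout is risky when the thing it waits for never happens.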
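As a sketch of what such a function could look like: execution_date_fn receives the execution date of the current run and returns the execution date of the external run to look at. The helper below, previous_ten_minute_slot, is a hypothetical example that maps a run of a 5-minute DAG onto the 10-minute slot it falls into. Which child run a parent run should really wait for depends on your pipeline, so treat the mapping itself as an assumption.

```python
from datetime import datetime, timedelta


def previous_ten_minute_slot(execution_date):
    """Round an execution date down to the nearest 10-minute boundary."""
    return execution_date.replace(
        minute=execution_date.minute - execution_date.minute % 10,
        second=0,
        microsecond=0,
    )


# For a fixed offset, execution_delta is enough: the sensor then looks at
# the run whose execution date is (current execution date - delta).
FIVE_MINUTES_EARLIER = timedelta(minutes=5)

# Trying the helper on a plain datetime (illustrative value).
slot = previous_ten_minute_slot(datetime(2020, 1, 1, 12, 35))
```

The function would be passed to the sensor as execution_date_fn=previous_ten_minute_slot. Only one of execution_delta and execution_date_fn can be given to the same sensor.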

But can we combine ExternalTaskSensor and TriggerDagRunOperator to wait for one dag to complete before triggering the next one? That's what we want, right?

Of course, we can. But we need to do some extra steps for that:

  • Create a dependency between TriggerDagRunOperator and ExternalTaskSensor
  • Remove the child DAGs' schedule by setting schedule_interval=None so that these DAGs are triggered only from our DAG.
  • ExternalTaskSensor should know the specific TriggerDagRunOperator execution time up to a second. It will need to be implemented because Airflow does not have this functionality.

But all of this sounds complicated and unnecessary when Airflow has a SubDagOperator.

SubDagOperator

SubDAG is a pluggable DAG that can be inserted into a parent DAG. It allows DAG developers to better organize complex DAG definitions and reuse existing DAGs with SubDagOperator.

For my problem this is an ideal solution: it essentially behaves like TriggerDagRunOperator + ExternalTaskSensor, without the additional complexity and extra operators.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.subdag_operator import SubDagOperator
from extract import dag as extract_dag
from transform import dag as transform_dag
from load import dag as load_dag

FREQUENCY = '*/5 * * * *'
DEFAULT_ARGS = {
    'depends_on_past': False,
    'start_date': datetime.today() - timedelta(1),
    'retries': 1,
    'catchup': False,
}

dag = DAG(
    'etl_with_subdag',
    description='DAG with subdag',
    default_args=DEFAULT_ARGS,
    schedule_interval=FREQUENCY,
    catchup=False,
)
waiting_extract_dag_id = 'waiting_extract_dag'
waiting_transform_dag_id = 'waiting_transform_dag'
waiting_load_dag_id = 'waiting_load_dag'
extract_dag.dag_id = '{}.{}'.format(dag.dag_id, waiting_extract_dag_id)
transform_dag.dag_id = '{}.{}'.format(dag.dag_id, waiting_transform_dag_id)
load_dag.dag_id = '{}.{}'.format(dag.dag_id, waiting_load_dag_id)

waiting_extract_dag = SubDagOperator(
    task_id=waiting_extract_dag_id,
    subdag=extract_dag,
    dag=dag,
)
waiting_transform_dag = SubDagOperator(
    task_id=waiting_transform_dag_id,
    subdag=transform_dag,
    dag=dag,
)
waiting_load_dag = SubDagOperator(
    task_id=waiting_load_dag_id,
    subdag=load_dag,
    dag=dag,
)

waiting_extract_dag.set_downstream(waiting_transform_dag)
waiting_transform_dag.set_downstream(waiting_load_dag)

Here you can see that, instead of a dag_id, SubDagOperator takes real DAG objects imported from another part of the code.

Zoom into SubDAG

In essence, all SubDAGs are part of the parent DAG in every sense: you will not see their runs in the DAG history or logs. In the Airflow UI there is a "Zoom into Sub DAG" button to see a child DAG's internals.

The documentation says that the best way to create such DAGs is to use the factory method, but I have neglected this to simplify the code. Keep in mind that without the factory method you can see SubDAG as a normal DAG on the admin panel in Airflow UI, but for some reason, it is not always the case.

