Data Engineering

Apache Airflow 2.0 Examples - A basic DAG template for any project

Over the years I've written a lot of Apache Airflow pipelines (DAGs). Be it in a custom Apache Airflow setup or a Google Cloud Composer instance. I've created a DAG file structure (boilerplate) so that it improved consistency and collaboration within my team, which I'm sharing in this tutorial.

  • Airflow 2.0 starter template, containing practical Airflow concepts and examples
  • Go directly to the code

Concepts and examples

Airflow 2.0 DAG example template

The example / template contains a lot of concepts that I use regularly. I've provided a short description and reference to most of these concepts in the comments. To summarize:

  • It shows how to import default and contributed Operators and Hooks (through providers, like AWS and Google Cloud Platform)
  • Grouped the configuration settings of the DAG in the header section (comes in handy especially within large DAGs)
  • Scheduling and backfilling (using the catchup parameter)
  • Using custom Python functions with the new Airflow 2.0 Taskflow API (instead of the PythonOperator). Also see the tutorial on the Airflow website.
  • Setting task dependency using >> operators. Also see this section of the Airflow documentation
  • Using dynamic tasks: (a selection) of tasks can be placed in a loop, for example multiple different file imports.
  • Using TaskGroups: combining a set of Tasks in a Taskgroup, which will also be grouped in the interface. Usefull in large DAGs.
  • Using macros like executing date {{ ds }} or a modified one, like execution date minus 5 days {{ macros.ds_add(ds, -5) }}

The DAG starter template / boilerplate

The starter template was originally written for Apache Airflow versions 1.9.x. We've rewritten the code for Airflow 2.x and added Airflow 2.0 and added new functionality and concepts (like the Taskflow API). The starter template for Apache Airflow version 1.9.x can be found here.

View code on GitHub

1############################################################
2# Author    Krisjan Oldekamp / Stacktonic.com              #
3# Email     krisjan@stacktonic.com                         #
4############################################################
5
6# Libraries
7import json
8import os
9from datetime import datetime, timedelta
10import pendulum
11
12# Airflow
13from airflow.models import DAG, Variable
14from airflow.utils.task_group import TaskGroup
15from airflow.utils.dates import days_ago
16from airflow.decorators import task
17
18# Default Airflow operators 
19# https://airflow.apache.org/docs/apache-airflow/stable/concepts/operators.html
20
21from airflow.operators.bash import BashOperator
22from airflow.operators.dummy import DummyOperator
23
24# Contributed Airflow operators (providers) 
25# https://airflow.apache.org/docs/apache-airflow-providers/operators-and-hooks-ref/index.html
26
27#from airflow.providers.amazon.aws.transfers.gcs_to_s3 import GCSToS3Operator
28#from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator
29
30from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator
31from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
32from airflow.providers.google.cloud.transfers.bigquery_to_gcs import BigQueryToGCSOperator
33from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
34
35############################################################
36# DAG settings
37############################################################
38
39LOCAL_TZ = pendulum.timezone("Europe/Amsterdam")
40
41DAG_NAME = "stacktonic_example_dag" # DAG name (proposed format: lowercase underscore). Should be unique.
42DAG_DESCRIPTION = "Example DAG by Krisjan Oldekamp / stacktonic.com"
43DAG_START_DATE = datetime(2021, 10, 15, tzinfo=LOCAL_TZ) # Startdate. When setting the "catchup" parameter to True, you can perform a backfill when you insert a specific date here like datetime(2021, 6, 20)
44DAG_SCHEDULE_INTERVAL = "@daily" # Cron notation -> see https://airflow.apache.org/scheduler.html#dag-runs
45DAG_CATCHUP = False # When set to true, DAG will start running from DAG_START_DATE instead of current date
46DAG_PAUSED_UPON_CREATION = True # Defaults to False. When set to True, uploading a DAG for the first time, the DAG doesn't start directly 
47DAG_MAX_ACTIVE_RUNS = 5 # Configure efficiency: Max. number of active runs for this DAG. Scheduler will not create new active DAG runs once this limit is hit. 
48
49############################################################
50# Default DAG arguments
51############################################################
52
53default_args = {
54    "owner": "airflow",
55    "start_date": DAG_START_DATE,
56    "depends_on_past": False,
57    "email": Variable.get("email_monitoring", default_var="<FALLBACK-EMAIL>"), # Make sure you create the "email_monitoring" variable in the Airflow interface
58    "email_on_failure": True,
59    "email_on_retry": False,
60    "retries": 2, # Max. number of retries before failing
61    "retry_delay": timedelta(minutes=60) # Retry delay
62}
63
64############################################################
65# DAG configuration (custom)
66############################################################
67
68SOME_CUSTOM_CONFIG = "yes"
69
70DYNAMIC_TASKS =  [{
71    "account": "123",
72    "some_setting": True
73},{
74    "account": "456",
75    "some_setting": False
76}]
77
78############################################################
79# Python functions (custom) using the Taskflow API decorators (@task)
80############################################################
81
82@task
83def python_function_with_input(value: str):
84    print("Custom Python function, print whatever you want!")
85    print(value)
86
87############################################################
88# Repeatable Airflow Operator functions (for use in dynamic tasks)
89############################################################
90
91def dynamic_operator_task(i):
92    return DummyOperator(
93        task_id="dynamic_task_" + DYNAMIC_TASKS[i]["account"]
94    )
95
96############################################################
97# Main DAG
98#############################################################
99
100# Create DAG
101with DAG(
102    DAG_NAME,
103    description=DAG_DESCRIPTION,
104    schedule_interval=DAG_SCHEDULE_INTERVAL,
105    catchup=DAG_CATCHUP,
106    max_active_runs=DAG_MAX_ACTIVE_RUNS,
107    is_paused_upon_creation=DAG_PAUSED_UPON_CREATION,
108    default_args=default_args) as dag:
109
110    # Start
111    start = DummyOperator(
112        task_id="start")
113
114    # Bash Operator
115    print_start_bash = BashOperator(
116        task_id="print_start_bash",
117        bash_command="echo 'hello {{ ds }}'") # Using a macro for execution date, see other macros https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
118
119    # Wait till finished
120    wait_till_finished = python_function_with_input("All finished")
121
122    # Complete
123    complete = DummyOperator(
124        task_id="complete")
125
126    # Define a taskgroup so tasks will be grouped in the interface -> usefull when you have a lot of repeated task sequences for example.
127    with TaskGroup(group_id='task_group_with_two_tasks') as task_group_with_two_tasks:
128        # Set order of execution within taskgroup (see also https://airflow.apache.org/concepts.html#bitshift-composition)
129        python_function_with_input('taskgroup task #1') >> python_function_with_input('taskgroup task #2')
130
131    # Set task depencency using >> (see also https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#task-dependencies)
132    start >> print_start_bash
133
134    # Add dynamic (repeating) tasks (https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#dynamic-dags), looping the dicts from DYNAMIC_TASK and defining order of execution
135    for i, dct in enumerate(DYNAMIC_TASKS):
136
137        print_start_bash \
138        >> python_function_with_input(dct["account"]) \
139        >> [python_function_with_input("Execute together. This is the execution date: {{ ds }} #1"), python_function_with_input("Execute together. This is the execution date minus 5 days: {{ macros.ds_add(ds, -5) }} #2")] \
140        >> wait_till_finished \
141        >> task_group_with_two_tasks \
142        >> dynamic_operator_task(i) \
143        >> complete

Learnings and considerations

Lastly, some learnings and considerations;

  • Use connectors to store sensitive information like keys and passwords
  • Split up your DAGs when they become to large or to complex
  • DRY: Use dynamic tasks (or DAGs) when performing the same flow but with different settings (e.g. file imports for different countries)
  • When using the same (custom) functionality in multiple DAG files, try to write generic, reusable functions
  • Setup alert monitoring properly (tip: Slack of Microsoft Teams notifications)
Did you like this article? Stay hydrated and subscribe to a monthly roundup of our newest articles and tutorials. No spam and you can unsubscribe at any time. You can also checkout my Twitter account for updates.