Airflow Dynamic Task Mapping

Use Case

SCENARIO: Suppose you have daily data delivery to an S3/GCS bucket and want to apply the same processing to every file that arrives, but the number of files that arrive each day might be different.

A common approach (Figure 1) is to:
  • Know the full list of files beforehand.
  • Set up Sensors to check whether a particular file exists for today, and short-circuit/skip the downstream tasks if it doesn't.
  • Process the files that are available today.
  • Run downstream tasks or trigger another DAG.
  • etc.
Figure 1. Common Approach
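The fixed-list pattern of Figure 1 can be sketched in plain Python (no Airflow involved) to show its main limitation. `EXPECTED_FILES` and `plan_daily_run` are hypothetical names, and the membership check is only a stand-in for what an S3/GCS sensor plus a short-circuit would do.

```python
from __future__ import annotations

# Hypothetical list of files that must be declared when the DAG is written.
EXPECTED_FILES = ["file_0", "file_1", "file_2"]


def plan_daily_run(arrived: set[str]) -> dict[str, str]:
    """Decide, for each pre-declared file, whether its branch runs or is skipped."""
    plan: dict[str, str] = {}
    for name in EXPECTED_FILES:
        # Stand-in for a sensor + short-circuit: present -> process, absent -> skip.
        plan[name] = "process" if name in arrived else "skip"
    return plan


if __name__ == "__main__":
    # file_3 arrived today, but no branch exists for it, so it is never processed.
    print(plan_daily_run({"file_0", "file_2", "file_3"}))
    # prints {'file_0': 'process', 'file_1': 'skip', 'file_2': 'process'}
```

Any file not declared up front simply has no branch, so the DAG has to be edited whenever the set of files changes.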
Figure 2. Another Common (Bad) Approach
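This approach consists of:
  • a task, get_list_of_files, that lists all files in the S3/GCS bucket
  • a PythonOperator that iterates through the list of files

The PythonOperator's log shows the files being processed one after another: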
[2022-07-07, 23:20:15] {test.py:28} Processing file: file_0
[2022-07-07, 23:20:16] {test.py:30} File file_0 has been processed!
[2022-07-07, 23:20:16] {test.py:28} Processing file: file_1
[2022-07-07, 23:20:17] {test.py:30} File file_1 has been processed!
[2022-07-07, 23:20:17] {test.py:28} Processing file: file_2
[2022-07-07, 23:20:18] {test.py:30} File file_2 has been processed!
[2022-07-07, 23:20:18] {test.py:28} Processing file: file_3
[2022-07-07, 23:20:19] {test.py:30} File file_3 has been processed!
[2022-07-07, 23:20:19] {test.py:28} Processing file: file_4
[2022-07-07, 23:20:20] {test.py:30} File file_4 has been processed!
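The single-task loop behind a log like this can be sketched in plain Python. `process_all_files` and its `failing` parameter are hypothetical; `failing` exists only to simulate a broken file. The sketch shows one likely reason the approach is labeled bad: one task owns every file, so a single failure fails the whole task, the remaining files are not attempted, and a retry re-processes the files that had already succeeded.

```python
from __future__ import annotations

import logging

logging.basicConfig(level=logging.INFO, format="%(message)s")


def process_all_files(files: list[str], failing: frozenset[str] = frozenset()) -> list[str]:
    """Process every file sequentially inside ONE task, as in the log above.

    `failing` is a hypothetical knob that simulates files whose processing breaks.
    """
    processed: list[str] = []
    for name in files:
        logging.info("Processing file: %s", name)
        if name in failing:
            # The exception aborts the whole loop, i.e. the whole task fails.
            raise RuntimeError(f"Failed while processing {name}")
        logging.info("File %s has been processed!", name)
        processed.append(name)
    return processed


if __name__ == "__main__":
    files = [f"file_{i}" for i in range(5)]
    process_all_files(files)  # all five files succeed, like the log above
    try:
        process_all_files(files, failing=frozenset({"file_3"}))
    except RuntimeError as err:
        # file_4 is never attempted, and a retry of this task would start
        # again at file_0 because no progress is recorded anywhere.
        logging.error("Task failed: %s", err)
```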

Dynamic Task Mapping

Now, enter Airflow 2.3.0! With its new Dynamic Task Mapping feature, we can design the DAG like this:

Figure 3. Dynamic Task Mapping on Airflow 2.3
Figure 4. Mapped Task Instance
Figure 5. Graph View After Re-run
Figure 6. Mapped Task Instance After Re-Run
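A useful mental model: partial() holds what every copy of the task shares, and expand() creates one task instance per element of the upstream list, each identified by a map_index. The pure-Python model below is only a sketch under that assumption, not Airflow's implementation; `MappedInstance` and `expand` here are hypothetical names. Because the instance count comes from the list produced at run time, a re-run of a DAG whose upstream list changes can end up with a different number of mapped instances.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class MappedInstance:
    """Hypothetical stand-in for one mapped task instance."""

    task_id: str        # shared by every copy, like the arguments given to partial()
    map_index: int      # position of this copy's input in the upstream list
    op_args: list[Any]  # this copy's own positional arguments


def expand(task_id: str, upstream_output: list[list[Any]]) -> list[MappedInstance]:
    # One instance per element of the upstream list, numbered from 0.
    return [MappedInstance(task_id, i, args) for i, args in enumerate(upstream_output)]


if __name__ == "__main__":
    first_run = expand("printing_task", [[0], [1], [2], [3], [4]])
    rerun = expand("printing_task", [[0], [1]])
    print(len(first_run), len(rerun))  # prints 5 2
    print(first_run[3])  # MappedInstance(task_id='printing_task', map_index=3, op_args=[3])
```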

Sample Working Code

Below is sample working code for reference:

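import logging
from random import randint

import pendulum
from airflow import DAG, XComArg
from airflow.operators.python import PythonOperator


# Generate a list with a random number of elements to simulate the number
# of files in S3/GCS, which can change daily. Each element is itself a list
# because it becomes the positional args (op_args) of one mapped task.
def generate_random_list():
    return [[i] for i in range(randint(0, 7))]


def printer_task(num):
    logging.info(f"Currently working on this file number: {num}")


with DAG(
    dag_id="sample_test_dag",
    schedule_interval=None,
    # A static start_date is preferred over a dynamic one such as pendulum.today().
    start_date=pendulum.datetime(2022, 7, 1, tz="UTC"),
    catchup=False,
    max_active_runs=1,
) as dag:

    # Upstream task: its return value is pushed to XCom.
    get_list_of_files = PythonOperator(
        task_id="get_list_of_files",
        python_callable=generate_random_list,
    )

    # partial() holds the arguments shared by every copy; expand() creates
    # one mapped task instance per element of the XCom list at run time.
    printing_task = PythonOperator.partial(
        task_id="printing_task",
        python_callable=printer_task,
    ).expand(op_args=XComArg(get_list_of_files))

    # Explicit ordering; the XComArg passed to expand() already implies it.
    get_list_of_files >> printing_task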
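One detail worth spelling out: op_args is a list of positional arguments, and PythonOperator calls python_callable(*op_args, **op_kwargs). Each element passed to expand(op_args=...) must therefore itself be a list, which is why generate_random_list returns [[0], [1], ...] rather than [0, 1, ...]. The plain-Python sketch below (no Airflow; `simulate_mapped_calls` is a hypothetical helper, and printer_task returns num here only so the result can be checked) mimics the call each mapped instance ends up making. Note also that randint(0, 7) can return 0; according to the Airflow documentation, an empty input creates no mapped instances and the mapped task is marked as skipped.

```python
from __future__ import annotations

import logging
from random import randint

logging.basicConfig(level=logging.INFO, format="%(message)s")


def generate_random_list() -> list[list[int]]:
    # Same shape as the DAG above: one inner list of positional args per file.
    return [[i] for i in range(randint(0, 7))]


def printer_task(num: int) -> int:
    logging.info("Currently working on this file number: %s", num)
    return num  # returned only so the sketch can be checked


def simulate_mapped_calls(op_args_list: list[list[int]]) -> list[int]:
    # Each mapped instance unpacks its own op_args, like PythonOperator does.
    return [printer_task(*op_args) for op_args in op_args_list]


if __name__ == "__main__":
    print(simulate_mapped_calls(generate_random_list()))
    # A flat list such as [0, 1, 2] would hand each call a bare int, and
    # printer_task(*0) raises TypeError because an int cannot be unpacked.
```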
