Airflow Dynamic Task Mapping

Use Case

SCENARIO: Suppose data is delivered daily to an S3/GCS bucket and you want to apply the same processing to every file that arrives, but the number of files can differ from day to day.

A common approach before Airflow 2.3 is to:

  • Know the full list of files in advance.
  • Set up Sensors to check whether each particular file exists for today, and short-circuit/skip the downstream tasks when it does not.
  • Process the files that are available today.
  • Run downstream tasks or trigger another DAG.
  • etc.
Figure 1. Common Approach
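To make the limitation of the fixed-list approach in Figure 1 concrete, the plain-Python sketch below (no Airflow involved) mimics what one sensor plus short-circuit per predefined file ends up deciding each day. The function plan_fixed_tasks and the file names are hypothetical.

```python
# Hypothetical list that must be hard-coded into the DAG in advance.
EXPECTED_FILES = ["file_0", "file_1", "file_2"]


def plan_fixed_tasks(expected_files, available_files):
    """Mimic per-file sensors + short-circuit: one decision per predefined file.

    Files that arrive but are not in expected_files get no decision at all,
    because no task was ever defined for them.
    """
    available = set(available_files)
    return {
        name: "process" if name in available else "skip"
        for name in expected_files
    }


if __name__ == "__main__":
    # file_5 arrived today, but there is no task for it.
    print(plan_fixed_tasks(EXPECTED_FILES, ["file_0", "file_2", "file_5"]))
    # → {'file_0': 'process', 'file_1': 'skip', 'file_2': 'process'}
```

Here file_5 is silently ignored: handling it would require editing the DAG and redeploying.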
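Figure 2. Another Common (Bad) Approach
Another common, but bad, approach (Figure 2) uses:
  • a task, get_list_of_files, to list all files in the S3/GCS bucket
  • a PythonOperator to iterate through the list of files
Every file is then processed sequentially inside that single task instance, as its log shows: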
[2022-07-07, 23:20:15] {} Processing file: file_0
[2022-07-07, 23:20:16] {} File file_0 has been processed!
[2022-07-07, 23:20:16] {} Processing file: file_1
[2022-07-07, 23:20:17] {} File file_1 has been processed!
[2022-07-07, 23:20:17] {} Processing file: file_2
[2022-07-07, 23:20:18] {} File file_2 has been processed!
[2022-07-07, 23:20:18] {} Processing file: file_3
[2022-07-07, 23:20:19] {} File file_3 has been processed!
[2022-07-07, 23:20:19] {} Processing file: file_4
[2022-07-07, 23:20:20] {} File file_4 has been processed!
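The callable behind that single PythonOperator was not shown. A minimal sketch that would produce log lines like the ones above, assuming a hypothetical process_all_files function whose real per-file work is left out, could be:

```python
import logging


def process_all_files(files):
    """Process every file inside ONE task instance, one after another."""
    processed = []
    for name in files:
        logging.info("Processing file: %s", name)
        # Real per-file work would go here (hypothetical). If it raised,
        # the remaining files would not be processed and the task would fail.
        processed.append(name)
        logging.info("File %s has been processed!", name)
    return processed


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    process_all_files([f"file_{i}" for i in range(5)])
```

Because the loop lives in one task, the files cannot run in parallel, and retrying after a failure re-processes files that had already succeeded.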

Dynamic Task Mapping

Now, enter Airflow 2.3.0! With its new Dynamic Task Mapping feature, we can simply design the DAG like this:

Figure 3. Dynamic Task Mapping on Airflow 2.3
Figure 4. Mapped Task Instance
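Figure 5. Graph View After Re-run
Figure 6. Mapped Task Instance After Re-run

Because the number of mapped task instances is determined at runtime from the upstream task's output, a re-run that returns a different number of files produces a different number of mapped instances, without any change to the DAG code.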

Sample Working Code

Below is sample working code for reference:

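import logging
from random import randint

import pendulum
from airflow import DAG, XComArg
from airflow.operators.python import PythonOperator


# Generate a list with a random number of elements to simulate the number
# of files in S3/GCS, which can change daily. Each element is itself a list
# because it is passed to PythonOperator as op_args (positional arguments).
def generate_random_list():
    return [[i] for i in range(randint(0, 7))]


def printer_task(num):
    logging.info(f"Currently working on this file number: {num}")


# NOTE: the original DAG and operator arguments were not preserved; dag_id,
# start_date, schedule_interval, catchup and the task_ids are assumptions.
with DAG(
    dag_id="dynamic_task_mapping_example",
    start_date=pendulum.datetime(2022, 7, 1, tz="UTC"),
    schedule_interval="@daily",
    catchup=False,
) as dag:

    get_list_of_files = PythonOperator(
        task_id="get_list_of_files",
        python_callable=generate_random_list,
    )

    # partial() holds the arguments shared by every mapped instance;
    # expand() creates one task instance per element of the upstream XCom.
    printing_task = PythonOperator.partial(
        task_id="printing_task",
        python_callable=printer_task,
    ).expand(op_args=XComArg(get_list_of_files))

    get_list_of_files >> printing_task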
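Why does generate_random_list wrap each number in its own list? Each element of the expanded op_args becomes the positional-argument list of one mapped task instance. The plain-Python simulation below illustrates that fan-out and the 0-based map index each instance gets. It is not Airflow's implementation: simulate_expand is hypothetical, and this printer_task returns its message instead of logging it so the result can be inspected.

```python
from random import randint


def generate_random_list():
    # Same shape as in the DAG above: one single-element list per "file".
    return [[i] for i in range(randint(0, 7))]


def printer_task(num):
    return f"Currently working on this file number: {num}"


def simulate_expand(python_callable, op_args_list):
    """Plain-Python stand-in for PythonOperator.partial(...).expand(op_args=...).

    Each inner list is unpacked as the positional arguments of one
    simulated mapped instance; the dict key plays the role of map_index.
    """
    return {
        map_index: python_callable(*op_args)
        for map_index, op_args in enumerate(op_args_list)
    }


if __name__ == "__main__":
    for run in (1, 2):  # two runs may yield a different number of instances
        results = simulate_expand(printer_task, generate_random_list())
        print(f"run {run}: {len(results)} mapped instance(s)")
```

Note that randint(0, 7) can return 0, producing an empty list. In Airflow, expanding over an empty list creates no instances and the mapped task is marked as skipped.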


