Airflow Dynamic Task Mapping
Yay, we get another new feature in Airflow 2.3! This time it’s called Dynamic Task Mapping. Cool name, I suppose? Quoted from the Airflow documentation, here is a brief explanation of the new feature:
Dynamic Task Mapping allows a way for a workflow to create a number of tasks at runtime based upon current data, rather than the DAG author having to know in advance how many tasks would be needed.
This is similar to defining your tasks in a for loop, but instead of having the DAG file fetch the data and do that itself, the scheduler can do this based on the output of a previous task. Right before a mapped task is executed the scheduler will create n copies of the task, one for each input.
It is also possible to have a task operate on the collected output of a mapped task, commonly known as map and reduce.
Use Case
SCENARIO: Suppose you have daily data delivery to an S3/GCS bucket and want to apply the same processing to every file that arrives, but the number of files that arrive each day might be different.
Previously, when using Airflow version ≤ 2.2.x, the most common implementation to tackle this problem looked something like this (a rough sketch follows the list):
- Know the full list of files beforehand.
- Set up Sensors to determine whether a particular file exists today, and short-circuit/skip downstream tasks if it does not.
- Process the files that are available today.
- Process downstream tasks or trigger another DAG.
- etc.
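Here is a minimal sketch of that pre-2.3 setup. The bucket name, file names, and sensor settings are illustrative assumptions (not from any real project), and the S3KeySensor import path may differ depending on your Amazon provider version:

import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

# the file list must be known and maintained by hand
KNOWN_FILES = ["file_0", "file_1", "file_2"]

def process_file(filename):
    print(f"Processing file: {filename}")

with DAG(
    dag_id="pre_2_3_static_dag",
    schedule_interval="@daily",
    start_date=pendulum.today("UTC").add(days=-1),
    catchup=False,
) as dag:
    for filename in KNOWN_FILES:
        # soft_fail marks the sensor as skipped (not failed) if the file
        # never shows up, which also skips the downstream processing task
        wait_for_file = S3KeySensor(
            task_id=f"wait_for_{filename}",
            bucket_name="daily-delivery",
            bucket_key=filename,
            soft_fail=True,
            timeout=60 * 60,
            poke_interval=60,
        )
        process = PythonOperator(
            task_id=f"process_{filename}",
            python_callable=process_file,
            op_args=[filename],
        )
        wait_for_file >> process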
This approach works quite well if the number of files delivered to the bucket each day stays constant over time. But whenever a new file shows up, we need to modify the DAG code to add it to the task list. Not very convenient, huh?
Another approach that might be used (more convenient, but in my opinion badly designed) is something like this. The DAG is designed to have:
- a task get_list_of_files to list all files in the S3/GCS bucket
- a PythonOperator to iterate through the list of files (see the sketch below)
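A minimal sketch of that design might look like the following; the task and function names here are illustrative assumptions, not the original code:

import logging
import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator

def get_files():
    # in reality this would list the objects in the S3/GCS bucket
    return [f"file_{i}" for i in range(5)]

def process_all_files(ti):
    # pull the file list produced by the upstream task via XCom
    files = ti.xcom_pull(task_ids="get_list_of_files")
    for filename in files:
        logging.info(f"Processing file: {filename}")
        logging.info(f"File {filename} has been processed!")

with DAG(
    dag_id="single_operator_loop_dag",
    schedule_interval=None,
    start_date=pendulum.today("UTC").add(days=-1),
    catchup=False,
) as dag:
    get_list_of_files = PythonOperator(
        task_id="get_list_of_files",
        python_callable=get_files,
    )
    process_files = PythonOperator(
        task_id="process_files",
        python_callable=process_all_files,
    )
    get_list_of_files >> process_files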
The log might look something like this:
[2022-07-07, 23:20:15] {test.py:28} Processing file: file_0
[2022-07-07, 23:20:16] {test.py:30} File file_0 has been processed!
[2022-07-07, 23:20:16] {test.py:28} Processing file: file_1
[2022-07-07, 23:20:17] {test.py:30} File file_1 has been processed!
[2022-07-07, 23:20:17] {test.py:28} Processing file: file_2
[2022-07-07, 23:20:18] {test.py:30} File file_2 has been processed!
[2022-07-07, 23:20:18] {test.py:28} Processing file: file_3
[2022-07-07, 23:20:19] {test.py:30} File file_3 has been processed!
[2022-07-07, 23:20:19] {test.py:28} Processing file: file_4
[2022-07-07, 23:20:20] {test.py:30} File file_4 has been processed!
When new files arrive, the get_list_of_files function will simply produce a longer list, so maybe we don’t need to modify the DAG code definition. Quite promising, eh?
However, this design is still bad because all the processing happens inside a single task/operator, and the files are processed sequentially. Imagine you have hundreds or thousands of files; suddenly this approach doesn’t look so promising, does it?
As an additional note, in a production environment (where the CeleryExecutor is commonly used), this design does not scale since it doesn’t utilize Airflow’s parallelism.
Dynamic Task Mapping
Now, enter Airflow 2.3.0! With its new feature, we can simply design the DAG like this:
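The key change is the partial()/expand() pair: the constant arguments go into partial(), and the argument to map over goes into expand() as an XComArg pointing at the upstream task. The snippet below is an excerpt of the full working code at the end of this post:

printing_task = PythonOperator.partial(
    task_id="printing_task",
    python_callable=printer_task,
).expand(op_args=XComArg(get_list_of_files))

get_list_of_files >> printing_task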
Notice the brackets after printing_task? While this might look similar to the second implementation above, the Dynamic Task Mapping feature allows several tasks to run in parallel, since printing_task will actually be mapped to what Airflow calls Mapped Instances.
In the picture below, you can see that there are “5 Tasks Mapped”. And when you use the CeleryExecutor, those mapped tasks will run in parallel. Very cool!
Now, for example, if the number of files in the S3/GCS bucket drops to only 2 files tomorrow, there is no need to modify the DAG code. Hurray!
But what actually happens in that case? The number of Mapped Instances will simply change to 2 as well. Interesting, right?
Also, what happens if there are no files at all? This is quoted from the Airflow documentation:
Automatically skipping zero-length maps
If the input is empty (zero length), no new tasks will be created and the mapped task will be marked as SKIPPED.
Happy Airflow-ing ~~
Reference: https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html
Sample Working Code
Below is sample working code for reference:
import logging
from random import randint

import pendulum
from airflow import DAG, XComArg
from airflow.operators.python import PythonOperator


# generate a list with a random number of elements
# to simulate the number of files in S3/GCS, which can change daily
def generate_random_list():
    return [[i] for i in range(randint(0, 7))]


# each mapped task instance processes a single "file"
def printer_task(num):
    logging.info(f"Currently working on this file number: {num}")
    return


with DAG(
    dag_id="sample_test_dag",
    schedule_interval=None,
    start_date=pendulum.today('UTC').add(days=-1),
    catchup=False,
    max_active_runs=1,
) as dag:

    get_list_of_files = PythonOperator(
        task_id="get_list_of_files",
        python_callable=generate_random_list,
    )

    # partial() holds the constant arguments; expand() maps the task
    # over the list returned by get_list_of_files via XCom
    printing_task = PythonOperator.partial(
        task_id="printing_task",
        python_callable=printer_task,
    ).expand(op_args=XComArg(get_list_of_files))

    get_list_of_files >> printing_task