Apache Airflow for Data Science - How to Run Tasks in Parallel

Make multiple GET requests in parallel with Apache Airflow and Python

In the previous article, we configured Apache Airflow so it can run tasks in parallel. To do so, we had to switch the underlying metadata database from SQLite to Postgres, and change the executor from the SequentialExecutor to the LocalExecutor.

After that, we reinitialized the database and created a new Admin user for Airflow. Reading that article first is essential - without those changes, the tasks in this one won’t run in parallel.
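
As a quick refresher, these are the two relevant settings in airflow.cfg. The values below are placeholders rather than real credentials, and depending on your Airflow version the connection string lives under the [core] or the [database] section:

[core]
executor = LocalExecutor

# Format for a local Postgres database - replace the placeholders with your own values
sql_alchemy_conn = postgresql+psycopg2://<user>:<password>@localhost:5432/<database>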

Today we’ll finally write a DAG that runs the tasks in parallel. It will fetch data from a couple of REST API endpoints. We’ll implement everything through the PythonOperator, which isn’t the optimal way to communicate with APIs. We’ll leave it be for simplicity’s sake, and discuss the proper ways of communicating with APIs some other time.


REST API Overview and DAG Boilerplate

I’ve found the GoRest website, which serves as a dummy REST API for testing purposes. As you can see below, we can make GET requests to any of the four endpoints, and we’ll get some JSON data back as a response:

Image 1 - GoRest REST API homepage (image by author)

It’s perfect for today’s example since one GET request is by no means connected to the other. In other words, we don’t have to wait for one response before making another request.
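
If you want a quick sanity check before touching Airflow, a plain requests call against one of the endpoints is enough. This snippet is just an illustration and isn’t part of the DAG we’ll write - the URL is the public users endpoint shown on the GoRest homepage:

import requests

# Fetch the first page of users from the public GoRest API
res = requests.get('https://gorest.co.in/public/v2/users')
print(res.status_code)   # 200 means the request succeeded
print(res.json()[:2])    # The first two user records as Python dicts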

To get started with the DAG, create a new file in the dags folder. I’ve named mine parallel_dag.py but feel free to name yours however you want. Let’s write the imports first:

import json
import time
import requests
from datetime import datetime
from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

Below we can declare the DAG with the context manager syntax:

with DAG(
    dag_id='parallel_dag',
    schedule_interval='@daily',
    start_date=datetime(2022, 3, 1),
    catchup=False
) as dag:
    pass

That’s all we need to get started, so let’s write the entire DAG next.


Write the Airflow DAG

Before writing the function for connecting to the API, we’ll create four tasks in the DAG, all of which leverage the PythonOperator to call the same Python function. Since the URL for every request is different, we don’t want to write four nearly identical Python functions. The op_kwargs argument of the PythonOperator lets us specify arguments that will be passed to the function as key-value pairs.

Just write a single task inside the DAG’s with block and you’ll immediately get the idea:

task_get_users = PythonOperator(
    task_id='get_users',
    python_callable=get,
    op_kwargs={'url': 'https://gorest.co.in/public/v2/users'}
)

This task will call the Python function get(), which we haven’t defined yet, and pass it the specified URL as a keyword argument. Before writing the function, let’s copy the task three more times to connect to the other endpoints:

task_get_posts = PythonOperator(
    task_id='get_posts',
    python_callable=get,
    op_kwargs={'url': 'https://gorest.co.in/public/v2/posts'}
)

task_get_comments = PythonOperator(
    task_id='get_comments',
    python_callable=get,
    op_kwargs={'url': 'https://gorest.co.in/public/v2/comments'}
)

task_get_todos = PythonOperator(
    task_id='get_todos',
    python_callable=get,
    op_kwargs={'url': 'https://gorest.co.in/public/v2/todos'}
)

Finally, we’ll connect the tasks in a sequential manner. You’ll see how to connect them in parallel later, but this is just so you can get the idea of what’s wrong with running the tasks one after the other:

task_get_users >> task_get_posts >> task_get_comments >> task_get_todos

The only thing left to do is to write the function, so let’s do that in the same file but above the DAG. It will extract the endpoint from the URL, capture the current datetime, make a request to the endpoint, and save the response in JSON format. Finally, the function sleeps for two seconds - just to make the entire runtime a bit longer:

def get(url: str) -> None:
    # Extract the endpoint name from the URL, e.g. 'users' or 'posts'
    endpoint = url.split('/')[-1]
    # Build a timestamp string to make the output file name unique
    now = datetime.now()
    now = f"{now.year}-{now.month}-{now.day}T{now.hour}-{now.minute}-{now.second}"
    # Make the GET request and parse the JSON response
    res = requests.get(url)
    res = json.loads(res.text)

    # Save the response to disk - change the path to match your setup
    with open(f"/Users/dradecic/airflow/data/{endpoint}-{now}.json", 'w') as f:
        json.dump(res, f)
    # Sleep for two seconds to make the runtime a bit longer
    time.sleep(2)
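
The path in the snippet above is specific to my machine, so adjust it to your own setup. If you’d rather not hard-code an absolute path, here’s one possible variant of the same function - just a sketch, assuming AIRFLOW_HOME is set and you’re fine with a data subfolder underneath it:

import json
import os
import time
from datetime import datetime
from pathlib import Path

import requests


def get(url: str) -> None:
    endpoint = url.split('/')[-1]
    now = datetime.now().strftime('%Y-%m-%dT%H-%M-%S')
    res = requests.get(url)

    # Assumes AIRFLOW_HOME is set; falls back to ~/airflow otherwise
    data_dir = Path(os.environ.get('AIRFLOW_HOME', Path.home() / 'airflow')) / 'data'
    data_dir.mkdir(parents=True, exist_ok=True)

    with open(data_dir / f"{endpoint}-{now}.json", 'w') as f:
        json.dump(res.json(), f)

    time.sleep(2)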

We can test a single task through the Terminal, just to see if everything is working as expected:

airflow tasks test parallel_dag get_users 2022-3-1

Image 2 - Testing an Airflow task through Terminal (image by author)

The task execution succeeded, and here’s what it saved to the data folder:

Image 3 - Saved users in JSON format (image by author)

That’s all we need for now, so let’s test the DAG through the Airflow homepage next.


Test the Airflow DAG (Sequential)

Open up the Airflow webserver page and find our new DAG. Here’s what it looks like in the Graph view:

Image 4 - Tasks of the Airflow DAG connected sequentially (image by author)

You can see that the tasks are connected in a sequential manner - one after the other. It’s a huge waste of time since the GET requests aren’t connected in any way.

Running the DAG confirms the tasks are running sequentially:

Image 5 - Airflow DAG running tasks sequentially (image by author)

But probably the best confirmation is the Gantt view that shows the time each task took:

Image 6 - Airflow DAG runtime in the Gantt view (image by author)

Let’s go back to the code editor and modify the DAG so the tasks run in parallel.


How to Run Airflow DAG in Parallel

To start, we’ll need to write another task that does next to nothing - it’s there only so the other four tasks have a common upstream task to connect to. Let’s write it above the current first task:

task_start = BashOperator(
    task_id='start',
    bash_command='date'
)

Now we’ll have to change the dependencies at the bottom. The start task runs first, and once it completes, it triggers the other four tasks:

task_start >> [task_get_users, task_get_posts, task_get_comments, task_get_todos]

Refresh the Airflow DAG page now. You can see how the Graph view has changed:

Image 7 - DAG view showing the tasks will run in parallel (image by author)

The start task will now run first, followed by the other four tasks that connect to the APIs and run in parallel. Trigger the DAG once again and inspect the Tree view - you’ll see that the tasks have started running at the same time:

Image 8 - Inspecting the running DAG (image by author)

The best indicator is, once again, the Gantt view:

Image 9 - Airflow DAG runtime in the Gantt view (image by author)

Bars representing the runtimes are placed on top of each other, indicating the tasks have indeed run in parallel.
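
As a side note, if you ever need a task that runs only after all four requests finish, you can fan the dependencies back in the same way. The task_end below is hypothetical - it’s not part of the DAG we built today, just a sketch of the pattern:

# Hypothetical fan-in task - runs only once all four GET tasks have finished
task_end = BashOperator(
    task_id='end',
    bash_command='date'
)

task_start >> [task_get_users, task_get_posts, task_get_comments, task_get_todos] >> task_end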

That’s all I wanted to cover today, so let’s wrap things up next.


Conclusion

Today you’ve successfully written your first Airflow DAG that runs the tasks in parallel. It’s a huge milestone, especially because you can be more efficient now. Most of the time you don’t need to run similar tasks one after the other, so running them in parallel is a huge time saver.

In the following article, we’ll take a deep dive into Airflow XComs - a mechanism for sending data between tasks. Stay tuned for that, and I’ll make sure to publish the article in a couple of days.
