Make multiple GET requests in parallel with Apache Airflow and Python
In the previous article, we configured Apache Airflow to run tasks in parallel by switching the underlying metadata database from SQLite to Postgres and changing the executor from Sequential to Local.
After that, we reinitialized the database and created a new Admin user for Airflow. It goes without saying that reading that article first is mandatory - otherwise, you won’t be able to run tasks in parallel.
Today we’ll finally write a DAG that runs its tasks in parallel. It will fetch data from a couple of REST API endpoints. We’ll implement everything through the PythonOperator, which isn’t the optimal way to communicate with APIs. We’ll leave it be for simplicity’s sake and discuss the proper ways of communicating with APIs some other time.
REST API Overview and DAG Boilerplate
I’ve found the GoRest website, which serves as a dummy REST API for testing purposes. We can make GET requests to any of its four endpoints - users, posts, comments, and todos - and get some JSON data back as a response.
It’s perfect for today’s example since one GET request is by no means connected to the other. In other words, we don’t have to wait for one response before making another request.
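Before wiring anything into Airflow, you can poke one of the endpoints from a plain Python shell to see what the response looks like. A quick sketch - it only assumes the requests library is installed:

import requests

# Quick sanity check against one of the dummy endpoints
res = requests.get('https://gorest.co.in/public/v2/users', timeout=10)
print(res.status_code)   # 200 on success
print(res.json())        # JSON payload with the dummy user records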
To get started with the DAG, create a new file in the dags folder. I’ve named mine parallel_dag.py, but feel free to name yours however you want. Let’s write the imports first:
import json
import time
import requests
from datetime import datetime
from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
Below we can declare the DAG with the context manager syntax:
with DAG(
    dag_id='parallel_dag',
    schedule_interval='@daily',
    start_date=datetime(2022, 3, 1),
    catchup=False
) as dag:
    pass
That’s all we need to get started, so let’s write the entire DAG next.
Write the Airflow DAG
Before writing the function for connecting to the API, we’ll create a few tasks in the DAG. All of them will leverage the PythonOperator to call a Python function. Since the URL for every request is different, we don’t want to write four nearly identical Python functions. The op_kwargs argument of the PythonOperator allows us to specify arguments that will be passed to the function as key-value pairs.
Just write a single task and you’ll immediately get the idea:
    task_get_users = PythonOperator(
        task_id='get_users',
        python_callable=get,
        op_kwargs={'url': 'https://gorest.co.in/public/v2/users'}
    )
This task will call the Python function get(), which we haven’t defined yet, and pass the specified URL to it as a keyword argument - the key in op_kwargs has to match the parameter name of the callable. Before writing the function, let’s copy the task three more times to connect to the other endpoints:
    task_get_posts = PythonOperator(
        task_id='get_posts',
        python_callable=get,
        op_kwargs={'url': 'https://gorest.co.in/public/v2/posts'}
    )

    task_get_comments = PythonOperator(
        task_id='get_comments',
        python_callable=get,
        op_kwargs={'url': 'https://gorest.co.in/public/v2/comments'}
    )

    task_get_todos = PythonOperator(
        task_id='get_todos',
        python_callable=get,
        op_kwargs={'url': 'https://gorest.co.in/public/v2/todos'}
    )
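As a side note, since the four tasks differ only in their URL, you could also generate them in a loop instead of copy-pasting. A sketch - the endpoint list and the task naming are my own choice here, and it lives in the same place inside the DAG context:

    endpoints = ['users', 'posts', 'comments', 'todos']
    get_tasks = [
        PythonOperator(
            task_id=f'get_{endpoint}',
            python_callable=get,
            op_kwargs={'url': f'https://gorest.co.in/public/v2/{endpoint}'}
        )
        for endpoint in endpoints
    ]

We’ll stick with the four explicit tasks in this article so the dependencies are easier to read.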
Finally, we’ll connect the tasks in a sequential manner. You’ll see how to connect them in parallel later, but this is just so you can get the idea of what’s wrong with running the tasks one after the other:
    task_get_users >> task_get_posts >> task_get_comments >> task_get_todos
The only thing left to do is to write the function, so let’s do that in the same file but above the DAG. It will extract the endpoint from the URL, capture the current datetime, make a request to the endpoint, and save the response in JSON format. Finally, the function sleeps for two seconds - just to make the entire runtime a bit longer:
def get(url: str) -> None:
    endpoint = url.split('/')[-1]
    now = datetime.now()
    now = f"{now.year}-{now.month}-{now.day}T{now.hour}-{now.minute}-{now.second}"
    res = requests.get(url)
    res = json.loads(res.text)
    with open(f"/Users/dradecic/airflow/data/{endpoint}-{now}.json", 'w') as f:
        json.dump(res, f)
    time.sleep(2)
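If you’d rather have the task fail loudly when the API returns an error status instead of saving the error payload, a slightly more defensive variant of the same function could look like this (the hard-coded data folder path is still specific to my machine):

def get(url: str) -> None:
    endpoint = url.split('/')[-1]
    now = datetime.now().strftime('%Y-%m-%dT%H-%M-%S')
    res = requests.get(url, timeout=10)
    res.raise_for_status()  # raise on 4xx/5xx so Airflow marks the task as failed
    with open(f"/Users/dradecic/airflow/data/{endpoint}-{now}.json", 'w') as f:
        json.dump(res.json(), f)
    time.sleep(2)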
We can test a single task through the Terminal, just to see if everything is working as expected:
airflow tasks test parallel_dag get_users 2022-3-1
The task execution succeeded, and here’s what it saved to the data folder:
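As a side note, newer Airflow versions also let you exercise every task in the DAG from the Terminal in one go with the dags test command:

airflow dags test parallel_dag 2022-3-1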
That’s all we need for now, so let’s test the DAG through the Airflow homepage next.
Test the Airflow DAG (Sequential)
Open up the Airflow webserver page and open our new DAG. Here’s what it looks like in the Graph view:
You can see that the tasks are connected in a sequential manner - one after the other. It’s a huge waste of time since the GET requests aren’t connected in any way.
Running the DAG confirms the tasks are running sequentially:
But probably the best confirmation is the Gantt view that shows the time each task took:
Let’s go back to the code editor and modify the DAG so the tasks run in parallel.
How to Run Airflow DAG in Parallel
To start, we’ll need to write another task that basically does nothing, but it’s here only so we can connect the other tasks to something. Let’s write it above the current first task:
    task_start = BashOperator(
        task_id='start',
        bash_command='date'
    )
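If you don’t care about the date output, Airflow also ships a no-op operator built exactly for this kind of anchor task - EmptyOperator on Airflow 2.3+, or DummyOperator on older versions. A drop-in alternative, with the import going at the top of the file and the task replacing the BashOperator above:

from airflow.operators.empty import EmptyOperator  # airflow.operators.dummy.DummyOperator on older versions

    task_start = EmptyOperator(
        task_id='start'
    )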
And now we’ll have to change the dependencies at the bottom. We’ll run the start task first, which will run all of the other four tasks after completion:
    task_start >> [task_get_users, task_get_posts, task_get_comments, task_get_todos]
Refresh the Airflow DAG page now. You can see how the Graph view has changed:
The start task will now run first, followed by the other four tasks that connect to the APIs and run in parallel. Trigger the DAG once again and inspect the Tree view - you’ll see that the tasks have started running at the same time:
The best indicator is, once again, the Gantt view:
Bars representing the runtimes are placed on top of each other, indicating the tasks have indeed run in parallel.
That’s all I wanted to cover today, so let’s wrap things up next.
Conclusion
Today you’ve successfully written your first Airflow DAG that runs the tasks in parallel. It’s a huge milestone, especially because you can be more efficient now. Most of the time you don’t need to run similar tasks one after the other, so running them in parallel is a huge time saver.
In the following article, we’ll take a deep dive into Airflow XComs, a mechanism for sending data between tasks. Stay tuned for that - I’ll make sure to publish the article in a couple of days.
Recommended reads
- 5 Best Books to Learn Data Science Prerequisites (Math, Stats, and Programming)
- Top 5 Books to Learn Data Science in 2022
- How to Install Apache Airflow Locally