Set up an S3 bucket and upload local files with Apache Airflow
We’ve written a couple of Airflow DAGs so far, but all of them stored data locally, either to a file or database. What if you want to store data in the cloud? Well, you’re in luck - today you’ll learn how to work with Amazon S3 in a few lines of code.
Amazon Simple Storage Service (S3) is a scalable object storage solution that you can start using for free and scale relatively cheaply as your needs grow. This article assumes you already have an AWS account set up, as we won’t go through that process. Everything else, from setting up the bucket to downloading security credentials, is covered below.
Don’t feel like reading? Watch my video instead:
How to Create a Bucket on Amazon S3
First things first, open your AWS console and go to S3 - Buckets - Create bucket. You’ll be presented with the following screen:
Name your bucket however you want, keeping in mind that bucket names can’t contain uppercase letters or special characters. Leave every other option at its default - fine for this demo, though not recommended for production use - and scroll to the bottom of the screen.
Once there, hit the big orange Create bucket button:
Your bucket will be created immediately, provided the name you’ve specified matches the criteria and isn’t already taken:
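The console is all you need here, but if you’d rather create the bucket from code, a minimal boto3 sketch like the one below does the same thing. The region is an assumption on my part - swap in your own - and it presumes your AWS credentials are already configured locally:
import boto3

# Assumes credentials are already configured locally (e.g., via aws configure)
s3_client = boto3.client('s3', region_name='eu-central-1')

# Outside us-east-1, S3 requires an explicit LocationConstraint
s3_client.create_bucket(
    Bucket='bds-airflow-bucket',
    CreateBucketConfiguration={'LocationConstraint': 'eu-central-1'}
)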
See how easy that was? Let’s now grab the credentials and set up the Airflow connection.
How to Create an S3 Connection in Airflow
Before doing anything, make sure to install the Amazon provider for Apache Airflow - otherwise, you won’t be able to create an S3 connection:
pip install 'apache-airflow[amazon]'
Once installed, restart both the Airflow webserver and the scheduler and you’re good to go.
Open the AWS console again, click on your username in the top right corner (mine is darioddd), and select Security credentials. Under Access keys, click on Create New Access Key. This will generate two things:
- Access Key ID
- Secret Access Key
Feel free to download the key file in CSV format, but that’s not mandatory today.
Head over to the Airflow webserver and go to Admin - Connections. Click on the plus sign to define a new connection. Here’s what you should specify:
- Connection Id - arbitrary string, name yours however you want.
- Connection Type - Amazon S3
- Extra - a JSON-like object with the keys aws_access_key_id and aws_secret_access_key. You know what to put in for the values.
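For reference, the Extra field boils down to a small JSON object. The values below are placeholders - paste in the Access Key ID and Secret Access Key you generated a moment ago:
{
    "aws_access_key_id": "YOUR_ACCESS_KEY_ID",
    "aws_secret_access_key": "YOUR_SECRET_ACCESS_KEY"
}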
And that’s all you need to do, configuration-wise. Let’s write up the actual Airflow DAG next.
How to Write an Airflow DAG that Uploads Files to S3
Create a new Python file in the ~/airflow/dags folder. I’ve named mine s3_upload.py. We’ll start with the library imports and the DAG boilerplate code. Note there’s one new import - S3Hook - it will be responsible for communicating with the S3 bucket:
from datetime import datetime

from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


with DAG(
    dag_id='s3_dag',
    schedule_interval='@daily',
    start_date=datetime(2022, 3, 1),
    catchup=False
) as dag:
    # Tasks will go here
    pass
A task for uploading files boils down to using a PythonOperator to call a function. The upload_to_s3() function accepts three parameters - make sure to get them right:
- filename - string, the full path to the file you want to upload. Any file will do, but I’m using the one downloaded in the Airflow REST API article.
- key - string, the name the uploaded file will get in the bucket. For example, posts.json will upload the file to the S3 bucket root. You can also specify paths, such as /data/posts/posts.json, and S3 will automatically create the folder structure for you.
- bucket_name - string, the name of the bucket you want to upload the file to.
The function first creates an instance of the S3Hook class, using the connection established earlier. It then calls the load_file() method to upload a local file to the S3 bucket:
from datetime import datetime

from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


def upload_to_s3(filename: str, key: str, bucket_name: str) -> None:
    # 's3_conn' is the Connection Id configured in the Airflow UI earlier
    hook = S3Hook('s3_conn')
    hook.load_file(filename=filename, key=key, bucket_name=bucket_name)


with DAG(
    dag_id='s3_dag',
    schedule_interval='@daily',
    start_date=datetime(2022, 3, 1),
    catchup=False
) as dag:
    # Upload the file
    task_upload_to_s3 = PythonOperator(
        task_id='upload_to_s3',
        python_callable=upload_to_s3,
        op_kwargs={
            'filename': '/Users/dradecic/airflow/data/posts.json',
            'key': 'posts.json',
            'bucket_name': 'bds-airflow-bucket'
        }
    )
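One thing worth knowing before you run this repeatedly: in the provider versions I’ve worked with, load_file() won’t overwrite an existing key by default, so rerunning the task against the same key can fail. If overwriting is what you want, pass replace=True:
hook.load_file(filename=filename, key=key, bucket_name=bucket_name, replace=True)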
Everything looks good, so let’s test the task:
airflow tasks test s3_dag upload_to_s3 2022-03-01
The task finished successfully, which means you should see the uploaded file in the S3 bucket:
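If you’d rather verify from code than from the S3 console, the same hook can tell you whether the key exists. Here’s a quick sanity check using the connection, key, and bucket names from above:
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

hook = S3Hook('s3_conn')
# Prints True if the upload went through
print(hook.check_for_key(key='posts.json', bucket_name='bds-airflow-bucket'))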
Mission accomplished. Let’s make a summary before wrapping things up.
Conclusion
Apache Airflow makes working with cloud storage a breeze. In only a couple of minutes, you’ve created a new S3 bucket, configured an Airflow connection, and written an Airflow task that uploads a local file to the cloud. It’s a massive milestone, as most businesses use S3 for one thing or another.
Stay tuned for the following article, in which we’ll download a file from an S3 bucket.
Recommended reads
- 5 Best Books to Learn Data Science Prerequisites (Math, Stats, and Programming)
- Top 5 Books to Learn Data Science in 2022
- How to Install Apache Airflow Locally