
Apache Airflow for Data Science - How to Upload Files to Amazon S3

Set up an S3 bucket and upload local files with Apache Airflow

We've written a couple of Airflow DAGs so far, but all of them stored data locally, either to a file or database. What if you want to store data in the cloud? Well, you're in luck - today you'll learn how to work with Amazon S3 in a few lines of code.

Amazon Simple Storage Service (S3) is a scalable object storage solution that you can start using for free, and scale relatively cheaply if you need. The article assumes you already have an AWS account set up, as we won't go through that process. Everything else, from setting up the bucket to downloading security credentials is covered below.

Don't feel like reading? Watch my video instead:


How to Create a Bucket on Amazon S3

First things first, open your AWS console and go to S3 - Buckets - Create bucket. You'll be presented with the following screen:

Image 1 - Creating a bucket on Amazon S3 (image by author)

Name your bucket however you want; note that bucket names can't contain uppercase letters or most special characters. Leave every other option as is (fine for a demo, though not recommended for production use) and scroll to the bottom of the screen.

Once there, hit the big orange Create bucket button:

Image 2 - Creating a bucket on Amazon S3 (image by author)

Your bucket will be created immediately, provided the name you've specified matches the criteria and isn't already taken:

Image 3 - Creating a bucket on Amazon S3 (image by author)

See how easy that was? Let's now grab the credentials and set up the Airflow connection.


How to Create an S3 Connection in Airflow

Before doing anything, make sure to install the Amazon provider for Apache Airflow - otherwise, you won't be able to create an S3 connection:

pip install 'apache-airflow[amazon]'

Once installed, restart both the Airflow webserver and the scheduler, and you're good to go.

Open the AWS page again, click on your username in the top right corner (mine is darioddd), and select Security credentials. Under Access keys, click on Create New Access Key. This will generate two things:

  • Access Key ID
  • Secret Access Key
Image 4 - Obtaining S3 access key ID and secret access key (image by author)

Feel free to download the key file in CSV format, but that's not mandatory today.

Head over to Airflow webserver, and go to Admin - Connections. Click on the plus sign to define a new one. Here's what you should specify:

  • Connection Id - arbitrary string, name yours however you want.
  • Connection Type - Amazon S3
  • Extra - a JSON object with the keys aws_access_key_id and aws_secret_access_key. For the values, paste in the two credentials you generated a minute ago.
Image 5 - Setting up an S3 connection in Airflow (image by author)
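For reference, the Extra field is nothing more than a small JSON object. With placeholder values (substitute your own credentials), it looks like this:

```json
{
    "aws_access_key_id": "YOUR_ACCESS_KEY_ID",
    "aws_secret_access_key": "YOUR_SECRET_ACCESS_KEY"
}
```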

And that's all you need to do, configuration-wise. Let's write up the actual Airflow DAG next.


How to Write an Airflow DAG that Uploads Files to S3

Create a new Python file in ~/airflow/dags folder. I've named mine s3_upload.py. We'll start with the library imports and the DAG boilerplate code. Note there's one new import - S3Hook - it will be responsible for communicating with the S3 bucket:

from datetime import datetime
from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


with DAG(
    dag_id='s3_dag',
    schedule_interval='@daily',
    start_date=datetime(2022, 3, 1),
    catchup=False
) as dag:
    pass

A task for uploading files boils down to using a PythonOperator to call a function. The upload_to_s3() function accepts three parameters - make sure to get them right:

  • filename - string, a full path to the file you want to upload. Any file will do, but I'm using the one downloaded in the Airflow REST API article.
  • key - string, the name the uploaded file will get. For example, posts.json uploads the file to the S3 bucket root. You can also specify paths, such as /data/posts/posts.json, and S3 will automatically create the folder structure for you.
  • bucket_name - string, name of the bucket you want to upload the file to.

The function first creates an instance of the S3Hook class, using the connection established earlier. It then calls the load_file() method to upload a local file to the S3 bucket:

from datetime import datetime
from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


def upload_to_s3(filename: str, key: str, bucket_name: str) -> None:
    hook = S3Hook('s3_conn')
    hook.load_file(filename=filename, key=key, bucket_name=bucket_name)


with DAG(
    dag_id='s3_dag',
    schedule_interval='@daily',
    start_date=datetime(2022, 3, 1),
    catchup=False
) as dag:

    # Upload the file
    task_upload_to_s3 = PythonOperator(
        task_id='upload_to_s3',
        python_callable=upload_to_s3,
        op_kwargs={
            'filename': '/Users/dradecic/airflow/data/posts.json',
            'key': 'posts.json',
            'bucket_name': 'bds-airflow-bucket'
        }
    )

Everything looks good, so let's test the task:

airflow tasks test s3_dag upload_to_s3 2022-03-01
Image 6 - Testing the S3 upload task (image by author)

The task finished successfully, which means you should see the uploaded file in the S3 bucket:

Image 7 - Verifying the file was uploaded to S3 (image by author)
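If you'd rather verify the upload from code than from the console, the same hook can help - S3Hook also exposes a check_for_key() method that returns True if an object exists in a bucket. A minimal sketch (the helper name confirm_upload is mine, not part of the DAG above):

```python
def confirm_upload(hook, key: str, bucket_name: str) -> bool:
    # check_for_key() returns True if an object with this key
    # exists in the given bucket, False otherwise
    return hook.check_for_key(key=key, bucket_name=bucket_name)


# Inside a DAG you would pass a real hook, e.g.:
#   from airflow.providers.amazon.aws.hooks.s3 import S3Hook
#   confirm_upload(S3Hook('s3_conn'), 'posts.json', 'bds-airflow-bucket')
```

You could wire this into the DAG as a second PythonOperator task that fails loudly if the expected key is missing.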

Mission accomplished. Let's recap before wrapping things up.


Conclusion

Apache Airflow makes working with cloud storage a breeze. In only a couple of minutes, you've created a new S3 bucket, configured an Airflow connection, and written an Airflow task that uploads a local file to the cloud. It's a massive milestone, as most businesses use S3 for one thing or another.

Stay tuned for the next article, in which we'll download a file from an S3 bucket.
