Airflow is a platform used to programmatically declare ETL workflows. Learn how to leverage hooks for uploading a file to AWS S3 with it.
This article is a step-by-step tutorial that will show you how to upload a file to an S3 bucket thanks to an Airflow ETL (Extract Transform Load) pipeline. ETL pipelines are defined by a set of interdependent tasks.
A task might be “download data from an API” or “upload data to a database” for example. A dependency would be “wait for the data to be downloaded before uploading it to the database”. After an introduction to ETL tools, you will discover how to upload a file to S3 thanks to boto3.
Airflow is a platform composed of a web interface and a Python library. This project has been initiated by AirBnB in January 2015 and incubated by The Apache Software Foundation since March 2018 (version 1.8). The Airflow community is really active and counts more than 690 contributors for a 10k stars repository.
Also, The Apache Software foundation recently announced Airflow as a top-level project. This gives us a measure of the community and project management health so far.
As for every Python project, create a folder for your project and a virtual environment.
# Create your virtual environment virutalenv venv source venv/bin/activate # Create your Airflow project folder mkdir airflow_project cd airflow_project
You also need to export an additional environment variable as mentioned in the 21st of November announcement.
Eventually, run the commands of the Getting Started part of the documentation that are pasted below.
# airflow needs a home, ~/airflow is the default, # but you can lay foundation somewhere else if you prefer # (optional) export AIRFLOW_HOME=~/airflow # install from pypi using pip pip install apache-airflow # initialize the database airflow initdb # start the web server, default port is 8080 airflow webserver -p 8080 # start the scheduler airflow scheduler # visit localhost:8080 in the browser and enable the example dag in the home page
Congratulations! You now have access to the Airflow UI at http://localhost:8080 and you are all set to begin this tutorial.
Note: Airflow home folder will be used to store important files (configuration, logs, database among others).
A DAG is a Directed Acyclic Graph that represents the tasks chaining of your workflow. Here is the first DAG you are going to build in this tutorial.
On this schematic, we see that task
upload_file_to_S3 may be executed only once
dummy_start has been successful.
Note: Our ETL is only composed of a L (Load) step in this example
As you can see in
$AIRFLOW_HOME/airlow.cfg, the value of the
dags_folder entry indicates that your DAG must be declared in folder
$AIRFLOW_HOME/dags . Also, we will call
upload_file_to_S3.py the file in which we are going to implement our DAG:
# Create the folder containing your DAGs definition mkdir airflow_home/dags # Create your DAG definition file touch airflow_home/dags/upload_file_to_S3.py # then open this file with your favorite IDE
First, import the required operators from
airflow.operators. Then, declare two tasks, attach them to your DAG
my_dag thanks to the parameter
dag. Using the context manager allows you not to duplicate the parameter
dag in each operator. Finally, set a dependency between them with
Now that we have the spine of our DAG, let’s make it useful. To do so, we will write a helper that uploads a file from your machine to an S3 bucket thanks to boto3.
boto3 is a Python library allowing you to communicate with AWS. In our tutorial, we will use it to upload a file from our local computer to your S3 bucket.
Install boto3 and fill
~/.aws/config with your AWS credentials as mentioned in Quick Start. More information about authentication mechanism is given in boto3 Credentials documentation.
All you need to do now is implement this little helper which allows you to upload a file to S3 and call it in your Python upload task.
Now, make your DAG task
upload_to_S3_task call this helper thanks to the argument
Launch your DAG. When it is finished, you should see your file in your S3 bucket.
Note: A good tip for launching your DAG is to clear the first step with option Downstream checked. The scheduler will then relaunch it automatically.
Do you remember the little helper we wrote to upload a file to S3? Well, all of this is already implemented. To use it, you will have to create a
Once you have created your new connection, all there is to be done is fill the two following fields:
Conn Id and
Conn Type and click
Now that your Airflow S3 connection is setup, you are ready to create an S3 hook to upload your file. Your hook will be linked to your connection thanks to its argument
python_callable helper in
upload_file_to_S3_with_hook and you are all set.
If you read AWS hooks source code you will see that they use boto3. They add an abstraction layer over boto3 and provide an improved implementation of what we did in Step 3 of this article.
Note: Although you did not specify your credentials in your Airflow connection, the process worked. This is because, when Airflow creates a boto3 session with
aws_secret_access_key=None , boto3 will authenticate you with your
~/.aws/credentials information. If you did not configure your AWS profile locally, you can also fill your AWS credentials directly in Airflow UI trough
login/password or through
Hooks add a great value to Airflow since they allow you to connect your DAG to your environment. There are already numerous hooks ready to be used like
SlackHook and many others so make sure to check Airflow hooks and Airflow contribution hooks out before establishing a connection to an external service.
Thanks to this tutorial, you should know how to :
Install and configure Airflow;
Make your first Airflow DAG with a python task;
Use boto3 to upload a file on AWS S3;
Use hooks to connect your DAG to your environment;
Manage authentication to AWS via Airflow connections.
How to Perform Fraud Detection with Personalized Page Rank
This article shows how to perform fraud detection with Graph Analysis.
How to Track your Users over Several Domains?
Track users over different domains is a recurrent issue while developing a substantial web solution.
Introducing tf-explain, Interpretability for TensorFlow 2.0
A Tensorflow 2.0 library for deep learning model interpretability.