
A framework to automate your work: How to set up Airflow!

Marvin Taschenberger Blog, Data Science

In the first part of this blog post, we talked about what a DAG is, how to apply this mathematical concept in project planning and programming, and why we at STATWORX decided to use Airflow over other workflow managers. In this part, we will get more technical: we walk through a fairly informative hello-world program and show how to set up Airflow for the different setups one could face. If you are only interested in the technical part and therefore do not want to read the first one, but still want a recap, here is a summary:

  • DAG is short for directed acyclic graph and as such can represent relationships and dependencies
  • This property can be used in project management, as it makes clear which tasks can run independently of each other and which cannot
  • The same properties can be used in programming, as software can determine which jobs can run concurrently and in which order others have to finish (or fail).

Why we chose Airflow:

  1. No Cron – With Airflow's included scheduler, we don't need to rely on cron to schedule our DAGs and only use one framework (unlike Luigi).
  2. Code Based – In Airflow, all workflows, dependencies and schedules are defined in Python code. Therefore, it is rather easy to build complex structures and extend the flows.
  3. Language – Python is a language that is relatively easy to pick up, and that skill was already available in our team.

Preparation

The first step is to set up a new virtual environment with Python and virtualenv.

$pip install virtualenv # if it hasn't been installed yet   
$cd  # change into home 

# create a separated folder with all environments  
$mkdir env   
$cd env   
$virtualenv airflow

Once it has been created, we can source this environment whenever we want to work with Airflow, so we don't get into conflict with other dependencies.

$source ~/env/airflow/bin/activate  
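To double-check that a shell really uses the activated environment, a small Python sketch can compare the interpreter's prefixes. It assumes either the built-in venv module or virtualenv 20 or later, both of which keep the base installation in sys.base_prefix; older virtualenv releases set sys.real_prefix instead, which the check also covers.

```python
import sys


def in_virtualenv():
    """Return True if the running interpreter belongs to a virtual environment."""
    # virtualenv releases before 20 set sys.real_prefix
    if hasattr(sys, "real_prefix"):
        return True
    # venv and newer virtualenv point sys.prefix at the environment and
    # keep the original installation in sys.base_prefix
    return sys.prefix != getattr(sys, "base_prefix", sys.prefix)


if __name__ == "__main__":
    print("inside a virtualenv:", in_virtualenv(), "| prefix:", sys.prefix)
```

Run after source ~/env/airflow/bin/activate, it should report True and a prefix that ends in env/airflow.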

Then we can install all the Python packages we will need:

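# the quotes keep shells such as zsh from expanding the brackets;
# the celery extra is only needed for the CeleryExecutor described later
$pip install -U pip setuptools wheel \
psycopg2 \
Cython \
pytz \
pyOpenSSL \
ndg-httpsclient \
pyasn1 \
psutil \
'apache-airflow[postgres,celery]'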
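As a complement to the airflow version check, a short sketch can confirm that every package from the pip command is importable inside the activated environment. The mapping from pip names to import names (pyOpenSSL to OpenSSL, ndg-httpsclient to ndg.httpsclient) is our assumption for the packages listed.

```python
import importlib.util

# import names of the packages installed above (some differ from their pip names)
MODULES = ["psycopg2", "Cython", "pytz", "OpenSSL", "ndg.httpsclient",
           "pyasn1", "psutil", "airflow"]


def check_modules(names):
    """Map each module name to True if Python can locate it."""
    status = {}
    for name in names:
        try:
            # find_spec locates a module without executing it; for dotted
            # names the parent package (e.g. ndg) is imported first
            status[name] = importlib.util.find_spec(name) is not None
        except (ImportError, ValueError):
            # raised e.g. when the parent package of a dotted name is missing
            status[name] = False
    return status


if __name__ == "__main__":
    for name, found in check_modules(MODULES).items():
        print(f"{name:16} {'ok' if found else 'MISSING'}")
```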

A small breeze

Once our setup is done, we can check whether Airflow is correctly installed by typing airflow version into the shell. You should see something like this:

[Screenshot: output of airflow version with the SequentialExecutor]

Initially, Airflow runs with an SQLite database, which cannot execute more than one task at a time and should therefore be switched out once you get serious about it. More on this later, though. Next, let us start with the typical hello-world example. Navigate to your AIRFLOW_HOME path, which by default is a folder called airflow in your home directory, and place the file below in its dags subfolder. If you want to change this location, set the environment variable with export AIRFLOW_HOME=/your/new/path and call airflow version once again.
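To see which folder the DAG file has to go into, here is a minimal sketch that resolves the paths described above. It assumes the defaults stated in this post: AIRFLOW_HOME falls back to ~/airflow, and DAG files live in its dags subfolder (the default dags_folder in airflow.cfg). The function name airflow_paths is our own.

```python
import os


def airflow_paths(environ=None):
    """Return (airflow_home, dags_folder) under the defaults assumed above."""
    environ = os.environ if environ is None else environ
    # assumption: AIRFLOW_HOME defaults to ~/airflow when it is not set
    home = environ.get("AIRFLOW_HOME", "~/airflow")
    home = os.path.abspath(os.path.expanduser(home))
    return home, os.path.join(home, "dags")


if __name__ == "__main__":
    home, dags = airflow_paths()
    os.makedirs(dags, exist_ok=True)  # create the dags folder if it is missing
    print("AIRFLOW_HOME:", home)
    print("put HelloWorld.py into:", dags)
```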

# ~/airflow/dags/HelloWorld.py

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator


def print_hello():
    # the returned string shows up in the task log
    return 'Hello world!'


dag = DAG('hello_world',
          description='Simple tutorial DAG',
          # a fixed date in the past; a dynamic value such as datetime.now()
          # shifts every time the scheduler parses this file
          start_date=datetime(2018, 1, 1),
          # only schedule the latest interval instead of backfilling
          # every day since the start_date
          catchup=False,
          schedule_interval='0 12 * * *')  # daily at 12:00

dummy_operator = DummyOperator(task_id='dummy_task', retries=3, dag=dag)

hello_operator = PythonOperator(task_id='hello_task',
                                python_callable=print_hello,
                                dag=dag)

# same as dummy_operator.set_downstream(hello_operator)
dummy_operator >> hello_operator

The first lines should be fairly self-explanatory: the imports of the necessary libraries and the definition of the 'hello world' function. The interesting part starts with the DAG definition. Here we define the core of our workflow, a DAG object with the identifier hello_world and a short description of what this workflow does. As you might have suspected, the start_date argument defines the date from which the DAG is scheduled. This date should always be in the past; otherwise, the DAG will not run until that date has been reached. The schedule_interval defines the periods at which the graph should be executed. We can set it either with cron-like notation, as above, or with some syntactic sugar that Airflow offers. In the example above, we define that the workflow should run daily at 12:00. The fact that it should run daily could also have been expressed with schedule_interval='@daily', although @daily runs at midnight rather than at noon.

The cron notation follows the schema minute - hour - day (of the month) - month - day (of the week), or mi h d m wd for short. With * as a wildcard, we can schedule at very flexible intervals. Let's say we want a job to run on the first day of every month at twelve o'clock. In this case, we want neither a specific month nor a specific day of the week, so we replace those placeholders with the wildcard * (mi h d * *). Since it should run at 12:00, we replace mi with 0 and h with 12. Last but not least, we plug in 1 as the day of the month and get our final cron notation 0 12 1 * *. If we don't want to be this specific but would rather say "run daily" or "run hourly" starting from the start_date, we can use Airflow's presets: @daily, @hourly, @monthly or @yearly.
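To make the mi h d m wd schema concrete, here is a deliberately simplified matcher in plain Python. It is not Airflow's scheduler (Airflow relies on a full cron library). Fields may only be * or a single integer, and the preset table reflects our understanding of the Airflow 1.x presets. Like classic cron, it treats a restriction on both day fields as "either one matches".

```python
from datetime import datetime

# assumed expansions of Airflow's presets into plain cron strings
PRESETS = {
    "@hourly": "0 * * * *",
    "@daily": "0 0 * * *",
    "@weekly": "0 0 * * 0",
    "@monthly": "0 0 1 * *",
    "@yearly": "0 0 1 1 *",
}


def _field_matches(field, value):
    """Simplified cron field: either the wildcard '*' or one integer."""
    return field == "*" or int(field) == value


def cron_matches(expr, dt):
    """Return True if dt falls on the schedule given as 'mi h d m wd' or a preset."""
    expr = PRESETS.get(expr, expr)
    mi, h, d, m, wd = expr.split()
    # cron counts weekdays from Sunday=0, Python's weekday() from Monday=0
    cron_weekday = (dt.weekday() + 1) % 7
    if d != "*" and wd != "*":
        # classic cron: if both day fields are restricted, either may match
        day_ok = _field_matches(d, dt.day) or _field_matches(wd, cron_weekday)
    else:
        day_ok = _field_matches(d, dt.day) and _field_matches(wd, cron_weekday)
    return (_field_matches(mi, dt.minute) and _field_matches(h, dt.hour)
            and _field_matches(m, dt.month) and day_ok)


if __name__ == "__main__":
    # first day of the month at noon, as derived in the text
    print(cron_matches("0 12 1 * *", datetime(2018, 3, 1, 12, 0)))  # True
    print(cron_matches("@daily", datetime(2018, 3, 1, 12, 0)))      # False: midnight only
```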

Once we have this DAG instance, we can start to fill it with tasks. In Airflow, tasks are represented by instances of operators. Here we instantiate a DummyOperator and a PythonOperator. Both need to be assigned a task_id, which this time only needs to be unique within the workflow. The first operator we define is a DummyOperator, which does nothing at all. We only use it to fill our graph so that we can test Airflow with the simplest possible scenario. The second one is a PythonOperator. In addition to the assignment to a DAG and the id, this operator requires a function (python_callable) that will be executed once the task is triggered. This is how we attach our print_hello function to our workflow.

Before we can finally execute our flow, we still need to set the relationship between our tasks. This linking is done either with the bitshift operators << and >> or by calling the set_upstream and set_downstream methods, respectively. This way, we specify that the DummyOperator first needs to run and succeed before our PythonOperator is executed.
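To make the dependency wiring concrete, here is a toy model in plain Python. It is not Airflow's implementation: ToyDag and Task are hypothetical classes that mimic how >> and << map onto set_downstream and set_upstream, how task ids must be unique within one DAG, and how a valid execution order can be derived from the edges (here with Kahn's topological sort).

```python
from collections import deque


class ToyDag:
    """Hypothetical stand-in for a DAG: a dict of tasks keyed by task_id."""

    def __init__(self, dag_id):
        self.dag_id = dag_id
        self.tasks = {}

    def execution_order(self):
        """Kahn's algorithm: a task may run once all its upstream tasks finished."""
        indegree = {tid: 0 for tid in self.tasks}
        for task in self.tasks.values():
            for tid in task.downstream:
                indegree[tid] += 1
        ready = deque(sorted(tid for tid, n in indegree.items() if n == 0))
        order = []
        while ready:
            tid = ready.popleft()
            order.append(tid)
            for nxt in sorted(self.tasks[tid].downstream):
                indegree[nxt] -= 1
                if indegree[nxt] == 0:
                    ready.append(nxt)
        if len(order) != len(self.tasks):
            raise ValueError("cycle detected: this is not a DAG")
        return order


class Task:
    """Hypothetical stand-in for an operator: an id plus downstream links."""

    def __init__(self, task_id, dag):
        if task_id in dag.tasks:
            raise ValueError(f"task_id {task_id!r} already exists in this DAG")
        self.task_id = task_id
        self.downstream = set()
        dag.tasks[task_id] = self

    def set_downstream(self, other):
        self.downstream.add(other.task_id)

    def set_upstream(self, other):
        other.set_downstream(self)

    def __rshift__(self, other):  # self >> other
        self.set_downstream(other)
        return other  # returning the right operand allows a >> b >> c

    def __lshift__(self, other):  # self << other
        self.set_upstream(other)
        return other


if __name__ == "__main__":
    dag = ToyDag("hello_world")
    dummy = Task("dummy_task", dag)
    hello = Task("hello_task", dag)
    dummy >> hello  # same as dummy.set_downstream(hello)
    print(dag.execution_order())  # ['dummy_task', 'hello_task']
```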

Now that our code is in place, we should test it. First, we run it directly with the Python interpreter to check whether we made a syntax mistake: either run it in an IDE or in the terminal with the command python ~/airflow/dags/HelloWorld.py. If the interpreter doesn't throw an error, congratulations, you didn't screw up too badly. Next, we need to check whether Airflow is aware of our DAG with airflow list_dags. We should now see our hello_world id in the printed list. If so, we can check whether each task is assigned to it with airflow list_tasks hello_world. Again, we should see some familiar ids, namely dummy_task and hello_task. So far so good; at least the assignment seems to have worked. Next up is a unit test of the individual operators with airflow test hello_world dummy_task 2018-01-01 and airflow test hello_world hello_task 2018-01-01. Hopefully, this doesn't raise any errors, and we can continue.
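If you only want the syntax check without executing the file, Python's built-in compile() is enough. The following sketch is our own helper, not an Airflow tool. Note that, unlike running python HelloWorld.py, it does not catch missing imports such as an absent airflow package, because nothing is executed.

```python
import sys


def check_syntax(source, filename="<dag file>"):
    """Return None if source compiles, else 'file:line: message'."""
    try:
        compile(source, filename, "exec")  # parses and compiles, runs nothing
    except SyntaxError as err:
        return f"{filename}:{err.lineno}: {err.msg}"
    return None


def check_file(path):
    with open(path, encoding="utf-8") as fh:
        return check_syntax(fh.read(), path)


if __name__ == "__main__":
    # usage: python check_dag.py ~/airflow/dags/HelloWorld.py
    for path in sys.argv[1:]:
        print(check_file(path) or f"{path}: OK")
```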

Now that our example workflow is deployed, we need to initialize Airflow entirely. This requires three commands before we can trigger our workflow manually.

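  1. airflow initdb to initialize the database where Airflow saves the workflows and their states:
    [Screenshot: output of airflow initdb]
  2. airflow webserver to start the web server at localhost:8080, where we can reach the web interface (the process keeps running, so open a new terminal with the activated environment for the next step):
    [Screenshot: output of airflow webserver]
  3. airflow scheduler to start the scheduling process of the DAGs so that the individual workflows can be triggered (this process keeps running as well):
    [Screenshot: output of airflow scheduler]
  4. airflow trigger_dag hello_world to trigger our workflow and place it on the schedule. New DAGs are paused by default, so unpause it beforehand with airflow unpause hello_world or with the on/off toggle in the web interface.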

Now we can either open a web browser and navigate to http://localhost:8080/admin/ or, on macOS, call open http://localhost:8080/admin/ in the terminal. Either way, it should lead us to a web page like this:

[Screenshot: Airflow web interface]

At the bottom, you should see your creation, and the light green circle indicates that our flow is scheduled and running. The only thing left for us to do is to wait until it has been executed. In the meantime, we can talk about the setup and how we could use some of the other executors.
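If the page does not load, this small sketch checks whether the web server started with airflow webserver answers HTTP requests at all. The URL default is the one used above. The helper itself is our own and uses only the standard library.

```python
from urllib.error import HTTPError
from urllib.request import urlopen


def is_reachable(url="http://localhost:8080/admin/", timeout=3.0):
    """Return True if some server answers HTTP requests at url."""
    try:
        with urlopen(url, timeout=timeout):
            return True
    except HTTPError:
        # the server answered, just not with a success status
        return True
    except OSError:
        # connection refused, timeout, DNS failure (URLError is an OSError)
        return False


if __name__ == "__main__":
    print("web server reachable:", is_reachable())
```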

The Backend

As mentioned before, once we want to get serious about executing our graphs, we need to change Airflow's backend. Initially, it uses a simple SQLite database, which limits Airflow to executing one task at a time, sequentially. Therefore, we will first switch the connected database to PostgreSQL. In case you haven't installed Postgres yet and need help with it, I'd recommend checking out its wiki. I couldn't possibly describe the process as well as that page does. Those on a Debian-based Linux system (sorry, Windows) can try sudo apt-get install postgresql, and Mac users with Homebrew brew install postgresql. Another easy way would be to use a Docker container with the respective image.

Now let us create a new database called airflow, where all the metadata will be saved, by typing createdb airflow into the terminal (depending on your installation, you may have to run it as the postgres user, e.g. sudo -u postgres createdb airflow). Next, we need to edit the airflow.cfg file, which should have appeared in your AIRFLOW_HOME folder (again, by default airflow in your home directory), and afterwards repeat steps 1 to 4 from above (initdb…). Fire up your favorite editor and look for the line sql_alchemy_conn = in the [core] section (line 32 in our version). Here we are going to replace the SQLite connection string with the one for our PostgreSQL server and a new driver. This string is made up of:

postgresql+psycopg2://IPADRESS:PORT/DBNAME?user=USERNAME&password=PASSWORD

The first part tells SQLAlchemy that the connection leads to PostgreSQL and that it should use the psycopg2 driver to connect to it. In case you have Postgres installed locally (or in a container that maps to localhost) and did not change the default port 5432, IPADRESS:PORT translates to localhost:5432 or simply localhost. DBNAME becomes airflow in our case, as we just created that database for this purpose. The last two parts depend on the credentials you have set up. In the end, we might get a line that looks like this:

sql_alchemy_conn = postgresql+psycopg2://localhost/airflow?user=postgres&password=password  
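Credentials containing characters such as @, & or / would break this URL. A small sketch that builds the string in the form used above and percent-encodes user and password is shown here. The cfg_escape helper rests on our assumption that airflow.cfg is read with Python's configparser and its %-interpolation, which means a literal % must be written as %%. If you set the value through another channel, skip that step.

```python
from urllib.parse import quote


def sql_alchemy_conn(host="localhost", port=5432, dbname="airflow",
                     user="postgres", password="password"):
    """Build postgresql+psycopg2://HOST:PORT/DBNAME?user=USER&password=PASSWORD."""
    # percent-encode the credentials so @, & or / cannot be confused
    # with URL delimiters
    user_q = quote(user, safe="")
    password_q = quote(password, safe="")
    return (f"postgresql+psycopg2://{host}:{port}/{dbname}"
            f"?user={user_q}&password={password_q}")


def cfg_escape(value):
    """Double every % (assumes airflow.cfg uses configparser %-interpolation)."""
    return value.replace("%", "%%")


if __name__ == "__main__":
    print("sql_alchemy_conn =", cfg_escape(sql_alchemy_conn()))
```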

Once we have done this, we can also change our executor in the same section (line 27 in our version) from executor = SequentialExecutor to executor = LocalExecutor. That way, every task is started as a subprocess, and the parallelization happens locally. This approach works great as long as our jobs are not too demanding and do not need to run on multiple machines.
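Line numbers such as 27 or 32 can differ between Airflow versions. Reading the settings by section and key is more robust, and the standard library's configparser is enough for that. The following sketch only reads the file. Its checks are our own sanity checks based on this post: SQLite only with the SequentialExecutor, and a broker_url for the CeleryExecutor. They do not reproduce Airflow's validation, and the SequentialExecutor fallback assumes that it is the default.

```python
import configparser
import os


def check_airflow_cfg(text):
    """Return (executor, warnings) for the contents of an airflow.cfg."""
    # interpolation=None reads raw values; strict=False tolerates duplicates
    parser = configparser.ConfigParser(interpolation=None, strict=False)
    parser.read_string(text)
    executor = parser.get("core", "executor", fallback="SequentialExecutor")
    conn = parser.get("core", "sql_alchemy_conn", fallback="")
    warnings = []
    if conn.startswith("sqlite") and executor != "SequentialExecutor":
        warnings.append("SQLite only works with the SequentialExecutor")
    if executor == "CeleryExecutor" and not parser.get("celery", "broker_url", fallback=""):
        warnings.append("CeleryExecutor needs broker_url in [celery]")
    return executor, warnings


if __name__ == "__main__":
    home = os.environ.get("AIRFLOW_HOME", os.path.expanduser("~/airflow"))
    with open(os.path.join(home, "airflow.cfg"), encoding="utf-8") as fh:
        print(check_airflow_cfg(fh.read()))
```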

Once we reach that point, we need Celery as the executor. Celery is an asynchronous task/job queue based on distributed message passing. However, to use the CeleryExecutor, we need one more piece of software: a message broker. A message broker is an intermediary program that receives messages from a sender and passes them on to a receiver, in our case from Airflow to the Celery workers. The two most common options are Redis and RabbitMQ. Use whichever you feel most comfortable with. Since we used RabbitMQ, the rest of the process is described with this broker, but it should be more or less analogous for Redis.

Again, for Linux users and Mac users with Homebrew, installing it takes a single line. Simply type sudo apt-get install rabbitmq-server or brew install rabbitmq into your terminal, and you are done. Next, we need a new user with a password and a virtual host. Both can be created in the terminal with RabbitMQ's command-line tool rabbitmqctl. Let's say we want to create a new user called myuser with the password mypassword and a virtual host called myvhost. This can be achieved as follows:

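$rabbitmqctl add_user myuser mypassword
$rabbitmqctl add_vhost myvhost
# grant myuser configure, write and read permissions on myvhost
$rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"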

Now back to Airflow's configuration. Navigate your editor to the [celery] section (line 230 in our version), and you will hopefully see broker_url =. This connection string is similar to the one for the database and is built with the pattern BROKER://USER:PASSWORD@IP:PORT/HOST. Our broker's protocol is amqp, and we can plug in our newly created user, password and host. Unless you have changed the port or use a remote server, your line should look something like this:

broker_url = amqp://myuser:mypassword@localhost:5672/myvhost
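The same pattern can be assembled programmatically. This sketch percent-encodes each part, which is the usual way to put special characters into a URL. For example, RabbitMQ's default virtual host "/" then becomes %2F instead of being read as a path separator. The function name and defaults are our own. As far as we know, airflow.cfg is parsed with Python's configparser, which treats % specially, so an encoded value pasted there may need every % doubled (%%).

```python
from urllib.parse import quote


def broker_url(user, password, host="localhost", port=5672, vhost="myvhost",
               scheme="amqp"):
    """Build BROKER://USER:PASSWORD@IP:PORT/HOST with percent-encoded parts."""
    # safe="" also encodes "/", so the vhost "/" turns into %2F
    return (f"{scheme}://{quote(user, safe='')}:{quote(password, safe='')}"
            f"@{host}:{port}/{quote(vhost, safe='')}")


if __name__ == "__main__":
    print("broker_url =", broker_url("myuser", "mypassword"))
```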

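Next, we need to grant Celery access to our airflow database by setting result_backend (called celery_result_backend in versions before 1.10) in the same section (line 232 in our version) to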

db+postgresql://localhost:5432/airflow?user=postgres&password=password

This string should be mostly equivalent to the one we used before. We only need to drop the driver +psycopg2 and instead add db+ at the beginning. And that's it! All three executors should now be at your disposal, and the setups are complete. Independent of the executor you have chosen, once you change the configuration, you need to repeat steps 1 to 4: initialize the DB and restart the web server and the scheduler. With the CeleryExecutor, you additionally need to start at least one worker with airflow worker on every machine that should execute tasks. If you do this now, you will notice that the prompt changed slightly, as it shows which executor you are using.

[Screenshot: airflow webserver output with the CeleryExecutor]
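The conversion rule from the database string to the Celery result backend string is simple: drop the +driver part of the scheme and prefix it with db+. A minimal sketch of that rule follows. The function name is our own, and the rule is taken from the paragraph above rather than from Celery's documentation.

```python
def result_backend_from_conn(sql_alchemy_conn):
    """Turn 'postgresql+psycopg2://...' into 'db+postgresql://...'."""
    scheme, sep, rest = sql_alchemy_conn.partition("://")
    if not sep:
        raise ValueError(f"not a URL: {sql_alchemy_conn!r}")
    dialect = scheme.split("+", 1)[0]  # 'postgresql+psycopg2' -> 'postgresql'
    return f"db+{dialect}://{rest}"


if __name__ == "__main__":
    conn = "postgresql+psycopg2://localhost:5432/airflow?user=postgres&password=password"
    print("result_backend =", result_backend_from_conn(conn))
```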

End

Airflow is an easy-to-use, code-based workflow manager with an integrated scheduler and multiple executors to scale as needed. If you want to execute a flow sequentially, or if there is nothing that could run concurrently, the default SQLite database and SequentialExecutor should do the job. If you want to use Airflow to start multiple tasks at the same time and thus keep track of their dependencies, you should first switch the database to PostgreSQL and the executor to the LocalExecutor for local multiprocessing. Moreover, with Celery we are even able to use multiple machines to execute even more advanced and complex workflows without much effort or worry.

About the author
Marvin Taschenberger

Marvin is part of the Data Science team and one of our Python experts. In his free time, he is a member of a debating club and enjoys climbing.