In the first part of this blog post, we talked about what a DAG is, how to apply this mathematical concept in project planning and programming, and why we at STATWORX decided to use Airflow rather than other workflow managers. In this part, we get more technical: we walk through an informative hello-world program and show how to set up Airflow for the different scenarios one could face. If you are only interested in the technical part and do not want to read the first one but still want a recap, here is a summary:
- DAG is short for directed acyclic graph and as such can represent relationships and dependencies
- This last aspect can be used in project management, as it makes clear which tasks can run independently of each other and which cannot
- The same properties can be used in programming, as software can determine which jobs can run concurrently or in which order the others have to finish (or fail).
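As a minimal sketch of that last point, the snippet below groups the tasks of a small DAG into batches that could run concurrently. It assumes Python 3.9 or newer (for the standard-library graphlib module), and the task names are hypothetical placeholders, not taken from any real workflow.

```python
from graphlib import TopologicalSorter


def execution_batches(graph):
    """Group tasks into batches; all tasks in one batch could run concurrently.

    `graph` maps each task to the set of tasks it depends on.
    """
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises CycleError if the graph is not acyclic
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # tasks whose dependencies are done
        batches.append(ready)
        sorter.done(*ready)  # mark the batch as finished to unlock successors
    return batches


# Hypothetical workflow: two independent extracts, then a load, then a report.
example_graph = {
    "extract_a": set(),
    "extract_b": set(),
    "load": {"extract_a", "extract_b"},
    "report": {"load"},
}

print(execution_batches(example_graph))
```

The two extract tasks land in the same batch because neither depends on the other, which is the kind of information a workflow manager derives from the graph.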
Why did we choose Airflow:
- No Cron – With Airflow's included scheduler we don't need to rely on cron to schedule our DAGs and can use a single framework (unlike Luigi)
- Code Bases – In Airflow all the workflows, dependencies, and scheduling are defined in Python code. Therefore, it is rather easy to build complex structures and extend the flows.
- Language – Python is a language somewhat natural to pick up, and that skill was already available on our team.
The first step was to set up a new virtual environment with Python and virtualenv:
$pip install virtualenv  # if it hasn't been installed yet
$cd                      # change into home
# create a separate folder with all environments
$mkdir env
$cd env
$virtualenv airflow
Once it has been created, we can source this environment whenever we want to work with Airflow, so we don't get into conflict with other dependencies.
Then we can install all the Python packages we will need:
$pip install -U pip setuptools wheel \
    psycopg2 \
    Cython \
    pytz \
    pyOpenSSL \
    ndg-httpsclient \
    pyasn1 \
    psutil \
    apache-airflow[postgres]
A small breeze
Once our setup is done, we can check whether Airflow is correctly installed by typing airflow version into the shell, and you should see something like this
Initially, Airflow will run with an SQLite database, which cannot execute more than one task at a time and should therefore be switched out once you get serious about it. However, more on this later. Next, let us start with the typical hello world example. Navigate to your AIRFLOW_HOME path, which is by default a folder called airflow in your home directory. If you want to change this, set the environment variable with export AIRFLOW_HOME=/your/new/path and call airflow version once again.
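To make the lookup of that folder concrete, here is a small sketch of how the location of the DAG files could be resolved. It assumes the default layout in which DAG files live in a dags subfolder of AIRFLOW_HOME; the helper name dags_folder is hypothetical and not part of Airflow.

```python
import os
from pathlib import Path


def dags_folder(env=None):
    """Return the assumed DAG folder: $AIRFLOW_HOME/dags, or ~/airflow/dags."""
    env = os.environ if env is None else env
    # Fall back to ~/airflow when AIRFLOW_HOME is not set.
    home = Path(env.get("AIRFLOW_HOME", "~/airflow")).expanduser()
    return home / "dags"


print(dags_folder())
```

Passing a dictionary instead of the real environment, for example dags_folder({"AIRFLOW_HOME": "/your/new/path"}), makes it easy to check the result without touching your shell configuration.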
# ~/airflow/dags/HelloWorld.py
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

def print_hello():
    return 'Hello world!'

dag = DAG('hello_world',
          description='Simple tutorial DAG',
          start_date=datetime.now() - timedelta(days=4),
          schedule_interval='0 12 * * *'
         )

dummy_operator = DummyOperator(task_id='dummy_task', retries=3, dag=dag)
hello_operator = PythonOperator(task_id='hello_task', python_callable=print_hello, dag=dag)

dummy_operator >> hello_operator  # same as dummy_operator.set_downstream(hello_operator)
The first nine lines should be fairly self-explanatory: just the import of the necessary libraries and the definition of the 'hello world' function. The interesting part starts at line ten. Here we define the core of our workflow, a DAG object with the identifier hello_world in this case, and a short description of what this workflow is used for (line 10). As you might have suspected, the argument start_date defines the beginning date of the task. This date should always be in the past. Otherwise, the scheduler would keep checking whether the task can run yet, and the DAG would stay waiting until the start date is reached. The schedule_interval defines the periods in which the graph should be executed. We can set it either with a cron-like notation as above or with some syntactic sugar Airflow offers. In the example above, we define that the task should run daily at 12:00. The fact that it should run daily could also have been expressed with schedule_interval='@daily' (although @daily triggers at midnight rather than at noon). The cron notation follows the schema minute - hour - day (of the month) - month - day (of the week), like mi h d m wd.
With * as a wildcard, we gain the ability to schedule in very flexible intervals. Let's say we want a job to run on the first day of every month at twelve o'clock. In this case, we want neither a specific month nor a specific day of the week, so we replace those placeholders with wildcards (mi h d * *). Since it should run at 12:00, we replace mi with 0 and h with 12. Last but not least, we plug in the day of the month as 1 and get our final cron notation 0 12 1 * *. If we don't want to be this specific but would rather run daily or hourly starting from the start_date, we could use Airflow's sugar: @daily or @hourly.
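To make the five positions of the cron notation easier to read, the following sketch splits an expression into named fields. It only labels the fields and does not validate or evaluate them; the helper name split_cron and the field names are illustrative assumptions, not part of Airflow.

```python
# Field order of a standard five-field cron expression.
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def split_cron(expression):
    """Map each of the five cron fields to its name."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected {len(CRON_FIELDS)} fields, got {len(parts)}")
    return dict(zip(CRON_FIELDS, parts))


# First day of every month at 12:00.
print(split_cron("0 12 1 * *"))
```

Applied to the schedule from the hello-world DAG, split_cron("0 12 * * *") leaves the day of the month as a wildcard, which is why that DAG runs every day.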
Once we have this DAG instance, we can start to fill it with some tasks. In Airflow, these are represented by instances of operators. Here we instantiate a DummyOperator and a PythonOperator. Both need to be assigned a unique task_id, but this time it only needs to be unique within the workflow. The first operator we define is a DummyOperator, which does nothing at all. We only want it to fill our graph so that we can test Airflow with a scenario as simple as possible. The second one is a PythonOperator. In addition to the assignment to a graph and the task_id, this operator requires a function that is executed once the task is triggered. Now we can use our print_hello function and attach it to our workflow through the PythonOperator.
Before we can finally execute our flow, we still need to set the relation between our tasks. This linking is done either with the binary operator >> or by calling the set_downstream method. This way we set the dependency that the DummyOperator first needs to run and succeed before our PythonOperator is executed.
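If the >> syntax looks like magic, the toy class below shows how such an operator can be built on top of a set_downstream method with plain Python operator overloading. ToyTask is a hypothetical stand-in written for this illustration; it is not Airflow's implementation and carries none of its scheduling logic.

```python
class ToyTask:
    """Hypothetical stand-in for an operator, used only to illustrate >>."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def set_downstream(self, other):
        # Record that `other` may only run after this task.
        self.downstream.append(other)

    def __rshift__(self, other):
        # `a >> b` is just another spelling of a.set_downstream(b).
        self.set_downstream(other)
        return other  # returning the right-hand side allows a >> b >> c


dummy = ToyTask("dummy_task")
hello = ToyTask("hello_task")
dummy >> hello

print([task.task_id for task in dummy.downstream])
```

Because the right-hand task is returned, longer chains read naturally from left to right, mirroring the order of execution.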
Now that our code is written, we should test it. First, we run it directly in the Python interpreter to check for syntax mistakes, either in an IDE or in the terminal with the command python HelloWorld.py. If the interpreter doesn't throw an error: congratulations, you didn't screw up too badly. Next, we need to check whether Airflow is aware of our DAG with airflow list_dags. Now we should see our hello_world id in the printed list. If so, we can check whether each task is assigned to it with airflow list_tasks hello_world. Again, we should see some familiar ids, namely dummy_task and hello_task. So far so good; it seems like at least the assignment worked. Next up is a unit test of the individual operators with airflow test hello_world dummy_task 2018-01-01 and airflow test hello_world hello_task 2018-01-01. Hopefully, this doesn't raise any error, and we can continue.
Before we can deploy our example workflow, we first need to initialize Airflow completely. This requires three commands before we can continue to trigger our task manually.
1. airflow initdb to initialize the database where Airflow saves the workflows and their states.
2. airflow webserver to start the web server at localhost:8080, where we can reach the web interface.
3. airflow scheduler to start the scheduling process of the DAGs so that the individual workflows can be triggered.
4. airflow trigger_dag hello_world to trigger our workflow and place it on the schedule.
Now we can either open a web browser and navigate to the respective website or call
open http://localhost:8080/admin/ in the terminal, and it should lead us to a web page like this.
At the bottom, you should see your creation and the light green circle indicates that our flow is scheduled and running. The only thing left for us to do is to wait until it is executed. In the meantime, we can talk about the setup and how we could use some of the other executors.
As mentioned before, once we want to get serious about the execution of our graphs, we need to change the backend of Airflow. Initially, it uses a simple SQLite database, which limits Airflow to executing one task at a time, sequentially. Therefore, we will first change the connected database to PostgreSQL. In case you haven't installed Postgres yet and need help with it, I'd recommend checking out its wiki. I couldn't possibly describe the process as well as that page does. Those on a Linux-based system (sorry, Windows) can try sudo apt-get install postgresql-client, or on a Mac use Homebrew with brew install postgresql. Another easy way would be to use a Docker container with the respective image.
Now let us create a new database for Airflow, where all the metadata will be saved, by typing createdb airflow into the terminal. Next, we need to edit the airflow.cfg file, which should have appeared in your AIRFLOW_HOME folder (again, by default airflow in your home directory), and afterwards repeat steps 1 – 4 from above (initdb…). Now fire up your favorite editor and look for line 32, sql_alchemy_conn =. Here we are going to replace the SQLite connection string with one for our PostgreSQL server and a new driver. This string is made up of postgresql+psycopg2://IPADRESS:PORT/DBNAME?user=USERNAME&password=PASSWORD. The first part tells sqlalchemy that the connection leads to PostgreSQL and that it should use the psycopg2 driver to connect to it. In case you have Postgres installed locally (or in a container which maps to localhost) and did not change the standard port of 5432, our IPADRESS:PORT could be translated to localhost:5432 or simply localhost. DBNAME would be changed to airflow in our case, as we just created that database for this purpose. The last two parts depend on what you have chosen as security measures. Finally, we might end up with a line that looks like this:
sql_alchemy_conn = postgresql+psycopg2://localhost/airflow?user=postgres&password=password
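As a small sketch of how the pieces of this connection string fit together, the helper below assembles it from its parts. The function name and its parameters are hypothetical, and the credentials are the placeholder values from the line above, not a recommendation.

```python
from urllib.parse import urlencode


def build_sql_alchemy_conn(dbname, user, password, host="localhost", port=None):
    """Assemble a PostgreSQL connection string that uses the psycopg2 driver."""
    # The port can be left out when the database listens on the default 5432.
    location = host if port is None else f"{host}:{port}"
    # urlencode escapes characters that would otherwise break the query string.
    credentials = urlencode({"user": user, "password": password})
    return f"postgresql+psycopg2://{location}/{dbname}?{credentials}"


print(build_sql_alchemy_conn("airflow", "postgres", "password"))
```

Passing port=5432 explicitly yields the longer localhost:5432 form mentioned above; both spellings point to the same server as long as the default port is unchanged.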
Once we have done this, we can also change our executor in line 27 from executor = SequentialExecutor to executor = LocalExecutor. That way every task is started as a subprocess, so the parallelization happens locally. This approach works great as long as our jobs are not too complicated and do not need to run on multiple machines.
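Line numbers in airflow.cfg can shift between versions, so it may be more robust to change the two settings by name. The sketch below does this with the standard-library configparser on an in-memory sample. It assumes both keys live in a [core] section, and the SQLite path in the sample is a made-up placeholder; check your own file before relying on either assumption.

```python
import configparser
import io

# Hypothetical excerpt of an airflow.cfg; the real file contains many more keys.
SAMPLE_CFG = """\
[core]
executor = SequentialExecutor
sql_alchemy_conn = sqlite:////tmp/airflow/airflow.db
"""


def switch_to_local_executor(cfg_text, conn):
    """Return the config text with the executor and database connection replaced."""
    # interpolation=None keeps '%' characters in values from being interpreted.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    parser["core"]["executor"] = "LocalExecutor"
    parser["core"]["sql_alchemy_conn"] = conn
    buffer = io.StringIO()
    parser.write(buffer)
    return buffer.getvalue()


updated = switch_to_local_executor(
    SAMPLE_CFG,
    "postgresql+psycopg2://localhost/airflow?user=postgres&password=password",
)
print(updated)
```

One caveat of this approach: configparser drops comments when it writes the file back, so editing by hand remains the better choice if you want to keep the explanatory comments in airflow.cfg.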
Once we reach that point, we need Celery as executor. It is an asynchronous task/job queue based on distributed message passing. However, to use the CeleryExecutor, we need one more piece of software: a message broker. A message broker is an intermediary program module that translates a message from the 'language' of the sender to that of the receiver. The two most common options are redis and rabbitmq. Use whatever you feel most comfortable with. Since we used rabbitmq, the rest of the process will continue with this broker, but it should be more or less analogous for redis.
Again, for Linux users and for Mac users with Homebrew, it is a one-liner to install it. Simply type sudo apt-get install rabbitmq-server or brew install rabbitmq into your terminal and you are done. Next, we need a new user with a password and a virtual host. Both user and host can be created in the terminal with RabbitMQ's command line tool rabbitmqctl. Let's say we want to create a new user called myuser with the password mypassword and a virtual host called myvhost. This can be achieved as follows:
$rabbitmqctl add_user myuser mypassword
$rabbitmqctl add_vhost myvhost
However, now back to Airflow's configuration. Navigate in your editor to line 230, and you will hopefully see broker_url =. This connection string is similar to the one for the database and is built with the pattern BROKER://USER:PASSWORD@IP:PORT/HOST. Our broker has the acronym amqp, and we can plug in our newly created user, password, and host. Unless you have changed the port or use a remote server, your line should look something like this:
broker_url = amqp://myuser:mypassword@localhost:5672/myvhost
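The pattern can also be written down as a tiny helper, shown here as a sketch. The function name build_broker_url and its defaults are assumptions for illustration; user names and passwords are percent-encoded so that characters such as @ or / do not break the URL.

```python
from urllib.parse import quote


def build_broker_url(user, password, vhost, host="localhost", port=5672, scheme="amqp"):
    """Fill the BROKER://USER:PASSWORD@IP:PORT/HOST pattern."""
    # safe="" also encodes '/' so that credentials cannot alter the URL structure.
    safe_user = quote(user, safe="")
    safe_password = quote(password, safe="")
    return f"{scheme}://{safe_user}:{safe_password}@{host}:{port}/{vhost}"


print(build_broker_url("myuser", "mypassword", "myvhost"))
```

With the example user and virtual host created above, this reproduces the broker_url line; only the host and port arguments would change for a remote RabbitMQ server.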
Next, we need to grant Celery access to our airflow database and fill line 232 with another connection string. This string should be mostly equivalent to the one we used before. We only need to drop the driver psycopg2 and instead add db+ at the beginning. And that's it! All three executors should now be at your disposal, and the setups are complete. Independent of the executor you have chosen, once you change the configuration you need to repeat steps 1 – 4: initialize the DB and restart the scheduler and the web server. If you do this now, you will notice that the prompt has changed slightly, as it shows which executor you are using.
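The rule for deriving the Celery connection string (drop the driver, prepend db+) is mechanical enough to sketch in a few lines. The helper name to_celery_result_backend is hypothetical, and the input is the example sql_alchemy_conn value used earlier in this post.

```python
def to_celery_result_backend(sql_alchemy_conn):
    """Drop the driver part of the scheme and prepend 'db+'."""
    scheme, separator, rest = sql_alchemy_conn.partition("://")
    if not separator:
        raise ValueError("not a connection URL: missing '://'")
    # 'postgresql+psycopg2' -> 'postgresql'; a scheme without driver is unchanged.
    dialect = scheme.split("+", 1)[0]
    return f"db+{dialect}://{rest}"


print(to_celery_result_backend(
    "postgresql+psycopg2://localhost/airflow?user=postgres&password=password"
))
```

Deriving the second string from the first keeps the two settings in sync, which avoids the situation where Airflow and Celery silently point at different databases.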
Airflow is an easy-to-use, code-based workflow manager with an integrated scheduler and multiple executors to scale as needed. If you want to execute a flow sequentially, or if there is nothing that could run concurrently, the default SQLite database and sequential executor should do the job. If you are going to use Airflow to start multiple tasks at the same time and thus keep track of the dependencies, you should first change the database and switch to a LocalExecutor for local multiprocessing. Moreover, thanks to Celery, we are even able to use multiple machines to execute even more advanced and complex workflows without much effort or worry.