Airflow for Data Scientists

Manuel Tilgner

Getting the data in the quantity, quality and format you need is often the most challenging part of data science projects. But it's also one of the most important parts, if not the most important. That's why my colleagues and I at STATWORX tend to spend a lot of time setting up good ETL processes. Thanks to frameworks like Airflow, this isn't just the prerogative of Data Engineers anymore. If you know a bit of SQL and Python, you can orchestrate your own ETL process like a pro. Read on to find out how!

ETL does not stand for Extraterrestrial Life

At least not in Data Science and Engineering.

ETL stands for Extract, Transform, Load and describes a set of database operations. Extracting means we read the data from one or more data sources. Transforming means we clean, aggregate or combine the data to get it into the shape we want. Finally, loading means we write it into a destination database.
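To make the three steps concrete, here's a minimal sketch in plain Python. It's only an illustration under made-up assumptions: the CSV rows, the column names and the daily_totals table are hypothetical, and an in-memory SQLite database stands in for the destination database.

```python
import csv
import io
import sqlite3
from collections import defaultdict

# Hypothetical raw data; in a real pipeline this comes from a file, an API or a database.
RAW_CSV = """report_date,weight
2020-02-18,120
2020-02-18,80
2020-02-19,95
"""


def extract(raw: str) -> list[dict]:
    """Extract: read the rows from the source."""
    return list(csv.DictReader(io.StringIO(raw)))


def transform(rows: list[dict]) -> list[tuple[str, float]]:
    """Transform: convert types and aggregate the weight per day."""
    totals: dict[str, float] = defaultdict(float)
    for row in rows:
        totals[row["report_date"]] += float(row["weight"])
    return sorted(totals.items())


def load(records: list[tuple[str, float]], conn: sqlite3.Connection) -> None:
    """Load: write the result into the destination table."""
    conn.execute("CREATE TABLE IF NOT EXISTS daily_totals (date TEXT, weight REAL)")
    conn.executemany("INSERT INTO daily_totals VALUES (?, ?)", records)
    conn.commit()


conn = sqlite3.connect(":memory:")
load(transform(extract(RAW_CSV)), conn)
print(conn.execute("SELECT * FROM daily_totals ORDER BY date").fetchall())
# [('2020-02-18', 200.0), ('2020-02-19', 95.0)]
```

In the pipeline we build in this post, BigQuery and Cloud Storage play the roles of source and destination, and Airflow takes care of running the steps in the right order and on a schedule.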

Does your ETL process consist of Karen sending you an Excel sheet that you can spend your workday just scrolling down? Or do you have to manually query a database every day, tweaking your SQL queries to the occasion? If yes, venture a peek at Airflow.

How Airflow can help with ETL processes

Airflow is a Python-based framework that lets you programmatically create, schedule and monitor workflows. These workflows consist of tasks and the dependencies between them, and they can be automated to run on a schedule. If anything fails, logs and error handling facilities help you fix it.

Using Airflow can make your workflows more manageable, transparent and efficient. And yes, I'm talking to you, fellow Data Scientists! Getting access to up-to-date, high-quality data is far too important to leave only to the Data Engineers 😉 (we still love you).

The point is, if you’re working with data, you’ll profit from knowing how to wield this powerful tool.

How Airflow works

To learn more about Airflow, check out this blog post from my colleague Marvin. It will get you up to speed quickly. Marvin explains in more detail how Airflow works and what advantages/disadvantages it has as a workflow manager. Also, he has an excellent quick-start guide with Docker.

What matters to us is knowing that Airflow is based on DAGs, or Directed Acyclic Graphs, which describe what tasks our workflow consists of and how these are connected.
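To get a feel for what "directed" and "acyclic" mean, here's a small sketch that models the DAG we build in this post as a plain Python dict and orders it with the standard library's graphlib module (Python 3.9+). It's not how Airflow resolves dependencies internally, just an illustration: each task lists the tasks it depends on, a valid execution order exists, and adding a loop makes one impossible.

```python
from graphlib import CycleError, TopologicalSorter

# Each task maps to the set of its upstream tasks, i.e. the tasks it depends on.
# The task IDs mirror the ones used in the DAG later in this post.
dag = {
    "check_bq_data_exists": set(),
    "check_gcs_file_exists": {"check_bq_data_exists"},
    "write_weight_data_to_bq": {"check_gcs_file_exists"},
    "write_route_data_to_bq": {"check_gcs_file_exists"},
    "prepare_and_merge_data": {"write_weight_data_to_bq", "write_route_data_to_bq"},
    "export_results_to_gcs": {"prepare_and_merge_data"},
}


def execution_order(graph: dict) -> list:
    """Return one valid order in which the tasks can run (upstream tasks first)."""
    return list(TopologicalSorter(graph).static_order())


def has_cycle(graph: dict) -> bool:
    """A graph with a cycle has no valid order, so it isn't a DAG."""
    try:
        execution_order(graph)
    except CycleError:
        return True
    return False


print(execution_order(dag))
print(has_cycle(dag))  # False
# Making the first task depend on the last one closes a loop:
print(has_cycle({**dag, "check_bq_data_exists": {"export_results_to_gcs"}}))  # True
```

The two write tasks have no dependency on each other, so either may come first in the printed order.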

Note that in this tutorial we’re not actually going to deploy the pipeline. Otherwise, this post would be even longer. And you probably have friends and family that like to see you. So today is all about creating a DAG step by step.

If you care more about the deployment side of things, stay tuned, though! I plan to write a step-by-step guide on how to do that in the next post. As a small solace, know that you can test every task of your workflow with:

airflow test [your dag id] [your task id] [execution date]

There are more options, but that’s all we need for now.

What you need to follow this tutorial

This tutorial shows you how to use Airflow in combination with BigQuery and Google Cloud Storage to run a daily ETL process. Here's what you need.

If you already have a Google Cloud account, you can hit the ground running! If not, consider opening one. You do need to provide a credit card, but don't worry: this tutorial won't cost you anything. New sign-ups get a free one-year trial period, and even once that has expired, we're staying well within the bounds of Google's Always Free Tier.

Finally, it helps if you know some SQL. I know it's something most Data Scientists don't find too sexy, but the more you use it, the more you like it. I guess it's the orthopedic shoe of Data Science: a bit ugly, sure, but unbeatable at what it's designed for. If you're not familiar with SQL or dread it like the plague, don't sweat it. Each query's name says what it does.

What BigQuery and Google Cloud Storage are

BigQuery and Cloud Storage are among the most popular products of the Google Cloud Platform (GCP). BigQuery is a serverless cloud data warehouse that lets you analyze up to petabytes of data at high speed. Cloud Storage, on the other hand, is just that: cloud-based object storage. Grossly simplified, we use BigQuery as a database to query and Cloud Storage as a place to save the results.

In more detail, our ETL process:

  • checks for the existence of data in BigQuery and Google Cloud Storage
  • queries a BigQuery source table and writes the result to a table
  • ingests the Cloud Storage data into another BigQuery table
  • merges the two tables and writes the result back to Cloud Storage as a CSV

Connecting Airflow to these services

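Once your Google Cloud account is set up, you need a JSON key file for a service account, which you can create and download in the Cloud Console. I suggest putting this file in your home directory. We use it to connect Airflow to BigQuery and Cloud Storage. To do this, just copy and paste the following lines into your terminal, substituting your project ID and the path to your JSON file. For each service, the first command deletes the existing default connection and the second one adds it again with your credentials. You can read more about connections in the Airflow documentation.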

# for bigquery
airflow connections -d --conn_id bigquery_default

airflow connections -a --conn_id bigquery_default --conn_uri 'google-cloud-platform://:@:?extra__google_cloud_platform__project=[YOUR PROJECT ID]&extra__google_cloud_platform__key_path=[PATH TO YOUR JSON]'


# for google cloud storage
airflow connections -d --conn_id google_cloud_default

airflow connections -a --conn_id google_cloud_default --conn_uri 'google-cloud-platform://:@:?extra__google_cloud_platform__project=[YOUR PROJECT ID]&extra__google_cloud_platform__key_path=[PATH TO YOUR JSON]'
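Typing the conn_uri by hand is error-prone if your key path contains spaces or other special characters. A small hypothetical helper (build_gcp_conn_uri is not part of Airflow) can build it with urllib.parse.urlencode so the values are percent-encoded. As far as I can tell, Airflow decodes the query string of the URI back into the connection's extras, so the round trip shown here should hold; the project ID and key path are placeholders.

```python
from urllib.parse import parse_qsl, urlencode, urlparse


def build_gcp_conn_uri(project_id: str, key_path: str) -> str:
    """Build a conn_uri in the format shown above, with percent-encoded extras.

    Hypothetical helper: urlencode escapes characters such as spaces or
    slashes in the key path, so the URI stays unambiguous.
    """
    extras = {
        "extra__google_cloud_platform__project": project_id,
        "extra__google_cloud_platform__key_path": key_path,
    }
    return "google-cloud-platform://:@:?" + urlencode(extras)


uri = build_gcp_conn_uri("my-project-id", "/home/me/keys/my key.json")
print(uri)

# Round trip: decoding the query string gives back the original values.
print(dict(parse_qsl(urlparse(uri).query)))
```

You could then paste the printed URI into the --conn_uri argument of the commands above.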

Writing our DAG

Task 0: Setting the start date and the schedule interval

We are ready to define our DAG! It consists of multiple tasks, i.e. things our ETL process should do. Each task is an instance of a so-called operator. Since we're working with BigQuery and Cloud Storage, we use the corresponding Google Cloud Platform (GCP) operators.

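Before defining any tasks, we specify the start date and schedule interval. The start date marks the beginning of the first interval the DAG is scheduled for. In this case, I picked February 20th, 2020. The schedule interval is how often the DAG runs, i.e. on what schedule. You can use cron notation here, a timedelta object or one of Airflow's cron presets (e.g. '@daily'). Keep in mind that Airflow starts a run once the interval it covers has ended, so with a daily schedule the first run happens one day after the start date.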

Tip: you normally want to keep the start date static to avoid unpredictable behavior (so no datetime.now() shenanigans even though it may seem tempting).

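from datetime import datetime, timedelta

# set start date and schedule interval
start_date = datetime(2020, 2, 20)
schedule_interval = timedelta(days=1)
default_args = {'start_date': start_date}  # minimal; the full DAG file may set more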
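Scheduling is where many newcomers stumble, so here's a simplified model in plain Python of how start_date and schedule_interval turn into runs, and what the catchup flag (which our DAG sets to False) changes. The scheduled_runs helper is hypothetical and ignores many details of Airflow's actual scheduler; it only encodes the rule that a run is triggered once its interval has ended.

```python
from datetime import datetime, timedelta


def scheduled_runs(start_date: datetime, interval: timedelta, now: datetime,
                   catchup: bool = True) -> list[tuple[datetime, datetime]]:
    """Return (execution_date, trigger_time) pairs for all completed intervals.

    A run for an interval is triggered at execution_date + interval, i.e. when
    the interval has ended. With catchup=False, only the most recent completed
    interval is kept instead of every interval since start_date.
    """
    runs = []
    execution_date = start_date
    while execution_date + interval <= now:
        runs.append((execution_date, execution_date + interval))
        execution_date += interval
    return runs if catchup else runs[-1:]


start_date = datetime(2020, 2, 20)
schedule_interval = timedelta(days=1)
now = datetime(2020, 2, 23, 12, 0)  # pretend the DAG is switched on at this moment

for execution_date, trigger_time in scheduled_runs(start_date, schedule_interval, now):
    print(execution_date.date(), "runs at", trigger_time)

print(scheduled_runs(start_date, schedule_interval, now, catchup=False))
```

With catchup=True, the model schedules a run for each of the three completed days; with catchup=False, only the run for 2020-02-22 remains, which is why our DAG doesn't backfill a whole history of runs.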

You can find the complete DAG file on our STATWORX GitHub. There you can also see all the config parameters I set, e.g. what our project, dataset, buckets and tables are called, as well as all the queries. We gloss over them here, as they're not central to understanding the DAG.

Task 1: Check that there is data in BigQuery

We set up our DAG using Python's context manager. You don't have to do this, but it saves some typing. A little detail: I'm setting catchup=False because I don't want Airflow to backfill my data, i.e. to create a run for every interval between the start date and today.

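# Airflow 1.10.x import paths
from airflow import DAG
from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator

# write dag (cfg and queries hold the config parameters and SQL queries from the full DAG file)
with DAG(dag_id='blog', default_args=default_args, schedule_interval=schedule_interval, catchup=False) as dag:

    t1 = BigQueryCheckOperator(task_id='check_bq_data_exists',
                               sql=queries.check_bq_data_exists,
                               use_legacy_sql=False)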

We start by checking whether the data for the dates we're interested in is available in the source table. Our source table, in this case, is the Google public dataset bigquery-public-data.austin_waste.waste_and_diversion. To perform this check, we use the aptly named BigQueryCheckOperator and pass it a SQL query.

The BigQueryCheckOperator looks at the first row that the check_bq_data_exists query returns: if there is such a row and none of its values is null, zero or otherwise falsy, the check succeeds and the next task can run. Notice that we're using macros and Jinja templating to insert dates into our queries dynamically; they are rendered at runtime.

check_bq_data_exists = """
SELECT load_id
FROM `bigquery-public-data.austin_waste.waste_and_diversion`
WHERE report_date BETWEEN DATE('{{ macros.ds_add(ds, -365) }}') AND DATE('{{ ds }}')
"""

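Under the hood, Airflow renders these templates before sending the query to BigQuery: {{ ds }} becomes the execution date as a YYYY-MM-DD string, and macros.ds_add(ds, -365) shifts that date back by 365 days. The sketch below reimplements both steps in plain Python (ds_add and render_check_query are stand-ins written for illustration, not Airflow's code) to show what the rendered query looks like for the first scheduled date.

```python
from datetime import datetime, timedelta


def ds_add(ds: str, days: int) -> str:
    """Shift a YYYY-MM-DD date string by a number of days (like macros.ds_add)."""
    shifted = datetime.strptime(ds, "%Y-%m-%d") + timedelta(days=days)
    return shifted.strftime("%Y-%m-%d")


def render_check_query(ds: str) -> str:
    """Plain-Python stand-in for what the Jinja template renders to for a given ds."""
    return (
        "SELECT load_id\n"
        "FROM `bigquery-public-data.austin_waste.waste_and_diversion`\n"
        f"WHERE report_date BETWEEN DATE('{ds_add(ds, -365)}') AND DATE('{ds}')"
    )


print(render_check_query("2020-02-20"))
```

So the run for 2020-02-20 checks whether there's any data between 2019-02-20 and 2020-02-20. Since BETWEEN is inclusive, both boundary dates are part of the check.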
Task 2: Check that there is data in Cloud Storage

Next, let's check that the CSV file is in Cloud Storage. If you're following along, just download the file from the STATWORX GitHub and upload it to your bucket. We pretend that this CSV gets uploaded to Cloud Storage by another process and contains data that we need (in reality, I just extracted it from the same source table).

So we don't care how the CSV got into the bucket; we just want to know: is it there? This is easy to verify in Airflow with the GoogleCloudStorageObjectSensor, which checks for the existence of a file in Cloud Storage. Note the indentation: the task is still defined inside our DAG context.

Defining the task itself is simple: just tell Airflow which object to look for in your bucket.

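    # Airflow 1.10.x import: from airflow.contrib.sensors.gcs_sensor import GoogleCloudStorageObjectSensor
    t2 = GoogleCloudStorageObjectSensor(task_id='check_gcs_file_exists',
                                        bucket=cfg.BUCKET,
                                        object=cfg.SOURCE_OBJECT)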
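A sensor is a task that keeps "poking" until its condition holds or it times out. In Airflow you can tune this behavior with the sensor's poke_interval and timeout arguments (both in seconds). The sketch below is a simplified, hypothetical model of that loop in plain Python, not Airflow's implementation; the clock and sleep functions are injectable so the timeout case can be shown without actually waiting.

```python
import time
from typing import Callable


def wait_for(condition: Callable[[], bool], poke_interval: float, timeout: float,
             clock: Callable[[], float] = time.monotonic,
             sleep: Callable[[float], None] = time.sleep) -> None:
    """Check `condition` every `poke_interval` seconds until it returns True.

    Raises TimeoutError once more than `timeout` seconds have passed.
    """
    started = clock()
    while not condition():
        if clock() - started > timeout:
            raise TimeoutError(f"condition not met within {timeout} seconds")
        sleep(poke_interval)


# Simulated bucket: the "file" shows up on the third poke.
pokes = []


def file_exists() -> bool:
    pokes.append("poke")
    return len(pokes) >= 3


wait_for(file_exists, poke_interval=0.01, timeout=5)
print(len(pokes))  # 3

# A file that never shows up: a fake clock makes the timeout fire without real waiting.
fake_now = [0.0]


def fake_sleep(seconds: float) -> None:
    fake_now[0] += seconds  # advance the fake clock instead of sleeping


try:
    wait_for(lambda: False, poke_interval=60, timeout=300,
             clock=lambda: fake_now[0], sleep=fake_sleep)
    timed_out = False
except TimeoutError:
    timed_out = True
print(timed_out, fake_now[0])  # True 360.0
```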

Task 3: Extract data and save it to BigQuery

If the first two tasks succeed, then all the data we need is available! Now let’s extract some data from the source table and save it to a new table of our own. For this purpose, there’s none better than the BigQueryOperator.

    # Airflow 1.10.x import: from airflow.contrib.operators.bigquery_operator import BigQueryOperator
    t3 = BigQueryOperator(task_id='write_weight_data_to_bq',
                          sql=queries.write_weight_data_to_bq,
                          destination_dataset_table=cfg.BQ_TABLE_WEIGHT,
                          create_disposition='CREATE_IF_NEEDED',
                          write_disposition='WRITE_TRUNCATE',
                          use_legacy_sql=False)

This operator sends a query called write_weight_data_to_bq to BigQuery and saves the result in the table specified by the config parameter cfg.BQ_TABLE_WEIGHT. We also set a create and a write disposition: CREATE_IF_NEEDED creates the table if it doesn't exist yet, and WRITE_TRUNCATE overwrites its contents on every run.

The query itself pulls the total weight of dead animals collected every day by Austin waste management services over the course of a year. If the thought of possum pancakes makes you queasy, just set the TYPE variable in the config file to 'RECYCLING - PAPER'.
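Why WRITE_TRUNCATE? Because this DAG runs daily, the same table gets written again and again. Here's a toy in-memory model of BigQuery's create and write dispositions (write_table and DispositionError are hypothetical names; the real checks happen inside BigQuery) showing that truncating keeps reruns from piling up duplicate rows, whereas WRITE_APPEND would add them.

```python
class DispositionError(Exception):
    """Raised when a create or write disposition forbids the operation."""


def write_table(tables: dict, name: str, rows: list,
                create_disposition: str, write_disposition: str) -> None:
    """Toy in-memory model of BigQuery's create and write dispositions."""
    if name not in tables:
        if create_disposition == "CREATE_NEVER":
            raise DispositionError(f"table {name} does not exist")
        tables[name] = []  # CREATE_IF_NEEDED: create the missing table
    if write_disposition == "WRITE_TRUNCATE":
        tables[name] = list(rows)  # replace whatever was there
    elif write_disposition == "WRITE_APPEND":
        tables[name].extend(rows)  # keep existing rows, add the new ones
    elif write_disposition == "WRITE_EMPTY":
        if tables[name]:
            raise DispositionError(f"table {name} already contains data")
        tables[name] = list(rows)
    else:
        raise ValueError(f"unknown write disposition: {write_disposition}")


# Two daily runs writing the same (made-up) result:
tables = {}
for _ in range(2):
    write_table(tables, "weight", [("2020-02-20", 42.0)],
                "CREATE_IF_NEEDED", "WRITE_TRUNCATE")
print(len(tables["weight"]))  # 1: the second run replaced the first run's rows

appended = {}
for _ in range(2):
    write_table(appended, "weight", [("2020-02-20", 42.0)],
                "CREATE_IF_NEEDED", "WRITE_APPEND")
print(len(appended["weight"]))  # 2: the same row is now duplicated
```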

Task 4: Ingest Cloud Storage data into BigQuery

Once we’re done extracting the data above, we need to get the data that’s currently in our Cloud Storage bucket into BigQuery as well. To do this, just tell Airflow what (source) object from your Cloud Storage bucket should go to which (destination) table in your BigQuery dataset.

Tip: You can also specify a schema at this step, but I didn’t bother since the autodetect option worked well.

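    # Airflow 1.10.x import: from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
    t4 = GoogleCloudStorageToBigQueryOperator(task_id='write_route_data_to_bq',
                                              bucket=cfg.BUCKET,
                                              source_objects=[cfg.SOURCE_OBJECT],
                                              field_delimiter=';',
                                              destination_project_dataset_table=cfg.BQ_TABLE_ROUTE,
                                              create_disposition='CREATE_IF_NEEDED',
                                              write_disposition='WRITE_TRUNCATE',
                                              skip_leading_rows=1,
                                              autodetect=True)  # let BigQuery infer the schema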
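The field_delimiter=';' and skip_leading_rows=1 arguments tell BigQuery that fields are separated by semicolons and that the first line is a header to skip. If you want to sanity-check the file locally before uploading it, the sketch below mimics these two settings with Python's csv module, plus a deliberately naive type guess. It's an illustration only: the sample columns and values are made up, and BigQuery's schema autodetection is considerably smarter (it recognizes dates, for example).

```python
import csv
import io


def infer(value: str):
    """Naive type guess: try int, then float, otherwise keep the string."""
    for cast in (int, float):
        try:
            return cast(value)
        except ValueError:
            pass
    return value


def read_semicolon_csv(text: str, skip_leading_rows: int = 1) -> list[list]:
    """Split fields on ';', drop the header rows and convert each field's type."""
    rows = list(csv.reader(io.StringIO(text), delimiter=";"))
    return [[infer(value) for value in row] for row in rows[skip_leading_rows:]]


# Hypothetical sample file content.
sample = "report_date;route_type;route_number\n2020-02-20;DA;12\n2020-02-20;DA;14\n"
print(read_semicolon_csv(sample))
# [['2020-02-20', 'DA', 12], ['2020-02-20', 'DA', 14]]
```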

Task 5: Merge BigQuery and Cloud Storage data

Now we have both the BigQuery source table extract and the CSV data from Cloud Storage in two separate BigQuery tables. Time to merge them! How do we do this?

The BigQueryOperator is our friend here. We just pass it a SQL query that specifies how we want the tables merged. By specifying the destination_dataset_table argument, we tell it to put the result into a table of our choosing.

    t5 = BigQueryOperator(task_id='prepare_and_merge_data',
                          sql=queries.prepare_and_merge_data,
                          use_legacy_sql=False,
                          destination_dataset_table=cfg.BQ_TABLE_MERGE,
                          create_disposition='CREATE_IF_NEEDED',
                          write_disposition='WRITE_TRUNCATE')

Here's the query. I know it looks long and excruciating, but trust me, there are Tinder dates worse than this ('So, uh, do you like SQL?' - 'Which one?'). If he/she follows it up with Lord of the Rings or 'Yes', propose!

What is this query doing? Let's recap: we have two tables at the moment. One is basically a time series of how much dead animal waste Austin public services collected over the course of a year. The second table contains information on what types of routes were driven on those days.

As if this pipeline weren't weird enough, we now also want to know what the most common route type was on a given day. So we start by counting how often each route type was recorded on each day. Next, we use window functions (... OVER (PARTITION BY ... ORDER BY ...)) to find the route type with the highest count for each day. Finally, we keep only that top route per day and, using the date as the key, join it to the table with the waste info.

prepare_and_merge_data = """
-- replace my-first-project-238015 with your own project ID
WITH
simple_route_counts AS (
SELECT report_date,
       route_type,
       COUNT(route_type) AS count
FROM `my-first-project-238015.waste.route`
GROUP BY report_date, route_type
),
max_route_counts AS (
SELECT report_date,
       FIRST_VALUE(route_type) OVER (PARTITION BY report_date ORDER BY count DESC) AS top_route,
       ROW_NUMBER() OVER (PARTITION BY report_date ORDER BY count DESC) AS row_number
FROM simple_route_counts
),
top_routes AS (
SELECT report_date AS date,
       top_route
FROM max_route_counts
WHERE row_number = 1
)
SELECT a.date,
       a.type,
       a.weight,
       b.top_route
FROM `my-first-project-238015.waste.weight` a
LEFT JOIN top_routes b
ON a.date = b.date
ORDER BY a.date DESC
"""

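If the SQL is hard to follow, here's the same logic in plain Python on a few made-up rows (top_routes and merge are hypothetical helpers written for illustration): count route types per day, keep the most frequent one, and left-join it onto the weight table by date.

```python
from collections import Counter, defaultdict


def top_routes(route_rows):
    """Most common route_type per report_date (what the three CTEs compute)."""
    counts = defaultdict(Counter)
    for report_date, route_type in route_rows:
        counts[report_date][route_type] += 1
    # most_common(1) returns the route type with the highest count;
    # ties go to the route type that was seen first.
    return {day: counter.most_common(1)[0][0] for day, counter in counts.items()}


def merge(weight_rows, route_rows):
    """LEFT JOIN the top route onto the weight rows by date, newest first."""
    top = top_routes(route_rows)
    merged = [(date, type_, weight, top.get(date)) for date, type_, weight in weight_rows]
    return sorted(merged, key=lambda row: row[0], reverse=True)


# Made-up sample rows: (date, type, weight) and (report_date, route_type).
weight = [("2020-02-19", "DEAD ANIMAL", 95.0), ("2020-02-20", "DEAD ANIMAL", 120.0)]
routes = [("2020-02-20", "A"), ("2020-02-20", "B"), ("2020-02-20", "B")]
print(merge(weight, routes))
# [('2020-02-20', 'DEAD ANIMAL', 120.0, 'B'), ('2020-02-19', 'DEAD ANIMAL', 95.0, None)]
```

Days without any route data keep their weight row and get None as top route, just as the LEFT JOIN produces NULL. One difference: when two route types tie, Counter.most_common keeps the one seen first, while the SQL query picks one of the tied rows without a guaranteed order.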
Task 6: Export result to Google Cloud Storage

Let's finish off this process by exporting our result back to Cloud Storage. By now, you're probably guessing what the right operator is called. If you guessed BigQueryToCloudStorageOperator, you're spot on. How do we use it? Just specify the source table and the Cloud Storage URI (path) to export to.

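    # Airflow 1.10.x import: from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
    t6 = BigQueryToCloudStorageOperator(task_id='export_results_to_gcs',
                                        source_project_dataset_table=cfg.BQ_TABLE_MERGE,
                                        destination_cloud_storage_uris=cfg.DESTINATION_URI,
                                        export_format='CSV')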

The only thing left to do now is to determine how the tasks relate to each other, i.e. set the dependencies. We can do this using the >> notation which I find more readable than set_upstream() or set_downstream(). But take your pick.

    t1 >> t2 >> [t3, t4] >> t5 >> t6

The notation above says: if data is available in the BigQuery source table, next check whether data is also available in Cloud Storage. If so, go ahead: extract the data from the source table and save it to a new BigQuery table. In addition, transfer the CSV file data from Cloud Storage into a separate BigQuery table. Once both of these tasks are done, merge the two newly created tables. Finally, export the merged table to Cloud Storage as a CSV.
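How can a list sit in the middle of a >> chain? Airflow overloads Python's >> operator on its operators. The toy Task class below is a hypothetical stand-in, not Airflow's code, that reproduces the mechanism: >> records an edge and returns its right-hand side so the chain can continue, and because Python lists don't define >>, a list on the left falls back to the task's __rrshift__ method.

```python
class Task:
    """Hypothetical stand-in for an operator that records dependency edges."""

    def __init__(self, task_id: str, edges: list):
        self.task_id = task_id
        self.edges = edges  # shared list of (upstream, downstream) pairs

    def __rshift__(self, other):
        # task >> task  or  task >> [task, task]
        targets = other if isinstance(other, list) else [other]
        for target in targets:
            self.edges.append((self.task_id, target.task_id))
        return other  # returning the right-hand side lets the chain continue

    def __rrshift__(self, other):
        # [task, task] >> task: Python calls this because lists have no >>
        for upstream in other:
            self.edges.append((upstream.task_id, self.task_id))
        return self


edges = []
t1, t2, t3, t4, t5, t6 = (Task(f"t{i}", edges) for i in range(1, 7))
t1 >> t2 >> [t3, t4] >> t5 >> t6
print(edges)
# [('t1', 't2'), ('t2', 't3'), ('t2', 't4'), ('t3', 't5'), ('t4', 't5'), ('t5', 't6')]
```

The six edges correspond to the dependencies described above.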

Conclusion

That's it! Thank you very much for sticking with me to the end! Let's wrap up what we did: we wrote a DAG file that defines an automated ETL process, which extracts, transforms and loads data with the help of two Google Cloud Platform services: BigQuery and Cloud Storage. All we needed was some Python and SQL code.

What's next? There's much more to explore: using the Web UI, monitoring your workflows, creating tasks dynamically, orchestrating machine learning models and much more. We barely scratched the surface here.

So check out Airflow’s official website to learn more. For now, I hope you got a better sense of the possibilities you have with Airflow and how you can harness its power to manage and automate your workflows.


About the author

Manuel Tilgner

I am a data scientist at STATWORX, and I enjoy making data make sense. Why? Because there's something magical about turning a jumble of numbers into insights. In my free time, I love wandering through the forest or playing in the local big band.

ABOUT US

STATWORX is a consulting company for data science, statistics, machine learning and artificial intelligence located in Frankfurt, Zurich and Vienna. If you have questions or suggestions, please write us an e-mail addressed to blog(at)statworx.com.