Im haben wir darüber gesprochen, was ein DAG ist, wie man dieses mathematische Konzept in der Projektplanung und -programmierung anwendet und warum wir bei beschlossen haben, Airflow statt anderer Workflow-Manager einzusetzen. In diesem Teil werden wir jedoch etwas technischer und untersuchen eine recht informative Hello-World-Programmierung und wie man Airflow für verschiedene Szenarien einrichtet, mit denen man konfrontiert werden könnte. Wenn du dich nur für den technischen Teil interessierst und deshalb den ersten Teil nicht lesen willst, aber trotzdem eine Zusammenfassung möchtest, findest du hier eine Zusammenfassung:
- DAG ist die Abkürzung für “Directed Acyclic Graph” und kann als solcher Beziehungen und Abhängigkeiten darstellen.
- Dieser letzte Aspekt kann im Projektmanagement genutzt werden, um deutlich zu machen, welche Aufgaben unabhängig voneinander ausgeführt werden können und welche nicht.
- Die gleichen Eigenschaften können in der Programmierung genutzt werden, da Software bestimmen kann, welche Aufgaben gleichzeitig ausgeführt werden können oder in welcher Reihenfolge die anderen beendet werden (oder fehlschlagen) müssen.
Warum haben wir Airflow gewählt:
- Kein Cron – Mit Airflows integriertem Scheduler müssen wir uns nicht auf Cron verlassen, um unsere DAG zu planen und verwenden nur ein Framework (nicht wie Luigi).
- Code Bases – In Airflow werden alle Workflows, Abhängigkeiten und das Scheduling in Python Code durchgeführt. Daher ist es relativ einfach, komplexe Strukturen aufzubauen und die Abläufe zu erweitern.
- Sprache – Python ist eine Sprache, die man relativ leicht erlernen kann und Python Kenntnisse war in unserem Team bereits vorhanden.
Vorbereitung
Der erste Schritt war die Einrichtung einer neuen, virtuellen Umgebung mit Python und virtualenv
.
$pip install virtualenv # if it hasn't been installed yet
$cd # change into home
# create a separated folder with all environments
$mkdir env
$cd env
$virtualenv airflow
Sobald die Umgebung erstellt wurde, können wir sie immer dann verwenden, wenn wir mit Airflow arbeiten wollen, so dass wir nicht in Konflikt mit anderen Abhängigkeiten geraten.
$source ~/env/airflow/bin/activate
Dann können wir alle Python-Pakete installieren, die wir benötigen.
$ pip install -U pip setuptools wheel \
psycopg2\
Cython \
pytz \
pyOpenSSL \
ndg-httpsclient \
pyasn1 \
psutil \
apache-airflow[postgres]\
A Small Breeze
Sobald unser Setup fertig ist, können wir überprüfen, ob Airflow korrekt installiert ist, indem wir airflow version
in Bash eingeben und du solltest etwas wie dieses sehen:
Anfänglich läuft Airflow mit einer SQLite-Datenbank, die nicht mehr als eine DAG-Aufgabe gleichzeitig ausführen kann und daher ausgetauscht werden sollte, sobald du dich ernsthaft damit befassen willst oder musst. Doch dazu später mehr. Beginnen wir nun mit dem typischen Hello-World-Beispiel. Navigiere zu deinem AIRFLOW_HOME
-Pfad, der standardmäßig ein Ordner namens airflow
in deinem Stammverzeichnis ist. Wenn du das ändern willst, editiere die Umgebungsvariable mit export AIRFLOW_HOME=/your/new/path
und rufe airflow version
noch einmal auf.
# ~/airflow/dags/HelloWorld.py
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
def print_hello():
return 'Hello world!'
dag = DAG('hello_world',
description='Simple tutorial DAG',
start_date= datetime.now() - timedelta(days= 4),
schedule_interval= '0 12 * * *'
)
dummy_operator= DummyOperator(task_id= 'dummy_task', retries= 3, dag= dag)
hello_operator= PythonOperator(task_id= 'hello_task', python_callable= print_hello, dag= dag)
dummy_operator >> hello_operator # same as dummy_operator.set_downstream(hello_operator)
Die ersten neun Zeilen sollten einigermaßen selbsterklärend sein, nur der Import der notwendigen Bibliotheken und die Definition der Hello-World-Funktion passieren hier. Der interessante Teil beginnt in Zeile zehn. Hier definieren wir den Kern unseres Workflows, ein DAG-Objekt mit dem Identifier hello _world
in diesem Fall und eine kleine Beschreibung, wofür dieser Workflow verwendet wird und was er tut (Zeile 10). Wie du vielleicht schon vermutet hast, definiert das Argument start_date
das Anfangsdatum des Tasks. Dieses Datum sollte immer in der Vergangenheit liegen. Andernfalls würde die Aufgabe ausgelöst werden und immer wieder nachfragen, ob sie ausgeführt werden kann, und als solche bleibt sie aktiv, bis sie geplant ist. Das schedule_interval
definiert die Zeiträume, in denen der Graph ausgeführt werden soll. Wir setzen sie entweder mit einer Cron-ähnlichen Notation auf (wie oben) oder mit einem syntaktischen Hilfsmittel, das Airflow übersetzen kann. Im obigen Beispiel definieren wir, dass die Aufgabe täglich um 12:00 Uhr laufen soll. Die Tatsache, dass sie täglich laufen soll, hätte auch mit schedule_interval='@daily
ausgedrückt werden können. Die Cron-Notation folgt dem Schema Minute - Stunde - Tag (des Monats) - Monat - Tag (der Woche)
, etwa mi h d m wd
. Mit der Verwendung von *
als Platzhalter haben wir die Möglichkeit, in sehr flexiblen Intervallen zu planen. Nehmen wir an, wir wollen, dass ein Job jeden ersten Tag des Monats um zwölf Uhr ausgeführt wird. In diesem Fall wollen wir weder einen bestimmten Monat noch einen bestimmten Wochentag und ersetzen den Platzhalter durch eine Wildcard *
( min h d * *
). Da es um 12:00
laufen soll, ersetzen wir mi
mit 0
und h
mit 12. Schließlich geben wir noch den Tag des Monats als 1
ein und erhalten unsere endgültige Cron-Notation 0 12 1 * *
. Wenn wir nicht so spezifisch sein wollen, sondern lediglich täglich oder stündlich, beginnend mit dem Startdatum
Ausführungen benötigen, können wir Airflows Hilfsmittel verwenden – @daily
, @hourly
, @monthly
oder @yeary
.
Sobald wir diese DAG
-Instanz haben, können wir damit beginnen, sie mit einer Aufgabe zu füllen. Instanzen von Operatoren in Airflow repräsentieren diese. Hier initiieren wir einen DummyOperator
und einen PythonOperator
. Beiden muss eine eindeutige id
zugewiesen werden, aber dieses Mal muss sie nur innerhalb des Workflows eindeutig sein. Der erste Operator, den wir definieren, ist ein DummyOperator
, der überhaupt nichts tut. Wir wollen nur, dass er unseren Graphen füllt und dass wir Airflow mit einem möglichst einfachen Szenario testen können. Der zweite ist ein PythonOperator
. Neben der Zuordnung zu einem Graphen und der id
benötigt der Operator eine Funktion, die ausgeführt wird, sobald die Aufgabe ausgelöst wird. Nun können wir unsere Funktion hello_world
verwenden und über den PythonOperator
an unseren Workflow anhängen.
Bevor wir unseren Ablauf schließlich ausführen können, müssen wir noch die Beziehung zwischen unseren Aufgaben herstellen. Diese Verknüpfung wird entweder mit den binären Operatoren <<
und >>
oder durch den Aufruf der Methoden set_upstream
und set_downstream
vorgenommen. Auf diese Weise können wir die Abhängigkeit einstellen, dass zuerst der DummyOperator laufen und erfolgreich sein muss, bevor unser PythonOperator ausgeführt wird.
Nun da unser Code in Ordnung ist, sollten wir ihn testen. Dazu sollten wir ihn direkt im Python-Interpreter ausführen, um zu prüfen, ob wir einen Syntaxfehler haben. Führe ihn also entweder in einer IDE oder im Terminal mit dem Befehl python hello_world.py
aus. Wenn der Interpreter keine Fehlermeldung ausgibt, kannst du dich glücklich schätzen, dass du es nicht allzu sehr vermasselt hast. Als nächstes müssen wir überprüfen, ob Airflow unsere DAG mit airflow list_dags
kennt. Jetzt sollten wir unsere hello_world
id in der gedruckten Liste sehen. Wenn dies der Fall ist, können wir mit airflow list_task hello_world
überprüfen, ob jede Aufgabe ihm zugewiesen ist. Auch hier sollten wir einige bekannte IDs sehen, nämlich dummy_task
und hello_task
. So weit so gut, zumindest die Zuweisung scheint zu funktionieren. Als nächstes steht ein Unit-Test der einzelnen Operatoren mit airflow test dummy_task 2018-01-01
und airflow test hello_task 2018-01-01
an. Hoffentlich gibt es dabei keine Fehler, und wir können fortfahren.
Da wir nun unseren Beispiel-Workflow bereitstellen konnten, müssen wir Airflow zunächst vollständig starten. Dazu sind drei Befehle erforderlich, bevor wir mit der manuellen Auslösung unserer Aufgabe fortfahren können.
airflow initdb
um die Datenbank zu initiieren, in der Airflow die Arbeitsabläufe und ihre Zustände speichert:
airflow webserver
, um den Webserver auflocalhost:8080
zu starten, von wo aus wir die Weboberfläche erreichen können:
airflow scheduler
, um den Scheduling-Prozess der DAGs zu starten, damit die einzelnen Workflows ausgelöst werden können:airflow trigger_dag hello_world
um unseren Workflow auszulösen und ihn in den Zeitplan aufzunehmen.
Jetzt können wir entweder einen Webbrowser öffnen und zu der entsprechenden Website navigieren oder open http://localhost:8080/admin/
im Terminal aufrufen, und es sollte uns zu einer Webseite wie dieser führen.
Unten solltest du deine Kreation sehen und der hellgrüne Kreis zeigt an, dass unser Ablauf geplant ist und ausgeführt wird. Jetzt müssen wir nur noch warten, bis er ausgeführt wird. In der Zwischenzeit können wir über das Einrichten von Airflow sprechen und darüber, wie wir einige der anderen Executors verwenden können.
Das Backend
Wie bereits erwähnt – sobald wir uns ernsthaft mit der Ausführung unserer Graphen beschäftigen wollen, müssen wir das Backend von Airflow ändern. Anfänglich wird eine einfache SQLite-Datenbank verwendet, die Airflow darauf beschränkt, jeweils nur eine Aufgabe sequenziell auszuführen. Daher werden wir zunächst die angeschlossene Datenbank auf PostgreSQL umstellen. Falls du Postgres noch nicht installiert hast und Hilfe dabei brauchst, empfehle ich dir diesen Wiki-Artikel. Ich könnte den Prozess nicht so gut beschreiben wie die Seite. Für diejenigen, die mit einem Linux-basierten System arbeiten (sorry, Windows), versucht es mit sudo apt-get install postgresql-client
oder mit homebrew auf einem Mac – brew install postgresql
. Eine andere einfache Möglichkeit wäre die Verwendung eines Docker-Containers mit dem entsprechenden image.
Nun erstellen wir eine neue Datenbank für Airflow, indem wir im Terminal psql createdb airflow
eingeben, in dem alle Metadaten gespeichert werden. Als nächstes müssen wir die Datei airflow.cfg
bearbeiten, die in dem AIRFLOW_HOME
-Ordner erscheinen sollte (der wiederum standardmäßig airflow
in Ihrem Home-Verzeichnis ist) und die Schritte 1 – 4 von oben (initdb
…) neu starten. Starte nun deinen Lieblingseditor und suche nach Zeile 32 sql_alchemy_conn =
. Hier werden wir den SQLite Connection String durch den von unserem PostgreSQL-Server und einen neuen Treiber ersetzen. Diese Zeichenkette wird zusammengesetzt aus:
postgresql+psycopg2://IPADRESS:PORT/DBNAME?user=USERNAME&password=PASSWORD
Der erste Teil teilt sqlalchemy mit, dass die Verbindung zu PostgreSQL führen wird und dass es den psycopg2
-Treiber verwenden soll, um sich mit diesem zu verbinden. Falls du Postgres lokal installiert hast (oder in einem Container, der auf localhost mappt) und den Standard-Port von 5432 nicht geändert hast, könnte IPADRESS:PORT
in localhost:5432
oder einfach localhost
übersetzt werden. Der DBNAME
würde in unserem Fall in airflow
geändert werden, da wir ihn nur zu diesem Zweck erstellt haben. Die letzten beiden Teile hängen davon ab, was du als Sicherheitsmaßnahmen gewählt hast. Schließlich könnten wir eine Zeile erhalten haben, die wie folgt aussieht:
sql_alchemy_conn = postgresql+psycopg2://localhost/airflow?user=postgres&password=password
Wenn wir dies getan haben, können wir auch unseren Executor in Zeile 27 von “Executor = SequentialExecutor” in einen “Executor = LocalExecutor” ändern. Auf diese Weise wird jede Aufgabe als Unterprozess gestartet und die Parallelisierung findet lokal statt. Dieser Ansatz funktioniert hervorragend, solange unsere Aufträge nicht zu kompliziert sind oder auf mehreren Rechnern laufen sollen.
Sobald wir diesen Punkt erreicht haben, brauchen wir Celery
als Executor. Dabei handelt es sich um eine asynchrone Task/Job-Warteschlange, die auf verteilter Nachrichtenübermittlung basiert. Um den CeleryExecutor
zu verwenden, benötigen wir jedoch ein weiteres Stück Software – einen Message Broker. Ein Message Broker ist ein zwischengeschaltetes Programmmodul, das eine Nachricht von der “Sprache” des Senders in die des Empfängers übersetzt. Die beiden gängigsten Optionen sind entweder redis oder rabbitmq. Verwende das, womit du dich am wohlsten fühlst. Da wir rabbitmq verwendet haben, wird der gesamte Prozess mit diesem Broker fortgesetzt, sollte aber für redis mehr oder weniger analog sein.
Wiederum ist es für Linux- und Mac-Benutzer mit apt/homebrew ein Einzeiler, ihn zu installieren. Tippe einfach in dein Terminal sudo apt-get install rabbitmq-server
oder brew install rabbitmq
ein und fertig. Als nächstes brauchen wir einen neuen Benutzer mit einem Passwort und einen virtuellen Host. Beides – Benutzer und Host – kann im Terminal mit dem rabbitsmqs Kommandozeilen-Tool rabbitmqctl
erstellt werden. Nehmen wir an, wir wollen einen neuen Benutzer namens myuser
mit mypassword
und einen virtuellen Host als myvhost
erstellen. Dies kann wie folgt erreicht werden:
$ rabbitmqctl add_user myuser mypassword
$ rabbitmqctl add_vhost myvhost
Doch nun zurück zur Airflows-Konfiguration. Navigiere in deinem Editor zur Zeile 230, und du wirst hoffentlich broker_url =
sehen. Dieser Connection-String ist ähnlich wie der für die Datenbank und wird nach dem Muster BROKER://USER:PASSWORD@IP:PORT/HOST
aufgebaut. Unser Broker hat das Akronym amqp,
und wir können unseren neu erstellten Benutzer, das Passwort und den Host einfügen. Sofern du nicht den Port geändert hast oder einen Remote Server verwendest, sollte deine Zeile in etwa so aussehen:
broker_url = amqp://myuser:mypassword@localhost:5672/myvhost
Als nächstes müssen wir Celery Zugriff auf unsere airflow
-Datenbank gewähren und die Zeile 232 mit:
db+postgresql://localhost:5432/airflow?user=postgres&password=password
Dieser String sollte im Wesentlichen dem entsprechen, den wir zuvor verwendet haben. Wir müssen nur den Treiber psycopg2
weglassen und stattdessen db+
am Anfang hinzufügen. Und das war’s! Du solltest nun alle drei Executors in der Hand haben und die Einrichtung ist abgeschlossen. Unabhängig davon, welchen Executor du gewählt hast, musst du, sobald du die Konfiguration geändert hast, die Schritte 1-4 – Initialisierung der DB, Neustart des Schedulers und des Webservers – erneut ausführen. Wenn du dies jetzt tust, wirst du feststellen, dass sich die Eingabeaufforderung leicht verändert hat, da sie anzeigt, welchen Executor du verwendest.
Airflow ist ein einfach zu bedienender, codebasierter Workflow-Manager mit einem integrierten Scheduler und mehreren Executors, die je nach Bedarf skaliert werden können.
Wenn du einen Ablauf sequenziell ausführen willst oder wenn es nichts gibt, was gleichzeitig laufen könnte, sollten die Standard-SQLite-Datenbank und der sequenzielle Executor die Aufgabe erfüllen.
Wenn du Airflow verwenden willst, um mehrere Aufgaben gleichzeitig zu starten und so die Abhängigkeiten zu verfolgen, solltest du zuerst die Datenbank und einen LocalExecutor
für lokale Mehrfachverarbeitung verwenden. Dank Celery
sind wir sogar in der Lage, mehrere Maschinen zu verwenden, um noch fortgeschrittenere und komplexere Workflows ohne viel Aufwand und Sorgen auszuführen.