Airflow¶
Apache Airflow is an open-source workflow management framework for data engineering pipelines, written in Python. It allows for the development, scheduling, and monitoring of batch-oriented workflows using directed acyclic graphs (DAGs). Each node in a DAG corresponds to a task, which is defined using Python scripts. Airflow is responsible for scheduling these tasks and managing their execution.
Workflows can be triggered on a defined schedule or by external events, and can include tasks like executing bash commands, using modules such as pandas, or connecting to databases and cloud services. Airflow is ideal for scheduling ETL pipelines, training machine learning models, generating reports, and performing backups and DevOps operations.
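For illustration, a minimal DAG file might look like the sketch below; the file name, dag_id, schedule, and task contents are arbitrary examples rather than anything required by this app:

# example_dag.py: a minimal DAG sketch; names, schedule, and tasks are arbitrary
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator


def extract():
    # Placeholder for a real extraction step (e.g. querying a database)
    print("extracting data")


with DAG(
    dag_id="example_pipeline",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",  # run once per day
    catchup=False,
) as dag:
    extract_task = PythonOperator(task_id="extract", python_callable=extract)
    report_task = BashOperator(task_id="report", bash_command="echo 'pipeline done'")

    # The >> operator defines the dependency (an edge of the DAG)
    extract_task >> report_task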
A web interface built on Flask enables users to visualize pipelines, track progress, manage and trigger DAGs, debug issues, and view detailed logs and performance metrics.
For more information, check out the official documentation.
Initialization¶
For information on how to use the Initialization parameter, please refer to the Initialization: Bash Script section of the documentation.
Airflow's capabilities can be extended by installing additional packages with pip, both during initialization and after the job has started. To install them correctly, use the following structure:
$ pip install "apache-airflow==<airflow-version>" <airflow-provider-name>==<airflow-provider-version>
For example (the provider's version can be omitted if not relevant):
$ pip install "apache-airflow==2.10.2" apache-airflow-providers-google
This prevents pip from accidentally upgrading or downgrading Airflow while installing new packages.
Configuration¶
To set up the Airflow environment, a few components must be configured.
PostgreSQL database¶
Airflow stores all the metadata in a database to keep track of the state of the workflows ("success", "failure", etc.), task execution history, DAGs, variables, connections, users, roles, and more.
A PostgreSQL database server will be initialized and configured when the job starts. The parameter Database path is used to import the PostgreSQL database in the same way as specified in the PostgreSQL app documentation.
Note
If an empty folder is specified in Database path, a new database will be initialized in that directory.
For Airflow to be able to use the PostgreSQL metadata database, the airflow_user and airflow_db variables must be defined and configured with the required permissions. By default, they will be initialized when starting the PostgreSQL server for the first time.
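If the user and database ever need to be created manually (for example, when reusing an existing PostgreSQL data directory that lacks them), the commands would look roughly like the following sketch; the password is a placeholder and psql must be able to connect with sufficient privileges:

$ psql -c "CREATE DATABASE airflow_db;"
$ psql -c "CREATE USER airflow_user WITH PASSWORD '<password>';"
$ psql -c "GRANT ALL PRIVILEGES ON DATABASE airflow_db TO airflow_user;"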
In addition, the pg_hba.conf file should contain an entry for airflow_user, and the variable sql_alchemy_conn should be configured as described in Airflow's documentation on how to set up a database backend.
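As a sketch, the corresponding pg_hba.conf entry and sql_alchemy_conn setting could look as follows; the address, port, and password are placeholders that depend on the actual PostgreSQL setup:

# pg_hba.conf: allow airflow_user to connect to airflow_db locally (placeholder address)
host    airflow_db    airflow_user    127.0.0.1/32    md5

# airflow.cfg, [database] section (placeholder credentials)
sql_alchemy_conn = postgresql+psycopg2://airflow_user:<password>@localhost:5432/airflow_db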
Environment variables¶
To allow further customization, a number of optional parameters can be set by the user during job submission. Among these are the following Airflow environment variables:
Parameter | Info
---|---
 | By default set to
 | By default set to
 | By default
 | By default set inside of Airflow Home. Specifies where the DAG files are stored. All DAGs must be inside this directory or else they will not be detected by the scheduler.
 | By default an empty string. It is used for encrypting and decrypting connection passwords in the metadata database.
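These parameters are set through the job submission form, but the same options can also be exported manually in a terminal, since Airflow reads any configuration option from environment variables named AIRFLOW__{SECTION}__{KEY}. For example (the values shown are placeholders):

$ export AIRFLOW__CORE__DAGS_FOLDER="/work/dags"
$ export AIRFLOW__CORE__FERNET_KEY="<fernet-key>"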
Airflow configuration¶
The airflow.cfg configuration file is generated the first time Airflow is initialized. By default it is located in $AIRFLOW_HOME. For more information, check the configuration reference.
If the Configuration file optional parameter is specified by the user, the provided configuration file will replace the default one.
If the user wants to generate a new airflow.cfg file and overwrite the existing one, this can be done using:
$ airflow config list --defaults > "$AIRFLOW_HOME/airflow.cfg"
Since Airflow can have different sources of configuration (environment variables and airflow.cfg), in order to remain consistent, Airflow follows a list of priorities (see details here). Importantly, environment variables take precedence over the airflow.cfg file. Users should pay particular attention to this if they use the Configuration file optional parameter.
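For example, if an option is exported as an environment variable it overrides the value from airflow.cfg, which can be verified with the airflow config CLI (the path shown is a placeholder):

$ export AIRFLOW__CORE__DAGS_FOLDER="/work/my-dags"
$ airflow config get-value core dags_folder
/work/my-dags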
Airflow web server authentication¶
A user account is required to log in the first time the Airflow web server is launched. By default, it is created at the start of the job with the following values:
$ airflow users create --username "ucloud" --firstname "ucloud" --lastname "ucloud" --role Admin --password "ucloud" --email "ucloud@email.dk"
The user information can be customized using the job parameters with the corresponding names.
Note
It is necessary to specify an email address when creating a user. However, it is not possible to set up an email server, so the email addresses are not used for anything other than user identification.
Airflow allows for multiple users and roles, which can be created at any moment using the command line. For more information, read the Access Control documentation.
Note
If the metadata database already exists and it is selected using the Database path parameter, an admin user will already exist. In this case, the creation of an admin is always skipped.
Restarting web server and scheduler¶
Once all parameters are set and the job starts, two processes are launched: the Airflow web server and the Airflow scheduler. If, at some point, the environment variables or the configuration file are changed, both processes must be restarted. This can be done by using the following script:
#!/bin/bash

# Function to stop Airflow services
stop_airflow() {
    printf "\nStopping Airflow web server..."
    if pkill -f "airflow webserver"; then
        printf " Done."
    else
        printf " Web server not running."
    fi

    printf "\nStopping Airflow scheduler..."
    if pkill -f "airflow scheduler"; then
        printf " Done."
    else
        printf " Scheduler not running."
    fi
}

# Function to start Airflow services
start_airflow() {
    printf "\nStarting Airflow web server..."
    nohup airflow webserver > /dev/null 2>&1 &
    printf " Started."

    printf "\nStarting Airflow scheduler..."
    nohup airflow scheduler > /dev/null 2>&1 &
    printf " Started."
}

# Function to wait for Airflow services to start
wait_for_airflow() {
    printf "\nWaiting for Airflow web server to start..."
    until curl -s http://localhost:8080 > /dev/null; do
        sleep 1
    done
    printf " Web server is up."

    printf "\nWaiting for Airflow scheduler to start..."
    while ! pgrep -f "airflow scheduler" > /dev/null; do
        sleep 1
    done
    printf " Scheduler is up."
}

# Main script
echo "Restarting Airflow..."
stop_airflow
sleep 5
start_airflow
wait_for_airflow
echo -e "\nAirflow restarted successfully!"
Remote Connectivity and Distributed Execution¶
Airflow allows the assignment of specific tasks to different machines using connectors and operators. Connectors are plugins or components that allow Airflow to interact with external systems or services, while operators are specialized classes within Airflow that dictate how tasks should be executed. Both can come pre-installed with Airflow or be added via providers (or packages). Some examples of integrations with external systems are AWS, Spark or MySQL providers.
This allows tasks to be executed with different UCloud Apps as long as they are supported by these providers and connected to the Airflow job. For instance, with the Spark provider, Airflow can send Spark jobs from the current Airflow job to the Spark Cluster App while executing a workflow.
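As an illustration, a task submitting a Spark application through the Spark provider could be sketched as follows; the connection id, application path, and DAG details are placeholders that must match the actual Spark Cluster job:

# Sketch of a Spark submission task; conn_id and application path are placeholders
from datetime import datetime

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

with DAG(
    dag_id="spark_example",
    start_date=datetime(2024, 1, 1),
    schedule=None,  # triggered manually
    catchup=False,
) as dag:
    submit_job = SparkSubmitOperator(
        task_id="submit_spark_job",
        application="/work/jobs/my_spark_app.py",  # placeholder path to the Spark application
        conn_id="spark_default",  # Airflow connection pointing to the Spark cluster
    )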
Connecting from external third-party apps¶
It is also possible to connect to a running Airflow job from third-party apps running outside UCloud. This can be done, for example, using Airflow's REST API. To do this, the Airflow job should have a public link attached to it.
To use this API, make the following changes to airflow.cfg:
api_client = airflow.api.client.json_client
endpoint_url = https://URL
auth_backends = airflow.contrib.auth.backends.basic_auth
To enable the Airflow experimental API, the user should also set enable_experimental_api = True in the configuration file.
Note
Replace URL with the job's public link.
Remember to restart both the webserver and scheduler after changing the configuration file.
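Once the configuration above is in place and the job has a public link, the API can be reached from outside UCloud, for example with curl and basic authentication; this is only a sketch, assuming the default ucloud credentials and the experimental API's test endpoint:

$ curl -X GET --user "ucloud:ucloud" "https://URL/api/experimental/test"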
Celery Executor¶
Airflow allows distributed execution of tasks on different nodes, without the need to install specific providers, by using the CeleryExecutor. This makes use of a Redis server as a message broker and requires a more complex setup. For an in-depth example, refer to Airflow use cases.
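As a minimal sketch, switching to the Celery executor involves settings along the following lines in airflow.cfg (the broker and result backend addresses are placeholders), after which each worker node is started with the airflow celery worker command:

# airflow.cfg: minimal Celery-related settings (placeholder addresses)
[core]
executor = CeleryExecutor

[celery]
broker_url = redis://localhost:6379/0
result_backend = db+postgresql://airflow_user:<password>@localhost:5432/airflow_db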