Airflow¶
Apache Airflow is an open-source workflow management framework for data engineering pipelines, written in Python. It allows for the development, scheduling, and monitoring of batch-oriented workflows using directed acyclic graphs (DAGs). Each node in a DAG corresponds to a task, which is defined using Python scripts. Airflow is responsible for scheduling these tasks and managing their execution.
Workflows can be triggered on a defined schedule or by external events, and can include tasks like executing bash commands, using modules such as pandas, or connecting to databases and cloud services. Airflow is ideal for scheduling ETL pipelines, training machine learning models, generating reports, and performing backups and DevOps operations.
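For illustration, a deployed DAG can be listed and triggered manually from the command line (the DAG ID my_dag below is a placeholder for an existing DAG):
$ airflow dags list
$ airflow dags trigger my_dag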
A web interface built on Flask enables users to visualize pipelines, track progress, manage and trigger DAGs, debug issues, and view detailed logs and performance metrics.
For more information, check out the official documentation.
Initialization¶
For information on how to use the Initialization parameter, please refer to the Initialization: Bash Script section of the documentation.
Airflow's capabilities can be extended by installing additional packages with pip, either during initialization or after the job has started. To install them correctly, use the following structure:
$ pip install "apache-airflow==<airflow-version>" <airflow-provider-name>==<airflow-provider-version>
For example (the provider's version can be omitted if not relevant):
$ pip install "apache-airflow==3.1.0" apache-airflow-providers-google
This prevents pip from accidentally upgrading or downgrading Airflow itself while installing new packages.
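After installation, the Airflow version and the list of installed providers can be verified with:
$ airflow version
$ airflow providers list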
Configuration¶
To set up the Airflow environment, a few components must be configured.
PostgreSQL database¶
Airflow stores all the metadata in a database to keep track of the state of the workflows ("success", "failure", etc.), task execution history, DAGs, variables, connections, users, roles, and more.
A PostgreSQL database server will be initialized and configured when the job starts. The Database parameter is used to import the PostgreSQL database in the same way as specified in the PostgreSQL app documentation.
Note
If an empty folder is specified in Database, a new database will be initialized in that directory.
To configure Airflow to use a PostgreSQL metadata database, both the airflow_user (database user) and airflow_db (database name) must be created and assigned the appropriate permissions. These are typically initialized automatically when the PostgreSQL server starts for the first time.
The pg_hba.conf file should include an entry permitting connections from airflow_user. Additionally, the sql_alchemy_conn parameter in Airflow must be set to reference the PostgreSQL database, following the guidelines provided in the official Airflow documentation on setting up a PostgreSQL database backend.
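For reference, a manual setup would look roughly like the following sketch; on UCloud these steps are normally performed automatically, and the password, the postgres superuser name, and the connection string below are placeholders:
$ psql -U postgres -c "CREATE USER airflow_user WITH PASSWORD 'airflow_pass';"
$ psql -U postgres -c "CREATE DATABASE airflow_db OWNER airflow_user;"
$ psql -U postgres -c "GRANT ALL PRIVILEGES ON DATABASE airflow_db TO airflow_user;"
$ export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN="postgresql+psycopg2://airflow_user:airflow_pass@localhost/airflow_db"
The last line is the environment-variable equivalent of setting sql_alchemy_conn in airflow.cfg (in recent Airflow versions the option lives in the [database] section).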
Environment variables¶
To allow further customization, a number of optional parameters can be set by the user during job submission. Among these are the following Airflow environment variables (a shell example is shown after the list):
- Airflow Home (AIRFLOW_HOME environment variable): by default set to /work/airflow. This is the directory where Airflow stores its configuration files, logs, DAGs, and plugins.
- Load Examples (AIRFLOW__CORE__LOAD_EXAMPLES environment variable): by default false. Determines whether Airflow loads example DAGs upon initialization.
- Airflow Executor (AIRFLOW__CORE__EXECUTOR environment variable): by default set to LocalExecutor. Specifies how tasks are run. Airflow supports several executors; the choice depends on the use case and deployment scale.
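For reference, the same defaults could be exported manually in a shell before starting Airflow:
$ export AIRFLOW_HOME=/work/airflow
$ export AIRFLOW__CORE__LOAD_EXAMPLES=false
$ export AIRFLOW__CORE__EXECUTOR=LocalExecutor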
Airflow configuration¶
The airflow.cfg configuration file is automatically generated when Airflow is first initialized and is typically located in $AIRFLOW_HOME. This file contains the core settings for the Airflow environment. For a detailed list of configuration options, consult the official configuration reference.
If a custom Airflow Configuration File is specified during job submission, the provided file will replace the default airflow.cfg.
To regenerate and overwrite the existing airflow.cfg with a fresh default version, execute the following command:
$ airflow config list --defaults > "$AIRFLOW_HOME/airflow.cfg"
Airflow supports multiple ways to configure its settings, primarily through environment variables and the airflow.cfg file. When values conflict, Airflow applies a clear order of precedence: environment variables always override values set in airflow.cfg. For a detailed explanation of the configuration hierarchy, refer to the official documentation.
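For example, assuming load_examples is set to false in airflow.cfg, exporting the corresponding environment variable takes precedence; the effective value can be inspected with the airflow config CLI:
$ export AIRFLOW__CORE__LOAD_EXAMPLES=true
$ airflow config get-value core load_examples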
Airflow web server authentication¶
Access to the Airflow webserver requires authentication with a user account. By default, an initial user account is automatically created at job startup with the following credentials:
$ airflow users create --username "ucloud" --firstname "ucloud" --lastname "ucloud" --role Admin --password "ucloud" --email "ucloud@email.dk"
The user information can be customized using the job parameters with the corresponding names.
Airflow allows for multiple users and roles, which can be created at any moment using the command line. For more information read the Access Control documentation.
Note
An email address must be provided when creating a user. However, since configuring an email server is not supported, the email address is used solely for user identification and will not be used to send emails.
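For instance, an additional account with read-only access could be created as follows (the username, names, password, and email below are purely illustrative):
$ airflow users create --username "analyst" --firstname "Data" --lastname "Analyst" --role Viewer --password "analyst" --email "analyst@email.dk"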
Restarting web server and scheduler¶
Upon job initialization, the Airflow web server and scheduler processes are started automatically. Any modifications to environment variables or the configuration file made after startup require both processes to be restarted for changes to take effect. The following script can be used to perform this restart:
#!/bin/bash
# Function to stop Airflow services
stop_airflow() {
printf "\nStopping Airflow API server..."
if pkill -f "airflow api-server"; then
printf " Done."
else
printf " API server not running."
fi
printf "\nStopping Airflow scheduler..."
if pkill -f "airflow scheduler"; then
printf " Done."
else
printf " Scheduler not running."
fi
printf "\nStopping Airflow DAG processor..."
if pkill -f "airflow dag-processor"; then
printf " Done."
else
printf " DAG processor not running."
fi
printf "\nStopping Airflow triggerer..."
if pkill -f "airflow triggerer"; then
printf " Done."
else
printf " Triggerer not running."
fi
}
# Function to start Airflow services
start_airflow() {
printf "\nStarting Airflow API server..."
nohup airflow api-server --port 8080 > /work/api-server.log 2>&1 &
printf " Started (PID: $!)."
printf "\nStarting Airflow scheduler..."
nohup airflow scheduler > /work/scheduler.log 2>&1 &
printf " Started (PID: $!)."
printf "\nStarting Airflow DAG processor..."
nohup airflow dag-processor > /work/dag-processor.log 2>&1 &
printf " Started (PID: $!)."
printf "\nStarting Airflow triggerer..."
nohup airflow triggerer > /work/triggerer.log 2>&1 &
printf " Started (PID: $!)."
}
# Main script
echo "Restarting Airflow..."
stop_airflow
# Give the services a moment to shut down and release their ports
sleep 5
start_airflow
sleep 5
echo -e "\nAirflow restarted successfully!"
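The script can be saved to a file (the name restart_airflow.sh is just an example) and run from a terminal inside the job:
$ chmod +x restart_airflow.sh
$ ./restart_airflow.sh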
Remote Connectivity and Distributed Execution¶
Airflow allows the assignment of specific tasks to different machines using connectors and operators. Connectors are plugins or components that allow Airflow to interact with external systems or services, while operators are specialized classes within Airflow that dictate how tasks should be executed. Both can come pre-installed with Airflow or be added via providers (or packages). Some examples of integrations with external systems are AWS, Spark or MySQL providers.
This allows tasks to be executed with different UCloud Apps as long as they are supported by these providers and connected to the Airflow job. For instance, with the Spark provider, Airflow can send Spark jobs from the current Airflow job to the Spark Cluster App while executing a workflow.
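As a sketch, the Spark provider could be installed and a connection to a hypothetical Spark master registered via the Airflow CLI; the host and port below are placeholders that must match the Spark Cluster job:
$ pip install "apache-airflow==3.1.0" apache-airflow-providers-apache-spark
$ airflow connections add spark_default --conn-type spark --conn-host "spark://<spark-master-node>" --conn-port 7077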
External third-party integration¶
It is also possible to connect to a running Airflow job from third-party apps running outside UCloud, for example via Airflow's REST API. To do this, the Airflow job must have a public link attached to it.
To use this API, make the following changes to airflow.cfg:
api_client = airflow.api.client.json_client
endpoint_url = https://URL
auth_backends = airflow.contrib.auth.backends.basic_auth
Note
Replace URL with the job's public link.
Remember to restart both the web server and the scheduler after changing the configuration file.
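Once the configuration is in place and the services have been restarted, the API can be queried from outside UCloud using the web server credentials, for example (the exact endpoint path depends on the Airflow version; /api/v1 is shown here purely as an illustration):
$ curl -X GET --user "ucloud:ucloud" "https://URL/api/v1/dags"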
Celery Executor¶
Distributed execution of tasks across multiple nodes can be achieved in Airflow using the CeleryExecutor, without requiring the installation of additional providers. This approach utilizes a Redis server as a message broker and involves a more advanced configuration. For a detailed example, see Airflow use cases.
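A minimal sketch of the settings involved, assuming a Redis broker reachable at a placeholder host redis-host and reusing the PostgreSQL metadata database as the result backend (the scheduler must be restarted after changing the executor):
$ export AIRFLOW__CORE__EXECUTOR=CeleryExecutor
$ export AIRFLOW__CELERY__BROKER_URL="redis://redis-host:6379/0"
$ export AIRFLOW__CELERY__RESULT_BACKEND="db+postgresql://airflow_user:airflow_pass@localhost/airflow_db"
$ airflow celery worker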