Airflow


Apache Airflow is an open-source workflow management framework for data engineering pipelines, written in Python. It allows for the development, scheduling, and monitoring of batch-oriented workflows using directed acyclic graphs (DAGs). Each node in a DAG corresponds to a task, which is defined using Python scripts. Airflow is responsible for scheduling these tasks and managing their execution.

Workflows can be triggered on a defined schedule or by external events, and can include tasks like executing bash commands, using modules such as pandas, or connecting to databases and cloud services. Airflow is ideal for scheduling ETL pipelines, training machine learning models, generating reports, and performing backups and DevOps operations.
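As a brief illustration, below is a minimal sketch of a DAG with two dependent tasks, assuming Airflow 2.4 or later with the built-in BashOperator; the DAG id, task ids, and commands are placeholders:

# minimal_dag.py: a minimal example DAG; place it in the DAGs folder so the
# scheduler can detect it.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="minimal_example",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",   # run once per day
    catchup=False,       # do not backfill runs missed before the job started
) as dag:
    extract = BashOperator(task_id="extract", bash_command="echo 'extracting data'")
    load = BashOperator(task_id="load", bash_command="echo 'loading data'")

    extract >> load  # "load" runs only after "extract" succeeds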

A web interface built on Flask enables users to visualize pipelines, track progress, manage and trigger DAGs, debug issues, and view detailed logs and performance metrics.

For more information, check out the official documentation.

Initialization

For information on how to use the Initialization parameter, please refer to the Initialization: Bash Script section of the documentation.

Airflow's capabilities can be extended by installing additional packages with pip, either during initialization or after the job has started. To install them correctly, use the following structure:

$ pip install "apache-airflow==<airflow-version>" <airflow-provider-name>==<airflow-provider-version>

For example (the provider's version can be omitted if not relevant):

$ pip install "apache-airflow==2.10.2" apache-airflow-providers-google

This prevents pip from accidentally upgrading or downgrading Airflow itself while installing new packages.

Configuration

To set up the Airflow environment, a few components must be configured.

PostgreSQL database

Airflow stores all the metadata in a database to keep track of the state of the workflows ("success", "failure", etc.), task execution history, DAGs, variables, connections, users, roles, and more.

A PostgreSQL database server will be initialized and configured when the job starts. The Database path parameter is used to import the PostgreSQL database in the same way as specified in the PostgreSQL app documentation.

Note

If an empty folder is specified in Database path, a new database will be initialized in that directory.

For Airflow to be able to use the PostgreSQL metadata database, the airflow_user and airflow_db variables must be defined and configured with the required permissions. By default, they will be initialized when starting the PostgreSQL server for the first time.

In addition, the pg_hba.conf file should contain an entry for airflow_user, and the sql_alchemy_conn variable should be configured as described in Airflow's documentation on how to set up a database backend.
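As a hedged illustration (the actual host, port, and password depend on the PostgreSQL setup), the corresponding entries might look like this:

# pg_hba.conf: allow airflow_user to connect to airflow_db from the local host
host    airflow_db    airflow_user    127.0.0.1/32    md5

# airflow.cfg ([database] section), or equivalently the
# AIRFLOW__DATABASE__SQL_ALCHEMY_CONN environment variable
sql_alchemy_conn = postgresql+psycopg2://airflow_user:<password>@localhost:5432/airflow_db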

Environment variables

To allow further customization, a number of optional parameters can be set by the user during job submission. Among these are the following Airflow environment variables:

  • AIRFLOW_HOME: Set to /work/airflow by default. This is the directory where Airflow stores its configuration files, logs, DAGs, and plugins.

  • AIRFLOW__CORE__EXECUTOR: Set to SequentialExecutor by default. Specifies how tasks are run. Airflow supports several executors; the choice depends on the use case and deployment scale.

  • AIRFLOW__CORE__LOAD_EXAMPLES: Set to false by default. Determines whether Airflow loads the example DAGs upon initialization.

  • AIRFLOW__CORE__DAGS_FOLDER: Set inside Airflow Home by default. Specifies where the DAG files are stored. All DAGs must be placed inside this directory, otherwise they will not be detected by the scheduler and will therefore not be executed or shown in the web server.

  • AIRFLOW__CORE__FERNET_KEY: An empty string by default. Used for encrypting and decrypting connection passwords in the metadata database (a key can be generated as shown in the sketch after this list).

Airflow configuration

The airflow.cfg configuration file is generated the first time Airflow is initialized. By default, it is located in $AIRFLOW_HOME. For more information, check the configuration reference.

If the Configuration file optional parameter is specified by the user, the provided configuration file will replace the default one.

If the user wants to generate a new airflow.cfg file and overwrite the existing one, this can be done using:

$ airflow config list --defaults > "$AIRFLOW_HOME/airflow.cfg"

Since Airflow can read its configuration from different sources (environment variables and airflow.cfg), it follows a list of priorities to remain consistent (see details here). Importantly, environment variables take precedence over the airflow.cfg file: for example, if AIRFLOW__CORE__DAGS_FOLDER is set, it overrides the dags_folder value in airflow.cfg. Users should pay particular attention to this if they use the Configuration file optional parameter.

Airflow web server authentication

A user account is required to log in the first time the Airflow web server is launched. By default, one is created at the start of the job with the following values:

$ airflow users create --username "ucloud" --firstname "ucloud" --lastname "ucloud" --role Admin --password "ucloud" --email "ucloud@email.dk"

The user information can be customized using the job parameters with the corresponding names.

Note

It is necessary to specify an e-mail address when creating a user. However, it is not possible to set up an e-mail server, so the e-mail addresses are not used for anything other than user identification.

Airflow allows for multiple users and roles, which can be created at any moment using the command line. For more information, read the Access Control documentation.
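For instance, a read-only account could be added with a command analogous to the one above (the username, password, and e-mail below are placeholders):

$ airflow users create --username "analyst" --firstname "Data" --lastname "Analyst" --role Viewer --password "<password>" --email "analyst@email.dk"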

Note

If the metadata database already exists and it is selected using the Database path parameter, an admin user will already exist. In this case, the creation of an admin is always skipped.

Restarting web server and scheduler

Once all parameters are set and the job starts, two processes are launched: the Airflow web server and the Airflow scheduler. If, at some point, the environment variables or the configuration file are changed, both processes must be restarted. This can be done by using the following script:

#!/bin/bash

# Function to stop Airflow services
stop_airflow() {
    printf "\nStopping Airflow web server..."
    if pkill -f "airflow webserver"; then
        printf " Done."
    else
        printf " Web server not running."
    fi

    printf "\nStopping Airflow scheduler..."
    if pkill -f "airflow scheduler"; then
        printf " Done."
    else
        printf " Scheduler not running."
    fi
}

# Function to start Airflow services
start_airflow() {
    printf "\nStarting Airflow web server..."
    nohup airflow webserver > /dev/null 2>&1 &
    printf " Started."

    printf "\nStarting Airflow scheduler..."
    nohup airflow scheduler > /dev/null 2>&1 &
    printf " Started."
}

# Function to wait for Airflow services to start
wait_for_airflow() {
    printf "\nWaiting for Airflow web server to start..."
    until curl -s http://localhost:8080 > /dev/null; do
        sleep 1
    done
    printf " Web server is up."

    printf "\nWaiting for Airflow scheduler to start..."
    while ! pgrep -f "airflow scheduler" > /dev/null; do
        sleep 1
    done
    printf " Scheduler is up."
}

# Main script
echo "Restarting Airflow..."
stop_airflow
sleep 5
start_airflow
wait_for_airflow
echo -e "\nAirflow restarted successfully!"

Remote Connectivity and Distributed Execution

Airflow allows the assignment of specific tasks to different machines using connectors and operators. Connectors are plugins or components that allow Airflow to interact with external systems or services, while operators are specialized classes within Airflow that dictate how tasks should be executed. Both can come pre-installed with Airflow or be added via providers (or packages). Some examples of integrations with external systems are AWS, Spark or MySQL providers.

This allows tasks to be executed with different UCloud Apps as long as they are supported by these providers and connected to the Airflow job. For instance, with the Spark provider, Airflow can send Spark jobs from the current Airflow job to the Spark Cluster App while executing a workflow.
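As a hedged sketch of this pattern, the DAG below assumes that the apache-airflow-providers-apache-spark package is installed and that a Spark connection named spark_ucloud, pointing at the cluster's master, has been created under Admin > Connections; the application path is hypothetical:

from datetime import datetime

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

with DAG(
    dag_id="spark_submit_example",
    start_date=datetime(2024, 1, 1),
    schedule=None,   # triggered manually or via the API
    catchup=False,
) as dag:
    # Submit a PySpark application to the remote cluster.
    submit_wordcount = SparkSubmitOperator(
        task_id="submit_wordcount",
        application="/work/jobs/wordcount.py",  # hypothetical path to a PySpark script
        conn_id="spark_ucloud",                 # assumed connection to the Spark Cluster app
    )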

Connecting from external third-party apps

It is also possible to connect to a running Airflow job from third-party apps running outside UCloud, for example using Airflow's REST API. To do this, the Airflow job should have a public link attached to it.

To use this API, make the following changes to airflow.cfg:

  • api_client = airflow.api.client.json_client

  • endpoint_url = https://URL

  • auth_backends = airflow.contrib.auth.backends.basic_auth

To enable the Airflow experimental API, the user should also set enable_experimental_api = True in the configuration file.

Note

Replace URL with the job's public link.

Remember to restart both the webserver and scheduler after changing the configuration file.
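As an illustration, a DAG run can then be triggered from an external application through the stable REST API shipped with Airflow 2.x (the sketch below assumes basic authentication is enabled, uses the default ucloud credentials and a placeholder DAG id, and URL stands for the job's public link):

import requests

# Trigger a new run of the DAG with id "minimal_example".
response = requests.post(
    "https://URL/api/v1/dags/minimal_example/dagRuns",
    auth=("ucloud", "ucloud"),  # default web server credentials
    json={},                    # an empty body starts a run with default settings
    timeout=30,
)
response.raise_for_status()
print(response.json())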

Celery Executor

Airflow allows distributed execution of tasks on different nodes, without the need to install specific providers, by using the CeleryExecutor. This makes use of a Redis server as a message broker and requires a more complex setup. For an in-depth example, refer to Airflow use cases.
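As a rough sketch (the broker and result backend URLs below assume Redis and PostgreSQL running locally with the database and user created at job start, and a placeholder password), the relevant airflow.cfg entries could look like this:

[core]
executor = CeleryExecutor

[celery]
broker_url = redis://localhost:6379/0
result_backend = db+postgresql://airflow_user:<password>@localhost:5432/airflow_db

Worker processes are then started on each node with the airflow celery worker command.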