Validio for Airflow

This Validio Airflow integration demonstrates how to incorporate Validio into your Apache Airflow environment. It includes the configuration for an Airflow connection to Validio, a hook that uses the connection to access the Validio API, and example custom operators and sensors that fetch information with the Validio SDK. You can use these operators as templates for operators tailored to your needs.

You can use the operators provided in this integration for tasks such as the following:

  • Poll a source immediately after a job (such as an ETL job) completes, so its validators refresh without waiting for a schedule.
  • Run a script or send an email when incidents are detected.
  • Prevent a workflow from running while there are active incidents.

You can also use only the connection and hook to access the Validio SDK directly for tasks such as the following:

  • Add sources or validators to Validio after creating them in Airflow.
  • Update a notification rule to include new tags as they’re added in Airflow.

Prerequisites for Airflow

To set up this integration, install the following Python packages with pip or your preferred package manager:

  • validio-sdk
  • validio-airflow

You can find the validio-airflow module in the Validio Airflow GitLab repository.

This integration was developed with Poetry. You can clone the repository and use Poetry to set up your environment. For more information, refer to the Poetry documentation.

Optionally, install the Validio CLI, which the connection example in the next section uses. For more information, see Getting Started with Developer Toolkit.

Configure an Airflow Connection for Validio

Create an Airflow Connection to authenticate with the Validio API. You can set up a connection of the Validio type using the Airflow web interface or the Airflow CLI. For more information, see Managing Connections and Command Line Interface in the Airflow documentation.

The following example demonstrates how to create a Validio connection with the Validio CLI and the Airflow CLI:

Note: This example uses the jq command-line JSON processor. If jq is not installed in your environment, you can copy the relevant values manually. For more information, refer to the jq documentation.

# Get the Validio config (endpoint, access key, and access secret)
read -r host login password \
  <<<"$(validio config get -o json --show-secrets | jq -r '"\(.endpoint) \(.access_key) \(.access_secret)"')"

# Create the connection; "'"$var"'" ends the single-quoted JSON,
# inserts the quoted shell variable, and reopens the single quote
airflow connections add 'validio_default' \
    --conn-json '{
        "conn_type": "validio",
        "description": "Validio API connection",
        "host": "'"$host"'",
        "login": "'"$login"'",
        "password": "'"$password"'"
    }'
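If hand-quoting JSON in the shell becomes fragile (for example, when a value contains a double quote or a backslash), you can build the same --conn-json payload with Python's json.dumps, which escapes values for you. The following is a minimal sketch and is not part of validio-airflow: the function name build_validio_conn_json is hypothetical, and the input keys endpoint, access_key, and access_secret are taken from the jq filter in the example above.

```python
import json


def build_validio_conn_json(validio_config: dict) -> str:
    """Hypothetical helper: map Validio CLI config fields onto Airflow connection fields.

    Returns a JSON string suitable for `airflow connections add --conn-json`.
    json.dumps takes care of escaping quotes and backslashes in the values.
    """
    payload = {
        "conn_type": "validio",
        "description": "Validio API connection",
        "host": validio_config["endpoint"],  # Validio API endpoint
        "login": validio_config["access_key"],  # Validio access key
        "password": validio_config["access_secret"],  # Validio access secret
    }
    return json.dumps(payload)


if __name__ == "__main__":
    # Placeholder values for illustration only.
    example = {
        "endpoint": "https://validio.example.com",
        "access_key": "my-access-key",
        "access_secret": "my-access-secret",
    }
    print(build_validio_conn_json(example))
```

You could pass the printed string to `airflow connections add 'validio_default' --conn-json`. In Airflow 2.3 and later, the same JSON string can also be supplied as the value of an AIRFLOW_CONN_VALIDIO_DEFAULT environment variable.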

Note: This configuration relies on Airflow to handle sensitive data appropriately and does not take any additional measures to mask credentials.

Create a Hook to Access the Validio API

The Validio Airflow module includes ValidioHook, which returns a Validio API client built from the Airflow connection details you configured in the previous section. If no connection details are available, the hook falls back to your system settings (environment variables or the configuration JSON file).

To create an instance of the ValidioHook:

validio_hook = ValidioHook(conn_id="validio_default")

Call get_client() on the hook to get a Validio API client (ValidioAPIClient), which you can then use to run commands and tasks with the SDK:

validio_client = validio_hook.get_client()
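One way to picture the fallback described above is as a precedence rule: use the Airflow connection fields when they are all set, otherwise read the settings from the environment. The sketch below is hypothetical and uses only the standard library. It is not ValidioHook's implementation, it skips the configuration JSON file, and the environment variable names VALIDIO_ENDPOINT, VALIDIO_ACCESS_KEY, and VALIDIO_SECRET_ACCESS_KEY are assumptions, so check the validio-sdk documentation for the names it actually reads.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass
class ValidioSettings:
    endpoint: str
    access_key: str
    access_secret: str


# Hypothetical environment variable names; verify against validio-sdk.
ENV_VARS = ("VALIDIO_ENDPOINT", "VALIDIO_ACCESS_KEY", "VALIDIO_SECRET_ACCESS_KEY")


def resolve_settings(
    conn: Optional[Mapping[str, str]],
    env: Mapping[str, str] = os.environ,
) -> ValidioSettings:
    """Prefer complete Airflow connection details; otherwise fall back to the environment."""
    if conn and conn.get("host") and conn.get("login") and conn.get("password"):
        return ValidioSettings(conn["host"], conn["login"], conn["password"])
    endpoint, key, secret = (env.get(name) for name in ENV_VARS)
    if not (endpoint and key and secret):
        raise ValueError("No Validio credentials found in the Airflow connection or environment")
    return ValidioSettings(endpoint, key, secret)
```

Passing env explicitly, as in the tests of such a helper, keeps the precedence rule easy to check without touching the real process environment.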

Custom Operators

An operator is a piece of code that performs a task. The Validio Airflow module includes the following operators. You can customize these operators or use them as templates for your own. For more information, refer to Operators in the Airflow documentation.

ValidioPollSourceOperator

The ValidioPollSourceOperator triggers a source poll in Validio. Instead of relying on a cron schedule, you can use this operator to poll the source as soon as you know that new data has landed in it.

| Parameter | Description |
| --- | --- |
| conn_id | The name of the connection, if not using the default. |
| source_id | The ID of the source to trigger a poll for. This value must be a string. |
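Custom operators in Airflow subclass BaseOperator and do their work in an execute(context) method. The following stdlib-only sketch mirrors that shape for a source poll so that it can run without Airflow or Validio installed. PollSourceSketch and the injected poll callable are hypothetical stand-ins: in a real operator, you would build the client with ValidioHook(conn_id=...).get_client() and call the SDK method that triggers a poll.

```python
from typing import Any, Callable, Dict


class PollSourceSketch:
    """Hypothetical stand-in for ValidioPollSourceOperator's structure.

    `poll` stands in for the SDK call that triggers a source poll.
    """

    def __init__(
        self,
        source_id: str,
        poll: Callable[[str], Any],
        conn_id: str = "validio_default",
    ) -> None:
        # Validate early: an operator's __init__ runs when Airflow parses the DAG,
        # so a bad source_id surfaces before any task runs.
        if not isinstance(source_id, str):
            raise TypeError("source_id must be a string")
        self.source_id = source_id
        self.poll = poll
        self.conn_id = conn_id  # would be passed to ValidioHook in a real operator

    def execute(self, context: Dict[str, Any]) -> Any:
        # Airflow calls execute() when the task runs; in a real operator the
        # return value is pushed to XCom by default.
        return self.poll(self.source_id)
```

In a DAG, you would typically place the poll task downstream of the task that loads the data, for example `etl_task >> poll_task`, so that the poll runs only after the load succeeds.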

ValidioMetricsOperator

The ValidioMetricsOperator fetches metrics for a validator and segment and writes them to a CSV file at the specified location. Each row contains the metric's start time, end time, and value, and whether the metric was flagged as an incident.

| Parameter | Description |
| --- | --- |
| conn_id | The name of the connection, if not using the default. |
| file_location | Path of the CSV file to write the output to. |
| segment_id | The ID of the segment to get metrics for. If you pass an empty string, the operator uses the first segment of the validator's segmentation. |
| validator_id | The ID of the validator to get metrics for. This value must be a string. |
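The following sketch illustrates two behaviors described in the table: the empty-string fallback for segment_id and writing one CSV row per metric. It is a hypothetical, stdlib-only illustration rather than the operator's code. MetricPoint, resolve_segment_id, to_csv_rows, write_metrics_csv, and the column names start_time, end_time, value, and is_incident are assumptions, and the real operator's CSV layout may differ.

```python
import csv
from dataclasses import dataclass
from datetime import datetime
from typing import Iterable, List

# Hypothetical column names; the real operator's header may differ.
HEADER = ["start_time", "end_time", "value", "is_incident"]


@dataclass
class MetricPoint:
    start_time: datetime
    end_time: datetime
    value: float
    is_incident: bool


def resolve_segment_id(segment_id: str, segment_ids: List[str]) -> str:
    """Return segment_id, or the first segment of the segmentation if it is empty."""
    if segment_id:
        return segment_id
    if not segment_ids:
        raise ValueError("The validator's segmentation has no segments")
    return segment_ids[0]


def to_csv_rows(points: Iterable[MetricPoint]) -> List[List[str]]:
    """Convert metric points to CSV rows, header first."""
    rows = [list(HEADER)]
    for p in points:
        rows.append([
            p.start_time.isoformat(),
            p.end_time.isoformat(),
            str(p.value),
            str(p.is_incident).lower(),
        ])
    return rows


def write_metrics_csv(points: Iterable[MetricPoint], file_location: str) -> None:
    """Write the rows to file_location, overwriting any existing file."""
    with open(file_location, "w", newline="") as f:
        csv.writer(f).writerows(to_csv_rows(points))
```

Keeping row construction separate from file I/O makes the row format easy to check without writing a file.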

Custom Sensors

A sensor is a type of operator that waits for something to happen. The Validio Airflow module includes the following sensors. For more information, refer to Sensors in the Airflow documentation.
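Conceptually, a sensor in poke mode calls its poke() method repeatedly, waiting poke_interval seconds between calls, until poke() returns True or the timeout expires. The stdlib-only sketch below illustrates that loop; it is a simplification rather than Airflow's implementation (for example, it ignores reschedule mode and soft_fail). The function name wait_for is hypothetical, and its default values mirror BaseSensorOperator's defaults of 60 seconds and 7 days.

```python
import time
from typing import Callable


def wait_for(
    poke: Callable[[], bool],
    poke_interval: float = 60.0,
    timeout: float = 60.0 * 60 * 24 * 7,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> int:
    """Call poke() until it returns True; raise TimeoutError once `timeout` seconds pass.

    Returns the number of pokes it took. `sleep` and `clock` are injectable
    so the loop can be exercised without real waiting.
    """
    started = clock()
    pokes = 0
    while True:
        pokes += 1
        if poke():
            return pokes
        if clock() - started >= timeout:
            raise TimeoutError(f"Condition not met within {timeout} seconds")
        sleep(poke_interval)
```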

ValidioIncidentsSensor

The ValidioIncidentsSensor pokes the Validio API for incidents raised in the last 10 minutes. It has no required parameters, but you can configure it to tolerate a certain number of incidents within this period and to allow certain incident severities. The sensor triggers once the number of incidents outside the allowed severities exceeds the configured number.
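The triggering rule described above can be sketched as a pure function: count the incidents from the last 10 minutes whose severity is not in the allowed set, and trigger when that count exceeds the allowed number. A sensor's poke() would return this value. Incident, should_trigger, allowed_count, allowed_severities, and the severity strings are hypothetical names, not the sensor's actual parameters; check the validio-airflow source for the real configuration options.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Collection, Iterable, Optional


@dataclass
class Incident:
    severity: str  # hypothetical values such as "LOW" or "HIGH"
    created_at: datetime  # must be timezone-aware to compare with `now`


def should_trigger(
    incidents: Iterable[Incident],
    allowed_count: int = 0,
    allowed_severities: Collection[str] = (),
    window: timedelta = timedelta(minutes=10),
    now: Optional[datetime] = None,
) -> bool:
    """Return True when more than `allowed_count` incidents inside the window
    have a severity outside `allowed_severities`."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - window
    counted = sum(
        1
        for incident in incidents
        if incident.created_at >= cutoff and incident.severity not in allowed_severities
    )
    return counted > allowed_count
```

Passing `now` explicitly keeps the rule deterministic, which makes the threshold behavior straightforward to verify.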