Data Lineage

Automate pipelines with Apache Airflow ..


Data lineage in a data catalog outlines the lifecycle of data, detailing its journey from origin to endpoint across various processes and transformations. It offers a clear, visual map of data's provenance, its modifications, and its final location. This mapping includes tracing data from source to destination, capturing all steps, transformations, and processes it encounters along the way.

The importance of data lineage lies in its role in ensuring data integrity, supporting compliance with regulations, facilitating error root cause analysis, and enhancing overall data governance and management. By providing a comprehensive view of data’s journey, data lineage is an indispensable tool for maintaining high-quality, reliable data in a data catalog.

An automated data pipeline with Apache Airflow, OpenLineage, and Marquez consists of several integral components.

Apache Airflow serves as the orchestrator, managing the execution of workflows and ensuring tasks run in the correct sequence.

OpenLineage acts as the standard for tracking and reporting data lineage, offering a clear way to record the flow of data across different jobs and transformations.

Marquez functions as the metadata repository, storing detailed information about datasets, jobs, and their executions, enabling rich lineage tracking and metadata querying.

Together, these tools provide a robust framework for orchestrating, monitoring, and managing data workflows, ensuring data integrity, compliance, and ease of troubleshooting.

Apache Airflow plays a crucial role in orchestrating data pipelines by managing and scheduling complex workflows. It enables users to define dependencies between tasks, ensuring they run in the correct sequence.

Airflow provides a user-friendly interface to monitor task execution, visualize data flow, and troubleshoot any issues that arise. With its ability to integrate with various data processing tools, Airflow automates the entire data lifecycle, from extraction and transformation to loading and reporting, making it an essential component for scalable and reliable data pipeline management.

Pentaho Data Catalog adheres to the OpenLineage standards.

OpenLineage is an open platform for collection and analysis of data lineage. It tracks metadata about datasets, jobs, and runs, giving users the information required to identify the root cause of complex issues and understand the impact of changes. OpenLineage contains an open standard for lineage data collection, a metadata repository reference implementation (Marquez), libraries for common languages, and integrations with data pipeline tools.

At the core of OpenLineage is a standard API for capturing lineage events. Pipeline components - like schedulers, warehouses, analysis tools, and SQL engines - can use this API to send data about runs, jobs, and datasets to a compatible OpenLineage backend for further study.
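
As an illustration, a pipeline component could emit such an event with the OpenLineage Python client. The sketch below is a minimal example and not part of the original guide: module paths and field names follow the openlineage-python client as commonly documented and may differ between client versions, and the Marquez URL is the local default used later on this page.

# Minimal sketch, assuming openlineage-python is installed (pip install openlineage-python).
import uuid
from datetime import datetime, timezone

from openlineage.client import OpenLineageClient
from openlineage.client.run import RunEvent, RunState, Run, Job, Dataset

client = OpenLineageClient(url="http://localhost:5000")  # Marquez API, assumed local default

run_id = str(uuid.uuid4())  # unique run ID in UUID format
event = RunEvent(
    eventType=RunState.START,
    eventTime=datetime.now(timezone.utc).isoformat(),
    run=Run(runId=run_id),
    job=Job(namespace="my-namespace", name="my-job"),
    producer="https://example.com/my-pipeline",  # identifies the emitting component (illustrative)
    inputs=[Dataset(namespace="my-namespace", name="my-input")],
    outputs=[],
)
client.emit(event)  # sends the event to the configured OpenLineage backend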

OpenLineage supports both simple deployments with single consumers and complex deployments with multiple consumers.

OpenLineage’s object model provides a structured approach to track the lifecycle of data across various systems, ensuring clarity and consistency. It comprises several key entities:

  1. Datasets: These are the fundamental units representing data sources or outputs in the lineage graph. Each dataset captures critical metadata including schema, location, and other characteristics that define its structure and content. Datasets help in identifying the data at different stages of the pipeline.

  2. Jobs: Jobs denote the processes or operations performed on datasets. They encapsulate the logic of the data transformation and include metadata about the specific task executed, its parameters, and any associated code or scripts. Jobs are essential for understanding how data is manipulated within the pipeline.

  3. Runs: Runs are instances of job executions. They carry runtime metadata such as execution time, duration, status (e.g., success, failure), and logs. Runs provide a temporal aspect to lineage, allowing users to trace back to specific executions and understand the context and outcome of each job run.

These components interact through a standard API provided by OpenLineage. This API allows various pipeline components—such as schedulers, data warehouses, analysis tools, and SQL engines—to send lineage events to an OpenLineage backend. This integration facilitates comprehensive lineage tracking, enabling users to gain insights into data transformations, track data flows, and maintain data integrity. Through this structured model, OpenLineage helps ensure that data provenance is transparent and traceable, which is crucial for data governance, compliance, and troubleshooting.

Facets in OpenLineage provide a detailed view of metadata for lineage events, extending the core model with additional information about datasets, jobs, and runs. They are structured as key-value pairs, allowing for customizable and extensible metadata descriptions. These facets serve as building blocks that provide context and deeper insights into the data lineage, enhancing the ability to perform thorough analysis and detailed tracking.

Key types of facets include:

  1. Data Quality Facet: This facet captures a range of data quality metrics, such as null counts, distinct counts, and summary statistics. By monitoring these metrics, the data quality facet helps to ensure that the data retains its integrity throughout its lifecycle.

  2. Ownership Facet: This facet details ownership information, identifying the data owner or steward responsible for a dataset. It is crucial for data governance, as it clarifies accountability and provides a direct point of contact for queries or issues related to the data.

  3. Schema Facet: This facet describes the structure of datasets, including information about column names, data types, and other structural attributes. It helps in understanding the organization and format of the data, which is essential for data processing and transformation activities.

  4. Location Facet: This facet specifies the physical or logical location of data, such as file paths, URLs, or database addresses. Knowing where the data resides is important for data access, security, and management.

These facets enrich lineage data by adding layers of context and information, thereby facilitating comprehensive lineage tracking and detailed metadata management. They allow users to gain insights into various aspects of data and its movement, making it easier to trace data flows, understand dependencies, and maintain data integrity. The ability to extend and customize facets enables organizations to adapt the lineage model to their specific needs, supporting a wide range of data governance, compliance, and operational requirements.
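
As an illustration, the fragment below shows how a single dataset in a lineage event might carry more than one facet. The schema facet mirrors the one used in the curl example later on this page; the ownership facet is a hedged example of the facet naming in the OpenLineage spec, and its exact fields and schema URL may differ from what your producer emits.

{
  "namespace": "my-namespace",
  "name": "my-output",
  "facets": {
    "schema": {
      "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
      "_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/spec/OpenLineage.json#/definitions/SchemaDatasetFacet",
      "fields": [
        { "name": "a", "type": "VARCHAR" },
        { "name": "b", "type": "VARCHAR" }
      ]
    },
    "ownership": {
      "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
      "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/OwnershipDatasetFacet.json",
      "owners": [
        { "name": "data-platform-team", "type": "TEAM" }
      ]
    }
  }
}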

Marquez is a metadata service for data lineage. It collects, stores, and visualizes metadata about datasets, jobs, and runs in a centralized repository. By integrating with various data pipeline tools, Marquez tracks end-to-end data lineage, helping users to query historical changes, investigate data dependencies, and ensure data governance. Its open metadata model and versioning capabilities enable detailed exploration of data provenance, making it easier to perform audits, troubleshoot issues, and comply with regulatory requirements.

Marquez Metadata Storage Model

Marquez’s metadata storage model centralizes the representation of end-to-end pipeline metadata, supporting versioning and flexible data lineage queries. It efficiently tracks and associates dependencies between jobs and datasets, allowing users to query historical changes and compare schema versions. This model is particularly beneficial for auditing and troubleshooting affected jobs downstream of any schema modifications. By normalizing metadata across various sources and frameworks, Marquez ensures comprehensive metadata management and lineage tracking.

The metadata storage model in Marquez employs a relational database to store the details of datasets, jobs, and runs, capturing a wide range of metadata attributes.

For datasets, it includes properties such as schema information, physical location, and version history.

Jobs, which represent the transformations applied to data, store metadata about their execution logic, parameters, and execution code.

Runs, or instances of job executions, maintain runtime details like start/end times, status, duration, and logs.

Marquez’s system also supports versioning of datasets and jobs, enabling users to navigate the historical changes and understand the evolution of their data pipelines. This is particularly valuable for compliance and governance, as it ensures that any transformations can be traced and validated. The flexible query capabilities provided by Marquez allow for detailed lineage exploration, where users can identify the upstream and downstream impacts of changes to their datasets or jobs, thus aiding in root cause analysis and impact assessment.

Integrating with various data pipeline tools, Marquez acts as a centralized metadata repository, promoting consistency and reducing fragmentation across different systems. Its open metadata model can seamlessly adapt to diverse data environments, ensuring all metadata is standardized and accessible. This harmonized approach simplifies data governance, allowing for efficient monitoring, auditing, and reporting activities.

Run-level metadata is tracked via HTTP API calls to /lineage using OpenLineage.

A run has a unique ID and records its code version, inputs and outputs, run args, and run state transitions.

When a run transitions to a complete state, the output datasets for the run are created if not already registered with Marquez.

A dataset version pointer is maintained by Marquez for each input and output dataset, keeping track of the historical set of metadata changes. Each immutable dataset version is mapped to a metadata change and to the run ID that modified the dataset, preserving its state at a given point in time.

A job version pointer is also present and mapped to a unique referenceable link to code, the latest run ID, and one or more versioned input and output datasets.

In this workflow, you will collect dataset and job metadata using the Marquez LineageAPI and UI. When you submit a lineage event, you first need to define a unique run ID, for example d46e465b-d358-4d32-83d4-df660ff614dd. UUID format is recommended, and it should be unique. This run ID enables the tracking of run-level metadata over time for a job that may have a name, like my-job.
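
If you need a fresh run ID, any UUID generator will do; for example, on most Linux and macOS systems:

uuidgen
# or: python3 -c "import uuid; print(uuid.uuid4())"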

  1. Start a RUN

REQUEST

curl -X POST http://localhost:5000/api/v1/lineage \
  -i -H 'Content-Type: application/json' \
  -d '{
        "eventType": "START",
        "eventTime": "2020-12-28T19:52:00.001+10:00",
        "run": {
          "runId": "d46e465b-d358-4d32-83d4-df660ff614dd"
        },
        "job": {
          "namespace": "my-namespace",
          "name": "my-job"
        },
        "inputs": [{
          "namespace": "my-namespace",
          "name": "my-input"
        }],  
        "producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
        "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/definitions/RunEvent"
      }'

RESPONSE

HTTP/1.1 201 Created
Date: Wed, 29 May 2024 17:46:17 GMT
Content-Length: 0

  2. Complete the RUN

Use the same run ID, d46e465b-d358-4d32-83d4-df660ff614dd, to complete the run for my-job with my-output as the output dataset. We also specify the schema facet to collect the schema for my-output before marking the run as completed. Note that you don't have to specify the input dataset my-input again, since it has already been associated with the run ID:

REQUEST

curl -X POST http://localhost:5000/api/v1/lineage \
  -i -H 'Content-Type: application/json' \
  -d '{
        "eventType": "COMPLETE",
        "eventTime": "2020-12-28T20:52:00.001+10:00",
        "run": {
          "runId": "d46e465b-d358-4d32-83d4-df660ff614dd"
        },
        "job": {
          "namespace": "my-namespace",
          "name": "my-job"
        },
        "outputs": [{
          "namespace": "my-namespace",
          "name": "my-output",
          "facets": {
            "schema": {
              "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
              "_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/spec/OpenLineage.json#/definitions/SchemaDatasetFacet",
              "fields": [
                { "name": "a", "type": "VARCHAR"},
                { "name": "b", "type": "VARCHAR"}
              ]
            }
          }
        }],     
        "producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
        "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/definitions/RunEvent"
      }'

RESPONSE

HTTP/1.1 201 Created
Date: Wed, 29 May 2024 17:55:14 GMT
Content-Length: 0

View collected lineage metadata

  1. Browse to the Marquez UI at http://localhost:3000.

Use the search bar in the upper right side of the page and search for the job my-job. To view lineage metadata for my-job, click on the job in the drop-down list:

In the search result, you should see the job namespace and name, and in the lineage graph you should see my-input as an input dataset and my-output as an output dataset. In the RUN HISTORY tab on the Job Detail page below the graph, the job run state should be COMPLETED.

Finally, click on the output dataset my-output for my-job. Metadata displayed includes the name, column names, data types, and more:
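
The same metadata can also be queried from the Marquez API directly. The endpoints below are a sketch based on the Marquez REST API (the namespaces endpoint is the one the preflight DAG later on this page also calls); exact paths may vary between Marquez versions.

# List namespaces known to Marquez (local deployment assumed on port 5000)
curl -s http://localhost:5000/api/v1/namespaces

# List jobs and datasets registered under my-namespace (paths assumed from the Marquez REST API)
curl -s http://localhost:5000/api/v1/namespaces/my-namespace/jobs
curl -s http://localhost:5000/api/v1/namespaces/my-namespace/datasets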

Airflow-Project: (Steps 2-4)

This project focuses on:

  • Deploying and configuring Apache Airflow + OpenLineage + Marquez in Docker

  • Defining a connection to the exampledb database

  • Running a test DAG - bashOperator.py

This section is for Reference only.

Apache Airflow + OpenLineage + Marquez have already been installed and configured.

Airflow-Project

An Airflow-Project contains the set of files necessary to run Airflow, including dedicated folders for your DAG files, plugins, and dependencies.

  1. Create Airflow project folders.

cd
mkdir -p Airflow-Project/{dags,docker,include,plugins,tests} && cd Airflow-Project
  1. Download the required Marquez scripts used to initialize the containers, Docker network, and volumes.

cd
cd ~/Airflow-Project/docker
curl -O "https://raw.githubusercontent.com/MarquezProject/marquez/examples/airflow/docker/{entrypoint.sh,wait-for-it.sh}"
  1. Check the Airflow-Project directory

cd
cd ~/Airflow-Project
ls -al
.
├── dags                   # Where your DAGs go
├── docker
├── plugins                # For any custom or community Airflow plugins
├── docker-compose.yml     # Created in the steps below
├── openlineage.env        # Local environment variables (created below)
└── requirements.txt       # Python package dependencies (created below)
  1. Add the OpenLineage, Common SQL, and GitHub Airflow providers to the requirements.txt file.

cd
cd ~/Airflow-Project
touch requirements.txt
nano requirements.txt
  1. Add the following Providers:

# apache airflow providers
apache-airflow-providers-common-sql==1.7.2
apache-airflow-providers-openlineage==1.1.0
apache-airflow-providers-github==2.6.0
  1. Save.

CTRL + o
Enter
CTRL + x
  1. To configure Airflow to send lineage metadata to Marquez, enable connection testing, and allow DAGs to be triggered with params, add the following environment variables.

cd
cd ~/Airflow-Project
touch openlineage.env
nano openlineage.env
  1. Add:

OPENLINEAGE_URL=http://marquez:5000
OPENLINEAGE_NAMESPACE=example
AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://marquez:5000", "endpoint": "api/v1/lineage"}'
AIRFLOW__CORE__TEST_CONNECTION=Enabled
AIRFLOW__WEBSERVER__SHOW_TRIGGER_FORM_IF_NO_PARAMS=True

OPENLINEAGE_URL                 - URL of the Marquez API
OPENLINEAGE_NAMESPACE           - Marquez namespace
AIRFLOW__OPENLINEAGE__TRANSPORT - Where to push lineage events (Marquez)
AIRFLOW__CORE__TEST_CONNECTION  - Disabled by default; enables connection testing
AIRFLOW__WEBSERVER__SHOW_TRIGGER_FORM_IF_NO_PARAMS - Shows the "Trigger DAG w/ config" button in the Airflow UI
  1. Save.

CTRL + o
Enter
CTRL + x

  1. Create a new file docker-compose.yml in your project:

cd
cd ~/Airflow-Project
nano docker-compose.yml
  1. Copy and paste the following content.

# SPDX-License-Identifier: Apache-2.0

version: "3.8"
services:
  airflow:
    image: bitnami/airflow:2
    ports:
      - "8080:8080"
    env_file:
      - openlineage.env
    environment:
      - AIRFLOW_USERNAME=airflow
      - AIRFLOW_PASSWORD=airflow
      - AIRFLOW_EMAIL=airflow@example.com
      - AIRFLOW_FERNET_KEY=Z2uDm0ZL60fXNkEXG8LW99Ki2zf8wkmIltaTz1iQPDU=
      - AIRFLOW_DATABASE_HOST=postgres
      - AIRFLOW_DATABASE_NAME=airflow
      - AIRFLOW_DATABASE_USERNAME=airflow
      - AIRFLOW_DATABASE_PASSWORD=airflow
      - AIRFLOW_EXECUTOR=CeleryExecutor
      - AIRFLOW_LOAD_EXAMPLES=no
      - AIRFLOW_CONN_EXAMPLE_DB=postgres://example:example@postgres:6543/example
    volumes:
      - ./dags:/opt/bitnami/airflow/dags
      - ${PWD}/whl:/whl
      - type: bind
        source: ${PWD}/requirements.txt
        target: /bitnami/python/requirements.txt

  airflow_scheduler:
    image: bitnami/airflow-scheduler:2
    env_file:
      - openlineage.env
    environment:
      - AIRFLOW_FERNET_KEY=Z2uDm0ZL60fXNkEXG8LW99Ki2zf8wkmIltaTz1iQPDU=
      - AIRFLOW_DATABASE_HOST=postgres
      - AIRFLOW_DATABASE_NAME=airflow
      - AIRFLOW_DATABASE_USERNAME=airflow
      - AIRFLOW_DATABASE_PASSWORD=airflow
      - AIRFLOW_EXECUTOR=CeleryExecutor
      - AIRFLOW_LOAD_EXAMPLES=no
      - AIRFLOW_CONN_EXAMPLE_DB=postgres://example:example@postgres:6543/example
      - AIRFLOW_WEBSERVER_HOST=airflow
    volumes:
      - ./dags:/opt/bitnami/airflow/dags
      - ${PWD}/whl:/whl
      - type: bind
        source: ${PWD}/requirements.txt
        target: /bitnami/python/requirements.txt

  airflow_worker:
    image: bitnami/airflow-worker:2
    env_file:
      - openlineage.env
    environment:
      - AIRFLOW_FERNET_KEY=Z2uDm0ZL60fXNkEXG8LW99Ki2zf8wkmIltaTz1iQPDU=
      - AIRFLOW_DATABASE_HOST=postgres
      - AIRFLOW_DATABASE_NAME=airflow
      - AIRFLOW_DATABASE_USERNAME=airflow
      - AIRFLOW_DATABASE_PASSWORD=airflow
      - AIRFLOW_EXECUTOR=CeleryExecutor
      - AIRFLOW_LOAD_EXAMPLES=no
      - AIRFLOW_CONN_EXAMPLE_DB=postgres://example:example@postgres:6543/example
      - AIRFLOW_WEBSERVER_HOST=airflow
    volumes:
      - ./dags:/opt/bitnami/airflow/dags
      - ${PWD}/whl:/whl
      - type: bind
        source: ${PWD}/requirements.txt
        target: /bitnami/python/requirements.txt

  marquez:
    image: marquezproject/marquez:latest
    ports:
      - "5000:5000"
      - "5001:5001"
    volumes:
      - ./docker/wait-for-it.sh:/usr/src/app/wait-for-it.sh
    depends_on:
      - postgres
    entrypoint: ["./wait-for-it.sh", "postgres:6543", "--", "./entrypoint.sh"]
    # Enables SQL statement logging (see: https://www.postgresql.org/docs/12/runtime-config-logging.html#GUC-LOG-STATEMENT)
    # command: ["postgres", "-c", "log_statement=all"]

  marquez_web:
    image: marquezproject/marquez-web:latest
    environment:
      - MARQUEZ_HOST=marquez
      - MARQUEZ_PORT=5000
    ports:
      - "3000:3000"
    stdin_open: true
    tty: true
    depends_on:
      - marquez

  postgres:
    image: bitnami/postgresql:12.1.0
    ports:
      - "6543:6543"
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - AIRFLOW_USER=airflow
      - AIRFLOW_PASSWORD=airflow
      - AIRFLOW_DB=airflow
      - MARQUEZ_USER=marquez
      - MARQUEZ_PASSWORD=marquez
      - MARQUEZ_DB=marquez
      - EXAMPLE_USER=example
      - EXAMPLE_PASSWORD=example
      - EXAMPLE_DB=example
      - ALLOW_EMPTY_PASSWORD=yes
    volumes:
      - ./docker/init-db.sh:/docker-entrypoint-initdb.d/init-db.sh

  redis:
    image: bitnami/redis:6.0.6
    environment:
      - ALLOW_EMPTY_PASSWORD=yes
  1. Save.

CTRL + o
Enter
CTRL + x
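
Optionally, you can sanity-check the file before starting the stack; docker compose config prints the resolved configuration and reports YAML or interpolation errors:

cd ~/Airflow-Project
docker compose config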

Start Airflow with Marquez

When starting the Airflow-OpenLineage-Marquez environment for the first time, be patient while the images are pulled and the services are deployed.

  1. Execute the following command to start the Airflow-OpenLineage-Marquez environment.

cd
cd ~/Airflow-Project
docker compose up -d
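
Once the command returns, you can check that all services are up. The exact container names may carry a prefix derived from your project folder; the bullets below use the short names.

cd ~/Airflow-Project
docker compose ps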

• airflow-1 - The Airflow webserver, available at http://localhost:8080.

• airflow-scheduler-1 - The scheduler monitors all tasks and DAGs, then triggers the task instances once their dependencies are complete.

• airflow-worker-1 - The worker that executes the tasks given by the scheduler.

• airflow-redis-1 - The Redis broker that forwards messages from the scheduler to the worker.

• airflow-postgres-1 - The database.

• airflow-marquez-1 - The Marquez API.

• airflow-marquez-web-1 - The Marquez web UI, available at http://localhost:3000.

  1. To view the Airflow UI and verify it's running, open http://localhost:8080 and log in with the following credentials.

Username: airflow

Password: airflow


Airflow variables are key-value pairs that can be accessed from any DAG in your Airflow environment. Because the variable my_github_repo is used in the DAG code with a default of apache/airflow, you'll need to create the variable and give it a value in the Airflow UI to wait for a commit in your own repository.

Unlike DAG code changes, package dependency changes require a complete restart of Airflow.

Configure GitHub variable

  1. Go to Admin -> Variables to open the list of Airflow variables.


  1. Click on the + sign to open the form for adding a new variable.

Set the Key for the variable to:

airflow_github_repo

and set the Val to a GitHub repository you have administrator access to. Make sure the Val is in the format github_account_name/repository_name (for example, apache/airflow). The repository can be private.

  1. Click Save.

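If you prefer the command line, the same variable can be created with the Airflow CLI from inside the webserver container (assuming the airflow CLI is on the container's PATH; the repository shown is only an example):

# Replace github_account_name/repository_name with your own repository
docker compose exec airflow airflow variables set airflow_github_repo github_account_name/repository_name
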

DAGs

In this step, you will create an Airflow DAG that performs simple tasks using the BashOperator: the example_bash_operator DAG.

This section is for Reference only.

The exampledb database connection has already been configured in Airflow and the DAG.

  1. Open the Admin->Connections section of the UI.

  2. Click the Create link to create a new connection.

  3. Enter the following details:

Connection Id: exampledb
Connection Type: Postgres
Description: Connection to 'example' database
Host: 172.21.0.3 (Container IP)
Database: example
Login: example
Password: example
Port: 5432

  1. Test the connection.
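
Alternatively, the connection can be created from the CLI (assuming the airflow CLI is available in the airflow service container). The URI below mirrors the AIRFLOW_CONN_EXAMPLE_DB string from docker-compose.yml, which points at postgres:6543, whereas the UI example above uses the container IP and port 5432; use whichever actually matches your Postgres service.

docker compose exec airflow airflow connections add exampledb \
    --conn-uri 'postgresql://example:example@postgres:6543/example'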

To use the preflight check DAG defined below:

• Ensure that OpenLineage is installed within your Airflow environment.

• Configure the BYPASS_LATEST_VERSION_CHECK and LINEAGE_BACKEND variables as needed.

• Set the necessary environment variables for OpenLineage, such as the namespace and the URL or transport mechanism, as described in the OpenLineage Airflow provider package docs and the OpenLineage docs.

• Add the DAG file to your Airflow DAGs folder.

• Trigger the DAG manually, or simply enable it and allow it to run once automatically based on its schedule (@once) to perform the preflight checks.

In dags/, create a file named preflight_check.py and add the following code:

from __future__ import annotations

import logging
import os
import attr

from packaging.version import Version

from airflow import DAG
from airflow.configuration import conf
from airflow import __version__ as airflow_version
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

# Set this to True to bypass the latest version check for OpenLineage package.
# Version check will be skipped if unable to access PyPI URL
BYPASS_LATEST_VERSION_CHECK = False
# Update this to `CUSTOM` if using any other backend for OpenLineage events ingestion
# When using custom transport - implement custom checks in _verify_custom_backend function
LINEAGE_BACKEND = "MARQUEZ"

log = logging.getLogger(__name__)


def _get_latest_package_version(library_name: str) -> Version | None:
    try:
        import requests

        response = requests.get(f"https://pypi.org/pypi/{library_name}/json")
        response.raise_for_status()
        version_string = response.json()["info"]["version"]
        return Version(version_string)
    except Exception as e:
        log.error(f"Failed to fetch latest version for `{library_name}` from PyPI: {e}")
        return None


def _get_installed_package_version(library_name) -> Version | None:
    try:
        from importlib.metadata import version

        return Version(version(library_name))
    except Exception as e:
        raise ModuleNotFoundError(f"`{library_name}` is not installed") from e


def _provider_can_be_used() -> bool:
    parsed_version = Version(airflow_version)
    if parsed_version < Version("2.1"):
        raise RuntimeError("OpenLineage is not supported in Airflow versions <2.1")
    elif parsed_version >= Version("2.7"):
        return True
    return False


def validate_ol_installation() -> None:
    library_name = "openlineage-airflow"
    if _provider_can_be_used():
        library_name = "apache-airflow-providers-openlineage"

    library_version = _get_installed_package_version(library_name)
    if BYPASS_LATEST_VERSION_CHECK:
        log.info(f"Bypassing the latest version check for `{library_name}`")
        return

    latest_version = _get_latest_package_version(library_name)
    if latest_version is None:
        log.warning(f"Failed to fetch the latest version for `{library_name}`. Skipping version check.")
        return

    if library_version < latest_version:
        raise ValueError(
            f"`{library_name}` is out of date. "
            f"Installed version: `{library_version}`, "
            f"Required version: `{latest_version}`"
            f"Please upgrade the package using `pip install --upgrade {library_name}` or set BYPASS_LATEST_VERSION_CHECK to True"
        )


def _is_transport_set() -> None:
    transport = conf.get("openlineage", "transport", fallback="")
    if transport:
        raise ValueError(
            "Transport value found: `%s`\n"
            "Please check the format at "
            "https://openlineage.io/docs/client/python/#built-in-transport-types",
            transport,
        )
    log.info("Airflow OL transport is not set.")
    return


def _is_config_set(provider: bool = True) -> None:
    if provider:
        config_path = conf.get("openlineage", "config_path", fallback="")
    else:
        config_path = os.getenv("OPENLINEAGE_CONFIG", "")

    if config_path and not _check_openlineage_yml(config_path):
        raise ValueError(
            "Config file is empty or does not exist: `%s`",
            config_path,
        )

    log.info("OL config is not set.")
    return


def _check_openlineage_yml(file_path) -> bool:
    file_path = os.path.expanduser(file_path)
    if os.path.exists(file_path):
        with open(file_path, "r") as file:
            content = file.read()
        if not content:
            raise ValueError(f"Empty file: `{file_path}`")
        raise ValueError(
                f"File found at `{file_path}` with the following content: `{content}`. "
                "Make sure there the configuration is correct."
            )
    log.info("File not found: `%s`", file_path)
    return False


def _check_http_env_vars() -> None:
    from urllib.parse import urljoin

    final_url = urljoin(os.getenv("OPENLINEAGE_URL", ""), os.getenv("OPENLINEAGE_ENDPOINT", ""))
    if final_url:
        raise ValueError("OPENLINEAGE_URL and OPENLINEAGE_ENDPOINT are set to: %s", final_url)
    log.info(
        "OPENLINEAGE_URL and OPENLINEAGE_ENDPOINT are not set. "
        "Please set up OpenLineage using documentation at "
        "https://airflow.apache.org/docs/apache-airflow-providers-openlineage/stable/guides/user.html"
    )
    return


def _debug_missing_transport():
    if _provider_can_be_used():
        _is_config_set(provider=True)
        _is_transport_set()
    _is_config_set(provider=False)
    _check_openlineage_yml("openlineage.yml")
    _check_openlineage_yml("~/.openlineage/openlineage.yml")
    _check_http_env_vars()
    raise ValueError("OpenLineage is missing configuration, please refer to the OL setup docs.")


def _is_listener_accessible():
    if _provider_can_be_used():
        try:
            from airflow.providers.openlineage.plugins.openlineage import OpenLineageProviderPlugin as plugin
        except ImportError as e:
            raise ValueError("OpenLineage provider is not accessible") from e
    else:
        try:
            from openlineage.airflow.plugin import OpenLineagePlugin as plugin
        except ImportError as e:
            raise ValueError("OpenLineage is not accessible") from e

    if len(plugin.listeners) == 1:
        return True

    return False


def _is_ol_disabled():
    if _provider_can_be_used():
        try:
            # apache-airflow-providers-openlineage >= 1.7.0
            from airflow.providers.openlineage.conf import is_disabled
        except ImportError:
            # apache-airflow-providers-openlineage < 1.7.0
            from airflow.providers.openlineage.plugins.openlineage import _is_disabled as is_disabled
    else:
        from openlineage.airflow.plugin import _is_disabled as is_disabled

    if is_disabled():
        if _provider_can_be_used() and conf.getboolean("openlineage", "disabled", fallback=False):
            raise ValueError("OpenLineage is disabled in airflow.cfg: openlineage.disabled")
        elif os.getenv("OPENLINEAGE_DISABLED", "false").lower() == "true":
            raise ValueError(
                "OpenLineage is disabled due to the environment variable OPENLINEAGE_DISABLED"
            )
        raise ValueError(
            "OpenLineage is disabled because required config/env variables are not set. "
            "Please refer to "
            "https://airflow.apache.org/docs/apache-airflow-providers-openlineage/stable/guides/user.html"
        )
    return False


def _get_transport():
    if _provider_can_be_used():
        from airflow.providers.openlineage.plugins.openlineage import OpenLineageProviderPlugin
        transport = OpenLineageProviderPlugin().listeners[0].adapter.get_or_create_openlineage_client().transport
    else:
        from openlineage.airflow.plugin import OpenLineagePlugin
        transport = (
            OpenLineagePlugin.listeners[0].adapter.get_or_create_openlineage_client().transport
        )
    return transport

def is_ol_accessible_and_enabled():
    if not _is_listener_accessible():
        _is_ol_disabled()

    try:
        transport = _get_transport()
    except Exception as e:
        raise ValueError("There was an error when trying to build transport.") from e

    if transport is None or transport.kind in ("noop", "console"):
        _debug_missing_transport()


def validate_connection():
    transport = _get_transport()
    config = attr.asdict(transport.config)
    verify_backend(LINEAGE_BACKEND, config)


def verify_backend(backend_type: str, config: dict):
    backend_type = backend_type.lower()
    if backend_type == "marquez":
        return _verify_marquez_http_backend(config)
    elif backend_type == "atlan":
        return _verify_atlan_http_backend(config)
    elif backend_type == "custom":
        return _verify_custom_backend(config)
    raise ValueError(f"Unsupported backend type: {backend_type}")


def _verify_marquez_http_backend(config):
    log.info("Checking Marquez setup")
    ol_url = config["url"]
    ol_endpoint = config["endpoint"]  # "api/v1/lineage"
    marquez_prefix_path = ol_endpoint[: ol_endpoint.rfind("/") + 1]  # "api/v1/"
    list_namespace_url = ol_url + "/" + marquez_prefix_path + "namespaces"
    import requests

    try:
        response = requests.get(list_namespace_url)
        response.raise_for_status()
    except Exception as e:
        raise ConnectionError(f"Failed to connect to Marquez at `{list_namespace_url}`") from e
    log.info("Airflow is able to access the URL")


def _verify_atlan_http_backend(config):
    raise NotImplementedError("This feature is not implemented yet")


def _verify_custom_backend(config):
    raise NotImplementedError("This feature is not implemented yet")


with DAG(
    dag_id="openlineage_preflight_check_dag",
    start_date=days_ago(1),
    description="A DAG to check OpenLineage setup and configurations",
    schedule_interval="@once",
) as dag:
    validate_ol_installation_task = PythonOperator(
        task_id="validate_ol_installation",
        python_callable=validate_ol_installation,
    )

    is_ol_accessible_and_enabled_task = PythonOperator(
        task_id="is_ol_accessible_and_enabled",
        python_callable=is_ol_accessible_and_enabled,
    )

    validate_connection_task = PythonOperator(
        task_id="validate_connection",
        python_callable=validate_connection,
    )

    validate_ol_installation_task >> is_ol_accessible_and_enabled_task
    is_ol_accessible_and_enabled_task >> validate_connection_task

The DAG defines three main tasks that sequentially execute the above validations:

  1. validate_ol_installation: Confirms that the OpenLineage installation is correct and up-to-date.

  2. is_ol_accessible_and_enabled: Checks if OpenLineage is accessible and enabled within Airflow.

  3. validate_connection: Verifies the connection to the specified lineage backend.
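
Once the scheduler has picked up the file, the preflight check can also be unpaused and triggered from the CLI, as a convenience alternative to the UI (assuming the stack from docker-compose.yml is running):

docker compose exec airflow airflow dags unpause openlineage_preflight_check_dag
docker compose exec airflow airflow dags trigger openlineage_preflight_check_dag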

  1. In dags/, create a file named example_bashOperator.py and add the following code:

"""Example DAG demonstrating the usage of the BashOperator."""
from __future__ import annotations

import datetime

import pendulum

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="example_bash_operator",
    schedule="0 0 * * *",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    dagrun_timeout=datetime.timedelta(minutes=60),
    tags=["example", "example2"],
    params={"example_key": "example_value"},
) as dag:
    run_this_last = EmptyOperator(
        task_id="run_this_last",
    )

    # [START howto_operator_bash]
    run_this = BashOperator(
        task_id="run_after_loop",
        bash_command="echo https://airflow.apache.org/",
    )
    # [END howto_operator_bash]

    run_this >> run_this_last

    for i in range(3):
        task = BashOperator(
            task_id=f"runme_{i}",
            bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
        )
        task >> run_this

    # [START howto_operator_bash_template]
    also_run_this = BashOperator(
        task_id="also_run_this",
        bash_command='echo "ti_key={{ task_instance_key_str }}"',
    )
    # [END howto_operator_bash_template]
    also_run_this >> run_this_last

# [START howto_operator_bash_skip]
this_will_skip = BashOperator(
    task_id="this_will_skip",
    bash_command='echo "hello world"; exit 99;',
    dag=dag,
)
# [END howto_operator_bash_skip]
this_will_skip >> run_this_last

if __name__ == "__main__":
    dag.test()
  1. Run the DAG from the Airflow UI, or from the CLI as shown below.
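
A quick CLI alternative to the UI trigger button, assuming the stack from docker-compose.yml is running:

docker compose exec airflow airflow dags unpause example_bash_operator
docker compose exec airflow airflow dags trigger example_bash_operator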

To make life easier, the folks at Damavis have developed the airflow-pentaho-plugin.

airflow-pentaho-plugin

  1. Retrieve the Airflow webserver container ID:

docker ps | grep air-flow-project*
CONTAINER ID   IMAGE                                                                                    COMMAND                  CREATED        STATUS                 PORTS                                                                                                                 NAMES
dfcde46e5ae9   air-flow-project_feb761/airflow:latest                                                   "tini -- /entrypoint…"   6 hours ago    Up 6 hours (healthy)   127.0.0.1:8080->8080/tcp                                         
  1. Exec into the container:

docker exec -it <webserver-airflow-container-id> /bin/bash

or use Portainer

  1. Install airflow-pentaho-plugin:

pip install airflow-pentaho-plugin
astro@dfcde46e5ae9:/usr/local/airflow$ pip install airflow-pentaho-plugin
Defaulting to user installation because normal site-packages is not writeable
Looking in indexes: https://pip.astronomer.io/v2/
Looking in links: https://pip.astronomer.io/simple/astronomer-fab-security-manager/, https://pip.astronomer.io/simple/astronomer-airflow-version-check/
Collecting airflow-pentaho-plugin
  Downloading airflow_pentaho_plugin-1.1.2-py3-none-any.whl.metadata (6.2 kB)
Collecting xmltodict>=0.12.0 (from airflow-pentaho-plugin)
  Downloading xmltodict-0.13.0-py2.py3-none-any.whl.metadata (7.7 kB)
Downloading airflow_pentaho_plugin-1.1.2-py3-none-any.whl (20 kB)
Downloading xmltodict-0.13.0-py2.py3-none-any.whl (10.0 kB)
Installing collected packages: xmltodict, airflow-pentaho-plugin
Successfully installed airflow-pentaho-plugin-1.1.2 xmltodict-0.13.0

[notice] A new release of pip is available: 23.3.2 -> 24.0
[notice] To update, run: pip install --upgrade pip
astro@dfcde46e5ae9:/usr/local/airflow$ 
  1. Restart the container:

docker restart <webserver-airflow-container-id>

or Restart using Portainer

  1. Check that the airflow-pentaho-plugin is installed, as shown below.
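
Two quick checks from inside the container can confirm the installation: pip show reports the installed version, and airflow plugins should list the plugin if it registers through the Airflow plugin mechanism.

docker exec -it <webserver-airflow-container-id> pip show airflow-pentaho-plugin
docker exec -it <webserver-airflow-container-id> airflow plugins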
