Data Lineage
Automate pipelines with Apache Airflow, OpenLineage, and Marquez.
Data lineage in a data catalog outlines the lifecycle of data, detailing its journey from origin to endpoint across various processes and transformations. It offers a clear, visual map of data's provenance, its modifications, and its final location. This mapping includes tracing data from source to destination, capturing all steps, transformations, and processes it encounters along the way.
The importance of data lineage lies in its role in ensuring data integrity, supporting compliance with regulations, facilitating error root cause analysis, and enhancing overall data governance and management. By providing a comprehensive view of data’s journey, data lineage is an indispensable tool for maintaining high-quality, reliable data in a data catalog.
An automated data pipeline with Apache Airflow, OpenLineage, and Marquez consists of several integral components.
Apache Airflow serves as the orchestrator, managing the execution of workflows and ensuring tasks run in the correct sequence.
OpenLineage acts as the standard for tracking and reporting data lineage, offering a clear way to record the flow of data across different jobs and transformations.
Marquez functions as the metadata repository, storing detailed information about datasets, jobs, and their executions, enabling rich lineage tracking and metadata querying.
Together, these tools provide a robust framework for orchestrating, monitoring, and managing data workflows, ensuring data integrity, compliance, and ease of troubleshooting.
Apache Airflow plays a crucial role in orchestrating data pipelines by managing and scheduling complex workflows. It enables users to define dependencies between tasks, ensuring they run in the correct sequence.
Airflow provides a user-friendly interface to monitor task execution, visualize data flow, and troubleshoot any issues that arise. With its ability to integrate with various data processing tools, Airflow automates the entire data lifecycle, from extraction and transformation to loading and reporting, making it an essential component for scalable and reliable data pipeline management.
Pentaho Data Catalog adheres to the OpenLineage standards.
OpenLineage is an open platform for collection and analysis of data lineage. It tracks metadata about datasets, jobs, and runs, giving users the information required to identify the root cause of complex issues and understand the impact of changes. OpenLineage contains an open standard for lineage data collection, a metadata repository reference implementation (Marquez), libraries for common languages, and integrations with data pipeline tools.
At the core of OpenLineage is a standard API for capturing lineage events. Pipeline components - like schedulers, warehouses, analysis tools, and SQL engines - can use this API to send data about runs, jobs, and datasets to a compatible OpenLineage backend for further study.
OpenLineage supports both simple deployments with single consumers and complex deployments with multiple consumers.
OpenLineage’s object model provides a structured approach to track the lifecycle of data across various systems, ensuring clarity and consistency. It comprises several key entities:
Datasets: These are the fundamental units representing data sources or outputs in the lineage graph. Each dataset captures critical metadata including schema, location, and other characteristics that define its structure and content. Datasets help in identifying the data at different stages of the pipeline.
Jobs: Jobs denote the processes or operations performed on datasets. They encapsulate the logic of the data transformation and include metadata about the specific task executed, its parameters, and any associated code or scripts. Jobs are essential for understanding how data is manipulated within the pipeline.
Runs: Runs are instances of job executions. They carry runtime metadata such as execution time, duration, status (e.g., success, failure), and logs. Runs provide a temporal aspect to lineage, allowing users to trace back to specific executions and understand the context and outcome of each job run.
These components interact through a standard API provided by OpenLineage. This API allows various pipeline components—such as schedulers, data warehouses, analysis tools, and SQL engines—to send lineage events to an OpenLineage backend. This integration facilitates comprehensive lineage tracking, enabling users to gain insights into data transformations, track data flows, and maintain data integrity. Through this structured model, OpenLineage helps ensure that data provenance is transparent and traceable, which is crucial for data governance, compliance, and troubleshooting.
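As an illustration of this object model, the snippet below is a minimal sketch that uses the openlineage-python client to emit a START event for a run of my-job reading my-input. It assumes a Marquez (or other OpenLineage-compatible) backend on localhost:5000; module paths and the client constructor follow the 1.x Python client and may differ in other versions.

```python
from datetime import datetime, timezone
from uuid import uuid4

from openlineage.client import OpenLineageClient
from openlineage.client.run import Dataset, Job, Run, RunEvent, RunState

# Sketch only: assumes an OpenLineage-compatible backend (e.g. Marquez) on localhost:5000.
client = OpenLineageClient(url="http://localhost:5000")

event = RunEvent(
    eventType=RunState.START,
    eventTime=datetime.now(timezone.utc).isoformat(),
    run=Run(runId=str(uuid4())),                       # one Run per job execution
    job=Job(namespace="my-namespace", name="my-job"),  # the logical job
    inputs=[Dataset(namespace="my-namespace", name="my-input")],
    outputs=[],
    producer="https://example.com/my-pipeline",        # identifies the emitting integration
)
client.emit(event)
```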
Facets in OpenLineage provide a detailed view of metadata for lineage events, extending the core model with additional information about datasets, jobs, and runs. They are structured as key-value pairs, allowing for customizable and extensible metadata descriptions. These facets serve as building blocks that provide context and deeper insights into the data lineage, enhancing the ability to perform thorough analysis and detailed tracking.
Key types of facets include:
Data Quality Facet: This facet captures a range of data quality metrics, such as null counts, distinct counts, and summary statistics. By monitoring these metrics, the data quality facet helps to ensure that the data retains its integrity throughout its lifecycle.
Ownership Facet: This facet details ownership information, identifying the data owner or steward responsible for a dataset. It is crucial for data governance, as it clarifies accountability and provides a direct point of contact for queries or issues related to the data.
Schema Facet: This facet describes the structure of datasets, including information about column names, data types, and other structural attributes. It helps in understanding the organization and format of the data, which is essential for data processing and transformation activities.
Location Facet: This facet specifies the physical or logical location of data, such as file paths, URLs, or database addresses. Knowing where the data resides is important for data access, security, and management.
These facets enrich lineage data by adding layers of context and information, thereby facilitating comprehensive lineage tracking and detailed metadata management. They allow users to gain insights into various aspects of data and its movement, making it easier to trace data flows, understand dependencies, and maintain data integrity. The ability to extend and customize facets enables organizations to adapt the lineage model to their specific needs, supporting a wide range of data governance, compliance, and operational requirements.
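As a sketch of how facets appear on the wire, each dataset in a lineage event carries a facets object keyed by facet name. The Python dict below mirrors the schema facet used in the curl example later in this guide; the ownership facet alongside it is illustrative only, and its exact field names and schema URL should be checked against the OpenLineage facet spec version in use.

```python
# Illustrative facets payload for an output dataset. The schema facet mirrors the
# curl example below; the ownership facet shape is indicative and may need adjusting
# to the facet spec version in use.
my_output_facets = {
    "schema": {
        "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
        "_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/spec/OpenLineage.json#/definitions/SchemaDatasetFacet",
        "fields": [
            {"name": "a", "type": "VARCHAR"},
            {"name": "b", "type": "VARCHAR"},
        ],
    },
    "ownership": {
        "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
        "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/OwnershipDatasetFacet.json",
        "owners": [{"name": "data-team@example.com", "type": "TEAM"}],
    },
}
```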
Marquez is a metadata service for data lineage. It collects, stores, and visualizes metadata about datasets, jobs, and runs in a centralized repository. By integrating with various data pipeline tools, Marquez tracks end-to-end data lineage, helping users to query historical changes, investigate data dependencies, and ensure data governance. Its open metadata model and versioning capabilities enable detailed exploration of data provenance, making it easier to perform audits, troubleshoot issues, and comply with regulatory requirements.
Marquez’s metadata storage model centralizes the representation of end-to-end pipeline metadata, supporting versioning and flexible data lineage queries. It efficiently tracks and associates dependencies between jobs and datasets, allowing users to query historical changes and compare schema versions. This model is particularly beneficial for auditing and troubleshooting affected jobs downstream of any schema modifications. By normalizing metadata across various sources and frameworks, Marquez ensures comprehensive metadata management and lineage tracking.
The metadata storage model in Marquez employs a relational database to store the details of datasets, jobs, and runs, capturing a wide range of metadata attributes.
For datasets, it includes properties such as schema information, physical location, and version history.
Jobs, which represent the transformations applied to data, store metadata about their execution logic, parameters, and execution code.
Runs, or instances of job executions, maintain runtime details like start/end times, status, duration, and logs.
Marquez’s system also supports versioning of datasets and jobs, enabling users to navigate the historical changes and understand the evolution of their data pipelines. This is particularly valuable for compliance and governance, as it ensures that any transformations can be traced and validated. The flexible query capabilities provided by Marquez allow for detailed lineage exploration, where users can identify the upstream and downstream impacts of changes to their datasets or jobs, thus aiding in root cause analysis and impact assessment.
Integrating with various data pipeline tools, Marquez acts as a centralized metadata repository, promoting consistency and reducing fragmentation across different systems. Its open metadata model can seamlessly adapt to diverse data environments, ensuring all metadata is standardized and accessible. This harmonized approach simplifies data governance, allowing for efficient monitoring, auditing, and reporting activities.
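Marquez exposes this versioned metadata over its REST API. The sketch below lists the stored versions of a dataset; it assumes a local Marquez instance on port 5000 and the my-namespace/my-output dataset used later in this guide, and the response field names may differ slightly between Marquez versions.

```python
# Sketch: list the stored versions of a dataset via the Marquez REST API.
# Assumes Marquez is reachable on localhost:5000.
import requests

url = "http://localhost:5000/api/v1/namespaces/my-namespace/datasets/my-output/versions"
resp = requests.get(url)
resp.raise_for_status()
for v in resp.json().get("versions", []):
    print(v.get("version"), v.get("createdAt"))
```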
Run-level metadata is tracked via HTTP API calls to /lineage using OpenLineage.
A run has a unique ID and records its code version, inputs and outputs, run args, and run state transitions.
When a run transitions to a complete state, the output datasets for the run are created if not already registered with Marquez.
A dataset version pointer is present for each input and output dataset and is maintained by Marquez to keep track of the historical set of metadata changes. Each immutable dataset version is mapped to a metadata change and the run ID that modified the dataset, preserving its state at a given point in time.
A job version pointer is also present and is mapped to a unique, referenceable link to code, the latest run ID, and one or more versioned input and output datasets.
When you submit a lineage event, you first need to define a unique run ID, similar to d46e465b-d358-4d32-83d4-df660ff614dd. UUID format is recommended. This run ID enables the tracking of run-level metadata over time for a job that may have a name, like my-job.
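For example, a suitable run ID can be generated with Python's uuid module:

```python
from uuid import uuid4

# Generate a UUID to use as the runId in the lineage events below.
run_id = str(uuid4())  # e.g. "d46e465b-d358-4d32-83d4-df660ff614dd"
print(run_id)
```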
Start a RUN
curl -X POST http://localhost:5000/api/v1/lineage \
-i -H 'Content-Type: application/json' \
-d '{
"eventType": "START",
"eventTime": "2020-12-28T19:52:00.001+10:00",
"run": {
"runId": "d46e465b-d358-4d32-83d4-df660ff614dd"
},
"job": {
"namespace": "my-namespace",
"name": "my-job"
},
"inputs": [{
"namespace": "my-namespace",
"name": "my-input"
}],
"producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/definitions/RunEvent"
}'
HTTP/1.1 201 Created
Date: Wed, 29 May 2024 17:46:17 GMT
Content-Length: 0
Use the run ID d46e465b-d358-4d32-83d4-df660ff614dd to complete the run for my-job, with my-output as the output dataset. The schema facet is also specified to collect the schema for my-output before marking the run as completed. Note that you don't have to specify the input dataset my-input again for the run, since it has already been associated with the run ID.
Complete the RUN
curl -X POST http://localhost:5000/api/v1/lineage \
-i -H 'Content-Type: application/json' \
-d '{
"eventType": "COMPLETE",
"eventTime": "2020-12-28T20:52:00.001+10:00",
"run": {
"runId": "d46e465b-d358-4d32-83d4-df660ff614dd"
},
"job": {
"namespace": "my-namespace",
"name": "my-job"
},
"outputs": [{
"namespace": "my-namespace",
"name": "my-output",
"facets": {
"schema": {
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/spec/OpenLineage.json#/definitions/SchemaDatasetFacet",
"fields": [
{ "name": "a", "type": "VARCHAR"},
{ "name": "b", "type": "VARCHAR"}
]
}
}
}],
"producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/definitions/RunEvent"
}'
HTTP/1.1 201 Created
Date: Wed, 29 May 2024 17:55:14 GMT
Content-Length: 0
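You can also confirm that both events were recorded by querying the Marquez lineage endpoint directly. This is a sketch; the nodeId format job:&lt;namespace&gt;:&lt;name&gt; follows the Marquez API convention, and the response field names may vary slightly between versions.

```python
# Sketch: fetch the lineage graph around my-job from the Marquez API.
import requests

resp = requests.get(
    "http://localhost:5000/api/v1/lineage",
    params={"nodeId": "job:my-namespace:my-job"},
)
resp.raise_for_status()
for node in resp.json().get("graph", []):
    print(node.get("type"), node.get("id"))
```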
Browse to the Marquez Web UI (port 3000 by default). Use the search bar in the upper right side of the page and search for the job my-job. To view lineage metadata for my-job, click on the job from the drop-down list.
In the search result, you should see the job namespace and name, and in the lineage graph you should see my-input as an input dataset and my-output as an output dataset. In the RUN HISTORY tab on the Job Detail page below the graph, the job run state should be COMPLETED.
Finally, click on the output dataset my-output for my-job. Metadata displayed includes the name, column names, data types, and more.
This project focuses on:
Deploying & configuring Apache Airflow + OpenLineage + Marquez in Docker
Defining a connection to the exampledb database
Running a test DAG - example_bashOperator.py
This section is for Reference only.
Apache Airflow + OpenLineage & Marquez have been installed & configured.
An Airflow-Project contains the set of files necessary to run Airflow, including dedicated folders for your DAG files, plugins, and dependencies.
Create Airflow project folders.
cd
mkdir -p Airflow-Project/{dags,docker,include,plugins,tests} && cd Airflow-Project
Download the required Marquez scripts used to initialize the containers, Docker network, and volumes.
cd
cd ~/Airflow-Project/docker
curl -O "https://raw.githubusercontent.com/MarquezProject/marquez/examples/airflow/docker/{entrypoint.sh,wait-for-it.sh}"
Check the Airflow-Project directory
cd
cd ~/Airflow-Project
ls -al
$ ls -a
.
├── dags # Where your DAGs go
├── docker
├── plugins # For any custom or community Airflow plugins
├── docker-compose.yml
├── openlineage.env # Local environment variables
└── requirements.txt # For Python package
Add the OpenLineage, Common SQL, and GitHub providers to the requirements.txt file.
cd
cd ~/Airflow-Project
touch requirements.txt
nano requirements.txt
Add the following Providers:
# apache airflow providers
apache-airflow-providers-common-sql==1.7.2
apache-airflow-providers-openlineage==1.1.0
apache-airflow-providers-github==2.6.0
Save.
CTRL + o
Enter
CTRL + x
To configure Airflow to send lineage metadata to Marquez, enable connection testing, and allow DAGs to be triggered with parameters, add the following environment variables.
cd
cd ~/Airflow-Project
touch openlineage.env
nano openlineage.env
Add:
OPENLINEAGE_URL=http://marquez:5000
OPENLINEAGE_NAMESPACE=example
AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://marquez:5000", "endpoint": "api/v1/lineage"}'
AIRFLOW__CORE__TEST_CONNECTION=Enabled
AIRFLOW__WEBSERVER__SHOW_TRIGGER_FORM_IF_NO_PARAMS=True
OPENLINEAGE_URL - URL of the Marquez API.
OPENLINEAGE_NAMESPACE - The Marquez namespace that lineage events are emitted into.
AIRFLOW__OPENLINEAGE__TRANSPORT - Where to push the lineage events (the Marquez HTTP endpoint).
AIRFLOW__CORE__TEST_CONNECTION - Disabled by default; enabling it allows connections to be tested from the Airflow UI.
AIRFLOW__WEBSERVER__SHOW_TRIGGER_FORM_IF_NO_PARAMS - Makes the Trigger DAG w/ config option appear in the Airflow UI for DAGs without params.
Save.
CTRL + o
Enter
CTRL + x
Create a new file docker-compose.yml in your project and copy/paste the following into the file.
cd
cd ~/Airflow-Project
nano docker-compose.yml
Copy and paste the following content.
# SPDX-License-Identifier: Apache-2.0
version: "3.8"
services:
airflow:
image: bitnami/airflow:2
ports:
- "8080:8080"
env_file:
- openlineage.env
environment:
- AIRFLOW_USERNAME=airflow
- AIRFLOW_PASSWORD=airflow
- AIRFLOW_EMAIL=airflow@example.com
- AIRFLOW_FERNET_KEY=Z2uDm0ZL60fXNkEXG8LW99Ki2zf8wkmIltaTz1iQPDU=
- AIRFLOW_DATABASE_HOST=postgres
- AIRFLOW_DATABASE_NAME=airflow
- AIRFLOW_DATABASE_USERNAME=airflow
- AIRFLOW_DATABASE_PASSWORD=airflow
- AIRFLOW_EXECUTOR=CeleryExecutor
- AIRFLOW_LOAD_EXAMPLES=no
- AIRFLOW_CONN_EXAMPLE_DB=postgres://example:example@postgres:6543/example
volumes:
- ./dags:/opt/bitnami/airflow/dags
- ${PWD}/whl:/whl
- type: bind
source: ${PWD}/requirements.txt
target: /bitnami/python/requirements.txt
airflow_scheduler:
image: bitnami/airflow-scheduler:2
env_file:
- openlineage.env
environment:
- AIRFLOW_FERNET_KEY=Z2uDm0ZL60fXNkEXG8LW99Ki2zf8wkmIltaTz1iQPDU=
- AIRFLOW_DATABASE_HOST=postgres
- AIRFLOW_DATABASE_NAME=airflow
- AIRFLOW_DATABASE_USERNAME=airflow
- AIRFLOW_DATABASE_PASSWORD=airflow
- AIRFLOW_EXECUTOR=CeleryExecutor
- AIRFLOW_LOAD_EXAMPLES=no
- AIRFLOW_CONN_EXAMPLE_DB=postgres://example:example@postgres:6543/example
- AIRFLOW_WEBSERVER_HOST=airflow
volumes:
- ./dags:/opt/bitnami/airflow/dags
- ${PWD}/whl:/whl
- type: bind
source: ${PWD}/requirements.txt
target: /bitnami/python/requirements.txt
airflow_worker:
image: bitnami/airflow-worker:2
env_file:
- openlineage.env
environment:
- AIRFLOW_FERNET_KEY=Z2uDm0ZL60fXNkEXG8LW99Ki2zf8wkmIltaTz1iQPDU=
- AIRFLOW_DATABASE_HOST=postgres
- AIRFLOW_DATABASE_NAME=airflow
- AIRFLOW_DATABASE_USERNAME=airflow
- AIRFLOW_DATABASE_PASSWORD=airflow
- AIRFLOW_EXECUTOR=CeleryExecutor
- AIRFLOW_LOAD_EXAMPLES=no
- AIRFLOW_CONN_EXAMPLE_DB=postgres://example:example@postgres:6543/example
- AIRFLOW_WEBSERVER_HOST=airflow
volumes:
- ./dags:/opt/bitnami/airflow/dags
- ${PWD}/whl:/whl
- type: bind
source: ${PWD}/requirements.txt
target: /bitnami/python/requirements.txt
marquez:
image: marquezproject/marquez:latest
ports:
- "5000:5000"
- "5001:5001"
volumes:
- ./docker/wait-for-it.sh:/usr/src/app/wait-for-it.sh
depends_on:
- postgres
entrypoint: ["./wait-for-it.sh", "postgres:6543", "--", "./entrypoint.sh"]
# Enables SQL statement logging (see: https://www.postgresql.org/docs/12/runtime-config-logging.html#GUC-LOG-STATEMENT)
# command: ["postgres", "-c", "log_statement=all"]
marquez_web:
image: marquezproject/marquez-web:latest
environment:
- MARQUEZ_HOST=marquez
- MARQUEZ_PORT=5000
ports:
- "3000:3000"
stdin_open: true
tty: true
depends_on:
- marquez
postgres:
image: bitnami/postgresql:12.1.0
ports:
- "6543:6543"
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
- AIRFLOW_USER=airflow
- AIRFLOW_PASSWORD=airflow
- AIRFLOW_DB=airflow
- MARQUEZ_USER=marquez
- MARQUEZ_PASSWORD=marquez
- MARQUEZ_DB=marquez
- EXAMPLE_USER=example
- EXAMPLE_PASSWORD=example
- EXAMPLE_DB=example
- ALLOW_EMPTY_PASSWORD=yes
volumes:
- ./docker/init-db.sh:/docker-entrypoint-initdb.d/init-db.sh
redis:
image: bitnami/redis:6.0.6
environment:
- ALLOW_EMPTY_PASSWORD=yes
Save.
CTRL + o
Enter
CTRL + x
When starting the Airflow-OpenLineage-Marquez environment for the first time, be patient while the container images are pulled and the environment is deployed.
Execute the following command to start the Airflow-OpenLineage-Marquez environment.
cd
cd ~/Airflow-Project
docker compose up -d
The following containers are started:
• airflow-1 - The Airflow webserver; its UI is served on port 8080.
• airflow-scheduler-1 - The scheduler, which monitors all tasks and DAGs, then triggers the task instances once their dependencies are complete.
• airflow-worker-1 - The worker that executes the tasks given by the scheduler.
• airflow-redis-1 - The broker that forwards messages from the scheduler to the worker.
• airflow-postgres-1 - The database.
• airflow-marquez-1 - The Marquez API.
• airflow-marquez-web-1 - The Marquez web UI, served on port 3000.
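Before opening the UIs, you can optionally check that the Airflow webserver and Marquez API respond. This is a sketch that assumes the default port mappings from docker-compose.yml and localhost as the Docker host.

```python
# Sketch: poll the Airflow webserver and Marquez API until they respond.
import time
import requests

endpoints = {
    "Airflow webserver": "http://localhost:8080/health",
    "Marquez API": "http://localhost:5000/api/v1/namespaces",
}

for name, url in endpoints.items():
    for _ in range(30):  # retry for up to ~60 seconds per service
        try:
            if requests.get(url, timeout=2).ok:
                print(f"{name} is up: {url}")
                break
        except requests.ConnectionError:
            pass
        time.sleep(2)
    else:
        print(f"{name} did not respond: {url}")
```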
To view the Airflow UI and verify it's running, open port 8080 on your Docker host (for example, http://localhost:8080) and log in with:
Username: airflow
Password: airflow
Airflow variables are key-value pairs that can be accessed from any DAG in your Airflow environment. Because the variable my_github_repo is used in the DAG code with a default of apache/airflow, you'll need to create the variable and give it a value in the Airflow UI to wait for a commit in your own repository.
Unlike DAG code changes, package dependency changes require a complete restart of Airflow.
Go to Admin -> Variables to open the list of Airflow variables.
Click on the + sign to open the form for adding a new variable.
Set the Key for the variable as airflow_github_repo and set the Val to a GitHub repository you have administrator access to. Make sure the Val is in the format github_account_name/repository_name (for example, apache/airflow). The repository can be private.
Click Save.
In this step, you will create an Airflow DAG that performs simple tasks using the BashOperator: the example_bash_operator DAG.
This section is for Reference only.
The exampledb database connection has been configured in Airflow and is used by the DAG.
Open the Admin -> Connections section of the UI. Click the Create link to create a new connection, and enter the following details:
Connection Id: exampledb
Connection Type: Postgres
Description: Connection to 'example' database
Host: 172.21.0.3 (the postgres container IP)
Database: example
Login: example
Password: example
Port: 5432
Test the connection.
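As an alternative to the UI Test button, the short sketch below verifies the connection from within an Airflow task (for example, a PythonOperator callable). It assumes the Postgres provider (apache-airflow-providers-postgres) is available in the image and the connection ID exampledb created above.

```python
# Sketch: verify the `exampledb` connection from inside an Airflow task.
# Assumes apache-airflow-providers-postgres is installed in the environment.
from airflow.providers.postgres.hooks.postgres import PostgresHook

def check_exampledb():
    hook = PostgresHook(postgres_conn_id="exampledb")
    # Returns [(1,)] if the connection and credentials are valid.
    return hook.get_records("SELECT 1")
```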
To use this DAG:
• Ensure that OpenLineage is installed within your Airflow environment.
• Set the necessary environment variables for OpenLineage, such as the namespace and the URL or transport mechanism.
• Configure the BYPASS_LATEST_VERSION_CHECK and LINEAGE_BACKEND variables as needed.
• Add the DAG file to your Airflow DAGs folder.
• Trigger the DAG manually or just enable it and allow it to run once automatically based on its schedule (@once) to perform the preflight checks.
In dags/, create a file named preflight_check.py and add the following code:
from __future__ import annotations
import logging
import os
import attr
from packaging.version import Version
from airflow import DAG
from airflow.configuration import conf
from airflow import __version__ as airflow_version
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
# Set this to True to bypass the latest version check for OpenLineage package.
# Version check will be skipped if unable to access PyPI URL
BYPASS_LATEST_VERSION_CHECK = False
# Update this to `CUSTOM` if using any other backend for OpenLineage events ingestion
# When using custom transport - implement custom checks in _verify_custom_backend function
LINEAGE_BACKEND = "MARQUEZ"
log = logging.getLogger(__name__)
def _get_latest_package_version(library_name: str) -> Version | None:
try:
import requests
response = requests.get(f"https://pypi.org/pypi/{library_name}/json")
response.raise_for_status()
version_string = response.json()["info"]["version"]
return Version(version_string)
except Exception as e:
log.error(f"Failed to fetch latest version for `{library_name}` from PyPI: {e}")
return None
def _get_installed_package_version(library_name) -> Version | None:
try:
from importlib.metadata import version
return Version(version(library_name))
except Exception as e:
raise ModuleNotFoundError(f"`{library_name}` is not installed") from e
def _provider_can_be_used() -> bool:
parsed_version = Version(airflow_version)
if parsed_version < Version("2.1"):
raise RuntimeError("OpenLineage is not supported in Airflow versions <2.1")
elif parsed_version >= Version("2.7"):
return True
return False
def validate_ol_installation() -> None:
library_name = "openlineage-airflow"
if _provider_can_be_used():
library_name = "apache-airflow-providers-openlineage"
library_version = _get_installed_package_version(library_name)
if BYPASS_LATEST_VERSION_CHECK:
log.info(f"Bypassing the latest version check for `{library_name}`")
return
latest_version = _get_latest_package_version(library_name)
if latest_version is None:
log.warning(f"Failed to fetch the latest version for `{library_name}`. Skipping version check.")
return
if library_version < latest_version:
raise ValueError(
f"`{library_name}` is out of date. "
f"Installed version: `{library_version}`, "
f"Required version: `{latest_version}`"
f"Please upgrade the package using `pip install --upgrade {library_name}` or set BYPASS_LATEST_VERSION_CHECK to True"
)
def _is_transport_set() -> None:
transport = conf.get("openlineage", "transport", fallback="")
if transport:
raise ValueError(
"Transport value found: `%s`\n"
"Please check the format at "
"https://openlineage.io/docs/client/python/#built-in-transport-types",
transport,
)
log.info("Airflow OL transport is not set.")
return
def _is_config_set(provider: bool = True) -> None:
if provider:
config_path = conf.get("openlineage", "config_path", fallback="")
else:
config_path = os.getenv("OPENLINEAGE_CONFIG", "")
if config_path and not _check_openlineage_yml(config_path):
raise ValueError(
"Config file is empty or does not exist: `%s`",
config_path,
)
log.info("OL config is not set.")
return
def _check_openlineage_yml(file_path) -> bool:
file_path = os.path.expanduser(file_path)
if os.path.exists(file_path):
with open(file_path, "r") as file:
content = file.read()
if not content:
raise ValueError(f"Empty file: `{file_path}`")
raise ValueError(
f"File found at `{file_path}` with the following content: `{content}`. "
"Make sure there the configuration is correct."
)
log.info("File not found: `%s`", file_path)
return False
def _check_http_env_vars() -> None:
from urllib.parse import urljoin
final_url = urljoin(os.getenv("OPENLINEAGE_URL", ""), os.getenv("OPENLINEAGE_ENDPOINT", ""))
if final_url:
raise ValueError("OPENLINEAGE_URL and OPENLINEAGE_ENDPOINT are set to: %s", final_url)
log.info(
"OPENLINEAGE_URL and OPENLINEAGE_ENDPOINT are not set. "
"Please set up OpenLineage using documentation at "
"https://airflow.apache.org/docs/apache-airflow-providers-openlineage/stable/guides/user.html"
)
return
def _debug_missing_transport():
if _provider_can_be_used():
_is_config_set(provider=True)
_is_transport_set()
_is_config_set(provider=False)
_check_openlineage_yml("openlineage.yml")
_check_openlineage_yml("~/.openlineage/openlineage.yml")
_check_http_env_vars()
raise ValueError("OpenLineage is missing configuration, please refer to the OL setup docs.")
def _is_listener_accessible():
if _provider_can_be_used():
try:
from airflow.providers.openlineage.plugins.openlineage import OpenLineageProviderPlugin as plugin
except ImportError as e:
raise ValueError("OpenLineage provider is not accessible") from e
else:
try:
from openlineage.airflow.plugin import OpenLineagePlugin as plugin
except ImportError as e:
raise ValueError("OpenLineage is not accessible") from e
if len(plugin.listeners) == 1:
return True
return False
def _is_ol_disabled():
if _provider_can_be_used():
try:
# apache-airflow-providers-openlineage >= 1.7.0
from airflow.providers.openlineage.conf import is_disabled
except ImportError:
# apache-airflow-providers-openlineage < 1.7.0
from airflow.providers.openlineage.plugins.openlineage import _is_disabled as is_disabled
else:
from openlineage.airflow.plugin import _is_disabled as is_disabled
if is_disabled():
if _provider_can_be_used() and conf.getboolean("openlineage", "disabled", fallback=False):
raise ValueError("OpenLineage is disabled in airflow.cfg: openlineage.disabled")
elif os.getenv("OPENLINEAGE_DISABLED", "false").lower() == "true":
raise ValueError(
"OpenLineage is disabled due to the environment variable OPENLINEAGE_DISABLED"
)
raise ValueError(
"OpenLineage is disabled because required config/env variables are not set. "
"Please refer to "
"https://airflow.apache.org/docs/apache-airflow-providers-openlineage/stable/guides/user.html"
)
return False
def _get_transport():
if _provider_can_be_used():
from airflow.providers.openlineage.plugins.openlineage import OpenLineageProviderPlugin
transport = OpenLineageProviderPlugin().listeners[0].adapter.get_or_create_openlineage_client().transport
else:
from openlineage.airflow.plugin import OpenLineagePlugin
transport = (
OpenLineagePlugin.listeners[0].adapter.get_or_create_openlineage_client().transport
)
return transport
def is_ol_accessible_and_enabled():
if not _is_listener_accessible():
_is_ol_disabled()
try:
transport = _get_transport()
except Exception as e:
raise ValueError("There was an error when trying to build transport.") from e
if transport is None or transport.kind in ("noop", "console"):
_debug_missing_transport()
def validate_connection():
transport = _get_transport()
config = attr.asdict(transport.config)
verify_backend(LINEAGE_BACKEND, config)
def verify_backend(backend_type: str, config: dict):
backend_type = backend_type.lower()
if backend_type == "marquez":
return _verify_marquez_http_backend(config)
elif backend_type == "atlan":
return _verify_atlan_http_backend(config)
elif backend_type == "custom":
return _verify_custom_backend(config)
raise ValueError(f"Unsupported backend type: {backend_type}")
def _verify_marquez_http_backend(config):
log.info("Checking Marquez setup")
ol_url = config["url"]
ol_endpoint = config["endpoint"] # "api/v1/lineage"
marquez_prefix_path = ol_endpoint[: ol_endpoint.rfind("/") + 1] # "api/v1/"
list_namespace_url = ol_url + "/" + marquez_prefix_path + "namespaces"
import requests
try:
response = requests.get(list_namespace_url)
response.raise_for_status()
except Exception as e:
raise ConnectionError(f"Failed to connect to Marquez at `{list_namespace_url}`") from e
log.info("Airflow is able to access the URL")
def _verify_atlan_http_backend(config):
raise NotImplementedError("This feature is not implemented yet")
def _verify_custom_backend(config):
raise NotImplementedError("This feature is not implemented yet")
with DAG(
dag_id="openlineage_preflight_check_dag",
start_date=days_ago(1),
description="A DAG to check OpenLineage setup and configurations",
schedule_interval="@once",
) as dag:
validate_ol_installation_task = PythonOperator(
task_id="validate_ol_installation",
python_callable=validate_ol_installation,
)
is_ol_accessible_and_enabled_task = PythonOperator(
task_id="is_ol_accessible_and_enabled",
python_callable=is_ol_accessible_and_enabled,
)
validate_connection_task = PythonOperator(
task_id="validate_connection",
python_callable=validate_connection,
)
validate_ol_installation_task >> is_ol_accessible_and_enabled_task
is_ol_accessible_and_enabled_task >> validate_connection_task
The DAG defines three main tasks that sequentially execute the above validations:
validate_ol_installation: Confirms that the OpenLineage installation is correct and up-to-date.
is_ol_accessible_and_enabled: Checks if OpenLineage is accessible and enabled within Airflow.
validate_connection: Verifies the connection to the specified lineage backend.
In dags/, create a file named example_bashOperator.py and add the following code:
"""Example DAG demonstrating the usage of the BashOperator."""
from __future__ import annotations
import datetime
import pendulum
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
with DAG(
dag_id="example_bash_operator",
schedule="0 0 * * *",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
dagrun_timeout=datetime.timedelta(minutes=60),
tags=["example", "example2"],
params={"example_key": "example_value"},
) as dag:
run_this_last = EmptyOperator(
task_id="run_this_last",
)
# [START howto_operator_bash]
run_this = BashOperator(
task_id="run_after_loop",
bash_command="echo https://airflow.apache.org/",
)
# [END howto_operator_bash]
run_this >> run_this_last
for i in range(3):
task = BashOperator(
task_id=f"runme_{i}",
bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
)
task >> run_this
# [START howto_operator_bash_template]
also_run_this = BashOperator(
task_id="also_run_this",
bash_command='echo "ti_key={{ task_instance_key_str }}"',
)
# [END howto_operator_bash_template]
also_run_this >> run_this_last
# [START howto_operator_bash_skip]
this_will_skip = BashOperator(
task_id="this_will_skip",
bash_command='echo "hello world"; exit 99;',
dag=dag,
)
# [END howto_operator_bash_skip]
this_will_skip >> run_this_last
if __name__ == "__main__":
dag.test()
RUN the DAG.
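After the run completes, you can confirm that lineage was captured by listing the jobs Marquez recorded. This sketch assumes the OPENLINEAGE_NAMESPACE=example setting from openlineage.env and Marquez on localhost:5000; response field names may vary slightly between Marquez versions.

```python
# Sketch: list jobs and their latest run state in the `example` namespace.
import requests

resp = requests.get("http://localhost:5000/api/v1/namespaces/example/jobs")
resp.raise_for_status()
for job in resp.json().get("jobs", []):
    latest = job.get("latestRun") or {}
    print(job["name"], latest.get("state"))
```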
To make life easier, the folks at Damavis have developed the airflow-pentaho-plugin.
Retrieve the Airflow webserver container ID:
docker ps | grep air-flow-project*
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
dfcde46e5ae9 air-flow-project_feb761/airflow:latest "tini -- /entrypoint…" 6 hours ago Up 6 hours (healthy) 127.0.0.1:8080->8080/tcp
Exec into the container:
docker exec -it <webserver-airflow-container-id> /bin/bash
or use Portainer
Install airflow-pentaho-plugin:
pip install airflow-pentaho-plugin
astro@dfcde46e5ae9:/usr/local/airflow$ pip install airflow-pentaho-plugin
Defaulting to user installation because normal site-packages is not writeable
Looking in indexes: https://pip.astronomer.io/v2/
Looking in links: https://pip.astronomer.io/simple/astronomer-fab-security-manager/, https://pip.astronomer.io/simple/astronomer-airflow-version-check/
Collecting airflow-pentaho-plugin
Downloading airflow_pentaho_plugin-1.1.2-py3-none-any.whl.metadata (6.2 kB)
Collecting xmltodict>=0.12.0 (from airflow-pentaho-plugin)
Downloading xmltodict-0.13.0-py2.py3-none-any.whl.metadata (7.7 kB)
Downloading airflow_pentaho_plugin-1.1.2-py3-none-any.whl (20 kB)
Downloading xmltodict-0.13.0-py2.py3-none-any.whl (10.0 kB)
Installing collected packages: xmltodict, airflow-pentaho-plugin
Successfully installed airflow-pentaho-plugin-1.1.2 xmltodict-0.13.0
[notice] A new release of pip is available: 23.3.2 -> 24.0
[notice] To update, run: pip install --upgrade pip
astro@dfcde46e5ae9:/usr/local/airflow$
Restart the container:
docker restart <webserver-airflow-container-id>
or Restart using Portainer
Check that the airflow-pentaho-plugin is installed.
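A quick way to confirm the installation is to check the installed package version with Python from inside the webserver container:

```python
# Run inside the Airflow webserver container (e.g. via `python`) to confirm
# that the plugin package installed above is present.
from importlib.metadata import version

print(version("airflow-pentaho-plugin"))  # e.g. 1.1.2
```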