DAG code with postgres database operation plus addl screenshots.
Signed-off-by: merobi-hub <merobi@gmail.com>
merobi-hub committed Dec 7, 2024
1 parent 47f8590 commit e2d2eb6
Showing 6 changed files with 114 additions and 24 deletions.
138 changes: 114 additions & 24 deletions docs/v2/docs/airflow_tutorial/index.mdx
import Tabs from '@theme/Tabs'; import TabItem from '@theme/TabItem';
## Table of Contents

1. [Prerequisites](#prerequisites)
2. [Get and start Marquez](#get-marquez)
3. [Configure Airflow to send events to Marquez](#configure-airflow)
4. [View Airflow operational analytics and data lineage in Marquez](#view-airflow)
5. [Next steps](#next-steps)
6. [Feedback](#feedback)

# Prerequisites {#prerequisites}

Before you begin, make sure you have installed:
<Tabs groupId="prereqs">
<TabItem value="macos" label="MacOS/Linux">

* [Docker 17.05+](https://docs.docker.com/install)
* [Docker Compose](https://docs.docker.com/compose/install)
* [Airflow 2.8+](https://airflow.apache.org/docs/apache-airflow/stable/start.html)
* [PostgreSQL 14+](https://www.postgresql.org)

</TabItem>
<TabItem value="windows" label="Windows">

* [Git Bash](https://gitforwindows.org/)
* [PostgreSQL 14+](https://www.postgresql.org/)
* [Docker 17.05+](https://docs.docker.com/install)
* [Docker Compose](https://docs.docker.com/compose/install)
* [Airflow 2.8+](https://airflow.apache.org/docs/apache-airflow/stable/start.html)

</TabItem>
</Tabs>

## Get and Start Marquez {#get-marquez}

To check out the Marquez source code, run:

$ git clone https://github.com/MarquezProject/marquez && cd marquez
</TabItem>
</Tabs>

Both Airflow and Marquez require port 5432 for their metastores, but Marquez's database port is much easier to change on the fly. So start Marquez with an alternate port supplied to the `db-port` parameter:

<Tabs groupId="start">
<TabItem value="macos" label="MacOS/Linux">
$ sh ./docker/up.sh --db-port 2345
</TabItem>
</Tabs>

To view the Marquez UI and verify it's running, open [http://localhost:3000](http://localhost:3000). The UI allows you to discover dependencies between jobs and the datasets they produce and consume via the lineage graph, view run-level metadata of current and previous job runs, and get a high-level view of current and historical operations.
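
If you also want to confirm that the Marquez API itself is reachable (it listens on port 5000 by default, the same URL used for the OpenLineage transport below), one quick check is to list its namespaces:

```bash
$ curl http://localhost:5000/api/v1/namespaces
```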

## Configure Airflow to send events to Marquez {#configure-airflow}

1. To configure Airflow to emit OpenLineage events to Marquez, you need to define an OpenLineage transport and namespace. This is easy to do using environment variables. Run:

```bash
$ export AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://localhost:5000", "endpoint": "api/v1/lineage"}'
```

```bash
$ export AIRFLOW__OPENLINEAGE__NAMESPACE='my-team-airflow-instance'
```

2. To add the required Airflow OpenLineage Provider package to your Airflow environment, run:

```bash
$ pip install apache-airflow-providers-openlineage
```

3. To enable adding a Postgres connection for this tutorial, run:

```bash
$ pip install apache-airflow-providers-postgres
```

4. Now add both packages to `requirements.txt`:

```txt
apache-airflow-providers-openlineage
apache-airflow-providers-postgres
```

5. Create a database in your local Postgres instance and create an Airflow Postgres connection. For help, see: #add URL
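
A minimal sketch of one way to do this from the command line (the database name `airflow_tutorial` and the credentials below are placeholders; the connection ID matches the `postgres_default` ID used by the DAG in the next step):

```bash
# Create a database for the tutorial (assumes a local Postgres role with createdb rights).
$ createdb airflow_tutorial

# Register the connection with Airflow; adjust host, port, login, and password for your setup.
$ airflow connections add 'postgres_default' \
    --conn-type 'postgres' \
    --conn-host 'localhost' \
    --conn-port '5432' \
    --conn-login 'postgres' \
    --conn-password 'postgres' \
    --conn-schema 'airflow_tutorial'
```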

6. Add a flaky DAG to Airflow that will _often_ create a table in the Postgres database:

```py
from __future__ import annotations
import time
import random

import pendulum
from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

SQL_1 = """CREATE TABLE IF NOT EXISTS airflowsample (
    col1 VARCHAR(255),
    col2 VARCHAR(255)
)"""

SQL_2 = """DROP TABLE airflowsample"""


@dag(
    schedule='@hourly',
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
    dag_display_name="Flaky DAG",
)
def flaky_dag():

    sample_task_1 = EmptyOperator(
        task_id="sample_task_1",
        task_display_name="Sample Task 1",
    )

    @task(
        task_display_name="Sample Task 2",
    )
    def sample_task_2():
        # Sleep for a random interval; the 'fail' choice raises a TypeError
        # in time.sleep(), so this task (and the run) fails intermittently.
        pers = [0, 60, 120, 'fail']
        per = random.choice(pers)
        time.sleep(per)

    # Create the sample table in the database behind the postgres_default connection...
    sample_task_3 = SQLExecuteQueryOperator(
        task_id="sample_task_3",
        sql=SQL_1,
        conn_id="postgres_default",
    )

    # ...and drop it again at the end of the run.
    sample_task_4 = SQLExecuteQueryOperator(
        task_id="sample_task_4",
        sql=SQL_2,
        conn_id="postgres_default",
    )

    sample_task_1 >> sample_task_2() >> sample_task_3 >> sample_task_4


flaky_dag()
```
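
Save the file in your Airflow DAGs folder so the scheduler can pick it up. For example, assuming the default `AIRFLOW_HOME` of `~/airflow` and a file named `flaky_dag.py` (both are assumptions about your setup):

```bash
$ mkdir -p ~/airflow/dags
$ cp flaky_dag.py ~/airflow/dags/
```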

7. Run your DAG. To verify that the OpenLineage Provider is configured correctly, check the task logs for an `INFO`-level log reporting the transport type you defined: `OpenLineageClient will use http transport`.
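
If you prefer the CLI to the Airflow UI, one way to do this (assuming the DAG ID `flaky_dag`, the default derived from the decorated function name) is:

```bash
$ airflow dags unpause flaky_dag
$ airflow dags trigger flaky_dag
```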

## View Airflow operational analytics and data lineage in Marquez {#view-airflow}

The DataOps view provides a high-level picture of historical and in-process operations, including task-level run status and runtime information at a glance:

![](marquez_dataops.png)
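
The same operational metadata can be pulled programmatically. A sketch using the Marquez API and the Airflow namespace configured earlier (adjust the namespace if you chose a different value):

```bash
$ curl "http://localhost:5000/api/v1/namespaces/my-team-airflow-instance/jobs"
```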

In the Datasets view, you can click on a dataset to inspect a cross-platform lineage graph. In this case, you can view the upstream tasks feeding the `airflowsample` table:

![](marquez_graph_wide.png)
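
The lineage graph is also available from the Marquez lineage API if you want to consume it programmatically. A rough sketch (the `nodeId` below is illustrative; the exact namespace and dataset name depend on your Postgres connection and database, so copy the values shown in the UI):

```bash
$ curl "http://localhost:5000/api/v1/lineage?nodeId=dataset:postgres://localhost:5432:airflow_tutorial.public.airflowsample"
```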

Click on the dataset node for more details, including the schema, the time of the most recent update, and any facets in the OpenLineage event payload:

![](marquez_dataset_drawer.png)

Click on the versions tab in the drawer for a versioned schema history:

![](marquez_dataset_versions.png)

The Jobs view and the job detail drawer provide similar run-level details from the job side:

![](marquez_jobs_drawer.png)

![](marquez_jobs_view.png)
Binary file modified docs/v2/docs/airflow_tutorial/marquez_dataops.png
