diff --git a/.github/workflows/build-and-test.yml b/.github/workflows/build-and-test.yml
index 3eb34eca85a46f..8a000396f59358 100644
--- a/.github/workflows/build-and-test.yml
+++ b/.github/workflows/build-and-test.yml
@@ -91,6 +91,8 @@ jobs:
-x :metadata-ingestion-modules:airflow-plugin:check \
-x :metadata-ingestion-modules:dagster-plugin:build \
-x :metadata-ingestion-modules:dagster-plugin:check \
+ -x :metadata-ingestion-modules:prefect-plugin:build \
+ -x :metadata-ingestion-modules:prefect-plugin:check \
-x :metadata-ingestion-modules:gx-plugin:build \
-x :metadata-ingestion-modules:gx-plugin:check \
-x :datahub-frontend:build \
@@ -138,4 +140,4 @@ jobs:
uses: actions/upload-artifact@v3
with:
name: Event File
- path: ${{ github.event_path }}
+ path: ${{ github.event_path }}
\ No newline at end of file
diff --git a/.github/workflows/prefect-plugin.yml b/.github/workflows/prefect-plugin.yml
new file mode 100644
index 00000000000000..09af0ad3f354a3
--- /dev/null
+++ b/.github/workflows/prefect-plugin.yml
@@ -0,0 +1,86 @@
+name: Prefect Plugin
+on:
+ push:
+ branches:
+ - master
+ paths:
+ - ".github/workflows/prefect-plugin.yml"
+ - "metadata-ingestion-modules/prefect-plugin/**"
+ - "metadata-ingestion/**"
+ - "metadata-models/**"
+ pull_request:
+ branches:
+ - "**"
+ paths:
+ - ".github/workflows/prefect-plugin.yml"
+ - "metadata-ingestion-modules/prefect-plugin/**"
+ - "metadata-ingestion/**"
+ - "metadata-models/**"
+ release:
+ types: [published]
+
+concurrency:
+ group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
+ cancel-in-progress: true
+
+jobs:
+ prefect-plugin:
+ runs-on: ubuntu-latest
+ env:
+ SPARK_VERSION: 3.0.3
+ DATAHUB_TELEMETRY_ENABLED: false
+ strategy:
+ matrix:
+ python-version: ["3.8", "3.9", "3.10"]
+ fail-fast: false
+ steps:
+ - name: Set up JDK 17
+ uses: actions/setup-java@v3
+ with:
+ distribution: "zulu"
+ java-version: 17
+ - uses: gradle/gradle-build-action@v2
+ - uses: actions/checkout@v3
+ - uses: actions/setup-python@v4
+ with:
+ python-version: ${{ matrix.python-version }}
+ cache: "pip"
+ - name: Install dependencies
+ run: ./metadata-ingestion/scripts/install_deps.sh
+      - name: Install prefect package and test
+ run: ./gradlew :metadata-ingestion-modules:prefect-plugin:lint :metadata-ingestion-modules:prefect-plugin:testQuick
+ - name: pip freeze show list installed
+ if: always()
+ run: source metadata-ingestion-modules/prefect-plugin/venv/bin/activate && pip freeze
+ - uses: actions/upload-artifact@v3
+        if: ${{ always() && matrix.python-version == '3.10' }}
+        with:
+          name: Test Results (Prefect Plugin ${{ matrix.python-version }})
+ path: |
+ **/build/reports/tests/test/**
+ **/build/test-results/test/**
+ **/junit.*.xml
+ !**/binary/**
+ - name: Upload coverage to Codecov
+ if: always()
+ uses: codecov/codecov-action@v3
+ with:
+ token: ${{ secrets.CODECOV_TOKEN }}
+ directory: .
+ fail_ci_if_error: false
+          flags: prefect,prefect-${{ matrix.python-version }}
+ name: pytest-prefect-${{ matrix.python-version }}
+ verbose: true
+
+ event-file:
+ runs-on: ubuntu-latest
+ steps:
+ - name: Upload
+ uses: actions/upload-artifact@v3
+ with:
+ name: Event File
+ path: ${{ github.event_path }}
diff --git a/.github/workflows/test-results.yml b/.github/workflows/test-results.yml
index 947fc35f169a04..23a80aab8082a5 100644
--- a/.github/workflows/test-results.yml
+++ b/.github/workflows/test-results.yml
@@ -2,7 +2,7 @@ name: Test Results
on:
workflow_run:
- workflows: ["build & test", "metadata ingestion", "Airflow Plugin", "Dagster Plugin", "GX Plugin"]
+ workflows: ["build & test", "metadata ingestion", "Airflow Plugin", "Dagster Plugin", "Prefect Plugin", "GX Plugin"]
types:
- completed
diff --git a/datahub-web-react/src/images/dagsterlogo.svg b/datahub-web-react/src/images/dagsterlogo.svg
new file mode 100644
index 00000000000000..d2ae628553a7dd
--- /dev/null
+++ b/datahub-web-react/src/images/dagsterlogo.svg
@@ -0,0 +1,11 @@
+
diff --git a/datahub-web-react/src/images/prefectlogo.svg b/datahub-web-react/src/images/prefectlogo.svg
new file mode 100644
index 00000000000000..54c4e7f553327b
--- /dev/null
+++ b/datahub-web-react/src/images/prefectlogo.svg
@@ -0,0 +1 @@
+
\ No newline at end of file
diff --git a/docs-website/build.gradle b/docs-website/build.gradle
index 803112bf857166..3b78804eafd9d2 100644
--- a/docs-website/build.gradle
+++ b/docs-website/build.gradle
@@ -86,6 +86,7 @@ task yarnGenerate(type: YarnTask, dependsOn: [yarnInstall,
':metadata-ingestion:buildWheel',
':metadata-ingestion-modules:airflow-plugin:buildWheel',
':metadata-ingestion-modules:dagster-plugin:buildWheel',
+ ':metadata-ingestion-modules:prefect-plugin:buildWheel',
':metadata-ingestion-modules:gx-plugin:buildWheel',
]) {
inputs.files(projectMdFiles)
diff --git a/docs-website/filterTagIndexes.json b/docs-website/filterTagIndexes.json
index e1e63ab5a9dbd4..2309593b2c3b9f 100644
--- a/docs-website/filterTagIndexes.json
+++ b/docs-website/filterTagIndexes.json
@@ -85,7 +85,7 @@
"tags": {
"Platform Type": "Orchestrator",
"Connection Type": "Pull",
- "Features": "Stateful Ingestion, UI Ingestion, Status Aspect"
+ "Features": "Status Aspect"
}
},
{
@@ -429,6 +429,17 @@
"Features": "Stateful Ingestion, Lower Casing, Status Aspect"
}
},
+ {
+ "Path": "docs/lineage/prefect",
+ "imgPath": "img/logos/platforms/prefect.svg",
+ "Title": "Prefect",
+ "Description": "Prefect is a modern workflow orchestration for data and ML engineers.",
+ "tags": {
+ "Platform Type": "Orchestrator",
+ "Connection Type": "Pull",
+ "Features": "Status Aspect"
+ }
+ },
{
"Path": "docs/generated/ingestion/sources/presto",
"imgPath": "img/logos/platforms/presto.svg",
diff --git a/docs-website/generateDocsDir.ts b/docs-website/generateDocsDir.ts
index ceac79bd5cad37..032e912c7190e1 100644
--- a/docs-website/generateDocsDir.ts
+++ b/docs-website/generateDocsDir.ts
@@ -573,6 +573,7 @@ function copy_python_wheels(): void {
"../metadata-ingestion/dist",
"../metadata-ingestion-modules/airflow-plugin/dist",
"../metadata-ingestion-modules/dagster-plugin/dist",
+ "../metadata-ingestion-modules/prefect-plugin/dist",
"../metadata-ingestion-modules/gx-plugin/dist",
];
diff --git a/docs-website/sidebars.js b/docs-website/sidebars.js
index 076bc5aa3bf188..20bed6099cdae3 100644
--- a/docs-website/sidebars.js
+++ b/docs-website/sidebars.js
@@ -444,6 +444,11 @@ module.exports = {
id: "docs/lineage/openlineage",
label: "OpenLineage",
},
+ {
+ type: "doc",
+ id: "docs/lineage/prefect",
+ label: "Prefect",
+ },
{
type: "doc",
id: "metadata-integration/java/acryl-spark-lineage/README",
@@ -917,6 +922,7 @@ module.exports = {
// "metadata-integration/java/openlineage-converter/README"
//"metadata-ingestion-modules/airflow-plugin/README"
//"metadata-ingestion-modules/dagster-plugin/README"
+ //"metadata-ingestion-modules/prefect-plugin/README"
//"metadata-ingestion-modules/gx-plugin/README"
// "metadata-ingestion/schedule_docs/datahub", // we can delete this
// TODO: change the titles of these, removing the "What is..." portion from the sidebar"
diff --git a/docs-website/src/pages/_components/Logos/index.js b/docs-website/src/pages/_components/Logos/index.js
index b17c072d02d575..a4ac46649ccf49 100644
--- a/docs-website/src/pages/_components/Logos/index.js
+++ b/docs-website/src/pages/_components/Logos/index.js
@@ -40,6 +40,7 @@ const platformLogos = [
name: "CouchBase",
imageUrl: "/img/logos/platforms/couchbase.svg",
},
+ { name: "Dagster", imageUrl: "/img/logos/platforms/dagster.png" },
{ name: "Databricks", imageUrl: "/img/logos/platforms/databricks.png" },
{ name: "DBT", imageUrl: "/img/logos/platforms/dbt.svg" },
{ name: "Deltalake", imageUrl: "/img/logos/platforms/deltalake.svg" },
@@ -87,6 +88,7 @@ const platformLogos = [
{ name: "Pinot", imageUrl: "/img/logos/platforms/pinot.svg" },
{ name: "PostgreSQL", imageUrl: "/img/logos/platforms/postgres.svg" },
{ name: "PowerBI", imageUrl: "/img/logos/platforms/powerbi.png" },
+ { name: "Prefect", imageUrl: "/img/logos/platforms/prefect.svg" },
{ name: "Presto", imageUrl: "/img/logos/platforms/presto.svg" },
{ name: "Protobuf", imageUrl: "/img/logos/platforms/protobuf.png" },
{ name: "Pulsar", imageUrl: "/img/logos/platforms/pulsar.png" },
diff --git a/docs-website/static/img/logos/platforms/prefect.svg b/docs-website/static/img/logos/platforms/prefect.svg
new file mode 100644
index 00000000000000..54c4e7f553327b
--- /dev/null
+++ b/docs-website/static/img/logos/platforms/prefect.svg
@@ -0,0 +1 @@
+
\ No newline at end of file
diff --git a/docs/lineage/prefect.md b/docs/lineage/prefect.md
new file mode 100644
index 00000000000000..538e50d979e01d
--- /dev/null
+++ b/docs/lineage/prefect.md
@@ -0,0 +1,137 @@
+# Prefect Integration with DataHub
+
+## Overview
+
+DataHub supports integration with Prefect, allowing you to ingest:
+
+- Prefect flow and task metadata
+- Flow run and Task run information
+- Lineage information (when available)
+
+This integration enables you to track and monitor your Prefect workflows within DataHub, providing a comprehensive view of your data pipeline activities.
+
+## Prefect DataHub Block
+
+### What is a Prefect DataHub Block?
+
+Blocks in Prefect are primitives that enable the storage of configuration and provide an interface for interacting with external systems. The `prefect-datahub` block uses the [DataHub REST](../../metadata-ingestion/sink_docs/datahub.md#datahub-rest) emitter to send metadata events while running Prefect flows.
+
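+If the block type does not appear in the Prefect UI after installation, you can typically register it manually with the Prefect CLI (a sketch; this assumes the `prefect-datahub` package is installed in the environment your active Prefect profile points at):
+
+```shell
+# Register the DatahubEmitter block type with your Prefect server or Prefect Cloud workspace
+prefect block register -m prefect_datahub.datahub_emitter
+```
+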
+### Prerequisites
+
+1. Use either Prefect Cloud (recommended) or a self-hosted Prefect server.
+2. For Prefect Cloud setup, refer to the [Cloud Quickstart](https://docs.prefect.io/latest/getting-started/quickstart/) guide.
+3. For self-hosted Prefect server setup, refer to the [Host Prefect Server](https://docs.prefect.io/latest/guides/host/) guide.
+4. Ensure the Prefect API URL is set correctly. Verify using:
+
+ ```shell
+ prefect profile inspect
+ ```
+
+5. API URL format:
+   - Prefect Cloud: `https://api.prefect.cloud/api/accounts/<account_id>/workspaces/<workspace_id>`
+   - Self-hosted: `http://<host>:<port>/api`
+
+## Setup Instructions
+
+### 1. Installation
+
+Install `prefect-datahub` using pip:
+
+```shell
+pip install 'prefect-datahub'
+```
+
+Note: Requires Python 3.7+
+
+### 2. Saving Configurations to a Block
+
+Save your configuration to the [Prefect block document store](https://docs.prefect.io/latest/concepts/blocks/#saving-blocks):
+
+```python
+from prefect_datahub.datahub_emitter import DatahubEmitter
+
+DatahubEmitter(
+ datahub_rest_url="http://localhost:8080",
+ env="PROD",
+ platform_instance="local_prefect"
+).save("MY-DATAHUB-BLOCK")
+```
+
+Configuration options:
+
+| Config | Type | Default | Description |
+|--------|------|---------|-------------|
+| datahub_rest_url | `str` | `http://localhost:8080` | DataHub GMS REST URL |
+| env | `str` | `PROD` | Environment for assets (see [FabricType](https://datahubproject.io/docs/graphql/enums/#fabrictype)) |
+| platform_instance | `str` | `None` | Platform instance for assets (see [Platform Instances](https://datahubproject.io/docs/platform-instances/)) |
+
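+If your DataHub GMS endpoint requires authentication, the block also accepts a `token` field. A minimal sketch (the URL and token below are placeholders; generate a personal access token in DataHub and keep it secret):
+
+```python
+from prefect_datahub.datahub_emitter import DatahubEmitter
+
+DatahubEmitter(
+    datahub_rest_url="https://your-datahub-gms.example.com",  # hypothetical secured GMS endpoint
+    env="PROD",
+    platform_instance="local_prefect",
+    token="<your-datahub-access-token>",  # placeholder; store this securely
+).save("MY-DATAHUB-BLOCK")
+```
+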
+### 3. Using the Block in Prefect Workflows
+
+Load and use the saved block in your Prefect workflows:
+
+```python
+from prefect import flow, task
+from prefect_datahub.entities import Dataset
+from prefect_datahub.datahub_emitter import DatahubEmitter
+
+datahub_emitter = DatahubEmitter.load("MY-DATAHUB-BLOCK")
+
+@task(name="Transform", description="Transform the data")
+def transform(data):
+ data = data.split(" ")
+ datahub_emitter.add_task(
+ inputs=[Dataset("snowflake", "mydb.schema.tableA")],
+ outputs=[Dataset("snowflake", "mydb.schema.tableC")],
+ )
+ return data
+
+@flow(name="ETL flow", description="Extract transform load flow")
+def etl():
+ data = transform("This is data")
+ datahub_emitter.emit_flow()
+```
+
+**Note**: To emit task metadata, you must call `emit_flow()` at the end of your flow. Otherwise, no metadata will be emitted.
+
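+Besides `Dataset`, the plugin's `entities` module also exposes a `Urn` wrapper, so you can reference task inputs/outputs by an existing DataHub dataset URN (a sketch; the URN below is illustrative):
+
+```python
+from prefect_datahub.entities import Urn
+
+# Reference an existing dataset by its full DataHub URN instead of platform/name
+datahub_emitter.add_task(
+    inputs=[Urn("urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableA,PROD)")],
+)
+```
+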
+## Concept Mapping
+
+| Prefect Concept | DataHub Concept |
+|-----------------|-----------------|
+| [Flow](https://docs.prefect.io/latest/concepts/flows/) | [DataFlow](https://datahubproject.io/docs/generated/metamodel/entities/dataflow/) |
+| [Flow Run](https://docs.prefect.io/latest/concepts/flows/#flow-runs) | [DataProcessInstance](https://datahubproject.io/docs/generated/metamodel/entities/dataprocessinstance) |
+| [Task](https://docs.prefect.io/latest/concepts/tasks/) | [DataJob](https://datahubproject.io/docs/generated/metamodel/entities/datajob/) |
+| [Task Run](https://docs.prefect.io/latest/concepts/tasks/#tasks) | [DataProcessInstance](https://datahubproject.io/docs/generated/metamodel/entities/dataprocessinstance) |
+| [Task Tag](https://docs.prefect.io/latest/concepts/tasks/#tags) | [Tag](https://datahubproject.io/docs/generated/metamodel/entities/tag/) |
+
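+For example, the emitter builds DataHub URNs from Prefect names roughly as follows (a sketch using the URN helpers this plugin relies on; the flow and task names are illustrative):
+
+```python
+from datahub.utilities.urns.data_flow_urn import DataFlowUrn
+from datahub.utilities.urns.data_job_urn import DataJobUrn
+
+# A flow named "etl" in env PROD becomes a DataFlow URN, approximately:
+#   urn:li:dataFlow:(prefect,etl,PROD)
+flow_urn = DataFlowUrn.create_from_ids(orchestrator="prefect", flow_id="etl", env="PROD")
+
+# Each task (keyed by its task_key) becomes a DataJob under that flow, approximately:
+#   urn:li:dataJob:(urn:li:dataFlow:(prefect,etl,PROD),__main__.transform)
+job_urn = DataJobUrn.create_from_ids(data_flow_urn=str(flow_urn), job_id="__main__.transform")
+```
+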
+## Validation and Troubleshooting
+
+### Validating the Setup
+
+1. Check the Prefect UI's Blocks menu for the DataHub emitter.
+2. Run a Prefect workflow and look for DataHub-related log messages:
+
+ ```text
+ Emitting flow to datahub...
+ Emitting tasks to datahub...
+ ```
+
+### Debugging Common Issues
+
+#### Incorrect Prefect API URL
+
+If the Prefect API URL is incorrect, set it manually:
+
+```shell
+prefect config set PREFECT_API_URL='http://127.0.0.1:4200/api'
+```
+
+#### DataHub Connection Error
+
+If you encounter a `ConnectionError: HTTPConnectionPool(host='localhost', port=8080)`, ensure that your DataHub GMS service is running.
+
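+A quick connectivity check (assuming a local quickstart deployment on the default port) is to hit the GMS health endpoint:
+
+```shell
+# Should return a healthy response if GMS is up; adjust host/port for your deployment
+curl http://localhost:8080/health
+```
+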
+## Additional Resources
+
+- [Prefect Documentation](https://docs.prefect.io/)
+- [DataHub Documentation](https://datahubproject.io/docs/)
+
+For more information or support, please refer to the official Prefect and DataHub documentation or reach out to their respective communities.
diff --git a/metadata-ingestion-modules/prefect-plugin/.gitignore b/metadata-ingestion-modules/prefect-plugin/.gitignore
new file mode 100644
index 00000000000000..1d2916d00eabde
--- /dev/null
+++ b/metadata-ingestion-modules/prefect-plugin/.gitignore
@@ -0,0 +1,143 @@
+.envrc
+src/prefect_datahub/__init__.py.bak
+.vscode/
+output
+pvenv36/
+bq_credentials.json
+/tmp
+*.bak
+
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+pip-wheel-metadata/
+share/python-wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+MANIFEST
+
+# PyInstaller
+# Usually these files are written by a python script from a template
+# before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.nox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*.cover
+*.py,cover
+.hypothesis/
+.pytest_cache/
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+local_settings.py
+db.sqlite3
+db.sqlite3-journal
+
+# Flask stuff:
+instance/
+.webassets-cache
+
+# Scrapy stuff:
+.scrapy
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+target/
+
+# Jupyter Notebook
+.ipynb_checkpoints
+
+# IPython
+profile_default/
+ipython_config.py
+
+# pyenv
+.python-version
+
+# pipenv
+# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
+# However, in case of collaboration, if having platform-specific dependencies or dependencies
+# having no cross-platform support, pipenv may install dependencies that don't work, or not
+# install all needed dependencies.
+#Pipfile.lock
+
+# PEP 582; used by e.g. github.com/David-OConnor/pyflow
+__pypackages__/
+
+# Celery stuff
+celerybeat-schedule
+celerybeat.pid
+
+# SageMath parsed files
+*.sage.py
+
+# Environments
+.env
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# Spyder project settings
+.spyderproject
+.spyproject
+
+# Rope project settings
+.ropeproject
+
+# mkdocs documentation
+/site
+
+# mypy
+.mypy_cache/
+.dmypy.json
+dmypy.json
+
+# Pyre type checker
+.pyre/
+
+# Generated classes
+src/datahub/metadata/
+wheels/
+junit.quick.xml
diff --git a/metadata-ingestion-modules/prefect-plugin/README.md b/metadata-ingestion-modules/prefect-plugin/README.md
new file mode 100644
index 00000000000000..607d93e460c630
--- /dev/null
+++ b/metadata-ingestion-modules/prefect-plugin/README.md
@@ -0,0 +1,132 @@
+# prefect-datahub
+
+Emit flow and task metadata to the DataHub REST API with `prefect-datahub`.
+
+## Introduction
+
+The `prefect-datahub` collection allows you to easily integrate DataHub's metadata ingestion capabilities into your Prefect workflows. With this collection, you can emit metadata about your flows, tasks, and workspace to DataHub's metadata service.
+
+## Features
+
+- Seamless integration with Prefect workflows
+- Support for emitting flow, task, and workspace metadata to the DataHub GMS REST API
+- Easy configuration using Prefect blocks
+
+## Prerequisites
+
+- Python 3.7+
+- Prefect 2.0.0+
+- A running instance of DataHub
+
+## Installation
+
+Install `prefect-datahub` using pip:
+
+```bash
+pip install prefect-datahub
+```
+
+We recommend using a Python virtual environment manager such as pipenv, conda, or virtualenv.
+
+## Getting Started
+
+### 1. Set up DataHub
+
+Before using `prefect-datahub`, you need to deploy an instance of DataHub. Follow the instructions on the [DataHub Quickstart page](https://datahubproject.io/docs/quickstart) to set up DataHub.
+
+After successful deployment, the DataHub GMS service should be running on `http://localhost:8080` if deployed locally.
+
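+For a local test setup, the quickstart typically boils down to the following (Docker required; see the quickstart guide for the authoritative steps):
+
+```bash
+# Install the DataHub CLI and bring up a local DataHub instance
+pip install acryl-datahub
+datahub docker quickstart
+```
+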
+### 2. Configure DataHub Emitter
+
+Save your DataHub configuration as a Prefect block:
+
+```python
+from prefect_datahub.datahub_emitter import DatahubEmitter
+
+datahub_emitter = DatahubEmitter(
+ datahub_rest_url="http://localhost:8080",
+ env="DEV",
+ platform_instance="local_prefect",
+    token=None,  # generate an auth token in DataHub and provide it here if the GMS endpoint is secured
+)
+datahub_emitter.save("datahub-emitter-test")
+```
+
+Configuration options:
+
+| Config | Type | Default | Description |
+|--------|------|---------|-------------|
+| datahub_rest_url | `str` | `http://localhost:8080` | DataHub GMS REST URL |
+| env | `str` | `PROD` | Environment for assets (see [FabricType](https://datahubproject.io/docs/graphql/enums/#fabrictype)) |
+| platform_instance | `str` | `None` | Platform instance for assets (see [Platform Instances](https://datahubproject.io/docs/platform-instances/)) |
+
+### 3. Use DataHub Emitter in Your Workflows
+
+Here's an example of how to use the DataHub Emitter in a Prefect workflow:
+
+```python
+from prefect import flow, task
+from prefect_datahub.datahub_emitter import DatahubEmitter
+from prefect_datahub.entities import Dataset
+
+datahub_emitter_block = DatahubEmitter.load("datahub-emitter-test")
+
+@task(name="Extract", description="Extract the data")
+def extract():
+ return "This is data"
+
+@task(name="Transform", description="Transform the data")
+def transform(data, datahub_emitter):
+ transformed_data = data.split(" ")
+ datahub_emitter.add_task(
+ inputs=[Dataset("snowflake", "mydb.schema.tableX")],
+ outputs=[Dataset("snowflake", "mydb.schema.tableY")],
+ )
+ return transformed_data
+
+@flow(name="ETL", description="Extract transform load flow")
+def etl():
+ datahub_emitter = datahub_emitter_block
+ data = extract()
+ transformed_data = transform(data, datahub_emitter)
+ datahub_emitter.emit_flow()
+
+if __name__ == "__main__":
+ etl()
+```
+
+**Note**: To emit task metadata, you must call `emit_flow()` at the end of your flow. Otherwise, no metadata will be emitted.
+
+## Advanced Usage
+
+For more advanced usage and configuration options, please refer to the [prefect-datahub documentation](https://datahubproject.io/docs/lineage/prefect/).
+
+## Contributing
+
+We welcome contributions to `prefect-datahub`! Please refer to our [Contributing Guidelines](https://datahubproject.io/docs/contributing) for more information on how to get started.
+
+## Support
+
+If you encounter any issues or have questions, you can:
+
+- Open an issue in the [DataHub GitHub repository](https://github.com/datahub-project/datahub/issues)
+- Join the [DataHub Slack community](https://datahubspace.slack.com)
+- Seek help in the [Prefect Slack community](https://prefect.io/slack)
+
+## License
+
+`prefect-datahub` is released under the Apache 2.0 license. See the [LICENSE](https://github.com/datahub-project/datahub/blob/master/LICENSE) file for details.
\ No newline at end of file
diff --git a/metadata-ingestion-modules/prefect-plugin/build.gradle b/metadata-ingestion-modules/prefect-plugin/build.gradle
new file mode 100644
index 00000000000000..b078b8d8de3b37
--- /dev/null
+++ b/metadata-ingestion-modules/prefect-plugin/build.gradle
@@ -0,0 +1,127 @@
+plugins {
+ id 'base'
+}
+
+ext {
+ python_executable = 'python3'
+ venv_name = 'venv'
+}
+
+if (!project.hasProperty("extra_pip_requirements")) {
+ ext.extra_pip_requirements = ""
+}
+
+def pip_install_command = "VIRTUAL_ENV=${venv_name} ${venv_name}/bin/uv pip install -e ../../metadata-ingestion"
+
+task checkPythonVersion(type: Exec) {
+ commandLine python_executable, '-c', 'import sys; assert sys.version_info >= (3, 7)'
+}
+
+task environmentSetup(type: Exec, dependsOn: checkPythonVersion) {
+ def sentinel_file = "${venv_name}/.venv_environment_sentinel"
+ inputs.file file('setup.py')
+ outputs.file(sentinel_file)
+ commandLine 'bash', '-c',
+ "${python_executable} -m venv ${venv_name} && " +
+ "${venv_name}/bin/python -m pip install --upgrade pip uv wheel 'setuptools>=63.0.0' && " +
+ "touch ${sentinel_file}"
+}
+
+task installPackage(type: Exec, dependsOn: [environmentSetup, ':metadata-ingestion:codegen']) {
+ def sentinel_file = "${venv_name}/.build_install_package_sentinel"
+ inputs.file file('setup.py')
+ outputs.file(sentinel_file)
+ commandLine 'bash', '-x', '-c',
+ "source ${venv_name}/bin/activate && set -x && " +
+ "${pip_install_command} -e . ${extra_pip_requirements} &&" +
+ "touch ${sentinel_file}"
+}
+
+task install(dependsOn: [installPackage])
+
+task installDev(type: Exec, dependsOn: [install]) {
+ def sentinel_file = "${venv_name}/.build_install_dev_sentinel"
+ inputs.file file('setup.py')
+ outputs.file("${sentinel_file}")
+ commandLine 'bash', '-x', '-c',
+ "source ${venv_name}/bin/activate && set -x && " +
+ "${pip_install_command} -e .[dev] ${extra_pip_requirements} && " +
+ "touch ${sentinel_file}"
+}
+
+task lint(type: Exec, dependsOn: installDev) {
+ commandLine 'bash', '-c',
+ "source ${venv_name}/bin/activate && set -x && " +
+ "black --check --diff src/ tests/ && " +
+ "isort --check --diff src/ tests/ && " +
+ "flake8 --count --statistics src/ tests/ && " +
+ "mypy --show-traceback --show-error-codes src/ tests/"
+}
+task lintFix(type: Exec, dependsOn: installDev) {
+ commandLine 'bash', '-x', '-c',
+ "source ${venv_name}/bin/activate && " +
+ "black src/ tests/ && " +
+ "isort src/ tests/ && " +
+ "flake8 src/ tests/ && " +
+ "mypy src/ tests/ "
+}
+
+task installDevTest(type: Exec, dependsOn: [installDev]) {
+ def sentinel_file = "${venv_name}/.build_install_dev_test_sentinel"
+ inputs.file file('setup.py')
+ outputs.dir("${venv_name}")
+ outputs.file("${sentinel_file}")
+ commandLine 'bash', '-x', '-c',
+ "${pip_install_command} -e .[dev,integration-tests] && touch ${sentinel_file}"
+}
+
+def testFile = hasProperty('testFile') ? testFile : 'unknown'
+task testSingle(dependsOn: [installDevTest]) {
+ doLast {
+ if (testFile != 'unknown') {
+ exec {
+ commandLine 'bash', '-x', '-c',
+ "source ${venv_name}/bin/activate && pytest ${testFile}"
+ }
+ } else {
+ throw new GradleException("No file provided. Use -PtestFile=")
+ }
+ }
+}
+
+task testQuick(type: Exec, dependsOn: installDevTest) {
+ // We can't enforce the coverage requirements if we run a subset of the tests.
+ inputs.files(project.fileTree(dir: "src/", include: "**/*.py"))
+ inputs.files(project.fileTree(dir: "tests/"))
+ outputs.dir("${venv_name}")
+ commandLine 'bash', '-x', '-c',
+ "source ${venv_name}/bin/activate && pytest --cov-config=setup.cfg --cov-report xml:coverage_quick.xml -vv --continue-on-collection-errors --junit-xml=junit.quick.xml -s"
+}
+
+
+task testFull(type: Exec, dependsOn: [testQuick, installDevTest]) {
+ commandLine 'bash', '-x', '-c',
+ "source ${venv_name}/bin/activate && pytest -m 'not slow_integration' -vv --continue-on-collection-errors --junit-xml=junit.full.xml"
+}
+
+
+task buildWheel(type: Exec, dependsOn: [environmentSetup]) {
+ commandLine 'bash', '-c', "source ${venv_name}/bin/activate && " +
+ 'uv pip install build && RELEASE_VERSION="\${RELEASE_VERSION:-0.0.0.dev1}" RELEASE_SKIP_INSTALL=1 RELEASE_SKIP_UPLOAD=1 ./scripts/release.sh'
+}
+
+task cleanPythonCache(type: Exec) {
+ commandLine 'bash', '-c',
+ "find src -type f -name '*.py[co]' -delete -o -type d -name __pycache__ -delete -o -type d -empty -delete"
+}
+
+build.dependsOn install
+check.dependsOn lint
+check.dependsOn testQuick
+
+clean {
+ delete venv_name
+ delete 'build'
+ delete 'dist'
+}
+clean.dependsOn cleanPythonCache
diff --git a/metadata-ingestion-modules/prefect-plugin/pyproject.toml b/metadata-ingestion-modules/prefect-plugin/pyproject.toml
new file mode 100644
index 00000000000000..fba81486b9f677
--- /dev/null
+++ b/metadata-ingestion-modules/prefect-plugin/pyproject.toml
@@ -0,0 +1,19 @@
+[build-system]
+build-backend = "setuptools.build_meta"
+requires = ["setuptools>=54.0.0", "wheel", "pip>=21.0.0"]
+
+[tool.black]
+extend-exclude = '''
+# A regex preceded with ^/ will apply only to files and directories
+# in the root of the project.
+^/tmp
+'''
+include = '\.pyi?$'
+
+[tool.isort]
+indent = ' '
+profile = 'black'
+sections = 'FUTURE,STDLIB,THIRDPARTY,FIRSTPARTY,LOCALFOLDER'
+
+[tool.pyright]
+extraPaths = ['tests']
\ No newline at end of file
diff --git a/metadata-ingestion-modules/prefect-plugin/scripts/release.sh b/metadata-ingestion-modules/prefect-plugin/scripts/release.sh
new file mode 100755
index 00000000000000..f398db98b60290
--- /dev/null
+++ b/metadata-ingestion-modules/prefect-plugin/scripts/release.sh
@@ -0,0 +1,26 @@
+#!/bin/bash
+set -euxo pipefail
+
+if [[ ! ${RELEASE_SKIP_TEST:-} ]] && [[ ! ${RELEASE_SKIP_INSTALL:-} ]]; then
+ ../../gradlew build # also runs tests
+elif [[ ! ${RELEASE_SKIP_INSTALL:-} ]]; then
+ ../../gradlew install
+fi
+
+MODULE=prefect_datahub
+
+# Check packaging constraint.
+python -c 'import setuptools; where="./src"; assert setuptools.find_packages(where) == setuptools.find_namespace_packages(where), "you seem to be missing or have extra __init__.py files"'
+if [[ ${RELEASE_VERSION:-} ]]; then
+ # Replace version with RELEASE_VERSION env variable
+ sed -i.bak "s/__version__ = \"1\!0.0.0.dev0\"/__version__ = \"$(echo $RELEASE_VERSION|sed s/-/+/)\"/" src/${MODULE}/__init__.py
+else
+ vim src/${MODULE}/__init__.py
+fi
+
+rm -rf build dist || true
+python -m build
+if [[ ! ${RELEASE_SKIP_UPLOAD:-} ]]; then
+ python -m twine upload 'dist/*'
+fi
+mv src/${MODULE}/__init__.py.bak src/${MODULE}/__init__.py
\ No newline at end of file
diff --git a/metadata-ingestion-modules/prefect-plugin/setup.cfg b/metadata-ingestion-modules/prefect-plugin/setup.cfg
new file mode 100644
index 00000000000000..c59a99fa8aec0a
--- /dev/null
+++ b/metadata-ingestion-modules/prefect-plugin/setup.cfg
@@ -0,0 +1,74 @@
+[flake8]
+max-complexity = 15
+ignore =
+ # Ignore: line length issues, since black's formatter will take care of them.
+ E501,
+ # Ignore: 1 blank line required before class docstring.
+ D203,
+ # See https://stackoverflow.com/a/57074416.
+ W503,
+ # See https://github.com/psf/black/issues/315.
+ E203
+exclude =
+ .git,
+ venv,
+ .tox,
+ __pycache__
+per-file-ignores =
+ # imported but unused
+ __init__.py: F401
+ban-relative-imports = true
+
+[mypy]
+plugins =
+ sqlmypy,
+ pydantic.mypy
+exclude = ^(venv|build|dist)/
+ignore_missing_imports = yes
+strict_optional = yes
+check_untyped_defs = yes
+disallow_incomplete_defs = yes
+disallow_untyped_decorators = yes
+warn_unused_configs = yes
+# eventually we'd like to enable these
+disallow_untyped_defs = no
+
+# try to be a bit more strict in certain areas of the codebase
+[mypy-datahub.*]
+ignore_missing_imports = no
+[mypy-tests.*]
+ignore_missing_imports = no
+
+[tool:pytest]
+asyncio_mode = auto
+addopts = --cov=src --cov-report term-missing --cov-config setup.cfg --strict-markers
+
+testpaths =
+ tests/unit
+ tests/integration
+
+[coverage:run]
+# Because of some quirks in the way setup.cfg, coverage.py, pytest-cov,
+# and tox interact, we should not uncomment the following line.
+# See https://pytest-cov.readthedocs.io/en/latest/config.html and
+# https://coverage.readthedocs.io/en/coverage-5.0/config.html.
+# We also have some additional pytest/cov config options in tox.ini.
+# source = src
+
+[coverage:paths]
+# This is necessary for tox-based coverage to be counted properly.
+source =
+ src
+ */site-packages
+
+[coverage:report]
+# The fail_under value ensures that at least some coverage data is collected.
+# We override its value in the tox config.
+show_missing = true
+exclude_lines =
+ pragma: no cover
+ @abstract
+ if TYPE_CHECKING:
+omit =
+ # omit example jobs
+ src/prefect_datahub/example/*
diff --git a/metadata-ingestion-modules/prefect-plugin/setup.py b/metadata-ingestion-modules/prefect-plugin/setup.py
new file mode 100644
index 00000000000000..746d786f10cbc0
--- /dev/null
+++ b/metadata-ingestion-modules/prefect-plugin/setup.py
@@ -0,0 +1,130 @@
+import os
+import pathlib
+
+import setuptools
+
+package_metadata: dict = {}
+with open("./src/prefect_datahub/__init__.py") as fp:
+ exec(fp.read(), package_metadata)
+
+
+def get_long_description():
+ root = os.path.dirname(__file__)
+ return pathlib.Path(os.path.join(root, "README.md")).read_text()
+
+_version: str = package_metadata["__version__"]
+_self_pin = (
+ f"=={_version}"
+ if not (_version.endswith(("dev0", "dev1")) or "docker" in _version)
+ else ""
+)
+
+
+rest_common = {"requests", "requests_file"}
+
+base_requirements = {
+    # On Python 3.7, the build fails with an AttributeError when importlib-metadata>=5.0.0 is installed.
+ "importlib-metadata>=4.4.0,<5.0.0; python_version < '3.8'",
+ # Actual dependencies.
+ "prefect >= 2.0.0",
+ *rest_common,
+ # Ignoring the dependency below because it causes issues with the vercel built wheel install
+ # f"acryl-datahub[datahub-rest]{_self_pin}",
+ "acryl-datahub[datahub-rest]",
+}
+
+
+mypy_stubs = {
+ "types-dataclasses",
+ "sqlalchemy-stubs",
+ "types-setuptools",
+ "types-six",
+ "types-python-dateutil",
+ "types-requests",
+ "types-toml",
+ "types-PyYAML",
+ "types-freezegun",
+ "types-cachetools",
+ # versions 0.1.13 and 0.1.14 seem to have issues
+ "types-click==0.1.12",
+ "types-tabulate",
+ # avrogen package requires this
+ "types-pytz",
+}
+
+dev_requirements = {
+ *base_requirements,
+ *mypy_stubs,
+ "black==22.12.0",
+ "coverage>=5.1",
+ "flake8>=3.8.3",
+ "flake8-tidy-imports>=4.3.0",
+ "isort>=5.7.0",
+ "mypy>=1.4.0",
+ # pydantic 1.8.2 is incompatible with mypy 0.910.
+ # See https://github.com/samuelcolvin/pydantic/pull/3175#issuecomment-995382910.
+ "pydantic>=1.10",
+ "pytest>=6.2.2",
+ "pytest-asyncio>=0.16.0",
+ "pytest-cov>=2.8.1",
+ "tox",
+ "deepdiff",
+ "requests-mock",
+ "freezegun",
+ "jsonpickle",
+ "build",
+ "twine",
+ "packaging",
+}
+
+entry_points = {
+ "prefect.block": "prefect-datahub = prefect_datahub.prefect_datahub:DatahubEmitter"
+}
+
+
+setuptools.setup(
+ # Package metadata.
+ name=package_metadata["__package_name__"],
+ version=package_metadata["__version__"],
+ url="https://datahubproject.io/",
+ project_urls={
+ "Documentation": "https://datahubproject.io/docs/",
+ "Source": "https://github.com/datahub-project/datahub",
+ "Changelog": "https://github.com/datahub-project/datahub/releases",
+ },
+ license="Apache License 2.0",
+ description="Datahub prefect block to capture executions and send to Datahub",
+ long_description=get_long_description(),
+ long_description_content_type="text/markdown",
+ classifiers=[
+ "Development Status :: 5 - Production/Stable",
+ "Programming Language :: Python",
+ "Programming Language :: Python :: 3",
+ "Programming Language :: Python :: 3 :: Only",
+ "Programming Language :: Python :: 3.7",
+ "Programming Language :: Python :: 3.8",
+ "Programming Language :: Python :: 3.9",
+ "Programming Language :: Python :: 3.10",
+ "Intended Audience :: Developers",
+ "Intended Audience :: Information Technology",
+ "Intended Audience :: System Administrators",
+ "License :: OSI Approved",
+ "License :: OSI Approved :: Apache Software License",
+ "Operating System :: Unix",
+ "Operating System :: POSIX :: Linux",
+ "Environment :: Console",
+ "Environment :: MacOS X",
+ "Topic :: Software Development",
+ ],
+ # Package info.
+ zip_safe=False,
+ python_requires=">=3.7",
+ package_dir={"": "src"},
+ packages=setuptools.find_namespace_packages(where="./src"),
+ entry_points=entry_points,
+ # Dependencies.
+ install_requires=list(base_requirements),
+ extras_require={
+ "dev": list(dev_requirements),
+ },
+)
diff --git a/metadata-ingestion-modules/prefect-plugin/src/prefect_datahub/__init__.py b/metadata-ingestion-modules/prefect-plugin/src/prefect_datahub/__init__.py
new file mode 100644
index 00000000000000..8cc65f9010613d
--- /dev/null
+++ b/metadata-ingestion-modules/prefect-plugin/src/prefect_datahub/__init__.py
@@ -0,0 +1,21 @@
+# Published at https://pypi.org/project/prefect-datahub/.
+__package_name__ = "prefect-datahub"
+__version__ = "1!0.0.0.dev0"
+
+
+def is_dev_mode() -> bool:
+ return __version__.endswith("dev0")
+
+
+def nice_version_name() -> str:
+ if is_dev_mode():
+ return "unavailable (installed in develop mode)"
+ return __version__
+
+
+def get_provider_info():
+ return {
+ "package-name": f"{__package_name__}",
+ "name": f"{__package_name__}",
+ "description": "Datahub prefect block to capture executions and send to Datahub",
+ }
diff --git a/metadata-ingestion-modules/prefect-plugin/src/prefect_datahub/datahub_emitter.py b/metadata-ingestion-modules/prefect-plugin/src/prefect_datahub/datahub_emitter.py
new file mode 100644
index 00000000000000..5991503416aec7
--- /dev/null
+++ b/metadata-ingestion-modules/prefect-plugin/src/prefect_datahub/datahub_emitter.py
@@ -0,0 +1,659 @@
+"""Datahub Emitter classes used to emit prefect metadata to Datahub REST."""
+
+import asyncio
+import traceback
+from typing import Any, Dict, List, Optional, cast
+from uuid import UUID
+
+import datahub.emitter.mce_builder as builder
+from datahub.api.entities.datajob import DataFlow, DataJob
+from datahub.api.entities.dataprocess.dataprocess_instance import (
+ DataProcessInstance,
+ InstanceRunResult,
+)
+from datahub.emitter.mcp import MetadataChangeProposalWrapper
+from datahub.emitter.rest_emitter import DatahubRestEmitter
+from datahub.metadata.schema_classes import BrowsePathsClass
+from datahub.utilities.urns.data_flow_urn import DataFlowUrn
+from datahub.utilities.urns.data_job_urn import DataJobUrn
+from datahub.utilities.urns.dataset_urn import DatasetUrn
+from prefect import get_run_logger
+from prefect.blocks.core import Block
+from prefect.client import cloud, orchestration
+from prefect.client.schemas import FlowRun, TaskRun, Workspace
+from prefect.client.schemas.objects import Flow
+from prefect.context import FlowRunContext, TaskRunContext
+from prefect.settings import PREFECT_API_URL
+from pydantic.v1 import SecretStr
+
+from prefect_datahub.entities import _Entity
+
+ORCHESTRATOR = "prefect"
+
+# Flow and task common constants
+VERSION = "version"
+RETRIES = "retries"
+TIMEOUT_SECONDS = "timeout_seconds"
+LOG_PRINTS = "log_prints"
+ON_COMPLETION = "on_completion"
+ON_FAILURE = "on_failure"
+
+# Flow constants
+FLOW_RUN_NAME = "flow_run_name"
+TASK_RUNNER = "task_runner"
+PERSIST_RESULT = "persist_result"
+ON_CANCELLATION = "on_cancellation"
+ON_CRASHED = "on_crashed"
+
+# Task constants
+CACHE_EXPIRATION = "cache_expiration"
+TASK_RUN_NAME = "task_run_name"
+REFRESH_CACHE = "refresh_cache"
+TASK_KEY = "task_key"
+
+# Flow run and task run common constants
+ID = "id"
+CREATED = "created"
+UPDATED = "updated"
+TAGS = "tags"
+ESTIMATED_RUN_TIME = "estimated_run_time"
+START_TIME = "start_time"
+END_TIME = "end_time"
+TOTAL_RUN_TIME = "total_run_time"
+NEXT_SCHEDULED_START_TIME = "next_scheduled_start_time"
+
+# Flow run constants
+CREATED_BY = "created_by"
+AUTO_SCHEDULED = "auto_scheduled"
+
+# Task run constants
+FLOW_RUN_ID = "flow_run_id"
+RUN_COUNT = "run_count"
+UPSTREAM_DEPENDENCIES = "upstream_dependencies"
+
+# States constants
+COMPLETE = "Completed"
+FAILED = "Failed"
+CANCELLED = "Cancelled"
+
+
+class DatahubEmitter(Block):
+ """
+ Block used to emit prefect task and flow related metadata to Datahub REST
+ """
+
+ _block_type_name: Optional[str] = "datahub emitter"
+
+ datahub_rest_url: str = "http://localhost:8080"
+ env: str = builder.DEFAULT_ENV
+ platform_instance: Optional[str] = None
+ token: Optional[SecretStr] = None
+ _datajobs_to_emit: Dict[str, Any] = {}
+
+ def __init__(self, *args: Any, **kwargs: Any):
+ """
+ Initialize datahub rest emitter
+ """
+ super().__init__(*args, **kwargs)
+ # self._datajobs_to_emit: Dict[str, _Entity] = {}
+
+ token = self.token.get_secret_value() if self.token is not None else None
+ self.emitter = DatahubRestEmitter(gms_server=self.datahub_rest_url, token=token)
+ self.emitter.test_connection()
+
+ def _entities_to_urn_list(self, iolets: List[_Entity]) -> List[DatasetUrn]:
+ """
+        Convert a list of _Entity objects to a list of dataset URNs.
+
+ Args:
+ iolets (list[_Entity]): The list of entities.
+
+ Returns:
+            The list of dataset URNs.
+ """
+ return [DatasetUrn.create_from_string(let.urn) for let in iolets]
+
+ def _get_workspace(self) -> Optional[str]:
+ """
+ Fetch workspace name if present in configured prefect api url.
+
+ Returns:
+ The workspace name.
+ """
+ try:
+ asyncio.run(cloud.get_cloud_client().api_healthcheck())
+ except Exception:
+ get_run_logger().debug(traceback.format_exc())
+ return None
+
+ if "workspaces" not in PREFECT_API_URL.value():
+ get_run_logger().debug(
+ "Cannot fetch workspace name. Please login to prefect cloud using "
+ "command 'prefect cloud login'."
+ )
+ return None
+
+ current_workspace_id = PREFECT_API_URL.value().split("/")[-1]
+ workspaces: List[Workspace] = asyncio.run(
+ cloud.get_cloud_client().read_workspaces()
+ )
+
+ for workspace in workspaces:
+ if str(workspace.workspace_id) == current_workspace_id:
+ return workspace.workspace_name
+
+ return None
+
+ async def _get_flow_run_graph(self, flow_run_id: str) -> Optional[List[Dict]]:
+ """
+ Fetch the flow run graph for provided flow run id
+
+ Args:
+ flow_run_id (str): The flow run id.
+
+ Returns:
+ The flow run graph in json format.
+ """
+ try:
+ response = orchestration.get_client()._client.get(
+ f"/flow_runs/{flow_run_id}/graph"
+ )
+
+ if asyncio.iscoroutine(response):
+ response = await response
+
+ if hasattr(response, "json"):
+ response_json = response.json()
+ else:
+ raise ValueError("Response object does not have a 'json' method")
+ except Exception:
+ get_run_logger().debug(traceback.format_exc())
+ return None
+ return response_json
+
+ def _emit_browsepath(self, urn: str, workspace_name: str) -> None:
+ """
+ Emit browsepath for provided urn. Set path as orchestrator/env/workspace_name.
+
+ Args:
+ urn (str): The entity URN
+ workspace_name (str): The prefect cloud workspace name
+ """
+ mcp = MetadataChangeProposalWrapper(
+ entityUrn=urn,
+ aspect=BrowsePathsClass(
+ paths=[f"/{ORCHESTRATOR}/{self.env}/{workspace_name}"]
+ ),
+ )
+ self.emitter.emit(mcp)
+
+ def _generate_datajob(
+ self,
+ flow_run_ctx: FlowRunContext,
+ task_run_ctx: Optional[TaskRunContext] = None,
+ task_key: Optional[str] = None,
+ ) -> Optional[DataJob]:
+ """
+ Create datajob entity using task run ctx and flow run ctx.
+ Assign description, tags, and properties to created datajob.
+
+ Args:
+ flow_run_ctx (FlowRunContext): The prefect current running flow run context.
+ task_run_ctx (Optional[TaskRunContext]): The prefect current running task \
+ run context.
+ task_key (Optional[str]): The task key.
+
+ Returns:
+ The datajob entity.
+ """
+ assert flow_run_ctx.flow
+
+ dataflow_urn = DataFlowUrn.create_from_ids(
+ orchestrator=ORCHESTRATOR,
+ flow_id=flow_run_ctx.flow.name,
+ env=self.env,
+ platform_instance=self.platform_instance,
+ )
+
+ if task_run_ctx is not None:
+ datajob = DataJob(
+ id=task_run_ctx.task.task_key,
+ flow_urn=dataflow_urn,
+ name=task_run_ctx.task.name,
+ )
+
+ datajob.description = task_run_ctx.task.description
+ datajob.tags = task_run_ctx.task.tags
+ job_property_bag: Dict[str, str] = {}
+
+ allowed_task_keys = [
+ VERSION,
+ CACHE_EXPIRATION,
+ TASK_RUN_NAME,
+ RETRIES,
+ TIMEOUT_SECONDS,
+ LOG_PRINTS,
+ REFRESH_CACHE,
+ TASK_KEY,
+ ON_COMPLETION,
+ ON_FAILURE,
+ ]
+ for key in allowed_task_keys:
+ if (
+ hasattr(task_run_ctx.task, key)
+ and getattr(task_run_ctx.task, key) is not None
+ ):
+ job_property_bag[key] = repr(getattr(task_run_ctx.task, key))
+ datajob.properties = job_property_bag
+ return datajob
+ elif task_key is not None:
+ datajob = DataJob(
+ id=task_key, flow_urn=dataflow_urn, name=task_key.split(".")[-1]
+ )
+ return datajob
+ return None
+
+ def _generate_dataflow(self, flow_run_ctx: FlowRunContext) -> Optional[DataFlow]:
+ """
+ Create dataflow entity using flow run ctx.
+ Assign description, tags, and properties to created dataflow.
+
+ Args:
+ flow_run_ctx (FlowRunContext): The prefect current running flow run context.
+
+ Returns:
+ The dataflow entity.
+ """
+
+ async def get_flow(flow_id: UUID) -> Flow:
+ client = orchestration.get_client()
+ if not hasattr(client, "read_flow"):
+ raise ValueError("Client does not support async read_flow method")
+ return await client.read_flow(flow_id=flow_id)
+
+ assert flow_run_ctx.flow
+ assert flow_run_ctx.flow_run
+
+ try:
+ flow: Flow = asyncio.run(get_flow(flow_run_ctx.flow_run.flow_id))
+ except Exception:
+ get_run_logger().debug(traceback.format_exc())
+ return None
+
+ assert flow
+
+ dataflow = DataFlow(
+ orchestrator=ORCHESTRATOR,
+ id=flow_run_ctx.flow.name,
+ env=self.env,
+ name=flow_run_ctx.flow.name,
+ platform_instance=self.platform_instance,
+ )
+
+ dataflow.description = flow_run_ctx.flow.description
+ dataflow.tags = set(flow.tags)
+
+ flow_property_bag: Dict[str, str] = {}
+ flow_property_bag[ID] = str(flow.id)
+ flow_property_bag[CREATED] = str(flow.created)
+ flow_property_bag[UPDATED] = str(flow.updated)
+
+ allowed_flow_keys = [
+ VERSION,
+ FLOW_RUN_NAME,
+ RETRIES,
+ TASK_RUNNER,
+ TIMEOUT_SECONDS,
+ PERSIST_RESULT,
+ LOG_PRINTS,
+ ON_COMPLETION,
+ ON_FAILURE,
+ ON_CANCELLATION,
+ ON_CRASHED,
+ ]
+
+ for key in allowed_flow_keys:
+ if (
+ hasattr(flow_run_ctx.flow, key)
+ and getattr(flow_run_ctx.flow, key) is not None
+ ):
+ flow_property_bag[key] = repr(getattr(flow_run_ctx.flow, key))
+
+ dataflow.properties = flow_property_bag
+
+ return dataflow
+
+ def _emit_tasks(
+ self,
+ flow_run_ctx: FlowRunContext,
+ dataflow: DataFlow,
+ workspace_name: Optional[str] = None,
+ ) -> None:
+ """
+ Emit prefect tasks metadata to datahub rest. Add upstream dependencies if
+ present for each task.
+
+ Args:
+ flow_run_ctx (FlowRunContext): The prefect current running flow run context
+ dataflow (DataFlow): The datahub dataflow entity.
+            workspace_name (Optional[str]): The prefect cloud workspace name.
+ """
+ try:
+ assert flow_run_ctx.flow_run
+
+ graph_json = asyncio.run(
+ self._get_flow_run_graph(str(flow_run_ctx.flow_run.id))
+ )
+
+ if graph_json is None:
+ return
+
+ task_run_key_map: Dict[str, str] = {}
+
+ for prefect_future in flow_run_ctx.task_run_futures:
+ if prefect_future.task_run is not None:
+ task_run_key_map[
+ str(prefect_future.task_run.id)
+ ] = prefect_future.task_run.task_key
+
+ for node in graph_json:
+ datajob_urn = DataJobUrn.create_from_ids(
+ data_flow_urn=str(dataflow.urn),
+ job_id=task_run_key_map[node[ID]],
+ )
+
+ datajob: Optional[DataJob] = None
+
+ if str(datajob_urn) in self._datajobs_to_emit:
+ datajob = cast(DataJob, self._datajobs_to_emit[str(datajob_urn)])
+ else:
+ datajob = self._generate_datajob(
+ flow_run_ctx=flow_run_ctx, task_key=task_run_key_map[node[ID]]
+ )
+
+ if datajob is not None:
+ for each in node[UPSTREAM_DEPENDENCIES]:
+ upstream_task_urn = DataJobUrn.create_from_ids(
+ data_flow_urn=str(dataflow.urn),
+ job_id=task_run_key_map[each[ID]],
+ )
+ datajob.upstream_urns.extend([upstream_task_urn])
+
+ datajob.emit(self.emitter)
+
+ if workspace_name is not None:
+ self._emit_browsepath(str(datajob.urn), workspace_name)
+
+ self._emit_task_run(
+ datajob=datajob,
+ flow_run_name=flow_run_ctx.flow_run.name,
+ task_run_id=UUID(node[ID]),
+ )
+ except Exception:
+ get_run_logger().debug(traceback.format_exc())
+
+ def _emit_flow_run(self, dataflow: DataFlow, flow_run_id: UUID) -> None:
+ """
+        Emit the prefect flow run to datahub rest. A prefect flow run is mapped to a
+        datahub data process instance entity, which is generated from the provided
+        dataflow entity. Flow run properties are assigned to the data process
+        instance properties.
+
+ Args:
+ dataflow (DataFlow): The datahub dataflow entity used to create \
+ data process instance.
+ flow_run_id (UUID): The prefect current running flow run id.
+ """
+
+ async def get_flow_run(flow_run_id: UUID) -> FlowRun:
+ client = orchestration.get_client()
+
+ if not hasattr(client, "read_flow_run"):
+ raise ValueError("Client does not support async read_flow_run method")
+
+ response = client.read_flow_run(flow_run_id=flow_run_id)
+
+ if asyncio.iscoroutine(response):
+ response = await response
+
+ return FlowRun.parse_obj(response)
+
+ flow_run: FlowRun = asyncio.run(get_flow_run(flow_run_id))
+
+ assert flow_run
+
+ if self.platform_instance is not None:
+ dpi_id = f"{self.platform_instance}.{flow_run.name}"
+ else:
+ dpi_id = flow_run.name
+
+ dpi = DataProcessInstance.from_dataflow(dataflow=dataflow, id=dpi_id)
+
+ dpi_property_bag: Dict[str, str] = {}
+
+ allowed_flow_run_keys = [
+ ID,
+ CREATED,
+ UPDATED,
+ CREATED_BY,
+ AUTO_SCHEDULED,
+ ESTIMATED_RUN_TIME,
+ START_TIME,
+ TOTAL_RUN_TIME,
+ NEXT_SCHEDULED_START_TIME,
+ TAGS,
+ RUN_COUNT,
+ ]
+
+ for key in allowed_flow_run_keys:
+ if hasattr(flow_run, key) and getattr(flow_run, key) is not None:
+ dpi_property_bag[key] = str(getattr(flow_run, key))
+
+ dpi.properties.update(dpi_property_bag)
+
+ if flow_run.start_time is not None:
+ dpi.emit_process_start(
+ emitter=self.emitter,
+ start_timestamp_millis=int(flow_run.start_time.timestamp() * 1000),
+ )
+
+ def _emit_task_run(
+ self, datajob: DataJob, flow_run_name: str, task_run_id: UUID
+ ) -> None:
+ """
+        Emit the prefect task run to datahub rest. A prefect task run is mapped to a
+        datahub data process instance entity, which is generated from the provided
+        datajob entity. Task run properties are assigned to the data process
+        instance properties.
+
+ Args:
+ datajob (DataJob): The datahub datajob entity used to create \
+ data process instance.
+ flow_run_name (str): The prefect current running flow run name.
+            task_run_id (UUID): The prefect task run id.
+ """
+
+ async def get_task_run(task_run_id: UUID) -> TaskRun:
+ client = orchestration.get_client()
+
+ if not hasattr(client, "read_task_run"):
+ raise ValueError("Client does not support async read_task_run method")
+
+ response = client.read_task_run(task_run_id=task_run_id)
+
+ if asyncio.iscoroutine(response):
+ response = await response
+
+ return TaskRun.parse_obj(response)
+
+ task_run: TaskRun = asyncio.run(get_task_run(task_run_id))
+
+ assert task_run
+
+ if self.platform_instance is not None:
+ dpi_id = f"{self.platform_instance}.{flow_run_name}.{task_run.name}"
+ else:
+ dpi_id = f"{flow_run_name}.{task_run.name}"
+
+ dpi = DataProcessInstance.from_datajob(
+ datajob=datajob,
+ id=dpi_id,
+ clone_inlets=True,
+ clone_outlets=True,
+ )
+
+ dpi_property_bag: Dict[str, str] = {}
+
+ allowed_task_run_keys = [
+ ID,
+ FLOW_RUN_ID,
+ CREATED,
+ UPDATED,
+ ESTIMATED_RUN_TIME,
+ START_TIME,
+ END_TIME,
+ TOTAL_RUN_TIME,
+ NEXT_SCHEDULED_START_TIME,
+ TAGS,
+ RUN_COUNT,
+ ]
+
+ for key in allowed_task_run_keys:
+ if hasattr(task_run, key) and getattr(task_run, key) is not None:
+ dpi_property_bag[key] = str(getattr(task_run, key))
+
+ dpi.properties.update(dpi_property_bag)
+
+ state_result_map: Dict[str, InstanceRunResult] = {
+ COMPLETE: InstanceRunResult.SUCCESS,
+ FAILED: InstanceRunResult.FAILURE,
+ CANCELLED: InstanceRunResult.SKIPPED,
+ }
+
+ if task_run.state_name not in state_result_map:
+ raise Exception(
+ f"State should be either complete, failed or cancelled and it was "
+ f"{task_run.state_name}"
+ )
+
+ result = state_result_map[task_run.state_name]
+
+ if task_run.start_time is not None:
+ dpi.emit_process_start(
+ emitter=self.emitter,
+ start_timestamp_millis=int(task_run.start_time.timestamp() * 1000),
+ emit_template=False,
+ )
+
+ if task_run.end_time is not None:
+ dpi.emit_process_end(
+ emitter=self.emitter,
+ end_timestamp_millis=int(task_run.end_time.timestamp() * 1000),
+ result=result,
+ result_type=ORCHESTRATOR,
+ )
+
+ def add_task(
+ self,
+ inputs: Optional[List[_Entity]] = None,
+ outputs: Optional[List[_Entity]] = None,
+ ) -> None:
+ """
+        Temporarily store the currently running prefect task's metadata; it is emitted
+        to datahub rest later, only when the user calls emit_flow. A prefect task is
+        mapped to a datahub datajob entity. The provided inputs and outputs are
+        assigned as the datajob's inlets and outlets respectively.
+
+ Args:
+ inputs (Optional[list]): The list of task inputs.
+ outputs (Optional[list]): The list of task outputs.
+
+ Example:
+            Emit the task metadata as shown below:
+ ```python
+ from prefect import flow, task
+            from prefect_datahub.entities import Dataset
+ from prefect_datahub.datahub_emitter import DatahubEmitter
+
+ datahub_emitter = DatahubEmitter.load("MY_BLOCK_NAME")
+
+ @task(name="Transform", description="Transform the data")
+ def transform(data):
+ data = data.split(" ")
+ datahub_emitter.add_task(
+ inputs=[Dataset("snowflake", "mydb.schema.tableA")],
+ outputs=[Dataset("snowflake", "mydb.schema.tableC")],
+ )
+ return data
+
+ @flow(name="ETL flow", description="Extract transform load flow")
+ def etl():
+ data = transform("This is data")
+ datahub_emitter.emit_flow()
+ ```
+ """
+ try:
+ flow_run_ctx = FlowRunContext.get()
+ task_run_ctx = TaskRunContext.get()
+
+ assert flow_run_ctx
+ assert task_run_ctx
+
+ datajob = self._generate_datajob(
+ flow_run_ctx=flow_run_ctx, task_run_ctx=task_run_ctx
+ )
+
+ if datajob is not None:
+ if inputs is not None:
+ datajob.inlets.extend(self._entities_to_urn_list(inputs))
+ if outputs is not None:
+ datajob.outlets.extend(self._entities_to_urn_list(outputs))
+ self._datajobs_to_emit[str(datajob.urn)] = cast(_Entity, datajob)
+ except Exception:
+ get_run_logger().debug(traceback.format_exc())
+
+ def emit_flow(self) -> None:
+ """
+        Emit the currently running prefect flow's metadata to datahub rest. A prefect
+        flow is mapped to a datahub dataflow entity. If the user has not called
+        add_task inside a task function, emit_flow still emits that task, but without
+        the task name, description, tags, and properties.
+
+
+ Example:
+            Emit the flow metadata as shown below:
+ ```python
+ from prefect import flow, task
+ from prefect_datahub.datahub_emitter import DatahubEmitter
+
+ datahub_emitter = DatahubEmitter.load("MY_BLOCK_NAME")
+
+ @flow(name="ETL flow", description="Extract transform load flow")
+ def etl():
+ data = extract()
+ data = transform(data)
+ load(data)
+ datahub_emitter.emit_flow()
+ ```
+ """
+ try:
+ flow_run_ctx = FlowRunContext.get()
+
+ assert flow_run_ctx
+ assert flow_run_ctx.flow_run
+
+ workspace_name = self._get_workspace()
+
+ # Emit flow and flow run
+ get_run_logger().info("Emitting flow to datahub...")
+ dataflow = self._generate_dataflow(flow_run_ctx=flow_run_ctx)
+
+ if dataflow is not None:
+ dataflow.emit(self.emitter)
+
+ if workspace_name is not None:
+ self._emit_browsepath(str(dataflow.urn), workspace_name)
+
+ self._emit_flow_run(dataflow, flow_run_ctx.flow_run.id)
+
+ self._emit_tasks(flow_run_ctx, dataflow, workspace_name)
+ except Exception:
+ get_run_logger().debug(traceback.format_exc())
diff --git a/metadata-ingestion-modules/prefect-plugin/src/prefect_datahub/entities.py b/metadata-ingestion-modules/prefect-plugin/src/prefect_datahub/entities.py
new file mode 100644
index 00000000000000..e2711d0925d977
--- /dev/null
+++ b/metadata-ingestion-modules/prefect-plugin/src/prefect_datahub/entities.py
@@ -0,0 +1,46 @@
+from abc import abstractmethod
+from typing import Optional
+
+import attr
+import datahub.emitter.mce_builder as builder
+from datahub.utilities.urns.urn import guess_entity_type
+
+
+class _Entity:
+ @property
+ @abstractmethod
+ def urn(self) -> str:
+ pass
+
+
+@attr.s(auto_attribs=True, str=True)
+class Dataset(_Entity):
+ platform: str
+ name: str
+ env: str = builder.DEFAULT_ENV
+ platform_instance: Optional[str] = None
+
+ @property
+ def urn(self):
+ return builder.make_dataset_urn_with_platform_instance(
+ platform=self.platform,
+ name=self.name,
+ platform_instance=self.platform_instance,
+ env=self.env,
+ )
+
+
+@attr.s(str=True)
+class Urn(_Entity):
+ _urn: str = attr.ib()
+
+ @_urn.validator
+ def _validate_urn(self, attribute, value):
+ if not value.startswith("urn:"):
+ raise ValueError("invalid urn provided: urns must start with 'urn:'")
+ if guess_entity_type(value) != "dataset":
+ raise ValueError("Datajob input/output currently only supports datasets")
+
+ @property
+ def urn(self):
+ return self._urn
diff --git a/metadata-ingestion-modules/prefect-plugin/src/prefect_datahub/example/__init__.py b/metadata-ingestion-modules/prefect-plugin/src/prefect_datahub/example/__init__.py
new file mode 100644
index 00000000000000..e69de29bb2d1d6
diff --git a/metadata-ingestion-modules/prefect-plugin/src/prefect_datahub/example/flow.py b/metadata-ingestion-modules/prefect-plugin/src/prefect_datahub/example/flow.py
new file mode 100644
index 00000000000000..3f404d04887084
--- /dev/null
+++ b/metadata-ingestion-modules/prefect-plugin/src/prefect_datahub/example/flow.py
@@ -0,0 +1,52 @@
+from typing import List, Tuple
+
+from prefect import flow, task
+
+from prefect_datahub.datahub_emitter import DatahubEmitter
+from prefect_datahub.entities import Dataset
+
+datahub_emitter_block = DatahubEmitter.load("datahub-emitter-test")
+
+
+@task(name="Extract", description="Extract the data")
+def extract() -> str:
+ data = "This is data"
+ return data
+
+
+@task(name="Transform", description="Transform the data")
+def transform(
+ data: str, datahub_emitter: DatahubEmitter
+) -> Tuple[List[str], DatahubEmitter]:
+ data_list_str = data.split(" ")
+ datahub_emitter.add_task(
+ inputs=[
+ Dataset(
+ platform="snowflake",
+ name="mydb.schema.tableA",
+ env=datahub_emitter.env,
+ platform_instance=datahub_emitter.platform_instance,
+ )
+ ],
+ outputs=[
+ Dataset(
+ platform="snowflake",
+ name="mydb.schema.tableB",
+ env=datahub_emitter.env,
+ platform_instance=datahub_emitter.platform_instance,
+ )
+ ],
+ )
+ return data_list_str, datahub_emitter
+
+
+@flow(name="ETL", description="Extract transform load flow")
+def etl() -> None:
+ datahub_emitter = datahub_emitter_block
+ data = extract()
+ return_value = transform(data, datahub_emitter) # type: ignore
+ emitter = return_value[1]
+ emitter.emit_flow()
+
+
+etl()
diff --git a/metadata-ingestion-modules/prefect-plugin/src/prefect_datahub/example/save_block.py b/metadata-ingestion-modules/prefect-plugin/src/prefect_datahub/example/save_block.py
new file mode 100644
index 00000000000000..33996785f70cca
--- /dev/null
+++ b/metadata-ingestion-modules/prefect-plugin/src/prefect_datahub/example/save_block.py
@@ -0,0 +1,9 @@
+from prefect_datahub.datahub_emitter import DatahubEmitter
+
+datahub_emitter = DatahubEmitter(
+ datahub_rest_url="http://localhost:8080",
+ env="DEV",
+ platform_instance="local_prefect",
+    token=None,  # generate an auth token in DataHub and provide it here if the GMS endpoint is secured
+)
+datahub_emitter.save("datahub-emitter-test") # type: ignore
diff --git a/metadata-ingestion-modules/prefect-plugin/tests/integration/integration_test_dummy.py b/metadata-ingestion-modules/prefect-plugin/tests/integration/integration_test_dummy.py
new file mode 100644
index 00000000000000..10cf3ad0a608ae
--- /dev/null
+++ b/metadata-ingestion-modules/prefect-plugin/tests/integration/integration_test_dummy.py
@@ -0,0 +1,2 @@
+def test_dummy():
+ pass
diff --git a/metadata-ingestion-modules/prefect-plugin/tests/unit/test_block_standards.py b/metadata-ingestion-modules/prefect-plugin/tests/unit/test_block_standards.py
new file mode 100644
index 00000000000000..12801a01ad07e5
--- /dev/null
+++ b/metadata-ingestion-modules/prefect-plugin/tests/unit/test_block_standards.py
@@ -0,0 +1,31 @@
+import re
+from typing import Type
+
+import pytest
+from prefect.blocks.core import Block
+
+from prefect_datahub.datahub_emitter import DatahubEmitter
+
+
+@pytest.mark.parametrize("block", [DatahubEmitter])
+class TestAllBlocksAdhereToStandards:
+ @pytest.fixture
+ def block(self, block):
+ return block
+
+ def test_has_a_description(self, block: Type[Block]) -> None:
+ assert block.get_description()
+
+ def test_has_a_valid_code_example(self, block: Type[Block]) -> None:
+ code_example = block.get_code_example()
+ assert code_example is not None, f"{block.__name__} is missing a code example"
+ import_pattern = rf"from .* import {block.__name__}"
+ assert re.search(import_pattern, code_example) is not None, (
+ f"The code example for {block.__name__} is missing an import statement"
+ f" matching the pattern {import_pattern}"
+ )
+ block_load_pattern = rf'.* = {block.__name__}\.load\("BLOCK_NAME"\)'
+ assert re.search(block_load_pattern, code_example), (
+ f"The code example for {block.__name__} is missing a .load statement"
+ f" matching the pattern {block_load_pattern}"
+ )
diff --git a/metadata-ingestion-modules/prefect-plugin/tests/unit/test_datahub_emitter.py b/metadata-ingestion-modules/prefect-plugin/tests/unit/test_datahub_emitter.py
new file mode 100644
index 00000000000000..ba50cddc986b66
--- /dev/null
+++ b/metadata-ingestion-modules/prefect-plugin/tests/unit/test_datahub_emitter.py
@@ -0,0 +1,824 @@
+import asyncio
+import json
+import logging
+from typing import Dict, List, Optional, cast
+from unittest.mock import MagicMock, Mock, patch
+from uuid import UUID
+
+import pytest
+from datahub.api.entities.datajob import DataJob
+from datahub.utilities.urns.dataset_urn import DatasetUrn
+from prefect.client.schemas import FlowRun, TaskRun, Workspace
+from prefect.futures import PrefectFuture
+from prefect.server.schemas.core import Flow
+from prefect.task_runners import SequentialTaskRunner
+from requests.models import Response
+
+from prefect_datahub.datahub_emitter import DatahubEmitter
+from prefect_datahub.entities import Dataset, _Entity
+
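+# Representative Prefect API payloads for an example extract/transform/load run; the stubs below return them.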
+mock_transform_task_json: Dict = {
+ "name": "transform",
+ "description": "Transform the actual data",
+ "task_key": "__main__.transform",
+ "tags": ["etl flow task"],
+}
+
+mock_extract_task_run_json: Dict = {
+ "id": "fa14a52b-d271-4c41-99cb-6b42ca7c070b",
+ "created": "2023-06-06T05:51:54.822707+00:00",
+ "updated": "2023-06-06T05:51:55.126000+00:00",
+ "name": "Extract-0",
+ "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e",
+ "task_key": "__main__.extract",
+ "dynamic_key": "0",
+ "cache_key": None,
+ "cache_expiration": None,
+ "task_version": None,
+ "empirical_policy": {
+ "max_retries": 0,
+ "retry_delay_seconds": 0.0,
+ "retries": 0,
+ "retry_delay": 0,
+ "retry_jitter_factor": None,
+ },
+ "tags": [],
+ "state_id": "e280decd-2cc8-4428-a70f-149bcaf95b3c",
+ "task_inputs": {},
+ "state_type": "COMPLETED",
+ "state_name": "Completed",
+ "run_count": 1,
+ "flow_run_run_count": 1,
+ "expected_start_time": "2023-06-06T05:51:54.822183+00:00",
+ "next_scheduled_start_time": None,
+ "start_time": "2023-06-06T05:51:55.016264+00:00",
+ "end_time": "2023-06-06T05:51:55.096534+00:00",
+ "total_run_time": 0.08027,
+ "estimated_run_time": 0.08027,
+ "estimated_start_time_delta": 0.194081,
+ "state": {
+ "id": "e280decd-2cc8-4428-a70f-149bcaf95b3c",
+ "type": "COMPLETED",
+ "name": "Completed",
+ "timestamp": "2023-06-06T05:51:55.096534+00:00",
+ "message": None,
+ "data": {"type": "unpersisted"},
+ "state_details": {
+ "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e",
+ "task_run_id": "fa14a52b-d271-4c41-99cb-6b42ca7c070b",
+ "child_flow_run_id": None,
+ "scheduled_time": None,
+ "cache_key": None,
+ "cache_expiration": None,
+ "untrackable_result": False,
+ "pause_timeout": None,
+ "pause_reschedule": False,
+ "pause_key": None,
+ "refresh_cache": None,
+ },
+ },
+}
+
+mock_transform_task_run_json: Dict = {
+ "id": "dd15ee83-5d28-4bf1-804f-f84eab9f9fb7",
+ "created": "2023-06-06T05:51:55.160372+00:00",
+ "updated": "2023-06-06T05:51:55.358000+00:00",
+ "name": "transform-0",
+ "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e",
+ "task_key": "__main__.transform",
+ "dynamic_key": "0",
+ "cache_key": None,
+ "cache_expiration": None,
+ "task_version": None,
+ "empirical_policy": {
+ "max_retries": 0,
+ "retry_delay_seconds": 0.0,
+ "retries": 0,
+ "retry_delay": 0,
+ "retry_jitter_factor": None,
+ },
+ "tags": [],
+ "state_id": "971ad82e-6e5f-4691-abab-c900358e96c2",
+ "task_inputs": {
+ "actual_data": [
+ {"input_type": "task_run", "id": "fa14a52b-d271-4c41-99cb-6b42ca7c070b"}
+ ]
+ },
+ "state_type": "COMPLETED",
+ "state_name": "Completed",
+ "run_count": 1,
+ "flow_run_run_count": 1,
+ "expected_start_time": "2023-06-06T05:51:55.159416+00:00",
+ "next_scheduled_start_time": None,
+ "start_time": "2023-06-06T05:51:55.243159+00:00",
+ "end_time": "2023-06-06T05:51:55.332950+00:00",
+ "total_run_time": 0.089791,
+ "estimated_run_time": 0.089791,
+ "estimated_start_time_delta": 0.083743,
+ "state": {
+ "id": "971ad82e-6e5f-4691-abab-c900358e96c2",
+ "type": "COMPLETED",
+ "name": "Completed",
+ "timestamp": "2023-06-06T05:51:55.332950+00:00",
+ "message": None,
+ "data": {"type": "unpersisted"},
+ "state_details": {
+ "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e",
+ "task_run_id": "dd15ee83-5d28-4bf1-804f-f84eab9f9fb7",
+ "child_flow_run_id": None,
+ "scheduled_time": None,
+ "cache_key": None,
+ "cache_expiration": None,
+ "untrackable_result": False,
+ "pause_timeout": None,
+ "pause_reschedule": False,
+ "pause_key": None,
+ "refresh_cache": None,
+ },
+ },
+}
+mock_load_task_run_json: Dict = {
+ "id": "f19f83ea-316f-4781-8cbe-1d5d8719afc3",
+ "created": "2023-06-06T05:51:55.389823+00:00",
+ "updated": "2023-06-06T05:51:55.566000+00:00",
+ "name": "Load_task-0",
+ "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e",
+ "task_key": "__main__.load",
+ "dynamic_key": "0",
+ "cache_key": None,
+ "cache_expiration": None,
+ "task_version": None,
+ "empirical_policy": {
+ "max_retries": 0,
+ "retry_delay_seconds": 0.0,
+ "retries": 0,
+ "retry_delay": 0,
+ "retry_jitter_factor": None,
+ },
+ "tags": [],
+ "state_id": "0cad13c8-84e4-4bcf-8616-c5904e10dcb4",
+ "task_inputs": {
+ "data": [
+ {"input_type": "task_run", "id": "dd15ee83-5d28-4bf1-804f-f84eab9f9fb7"}
+ ]
+ },
+ "state_type": "COMPLETED",
+ "state_name": "Completed",
+ "run_count": 1,
+ "flow_run_run_count": 1,
+ "expected_start_time": "2023-06-06T05:51:55.389075+00:00",
+ "next_scheduled_start_time": None,
+ "start_time": "2023-06-06T05:51:55.461812+00:00",
+ "end_time": "2023-06-06T05:51:55.535954+00:00",
+ "total_run_time": 0.074142,
+ "estimated_run_time": 0.074142,
+ "estimated_start_time_delta": 0.072737,
+ "state": {
+ "id": "0cad13c8-84e4-4bcf-8616-c5904e10dcb4",
+ "type": "COMPLETED",
+ "name": "Completed",
+ "timestamp": "2023-06-06T05:51:55.535954+00:00",
+ "message": None,
+ "data": {"type": "unpersisted"},
+ "state_details": {
+ "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e",
+ "task_run_id": "f19f83ea-316f-4781-8cbe-1d5d8719afc3",
+ "child_flow_run_id": None,
+ "scheduled_time": None,
+ "cache_key": None,
+ "cache_expiration": None,
+ "untrackable_result": True,
+ "pause_timeout": None,
+ "pause_reschedule": False,
+ "pause_key": None,
+ "refresh_cache": None,
+ },
+ },
+}
+mock_flow_json: Dict = {
+ "id": "cc65498f-d950-4114-8cc1-7af9e8fdf91b",
+ "created": "2023-06-02T12:31:10.988697+00:00",
+ "updated": "2023-06-02T12:31:10.988710+00:00",
+ "name": "etl",
+ "description": "Extract transform load flow",
+ "tags": [],
+}
+mock_flow_run_json: Dict = {
+ "id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e",
+ "created": "2023-06-06T05:51:54.544266+00:00",
+ "updated": "2023-06-06T05:51:55.622000+00:00",
+ "name": "olivine-beagle",
+ "flow_id": "cc65498f-d950-4114-8cc1-7af9e8fdf91b",
+ "state_id": "ca2db325-d98f-40e7-862e-449cd0cc9a6e",
+ "deployment_id": None,
+ "work_queue_name": None,
+ "flow_version": "3ba54dfa31a7c9af4161aa4cd020a527",
+ "parameters": {},
+ "idempotency_key": None,
+ "context": {},
+ "empirical_policy": {
+ "max_retries": 0,
+ "retry_delay_seconds": 0.0,
+ "retries": 0,
+ "retry_delay": 0,
+ "pause_keys": [],
+ "resuming": False,
+ },
+ "tags": [],
+ "parent_task_run_id": None,
+ "state_type": "COMPLETED",
+ "state_name": "Completed",
+ "run_count": 1,
+ "expected_start_time": "2023-06-06T05:51:54.543357+00:00",
+ "next_scheduled_start_time": None,
+ "start_time": "2023-06-06T05:51:54.750523+00:00",
+ "end_time": "2023-06-06T05:51:55.596446+00:00",
+ "total_run_time": 0.845923,
+ "estimated_run_time": 0.845923,
+ "estimated_start_time_delta": 0.207166,
+ "auto_scheduled": False,
+ "infrastructure_document_id": None,
+ "infrastructure_pid": None,
+ "created_by": None,
+ "work_pool_name": None,
+ "state": {
+ "id": "ca2db325-d98f-40e7-862e-449cd0cc9a6e",
+ "type": "COMPLETED",
+ "name": "Completed",
+ "timestamp": "2023-06-06T05:51:55.596446+00:00",
+ "message": "All states completed.",
+ "data": {"type": "unpersisted"},
+ "state_details": {
+ "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e",
+ "task_run_id": None,
+ "child_flow_run_id": None,
+ "scheduled_time": None,
+ "cache_key": None,
+ "cache_expiration": None,
+ "untrackable_result": False,
+ "pause_timeout": None,
+ "pause_reschedule": False,
+ "pause_key": None,
+ "refresh_cache": None,
+ },
+ },
+}
+mock_graph_json: List[Dict] = [
+ {
+ "id": "fa14a52b-d271-4c41-99cb-6b42ca7c070b",
+ "name": "Extract-0",
+ "upstream_dependencies": [],
+ "state": {
+ "id": "e280decd-2cc8-4428-a70f-149bcaf95b3c",
+ "type": "COMPLETED",
+ "name": "Completed",
+ "timestamp": "2023-06-06T05:51:55.096534+00:00",
+ "message": None,
+ "data": {"type": "unpersisted"},
+ "state_details": {
+ "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e",
+ "task_run_id": "fa14a52b-d271-4c41-99cb-6b42ca7c070b",
+ "child_flow_run_id": None,
+ "scheduled_time": None,
+ "cache_key": None,
+ "cache_expiration": None,
+ "untrackable_result": False,
+ "pause_timeout": None,
+ "pause_reschedule": False,
+ "pause_key": None,
+ "refresh_cache": None,
+ },
+ },
+ "expected_start_time": "2023-06-06T05:51:54.822183+00:00",
+ "start_time": "2023-06-06T05:51:55.016264+00:00",
+ "end_time": "2023-06-06T05:51:55.096534+00:00",
+ "total_run_time": 0.08027,
+ "estimated_run_time": 0.08027,
+ "untrackable_result": False,
+ },
+ {
+ "id": "f19f83ea-316f-4781-8cbe-1d5d8719afc3",
+ "name": "Load_task-0",
+ "upstream_dependencies": [
+ {"input_type": "task_run", "id": "dd15ee83-5d28-4bf1-804f-f84eab9f9fb7"}
+ ],
+ "state": {
+ "id": "0cad13c8-84e4-4bcf-8616-c5904e10dcb4",
+ "type": "COMPLETED",
+ "name": "Completed",
+ "timestamp": "2023-06-06T05:51:55.535954+00:00",
+ "message": None,
+ "data": {"type": "unpersisted"},
+ "state_details": {
+ "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e",
+ "task_run_id": "f19f83ea-316f-4781-8cbe-1d5d8719afc3",
+ "child_flow_run_id": None,
+ "scheduled_time": None,
+ "cache_key": None,
+ "cache_expiration": None,
+ "untrackable_result": True,
+ "pause_timeout": None,
+ "pause_reschedule": False,
+ "pause_key": None,
+ "refresh_cache": None,
+ },
+ },
+ "expected_start_time": "2023-06-06T05:51:55.389075+00:00",
+ "start_time": "2023-06-06T05:51:55.461812+00:00",
+ "end_time": "2023-06-06T05:51:55.535954+00:00",
+ "total_run_time": 0.074142,
+ "estimated_run_time": 0.074142,
+ "untrackable_result": True,
+ },
+ {
+ "id": "dd15ee83-5d28-4bf1-804f-f84eab9f9fb7",
+ "name": "transform-0",
+ "upstream_dependencies": [
+ {"input_type": "task_run", "id": "fa14a52b-d271-4c41-99cb-6b42ca7c070b"}
+ ],
+ "state": {
+ "id": "971ad82e-6e5f-4691-abab-c900358e96c2",
+ "type": "COMPLETED",
+ "name": "Completed",
+ "timestamp": "2023-06-06T05:51:55.332950+00:00",
+ "message": None,
+ "data": {"type": "unpersisted"},
+ "state_details": {
+ "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e",
+ "task_run_id": "dd15ee83-5d28-4bf1-804f-f84eab9f9fb7",
+ "child_flow_run_id": None,
+ "scheduled_time": None,
+ "cache_key": None,
+ "cache_expiration": None,
+ "untrackable_result": False,
+ "pause_timeout": None,
+ "pause_reschedule": False,
+ "pause_key": None,
+ "refresh_cache": None,
+ },
+ },
+ "expected_start_time": "2023-06-06T05:51:55.159416+00:00",
+ "start_time": "2023-06-06T05:51:55.243159+00:00",
+ "end_time": "2023-06-06T05:51:55.332950+00:00",
+ "total_run_time": 0.089791,
+ "estimated_run_time": 0.089791,
+ "untrackable_result": False,
+ },
+]
+mock_workspace_json: Dict = {
+ "account_id": "33e98cfe-ad06-4ceb-a500-c11148499f75",
+ "account_name": "shubhamjagtapgslabcom",
+ "account_handle": "shubhamjagtapgslabcom",
+ "workspace_id": "157eb822-1b3b-4338-ae80-98edd5d00cb9",
+ "workspace_name": "datahub",
+ "workspace_description": "",
+ "workspace_handle": "datahub",
+}
+
+
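+# Wrap the mock task-run payloads in PrefectFuture objects for the flow-run context fixture.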
+async def mock_task_run_future():
+ extract_prefect_future: PrefectFuture = PrefectFuture(
+ name=mock_extract_task_run_json["name"],
+ key=UUID("4552629a-ac04-4590-b286-27642292739f"),
+ task_runner=SequentialTaskRunner(),
+ )
+ extract_prefect_future.task_run = cast(
+ None, TaskRun.parse_obj(mock_extract_task_run_json)
+ )
+ transform_prefect_future: PrefectFuture = PrefectFuture(
+ name=mock_transform_task_run_json["name"],
+ key=UUID("40fff3e5-5ef4-4b8b-9cc8-786f91bcc656"),
+ task_runner=SequentialTaskRunner(),
+ )
+ transform_prefect_future.task_run = cast(
+ None, TaskRun.parse_obj(mock_transform_task_run_json)
+ )
+ load_prefect_future: PrefectFuture = PrefectFuture(
+ name=mock_load_task_run_json["name"],
+ key=UUID("7565f596-9eb0-4330-ba34-963e7839883e"),
+ task_runner=SequentialTaskRunner(),
+ )
+ load_prefect_future.task_run = cast(
+ None, TaskRun.parse_obj(mock_load_task_run_json)
+ )
+ return [extract_prefect_future, transform_prefect_future, load_prefect_future]
+
+
+@pytest.fixture(scope="module")
+def mock_run_logger():
+ with patch(
+ "prefect_datahub.datahub_emitter.get_run_logger",
+ return_value=logging.getLogger(),
+ ) as mock_logger:
+ yield mock_logger
+
+
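+# Patch TaskRunContext/FlowRunContext so DatahubEmitter reads the mocked task and flow metadata.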
+@pytest.fixture(scope="module")
+def mock_run_context(mock_run_logger):
+ task_run_ctx = MagicMock()
+ task_run_ctx.task.task_key = mock_transform_task_json["task_key"]
+ task_run_ctx.task.name = mock_transform_task_json["name"]
+ task_run_ctx.task.description = mock_transform_task_json["description"]
+ task_run_ctx.task.tags = mock_transform_task_json["tags"]
+
+ flow_run_ctx = MagicMock()
+ flow_run_ctx.flow.name = mock_flow_json["name"]
+ flow_run_ctx.flow.description = mock_flow_json["description"]
+ flow_run_obj = FlowRun.parse_obj(mock_flow_run_json)
+ flow_run_ctx.flow_run.id = flow_run_obj.id
+ flow_run_ctx.flow_run.name = flow_run_obj.name
+ flow_run_ctx.flow_run.flow_id = flow_run_obj.flow_id
+ flow_run_ctx.flow_run.start_time = flow_run_obj.start_time
+ flow_run_ctx.task_run_futures = asyncio.run(mock_task_run_future())
+
+ with patch(
+ "prefect_datahub.datahub_emitter.TaskRunContext"
+ ) as mock_task_run_ctx, patch(
+ "prefect_datahub.datahub_emitter.FlowRunContext"
+ ) as mock_flow_run_ctx:
+ mock_task_run_ctx.get.return_value = task_run_ctx
+ mock_flow_run_ctx.get.return_value = flow_run_ctx
+ yield (task_run_ctx, flow_run_ctx)
+
+
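+# Async stubs that mimic the Prefect client calls made while emitting the flow.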
+async def mock_task_run(*args, **kwargs):
+ task_run_id = str(kwargs["task_run_id"])
+ if task_run_id == "fa14a52b-d271-4c41-99cb-6b42ca7c070b":
+ return TaskRun.parse_obj(mock_extract_task_run_json)
+ elif task_run_id == "dd15ee83-5d28-4bf1-804f-f84eab9f9fb7":
+ return TaskRun.parse_obj(mock_transform_task_run_json)
+ elif task_run_id == "f19f83ea-316f-4781-8cbe-1d5d8719afc3":
+ return TaskRun.parse_obj(mock_load_task_run_json)
+ return None
+
+
+async def mock_flow(*args, **kwargs):
+ return Flow.parse_obj(mock_flow_json)
+
+
+async def mock_flow_run(*args, **kwargs):
+ return FlowRun.parse_obj(mock_flow_run_json)
+
+
+async def mock_flow_run_graph(*args, **kwargs):
+ response = Response()
+ response.status_code = 200
+ response._content = json.dumps(mock_graph_json, separators=(",", ":")).encode(
+ "utf-8"
+ )
+ return response
+
+
+async def mock_api_healthcheck(*args, **kwargs):
+ return None
+
+
+async def mock_read_workspaces(*args, **kwargs):
+ return [Workspace.parse_obj(mock_workspace_json)]
+
+
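+# Route the stubs through a mocked Prefect client returned by orchestration.get_client().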
+@pytest.fixture(scope="module")
+def mock_prefect_client():
+ prefect_client_mock = MagicMock()
+ prefect_client_mock.read_flow.side_effect = mock_flow
+ prefect_client_mock.read_flow_run.side_effect = mock_flow_run
+ prefect_client_mock.read_task_run.side_effect = mock_task_run
+ prefect_client_mock._client.get.side_effect = mock_flow_run_graph
+ with patch("prefect_datahub.datahub_emitter.orchestration") as mock_client:
+ mock_client.get_client.return_value = prefect_client_mock
+ yield prefect_client_mock
+
+
+@pytest.fixture(scope="module")
+def mock_prefect_cloud_client():
+ prefect_cloud_client_mock = MagicMock()
+ prefect_cloud_client_mock.api_healthcheck.side_effect = mock_api_healthcheck
+ prefect_cloud_client_mock.read_workspaces.side_effect = mock_read_workspaces
+ with patch("prefect_datahub.datahub_emitter.cloud") as mock_client, patch(
+ "prefect_datahub.datahub_emitter.PREFECT_API_URL.value",
+ return_value="https://api.prefect.cloud/api/accounts/33e98cfe-ad06-4ceb-"
+ "a500-c11148499f75/workspaces/157eb822-1b3b-4338-ae80-98edd5d00cb9",
+ ):
+ mock_client.get_cloud_client.return_value = prefect_cloud_client_mock
+ yield prefect_cloud_client_mock
+
+
+@patch("prefect_datahub.datahub_emitter.DatahubRestEmitter", autospec=True)
+def test_entities_to_urn_list(mock_emit):
+ dataset_urn_list = DatahubEmitter()._entities_to_urn_list(
+ [Dataset("snowflake", "mydb.schema.tableA")]
+ )
+ for dataset_urn in dataset_urn_list:
+ assert isinstance(dataset_urn, DatasetUrn)
+
+
+@patch("prefect_datahub.datahub_emitter.DatahubRestEmitter", autospec=True)
+def test_get_flow_run_graph(mock_emit, mock_prefect_client):
+ graph_json = asyncio.run(
+ DatahubEmitter()._get_flow_run_graph("c3b947e5-3fa1-4b46-a2e2-58d50c938f2e")
+ )
+ assert isinstance(graph_json, list)
+
+
+@patch("prefect_datahub.datahub_emitter.DatahubRestEmitter", autospec=True)
+def test__get_workspace(mock_emit, mock_prefect_cloud_client):
+ workspace_name = DatahubEmitter()._get_workspace()
+ assert workspace_name == "datahub"
+
+
+@patch("prefect_datahub.datahub_emitter.DatahubRestEmitter", autospec=True)
+def test_add_task(mock_emit, mock_run_context):
+ mock_emitter = Mock()
+ mock_emit.return_value = mock_emitter
+
+ datahub_emitter = DatahubEmitter()
+ inputs: Optional[List[_Entity]] = [Dataset("snowflake", "mydb.schema.tableA")]
+ outputs: Optional[List[_Entity]] = [Dataset("snowflake", "mydb.schema.tableC")]
+ datahub_emitter.add_task(
+ inputs=inputs,
+ outputs=outputs,
+ )
+
+ task_run_ctx = mock_run_context[0]
+ flow_run_ctx = mock_run_context[1]
+
+ expected_datajob_urn = (
+ f"urn:li:dataJob:(urn:li:dataFlow:"
+ f"(prefect,{flow_run_ctx.flow.name},PROD),{task_run_ctx.task.task_key})"
+ )
+
+ assert expected_datajob_urn in datahub_emitter._datajobs_to_emit.keys()
+ actual_datajob = datahub_emitter._datajobs_to_emit[expected_datajob_urn]
+ assert isinstance(actual_datajob, DataJob)
+ assert str(actual_datajob.flow_urn) == "urn:li:dataFlow:(prefect,etl,PROD)"
+ assert actual_datajob.name == task_run_ctx.task.name
+ assert actual_datajob.description == task_run_ctx.task.description
+ assert actual_datajob.tags == task_run_ctx.task.tags
+ assert (
+ str(actual_datajob.inlets[0])
+ == "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableA,PROD)"
+ )
+ assert (
+ str(actual_datajob.outlets[0])
+ == "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)"
+ )
+    assert mock_emitter.emit.call_count == 0
+
+
+@patch("prefect_datahub.datahub_emitter.DatahubRestEmitter", autospec=True)
+def test_emit_flow(
+ mock_emit, mock_run_context, mock_prefect_client, mock_prefect_cloud_client
+):
+ mock_emitter = Mock()
+ mock_emit.return_value = mock_emitter
+
+ platform_instance = "datahub_workspace"
+
+ datahub_emitter = DatahubEmitter(platform_instance=platform_instance)
+ datahub_emitter.add_task()
+ datahub_emitter.emit_flow()
+
+ task_run_ctx = mock_run_context[0]
+ flow_run_ctx = mock_run_context[1]
+
+ expected_dataflow_urn = (
+ f"urn:li:dataFlow:(prefect,{platform_instance}.{flow_run_ctx.flow.name},PROD)"
+ )
+
+ # Ignore the first call (index 0) which is a connection call
+ # DataFlow assertions
+ assert mock_emitter.method_calls[1][1][0].aspectName == "dataFlowInfo"
+ assert mock_emitter.method_calls[1][1][0].entityUrn == expected_dataflow_urn
+ assert mock_emitter.method_calls[2][1][0].aspectName == "status"
+ assert mock_emitter.method_calls[2][1][0].entityUrn == expected_dataflow_urn
+ assert mock_emitter.method_calls[3][1][0].aspectName == "ownership"
+ assert mock_emitter.method_calls[3][1][0].entityUrn == expected_dataflow_urn
+ assert mock_emitter.method_calls[4][1][0].aspectName == "globalTags"
+ assert mock_emitter.method_calls[4][1][0].entityUrn == expected_dataflow_urn
+ assert mock_emitter.method_calls[5][1][0].aspectName == "browsePaths"
+ assert mock_emitter.method_calls[5][1][0].entityUrn == expected_dataflow_urn
+
+ # DataProcessInstance assertions for the flow
+ assert (
+ mock_emitter.method_calls[10][1][0].aspectName
+ == "dataProcessInstanceProperties"
+ )
+ assert (
+ mock_emitter.method_calls[10][1][0].entityUrn
+ == "urn:li:dataProcessInstance:56231547bcc2781e0c14182ceab6c9ac"
+ )
+ assert (
+ mock_emitter.method_calls[11][1][0].aspectName
+ == "dataProcessInstanceRelationships"
+ )
+ assert (
+ mock_emitter.method_calls[11][1][0].entityUrn
+ == "urn:li:dataProcessInstance:56231547bcc2781e0c14182ceab6c9ac"
+ )
+ assert (
+ mock_emitter.method_calls[12][1][0].aspectName == "dataProcessInstanceRunEvent"
+ )
+ assert (
+ mock_emitter.method_calls[12][1][0].entityUrn
+ == "urn:li:dataProcessInstance:56231547bcc2781e0c14182ceab6c9ac"
+ )
+
+ # DataJob assertions for extract
+ assert mock_emitter.method_calls[13][1][0].aspectName == "dataJobInfo"
+ assert (
+ mock_emitter.method_calls[13][1][0].entityUrn
+ == f"urn:li:dataJob:({expected_dataflow_urn},__main__.extract)"
+ )
+ assert mock_emitter.method_calls[14][1][0].aspectName == "status"
+ assert (
+ mock_emitter.method_calls[14][1][0].entityUrn
+ == f"urn:li:dataJob:({expected_dataflow_urn},__main__.extract)"
+ )
+ assert mock_emitter.method_calls[15][1][0].aspectName == "dataJobInputOutput"
+ assert (
+ mock_emitter.method_calls[15][1][0].entityUrn
+ == f"urn:li:dataJob:({expected_dataflow_urn},__main__.extract)"
+ )
+ assert mock_emitter.method_calls[16][1][0].aspectName == "ownership"
+ assert (
+ mock_emitter.method_calls[16][1][0].entityUrn
+ == f"urn:li:dataJob:({expected_dataflow_urn},__main__.extract)"
+ )
+ assert mock_emitter.method_calls[17][1][0].aspectName == "globalTags"
+ assert (
+ mock_emitter.method_calls[17][1][0].entityUrn
+ == f"urn:li:dataJob:({expected_dataflow_urn},__main__.extract)"
+ )
+ assert mock_emitter.method_calls[18][1][0].aspectName == "browsePaths"
+ assert (
+ mock_emitter.method_calls[18][1][0].entityUrn
+ == f"urn:li:dataJob:({expected_dataflow_urn},__main__.extract)"
+ )
+
+ # DataProcessInstance assertions for extract
+ assert (
+ mock_emitter.method_calls[19][1][0].aspectName
+ == "dataProcessInstanceProperties"
+ )
+ assert (
+ mock_emitter.method_calls[19][1][0].entityUrn
+ == "urn:li:dataProcessInstance:b048ba729c1403f229a0760f8765d691"
+ )
+ assert (
+ mock_emitter.method_calls[20][1][0].aspectName
+ == "dataProcessInstanceRelationships"
+ )
+ assert (
+ mock_emitter.method_calls[20][1][0].entityUrn
+ == "urn:li:dataProcessInstance:b048ba729c1403f229a0760f8765d691"
+ )
+ assert (
+ mock_emitter.method_calls[21][1][0].aspectName == "dataProcessInstanceRunEvent"
+ )
+ assert (
+ mock_emitter.method_calls[21][1][0].entityUrn
+ == "urn:li:dataProcessInstance:b048ba729c1403f229a0760f8765d691"
+ )
+ assert (
+ mock_emitter.method_calls[22][1][0].aspectName == "dataProcessInstanceRunEvent"
+ )
+ assert (
+ mock_emitter.method_calls[22][1][0].entityUrn
+ == "urn:li:dataProcessInstance:b048ba729c1403f229a0760f8765d691"
+ )
+
+ # DataJob assertions for load
+ assert mock_emitter.method_calls[23][1][0].aspectName == "dataJobInfo"
+ assert (
+ mock_emitter.method_calls[23][1][0].entityUrn
+ == f"urn:li:dataJob:({expected_dataflow_urn},__main__.load)"
+ )
+ assert mock_emitter.method_calls[24][1][0].aspectName == "status"
+ assert (
+ mock_emitter.method_calls[24][1][0].entityUrn
+ == f"urn:li:dataJob:({expected_dataflow_urn},__main__.load)"
+ )
+ assert mock_emitter.method_calls[25][1][0].aspectName == "dataJobInputOutput"
+ assert (
+ mock_emitter.method_calls[25][1][0].entityUrn
+ == f"urn:li:dataJob:({expected_dataflow_urn},__main__.load)"
+ )
+ assert mock_emitter.method_calls[26][1][0].aspectName == "ownership"
+ assert (
+ mock_emitter.method_calls[26][1][0].entityUrn
+ == f"urn:li:dataJob:({expected_dataflow_urn},__main__.load)"
+ )
+ assert mock_emitter.method_calls[27][1][0].aspectName == "globalTags"
+ assert (
+ mock_emitter.method_calls[27][1][0].entityUrn
+ == f"urn:li:dataJob:({expected_dataflow_urn},__main__.load)"
+ )
+ assert mock_emitter.method_calls[28][1][0].aspectName == "browsePaths"
+ assert (
+ mock_emitter.method_calls[28][1][0].entityUrn
+ == f"urn:li:dataJob:({expected_dataflow_urn},__main__.load)"
+ )
+
+ # DataProcessInstance assertions for load
+ assert (
+ mock_emitter.method_calls[29][1][0].aspectName
+ == "dataProcessInstanceProperties"
+ )
+ assert (
+ mock_emitter.method_calls[29][1][0].entityUrn
+ == "urn:li:dataProcessInstance:e7df9fe09bb4da19687b8199e5ee5038"
+ )
+ assert (
+ mock_emitter.method_calls[30][1][0].aspectName
+ == "dataProcessInstanceRelationships"
+ )
+ assert (
+ mock_emitter.method_calls[30][1][0].entityUrn
+ == "urn:li:dataProcessInstance:e7df9fe09bb4da19687b8199e5ee5038"
+ )
+ assert (
+ mock_emitter.method_calls[31][1][0].aspectName == "dataProcessInstanceRunEvent"
+ )
+ assert (
+ mock_emitter.method_calls[31][1][0].entityUrn
+ == "urn:li:dataProcessInstance:e7df9fe09bb4da19687b8199e5ee5038"
+ )
+ assert (
+ mock_emitter.method_calls[32][1][0].aspectName == "dataProcessInstanceRunEvent"
+ )
+ assert (
+ mock_emitter.method_calls[32][1][0].entityUrn
+ == "urn:li:dataProcessInstance:e7df9fe09bb4da19687b8199e5ee5038"
+ )
+
+ # DataJob assertions for transform
+ assert mock_emitter.method_calls[33][1][0].aspectName == "dataJobInfo"
+ assert (
+ mock_emitter.method_calls[33][1][0].entityUrn
+ == f"urn:li:dataJob:({expected_dataflow_urn},__main__.transform)"
+ )
+ assert mock_emitter.method_calls[34][1][0].aspectName == "status"
+ assert (
+ mock_emitter.method_calls[34][1][0].entityUrn
+ == f"urn:li:dataJob:({expected_dataflow_urn},__main__.transform)"
+ )
+ assert mock_emitter.method_calls[35][1][0].aspectName == "dataJobInputOutput"
+ assert (
+ mock_emitter.method_calls[35][1][0].entityUrn
+ == f"urn:li:dataJob:({expected_dataflow_urn},__main__.transform)"
+ )
+ assert mock_emitter.method_calls[36][1][0].aspectName == "ownership"
+ assert (
+ mock_emitter.method_calls[36][1][0].entityUrn
+ == f"urn:li:dataJob:({expected_dataflow_urn},__main__.transform)"
+ )
+ assert mock_emitter.method_calls[37][1][0].aspectName == "globalTags"
+ assert (
+ mock_emitter.method_calls[37][1][0].entityUrn
+ == f"urn:li:dataJob:({expected_dataflow_urn},__main__.transform)"
+ )
+ assert (
+ mock_emitter.method_calls[37][1][0].aspect.tags[0].tag
+ == f"urn:li:tag:{task_run_ctx.task.tags[0]}"
+ )
+ assert mock_emitter.method_calls[38][1][0].aspectName == "browsePaths"
+ assert (
+ mock_emitter.method_calls[38][1][0].entityUrn
+ == f"urn:li:dataJob:({expected_dataflow_urn},__main__.transform)"
+ )
+
+ # DataProcessInstance assertions for transform
+ assert (
+ mock_emitter.method_calls[39][1][0].aspectName
+ == "dataProcessInstanceProperties"
+ )
+ assert (
+ mock_emitter.method_calls[39][1][0].entityUrn
+ == "urn:li:dataProcessInstance:bfa255d4d1fba52d23a52c9de4f6d0a6"
+ )
+ assert (
+ mock_emitter.method_calls[40][1][0].aspectName
+ == "dataProcessInstanceRelationships"
+ )
+ assert (
+ mock_emitter.method_calls[40][1][0].entityUrn
+ == "urn:li:dataProcessInstance:bfa255d4d1fba52d23a52c9de4f6d0a6"
+ )
+ assert (
+ mock_emitter.method_calls[41][1][0].aspectName == "dataProcessInstanceRunEvent"
+ )
+ assert (
+ mock_emitter.method_calls[41][1][0].entityUrn
+ == "urn:li:dataProcessInstance:bfa255d4d1fba52d23a52c9de4f6d0a6"
+ )
+ assert (
+ mock_emitter.method_calls[42][1][0].aspectName == "dataProcessInstanceRunEvent"
+ )
+ assert (
+ mock_emitter.method_calls[42][1][0].entityUrn
+ == "urn:li:dataProcessInstance:bfa255d4d1fba52d23a52c9de4f6d0a6"
+ )
diff --git a/metadata-ingestion/developing.md b/metadata-ingestion/developing.md
index b37c4e5ad96738..9293fc7a369dc7 100644
--- a/metadata-ingestion/developing.md
+++ b/metadata-ingestion/developing.md
@@ -69,6 +69,16 @@ source venv/bin/activate
datahub version # should print "DataHub CLI version: unavailable (installed in develop mode)"
```
+### (Optional) Set up your Python environment for developing on Prefect Plugin
+From the repository root:
+
+```shell
+cd metadata-ingestion-modules/prefect-plugin
+../../gradlew :metadata-ingestion-modules:prefect-plugin:installDev
+source venv/bin/activate
+datahub version # should print "DataHub CLI version: unavailable (installed in develop mode)"
+```
+
### (Optional) Set up your Python environment for developing on GX Plugin
From the repository root:
@@ -276,4 +286,4 @@ tox -- --update-golden-files
# Update golden files for a specific environment.
tox -e py310-airflow26 -- --update-golden-files
-```
+```
\ No newline at end of file
diff --git a/metadata-service/war/src/main/resources/boot/data_platforms.json b/metadata-service/war/src/main/resources/boot/data_platforms.json
index 4830311996fd94..f9288c843cd9dd 100644
--- a/metadata-service/war/src/main/resources/boot/data_platforms.json
+++ b/metadata-service/war/src/main/resources/boot/data_platforms.json
@@ -68,6 +68,16 @@
"logoUrl": "/assets/platforms/couchbaselogo.png"
}
},
+ {
+ "urn": "urn:li:dataPlatform:dagster",
+ "aspect": {
+ "datasetNameDelimiter": "/",
+ "name": "dagster",
+ "displayName": "Dagster",
+ "type": "OTHERS",
+ "logoUrl": "/assets/platforms/dagsterlogo.png"
+ }
+ },
{
"urn": "urn:li:dataPlatform:external",
"aspect": {
@@ -247,6 +257,16 @@
"logoUrl": "/assets/platforms/postgreslogo.png"
}
},
+ {
+    "urn": "urn:li:dataPlatform:prefect",
+ "aspect": {
+ "datasetNameDelimiter": ".",
+ "name": "prefect",
+ "displayName": "Prefect",
+ "type": "OTHERS",
+ "logoUrl": "/assets/platforms/prefectlogo.png"
+ }
+ },
{
"urn": "urn:li:dataPlatform:presto",
"aspect": {
diff --git a/settings.gradle b/settings.gradle
index 899ca8f6f869b5..fa1fdb9f1a67ce 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -63,6 +63,7 @@ include 'ingestion-scheduler'
include 'metadata-ingestion-modules:airflow-plugin'
include 'metadata-ingestion-modules:gx-plugin'
include 'metadata-ingestion-modules:dagster-plugin'
+include 'metadata-ingestion-modules:prefect-plugin'
include 'smoke-test'
include 'metadata-auth:auth-api'
include 'metadata-service:schema-registry-api'