
Support using DatasetAlias and fix orphaning unreferenced dataset #1217

Merged

tatiana merged 9 commits into main from dataset-alias on Sep 30, 2024

Conversation

@tatiana (Collaborator) commented Sep 23, 2024

Context

Cosmos versions between 1.1 and 1.6 supported automatically emitting Airflow Datasets when using ExecutionMode.LOCAL. Although the datasets generated by these versions of Cosmos could be utilised for dataset-aware scheduling, the implementation had long-standing issues, as described in #522.

The main problems were:

  • Orphaning unreferenced dataset
  • Not displaying dataset inlets/outlets in the Airflow UI

These issues were caused by Cosmos defining task outlets and inlets during Airflow task execution, a feature only partially supported before Airflow 2.10: apache/airflow#34206.

Solution

Airflow 2.10 introduced the concept of DatasetAlias, as described in the official docs, so operators can dynamically define inlets and outlets during task execution.

This PR uses Airflow's DatasetAlias when possible (Airflow 2.10 or above) and does three things:

  1. Adds a DatasetAlias to every LocalOperator/VirtualenvOperator subclass
  2. Dynamically adds Datasets as outlets during execution of the LocalOperator subclasses, associating them with the desired Dataset instances
  3. Exposes a function that lets users retrieve Cosmos' DatasetAlias names programmatically

Caveats

  • Only works for Airflow 2.10 and above

This feature relies on DatasetAlias, which is only available in Airflow 2.10 and above. On earlier Airflow versions, Cosmos behaves as it did before, and the issues described in this PR are not solved.
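A version gate of this kind can be sketched as follows. This is a simplified illustration, not Cosmos' actual code, and the function name is hypothetical:

```python
from packaging.version import Version

# DatasetAlias first shipped in Airflow 2.10
AIRFLOW_VERSION_WITH_DATASET_ALIAS = Version("2.10")


def supports_dataset_alias(airflow_version: str) -> bool:
    """Return True when the running Airflow can use DatasetAlias."""
    return Version(airflow_version) >= AIRFLOW_VERSION_WITH_DATASET_ALIAS
```

On versions where this returns False, Cosmos keeps the pre-1.7 dataset behaviour.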

  • Unable to leverage DatasetAlias in airflow dags test

Although the feature described in this PR works well when scheduling DAGs, triggering them via the UI, or using airflow dags trigger, it does not work when users attempt to use airflow dags test or dag.test(). These commands fail with a sqlalchemy.orm.exc.FlushError. This is a known issue in Airflow 2.10.0 and 2.10.1 when declaring DatasetAliases, as described in apache/airflow#42495.

To mitigate this second problem, we've introduced a new Airflow configuration, AIRFLOW__COSMOS__ENABLE_DATASET_ALIAS, that allows users to disable dataset aliases when running Cosmos. We recommend that users who face the sqlalchemy.orm.exc.FlushError in their tests set this configuration to False only while running tests, until the issue is fixed in Airflow. When this configuration is set to False, Cosmos behaves as it did before the DatasetAlias feature was introduced.
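Since Airflow maps environment variables of the form AIRFLOW__SECTION__KEY to configuration options, the toggle boils down to reading a boolean from the [cosmos] section. A minimal sketch, not Cosmos' actual implementation:

```python
import os


def dataset_alias_enabled() -> bool:
    """Read the toggle from the environment.

    AIRFLOW__COSMOS__ENABLE_DATASET_ALIAS maps to [cosmos] enable_dataset_alias.
    Defaults to True; set it to "False" (e.g. only in test environments hit by
    the FlushError) to restore the pre-DatasetAlias behaviour.
    """
    raw = os.getenv("AIRFLOW__COSMOS__ENABLE_DATASET_ALIAS", "True")
    return raw.strip().lower() in ("true", "1", "t", "y", "yes")
```

In a real deployment the value would be read through Airflow's configuration API (e.g. conf.getboolean) so that airflow.cfg entries are honoured as well.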

How this feature was validated

Manually triggering this DAG bq_profile_example:

from datetime import datetime

from cosmos import DbtDag, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.constants import LoadMode

from include.constants import jaffle_shop_path, venv_execution_config

project_config = ProjectConfig(
    dbt_project_path=jaffle_shop_path,
)

profile_config = ProfileConfig(
    # these map to dbt/jaffle_shop/profiles.yml
    profile_name="airflow_db",
    target_name="bq",
    profiles_yml_filepath=jaffle_shop_path / "profiles.yml",
)

# minimal render config; the original snippet used render_config without defining it
render_config = RenderConfig(load_method=LoadMode.DBT_LS)

dbt_profile_example = DbtDag(
    project_config=project_config,
    profile_config=profile_config,
    render_config=render_config,
    execution_config=venv_execution_config,
    schedule_interval="@daily",
    start_date=datetime(2023, 1, 1),
    catchup=False,
    dag_id="bq_profile_example",
    tags=["profiles"],
)

And seeing that the dataset-scheduled DAG dataset_triggered_dag dependent on the first was triggered:

from datetime import datetime

from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.empty import EmptyOperator


with DAG(
    "dataset_triggered_dag",
    description="A DAG that should be triggered via Dataset/Dataset alias",
    start_date=datetime(2024, 9, 1),
    schedule=[Dataset(uri='bigquery/observability-sandbox-344122.moonactive.stg_customers')],
) as dag:
    t1 = EmptyOperator(
            task_id="task_1",
        )
    t2 = EmptyOperator(
            task_id="task_2",
        )
    t3 = EmptyOperator(
            task_id="task_3",
        )
    
    t1 >> t2 >> t3

Checking Airflow UI:

(Four screenshots of the Airflow UI, taken 2024-09-30.)

Related tickets

Closes: #522
Closes: #1119


@tatiana tatiana changed the title WIP: Support dataset aliases Support dataset aliases and improve dataset experience Sep 30, 2024
@tatiana tatiana changed the title Support dataset aliases and improve dataset experience Support DatasetAlias and fix orphaning unreferenced dataset Sep 30, 2024
@tatiana tatiana changed the title Support DatasetAlias and fix orphaning unreferenced dataset Support using DatasetAlias and fix orphaning unreferenced dataset Sep 30, 2024

codecov bot commented Sep 30, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 95.82%. Comparing base (e0a9fd3) to head (43c9ff0).
Report is 1 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1217      +/-   ##
==========================================
+ Coverage   95.78%   95.82%   +0.04%     
==========================================
  Files          65       66       +1     
  Lines        3745     3783      +38     
==========================================
+ Hits         3587     3625      +38     
  Misses        158      158              

@tatiana tatiana marked this pull request as ready for review September 30, 2024 12:54
@dosubot dosubot bot added size:L This PR changes 100-499 lines, ignoring generated files. area:datasets Related to the Airflow datasets feature/module execution:local Related to Local execution environment labels Sep 30, 2024
@pankajkoti (Contributor) left a comment:
LGTM, excited for this one! 👏🏽

@dosubot dosubot bot added the lgtm This PR has been approved by a maintainer label Sep 30, 2024
@tatiana tatiana merged commit 2febf3f into main Sep 30, 2024
68 checks passed
@tatiana tatiana deleted the dataset-alias branch September 30, 2024 19:54
@tatiana tatiana added this to the Cosmos 1.7.0 milestone Sep 30, 2024
tatiana added a commit that referenced this pull request Oct 4, 2024
New Features

* Introduction of experimental support to run dbt BQ models using Airflow deferrable operators by @pankajkoti @pankajastro @tatiana in #1224 #1230.
  This is a first step in this journey and we would really appreciate feedback from the community.

  For more information, check the documentation: https://astronomer.github.io/astronomer-cosmos/getting_started/execution-modes.html#airflow-async-experimental

  This work has been inspired by the talk "Airflow at Monzo: Evolving our data platform as the bank scales" by
  @jonathanrainer @ed-sparkes given at Airflow Summit 2023: https://airflowsummit.org/sessions/2023/airflow-at-monzo-evolving-our-data-platform-as-the-bank-scales/.

* Support using ``DatasetAlias`` and fix orphaning unreferenced dataset by @tatiana in #1217 #1240

  Documentation: https://astronomer.github.io/astronomer-cosmos/configuration/scheduling.html#data-aware-scheduling

* Add GCP_CLOUD_RUN_JOB execution mode by @ags-de #1153

  Learn more about it: https://astronomer.github.io/astronomer-cosmos/getting_started/gcp-cloud-run-job.html

Enhancements

* Create single virtualenv when ``DbtVirtualenvBaseOperator`` has ``virtualenv_dir=None`` and ``is_virtualenv_dir_temporary=True`` by @kesompochy in #1200
* Consistently handle build and imports in ``cosmos/__init__.py`` by @tatiana in #1215
* Add enum constants to init for direct import by @fabiomx in #1184

Bug fixes

* URL encode dataset names to support multibyte characters by @t0momi219 in #1198
* Fix invalid argument (``full_refresh``) passed to DbtTestAwsEksOperator (and others) by @johnhoran in #1175
* Fix ``printer_width`` arg type in ``DbtProfileConfigVars`` by @jessicaschueler in #1191
* Fix task owner fallback by @jmaicher in #1195

Docs

* Add scarf to readme and docs for website analytics by @cmarteepants in #1221
* Add ``virtualenv_dir`` param to ``ExecutionConfig`` docs by @pankajkoti in #1173
* Give credits to @LennartKloppenburg in CHANGELOG.rst by @tatiana #1174
* Refactor docs for async mode execution by @pankajkoti in #1241

Others

* Remove PR branch added for testing a change in CI in #1224 by @pankajkoti in #1233
* Fix CI wrt broken coverage upload artifact @pankajkoti in #1210
* Fix CI issues - Upgrade actions/upload-artifact & actions/download-artifact to v4 and set min version for packaging by @pankajkoti in #1208
* Resolve CI failures for Apache Airflow 2.7 jobs by @pankajkoti in #1182
* CI: Update GCP manifest file path based on new secret update by @pankajkoti in #1237
* Pre-commit hook updates in #1176 #1186, #1186, #1201, #1219, #1231