diff --git a/cosmos/constants.py b/cosmos/constants.py index 7562fe9bc..d512faf16 100644 --- a/cosmos/constants.py +++ b/cosmos/constants.py @@ -5,7 +5,7 @@ import aenum from airflow.providers.amazon.aws.hooks.s3 import S3Hook from airflow.providers.google.cloud.hooks.gcs import GCSHook -from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook +from airflow.providers.microsoft.azure.hooks.wasb import WasbHook from packaging.version import Version DBT_PROFILE_PATH = Path(os.path.expanduser("~")).joinpath(".dbt/profiles.yml") @@ -38,7 +38,7 @@ FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP = { S3_FILE_SCHEME: S3Hook.default_conn_name, GS_FILE_SCHEME: GCSHook.default_conn_name, - ABFS_FILE_SCHEME: AzureDataLakeHook.default_conn_name, + ABFS_FILE_SCHEME: WasbHook.default_conn_name, } diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index ec6d16bf1..f911cd31e 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -596,7 +596,7 @@ def load_from_dbt_manifest(self) -> None: nodes = {} if TYPE_CHECKING: - assert self.project.manifest_path is not None + assert self.project.manifest_path is not None # pragma: no cover with self.project.manifest_path.open() as fp: manifest = json.load(fp) diff --git a/dev/dags/cosmos_manifest_example.py b/dev/dags/cosmos_manifest_example.py index 225b38ddc..a96eff971 100644 --- a/dev/dags/cosmos_manifest_example.py +++ b/dev/dags/cosmos_manifest_example.py @@ -6,12 +6,17 @@ from datetime import datetime from pathlib import Path -from cosmos import DbtDag, ExecutionConfig, LoadMode, ProfileConfig, ProjectConfig, RenderConfig +from airflow.decorators import dag +from airflow.operators.empty import EmptyOperator + +from cosmos import DbtTaskGroup, ExecutionConfig, LoadMode, ProfileConfig, ProjectConfig, RenderConfig from cosmos.profiles import DbtProfileConfigVars, PostgresUserPasswordProfileMapping DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt" DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH)) +execution_config = ExecutionConfig(dbt_project_path=DBT_ROOT_PATH / "jaffle_shop") + profile_config = ProfileConfig( profile_name="default", target_name="dev", @@ -22,22 +27,84 @@ ), ) -# [START local_example] -cosmos_manifest_example = DbtDag( - # dbt/cosmos-specific parameters - project_config=ProjectConfig( - manifest_path=DBT_ROOT_PATH / "jaffle_shop" / "target" / "manifest.json", - project_name="jaffle_shop", - ), - profile_config=profile_config, - render_config=RenderConfig(load_method=LoadMode.DBT_MANIFEST, select=["path:seeds/raw_customers.csv"]), - execution_config=ExecutionConfig(dbt_project_path=DBT_ROOT_PATH / "jaffle_shop"), - operator_args={"install_deps": True}, - # normal dag parameters +render_config = RenderConfig(load_method=LoadMode.DBT_MANIFEST, select=["path:seeds/raw_customers.csv"]) + + +@dag( schedule_interval="@daily", start_date=datetime(2023, 1, 1), catchup=False, - dag_id="cosmos_manifest_example", default_args={"retries": 2}, ) -# [END local_example] +def cosmos_manifest_example() -> None: + + pre_dbt = EmptyOperator(task_id="pre_dbt") + + # [START local_example] + local_example = DbtTaskGroup( + group_id="local_example", + project_config=ProjectConfig( + manifest_path=DBT_ROOT_PATH / "jaffle_shop" / "target" / "manifest.json", + project_name="jaffle_shop", + ), + profile_config=profile_config, + render_config=render_config, + execution_config=execution_config, + operator_args={"install_deps": True}, + ) + # [END local_example] + + # [START aws_s3_example] + aws_s3_example = DbtTaskGroup( + group_id="aws_s3_example", + project_config=ProjectConfig( + manifest_path="s3://cosmos-manifest-test/manifest.json", + manifest_conn_id="aws_s3_conn", + # `manifest_conn_id` is optional. If not provided, the default connection ID `aws_default` is used. + project_name="jaffle_shop", + ), + profile_config=profile_config, + render_config=render_config, + execution_config=execution_config, + operator_args={"install_deps": True}, + ) + # [END aws_s3_example] + + # [START gcp_gs_example] + gcp_gs_example = DbtTaskGroup( + group_id="gcp_gs_example", + project_config=ProjectConfig( + manifest_path="gs://cosmos-manifest-test/manifest.json", + manifest_conn_id="gcp_gs_conn", + # `manifest_conn_id` is optional. If not provided, the default connection ID `google_cloud_default` is used. + project_name="jaffle_shop", + ), + profile_config=profile_config, + render_config=render_config, + execution_config=execution_config, + operator_args={"install_deps": True}, + ) + # [END gcp_gs_example] + + # [START azure_abfs_example] + azure_abfs_example = DbtTaskGroup( + group_id="azure_abfs_example", + project_config=ProjectConfig( + manifest_path="abfs://cosmos-manifest-test/manifest.json", + manifest_conn_id="azure_abfs_conn", + # `manifest_conn_id` is optional. If not provided, the default connection ID `wasb_default` is used. + project_name="jaffle_shop", + ), + profile_config=profile_config, + render_config=render_config, + execution_config=execution_config, + operator_args={"install_deps": True}, + ) + # [END azure_abfs_example] + + post_dbt = EmptyOperator(task_id="post_dbt") + + (pre_dbt >> local_example >> aws_s3_example >> gcp_gs_example >> azure_abfs_example >> post_dbt) + + +cosmos_manifest_example() diff --git a/tests/test_config.py b/tests/test_config.py index 162dde4fd..d557dd4fc 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -298,3 +298,36 @@ def test_execution_config_with_invocation_option(execution_mode, invocation_mode def test_execution_config_default_config(execution_mode, expected_invocation_mode): execution_config = ExecutionConfig(execution_mode=execution_mode) assert execution_config.invocation_mode == expected_invocation_mode + + +@pytest.mark.parametrize( + "manifest_path, given_manifest_conn_id, used_manifest_conn_id", + [ + ("s3://cosmos-manifest-test/manifest.json", None, "aws_default"), + ("s3://cosmos-manifest-test/manifest.json", "aws_s3_conn", "aws_s3_conn"), + ("gs://cosmos-manifest-test/manifest.json", None, "google_cloud_default"), + ("gs://cosmos-manifest-test/manifest.json", "gcp_gs_conn", "gcp_gs_conn"), + ("abfs://cosmos-manifest-test/manifest.json", None, "wasb_default"), + ("abfs://cosmos-manifest-test/manifest.json", "azure_abfs_conn", "azure_abfs_conn"), + ], +) +def test_remote_manifest_path(manifest_path, given_manifest_conn_id, used_manifest_conn_id): + if AIRFLOW_IO_AVAILABLE: + project_config = ProjectConfig( + dbt_project_path="/tmp/some-path", manifest_path=manifest_path, manifest_conn_id=given_manifest_conn_id + ) + + from airflow.io.path import ObjectStoragePath + + assert project_config.manifest_path == ObjectStoragePath(manifest_path, conn_id=used_manifest_conn_id) + else: + from airflow.version import version as airflow_version + + error_msg = ( + f"The manifest path {manifest_path} uses a remote file scheme, but the required Object Storage feature is " + f"unavailable in Airflow version {airflow_version}. Please upgrade to Airflow 2.8 or later." + ) + with pytest.raises(CosmosValueError, match=error_msg): + _ = ProjectConfig( + dbt_project_path="/tmp/some-path", manifest_path=manifest_path, manifest_conn_id=given_manifest_conn_id + )