
Add tests and integration test example DAG for remote manifest files
pankajkoti committed Jul 22, 2024
1 parent 2f3f258 commit 57a5820
Showing 5 changed files with 130 additions and 18 deletions.
12 changes: 12 additions & 0 deletions .github/workflows/test.yml
@@ -143,6 +143,9 @@ jobs:
          AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS: 0
          AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/
          AIRFLOW_CONN_EXAMPLE_CONN: postgres://postgres:postgres@0.0.0.0:5432/postgres
+         AIRFLOW_CONN_AWS_S3_CONN: ${{ secrets.AIRFLOW_CONN_AWS_S3_CONN }}
+         AIRFLOW_CONN_GCP_GS_CONN: ${{ secrets.AIRFLOW_CONN_GCP_GS_CONN }}
+         AIRFLOW_CONN_AZURE_ABFS_CONN: ${{ secrets.AIRFLOW_CONN_AZURE_ABFS_CONN }}
          DATABRICKS_HOST: mock
          DATABRICKS_WAREHOUSE_ID: mock
          DATABRICKS_TOKEN: mock
@@ -213,6 +216,9 @@ jobs:
        env:
          AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/
          AIRFLOW_CONN_EXAMPLE_CONN: postgres://postgres:postgres@0.0.0.0:5432/postgres
+         AIRFLOW_CONN_AWS_S3_CONN: ${{ secrets.AIRFLOW_CONN_AWS_S3_CONN }}
+         AIRFLOW_CONN_GCP_GS_CONN: ${{ secrets.AIRFLOW_CONN_GCP_GS_CONN }}
+         AIRFLOW_CONN_AZURE_ABFS_CONN: ${{ secrets.AIRFLOW_CONN_AZURE_ABFS_CONN }}
          PYTHONPATH: /home/runner/work/astronomer-cosmos/astronomer-cosmos/:$PYTHONPATH
          AIRFLOW_CONN_DATABRICKS_DEFAULT: ${{ secrets.AIRFLOW_CONN_DATABRICKS_DEFAULT }}
          DATABRICKS_CLUSTER_ID: ${{ secrets.DATABRICKS_CLUSTER_ID }}
@@ -275,6 +281,9 @@ jobs:
        env:
          AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/
          AIRFLOW_CONN_EXAMPLE_CONN: postgres://postgres:postgres@0.0.0.0:5432/postgres
+         AIRFLOW_CONN_AWS_S3_CONN: ${{ secrets.AIRFLOW_CONN_AWS_S3_CONN }}
+         AIRFLOW_CONN_GCP_GS_CONN: ${{ secrets.AIRFLOW_CONN_GCP_GS_CONN }}
+         AIRFLOW_CONN_AZURE_ABFS_CONN: ${{ secrets.AIRFLOW_CONN_AZURE_ABFS_CONN }}
          AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 90.0
          PYTHONPATH: /home/runner/work/astronomer-cosmos/astronomer-cosmos/:$PYTHONPATH
          COSMOS_CONN_POSTGRES_PASSWORD: ${{ secrets.COSMOS_CONN_POSTGRES_PASSWORD }}
@@ -348,6 +357,9 @@ jobs:
        env:
          AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/
          AIRFLOW_CONN_EXAMPLE_CONN: postgres://postgres:postgres@0.0.0.0:5432/postgres
+         AIRFLOW_CONN_AWS_S3_CONN: ${{ secrets.AIRFLOW_CONN_AWS_S3_CONN }}
+         AIRFLOW_CONN_GCP_GS_CONN: ${{ secrets.AIRFLOW_CONN_GCP_GS_CONN }}
+         AIRFLOW_CONN_AZURE_ABFS_CONN: ${{ secrets.AIRFLOW_CONN_AZURE_ABFS_CONN }}
          AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 90.0
          PYTHONPATH: /home/runner/work/astronomer-cosmos/astronomer-cosmos/:$PYTHONPATH
          COSMOS_CONN_POSTGRES_PASSWORD: ${{ secrets.COSMOS_CONN_POSTGRES_PASSWORD }}
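
The three new secrets are standard Airflow connection URIs injected as AIRFLOW_CONN_* environment variables. A hypothetical sketch of what one such value could look like (the key pair below is made up; the real values live in the repository's GitHub secrets):

import os

# Hypothetical AWS connection URI for the new AIRFLOW_CONN_AWS_S3_CONN secret.
# Airflow parses AIRFLOW_CONN_<ID> env vars into connections; for the `aws`
# connection type, login/password carry the access key pair and extras ride
# in the query string. The GCS and ABFS secrets hold analogous URIs.
os.environ["AIRFLOW_CONN_AWS_S3_CONN"] = "aws://AKIAFAKEACCESSKEY:fakesecretkey@/?region_name=us-east-1"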
4 changes: 2 additions & 2 deletions cosmos/constants.py
@@ -5,7 +5,7 @@
import aenum
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.google.cloud.hooks.gcs import GCSHook
-from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook
+from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
from packaging.version import Version

DBT_PROFILE_PATH = Path(os.path.expanduser("~")).joinpath(".dbt/profiles.yml")
@@ -38,7 +38,7 @@
FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP = {
    S3_FILE_SCHEME: S3Hook.default_conn_name,
    GS_FILE_SCHEME: GCSHook.default_conn_name,
-   ABFS_FILE_SCHEME: AzureDataLakeHook.default_conn_name,
+   ABFS_FILE_SCHEME: WasbHook.default_conn_name,
}

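This mapping is what lets `manifest_conn_id` stay optional: when no connection is passed, the manifest path's URL scheme selects the provider hook's default connection ID, now `wasb_default` for `abfs://` paths. A minimal sketch of that resolution, using literal scheme keys and a hypothetical helper rather than the actual Cosmos code path:

from urllib.parse import urlparse

# Illustrative copy of the scheme -> default connection mapping after this change.
FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP = {
    "s3": "aws_default",           # S3Hook.default_conn_name
    "gs": "google_cloud_default",  # GCSHook.default_conn_name
    "abfs": "wasb_default",        # WasbHook.default_conn_name
}

def resolve_manifest_conn_id(manifest_path: str, manifest_conn_id: str | None = None) -> str | None:
    """Return the explicit conn_id if given, else the default for the path's scheme."""
    if manifest_conn_id is not None:
        return manifest_conn_id
    return FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get(urlparse(manifest_path).scheme)

assert resolve_manifest_conn_id("s3://cosmos-manifest-test/manifest.json") == "aws_default"
assert resolve_manifest_conn_id("abfs://container/manifest.json", "azure_abfs_conn") == "azure_abfs_conn"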
2 changes: 1 addition & 1 deletion cosmos/dbt/graph.py
@@ -596,7 +596,7 @@ def load_from_dbt_manifest(self) -> None:
        nodes = {}

        if TYPE_CHECKING:
-           assert self.project.manifest_path is not None
+           assert self.project.manifest_path is not None  # pragma: no cover

        with self.project.manifest_path.open() as fp:
            manifest = json.load(fp)
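The added `# pragma: no cover` is needed because the assert sits under `if TYPE_CHECKING:`, a branch that is always False at runtime; the assert exists only to narrow the Optional type for mypy, so coverage would otherwise flag it as a never-executed line. The pattern in isolation (a sketch, not the Cosmos source):

from typing import TYPE_CHECKING, Optional

manifest_path: Optional[str] = "target/manifest.json"

if TYPE_CHECKING:
    # Seen by type checkers only; never executed, hence excluded from coverage.
    assert manifest_path is not None  # pragma: no cover

print(manifest_path.upper())  # mypy accepts this thanks to the narrowing above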
97 changes: 82 additions & 15 deletions dev/dags/cosmos_manifest_example.py
@@ -6,12 +6,17 @@
from datetime import datetime
from pathlib import Path

-from cosmos import DbtDag, ExecutionConfig, LoadMode, ProfileConfig, ProjectConfig, RenderConfig
+from airflow.decorators import dag
+from airflow.operators.empty import EmptyOperator
+
+from cosmos import DbtTaskGroup, ExecutionConfig, LoadMode, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.profiles import DbtProfileConfigVars, PostgresUserPasswordProfileMapping

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))

+execution_config = ExecutionConfig(dbt_project_path=DBT_ROOT_PATH / "jaffle_shop")
+
profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
@@ -22,22 +27,84 @@
    ),
)

-# [START local_example]
-cosmos_manifest_example = DbtDag(
-    # dbt/cosmos-specific parameters
-    project_config=ProjectConfig(
-        manifest_path=DBT_ROOT_PATH / "jaffle_shop" / "target" / "manifest.json",
-        project_name="jaffle_shop",
-    ),
-    profile_config=profile_config,
-    render_config=RenderConfig(load_method=LoadMode.DBT_MANIFEST, select=["path:seeds/raw_customers.csv"]),
-    execution_config=ExecutionConfig(dbt_project_path=DBT_ROOT_PATH / "jaffle_shop"),
-    operator_args={"install_deps": True},
-    # normal dag parameters
+render_config = RenderConfig(load_method=LoadMode.DBT_MANIFEST, select=["path:seeds/raw_customers.csv"])
+
+
+@dag(
    schedule_interval="@daily",
    start_date=datetime(2023, 1, 1),
    catchup=False,
    dag_id="cosmos_manifest_example",
    default_args={"retries": 2},
)
-# [END local_example]
+def cosmos_manifest_example() -> None:
+
+    pre_dbt = EmptyOperator(task_id="pre_dbt")
+
+    # [START local_example]
+    local_example = DbtTaskGroup(
+        group_id="local_example",
+        project_config=ProjectConfig(
+            manifest_path=DBT_ROOT_PATH / "jaffle_shop" / "target" / "manifest.json",
+            project_name="jaffle_shop",
+        ),
+        profile_config=profile_config,
+        render_config=render_config,
+        execution_config=execution_config,
+        operator_args={"install_deps": True},
+    )
+    # [END local_example]
+
+    # [START aws_s3_example]
+    aws_s3_example = DbtTaskGroup(
+        group_id="aws_s3_example",
+        project_config=ProjectConfig(
+            manifest_path="s3://cosmos-manifest-test/manifest.json",
+            manifest_conn_id="aws_s3_conn",
+            # `manifest_conn_id` is optional. If not provided, the default connection ID `aws_default` is used.
+            project_name="jaffle_shop",
+        ),
+        profile_config=profile_config,
+        render_config=render_config,
+        execution_config=execution_config,
+        operator_args={"install_deps": True},
+    )
+    # [END aws_s3_example]
+
+    # [START gcp_gs_example]
+    gcp_gs_example = DbtTaskGroup(
+        group_id="gcp_gs_example",
+        project_config=ProjectConfig(
+            manifest_path="gs://cosmos-manifest-test/manifest.json",
+            manifest_conn_id="gcp_gs_conn",
+            # `manifest_conn_id` is optional. If not provided, the default connection ID `google_cloud_default` is used.
+            project_name="jaffle_shop",
+        ),
+        profile_config=profile_config,
+        render_config=render_config,
+        execution_config=execution_config,
+        operator_args={"install_deps": True},
+    )
+    # [END gcp_gs_example]
+
+    # [START azure_abfs_example]
+    azure_abfs_example = DbtTaskGroup(
+        group_id="azure_abfs_example",
+        project_config=ProjectConfig(
+            manifest_path="abfs://cosmos-manifest-test/manifest.json",
+            manifest_conn_id="azure_abfs_conn",
+            # `manifest_conn_id` is optional. If not provided, the default connection ID `wasb_default` is used.
+            project_name="jaffle_shop",
+        ),
+        profile_config=profile_config,
+        render_config=render_config,
+        execution_config=execution_config,
+        operator_args={"install_deps": True},
+    )
+    # [END azure_abfs_example]
+
+    post_dbt = EmptyOperator(task_id="post_dbt")
+
+    (pre_dbt >> local_example >> aws_s3_example >> gcp_gs_example >> azure_abfs_example >> post_dbt)
+
+
+cosmos_manifest_example()
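
On Airflow 2.8+, ProjectConfig wraps these remote manifest strings in ObjectStoragePath, so the same `.open()` call used for the local manifest works for all four groups. A quick way to sanity-check one of the remote paths outside a DAG, assuming Airflow >= 2.8 and a configured `aws_s3_conn` connection with access to the bucket:

from airflow.io.path import ObjectStoragePath

manifest = ObjectStoragePath("s3://cosmos-manifest-test/manifest.json", conn_id="aws_s3_conn")
with manifest.open() as fp:
    print(fp.read(200))  # first bytes of the remote manifest.json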
33 changes: 33 additions & 0 deletions tests/test_config.py
@@ -298,3 +298,36 @@ def test_execution_config_with_invocation_option(execution_mode, invocation_mode
def test_execution_config_default_config(execution_mode, expected_invocation_mode):
execution_config = ExecutionConfig(execution_mode=execution_mode)
assert execution_config.invocation_mode == expected_invocation_mode


+@pytest.mark.parametrize(
+    "manifest_path, given_manifest_conn_id, used_manifest_conn_id",
+    [
+        ("s3://cosmos-manifest-test/manifest.json", None, "aws_default"),
+        ("s3://cosmos-manifest-test/manifest.json", "aws_s3_conn", "aws_s3_conn"),
+        ("gs://cosmos-manifest-test/manifest.json", None, "google_cloud_default"),
+        ("gs://cosmos-manifest-test/manifest.json", "gcp_gs_conn", "gcp_gs_conn"),
+        ("abfs://cosmos-manifest-test/manifest.json", None, "wasb_default"),
+        ("abfs://cosmos-manifest-test/manifest.json", "azure_abfs_conn", "azure_abfs_conn"),
+    ],
+)
+def test_remote_manifest_path(manifest_path, given_manifest_conn_id, used_manifest_conn_id):
+    if AIRFLOW_IO_AVAILABLE:
+        project_config = ProjectConfig(
+            dbt_project_path="/tmp/some-path", manifest_path=manifest_path, manifest_conn_id=given_manifest_conn_id
+        )
+
+        from airflow.io.path import ObjectStoragePath
+
+        assert project_config.manifest_path == ObjectStoragePath(manifest_path, conn_id=used_manifest_conn_id)
+    else:
+        from airflow.version import version as airflow_version
+
+        error_msg = (
+            f"The manifest path {manifest_path} uses a remote file scheme, but the required Object Storage feature is "
+            f"unavailable in Airflow version {airflow_version}. Please upgrade to Airflow 2.8 or later."
+        )
+        with pytest.raises(CosmosValueError, match=error_msg):
+            _ = ProjectConfig(
+                dbt_project_path="/tmp/some-path", manifest_path=manifest_path, manifest_conn_id=given_manifest_conn_id
+            )
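
The `AIRFLOW_IO_AVAILABLE` flag the test branches on reflects whether the Object Storage API (`airflow.io`) exists, which landed in Airflow 2.8.0. A sketch of how such a flag can be derived (Cosmos's actual definition may differ):

from airflow.version import version as airflow_version
from packaging.version import Version

# Object Storage (airflow.io) was introduced in Airflow 2.8.0.
AIRFLOW_IO_AVAILABLE = Version(airflow_version) >= Version("2.8.0")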
