From ac85e38985a91582f7d52b08b287d736207c0e93 Mon Sep 17 00:00:00 2001 From: Kevin Cameron Grismore Date: Mon, 2 Dec 2024 16:00:46 -0600 Subject: [PATCH 1/6] add from_profiles_yml --- .../prefect_dbt/cli/configs/base.py | 68 ++++++++++ .../prefect-dbt/prefect_dbt/utilities.py | 40 ++++++ .../tests/cli/configs/test_base.py | 119 ++++++++++++++++++ .../prefect-dbt/tests/test_utilities.py | 77 ++++++++++++ 4 files changed, 304 insertions(+) create mode 100644 src/integrations/prefect-dbt/prefect_dbt/utilities.py create mode 100644 src/integrations/prefect-dbt/tests/test_utilities.py diff --git a/src/integrations/prefect-dbt/prefect_dbt/cli/configs/base.py b/src/integrations/prefect-dbt/prefect_dbt/cli/configs/base.py index 4aff4a005c1c..44dbdb21cae9 100644 --- a/src/integrations/prefect-dbt/prefect_dbt/cli/configs/base.py +++ b/src/integrations/prefect-dbt/prefect_dbt/cli/configs/base.py @@ -7,6 +7,7 @@ from pydantic import BaseModel, Field from prefect.blocks.core import Block +from prefect_dbt.utilities import load_profiles_yml class DbtConfigs(Block, abc.ABC): @@ -147,6 +148,73 @@ class TargetConfigs(BaseTargetConfigs): _logo_url = "https://images.ctfassets.net/gm98wzqotmnx/5zE9lxfzBHjw3tnEup4wWL/9a001902ed43a84c6c96d23b24622e19/dbt-bit_tm.png?h=250" # noqa _documentation_url = "https://docs.prefect.io/integrations/prefect-dbt" # noqa + @classmethod + def from_profiles_yml( + cls, + profile_name: Optional[str] = None, + target_name: Optional[str] = None, + profiles_dir: Optional[str] = None, + allow_field_overrides: bool = False, + ) -> "TargetConfigs": + """ + Create a TargetConfigs instance from a dbt profiles.yml file. + + Args: + profile_name: Name of the profile to use from profiles.yml. + If None, uses the first profile. + target_name: Name of the target to use from the profile. + If None, uses the first target in the selected profile. + profiles_dir: Path to the directory containing profiles.yml. + If None, uses the default profiles directory. + + Returns: + A TargetConfigs instance populated from the profiles.yml target. + + Raises: + ValueError: If profiles.yml is not found or if profile/target is invalid + """ + profiles = load_profiles_yml(profiles_dir) + + # If no profile specified, use first non-config one + if profile_name is None: + for name in profiles: + if name != "config": + profile_name = name + break + elif profile_name not in profiles: + raise ValueError(f"Profile {profile_name} not found in profiles.yml") + + profile = profiles[profile_name] + if "outputs" not in profile: + raise ValueError(f"No outputs found in profile {profile_name}") + + outputs = profile["outputs"] + + # If no target specified, use first one + if target_name is None: + target_name = next(iter(outputs)) + elif target_name not in outputs: + raise ValueError( + f"Target {target_name} not found in profile {profile_name}" + ) + + target_config = outputs[target_name] + + type = target_config.pop("type") + schema = None + possible_keys = ["schema", "path", "dataset", "database"] + for key in possible_keys: + if key in target_config: + schema = target_config.pop(key) + break + + if schema is None: + raise ValueError(f"No schema found. Expected one of: {possible_keys}") + threads = target_config.pop("threads", 4) + return cls( + type=type, schema=schema, threads=threads, extras=target_config or None + ) + class GlobalConfigs(DbtConfigs): """ diff --git a/src/integrations/prefect-dbt/prefect_dbt/utilities.py b/src/integrations/prefect-dbt/prefect_dbt/utilities.py new file mode 100644 index 000000000000..9430e869ef0a --- /dev/null +++ b/src/integrations/prefect-dbt/prefect_dbt/utilities.py @@ -0,0 +1,40 @@ +""" +Utility functions for prefect-dbt +""" +import os +from typing import Any, Dict, Optional + +import yaml + + +def get_profiles_dir() -> str: + """Get the dbt profiles directory from environment or default location.""" + profiles_dir = os.getenv("DBT_PROFILES_DIR") + if not profiles_dir: + profiles_dir = os.path.expanduser("~/.dbt") + return profiles_dir + + +def load_profiles_yml(profiles_dir: Optional[str]) -> Dict[str, Any]: + """ + Load and parse the profiles.yml file. + + Args: + profiles_dir: Path to the directory containing profiles.yml. + If None, uses the default profiles directory. + + Returns: + Dict containing the parsed profiles.yml contents + + Raises: + ValueError: If profiles.yml is not found + """ + if profiles_dir is None: + profiles_dir = get_profiles_dir() + + profiles_path = os.path.join(profiles_dir, "profiles.yml") + if not os.path.exists(profiles_path): + raise ValueError(f"No profiles.yml found at {profiles_path}") + + with open(profiles_path, "r") as f: + return yaml.safe_load(f) diff --git a/src/integrations/prefect-dbt/tests/cli/configs/test_base.py b/src/integrations/prefect-dbt/tests/cli/configs/test_base.py index dafd1aa625c1..2043e34c2512 100644 --- a/src/integrations/prefect-dbt/tests/cli/configs/test_base.py +++ b/src/integrations/prefect-dbt/tests/cli/configs/test_base.py @@ -1,8 +1,46 @@ from pathlib import Path +from unittest.mock import patch import pytest from prefect_dbt.cli.configs.base import GlobalConfigs, TargetConfigs +SAMPLE_PROFILES = { + "jaffle_shop": { + "outputs": { + "dev": { + "type": "duckdb", + "path": "jaffle_shop.duckdb", + "schema": "main", + "threads": 4, + }, + "prod": { + "type": "duckdb", + "path": "/data/prod/jaffle_shop.duckdb", + "schema": "main", + "threads": 8, + }, + } + }, + "other_project": { + "outputs": { + "dev": { + "type": "duckdb", + "path": "other_project.duckdb", + "schema": "analytics", + "threads": 4, + } + } + }, + "config": {"partial_parse": True}, +} + + +@pytest.fixture +def mock_load_profiles(): + with patch("prefect_dbt.cli.configs.base.load_profiles_yml") as mock: + mock.return_value = SAMPLE_PROFILES + yield mock + def test_target_configs_get_configs(): target_configs = TargetConfigs( @@ -41,3 +79,84 @@ def test_global_configs(): global_configs = GlobalConfigs(log_format="json", send_anonymous_usage_stats=False) assert global_configs.log_format == "json" assert global_configs.send_anonymous_usage_stats is False + + +def test_from_profiles_yml_default_profile_target(mock_load_profiles): + """Test loading with default profile and target""" + target_configs = TargetConfigs.from_profiles_yml() + + assert target_configs.type == "duckdb" + assert target_configs.schema_ == "main" + assert target_configs.threads == 4 + assert target_configs.extras == {"path": "jaffle_shop.duckdb"} + + +def test_from_profiles_yml_explicit_profile_target(mock_load_profiles): + """Test loading with explicitly specified profile and target""" + target_configs = TargetConfigs.from_profiles_yml( + profile_name="other_project", target_name="dev" + ) + + assert target_configs.type == "duckdb" + assert target_configs.schema_ == "analytics" + assert target_configs.threads == 4 + assert target_configs.extras == {"path": "other_project.duckdb"} + + +def test_from_profiles_yml_invalid_profile(mock_load_profiles): + """Test error when invalid profile name provided""" + with pytest.raises(ValueError, match="Profile invalid_profile not found"): + TargetConfigs.from_profiles_yml(profile_name="invalid_profile") + + +def test_from_profiles_yml_invalid_target(mock_load_profiles): + """Test error when invalid target name provided""" + with pytest.raises(ValueError, match="Target invalid_target not found"): + TargetConfigs.from_profiles_yml( + profile_name="jaffle_shop", target_name="invalid_target" + ) + + +def test_from_profiles_yml_no_outputs(mock_load_profiles): + """Test error when profile has no outputs section""" + mock_load_profiles.return_value = {"broken": {"some_other_key": {}}} + with pytest.raises(ValueError, match="No outputs found in profile broken"): + TargetConfigs.from_profiles_yml(profile_name="broken") + + +def test_from_profiles_yml_no_schema(mock_load_profiles): + """Test error when target has no schema or equivalent field""" + mock_load_profiles.return_value = { + "test": { + "outputs": { + "dev": { + "type": "postgres", + "threads": 4, + # Missing schema field + "host": "localhost", + } + } + } + } + with pytest.raises(ValueError, match="No schema found"): + TargetConfigs.from_profiles_yml(profile_name="test") + + +def test_from_profiles_yml_alternative_schema_keys(mock_load_profiles): + """Test that alternative schema keys (dataset, database) work""" + mock_profiles = { + "test": { + "outputs": { + "dev": { + "type": "bigquery", + "threads": 4, + "dataset": "my_dataset", # Alternative to schema + "project": "my_project", + } + } + } + } + mock_load_profiles.return_value = mock_profiles + + target_configs = TargetConfigs.from_profiles_yml(profile_name="test") + assert target_configs.schema_ == "my_dataset" diff --git a/src/integrations/prefect-dbt/tests/test_utilities.py b/src/integrations/prefect-dbt/tests/test_utilities.py new file mode 100644 index 000000000000..af38777f6733 --- /dev/null +++ b/src/integrations/prefect-dbt/tests/test_utilities.py @@ -0,0 +1,77 @@ +import os +from pathlib import Path + +import pytest +import yaml +from prefect_dbt.utilities import get_profiles_dir, load_profiles_yml + +SAMPLE_PROFILES = { + "jaffle_shop": { + "outputs": { + "dev": { + "type": "duckdb", + "path": "jaffle_shop.duckdb", + "schema": "main", + "threads": 4, + } + } + } +} + + +@pytest.fixture +def temp_profiles_dir(tmp_path): + profiles_dir = tmp_path / ".dbt" + profiles_dir.mkdir() + + profiles_path = profiles_dir / "profiles.yml" + with open(profiles_path, "w") as f: + yaml.dump(SAMPLE_PROFILES, f) + + return str(profiles_dir) + + +def test_get_profiles_dir_default(): + if "DBT_PROFILES_DIR" in os.environ: + del os.environ["DBT_PROFILES_DIR"] + + expected = os.path.expanduser("~/.dbt") + assert get_profiles_dir() == expected + + +def test_get_profiles_dir_from_env(): + test_path = "/custom/path" + os.environ["DBT_PROFILES_DIR"] = test_path + try: + assert get_profiles_dir() == test_path + finally: + del os.environ["DBT_PROFILES_DIR"] + + +def test_load_profiles_yml_success(temp_profiles_dir): + profiles = load_profiles_yml(temp_profiles_dir) + assert profiles == SAMPLE_PROFILES + + +def test_load_profiles_yml_default_dir(monkeypatch, temp_profiles_dir): + monkeypatch.setenv("DBT_PROFILES_DIR", temp_profiles_dir) + profiles = load_profiles_yml(None) + assert profiles == SAMPLE_PROFILES + + +def test_load_profiles_yml_file_not_found(): + nonexistent_dir = "/path/that/does/not/exist" + with pytest.raises( + ValueError, + match=f"No profiles.yml found at {os.path.join(nonexistent_dir, 'profiles.yml')}", + ): + load_profiles_yml(nonexistent_dir) + + +def test_load_profiles_yml_invalid_yaml(temp_profiles_dir): + profiles_path = Path(temp_profiles_dir) / "profiles.yml" + with open(profiles_path, "w") as f: + f.write("invalid: yaml: content:\nindentation error") + + with pytest.raises(yaml.YAMLError): + load_profiles_yml(temp_profiles_dir) From debe1dd56684c3b69fce6999fe4a9c58ce244fb3 Mon Sep 17 00:00:00 2001 From: Kevin Cameron Grismore Date: Tue, 3 Dec 2024 09:42:12 -0600 Subject: [PATCH 2/6] add allow_field_overrides passthrough --- .../prefect-dbt/prefect_dbt/cli/configs/base.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/integrations/prefect-dbt/prefect_dbt/cli/configs/base.py b/src/integrations/prefect-dbt/prefect_dbt/cli/configs/base.py index 44dbdb21cae9..0ee5e3a34888 100644 --- a/src/integrations/prefect-dbt/prefect_dbt/cli/configs/base.py +++ b/src/integrations/prefect-dbt/prefect_dbt/cli/configs/base.py @@ -166,6 +166,8 @@ def from_profiles_yml( If None, uses the first target in the selected profile. profiles_dir: Path to the directory containing profiles.yml. If None, uses the default profiles directory. + allow_field_overrides: If enabled, fields from dbt target configs + will override fields provided in extras and credentials. Returns: A TargetConfigs instance populated from the profiles.yml target. @@ -212,7 +214,11 @@ def from_profiles_yml( raise ValueError(f"No schema found. Expected one of: {possible_keys}") threads = target_config.pop("threads", 4) return cls( - type=type, schema=schema, threads=threads, extras=target_config or None + type=type, + schema=schema, + threads=threads, + extras=target_config or None, + allow_field_overrides=allow_field_overrides, ) From 85aeff65792d2d352cc91a4056922f640a4ed0d2 Mon Sep 17 00:00:00 2001 From: Kevin Cameron Grismore Date: Tue, 3 Dec 2024 11:55:14 -0600 Subject: [PATCH 3/6] fix cloud tests for `httpx==0.28.0` compatibility --- .../tests/cli/configs/test_base.py | 7 - .../prefect-dbt/tests/cloud/test_jobs.py | 1392 +++++++++-------- .../prefect-dbt/tests/cloud/test_runs.py | 290 ++-- .../prefect-dbt/tests/cloud/test_utils.py | 108 +- 4 files changed, 908 insertions(+), 889 deletions(-) diff --git a/src/integrations/prefect-dbt/tests/cli/configs/test_base.py b/src/integrations/prefect-dbt/tests/cli/configs/test_base.py index 2043e34c2512..3a33e78b35eb 100644 --- a/src/integrations/prefect-dbt/tests/cli/configs/test_base.py +++ b/src/integrations/prefect-dbt/tests/cli/configs/test_base.py @@ -82,7 +82,6 @@ def test_global_configs(): def test_from_profiles_yml_default_profile_target(mock_load_profiles): - """Test loading with default profile and target""" target_configs = TargetConfigs.from_profiles_yml() assert target_configs.type == "duckdb" @@ -92,7 +91,6 @@ def test_from_profiles_yml_default_profile_target(mock_load_profiles): def test_from_profiles_yml_explicit_profile_target(mock_load_profiles): - """Test loading with explicitly specified profile and target""" target_configs = TargetConfigs.from_profiles_yml( profile_name="other_project", target_name="dev" ) @@ -104,13 +102,11 @@ def test_from_profiles_yml_explicit_profile_target(mock_load_profiles): def test_from_profiles_yml_invalid_profile(mock_load_profiles): - """Test error when invalid profile name provided""" with pytest.raises(ValueError, match="Profile invalid_profile not found"): TargetConfigs.from_profiles_yml(profile_name="invalid_profile") def test_from_profiles_yml_invalid_target(mock_load_profiles): - """Test error when invalid target name provided""" with pytest.raises(ValueError, match="Target invalid_target not found"): TargetConfigs.from_profiles_yml( profile_name="jaffle_shop", target_name="invalid_target" @@ -118,14 +114,12 @@ def test_from_profiles_yml_invalid_target(mock_load_profiles): def test_from_profiles_yml_no_outputs(mock_load_profiles): - """Test error when profile has no outputs section""" mock_load_profiles.return_value = {"broken": {"some_other_key": {}}} with pytest.raises(ValueError, match="No outputs found in profile broken"): TargetConfigs.from_profiles_yml(profile_name="broken") def test_from_profiles_yml_no_schema(mock_load_profiles): - """Test error when target has no schema or equivalent field""" mock_load_profiles.return_value = { "test": { "outputs": { @@ -143,7 +137,6 @@ def test_from_profiles_yml_no_schema(mock_load_profiles): def test_from_profiles_yml_alternative_schema_keys(mock_load_profiles): - """Test that alternative schema keys (dataset, database) work""" mock_profiles = { "test": { "outputs": { diff --git a/src/integrations/prefect-dbt/tests/cloud/test_jobs.py b/src/integrations/prefect-dbt/tests/cloud/test_jobs.py index 621dd27e260d..28d33bc2ebd0 100644 --- a/src/integrations/prefect-dbt/tests/cloud/test_jobs.py +++ b/src/integrations/prefect-dbt/tests/cloud/test_jobs.py @@ -2,6 +2,7 @@ import os import pytest +import respx from httpx import Response from prefect_dbt.cloud.credentials import DbtCloudCredentials from prefect_dbt.cloud.exceptions import ( @@ -37,12 +38,6 @@ def dbt_cloud_job(dbt_cloud_credentials): return DbtCloudJob(job_id=10000, dbt_cloud_credentials=dbt_cloud_credentials) -@pytest.fixture -def respx_mock_with_pass_through(respx_mock): - respx_mock.route(host="127.0.0.1").pass_through() - return respx_mock - - HEADERS = { "Authorization": "Bearer my_api_key", "x-dbt-partner-source": "prefect", @@ -51,403 +46,390 @@ def respx_mock_with_pass_through(respx_mock): class TestTriggerDbtCloudJobRun: - async def test_get_dbt_cloud_job_info( - self, respx_mock_with_pass_through, dbt_cloud_credentials - ): - respx_mock_with_pass_through.route(host="127.0.0.1").pass_through() - respx_mock_with_pass_through.get( - "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/12/", - headers=HEADERS, - ).mock(return_value=Response(200, json={"data": {"id": 10000}})) - - response = await get_dbt_cloud_job_info.fn( - dbt_cloud_credentials=dbt_cloud_credentials, - job_id=12, - order_by="id", - ) - - assert response == {"id": 10000} - - async def test_trigger_job_with_no_options( - self, respx_mock_with_pass_through, dbt_cloud_credentials - ): - respx_mock_with_pass_through.post( - "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/1/run/", - headers=HEADERS, - ).mock( - return_value=Response( - 200, json={"data": {"id": 10000, "project_id": 12345}} + async def test_get_dbt_cloud_job_info(self, dbt_cloud_credentials): + with respx.mock(using="httpx", assert_all_called=False) as respx_mock: + respx_mock.route(host="127.0.0.1").pass_through() + respx_mock.get( + "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/12/", + headers=HEADERS, + ).mock(return_value=Response(200, json={"data": {"id": 10000}})) + + response = await get_dbt_cloud_job_info.fn( + dbt_cloud_credentials=dbt_cloud_credentials, + job_id=12, + order_by="id", ) - ) - with disable_run_logger(): - result = await trigger_dbt_cloud_job_run.fn( - dbt_cloud_credentials=dbt_cloud_credentials, - job_id=1, + assert response == {"id": 10000} + + async def test_trigger_job_with_no_options(self, dbt_cloud_credentials): + with respx.mock(using="httpx") as respx_mock: + respx_mock.post( + "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/1/run/", + headers=HEADERS, + ).mock( + return_value=Response( + 200, json={"data": {"id": 10000, "project_id": 12345}} + ) ) - assert result == {"id": 10000, "project_id": 12345} + with disable_run_logger(): + result = await trigger_dbt_cloud_job_run.fn( + dbt_cloud_credentials=dbt_cloud_credentials, + job_id=1, + ) - request_body = json.loads( - respx_mock_with_pass_through.calls.last.request.content.decode() - ) - assert "Triggered via Prefect" in request_body["cause"] + assert result == {"id": 10000, "project_id": 12345} - async def test_trigger_with_custom_options( - self, respx_mock_with_pass_through, dbt_cloud_credentials - ): - respx_mock_with_pass_through.route(host="127.0.0.1").pass_through() - respx_mock_with_pass_through.post( - "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/1/run/", - headers=HEADERS, - json={ - "cause": "This is a custom cause", - "git_branch": "staging", - "schema_override": "dbt_cloud_pr_123", - "dbt_version_override": "0.18.0", - "threads_override": 8, - "target_name_override": "staging", - "generate_docs_override": True, - "timeout_seconds_override": 3000, - "steps_override": [ - "dbt seed", - "dbt run --fail-fast", - "dbt test --fail fast", - ], - }, - ).mock( - return_value=Response( - 200, json={"data": {"id": 10000, "project_id": 12345}} - ) - ) + request_body = json.loads(respx_mock.calls.last.request.content.decode()) + assert "Triggered via Prefect" in request_body["cause"] - @flow - async def test_trigger_with_custom_options(): - return await trigger_dbt_cloud_job_run( - dbt_cloud_credentials=dbt_cloud_credentials, - job_id=1, - options=TriggerJobRunOptions( - cause="This is a custom cause", - git_branch="staging", - schema_override="dbt_cloud_pr_123", - dbt_version_override="0.18.0", - target_name_override="staging", - timeout_seconds_override=3000, - generate_docs_override=True, - threads_override=8, - steps_override=[ + async def test_trigger_with_custom_options(self, dbt_cloud_credentials): + with respx.mock(using="httpx") as respx_mock: + respx_mock.route(host="127.0.0.1").pass_through() + respx_mock.post( + "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/1/run/", + headers=HEADERS, + json={ + "cause": "This is a custom cause", + "git_branch": "staging", + "schema_override": "dbt_cloud_pr_123", + "dbt_version_override": "0.18.0", + "threads_override": 8, + "target_name_override": "staging", + "generate_docs_override": True, + "timeout_seconds_override": 3000, + "steps_override": [ "dbt seed", "dbt run --fail-fast", "dbt test --fail fast", ], - ), - ) - - result = await test_trigger_with_custom_options() - assert result == {"id": 10000, "project_id": 12345} - - async def test_trigger_nonexistent_job( - self, respx_mock_with_pass_through, dbt_cloud_credentials - ): - respx_mock_with_pass_through.route(host="127.0.0.1").pass_through() - respx_mock_with_pass_through.post( - "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/1/run/", - headers=HEADERS, - ).mock( - return_value=Response(404, json={"status": {"user_message": "Not found!"}}) - ) - - @flow - async def test_trigger_nonexistent_job(): - task_shorter_retry = trigger_dbt_cloud_job_run.with_options( - retries=1, retry_delay_seconds=1 - ) - await task_shorter_retry( - dbt_cloud_credentials=dbt_cloud_credentials, - job_id=1, - ) - - with pytest.raises(DbtCloudJobRunTriggerFailed, match="Not found!"): - await test_trigger_nonexistent_job() + }, + ).mock( + return_value=Response( + 200, json={"data": {"id": 10000, "project_id": 12345}} + ) + ) + + @flow + async def test_trigger_with_custom_options(): + return await trigger_dbt_cloud_job_run( + dbt_cloud_credentials=dbt_cloud_credentials, + job_id=1, + options=TriggerJobRunOptions( + cause="This is a custom cause", + git_branch="staging", + schema_override="dbt_cloud_pr_123", + dbt_version_override="0.18.0", + target_name_override="staging", + timeout_seconds_override=3000, + generate_docs_override=True, + threads_override=8, + steps_override=[ + "dbt seed", + "dbt run --fail-fast", + "dbt test --fail fast", + ], + ), + ) + + result = await test_trigger_with_custom_options() + assert result == {"id": 10000, "project_id": 12345} + + async def test_trigger_nonexistent_job(self, dbt_cloud_credentials): + with respx.mock(using="httpx", assert_all_called=False) as respx_mock: + respx_mock.route(host="127.0.0.1").pass_through() + respx_mock.post( + "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/1/run/", + headers=HEADERS, + ).mock( + return_value=Response( + 404, json={"status": {"user_message": "Not found!"}} + ) + ) + + @flow + async def test_trigger_nonexistent_job(): + task_shorter_retry = trigger_dbt_cloud_job_run.with_options( + retries=1, retry_delay_seconds=1 + ) + await task_shorter_retry( + dbt_cloud_credentials=dbt_cloud_credentials, + job_id=1, + ) + + with pytest.raises(DbtCloudJobRunTriggerFailed, match="Not found!"): + await test_trigger_nonexistent_job() async def test_trigger_nonexistent_run_id_no_logs( - self, respx_mock_with_pass_through, dbt_cloud_credentials, caplog + self, dbt_cloud_credentials, caplog ): - respx_mock_with_pass_through.route(host="127.0.0.1").pass_through() - respx_mock_with_pass_through.post( - "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/1/run/", - headers=HEADERS, - ).mock(return_value=Response(200, json={"data": {"project_id": 12345}})) - - @flow - async def trigger_nonexistent_run_id(): - task_shorter_retry = trigger_dbt_cloud_job_run.with_options( - retries=1, retry_delay_seconds=1 - ) - await task_shorter_retry( - dbt_cloud_credentials=dbt_cloud_credentials, - job_id=1, - ) - - await trigger_nonexistent_run_id() + with respx.mock(using="httpx", assert_all_called=False) as respx_mock: + respx_mock.route(host="127.0.0.1").pass_through() + respx_mock.post( + "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/1/run/", + headers=HEADERS, + ).mock(return_value=Response(200, json={"data": {"project_id": 12345}})) + + @flow + async def trigger_nonexistent_run_id(): + task_shorter_retry = trigger_dbt_cloud_job_run.with_options( + retries=1, retry_delay_seconds=1 + ) + await task_shorter_retry( + dbt_cloud_credentials=dbt_cloud_credentials, + job_id=1, + ) + + await trigger_nonexistent_run_id() class TestTriggerDbtCloudJobRunAndWaitForCompletion: - @pytest.mark.respx(assert_all_called=True) - async def test_run_success( - self, respx_mock_with_pass_through, dbt_cloud_credentials - ): - respx_mock_with_pass_through.route(host="127.0.0.1").pass_through() - respx_mock_with_pass_through.post( - "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/1/run/", - headers=HEADERS, - ).mock( - return_value=Response( - 200, json={"data": {"id": 10000, "project_id": 12345}} - ) - ) - respx_mock_with_pass_through.get( - "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/10000/", - headers=HEADERS, - ).mock(return_value=Response(200, json={"data": {"id": 10000, "status": 10}})) - respx_mock_with_pass_through.get( - "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/10000/artifacts/", - headers=HEADERS, - ).mock(return_value=Response(200, json={"data": ["manifest.json"]})) - - result = await trigger_dbt_cloud_job_run_and_wait_for_completion( - dbt_cloud_credentials=dbt_cloud_credentials, job_id=1 - ) - assert result == { - "id": 10000, - "status": 10, - "artifact_paths": ["manifest.json"], - } - - @pytest.mark.respx(assert_all_called=True) - async def test_run_success_with_wait( - self, respx_mock_with_pass_through, dbt_cloud_credentials - ): - respx_mock_with_pass_through.route(host="127.0.0.1").pass_through() - respx_mock_with_pass_through.post( - "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/1/run/", - headers=HEADERS, - ).mock( - return_value=Response( - 200, json={"data": {"id": 10000, "project_id": 12345}} - ) - ) - respx_mock_with_pass_through.get( - "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/10000/", - headers=HEADERS, - ).mock( - side_effect=[ - Response(200, json={"data": {"id": 10000, "status": 1}}), - Response(200, json={"data": {"id": 10000, "status": 3}}), - Response(200, json={"data": {"id": 10000, "status": 10}}), - ] - ) - respx_mock_with_pass_through.get( - "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/10000/artifacts/", - headers=HEADERS, - ).mock(return_value=Response(200, json={"data": ["manifest.json"]})) - - result = await trigger_dbt_cloud_job_run_and_wait_for_completion( - dbt_cloud_credentials=dbt_cloud_credentials, - job_id=1, - poll_frequency_seconds=1, - ) - assert result == { - "id": 10000, - "status": 10, - "artifact_paths": ["manifest.json"], - } - - @pytest.mark.respx(assert_all_called=True) - async def test_run_failure_with_wait_and_retry( - self, respx_mock_with_pass_through, dbt_cloud_credentials - ): - respx_mock_with_pass_through.route(host="127.0.0.1").pass_through() - respx_mock_with_pass_through.post( - "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/1/run/", - headers=HEADERS, - ).mock( - return_value=Response( - 200, json={"data": {"id": 10000, "project_id": 12345}} - ) - ) - respx_mock_with_pass_through.get( - "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/10000/", - headers=HEADERS, - ).mock( - side_effect=[ - Response(200, json={"data": {"id": 10000, "status": 1}}), - Response(200, json={"data": {"id": 10000, "status": 3}}), - Response( - 200, json={"data": {"id": 10000, "status": 20}} - ), # failed status - ] - ) - - with pytest.raises(DbtCloudJobRunFailed): - await trigger_dbt_cloud_job_run_and_wait_for_completion( - dbt_cloud_credentials=dbt_cloud_credentials, - job_id=1, - poll_frequency_seconds=1, - retry_filtered_models_attempts=1, - ) - - @pytest.mark.respx(assert_all_called=True) - async def test_run_with_unexpected_status( - self, respx_mock_with_pass_through, dbt_cloud_credentials - ): - respx_mock_with_pass_through.route(host="127.0.0.1").pass_through() - respx_mock_with_pass_through.post( - "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/1/run/", - headers=HEADERS, - ).mock( - return_value=Response( - 200, json={"data": {"id": 10000, "project_id": 12345}} - ) - ) - respx_mock_with_pass_through.get( - "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/10000/", - headers=HEADERS, - ).mock( - side_effect=[ - Response(200, json={"data": {"id": 10000, "status": 1}}), - Response(200, json={"data": {"id": 10000, "status": 3}}), - Response( - 200, json={"data": {"id": 10000, "status": 42}} - ), # unknown status - ] - ) - - with pytest.raises(ValueError, match="42 is not a valid DbtCloudJobRunStatus"): - await trigger_dbt_cloud_job_run_and_wait_for_completion( - dbt_cloud_credentials=dbt_cloud_credentials, - job_id=1, - poll_frequency_seconds=1, - retry_filtered_models_attempts=0, - ) - - @pytest.mark.respx(assert_all_called=True) - async def test_run_failure_no_run_id( - self, respx_mock_with_pass_through, dbt_cloud_credentials - ): - respx_mock_with_pass_through.route(host="127.0.0.1").pass_through() - respx_mock_with_pass_through.post( - "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/1/run/", - headers=HEADERS, - ).mock(return_value=Response(200, json={"data": {"project_id": 12345}})) - - with pytest.raises(RuntimeError, match="Unable to determine run ID"): - await trigger_dbt_cloud_job_run_and_wait_for_completion( - dbt_cloud_credentials=dbt_cloud_credentials, - job_id=1, - poll_frequency_seconds=1, - ) - - @pytest.mark.respx(assert_all_called=True) - async def test_run_cancelled_with_wait( - self, respx_mock_with_pass_through, dbt_cloud_credentials - ): - respx_mock_with_pass_through.route(host="127.0.0.1").pass_through() - respx_mock_with_pass_through.post( - "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/1/run/", - headers=HEADERS, - ).mock( - return_value=Response( - 200, json={"data": {"id": 10000, "project_id": 12345}} - ) - ) - respx_mock_with_pass_through.get( - "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/10000/", - headers=HEADERS, - ).mock( - side_effect=[ - Response(200, json={"data": {"id": 10000, "status": 1}}), - Response(200, json={"data": {"id": 10000, "status": 3}}), - Response(200, json={"data": {"id": 10000, "status": 30}}), - ] - ) - - with pytest.raises(DbtCloudJobRunCancelled): - await trigger_dbt_cloud_job_run_and_wait_for_completion( - dbt_cloud_credentials=dbt_cloud_credentials, - job_id=1, - poll_frequency_seconds=1, - retry_filtered_models_attempts=0, - ) - - @pytest.mark.respx(assert_all_called=True) - async def test_run_timed_out( - self, respx_mock_with_pass_through, dbt_cloud_credentials - ): - respx_mock_with_pass_through.post( - "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/1/run/", - headers=HEADERS, - ).mock( - return_value=Response( - 200, json={"data": {"id": 10000, "project_id": 12345}} - ) - ) - respx_mock_with_pass_through.get( - "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/10000/", - headers=HEADERS, - ).mock( - side_effect=[ - Response(200, json={"data": {"id": 10000, "status": 1}}), - Response(200, json={"data": {"id": 10000, "status": 3}}), - Response(200, json={"data": {"id": 10000, "status": 3}}), - Response(200, json={"data": {"id": 10000, "status": 3}}), - ] - ) - - with pytest.raises(DbtCloudJobRunTimedOut): - await trigger_dbt_cloud_job_run_and_wait_for_completion( + async def test_run_success(self, dbt_cloud_credentials): + with respx.mock(using="httpx") as respx_mock: + respx_mock.route(host="127.0.0.1").pass_through() + respx_mock.post( + "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/1/run/", + headers=HEADERS, + ).mock( + return_value=Response( + 200, json={"data": {"id": 10000, "project_id": 12345}} + ) + ) + respx_mock.get( + "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/10000/", + headers=HEADERS, + ).mock( + return_value=Response(200, json={"data": {"id": 10000, "status": 10}}) + ) + respx_mock.get( + "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/10000/artifacts/", + headers=HEADERS, + ).mock(return_value=Response(200, json={"data": ["manifest.json"]})) + + result = await trigger_dbt_cloud_job_run_and_wait_for_completion( + dbt_cloud_credentials=dbt_cloud_credentials, job_id=1 + ) + assert result == { + "id": 10000, + "status": 10, + "artifact_paths": ["manifest.json"], + } + + async def test_run_success_with_wait(self, dbt_cloud_credentials): + with respx.mock(using="httpx") as respx_mock: + respx_mock.route(host="127.0.0.1").pass_through() + respx_mock.post( + "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/1/run/", + headers=HEADERS, + ).mock( + return_value=Response( + 200, json={"data": {"id": 10000, "project_id": 12345}} + ) + ) + respx_mock.get( + "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/10000/", + headers=HEADERS, + ).mock( + side_effect=[ + Response(200, json={"data": {"id": 10000, "status": 1}}), + Response(200, json={"data": {"id": 10000, "status": 3}}), + Response(200, json={"data": {"id": 10000, "status": 10}}), + ] + ) + respx_mock.get( + "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/10000/artifacts/", + headers=HEADERS, + ).mock(return_value=Response(200, json={"data": ["manifest.json"]})) + + result = await trigger_dbt_cloud_job_run_and_wait_for_completion( dbt_cloud_credentials=dbt_cloud_credentials, job_id=1, poll_frequency_seconds=1, - max_wait_seconds=3, - retry_filtered_models_attempts=0, - ) - - @pytest.mark.respx(assert_all_called=True) - async def test_run_success_failed_artifacts( - self, respx_mock_with_pass_through, dbt_cloud_credentials - ): - respx_mock_with_pass_through.post( - "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/1/run/", - headers=HEADERS, - ).mock( - return_value=Response( - 200, json={"data": {"id": 10000, "project_id": 12345}} - ) - ) - respx_mock_with_pass_through.get( - "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/10000/", - headers=HEADERS, - ).mock(return_value=Response(200, json={"data": {"id": 10000, "status": 10}})) - respx_mock_with_pass_through.get( - "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/10000/artifacts/", - headers=HEADERS, - ).mock( - return_value=Response( - 500, json={"status": {"user_message": "This is what went wrong"}} ) - ) - - result = await trigger_dbt_cloud_job_run_and_wait_for_completion( - dbt_cloud_credentials=dbt_cloud_credentials, job_id=1 - ) - assert result == {"id": 10000, "status": 10} + assert result == { + "id": 10000, + "status": 10, + "artifact_paths": ["manifest.json"], + } + + async def test_run_failure_with_wait_and_retry(self, dbt_cloud_credentials): + with respx.mock(using="httpx") as respx_mock: + respx_mock.route(host="127.0.0.1").pass_through() + respx_mock.post( + "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/1/run/", + headers=HEADERS, + ).mock( + return_value=Response( + 200, json={"data": {"id": 10000, "project_id": 12345}} + ) + ) + respx_mock.get( + "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/10000/", + headers=HEADERS, + ).mock( + side_effect=[ + Response(200, json={"data": {"id": 10000, "status": 1}}), + Response(200, json={"data": {"id": 10000, "status": 3}}), + Response( + 200, json={"data": {"id": 10000, "status": 20}} + ), # failed status + ] + ) + + with pytest.raises(DbtCloudJobRunFailed): + await trigger_dbt_cloud_job_run_and_wait_for_completion( + dbt_cloud_credentials=dbt_cloud_credentials, + job_id=1, + poll_frequency_seconds=1, + retry_filtered_models_attempts=1, + ) + + async def test_run_with_unexpected_status(self, dbt_cloud_credentials): + with respx.mock(using="httpx") as respx_mock: + respx_mock.route(host="127.0.0.1").pass_through() + respx_mock.post( + "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/1/run/", + headers=HEADERS, + ).mock( + return_value=Response( + 200, json={"data": {"id": 10000, "project_id": 12345}} + ) + ) + respx_mock.get( + "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/10000/", + headers=HEADERS, + ).mock( + side_effect=[ + Response(200, json={"data": {"id": 10000, "status": 1}}), + Response(200, json={"data": {"id": 10000, "status": 3}}), + Response( + 200, json={"data": {"id": 10000, "status": 42}} + ), # unknown status + ] + ) + + with pytest.raises( + ValueError, match="42 is not a valid DbtCloudJobRunStatus" + ): + await trigger_dbt_cloud_job_run_and_wait_for_completion( + dbt_cloud_credentials=dbt_cloud_credentials, + job_id=1, + poll_frequency_seconds=1, + retry_filtered_models_attempts=0, + ) + + async def test_run_failure_no_run_id(self, dbt_cloud_credentials): + with respx.mock(using="httpx") as respx_mock: + respx_mock.route(host="127.0.0.1").pass_through() + respx_mock.post( + "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/1/run/", + headers=HEADERS, + ).mock(return_value=Response(200, json={"data": {"project_id": 12345}})) + + with pytest.raises(RuntimeError, match="Unable to determine run ID"): + await trigger_dbt_cloud_job_run_and_wait_for_completion( + dbt_cloud_credentials=dbt_cloud_credentials, + job_id=1, + poll_frequency_seconds=1, + ) + + async def test_run_cancelled_with_wait(self, dbt_cloud_credentials): + with respx.mock(using="httpx") as respx_mock: + respx_mock.route(host="127.0.0.1").pass_through() + respx_mock.post( + "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/1/run/", + headers=HEADERS, + ).mock( + return_value=Response( + 200, json={"data": {"id": 10000, "project_id": 12345}} + ) + ) + respx_mock.get( + "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/10000/", + headers=HEADERS, + ).mock( + side_effect=[ + Response(200, json={"data": {"id": 10000, "status": 1}}), + Response(200, json={"data": {"id": 10000, "status": 3}}), + Response(200, json={"data": {"id": 10000, "status": 30}}), + ] + ) + + with pytest.raises(DbtCloudJobRunCancelled): + await trigger_dbt_cloud_job_run_and_wait_for_completion( + dbt_cloud_credentials=dbt_cloud_credentials, + job_id=1, + poll_frequency_seconds=1, + retry_filtered_models_attempts=0, + ) + + async def test_run_timed_out(self, dbt_cloud_credentials): + with respx.mock(using="httpx") as respx_mock: + respx_mock.route(host="127.0.0.1").pass_through() + respx_mock.post( + "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/1/run/", + headers=HEADERS, + ).mock( + return_value=Response( + 200, json={"data": {"id": 10000, "project_id": 12345}} + ) + ) + respx_mock.get( + "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/10000/", + headers=HEADERS, + ).mock( + side_effect=[ + Response(200, json={"data": {"id": 10000, "status": 1}}), + Response(200, json={"data": {"id": 10000, "status": 3}}), + Response(200, json={"data": {"id": 10000, "status": 3}}), + Response(200, json={"data": {"id": 10000, "status": 3}}), + ] + ) + + with pytest.raises(DbtCloudJobRunTimedOut): + await trigger_dbt_cloud_job_run_and_wait_for_completion( + dbt_cloud_credentials=dbt_cloud_credentials, + job_id=1, + poll_frequency_seconds=1, + max_wait_seconds=3, + retry_filtered_models_attempts=0, + ) + + async def test_run_success_failed_artifacts(self, dbt_cloud_credentials): + with respx.mock(using="httpx") as respx_mock: + respx_mock.route(host="127.0.0.1").pass_through() + respx_mock.post( + "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/1/run/", + headers=HEADERS, + ).mock( + return_value=Response( + 200, json={"data": {"id": 10000, "project_id": 12345}} + ) + ) + respx_mock.get( + "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/10000/", + headers=HEADERS, + ).mock( + return_value=Response(200, json={"data": {"id": 10000, "status": 10}}) + ) + respx_mock.get( + "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/10000/artifacts/", + headers=HEADERS, + ).mock( + return_value=Response( + 500, json={"status": {"user_message": "This is what went wrong"}} + ) + ) + + result = await trigger_dbt_cloud_job_run_and_wait_for_completion( + dbt_cloud_credentials=dbt_cloud_credentials, job_id=1 + ) + assert result == {"id": 10000, "status": 10} class TestRetryDbtCloudRunJobSubsetAndWaitForCompletion: - async def test_run_steps_override_error( - self, respx_mock_with_pass_through, dbt_cloud_credentials - ): + async def test_run_steps_override_error(self, dbt_cloud_credentials): with pytest.raises(ValueError, match="Do not set `steps_override"): await retry_dbt_cloud_job_run_subset_and_wait_for_completion( dbt_cloud_credentials=dbt_cloud_credentials, @@ -467,116 +449,117 @@ async def test_retry_run( self, trigger_job_run_options, exe_command, - respx_mock_with_pass_through, dbt_cloud_credentials, ): - respx_mock_with_pass_through.get( - "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/1/", - headers=HEADERS, - ).mock( - return_value=Response( - 200, - json={ - "data": { - "id": 10000, - "generate_docs": False, - "generate_sources": False, - } - }, - ) - ) - - # mock get_dbt_cloud_run_info - respx_mock_with_pass_through.get( - "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/10000/", - headers=HEADERS, - ).mock( - return_value=Response( - 200, - json={ - "data": { - "id": 10000, - "status": 20, # failed status - "run_steps": [ - { - "id": 432100123, - "run_id": 10000, - "account_id": 123456789, - "index": 1, - "name": "Clone Git Repository", - "status_humanized": "Success", - }, - { - "id": 432100124, - "run_id": 10000, - "account_id": 123456789, - "index": 2, - "name": "Create Profile from Connection Snowflake ", - "status_humanized": "Success", - }, - { - "id": 432100125, - "run_id": 10000, - "account_id": 123456789, - "index": 3, - "name": "Invoke dbt with `dbt deps`", - "status_humanized": "Success", - }, + with respx.mock(using="httpx", assert_all_called=False) as respx_mock: + respx_mock.route(host="127.0.0.1").pass_through() + respx_mock.get( + "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/1/", + headers=HEADERS, + ).mock( + return_value=Response( + 200, + json={ + "data": { + "id": 10000, + "generate_docs": False, + "generate_sources": False, + } + }, + ) + ) + + # mock get_dbt_cloud_run_info + respx_mock.get( + "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/10000/", + headers=HEADERS, + ).mock( + return_value=Response( + 200, + json={ + "data": { + "id": 10000, + "status": 20, # failed status + "run_steps": [ + { + "id": 432100123, + "run_id": 10000, + "account_id": 123456789, + "index": 1, + "name": "Clone Git Repository", + "status_humanized": "Success", + }, + { + "id": 432100124, + "run_id": 10000, + "account_id": 123456789, + "index": 2, + "name": "Create Profile from Connection Snowflake ", + "status_humanized": "Success", + }, + { + "id": 432100125, + "run_id": 10000, + "account_id": 123456789, + "index": 3, + "name": "Invoke dbt with `dbt deps`", + "status_humanized": "Success", + }, + { + "run_id": 10000, + "account_id": 123456789, + "index": 4, + "name": f"Invoke dbt with `dbt {exe_command}`", + "status_humanized": "Error", + }, + ], + "job_id": "1", + } + }, + ) + ) + + # mock list_dbt_cloud_run_artifacts + respx_mock.get( + "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/10000/artifacts/", + headers=HEADERS, + ).mock(return_value=Response(200, json={"data": ["run_results.json"]})) + + # mock get_dbt_cloud_run_artifact + respx_mock.get( + "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/10000/artifacts/run_results.json", # noqa + headers=HEADERS, + ).mock( + return_value=Response( + 200, + json={ + "metadata": {"env": {"DBT_CLOUD_JOB_ID": "1"}}, + "results": [ { - "run_id": 10000, - "account_id": 123456789, - "index": 4, - "name": f"Invoke dbt with `dbt {exe_command}`", - "status_humanized": "Error", + "status": "fail", + "message": "FAIL 1", + "failures": None, + "unique_id": "model.jaffle_shop.stg_customers", }, ], - "job_id": "1", - } - }, - ) - ) - - # mock list_dbt_cloud_run_artifacts - respx_mock_with_pass_through.get( - "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/10000/artifacts/", - headers=HEADERS, - ).mock(return_value=Response(200, json={"data": ["run_results.json"]})) - - # mock get_dbt_cloud_run_artifact - respx_mock_with_pass_through.get( - "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/10000/artifacts/run_results.json", # noqa - headers=HEADERS, - ).mock( - return_value=Response( - 200, - json={ - "metadata": {"env": {"DBT_CLOUD_JOB_ID": "1"}}, - "results": [ - { - "status": "fail", - "message": "FAIL 1", - "failures": None, - "unique_id": "model.jaffle_shop.stg_customers", - }, - ], - }, + }, + ) ) - ) - respx_mock_with_pass_through.post( - "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/1/run/", - headers=HEADERS, - ).mock( - return_value=Response( - 200, json={"data": {"id": 10000, "project_id": 12345}} + respx_mock.post( + "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/1/run/", + headers=HEADERS, + ).mock( + return_value=Response( + 200, json={"data": {"id": 10000, "project_id": 12345}} + ) ) - ) - with pytest.raises(DbtCloudJobRunFailed, match="Triggered job run with"): - await retry_dbt_cloud_job_run_subset_and_wait_for_completion( - dbt_cloud_credentials=dbt_cloud_credentials, - run_id=10000, - trigger_job_run_options=trigger_job_run_options, - ) + with pytest.raises(DbtCloudJobRunFailed, match="Triggered job run with"): + await retry_dbt_cloud_job_run_subset_and_wait_for_completion( + dbt_cloud_credentials=dbt_cloud_credentials, + run_id=10000, + trigger_job_run_options=trigger_job_run_options, + ) @pytest.fixture @@ -627,228 +610,253 @@ def test_fail(self): class TestTriggerWaitRetryDbtCloudJobRun: - @pytest.mark.respx(assert_all_called=True) - async def test_run_success(self, respx_mock_with_pass_through, dbt_cloud_job): - respx_mock_with_pass_through.post( - "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/10000/run/", - headers=HEADERS, - ).mock( - return_value=Response( - 200, json={"data": {"id": 10000, "project_id": 12345}} - ) - ) - respx_mock_with_pass_through.get( - "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/10000/", - headers=HEADERS, - ).mock(return_value=Response(200, json={"data": {"id": 10000, "status": 10}})) - respx_mock_with_pass_through.get( - "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/10000/artifacts/", - headers=HEADERS, - ).mock(return_value=Response(200, json={"data": ["manifest.json"]})) - - result = await run_dbt_cloud_job(dbt_cloud_job=dbt_cloud_job) - assert result == { - "id": 10000, - "status": 10, - "artifact_paths": ["manifest.json"], - } - - @pytest.mark.respx(assert_all_called=True) - async def test_run_timeout(self, respx_mock_with_pass_through, dbt_cloud_job): - respx_mock_with_pass_through.post( - "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/10000/run/", - headers=HEADERS, - ).mock( - return_value=Response( - 200, json={"data": {"id": 10000, "project_id": 12345}} - ) - ) - respx_mock_with_pass_through.get( - "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/10000/", - headers=HEADERS, - ).mock(return_value=Response(200, json={"data": {"id": 10000, "status": 3}})) - - dbt_cloud_job.timeout_seconds = 1 - with pytest.raises(DbtCloudJobRunTimedOut, match="Max wait time of 1"): - await run_dbt_cloud_job(dbt_cloud_job=dbt_cloud_job) + async def test_run_success(self, dbt_cloud_job): + with respx.mock(using="httpx") as respx_mock: + respx_mock.route(host="127.0.0.1").pass_through() + respx_mock.post( + "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/10000/run/", + headers=HEADERS, + ).mock( + return_value=Response( + 200, json={"data": {"id": 10000, "project_id": 12345}} + ) + ) + respx_mock.get( + "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/10000/", + headers=HEADERS, + ).mock( + return_value=Response(200, json={"data": {"id": 10000, "status": 10}}) + ) + respx_mock.get( + "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/10000/artifacts/", + headers=HEADERS, + ).mock(return_value=Response(200, json={"data": ["manifest.json"]})) + + result = await run_dbt_cloud_job(dbt_cloud_job=dbt_cloud_job) + assert result == { + "id": 10000, + "status": 10, + "artifact_paths": ["manifest.json"], + } + + async def test_run_timeout(self, dbt_cloud_job): + with respx.mock(using="httpx") as respx_mock: + respx_mock.route(host="127.0.0.1").pass_through() + respx_mock.post( + "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/10000/run/", + headers=HEADERS, + ).mock( + return_value=Response( + 200, json={"data": {"id": 10000, "project_id": 12345}} + ) + ) + respx_mock.get( + "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/10000/", + headers=HEADERS, + ).mock( + return_value=Response(200, json={"data": {"id": 10000, "status": 3}}) + ) + + dbt_cloud_job.timeout_seconds = 1 + with pytest.raises(DbtCloudJobRunTimedOut, match="Max wait time of 1"): + await run_dbt_cloud_job(dbt_cloud_job=dbt_cloud_job) @pytest.mark.parametrize( "exe_command", ["run", "run-operation"], ) - async def test_fail(self, respx_mock_with_pass_through, dbt_cloud_job, exe_command): - respx_mock_with_pass_through.get( - "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/10000/", - headers=HEADERS, - ).mock( - return_value=Response( - 200, - json={"data": {"id": 10000, "project_id": 12345, "run_steps": [""]}}, - ) - ) - respx_mock_with_pass_through.post( - "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/10000/run/", - headers=HEADERS, - ).mock( - return_value=Response( - 200, json={"data": {"id": 10000, "project_id": 12345}} - ) - ) - respx_mock_with_pass_through.get( - "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/10000/", - headers=HEADERS, - ).mock(return_value=Response(200, json={"data": {"id": 10000, "status": 20}})) - - respx_mock_with_pass_through.get( - "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/100000/", - headers=HEADERS, - ).mock( - return_value=Response( - 200, - json={ - "data": { - "id": 10000, - "generate_docs": False, - "generate_sources": False, - } - }, - ) - ) - - # mock get_dbt_cloud_run_info - respx_mock_with_pass_through.get( - "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/10000/", - headers=HEADERS, - ).mock( - return_value=Response( - 200, - json={ - "data": { - "id": 10000, - "status": 20, # failed status - "run_steps": [ - { - "id": 432100123, - "run_id": 10000, - "account_id": 123456789, - "index": 1, - "name": "Clone Git Repository", - "status_humanized": "Success", - }, - { - "id": 432100124, - "run_id": 10000, - "account_id": 123456789, - "index": 2, - "name": "Create Profile from Connection Snowflake ", - "status_humanized": "Success", - }, + async def test_fail(self, dbt_cloud_job, exe_command): + with respx.mock(using="httpx", assert_all_called=False) as respx_mock: + respx_mock.route(host="127.0.0.1").pass_through() + respx_mock.get( + "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/10000/", + headers=HEADERS, + ).mock( + return_value=Response( + 200, + json={ + "data": {"id": 10000, "project_id": 12345, "run_steps": [""]} + }, + ) + ) + respx_mock.post( + "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/10000/run/", + headers=HEADERS, + ).mock( + return_value=Response( + 200, json={"data": {"id": 10000, "project_id": 12345}} + ) + ) + respx_mock.get( + "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/10000/", + headers=HEADERS, + ).mock( + return_value=Response(200, json={"data": {"id": 10000, "status": 20}}) + ) + + respx_mock.get( + "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/100000/", + headers=HEADERS, + ).mock( + return_value=Response( + 200, + json={ + "data": { + "id": 10000, + "generate_docs": False, + "generate_sources": False, + } + }, + ) + ) + + # mock get_dbt_cloud_run_info + respx_mock.get( + "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/10000/", + headers=HEADERS, + ).mock( + return_value=Response( + 200, + json={ + "data": { + "id": 10000, + "status": 20, # failed status + "run_steps": [ + { + "id": 432100123, + "run_id": 10000, + "account_id": 123456789, + "index": 1, + "name": "Clone Git Repository", + "status_humanized": "Success", + }, + { + "id": 432100124, + "run_id": 10000, + "account_id": 123456789, + "index": 2, + "name": "Create Profile from Connection Snowflake ", + "status_humanized": "Success", + }, + { + "id": 432100125, + "run_id": 10000, + "account_id": 123456789, + "index": 3, + "name": "Invoke dbt with `dbt deps`", + "status_humanized": "Success", + }, + { + "run_id": 10000, + "account_id": 123456789, + "index": 4, + "name": f"Invoke dbt with `dbt {exe_command}`", + "status_humanized": "Error", + }, + ], + "job_id": "1", + } + }, + ) + ) + + # mock list_dbt_cloud_run_artifacts + respx_mock.get( + "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/10000/artifacts/", + headers=HEADERS, + ).mock(return_value=Response(200, json={"data": ["run_results.json"]})) + + # mock get_dbt_cloud_run_artifact + respx_mock.get( + "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/10000/artifacts/run_results.json", # noqa + headers=HEADERS, + ).mock( + return_value=Response( + 200, + json={ + "metadata": {"env": {"DBT_CLOUD_JOB_ID": "1"}}, + "results": [ { - "id": 432100125, - "run_id": 10000, - "account_id": 123456789, - "index": 3, - "name": "Invoke dbt with `dbt deps`", - "status_humanized": "Success", - }, - { - "run_id": 10000, - "account_id": 123456789, - "index": 4, - "name": f"Invoke dbt with `dbt {exe_command}`", - "status_humanized": "Error", + "status": "fail", + "message": "FAIL 1", + "failures": None, + "unique_id": "model.jaffle_shop.stg_customers", }, ], - "job_id": "1", - } - }, - ) - ) - - # mock list_dbt_cloud_run_artifacts - respx_mock_with_pass_through.get( - "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/10000/artifacts/", - headers=HEADERS, - ).mock(return_value=Response(200, json={"data": ["run_results.json"]})) - - # mock get_dbt_cloud_run_artifact - respx_mock_with_pass_through.get( - "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/10000/artifacts/run_results.json", # noqa - headers=HEADERS, - ).mock( - return_value=Response( - 200, - json={ - "metadata": {"env": {"DBT_CLOUD_JOB_ID": "1"}}, - "results": [ - { - "status": "fail", - "message": "FAIL 1", - "failures": None, - "unique_id": "model.jaffle_shop.stg_customers", - }, - ], - }, - ) - ) - - with pytest.raises(DbtCloudJobRunFailed, match="dbt Cloud job 10000"): - await run_dbt_cloud_job(dbt_cloud_job=dbt_cloud_job) - - @pytest.mark.respx(assert_all_called=True) - async def test_cancel(self, respx_mock_with_pass_through, dbt_cloud_job): - respx_mock_with_pass_through.post( - "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/10000/run/", - headers=HEADERS, - ).mock( - return_value=Response( - 200, json={"data": {"id": 10000, "project_id": 12345}} - ) - ) - respx_mock_with_pass_through.get( - "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/10000/", - headers=HEADERS, - ).mock(return_value=Response(200, json={"data": {"id": 10000, "status": 30}})) - - with pytest.raises(DbtCloudJobRunCancelled, match="dbt Cloud job 10000"): - await run_dbt_cloud_job(dbt_cloud_job=dbt_cloud_job) - - @pytest.mark.respx(assert_all_called=True) - async def test_fetch_result_running(self, respx_mock, dbt_cloud_job): - respx_mock.post( - "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/10000/run/", + }, + ) + ) + + with pytest.raises(DbtCloudJobRunFailed, match="dbt Cloud job 10000"): + await run_dbt_cloud_job(dbt_cloud_job=dbt_cloud_job) + + async def test_cancel(self, dbt_cloud_job): + with respx.mock(using="httpx") as respx_mock: + respx_mock.route(host="127.0.0.1").pass_through() + respx_mock.post( + "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/10000/run/", + headers=HEADERS, + ).mock( + return_value=Response( + 200, json={"data": {"id": 10000, "project_id": 12345}} + ) + ) + respx_mock.get( + "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/10000/", + headers=HEADERS, + ).mock( + return_value=Response(200, json={"data": {"id": 10000, "status": 30}}) + ) + + with pytest.raises(DbtCloudJobRunCancelled, match="dbt Cloud job 10000"): + await run_dbt_cloud_job(dbt_cloud_job=dbt_cloud_job) + + async def test_fetch_result_running(self, dbt_cloud_job): + with respx.mock(using="httpx", assert_all_called=False) as respx_mock: + respx_mock.route(host="127.0.0.1").pass_through() + respx_mock.post( + "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/10000/run/", + headers=HEADERS, + ).mock( + return_value=Response( + 200, json={"data": {"id": 10000, "project_id": 12345}} + ) + ) + respx_mock.get( + "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/10000/", + headers=HEADERS, + ).mock( + return_value=Response(200, json={"data": {"id": 10000, "status": 3}}) + ) + + with pytest.raises(DbtCloudJobRunIncomplete, match="dbt Cloud job 10000"): + run = await dbt_cloud_job.trigger() + await run.fetch_result() + + async def test_fail_auth(self, dbt_cloud_job): + with respx.mock(using="httpx") as respx_mock: + respx_mock.route(host="127.0.0.1").pass_through() + respx_mock.post( + "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/10000/run/", + headers=HEADERS, + ).mock( + return_value=Response( + 404, json={"status": {"user_message": "Not found"}} + ) + ) + with pytest.raises(DbtCloudJobRunTriggerFailed, match="Not found"): + await run_dbt_cloud_job(dbt_cloud_job=dbt_cloud_job, targeted_retries=0) + + +def test_get_job(dbt_cloud_job): + with respx.mock(using="httpx", assert_all_called=False) as respx_mock: + respx_mock.route(host="127.0.0.1").pass_through() + respx_mock.get( + "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/10000/", headers=HEADERS, ).mock( return_value=Response( 200, json={"data": {"id": 10000, "project_id": 12345}} ) ) - respx_mock.get( - "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/10000/", - headers=HEADERS, - ).mock(return_value=Response(200, json={"data": {"id": 10000, "status": 3}})) - - with pytest.raises(DbtCloudJobRunIncomplete, match="dbt Cloud job 10000"): - run = await dbt_cloud_job.trigger() - await run.fetch_result() - - @pytest.mark.respx(assert_all_called=True) - async def test_fail_auth(self, respx_mock_with_pass_through, dbt_cloud_job): - respx_mock_with_pass_through.post( - "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/10000/run/", - headers=HEADERS, - ).mock( - return_value=Response(404, json={"status": {"user_message": "Not found"}}) - ) - with pytest.raises(DbtCloudJobRunTriggerFailed, match="Not found"): - await run_dbt_cloud_job(dbt_cloud_job=dbt_cloud_job, targeted_retries=0) - - -def test_get_job(respx_mock_with_pass_through, dbt_cloud_job): - respx_mock_with_pass_through.get( - "https://cloud.getdbt.com/api/v2/accounts/123456789/jobs/10000/", - headers=HEADERS, - ).mock( - return_value=Response(200, json={"data": {"id": 10000, "project_id": 12345}}) - ) - assert dbt_cloud_job.get_job()["id"] == 10000 + assert dbt_cloud_job.get_job()["id"] == 10000 diff --git a/src/integrations/prefect-dbt/tests/cloud/test_runs.py b/src/integrations/prefect-dbt/tests/cloud/test_runs.py index cbb4fb713d1a..85fe89225cff 100644 --- a/src/integrations/prefect-dbt/tests/cloud/test_runs.py +++ b/src/integrations/prefect-dbt/tests/cloud/test_runs.py @@ -1,4 +1,5 @@ import pytest +import respx from httpx import Response from prefect_dbt.cloud.runs import ( DbtCloudGetRunArtifactFailed, @@ -11,163 +12,176 @@ class TestGetDbtCloudRunInfo: - async def test_get_dbt_cloud_run_info(self, respx_mock, dbt_cloud_credentials): - respx_mock.get( - "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/12/", - headers={"Authorization": "Bearer my_api_key"}, - ).mock(return_value=Response(200, json={"data": {"id": 10000}})) - - response = await get_dbt_cloud_run_info.fn( - dbt_cloud_credentials=dbt_cloud_credentials, - run_id=12, - ) - - assert response == {"id": 10000} - - async def test_get_nonexistent_run(self, respx_mock, dbt_cloud_credentials): - respx_mock.get( - "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/12/", - headers={"Authorization": "Bearer my_api_key"}, - ).mock( - return_value=Response(404, json={"status": {"user_message": "Not found!"}}) - ) - with pytest.raises(DbtCloudGetRunFailed, match="Not found!"): - await get_dbt_cloud_run_info.fn( + async def test_get_dbt_cloud_run_info(self, dbt_cloud_credentials): + with respx.mock(using="httpx") as respx_mock: + respx_mock.get( + "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/12/", + headers={"Authorization": "Bearer my_api_key"}, + ).mock(return_value=Response(200, json={"data": {"id": 10000}})) + + response = await get_dbt_cloud_run_info.fn( dbt_cloud_credentials=dbt_cloud_credentials, run_id=12, ) + assert response == {"id": 10000} + + async def test_get_nonexistent_run(self, dbt_cloud_credentials): + with respx.mock(using="httpx") as respx_mock: + respx_mock.get( + "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/12/", + headers={"Authorization": "Bearer my_api_key"}, + ).mock( + return_value=Response( + 404, json={"status": {"user_message": "Not found!"}} + ) + ) + with pytest.raises(DbtCloudGetRunFailed, match="Not found!"): + await get_dbt_cloud_run_info.fn( + dbt_cloud_credentials=dbt_cloud_credentials, + run_id=12, + ) + class TestDbtCloudListRunArtifacts: - async def test_list_artifacts_success(self, respx_mock, dbt_cloud_credentials): - respx_mock.get( - "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/12/artifacts/", - headers={"Authorization": "Bearer my_api_key"}, - ).mock(return_value=Response(200, json={"data": ["manifest.json"]})) - - response = await list_dbt_cloud_run_artifacts.fn( - dbt_cloud_credentials=dbt_cloud_credentials, - run_id=12, - ) - - assert response == ["manifest.json"] - - async def test_list_artifacts_with_step(self, respx_mock, dbt_cloud_credentials): - respx_mock.get( - "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/12/artifacts/?step=1", # noqa - headers={"Authorization": "Bearer my_api_key"}, - ).mock(return_value=Response(200, json={"data": ["manifest.json"]})) - - response = await list_dbt_cloud_run_artifacts.fn( - dbt_cloud_credentials=dbt_cloud_credentials, run_id=12, step=1 - ) - - assert response == ["manifest.json"] - - async def test_list_artifacts_failure(self, respx_mock, dbt_cloud_credentials): - respx_mock.get( - "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/12/artifacts/", - headers={"Authorization": "Bearer my_api_key"}, - ).mock( - return_value=Response( - 500, json={"status": {"user_message": "This is what went wrong"}} - ) - ) - with pytest.raises( - DbtCloudListRunArtifactsFailed, match="This is what went wrong" - ): - await list_dbt_cloud_run_artifacts.fn( + async def test_list_artifacts_success(self, dbt_cloud_credentials): + with respx.mock(using="httpx") as respx_mock: + respx_mock.get( + "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/12/artifacts/", + headers={"Authorization": "Bearer my_api_key"}, + ).mock(return_value=Response(200, json={"data": ["manifest.json"]})) + + response = await list_dbt_cloud_run_artifacts.fn( dbt_cloud_credentials=dbt_cloud_credentials, run_id=12, ) + assert response == ["manifest.json"] + + async def test_list_artifacts_with_step(self, dbt_cloud_credentials): + with respx.mock(using="httpx") as respx_mock: + respx_mock.get( + "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/12/artifacts/?step=1", # noqa + headers={"Authorization": "Bearer my_api_key"}, + ).mock(return_value=Response(200, json={"data": ["manifest.json"]})) + + response = await list_dbt_cloud_run_artifacts.fn( + dbt_cloud_credentials=dbt_cloud_credentials, run_id=12, step=1 + ) + + assert response == ["manifest.json"] + + async def test_list_artifacts_failure(self, dbt_cloud_credentials): + with respx.mock(using="httpx") as respx_mock: + respx_mock.get( + "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/12/artifacts/", + headers={"Authorization": "Bearer my_api_key"}, + ).mock( + return_value=Response( + 500, json={"status": {"user_message": "This is what went wrong"}} + ) + ) + with pytest.raises( + DbtCloudListRunArtifactsFailed, match="This is what went wrong" + ): + await list_dbt_cloud_run_artifacts.fn( + dbt_cloud_credentials=dbt_cloud_credentials, + run_id=12, + ) + class TestDbtCloudGetRunArtifact: - async def test_get_artifact_success(self, respx_mock, dbt_cloud_credentials): - respx_mock.get( - "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/12/artifacts/manifest.json", # noqa - headers={"Authorization": "Bearer my_api_key"}, - ).mock( - return_value=Response( - 200, - json={ - "metadata": { - "dbt_schema_version": "https://schemas.getdbt.com/dbt/catalog/v1.json", # noqa - "dbt_version": "1.1.1", - } - }, + async def test_get_artifact_success(self, dbt_cloud_credentials): + with respx.mock(using="httpx") as respx_mock: + respx_mock.get( + "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/12/artifacts/manifest.json", # noqa + headers={"Authorization": "Bearer my_api_key"}, + ).mock( + return_value=Response( + 200, + json={ + "metadata": { + "dbt_schema_version": "https://schemas.getdbt.com/dbt/catalog/v1.json", # noqa + "dbt_version": "1.1.1", + } + }, + ) ) - ) - response = await get_dbt_cloud_run_artifact.fn( - dbt_cloud_credentials=dbt_cloud_credentials, run_id=12, path="manifest.json" - ) + response = await get_dbt_cloud_run_artifact.fn( + dbt_cloud_credentials=dbt_cloud_credentials, + run_id=12, + path="manifest.json", + ) - assert response == { - "metadata": { - "dbt_schema_version": "https://schemas.getdbt.com/dbt/catalog/v1.json", - "dbt_version": "1.1.1", + assert response == { + "metadata": { + "dbt_schema_version": "https://schemas.getdbt.com/dbt/catalog/v1.json", + "dbt_version": "1.1.1", + } } - } - - async def test_get_non_json_artifact(self, respx_mock, dbt_cloud_credentials): - respx_mock.get( - "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/12/artifacts/compiled/dbt_artifacts/models/dim_dbt__current_models.sql", # noqa - headers={"Authorization": "Bearer my_api_key"}, - ).mock(return_value=Response(200, text="Hi! I'm some SQL!")) - - response = await get_dbt_cloud_run_artifact.fn( - dbt_cloud_credentials=dbt_cloud_credentials, - run_id=12, - path="compiled/dbt_artifacts/models/dim_dbt__current_models.sql", - ) - - assert response == "Hi! I'm some SQL!" - - async def test_get_artifact_with_step(self, respx_mock, dbt_cloud_credentials): - respx_mock.get( - "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/12/artifacts/manifest.json?step=1", # noqa - headers={"Authorization": "Bearer my_api_key"}, - ).mock( - return_value=Response( - 200, - json={ - "metadata": { - "dbt_schema_version": "https://schemas.getdbt.com/dbt/catalog/v1.json", # noqa - "dbt_version": "1.1.1", - } - }, + + async def test_get_non_json_artifact(self, dbt_cloud_credentials): + with respx.mock(using="httpx") as respx_mock: + respx_mock.get( + "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/12/artifacts/compiled/dbt_artifacts/models/dim_dbt__current_models.sql", # noqa + headers={"Authorization": "Bearer my_api_key"}, + ).mock(return_value=Response(200, text="Hi! I'm some SQL!")) + + response = await get_dbt_cloud_run_artifact.fn( + dbt_cloud_credentials=dbt_cloud_credentials, + run_id=12, + path="compiled/dbt_artifacts/models/dim_dbt__current_models.sql", ) - ) - - response = await get_dbt_cloud_run_artifact.fn( - dbt_cloud_credentials=dbt_cloud_credentials, - run_id=12, - path="manifest.json", - step=1, - ) - - assert response == { - "metadata": { - "dbt_schema_version": "https://schemas.getdbt.com/dbt/catalog/v1.json", - "dbt_version": "1.1.1", - } - } - - async def test_get_artifact_failure(self, respx_mock, dbt_cloud_credentials): - respx_mock.get( - "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/12/artifacts/manifest.json", # noqa - headers={"Authorization": "Bearer my_api_key"}, - ).mock( - return_value=Response( - 500, json={"status": {"user_message": "This is what went wrong"}} + + assert response == "Hi! I'm some SQL!" + + async def test_get_artifact_with_step(self, dbt_cloud_credentials): + with respx.mock(using="httpx") as respx_mock: + respx_mock.get( + "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/12/artifacts/manifest.json?step=1", # noqa + headers={"Authorization": "Bearer my_api_key"}, + ).mock( + return_value=Response( + 200, + json={ + "metadata": { + "dbt_schema_version": "https://schemas.getdbt.com/dbt/catalog/v1.json", # noqa + "dbt_version": "1.1.1", + } + }, + ) ) - ) - with pytest.raises( - DbtCloudGetRunArtifactFailed, match="This is what went wrong" - ): - await get_dbt_cloud_run_artifact.fn( + + response = await get_dbt_cloud_run_artifact.fn( dbt_cloud_credentials=dbt_cloud_credentials, run_id=12, path="manifest.json", + step=1, + ) + + assert response == { + "metadata": { + "dbt_schema_version": "https://schemas.getdbt.com/dbt/catalog/v1.json", + "dbt_version": "1.1.1", + } + } + + async def test_get_artifact_failure(self, dbt_cloud_credentials): + with respx.mock(using="httpx") as respx_mock: + respx_mock.get( + "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/12/artifacts/manifest.json", # noqa + headers={"Authorization": "Bearer my_api_key"}, + ).mock( + return_value=Response( + 500, json={"status": {"user_message": "This is what went wrong"}} + ) ) + with pytest.raises( + DbtCloudGetRunArtifactFailed, match="This is what went wrong" + ): + await get_dbt_cloud_run_artifact.fn( + dbt_cloud_credentials=dbt_cloud_credentials, + run_id=12, + path="manifest.json", + ) diff --git a/src/integrations/prefect-dbt/tests/cloud/test_utils.py b/src/integrations/prefect-dbt/tests/cloud/test_utils.py index b3fee6f13661..7f7f3561a867 100644 --- a/src/integrations/prefect-dbt/tests/cloud/test_utils.py +++ b/src/integrations/prefect-dbt/tests/cloud/test_utils.py @@ -1,4 +1,5 @@ import pytest +import respx from httpx import Response from prefect_dbt.cloud.utils import ( DbtCloudAdministrativeApiCallFailed, @@ -7,64 +8,67 @@ class TestCallDbtCloudAdministrativeApiEndpoint: - async def test_endpoint_returns_json(self, dbt_cloud_credentials, respx_mock): - respx_mock.get( - "https://cloud.getdbt.com/api/v2/accounts/123456789/projects/", - headers={"Authorization": "Bearer my_api_key"}, - ).mock( - return_value=Response( - 200, - json={ - "status": { - "code": 200, - "is_success": True, - "user_message": "Success!", - "developer_message": "", + async def test_endpoint_returns_json(self, dbt_cloud_credentials): + with respx.mock(using="httpx") as respx_mock: + respx_mock.get( + "https://cloud.getdbt.com/api/v2/accounts/123456789/projects/", + headers={"Authorization": "Bearer my_api_key"}, + ).mock( + return_value=Response( + 200, + json={ + "status": { + "code": 200, + "is_success": True, + "user_message": "Success!", + "developer_message": "", + }, + "data": [], }, - "data": [], - }, + ) ) - ) - - result = await call_dbt_cloud_administrative_api_endpoint.fn( - dbt_cloud_credentials=dbt_cloud_credentials, - path="/projects/", - http_method="GET", - ) - - assert result == { - "status": { - "code": 200, - "is_success": True, - "user_message": "Success!", - "developer_message": "", - }, - "data": [], - } - - async def test_endpoint_returns_text(self, dbt_cloud_credentials, respx_mock): - respx_mock.get( - "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/12/artifacts/compiled/dbt_artifacts/models/dim_dbt__current_models.sql", # noqa - headers={"Authorization": "Bearer my_api_key"}, - ).mock(return_value=Response(200, text="Hi! I'm some SQL!")) - result = await call_dbt_cloud_administrative_api_endpoint.fn( - dbt_cloud_credentials=dbt_cloud_credentials, - path="/runs/12/artifacts/compiled/dbt_artifacts/models/dim_dbt__current_models.sql", # noqa - http_method="GET", - ) + result = await call_dbt_cloud_administrative_api_endpoint.fn( + dbt_cloud_credentials=dbt_cloud_credentials, + path="/projects/", + http_method="GET", + ) - assert result == "Hi! I'm some SQL!" + assert result == { + "status": { + "code": 200, + "is_success": True, + "user_message": "Success!", + "developer_message": "", + }, + "data": [], + } - async def test_failure(self, dbt_cloud_credentials, respx_mock): - respx_mock.get( - "https://cloud.getdbt.com/api/v2/accounts/123456789/projects/", - headers={"Authorization": "Bearer my_api_key"}, - ).mock(return_value=Response(500, json={})) + async def test_endpoint_returns_text(self, dbt_cloud_credentials): + with respx.mock(using="httpx") as respx_mock: + respx_mock.get( + "https://cloud.getdbt.com/api/v2/accounts/123456789/runs/12/artifacts/compiled/dbt_artifacts/models/dim_dbt__current_models.sql", # noqa + headers={"Authorization": "Bearer my_api_key"}, + ).mock(return_value=Response(200, text="Hi! I'm some SQL!")) - with pytest.raises(DbtCloudAdministrativeApiCallFailed): - await call_dbt_cloud_administrative_api_endpoint.fn( + result = await call_dbt_cloud_administrative_api_endpoint.fn( dbt_cloud_credentials=dbt_cloud_credentials, - path="/projects/", + path="/runs/12/artifacts/compiled/dbt_artifacts/models/dim_dbt__current_models.sql", # noqa http_method="GET", ) + + assert result == "Hi! I'm some SQL!" + + async def test_failure(self, dbt_cloud_credentials): + with respx.mock(using="httpx") as respx_mock: + respx_mock.get( + "https://cloud.getdbt.com/api/v2/accounts/123456789/projects/", + headers={"Authorization": "Bearer my_api_key"}, + ).mock(return_value=Response(500, json={})) + + with pytest.raises(DbtCloudAdministrativeApiCallFailed): + await call_dbt_cloud_administrative_api_endpoint.fn( + dbt_cloud_credentials=dbt_cloud_credentials, + path="/projects/", + http_method="GET", + ) From 74df5038d8361c4a0d6a5fd388364d146ad1a1aa Mon Sep 17 00:00:00 2001 From: Kevin Cameron Grismore Date: Tue, 3 Dec 2024 12:29:58 -0600 Subject: [PATCH 4/6] choose default target if no target name provided --- .../prefect-dbt/prefect_dbt/cli/configs/base.py | 6 +++--- .../prefect-dbt/tests/cli/configs/test_base.py | 16 ++++++++++------ 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/src/integrations/prefect-dbt/prefect_dbt/cli/configs/base.py b/src/integrations/prefect-dbt/prefect_dbt/cli/configs/base.py index 0ee5e3a34888..1bbc4a673940 100644 --- a/src/integrations/prefect-dbt/prefect_dbt/cli/configs/base.py +++ b/src/integrations/prefect-dbt/prefect_dbt/cli/configs/base.py @@ -163,7 +163,7 @@ def from_profiles_yml( profile_name: Name of the profile to use from profiles.yml. If None, uses the first profile. target_name: Name of the target to use from the profile. - If None, uses the first target in the selected profile. + If None, uses the default target in the selected profile. profiles_dir: Path to the directory containing profiles.yml. If None, uses the default profiles directory. allow_field_overrides: If enabled, fields from dbt target configs @@ -192,9 +192,9 @@ def from_profiles_yml( outputs = profile["outputs"] - # If no target specified, use first one + # If no target specified, use default target if target_name is None: - target_name = next(iter(outputs)) + target_name = profile["target"] elif target_name not in outputs: raise ValueError( f"Target {target_name} not found in profile {profile_name}" diff --git a/src/integrations/prefect-dbt/tests/cli/configs/test_base.py b/src/integrations/prefect-dbt/tests/cli/configs/test_base.py index 3a33e78b35eb..3d5dd39a68c5 100644 --- a/src/integrations/prefect-dbt/tests/cli/configs/test_base.py +++ b/src/integrations/prefect-dbt/tests/cli/configs/test_base.py @@ -19,7 +19,8 @@ "schema": "main", "threads": 8, }, - } + }, + "target": "prod", }, "other_project": { "outputs": { @@ -29,7 +30,8 @@ "schema": "analytics", "threads": 4, } - } + }, + "target": "dev", }, "config": {"partial_parse": True}, } @@ -86,8 +88,8 @@ def test_from_profiles_yml_default_profile_target(mock_load_profiles): assert target_configs.type == "duckdb" assert target_configs.schema_ == "main" - assert target_configs.threads == 4 - assert target_configs.extras == {"path": "jaffle_shop.duckdb"} + assert target_configs.threads == 8 + assert target_configs.extras == {"path": "/data/prod/jaffle_shop.duckdb"} def test_from_profiles_yml_explicit_profile_target(mock_load_profiles): @@ -129,7 +131,8 @@ def test_from_profiles_yml_no_schema(mock_load_profiles): # Missing schema field "host": "localhost", } - } + }, + "target": "dev", } } with pytest.raises(ValueError, match="No schema found"): @@ -146,7 +149,8 @@ def test_from_profiles_yml_alternative_schema_keys(mock_load_profiles): "dataset": "my_dataset", # Alternative to schema "project": "my_project", } - } + }, + "target": "dev", } } mock_load_profiles.return_value = mock_profiles From 15f809a655f276285524b21d7c46cd43930de891 Mon Sep 17 00:00:00 2001 From: Kevin Cameron Grismore Date: Tue, 3 Dec 2024 13:28:59 -0600 Subject: [PATCH 5/6] pre-commit docs change --- docs/v3/api-ref/rest-api/server/schema.json | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/v3/api-ref/rest-api/server/schema.json b/docs/v3/api-ref/rest-api/server/schema.json index 13571d0144d9..694f7648d060 100644 --- a/docs/v3/api-ref/rest-api/server/schema.json +++ b/docs/v3/api-ref/rest-api/server/schema.json @@ -16968,7 +16968,9 @@ "next_page": { "anyOf": [ { - "type": "string" + "type": "string", + "minLength": 1, + "format": "uri" }, { "type": "null" From 18738f0701f00c1b3f92aa0d9bfc02b4999e57c0 Mon Sep 17 00:00:00 2001 From: Kevin Cameron Grismore Date: Wed, 4 Dec 2024 09:44:06 -0600 Subject: [PATCH 6/6] docstrings and typehints --- .../prefect-dbt/prefect_dbt/cli/configs/base.py | 13 +++++++------ .../prefect-dbt/tests/test_utilities.py | 9 +++------ 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/src/integrations/prefect-dbt/prefect_dbt/cli/configs/base.py b/src/integrations/prefect-dbt/prefect_dbt/cli/configs/base.py index 1bbc4a673940..1104fc0b7a6c 100644 --- a/src/integrations/prefect-dbt/prefect_dbt/cli/configs/base.py +++ b/src/integrations/prefect-dbt/prefect_dbt/cli/configs/base.py @@ -2,9 +2,10 @@ import abc from pathlib import Path -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, Type from pydantic import BaseModel, Field +from typing_extensions import Self from prefect.blocks.core import Block from prefect_dbt.utilities import load_profiles_yml @@ -150,7 +151,7 @@ class TargetConfigs(BaseTargetConfigs): @classmethod def from_profiles_yml( - cls, + cls: Type[Self], profile_name: Optional[str] = None, target_name: Optional[str] = None, profiles_dir: Optional[str] = None, @@ -161,13 +162,13 @@ def from_profiles_yml( Args: profile_name: Name of the profile to use from profiles.yml. - If None, uses the first profile. + If None, uses the first profile. target_name: Name of the target to use from the profile. - If None, uses the default target in the selected profile. + If None, uses the default target in the selected profile. profiles_dir: Path to the directory containing profiles.yml. - If None, uses the default profiles directory. + If None, uses the default profiles directory. allow_field_overrides: If enabled, fields from dbt target configs - will override fields provided in extras and credentials. + will override fields provided in extras and credentials. Returns: A TargetConfigs instance populated from the profiles.yml target. diff --git a/src/integrations/prefect-dbt/tests/test_utilities.py b/src/integrations/prefect-dbt/tests/test_utilities.py index af38777f6733..736c04386cac 100644 --- a/src/integrations/prefect-dbt/tests/test_utilities.py +++ b/src/integrations/prefect-dbt/tests/test_utilities.py @@ -39,13 +39,10 @@ def test_get_profiles_dir_default(): assert get_profiles_dir() == expected -def test_get_profiles_dir_from_env(): +def test_get_profiles_dir_from_env(monkeypatch): test_path = "/custom/path" - os.environ["DBT_PROFILES_DIR"] = test_path - try: - assert get_profiles_dir() == test_path - finally: - del os.environ["DBT_PROFILES_DIR"] + monkeypatch.setenv("DBT_PROFILES_DIR", test_path) + assert get_profiles_dir() == test_path def test_load_profiles_yml_success(temp_profiles_dir):