From 4a8826a06795be883cc164391dcac76c771b19cb Mon Sep 17 00:00:00 2001 From: Kevin Cameron Grismore Date: Mon, 2 Dec 2024 16:00:46 -0600 Subject: [PATCH] add from_profiles_yml --- .../prefect_dbt/cli/configs/base.py | 68 ++++++++++ .../prefect-dbt/prefect_dbt/utilities.py | 40 ++++++ .../tests/cli/configs/test_base.py | 119 ++++++++++++++++++ .../prefect-dbt/tests/test_utilities.py | 77 ++++++++++++ 4 files changed, 304 insertions(+) create mode 100644 src/integrations/prefect-dbt/prefect_dbt/utilities.py create mode 100644 src/integrations/prefect-dbt/tests/test_utilities.py diff --git a/src/integrations/prefect-dbt/prefect_dbt/cli/configs/base.py b/src/integrations/prefect-dbt/prefect_dbt/cli/configs/base.py index 4aff4a005c1cb..44dbdb21cae92 100644 --- a/src/integrations/prefect-dbt/prefect_dbt/cli/configs/base.py +++ b/src/integrations/prefect-dbt/prefect_dbt/cli/configs/base.py @@ -7,6 +7,7 @@ from pydantic import BaseModel, Field from prefect.blocks.core import Block +from prefect_dbt.utilities import load_profiles_yml class DbtConfigs(Block, abc.ABC): @@ -147,6 +148,73 @@ class TargetConfigs(BaseTargetConfigs): _logo_url = "https://images.ctfassets.net/gm98wzqotmnx/5zE9lxfzBHjw3tnEup4wWL/9a001902ed43a84c6c96d23b24622e19/dbt-bit_tm.png?h=250" # noqa _documentation_url = "https://docs.prefect.io/integrations/prefect-dbt" # noqa + @classmethod + def from_profiles_yml( + cls, + profile_name: Optional[str] = None, + target_name: Optional[str] = None, + profiles_dir: Optional[str] = None, + allow_field_overrides: bool = False, + ) -> "TargetConfigs": + """ + Create a TargetConfigs instance from a dbt profiles.yml file. + + Args: + profile_name: Name of the profile to use from profiles.yml. + If None, uses the first profile. + target_name: Name of the target to use from the profile. + If None, uses the first target in the selected profile. + profiles_dir: Path to the directory containing profiles.yml. + If None, uses the default profiles directory. + + Returns: + A TargetConfigs instance populated from the profiles.yml target. + + Raises: + ValueError: If profiles.yml is not found or if profile/target is invalid + """ + profiles = load_profiles_yml(profiles_dir) + + # If no profile specified, use first non-config one + if profile_name is None: + for name in profiles: + if name != "config": + profile_name = name + break + elif profile_name not in profiles: + raise ValueError(f"Profile {profile_name} not found in profiles.yml") + + profile = profiles[profile_name] + if "outputs" not in profile: + raise ValueError(f"No outputs found in profile {profile_name}") + + outputs = profile["outputs"] + + # If no target specified, use first one + if target_name is None: + target_name = next(iter(outputs)) + elif target_name not in outputs: + raise ValueError( + f"Target {target_name} not found in profile {profile_name}" + ) + + target_config = outputs[target_name] + + type = target_config.pop("type") + schema = None + possible_keys = ["schema", "path", "dataset", "database"] + for key in possible_keys: + if key in target_config: + schema = target_config.pop(key) + break + + if schema is None: + raise ValueError(f"No schema found. Expected one of: {possible_keys}") + threads = target_config.pop("threads", 4) + return cls( + type=type, schema=schema, threads=threads, extras=target_config or None + ) + class GlobalConfigs(DbtConfigs): """ diff --git a/src/integrations/prefect-dbt/prefect_dbt/utilities.py b/src/integrations/prefect-dbt/prefect_dbt/utilities.py new file mode 100644 index 0000000000000..9430e869ef0a6 --- /dev/null +++ b/src/integrations/prefect-dbt/prefect_dbt/utilities.py @@ -0,0 +1,40 @@ +""" +Utility functions for prefect-dbt +""" +import os +from typing import Any, Dict, Optional + +import yaml + + +def get_profiles_dir() -> str: + """Get the dbt profiles directory from environment or default location.""" + profiles_dir = os.getenv("DBT_PROFILES_DIR") + if not profiles_dir: + profiles_dir = os.path.expanduser("~/.dbt") + return profiles_dir + + +def load_profiles_yml(profiles_dir: Optional[str]) -> Dict[str, Any]: + """ + Load and parse the profiles.yml file. + + Args: + profiles_dir: Path to the directory containing profiles.yml. + If None, uses the default profiles directory. + + Returns: + Dict containing the parsed profiles.yml contents + + Raises: + ValueError: If profiles.yml is not found + """ + if profiles_dir is None: + profiles_dir = get_profiles_dir() + + profiles_path = os.path.join(profiles_dir, "profiles.yml") + if not os.path.exists(profiles_path): + raise ValueError(f"No profiles.yml found at {profiles_path}") + + with open(profiles_path, "r") as f: + return yaml.safe_load(f) diff --git a/src/integrations/prefect-dbt/tests/cli/configs/test_base.py b/src/integrations/prefect-dbt/tests/cli/configs/test_base.py index dafd1aa625c12..2043e34c25126 100644 --- a/src/integrations/prefect-dbt/tests/cli/configs/test_base.py +++ b/src/integrations/prefect-dbt/tests/cli/configs/test_base.py @@ -1,8 +1,46 @@ from pathlib import Path +from unittest.mock import patch import pytest from prefect_dbt.cli.configs.base import GlobalConfigs, TargetConfigs +SAMPLE_PROFILES = { + "jaffle_shop": { + "outputs": { + "dev": { + "type": "duckdb", + "path": "jaffle_shop.duckdb", + "schema": "main", + "threads": 4, + }, + "prod": { + "type": "duckdb", + "path": "/data/prod/jaffle_shop.duckdb", + "schema": "main", + "threads": 8, + }, + } + }, + "other_project": { + "outputs": { + "dev": { + "type": "duckdb", + "path": "other_project.duckdb", + "schema": "analytics", + "threads": 4, + } + } + }, + "config": {"partial_parse": True}, +} + + +@pytest.fixture +def mock_load_profiles(): + with patch("prefect_dbt.cli.configs.base.load_profiles_yml") as mock: + mock.return_value = SAMPLE_PROFILES + yield mock + def test_target_configs_get_configs(): target_configs = TargetConfigs( @@ -41,3 +79,84 @@ def test_global_configs(): global_configs = GlobalConfigs(log_format="json", send_anonymous_usage_stats=False) assert global_configs.log_format == "json" assert global_configs.send_anonymous_usage_stats is False + + +def test_from_profiles_yml_default_profile_target(mock_load_profiles): + """Test loading with default profile and target""" + target_configs = TargetConfigs.from_profiles_yml() + + assert target_configs.type == "duckdb" + assert target_configs.schema_ == "main" + assert target_configs.threads == 4 + assert target_configs.extras == {"path": "jaffle_shop.duckdb"} + + +def test_from_profiles_yml_explicit_profile_target(mock_load_profiles): + """Test loading with explicitly specified profile and target""" + target_configs = TargetConfigs.from_profiles_yml( + profile_name="other_project", target_name="dev" + ) + + assert target_configs.type == "duckdb" + assert target_configs.schema_ == "analytics" + assert target_configs.threads == 4 + assert target_configs.extras == {"path": "other_project.duckdb"} + + +def test_from_profiles_yml_invalid_profile(mock_load_profiles): + """Test error when invalid profile name provided""" + with pytest.raises(ValueError, match="Profile invalid_profile not found"): + TargetConfigs.from_profiles_yml(profile_name="invalid_profile") + + +def test_from_profiles_yml_invalid_target(mock_load_profiles): + """Test error when invalid target name provided""" + with pytest.raises(ValueError, match="Target invalid_target not found"): + TargetConfigs.from_profiles_yml( + profile_name="jaffle_shop", target_name="invalid_target" + ) + + +def test_from_profiles_yml_no_outputs(mock_load_profiles): + """Test error when profile has no outputs section""" + mock_load_profiles.return_value = {"broken": {"some_other_key": {}}} + with pytest.raises(ValueError, match="No outputs found in profile broken"): + TargetConfigs.from_profiles_yml(profile_name="broken") + + +def test_from_profiles_yml_no_schema(mock_load_profiles): + """Test error when target has no schema or equivalent field""" + mock_load_profiles.return_value = { + "test": { + "outputs": { + "dev": { + "type": "postgres", + "threads": 4, + # Missing schema field + "host": "localhost", + } + } + } + } + with pytest.raises(ValueError, match="No schema found"): + TargetConfigs.from_profiles_yml(profile_name="test") + + +def test_from_profiles_yml_alternative_schema_keys(mock_load_profiles): + """Test that alternative schema keys (dataset, database) work""" + mock_profiles = { + "test": { + "outputs": { + "dev": { + "type": "bigquery", + "threads": 4, + "dataset": "my_dataset", # Alternative to schema + "project": "my_project", + } + } + } + } + mock_load_profiles.return_value = mock_profiles + + target_configs = TargetConfigs.from_profiles_yml(profile_name="test") + assert target_configs.schema_ == "my_dataset" diff --git a/src/integrations/prefect-dbt/tests/test_utilities.py b/src/integrations/prefect-dbt/tests/test_utilities.py new file mode 100644 index 0000000000000..af38777f67332 --- /dev/null +++ b/src/integrations/prefect-dbt/tests/test_utilities.py @@ -0,0 +1,77 @@ +import os +from pathlib import Path + +import pytest +import yaml +from prefect_dbt.utilities import get_profiles_dir, load_profiles_yml + +SAMPLE_PROFILES = { + "jaffle_shop": { + "outputs": { + "dev": { + "type": "duckdb", + "path": "jaffle_shop.duckdb", + "schema": "main", + "threads": 4, + } + } + } +} + + +@pytest.fixture +def temp_profiles_dir(tmp_path): + profiles_dir = tmp_path / ".dbt" + profiles_dir.mkdir() + + profiles_path = profiles_dir / "profiles.yml" + with open(profiles_path, "w") as f: + yaml.dump(SAMPLE_PROFILES, f) + + return str(profiles_dir) + + +def test_get_profiles_dir_default(): + if "DBT_PROFILES_DIR" in os.environ: + del os.environ["DBT_PROFILES_DIR"] + + expected = os.path.expanduser("~/.dbt") + assert get_profiles_dir() == expected + + +def test_get_profiles_dir_from_env(): + test_path = "/custom/path" + os.environ["DBT_PROFILES_DIR"] = test_path + try: + assert get_profiles_dir() == test_path + finally: + del os.environ["DBT_PROFILES_DIR"] + + +def test_load_profiles_yml_success(temp_profiles_dir): + profiles = load_profiles_yml(temp_profiles_dir) + assert profiles == SAMPLE_PROFILES + + +def test_load_profiles_yml_default_dir(monkeypatch, temp_profiles_dir): + monkeypatch.setenv("DBT_PROFILES_DIR", temp_profiles_dir) + profiles = load_profiles_yml(None) + assert profiles == SAMPLE_PROFILES + + +def test_load_profiles_yml_file_not_found(): + nonexistent_dir = "/path/that/does/not/exist" + with pytest.raises( + ValueError, + match=f"No profiles.yml found at {os.path.join(nonexistent_dir, 'profiles.yml')}", + ): + load_profiles_yml(nonexistent_dir) + + +def test_load_profiles_yml_invalid_yaml(temp_profiles_dir): + profiles_path = Path(temp_profiles_dir) / "profiles.yml" + with open(profiles_path, "w") as f: + f.write("invalid: yaml: content:\nindentation error") + + with pytest.raises(yaml.YAMLError): + load_profiles_yml(temp_profiles_dir)