Skip to content

Commit

Permalink
Add from_profiles_yml to dbt TargetConfigs (#16178)
Browse files Browse the repository at this point in the history
  • Loading branch information
kevingrismore authored Dec 4, 2024
1 parent d47081f commit 47aa8f2
Show file tree
Hide file tree
Showing 7 changed files with 1,214 additions and 883 deletions.
77 changes: 76 additions & 1 deletion src/integrations/prefect-dbt/prefect_dbt/cli/configs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@

import abc
from pathlib import Path
from typing import Any, Dict, Optional
from typing import Any, Dict, Optional, Type

from pydantic import BaseModel, Field
from typing_extensions import Self

from prefect.blocks.core import Block
from prefect_dbt.utilities import load_profiles_yml


class DbtConfigs(Block, abc.ABC):
Expand Down Expand Up @@ -147,6 +149,79 @@ class TargetConfigs(BaseTargetConfigs):
_logo_url = "https://images.ctfassets.net/gm98wzqotmnx/5zE9lxfzBHjw3tnEup4wWL/9a001902ed43a84c6c96d23b24622e19/dbt-bit_tm.png?h=250" # noqa
_documentation_url = "https://docs.prefect.io/integrations/prefect-dbt" # noqa

@classmethod
def from_profiles_yml(
cls: Type[Self],
profile_name: Optional[str] = None,
target_name: Optional[str] = None,
profiles_dir: Optional[str] = None,
allow_field_overrides: bool = False,
) -> "TargetConfigs":
"""
Create a TargetConfigs instance from a dbt profiles.yml file.
Args:
profile_name: Name of the profile to use from profiles.yml.
If None, uses the first profile.
target_name: Name of the target to use from the profile.
If None, uses the default target in the selected profile.
profiles_dir: Path to the directory containing profiles.yml.
If None, uses the default profiles directory.
allow_field_overrides: If enabled, fields from dbt target configs
will override fields provided in extras and credentials.
Returns:
A TargetConfigs instance populated from the profiles.yml target.
Raises:
ValueError: If profiles.yml is not found or if profile/target is invalid
"""
profiles = load_profiles_yml(profiles_dir)

# If no profile specified, use first non-config one
if profile_name is None:
for name in profiles:
if name != "config":
profile_name = name
break
elif profile_name not in profiles:
raise ValueError(f"Profile {profile_name} not found in profiles.yml")

profile = profiles[profile_name]
if "outputs" not in profile:
raise ValueError(f"No outputs found in profile {profile_name}")

outputs = profile["outputs"]

# If no target specified, use default target
if target_name is None:
target_name = profile["target"]
elif target_name not in outputs:
raise ValueError(
f"Target {target_name} not found in profile {profile_name}"
)

target_config = outputs[target_name]

type = target_config.pop("type")
schema = None
possible_keys = ["schema", "path", "dataset", "database"]
for key in possible_keys:
if key in target_config:
schema = target_config.pop(key)
break

if schema is None:
raise ValueError(f"No schema found. Expected one of: {possible_keys}")
threads = target_config.pop("threads", 4)
return cls(
type=type,
schema=schema,
threads=threads,
extras=target_config or None,
allow_field_overrides=allow_field_overrides,
)


class GlobalConfigs(DbtConfigs):
"""
Expand Down
40 changes: 40 additions & 0 deletions src/integrations/prefect-dbt/prefect_dbt/utilities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""
Utility functions for prefect-dbt
"""
import os
from typing import Any, Dict, Optional

import yaml


def get_profiles_dir() -> str:
"""Get the dbt profiles directory from environment or default location."""
profiles_dir = os.getenv("DBT_PROFILES_DIR")
if not profiles_dir:
profiles_dir = os.path.expanduser("~/.dbt")
return profiles_dir


def load_profiles_yml(profiles_dir: Optional[str]) -> Dict[str, Any]:
"""
Load and parse the profiles.yml file.
Args:
profiles_dir: Path to the directory containing profiles.yml.
If None, uses the default profiles directory.
Returns:
Dict containing the parsed profiles.yml contents
Raises:
ValueError: If profiles.yml is not found
"""
if profiles_dir is None:
profiles_dir = get_profiles_dir()

profiles_path = os.path.join(profiles_dir, "profiles.yml")
if not os.path.exists(profiles_path):
raise ValueError(f"No profiles.yml found at {profiles_path}")

with open(profiles_path, "r") as f:
return yaml.safe_load(f)
116 changes: 116 additions & 0 deletions src/integrations/prefect-dbt/tests/cli/configs/test_base.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,48 @@
from pathlib import Path
from unittest.mock import patch

import pytest
from prefect_dbt.cli.configs.base import GlobalConfigs, TargetConfigs

SAMPLE_PROFILES = {
"jaffle_shop": {
"outputs": {
"dev": {
"type": "duckdb",
"path": "jaffle_shop.duckdb",
"schema": "main",
"threads": 4,
},
"prod": {
"type": "duckdb",
"path": "/data/prod/jaffle_shop.duckdb",
"schema": "main",
"threads": 8,
},
},
"target": "prod",
},
"other_project": {
"outputs": {
"dev": {
"type": "duckdb",
"path": "other_project.duckdb",
"schema": "analytics",
"threads": 4,
}
},
"target": "dev",
},
"config": {"partial_parse": True},
}


@pytest.fixture
def mock_load_profiles():
with patch("prefect_dbt.cli.configs.base.load_profiles_yml") as mock:
mock.return_value = SAMPLE_PROFILES
yield mock


def test_target_configs_get_configs():
target_configs = TargetConfigs(
Expand Down Expand Up @@ -41,3 +81,79 @@ def test_global_configs():
global_configs = GlobalConfigs(log_format="json", send_anonymous_usage_stats=False)
assert global_configs.log_format == "json"
assert global_configs.send_anonymous_usage_stats is False


def test_from_profiles_yml_default_profile_target(mock_load_profiles):
target_configs = TargetConfigs.from_profiles_yml()

assert target_configs.type == "duckdb"
assert target_configs.schema_ == "main"
assert target_configs.threads == 8
assert target_configs.extras == {"path": "/data/prod/jaffle_shop.duckdb"}


def test_from_profiles_yml_explicit_profile_target(mock_load_profiles):
target_configs = TargetConfigs.from_profiles_yml(
profile_name="other_project", target_name="dev"
)

assert target_configs.type == "duckdb"
assert target_configs.schema_ == "analytics"
assert target_configs.threads == 4
assert target_configs.extras == {"path": "other_project.duckdb"}


def test_from_profiles_yml_invalid_profile(mock_load_profiles):
with pytest.raises(ValueError, match="Profile invalid_profile not found"):
TargetConfigs.from_profiles_yml(profile_name="invalid_profile")


def test_from_profiles_yml_invalid_target(mock_load_profiles):
with pytest.raises(ValueError, match="Target invalid_target not found"):
TargetConfigs.from_profiles_yml(
profile_name="jaffle_shop", target_name="invalid_target"
)


def test_from_profiles_yml_no_outputs(mock_load_profiles):
mock_load_profiles.return_value = {"broken": {"some_other_key": {}}}
with pytest.raises(ValueError, match="No outputs found in profile broken"):
TargetConfigs.from_profiles_yml(profile_name="broken")


def test_from_profiles_yml_no_schema(mock_load_profiles):
mock_load_profiles.return_value = {
"test": {
"outputs": {
"dev": {
"type": "postgres",
"threads": 4,
# Missing schema field
"host": "localhost",
}
},
"target": "dev",
}
}
with pytest.raises(ValueError, match="No schema found"):
TargetConfigs.from_profiles_yml(profile_name="test")


def test_from_profiles_yml_alternative_schema_keys(mock_load_profiles):
mock_profiles = {
"test": {
"outputs": {
"dev": {
"type": "bigquery",
"threads": 4,
"dataset": "my_dataset", # Alternative to schema
"project": "my_project",
}
},
"target": "dev",
}
}
mock_load_profiles.return_value = mock_profiles

target_configs = TargetConfigs.from_profiles_yml(profile_name="test")
assert target_configs.schema_ == "my_dataset"
Loading

0 comments on commit 47aa8f2

Please sign in to comment.