Skip to content

Commit

Permalink
add from_profiles_yml
Browse files Browse the repository at this point in the history
  • Loading branch information
kevingrismore committed Dec 2, 2024
1 parent beb9980 commit 4a8826a
Show file tree
Hide file tree
Showing 4 changed files with 304 additions and 0 deletions.
68 changes: 68 additions & 0 deletions src/integrations/prefect-dbt/prefect_dbt/cli/configs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from pydantic import BaseModel, Field

from prefect.blocks.core import Block
from prefect_dbt.utilities import load_profiles_yml


class DbtConfigs(Block, abc.ABC):
Expand Down Expand Up @@ -147,6 +148,73 @@ class TargetConfigs(BaseTargetConfigs):
_logo_url = "https://images.ctfassets.net/gm98wzqotmnx/5zE9lxfzBHjw3tnEup4wWL/9a001902ed43a84c6c96d23b24622e19/dbt-bit_tm.png?h=250" # noqa
_documentation_url = "https://docs.prefect.io/integrations/prefect-dbt" # noqa

@classmethod
def from_profiles_yml(
cls,
profile_name: Optional[str] = None,
target_name: Optional[str] = None,
profiles_dir: Optional[str] = None,
allow_field_overrides: bool = False,
) -> "TargetConfigs":
"""
Create a TargetConfigs instance from a dbt profiles.yml file.
Args:
profile_name: Name of the profile to use from profiles.yml.
If None, uses the first profile.
target_name: Name of the target to use from the profile.
If None, uses the first target in the selected profile.
profiles_dir: Path to the directory containing profiles.yml.
If None, uses the default profiles directory.
Returns:
A TargetConfigs instance populated from the profiles.yml target.
Raises:
ValueError: If profiles.yml is not found or if profile/target is invalid
"""
profiles = load_profiles_yml(profiles_dir)

# If no profile specified, use first non-config one
if profile_name is None:
for name in profiles:
if name != "config":
profile_name = name
break
elif profile_name not in profiles:
raise ValueError(f"Profile {profile_name} not found in profiles.yml")

profile = profiles[profile_name]
if "outputs" not in profile:
raise ValueError(f"No outputs found in profile {profile_name}")

outputs = profile["outputs"]

# If no target specified, use first one
if target_name is None:
target_name = next(iter(outputs))
elif target_name not in outputs:
raise ValueError(
f"Target {target_name} not found in profile {profile_name}"
)

target_config = outputs[target_name]

type = target_config.pop("type")
schema = None
possible_keys = ["schema", "path", "dataset", "database"]
for key in possible_keys:
if key in target_config:
schema = target_config.pop(key)
break

if schema is None:
raise ValueError(f"No schema found. Expected one of: {possible_keys}")
threads = target_config.pop("threads", 4)
return cls(
type=type, schema=schema, threads=threads, extras=target_config or None
)


class GlobalConfigs(DbtConfigs):
"""
Expand Down
40 changes: 40 additions & 0 deletions src/integrations/prefect-dbt/prefect_dbt/utilities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""
Utility functions for prefect-dbt
"""
import os
from typing import Any, Dict, Optional

import yaml


def get_profiles_dir() -> str:
"""Get the dbt profiles directory from environment or default location."""
profiles_dir = os.getenv("DBT_PROFILES_DIR")
if not profiles_dir:
profiles_dir = os.path.expanduser("~/.dbt")
return profiles_dir


def load_profiles_yml(profiles_dir: Optional[str]) -> Dict[str, Any]:
"""
Load and parse the profiles.yml file.
Args:
profiles_dir: Path to the directory containing profiles.yml.
If None, uses the default profiles directory.
Returns:
Dict containing the parsed profiles.yml contents
Raises:
ValueError: If profiles.yml is not found
"""
if profiles_dir is None:
profiles_dir = get_profiles_dir()

profiles_path = os.path.join(profiles_dir, "profiles.yml")
if not os.path.exists(profiles_path):
raise ValueError(f"No profiles.yml found at {profiles_path}")

with open(profiles_path, "r") as f:
return yaml.safe_load(f)
119 changes: 119 additions & 0 deletions src/integrations/prefect-dbt/tests/cli/configs/test_base.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,46 @@
from pathlib import Path
from unittest.mock import patch

import pytest
from prefect_dbt.cli.configs.base import GlobalConfigs, TargetConfigs

SAMPLE_PROFILES = {
"jaffle_shop": {
"outputs": {
"dev": {
"type": "duckdb",
"path": "jaffle_shop.duckdb",
"schema": "main",
"threads": 4,
},
"prod": {
"type": "duckdb",
"path": "/data/prod/jaffle_shop.duckdb",
"schema": "main",
"threads": 8,
},
}
},
"other_project": {
"outputs": {
"dev": {
"type": "duckdb",
"path": "other_project.duckdb",
"schema": "analytics",
"threads": 4,
}
}
},
"config": {"partial_parse": True},
}


@pytest.fixture
def mock_load_profiles():
with patch("prefect_dbt.cli.configs.base.load_profiles_yml") as mock:
mock.return_value = SAMPLE_PROFILES
yield mock


def test_target_configs_get_configs():
target_configs = TargetConfigs(
Expand Down Expand Up @@ -41,3 +79,84 @@ def test_global_configs():
global_configs = GlobalConfigs(log_format="json", send_anonymous_usage_stats=False)
assert global_configs.log_format == "json"
assert global_configs.send_anonymous_usage_stats is False


def test_from_profiles_yml_default_profile_target(mock_load_profiles):
"""Test loading with default profile and target"""
target_configs = TargetConfigs.from_profiles_yml()

assert target_configs.type == "duckdb"
assert target_configs.schema_ == "main"
assert target_configs.threads == 4
assert target_configs.extras == {"path": "jaffle_shop.duckdb"}


def test_from_profiles_yml_explicit_profile_target(mock_load_profiles):
"""Test loading with explicitly specified profile and target"""
target_configs = TargetConfigs.from_profiles_yml(
profile_name="other_project", target_name="dev"
)

assert target_configs.type == "duckdb"
assert target_configs.schema_ == "analytics"
assert target_configs.threads == 4
assert target_configs.extras == {"path": "other_project.duckdb"}


def test_from_profiles_yml_invalid_profile(mock_load_profiles):
"""Test error when invalid profile name provided"""
with pytest.raises(ValueError, match="Profile invalid_profile not found"):
TargetConfigs.from_profiles_yml(profile_name="invalid_profile")


def test_from_profiles_yml_invalid_target(mock_load_profiles):
"""Test error when invalid target name provided"""
with pytest.raises(ValueError, match="Target invalid_target not found"):
TargetConfigs.from_profiles_yml(
profile_name="jaffle_shop", target_name="invalid_target"
)


def test_from_profiles_yml_no_outputs(mock_load_profiles):
"""Test error when profile has no outputs section"""
mock_load_profiles.return_value = {"broken": {"some_other_key": {}}}
with pytest.raises(ValueError, match="No outputs found in profile broken"):
TargetConfigs.from_profiles_yml(profile_name="broken")


def test_from_profiles_yml_no_schema(mock_load_profiles):
"""Test error when target has no schema or equivalent field"""
mock_load_profiles.return_value = {
"test": {
"outputs": {
"dev": {
"type": "postgres",
"threads": 4,
# Missing schema field
"host": "localhost",
}
}
}
}
with pytest.raises(ValueError, match="No schema found"):
TargetConfigs.from_profiles_yml(profile_name="test")


def test_from_profiles_yml_alternative_schema_keys(mock_load_profiles):
"""Test that alternative schema keys (dataset, database) work"""
mock_profiles = {
"test": {
"outputs": {
"dev": {
"type": "bigquery",
"threads": 4,
"dataset": "my_dataset", # Alternative to schema
"project": "my_project",
}
}
}
}
mock_load_profiles.return_value = mock_profiles

target_configs = TargetConfigs.from_profiles_yml(profile_name="test")
assert target_configs.schema_ == "my_dataset"
77 changes: 77 additions & 0 deletions src/integrations/prefect-dbt/tests/test_utilities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import os
from pathlib import Path

import pytest
import yaml
from prefect_dbt.utilities import get_profiles_dir, load_profiles_yml

SAMPLE_PROFILES = {
"jaffle_shop": {
"outputs": {
"dev": {
"type": "duckdb",
"path": "jaffle_shop.duckdb",
"schema": "main",
"threads": 4,
}
}
}
}


@pytest.fixture
def temp_profiles_dir(tmp_path):
profiles_dir = tmp_path / ".dbt"
profiles_dir.mkdir()

profiles_path = profiles_dir / "profiles.yml"
with open(profiles_path, "w") as f:
yaml.dump(SAMPLE_PROFILES, f)

return str(profiles_dir)


def test_get_profiles_dir_default():
if "DBT_PROFILES_DIR" in os.environ:
del os.environ["DBT_PROFILES_DIR"]

expected = os.path.expanduser("~/.dbt")
assert get_profiles_dir() == expected


def test_get_profiles_dir_from_env():
test_path = "/custom/path"
os.environ["DBT_PROFILES_DIR"] = test_path
try:
assert get_profiles_dir() == test_path
finally:
del os.environ["DBT_PROFILES_DIR"]


def test_load_profiles_yml_success(temp_profiles_dir):
profiles = load_profiles_yml(temp_profiles_dir)
assert profiles == SAMPLE_PROFILES


def test_load_profiles_yml_default_dir(monkeypatch, temp_profiles_dir):
monkeypatch.setenv("DBT_PROFILES_DIR", temp_profiles_dir)
profiles = load_profiles_yml(None)
assert profiles == SAMPLE_PROFILES


def test_load_profiles_yml_file_not_found():
nonexistent_dir = "/path/that/does/not/exist"
with pytest.raises(
ValueError,
match=f"No profiles.yml found at {os.path.join(nonexistent_dir, 'profiles.yml')}",
):
load_profiles_yml(nonexistent_dir)


def test_load_profiles_yml_invalid_yaml(temp_profiles_dir):
profiles_path = Path(temp_profiles_dir) / "profiles.yml"
with open(profiles_path, "w") as f:
f.write("invalid: yaml: content:\nindentation error")

with pytest.raises(yaml.YAMLError):
load_profiles_yml(temp_profiles_dir)

0 comments on commit 4a8826a

Please sign in to comment.