Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add from_profiles_yml to dbt TargetConfigs #16178

Merged
merged 7 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion docs/v3/api-ref/rest-api/server/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -16968,7 +16968,9 @@
"next_page": {
"anyOf": [
{
"type": "string"
"type": "string",
"minLength": 1,
"format": "uri"
},
{
"type": "null"
Expand Down
74 changes: 74 additions & 0 deletions src/integrations/prefect-dbt/prefect_dbt/cli/configs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from pydantic import BaseModel, Field

from prefect.blocks.core import Block
from prefect_dbt.utilities import load_profiles_yml


class DbtConfigs(Block, abc.ABC):
Expand Down Expand Up @@ -147,6 +148,79 @@ class TargetConfigs(BaseTargetConfigs):
_logo_url = "https://images.ctfassets.net/gm98wzqotmnx/5zE9lxfzBHjw3tnEup4wWL/9a001902ed43a84c6c96d23b24622e19/dbt-bit_tm.png?h=250" # noqa
_documentation_url = "https://docs.prefect.io/integrations/prefect-dbt" # noqa

@classmethod
def from_profiles_yml(
cls,
kevingrismore marked this conversation as resolved.
Show resolved Hide resolved
profile_name: Optional[str] = None,
target_name: Optional[str] = None,
profiles_dir: Optional[str] = None,
allow_field_overrides: bool = False,
) -> "TargetConfigs":
"""
Create a TargetConfigs instance from a dbt profiles.yml file.

Args:
profile_name: Name of the profile to use from profiles.yml.
If None, uses the first profile.
kevingrismore marked this conversation as resolved.
Show resolved Hide resolved
target_name: Name of the target to use from the profile.
If None, uses the default target in the selected profile.
profiles_dir: Path to the directory containing profiles.yml.
If None, uses the default profiles directory.
allow_field_overrides: If enabled, fields from dbt target configs
will override fields provided in extras and credentials.

Returns:
A TargetConfigs instance populated from the profiles.yml target.

Raises:
ValueError: If profiles.yml is not found or if profile/target is invalid
"""
profiles = load_profiles_yml(profiles_dir)

# If no profile specified, use first non-config one
if profile_name is None:
for name in profiles:
if name != "config":
profile_name = name
break
elif profile_name not in profiles:
raise ValueError(f"Profile {profile_name} not found in profiles.yml")

profile = profiles[profile_name]
if "outputs" not in profile:
raise ValueError(f"No outputs found in profile {profile_name}")

outputs = profile["outputs"]

# If no target specified, use default target
if target_name is None:
target_name = profile["target"]
elif target_name not in outputs:
raise ValueError(
f"Target {target_name} not found in profile {profile_name}"
)

target_config = outputs[target_name]

type = target_config.pop("type")
schema = None
possible_keys = ["schema", "path", "dataset", "database"]
for key in possible_keys:
if key in target_config:
schema = target_config.pop(key)
break

if schema is None:
raise ValueError(f"No schema found. Expected one of: {possible_keys}")
threads = target_config.pop("threads", 4)
return cls(
type=type,
schema=schema,
threads=threads,
extras=target_config or None,
allow_field_overrides=allow_field_overrides,
)


class GlobalConfigs(DbtConfigs):
"""
Expand Down
40 changes: 40 additions & 0 deletions src/integrations/prefect-dbt/prefect_dbt/utilities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""
Utility functions for prefect-dbt
"""
import os
from typing import Any, Dict, Optional

import yaml


def get_profiles_dir() -> str:
"""Get the dbt profiles directory from environment or default location."""
profiles_dir = os.getenv("DBT_PROFILES_DIR")
if not profiles_dir:
profiles_dir = os.path.expanduser("~/.dbt")
return profiles_dir


def load_profiles_yml(profiles_dir: Optional[str]) -> Dict[str, Any]:
"""
Load and parse the profiles.yml file.

Args:
profiles_dir: Path to the directory containing profiles.yml.
If None, uses the default profiles directory.

Returns:
Dict containing the parsed profiles.yml contents

Raises:
ValueError: If profiles.yml is not found
"""
if profiles_dir is None:
profiles_dir = get_profiles_dir()

profiles_path = os.path.join(profiles_dir, "profiles.yml")
if not os.path.exists(profiles_path):
raise ValueError(f"No profiles.yml found at {profiles_path}")

with open(profiles_path, "r") as f:
return yaml.safe_load(f)
116 changes: 116 additions & 0 deletions src/integrations/prefect-dbt/tests/cli/configs/test_base.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,48 @@
from pathlib import Path
from unittest.mock import patch

import pytest
from prefect_dbt.cli.configs.base import GlobalConfigs, TargetConfigs

SAMPLE_PROFILES = {
"jaffle_shop": {
"outputs": {
"dev": {
"type": "duckdb",
"path": "jaffle_shop.duckdb",
"schema": "main",
"threads": 4,
},
"prod": {
"type": "duckdb",
"path": "/data/prod/jaffle_shop.duckdb",
"schema": "main",
"threads": 8,
},
},
"target": "prod",
},
"other_project": {
"outputs": {
"dev": {
"type": "duckdb",
"path": "other_project.duckdb",
"schema": "analytics",
"threads": 4,
}
},
"target": "dev",
},
"config": {"partial_parse": True},
}


@pytest.fixture
def mock_load_profiles():
with patch("prefect_dbt.cli.configs.base.load_profiles_yml") as mock:
mock.return_value = SAMPLE_PROFILES
yield mock


def test_target_configs_get_configs():
target_configs = TargetConfigs(
Expand Down Expand Up @@ -41,3 +81,79 @@ def test_global_configs():
global_configs = GlobalConfigs(log_format="json", send_anonymous_usage_stats=False)
assert global_configs.log_format == "json"
assert global_configs.send_anonymous_usage_stats is False


def test_from_profiles_yml_default_profile_target(mock_load_profiles):
target_configs = TargetConfigs.from_profiles_yml()

assert target_configs.type == "duckdb"
assert target_configs.schema_ == "main"
assert target_configs.threads == 8
assert target_configs.extras == {"path": "/data/prod/jaffle_shop.duckdb"}


def test_from_profiles_yml_explicit_profile_target(mock_load_profiles):
target_configs = TargetConfigs.from_profiles_yml(
profile_name="other_project", target_name="dev"
)

assert target_configs.type == "duckdb"
assert target_configs.schema_ == "analytics"
assert target_configs.threads == 4
assert target_configs.extras == {"path": "other_project.duckdb"}


def test_from_profiles_yml_invalid_profile(mock_load_profiles):
with pytest.raises(ValueError, match="Profile invalid_profile not found"):
TargetConfigs.from_profiles_yml(profile_name="invalid_profile")


def test_from_profiles_yml_invalid_target(mock_load_profiles):
with pytest.raises(ValueError, match="Target invalid_target not found"):
TargetConfigs.from_profiles_yml(
profile_name="jaffle_shop", target_name="invalid_target"
)


def test_from_profiles_yml_no_outputs(mock_load_profiles):
mock_load_profiles.return_value = {"broken": {"some_other_key": {}}}
with pytest.raises(ValueError, match="No outputs found in profile broken"):
TargetConfigs.from_profiles_yml(profile_name="broken")


def test_from_profiles_yml_no_schema(mock_load_profiles):
mock_load_profiles.return_value = {
"test": {
"outputs": {
"dev": {
"type": "postgres",
"threads": 4,
# Missing schema field
"host": "localhost",
}
},
"target": "dev",
}
}
with pytest.raises(ValueError, match="No schema found"):
TargetConfigs.from_profiles_yml(profile_name="test")


def test_from_profiles_yml_alternative_schema_keys(mock_load_profiles):
mock_profiles = {
"test": {
"outputs": {
"dev": {
"type": "bigquery",
"threads": 4,
"dataset": "my_dataset", # Alternative to schema
"project": "my_project",
}
},
"target": "dev",
}
}
mock_load_profiles.return_value = mock_profiles

target_configs = TargetConfigs.from_profiles_yml(profile_name="test")
assert target_configs.schema_ == "my_dataset"
Loading
Loading