Skip to content

Commit

Permalink
[dagster-dbt] add translator option to attach model files as code ref…
Browse files Browse the repository at this point in the history
…erence metadata (#21888)

## Summary

Adds the option to a custom `DagsterDbtTranslator` to attach code
reference metadata pointing to the dbt model files for each asset.

Unfortunately, this requires that we also get the `project_dir ` from
the user, since that is only available at run time as part of the
resource. Maybe there's a good way around this? The manifest used to
have a `root_path` field, but this [was recently
removed](dbt-labs/dbt-core#6171).
 
```python
@dbt_assets(
    manifest=...,
    dagster_dbt_translator=DagsterDbtTranslator(
        settings=DagsterDbtTranslatorSettings(attach_sql_model_code_reference=True)
    ),
    project=DbtProject(...)
)
def my_dbt_assets():
  ...
```

## Test Plan

Unit tests.
  • Loading branch information
benpankow authored and nikomancy committed May 22, 2024
1 parent a6c8640 commit 2aa7847
Show file tree
Hide file tree
Showing 3 changed files with 206 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import os
from pathlib import Path
from typing import (
Any,
Callable,
Expand All @@ -24,10 +26,17 @@
TimeWindowPartitionsDefinition,
multi_asset,
)
from dagster._core.definitions.metadata.source_code import (
CodeReferencesMetadataSet,
CodeReferencesMetadataValue,
LocalFileCodeReference,
)
from dagster._utils.warnings import (
experimental_warning,
)

from dagster_dbt.dbt_project import DbtProject

from .asset_utils import (
DAGSTER_DBT_EXCLUDE_METADATA_KEY,
DAGSTER_DBT_MANIFEST_METADATA_KEY,
Expand Down Expand Up @@ -67,6 +76,7 @@ def dbt_assets(
backfill_policy: Optional[BackfillPolicy] = None,
op_tags: Optional[Mapping[str, Any]] = None,
required_resource_keys: Optional[Set[str]] = None,
project: Optional[DbtProject] = None,
) -> Callable[[Callable[..., Any]], AssetsDefinition]:
"""Create a definition for how to compute a set of dbt resources, described by a manifest.json.
When invoking dbt commands using :py:class:`~dagster_dbt.DbtCliResource`'s
Expand Down Expand Up @@ -98,6 +108,9 @@ def dbt_assets(
are not strings will be json encoded and must meet the criteria that
`json.loads(json.dumps(value)) == value`.
required_resource_keys (Optional[Set[str]]): Set of required resource handles.
project (Optional[DbtProject]): A DbtProject instance which provides a pointer to the dbt
project location and manifest. Not required, but needed to attach code references from
model code to Dagster assets.
Examples:
Running ``dbt build`` for a dbt project:
Expand Down Expand Up @@ -349,6 +362,7 @@ def partitionshop_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource
io_manager_key=io_manager_key,
manifest=manifest,
dagster_dbt_translator=dagster_dbt_translator,
project=project,
)

if op_tags and DAGSTER_DBT_SELECT_METADATA_KEY in op_tags:
Expand Down Expand Up @@ -391,12 +405,54 @@ def partitionshop_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource
)


def _attach_sql_model_code_reference(
existing_metadata: Dict[str, Any],
dbt_resource_props: Dict[str, Any],
project: DbtProject,
) -> Dict[str, Any]:
"""Pulls the SQL model location for a dbt resource and attaches it as a code reference to the
existing metadata.
"""
existing_references_meta = CodeReferencesMetadataSet.extract(existing_metadata)
references = (
existing_references_meta.code_references.code_references
if existing_references_meta.code_references
else []
)

if "original_file_path" not in dbt_resource_props:
raise DagsterInvalidDefinitionError(
"Cannot attach SQL model code reference because 'original_file_path' is not present"
" in the dbt resource properties."
)

# attempt to get root_path, which is removed from manifests in newer dbt versions
relative_path = Path(dbt_resource_props["original_file_path"])
abs_path = project.project_dir.joinpath(relative_path).resolve()

return {
**existing_metadata,
**CodeReferencesMetadataSet(
code_references=CodeReferencesMetadataValue(
code_references=[
*references,
LocalFileCodeReference(
file_path=os.fspath(abs_path),
line_number=1,
),
],
)
),
}


def get_dbt_multi_asset_args(
dbt_nodes: Mapping[str, Any],
dbt_unique_id_deps: Mapping[str, FrozenSet[str]],
io_manager_key: Optional[str],
manifest: Mapping[str, Any],
dagster_dbt_translator: DagsterDbtTranslator,
project: Optional[DbtProject],
) -> Tuple[
Sequence[AssetDep],
Dict[str, AssetOut],
Expand Down Expand Up @@ -429,17 +485,31 @@ def get_dbt_multi_asset_args(
unique_ids_for_asset_key.add(unique_id)
resource_types_for_asset_key.add(dbt_resource_props["resource_type"])

metadata = {
**dagster_dbt_translator.get_metadata(dbt_resource_props),
DAGSTER_DBT_MANIFEST_METADATA_KEY: DbtManifestWrapper(manifest=manifest),
DAGSTER_DBT_TRANSLATOR_METADATA_KEY: dagster_dbt_translator,
}
if dagster_dbt_translator.settings.enable_code_references:
if not project:
raise DagsterInvalidDefinitionError(
"enable_code_references requires a DbtProject to be supplied"
" to the @dbt_assets decorator."
)

metadata = _attach_sql_model_code_reference(
existing_metadata=metadata,
dbt_resource_props=dbt_resource_props,
project=project,
)

outs[output_name] = AssetOut(
key=asset_key,
dagster_type=Nothing,
io_manager_key=io_manager_key,
description=dagster_dbt_translator.get_description(dbt_resource_props),
is_required=False,
metadata={
**dagster_dbt_translator.get_metadata(dbt_resource_props),
DAGSTER_DBT_MANIFEST_METADATA_KEY: DbtManifestWrapper(manifest=manifest),
DAGSTER_DBT_TRANSLATOR_METADATA_KEY: dagster_dbt_translator,
},
metadata=metadata,
owners=dagster_dbt_translator.get_owners(
{
**dbt_resource_props,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class DagsterDbtTranslatorSettings:

enable_asset_checks: bool = True
enable_duplicate_source_asset_keys: bool = False
enable_code_references: bool = False


class DagsterDbtTranslator:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import inspect
import os
from typing import Any, Dict

import pytest
from dagster._core.definitions.definitions_class import Definitions
from dagster._core.definitions.metadata.source_code import (
LocalFileCodeReference,
UrlCodeReference,
link_to_source_control,
with_source_code_references,
)
from dagster._core.errors import DagsterInvalidDefinitionError
from dagster_dbt import DbtProject
from dagster_dbt.asset_decorator import dbt_assets
from dagster_dbt.dagster_dbt_translator import DagsterDbtTranslator, DagsterDbtTranslatorSettings

from ..dbt_projects import (
test_jaffle_shop_path,
)

JAFFLE_SHOP_ROOT_PATH = os.path.normpath(test_jaffle_shop_path)


def test_basic_attach_code_references(test_jaffle_shop_manifest: Dict[str, Any]) -> None:
@dbt_assets(
manifest=test_jaffle_shop_manifest,
dagster_dbt_translator=DagsterDbtTranslator(
settings=DagsterDbtTranslatorSettings(enable_code_references=True)
),
project=DbtProject(project_dir=os.fspath(test_jaffle_shop_path)),
)
def my_dbt_assets(): ...

for asset_key, asset_metadata in my_dbt_assets.metadata_by_key.items():
assert "dagster/code_references" in asset_metadata

references = asset_metadata["dagster/code_references"].code_references
assert len(references) == 1

reference = references[0]
assert isinstance(reference, LocalFileCodeReference)
assert reference.file_path.endswith(
asset_key.path[-1] + ".sql"
) or reference.file_path.endswith(asset_key.path[-1] + ".csv")
assert os.path.exists(reference.file_path), reference.file_path


def test_basic_attach_code_references_no_project_dir(
test_jaffle_shop_manifest: Dict[str, Any],
) -> None:
# expect exception because enable_code_references=True but no project_dir
with pytest.raises(DagsterInvalidDefinitionError):

@dbt_assets(
manifest=test_jaffle_shop_manifest,
dagster_dbt_translator=DagsterDbtTranslator(
settings=DagsterDbtTranslatorSettings(enable_code_references=True)
),
)
def my_dbt_assets(): ...


def test_with_source_code_references_wrapper(test_jaffle_shop_manifest: Dict[str, Any]) -> None:
@dbt_assets(
manifest=test_jaffle_shop_manifest,
dagster_dbt_translator=DagsterDbtTranslator(
settings=DagsterDbtTranslatorSettings(enable_code_references=True)
),
project=DbtProject(project_dir=os.fspath(test_jaffle_shop_path)),
)
def my_dbt_assets(): ...

defs = Definitions(assets=with_source_code_references([my_dbt_assets]))

assets = defs.get_asset_graph().all_asset_keys

for asset_key in assets:
asset_metadata = defs.get_assets_def(asset_key).metadata_by_key[asset_key]
assert "dagster/code_references" in asset_metadata

references = asset_metadata["dagster/code_references"].code_references
assert len(references) == 2

code_reference = references[1]
assert isinstance(code_reference, LocalFileCodeReference)
assert code_reference.file_path.endswith("test_code_references.py")


def test_link_to_source_control_wrapper(test_jaffle_shop_manifest: Dict[str, Any]) -> None:
@dbt_assets(
manifest=test_jaffle_shop_manifest,
dagster_dbt_translator=DagsterDbtTranslator(
settings=DagsterDbtTranslatorSettings(enable_code_references=True)
),
project=DbtProject(project_dir=os.fspath(test_jaffle_shop_path)),
)
def my_dbt_assets(): ...

defs = Definitions(
assets=link_to_source_control(
with_source_code_references([my_dbt_assets]),
source_control_url="https://github.com/dagster-io/jaffle_shop",
source_control_branch="master",
repository_root_absolute_path=JAFFLE_SHOP_ROOT_PATH,
)
)

assets = defs.get_asset_graph().all_asset_keys

for asset_key in assets:
asset_metadata = defs.get_assets_def(asset_key).metadata_by_key[asset_key]
assert "dagster/code_references" in asset_metadata

references = asset_metadata["dagster/code_references"].code_references
assert len(references) == 2

model_reference = references[0]
assert isinstance(model_reference, UrlCodeReference)
assert model_reference.url.startswith(
"https://github.com/dagster-io/jaffle_shop/tree/master/"
)
assert model_reference.url.endswith(
asset_key.path[-1] + ".sql#L1"
) or model_reference.url.endswith(asset_key.path[-1] + ".csv#L1")

source_reference = references[1]
assert isinstance(source_reference, UrlCodeReference)
line_no = inspect.getsourcelines(my_dbt_assets.op.compute_fn.decorated_fn)[1]
assert source_reference.url.endswith(f"test_code_references.py#L{line_no}")

0 comments on commit 2aa7847

Please sign in to comment.