diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt/asset_decorator.py b/python_modules/libraries/dagster-dbt/dagster_dbt/asset_decorator.py index cb7b91df40bcb..b91a0e73206fb 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt/asset_decorator.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt/asset_decorator.py @@ -1,3 +1,5 @@ +import os +from pathlib import Path from typing import ( Any, Callable, @@ -24,10 +26,17 @@ TimeWindowPartitionsDefinition, multi_asset, ) +from dagster._core.definitions.metadata.source_code import ( + CodeReferencesMetadataSet, + CodeReferencesMetadataValue, + LocalFileCodeReference, +) from dagster._utils.warnings import ( experimental_warning, ) +from dagster_dbt.dbt_project import DbtProject + from .asset_utils import ( DAGSTER_DBT_EXCLUDE_METADATA_KEY, DAGSTER_DBT_MANIFEST_METADATA_KEY, @@ -67,6 +76,7 @@ def dbt_assets( backfill_policy: Optional[BackfillPolicy] = None, op_tags: Optional[Mapping[str, Any]] = None, required_resource_keys: Optional[Set[str]] = None, + project: Optional[DbtProject] = None, ) -> Callable[[Callable[..., Any]], AssetsDefinition]: """Create a definition for how to compute a set of dbt resources, described by a manifest.json. When invoking dbt commands using :py:class:`~dagster_dbt.DbtCliResource`'s @@ -98,6 +108,9 @@ def dbt_assets( are not strings will be json encoded and must meet the criteria that `json.loads(json.dumps(value)) == value`. required_resource_keys (Optional[Set[str]]): Set of required resource handles. + project (Optional[DbtProject]): A DbtProject instance which provides a pointer to the dbt + project location and manifest. Not required, but needed to attach code references from + model code to Dagster assets. Examples: Running ``dbt build`` for a dbt project: @@ -349,6 +362,7 @@ def partitionshop_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource io_manager_key=io_manager_key, manifest=manifest, dagster_dbt_translator=dagster_dbt_translator, + project=project, ) if op_tags and DAGSTER_DBT_SELECT_METADATA_KEY in op_tags: @@ -391,12 +405,54 @@ def partitionshop_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource ) +def _attach_sql_model_code_reference( + existing_metadata: Dict[str, Any], + dbt_resource_props: Dict[str, Any], + project: DbtProject, +) -> Dict[str, Any]: + """Pulls the SQL model location for a dbt resource and attaches it as a code reference to the + existing metadata. + """ + existing_references_meta = CodeReferencesMetadataSet.extract(existing_metadata) + references = ( + existing_references_meta.code_references.code_references + if existing_references_meta.code_references + else [] + ) + + if "original_file_path" not in dbt_resource_props: + raise DagsterInvalidDefinitionError( + "Cannot attach SQL model code reference because 'original_file_path' is not present" + " in the dbt resource properties." + ) + + # attempt to get root_path, which is removed from manifests in newer dbt versions + relative_path = Path(dbt_resource_props["original_file_path"]) + abs_path = project.project_dir.joinpath(relative_path).resolve() + + return { + **existing_metadata, + **CodeReferencesMetadataSet( + code_references=CodeReferencesMetadataValue( + code_references=[ + *references, + LocalFileCodeReference( + file_path=os.fspath(abs_path), + line_number=1, + ), + ], + ) + ), + } + + def get_dbt_multi_asset_args( dbt_nodes: Mapping[str, Any], dbt_unique_id_deps: Mapping[str, FrozenSet[str]], io_manager_key: Optional[str], manifest: Mapping[str, Any], dagster_dbt_translator: DagsterDbtTranslator, + project: Optional[DbtProject], ) -> Tuple[ Sequence[AssetDep], Dict[str, AssetOut], @@ -429,17 +485,31 @@ def get_dbt_multi_asset_args( unique_ids_for_asset_key.add(unique_id) resource_types_for_asset_key.add(dbt_resource_props["resource_type"]) + metadata = { + **dagster_dbt_translator.get_metadata(dbt_resource_props), + DAGSTER_DBT_MANIFEST_METADATA_KEY: DbtManifestWrapper(manifest=manifest), + DAGSTER_DBT_TRANSLATOR_METADATA_KEY: dagster_dbt_translator, + } + if dagster_dbt_translator.settings.enable_code_references: + if not project: + raise DagsterInvalidDefinitionError( + "enable_code_references requires a DbtProject to be supplied" + " to the @dbt_assets decorator." + ) + + metadata = _attach_sql_model_code_reference( + existing_metadata=metadata, + dbt_resource_props=dbt_resource_props, + project=project, + ) + outs[output_name] = AssetOut( key=asset_key, dagster_type=Nothing, io_manager_key=io_manager_key, description=dagster_dbt_translator.get_description(dbt_resource_props), is_required=False, - metadata={ - **dagster_dbt_translator.get_metadata(dbt_resource_props), - DAGSTER_DBT_MANIFEST_METADATA_KEY: DbtManifestWrapper(manifest=manifest), - DAGSTER_DBT_TRANSLATOR_METADATA_KEY: dagster_dbt_translator, - }, + metadata=metadata, owners=dagster_dbt_translator.get_owners( { **dbt_resource_props, diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt/dagster_dbt_translator.py b/python_modules/libraries/dagster-dbt/dagster_dbt/dagster_dbt_translator.py index a67bd768acd5d..d8f95535e6501 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt/dagster_dbt_translator.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt/dagster_dbt_translator.py @@ -39,6 +39,7 @@ class DagsterDbtTranslatorSettings: enable_asset_checks: bool = True enable_duplicate_source_asset_keys: bool = False + enable_code_references: bool = False class DagsterDbtTranslator: diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_code_references.py b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_code_references.py new file mode 100644 index 0000000000000..03041c6a39920 --- /dev/null +++ b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_code_references.py @@ -0,0 +1,130 @@ +import inspect +import os +from typing import Any, Dict + +import pytest +from dagster._core.definitions.definitions_class import Definitions +from dagster._core.definitions.metadata.source_code import ( + LocalFileCodeReference, + UrlCodeReference, + link_to_source_control, + with_source_code_references, +) +from dagster._core.errors import DagsterInvalidDefinitionError +from dagster_dbt import DbtProject +from dagster_dbt.asset_decorator import dbt_assets +from dagster_dbt.dagster_dbt_translator import DagsterDbtTranslator, DagsterDbtTranslatorSettings + +from ..dbt_projects import ( + test_jaffle_shop_path, +) + +JAFFLE_SHOP_ROOT_PATH = os.path.normpath(test_jaffle_shop_path) + + +def test_basic_attach_code_references(test_jaffle_shop_manifest: Dict[str, Any]) -> None: + @dbt_assets( + manifest=test_jaffle_shop_manifest, + dagster_dbt_translator=DagsterDbtTranslator( + settings=DagsterDbtTranslatorSettings(enable_code_references=True) + ), + project=DbtProject(project_dir=os.fspath(test_jaffle_shop_path)), + ) + def my_dbt_assets(): ... + + for asset_key, asset_metadata in my_dbt_assets.metadata_by_key.items(): + assert "dagster/code_references" in asset_metadata + + references = asset_metadata["dagster/code_references"].code_references + assert len(references) == 1 + + reference = references[0] + assert isinstance(reference, LocalFileCodeReference) + assert reference.file_path.endswith( + asset_key.path[-1] + ".sql" + ) or reference.file_path.endswith(asset_key.path[-1] + ".csv") + assert os.path.exists(reference.file_path), reference.file_path + + +def test_basic_attach_code_references_no_project_dir( + test_jaffle_shop_manifest: Dict[str, Any], +) -> None: + # expect exception because enable_code_references=True but no project_dir + with pytest.raises(DagsterInvalidDefinitionError): + + @dbt_assets( + manifest=test_jaffle_shop_manifest, + dagster_dbt_translator=DagsterDbtTranslator( + settings=DagsterDbtTranslatorSettings(enable_code_references=True) + ), + ) + def my_dbt_assets(): ... + + +def test_with_source_code_references_wrapper(test_jaffle_shop_manifest: Dict[str, Any]) -> None: + @dbt_assets( + manifest=test_jaffle_shop_manifest, + dagster_dbt_translator=DagsterDbtTranslator( + settings=DagsterDbtTranslatorSettings(enable_code_references=True) + ), + project=DbtProject(project_dir=os.fspath(test_jaffle_shop_path)), + ) + def my_dbt_assets(): ... + + defs = Definitions(assets=with_source_code_references([my_dbt_assets])) + + assets = defs.get_asset_graph().all_asset_keys + + for asset_key in assets: + asset_metadata = defs.get_assets_def(asset_key).metadata_by_key[asset_key] + assert "dagster/code_references" in asset_metadata + + references = asset_metadata["dagster/code_references"].code_references + assert len(references) == 2 + + code_reference = references[1] + assert isinstance(code_reference, LocalFileCodeReference) + assert code_reference.file_path.endswith("test_code_references.py") + + +def test_link_to_source_control_wrapper(test_jaffle_shop_manifest: Dict[str, Any]) -> None: + @dbt_assets( + manifest=test_jaffle_shop_manifest, + dagster_dbt_translator=DagsterDbtTranslator( + settings=DagsterDbtTranslatorSettings(enable_code_references=True) + ), + project=DbtProject(project_dir=os.fspath(test_jaffle_shop_path)), + ) + def my_dbt_assets(): ... + + defs = Definitions( + assets=link_to_source_control( + with_source_code_references([my_dbt_assets]), + source_control_url="https://github.com/dagster-io/jaffle_shop", + source_control_branch="master", + repository_root_absolute_path=JAFFLE_SHOP_ROOT_PATH, + ) + ) + + assets = defs.get_asset_graph().all_asset_keys + + for asset_key in assets: + asset_metadata = defs.get_assets_def(asset_key).metadata_by_key[asset_key] + assert "dagster/code_references" in asset_metadata + + references = asset_metadata["dagster/code_references"].code_references + assert len(references) == 2 + + model_reference = references[0] + assert isinstance(model_reference, UrlCodeReference) + assert model_reference.url.startswith( + "https://github.com/dagster-io/jaffle_shop/tree/master/" + ) + assert model_reference.url.endswith( + asset_key.path[-1] + ".sql#L1" + ) or model_reference.url.endswith(asset_key.path[-1] + ".csv#L1") + + source_reference = references[1] + assert isinstance(source_reference, UrlCodeReference) + line_no = inspect.getsourcelines(my_dbt_assets.op.compute_fn.decorated_fn)[1] + assert source_reference.url.endswith(f"test_code_references.py#L{line_no}")