diff --git a/.changes/unreleased/Fixes-20240326-162100.yaml b/.changes/unreleased/Fixes-20240326-162100.yaml new file mode 100644 index 00000000000..f4c181dbb31 --- /dev/null +++ b/.changes/unreleased/Fixes-20240326-162100.yaml @@ -0,0 +1,7 @@ +kind: Fixes +body: Fix assorted source freshness edgecases so check is run or actionable information + is given +time: 2024-03-26T16:21:00.008936-07:00 +custom: + Author: QMalcolm + Issue: "9078" diff --git a/core/dbt/task/base.py b/core/dbt/task/base.py index 454a0f53d25..b3314f12359 100644 --- a/core/dbt/task/base.py +++ b/core/dbt/task/base.py @@ -209,6 +209,9 @@ def __init__(self, config, adapter, node, node_index, num_nodes) -> None: def compile(self, manifest: Manifest) -> Any: pass + def _node_build_path(self) -> Optional[str]: + return self.node.build_path if hasattr(self.node, "build_path") else None + def get_result_status(self, result) -> Dict[str, str]: if result.status == NodeStatus.Error: return {"node_status": "error", "node_error": str(result.message)} @@ -339,7 +342,7 @@ def _handle_catchable_exception(self, e, ctx): def _handle_internal_exception(self, e, ctx): fire_event( InternalErrorOnRun( - build_path=self.node.build_path, exc=str(e), node_info=get_node_info() + build_path=self._node_build_path(), exc=str(e), node_info=get_node_info() ) ) return str(e) @@ -347,7 +350,7 @@ def _handle_internal_exception(self, e, ctx): def _handle_generic_exception(self, e, ctx): fire_event( GenericExceptionOnRun( - build_path=self.node.build_path, + build_path=self._node_build_path(), unique_id=self.node.unique_id, exc=str(e), node_info=get_node_info(), diff --git a/core/dbt/task/freshness.py b/core/dbt/task/freshness.py index bb8a9c31ce3..114dae326fb 100644 --- a/core/dbt/task/freshness.py +++ b/core/dbt/task/freshness.py @@ -119,9 +119,9 @@ def execute(self, compiled_node, manifest): if compiled_node.freshness.filter is not None: fire_event( Note( - f"A filter cannot be applied to a metadata freshness check on source '{compiled_node.name}'.", - EventLevel.WARN, - ) + msg=f"A filter cannot be applied to a metadata freshness check on source '{compiled_node.name}'." + ), + EventLevel.WARN, ) adapter_response, freshness = self.adapter.calculate_freshness_from_metadata( @@ -131,9 +131,8 @@ def execute(self, compiled_node, manifest): status = compiled_node.freshness.status(freshness["age"]) else: - status = FreshnessStatus.Warn - fire_event( - Note(f"Skipping freshness for source {compiled_node.name}."), + raise DbtRuntimeError( + f"Could not compute freshness for source {compiled_node.name}: no 'loaded_at_field' provided and {self.adapter.type()} adapter does not support metadata-based freshness checks." ) # adapter_response was not returned in previous versions, so this will be None diff --git a/tests/functional/sources/test_source_freshness.py b/tests/functional/sources/test_source_freshness.py index b7c1c93916d..95563656458 100644 --- a/tests/functional/sources/test_source_freshness.py +++ b/tests/functional/sources/test_source_freshness.py @@ -4,6 +4,8 @@ from datetime import datetime, timedelta import dbt.version +from dbt.artifacts.schemas.freshness import FreshnessResult +from dbt.artifacts.schemas.results import FreshnessStatus from dbt.cli.main import dbtRunner from tests.functional.sources.common_source_setup import BaseSourcesTest from tests.functional.sources.fixtures import ( @@ -384,7 +386,7 @@ class TestMetadataFreshnessFails: def models(self): return {"schema.yml": freshness_via_metadata_schema_yml} - def test_metadata_freshness_fails(self, project): + def test_metadata_freshness_unsupported_parse_warning(self, project): """Since the default test adapter (postgres) does not support metadata based source freshness checks, trying to use that mechanism should result in a parse-time warning.""" @@ -399,3 +401,79 @@ def warning_probe(e): runner.invoke(["parse"]) assert got_warning + + def test_metadata_freshness_unsupported_error_when_run(self, project): + + runner = dbtRunner() + result = runner.invoke(["source", "freshness"]) + assert isinstance(result.result, FreshnessResult) + assert len(result.result.results) == 1 + freshness_result = result.result.results[0] + assert freshness_result.status == FreshnessStatus.RuntimeErr + assert "Could not compute freshness for source test_table" in freshness_result.message + + +class TestHooksInSourceFreshness(SuccessfulSourceFreshnessTest): + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "config-version": 2, + "on-run-start": ["{{ log('on-run-start hooks called') }}"], + "on-run-end": ["{{ log('on-run-end hooks called') }}"], + "flags": { + "source_freshness_run_project_hooks": True, + }, + } + + def test_hooks_do_run_for_source_freshness( + self, + project, + ): + _, log_output = self.run_dbt_and_capture_with_vars( + project, + [ + "source", + "freshness", + ], + expect_pass=False, + ) + assert "on-run-start" in log_output + assert "on-run-end" in log_output + + +class TestHooksInSourceFreshnessError: + @pytest.fixture(scope="class") + def models(self): + return { + "schema.yml": error_models_schema_yml, + "model.sql": error_models_model_sql, + } + + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "config-version": 2, + "on-run-start": ["select fake_column from table_does_not_exist"], + "flags": { + "source_freshness_run_project_hooks": True, + }, + } + + def test_hooks_do_not_run_for_source_freshness( + self, + project, + ): + run_result_error = None + + def run_result_error_probe(e): + nonlocal run_result_error + if ( + e.info.name == "RunResultError" + and e.info.level == "error" + and "on-run-start" in e.info.msg + ): + run_result_error = e.info.msg + + runner = dbtRunner(callbacks=[run_result_error_probe]) + runner.invoke(["source", "freshness"]) + assert 'relation "table_does_not_exist" does not exist' in run_result_error diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py new file mode 100644 index 00000000000..6f45963cb78 --- /dev/null +++ b/tests/unit/conftest.py @@ -0,0 +1,29 @@ +import pytest + +from dbt.artifacts.resources import Quoting, SourceConfig +from dbt.artifacts.resources.types import NodeType +from dbt.contracts.graph.nodes import SourceDefinition + + +@pytest.fixture +def basic_parsed_source_definition_object(): + return SourceDefinition( + columns={}, + database="some_db", + description="", + fqn=["test", "source", "my_source", "my_source_table"], + identifier="my_source_table", + loader="stitch", + name="my_source_table", + original_file_path="/root/models/sources.yml", + package_name="test", + path="/root/models/sources.yml", + quoting=Quoting(), + resource_type=NodeType.Source, + schema="some_schema", + source_description="my source description", + source_name="my_source", + unique_id="test.source.my_source.my_source_table", + tags=[], + config=SourceConfig(), + ) diff --git a/tests/unit/task/test_base.py b/tests/unit/task/test_base.py new file mode 100644 index 00000000000..5ad48f48385 --- /dev/null +++ b/tests/unit/task/test_base.py @@ -0,0 +1,24 @@ +from dbt.task.base import BaseRunner +from dbt.contracts.graph.nodes import SourceDefinition + + +class MockRunner(BaseRunner): + def compile(self): + pass + + +class TestBaseRunner: + def test_handle_generic_exception_handles_nodes_without_build_path( + self, basic_parsed_source_definition_object: SourceDefinition + ): + # Source definition nodes don't have `build_path` attributes. Thus, this + # test will fail if _handle_generic_exception doesn't account for this + runner = MockRunner( + config=None, + adapter=None, + node=basic_parsed_source_definition_object, + node_index=None, + num_nodes=None, + ) + assert not hasattr(basic_parsed_source_definition_object, "build_path") + runner._handle_generic_exception(Exception("bad thing happened"), ctx=None) diff --git a/tests/unit/test_contracts_graph_parsed.py b/tests/unit/test_contracts_graph_parsed.py index 5628847a3e4..f8ea3b92ab9 100644 --- a/tests/unit/test_contracts_graph_parsed.py +++ b/tests/unit/test_contracts_graph_parsed.py @@ -1944,30 +1944,6 @@ def basic_parsed_source_definition_dict(): } -@pytest.fixture -def basic_parsed_source_definition_object(): - return SourceDefinition( - columns={}, - database="some_db", - description="", - fqn=["test", "source", "my_source", "my_source_table"], - identifier="my_source_table", - loader="stitch", - name="my_source_table", - original_file_path="/root/models/sources.yml", - package_name="test", - path="/root/models/sources.yml", - quoting=Quoting(), - resource_type=NodeType.Source, - schema="some_schema", - source_description="my source description", - source_name="my_source", - unique_id="test.source.my_source.my_source_table", - tags=[], - config=SourceConfig(), - ) - - @pytest.fixture def complex_parsed_source_definition_dict(): return {