From a8909069d4cf980c886b99443b9f8185e63c4b7f Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Wed, 27 Mar 2024 08:41:17 -0700 Subject: [PATCH] Fix assorted source freshness edgecases so check is run or actionable information (#9825) * Ensure BaseRunner handles nodes without `build_path` Some nodes, like SourceDefinition nodes, don't have a `build_path` property. This is problematic because we take in nodes with no type checking, and sometimes assume they have properties such as `build_path`. This was just the case in BaseRunner's `_handle_generic_exception` and `_handle_internal_exception` methods. Thus to stop dbt from crashing when trying to handle an exception related to a node without a `build_path`, we added a private method to the BaseRunner class for safely trying to get `build_path`. * Use keyword arguments when instantiating `Note` events in freshness.py Previously we were passing arguments during the `Note` event instantiations in freshness.py as positional arguments. This caused the desired `Note` event not to be emitted; instead, the user would get the message ``` [Note] Don't use positional arguments when constructing logging events ``` which was our fault, not the users'. Additionally, we were passing the level for the event in the `Note` instantiation when we needed to be passing it to the `fire_event` method. * Raise error when `loaded_at_field` is `None` and metadata check isn't possible Previously if a source freshness check didn't have a `loaded_at_field` and metadata source freshness wasn't supported by the adapter, then we'd log a warning message and let the source freshness check continue. This was problematic because the source freshness check couldn't actually continue and the process would raise an error in the form ``` type object argument after ** must be a mapping, not NoneType ``` because the `freshness` variable was never getting set. This error wasn't particularly helpful for any person running into it. 
So instead of letting that error happen we now deliberately raise an error with helpful information. * Add test which ensures bad source freshness checks raise appropriate error This test directly tests that when a source freshness check doesn't have a `loaded_at_field` and the adapter in use doesn't support metadata checks, then the appropriate error message gets raised. That is, it directly tests the change made in a162d53a8. This test indirectly tests the changes in both 7ec2f82a9 and 7b0ff3198 as the appropriate error can only be raised because we've fixed other upstream issues via those commits. * Add changelog entry for source freshness edgecase fixes --- .../unreleased/Fixes-20240326-162100.yaml | 7 +++++ core/dbt/task/base.py | 7 +++-- core/dbt/task/freshness.py | 11 ++++--- .../sources/test_source_freshness.py | 14 ++++++++- tests/unit/conftest.py | 29 +++++++++++++++++++ tests/unit/task/test_base.py | 24 +++++++++++++++ tests/unit/test_contracts_graph_parsed.py | 24 --------------- 7 files changed, 83 insertions(+), 33 deletions(-) create mode 100644 .changes/unreleased/Fixes-20240326-162100.yaml create mode 100644 tests/unit/conftest.py create mode 100644 tests/unit/task/test_base.py diff --git a/.changes/unreleased/Fixes-20240326-162100.yaml b/.changes/unreleased/Fixes-20240326-162100.yaml new file mode 100644 index 00000000000..f4c181dbb31 --- /dev/null +++ b/.changes/unreleased/Fixes-20240326-162100.yaml @@ -0,0 +1,7 @@ +kind: Fixes +body: Fix assorted source freshness edgecases so check is run or actionable information + is given +time: 2024-03-26T16:21:00.008936-07:00 +custom: + Author: QMalcolm + Issue: "9078" diff --git a/core/dbt/task/base.py b/core/dbt/task/base.py index 454a0f53d25..b3314f12359 100644 --- a/core/dbt/task/base.py +++ b/core/dbt/task/base.py @@ -209,6 +209,9 @@ def __init__(self, config, adapter, node, node_index, num_nodes) -> None: def compile(self, manifest: Manifest) -> Any: pass + def _node_build_path(self) -> Optional[str]: 
+ return self.node.build_path if hasattr(self.node, "build_path") else None + def get_result_status(self, result) -> Dict[str, str]: if result.status == NodeStatus.Error: return {"node_status": "error", "node_error": str(result.message)} @@ -339,7 +342,7 @@ def _handle_catchable_exception(self, e, ctx): def _handle_internal_exception(self, e, ctx): fire_event( InternalErrorOnRun( - build_path=self.node.build_path, exc=str(e), node_info=get_node_info() + build_path=self._node_build_path(), exc=str(e), node_info=get_node_info() ) ) return str(e) @@ -347,7 +350,7 @@ def _handle_internal_exception(self, e, ctx): def _handle_generic_exception(self, e, ctx): fire_event( GenericExceptionOnRun( - build_path=self.node.build_path, + build_path=self._node_build_path(), unique_id=self.node.unique_id, exc=str(e), node_info=get_node_info(), diff --git a/core/dbt/task/freshness.py b/core/dbt/task/freshness.py index bb8a9c31ce3..114dae326fb 100644 --- a/core/dbt/task/freshness.py +++ b/core/dbt/task/freshness.py @@ -119,9 +119,9 @@ def execute(self, compiled_node, manifest): if compiled_node.freshness.filter is not None: fire_event( Note( - f"A filter cannot be applied to a metadata freshness check on source '{compiled_node.name}'.", - EventLevel.WARN, - ) + msg=f"A filter cannot be applied to a metadata freshness check on source '{compiled_node.name}'." + ), + EventLevel.WARN, ) adapter_response, freshness = self.adapter.calculate_freshness_from_metadata( @@ -131,9 +131,8 @@ def execute(self, compiled_node, manifest): status = compiled_node.freshness.status(freshness["age"]) else: - status = FreshnessStatus.Warn - fire_event( - Note(f"Skipping freshness for source {compiled_node.name}."), + raise DbtRuntimeError( + f"Could not compute freshness for source {compiled_node.name}: no 'loaded_at_field' provided and {self.adapter.type()} adapter does not support metadata-based freshness checks." 
) # adapter_response was not returned in previous versions, so this will be None diff --git a/tests/functional/sources/test_source_freshness.py b/tests/functional/sources/test_source_freshness.py index b7c1c93916d..4ef187f932a 100644 --- a/tests/functional/sources/test_source_freshness.py +++ b/tests/functional/sources/test_source_freshness.py @@ -4,6 +4,8 @@ from datetime import datetime, timedelta import dbt.version +from dbt.artifacts.schemas.freshness import FreshnessResult +from dbt.artifacts.schemas.results import FreshnessStatus from dbt.cli.main import dbtRunner from tests.functional.sources.common_source_setup import BaseSourcesTest from tests.functional.sources.fixtures import ( @@ -384,7 +386,7 @@ class TestMetadataFreshnessFails: def models(self): return {"schema.yml": freshness_via_metadata_schema_yml} - def test_metadata_freshness_fails(self, project): + def test_metadata_freshness_unsupported_parse_warning(self, project): """Since the default test adapter (postgres) does not support metadata based source freshness checks, trying to use that mechanism should result in a parse-time warning.""" @@ -399,3 +401,13 @@ def warning_probe(e): runner.invoke(["parse"]) assert got_warning + + def test_metadata_freshness_unsupported_error_when_run(self, project): + + runner = dbtRunner() + result = runner.invoke(["source", "freshness"]) + assert isinstance(result.result, FreshnessResult) + assert len(result.result.results) == 1 + freshness_result = result.result.results[0] + assert freshness_result.status == FreshnessStatus.RuntimeErr + assert "Could not compute freshness for source test_table" in freshness_result.message diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py new file mode 100644 index 00000000000..6f45963cb78 --- /dev/null +++ b/tests/unit/conftest.py @@ -0,0 +1,29 @@ +import pytest + +from dbt.artifacts.resources import Quoting, SourceConfig +from dbt.artifacts.resources.types import NodeType +from dbt.contracts.graph.nodes import 
SourceDefinition + + +@pytest.fixture +def basic_parsed_source_definition_object(): + return SourceDefinition( + columns={}, + database="some_db", + description="", + fqn=["test", "source", "my_source", "my_source_table"], + identifier="my_source_table", + loader="stitch", + name="my_source_table", + original_file_path="/root/models/sources.yml", + package_name="test", + path="/root/models/sources.yml", + quoting=Quoting(), + resource_type=NodeType.Source, + schema="some_schema", + source_description="my source description", + source_name="my_source", + unique_id="test.source.my_source.my_source_table", + tags=[], + config=SourceConfig(), + ) diff --git a/tests/unit/task/test_base.py b/tests/unit/task/test_base.py new file mode 100644 index 00000000000..5ad48f48385 --- /dev/null +++ b/tests/unit/task/test_base.py @@ -0,0 +1,24 @@ +from dbt.task.base import BaseRunner +from dbt.contracts.graph.nodes import SourceDefinition + + +class MockRunner(BaseRunner): + def compile(self): + pass + + +class TestBaseRunner: + def test_handle_generic_exception_handles_nodes_without_build_path( + self, basic_parsed_source_definition_object: SourceDefinition + ): + # Source definition nodes don't have `build_path` attributes. 
Thus, this + # test will fail if _handle_generic_exception doesn't account for this + runner = MockRunner( + config=None, + adapter=None, + node=basic_parsed_source_definition_object, + node_index=None, + num_nodes=None, + ) + assert not hasattr(basic_parsed_source_definition_object, "build_path") + runner._handle_generic_exception(Exception("bad thing happened"), ctx=None) diff --git a/tests/unit/test_contracts_graph_parsed.py b/tests/unit/test_contracts_graph_parsed.py index 5628847a3e4..f8ea3b92ab9 100644 --- a/tests/unit/test_contracts_graph_parsed.py +++ b/tests/unit/test_contracts_graph_parsed.py @@ -1944,30 +1944,6 @@ def basic_parsed_source_definition_dict(): } -@pytest.fixture -def basic_parsed_source_definition_object(): - return SourceDefinition( - columns={}, - database="some_db", - description="", - fqn=["test", "source", "my_source", "my_source_table"], - identifier="my_source_table", - loader="stitch", - name="my_source_table", - original_file_path="/root/models/sources.yml", - package_name="test", - path="/root/models/sources.yml", - quoting=Quoting(), - resource_type=NodeType.Source, - schema="some_schema", - source_description="my source description", - source_name="my_source", - unique_id="test.source.my_source.my_source_table", - tags=[], - config=SourceConfig(), - ) - - @pytest.fixture def complex_parsed_source_definition_dict(): return {