Skip to content

Commit

Permalink
Fix assorted source freshness edgecases so check is run or actionable…
Browse files Browse the repository at this point in the history
… information (#9825)

* Ensure BaseRunner handles nodes without `build_path`

Some nodes, like SourceDefinition nodes, don't have a `build_path` property.
This is problematic because we take in nodes with no type checking, and
assume they have properties sometimes, like `build_path`. This was just
the case in BaseRunner's `_handle_generic_exception` and
`_handle_interal_exception` methods. Thus to stop dbt from crashing when
trying to handle an exception related to a node without a `build_path`,
we added an private method to the BaseRunner class for safely trying
to get `build_path`.

* Use keyword arguments when instantiating `Note` events in freshness.py

Previously we were passing arguments during the `Note` event instantiations
in freshness.py as positional arguments. This would cause not the desired
`Note` event to be emitted, but instead get the message
```
[Note] Don't use positional arguments when constructing logging events
```
which was our fault, not the users'. Additionally, we were passing the
level for the event in the `Note` instantiation when we needed to be
passing it to the `fire_event` method.

* Raise error when `loaded_at_field` is `None` and metadata check isn't possible

Previously if a source freshness check didn't have a `loaded_at_field` and
metadata source freshness wasn't supported by the adapter, then we'd log
a warning message and let the source freshness check continue. This was problematic
because the source freshness check couldn't actually continue and the process
would raise an error in the form
```
type object argument after ** must be a mapping, not NoneType
```
because the `freshness` variable was never getting set. This error wasn't particularly
helpful for any person running into it. So instead of letting that error
happen we now deliberately raise an error with helpful information.

* Add test which ensures bad source freshness checks raise appropriate error

This test directly tests that when a source freshness check doesn't have a
`loaded_at_field` and the adapter in use doesn't support metadata checks,
then the appropriate error message gets raised. That is, it directly tests
the change made in a162d53. This test indirectly tests the changes in both
7ec2f82 and 7b0ff31 as the appropriate error can only be raised because
we've fixed other upstream issues via those commits.

* Add changelog entry for source freshness edgecase fixes
  • Loading branch information
QMalcolm committed Mar 27, 2024
1 parent 27e17a7 commit 4f35aaa
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 33 deletions.
7 changes: 7 additions & 0 deletions .changes/unreleased/Fixes-20240326-162100.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Fixes
body: Fix assorted source freshness edgecases so check is run or actionable information
is given
time: 2024-03-26T16:21:00.008936-07:00
custom:
Author: QMalcolm
Issue: "9078"
7 changes: 5 additions & 2 deletions core/dbt/task/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,9 @@ def __init__(self, config, adapter, node, node_index, num_nodes) -> None:
def compile(self, manifest: Manifest) -> Any:
pass

def _node_build_path(self) -> Optional[str]:
return self.node.build_path if hasattr(self.node, "build_path") else None

def get_result_status(self, result) -> Dict[str, str]:
if result.status == NodeStatus.Error:
return {"node_status": "error", "node_error": str(result.message)}
Expand Down Expand Up @@ -339,15 +342,15 @@ def _handle_catchable_exception(self, e, ctx):
def _handle_internal_exception(self, e, ctx):
fire_event(
InternalErrorOnRun(
build_path=self.node.build_path, exc=str(e), node_info=get_node_info()
build_path=self._node_build_path(), exc=str(e), node_info=get_node_info()
)
)
return str(e)

def _handle_generic_exception(self, e, ctx):
fire_event(
GenericExceptionOnRun(
build_path=self.node.build_path,
build_path=self._node_build_path(),
unique_id=self.node.unique_id,
exc=str(e),
node_info=get_node_info(),
Expand Down
11 changes: 5 additions & 6 deletions core/dbt/task/freshness.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,9 @@ def execute(self, compiled_node, manifest):
if compiled_node.freshness.filter is not None:
fire_event(
Note(
f"A filter cannot be applied to a metadata freshness check on source '{compiled_node.name}'.",
EventLevel.WARN,
)
msg=f"A filter cannot be applied to a metadata freshness check on source '{compiled_node.name}'."
),
EventLevel.WARN,
)

adapter_response, freshness = self.adapter.calculate_freshness_from_metadata(
Expand All @@ -131,9 +131,8 @@ def execute(self, compiled_node, manifest):

status = compiled_node.freshness.status(freshness["age"])
else:
status = FreshnessStatus.Warn
fire_event(
Note(f"Skipping freshness for source {compiled_node.name}."),
raise DbtRuntimeError(
f"Could not compute freshness for source {compiled_node.name}: no 'loaded_at_field' provided and {self.adapter.type()} adapter does not support metadata-based freshness checks."
)

# adapter_response was not returned in previous versions, so this will be None
Expand Down
80 changes: 79 additions & 1 deletion tests/functional/sources/test_source_freshness.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from datetime import datetime, timedelta

import dbt.version
from dbt.artifacts.schemas.freshness import FreshnessResult
from dbt.artifacts.schemas.results import FreshnessStatus
from dbt.cli.main import dbtRunner
from tests.functional.sources.common_source_setup import BaseSourcesTest
from tests.functional.sources.fixtures import (
Expand Down Expand Up @@ -384,7 +386,7 @@ class TestMetadataFreshnessFails:
def models(self):
return {"schema.yml": freshness_via_metadata_schema_yml}

def test_metadata_freshness_fails(self, project):
def test_metadata_freshness_unsupported_parse_warning(self, project):
"""Since the default test adapter (postgres) does not support metadata
based source freshness checks, trying to use that mechanism should
result in a parse-time warning."""
Expand All @@ -399,3 +401,79 @@ def warning_probe(e):
runner.invoke(["parse"])

assert got_warning

def test_metadata_freshness_unsupported_error_when_run(self, project):

runner = dbtRunner()
result = runner.invoke(["source", "freshness"])
assert isinstance(result.result, FreshnessResult)
assert len(result.result.results) == 1
freshness_result = result.result.results[0]
assert freshness_result.status == FreshnessStatus.RuntimeErr
assert "Could not compute freshness for source test_table" in freshness_result.message


class TestHooksInSourceFreshness(SuccessfulSourceFreshnessTest):
@pytest.fixture(scope="class")
def project_config_update(self):
return {
"config-version": 2,
"on-run-start": ["{{ log('on-run-start hooks called') }}"],
"on-run-end": ["{{ log('on-run-end hooks called') }}"],
"flags": {
"source_freshness_run_project_hooks": True,
},
}

def test_hooks_do_run_for_source_freshness(
self,
project,
):
_, log_output = self.run_dbt_and_capture_with_vars(
project,
[
"source",
"freshness",
],
expect_pass=False,
)
assert "on-run-start" in log_output
assert "on-run-end" in log_output


class TestHooksInSourceFreshnessError:
@pytest.fixture(scope="class")
def models(self):
return {
"schema.yml": error_models_schema_yml,
"model.sql": error_models_model_sql,
}

@pytest.fixture(scope="class")
def project_config_update(self):
return {
"config-version": 2,
"on-run-start": ["select fake_column from table_does_not_exist"],
"flags": {
"source_freshness_run_project_hooks": True,
},
}

def test_hooks_do_not_run_for_source_freshness(
self,
project,
):
run_result_error = None

def run_result_error_probe(e):
nonlocal run_result_error
if (
e.info.name == "RunResultError"
and e.info.level == "error"
and "on-run-start" in e.info.msg
):
run_result_error = e.info.msg

runner = dbtRunner(callbacks=[run_result_error_probe])
runner.invoke(["source", "freshness"])
assert 'relation "table_does_not_exist" does not exist' in run_result_error
29 changes: 29 additions & 0 deletions tests/unit/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import pytest

from dbt.artifacts.resources import Quoting, SourceConfig
from dbt.artifacts.resources.types import NodeType
from dbt.contracts.graph.nodes import SourceDefinition


@pytest.fixture
def basic_parsed_source_definition_object():
return SourceDefinition(
columns={},
database="some_db",
description="",
fqn=["test", "source", "my_source", "my_source_table"],
identifier="my_source_table",
loader="stitch",
name="my_source_table",
original_file_path="/root/models/sources.yml",
package_name="test",
path="/root/models/sources.yml",
quoting=Quoting(),
resource_type=NodeType.Source,
schema="some_schema",
source_description="my source description",
source_name="my_source",
unique_id="test.source.my_source.my_source_table",
tags=[],
config=SourceConfig(),
)
24 changes: 24 additions & 0 deletions tests/unit/task/test_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from dbt.task.base import BaseRunner
from dbt.contracts.graph.nodes import SourceDefinition


class MockRunner(BaseRunner):
def compile(self):
pass


class TestBaseRunner:
def test_handle_generic_exception_handles_nodes_without_build_path(
self, basic_parsed_source_definition_object: SourceDefinition
):
# Source definition nodes don't have `build_path` attributes. Thus, this
# test will fail if _handle_generic_exception doesn't account for this
runner = MockRunner(
config=None,
adapter=None,
node=basic_parsed_source_definition_object,
node_index=None,
num_nodes=None,
)
assert not hasattr(basic_parsed_source_definition_object, "build_path")
runner._handle_generic_exception(Exception("bad thing happened"), ctx=None)
24 changes: 0 additions & 24 deletions tests/unit/test_contracts_graph_parsed.py
Original file line number Diff line number Diff line change
Expand Up @@ -1944,30 +1944,6 @@ def basic_parsed_source_definition_dict():
}


@pytest.fixture
def basic_parsed_source_definition_object():
return SourceDefinition(
columns={},
database="some_db",
description="",
fqn=["test", "source", "my_source", "my_source_table"],
identifier="my_source_table",
loader="stitch",
name="my_source_table",
original_file_path="/root/models/sources.yml",
package_name="test",
path="/root/models/sources.yml",
quoting=Quoting(),
resource_type=NodeType.Source,
schema="some_schema",
source_description="my source description",
source_name="my_source",
unique_id="test.source.my_source.my_source_table",
tags=[],
config=SourceConfig(),
)


@pytest.fixture
def complex_parsed_source_definition_dict():
return {
Expand Down

0 comments on commit 4f35aaa

Please sign in to comment.