Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix assorted source freshness edgecases so check is run or actionable information #9825

Merged
merged 5 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changes/unreleased/Fixes-20240326-162100.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Fixes
body: Fix assorted source freshness edgecases so check is run or actionable information
is given
time: 2024-03-26T16:21:00.008936-07:00
custom:
Author: QMalcolm
Issue: "9078"
7 changes: 5 additions & 2 deletions core/dbt/task/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,9 @@ def __init__(self, config, adapter, node, node_index, num_nodes) -> None:
def compile(self, manifest: Manifest) -> Any:
pass

def _node_build_path(self) -> Optional[str]:
return self.node.build_path if hasattr(self.node, "build_path") else None

def get_result_status(self, result) -> Dict[str, str]:
if result.status == NodeStatus.Error:
return {"node_status": "error", "node_error": str(result.message)}
Expand Down Expand Up @@ -339,15 +342,15 @@ def _handle_catchable_exception(self, e, ctx):
def _handle_internal_exception(self, e, ctx):
fire_event(
InternalErrorOnRun(
build_path=self.node.build_path, exc=str(e), node_info=get_node_info()
build_path=self._node_build_path(), exc=str(e), node_info=get_node_info()
)
)
return str(e)

def _handle_generic_exception(self, e, ctx):
fire_event(
GenericExceptionOnRun(
build_path=self.node.build_path,
build_path=self._node_build_path(),
unique_id=self.node.unique_id,
exc=str(e),
node_info=get_node_info(),
Expand Down
11 changes: 5 additions & 6 deletions core/dbt/task/freshness.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,9 @@
if compiled_node.freshness.filter is not None:
fire_event(
Note(
f"A filter cannot be applied to a metadata freshness check on source '{compiled_node.name}'.",
EventLevel.WARN,
)
msg=f"A filter cannot be applied to a metadata freshness check on source '{compiled_node.name}'."
),
EventLevel.WARN,
)

adapter_response, freshness = self.adapter.calculate_freshness_from_metadata(
Expand All @@ -132,9 +132,8 @@

status = compiled_node.freshness.status(freshness["age"])
else:
status = FreshnessStatus.Warn
fire_event(
Note(f"Skipping freshness for source {compiled_node.name}."),
raise DbtRuntimeError(

Check warning on line 135 in core/dbt/task/freshness.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/freshness.py#L135

Added line #L135 was not covered by tests
f"Could not compute freshness for source {compiled_node.name}: no 'loaded_at_field' provided and {self.adapter.type()} adapter does not support metadata-based freshness checks."
)

# adapter_response was not returned in previous versions, so this will be None
Expand Down
14 changes: 13 additions & 1 deletion tests/functional/sources/test_source_freshness.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import yaml

import dbt.version
from dbt.artifacts.schemas.freshness import FreshnessResult
from dbt.artifacts.schemas.results import FreshnessStatus
from dbt.cli.main import dbtRunner
from tests.functional.sources.common_source_setup import BaseSourcesTest
from tests.functional.sources.fixtures import (
Expand Down Expand Up @@ -385,7 +387,7 @@ class TestMetadataFreshnessFails:
def models(self):
return {"schema.yml": freshness_via_metadata_schema_yml}

def test_metadata_freshness_fails(self, project):
def test_metadata_freshness_unsupported_parse_warning(self, project):
"""Since the default test adapter (postgres) does not support metadata
based source freshness checks, trying to use that mechanism should
result in a parse-time warning."""
Expand All @@ -401,6 +403,16 @@ def warning_probe(e):

assert got_warning

def test_metadata_freshness_unsupported_error_when_run(self, project):

runner = dbtRunner()
result = runner.invoke(["source", "freshness"])
assert isinstance(result.result, FreshnessResult)
assert len(result.result.results) == 1
freshness_result = result.result.results[0]
assert freshness_result.status == FreshnessStatus.RuntimeErr
assert "Could not compute freshness for source test_table" in freshness_result.message


class TestHooksInSourceFreshness(SuccessfulSourceFreshnessTest):
@pytest.fixture(scope="class")
Expand Down
29 changes: 29 additions & 0 deletions tests/unit/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import pytest

from dbt.artifacts.resources import Quoting, SourceConfig
from dbt.artifacts.resources.types import NodeType
from dbt.contracts.graph.nodes import SourceDefinition


@pytest.fixture
def basic_parsed_source_definition_object():
return SourceDefinition(
columns={},
database="some_db",
description="",
fqn=["test", "source", "my_source", "my_source_table"],
identifier="my_source_table",
loader="stitch",
name="my_source_table",
original_file_path="/root/models/sources.yml",
package_name="test",
path="/root/models/sources.yml",
quoting=Quoting(),
resource_type=NodeType.Source,
schema="some_schema",
source_description="my source description",
source_name="my_source",
unique_id="test.source.my_source.my_source_table",
tags=[],
config=SourceConfig(),
)
24 changes: 24 additions & 0 deletions tests/unit/task/test_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from dbt.task.base import BaseRunner
from dbt.contracts.graph.nodes import SourceDefinition


class MockRunner(BaseRunner):
def compile(self):
pass


class TestBaseRunner:
def test_handle_generic_exception_handles_nodes_without_build_path(
self, basic_parsed_source_definition_object: SourceDefinition
):
# Source definition nodes don't have `build_path` attributes. Thus, this
# test will fail if _handle_generic_exception doesn't account for this
runner = MockRunner(
config=None,
adapter=None,
node=basic_parsed_source_definition_object,
node_index=None,
num_nodes=None,
)
assert not hasattr(basic_parsed_source_definition_object, "build_path")
runner._handle_generic_exception(Exception("bad thing happened"), ctx=None)
24 changes: 0 additions & 24 deletions tests/unit/test_contracts_graph_parsed.py
Original file line number Diff line number Diff line change
Expand Up @@ -1954,30 +1954,6 @@ def basic_parsed_source_definition_dict():
}


@pytest.fixture
def basic_parsed_source_definition_object():
return SourceDefinition(
columns={},
database="some_db",
description="",
fqn=["test", "source", "my_source", "my_source_table"],
identifier="my_source_table",
loader="stitch",
name="my_source_table",
original_file_path="/root/models/sources.yml",
package_name="test",
path="/root/models/sources.yml",
quoting=Quoting(),
resource_type=NodeType.Source,
schema="some_schema",
source_description="my source description",
source_name="my_source",
unique_id="test.source.my_source.my_source_table",
tags=[],
config=SourceConfig(),
)


@pytest.fixture
def complex_parsed_source_definition_dict():
return {
Expand Down
Loading