Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow snapshots to be defined in YAML. #10749

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240903-132428.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Enable `--resource-type` and `--exclude-resource-type` CLI flags and environment variables for `dbt test`
time: 2024-09-03T13:24:28.592837+01:00
custom:
Author: TowardOliver dbeatty10
Issue: "10656"
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240920-110447.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Allow snapshots to be defined in YAML.
time: 2024-09-20T11:04:47.703117-04:00
custom:
Author: peterallenwebb
Issue: "10246"
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20240911-162730.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Add Snowplow tracking for behavior flag deprecations
time: 2024-09-11T16:27:30.293832-04:00
custom:
Author: mikealfare
Issue: "10552"
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20240916-102201.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Replace `TestSelector` with `ResourceTypeSelector`
time: 2024-09-16T10:22:01.339462-06:00
custom:
Author: dbeatty10
Issue: "10718"
7 changes: 7 additions & 0 deletions .changes/unreleased/Under the Hood-20240918-170325.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Under the Hood
body: Standardize returning `ResourceTypeSelector` instances in `dbt list` and `dbt
build`
time: 2024-09-18T17:03:25.639516-06:00
custom:
Author: dbeatty10
Issue: "10739"
2 changes: 2 additions & 0 deletions core/dbt/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,8 @@ def freshness(ctx, **kwargs):
@click.pass_context
@global_flags
@p.exclude
@p.resource_type
@p.exclude_resource_type
@p.profiles_dir
@p.project_dir
@p.select
Expand Down
3 changes: 3 additions & 0 deletions core/dbt/events/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
from functools import partial
from typing import Callable, List

from dbt.tracking import track_behavior_change_warn
from dbt_common.events.base_types import EventLevel, EventMsg
from dbt_common.events.event_manager_client import (
add_callback_to_manager,
add_logger_to_manager,
cleanup_event_logger,
get_event_manager,
Expand Down Expand Up @@ -68,6 +70,7 @@ def setup_event_logger(flags, callbacks: List[Callable[[EventMsg], None]] = [])
make_log_dir_if_missing(flags.LOG_PATH)
event_manager = get_event_manager()
event_manager.callbacks = callbacks.copy()
add_callback_to_manager(track_behavior_change_warn)

if flags.LOG_LEVEL != "none":
line_format = _line_format_from_str(flags.LOG_FORMAT, LineFormat.PlainText)
Expand Down
5 changes: 5 additions & 0 deletions core/dbt/node_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@
NodeType.Snapshot,
]

TEST_NODE_TYPES: List["NodeType"] = [
NodeType.Test,
NodeType.Unit,
]

VERSIONED_NODE_TYPES: List["NodeType"] = [
NodeType.Model,
]
49 changes: 49 additions & 0 deletions core/dbt/parser/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@

# PatchParser.parse()
if "snapshots" in dct:
self._add_yaml_snapshot_nodes_to_manifest(dct["snapshots"], block)
snapshot_parse_result = TestablePatchParser(self, yaml_block, "snapshots").parse()
for test_block in snapshot_parse_result.test_blocks:
self.generic_test_parser.parse_tests(test_block)
Expand Down Expand Up @@ -265,6 +266,54 @@
saved_query_parser = SavedQueryParser(self, yaml_block)
saved_query_parser.parse()

def _add_yaml_snapshot_nodes_to_manifest(
self, snapshots: List[Dict[str, Any]], block: FileBlock
) -> None:
"""We support the creation of simple snapshots in yaml, without an
accompanying SQL definition. For such snapshots, the user must supply
a 'relation' property to indicate the target of the snapshot. This
function looks for such snapshots and adds a node to manifest for each
one we find, since they were not added during SQL parsing."""

rebuild_refs = False
for snapshot in snapshots:
if "relation" in snapshot:
from dbt.parser import SnapshotParser

if "name" not in snapshot:
raise ParsingError("A snapshot must define the 'name' property. ")

Check warning on line 284 in core/dbt/parser/schemas.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/parser/schemas.py#L284

Added line #L284 was not covered by tests

# Reuse the logic of SnapshotParser as far as possible to create
# a new node we can add to the manifest.
parser = SnapshotParser(self.project, self.manifest, self.root_project)
fqn = [self.project.project_name, "snapshots", snapshot["name"]]
snapshot_node = parser._create_parsetime_node(
block,
self.get_compiled_path(block),
parser.initial_config(fqn),
fqn,
snapshot["name"],
)

# Parse the expected ref() or source() expression given by
# 'relation' so that we know what we are snapshotting.
source_or_ref = statically_parse_ref_or_source(snapshot["relation"])
if isinstance(source_or_ref, RefArgs):
snapshot_node.refs.append(source_or_ref)
else:
snapshot_node.sources.append(source_or_ref)

Check warning on line 304 in core/dbt/parser/schemas.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/parser/schemas.py#L304

Added line #L304 was not covered by tests

# Implement the snapshot SQL as a simple select *
snapshot_node.raw_code = "select * from {{ " + snapshot["relation"] + " }}"

# Add our new node to the manifest, and note that ref lookup collections
# will need to be rebuilt.
self.manifest.add_node_nofile(snapshot_node)
rebuild_refs = True

if rebuild_refs:
self.manifest.rebuild_ref_lookup()


Parsed = TypeVar("Parsed", UnpatchedSourceDefinition, ParsedNodePatch, ParsedMacroPatch)
NodeTarget = TypeVar("NodeTarget", UnparsedNodeUpdate, UnparsedAnalysisUpdate, UnparsedModelUpdate)
Expand Down
8 changes: 0 additions & 8 deletions core/dbt/task/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from dbt.graph import Graph, GraphQueue, ResourceTypeSelector
from dbt.node_types import NodeType
from dbt.task.base import BaseRunner, resource_types_from_args
from dbt.task.test import TestSelector
from dbt_common.events.functions import fire_event

from .run import ModelRunner as run_model_runner
Expand Down Expand Up @@ -197,13 +196,6 @@ def get_node_selector(self, no_unit_tests=False) -> ResourceTypeSelector:

resource_types = self.resource_types(no_unit_tests)

if resource_types == [NodeType.Test]:
return TestSelector(
graph=self.graph,
manifest=self.manifest,
previous_state=self.previous_state,
resource_types=resource_types,
)
return ResourceTypeSelector(
graph=self.graph,
manifest=self.manifest,
Expand Down
25 changes: 8 additions & 17 deletions core/dbt/task/list.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
from dbt.node_types import NodeType
from dbt.task.base import resource_types_from_args
from dbt.task.runnable import GraphRunnableTask
from dbt.task.test import TestSelector
from dbt_common.events.contextvars import task_contextvars
from dbt_common.events.functions import fire_event, warn_or_error
from dbt_common.events.types import PrintEvent
Expand Down Expand Up @@ -197,24 +196,16 @@ def selection_arg(self):
else:
return self.args.select

def get_node_selector(self):
def get_node_selector(self) -> ResourceTypeSelector:
if self.manifest is None or self.graph is None:
raise DbtInternalError("manifest and graph must be set to get perform node selection")
if self.resource_types == [NodeType.Test]:
return TestSelector(
graph=self.graph,
manifest=self.manifest,
previous_state=self.previous_state,
resource_types=self.resource_types,
)
else:
return ResourceTypeSelector(
graph=self.graph,
manifest=self.manifest,
previous_state=self.previous_state,
resource_types=self.resource_types,
include_empty_nodes=True,
)
return ResourceTypeSelector(
graph=self.graph,
manifest=self.manifest,
previous_state=self.previous_state,
resource_types=self.resource_types,
include_empty_nodes=True,
)

def interpret_results(self, results):
# list command should always return 0 as exit code
Expand Down
76 changes: 54 additions & 22 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,48 @@
)
raise CompilationError(msg, node=model)

def _execute_model(
self,
hook_ctx: Any,
context_config: Any,
model: ModelNode,
context: Dict[str, Any],
materialization_macro: MacroProtocol,
) -> RunResult:
try:
result = MacroGenerator(
materialization_macro, context, stack=context["context_macro_stack"]
)()
finally:
self.adapter.post_model_hook(context_config, hook_ctx)

for relation in self._materialization_relations(result, model):
self.adapter.cache_added(relation.incorporate(dbt_created=True))

return self._build_run_model_result(model, context)

def _execute_microbatch_model(
self,
hook_ctx: Any,
context_config: Any,
model: ModelNode,
manifest: Manifest,
context: Dict[str, Any],
materialization_macro: MacroProtocol,
) -> RunResult:
batch_results = None
try:
batch_results = self._execute_microbatch_materialization(
model, manifest, context, materialization_macro
)
finally:
self.adapter.post_model_hook(context_config, hook_ctx)

if batch_results is not None:
return self._build_run_microbatch_model_result(model, batch_results)
else:
return self._build_run_model_result(model, context)

Check warning on line 393 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L393

Added line #L393 was not covered by tests

def execute(self, model, manifest):
context = generate_runtime_model_context(model, self.config, manifest)

Expand Down Expand Up @@ -378,29 +420,19 @@
)

hook_ctx = self.adapter.pre_model_hook(context_config)
batch_results = None
try:
if (
os.environ.get("DBT_EXPERIMENTAL_MICROBATCH")
and model.config.materialized == "incremental"
and model.config.incremental_strategy == "microbatch"
):
batch_results = self._execute_microbatch_materialization(
model, manifest, context, materialization_macro
)
else:
result = MacroGenerator(
materialization_macro, context, stack=context["context_macro_stack"]
)()
for relation in self._materialization_relations(result, model):
self.adapter.cache_added(relation.incorporate(dbt_created=True))
finally:
self.adapter.post_model_hook(context_config, hook_ctx)

if batch_results:
return self._build_run_microbatch_model_result(model, batch_results)

return self._build_run_model_result(model, context)
if (
os.environ.get("DBT_EXPERIMENTAL_MICROBATCH")
and model.config.materialized == "incremental"
and model.config.incremental_strategy == "microbatch"
):
return self._execute_microbatch_model(
hook_ctx, context_config, model, manifest, context, materialization_macro
)
else:
return self._execute_model(
hook_ctx, context_config, model, context, materialization_macro
)

def _execute_microbatch_materialization(
self,
Expand Down
43 changes: 26 additions & 17 deletions core/dbt/task/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,17 @@
import re
import threading
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Type, Union
from typing import (
TYPE_CHECKING,
Any,
Collection,
Dict,
List,
Optional,
Tuple,
Type,
Union,
)

import daff

Expand All @@ -25,9 +35,9 @@
from dbt.exceptions import BooleanError, DbtInternalError
from dbt.flags import get_flags
from dbt.graph import ResourceTypeSelector
from dbt.node_types import NodeType
from dbt.node_types import TEST_NODE_TYPES, NodeType
from dbt.parser.unit_tests import UnitTestManifestLoader
from dbt.task.base import BaseRunner
from dbt.task.base import BaseRunner, resource_types_from_args
from dbt.utils import _coerce_decimal, strtobool
from dbt_common.dataclass_schema import dbtClassMixin
from dbt_common.events.format import pluralize
Expand Down Expand Up @@ -375,18 +385,6 @@ def _render_daff_diff(self, daff_diff: daff.TableDiff) -> str:
return rendered


class TestSelector(ResourceTypeSelector):
def __init__(
self, graph, manifest, previous_state, resource_types=[NodeType.Test, NodeType.Unit]
) -> None:
super().__init__(
graph=graph,
manifest=manifest,
previous_state=previous_state,
resource_types=resource_types,
)


class TestTask(RunTask):
"""
Testing:
Expand All @@ -399,13 +397,24 @@ class TestTask(RunTask):
def raise_on_first_error(self) -> bool:
return False

def get_node_selector(self) -> TestSelector:
@property
def resource_types(self) -> List[NodeType]:
resource_types: Collection[NodeType] = resource_types_from_args(
self.args, set(TEST_NODE_TYPES), set(TEST_NODE_TYPES)
)

# filter out any non-test node types
resource_types = [rt for rt in resource_types if rt in TEST_NODE_TYPES]
return list(resource_types)

def get_node_selector(self) -> ResourceTypeSelector:
if self.manifest is None or self.graph is None:
raise DbtInternalError("manifest and graph must be set to get perform node selection")
return TestSelector(
return ResourceTypeSelector(
graph=self.graph,
manifest=self.manifest,
previous_state=self.previous_state,
resource_types=self.resource_types,
)

def get_runner_type(self, _) -> Optional[Type[BaseRunner]]:
Expand Down
Loading
Loading