Skip to content

Commit

Permalink
Ensure that target_schema from snapshot config is promoted to node le…
Browse files Browse the repository at this point in the history
…vel (#8117)

(cherry picked from commit fe9c875)
  • Loading branch information
gshank authored and github-actions[bot] committed Jul 27, 2023
1 parent 57660c9 commit 35e08af
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 7 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20230717-160652.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Copy target_schema from config into snapshot node
time: 2023-07-17T16:06:52.957724-04:00
custom:
Author: gshank
Issue: "6745"
3 changes: 3 additions & 0 deletions core/dbt/contracts/graph/model_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,8 @@ class SnapshotConfig(EmptySnapshotConfig):
@classmethod
def validate(cls, data):
super().validate(data)
# Note: currently you can't just set these keys in schema.yml because this validation
# will fail when parsing the snapshot node.
if not data.get("strategy") or not data.get("unique_key") or not data.get("target_schema"):
raise ValidationError(
"Snapshots must be configured with a 'strategy', 'unique_key', "
Expand Down Expand Up @@ -649,6 +651,7 @@ def validate(cls, data):
if data.get("materialized") and data.get("materialized") != "snapshot":
raise ValidationError("A snapshot must have a materialized value of 'snapshot'")

# Called by "calculate_node_config_dict" in ContextConfigGenerator
def finalize_and_validate(self):
data = self.to_dict(omit_none=True)
self.validate(data)
Expand Down
21 changes: 15 additions & 6 deletions core/dbt/parser/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,7 @@ def __init__(self, config: RuntimeConfig, manifest: Manifest, component: str) ->
self.package_updaters = package_updaters
self.component = component

def __call__(self, parsed_node: Any, config_dict: Dict[str, Any]) -> None:
override = config_dict.get(self.component)
def __call__(self, parsed_node: Any, override: Optional[str]) -> None:
if parsed_node.package_name in self.package_updaters:
new_value = self.package_updaters[parsed_node.package_name](override, parsed_node)
else:
Expand Down Expand Up @@ -280,9 +279,19 @@ def update_parsed_node_config_dict(
def update_parsed_node_relation_names(
self, parsed_node: IntermediateNode, config_dict: Dict[str, Any]
) -> None:
self._update_node_database(parsed_node, config_dict)
self._update_node_schema(parsed_node, config_dict)
self._update_node_alias(parsed_node, config_dict)

# These call the RelationUpdate callable to go through generate_name macros
self._update_node_database(parsed_node, config_dict.get("database"))
self._update_node_schema(parsed_node, config_dict.get("schema"))
self._update_node_alias(parsed_node, config_dict.get("alias"))

# Snapshot nodes use special "target_database" and "target_schema" fields for some reason
if parsed_node.resource_type == NodeType.Snapshot:
if "target_database" in config_dict and config_dict["target_database"]:
parsed_node.database = config_dict["target_database"]
if "target_schema" in config_dict and config_dict["target_schema"]:
parsed_node.schema = config_dict["target_schema"]

self._update_node_relation_name(parsed_node)

def update_parsed_node_config(
Expand Down Expand Up @@ -349,7 +358,7 @@ def update_parsed_node_config(
# do this once before we parse the node database/schema/alias, so
# parsed_node.config is what it would be if they did nothing
self.update_parsed_node_config_dict(parsed_node, config_dict)
# This updates the node database/schema/alias
# This updates the node database/schema/alias/relation_name
self.update_parsed_node_relation_names(parsed_node, config_dict)

# tests don't have hooks
Expand Down
32 changes: 32 additions & 0 deletions tests/functional/simple_snapshot/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,18 @@
owner: 'a_owner'
"""

models__schema_with_target_schema_yml = """
version: 2
snapshots:
- name: snapshot_actual
tests:
- mutually_exclusive_ranges
config:
meta:
owner: 'a_owner'
target_schema: schema_from_schema_yml
"""

models__ref_snapshot_sql = """
select * from {{ ref('snapshot_actual') }}
"""
Expand Down Expand Up @@ -281,6 +293,26 @@
{% endsnapshot %}
"""

snapshots_pg__snapshot_no_target_schema_sql = """
{% snapshot snapshot_actual %}
{{
config(
target_database=var('target_database', database),
unique_key='id || ' ~ "'-'" ~ ' || first_name',
strategy='timestamp',
updated_at='updated_at',
)
}}
{% if var('invalidate_hard_deletes', 'false') | as_bool %}
{{ config(invalidate_hard_deletes=True) }}
{% endif %}
select * from {{target.database}}.{{target.schema}}.seed
{% endsnapshot %}
"""

models_slow__gen_sql = """
Expand Down
39 changes: 38 additions & 1 deletion tests/functional/simple_snapshot/test_basic_snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@
from datetime import datetime
import pytz
import pytest
from dbt.tests.util import run_dbt, check_relations_equal, relation_from_name
from dbt.tests.util import run_dbt, check_relations_equal, relation_from_name, write_file
from tests.functional.simple_snapshot.fixtures import (
models__schema_yml,
models__schema_with_target_schema_yml,
models__ref_snapshot_sql,
seeds__seed_newcol_csv,
seeds__seed_csv,
snapshots_pg__snapshot_sql,
snapshots_pg__snapshot_no_target_schema_sql,
macros__test_no_overlaps_sql,
macros_custom_snapshot__custom_sql,
snapshots_pg_custom_namespaced__snapshot_sql,
Expand Down Expand Up @@ -123,6 +125,41 @@ def test_basic_ref(self, project):
ref_setup(project, num_snapshot_models=1)


class TestBasicTargetSchemaConfig(Basic):
@pytest.fixture(scope="class")
def snapshots(self):
return {"snapshot.sql": snapshots_pg__snapshot_no_target_schema_sql}

@pytest.fixture(scope="class")
def project_config_update(self, unique_schema):
return {
"snapshots": {
"test": {
"target_schema": unique_schema + "_alt",
}
}
}

def test_target_schema(self, project):
manifest = run_dbt(["parse"])
assert len(manifest.nodes) == 5
# ensure that the schema in the snapshot node is the same as target_schema
snapshot_id = "snapshot.test.snapshot_actual"
snapshot_node = manifest.nodes[snapshot_id]
assert snapshot_node.schema == f"{project.test_schema}_alt"
assert (
snapshot_node.relation_name
== f'"{project.database}"."{project.test_schema}_alt"."snapshot_actual"'
)
assert snapshot_node.meta == {"owner": "a_owner"}

# write out schema.yml file and check again
write_file(models__schema_with_target_schema_yml, "models", "schema.yml")
manifest = run_dbt(["parse"])
snapshot_node = manifest.nodes[snapshot_id]
assert snapshot_node.schema == "schema_from_schema_yml"


class CustomNamespace:
@pytest.fixture(scope="class")
def snapshots(self):
Expand Down

0 comments on commit 35e08af

Please sign in to comment.