diff --git a/.changes/unreleased/Features-20240903-154133.yaml b/.changes/unreleased/Features-20240903-154133.yaml
new file mode 100644
index 00000000000..fe45b8d4d10
--- /dev/null
+++ b/.changes/unreleased/Features-20240903-154133.yaml
@@ -0,0 +1,6 @@
+kind: Features
+body: Allow configuring snapshot column names
+time: 2024-09-03T15:41:33.167097-04:00
+custom:
+  Author: gshank
+  Issue: "10185"
diff --git a/core/dbt/artifacts/resources/v1/snapshot.py b/core/dbt/artifacts/resources/v1/snapshot.py
index c9f1acdb50f..464d94bae69 100644
--- a/core/dbt/artifacts/resources/v1/snapshot.py
+++ b/core/dbt/artifacts/resources/v1/snapshot.py
@@ -1,10 +1,18 @@
-from dataclasses import dataclass
+from dataclasses import dataclass, field
 from typing import Dict, List, Literal, Optional, Union
 
 from dbt.artifacts.resources.types import NodeType
 from dbt.artifacts.resources.v1.components import CompiledResource, DeferRelation
 from dbt.artifacts.resources.v1.config import NodeConfig
-from dbt_common.dataclass_schema import ValidationError
+from dbt_common.dataclass_schema import ValidationError, dbtClassMixin
+
+
+@dataclass
+class SnapshotMetaColumnNames(dbtClassMixin):
+    dbt_valid_to: Optional[str] = None
+    dbt_valid_from: Optional[str] = None
+    dbt_scd_id: Optional[str] = None
+    dbt_updated_at: Optional[str] = None
 
 
 @dataclass
@@ -17,6 +25,18 @@ class SnapshotConfig(NodeConfig):
     updated_at: Optional[str] = None
     # Not using Optional because of serialization issues with a Union of str and List[str]
     check_cols: Union[str, List[str], None] = None
+    snapshot_meta_column_names: SnapshotMetaColumnNames = field(
+        default_factory=SnapshotMetaColumnNames
+    )
+
+    @property
+    def snapshot_table_column_names(self):
+        return {
+            "dbt_valid_from": self.snapshot_meta_column_names.dbt_valid_from or "dbt_valid_from",
+            "dbt_valid_to": self.snapshot_meta_column_names.dbt_valid_to or "dbt_valid_to",
+            "dbt_scd_id": self.snapshot_meta_column_names.dbt_scd_id or "dbt_scd_id",
+            "dbt_updated_at": self.snapshot_meta_column_names.dbt_updated_at or "dbt_updated_at",
+        }
 
     def final_validate(self):
         if not self.strategy or not self.unique_key:
diff --git a/core/dbt/context/context_config.py b/core/dbt/context/context_config.py
index 51222ceba10..caf83d425fe 100644
--- a/core/dbt/context/context_config.py
+++ b/core/dbt/context/context_config.py
@@ -8,7 +8,7 @@
 from dbt.contracts.graph.model_config import get_config_for
 from dbt.node_types import NodeType
 from dbt.utils import fqn_search
-from dbt_common.contracts.config.base import BaseConfig, _listify
+from dbt_common.contracts.config.base import BaseConfig, merge_config_dicts
 from dbt_common.exceptions import DbtInternalError
 
 
@@ -293,55 +293,7 @@ def __init__(
 
     def add_config_call(self, opts: Dict[str, Any]) -> None:
         dct = self._config_call_dict
-        self._add_config_call(dct, opts)
-
-    @classmethod
-    def _add_config_call(cls, config_call_dict, opts: Dict[str, Any]) -> None:
-        # config_call_dict is already encountered configs, opts is new
-        # This mirrors code in _merge_field_value in model_config.py which is similar but
-        # operates on config objects.
-        for k, v in opts.items():
-            # MergeBehavior for post-hook and pre-hook is to collect all
-            # values, instead of overwriting
-            if k in BaseConfig.mergebehavior["append"]:
-                if not isinstance(v, list):
-                    v = [v]
-                if k in config_call_dict:  # should always be a list here
-                    config_call_dict[k].extend(v)
-                else:
-                    config_call_dict[k] = v
-
-            elif k in BaseConfig.mergebehavior["update"]:
-                if not isinstance(v, dict):
-                    raise DbtInternalError(f"expected dict, got {v}")
-                if k in config_call_dict and isinstance(config_call_dict[k], dict):
-                    config_call_dict[k].update(v)
-                else:
-                    config_call_dict[k] = v
-            elif k in BaseConfig.mergebehavior["dict_key_append"]:
-                if not isinstance(v, dict):
-                    raise DbtInternalError(f"expected dict, got {v}")
-                if k in config_call_dict:  # should always be a dict
-                    for key, value in v.items():
-                        extend = False
-                        # This might start with a +, to indicate we should extend the list
-                        # instead of just clobbering it
-                        if key.startswith("+"):
-                            extend = True
-                        if key in config_call_dict[k] and extend:
-                            # extend the list
-                            config_call_dict[k][key].extend(_listify(value))
-                        else:
-                            # clobber the list
-                            config_call_dict[k][key] = _listify(value)
-                else:
-                    # This is always a dictionary
-                    config_call_dict[k] = v
-                    # listify everything
-                    for key, value in config_call_dict[k].items():
-                        config_call_dict[k][key] = _listify(value)
-            else:
-                config_call_dict[k] = v
+        merge_config_dicts(dct, opts)
 
     def build_config_dict(
         self,
diff --git a/core/dbt/parser/models.py b/core/dbt/parser/models.py
index dd56d06868c..06e11a89649 100644
--- a/core/dbt/parser/models.py
+++ b/core/dbt/parser/models.py
@@ -22,6 +22,7 @@
 from dbt.node_types import ModelLanguage, NodeType
 from dbt.parser.base import SimpleSQLParser
 from dbt.parser.search import FileBlock
+from dbt_common.contracts.config.base import merge_config_dicts
 from dbt_common.dataclass_schema import ValidationError
 from dbt_common.exceptions.macros import UndefinedMacroError
 from dbt_extractor import ExtractionError, py_extract_from_source  # type: ignore
@@ -467,7 +468,7 @@ def _get_config_call_dict(static_parser_result: Dict[str, Any]) -> Dict[str, Any
     config_call_dict: Dict[str, Any] = {}
 
     for c in static_parser_result["configs"]:
-        ContextConfig._add_config_call(config_call_dict, {c[0]: c[1]})
+        merge_config_dicts(config_call_dict, {c[0]: c[1]})
 
     return config_call_dict
 
diff --git a/core/setup.py b/core/setup.py
index 97b6d8241e8..354326e8896 100644
--- a/core/setup.py
+++ b/core/setup.py
@@ -71,8 +71,8 @@
         "dbt-extractor>=0.5.0,<=0.6",
         "dbt-semantic-interfaces>=0.7.1,<0.8",
         # Minor versions for these are expected to be backwards-compatible
-        "dbt-common>=1.6.0,<2.0",
-        "dbt-adapters>=1.6.0,<2.0",
+        "dbt-common>=1.9.0,<2.0",
+        "dbt-adapters>=1.7.0,<2.0",
         # ----
         # Expect compatibility with all new versions of these packages, so lower bounds only.
"packaging>20.9", diff --git a/schemas/dbt/manifest/v12.json b/schemas/dbt/manifest/v12.json index 56b6e71480c..95cefb7654a 100644 --- a/schemas/dbt/manifest/v12.json +++ b/schemas/dbt/manifest/v12.json @@ -6610,6 +6610,57 @@ } ], "default": null + }, + "snapshot_meta_column_names": { + "type": "object", + "title": "SnapshotMetaColumnNames", + "properties": { + "dbt_valid_to": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "default": null + }, + "dbt_valid_from": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "default": null + }, + "dbt_scd_id": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "default": null + }, + "dbt_updated_at": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "default": null + } + }, + "additionalProperties": false } }, "additionalProperties": true @@ -16336,6 +16387,57 @@ } ], "default": null + }, + "snapshot_meta_column_names": { + "type": "object", + "title": "SnapshotMetaColumnNames", + "properties": { + "dbt_valid_to": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "default": null + }, + "dbt_valid_from": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "default": null + }, + "dbt_scd_id": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "default": null + }, + "dbt_updated_at": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "default": null + } + }, + "additionalProperties": false } }, "additionalProperties": true diff --git a/tests/functional/artifacts/expected_manifest.py b/tests/functional/artifacts/expected_manifest.py index a50efab9434..749a5b77619 100644 --- a/tests/functional/artifacts/expected_manifest.py +++ b/tests/functional/artifacts/expected_manifest.py @@ -103,6 +103,12 @@ def get_rendered_snapshot_config(**updates): "post-hook": [], "column_types": {}, "quoting": {}, + "snapshot_meta_column_names": { + "dbt_valid_to": None, + "dbt_valid_from": None, + "dbt_updated_at": None, + "dbt_scd_id": None, + }, "tags": [], "persist_docs": {}, "full_refresh": None, diff --git a/tests/functional/list/test_list.py b/tests/functional/list/test_list.py index de20970038a..84b1e382e89 100644 --- a/tests/functional/list/test_list.py +++ b/tests/functional/list/test_list.py @@ -63,6 +63,12 @@ def expect_snapshot_output(self, happy_path_project): # noqa: F811 "persist_docs": {}, "target_database": happy_path_project.database, "target_schema": happy_path_project.test_schema, + "snapshot_meta_column_names": { + "dbt_scd_id": None, + "dbt_updated_at": None, + "dbt_valid_from": None, + "dbt_valid_to": None, + }, "unique_key": "id", "strategy": "timestamp", "updated_at": "updated_at", diff --git a/tests/functional/snapshots/data/seed_cn.sql b/tests/functional/snapshots/data/seed_cn.sql new file mode 100644 index 00000000000..089200afa47 --- /dev/null +++ b/tests/functional/snapshots/data/seed_cn.sql @@ -0,0 +1,82 @@ +create table {database}.{schema}.seed ( + id INTEGER, + first_name VARCHAR(50), + last_name VARCHAR(50), + email VARCHAR(50), + gender VARCHAR(50), + ip_address VARCHAR(20), + updated_at TIMESTAMP WITHOUT TIME ZONE +); + +create table {database}.{schema}.snapshot_expected ( + id INTEGER, + first_name VARCHAR(50), + last_name VARCHAR(50), + email VARCHAR(50), + gender VARCHAR(50), + ip_address VARCHAR(20), + + -- snapshotting fields + updated_at TIMESTAMP WITHOUT TIME ZONE, + test_valid_from TIMESTAMP WITHOUT TIME ZONE, + test_valid_to TIMESTAMP 
+    test_scd_id TEXT,
+    test_updated_at TIMESTAMP WITHOUT TIME ZONE
+);
+
+
+-- seed inserts
+-- use the same email for two users to verify that duplicated check_cols values
+-- are handled appropriately
+insert into {database}.{schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values
+(1, 'Judith', 'Kennedy', '(not provided)', 'Female', '54.60.24.128', '2015-12-24 12:19:28'),
+(2, 'Arthur', 'Kelly', '(not provided)', 'Male', '62.56.24.215', '2015-10-28 16:22:15'),
+(3, 'Rachel', 'Moreno', 'rmoreno2@msu.edu', 'Female', '31.222.249.23', '2016-04-05 02:05:30'),
+(4, 'Ralph', 'Turner', 'rturner3@hp.com', 'Male', '157.83.76.114', '2016-08-08 00:06:51'),
+(5, 'Laura', 'Gonzales', 'lgonzales4@howstuffworks.com', 'Female', '30.54.105.168', '2016-09-01 08:25:38'),
+(6, 'Katherine', 'Lopez', 'klopez5@yahoo.co.jp', 'Female', '169.138.46.89', '2016-08-30 18:52:11'),
+(7, 'Jeremy', 'Hamilton', 'jhamilton6@mozilla.org', 'Male', '231.189.13.133', '2016-07-17 02:09:46'),
+(8, 'Heather', 'Rose', 'hrose7@goodreads.com', 'Female', '87.165.201.65', '2015-12-29 22:03:56'),
+(9, 'Gregory', 'Kelly', 'gkelly8@trellian.com', 'Male', '154.209.99.7', '2016-03-24 21:18:16'),
+(10, 'Rachel', 'Lopez', 'rlopez9@themeforest.net', 'Female', '237.165.82.71', '2016-08-20 15:44:49'),
+(11, 'Donna', 'Welch', 'dwelcha@shutterfly.com', 'Female', '103.33.110.138', '2016-02-27 01:41:48'),
+(12, 'Russell', 'Lawrence', 'rlawrenceb@qq.com', 'Male', '189.115.73.4', '2016-06-11 03:07:09'),
+(13, 'Michelle', 'Montgomery', 'mmontgomeryc@scientificamerican.com', 'Female', '243.220.95.82', '2016-06-18 16:27:19'),
+(14, 'Walter', 'Castillo', 'wcastillod@pagesperso-orange.fr', 'Male', '71.159.238.196', '2016-10-06 01:55:44'),
+(15, 'Robin', 'Mills', 'rmillse@vkontakte.ru', 'Female', '172.190.5.50', '2016-10-31 11:41:21'),
+(16, 'Raymond', 'Holmes', 'rholmesf@usgs.gov', 'Male', '148.153.166.95', '2016-10-03 08:16:38'),
+(17, 'Gary', 'Bishop', 'gbishopg@plala.or.jp', 'Male', '161.108.182.13', '2016-08-29 19:35:20'),
+(18, 'Anna', 'Riley', 'arileyh@nasa.gov', 'Female', '253.31.108.22', '2015-12-11 04:34:27'),
+(19, 'Sarah', 'Knight', 'sknighti@foxnews.com', 'Female', '222.220.3.177', '2016-09-26 00:49:06'),
+(20, 'Phyllis', 'Fox', null, 'Female', '163.191.232.95', '2016-08-21 10:35:19');
+
+
+-- populate snapshot table
+insert into {database}.{schema}.snapshot_expected (
+    id,
+    first_name,
+    last_name,
+    email,
+    gender,
+    ip_address,
+    updated_at,
+    test_valid_from,
+    test_valid_to,
+    test_updated_at,
+    test_scd_id
+)
+
+select
+    id,
+    first_name,
+    last_name,
+    email,
+    gender,
+    ip_address,
+    updated_at,
+    -- fields added by snapshotting
+    updated_at as test_valid_from,
+    null::timestamp as test_valid_to,
+    updated_at as test_updated_at,
+    md5(id || '-' || first_name || '|' || updated_at::text) as test_scd_id
+from {database}.{schema}.seed;
diff --git a/tests/functional/snapshots/test_snapshot_column_names.py b/tests/functional/snapshots/test_snapshot_column_names.py
new file mode 100644
index 00000000000..85e9f425765
--- /dev/null
+++ b/tests/functional/snapshots/test_snapshot_column_names.py
@@ -0,0 +1,234 @@
+import os
+
+import pytest
+
+from dbt.tests.util import (
+    check_relations_equal,
+    get_manifest,
+    run_dbt,
+    run_dbt_and_capture,
+    update_config_file,
+)
+
+snapshot_actual_sql = """
+{% snapshot snapshot_actual %}
+
+    {{
+        config(
+            unique_key='id || ' ~ "'-'" ~ ' || first_name',
+        )
+    }}
+
+    select * from {{target.database}}.{{target.schema}}.seed
+
+{% endsnapshot %}
+"""
+
+snapshots_yml = """
+snapshots:
+  - name: snapshot_actual
+    config:
+      strategy: timestamp
+      updated_at: updated_at
+      snapshot_meta_column_names:
+        dbt_valid_to: test_valid_to
+        dbt_valid_from: test_valid_from
+        dbt_scd_id: test_scd_id
+        dbt_updated_at: test_updated_at
+"""
+
+snapshots_no_column_names_yml = """
+snapshots:
+  - name: snapshot_actual
+    config:
+      strategy: timestamp
+      updated_at: updated_at
+"""
+
+ref_snapshot_sql = """
+select * from {{ ref('snapshot_actual') }}
+"""
+
+
+invalidate_sql = """
+-- update records 11 - 21. Change email and updated_at field
+update {schema}.seed set
+    updated_at = updated_at + interval '1 hour',
+    email = case when id = 20 then 'pfoxj@creativecommons.org' else 'new_' || email end
+where id >= 10 and id <= 20;
+
+
+-- invalidate records 11 - 21
+update {schema}.snapshot_expected set
+    test_valid_to = updated_at + interval '1 hour'
+where id >= 10 and id <= 20;
+
+"""
+
+update_sql = """
+-- insert v2 of the 11 - 21 records
+
+insert into {database}.{schema}.snapshot_expected (
+    id,
+    first_name,
+    last_name,
+    email,
+    gender,
+    ip_address,
+    updated_at,
+    test_valid_from,
+    test_valid_to,
+    test_updated_at,
+    test_scd_id
+)
+
+select
+    id,
+    first_name,
+    last_name,
+    email,
+    gender,
+    ip_address,
+    updated_at,
+    -- fields added by snapshotting
+    updated_at as test_valid_from,
+    null::timestamp as test_valid_to,
+    updated_at as test_updated_at,
+    md5(id || '-' || first_name || '|' || updated_at::text) as test_scd_id
+from {database}.{schema}.seed
+where id >= 10 and id <= 20;
+"""
+
+
+class TestSnapshotColumnNames:
+    @pytest.fixture(scope="class")
+    def snapshots(self):
+        return {"snapshot.sql": snapshot_actual_sql}
+
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {
+            "snapshots.yml": snapshots_yml,
+            "ref_snapshot.sql": ref_snapshot_sql,
+        }
+
+    def test_snapshot_column_names(self, project):
+        path = os.path.join(project.test_data_dir, "seed_cn.sql")
+        project.run_sql_file(path)
+        results = run_dbt(["snapshot"])
+        assert len(results) == 1
+
+        project.run_sql(invalidate_sql)
+        project.run_sql(update_sql)
+
+        results = run_dbt(["snapshot"])
+        assert len(results) == 1
+
+        # run_dbt(["test"])
+        check_relations_equal(project.adapter, ["snapshot_actual", "snapshot_expected"])
+
+
+class TestSnapshotColumnNamesFromDbtProject:
+    @pytest.fixture(scope="class")
+    def snapshots(self):
+        return {"snapshot.sql": snapshot_actual_sql}
+
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {
+            "snapshots.yml": snapshots_no_column_names_yml,
+            "ref_snapshot.sql": ref_snapshot_sql,
+        }
+
+    @pytest.fixture(scope="class")
+    def project_config_update(self):
+        return {
+            "snapshots": {
+                "test": {
+                    "+snapshot_meta_column_names": {
+                        "dbt_valid_to": "test_valid_to",
+                        "dbt_valid_from": "test_valid_from",
+                        "dbt_scd_id": "test_scd_id",
+                        "dbt_updated_at": "test_updated_at",
+                    }
+                }
+            }
+        }
+
+    def test_snapshot_column_names_from_project(self, project):
+        path = os.path.join(project.test_data_dir, "seed_cn.sql")
+        project.run_sql_file(path)
+        results = run_dbt(["snapshot"])
+        assert len(results) == 1
+
+        project.run_sql(invalidate_sql)
+        project.run_sql(update_sql)
+
+        results = run_dbt(["snapshot"])
+        assert len(results) == 1
+
+        # run_dbt(["test"])
+        check_relations_equal(project.adapter, ["snapshot_actual", "snapshot_expected"])
+
+
+class TestSnapshotInvalidColumnNames:
+    @pytest.fixture(scope="class")
+    def snapshots(self):
+        return {"snapshot.sql": snapshot_actual_sql}
+
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {
+            "snapshots.yml": snapshots_no_column_names_yml,
+            "ref_snapshot.sql": ref_snapshot_sql,
+        }
+
+    @pytest.fixture(scope="class")
+    def project_config_update(self):
+        return {
+            "snapshots": {
+                "test": {
+                    "+snapshot_meta_column_names": {
+                        "dbt_valid_to": "test_valid_to",
+                        "dbt_valid_from": "test_valid_from",
+                        "dbt_scd_id": "test_scd_id",
+                        "dbt_updated_at": "test_updated_at",
+                    }
+                }
+            }
+        }
+
+    def test_snapshot_invalid_column_names(self, project):
+        path = os.path.join(project.test_data_dir, "seed_cn.sql")
+        project.run_sql_file(path)
+        results = run_dbt(["snapshot"])
+        assert len(results) == 1
+        manifest = get_manifest(project.project_root)
+        snapshot_node = manifest.nodes["snapshot.test.snapshot_actual"]
+        # the configured names should have been parsed onto the node's config
+        meta_names = snapshot_node.config.snapshot_meta_column_names
+        assert meta_names.dbt_valid_to == "test_valid_to"
+        assert meta_names.dbt_valid_from == "test_valid_from"
+        assert meta_names.dbt_scd_id == "test_scd_id"
+        assert meta_names.dbt_updated_at == "test_updated_at"
+
+        project.run_sql(invalidate_sql)
+        project.run_sql(update_sql)
+
+        # Change snapshot_meta_columns and look for an error
+        different_columns = {
+            "snapshots": {
+                "test": {
+                    "+snapshot_meta_column_names": {
+                        "dbt_valid_to": "test_valid_to",
+                        "dbt_updated_at": "test_updated_at",
+                    }
+                }
+            }
+        }
+        update_config_file(different_columns, "dbt_project.yml")
+
+        results, log_output = run_dbt_and_capture(["snapshot"], expect_pass=False)
+        assert len(results) == 1
+        assert "Compilation Error in snapshot snapshot_actual" in log_output
+        assert "Snapshot target is missing configured columns" in log_output
diff --git a/tests/unit/contracts/graph/test_nodes_parsed.py b/tests/unit/contracts/graph/test_nodes_parsed.py
index 7655b7aa444..7acd4c1f02a 100644
--- a/tests/unit/contracts/graph/test_nodes_parsed.py
+++ b/tests/unit/contracts/graph/test_nodes_parsed.py
@@ -1249,6 +1249,7 @@ def basic_timestamp_snapshot_config_dict():
         "quoting": {},
         "tags": [],
         "unique_key": "id",
+        "snapshot_meta_column_names": {},
         "strategy": "timestamp",
         "updated_at": "last_update",
         "target_database": "some_snapshot_db",
@@ -1285,6 +1286,7 @@ def complex_timestamp_snapshot_config_dict():
         "post-hook": [{"sql": 'insert into blah(a, b) select "1", 1', "transaction": True}],
         "pre-hook": [],
         "quoting": {},
+        "snapshot_meta_column_names": {},
         "tags": [],
         "target_database": "some_snapshot_db",
         "target_schema": "some_snapshot_schema",
@@ -1353,6 +1355,7 @@ def basic_check_snapshot_config_dict():
         "post-hook": [],
         "pre-hook": [],
         "quoting": {},
+        "snapshot_meta_column_names": {},
         "tags": [],
         "target_database": "some_snapshot_db",
         "target_schema": "some_snapshot_schema",
@@ -1391,6 +1394,7 @@ def complex_set_snapshot_config_dict():
         "post-hook": [{"sql": 'insert into blah(a, b) select "1", 1', "transaction": True}],
         "pre-hook": [],
         "quoting": {},
+        "snapshot_meta_column_names": {},
         "tags": [],
         "target_database": "some_snapshot_db",
         "target_schema": "some_snapshot_schema",
@@ -1517,6 +1521,7 @@ def basic_timestamp_snapshot_dict():
         "post-hook": [],
         "pre-hook": [],
         "quoting": {},
+        "snapshot_meta_column_names": {},
         "tags": [],
         "target_database": "some_snapshot_db",
         "target_schema": "some_snapshot_schema",
@@ -1620,6 +1625,7 @@ def basic_check_snapshot_dict():
         "post-hook": [],
         "pre-hook": [],
         "quoting": {},
+        "snapshot_meta_column_names": {},
         "tags": [],
         "target_database": "some_snapshot_db",
         "target_schema": "some_snapshot_schema",
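Usage sketch (illustrative, not part of the patch): per the new SnapshotConfig.snapshot_table_column_names property, any name left unset falls back to the standard dbt_valid_from / dbt_valid_to / dbt_scd_id / dbt_updated_at. Mirroring the test fixtures above, the override can be set on the snapshot's own config or project-wide in dbt_project.yml; the snapshot name "snapshot_actual", the column names, and the project name "test" below come from those fixtures and are not required values.

snapshots:
  - name: snapshot_actual
    config:
      strategy: timestamp
      updated_at: updated_at
      snapshot_meta_column_names:
        dbt_valid_to: test_valid_to
        dbt_valid_from: test_valid_from
        dbt_scd_id: test_scd_id
        dbt_updated_at: test_updated_at

# dbt_project.yml equivalent, as exercised by TestSnapshotColumnNamesFromDbtProject:
snapshots:
  test:
    +snapshot_meta_column_names:
      dbt_valid_to: test_valid_to
      dbt_valid_from: test_valid_from
      dbt_scd_id: test_scd_id
      dbt_updated_at: test_updated_at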