From 3056edaffb8c263c607f7ff4ec12572a58c89a17 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Wed, 14 Jun 2023 23:52:52 -0500 Subject: [PATCH 01/24] Rebase Jerco's clone code --- core/dbt/cli/main.py | 38 ++++ .../global_project/macros/adapters/clone.sql | 115 +++++++++++ core/dbt/task/clone.py | 185 ++++++++++++++++++ .../defer_state/test_defer_state.py | 62 ++++++ 4 files changed, 400 insertions(+) create mode 100644 core/dbt/include/global_project/macros/adapters/clone.sql create mode 100644 core/dbt/task/clone.py diff --git a/core/dbt/cli/main.py b/core/dbt/cli/main.py index bd8d92a4d62..3eae0e8910e 100644 --- a/core/dbt/cli/main.py +++ b/core/dbt/cli/main.py @@ -23,6 +23,7 @@ from dbt.events.base_types import EventMsg from dbt.task.build import BuildTask from dbt.task.clean import CleanTask +from dbt.task.clone import CloneTask from dbt.task.compile import CompileTask from dbt.task.debug import DebugTask from dbt.task.deps import DepsTask @@ -608,6 +609,43 @@ def retry(ctx, **kwargs): return results, success +# dbt clone +@cli.command("clone") +@click.pass_context +@p.defer_state +@p.exclude +@p.full_refresh +@p.profile +@p.profiles_dir +@p.project_dir +@p.resource_type +@p.select +@p.selector +@p.state # required +@p.target +@p.target_path +@p.threads +@p.vars +@p.version_check +@requires.preflight +@requires.profile +@requires.project +@requires.runtime_config +@requires.manifest +@requires.postflight +def clone(ctx, **kwargs): + """Create clones of selected nodes based on their location in the manifest provided to --state.""" + task = CloneTask( + ctx.obj["flags"], + ctx.obj["runtime_config"], + ctx.obj["manifest"], + ) + + results = task.run() + success = task.interpret_results(results) + return results, success + + # dbt run operation @cli.command("run-operation") @click.pass_context diff --git a/core/dbt/include/global_project/macros/adapters/clone.sql b/core/dbt/include/global_project/macros/adapters/clone.sql new file mode 100644 index 00000000000..8525983619e --- /dev/null +++ b/core/dbt/include/global_project/macros/adapters/clone.sql @@ -0,0 +1,115 @@ +{% macro can_clone_tables() %} + {{ return(adapter.dispatch('can_clone_tables', 'dbt')()) }} +{% endmacro %} + + +{% macro default__can_clone_tables() %} + {{ return(False) }} +{% endmacro %} + + +{% macro snowflake__can_clone_tables() %} + {{ return(True) }} +{% endmacro %} + + +{% macro get_pointer_sql(to_relation) %} + {{ return(adapter.dispatch('get_pointer_sql', 'dbt')(to_relation)) }} +{% endmacro %} + + +{% macro default__get_pointer_sql(to_relation) %} + {% set pointer_sql %} + select * from {{ to_relation }} + {% endset %} + {{ return(pointer_sql) }} +{% endmacro %} + + +{% macro get_clone_table_sql(this_relation, state_relation) %} + {{ return(adapter.dispatch('get_clone_table_sql', 'dbt')(this_relation, state_relation)) }} +{% endmacro %} + + +{% macro default__get_clone_table_sql(this_relation, state_relation) %} + create or replace table {{ this_relation }} clone {{ state_relation }} +{% endmacro %} + + +{% macro snowflake__get_clone_table_sql(this_relation, state_relation) %} + create or replace + {{ "transient" if config.get("transient", true) }} + table {{ this_relation }} + clone {{ state_relation }} + {{ "copy grants" if config.get("copy_grants", false) }} +{% endmacro %} + + +{%- materialization clone, default -%} + + {%- set relations = {'relations': []} -%} + + {%- if not state_relation -%} + -- nothing to do + {{ log("No relation found in state manifest for " ~ model.unique_id) }} + {{ 
return(relations) }} + {%- endif -%} + + {%- set existing_relation = load_cached_relation(this) -%} + + {%- if existing_relation and not flags.FULL_REFRESH -%} + -- noop! + {{ log("Relation " ~ existing_relation ~ " already exists") }} + {{ return(relations) }} + {%- endif -%} + + {%- set other_existing_relation = load_cached_relation(state_relation) -%} + + -- If this is a database that can do zero-copy cloning of tables, and the other relation is a table, then this will be a table + -- Otherwise, this will be a view + + {% set can_clone_tables = can_clone_tables() %} + + {%- if other_existing_relation and other_existing_relation.type == 'table' and can_clone_tables -%} + + {%- set target_relation = this.incorporate(type='table') -%} + {% if existing_relation is not none and not existing_relation.is_table %} + {{ log("Dropping relation " ~ existing_relation ~ " because it is of type " ~ existing_relation.type) }} + {{ drop_relation_if_exists(existing_relation) }} + {% endif %} + + -- as a general rule, data platforms that can clone tables can also do atomic 'create or replace' + {% call statement('main') %} + {{ get_clone_table_sql(target_relation, state_relation) }} + {% endcall %} + + {% set should_revoke = should_revoke(existing_relation, full_refresh_mode=True) %} + {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %} + {% do persist_docs(target_relation, model) %} + + {{ return({'relations': [target_relation]}) }} + + {%- else -%} + + {%- set target_relation = this.incorporate(type='view') -%} + + -- TODO: this should probably be illegal + -- I'm just doing it out of convenience to reuse the 'view' materialization logic + {%- do context.update({ + 'sql': get_pointer_sql(state_relation), + 'compiled_code': get_pointer_sql(state_relation) + }) -%} + + -- reuse the view materialization + -- TODO: support actual dispatch for materialization macros + {% set search_name = "materialization_view_" ~ adapter.type() %} + {% if not search_name in context %} + {% set search_name = "materialization_view_default" %} + {% endif %} + {% set materialization_macro = context[search_name] %} + {% set relations = materialization_macro() %} + {{ return(relations) }} + + {%- endif -%} + +{%- endmaterialization -%} diff --git a/core/dbt/task/clone.py b/core/dbt/task/clone.py new file mode 100644 index 00000000000..cab0d6a2de4 --- /dev/null +++ b/core/dbt/task/clone.py @@ -0,0 +1,185 @@ +import threading +from typing import AbstractSet, Optional, Any, List, Iterable, Set + +from dbt.dataclass_schema import dbtClassMixin + +from dbt.contracts.graph.manifest import WritableManifest +from dbt.contracts.results import RunStatus, RunResult +from dbt.exceptions import DbtInternalError, DbtRuntimeError, CompilationError +from dbt.graph import ResourceTypeSelector +from dbt.node_types import NodeType +from dbt.parser.manifest import write_manifest +from dbt.task.base import BaseRunner +from dbt.task.runnable import GraphRunnableTask +from dbt.task.run import _validate_materialization_relations_dict +from dbt.adapters.base import BaseRelation +from dbt.clients.jinja import MacroGenerator +from dbt.context.providers import generate_runtime_model_context + + +class CloneRunner(BaseRunner): + def before_execute(self): + pass + + def after_execute(self, result): + pass + + def _build_run_model_result(self, model, context): + result = context["load_result"]("main") + if result: + status = RunStatus.Success + message = str(result.response) + else: + status = RunStatus.Success + message = 
"No-op" + adapter_response = {} + if result and isinstance(result.response, dbtClassMixin): + adapter_response = result.response.to_dict(omit_none=True) + return RunResult( + node=model, + status=status, + timing=[], + thread_id=threading.current_thread().name, + execution_time=0, + message=message, + adapter_response=adapter_response, + failures=None, + ) + + def compile(self, manifest): + # no-op + return self.node + + def _materialization_relations(self, result: Any, model) -> List[BaseRelation]: + if isinstance(result, str): + msg = ( + 'The materialization ("{}") did not explicitly return a ' + "list of relations to add to the cache.".format(str(model.get_materialization())) + ) + raise CompilationError(msg, node=model) + + if isinstance(result, dict): + return _validate_materialization_relations_dict(result, model) + + msg = ( + "Invalid return value from materialization, expected a dict " + 'with key "relations", got: {}'.format(str(result)) + ) + raise CompilationError(msg, node=model) + + def execute(self, model, manifest): + context = generate_runtime_model_context(model, self.config, manifest) + materialization_macro = manifest.find_materialization_macro_by_name( + self.config.project_name, "clone", self.adapter.type() + ) + + if "config" not in context: + raise DbtInternalError( + "Invalid materialization context generated, missing config: {}".format(context) + ) + + context_config = context["config"] + + hook_ctx = self.adapter.pre_model_hook(context_config) + try: + result = MacroGenerator( + materialization_macro, context, stack=context["context_macro_stack"] + )() + finally: + self.adapter.post_model_hook(context_config, hook_ctx) + + for relation in self._materialization_relations(result, model): + self.adapter.cache_added(relation.incorporate(dbt_created=True)) + + return self._build_run_model_result(model, context) + + +class CloneTask(GraphRunnableTask): + def raise_on_first_error(self): + return False + + def get_model_schemas(self, adapter, selected_uids: Iterable[str]) -> Set[BaseRelation]: + if self.manifest is None: + raise DbtInternalError("manifest was None in get_model_schemas") + result: Set[BaseRelation] = set() + + for node in self.manifest.nodes.values(): + if node.unique_id not in selected_uids: + continue + if node.is_relational and not node.is_ephemeral: + relation = adapter.Relation.create_from(self.config, node) + result.add(relation.without_identifier()) + + # cache the 'other' schemas too! + if node.state_relation: # type: ignore + other_relation = adapter.Relation.create_from_node( + self.config, node.state_relation # type: ignore + ) + result.add(other_relation.without_identifier()) + + return result + + def before_run(self, adapter, selected_uids: AbstractSet[str]): + with adapter.connection_named("master"): + # unlike in other tasks, we want to add information from the --state manifest *before* caching! 
+ self.defer_to_manifest(adapter, selected_uids) + # only create *our* schemas, but cache *other* schemas in addition + schemas_to_create = super().get_model_schemas(adapter, selected_uids) + self.create_schemas(adapter, schemas_to_create) + schemas_to_cache = self.get_model_schemas(adapter, selected_uids) + self.populate_adapter_cache(adapter, schemas_to_cache) + + @property + def resource_types(self): + if not self.args.resource_types: + return NodeType.refable() + + values = set(self.args.resource_types) + + if "all" in values: + values.remove("all") + values.update(NodeType.refable()) + + values = [NodeType(val) for val in values if val in NodeType.refable()] + + return list(values) + + def get_node_selector(self) -> ResourceTypeSelector: + resource_types = self.resource_types + + if self.manifest is None or self.graph is None: + raise DbtInternalError("manifest and graph must be set to get perform node selection") + return ResourceTypeSelector( + graph=self.graph, + manifest=self.manifest, + previous_state=self.previous_state, + resource_types=resource_types, + ) + + def get_runner_type(self, _): + return CloneRunner + + def _get_deferred_manifest(self) -> Optional[WritableManifest]: + state = self.previous_state + if state is None: + raise DbtRuntimeError( + "--state is required for cloning relations from another environment" + ) + + if state.manifest is None: + raise DbtRuntimeError(f'Could not find manifest in --state path: "{self.args.state}"') + return state.manifest + + # Note that this is different behavior from --defer with other commands, which *merge* + # selected nodes from this manifest + unselected nodes from the other manifest + def defer_to_manifest(self, adapter, selected_uids: AbstractSet[str]): + deferred_manifest = self._get_deferred_manifest() + if deferred_manifest is None: + return + if self.manifest is None: + raise DbtInternalError( + "Expected to defer to manifest, but there is no runtime manifest to defer from!" + ) + self.manifest.add_from_artifact(other=deferred_manifest) + # TODO: is it wrong to write the manifest here? I think it's right... 
+ write_manifest(self.manifest, self.config.target_path) diff --git a/tests/functional/defer_state/test_defer_state.py b/tests/functional/defer_state/test_defer_state.py index 960b517c490..8c8e925ee37 100644 --- a/tests/functional/defer_state/test_defer_state.py +++ b/tests/functional/defer_state/test_defer_state.py @@ -345,3 +345,65 @@ def test_defer_state_flag(self, project, unique_schema, other_schema): assert results.results[0].status == RunStatus.Success assert results.results[0].node.name == "table_model" assert results.results[0].adapter_response["rows_affected"] == 2 + + +get_schema_name_sql = """ +{% macro generate_schema_name(custom_schema_name, node) -%} + {%- set default_schema = target.schema -%} + {%- if custom_schema_name is not none -%} + {{ return(default_schema ~ '_' ~ custom_schema_name|trim) }} + -- put seeds into a separate schema in "prod", to verify that cloning in "dev" still works + {%- elif target.name == 'default' and node.resource_type == 'seed' -%} + {{ return(default_schema ~ '_' ~ 'seeds') }} + {%- else -%} + {{ return(default_schema) }} + {%- endif -%} +{%- endmacro %} +""" + + +class TestCloneToOther(BaseDeferState): + @pytest.fixture(scope="class") + def macros(self): + return { + "macros.sql": macros_sql, + "infinite_macros.sql": infinite_macros_sql, + "get_schema_name.sql": get_schema_name_sql, + } + + def test_clone(self, project, unique_schema, other_schema): + project.create_test_schema(other_schema) + self.run_and_save_state(project.project_root) + + clone_args = [ + "clone", + "--state", + "target", + "--target", + "otherschema", + "--target-path", + "target_otherschema", + ] + + results = run_dbt(clone_args) + # TODO: need an "adapter zone" version of this test that checks to see + # how many of the cloned objects are "pointers" (views) versus "true clones" (tables) + # e.g. 
on Postgres we expect to see 4 views + # whereas on Snowflake we'd expect to see 3 cloned tables + 1 view + assert [r.message for r in results] == ["CREATE VIEW"] * 4 + schema_relations = project.adapter.list_relations( + database=project.database, schema=other_schema + ) + assert [r.type for r in schema_relations] == ["view"] * 4 + + # objects already exist, so this is a no-op + results = run_dbt(clone_args) + assert [r.message for r in results] == ["No-op"] * 4 + + # recreate all objects + results = run_dbt(clone_args + ["--full-refresh"]) + assert [r.message for r in results] == ["CREATE VIEW"] * 4 + + # select only models this time + results = run_dbt(clone_args + ["--resource-type", "model"]) + assert len(results) == 2 From f63c3799e45142090158266c7b2d500c4afa9db5 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Thu, 15 Jun 2023 10:48:44 -0500 Subject: [PATCH 02/24] fix unit tests --- .user.yml | 1 + core/dbt/cli/flags.py | 1 + core/dbt/cli/types.py | 1 + core/dbt/task/retry.py | 3 +++ profiles.yml | 0 5 files changed, 6 insertions(+) create mode 100644 .user.yml create mode 100644 profiles.yml diff --git a/.user.yml b/.user.yml new file mode 100644 index 00000000000..76645fe2295 --- /dev/null +++ b/.user.yml @@ -0,0 +1 @@ +id: 81a4e8c7-f578-4dd7-809d-954c1b8fd087 diff --git a/core/dbt/cli/flags.py b/core/dbt/cli/flags.py index a16829fc79f..2513d1cc702 100644 --- a/core/dbt/cli/flags.py +++ b/core/dbt/cli/flags.py @@ -373,6 +373,7 @@ def command_args(command: CliCommand) -> ArgsList: CMD_DICT: Dict[CliCommand, ClickCommand] = { CliCommand.BUILD: cli.build, CliCommand.CLEAN: cli.clean, + CliCommand.CLONE: cli.clone, CliCommand.COMPILE: cli.compile, CliCommand.DOCS_GENERATE: cli.docs_generate, CliCommand.DOCS_SERVE: cli.docs_serve, diff --git a/core/dbt/cli/types.py b/core/dbt/cli/types.py index be87d67135e..14028a69451 100644 --- a/core/dbt/cli/types.py +++ b/core/dbt/cli/types.py @@ -8,6 +8,7 @@ class Command(Enum): BUILD = "build" CLEAN = "clean" COMPILE = "compile" + CLONE = "clone" DOCS_GENERATE = "generate" DOCS_SERVE = "serve" DEBUG = "debug" diff --git a/core/dbt/task/retry.py b/core/dbt/task/retry.py index df870540c8e..3a14932aea8 100644 --- a/core/dbt/task/retry.py +++ b/core/dbt/task/retry.py @@ -9,6 +9,7 @@ from dbt.graph import GraphQueue from dbt.task.base import ConfiguredTask from dbt.task.build import BuildTask +from dbt.task.clone import CloneTask from dbt.task.compile import CompileTask from dbt.task.generate import GenerateTask from dbt.task.run import RunTask @@ -22,6 +23,7 @@ TASK_DICT = { "build": BuildTask, "compile": CompileTask, + "clone": CloneTask, "generate": GenerateTask, "seed": SeedTask, "snapshot": SnapshotTask, @@ -33,6 +35,7 @@ CMD_DICT = { "build": CliCommand.BUILD, "compile": CliCommand.COMPILE, + "clone": CliCommand.CLONE, "generate": CliCommand.DOCS_GENERATE, "seed": CliCommand.SEED, "snapshot": CliCommand.SNAPSHOT, diff --git a/profiles.yml b/profiles.yml new file mode 100644 index 00000000000..e69de29bb2d From afdada9277da94d6cb5d324fe2c7c3ca923d4fa9 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Thu, 15 Jun 2023 13:48:23 -0500 Subject: [PATCH 03/24] Delete .user.yml --- .user.yml | 1 - 1 file changed, 1 deletion(-) delete mode 100644 .user.yml diff --git a/.user.yml b/.user.yml deleted file mode 100644 index 76645fe2295..00000000000 --- a/.user.yml +++ /dev/null @@ -1 +0,0 @@ -id: 81a4e8c7-f578-4dd7-809d-954c1b8fd087 From 23b4113d6aa667cba4d4f3e07524d2f23ed056e9 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Thu, 15 Jun 2023 
13:48:48 -0500 Subject: [PATCH 04/24] Delete profiles.yml --- profiles.yml | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 profiles.yml diff --git a/profiles.yml b/profiles.yml deleted file mode 100644 index e69de29bb2d..00000000000 From dafb2509c4edf6ba55a348495745443a96a9fe9e Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Thu, 15 Jun 2023 17:52:49 -0500 Subject: [PATCH 05/24] get integration test working --- core/dbt/context/providers.py | 14 ++++++++++++++ .../global_project/macros/adapters/clone.sql | 4 ++-- core/dbt/task/clone.py | 4 +--- tests/functional/defer_state/test_defer_state.py | 13 ++++++++----- 4 files changed, 25 insertions(+), 10 deletions(-) diff --git a/core/dbt/context/providers.py b/core/dbt/context/providers.py index e4bc2cde06a..41baa24dfc0 100644 --- a/core/dbt/context/providers.py +++ b/core/dbt/context/providers.py @@ -1432,6 +1432,20 @@ def this(self) -> Optional[RelationProxy]: return None return self.db_wrapper.Relation.create_from(self.config, self.model) + @contextproperty + def state_relation(self) -> Optional[RelationProxy]: + """ + For commands which add information about this node's corresponding + production version (via a --state artifact), access the Relation + object for that stateful other + """ + if getattr(self.model, "state_relation", None): + return self.db_wrapper.Relation.create_from_node( + self.config, self.model.state_relation # type: ignore + ) + else: + return None + # This is called by '_context_for', used in 'render_with_context' def generate_parser_model_context( diff --git a/core/dbt/include/global_project/macros/adapters/clone.sql b/core/dbt/include/global_project/macros/adapters/clone.sql index 8525983619e..bc760d4a354 100644 --- a/core/dbt/include/global_project/macros/adapters/clone.sql +++ b/core/dbt/include/global_project/macros/adapters/clone.sql @@ -51,7 +51,7 @@ {%- if not state_relation -%} -- nothing to do - {{ log("No relation found in state manifest for " ~ model.unique_id) }} + {{ log("No relation found in state manifest for " ~ model.unique_id, info=True) }} {{ return(relations) }} {%- endif -%} @@ -59,7 +59,7 @@ {%- if existing_relation and not flags.FULL_REFRESH -%} -- noop! 
- {{ log("Relation " ~ existing_relation ~ " already exists") }} + {{ log("Relation " ~ existing_relation ~ " already exists", info=True) }} {{ return(relations) }} {%- endif -%} diff --git a/core/dbt/task/clone.py b/core/dbt/task/clone.py index cab0d6a2de4..3ec5a7d12b2 100644 --- a/core/dbt/task/clone.py +++ b/core/dbt/task/clone.py @@ -82,9 +82,7 @@ def execute(self, model, manifest): hook_ctx = self.adapter.pre_model_hook(context_config) try: - result = MacroGenerator( - materialization_macro, context, stack=context["context_macro_stack"] - )() + result = MacroGenerator(materialization_macro, context)() finally: self.adapter.post_model_hook(context_config, hook_ctx) diff --git a/tests/functional/defer_state/test_defer_state.py b/tests/functional/defer_state/test_defer_state.py index 8c8e925ee37..540718c968f 100644 --- a/tests/functional/defer_state/test_defer_state.py +++ b/tests/functional/defer_state/test_defer_state.py @@ -87,7 +87,7 @@ def copy_state(self, project_root): f"{project_root}/target/manifest.json", f"{project_root}/state/manifest.json" ) - def run_and_save_state(self, project_root): + def run_and_save_state(self, project_root, with_snapshot=False): results = run_dbt(["seed"]) assert len(results) == 1 assert not any(r.node.deferred for r in results) @@ -97,6 +97,11 @@ def run_and_save_state(self, project_root): results = run_dbt(["test"]) assert len(results) == 2 + if with_snapshot: + results = run_dbt(["snapshot"]) + assert len(results) == 1 + assert not any(r.node.deferred for r in results) + # copy files self.copy_state(project_root) @@ -373,16 +378,14 @@ def macros(self): def test_clone(self, project, unique_schema, other_schema): project.create_test_schema(other_schema) - self.run_and_save_state(project.project_root) + self.run_and_save_state(project.project_root, with_snapshot=True) clone_args = [ "clone", "--state", - "target", + "state", "--target", "otherschema", - "--target-path", - "target_otherschema", ] results = run_dbt(clone_args) From 31870f9016f62d11cabeb1f1fb934316de4ab118 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Thu, 15 Jun 2023 18:01:11 -0500 Subject: [PATCH 06/24] add state_relation to required_model_keys --- tests/unit/test_context.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/test_context.py b/tests/unit/test_context.py index 1c02a650b9a..5bb895b844e 100644 --- a/tests/unit/test_context.py +++ b/tests/unit/test_context.py @@ -230,7 +230,7 @@ def assert_has_keys(required_keys: Set[str], maybe_keys: Set[str], ctx: Dict[str "submit_python_job", "dbt_metadata_envs", } -REQUIRED_MODEL_KEYS = REQUIRED_MACRO_KEYS | {"this", "compiled_code"} +REQUIRED_MODEL_KEYS = REQUIRED_MACRO_KEYS | {"this", "compiled_code", "state_relation"} MAYBE_KEYS = frozenset({"debug"}) From 692e54287dc225a441a69205f23cfb2edacfff81 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Fri, 16 Jun 2023 10:50:44 -0500 Subject: [PATCH 07/24] Move macros into models directory --- .../unreleased/Features-20230616-104849.yaml | 6 ++ .../models/clone/can_clone_table.sql | 7 +++ .../models/clone}/clone.sql | 57 ++----------------- .../models/clone/create_or_replace_clone.sql | 7 +++ .../models/clone/get_clone_target.sql | 10 ++++ 5 files changed, 35 insertions(+), 52 deletions(-) create mode 100644 .changes/unreleased/Features-20230616-104849.yaml create mode 100644 core/dbt/include/global_project/macros/materializations/models/clone/can_clone_table.sql rename core/dbt/include/global_project/macros/{adapters => 
materializations/models/clone}/clone.sql (61%) create mode 100644 core/dbt/include/global_project/macros/materializations/models/clone/create_or_replace_clone.sql create mode 100644 core/dbt/include/global_project/macros/materializations/models/clone/get_clone_target.sql diff --git a/.changes/unreleased/Features-20230616-104849.yaml b/.changes/unreleased/Features-20230616-104849.yaml new file mode 100644 index 00000000000..11d14e299a0 --- /dev/null +++ b/.changes/unreleased/Features-20230616-104849.yaml @@ -0,0 +1,6 @@ +kind: Features +body: dbt clone +time: 2023-06-16T10:48:49.079961-05:00 +custom: + Author: jtcohen6 aranke + Issue: "7258" diff --git a/core/dbt/include/global_project/macros/materializations/models/clone/can_clone_table.sql b/core/dbt/include/global_project/macros/materializations/models/clone/can_clone_table.sql new file mode 100644 index 00000000000..89628bfab35 --- /dev/null +++ b/core/dbt/include/global_project/macros/materializations/models/clone/can_clone_table.sql @@ -0,0 +1,7 @@ +{% macro can_clone_table() %} + {{ return(adapter.dispatch('can_clone_table', 'dbt')()) }} +{% endmacro %} + +{% macro default__can_clone_table() %} + {{ return(False) }} +{% endmacro %} diff --git a/core/dbt/include/global_project/macros/adapters/clone.sql b/core/dbt/include/global_project/macros/materializations/models/clone/clone.sql similarity index 61% rename from core/dbt/include/global_project/macros/adapters/clone.sql rename to core/dbt/include/global_project/macros/materializations/models/clone/clone.sql index bc760d4a354..ec89df9431c 100644 --- a/core/dbt/include/global_project/macros/adapters/clone.sql +++ b/core/dbt/include/global_project/macros/materializations/models/clone/clone.sql @@ -1,50 +1,3 @@ -{% macro can_clone_tables() %} - {{ return(adapter.dispatch('can_clone_tables', 'dbt')()) }} -{% endmacro %} - - -{% macro default__can_clone_tables() %} - {{ return(False) }} -{% endmacro %} - - -{% macro snowflake__can_clone_tables() %} - {{ return(True) }} -{% endmacro %} - - -{% macro get_pointer_sql(to_relation) %} - {{ return(adapter.dispatch('get_pointer_sql', 'dbt')(to_relation)) }} -{% endmacro %} - - -{% macro default__get_pointer_sql(to_relation) %} - {% set pointer_sql %} - select * from {{ to_relation }} - {% endset %} - {{ return(pointer_sql) }} -{% endmacro %} - - -{% macro get_clone_table_sql(this_relation, state_relation) %} - {{ return(adapter.dispatch('get_clone_table_sql', 'dbt')(this_relation, state_relation)) }} -{% endmacro %} - - -{% macro default__get_clone_table_sql(this_relation, state_relation) %} - create or replace table {{ this_relation }} clone {{ state_relation }} -{% endmacro %} - - -{% macro snowflake__get_clone_table_sql(this_relation, state_relation) %} - create or replace - {{ "transient" if config.get("transient", true) }} - table {{ this_relation }} - clone {{ state_relation }} - {{ "copy grants" if config.get("copy_grants", false) }} -{% endmacro %} - - {%- materialization clone, default -%} {%- set relations = {'relations': []} -%} @@ -68,9 +21,9 @@ -- If this is a database that can do zero-copy cloning of tables, and the other relation is a table, then this will be a table -- Otherwise, this will be a view - {% set can_clone_tables = can_clone_tables() %} + {% set can_clone_table = can_clone_table() %} - {%- if other_existing_relation and other_existing_relation.type == 'table' and can_clone_tables -%} + {%- if other_existing_relation and other_existing_relation.type == 'table' and can_clone_table -%} {%- set target_relation = 
this.incorporate(type='table') -%} {% if existing_relation is not none and not existing_relation.is_table %} @@ -80,7 +33,7 @@ -- as a general rule, data platforms that can clone tables can also do atomic 'create or replace' {% call statement('main') %} - {{ get_clone_table_sql(target_relation, state_relation) }} + {{ create_or_replace_clone(target_relation, state_relation) }} {% endcall %} {% set should_revoke = should_revoke(existing_relation, full_refresh_mode=True) %} @@ -96,8 +49,8 @@ -- TODO: this should probably be illegal -- I'm just doing it out of convenience to reuse the 'view' materialization logic {%- do context.update({ - 'sql': get_pointer_sql(state_relation), - 'compiled_code': get_pointer_sql(state_relation) + 'sql': get_clone_target(state_relation), + 'compiled_code': get_clone_target(state_relation) }) -%} -- reuse the view materialization diff --git a/core/dbt/include/global_project/macros/materializations/models/clone/create_or_replace_clone.sql b/core/dbt/include/global_project/macros/materializations/models/clone/create_or_replace_clone.sql new file mode 100644 index 00000000000..fd766b76c88 --- /dev/null +++ b/core/dbt/include/global_project/macros/materializations/models/clone/create_or_replace_clone.sql @@ -0,0 +1,7 @@ +{% macro create_or_replace_clone(this_relation, state_relation) %} + {{ return(adapter.dispatch('create_or_replace_clone', 'dbt')(this_relation, state_relation)) }} +{% endmacro %} + +{% macro default__create_or_replace_clone(this_relation, state_relation) %} + create or replace table {{ this_relation }} clone {{ state_relation }} +{% endmacro %} diff --git a/core/dbt/include/global_project/macros/materializations/models/clone/get_clone_target.sql b/core/dbt/include/global_project/macros/materializations/models/clone/get_clone_target.sql new file mode 100644 index 00000000000..46a3d740135 --- /dev/null +++ b/core/dbt/include/global_project/macros/materializations/models/clone/get_clone_target.sql @@ -0,0 +1,10 @@ +{% macro get_clone_target(to_relation) %} + {{ return(adapter.dispatch('get_clone_target', 'dbt')(to_relation)) }} +{% endmacro %} + +{% macro default__get_clone_target(to_relation) %} + {% set target_sql %} + select * from {{ to_relation }} + {% endset %} + {{ return(target_sql) }} +{% endmacro %} From 69a3dbdcf2396514260fcc12a3e10b0897340b15 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Fri, 16 Jun 2023 16:31:21 -0500 Subject: [PATCH 08/24] Simplify test --- .../defer_state/test_defer_state.py | 25 +++++++++---------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/tests/functional/defer_state/test_defer_state.py b/tests/functional/defer_state/test_defer_state.py index 540718c968f..32ac6e36494 100644 --- a/tests/functional/defer_state/test_defer_state.py +++ b/tests/functional/defer_state/test_defer_state.py @@ -7,10 +7,8 @@ from dbt.cli.exceptions import DbtUsageException from dbt.contracts.results import RunStatus -from dbt.tests.util import run_dbt, write_file, rm_file - from dbt.exceptions import DbtRuntimeError - +from dbt.tests.util import run_dbt, write_file, rm_file from tests.functional.defer_state.fixtures import ( seed_csv, table_model_sql, @@ -389,24 +387,25 @@ def test_clone(self, project, unique_schema, other_schema): ] results = run_dbt(clone_args) - # TODO: need an "adapter zone" version of this test that checks to see - # how many of the cloned objects are "pointers" (views) versus "true clones" (tables) - # e.g. 
on Postgres we expect to see 4 views - # whereas on Snowflake we'd expect to see 3 cloned tables + 1 view - assert [r.message for r in results] == ["CREATE VIEW"] * 4 + assert len(results) == 4 + + assert all("create view" in r.message.lower() for r in results) schema_relations = project.adapter.list_relations( database=project.database, schema=other_schema ) - assert [r.type for r in schema_relations] == ["view"] * 4 + assert all(r.type == "view" for r in schema_relations) # objects already exist, so this is a no-op results = run_dbt(clone_args) - assert [r.message for r in results] == ["No-op"] * 4 + assert len(results) == 4 + assert all("no-op" in r.message.lower() for r in results) # recreate all objects - results = run_dbt(clone_args + ["--full-refresh"]) - assert [r.message for r in results] == ["CREATE VIEW"] * 4 + results = run_dbt([*clone_args, "--full-refresh"]) + assert len(results) == 4 + assert all("create view" in r.message.lower() for r in results) # select only models this time - results = run_dbt(clone_args + ["--resource-type", "model"]) + results = run_dbt([*clone_args, "--resource-type", "model"]) assert len(results) == 2 + assert all("no-op" in r.message.lower() for r in results) From b5dde06e38678e3e648f5cd9c88b22dac100cf47 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Tue, 20 Jun 2023 16:54:05 -0500 Subject: [PATCH 09/24] add no state test, move state_relation to maybe_keys --- tests/functional/defer_state/test_defer_state.py | 16 ++++++++++++++++ tests/unit/test_context.py | 4 ++-- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/tests/functional/defer_state/test_defer_state.py b/tests/functional/defer_state/test_defer_state.py index 32ac6e36494..aedf2186655 100644 --- a/tests/functional/defer_state/test_defer_state.py +++ b/tests/functional/defer_state/test_defer_state.py @@ -409,3 +409,19 @@ def test_clone(self, project, unique_schema, other_schema): results = run_dbt([*clone_args, "--resource-type", "model"]) assert len(results) == 2 assert all("no-op" in r.message.lower() for r in results) + + def test_clone_no_state(self, project, unique_schema, other_schema): + project.create_test_schema(other_schema) + self.run_and_save_state(project.project_root, with_snapshot=True) + + clone_args = [ + "clone", + "--target", + "otherschema", + ] + + with pytest.raises( + DbtRuntimeError, + match="--state is required for cloning relations from another environment", + ): + run_dbt(clone_args) diff --git a/tests/unit/test_context.py b/tests/unit/test_context.py index 5bb895b844e..2d7f6e99112 100644 --- a/tests/unit/test_context.py +++ b/tests/unit/test_context.py @@ -230,8 +230,8 @@ def assert_has_keys(required_keys: Set[str], maybe_keys: Set[str], ctx: Dict[str "submit_python_job", "dbt_metadata_envs", } -REQUIRED_MODEL_KEYS = REQUIRED_MACRO_KEYS | {"this", "compiled_code", "state_relation"} -MAYBE_KEYS = frozenset({"debug"}) +REQUIRED_MODEL_KEYS = REQUIRED_MACRO_KEYS | {"this", "compiled_code"} +MAYBE_KEYS = frozenset({"debug", "state_relation"}) POSTGRES_PROFILE_DATA = { From c93685611ae5f1db28509bb391843249b958051e Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Wed, 21 Jun 2023 09:42:51 -0500 Subject: [PATCH 10/24] rename: state_relation -> defer_relation --- core/dbt/context/providers.py | 6 +++--- core/dbt/contracts/graph/manifest.py | 8 ++++---- core/dbt/contracts/graph/nodes.py | 8 ++++---- .../macros/materializations/models/clone/clone.sql | 12 +++++------- .../models/clone/create_or_replace_clone.sql | 8 ++++---- core/dbt/task/clone.py | 4 
++-- tests/unit/test_context.py | 2 +- tests/unit/test_manifest.py | 8 ++++---- 8 files changed, 27 insertions(+), 29 deletions(-) diff --git a/core/dbt/context/providers.py b/core/dbt/context/providers.py index 41baa24dfc0..6cffa624ded 100644 --- a/core/dbt/context/providers.py +++ b/core/dbt/context/providers.py @@ -1433,15 +1433,15 @@ def this(self) -> Optional[RelationProxy]: return self.db_wrapper.Relation.create_from(self.config, self.model) @contextproperty - def state_relation(self) -> Optional[RelationProxy]: + def defer_relation(self) -> Optional[RelationProxy]: """ For commands which add information about this node's corresponding production version (via a --state artifact), access the Relation object for that stateful other """ - if getattr(self.model, "state_relation", None): + if getattr(self.model, "defer_relation", None): return self.db_wrapper.Relation.create_from_node( - self.config, self.model.state_relation # type: ignore + self.config, self.model.defer_relation # type: ignore ) else: return None diff --git a/core/dbt/contracts/graph/manifest.py b/core/dbt/contracts/graph/manifest.py index 5df82b79672..dab8dd46054 100644 --- a/core/dbt/contracts/graph/manifest.py +++ b/core/dbt/contracts/graph/manifest.py @@ -1177,8 +1177,8 @@ def add_from_artifact( for unique_id, node in other.nodes.items(): current = self.nodes.get(unique_id) if current and (node.resource_type in refables and not node.is_ephemeral): - state_relation = RelationalNode(node.database, node.schema, node.alias) - self.nodes[unique_id] = current.replace(state_relation=state_relation) + defer_relation = RelationalNode(node.database, node.schema, node.alias) + self.nodes[unique_id] = current.replace(defer_relation=defer_relation) # Methods that were formerly in ParseResult @@ -1402,8 +1402,8 @@ def __post_serialize__(self, dct): for unique_id, node in dct["nodes"].items(): if "config_call_dict" in node: del node["config_call_dict"] - if "state_relation" in node: - del node["state_relation"] + if "defer_relation" in node: + del node["defer_relation"] return dct diff --git a/core/dbt/contracts/graph/nodes.py b/core/dbt/contracts/graph/nodes.py index d0f2930b4ef..5acace0aa44 100644 --- a/core/dbt/contracts/graph/nodes.py +++ b/core/dbt/contracts/graph/nodes.py @@ -280,7 +280,7 @@ def add_public_node(self, value: str): @dataclass -class StateRelation(dbtClassMixin): +class DeferRelation(dbtClassMixin): alias: str database: Optional[str] schema: str @@ -585,7 +585,7 @@ class ModelNode(CompiledNode): version: Optional[NodeVersion] = None latest_version: Optional[NodeVersion] = None deprecation_date: Optional[datetime] = None - state_relation: Optional[StateRelation] = None + defer_relation: Optional[DeferRelation] = None @property def is_latest_version(self) -> bool: @@ -767,7 +767,7 @@ class SeedNode(ParsedNode): # No SQLDefaults! # and we need the root_path to load the seed later root_path: Optional[str] = None depends_on: MacroDependsOn = field(default_factory=MacroDependsOn) - state_relation: Optional[StateRelation] = None + defer_relation: Optional[DeferRelation] = None def same_seeds(self, other: "SeedNode") -> bool: # for seeds, we check the hashes. 
If the hashes are different types, @@ -966,7 +966,7 @@ class IntermediateSnapshotNode(CompiledNode): class SnapshotNode(CompiledNode): resource_type: NodeType = field(metadata={"restrict": [NodeType.Snapshot]}) config: SnapshotConfig - state_relation: Optional[StateRelation] = None + defer_relation: Optional[DeferRelation] = None # ==================================== diff --git a/core/dbt/include/global_project/macros/materializations/models/clone/clone.sql b/core/dbt/include/global_project/macros/materializations/models/clone/clone.sql index ec89df9431c..f46f94fbf6d 100644 --- a/core/dbt/include/global_project/macros/materializations/models/clone/clone.sql +++ b/core/dbt/include/global_project/macros/materializations/models/clone/clone.sql @@ -2,7 +2,7 @@ {%- set relations = {'relations': []} -%} - {%- if not state_relation -%} + {%- if not defer_relation -%} -- nothing to do {{ log("No relation found in state manifest for " ~ model.unique_id, info=True) }} {{ return(relations) }} @@ -16,7 +16,7 @@ {{ return(relations) }} {%- endif -%} - {%- set other_existing_relation = load_cached_relation(state_relation) -%} + {%- set other_existing_relation = load_cached_relation(defer_relation) -%} -- If this is a database that can do zero-copy cloning of tables, and the other relation is a table, then this will be a table -- Otherwise, this will be a view @@ -33,7 +33,7 @@ -- as a general rule, data platforms that can clone tables can also do atomic 'create or replace' {% call statement('main') %} - {{ create_or_replace_clone(target_relation, state_relation) }} + {{ create_or_replace_clone(target_relation, defer_relation) }} {% endcall %} {% set should_revoke = should_revoke(existing_relation, full_refresh_mode=True) %} @@ -46,11 +46,9 @@ {%- set target_relation = this.incorporate(type='view') -%} - -- TODO: this should probably be illegal - -- I'm just doing it out of convenience to reuse the 'view' materialization logic {%- do context.update({ - 'sql': get_clone_target(state_relation), - 'compiled_code': get_clone_target(state_relation) + 'sql': get_clone_target(defer_relation), + 'compiled_code': get_clone_target(defer_relation) }) -%} -- reuse the view materialization diff --git a/core/dbt/include/global_project/macros/materializations/models/clone/create_or_replace_clone.sql b/core/dbt/include/global_project/macros/materializations/models/clone/create_or_replace_clone.sql index fd766b76c88..204e9e874e4 100644 --- a/core/dbt/include/global_project/macros/materializations/models/clone/create_or_replace_clone.sql +++ b/core/dbt/include/global_project/macros/materializations/models/clone/create_or_replace_clone.sql @@ -1,7 +1,7 @@ -{% macro create_or_replace_clone(this_relation, state_relation) %} - {{ return(adapter.dispatch('create_or_replace_clone', 'dbt')(this_relation, state_relation)) }} +{% macro create_or_replace_clone(this_relation, defer_relation) %} + {{ return(adapter.dispatch('create_or_replace_clone', 'dbt')(this_relation, defer_relation)) }} {% endmacro %} -{% macro default__create_or_replace_clone(this_relation, state_relation) %} - create or replace table {{ this_relation }} clone {{ state_relation }} +{% macro default__create_or_replace_clone(this_relation, defer_relation) %} + create or replace table {{ this_relation }} clone {{ defer_relation }} {% endmacro %} diff --git a/core/dbt/task/clone.py b/core/dbt/task/clone.py index 3ec5a7d12b2..56cf46824b6 100644 --- a/core/dbt/task/clone.py +++ b/core/dbt/task/clone.py @@ -109,9 +109,9 @@ def get_model_schemas(self, adapter, 
selected_uids: Iterable[str]) -> Set[BaseRe result.add(relation.without_identifier()) # cache the 'other' schemas too! - if node.state_relation: # type: ignore + if node.defer_relation: # type: ignore other_relation = adapter.Relation.create_from_node( - self.config, node.state_relation # type: ignore + self.config, node.defer_relation # type: ignore ) result.add(other_relation.without_identifier()) diff --git a/tests/unit/test_context.py b/tests/unit/test_context.py index 2d7f6e99112..3034b85562f 100644 --- a/tests/unit/test_context.py +++ b/tests/unit/test_context.py @@ -231,7 +231,7 @@ def assert_has_keys(required_keys: Set[str], maybe_keys: Set[str], ctx: Dict[str "dbt_metadata_envs", } REQUIRED_MODEL_KEYS = REQUIRED_MACRO_KEYS | {"this", "compiled_code"} -MAYBE_KEYS = frozenset({"debug", "state_relation"}) +MAYBE_KEYS = frozenset({"debug", "defer_relation"}) POSTGRES_PROFILE_DATA = { diff --git a/tests/unit/test_manifest.py b/tests/unit/test_manifest.py index 0b07fee1e78..9e156324317 100644 --- a/tests/unit/test_manifest.py +++ b/tests/unit/test_manifest.py @@ -679,14 +679,14 @@ def test_add_from_artifact(self): assert "model.root.nested2" not in original_manifest.nodes # old node removed should not have state relation in original manifest - assert original_manifest.nodes["model.root.nested"].state_relation is None + assert original_manifest.nodes["model.root.nested"].defer_relation is None # for all other nodes, check that state relation is updated for k, v in original_manifest.nodes.items(): if k != "model.root.nested": - self.assertEqual("other_" + v.database, v.state_relation.database) - self.assertEqual("other_" + v.schema, v.state_relation.schema) - self.assertEqual("other_" + v.alias, v.state_relation.alias) + self.assertEqual("other_" + v.database, v.defer_relation.database) + self.assertEqual("other_" + v.schema, v.defer_relation.schema) + self.assertEqual("other_" + v.alias, v.defer_relation.alias) class MixedManifestTest(unittest.TestCase): From 2b0eb25aca1ba89266fa1d41ea09e6627f74db58 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Wed, 21 Jun 2023 10:05:28 -0500 Subject: [PATCH 11/24] missed a spot --- tests/unit/test_manifest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/test_manifest.py b/tests/unit/test_manifest.py index 9e156324317..19b072183eb 100644 --- a/tests/unit/test_manifest.py +++ b/tests/unit/test_manifest.py @@ -91,7 +91,7 @@ "latest_version", "constraints", "deprecation_date", - "state_relation", + "defer_relation", } ) From bcecced9753ad5bc318059310ccd7c4270caccf8 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Wed, 21 Jun 2023 10:40:58 -0500 Subject: [PATCH 12/24] Move _get_deferred_manifest to GraphRunnableTask --- core/dbt/task/clone.py | 27 +++++++-------------------- core/dbt/task/compile.py | 15 ++------------- core/dbt/task/runnable.py | 12 ++++++++++++ 3 files changed, 21 insertions(+), 33 deletions(-) diff --git a/core/dbt/task/clone.py b/core/dbt/task/clone.py index 56cf46824b6..87fb1a78106 100644 --- a/core/dbt/task/clone.py +++ b/core/dbt/task/clone.py @@ -1,20 +1,18 @@ import threading -from typing import AbstractSet, Optional, Any, List, Iterable, Set +from typing import AbstractSet, Any, List, Iterable, Set -from dbt.dataclass_schema import dbtClassMixin - -from dbt.contracts.graph.manifest import WritableManifest +from dbt.adapters.base import BaseRelation +from dbt.clients.jinja import MacroGenerator +from dbt.context.providers import generate_runtime_model_context from dbt.contracts.results import 
RunStatus, RunResult -from dbt.exceptions import DbtInternalError, DbtRuntimeError, CompilationError +from dbt.dataclass_schema import dbtClassMixin +from dbt.exceptions import DbtInternalError, CompilationError from dbt.graph import ResourceTypeSelector from dbt.node_types import NodeType from dbt.parser.manifest import write_manifest from dbt.task.base import BaseRunner -from dbt.task.runnable import GraphRunnableTask from dbt.task.run import _validate_materialization_relations_dict -from dbt.adapters.base import BaseRelation -from dbt.clients.jinja import MacroGenerator -from dbt.context.providers import generate_runtime_model_context +from dbt.task.runnable import GraphRunnableTask class CloneRunner(BaseRunner): @@ -157,17 +155,6 @@ def get_node_selector(self) -> ResourceTypeSelector: def get_runner_type(self, _): return CloneRunner - def _get_deferred_manifest(self) -> Optional[WritableManifest]: - state = self.previous_state - if state is None: - raise DbtRuntimeError( - "--state is required for cloning relations from another environment" - ) - - if state.manifest is None: - raise DbtRuntimeError(f'Could not find manifest in --state path: "{self.args.state}"') - return state.manifest - # Note that this is different behavior from --defer with other commands, which *merge* # selected nodes from this manifest + unselected nodes from the other manifest def defer_to_manifest(self, adapter, selected_uids: AbstractSet[str]): diff --git a/core/dbt/task/compile.py b/core/dbt/task/compile.py index 1e6ecce7ee4..23155c7bc79 100644 --- a/core/dbt/task/compile.py +++ b/core/dbt/task/compile.py @@ -6,7 +6,7 @@ from dbt.events.base_types import EventLevel from dbt.events.functions import fire_event from dbt.events.types import CompiledNode, Note -from dbt.exceptions import DbtInternalError, DbtRuntimeError +from dbt.exceptions import DbtInternalError from dbt.graph import ResourceTypeSelector from dbt.node_types import NodeType from dbt.parser.manifest import write_manifest, process_node @@ -97,18 +97,7 @@ def task_end_messages(self, results): ) def _get_deferred_manifest(self) -> Optional[WritableManifest]: - if not self.args.defer: - return None - - state = self.previous_defer_state or self.previous_state - if not state: - raise DbtRuntimeError( - "Received a --defer argument, but no value was provided to --state" - ) - - if not state.manifest: - raise DbtRuntimeError(f'Could not find manifest in --state path: "{state}"') - return state.manifest + return super()._get_deferred_manifest() if self.args.defer else None def defer_to_manifest(self, adapter, selected_uids: AbstractSet[str]): deferred_manifest = self._get_deferred_manifest() diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py index b1a74fd1126..c3a12f83343 100644 --- a/core/dbt/task/runnable.py +++ b/core/dbt/task/runnable.py @@ -54,6 +54,7 @@ import dbt.exceptions from dbt.flags import get_flags import dbt.utils +from dbt.contracts.graph.manifest import WritableManifest RESULT_FILE_NAME = "run_results.json" RUNNING_STATE = DbtProcessState("running") @@ -580,3 +581,14 @@ def get_result(self, results, elapsed_time, generated_at): def task_end_messages(self, results): print_run_end_messages(results) + + def _get_deferred_manifest(self) -> Optional[WritableManifest]: + state = self.previous_defer_state or self.previous_state + if not state: + raise DbtRuntimeError( + "Received a --defer argument, but no value was provided to --state" + ) + + if not state.manifest: + raise DbtRuntimeError(f'Could not find manifest in --state 
path: "{state}"') + return state.manifest From 73272835d89fc4801662a6c1a241145514a4d1ba Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Wed, 21 Jun 2023 11:56:15 -0500 Subject: [PATCH 13/24] Reword error message --- core/dbt/task/runnable.py | 2 +- tests/functional/defer_state/test_defer_state.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py index c3a12f83343..7e5495d6064 100644 --- a/core/dbt/task/runnable.py +++ b/core/dbt/task/runnable.py @@ -586,7 +586,7 @@ def _get_deferred_manifest(self) -> Optional[WritableManifest]: state = self.previous_defer_state or self.previous_state if not state: raise DbtRuntimeError( - "Received a --defer argument, but no value was provided to --state" + "--state or --defer-state are required for deferral, but neither was provided" ) if not state.manifest: diff --git a/tests/functional/defer_state/test_defer_state.py b/tests/functional/defer_state/test_defer_state.py index aedf2186655..0ac1d25fc6d 100644 --- a/tests/functional/defer_state/test_defer_state.py +++ b/tests/functional/defer_state/test_defer_state.py @@ -422,6 +422,6 @@ def test_clone_no_state(self, project, unique_schema, other_schema): with pytest.raises( DbtRuntimeError, - match="--state is required for cloning relations from another environment", + match="--state or --defer-state are required for deferral, but neither was provided", ): run_dbt(clone_args) From 0543303d19cff6f48bc71fc2f8fc443bae36959e Mon Sep 17 00:00:00 2001 From: Matthew McKnight <91097623+McKnight-42@users.noreply.github.com> Date: Thu, 22 Jun 2023 10:35:20 -0500 Subject: [PATCH 14/24] =?UTF-8?q?create=20adapter=20zone=20vesions=20of=20?= =?UTF-8?q?dbt=5Fclone=20tests=20to=20be=20inherited=20by=20oth=E2=80=A6?= =?UTF-8?q?=20(#7918)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dbt/tests/adapter/dbt_clone/fixtures.py | 106 +++++++++ .../tests/adapter/dbt_clone/test_dbt_clone.py | 203 ++++++++++++++++++ .../defer_state/test_defer_state.py | 77 ------- 3 files changed, 309 insertions(+), 77 deletions(-) create mode 100644 tests/adapter/dbt/tests/adapter/dbt_clone/fixtures.py create mode 100644 tests/adapter/dbt/tests/adapter/dbt_clone/test_dbt_clone.py diff --git a/tests/adapter/dbt/tests/adapter/dbt_clone/fixtures.py b/tests/adapter/dbt/tests/adapter/dbt_clone/fixtures.py new file mode 100644 index 00000000000..aa1d9bf80b2 --- /dev/null +++ b/tests/adapter/dbt/tests/adapter/dbt_clone/fixtures.py @@ -0,0 +1,106 @@ +seed_csv = """id,name +1,Alice +2,Bob +""" + +table_model_sql = """ +{{ config(materialized='table') }} +select * from {{ ref('ephemeral_model') }} + +-- establish a macro dependency to trigger state:modified.macros +-- depends on: {{ my_macro() }} +""" + +view_model_sql = """ +select * from {{ ref('seed') }} + +-- establish a macro dependency that trips infinite recursion if not handled +-- depends on: {{ my_infinitely_recursive_macro() }} +""" + +ephemeral_model_sql = """ +{{ config(materialized='ephemeral') }} +select * from {{ ref('view_model') }} +""" + +exposures_yml = """ +version: 2 +exposures: + - name: my_exposure + type: application + depends_on: + - ref('view_model') + owner: + email: test@example.com +""" + +schema_yml = """ +version: 2 +models: + - name: view_model + columns: + - name: id + tests: + - unique: + severity: error + - not_null + - name: name +""" + +get_schema_name_sql = """ +{% macro generate_schema_name(custom_schema_name, node) -%} + {%- set 
default_schema = target.schema -%} + {%- if custom_schema_name is not none -%} + {{ return(default_schema ~ '_' ~ custom_schema_name|trim) }} + -- put seeds into a separate schema in "prod", to verify that cloning in "dev" still works + {%- elif target.name == 'default' and node.resource_type == 'seed' -%} + {{ return(default_schema ~ '_' ~ 'seeds') }} + {%- else -%} + {{ return(default_schema) }} + {%- endif -%} +{%- endmacro %} +""" + +snapshot_sql = """ +{% snapshot my_cool_snapshot %} + + {{ + config( + target_database=database, + target_schema=schema, + unique_key='id', + strategy='check', + check_cols=['id'], + ) + }} + select * from {{ ref('view_model') }} + +{% endsnapshot %} +""" +macros_sql = """ +{% macro my_macro() %} + {% do log('in a macro' ) %} +{% endmacro %} +""" + +infinite_macros_sql = """ +{# trigger infinite recursion if not handled #} + +{% macro my_infinitely_recursive_macro() %} + {{ return(adapter.dispatch('my_infinitely_recursive_macro')()) }} +{% endmacro %} + +{% macro default__my_infinitely_recursive_macro() %} + {% if unmet_condition %} + {{ my_infinitely_recursive_macro() }} + {% else %} + {{ return('') }} + {% endif %} +{% endmacro %} +""" + +custom_can_clone_tables_false_macros_sql = """ +{% macro can_clone_table() %} + {{ return(False) }} +{% endmacro %} +""" diff --git a/tests/adapter/dbt/tests/adapter/dbt_clone/test_dbt_clone.py b/tests/adapter/dbt/tests/adapter/dbt_clone/test_dbt_clone.py new file mode 100644 index 00000000000..ca1474819fa --- /dev/null +++ b/tests/adapter/dbt/tests/adapter/dbt_clone/test_dbt_clone.py @@ -0,0 +1,203 @@ +import os +import shutil +from copy import deepcopy +import pytest +from collections import Counter +from dbt.exceptions import DbtRuntimeError +from dbt.tests.util import run_dbt +from dbt.tests.adapter.dbt_clone.fixtures import ( + seed_csv, + table_model_sql, + view_model_sql, + ephemeral_model_sql, + exposures_yml, + schema_yml, + snapshot_sql, + get_schema_name_sql, + macros_sql, + infinite_macros_sql, + custom_can_clone_tables_false_macros_sql, +) + + +class BaseClone: + @pytest.fixture(scope="class") + def models(self): + return { + "table_model.sql": table_model_sql, + "view_model.sql": view_model_sql, + "ephemeral_model.sql": ephemeral_model_sql, + "schema.yml": schema_yml, + "exposures.yml": exposures_yml, + } + + @pytest.fixture(scope="class") + def macros(self): + return { + "macros.sql": macros_sql, + "infinite_macros.sql": infinite_macros_sql, + "get_schema_name.sql": get_schema_name_sql, + } + + @pytest.fixture(scope="class") + def seeds(self): + return { + "seed.csv": seed_csv, + } + + @pytest.fixture(scope="class") + def snapshots(self): + return { + "snapshot.sql": snapshot_sql, + } + + @pytest.fixture(scope="class") + def other_schema(self, unique_schema): + return unique_schema + "_other" + + @property + def project_config_update(self): + return { + "seeds": { + "test": { + "quote_columns": False, + } + } + } + + @pytest.fixture(scope="class") + def profiles_config_update(self, dbt_profile_target, unique_schema, other_schema): + outputs = {"default": dbt_profile_target, "otherschema": deepcopy(dbt_profile_target)} + outputs["default"]["schema"] = unique_schema + outputs["otherschema"]["schema"] = other_schema + return {"test": {"outputs": outputs, "target": "default"}} + + def copy_state(self, project_root): + state_path = os.path.join(project_root, "state") + if not os.path.exists(state_path): + os.makedirs(state_path) + shutil.copyfile( + f"{project_root}/target/manifest.json", 
f"{project_root}/state/manifest.json" + ) + + def run_and_save_state(self, project_root, with_snapshot=False): + results = run_dbt(["seed"]) + assert len(results) == 1 + assert not any(r.node.deferred for r in results) + results = run_dbt(["run"]) + assert len(results) == 2 + assert not any(r.node.deferred for r in results) + results = run_dbt(["test"]) + assert len(results) == 2 + + if with_snapshot: + results = run_dbt(["snapshot"]) + assert len(results) == 1 + assert not any(r.node.deferred for r in results) + + # copy files + self.copy_state(project_root) + + +# -- Below we define base classes for tests you import the one based on if your adapter uses dbt clone or not -- +class BaseClonePossible(BaseClone): + def test_can_clone_true(self, project, unique_schema, other_schema): + project.create_test_schema(other_schema) + self.run_and_save_state(project.project_root, with_snapshot=True) + + clone_args = [ + "clone", + "--state", + "state", + "--target", + "otherschema", + ] + + results = run_dbt(clone_args) + assert len(results) == 4 + + schema_relations = project.adapter.list_relations( + database=project.database, schema=other_schema + ) + types = [r.type for r in schema_relations] + count_types = Counter(types) + assert count_types == Counter({"table": 3, "view": 1}) + + # objects already exist, so this is a no-op + results = run_dbt(clone_args) + assert len(results) == 4 + assert all("no-op" in r.message.lower() for r in results) + + # recreate all objects + results = run_dbt([*clone_args, "--full-refresh"]) + assert len(results) == 4 + + # select only models this time + results = run_dbt([*clone_args, "--resource-type", "model"]) + assert len(results) == 2 + assert all("no-op" in r.message.lower() for r in results) + + def test_clone_no_state(self, project, unique_schema, other_schema): + project.create_test_schema(other_schema) + self.run_and_save_state(project.project_root, with_snapshot=True) + + clone_args = [ + "clone", + "--target", + "otherschema", + ] + + with pytest.raises( + DbtRuntimeError, + match="--state or --defer-state are required for deferral, but neither was provided", + ): + run_dbt(clone_args) + + +class BaseCloneNotPossible(BaseClone): + @pytest.fixture(scope="class") + def macros(self): + return { + "macros.sql": macros_sql, + "my_can_clone_tables.sql": custom_can_clone_tables_false_macros_sql, + "infinite_macros.sql": infinite_macros_sql, + "get_schema_name.sql": get_schema_name_sql, + } + + def test_can_clone_false(self, project, unique_schema, other_schema): + project.create_test_schema(other_schema) + self.run_and_save_state(project.project_root, with_snapshot=True) + + clone_args = [ + "clone", + "--state", + "state", + "--target", + "otherschema", + ] + + results = run_dbt(clone_args) + assert len(results) == 4 + + schema_relations = project.adapter.list_relations( + database=project.database, schema=other_schema + ) + assert all(r.type == "view" for r in schema_relations) + + # objects already exist, so this is a no-op + results = run_dbt(clone_args) + assert len(results) == 4 + assert all("no-op" in r.message.lower() for r in results) + + # recreate all objects + results = run_dbt([*clone_args, "--full-refresh"]) + assert len(results) == 4 + + # select only models this time + results = run_dbt([*clone_args, "--resource-type", "model"]) + assert len(results) == 2 + assert all("no-op" in r.message.lower() for r in results) + + +class TestPostgresCloneNotPossible(BaseCloneNotPossible): + pass diff --git a/tests/functional/defer_state/test_defer_state.py 
b/tests/functional/defer_state/test_defer_state.py index 0ac1d25fc6d..f8b062e1076 100644 --- a/tests/functional/defer_state/test_defer_state.py +++ b/tests/functional/defer_state/test_defer_state.py @@ -348,80 +348,3 @@ def test_defer_state_flag(self, project, unique_schema, other_schema): assert results.results[0].status == RunStatus.Success assert results.results[0].node.name == "table_model" assert results.results[0].adapter_response["rows_affected"] == 2 - - -get_schema_name_sql = """ -{% macro generate_schema_name(custom_schema_name, node) -%} - {%- set default_schema = target.schema -%} - {%- if custom_schema_name is not none -%} - {{ return(default_schema ~ '_' ~ custom_schema_name|trim) }} - -- put seeds into a separate schema in "prod", to verify that cloning in "dev" still works - {%- elif target.name == 'default' and node.resource_type == 'seed' -%} - {{ return(default_schema ~ '_' ~ 'seeds') }} - {%- else -%} - {{ return(default_schema) }} - {%- endif -%} -{%- endmacro %} -""" - - -class TestCloneToOther(BaseDeferState): - @pytest.fixture(scope="class") - def macros(self): - return { - "macros.sql": macros_sql, - "infinite_macros.sql": infinite_macros_sql, - "get_schema_name.sql": get_schema_name_sql, - } - - def test_clone(self, project, unique_schema, other_schema): - project.create_test_schema(other_schema) - self.run_and_save_state(project.project_root, with_snapshot=True) - - clone_args = [ - "clone", - "--state", - "state", - "--target", - "otherschema", - ] - - results = run_dbt(clone_args) - assert len(results) == 4 - - assert all("create view" in r.message.lower() for r in results) - schema_relations = project.adapter.list_relations( - database=project.database, schema=other_schema - ) - assert all(r.type == "view" for r in schema_relations) - - # objects already exist, so this is a no-op - results = run_dbt(clone_args) - assert len(results) == 4 - assert all("no-op" in r.message.lower() for r in results) - - # recreate all objects - results = run_dbt([*clone_args, "--full-refresh"]) - assert len(results) == 4 - assert all("create view" in r.message.lower() for r in results) - - # select only models this time - results = run_dbt([*clone_args, "--resource-type", "model"]) - assert len(results) == 2 - assert all("no-op" in r.message.lower() for r in results) - - def test_clone_no_state(self, project, unique_schema, other_schema): - project.create_test_schema(other_schema) - self.run_and_save_state(project.project_root, with_snapshot=True) - - clone_args = [ - "clone", - "--target", - "otherschema", - ] - - with pytest.raises( - DbtRuntimeError, - match="--state or --defer-state are required for deferral, but neither was provided", - ): - run_dbt(clone_args) From 5aa42094405681d7433ddcfdff38dcb9a3feddd2 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Thu, 22 Jun 2023 10:40:10 -0500 Subject: [PATCH 15/24] Add Matt McKnight to contributors list --- .changes/unreleased/Features-20230616-104849.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.changes/unreleased/Features-20230616-104849.yaml b/.changes/unreleased/Features-20230616-104849.yaml index 11d14e299a0..1a8396a61bf 100644 --- a/.changes/unreleased/Features-20230616-104849.yaml +++ b/.changes/unreleased/Features-20230616-104849.yaml @@ -2,5 +2,5 @@ kind: Features body: dbt clone time: 2023-06-16T10:48:49.079961-05:00 custom: - Author: jtcohen6 aranke + Author: jtcohen6 aranke McKnight-42 Issue: "7258" From 65eaa0a002b860fd8e5add0efd3e06d62036983b Mon Sep 17 00:00:00 2001 From: Kshitij Aranke 
Date: Mon, 26 Jun 2023 09:11:09 -0500 Subject: [PATCH 16/24] remove context.update hack --- core/dbt/context/providers.py | 24 ++++++++++++------- .../materializations/models/clone/clone.sql | 5 ---- .../models/clone/get_clone_target.sql | 10 -------- 3 files changed, 15 insertions(+), 24 deletions(-) delete mode 100644 core/dbt/include/global_project/macros/materializations/models/clone/get_clone_target.sql diff --git a/core/dbt/context/providers.py b/core/dbt/context/providers.py index e9d1c82c91b..d7b7ce71b2e 100644 --- a/core/dbt/context/providers.py +++ b/core/dbt/context/providers.py @@ -1367,20 +1367,26 @@ def post_hooks(self) -> List[Dict[str, Any]]: @contextproperty def sql(self) -> Optional[str]: # only doing this in sql model for backward compatible - if ( - getattr(self.model, "extra_ctes_injected", None) - and self.model.language == ModelLanguage.sql # type: ignore[union-attr] - ): - # TODO CT-211 - return self.model.compiled_code # type: ignore[union-attr] - return None + if self.model.language == ModelLanguage.sql: # type: ignore[union-attr] + if getattr(self.model, "defer_relation", None): + return f"select * from {self.model.defer_relation.database}.{self.model.defer_relation.schema}.{self.model.defer_relation.alias}" # type: ignore[union-attr] + elif getattr(self.model, "extra_ctes_injected", None): + # TODO CT-211 + return self.model.compiled_code # type: ignore[union-attr] + else: + return None + else: + return None @contextproperty def compiled_code(self) -> Optional[str]: - if getattr(self.model, "extra_ctes_injected", None): + if getattr(self.model, "defer_relation", None): + return f"select * from {self.model.defer_relation.database}.{self.model.defer_relation.schema}.{self.model.defer_relation.alias}" # type: ignore[union-attr] + elif getattr(self.model, "extra_ctes_injected", None): # TODO CT-211 return self.model.compiled_code # type: ignore[union-attr] - return None + else: + return None @contextproperty def database(self) -> str: diff --git a/core/dbt/include/global_project/macros/materializations/models/clone/clone.sql b/core/dbt/include/global_project/macros/materializations/models/clone/clone.sql index f46f94fbf6d..fd1549da13e 100644 --- a/core/dbt/include/global_project/macros/materializations/models/clone/clone.sql +++ b/core/dbt/include/global_project/macros/materializations/models/clone/clone.sql @@ -46,11 +46,6 @@ {%- set target_relation = this.incorporate(type='view') -%} - {%- do context.update({ - 'sql': get_clone_target(defer_relation), - 'compiled_code': get_clone_target(defer_relation) - }) -%} - -- reuse the view materialization -- TODO: support actual dispatch for materialization macros {% set search_name = "materialization_view_" ~ adapter.type() %} diff --git a/core/dbt/include/global_project/macros/materializations/models/clone/get_clone_target.sql b/core/dbt/include/global_project/macros/materializations/models/clone/get_clone_target.sql deleted file mode 100644 index 46a3d740135..00000000000 --- a/core/dbt/include/global_project/macros/materializations/models/clone/get_clone_target.sql +++ /dev/null @@ -1,10 +0,0 @@ -{% macro get_clone_target(to_relation) %} - {{ return(adapter.dispatch('get_clone_target', 'dbt')(to_relation)) }} -{% endmacro %} - -{% macro default__get_clone_target(to_relation) %} - {% set target_sql %} - select * from {{ to_relation }} - {% endset %} - {{ return(target_sql) }} -{% endmacro %} From 6a8ecb0aa53e11e05a6aae2ea8d757c7d6fd7d14 Mon Sep 17 00:00:00 2001 From: Matthew McKnight Date: Tue, 27 Jun 2023 11:56:18 -0500 
Subject: [PATCH 17/24] add clean_up method to drop alt schema names after tests run --- .../tests/adapter/dbt_clone/test_dbt_clone.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/tests/adapter/dbt/tests/adapter/dbt_clone/test_dbt_clone.py b/tests/adapter/dbt/tests/adapter/dbt_clone/test_dbt_clone.py index ca1474819fa..a7d1c6cd400 100644 --- a/tests/adapter/dbt/tests/adapter/dbt_clone/test_dbt_clone.py +++ b/tests/adapter/dbt/tests/adapter/dbt_clone/test_dbt_clone.py @@ -1,7 +1,7 @@ +import pytest import os import shutil from copy import deepcopy -import pytest from collections import Counter from dbt.exceptions import DbtRuntimeError from dbt.tests.util import run_dbt @@ -200,4 +200,18 @@ def test_can_clone_false(self, project, unique_schema, other_schema): class TestPostgresCloneNotPossible(BaseCloneNotPossible): + @pytest.fixture(autouse=True) + def clean_up(self, project): + yield + with project.adapter.connection_named("__test"): + relation = project.adapter.Relation.create( + database=project.database, schema=f"{project.test_schema}_seeds" + ) + project.adapter.drop_schema(relation) + + relation = project.adapter.Relation.create( + database=project.database, schema=project.test_schema + ) + project.adapter.drop_schema(relation) + pass From 5be28bc85268c1093ce2ba625ae81d905c2fb266 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Tue, 27 Jun 2023 12:20:44 -0500 Subject: [PATCH 18/24] Add context to comments --- core/dbt/context/providers.py | 2 ++ .../macros/materializations/models/clone/clone.sql | 1 + 2 files changed, 3 insertions(+) diff --git a/core/dbt/context/providers.py b/core/dbt/context/providers.py index d7b7ce71b2e..a620b63b2f2 100644 --- a/core/dbt/context/providers.py +++ b/core/dbt/context/providers.py @@ -1368,6 +1368,8 @@ def post_hooks(self) -> List[Dict[str, Any]]: def sql(self) -> Optional[str]: # only doing this in sql model for backward compatible if self.model.language == ModelLanguage.sql: # type: ignore[union-attr] + # If the model is deferred and the adapter doesn't support zero-copy cloning, then select * from the prod + # relation if getattr(self.model, "defer_relation", None): return f"select * from {self.model.defer_relation.database}.{self.model.defer_relation.schema}.{self.model.defer_relation.alias}" # type: ignore[union-attr] elif getattr(self.model, "extra_ctes_injected", None): diff --git a/core/dbt/include/global_project/macros/materializations/models/clone/clone.sql b/core/dbt/include/global_project/macros/materializations/models/clone/clone.sql index fd1549da13e..b78ca9d01ab 100644 --- a/core/dbt/include/global_project/macros/materializations/models/clone/clone.sql +++ b/core/dbt/include/global_project/macros/materializations/models/clone/clone.sql @@ -48,6 +48,7 @@ -- reuse the view materialization -- TODO: support actual dispatch for materialization macros + -- Tracking ticket: https://github.com/dbt-labs/dbt-core/issues/7799 {% set search_name = "materialization_view_" ~ adapter.type() %} {% if not search_name in context %} {% set search_name = "materialization_view_default" %} From 6efe6e348bf2420710378629f8650031be2c5a62 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Wed, 28 Jun 2023 10:13:26 -0500 Subject: [PATCH 19/24] Use relation_name instead of constructing string --- core/dbt/context/providers.py | 4 ++-- core/dbt/contracts/graph/manifest.py | 10 +++++++--- core/dbt/contracts/graph/nodes.py | 14 ++------------ 3 files changed, 11 insertions(+), 17 deletions(-) diff --git 
a/core/dbt/context/providers.py b/core/dbt/context/providers.py
index 180ea8fade5..ffe8a663611 100644
--- a/core/dbt/context/providers.py
+++ b/core/dbt/context/providers.py
@@ -1377,7 +1377,7 @@ def sql(self) -> Optional[str]:
             # If the model is deferred and the adapter doesn't support zero-copy cloning, then select * from the prod
             # relation
             if getattr(self.model, "defer_relation", None):
-                return f"select * from {self.model.defer_relation.database}.{self.model.defer_relation.schema}.{self.model.defer_relation.alias}"  # type: ignore[union-attr]
+                return f"select * from {self.model.defer_relation.relation_name}"  # type: ignore[union-attr]
             elif getattr(self.model, "extra_ctes_injected", None):
                 # TODO CT-211
                 return self.model.compiled_code  # type: ignore[union-attr]
@@ -1389,7 +1389,7 @@ def sql(self) -> Optional[str]:
     @contextproperty
     def compiled_code(self) -> Optional[str]:
         if getattr(self.model, "defer_relation", None):
-            return f"select * from {self.model.defer_relation.database}.{self.model.defer_relation.schema}.{self.model.defer_relation.alias}"  # type: ignore[union-attr]
+            return f"select * from {self.model.defer_relation.relation_name}"  # type: ignore[union-attr]
         elif getattr(self.model, "extra_ctes_injected", None):
             # TODO CT-211
             return self.model.compiled_code  # type: ignore[union-attr]
diff --git a/core/dbt/contracts/graph/manifest.py b/core/dbt/contracts/graph/manifest.py
index 479a54b44c5..5ce23373707 100644
--- a/core/dbt/contracts/graph/manifest.py
+++ b/core/dbt/contracts/graph/manifest.py
@@ -35,7 +35,7 @@
     ManifestNode,
     Metric,
     ModelNode,
-    RelationalNode,
+    DeferRelation,
     ResultNode,
     SemanticModel,
     SourceDefinition,
@@ -1177,8 +1177,12 @@ def add_from_artifact(
         refables = set(NodeType.refable())
         for unique_id, node in other.nodes.items():
             current = self.nodes.get(unique_id)
-            if current and (node.resource_type in refables and not node.is_ephemeral):
-                defer_relation = RelationalNode(node.database, node.schema, node.alias)
+            if current and (
+                node.resource_type in refables and not node.is_ephemeral and node.relation_name
+            ):
+                defer_relation = DeferRelation(
+                    node.database, node.schema, node.alias, node.relation_name
+                )
                 self.nodes[unique_id] = current.replace(defer_relation=defer_relation)
 
     # Methods that were formerly in ParseResult
diff --git a/core/dbt/contracts/graph/nodes.py b/core/dbt/contracts/graph/nodes.py
index f9de4d52d50..f3dbe66da55 100644
--- a/core/dbt/contracts/graph/nodes.py
+++ b/core/dbt/contracts/graph/nodes.py
@@ -258,8 +258,9 @@ def add_macro(self, value: str):
 
 
 @dataclass
-class RelationalNode(HasRelationMetadata):
+class DeferRelation(HasRelationMetadata):
     alias: str
+    relation_name: str
 
     @property
     def identifier(self):
@@ -275,17 +276,6 @@ def add_node(self, value: str):
         self.nodes.append(value)
 
 
-@dataclass
-class DeferRelation(dbtClassMixin):
-    alias: str
-    database: Optional[str]
-    schema: str
-
-    @property
-    def identifier(self):
-        return self.alias
-
-
 @dataclass
 class ParsedNodeMandatory(GraphNode, HasRelationMetadata, Replaceable):
     alias: str
From 81dad80b5b4c08ba47091e56a0d6c36a0f0a0d82 Mon Sep 17 00:00:00 2001
From: Kshitij Aranke
Date: Wed, 28 Jun 2023 12:10:19 -0500
Subject: [PATCH 20/24] Fix add_from_artifact test

---
 tests/unit/test_manifest.py | 73 +++++++++++++++++++------------------
 1 file changed, 38 insertions(+), 35 deletions(-)

diff --git a/tests/unit/test_manifest.py b/tests/unit/test_manifest.py
index f64c1ce722c..9c3e1bcea02 100644
--- a/tests/unit/test_manifest.py
+++ b/tests/unit/test_manifest.py
@@ -664,41 +664,6 @@ def
test_deepcopy_copies_flat_graph(self): copy = original.deepcopy() self.assertEqual(original.flat_graph, copy.flat_graph) - def test_add_from_artifact(self): - original_nodes = deepcopy(self.nested_nodes) - other_nodes = deepcopy(self.nested_nodes) - - nested2 = other_nodes.pop("model.root.nested") - nested2.name = "nested2" - nested2.alias = "nested2" - nested2.fqn = ["root", "nested2"] - - other_nodes["model.root.nested2"] = nested2 - - for k, v in other_nodes.items(): - v.database = "other_" + v.database - v.schema = "other_" + v.schema - v.alias = "other_" + v.alias - - other_nodes[k] = v - - original_manifest = Manifest(nodes=original_nodes) - other_manifest = Manifest(nodes=other_nodes) - original_manifest.add_from_artifact(other_manifest.writable_manifest()) - - # new node added should not be in original manifest - assert "model.root.nested2" not in original_manifest.nodes - - # old node removed should not have state relation in original manifest - assert original_manifest.nodes["model.root.nested"].defer_relation is None - - # for all other nodes, check that state relation is updated - for k, v in original_manifest.nodes.items(): - if k != "model.root.nested": - self.assertEqual("other_" + v.database, v.defer_relation.database) - self.assertEqual("other_" + v.schema, v.defer_relation.schema) - self.assertEqual("other_" + v.alias, v.defer_relation.alias) - class MixedManifestTest(unittest.TestCase): def setUp(self): @@ -1039,6 +1004,44 @@ def test_build_flat_graph(self): self.assertEqual(frozenset(node), REQUIRED_PARSED_NODE_KEYS) self.assertEqual(compiled_count, 2) + def test_add_from_artifact(self): + original_nodes = deepcopy(self.nested_nodes) + other_nodes = deepcopy(self.nested_nodes) + + nested2 = other_nodes.pop("model.root.nested") + nested2.name = "nested2" + nested2.alias = "nested2" + nested2.fqn = ["root", "nested2"] + + other_nodes["model.root.nested2"] = nested2 + + for k, v in other_nodes.items(): + v.database = "other_" + v.database + v.schema = "other_" + v.schema + v.alias = "other_" + v.alias + if v.relation_name: + v.relation_name = "other_" + v.relation_name + + other_nodes[k] = v + + original_manifest = Manifest(nodes=original_nodes) + other_manifest = Manifest(nodes=other_nodes) + original_manifest.add_from_artifact(other_manifest.writable_manifest()) + + # new node added should not be in original manifest + assert "model.root.nested2" not in original_manifest.nodes + + # old node removed should not have state relation in original manifest + assert original_manifest.nodes["model.root.nested"].defer_relation is None + + # for all other nodes, check that state relation is updated + for k, v in original_manifest.nodes.items(): + if v.defer_relation: + self.assertEqual("other_" + v.database, v.defer_relation.database) + self.assertEqual("other_" + v.schema, v.defer_relation.schema) + self.assertEqual("other_" + v.alias, v.defer_relation.alias) + self.assertEqual("other_" + v.relation_name, v.defer_relation.relation_name) + # Tests of the manifest search code (find_X_by_Y) From 88a726b0afb425055e0ad00b62205edd1e7a8524 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Wed, 28 Jun 2023 12:19:10 -0500 Subject: [PATCH 21/24] remove node.relation_name check --- core/dbt/contracts/graph/manifest.py | 4 +--- core/dbt/contracts/graph/nodes.py | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/core/dbt/contracts/graph/manifest.py b/core/dbt/contracts/graph/manifest.py index a9c37fd50bc..a2254dc9f08 100644 --- a/core/dbt/contracts/graph/manifest.py +++ 
b/core/dbt/contracts/graph/manifest.py
@@ -1217,9 +1217,7 @@ def add_from_artifact(
         refables = set(NodeType.refable())
         for unique_id, node in other.nodes.items():
             current = self.nodes.get(unique_id)
-            if current and (
-                node.resource_type in refables and not node.is_ephemeral and node.relation_name
-            ):
+            if current and (node.resource_type in refables and not node.is_ephemeral):
                 defer_relation = DeferRelation(
                     node.database, node.schema, node.alias, node.relation_name
                 )
diff --git a/core/dbt/contracts/graph/nodes.py b/core/dbt/contracts/graph/nodes.py
index 41a4f455e9c..8b633feb5a1 100644
--- a/core/dbt/contracts/graph/nodes.py
+++ b/core/dbt/contracts/graph/nodes.py
@@ -260,7 +260,7 @@ def add_macro(self, value: str):
 @dataclass
 class DeferRelation(HasRelationMetadata):
     alias: str
-    relation_name: str
+    relation_name: Optional[str]
 
     @property
     def identifier(self):
From 020cde55571e1f86b8912783b1daedf51a4e1753 Mon Sep 17 00:00:00 2001
From: Kshitij Aranke
Date: Wed, 28 Jun 2023 12:46:14 -0500
Subject: [PATCH 22/24] add if v.relation_name test

---
 tests/unit/test_manifest.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/tests/unit/test_manifest.py b/tests/unit/test_manifest.py
index 9c3e1bcea02..843d2c9bf90 100644
--- a/tests/unit/test_manifest.py
+++ b/tests/unit/test_manifest.py
@@ -1040,7 +1040,8 @@ def test_add_from_artifact(self):
             self.assertEqual("other_" + v.database, v.defer_relation.database)
             self.assertEqual("other_" + v.schema, v.defer_relation.schema)
             self.assertEqual("other_" + v.alias, v.defer_relation.alias)
-            self.assertEqual("other_" + v.relation_name, v.defer_relation.relation_name)
+            if v.relation_name:
+                self.assertEqual("other_" + v.relation_name, v.defer_relation.relation_name)
 
 
 # Tests of the manifest search code (find_X_by_Y)
From e761aa00f88b4964de31cc915a5398966f5a12f2 Mon Sep 17 00:00:00 2001
From: Kshitij Aranke
Date: Wed, 28 Jun 2023 14:04:59 -0500
Subject: [PATCH 23/24] fix seed relation_name bug

---
 core/dbt/context/providers.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/core/dbt/context/providers.py b/core/dbt/context/providers.py
index f18cf553f68..4cfaa142e25 100644
--- a/core/dbt/context/providers.py
+++ b/core/dbt/context/providers.py
@@ -1382,7 +1382,8 @@ def sql(self) -> Optional[str]:
             # If the model is deferred and the adapter doesn't support zero-copy cloning, then select * from the prod
             # relation
             if getattr(self.model, "defer_relation", None):
-                return f"select * from {self.model.defer_relation.relation_name}"  # type: ignore[union-attr]
+                # TODO https://github.com/dbt-labs/dbt-core/issues/7976
+                return f"select * from {self.model.defer_relation.relation_name or str(self.defer_relation)}"  # type: ignore[union-attr]
             elif getattr(self.model, "extra_ctes_injected", None):
                 # TODO CT-211
                 return self.model.compiled_code  # type: ignore[union-attr]
@@ -1394,7 +1395,8 @@ def sql(self) -> Optional[str]:
     @contextproperty
     def compiled_code(self) -> Optional[str]:
         if getattr(self.model, "defer_relation", None):
-            return f"select * from {self.model.defer_relation.relation_name}"  # type: ignore[union-attr]
+            # TODO https://github.com/dbt-labs/dbt-core/issues/7976
+            return f"select * from {self.model.defer_relation.relation_name or str(self.defer_relation)}"  # type: ignore[union-attr]
         elif getattr(self.model, "extra_ctes_injected", None):
             # TODO CT-211
             return self.model.compiled_code  # type: ignore[union-attr]
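
Taken together, patches 19 through 22 settle on a small contract for deferral metadata: add_from_artifact copies each refable node's database, schema, alias, and pre-rendered relation_name from the --state manifest into a DeferRelation, and relation_name stays optional because some nodes in older or partial artifacts never had one. The following is a standalone sketch of that shape, using simplified stand-ins (DeferRelationSketch, defer_relation_from_state_node) rather than dbt's real classes:

    # Illustration only: simplified stand-ins for dbt's DeferRelation plumbing.
    from dataclasses import dataclass
    from types import SimpleNamespace
    from typing import Optional


    @dataclass
    class DeferRelationSketch:
        database: Optional[str]
        schema: str
        alias: str
        relation_name: Optional[str]  # pre-rendered name from the state manifest; may be None


    def defer_relation_from_state_node(node) -> DeferRelationSketch:
        # Carry the state manifest's rendered relation_name along verbatim
        # instead of re-deriving it from database/schema/alias.
        return DeferRelationSketch(node.database, node.schema, node.alias, node.relation_name)


    state_node = SimpleNamespace(
        database="analytics",
        schema="prod",
        alias="dim_customers",
        relation_name='"analytics"."prod"."dim_customers"',
    )
    print(defer_relation_from_state_node(state_node))
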
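The seed fix in the preceding patch boils down to a fallback: when the state manifest carries no relation_name for a node (seeds being the known case), the generated view should fall back to a name rebuilt from the relation's parts rather than emitting "select * from None". A rough sketch of that selection logic follows, with a hypothetical render_relation helper standing in for dbt's own relation rendering:

    # Sketch only: approximates the fallback used by the sql/compiled_code
    # context properties when a clone has to be materialized as a view.
    from typing import Optional


    def render_relation(database: Optional[str], schema: str, alias: str) -> str:
        # Rebuild a dotted identifier from whatever parts are available.
        return ".".join(part for part in (database, schema, alias) if part)


    def clone_view_sql(relation_name: Optional[str], database, schema, alias) -> str:
        # Prefer the pre-rendered relation_name from the state manifest;
        # otherwise fall back to the reconstructed name (the seed case).
        target = relation_name or render_relation(database, schema, alias)
        return f"select * from {target}"


    print(clone_view_sql(None, "analytics", "prod_seeds", "country_codes"))
    # select * from analytics.prod_seeds.country_codes
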
From cd78b6475faa696eb686dc1150289f5733838d4e Mon Sep 17 00:00:00 2001
From: Kshitij Aranke
Date: Wed, 28 Jun 2023 14:34:16 -0500
Subject: [PATCH 24/24] Skip `test_semantic_model_deleted_partial_parsing`

---
 tests/functional/semantic_models/test_semantic_model_parsing.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/tests/functional/semantic_models/test_semantic_model_parsing.py b/tests/functional/semantic_models/test_semantic_model_parsing.py
index b77d4a2ac21..734031f229c 100644
--- a/tests/functional/semantic_models/test_semantic_model_parsing.py
+++ b/tests/functional/semantic_models/test_semantic_model_parsing.py
@@ -111,6 +111,7 @@ def test_semantic_model_changed_partial_parsing(self, project):
         semantic_model = manifest.semantic_models["semantic_model.test.revenue"]
         assert semantic_model.dimensions[0].type_params.time_granularity == TimeGranularity.WEEK
 
+    @pytest.mark.skip(reason="Tracking in https://github.com/dbt-labs/dbt-core/issues/7977")
     def test_semantic_model_deleted_partial_parsing(self, project):
         # First, use the default schema.yml to define our semantic model, and
         # run the dbt parse command