Skip to content

Commit

Permalink
create adapter zone vesions of dbt_clone tests to be inherited by oth…
Browse files Browse the repository at this point in the history
…er adapters
  • Loading branch information
McKnight-42 committed Jun 21, 2023
1 parent 7327283 commit 5a77754
Show file tree
Hide file tree
Showing 3 changed files with 310 additions and 77 deletions.
106 changes: 106 additions & 0 deletions tests/adapter/dbt/tests/adapter/dbt_clone/fixtures.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
seed_csv = """id,name
1,Alice
2,Bob
"""

table_model_sql = """
{{ config(materialized='table') }}
select * from {{ ref('ephemeral_model') }}
-- establish a macro dependency to trigger state:modified.macros
-- depends on: {{ my_macro() }}
"""

view_model_sql = """
select * from {{ ref('seed') }}
-- establish a macro dependency that trips infinite recursion if not handled
-- depends on: {{ my_infinitely_recursive_macro() }}
"""

ephemeral_model_sql = """
{{ config(materialized='ephemeral') }}
select * from {{ ref('view_model') }}
"""

exposures_yml = """
version: 2
exposures:
- name: my_exposure
type: application
depends_on:
- ref('view_model')
owner:
email: test@example.com
"""

schema_yml = """
version: 2
models:
- name: view_model
columns:
- name: id
tests:
- unique:
severity: error
- not_null
- name: name
"""

get_schema_name_sql = """
{% macro generate_schema_name(custom_schema_name, node) -%}
{%- set default_schema = target.schema -%}
{%- if custom_schema_name is not none -%}
{{ return(default_schema ~ '_' ~ custom_schema_name|trim) }}
-- put seeds into a separate schema in "prod", to verify that cloning in "dev" still works
{%- elif target.name == 'default' and node.resource_type == 'seed' -%}
{{ return(default_schema ~ '_' ~ 'seeds') }}
{%- else -%}
{{ return(default_schema) }}
{%- endif -%}
{%- endmacro %}
"""

snapshot_sql = """
{% snapshot my_cool_snapshot %}
{{
config(
target_database=database,
target_schema=schema,
unique_key='id',
strategy='check',
check_cols=['id'],
)
}}
select * from {{ ref('view_model') }}
{% endsnapshot %}
"""
macros_sql = """
{% macro my_macro() %}
{% do log('in a macro' ) %}
{% endmacro %}
"""

infinite_macros_sql = """
{# trigger infinite recursion if not handled #}
{% macro my_infinitely_recursive_macro() %}
{{ return(adapter.dispatch('my_infinitely_recursive_macro')()) }}
{% endmacro %}
{% macro default__my_infinitely_recursive_macro() %}
{% if unmet_condition %}
{{ my_infinitely_recursive_macro() }}
{% else %}
{{ return('') }}
{% endif %}
{% endmacro %}
"""

custom_can_clone_tables_false_macros_sql = """
{% macro can_clone_tables() %}
{{ return(False) }}
{% endmacro %}
"""
204 changes: 204 additions & 0 deletions tests/adapter/dbt/tests/adapter/dbt_clone/test_dbt_clone.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
import os
import shutil
from copy import deepcopy
import pytest
from collections import Counter
from dbt.exceptions import DbtRuntimeError
from dbt.tests.util import run_dbt
from dbt.tests.adapter.dbt_clone.fixtures import (
seed_csv,
table_model_sql,
view_model_sql,
ephemeral_model_sql,
exposures_yml,
schema_yml,
snapshot_sql,
get_schema_name_sql,
macros_sql,
infinite_macros_sql,
custom_can_clone_tables_false_macros_sql,
)


class BaseClone:
@pytest.fixture(scope="class")
def models(self):
return {
"table_model.sql": table_model_sql,
"view_model.sql": view_model_sql,
"ephemeral_model.sql": ephemeral_model_sql,
"schema.yml": schema_yml,
"exposures.yml": exposures_yml,
}

@pytest.fixture(scope="class")
def macros(self):
return {
"macros.sql": macros_sql,
"infinite_macros.sql": infinite_macros_sql,
"get_schema_name.sql": get_schema_name_sql,
}

@pytest.fixture(scope="class")
def seeds(self):
return {
"seed.csv": seed_csv,
}

@pytest.fixture(scope="class")
def snapshots(self):
return {
"snapshot.sql": snapshot_sql,
}

@pytest.fixture(scope="class")
def other_schema(self, unique_schema):
return unique_schema + "_other"

@property
def project_config_update(self):
return {
"seeds": {
"test": {
"quote_columns": False,
}
}
}

@pytest.fixture(scope="class")
def profiles_config_update(self, dbt_profile_target, unique_schema, other_schema):
outputs = {"default": dbt_profile_target, "otherschema": deepcopy(dbt_profile_target)}
outputs["default"]["schema"] = unique_schema
outputs["otherschema"]["schema"] = other_schema
return {"test": {"outputs": outputs, "target": "default"}}

def copy_state(self, project_root):
state_path = os.path.join(project_root, "state")
if not os.path.exists(state_path):
os.makedirs(state_path)
shutil.copyfile(
f"{project_root}/target/manifest.json", f"{project_root}/state/manifest.json"
)

def run_and_save_state(self, project_root, with_snapshot=False):
results = run_dbt(["seed"])
assert len(results) == 1
assert not any(r.node.deferred for r in results)
results = run_dbt(["run"])
assert len(results) == 2
assert not any(r.node.deferred for r in results)
results = run_dbt(["test"])
assert len(results) == 2

if with_snapshot:
results = run_dbt(["snapshot"])
assert len(results) == 1
assert not any(r.node.deferred for r in results)

# copy files
self.copy_state(project_root)


# -- Below we define base classes for tests you import the one based on if your adapter uses dbt clone or not --
class BaseClonePossible(BaseClone):
def test_can_clone_true(self, project, unique_schema, other_schema):
project.create_test_schema(other_schema)
self.run_and_save_state(project.project_root, with_snapshot=True)

clone_args = [
"clone",
"--state",
"state",
"--target",
"otherschema",
]

results = run_dbt(clone_args)
assert len(results) == 4

# assert all("create view" in r.message.lower() for r in results)
schema_relations = project.adapter.list_relations(
database=project.database, schema=other_schema
)
types = [r.type for r in schema_relations]
count_types = Counter(types)
assert count_types == Counter({"table": 3, "view": 1})

# objects already exist, so this is a no-op
results = run_dbt(clone_args)
assert len(results) == 4
assert all("no-op" in r.message.lower() for r in results)

# recreate all objects
results = run_dbt([*clone_args, "--full-refresh"])
assert len(results) == 4

# select only models this time
results = run_dbt([*clone_args, "--resource-type", "model"])
assert len(results) == 2
assert all("no-op" in r.message.lower() for r in results)

def test_clone_no_state(self, project, unique_schema, other_schema):
project.create_test_schema(other_schema)
self.run_and_save_state(project.project_root, with_snapshot=True)

clone_args = [
"clone",
"--target",
"otherschema",
]

with pytest.raises(
DbtRuntimeError,
match="--state is required for cloning relations from another environment",
):
run_dbt(clone_args)


class BaseCloneNotPossible(BaseClone):
@pytest.fixture(scope="class")
def macros(self):
return {
"macros.sql": macros_sql,
"my_can_clone_tables.sql": custom_can_clone_tables_false_macros_sql,
"infinite_macros.sql": infinite_macros_sql,
"get_schema_name.sql": get_schema_name_sql,
}

def test_can_clone_false(self, project, unique_schema, other_schema):
project.create_test_schema(other_schema)
self.run_and_save_state(project.project_root, with_snapshot=True)

clone_args = [
"clone",
"--state",
"state",
"--target",
"otherschema",
]

results = run_dbt(clone_args)
assert len(results) == 4

schema_relations = project.adapter.list_relations(
database=project.database, schema=other_schema
)
assert all(r.type == "view" for r in schema_relations)

# objects already exist, so this is a no-op
results = run_dbt(clone_args)
assert len(results) == 4
assert all("no-op" in r.message.lower() for r in results)

# recreate all objects
results = run_dbt([*clone_args, "--full-refresh"])
assert len(results) == 4

# select only models this time
results = run_dbt([*clone_args, "--resource-type", "model"])
assert len(results) == 2
assert all("no-op" in r.message.lower() for r in results)


class TestPostgresCloneNotPossible(BaseCloneNotPossible):
pass
77 changes: 0 additions & 77 deletions tests/functional/defer_state/test_defer_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,80 +348,3 @@ def test_defer_state_flag(self, project, unique_schema, other_schema):
assert results.results[0].status == RunStatus.Success
assert results.results[0].node.name == "table_model"
assert results.results[0].adapter_response["rows_affected"] == 2


get_schema_name_sql = """
{% macro generate_schema_name(custom_schema_name, node) -%}
{%- set default_schema = target.schema -%}
{%- if custom_schema_name is not none -%}
{{ return(default_schema ~ '_' ~ custom_schema_name|trim) }}
-- put seeds into a separate schema in "prod", to verify that cloning in "dev" still works
{%- elif target.name == 'default' and node.resource_type == 'seed' -%}
{{ return(default_schema ~ '_' ~ 'seeds') }}
{%- else -%}
{{ return(default_schema) }}
{%- endif -%}
{%- endmacro %}
"""


class TestCloneToOther(BaseDeferState):
@pytest.fixture(scope="class")
def macros(self):
return {
"macros.sql": macros_sql,
"infinite_macros.sql": infinite_macros_sql,
"get_schema_name.sql": get_schema_name_sql,
}

def test_clone(self, project, unique_schema, other_schema):
project.create_test_schema(other_schema)
self.run_and_save_state(project.project_root, with_snapshot=True)

clone_args = [
"clone",
"--state",
"state",
"--target",
"otherschema",
]

results = run_dbt(clone_args)
assert len(results) == 4

assert all("create view" in r.message.lower() for r in results)
schema_relations = project.adapter.list_relations(
database=project.database, schema=other_schema
)
assert all(r.type == "view" for r in schema_relations)

# objects already exist, so this is a no-op
results = run_dbt(clone_args)
assert len(results) == 4
assert all("no-op" in r.message.lower() for r in results)

# recreate all objects
results = run_dbt([*clone_args, "--full-refresh"])
assert len(results) == 4
assert all("create view" in r.message.lower() for r in results)

# select only models this time
results = run_dbt([*clone_args, "--resource-type", "model"])
assert len(results) == 2
assert all("no-op" in r.message.lower() for r in results)

def test_clone_no_state(self, project, unique_schema, other_schema):
project.create_test_schema(other_schema)
self.run_and_save_state(project.project_root, with_snapshot=True)

clone_args = [
"clone",
"--target",
"otherschema",
]

with pytest.raises(
DbtRuntimeError,
match="--state or --defer-state are required for deferral, but neither was provided",
):
run_dbt(clone_args)

0 comments on commit 5a77754

Please sign in to comment.