From 416cc72498986c99a683003cc3a0c44272253000 Mon Sep 17 00:00:00 2001
From: Jacob Beck
Date: Wed, 10 Apr 2019 10:07:32 -0600
Subject: [PATCH] Implement check_cols

Contracts: some anyOf shenanigans to add support for check_cols
Macros: split apart archive selection, probably too much copy+paste
Legacy: Archive configs now include a "timestamp" strategy when parsed from dbt_project.yml
Add integration tests
Fix aliases test
Unquote columns in archives
Handle null columns
attr -> use_profile
---
 core/dbt/adapters/base/impl.py                |   2 +-
 core/dbt/contracts/graph/parsed.py            |  41 +-
 .../materializations/archive/archive.sql      | 197 +++++++---
 .../macros/materializations/helpers.sql       |   4 +-
 core/dbt/parser/archives.py                   |   2 +
 .../macros/materializations/archive.sql       |  11 +-
 .../invalidate_postgres.sql                   |  22 +-
 .../invalidate_snowflake.sql                  |  10 +-
 .../004_simple_archive_test/seed.sql          | 278 ++++++------
 .../004_simple_archive_test/seed_bq.sql       |  80 ++--
 .../test-archives-invalid/archive.sql         |   8 +-
 .../test-archives-pg/archive.sql              |   4 +-
 .../test-archives-select/archives.sql         |  12 +-
 .../test-check-col-archives-bq/archive.sql    |  27 ++
 .../test-check-col-archives/archive.sql       |  28 ++
 .../test_simple_archive.py                    | 177 +++++++--
 .../004_simple_archive_test/update.sql        | 370 +++++++++---------
 .../004_simple_archive_test/update_bq.sql     |  94 ++---
 .../test_custom_aliases.py                    |   7 +-
 19 files changed, 840 insertions(+), 534 deletions(-)
 create mode 100644 test/integration/004_simple_archive_test/test-check-col-archives-bq/archive.sql
 create mode 100644 test/integration/004_simple_archive_test/test-check-col-archives/archive.sql

diff --git a/core/dbt/adapters/base/impl.py b/core/dbt/adapters/base/impl.py
index 69321ad7bd0..1207645fa83 100644
--- a/core/dbt/adapters/base/impl.py
+++ b/core/dbt/adapters/base/impl.py
@@ -550,7 +550,7 @@ def valid_archive_target(self, relation):
                 expected_type=self.Relation)
 
         columns = self.get_columns_in_relation(relation)
-        names = set(c.name for c in columns)
+        names = set(c.name.lower() for c in columns)
         expanded_keys = ('scd_id', 'valid_from', 'valid_to')
         extra = []
         missing = []
diff --git a/core/dbt/contracts/graph/parsed.py b/core/dbt/contracts/graph/parsed.py
index 6741d095f16..d0e77c20ab5 100644
--- a/core/dbt/contracts/graph/parsed.py
+++ b/core/dbt/contracts/graph/parsed.py
@@ -452,9 +452,44 @@ def config(self, value):
             'unique_key': {
                 'type': 'string',
             },
-            'strategy': {
-                'enum': ['timestamp'],
-            },
+            'anyOf': [
+                {
+                    'properties': {
+                        'strategy': {
+                            'enum': ['timestamp'],
+                        },
+                        'updated_at': {
+                            'type': 'string',
+                            'description': (
+                                'The column name with the timestamp to compare'
+                            ),
+                        },
+                    },
+                    'required': ['updated_at'],
+                },
+                {
+                    'properties': {
+                        'strategy': {
+                            'enum': ['check'],
+                        },
+                        'check_cols': {
+                            'oneOf': [
+                                {
+                                    'type': 'array',
+                                    'items': {'type': 'string'},
+                                    'description': 'The columns to check',
+                                    'minLength': 1,
+                                },
+                                {
+                                    'enum': ['all'],
+                                    'description': 'Check all columns',
+                                },
+                            ],
+                        },
+                    },
+                    'required': ['check_cols'],
+                }
+            ]
         },
         'required': [
             'target_database', 'target_schema', 'unique_key', 'strategy',
diff --git a/core/dbt/include/global_project/macros/materializations/archive/archive.sql b/core/dbt/include/global_project/macros/materializations/archive/archive.sql
index 83d4bd3d755..6e3e96c5f59 100644
--- a/core/dbt/include/global_project/macros/materializations/archive/archive.sql
+++ b/core/dbt/include/global_project/macros/materializations/archive/archive.sql
@@ -2,12 +2,12 @@
     Create SCD Hash SQL fields cross-db
 #}
 
-{% macro archive_scd_hash() %}
- {{ adapter_macro('archive_scd_hash') }} +{% macro archive_hash_arguments(args) %} + {{ adapter_macro('archive_hash_arguments', args) }} {% endmacro %} -{% macro default__archive_scd_hash() %} - md5("dbt_pk" || '|' || "dbt_updated_at") +{% macro default__archive_hash_arguments(args) %} + md5({% for arg in args %}coalesce(cast({{ arg }} as varchar ), '') {% if not loop.last %} || '|' || {% endif %}{% endfor %}) {% endmacro %} {% macro create_temporary_table(sql, relation) %} @@ -48,33 +48,61 @@ {% macro default__archive_update(target_relation, tmp_relation) %} update {{ target_relation }} - set {{ adapter.quote('dbt_valid_to') }} = tmp.{{ adapter.quote('dbt_valid_to') }} + set dbt_valid_to = tmp.dbt_valid_to from {{ tmp_relation }} as tmp - where tmp.{{ adapter.quote('dbt_scd_id') }} = {{ target_relation }}.{{ adapter.quote('dbt_scd_id') }} - and {{ adapter.quote('change_type') }} = 'update'; + where tmp.dbt_scd_id = {{ target_relation }}.dbt_scd_id + and change_type = 'update'; {% endmacro %} -{# - Cross-db compatible archival implementation -#} -{% macro archive_select(source_sql, target_relation, source_columns, unique_key, updated_at) %} +{% macro archive_get_time() -%} + {{ adapter_macro('archive_get_time') }} +{%- endmacro %} - {% set timestamp_column = api.Column.create('_', 'timestamp') %} +{% macro default__archive_get_time() -%} + {{ current_timestamp() }} +{%- endmacro %} + +{% macro snowflake__archive_get_time() -%} + to_timestamp_ntz({{ current_timestamp() }}) +{%- endmacro %} + + +{% macro archive_select_generic(source_sql, target_relation, transforms, scd_hash) -%} with source as ( {{ source_sql }} ), + {{ transforms }} + merged as ( + + select *, 'update' as change_type from updates + union all + select *, 'insert' as change_type from insertions + + ) + + select *, + {{ scd_hash }} as dbt_scd_id + from merged +{%- endmacro %} + +{# + Cross-db compatible archival implementation +#} +{% macro archive_select_timestamp(source_sql, target_relation, source_columns, unique_key, updated_at) -%} + {% set timestamp_column = api.Column.create('_', 'timestamp') %} + {% set transforms -%} current_data as ( select {% for col in source_columns %} - {{ adapter.quote(col.name) }} {% if not loop.last %},{% endif %} + {{ col.name }} {% if not loop.last %},{% endif %} {% endfor %}, - {{ updated_at }} as {{ adapter.quote('dbt_updated_at') }}, - {{ unique_key }} as {{ adapter.quote('dbt_pk') }}, - {{ updated_at }} as {{ adapter.quote('dbt_valid_from') }}, - {{ timestamp_column.literal('null') }} as {{ adapter.quote('tmp_valid_to') }} + {{ updated_at }} as dbt_updated_at, + {{ unique_key }} as dbt_pk, + {{ updated_at }} as dbt_valid_from, + {{ timestamp_column.literal('null') }} as tmp_valid_to from source ), @@ -82,12 +110,12 @@ select {% for col in source_columns %} - {{ adapter.quote(col.name) }}, + {{ col.name }}, {% endfor %} - {{ updated_at }} as {{ adapter.quote('dbt_updated_at') }}, - {{ unique_key }} as {{ adapter.quote('dbt_pk') }}, - {{ adapter.quote('dbt_valid_from') }}, - {{ adapter.quote('dbt_valid_to') }} as {{ adapter.quote('tmp_valid_to') }} + {{ updated_at }} as dbt_updated_at, + {{ unique_key }} as dbt_pk, + dbt_valid_from, + dbt_valid_to as tmp_valid_to from {{ target_relation }} ), @@ -96,14 +124,16 @@ select current_data.*, - {{ timestamp_column.literal('null') }} as {{ adapter.quote('dbt_valid_to') }} + {{ timestamp_column.literal('null') }} as dbt_valid_to from current_data left outer join archived_data - on archived_data.{{ adapter.quote('dbt_pk') }} = current_data.{{ 
adapter.quote('dbt_pk') }} - where archived_data.{{ adapter.quote('dbt_pk') }} is null or ( - archived_data.{{ adapter.quote('dbt_pk') }} is not null and - current_data.{{ adapter.quote('dbt_updated_at') }} > archived_data.{{ adapter.quote('dbt_updated_at') }} and - archived_data.{{ adapter.quote('tmp_valid_to') }} is null + on archived_data.dbt_pk = current_data.dbt_pk + where + archived_data.dbt_pk is null + or ( + archived_data.dbt_pk is not null + and archived_data.dbt_updated_at < current_data.dbt_updated_at + and archived_data.tmp_valid_to is null ) ), @@ -111,29 +141,93 @@ select archived_data.*, - current_data.{{ adapter.quote('dbt_updated_at') }} as {{ adapter.quote('dbt_valid_to') }} + current_data.dbt_updated_at as dbt_valid_to from current_data left outer join archived_data - on archived_data.{{ adapter.quote('dbt_pk') }} = current_data.{{ adapter.quote('dbt_pk') }} - where archived_data.{{ adapter.quote('dbt_pk') }} is not null - and archived_data.{{ adapter.quote('dbt_updated_at') }} < current_data.{{ adapter.quote('dbt_updated_at') }} - and archived_data.{{ adapter.quote('tmp_valid_to') }} is null + on archived_data.dbt_pk = current_data.dbt_pk + where archived_data.dbt_pk is not null + and archived_data.dbt_updated_at < current_data.dbt_updated_at + and archived_data.tmp_valid_to is null ), + {%- endset %} + {%- set scd_hash = archive_hash_arguments(['dbt_pk', 'dbt_updated_at']) -%} + {{ archive_select_generic(source_sql, target_relation, transforms, scd_hash) }} +{%- endmacro %} + + +{% macro archive_select_check_cols(source_sql, target_relation, source_columns, unique_key, check_cols) -%} + {%- set timestamp_column = api.Column.create('_', 'timestamp') -%} + + {# if we recognize the primary key, it's the newest record, and anything we care about has changed, it's an update candidate #} + {%- set update_candidate -%} + archived_data.dbt_pk is not null + and ( + {%- for col in check_cols %} + current_data.{{ col }} <> archived_data.{{ col }} + {%- if not loop.last %} or {% endif %} + {% endfor -%} + ) + and archived_data.tmp_valid_to is null + {%- endset %} - merged as ( + {% set transforms -%} + current_data as ( - select *, 'update' as {{ adapter.quote('change_type') }} from updates - union all - select *, 'insert' as {{ adapter.quote('change_type') }} from insertions + select + {% for col in source_columns %} + {{ col.name }} {% if not loop.last %},{% endif %} + {% endfor %}, + {{ archive_get_time() }} as dbt_updated_at, + {{ unique_key }} as dbt_pk, + {{ archive_get_time() }} as dbt_valid_from, + {{ timestamp_column.literal('null') }} as tmp_valid_to + from source + ), - ) + archived_data as ( - select *, - {{ archive_scd_hash() }} as {{ adapter.quote('dbt_scd_id') }} - from merged + select + {% for col in source_columns %} + {{ col.name }}, + {% endfor %} + dbt_updated_at, + {{ unique_key }} as dbt_pk, + dbt_valid_from, + dbt_valid_to as tmp_valid_to + from {{ target_relation }} -{% endmacro %} + ), + + insertions as ( + + select + current_data.*, + {{ timestamp_column.literal('null') }} as dbt_valid_to + from current_data + left outer join archived_data + on archived_data.dbt_pk = current_data.dbt_pk + where + archived_data.dbt_pk is null + or ( {{ update_candidate }} ) + ), + + updates as ( + + select + archived_data.*, + {{ archive_get_time() }} as dbt_valid_to + from current_data + left outer join archived_data + on archived_data.dbt_pk = current_data.dbt_pk + where {{ update_candidate }} + ), + {%- endset %} + {%- set hash_components = ['dbt_pk'] %} + {%- do 
hash_components.extend(check_cols) -%} + {%- set scd_hash = archive_hash_arguments(hash_components) -%} + {{ archive_select_generic(source_sql, target_relation, transforms, scd_hash) }} +{%- endmacro %} {# this is gross #} {% macro create_empty_table_as(sql) %} @@ -157,6 +251,7 @@ {%- set target_database = config.get('target_database') -%} {%- set target_schema = config.get('target_schema') -%} {%- set target_table = model.get('alias', model.get('name')) -%} + {%- set strategy = config.get('strategy') -%} {{ create_schema(target_database, target_schema) }} @@ -179,7 +274,6 @@ {%- set source_columns = adapter.get_columns_in_relation(source_info_model) -%} {%- set unique_key = config.get('unique_key') -%} - {%- set updated_at = config.get('updated_at') -%} {%- set dest_columns = source_columns + [ api.Column.create('dbt_valid_from', 'timestamp'), api.Column.create('dbt_valid_to', 'timestamp'), @@ -187,7 +281,6 @@ api.Column.create('dbt_updated_at', 'timestamp'), ] -%} - {% call statement() %} {{ create_archive_table(target_relation, dest_columns) }} {% endcall %} @@ -204,7 +297,19 @@ {% set tmp_table_sql -%} with dbt_archive_sbq as ( - {{ archive_select(model['injected_sql'], target_relation, source_columns, unique_key, updated_at) }} + + {% if strategy == 'timestamp' %} + {%- set updated_at = config.get('updated_at') -%} + {{ archive_select_timestamp(model['injected_sql'], target_relation, source_columns, unique_key, updated_at) }} + {% elif strategy == 'check' %} + {%- set check_cols = config.get('check_cols') -%} + {% if check_cols == 'all' %} + {% set check_cols = source_columns | map(attribute='name') | list %} + {% endif %} + {{ archive_select_check_cols(model['injected_sql'], target_relation, source_columns, unique_key, check_cols)}} + {% else %} + {{ exceptions.raise_compiler_error('Got invalid strategy "{}"'.format(strategy)) }} + {% endif %} ) select * from dbt_archive_sbq @@ -226,7 +331,7 @@ {{ column_list(dest_columns) }} ) select {{ column_list(dest_columns) }} from {{ tmp_relation }} - where {{ adapter.quote('change_type') }} = 'insert'; + where change_type = 'insert'; {% endcall %} {{ adapter.commit() }} diff --git a/core/dbt/include/global_project/macros/materializations/helpers.sql b/core/dbt/include/global_project/macros/materializations/helpers.sql index c2eadfdab39..da78eb93506 100644 --- a/core/dbt/include/global_project/macros/materializations/helpers.sql +++ b/core/dbt/include/global_project/macros/materializations/helpers.sql @@ -14,14 +14,14 @@ {% macro column_list(columns) %} {%- for col in columns %} - {{ adapter.quote(col.name) }} {% if not loop.last %},{% endif %} + {{ col.name }} {% if not loop.last %},{% endif %} {% endfor -%} {% endmacro %} {% macro column_list_for_create_table(columns) %} {%- for col in columns %} - {{ adapter.quote(col.name) }} {{ col.data_type }} {%- if not loop.last %},{% endif %} + {{ col.name }} {{ col.data_type }} {%- if not loop.last %},{% endif %} {% endfor -%} {% endmacro %} diff --git a/core/dbt/parser/archives.py b/core/dbt/parser/archives.py index 4be53bbafe4..981570a48da 100644 --- a/core/dbt/parser/archives.py +++ b/core/dbt/parser/archives.py @@ -36,6 +36,8 @@ def parse_archives_from_project(cls, config): source_schema = archive_config['source_schema'] cfg['target_schema'] = archive_config.get('target_schema') + # project-defined archives always use the 'timestamp' strategy. 
+ cfg['strategy'] = 'timestamp' fake_path = [cfg['target_database'], cfg['target_schema'], cfg['target_table']] diff --git a/plugins/bigquery/dbt/include/bigquery/macros/materializations/archive.sql b/plugins/bigquery/dbt/include/bigquery/macros/materializations/archive.sql index 551667649f1..7a95f440f83 100644 --- a/plugins/bigquery/dbt/include/bigquery/macros/materializations/archive.sql +++ b/plugins/bigquery/dbt/include/bigquery/macros/materializations/archive.sql @@ -4,11 +4,10 @@ {% endmacro %} -{% macro bigquery__archive_scd_hash() %} - to_hex(md5(concat(cast(`dbt_pk` as string), '|', cast(`dbt_updated_at` as string)))) +{% macro bigquery__archive_hash_arguments(args) %} + to_hex(md5(concat({% for arg in args %}coalesce(cast({{ arg }} as string), ''){% if not loop.last %}, '|',{% endif %}{% endfor %}))) {% endmacro %} - {% macro bigquery__create_columns(relation, columns) %} {{ adapter.alter_table_add_columns(relation, columns) }} {% endmacro %} @@ -16,8 +15,8 @@ {% macro bigquery__archive_update(target_relation, tmp_relation) %} update {{ target_relation }} as dest - set dest.{{ adapter.quote('dbt_valid_to') }} = tmp.{{ adapter.quote('dbt_valid_to') }} + set dest.dbt_valid_to = tmp.dbt_valid_to from {{ tmp_relation }} as tmp - where tmp.{{ adapter.quote('dbt_scd_id') }} = dest.{{ adapter.quote('dbt_scd_id') }} - and {{ adapter.quote('change_type') }} = 'update'; + where tmp.dbt_scd_id = dest.dbt_scd_id + and change_type = 'update'; {% endmacro %} diff --git a/test/integration/004_simple_archive_test/invalidate_postgres.sql b/test/integration/004_simple_archive_test/invalidate_postgres.sql index 693fa228880..491afccfac2 100644 --- a/test/integration/004_simple_archive_test/invalidate_postgres.sql +++ b/test/integration/004_simple_archive_test/invalidate_postgres.sql @@ -1,27 +1,27 @@ -- update records 11 - 21. Change email and updated_at field update {schema}.seed set - "updated_at" = "updated_at" + interval '1 hour', - "email" = 'new_' || "email" -where "id" >= 10 and "id" <= 20; + updated_at = updated_at + interval '1 hour', + email = 'new_' || email +where id >= 10 and id <= 20; -- invalidate records 11 - 21 update {schema}.archive_expected set - "dbt_valid_to" = "updated_at" + interval '1 hour' -where "id" >= 10 and "id" <= 20; + dbt_valid_to = updated_at + interval '1 hour' +where id >= 10 and id <= 20; update {schema}.archive_castillo_expected set - "dbt_valid_to" = "updated_at" + interval '1 hour' -where "id" >= 10 and "id" <= 20; + dbt_valid_to = updated_at + interval '1 hour' +where id >= 10 and id <= 20; update {schema}.archive_alvarez_expected set - "dbt_valid_to" = "updated_at" + interval '1 hour' -where "id" >= 10 and "id" <= 20; + dbt_valid_to = updated_at + interval '1 hour' +where id >= 10 and id <= 20; update {schema}.archive_kelly_expected set - "dbt_valid_to" = "updated_at" + interval '1 hour' -where "id" >= 10 and "id" <= 20; + dbt_valid_to = updated_at + interval '1 hour' +where id >= 10 and id <= 20; diff --git a/test/integration/004_simple_archive_test/invalidate_snowflake.sql b/test/integration/004_simple_archive_test/invalidate_snowflake.sql index edf22d42e4b..86e3a3c7a40 100644 --- a/test/integration/004_simple_archive_test/invalidate_snowflake.sql +++ b/test/integration/004_simple_archive_test/invalidate_snowflake.sql @@ -1,12 +1,12 @@ -- update records 11 - 21. 
Change email and updated_at field update {database}.{schema}.seed set - "updated_at" = DATEADD(hour, 1, "updated_at"), - "email" = 'new_' || "email" -where "id" >= 10 and "id" <= 20; + updated_at = DATEADD(hour, 1, updated_at), + email = 'new_' || email +where id >= 10 and id <= 20; -- invalidate records 11 - 21 update {database}.{schema}.archive_expected set - "dbt_valid_to" = DATEADD(hour, 1, "updated_at") -where "id" >= 10 and "id" <= 20; + dbt_valid_to = DATEADD(hour, 1, updated_at) +where id >= 10 and id <= 20; diff --git a/test/integration/004_simple_archive_test/seed.sql b/test/integration/004_simple_archive_test/seed.sql index f1b103abcd9..9edda7a3565 100644 --- a/test/integration/004_simple_archive_test/seed.sql +++ b/test/integration/004_simple_archive_test/seed.sql @@ -1,32 +1,32 @@ create table {database}.{schema}.seed ( - "id" INTEGER, - "first_name" VARCHAR(50), - "last_name" VARCHAR(50), - "email" VARCHAR(50), - "gender" VARCHAR(50), - "ip_address" VARCHAR(20), - "updated_at" TIMESTAMP WITHOUT TIME ZONE + id INTEGER, + first_name VARCHAR(50), + last_name VARCHAR(50), + email VARCHAR(50), + gender VARCHAR(50), + ip_address VARCHAR(20), + updated_at TIMESTAMP WITHOUT TIME ZONE ); create table {database}.{schema}.archive_expected ( - "id" INTEGER, - "first_name" VARCHAR(50), - "last_name" VARCHAR(50), - "email" VARCHAR(50), - "gender" VARCHAR(50), - "ip_address" VARCHAR(20), + id INTEGER, + first_name VARCHAR(50), + last_name VARCHAR(50), + email VARCHAR(50), + gender VARCHAR(50), + ip_address VARCHAR(20), -- archival fields - "updated_at" TIMESTAMP WITHOUT TIME ZONE, - "dbt_valid_from" TIMESTAMP WITHOUT TIME ZONE, - "dbt_valid_to" TIMESTAMP WITHOUT TIME ZONE, - "dbt_scd_id" VARCHAR(255), - "dbt_updated_at" TIMESTAMP WITHOUT TIME ZONE + updated_at TIMESTAMP WITHOUT TIME ZONE, + dbt_valid_from TIMESTAMP WITHOUT TIME ZONE, + dbt_valid_to TIMESTAMP WITHOUT TIME ZONE, + dbt_scd_id VARCHAR(255), + dbt_updated_at TIMESTAMP WITHOUT TIME ZONE ); -- seed inserts -insert into {database}.{schema}.seed ("id", "first_name", "last_name", "email", "gender", "ip_address", "updated_at") values +insert into {database}.{schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values (1, 'Judith', 'Kennedy', 'jkennedy0@phpbb.com', 'Female', '54.60.24.128', '2015-12-24 12:19:28'), (2, 'Arthur', 'Kelly', 'akelly1@eepurl.com', 'Male', '62.56.24.215', '2015-10-28 16:22:15'), (3, 'Rachel', 'Moreno', 'rmoreno2@msu.edu', 'Female', '31.222.249.23', '2016-04-05 02:05:30'), @@ -51,171 +51,171 @@ insert into {database}.{schema}.seed ("id", "first_name", "last_name", "email", -- populate archive table insert into {database}.{schema}.archive_expected ( - "id", - "first_name", - "last_name", - "email", - "gender", - "ip_address", - "updated_at", - "dbt_valid_from", - "dbt_valid_to", - "dbt_updated_at", - "dbt_scd_id" + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + dbt_valid_from, + dbt_valid_to, + dbt_updated_at, + dbt_scd_id ) select - "id", - "first_name", - "last_name", - "email", - "gender", - "ip_address", - "updated_at", + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, -- fields added by archival - "updated_at" as dbt_valid_from, + updated_at as dbt_valid_from, null::timestamp as dbt_valid_to, - "updated_at" as dbt_updated_at, - md5("id" || '-' || "first_name" || '|' || "updated_at"::text) as dbt_scd_id + updated_at as dbt_updated_at, + md5(id || '-' || first_name || '|' || updated_at::text) as dbt_scd_id from 
{database}.{schema}.seed; create table {database}.{schema}.archive_castillo_expected ( - "id" INTEGER, - "first_name" VARCHAR(50), - "last_name" VARCHAR(50), - "email" VARCHAR(50), - "gender" VARCHAR(50), - "ip_address" VARCHAR(20), + id INTEGER, + first_name VARCHAR(50), + last_name VARCHAR(50), + email VARCHAR(50), + gender VARCHAR(50), + ip_address VARCHAR(20), -- archival fields - "updated_at" TIMESTAMP WITHOUT TIME ZONE, - "dbt_valid_from" TIMESTAMP WITHOUT TIME ZONE, - "dbt_valid_to" TIMESTAMP WITHOUT TIME ZONE, - "dbt_scd_id" VARCHAR(255), - "dbt_updated_at" TIMESTAMP WITHOUT TIME ZONE + updated_at TIMESTAMP WITHOUT TIME ZONE, + dbt_valid_from TIMESTAMP WITHOUT TIME ZONE, + dbt_valid_to TIMESTAMP WITHOUT TIME ZONE, + dbt_scd_id VARCHAR(255), + dbt_updated_at TIMESTAMP WITHOUT TIME ZONE ); -- one entry insert into {database}.{schema}.archive_castillo_expected ( - "id", - "first_name", - "last_name", - "email", - "gender", - "ip_address", - "updated_at", - "dbt_valid_from", - "dbt_valid_to", - "dbt_updated_at", - "dbt_scd_id" + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + dbt_valid_from, + dbt_valid_to, + dbt_updated_at, + dbt_scd_id ) select - "id", - "first_name", - "last_name", - "email", - "gender", - "ip_address", - "updated_at", + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, -- fields added by archival - "updated_at" as dbt_valid_from, + updated_at as dbt_valid_from, null::timestamp as dbt_valid_to, - "updated_at" as dbt_updated_at, - md5("id" || '-' || "first_name" || '|' || "updated_at"::text) as dbt_scd_id -from {database}.{schema}.seed where "last_name" = 'Castillo'; + updated_at as dbt_updated_at, + md5(id || '-' || first_name || '|' || updated_at::text) as dbt_scd_id +from {database}.{schema}.seed where last_name = 'Castillo'; create table {database}.{schema}.archive_alvarez_expected ( - "id" INTEGER, - "first_name" VARCHAR(50), - "last_name" VARCHAR(50), - "email" VARCHAR(50), - "gender" VARCHAR(50), - "ip_address" VARCHAR(20), + id INTEGER, + first_name VARCHAR(50), + last_name VARCHAR(50), + email VARCHAR(50), + gender VARCHAR(50), + ip_address VARCHAR(20), -- archival fields - "updated_at" TIMESTAMP WITHOUT TIME ZONE, - "dbt_valid_from" TIMESTAMP WITHOUT TIME ZONE, - "dbt_valid_to" TIMESTAMP WITHOUT TIME ZONE, - "dbt_scd_id" VARCHAR(255), - "dbt_updated_at" TIMESTAMP WITHOUT TIME ZONE + updated_at TIMESTAMP WITHOUT TIME ZONE, + dbt_valid_from TIMESTAMP WITHOUT TIME ZONE, + dbt_valid_to TIMESTAMP WITHOUT TIME ZONE, + dbt_scd_id VARCHAR(255), + dbt_updated_at TIMESTAMP WITHOUT TIME ZONE ); -- 0 entries insert into {database}.{schema}.archive_alvarez_expected ( - "id", - "first_name", - "last_name", - "email", - "gender", - "ip_address", - "updated_at", - "dbt_valid_from", - "dbt_valid_to", - "dbt_updated_at", - "dbt_scd_id" + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + dbt_valid_from, + dbt_valid_to, + dbt_updated_at, + dbt_scd_id ) select - "id", - "first_name", - "last_name", - "email", - "gender", - "ip_address", - "updated_at", + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, -- fields added by archival - "updated_at" as dbt_valid_from, + updated_at as dbt_valid_from, null::timestamp as dbt_valid_to, - "updated_at" as dbt_updated_at, - md5("id" || '-' || "first_name" || '|' || "updated_at"::text) as dbt_scd_id -from {database}.{schema}.seed where "last_name" = 'Alvarez'; + updated_at as dbt_updated_at, + md5(id || '-' || first_name || 
'|' || updated_at::text) as dbt_scd_id +from {database}.{schema}.seed where last_name = 'Alvarez'; create table {database}.{schema}.archive_kelly_expected ( - "id" INTEGER, - "first_name" VARCHAR(50), - "last_name" VARCHAR(50), - "email" VARCHAR(50), - "gender" VARCHAR(50), - "ip_address" VARCHAR(20), + id INTEGER, + first_name VARCHAR(50), + last_name VARCHAR(50), + email VARCHAR(50), + gender VARCHAR(50), + ip_address VARCHAR(20), -- archival fields - "updated_at" TIMESTAMP WITHOUT TIME ZONE, - "dbt_valid_from" TIMESTAMP WITHOUT TIME ZONE, - "dbt_valid_to" TIMESTAMP WITHOUT TIME ZONE, - "dbt_scd_id" VARCHAR(255), - "dbt_updated_at" TIMESTAMP WITHOUT TIME ZONE + updated_at TIMESTAMP WITHOUT TIME ZONE, + dbt_valid_from TIMESTAMP WITHOUT TIME ZONE, + dbt_valid_to TIMESTAMP WITHOUT TIME ZONE, + dbt_scd_id VARCHAR(255), + dbt_updated_at TIMESTAMP WITHOUT TIME ZONE ); -- 2 entries insert into {database}.{schema}.archive_kelly_expected ( - "id", - "first_name", - "last_name", - "email", - "gender", - "ip_address", - "updated_at", - "dbt_valid_from", - "dbt_valid_to", - "dbt_updated_at", - "dbt_scd_id" + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + dbt_valid_from, + dbt_valid_to, + dbt_updated_at, + dbt_scd_id ) select - "id", - "first_name", - "last_name", - "email", - "gender", - "ip_address", - "updated_at", + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, -- fields added by archival - "updated_at" as dbt_valid_from, + updated_at as dbt_valid_from, null::timestamp as dbt_valid_to, - "updated_at" as dbt_updated_at, - md5("id" || '-' || "first_name" || '|' || "updated_at"::text) as dbt_scd_id -from {database}.{schema}.seed where "last_name" = 'Kelly'; + updated_at as dbt_updated_at, + md5(id || '-' || first_name || '|' || updated_at::text) as dbt_scd_id +from {database}.{schema}.seed where last_name = 'Kelly'; diff --git a/test/integration/004_simple_archive_test/seed_bq.sql b/test/integration/004_simple_archive_test/seed_bq.sql index ab2b6010a9f..9ef91799604 100644 --- a/test/integration/004_simple_archive_test/seed_bq.sql +++ b/test/integration/004_simple_archive_test/seed_bq.sql @@ -1,32 +1,32 @@ create table {database}.{schema}.seed ( - `id` INT64, - `first_name` STRING, - `last_name` STRING, - `email` STRING, - `gender` STRING, - `ip_address` STRING, - `updated_at` TIMESTAMP + id INT64, + first_name STRING, + last_name STRING, + email STRING, + gender STRING, + ip_address STRING, + updated_at TIMESTAMP ); create table {database}.{schema}.archive_expected ( - `id` INT64, - `first_name` STRING, - `last_name` STRING, - `email` STRING, - `gender` STRING, - `ip_address` STRING, + id INT64, + first_name STRING, + last_name STRING, + email STRING, + gender STRING, + ip_address STRING, -- archival fields - `updated_at` TIMESTAMP, - `dbt_valid_from` TIMESTAMP, - `dbt_valid_to` TIMESTAMP, - `dbt_scd_id` STRING, - `dbt_updated_at` TIMESTAMP + updated_at TIMESTAMP, + dbt_valid_from TIMESTAMP, + dbt_valid_to TIMESTAMP, + dbt_scd_id STRING, + dbt_updated_at TIMESTAMP ); -- seed inserts -insert {database}.{schema}.seed (`id`, `first_name`, `last_name`, `email`, `gender`, `ip_address`, `updated_at`) values +insert {database}.{schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values (1, 'Judith', 'Kennedy', 'jkennedy0@phpbb.com', 'Female', '54.60.24.128', '2015-12-24 12:19:28'), (2, 'Arthur', 'Kelly', 'akelly1@eepurl.com', 'Male', '62.56.24.215', '2015-10-28 16:22:15'), (3, 'Rachel', 'Moreno', 
'rmoreno2@msu.edu', 'Female', '31.222.249.23', '2016-04-05 02:05:30'), @@ -51,31 +51,31 @@ insert {database}.{schema}.seed (`id`, `first_name`, `last_name`, `email`, `gend -- populate archive table insert {database}.{schema}.archive_expected ( - `id`, - `first_name`, - `last_name`, - `email`, - `gender`, - `ip_address`, - `updated_at`, - `dbt_valid_from`, - `dbt_valid_to`, - `dbt_updated_at`, - `dbt_scd_id` + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + dbt_valid_from, + dbt_valid_to, + dbt_updated_at, + dbt_scd_id ) select - `id`, - `first_name`, - `last_name`, - `email`, - `gender`, - `ip_address`, - `updated_at`, + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, -- fields added by archival - `updated_at` as dbt_valid_from, + updated_at as dbt_valid_from, cast(null as timestamp) as dbt_valid_to, - `updated_at` as dbt_updated_at, - to_hex(md5(concat(cast(`id` as string), '-', `first_name`, '|', cast(`updated_at` as string)))) as dbt_scd_id + updated_at as dbt_updated_at, + to_hex(md5(concat(cast(id as string), '-', first_name, '|', cast(updated_at as string)))) as dbt_scd_id from {database}.{schema}.seed; diff --git a/test/integration/004_simple_archive_test/test-archives-invalid/archive.sql b/test/integration/004_simple_archive_test/test-archives-invalid/archive.sql index fc8d9408bf4..3bbe49664c1 100644 --- a/test/integration/004_simple_archive_test/test-archives-invalid/archive.sql +++ b/test/integration/004_simple_archive_test/test-archives-invalid/archive.sql @@ -1,12 +1,10 @@ -{% archive archive_sad %} - - {# missing target_database #} +{% archive no_target_database %} {{ config( target_schema=schema, - unique_key='"id" || ' ~ "'-'" ~ ' || "first_name"', + unique_key='id || ' ~ "'-'" ~ ' || first_name', strategy='timestamp', - updated_at='"updated_at"', + updated_at='updated_at', ) }} select * from {{database}}.{{schema}}.seed diff --git a/test/integration/004_simple_archive_test/test-archives-pg/archive.sql b/test/integration/004_simple_archive_test/test-archives-pg/archive.sql index eaf156500a4..9117a8df1a4 100644 --- a/test/integration/004_simple_archive_test/test-archives-pg/archive.sql +++ b/test/integration/004_simple_archive_test/test-archives-pg/archive.sql @@ -4,9 +4,9 @@ config( target_database=var('target_database', database), target_schema=schema, - unique_key='"id" || ' ~ "'-'" ~ ' || "first_name"', + unique_key='id || ' ~ "'-'" ~ ' || first_name', strategy='timestamp', - updated_at='"updated_at"', + updated_at='updated_at', ) }} select * from {{database}}.{{schema}}.seed diff --git a/test/integration/004_simple_archive_test/test-archives-select/archives.sql b/test/integration/004_simple_archive_test/test-archives-select/archives.sql index 58f5b630603..30e78fe720d 100644 --- a/test/integration/004_simple_archive_test/test-archives-select/archives.sql +++ b/test/integration/004_simple_archive_test/test-archives-select/archives.sql @@ -4,9 +4,9 @@ config( target_database=var('target_database', database), target_schema=schema, - unique_key='"id" || ' ~ "'-'" ~ ' || "first_name"', + unique_key='id || ' ~ "'-'" ~ ' || first_name', strategy='timestamp', - updated_at='"updated_at"', + updated_at='updated_at', ) }} select * from {{database}}.{{schema}}.seed where last_name = 'Castillo' @@ -19,9 +19,9 @@ config( target_database=var('target_database', database), target_schema=schema, - unique_key='"id" || ' ~ "'-'" ~ ' || "first_name"', + unique_key='id || ' ~ "'-'" ~ ' || first_name', strategy='timestamp', - 
updated_at='"updated_at"', + updated_at='updated_at', ) }} select * from {{database}}.{{schema}}.seed where last_name = 'Alvarez' @@ -35,9 +35,9 @@ config( target_database=var('target_database', database), target_schema=schema, - unique_key='"id" || ' ~ "'-'" ~ ' || "first_name"', + unique_key='id || ' ~ "'-'" ~ ' || first_name', strategy='timestamp', - updated_at='"updated_at"', + updated_at='updated_at', ) }} select * from {{database}}.{{schema}}.seed where last_name = 'Kelly' diff --git a/test/integration/004_simple_archive_test/test-check-col-archives-bq/archive.sql b/test/integration/004_simple_archive_test/test-check-col-archives-bq/archive.sql new file mode 100644 index 00000000000..40a2563291f --- /dev/null +++ b/test/integration/004_simple_archive_test/test-check-col-archives-bq/archive.sql @@ -0,0 +1,27 @@ +{% archive archive_actual %} + {{ + config( + target_database=var('target_database', database), + target_schema=schema, + unique_key='concat(cast(id as string) , "-", first_name)', + strategy='check', + check_cols=('email',), + ) + }} + select * from `{{database}}`.`{{schema}}`.seed +{% endarchive %} + + +{# This should be exactly the same #} +{% archive archive_checkall %} + {{ + config( + target_database=var('target_database', database), + target_schema=schema, + unique_key='concat(cast(id as string) , "-", first_name)', + strategy='check', + check_cols='all', + ) + }} + select * from `{{database}}`.`{{schema}}`.seed +{% endarchive %} diff --git a/test/integration/004_simple_archive_test/test-check-col-archives/archive.sql b/test/integration/004_simple_archive_test/test-check-col-archives/archive.sql new file mode 100644 index 00000000000..c3ee6fe2038 --- /dev/null +++ b/test/integration/004_simple_archive_test/test-check-col-archives/archive.sql @@ -0,0 +1,28 @@ +{% archive archive_actual %} + + {{ + config( + target_database=var('target_database', database), + target_schema=schema, + unique_key='id || ' ~ "'-'" ~ ' || first_name', + strategy='check', + check_cols=['email'], + ) + }} + select * from {{database}}.{{schema}}.seed + +{% endarchive %} + +{# This should be exactly the same #} +{% archive archive_checkall %} + {{ + config( + target_database=var('target_database', database), + target_schema=schema, + unique_key='id || ' ~ "'-'" ~ ' || first_name', + strategy='check', + check_cols='all', + ) + }} + select * from {{database}}.{{schema}}.seed +{% endarchive %} diff --git a/test/integration/004_simple_archive_test/test_simple_archive.py b/test/integration/004_simple_archive_test/test_simple_archive.py index 51479fdf229..e2b36b53a28 100644 --- a/test/integration/004_simple_archive_test/test_simple_archive.py +++ b/test/integration/004_simple_archive_test/test_simple_archive.py @@ -1,9 +1,10 @@ -from nose.plugins.attrib import attr -from test.integration.base import DBTIntegrationTest +from test.integration.base import DBTIntegrationTest, use_profile import dbt.exceptions class TestSimpleArchive(DBTIntegrationTest): + NUM_ARCHIVE_MODELS = 1 + @property def schema(self): return "simple_archive_004" @@ -32,8 +33,8 @@ def project_config(self): { "source_table": source_table, "target_table": "archive_actual", - "updated_at": '"updated_at"', - "unique_key": '''"id" || '-' || "first_name"''' + "updated_at": 'updated_at', + "unique_key": '''id || '-' || first_name''' }, ], }, @@ -44,58 +45,67 @@ def dbt_run_seed_archive(self): self.run_sql_file('test/integration/004_simple_archive_test/seed.sql') results = self.run_archive() - self.assertEqual(len(results), 1) + 
self.assertEqual(len(results), self.NUM_ARCHIVE_MODELS) + + def assert_case_tables_equal(self, actual, expected): + if self.adapter_type == 'snowflake': + actual = actual.upper() + expected = expected.upper() + self.assertTablesEqual(actual, expected) - @attr(type='postgres') + def assert_expected(self): + self.assert_case_tables_equal('archive_actual', 'archive_expected') + + @use_profile('postgres') def test__postgres__simple_archive(self): self.dbt_run_seed_archive() - self.assertTablesEqual("archive_expected","archive_actual") + self.assert_expected() self.run_sql_file("test/integration/004_simple_archive_test/invalidate_postgres.sql") self.run_sql_file("test/integration/004_simple_archive_test/update.sql") results = self.run_archive() - self.assertEqual(len(results), 1) + self.assertEqual(len(results), self.NUM_ARCHIVE_MODELS) - self.assertTablesEqual("archive_expected","archive_actual") + self.assert_expected() - @attr(type='snowflake') + @use_profile('snowflake') def test__snowflake__simple_archive(self): self.dbt_run_seed_archive() - self.assertTablesEqual("ARCHIVE_EXPECTED", "ARCHIVE_ACTUAL") + self.assert_expected() self.run_sql_file("test/integration/004_simple_archive_test/invalidate_snowflake.sql") self.run_sql_file("test/integration/004_simple_archive_test/update.sql") results = self.run_archive() - self.assertEqual(len(results), 1) + self.assertEqual(len(results), self.NUM_ARCHIVE_MODELS) - self.assertTablesEqual("ARCHIVE_EXPECTED", "ARCHIVE_ACTUAL") + self.assert_expected() - @attr(type='redshift') + @use_profile('redshift') def test__redshift__simple_archive(self): self.dbt_run_seed_archive() - self.assertTablesEqual("archive_expected","archive_actual") + self.assert_expected() self.run_sql_file("test/integration/004_simple_archive_test/invalidate_postgres.sql") self.run_sql_file("test/integration/004_simple_archive_test/update.sql") results = self.run_archive() - self.assertEqual(len(results), 1) + self.assertEqual(len(results), self.NUM_ARCHIVE_MODELS) - self.assertTablesEqual("archive_expected","archive_actual") + self.assert_expected() - @attr(type='presto') + @use_profile('presto') def test__presto__simple_archive_disabled(self): results = self.run_dbt(["seed"]) - self.assertEqual(len(results), 1) + self.assertEqual(len(results), self.NUM_ARCHIVE_MODELS) # presto does not run archives results = self.run_dbt(["archive"], expect_pass=False) - self.assertEqual(len(results), 1) + self.assertEqual(len(results), self.NUM_ARCHIVE_MODELS) self.assertIn('not implemented for presto', results[0].error) @@ -128,7 +138,10 @@ def project_config(self): ] } - @attr(type='bigquery') + def assert_expected(self): + self.assertTablesEqual('archive_actual', 'archive_expected') + + @use_profile('bigquery') def test__bigquery__simple_archive(self): self.use_default_project() self.use_profile('bigquery') @@ -137,17 +150,17 @@ def test__bigquery__simple_archive(self): self.run_dbt(["archive"]) - self.assertTablesEqual("archive_expected", "archive_actual") + self.assert_expected() self.run_sql_file("test/integration/004_simple_archive_test/invalidate_bigquery.sql") self.run_sql_file("test/integration/004_simple_archive_test/update_bq.sql") self.run_dbt(["archive"]) - self.assertTablesEqual("archive_expected", "archive_actual") + self.assert_expected() - @attr(type='bigquery') + @use_profile('bigquery') def test__bigquery__archive_with_new_field(self): self.use_default_project() self.use_profile('bigquery') @@ -209,8 +222,8 @@ def archive_project_config(self): return { "source_table": 'SEED', 
"target_table": "archive_actual", - "updated_at": '"updated_at"', - "unique_key": '''"id" || '-' || "first_name"''' + "updated_at": 'updated_at', + "unique_key": '''id || '-' || first_name''' } else: return { @@ -236,7 +249,7 @@ def project_config(self): def run_archive(self): return self.run_dbt(['archive']) - @attr(type='snowflake') + @use_profile('snowflake') def test__snowflake__cross_archive(self): self.run_sql_file("test/integration/004_simple_archive_test/seed.sql") @@ -253,7 +266,7 @@ def test__snowflake__cross_archive(self): self.assertTablesEqual("ARCHIVE_EXPECTED", "ARCHIVE_ACTUAL", table_b_db=self.alternative_database) - @attr(type='bigquery') + @use_profile('bigquery') def test__bigquery__cross_archive(self): self.run_sql_file("test/integration/004_simple_archive_test/seed_bq.sql") @@ -277,7 +290,7 @@ def project_config(self): "archive-paths": ['test/integration/004_simple_archive_test/test-archives-pg'], } - @attr(type='postgres') + @use_profile('postgres') def test__postgres_ref_archive(self): self.dbt_run_seed_archive() results = self.run_dbt(['run']) @@ -301,7 +314,7 @@ def project_config(self): 'test/integration/004_simple_archive_test/test-archives-pg'], } - @attr(type='postgres') + @use_profile('postgres') def test__postgres__select_archives(self): self.run_sql_file('test/integration/004_simple_archive_test/seed.sql') @@ -322,7 +335,7 @@ def test__postgres__select_archives(self): self.assertTablesEqual('archive_kelly', 'archive_kelly_expected') self.assertTablesEqual('archive_actual', 'archive_expected') - @attr(type='postgres') + @use_profile('postgres') def test__postgres_exclude_archives(self): self.run_sql_file('test/integration/004_simple_archive_test/seed.sql') results = self.run_dbt(['archive', '--exclude', 'archive_castillo']) @@ -332,7 +345,7 @@ def test__postgres_exclude_archives(self): self.assertTablesEqual('archive_kelly', 'archive_kelly_expected') self.assertTablesEqual('archive_actual', 'archive_expected') - @attr(type='postgres') + @use_profile('postgres') def test__postgres_select_archives(self): self.run_sql_file('test/integration/004_simple_archive_test/seed.sql') results = self.run_dbt(['archive', '--models', 'archive_castillo']) @@ -381,9 +394,107 @@ def project_config(self): "archive-paths": ['test/integration/004_simple_archive_test/test-archives-invalid'], } - @attr(type='postgres') + @use_profile('postgres') def test__postgres__invalid(self): with self.assertRaises(dbt.exceptions.CompilationException) as exc: self.run_dbt(['compile'], expect_pass=False) self.assertIn('target_database', str(exc.exception)) + + +class TestCheckCols(TestSimpleArchiveFiles): + NUM_ARCHIVE_MODELS = 2 + def _assertTablesEqualSql(self, relation_a, relation_b, columns=None): + # When building the equality tests, only test columns that don't start + # with 'dbt_', because those are time-sensitive + if columns is None: + columns = [c for c in self.get_relation_columns(relation_a) if not c[0].lower().startswith('dbt_')] + return super(TestCheckCols, self)._assertTablesEqualSql( + relation_a, + relation_b, + columns=columns + ) + + def assert_expected(self): + super(TestCheckCols, self).assert_expected() + self.assert_case_tables_equal('archive_checkall', 'archive_expected') + + @property + def project_config(self): + return { + "data-paths": ['test/integration/004_simple_archive_test/data'], + "archive-paths": ['test/integration/004_simple_archive_test/test-check-col-archives'], + } + + +class TestCheckColsBigquery(TestSimpleArchiveFilesBigquery): + def 
_assertTablesEqualSql(self, relation_a, relation_b, columns=None): + # When building the equality tests, only test columns that don't start + # with 'dbt_', because those are time-sensitive + if columns is None: + columns = [c for c in self.get_relation_columns(relation_a) if not c[0].lower().startswith('dbt_')] + return super(TestCheckColsBigquery, self)._assertTablesEqualSql( + relation_a, + relation_b, + columns=columns + ) + + def assert_expected(self): + super(TestCheckColsBigquery, self).assert_expected() + self.assertTablesEqual('archive_checkall', 'archive_expected') + + @property + def project_config(self): + return { + "data-paths": ['test/integration/004_simple_archive_test/data'], + "archive-paths": ['test/integration/004_simple_archive_test/test-check-col-archives-bq'], + } + + @use_profile('bigquery') + def test__bigquery__archive_with_new_field(self): + self.use_default_project() + self.use_profile('bigquery') + + self.run_sql_file("test/integration/004_simple_archive_test/seed_bq.sql") + + self.run_dbt(["archive"]) + + self.assertTablesEqual("archive_expected", "archive_actual") + self.assertTablesEqual("archive_expected", "archive_checkall") + + self.run_sql_file("test/integration/004_simple_archive_test/invalidate_bigquery.sql") + self.run_sql_file("test/integration/004_simple_archive_test/update_bq.sql") + + # This adds new fields to the source table, and updates the expected archive output accordingly + self.run_sql_file("test/integration/004_simple_archive_test/add_column_to_source_bq.sql") + + # this should fail because `check="all"` will try to compare the nested field + self.run_dbt(['archive'], expect_pass=False) + + self.run_dbt(["archive", '-m', 'archive_actual']) + + # A more thorough test would assert that archived == expected, but BigQuery does not support the + # "EXCEPT DISTINCT" operator on nested fields! Instead, just check that schemas are congruent. 
+ + expected_cols = self.get_table_columns( + database=self.default_database, + schema=self.unique_schema(), + table='archive_expected' + ) + archived_cols = self.get_table_columns( + database=self.default_database, + schema=self.unique_schema(), + table='archive_actual' + ) + + self.assertTrue(len(expected_cols) > 0, "source table does not exist -- bad test") + self.assertEqual(len(expected_cols), len(archived_cols), "actual and expected column lengths are different") + + for (expected_col, actual_col) in zip(expected_cols, archived_cols): + expected_name, expected_type, _ = expected_col + actual_name, actual_type, _ = actual_col + self.assertTrue(expected_name is not None) + self.assertTrue(expected_type is not None) + + self.assertEqual(expected_name, actual_name, "names are different") + self.assertEqual(expected_type, actual_type, "data types are different") diff --git a/test/integration/004_simple_archive_test/update.sql b/test/integration/004_simple_archive_test/update.sql index eef8ea703c9..0959cf9fa3f 100644 --- a/test/integration/004_simple_archive_test/update.sql +++ b/test/integration/004_simple_archive_test/update.sql @@ -1,130 +1,130 @@ -- insert v2 of the 11 - 21 records insert into {database}.{schema}.archive_expected ( - "id", - "first_name", - "last_name", - "email", - "gender", - "ip_address", - "updated_at", - "dbt_valid_from", - "dbt_valid_to", - "dbt_updated_at", - "dbt_scd_id" + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + dbt_valid_from, + dbt_valid_to, + dbt_updated_at, + dbt_scd_id ) select - "id", - "first_name", - "last_name", - "email", - "gender", - "ip_address", - "updated_at", + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, -- fields added by archival - "updated_at" as "dbt_valid_from", - null::timestamp as "dbt_valid_to", - "updated_at" as "dbt_updated_at", - md5("id" || '-' || "first_name" || '|' || "updated_at"::text) as "dbt_scd_id" + updated_at as dbt_valid_from, + null::timestamp as dbt_valid_to, + updated_at as dbt_updated_at, + md5(id || '-' || first_name || '|' || updated_at::text) as dbt_scd_id from {database}.{schema}.seed -where "id" >= 10 and "id" <= 20; +where id >= 10 and id <= 20; insert into {database}.{schema}.archive_castillo_expected ( - "id", - "first_name", - "last_name", - "email", - "gender", - "ip_address", - "updated_at", - "dbt_valid_from", - "dbt_valid_to", - "dbt_updated_at", - "dbt_scd_id" + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + dbt_valid_from, + dbt_valid_to, + dbt_updated_at, + dbt_scd_id ) select - "id", - "first_name", - "last_name", - "email", - "gender", - "ip_address", - "updated_at", + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, -- fields added by archival - "updated_at" as "dbt_valid_from", - null::timestamp as "dbt_valid_to", - "updated_at" as "dbt_updated_at", - md5("id" || '-' || "first_name" || '|' || "updated_at"::text) as "dbt_scd_id" + updated_at as dbt_valid_from, + null::timestamp as dbt_valid_to, + updated_at as dbt_updated_at, + md5(id || '-' || first_name || '|' || updated_at::text) as dbt_scd_id from {database}.{schema}.seed -where "id" >= 10 and "id" <= 20 and "last_name" = 'Castillo'; +where id >= 10 and id <= 20 and last_name = 'Castillo'; insert into {database}.{schema}.archive_alvarez_expected ( - "id", - "first_name", - "last_name", - "email", - "gender", - "ip_address", - "updated_at", - "dbt_valid_from", - "dbt_valid_to", - "dbt_updated_at", - "dbt_scd_id" + 
id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + dbt_valid_from, + dbt_valid_to, + dbt_updated_at, + dbt_scd_id ) select - "id", - "first_name", - "last_name", - "email", - "gender", - "ip_address", - "updated_at", + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, -- fields added by archival - "updated_at" as "dbt_valid_from", - null::timestamp as "dbt_valid_to", - "updated_at" as "dbt_updated_at", - md5("id" || '-' || "first_name" || '|' || "updated_at"::text) as "dbt_scd_id" + updated_at as dbt_valid_from, + null::timestamp as dbt_valid_to, + updated_at as dbt_updated_at, + md5(id || '-' || first_name || '|' || updated_at::text) as dbt_scd_id from {database}.{schema}.seed -where "id" >= 10 and "id" <= 20 and "last_name" = 'Alvarez'; +where id >= 10 and id <= 20 and last_name = 'Alvarez'; insert into {database}.{schema}.archive_kelly_expected ( - "id", - "first_name", - "last_name", - "email", - "gender", - "ip_address", - "updated_at", - "dbt_valid_from", - "dbt_valid_to", - "dbt_updated_at", - "dbt_scd_id" + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + dbt_valid_from, + dbt_valid_to, + dbt_updated_at, + dbt_scd_id ) select - "id", - "first_name", - "last_name", - "email", - "gender", - "ip_address", - "updated_at", + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, -- fields added by archival - "updated_at" as "dbt_valid_from", - null::timestamp as "dbt_valid_to", - "updated_at" as "dbt_updated_at", - md5("id" || '-' || "first_name" || '|' || "updated_at"::text) as "dbt_scd_id" + updated_at as dbt_valid_from, + null::timestamp as dbt_valid_to, + updated_at as dbt_updated_at, + md5(id || '-' || first_name || '|' || updated_at::text) as dbt_scd_id from {database}.{schema}.seed -where "id" >= 10 and "id" <= 20 and "last_name" = 'Kelly'; +where id >= 10 and id <= 20 and last_name = 'Kelly'; -- insert 10 new records -insert into {database}.{schema}.seed ("id", "first_name", "last_name", "email", "gender", "ip_address", "updated_at") values +insert into {database}.{schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values (21, 'Judy', 'Robinson', 'jrobinsonk@blogs.com', 'Female', '208.21.192.232', '2016-09-18 08:27:38'), (22, 'Kevin', 'Alvarez', 'kalvarezl@buzzfeed.com', 'Male', '228.106.146.9', '2016-07-29 03:07:37'), (23, 'Barbara', 'Carr', 'bcarrm@pen.io', 'Female', '106.165.140.17', '2015-09-24 13:27:23'), @@ -139,123 +139,123 @@ insert into {database}.{schema}.seed ("id", "first_name", "last_name", "email", -- add these new records to the archive table insert into {database}.{schema}.archive_expected ( - "id", - "first_name", - "last_name", - "email", - "gender", - "ip_address", - "updated_at", - "dbt_valid_from", - "dbt_valid_to", - "dbt_updated_at", - "dbt_scd_id" + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + dbt_valid_from, + dbt_valid_to, + dbt_updated_at, + dbt_scd_id ) select - "id", - "first_name", - "last_name", - "email", - "gender", - "ip_address", - "updated_at", + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, -- fields added by archival - "updated_at" as "dbt_valid_from", - null::timestamp as "dbt_valid_to", - "updated_at" as "dbt_updated_at", - md5("id" || '-' || "first_name" || '|' || "updated_at"::text) as "dbt_scd_id" + updated_at as dbt_valid_from, + null::timestamp as dbt_valid_to, + updated_at as dbt_updated_at, + md5(id || '-' || first_name || '|' || 
updated_at::text) as dbt_scd_id from {database}.{schema}.seed -where "id" > 20; +where id > 20; -- add these new records to the archive table insert into {database}.{schema}.archive_castillo_expected ( - "id", - "first_name", - "last_name", - "email", - "gender", - "ip_address", - "updated_at", - "dbt_valid_from", - "dbt_valid_to", - "dbt_updated_at", - "dbt_scd_id" + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + dbt_valid_from, + dbt_valid_to, + dbt_updated_at, + dbt_scd_id ) select - "id", - "first_name", - "last_name", - "email", - "gender", - "ip_address", - "updated_at", + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, -- fields added by archival - "updated_at" as "dbt_valid_from", - null::timestamp as "dbt_valid_to", - "updated_at" as "dbt_updated_at", - md5("id" || '-' || "first_name" || '|' || "updated_at"::text) as "dbt_scd_id" + updated_at as dbt_valid_from, + null::timestamp as dbt_valid_to, + updated_at as dbt_updated_at, + md5(id || '-' || first_name || '|' || updated_at::text) as dbt_scd_id from {database}.{schema}.seed -where "id" > 20 and "last_name" = 'Castillo'; +where id > 20 and last_name = 'Castillo'; insert into {database}.{schema}.archive_alvarez_expected ( - "id", - "first_name", - "last_name", - "email", - "gender", - "ip_address", - "updated_at", - "dbt_valid_from", - "dbt_valid_to", - "dbt_updated_at", - "dbt_scd_id" + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + dbt_valid_from, + dbt_valid_to, + dbt_updated_at, + dbt_scd_id ) select - "id", - "first_name", - "last_name", - "email", - "gender", - "ip_address", - "updated_at", + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, -- fields added by archival - "updated_at" as "dbt_valid_from", - null::timestamp as "dbt_valid_to", - "updated_at" as "dbt_updated_at", - md5("id" || '-' || "first_name" || '|' || "updated_at"::text) as "dbt_scd_id" + updated_at as dbt_valid_from, + null::timestamp as dbt_valid_to, + updated_at as dbt_updated_at, + md5(id || '-' || first_name || '|' || updated_at::text) as dbt_scd_id from {database}.{schema}.seed -where "id" > 20 and "last_name" = 'Alvarez'; +where id > 20 and last_name = 'Alvarez'; insert into {database}.{schema}.archive_kelly_expected ( - "id", - "first_name", - "last_name", - "email", - "gender", - "ip_address", - "updated_at", - "dbt_valid_from", - "dbt_valid_to", - "dbt_updated_at", - "dbt_scd_id" + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + dbt_valid_from, + dbt_valid_to, + dbt_updated_at, + dbt_scd_id ) select - "id", - "first_name", - "last_name", - "email", - "gender", - "ip_address", - "updated_at", + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, -- fields added by archival - "updated_at" as "dbt_valid_from", - null::timestamp as "dbt_valid_to", - "updated_at" as "dbt_updated_at", - md5("id" || '-' || "first_name" || '|' || "updated_at"::text) as "dbt_scd_id" + updated_at as dbt_valid_from, + null::timestamp as dbt_valid_to, + updated_at as dbt_updated_at, + md5(id || '-' || first_name || '|' || updated_at::text) as dbt_scd_id from {database}.{schema}.seed -where "id" > 20 and "last_name" = 'Kelly'; +where id > 20 and last_name = 'Kelly'; diff --git a/test/integration/004_simple_archive_test/update_bq.sql b/test/integration/004_simple_archive_test/update_bq.sql index f7208ac3caa..aa56fb839a9 100644 --- a/test/integration/004_simple_archive_test/update_bq.sql +++ 
b/test/integration/004_simple_archive_test/update_bq.sql @@ -1,38 +1,38 @@ -- insert v2 of the 11 - 21 records insert {database}.{schema}.archive_expected ( - `id`, - `first_name`, - `last_name`, - `email`, - `gender`, - `ip_address`, - `updated_at`, - `dbt_valid_from`, - `dbt_valid_to`, - `dbt_updated_at`, - `dbt_scd_id` + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + dbt_valid_from, + dbt_valid_to, + dbt_updated_at, + dbt_scd_id ) select - `id`, - `first_name`, - `last_name`, - `email`, - `gender`, - `ip_address`, - `updated_at`, + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, -- fields added by archival - `updated_at` as `dbt_valid_from`, - cast(null as timestamp) as `dbt_valid_to`, - `updated_at` as `dbt_updated_at`, - to_hex(md5(concat(cast(`id` as string), '-', `first_name`, '|', cast(`updated_at` as string)))) as `dbt_scd_id` + updated_at as dbt_valid_from, + cast(null as timestamp) as dbt_valid_to, + updated_at as dbt_updated_at, + to_hex(md5(concat(cast(id as string), '-', first_name, '|', cast(updated_at as string)))) as dbt_scd_id from {database}.{schema}.seed -where `id` >= 10 and `id` <= 20; +where id >= 10 and id <= 20; -- insert 10 new records -insert into {database}.{schema}.seed (`id`, `first_name`, `last_name`, `email`, `gender`, `ip_address`, `updated_at`) values +insert into {database}.{schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values (21, 'Judy', 'Robinson', 'jrobinsonk@blogs.com', 'Female', '208.21.192.232', '2016-09-18 08:27:38'), (22, 'Kevin', 'Alvarez', 'kalvarezl@buzzfeed.com', 'Male', '228.106.146.9', '2016-07-29 03:07:37'), (23, 'Barbara', 'Carr', 'bcarrm@pen.io', 'Female', '106.165.140.17', '2015-09-24 13:27:23'), @@ -47,32 +47,32 @@ insert into {database}.{schema}.seed (`id`, `first_name`, `last_name`, `email`, -- add these new records to the archive table insert {database}.{schema}.archive_expected ( - `id`, - `first_name`, - `last_name`, - `email`, - `gender`, - `ip_address`, - `updated_at`, - `dbt_valid_from`, - `dbt_valid_to`, - `dbt_updated_at`, - `dbt_scd_id` + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + dbt_valid_from, + dbt_valid_to, + dbt_updated_at, + dbt_scd_id ) select - `id`, - `first_name`, - `last_name`, - `email`, - `gender`, - `ip_address`, - `updated_at`, + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, -- fields added by archival - `updated_at` as `dbt_valid_from`, - cast(null as timestamp) as `dbt_valid_to`, - `updated_at` as `dbt_updated_at`, - to_hex(md5(concat(cast(`id` as string), '-', `first_name`, '|', cast(`updated_at` as string)))) as `dbt_scd_id` + updated_at as dbt_valid_from, + cast(null as timestamp) as dbt_valid_to, + updated_at as dbt_updated_at, + to_hex(md5(concat(cast(id as string), '-', first_name, '|', cast(updated_at as string)))) as dbt_scd_id from {database}.{schema}.seed -where `id` > 20; +where id > 20; diff --git a/test/integration/043_custom_aliases_test/test_custom_aliases.py b/test/integration/043_custom_aliases_test/test_custom_aliases.py index be2e30d7a49..131941f5e74 100644 --- a/test/integration/043_custom_aliases_test/test_custom_aliases.py +++ b/test/integration/043_custom_aliases_test/test_custom_aliases.py @@ -1,4 +1,4 @@ -from test.integration.base import DBTIntegrationTest +from test.integration.base import DBTIntegrationTest, use_profile class TestAliases(DBTIntegrationTest): @@ -16,7 +16,8 @@ def project_config(self): "macro-paths": 
['test/integration/043_custom_aliases_test/macros'], } - def test__customer_alias_name(self): + @use_profile('postgres') + def test_postgres_customer_alias_name(self): results = self.run_dbt(['run']) - self.assertEqual(len(results), 1) + self.assertEqual(len(results), 2) self.run_dbt(['test'])
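
A minimal usage sketch of the new `check` strategy, modeled on the `test-check-col-archives/archive.sql` fixture above. The source table `users` and the checked columns here are illustrative placeholders, not part of this patch:

{% archive users_archive %}
    {{
        config(
            target_database=database,
            target_schema=schema,
            unique_key='id',
            strategy='check',
            check_cols=['email', 'ip_address'],
        )
    }}
    {# rows are archived when any column in check_cols changes for an existing id #}
    select * from {{ database }}.{{ schema }}.users
{% endarchive %}

With `strategy='check'`, a row becomes an update candidate when its `unique_key` matches a current archived record and any column in `check_cols` differs; passing `check_cols='all'` expands to every source column at materialization time. Because there is no source `updated_at` column to copy, `dbt_valid_from` and `dbt_valid_to` are stamped with `archive_get_time()`, and the `dbt_scd_id` hash is built from the primary key plus the checked columns via `archive_hash_arguments`.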