From 7d490d4886daab2c16c2a1d517f86a27a8f5dac9 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Fri, 26 Apr 2019 14:00:42 -0400 Subject: [PATCH 1/6] Implement archival using a merge abstraction --- core/dbt/adapters/base/impl.py | 19 +- .../global_project/macros/adapters/common.sql | 40 +- .../global_project/macros/etc/datetime.sql | 4 + .../materializations/archive/archive.sql | 409 +++++++----------- .../archive/archive_merge.sql | 27 ++ .../materializations/archive/strategies.sql | 114 +++++ .../incremental/incremental.sql | 10 +- core/dbt/parser/archives.py | 22 +- .../bigquery/dbt/adapters/bigquery/impl.py | 16 - .../dbt/include/bigquery/macros/adapters.sql | 11 + .../macros/materializations/archive.sql | 16 +- .../dbt/include/postgres/macros/adapters.sql | 16 + .../macros/materializations/archive_merge.sql | 18 + .../dbt/include/redshift/macros/adapters.sql | 21 +- .../macros/materializations/archive_merge.sql | 4 + .../dbt/include/snowflake/macros/adapters.sql | 4 + .../macros/materializations/incremental.sql | 7 +- .../004_simple_archive_test/seed.sql | 13 +- .../test-archives-bq/archive.sql | 2 +- .../test-archives-invalid/archive.sql | 2 +- .../test-archives-longtext/longtext.sql | 2 +- .../test-archives-pg/archive.sql | 2 +- .../test-archives-select/archives.sql | 6 +- .../test-check-col-archives-bq/archive.sql | 4 +- .../test-check-col-archives/archive.sql | 4 +- .../test_concurrent_transaction.py | 7 +- .../033_event_tracking_test/test_events.py | 2 +- test/integration/base.py | 15 +- tox.ini | 40 +- 29 files changed, 488 insertions(+), 369 deletions(-) create mode 100644 core/dbt/include/global_project/macros/materializations/archive/archive_merge.sql create mode 100644 core/dbt/include/global_project/macros/materializations/archive/strategies.sql create mode 100644 plugins/postgres/dbt/include/postgres/macros/materializations/archive_merge.sql create mode 100644 plugins/redshift/dbt/include/redshift/macros/materializations/archive_merge.sql diff --git 
a/core/dbt/adapters/base/impl.py b/core/dbt/adapters/base/impl.py index e99f999f717..16d4bd300ac 100644 --- a/core/dbt/adapters/base/impl.py +++ b/core/dbt/adapters/base/impl.py @@ -387,6 +387,7 @@ def list_schemas(self, database): '`list_schemas` is not implemented for this adapter!' ) + @available.parse(lambda *a, **k: False) def check_schema_exists(self, database, schema): """Check if a schema exists. @@ -584,7 +585,14 @@ def valid_archive_target(self, relation): dbt.exceptions.raise_compiler_error(msg) @available.parse_none - def expand_target_column_types(self, temp_table, to_relation): + def expand_target_column_types(self, from_relation, to_relation): + if not isinstance(from_relation, self.Relation): + dbt.exceptions.invalid_type_error( + method_name='expand_target_column_types', + arg_name='from_relation', + got_value=from_relation, + expected_type=self.Relation) + if not isinstance(to_relation, self.Relation): dbt.exceptions.invalid_type_error( method_name='expand_target_column_types', @@ -592,14 +600,7 @@ def expand_target_column_types(self, temp_table, to_relation): got_value=to_relation, expected_type=self.Relation) - goal = self.Relation.create( - database=None, - schema=None, - identifier=temp_table, - type='table', - quote_policy=self.config.quoting - ) - self.expand_column_types(goal, to_relation) + self.expand_column_types(from_relation, to_relation) def list_relations(self, database, schema): if self._schema_is_cached(database, schema): diff --git a/core/dbt/include/global_project/macros/adapters/common.sql b/core/dbt/include/global_project/macros/adapters/common.sql index 2fde6e96115..c02239d6f49 100644 --- a/core/dbt/include/global_project/macros/adapters/common.sql +++ b/core/dbt/include/global_project/macros/adapters/common.sql @@ -28,6 +28,22 @@ {%- endif -%} {%- endmacro %} +{% macro get_columns_in_query(select_sql) -%} + {{ return(adapter_macro('get_columns_in_query', select_sql)) }} +{% endmacro %} + +{% macro 
default__get_columns_in_query(select_sql) %} + {% call statement('get_columns_in_query', fetch_result=True, auto_begin=False) -%} + select * from ( + {{ select_sql }} + ) as __dbt_sbq + where false + limit 0 + {% endcall %} + + {{ return(load_result('get_columns_in_query').table.columns | map(attribute='name') | list) }} +{% endmacro %} + {% macro create_schema(database_name, schema_name) -%} {{ adapter_macro('create_schema', database_name, schema_name) }} {% endmacro %} @@ -71,17 +87,6 @@ {% endmacro %} -{% macro create_archive_table(relation, columns) -%} - {{ adapter_macro('create_archive_table', relation, columns) }} -{%- endmacro %} - -{% macro default__create_archive_table(relation, columns) -%} - create table if not exists {{ relation }} ( - {{ column_list_for_create_table(columns) }} - ); -{% endmacro %} - - {% macro get_catalog(information_schemas) -%} {{ return(adapter_macro('get_catalog', information_schemas)) }} {%- endmacro %} @@ -249,3 +254,16 @@ {% endcall %} {{ return(load_result('check_schema_exists').table) }} {% endmacro %} + +{% macro make_temp_relation(base_relation, suffix='__dbt_tmp') %} + {{ return(adapter_macro('make_temp_relation', base_relation, suffix))}} +{% endmacro %} + +{% macro default__make_temp_relation(base_relation, suffix) %} + {% set tmp_identifier = base_relation.identifier ~ suffix %} + {% set tmp_relation = base_relation.incorporate( + path={"identifier": tmp_identifier}, + table_name=tmp_identifier) -%} + + {% do return(tmp_relation) %} +{% endmacro %} diff --git a/core/dbt/include/global_project/macros/etc/datetime.sql b/core/dbt/include/global_project/macros/etc/datetime.sql index 28a7654110b..f94e8251b2f 100644 --- a/core/dbt/include/global_project/macros/etc/datetime.sql +++ b/core/dbt/include/global_project/macros/etc/datetime.sql @@ -54,3 +54,7 @@ {{ return(dates_in_range(start_date, end_date, in_fmt=date_fmt)) }} {% endmacro %} +{% macro py_current_timestring() %} + {% set dt = modules.datetime.datetime.now() %} + 
{% do return(dt.strftime("%Y%m%d%H%M%S%f")) %} +{% endmacro %} diff --git a/core/dbt/include/global_project/macros/materializations/archive/archive.sql b/core/dbt/include/global_project/macros/materializations/archive/archive.sql index ead07b657d7..a21bd27213b 100644 --- a/core/dbt/include/global_project/macros/materializations/archive/archive.sql +++ b/core/dbt/include/global_project/macros/materializations/archive/archive.sql @@ -1,26 +1,3 @@ -{# - Create SCD Hash SQL fields cross-db -#} - -{% macro archive_hash_arguments(args) %} - {{ adapter_macro('archive_hash_arguments', args) }} -{% endmacro %} - -{% macro default__archive_hash_arguments(args) %} - md5({% for arg in args %}coalesce(cast({{ arg }} as varchar ), '') {% if not loop.last %} || '|' || {% endif %}{% endfor %}) -{% endmacro %} - -{% macro create_temporary_table(sql, relation) %} - {{ return(adapter_macro('create_temporary_table', sql, relation)) }} -{% endmacro %} - -{% macro default__create_temporary_table(sql, relation) %} - {% call statement() %} - {{ create_table_as(True, relation, sql) }} - {% endcall %} - {{ return(relation) }} -{% endmacro %} - {# Add new columns to the table if applicable #} @@ -36,86 +13,41 @@ {% endfor %} {% endmacro %} -{# - Run the update part of an archive query. Different databases have - tricky differences in their `update` semantics. Table projection is - not allowed on Redshift/pg, but is effectively required on bq. 
-#} -{% macro archive_update(target_relation, tmp_relation) %} - {{ adapter_macro('archive_update', target_relation, tmp_relation) }} +{% macro post_archive(staging_relation) %} + {{ adapter_macro('post_archive', staging_relation) }} {% endmacro %} -{% macro default__archive_update(target_relation, tmp_relation) %} - update {{ target_relation }} - set dbt_valid_to = tmp.dbt_valid_to - from {{ tmp_relation }} as tmp - where tmp.dbt_scd_id = {{ target_relation }}.dbt_scd_id - and change_type = 'update'; +{% macro default__post_archive(staging_relation) %} + {# no-op #} {% endmacro %} -{% macro archive_get_time() -%} - {{ adapter_macro('archive_get_time') }} -{%- endmacro %} - -{% macro default__archive_get_time() -%} - {{ current_timestamp() }} -{%- endmacro %} +{% macro archive_staging_table_inserts(strategy, source_sql, target_relation) -%} -{% macro snowflake__archive_get_time() -%} - to_timestamp_ntz({{ current_timestamp() }}) -{%- endmacro %} + with archive_query as ( + {{ source_sql }} -{% macro archive_select_generic(source_sql, target_relation, transforms, scd_hash) -%} - with source as ( - {{ source_sql }} ), - {{ transforms }} - merged as ( - select *, 'update' as change_type from updates - union all - select *, 'insert' as change_type from insertions + source_data as ( - ) - - select *, - {{ scd_hash }} as dbt_scd_id - from merged - -{%- endmacro %} + select *, + {{ strategy.scd_id }} as dbt_scd_id, + {{ strategy.unique_key }} as dbt_unique_key, + {{ strategy.updated_at }} as dbt_updated_at, + {{ strategy.updated_at }} as dbt_valid_from, + nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to -{# - Cross-db compatible archival implementation -#} -{% macro archive_select_timestamp(source_sql, target_relation, source_columns, unique_key, updated_at) -%} - {% set timestamp_column = api.Column.create('_', 'timestamp') %} - {% set transforms -%} - current_data as ( - - select - {% for col in source_columns %} - {{ col.name }} {% if not 
loop.last %},{% endif %} - {% endfor %}, - {{ updated_at }} as dbt_updated_at, - {{ unique_key }} as dbt_pk, - {{ updated_at }} as dbt_valid_from, - {{ timestamp_column.literal('null') }} as tmp_valid_to - from source + from archive_query ), archived_data as ( - select - {% for col in source_columns %} - {{ col.name }}, - {% endfor %} - {{ updated_at }} as dbt_updated_at, - {{ unique_key }} as dbt_pk, - dbt_valid_from, - dbt_valid_to as tmp_valid_to + select *, + {{ strategy.unique_key }} as dbt_unique_key + from {{ target_relation }} ), @@ -123,125 +55,125 @@ insertions as ( select - current_data.*, - {{ timestamp_column.literal('null') }} as dbt_valid_to - from current_data - left outer join archived_data - on archived_data.dbt_pk = current_data.dbt_pk - where - archived_data.dbt_pk is null - or ( - archived_data.dbt_pk is not null - and archived_data.dbt_updated_at < current_data.dbt_updated_at - and archived_data.tmp_valid_to is null + 'insert' as dbt_change_type, + source_data.* + + from source_data + left outer join archived_data on archived_data.dbt_unique_key = source_data.dbt_unique_key + where archived_data.dbt_unique_key is null + or ( + archived_data.dbt_unique_key is not null + and archived_data.dbt_valid_to is null + and ( + {{ strategy.row_changed }} + ) ) - ), - updates as ( + ) + + select * from insertions - select - archived_data.*, - current_data.dbt_updated_at as dbt_valid_to - from current_data - left outer join archived_data - on archived_data.dbt_pk = current_data.dbt_pk - where archived_data.dbt_pk is not null - and archived_data.dbt_updated_at < current_data.dbt_updated_at - and archived_data.tmp_valid_to is null - ), - {%- endset %} - {%- set scd_hash = archive_hash_arguments(['dbt_pk', 'dbt_updated_at']) -%} - {{ archive_select_generic(source_sql, target_relation, transforms, scd_hash) }} {%- endmacro %} -{% macro archive_select_check_cols(source_sql, target_relation, source_columns, unique_key, check_cols) -%} - {%- set timestamp_column 
= api.Column.create('_', 'timestamp') -%} +{% macro archive_staging_table_updates(strategy, source_sql, target_relation) -%} - {# if we recognize the primary key, it's the newest record, and anything we care about has changed, it's an update candidate #} - {%- set update_candidate -%} - archived_data.dbt_pk is not null - and ( - {%- for col in check_cols %} - current_data.{{ col }} <> archived_data.{{ col }} - {%- if not loop.last %} or {% endif %} - {% endfor -%} - ) - and archived_data.tmp_valid_to is null - {%- endset %} + with archive_query as ( - {% set transforms -%} - current_data as ( + {{ source_sql }} - select - {% for col in source_columns %} - {{ col.name }} {% if not loop.last %},{% endif %} - {% endfor %}, - {{ archive_get_time() }} as dbt_updated_at, - {{ unique_key }} as dbt_pk, - {{ archive_get_time() }} as dbt_valid_from, - {{ timestamp_column.literal('null') }} as tmp_valid_to - from source ), - archived_data as ( + source_data as ( select - {% for col in source_columns %} - {{ col.name }}, - {% endfor %} - dbt_updated_at, - {{ unique_key }} as dbt_pk, - dbt_valid_from, - dbt_valid_to as tmp_valid_to - from {{ target_relation }} + *, + {{ strategy.scd_id }} as dbt_scd_id, + {{ strategy.unique_key }} as dbt_unique_key, + {{ strategy.updated_at }} as dbt_updated_at, + {{ strategy.updated_at }} as dbt_valid_from + from archive_query ), - insertions as ( + archived_data as ( + + select *, + {{ strategy.unique_key }} as dbt_unique_key + + from {{ target_relation }} - select - current_data.*, - {{ timestamp_column.literal('null') }} as dbt_valid_to - from current_data - left outer join archived_data - on archived_data.dbt_pk = current_data.dbt_pk - where - archived_data.dbt_pk is null - or ( {{ update_candidate }} ) ), updates as ( select - archived_data.*, - {{ archive_get_time() }} as dbt_valid_to - from current_data - left outer join archived_data - on archived_data.dbt_pk = current_data.dbt_pk - where {{ update_candidate }} - ), - {%- endset %} + 
'update' as dbt_change_type, + archived_data.dbt_scd_id, + source_data.dbt_valid_from as dbt_valid_to + + from source_data + join archived_data on archived_data.dbt_unique_key = source_data.dbt_unique_key + where archived_data.dbt_valid_to is null + and ( + {{ strategy.row_changed }} + ) + + ) + + select * from updates - {%- set hash_components = ['dbt_pk'] %} - {%- do hash_components.extend(check_cols) -%} - {%- set scd_hash = archive_hash_arguments(hash_components) -%} - {{ archive_select_generic(source_sql, target_relation, transforms, scd_hash) }} {%- endmacro %} -{# this is gross #} -{% macro create_empty_table_as(sql) %} - {% set tmp_relation = api.Relation.create(identifier=model['name']+'_dbt_archival_view_tmp', type='view') %} - {% set limited_sql -%} - with cte as ( - {{ sql }} - ) - select * from cte limit 0 - {%- endset %} - {%- set tmp_relation = create_temporary_table(limited_sql, tmp_relation) -%} - {{ return(tmp_relation) }} +{% macro build_archive_table(strategy, sql) %} + + select *, + {{ strategy.scd_id }} as dbt_scd_id, + {{ strategy.updated_at }} as dbt_updated_at, + {{ strategy.updated_at }} as dbt_valid_from, + nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to + from ( + {{ sql }} + ) sbq + +{% endmacro %} + + +{% macro get_or_create_relation(database, schema, identifier, type) %} + {%- set target_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) %} + {% if target_relation %} + {% do return([true, target_relation]) %} + {% endif %} + + {%- set new_relation = api.Relation.create( + database=database, + schema=schema, + identifier=identifier, + type=type + ) -%} + {% do return([false, new_relation]) %} +{% endmacro %} + +{% macro build_archive_staging_table(strategy, sql, target_relation) %} + {% set tmp_relation = make_temp_relation(target_relation) %} + + {% set inserts_select = archive_staging_table_inserts(strategy, sql, target_relation) %} + {% set updates_select = 
archive_staging_table_updates(strategy, sql, target_relation) %} + + {% call statement('build_archive_staging_relation_inserts') %} + {{ create_table_as(True, tmp_relation, inserts_select) }} + {% endcall %} + + {% call statement('build_archive_staging_relation_updates') %} + insert into {{ tmp_relation }} (dbt_change_type, dbt_scd_id, dbt_valid_to) + select dbt_change_type, dbt_scd_id, dbt_valid_to from ( + {{ updates_select }} + ) dbt_sbq; + {% endcall %} + + {% do return(tmp_relation) %} {% endmacro %} @@ -251,95 +183,74 @@ {%- set target_database = config.get('target_database') -%} {%- set target_schema = config.get('target_schema') -%} {%- set target_table = model.get('alias', model.get('name')) -%} - {%- set strategy = config.get('strategy') -%} - {% set information_schema = api.Relation.create( - database=target_database, - schema=target_schema, - identifier=target_table).information_schema() %} + {%- set strategy_name = config.get('strategy') -%} + {%- set unique_key = config.get('unique_key') %} - {% if not check_schema_exists(information_schema, target_schema) %} - {{ create_schema(target_database, target_schema) }} + {% if not adapter.check_schema_exists(target_database, target_schema) %} + {% do create_schema(target_database, target_schema) %} {% endif %} - {%- set target_relation = adapter.get_relation( - database=target_database, - schema=target_schema, - identifier=target_table) -%} - - {%- if target_relation is none -%} - {%- set target_relation = api.Relation.create( - database=target_database, - schema=target_schema, - identifier=target_table) -%} - {%- elif not target_relation.is_table -%} - {{ exceptions.relation_wrong_type(target_relation, 'table') }} - {%- endif -%} - - {% set source_info_model = create_empty_table_as(model['injected_sql']) %} - - {%- set source_columns = adapter.get_columns_in_relation(source_info_model) -%} + {% set target_relation_exists, target_relation = get_or_create_relation( + database=target_database, + 
schema=target_schema, + identifier=target_table, + type='table') -%} - {%- set unique_key = config.get('unique_key') -%} - {%- set dest_columns = source_columns + [ - api.Column.create('dbt_valid_from', 'timestamp'), - api.Column.create('dbt_valid_to', 'timestamp'), - api.Column.create('dbt_scd_id', 'string'), - api.Column.create('dbt_updated_at', 'timestamp'), - ] -%} + {%- if not target_relation.is_table -%} + {% do exceptions.relation_wrong_type(target_relation, 'table') %} + {%- endif -%} - {% call statement() %} - {{ create_archive_table(target_relation, dest_columns) }} - {% endcall %} + {% set strategy_macro = strategy_dispatch(strategy_name) %} + {% set strategy = strategy_macro(model, "archived_data", "source_data", config) %} - {% set missing_columns = adapter.get_missing_columns(source_info_model, target_relation) %} + {% if not target_relation_exists %} - {{ create_columns(target_relation, missing_columns) }} + {% set build_sql = build_archive_table(strategy, model['injected_sql']) %} + {% call statement('main') -%} + {{ create_table_as(False, target_relation, build_sql) }} + {% endcall %} - {{ adapter.valid_archive_target(target_relation) }} + {% else %} - {%- set identifier = model['alias'] -%} - {%- set tmp_identifier = model['name'] + '__dbt_archival_tmp' -%} + {{ adapter.valid_archive_target(target_relation) }} - {% set tmp_table_sql -%} + {% set staging_table = build_archive_staging_table(strategy, sql, target_relation) %} - with dbt_archive_sbq as ( + {% do adapter.expand_target_column_types(from_relation=staging_table, + to_relation=target_relation) %} - {% if strategy == 'timestamp' %} - {%- set updated_at = config.get('updated_at') -%} - {{ archive_select_timestamp(model['injected_sql'], target_relation, source_columns, unique_key, updated_at) }} - {% elif strategy == 'check' %} - {%- set check_cols = config.get('check_cols') -%} - {% if check_cols == 'all' %} - {% set check_cols = source_columns | map(attribute='name') | list %} - {% endif %} 
- {{ archive_select_check_cols(model['injected_sql'], target_relation, source_columns, unique_key, check_cols)}} - {% else %} - {{ exceptions.raise_compiler_error('Got invalid strategy "{}"'.format(strategy)) }} - {% endif %} - ) - select * from dbt_archive_sbq + {% set missing_columns = adapter.get_missing_columns(staging_table, target_relation) + | rejectattr('name', 'equalto', 'dbt_change_type') + | rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE') + | rejectattr('name', 'equalto', 'dbt_unique_key') + | rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY') + | list %} - {%- endset %} + {% do create_columns(target_relation, missing_columns) %} - {%- set tmp_relation = api.Relation.create(identifier=tmp_identifier, type='table') -%} - {%- set tmp_relation = create_temporary_table(tmp_table_sql, tmp_relation) -%} + {% set source_columns = adapter.get_columns_in_relation(staging_table) + | rejectattr('name', 'equalto', 'dbt_change_type') + | rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE') + | rejectattr('name', 'equalto', 'dbt_unique_key') + | rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY') + | list %} - {{ adapter.expand_target_column_types(temp_table=tmp_identifier, - to_relation=target_relation) }} + {% call statement('main') %} + {{ archive_merge_sql( + target = target_relation, + source = staging_table, + insert_cols = source_columns + ) + }} + {% endcall %} - {% call statement('_') -%} - {{ archive_update(target_relation, tmp_relation) }} - {% endcall %} + {% endif %} - {% call statement('main') -%} + {{ adapter.commit() }} - insert into {{ target_relation }} ( - {{ column_list(dest_columns) }} - ) - select {{ column_list(dest_columns) }} from {{ tmp_relation }} - where change_type = 'insert'; - {% endcall %} + {% if staging_table is defined %} + {% do post_archive(staging_table) %} + {% endif %} - {{ adapter.commit() }} {% endmaterialization %} diff --git a/core/dbt/include/global_project/macros/materializations/archive/archive_merge.sql 
b/core/dbt/include/global_project/macros/materializations/archive/archive_merge.sql new file mode 100644 index 00000000000..9b7ae0d25b7 --- /dev/null +++ b/core/dbt/include/global_project/macros/materializations/archive/archive_merge.sql @@ -0,0 +1,27 @@ + +{% macro archive_merge_sql(target, source, insert_cols) -%} + {{ adapter_macro('archive_merge_sql', target, source, insert_cols) }} +{%- endmacro %} + + +{% macro default__archive_merge_sql(target, source, insert_cols) -%} + {%- set insert_cols_csv = insert_cols| map(attribute="name") | join(', ') -%} + + merge into {{ target }} as DBT_INTERNAL_DEST + using {{ source }} as DBT_INTERNAL_SOURCE + on DBT_INTERNAL_SOURCE.dbt_scd_id = DBT_INTERNAL_DEST.dbt_scd_id + + when matched + and DBT_INTERNAL_DEST.dbt_valid_to is null + and DBT_INTERNAL_SOURCE.dbt_change_type = 'update' + then update + set dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to + + when not matched + and DBT_INTERNAL_SOURCE.dbt_change_type = 'insert' + then insert ({{ insert_cols_csv }}) + values ({{ insert_cols_csv }}) + ; +{% endmacro %} + + diff --git a/core/dbt/include/global_project/macros/materializations/archive/strategies.sql b/core/dbt/include/global_project/macros/materializations/archive/strategies.sql new file mode 100644 index 00000000000..3fc7db24eac --- /dev/null +++ b/core/dbt/include/global_project/macros/materializations/archive/strategies.sql @@ -0,0 +1,114 @@ +{# + Dispatch strategies by name, optionally qualified to a package +#} +{% macro strategy_dispatch(name) -%} +{% set original_name = name %} + {% if '.' 
in name %} + {% set package_name, name = name.split(".", 1) %} + {% else %} + {% set package_name = none %} + {% endif %} + + {% if package_name is none %} + {% set package_context = context %} + {% elif package_name in context %} + {% set package_context = context[package_name] %} + {% else %} + {% set error_msg %} + Could not find package '{{package_name}}', called with '{{original_name}}' + {% endset %} + {{ exceptions.raise_compiler_error(error_msg | trim) }} + {% endif %} + + {%- set search_name = 'archive_' ~ name ~ '_strategy' -%} + + {% if search_name not in package_context %} + {% set error_msg %} + The specified strategy macro '{{name}}' was not found in package '{{ package_name }}' + {% endset %} + {{ exceptions.raise_compiler_error(error_msg | trim) }} + {% endif %} + {{ return(package_context[search_name]) }} +{%- endmacro %} + + +{# + Create SCD Hash SQL fields cross-db +#} +{% macro archive_hash_arguments(args) %} + {{ adapter_macro('archive_hash_arguments', args) }} +{% endmacro %} + + +{% macro default__archive_hash_arguments(args) %} + md5({% for arg in args %} + coalesce(cast({{ arg }} as varchar ), '') {% if not loop.last %} || '|' || {% endif %} + {% endfor %}) +{% endmacro %} + + +{# + Get the current time cross-db +#} +{% macro archive_get_time() -%} + {{ adapter_macro('archive_get_time') }} +{%- endmacro %} + +{% macro default__archive_get_time() -%} + {{ current_timestamp() }} +{%- endmacro %} + + +{# + Core strategy definitions +#} +{% macro archive_timestamp_strategy(node, archived_rel, current_rel, config) %} + {% set primary_key = config['unique_key'] %} + {% set updated_at = config['updated_at'] %} + + {% set row_changed_expr -%} + ({{ archived_rel }}.{{ updated_at }} < {{ current_rel }}.{{ updated_at }}) + {%- endset %} + + {% set scd_id_expr = archive_hash_arguments([primary_key, updated_at]) %} + + {% do return({ + "unique_key": primary_key, + "updated_at": updated_at, + "row_changed": row_changed_expr, + "scd_id": scd_id_expr + }) 
%} +{% endmacro %} + + +{% macro archive_check_strategy(node, archived_rel, current_rel, config) %} + {% set check_cols_config = config['check_cols'] %} + {% set primary_key = config['unique_key'] %} + {% set updated_at = archive_get_time() %} + + {% if check_cols_config == 'all' %} + {% set check_cols = get_columns_in_query(node['injected_sql']) %} + {% elif check_cols_config is iterable and (check_cols_config | length) > 0 %} + {% set check_cols = check_cols_config %} + {% else %} + {% do exceptions.raise_compiler_error("Invalid value for 'check_cols': " ~ check_cols_config) %} + {% endif %} + + {% set row_changed_expr -%} + ( + {% for col in check_cols %} + {{ archived_rel }}.{{ col }} != {{ current_rel }}.{{ col }} + {%- if not loop.last %} or {% endif %} + {% endfor %} + ) + {%- endset %} + + {% set scd_id_expr = archive_hash_arguments([primary_key, updated_at]) %} {# hash row identity + change time (same inputs as the timestamp strategy); hashing check_cols alone gives different rows with equal checked values the same dbt_scd_id, which the merge joins on #} + + {% do return({ + "unique_key": primary_key, + "updated_at": updated_at, + "row_changed": row_changed_expr, + "scd_id": scd_id_expr + }) %} +{% endmacro %} diff --git a/core/dbt/include/global_project/macros/materializations/incremental/incremental.sql b/core/dbt/include/global_project/macros/materializations/incremental/incremental.sql index 0dd87ce10eb..66be151e444 100644 --- a/core/dbt/include/global_project/macros/materializations/incremental/incremental.sql +++ b/core/dbt/include/global_project/macros/materializations/incremental/incremental.sql @@ -15,13 +15,9 @@ {%- set unique_key = config.get('unique_key') -%} {%- set identifier = model['alias'] -%} - {%- set tmp_identifier = model['name'] + '__dbt_incremental_tmp' -%} - {%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%} {%- set target_relation = api.Relation.create(identifier=identifier, schema=schema, database=database, type='table') -%} - {%- set tmp_relation = api.Relation.create(identifier=tmp_identifier, - schema=schema, - database=database, type='table') -%} + {%- set tmp_relation = 
make_temp_relation(target_relation) %} {%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%} @@ -55,7 +51,7 @@ {%- endcall -%} - {{ adapter.expand_target_column_types(temp_table=tmp_identifier, + {{ adapter.expand_target_column_types(from_relation=tmp_relation, to_relation=target_relation) }} {%- call statement('main') -%} @@ -71,7 +67,7 @@ insert into {{ target_relation }} ({{ dest_cols_csv }}) ( select {{ dest_cols_csv }} - from {{ tmp_relation.include(schema=False, database=False) }} + from {{ tmp_relation }} ); {% endcall %} {%- endif %} diff --git a/core/dbt/parser/archives.py b/core/dbt/parser/archives.py index 981570a48da..fb285bac4fb 100644 --- a/core/dbt/parser/archives.py +++ b/core/dbt/parser/archives.py @@ -11,6 +11,19 @@ import os +def set_archive_attributes(node): + config_keys = { + 'target_database': 'database', + 'target_schema': 'schema' + } + + for config_key, node_key in config_keys.items(): + if config_key in node.config: + setattr(node, node_key, node.config[config_key]) + + return node + + class ArchiveParser(MacrosKnownParser): @classmethod def parse_archives_from_project(cls, config): @@ -87,12 +100,15 @@ def load_and_parse(self): archive.package_name, archive.name) - to_return[node_path] = self.parse_node( + parsed_node = self.parse_node( archive, node_path, self.all_projects.get(archive.package_name), archive_config=archive_config) + # TODO : Add tests for this + to_return[node_path] = set_archive_attributes(parsed_node) + return to_return @@ -138,7 +154,9 @@ def get_fqn(cls, node, package_project_config, extra=[]): def validate_archives(node): if node.resource_type == NodeType.Archive: try: - return ParsedArchiveNode(**node.to_shallow_dict()) + parsed_node = ParsedArchiveNode(**node.to_shallow_dict()) + return set_archive_attributes(parsed_node) + except dbt.exceptions.JSONValidationException as exc: raise dbt.exceptions.CompilationException(str(exc), node) else: diff --git a/plugins/bigquery/dbt/adapters/bigquery/impl.py 
b/plugins/bigquery/dbt/adapters/bigquery/impl.py index 8710ccb13bc..0e8046a2628 100644 --- a/plugins/bigquery/dbt/adapters/bigquery/impl.py +++ b/plugins/bigquery/dbt/adapters/bigquery/impl.py @@ -343,22 +343,6 @@ def execute_model(self, model, materialization, sql_override=None, return res - @available.parse(_stub_relation) - def create_temporary_table(self, sql, **kwargs): - # BQ queries always return a temp table with their results - query_job, _ = self.connections.raw_execute(sql) - bq_table = query_job.destination - - return self.Relation.create( - database=bq_table.project, - schema=bq_table.dataset_id, - identifier=bq_table.table_id, - quote_policy={ - 'schema': True, - 'identifier': True - }, - type=BigQueryRelation.Table) - @available.parse_none def alter_table_add_columns(self, relation, columns): diff --git a/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql b/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql index 1c87ce4dc18..83c7926ff34 100644 --- a/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql +++ b/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql @@ -34,6 +34,11 @@ create or replace table {{ relation }} {{ partition_by(raw_partition_by) }} {{ cluster_by(raw_cluster_by) }} + {% if temporary %} + OPTIONS( + expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 12 hour) + ) + {% endif %} as ( {{ sql }} ); @@ -54,6 +59,12 @@ {{ adapter.drop_schema(database_name, schema_name) }} {% endmacro %} +{% macro bigquery__drop_relation(relation) -%} + {% call statement('drop_relation') -%} + drop {{ relation.type }} if exists {{ relation }} + {%- endcall %} +{% endmacro %} + {% macro bigquery__get_columns_in_relation(relation) -%} {{ return(adapter.get_columns_in_relation(relation)) }} {% endmacro %} diff --git a/plugins/bigquery/dbt/include/bigquery/macros/materializations/archive.sql b/plugins/bigquery/dbt/include/bigquery/macros/materializations/archive.sql index 7a95f440f83..87b10589778 100644 --- 
a/plugins/bigquery/dbt/include/bigquery/macros/materializations/archive.sql +++ b/plugins/bigquery/dbt/include/bigquery/macros/materializations/archive.sql @@ -1,9 +1,3 @@ -{% macro bigquery__create_temporary_table(sql, relation) %} - {% set tmp_relation = adapter.create_temporary_table(sql) %} - {{ return(tmp_relation) }} -{% endmacro %} - - {% macro bigquery__archive_hash_arguments(args) %} to_hex(md5(concat({% for arg in args %}coalesce(cast({{ arg }} as string), ''){% if not loop.last %}, '|',{% endif %}{% endfor %}))) {% endmacro %} @@ -12,11 +6,7 @@ {{ adapter.alter_table_add_columns(relation, columns) }} {% endmacro %} - -{% macro bigquery__archive_update(target_relation, tmp_relation) %} - update {{ target_relation }} as dest - set dest.dbt_valid_to = tmp.dbt_valid_to - from {{ tmp_relation }} as tmp - where tmp.dbt_scd_id = dest.dbt_scd_id - and change_type = 'update'; +{% macro bigquery__post_archive(staging_relation) %} + -- Clean up the archive temp table + {% do drop_relation(staging_relation) %} {% endmacro %} diff --git a/plugins/postgres/dbt/include/postgres/macros/adapters.sql b/plugins/postgres/dbt/include/postgres/macros/adapters.sql index 0bda7fc9ad4..356b32eb66a 100644 --- a/plugins/postgres/dbt/include/postgres/macros/adapters.sql +++ b/plugins/postgres/dbt/include/postgres/macros/adapters.sql @@ -91,3 +91,19 @@ {% macro postgres__current_timestamp() -%} now() {%- endmacro %} + +{% macro postgres__archive_get_time() -%} + {{ current_timestamp() }}::timestamp without time zone +{%- endmacro %} + +{% macro postgres__make_temp_relation(base_relation, suffix) %} + {% set tmp_identifier = base_relation.identifier ~ suffix ~ py_current_timestring() %} + {% do return(base_relation.incorporate( + table_name=tmp_identifier, + path={ + "identifier": tmp_identifier, + "schema": none, + "database": none + })) -%} +{% endmacro %} + diff --git a/plugins/postgres/dbt/include/postgres/macros/materializations/archive_merge.sql 
b/plugins/postgres/dbt/include/postgres/macros/materializations/archive_merge.sql new file mode 100644 index 00000000000..9665dbd73ca --- /dev/null +++ b/plugins/postgres/dbt/include/postgres/macros/materializations/archive_merge.sql @@ -0,0 +1,18 @@ + +{% macro postgres__archive_merge_sql(target, source, insert_cols) -%} + {%- set insert_cols_csv = insert_cols | map(attribute="name") | join(', ') -%} + + update {{ target }} + set dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to + from {{ source }} as DBT_INTERNAL_SOURCE + where DBT_INTERNAL_SOURCE.dbt_scd_id = {{ target }}.dbt_scd_id + and DBT_INTERNAL_SOURCE.dbt_change_type = 'update' + and {{ target }}.dbt_valid_to is null; + + insert into {{ target }} ({{ insert_cols_csv }}) + select {% for column in insert_cols -%} + DBT_INTERNAL_SOURCE.{{ column.name }} {%- if not loop.last %}, {%- endif %} + {%- endfor %} + from {{ source }} as DBT_INTERNAL_SOURCE + where DBT_INTERNAL_SOURCE.dbt_change_type = 'insert'; +{% endmacro %} diff --git a/plugins/redshift/dbt/include/redshift/macros/adapters.sql b/plugins/redshift/dbt/include/redshift/macros/adapters.sql index 29f6ad0b16f..37d79d3416f 100644 --- a/plugins/redshift/dbt/include/redshift/macros/adapters.sql +++ b/plugins/redshift/dbt/include/redshift/macros/adapters.sql @@ -1,3 +1,4 @@ + {% macro dist(dist) %} {%- if dist is not none -%} {%- set dist = dist.strip().lower() -%} @@ -57,15 +58,6 @@ {% endmacro %} -{% macro redshift__create_archive_table(relation, columns) -%} - create table if not exists {{ relation }} ( - {{ column_list_for_create_table(columns) }} - ) - {{ dist('dbt_updated_at') }} - {{ sort('compound', ['dbt_scd_id']) }}; -{%- endmacro %} - - {% macro redshift__create_schema(database_name, schema_name) -%} {{ postgres__create_schema(database_name, schema_name) }} {% endmacro %} @@ -171,10 +163,15 @@ {% macro redshift__check_schema_exists(information_schema, schema) -%} {{ return(postgres__check_schema_exists(information_schema, schema)) }} {%- endmacro 
%} -list_schemas - -%} {% macro redshift__current_timestamp() -%} getdate() {%- endmacro %} + +{% macro redshift__archive_get_time() -%} + {{ current_timestamp() }}::timestamp +{%- endmacro %} + +{% macro redshift__make_temp_relation(base_relation, suffix) %} + {% do return(postgres__make_temp_relation(base_relation, suffix)) %} +{% endmacro %} diff --git a/plugins/redshift/dbt/include/redshift/macros/materializations/archive_merge.sql b/plugins/redshift/dbt/include/redshift/macros/materializations/archive_merge.sql new file mode 100644 index 00000000000..efde2e8373c --- /dev/null +++ b/plugins/redshift/dbt/include/redshift/macros/materializations/archive_merge.sql @@ -0,0 +1,4 @@ + +{% macro redshift__archive_merge_sql(target, source, insert_cols) -%} + {{ postgres__archive_merge_sql(target, source, insert_cols) }} +{% endmacro %} diff --git a/plugins/snowflake/dbt/include/snowflake/macros/adapters.sql b/plugins/snowflake/dbt/include/snowflake/macros/adapters.sql index 7057c6fd653..99eb9224dbd 100644 --- a/plugins/snowflake/dbt/include/snowflake/macros/adapters.sql +++ b/plugins/snowflake/dbt/include/snowflake/macros/adapters.sql @@ -79,6 +79,10 @@ convert_timezone('UTC', current_timestamp()) {%- endmacro %} +{% macro snowflake__archive_get_time() -%} + to_timestamp_ntz({{ current_timestamp() }}) +{%- endmacro %} + {% macro snowflake__rename_relation(from_relation, to_relation) -%} {% call statement('rename_relation') -%} diff --git a/plugins/snowflake/dbt/include/snowflake/macros/materializations/incremental.sql b/plugins/snowflake/dbt/include/snowflake/macros/materializations/incremental.sql index ef7a2ec8e35..87be13a8190 100644 --- a/plugins/snowflake/dbt/include/snowflake/macros/materializations/incremental.sql +++ b/plugins/snowflake/dbt/include/snowflake/macros/materializations/incremental.sql @@ -11,10 +11,7 @@ identifier=identifier, type='table') -%} - {%- set tmp_relation = api.Relation.create(database=database, - schema=schema, - identifier=identifier ~ 
"__dbt_tmp", - type='table') -%} + {%- set tmp_relation = make_temp_relation(target_relation) %} -- setup {{ run_hooks(pre_hooks, inside_transaction=False) }} @@ -42,7 +39,7 @@ {{ create_table_as(true, tmp_relation, sql) }} {%- endcall -%} - {{ adapter.expand_target_column_types(temp_table=tmp_relation.identifier, + {{ adapter.expand_target_column_types(from_relation=tmp_relation, to_relation=target_relation) }} {% set incremental_sql %} ( diff --git a/test/integration/004_simple_archive_test/seed.sql b/test/integration/004_simple_archive_test/seed.sql index cee3748faa4..d0ee03181c5 100644 --- a/test/integration/004_simple_archive_test/seed.sql +++ b/test/integration/004_simple_archive_test/seed.sql @@ -1,4 +1,4 @@ - create table {database}.{schema}.seed ( +create table {database}.{schema}.seed ( id INTEGER, first_name VARCHAR(50), last_name VARCHAR(50), @@ -20,7 +20,7 @@ create table {database}.{schema}.archive_expected ( updated_at TIMESTAMP WITHOUT TIME ZONE, dbt_valid_from TIMESTAMP WITHOUT TIME ZONE, dbt_valid_to TIMESTAMP WITHOUT TIME ZONE, - dbt_scd_id VARCHAR(256), + dbt_scd_id VARCHAR(32), dbt_updated_at TIMESTAMP WITHOUT TIME ZONE ); @@ -79,8 +79,6 @@ select md5(id || '-' || first_name || '|' || updated_at::text) as dbt_scd_id from {database}.{schema}.seed; - - create table {database}.{schema}.archive_castillo_expected ( id INTEGER, first_name VARCHAR(50), @@ -93,8 +91,9 @@ create table {database}.{schema}.archive_castillo_expected ( updated_at TIMESTAMP WITHOUT TIME ZONE, dbt_valid_from TIMESTAMP WITHOUT TIME ZONE, dbt_valid_to TIMESTAMP WITHOUT TIME ZONE, - dbt_scd_id VARCHAR(256), + dbt_scd_id VARCHAR(32), dbt_updated_at TIMESTAMP WITHOUT TIME ZONE + ); -- one entry @@ -139,7 +138,7 @@ create table {database}.{schema}.archive_alvarez_expected ( updated_at TIMESTAMP WITHOUT TIME ZONE, dbt_valid_from TIMESTAMP WITHOUT TIME ZONE, dbt_valid_to TIMESTAMP WITHOUT TIME ZONE, - dbt_scd_id VARCHAR(256), + dbt_scd_id VARCHAR(32), dbt_updated_at TIMESTAMP WITHOUT 
TIME ZONE ); @@ -185,7 +184,7 @@ create table {database}.{schema}.archive_kelly_expected ( updated_at TIMESTAMP WITHOUT TIME ZONE, dbt_valid_from TIMESTAMP WITHOUT TIME ZONE, dbt_valid_to TIMESTAMP WITHOUT TIME ZONE, - dbt_scd_id VARCHAR(256), + dbt_scd_id VARCHAR(32), dbt_updated_at TIMESTAMP WITHOUT TIME ZONE ); diff --git a/test/integration/004_simple_archive_test/test-archives-bq/archive.sql b/test/integration/004_simple_archive_test/test-archives-bq/archive.sql index d7dec9d043e..aff119c410b 100644 --- a/test/integration/004_simple_archive_test/test-archives-bq/archive.sql +++ b/test/integration/004_simple_archive_test/test-archives-bq/archive.sql @@ -9,6 +9,6 @@ updated_at='updated_at', ) }} - select * from `{{database}}`.`{{schema}}`.seed + select * from `{{target.database}}`.`{{schema}}`.seed {% endarchive %} diff --git a/test/integration/004_simple_archive_test/test-archives-invalid/archive.sql b/test/integration/004_simple_archive_test/test-archives-invalid/archive.sql index 3bbe49664c1..35340368781 100644 --- a/test/integration/004_simple_archive_test/test-archives-invalid/archive.sql +++ b/test/integration/004_simple_archive_test/test-archives-invalid/archive.sql @@ -7,6 +7,6 @@ updated_at='updated_at', ) }} - select * from {{database}}.{{schema}}.seed + select * from {{target.database}}.{{schema}}.seed {% endarchive %} diff --git a/test/integration/004_simple_archive_test/test-archives-longtext/longtext.sql b/test/integration/004_simple_archive_test/test-archives-longtext/longtext.sql index d8c671cb1b9..c16ce784889 100644 --- a/test/integration/004_simple_archive_test/test-archives-longtext/longtext.sql +++ b/test/integration/004_simple_archive_test/test-archives-longtext/longtext.sql @@ -8,5 +8,5 @@ updated_at='updated_at', ) }} - select * from {{database}}.{{schema}}.super_long + select * from {{target.database}}.{{schema}}.super_long {% endarchive %} diff --git a/test/integration/004_simple_archive_test/test-archives-pg/archive.sql 
b/test/integration/004_simple_archive_test/test-archives-pg/archive.sql index 9117a8df1a4..4810fd4a519 100644 --- a/test/integration/004_simple_archive_test/test-archives-pg/archive.sql +++ b/test/integration/004_simple_archive_test/test-archives-pg/archive.sql @@ -9,6 +9,6 @@ updated_at='updated_at', ) }} - select * from {{database}}.{{schema}}.seed + select * from {{target.database}}.{{schema}}.seed {% endarchive %} diff --git a/test/integration/004_simple_archive_test/test-archives-select/archives.sql b/test/integration/004_simple_archive_test/test-archives-select/archives.sql index 30e78fe720d..562ec89b3ce 100644 --- a/test/integration/004_simple_archive_test/test-archives-select/archives.sql +++ b/test/integration/004_simple_archive_test/test-archives-select/archives.sql @@ -9,7 +9,7 @@ updated_at='updated_at', ) }} - select * from {{database}}.{{schema}}.seed where last_name = 'Castillo' + select * from {{target.database}}.{{schema}}.seed where last_name = 'Castillo' {% endarchive %} @@ -24,7 +24,7 @@ updated_at='updated_at', ) }} - select * from {{database}}.{{schema}}.seed where last_name = 'Alvarez' + select * from {{target.database}}.{{schema}}.seed where last_name = 'Alvarez' {% endarchive %} @@ -40,6 +40,6 @@ updated_at='updated_at', ) }} - select * from {{database}}.{{schema}}.seed where last_name = 'Kelly' + select * from {{target.database}}.{{schema}}.seed where last_name = 'Kelly' {% endarchive %} diff --git a/test/integration/004_simple_archive_test/test-check-col-archives-bq/archive.sql b/test/integration/004_simple_archive_test/test-check-col-archives-bq/archive.sql index 40a2563291f..50eece23b5f 100644 --- a/test/integration/004_simple_archive_test/test-check-col-archives-bq/archive.sql +++ b/test/integration/004_simple_archive_test/test-check-col-archives-bq/archive.sql @@ -8,7 +8,7 @@ check_cols=('email',), ) }} - select * from `{{database}}`.`{{schema}}`.seed + select * from `{{target.database}}`.`{{schema}}`.seed {% endarchive %} @@ -23,5 
+23,5 @@ check_cols='all', ) }} - select * from `{{database}}`.`{{schema}}`.seed + select * from `{{target.database}}`.`{{schema}}`.seed {% endarchive %} diff --git a/test/integration/004_simple_archive_test/test-check-col-archives/archive.sql b/test/integration/004_simple_archive_test/test-check-col-archives/archive.sql index c3ee6fe2038..314b227634a 100644 --- a/test/integration/004_simple_archive_test/test-check-col-archives/archive.sql +++ b/test/integration/004_simple_archive_test/test-check-col-archives/archive.sql @@ -9,7 +9,7 @@ check_cols=['email'], ) }} - select * from {{database}}.{{schema}}.seed + select * from {{target.database}}.{{schema}}.seed {% endarchive %} @@ -24,5 +24,5 @@ check_cols='all', ) }} - select * from {{database}}.{{schema}}.seed + select * from {{target.database}}.{{schema}}.seed {% endarchive %} diff --git a/test/integration/032_concurrent_transaction_test/test_concurrent_transaction.py b/test/integration/032_concurrent_transaction_test/test_concurrent_transaction.py index 1b47ded1d10..22498f2f36e 100644 --- a/test/integration/032_concurrent_transaction_test/test_concurrent_transaction.py +++ b/test/integration/032_concurrent_transaction_test/test_concurrent_transaction.py @@ -1,6 +1,7 @@ from test.integration.base import DBTIntegrationTest, use_profile -import threading +import threading, traceback from dbt.adapters.factory import get_adapter +from dbt.logger import GLOBAL_LOGGER as logger class BaseTestConcurrentTransaction(DBTIntegrationTest): @@ -31,7 +32,7 @@ def run_select_and_check(self, rel, sql): connection_name = '__test_{}'.format(id(threading.current_thread())) try: with get_adapter(self.config).connection_named(connection_name) as conn: - res = self.run_sql_common(self.transform_sql(sql), 'one', conn) + res = self.run_sql_common(self.transform_sql(sql), 'one', conn, verbose=True) # The result is the output of f_sleep(), which is True if res[0] == True: @@ -40,6 +41,8 @@ def run_select_and_check(self, rel, sql): 
self.query_state[rel] = 'bad' except Exception as e: + logger.info("Caught exception: {}".format(e)) + traceback.print_exc() if 'concurrent transaction' in str(e): self.query_state[rel] = 'error: {}'.format(e) else: diff --git a/test/integration/033_event_tracking_test/test_events.py b/test/integration/033_event_tracking_test/test_events.py index 03f9ad3a9d2..61900e36752 100644 --- a/test/integration/033_event_tracking_test/test_events.py +++ b/test/integration/033_event_tracking_test/test_events.py @@ -610,7 +610,7 @@ def test__event_tracking_archive(self): model_id='3cdcd0fef985948fd33af308468da3b9', index=1, total=1, - status='INSERT 0 1', + status='SELECT 1', materialization='archive' ), self.build_context('archive', 'end', result_type='ok') diff --git a/test/integration/base.py b/test/integration/base.py index c7c67edde2b..516b65f0800 100644 --- a/test/integration/base.py +++ b/test/integration/base.py @@ -129,7 +129,8 @@ def redshift_profile(self): 'user': os.getenv('REDSHIFT_TEST_USER'), 'pass': os.getenv('REDSHIFT_TEST_PASS'), 'dbname': os.getenv('REDSHIFT_TEST_DBNAME'), - 'schema': self.unique_schema() + 'schema': self.unique_schema(), + 'keepalives_idle': 5 } }, 'target': 'default2' @@ -534,10 +535,14 @@ def run_sql_presto(self, sql, fetch, conn): conn.handle.commit() conn.transaction_open = False - def run_sql_common(self, sql, fetch, conn): + def run_sql_common(self, sql, fetch, conn, verbose=False): with conn.handle.cursor() as cursor: try: + if verbose: + logger.debug('running sql: {}'.format(sql)) cursor.execute(sql) + if verbose: + logger.debug('result from sql: {}'.format(cursor.statusmessage)) conn.handle.commit() if fetch == 'one': return cursor.fetchone() @@ -546,9 +551,9 @@ def run_sql_common(self, sql, fetch, conn): else: return except BaseException as e: - conn.handle.rollback() print(sql) print(e) + conn.handle.rollback() raise e finally: conn.transaction_open = False @@ -1015,7 +1020,9 @@ def _assertTableColumnsEqual(self, relation_a, 
relation_b): text_types = {'text', 'character varying', 'character', 'varchar'} - self.assertEqual(len(table_a_result), len(table_b_result)) + self.assertEqual(len(table_a_result), len(table_b_result), + "{} vs. {}".format(table_a_result, table_b_result)) + for a_column, b_column in zip(table_a_result, table_b_result): a_name, a_type, a_size = a_column b_name, b_type, b_size = b_column diff --git a/tox.ini b/tox.ini index 2134d39fb3c..976a3fa25e5 100644 --- a/tox.ini +++ b/tox.ini @@ -10,14 +10,14 @@ deps = [testenv:unit-py27] basepython = python2.7 -commands = /bin/bash -c '{envpython} -m pytest -v {posargs} --cov=dbt --cov-branch --cov-report html:htmlcov test/unit' +commands = /bin/bash -c '{envpython} -m pytest --durations 0 -v {posargs} --cov=dbt --cov-branch --cov-report html:htmlcov test/unit' deps = -r{toxinidir}/requirements.txt -r{toxinidir}/dev_requirements.txt [testenv:unit-py36] basepython = python3.6 -commands = /bin/bash -c '{envpython} -m pytest -v {posargs} --cov=dbt --cov-branch --cov-report html:htmlcov test/unit' +commands = /bin/bash -c '{envpython} -m pytest --durations 0 -v {posargs} --cov=dbt --cov-branch --cov-report html:htmlcov test/unit' deps = -r{toxinidir}/requirements.txt -r{toxinidir}/dev_requirements.txt @@ -27,7 +27,7 @@ basepython = python2.7 passenv = * setenv = HOME=/home/dbt_test_user -commands = /bin/bash -c '{envpython} -m pytest -v -m profile_postgres {posargs} --cov=dbt --cov-branch --cov-report html:htmlcov test/integration/*' +commands = /bin/bash -c '{envpython} -m pytest --durations 0 -v -m profile_postgres {posargs} --cov=dbt --cov-branch --cov-report html:htmlcov test/integration/*' deps = -e {toxinidir}/core -e {toxinidir}/plugins/postgres @@ -38,7 +38,7 @@ basepython = python2.7 passenv = * setenv = HOME=/home/dbt_test_user -commands = /bin/bash -c '{envpython} -m pytest -v -m profile_snowflake {posargs} --cov=dbt --cov-branch --cov-report html:htmlcov test/integration/*' +commands = /bin/bash -c '{envpython} -m 
pytest --durations 0 -v -m profile_snowflake {posargs} --cov=dbt --cov-branch --cov-report html:htmlcov test/integration/*' deps = -e {toxinidir}/core -e {toxinidir}/plugins/snowflake @@ -49,7 +49,7 @@ basepython = python2.7 passenv = * setenv = HOME=/home/dbt_test_user -commands = /bin/bash -c '{envpython} -m pytest -v -m profile_bigquery {posargs} --cov=dbt --cov-branch --cov-report html:htmlcov test/integration/*' +commands = /bin/bash -c '{envpython} -m pytest --durations 0 -v -m profile_bigquery {posargs} --cov=dbt --cov-branch --cov-report html:htmlcov test/integration/*' deps = -e {toxinidir}/core -e {toxinidir}/plugins/bigquery @@ -60,7 +60,7 @@ basepython = python2.7 passenv = * setenv = HOME=/home/dbt_test_user -commands = /bin/bash -c '{envpython} -m pytest -v -m profile_redshift {posargs} --cov=dbt --cov-branch --cov-report html:htmlcov test/integration/*' +commands = /bin/bash -c '{envpython} -m pytest --durations 0 -v -m profile_redshift {posargs} --cov=dbt --cov-branch --cov-report html:htmlcov test/integration/*' deps = -e {toxinidir}/core -e {toxinidir}/plugins/postgres @@ -72,7 +72,7 @@ basepython = python2.7 passenv = * setenv = HOME=/home/dbt_test_user -commands = /bin/bash -c '{envpython} -m pytest -v -m profile_presto {posargs} --cov=dbt --cov-branch --cov-report html:htmlcov test/integration/*' +commands = /bin/bash -c '{envpython} -m pytest --durations 0 -v -m profile_presto {posargs} --cov=dbt --cov-branch --cov-report html:htmlcov test/integration/*' deps = -e {toxinidir}/core -e {toxinidir}/plugins/presto @@ -83,7 +83,7 @@ basepython = python3.6 passenv = * setenv = HOME=/home/dbt_test_user -commands = /bin/bash -c '{envpython} -m pytest -v -m profile_postgres --cov=dbt --cov-branch --cov-report html:htmlcov {posargs} test/integration/*' +commands = /bin/bash -c '{envpython} -m pytest --durations 0 -v -m profile_postgres --cov=dbt --cov-branch --cov-report html:htmlcov {posargs} test/integration/*' deps = -e {toxinidir}/core -e 
{toxinidir}/plugins/postgres @@ -94,7 +94,7 @@ basepython = python3.6 passenv = * setenv = HOME=/home/dbt_test_user -commands = /bin/bash -c '{envpython} -m pytest -v -m profile_snowflake {posargs} --cov=dbt --cov-branch --cov-report html:htmlcov test/integration/*' +commands = /bin/bash -c '{envpython} -m pytest --durations 0 -v -m profile_snowflake {posargs} --cov=dbt --cov-branch --cov-report html:htmlcov test/integration/*' deps = -e {toxinidir}/core -e {toxinidir}/plugins/snowflake @@ -105,7 +105,7 @@ basepython = python3.6 passenv = * setenv = HOME=/home/dbt_test_user -commands = /bin/bash -c '{envpython} -m pytest -v -m profile_bigquery {posargs} --cov=dbt --cov-branch --cov-report html:htmlcov test/integration/*' +commands = /bin/bash -c '{envpython} -m pytest --durations 0 -v -m profile_bigquery {posargs} --cov=dbt --cov-branch --cov-report html:htmlcov test/integration/*' deps = -e {toxinidir}/core -e {toxinidir}/plugins/bigquery @@ -116,7 +116,7 @@ basepython = python3.6 passenv = * setenv = HOME=/home/dbt_test_user -commands = /bin/bash -c '{envpython} -m pytest -v -m profile_redshift {posargs} --cov=dbt --cov-branch --cov-report html:htmlcov test/integration/*' +commands = /bin/bash -c '{envpython} -m pytest --durations 0 -v -m profile_redshift {posargs} --cov=dbt --cov-branch --cov-report html:htmlcov test/integration/*' deps = -e {toxinidir}/core -e {toxinidir}/plugins/postgres @@ -128,7 +128,7 @@ basepython = python3.6 passenv = * setenv = HOME=/home/dbt_test_user -commands = /bin/bash -c '{envpython} -m pytest -v -m profile_presto {posargs} --cov=dbt --cov-branch --cov-report html:htmlcov test/integration/*' +commands = /bin/bash -c '{envpython} -m pytest --durations 0 -v -m profile_presto {posargs} --cov=dbt --cov-branch --cov-report html:htmlcov test/integration/*' deps = -e {toxinidir}/core -e {toxinidir}/plugins/presto @@ -139,7 +139,7 @@ basepython = python2.7 passenv = * setenv = HOME=/home/dbt_test_user -commands = /bin/bash -c '{envpython} 
-m pytest -v {posargs}' +commands = /bin/bash -c '{envpython} -m pytest --durations 0 -v {posargs}' deps = -r{toxinidir}/requirements.txt -r{toxinidir}/dev_requirements.txt @@ -149,7 +149,7 @@ basepython = python3.6 passenv = * setenv = HOME=/home/dbt_test_user -commands = /bin/bash -c '{envpython} -m pytest -v {posargs}' +commands = /bin/bash -c '{envpython} -m pytest --durations 0 -v {posargs}' deps = -r{toxinidir}/requirements.txt -r{toxinidir}/dev_requirements.txt @@ -160,7 +160,7 @@ passenv = * setenv = DBT_CONFIG_DIR = ~/.dbt DBT_INVOCATION_ENV = ci-appveyor -commands = pytest -v -m 'profile_postgres or profile_snowflake or profile_bigquery or profile_redshift' --cov=dbt --cov-branch --cov-report html:htmlcov test/integration test/unit +commands = pytest --durations 0 -v -m 'profile_postgres or profile_snowflake or profile_bigquery or profile_redshift' --cov=dbt --cov-branch --cov-report html:htmlcov test/integration test/unit deps = -r{toxinidir}/requirements.txt -r{toxinidir}/dev_requirements.txt @@ -171,7 +171,7 @@ passenv = * setenv = DBT_CONFIG_DIR = ~/.dbt DBT_INVOCATION_ENV = ci-appveyor -commands = python -m pytest -v {posargs} --cov=dbt --cov-branch --cov-report html:htmlcov test/unit +commands = python -m pytest --durations 0 -v {posargs} --cov=dbt --cov-branch --cov-report html:htmlcov test/unit deps = -r{toxinidir}/requirements.txt -r{toxinidir}/dev_requirements.txt @@ -183,7 +183,7 @@ passenv = * setenv = DBT_CONFIG_DIR = ~/.dbt DBT_INVOCATION_ENV = ci-appveyor -commands = python -m pytest -v -m profile_postgres {posargs} --cov=dbt --cov-branch --cov-report html:htmlcov test/integration +commands = python -m pytest --durations 0 -v -m profile_postgres {posargs} --cov=dbt --cov-branch --cov-report html:htmlcov test/integration deps = -e {toxinidir}/core -e {toxinidir}/plugins/postgres @@ -196,7 +196,7 @@ passenv = * setenv = DBT_CONFIG_DIR = ~/.dbt DBT_INVOCATION_ENV = ci-appveyor -commands = python -m pytest -v -m profile_snowflake {posargs} 
--cov=dbt --cov-branch --cov-report html:htmlcov test/integration +commands = python -m pytest --durations 0 -v -m profile_snowflake {posargs} --cov=dbt --cov-branch --cov-report html:htmlcov test/integration deps = -e {toxinidir}/core -e {toxinidir}/plugins/snowflake @@ -209,7 +209,7 @@ passenv = * setenv = DBT_CONFIG_DIR = ~/.dbt DBT_INVOCATION_ENV = ci-appveyor -commands = python -m pytest -v -m profile_bigquery {posargs} --cov=dbt --cov-branch --cov-report html:htmlcov test/integration +commands = python -m pytest --durations 0 -v -m profile_bigquery {posargs} --cov=dbt --cov-branch --cov-report html:htmlcov test/integration deps = -e {toxinidir}/core -e {toxinidir}/plugins/bigquery @@ -222,7 +222,7 @@ passenv = * setenv = DBT_CONFIG_DIR = ~/.dbt DBT_INVOCATION_ENV = ci-appveyor -commands = python -m pytest -v -m profile_redshift {posargs} --cov=dbt --cov-branch --cov-report html:htmlcov test/integration +commands = python -m pytest --durations 0 -v -m profile_redshift {posargs} --cov=dbt --cov-branch --cov-report html:htmlcov test/integration deps = -e {toxinidir}/core -e {toxinidir}/plugins/postgres From b98ea32add3292134dfd3bc9d22b2a090a338719 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Thu, 30 May 2019 12:06:01 -0400 Subject: [PATCH 2/6] code review --- core/dbt/parser/archives.py | 1 - .../test-archives-bq/archive.sql | 2 +- .../test-archives-invalid/archive.sql | 2 +- .../test-archives-longtext/longtext.sql | 2 +- .../test-archives-pg/archive.sql | 4 +-- .../test-archives-select/archives.sql | 4 +-- .../test-check-col-archives-bq/archive.sql | 4 +-- .../test-check-col-archives/archive.sql | 4 +-- .../test_simple_archive.py | 25 +++++++++++++++++++ 9 files changed, 36 insertions(+), 12 deletions(-) diff --git a/core/dbt/parser/archives.py b/core/dbt/parser/archives.py index fb285bac4fb..fd74cd7f2b4 100644 --- a/core/dbt/parser/archives.py +++ b/core/dbt/parser/archives.py @@ -106,7 +106,6 @@ def load_and_parse(self): 
self.all_projects.get(archive.package_name), archive_config=archive_config) - # TODO : Add tests for this to_return[node_path] = set_archive_attributes(parsed_node) return to_return diff --git a/test/integration/004_simple_archive_test/test-archives-bq/archive.sql b/test/integration/004_simple_archive_test/test-archives-bq/archive.sql index aff119c410b..d7dec9d043e 100644 --- a/test/integration/004_simple_archive_test/test-archives-bq/archive.sql +++ b/test/integration/004_simple_archive_test/test-archives-bq/archive.sql @@ -9,6 +9,6 @@ updated_at='updated_at', ) }} - select * from `{{target.database}}`.`{{schema}}`.seed + select * from `{{database}}`.`{{schema}}`.seed {% endarchive %} diff --git a/test/integration/004_simple_archive_test/test-archives-invalid/archive.sql b/test/integration/004_simple_archive_test/test-archives-invalid/archive.sql index 35340368781..3bbe49664c1 100644 --- a/test/integration/004_simple_archive_test/test-archives-invalid/archive.sql +++ b/test/integration/004_simple_archive_test/test-archives-invalid/archive.sql @@ -7,6 +7,6 @@ updated_at='updated_at', ) }} - select * from {{target.database}}.{{schema}}.seed + select * from {{database}}.{{schema}}.seed {% endarchive %} diff --git a/test/integration/004_simple_archive_test/test-archives-longtext/longtext.sql b/test/integration/004_simple_archive_test/test-archives-longtext/longtext.sql index c16ce784889..d8c671cb1b9 100644 --- a/test/integration/004_simple_archive_test/test-archives-longtext/longtext.sql +++ b/test/integration/004_simple_archive_test/test-archives-longtext/longtext.sql @@ -8,5 +8,5 @@ updated_at='updated_at', ) }} - select * from {{target.database}}.{{schema}}.super_long + select * from {{database}}.{{schema}}.super_long {% endarchive %} diff --git a/test/integration/004_simple_archive_test/test-archives-pg/archive.sql b/test/integration/004_simple_archive_test/test-archives-pg/archive.sql index 4810fd4a519..133465078c1 100644 --- 
a/test/integration/004_simple_archive_test/test-archives-pg/archive.sql +++ b/test/integration/004_simple_archive_test/test-archives-pg/archive.sql @@ -3,12 +3,12 @@ {{ config( target_database=var('target_database', database), - target_schema=schema, + target_schema=var('target_schema', schema), unique_key='id || ' ~ "'-'" ~ ' || first_name', strategy='timestamp', updated_at='updated_at', ) }} - select * from {{target.database}}.{{schema}}.seed + select * from {{target.database}}.{{target.schema}}.seed {% endarchive %} diff --git a/test/integration/004_simple_archive_test/test-archives-select/archives.sql b/test/integration/004_simple_archive_test/test-archives-select/archives.sql index 562ec89b3ce..7250715e379 100644 --- a/test/integration/004_simple_archive_test/test-archives-select/archives.sql +++ b/test/integration/004_simple_archive_test/test-archives-select/archives.sql @@ -9,7 +9,7 @@ updated_at='updated_at', ) }} - select * from {{target.database}}.{{schema}}.seed where last_name = 'Castillo' + select * from {{database}}.{{schema}}.seed where last_name = 'Castillo' {% endarchive %} @@ -24,7 +24,7 @@ updated_at='updated_at', ) }} - select * from {{target.database}}.{{schema}}.seed where last_name = 'Alvarez' + select * from {{database}}.{{schema}}.seed where last_name = 'Alvarez' {% endarchive %} diff --git a/test/integration/004_simple_archive_test/test-check-col-archives-bq/archive.sql b/test/integration/004_simple_archive_test/test-check-col-archives-bq/archive.sql index 50eece23b5f..40a2563291f 100644 --- a/test/integration/004_simple_archive_test/test-check-col-archives-bq/archive.sql +++ b/test/integration/004_simple_archive_test/test-check-col-archives-bq/archive.sql @@ -8,7 +8,7 @@ check_cols=('email',), ) }} - select * from `{{target.database}}`.`{{schema}}`.seed + select * from `{{database}}`.`{{schema}}`.seed {% endarchive %} @@ -23,5 +23,5 @@ check_cols='all', ) }} - select * from `{{target.database}}`.`{{schema}}`.seed + select * from 
`{{database}}`.`{{schema}}`.seed {% endarchive %} diff --git a/test/integration/004_simple_archive_test/test-check-col-archives/archive.sql b/test/integration/004_simple_archive_test/test-check-col-archives/archive.sql index 314b227634a..c3ee6fe2038 100644 --- a/test/integration/004_simple_archive_test/test-check-col-archives/archive.sql +++ b/test/integration/004_simple_archive_test/test-check-col-archives/archive.sql @@ -9,7 +9,7 @@ check_cols=['email'], ) }} - select * from {{target.database}}.{{schema}}.seed + select * from {{database}}.{{schema}}.seed {% endarchive %} @@ -24,5 +24,5 @@ check_cols='all', ) }} - select * from {{target.database}}.{{schema}}.seed + select * from {{database}}.{{schema}}.seed {% endarchive %} diff --git a/test/integration/004_simple_archive_test/test_simple_archive.py b/test/integration/004_simple_archive_test/test_simple_archive.py index 04d9bd3eed4..c7bf29c3ce5 100644 --- a/test/integration/004_simple_archive_test/test_simple_archive.py +++ b/test/integration/004_simple_archive_test/test_simple_archive.py @@ -382,6 +382,31 @@ def run_archive(self): return self.run_dbt(['archive', '--vars', '{{"target_database": {}}}'.format(self.alternative_database)]) +class TestCrossSchemaArchiveFiles(TestSimpleArchive): + @property + def project_config(self): + paths = ['test/integration/004_simple_archive_test/test-archives-pg'] + return { + 'archive-paths': paths, + } + + def target_schema(self): + return "{}_archived".format(self.unique_schema()) + + def run_archive(self): + return self.run_dbt(['archive', '--vars', '{{"target_schema": {}}}'.format(self.target_schema())]) + + @use_profile('postgres') + def test__postgres_ref_archive_cross_schema(self): + self.run_sql_file('test/integration/004_simple_archive_test/seed_pg.sql') + + results = self.run_archive() + self.assertEqual(len(results), self.NUM_ARCHIVE_MODELS) + + results = self.run_dbt(['run', '--vars', '{{"target_schema": {}}}'.format(self.target_schema())]) + 
self.assertEqual(len(results), 1) + + class TestBadArchive(DBTIntegrationTest): @property def schema(self): From 81f4c1bd7cff93dd67ee9ea0f67d69f87a0f7631 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Thu, 30 May 2019 12:18:58 -0400 Subject: [PATCH 3/6] cleanup merge --- .../test_concurrent_transaction.py | 2 -- test/integration/base.py | 15 ++++----------- 2 files changed, 4 insertions(+), 13 deletions(-) diff --git a/test/integration/032_concurrent_transaction_test/test_concurrent_transaction.py b/test/integration/032_concurrent_transaction_test/test_concurrent_transaction.py index 25ec97c3812..d9cce7292eb 100644 --- a/test/integration/032_concurrent_transaction_test/test_concurrent_transaction.py +++ b/test/integration/032_concurrent_transaction_test/test_concurrent_transaction.py @@ -51,8 +51,6 @@ def run_select_and_check(self, rel, sql): self.query_state[rel] = 'bad' except Exception as e: - logger.info("Caught exception: {}".format(e)) - traceback.print_exc() if 'concurrent transaction' in str(e): self.query_state[rel] = 'error: {}'.format(e) else: diff --git a/test/integration/base.py b/test/integration/base.py index 516b65f0800..c7c67edde2b 100644 --- a/test/integration/base.py +++ b/test/integration/base.py @@ -129,8 +129,7 @@ def redshift_profile(self): 'user': os.getenv('REDSHIFT_TEST_USER'), 'pass': os.getenv('REDSHIFT_TEST_PASS'), 'dbname': os.getenv('REDSHIFT_TEST_DBNAME'), - 'schema': self.unique_schema(), - 'keepalives_idle': 5 + 'schema': self.unique_schema() } }, 'target': 'default2' @@ -535,14 +534,10 @@ def run_sql_presto(self, sql, fetch, conn): conn.handle.commit() conn.transaction_open = False - def run_sql_common(self, sql, fetch, conn, verbose=False): + def run_sql_common(self, sql, fetch, conn): with conn.handle.cursor() as cursor: try: - if verbose: - logger.debug('running sql: {}'.format(sql)) cursor.execute(sql) - if verbose: - logger.debug('result from sql: {}'.format(cursor.statusmessage)) conn.handle.commit() if fetch == 'one': 
return cursor.fetchone() @@ -551,9 +546,9 @@ def run_sql_common(self, sql, fetch, conn, verbose=False): else: return except BaseException as e: + conn.handle.rollback() print(sql) print(e) - conn.handle.rollback() raise e finally: conn.transaction_open = False @@ -1020,9 +1015,7 @@ def _assertTableColumnsEqual(self, relation_a, relation_b): text_types = {'text', 'character varying', 'character', 'varchar'} - self.assertEqual(len(table_a_result), len(table_b_result), - "{} vs. {}".format(table_a_result, table_b_result)) - + self.assertEqual(len(table_a_result), len(table_b_result)) for a_column, b_column in zip(table_a_result, table_b_result): a_name, a_type, a_size = a_column b_name, b_type, b_size = b_column From 69621fe6f9c1b1d0b21df1572392fe4e061013e3 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Thu, 30 May 2019 12:21:42 -0400 Subject: [PATCH 4/6] cleanup tests --- .../004_simple_archive_test/test-archives-select/archives.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/004_simple_archive_test/test-archives-select/archives.sql b/test/integration/004_simple_archive_test/test-archives-select/archives.sql index 7250715e379..30e78fe720d 100644 --- a/test/integration/004_simple_archive_test/test-archives-select/archives.sql +++ b/test/integration/004_simple_archive_test/test-archives-select/archives.sql @@ -40,6 +40,6 @@ updated_at='updated_at', ) }} - select * from {{target.database}}.{{schema}}.seed where last_name = 'Kelly' + select * from {{database}}.{{schema}}.seed where last_name = 'Kelly' {% endarchive %} From 94ae9fd4a7904f288895e90faff7db9b80c9b283 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Thu, 30 May 2019 13:08:27 -0400 Subject: [PATCH 5/6] fix test --- test/integration/004_simple_archive_test/test_simple_archive.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/004_simple_archive_test/test_simple_archive.py b/test/integration/004_simple_archive_test/test_simple_archive.py 
index 4786352439b..b58e15ece78 100644 --- a/test/integration/004_simple_archive_test/test_simple_archive.py +++ b/test/integration/004_simple_archive_test/test_simple_archive.py @@ -397,7 +397,7 @@ def run_archive(self): return self.run_dbt(['archive', '--vars', '{{"target_schema": {}}}'.format(self.target_schema())]) @use_profile('postgres') - def test__postgres_ref_archive_cross_schema(self): + def test__postgres__simple_archive(self): self.run_sql_file('test/integration/004_simple_archive_test/seed_pg.sql') results = self.run_archive() From 82793a02d3d8c987b26a10577e41e2f8ff975b62 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Sat, 1 Jun 2019 11:40:14 -0400 Subject: [PATCH 6/6] fix for tests in different logical databases --- .../test-archives-bq/archive.sql | 2 +- .../test-archives-invalid/archive.sql | 2 +- .../test-archives-longtext/longtext.sql | 2 +- .../test-archives-select/archives.sql | 6 +++--- .../test-check-col-archives-bq/archive.sql | 4 ++-- .../test-check-col-archives/archive.sql | 4 ++-- .../004_simple_archive_test/test_simple_archive.py | 14 ++++++++++++-- 7 files changed, 22 insertions(+), 12 deletions(-) diff --git a/test/integration/004_simple_archive_test/test-archives-bq/archive.sql b/test/integration/004_simple_archive_test/test-archives-bq/archive.sql index d7dec9d043e..aff119c410b 100644 --- a/test/integration/004_simple_archive_test/test-archives-bq/archive.sql +++ b/test/integration/004_simple_archive_test/test-archives-bq/archive.sql @@ -9,6 +9,6 @@ updated_at='updated_at', ) }} - select * from `{{database}}`.`{{schema}}`.seed + select * from `{{target.database}}`.`{{schema}}`.seed {% endarchive %} diff --git a/test/integration/004_simple_archive_test/test-archives-invalid/archive.sql b/test/integration/004_simple_archive_test/test-archives-invalid/archive.sql index 3bbe49664c1..35340368781 100644 --- a/test/integration/004_simple_archive_test/test-archives-invalid/archive.sql +++ 
b/test/integration/004_simple_archive_test/test-archives-invalid/archive.sql @@ -7,6 +7,6 @@ updated_at='updated_at', ) }} - select * from {{database}}.{{schema}}.seed + select * from {{target.database}}.{{schema}}.seed {% endarchive %} diff --git a/test/integration/004_simple_archive_test/test-archives-longtext/longtext.sql b/test/integration/004_simple_archive_test/test-archives-longtext/longtext.sql index d8c671cb1b9..c16ce784889 100644 --- a/test/integration/004_simple_archive_test/test-archives-longtext/longtext.sql +++ b/test/integration/004_simple_archive_test/test-archives-longtext/longtext.sql @@ -8,5 +8,5 @@ updated_at='updated_at', ) }} - select * from {{database}}.{{schema}}.super_long + select * from {{target.database}}.{{schema}}.super_long {% endarchive %} diff --git a/test/integration/004_simple_archive_test/test-archives-select/archives.sql b/test/integration/004_simple_archive_test/test-archives-select/archives.sql index 30e78fe720d..562ec89b3ce 100644 --- a/test/integration/004_simple_archive_test/test-archives-select/archives.sql +++ b/test/integration/004_simple_archive_test/test-archives-select/archives.sql @@ -9,7 +9,7 @@ updated_at='updated_at', ) }} - select * from {{database}}.{{schema}}.seed where last_name = 'Castillo' + select * from {{target.database}}.{{schema}}.seed where last_name = 'Castillo' {% endarchive %} @@ -24,7 +24,7 @@ updated_at='updated_at', ) }} - select * from {{database}}.{{schema}}.seed where last_name = 'Alvarez' + select * from {{target.database}}.{{schema}}.seed where last_name = 'Alvarez' {% endarchive %} @@ -40,6 +40,6 @@ updated_at='updated_at', ) }} - select * from {{database}}.{{schema}}.seed where last_name = 'Kelly' + select * from {{target.database}}.{{schema}}.seed where last_name = 'Kelly' {% endarchive %} diff --git a/test/integration/004_simple_archive_test/test-check-col-archives-bq/archive.sql b/test/integration/004_simple_archive_test/test-check-col-archives-bq/archive.sql index 
40a2563291f..50eece23b5f 100644 --- a/test/integration/004_simple_archive_test/test-check-col-archives-bq/archive.sql +++ b/test/integration/004_simple_archive_test/test-check-col-archives-bq/archive.sql @@ -8,7 +8,7 @@ check_cols=('email',), ) }} - select * from `{{database}}`.`{{schema}}`.seed + select * from `{{target.database}}`.`{{schema}}`.seed {% endarchive %} @@ -23,5 +23,5 @@ check_cols='all', ) }} - select * from `{{database}}`.`{{schema}}`.seed + select * from `{{target.database}}`.`{{schema}}`.seed {% endarchive %} diff --git a/test/integration/004_simple_archive_test/test-check-col-archives/archive.sql b/test/integration/004_simple_archive_test/test-check-col-archives/archive.sql index c3ee6fe2038..314b227634a 100644 --- a/test/integration/004_simple_archive_test/test-check-col-archives/archive.sql +++ b/test/integration/004_simple_archive_test/test-check-col-archives/archive.sql @@ -9,7 +9,7 @@ check_cols=['email'], ) }} - select * from {{database}}.{{schema}}.seed + select * from {{target.database}}.{{schema}}.seed {% endarchive %} @@ -24,5 +24,5 @@ check_cols='all', ) }} - select * from {{database}}.{{schema}}.seed + select * from {{target.database}}.{{schema}}.seed {% endarchive %} diff --git a/test/integration/004_simple_archive_test/test_simple_archive.py b/test/integration/004_simple_archive_test/test_simple_archive.py index b58e15ece78..b6e92a7fa40 100644 --- a/test/integration/004_simple_archive_test/test_simple_archive.py +++ b/test/integration/004_simple_archive_test/test_simple_archive.py @@ -382,7 +382,17 @@ def run_archive(self): return self.run_dbt(['archive', '--vars', '{{"target_database": {}}}'.format(self.alternative_database)]) -class TestCrossSchemaArchiveFiles(TestSimpleArchive): +class TestCrossSchemaArchiveFiles(DBTIntegrationTest): + NUM_ARCHIVE_MODELS = 1 + + @property + def schema(self): + return "simple_archive_004" + + @property + def models(self): + return "test/integration/004_simple_archive_test/models" + @property def 
project_config(self): paths = ['test/integration/004_simple_archive_test/test-archives-pg'] @@ -397,7 +407,7 @@ def run_archive(self): return self.run_dbt(['archive', '--vars', '{{"target_schema": {}}}'.format(self.target_schema())]) @use_profile('postgres') - def test__postgres__simple_archive(self): + def test__postgres__cross_schema_archive(self): self.run_sql_file('test/integration/004_simple_archive_test/seed_pg.sql') results = self.run_archive()