Implement check_cols
Contracts: some anyOf shenanigans to add support for check_cols
Macros: split apart archive selection, probably too much copy+paste
Legacy: Archive configs now include a "timestamp" strategy when parsed from dbt_project.yml
Add integration tests
fix aliases test
Unquote columns in archives
handle null columns
attr -> use_profile
Jacob Beck committed Apr 23, 2019
1 parent d66584f commit 416cc72
Showing 19 changed files with 840 additions and 534 deletions.
2 changes: 1 addition & 1 deletion core/dbt/adapters/base/impl.py
@@ -550,7 +550,7 @@ def valid_archive_target(self, relation):
expected_type=self.Relation)

columns = self.get_columns_in_relation(relation)
names = set(c.name for c in columns)
names = set(c.name.lower() for c in columns)
expanded_keys = ('scd_id', 'valid_from', 'valid_to')
extra = []
missing = []
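The one-line change above makes `valid_archive_target` compare column names case-insensitively, which matters on warehouses that report unquoted identifiers in uppercase (Snowflake-style). A quick standalone illustration of the difference, with made-up column names:

```python
columns = ['ID', 'DBT_SCD_ID', 'DBT_VALID_FROM', 'DBT_VALID_TO']
expected_keys = ('dbt_scd_id', 'dbt_valid_from', 'dbt_valid_to')

# Old behaviour: exact-case comparison reports every archival column as missing.
assert [k for k in expected_keys if k not in set(columns)] == list(expected_keys)

# New behaviour: lowercasing first finds them all.
names = set(c.lower() for c in columns)
assert [k for k in expected_keys if k not in names] == []
```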
41 changes: 38 additions & 3 deletions core/dbt/contracts/graph/parsed.py
@@ -452,9 +452,44 @@ def config(self, value):
'unique_key': {
'type': 'string',
},
'strategy': {
'enum': ['timestamp'],
},
'anyOf': [
{
'properties': {
'strategy': {
'enum': ['timestamp'],
},
'updated_at': {
'type': 'string',
'description': (
'The column name with the timestamp to compare'
),
},
},
'required': ['updated_at'],
},
{
'properties': {
'strategy': {
'enum': ['check'],
},
'check_cols': {
'oneOf': [
{
'type': 'array',
'items': {'type': 'string'},
'description': 'The columns to check',
'minLength': 1,
},
{
'enum': ['all'],
'description': 'Check all columns',
},
],
},
},
'required': ['check_cols'],
}
]
},
'required': [
'target_database', 'target_schema', 'unique_key', 'strategy',
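The `anyOf` above is how the contract accepts both shapes: a `timestamp` archive must name an `updated_at` column, while a `check` archive must name `check_cols` (a list of column names, or the literal `'all'`). A minimal sketch of that validation behaviour with the `jsonschema` package; the schema here is a trimmed-down stand-in for the real parsed-node contract, and the example configs are made up:

```python
import jsonschema

# Trimmed-down stand-in for the strategy-related part of the archive contract.
STRATEGY_SCHEMA = {
    'type': 'object',
    'anyOf': [
        {
            'properties': {
                'strategy': {'enum': ['timestamp']},
                'updated_at': {'type': 'string'},
            },
            'required': ['updated_at'],
        },
        {
            'properties': {
                'strategy': {'enum': ['check']},
                'check_cols': {
                    'oneOf': [
                        {'type': 'array', 'items': {'type': 'string'}},
                        {'enum': ['all']},
                    ],
                },
            },
            'required': ['check_cols'],
        },
    ],
}

# Both strategies validate when their required key is present...
jsonschema.validate({'strategy': 'timestamp', 'updated_at': 'updated_at'}, STRATEGY_SCHEMA)
jsonschema.validate({'strategy': 'check', 'check_cols': ['email', 'status']}, STRATEGY_SCHEMA)
jsonschema.validate({'strategy': 'check', 'check_cols': 'all'}, STRATEGY_SCHEMA)

# ...but a check archive with no check_cols is rejected.
try:
    jsonschema.validate({'strategy': 'check'}, STRATEGY_SCHEMA)
except jsonschema.ValidationError as exc:
    print('rejected:', exc.message)
```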
@@ -2,12 +2,12 @@
Create SCD Hash SQL fields cross-db
#}

{% macro archive_scd_hash() %}
{{ adapter_macro('archive_scd_hash') }}
{% macro archive_hash_arguments(args) %}
{{ adapter_macro('archive_hash_arguments', args) }}
{% endmacro %}

{% macro default__archive_scd_hash() %}
md5("dbt_pk" || '|' || "dbt_updated_at")
{% macro default__archive_hash_arguments(args) %}
md5({% for arg in args %}coalesce(cast({{ arg }} as varchar ), '') {% if not loop.last %} || '|' || {% endif %}{% endfor %})
{% endmacro %}
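`archive_hash_arguments` generalises the old `archive_scd_hash` macro: instead of always hashing `dbt_pk` and `dbt_updated_at`, it hashes whatever column list the caller passes. Rendered outside dbt with plain `jinja2` (the template string mirrors the default implementation above; the column names are illustrative), it produces expressions like:

```python
from jinja2 import Template

# Same expression as default__archive_hash_arguments, minus the adapter dispatch.
HASH_TEMPLATE = Template(
    "md5({% for arg in args %}coalesce(cast({{ arg }} as varchar), '')"
    "{% if not loop.last %} || '|' || {% endif %}{% endfor %})"
)

# Timestamp strategy: hash the primary key and updated_at.
print(HASH_TEMPLATE.render(args=['dbt_pk', 'dbt_updated_at']))
# md5(coalesce(cast(dbt_pk as varchar), '') || '|' || coalesce(cast(dbt_updated_at as varchar), ''))

# Check strategy: hash the primary key plus every checked column.
print(HASH_TEMPLATE.render(args=['dbt_pk', 'email', 'status']))
```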

{% macro create_temporary_table(sql, relation) %}
@@ -48,46 +48,74 @@

{% macro default__archive_update(target_relation, tmp_relation) %}
update {{ target_relation }}
set {{ adapter.quote('dbt_valid_to') }} = tmp.{{ adapter.quote('dbt_valid_to') }}
set dbt_valid_to = tmp.dbt_valid_to
from {{ tmp_relation }} as tmp
where tmp.{{ adapter.quote('dbt_scd_id') }} = {{ target_relation }}.{{ adapter.quote('dbt_scd_id') }}
and {{ adapter.quote('change_type') }} = 'update';
where tmp.dbt_scd_id = {{ target_relation }}.dbt_scd_id
and change_type = 'update';
{% endmacro %}


{#
Cross-db compatible archival implementation
#}
{% macro archive_select(source_sql, target_relation, source_columns, unique_key, updated_at) %}
{% macro archive_get_time() -%}
{{ adapter_macro('archive_get_time') }}
{%- endmacro %}

{% set timestamp_column = api.Column.create('_', 'timestamp') %}
{% macro default__archive_get_time() -%}
{{ current_timestamp() }}
{%- endmacro %}

{% macro snowflake__archive_get_time() -%}
to_timestamp_ntz({{ current_timestamp() }})
{%- endmacro %}


{% macro archive_select_generic(source_sql, target_relation, transforms, scd_hash) -%}
with source as (
{{ source_sql }}
),
{{ transforms }}
merged as (

select *, 'update' as change_type from updates
union all
select *, 'insert' as change_type from insertions

)

select *,
{{ scd_hash }} as dbt_scd_id
from merged

{%- endmacro %}

{#
Cross-db compatible archival implementation
#}
{% macro archive_select_timestamp(source_sql, target_relation, source_columns, unique_key, updated_at) -%}
{% set timestamp_column = api.Column.create('_', 'timestamp') %}
{% set transforms -%}
current_data as (

select
{% for col in source_columns %}
{{ adapter.quote(col.name) }} {% if not loop.last %},{% endif %}
{{ col.name }} {% if not loop.last %},{% endif %}
{% endfor %},
{{ updated_at }} as {{ adapter.quote('dbt_updated_at') }},
{{ unique_key }} as {{ adapter.quote('dbt_pk') }},
{{ updated_at }} as {{ adapter.quote('dbt_valid_from') }},
{{ timestamp_column.literal('null') }} as {{ adapter.quote('tmp_valid_to') }}
{{ updated_at }} as dbt_updated_at,
{{ unique_key }} as dbt_pk,
{{ updated_at }} as dbt_valid_from,
{{ timestamp_column.literal('null') }} as tmp_valid_to
from source
),

archived_data as (

select
{% for col in source_columns %}
{{ adapter.quote(col.name) }},
{{ col.name }},
{% endfor %}
{{ updated_at }} as {{ adapter.quote('dbt_updated_at') }},
{{ unique_key }} as {{ adapter.quote('dbt_pk') }},
{{ adapter.quote('dbt_valid_from') }},
{{ adapter.quote('dbt_valid_to') }} as {{ adapter.quote('tmp_valid_to') }}
{{ updated_at }} as dbt_updated_at,
{{ unique_key }} as dbt_pk,
dbt_valid_from,
dbt_valid_to as tmp_valid_to
from {{ target_relation }}

),
@@ -96,44 +96,110 @@

select
current_data.*,
{{ timestamp_column.literal('null') }} as {{ adapter.quote('dbt_valid_to') }}
{{ timestamp_column.literal('null') }} as dbt_valid_to
from current_data
left outer join archived_data
on archived_data.{{ adapter.quote('dbt_pk') }} = current_data.{{ adapter.quote('dbt_pk') }}
where archived_data.{{ adapter.quote('dbt_pk') }} is null or (
archived_data.{{ adapter.quote('dbt_pk') }} is not null and
current_data.{{ adapter.quote('dbt_updated_at') }} > archived_data.{{ adapter.quote('dbt_updated_at') }} and
archived_data.{{ adapter.quote('tmp_valid_to') }} is null
on archived_data.dbt_pk = current_data.dbt_pk
where
archived_data.dbt_pk is null
or (
archived_data.dbt_pk is not null
and archived_data.dbt_updated_at < current_data.dbt_updated_at
and archived_data.tmp_valid_to is null
)
),

updates as (

select
archived_data.*,
current_data.{{ adapter.quote('dbt_updated_at') }} as {{ adapter.quote('dbt_valid_to') }}
current_data.dbt_updated_at as dbt_valid_to
from current_data
left outer join archived_data
on archived_data.{{ adapter.quote('dbt_pk') }} = current_data.{{ adapter.quote('dbt_pk') }}
where archived_data.{{ adapter.quote('dbt_pk') }} is not null
and archived_data.{{ adapter.quote('dbt_updated_at') }} < current_data.{{ adapter.quote('dbt_updated_at') }}
and archived_data.{{ adapter.quote('tmp_valid_to') }} is null
on archived_data.dbt_pk = current_data.dbt_pk
where archived_data.dbt_pk is not null
and archived_data.dbt_updated_at < current_data.dbt_updated_at
and archived_data.tmp_valid_to is null
),
{%- endset %}
{%- set scd_hash = archive_hash_arguments(['dbt_pk', 'dbt_updated_at']) -%}
{{ archive_select_generic(source_sql, target_relation, transforms, scd_hash) }}
{%- endmacro %}


{% macro archive_select_check_cols(source_sql, target_relation, source_columns, unique_key, check_cols) -%}
{%- set timestamp_column = api.Column.create('_', 'timestamp') -%}

{# if we recognize the primary key, it's the newest record, and anything we care about has changed, it's an update candidate #}
{%- set update_candidate -%}
archived_data.dbt_pk is not null
and (
{%- for col in check_cols %}
current_data.{{ col }} <> archived_data.{{ col }}
{%- if not loop.last %} or {% endif %}
{% endfor -%}
)
and archived_data.tmp_valid_to is null
{%- endset %}

merged as (
{% set transforms -%}
current_data as (

select *, 'update' as {{ adapter.quote('change_type') }} from updates
union all
select *, 'insert' as {{ adapter.quote('change_type') }} from insertions
select
{% for col in source_columns %}
{{ col.name }} {% if not loop.last %},{% endif %}
{% endfor %},
{{ archive_get_time() }} as dbt_updated_at,
{{ unique_key }} as dbt_pk,
{{ archive_get_time() }} as dbt_valid_from,
{{ timestamp_column.literal('null') }} as tmp_valid_to
from source
),

)
archived_data as (

select *,
{{ archive_scd_hash() }} as {{ adapter.quote('dbt_scd_id') }}
from merged
select
{% for col in source_columns %}
{{ col.name }},
{% endfor %}
dbt_updated_at,
{{ unique_key }} as dbt_pk,
dbt_valid_from,
dbt_valid_to as tmp_valid_to
from {{ target_relation }}

{% endmacro %}
),

insertions as (

select
current_data.*,
{{ timestamp_column.literal('null') }} as dbt_valid_to
from current_data
left outer join archived_data
on archived_data.dbt_pk = current_data.dbt_pk
where
archived_data.dbt_pk is null
or ( {{ update_candidate }} )
),

updates as (

select
archived_data.*,
{{ archive_get_time() }} as dbt_valid_to
from current_data
left outer join archived_data
on archived_data.dbt_pk = current_data.dbt_pk
where {{ update_candidate }}
),
{%- endset %}

{%- set hash_components = ['dbt_pk'] %}
{%- do hash_components.extend(check_cols) -%}
{%- set scd_hash = archive_hash_arguments(hash_components) -%}
{{ archive_select_generic(source_sql, target_relation, transforms, scd_hash) }}
{%- endmacro %}
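The `update_candidate` block is the heart of the check strategy: a source row becomes an update when its primary key already exists in the archive, at least one tracked column differs, and the archived row is still open (`tmp_valid_to is null`). A small Python sketch of the same predicate-building loop, with hypothetical column names:

```python
def check_cols_predicate(check_cols):
    """Build the same SQL condition the update_candidate block renders."""
    comparisons = "\n        or ".join(
        "current_data.{col} <> archived_data.{col}".format(col=col)
        for col in check_cols
    )
    return (
        "archived_data.dbt_pk is not null\n"
        "    and (\n"
        "        {}\n"
        "    )\n"
        "    and archived_data.tmp_valid_to is null"
    ).format(comparisons)


print(check_cols_predicate(['email', 'status']))
```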

{# this is gross #}
{% macro create_empty_table_as(sql) %}
@@ -157,6 +157,7 @@
{%- set target_database = config.get('target_database') -%}
{%- set target_schema = config.get('target_schema') -%}
{%- set target_table = model.get('alias', model.get('name')) -%}
{%- set strategy = config.get('strategy') -%}

{{ create_schema(target_database, target_schema) }}

@@ -179,15 +179,13 @@
{%- set source_columns = adapter.get_columns_in_relation(source_info_model) -%}

{%- set unique_key = config.get('unique_key') -%}
{%- set updated_at = config.get('updated_at') -%}
{%- set dest_columns = source_columns + [
api.Column.create('dbt_valid_from', 'timestamp'),
api.Column.create('dbt_valid_to', 'timestamp'),
api.Column.create('dbt_scd_id', 'string'),
api.Column.create('dbt_updated_at', 'timestamp'),
] -%}


{% call statement() %}
{{ create_archive_table(target_relation, dest_columns) }}
{% endcall %}
@@ -204,7 +204,19 @@
{% set tmp_table_sql -%}

with dbt_archive_sbq as (
{{ archive_select(model['injected_sql'], target_relation, source_columns, unique_key, updated_at) }}

{% if strategy == 'timestamp' %}
{%- set updated_at = config.get('updated_at') -%}
{{ archive_select_timestamp(model['injected_sql'], target_relation, source_columns, unique_key, updated_at) }}
{% elif strategy == 'check' %}
{%- set check_cols = config.get('check_cols') -%}
{% if check_cols == 'all' %}
{% set check_cols = source_columns | map(attribute='name') | list %}
{% endif %}
{{ archive_select_check_cols(model['injected_sql'], target_relation, source_columns, unique_key, check_cols)}}
{% else %}
{{ exceptions.raise_compiler_error('Got invalid strategy "{}"'.format(strategy)) }}
{% endif %}
)
select * from dbt_archive_sbq

@@ -226,7 +226,7 @@
{{ column_list(dest_columns) }}
)
select {{ column_list(dest_columns) }} from {{ tmp_relation }}
where {{ adapter.quote('change_type') }} = 'insert';
where change_type = 'insert';
{% endcall %}

{{ adapter.commit() }}
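In the materialization itself, the new `strategy` config drives which select macro is rendered, and `check_cols: 'all'` is expanded to every column of the source relation before `archive_select_check_cols` runs. A rough Python rendering of that dispatch (a sketch of the logic above, not dbt's actual code path):

```python
def pick_archive_select(config, source_columns):
    """Mirror the strategy dispatch inside the archive materialization."""
    strategy = config.get('strategy')
    if strategy == 'timestamp':
        return 'archive_select_timestamp', config.get('updated_at')
    elif strategy == 'check':
        check_cols = config.get('check_cols')
        if check_cols == 'all':
            # Expand the 'all' shortcut to every source column name.
            check_cols = [col.name for col in source_columns]
        return 'archive_select_check_cols', check_cols
    raise ValueError('Got invalid strategy "{}"'.format(strategy))
```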
@@ -14,14 +14,14 @@

{% macro column_list(columns) %}
{%- for col in columns %}
{{ adapter.quote(col.name) }} {% if not loop.last %},{% endif %}
{{ col.name }} {% if not loop.last %},{% endif %}
{% endfor -%}
{% endmacro %}


{% macro column_list_for_create_table(columns) %}
{%- for col in columns %}
{{ adapter.quote(col.name) }} {{ col.data_type }} {%- if not loop.last %},{% endif %}
{{ col.name }} {{ col.data_type }} {%- if not loop.last %},{% endif %}
{% endfor -%}
{% endmacro %}

2 changes: 2 additions & 0 deletions core/dbt/parser/archives.py
@@ -36,6 +36,8 @@ def parse_archives_from_project(cls, config):

source_schema = archive_config['source_schema']
cfg['target_schema'] = archive_config.get('target_schema')
# project-defined archives always use the 'timestamp' strategy.
cfg['strategy'] = 'timestamp'

fake_path = [cfg['target_database'], cfg['target_schema'],
cfg['target_table']]
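The parser change keeps legacy archives backwards compatible: anything declared under the project file's archive section is pinned to the `timestamp` strategy, so the `check` strategy has to come from an archive configured elsewhere. A simplified sketch of the effect (not the actual parser, and only the keys visible in this hunk):

```python
def legacy_archive_config(archive_config):
    """Sketch: build the parsed config for a dbt_project.yml-defined archive."""
    cfg = dict(archive_config)
    cfg['target_schema'] = archive_config.get('target_schema')
    # Project-defined archives always use the 'timestamp' strategy.
    cfg['strategy'] = 'timestamp'
    return cfg


print(legacy_archive_config({'source_schema': 'raw', 'target_schema': 'archives'}))
```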