Skip to content

Commit

Permalink
Unquote columns in archives
Browse files Browse the repository at this point in the history
  • Loading branch information
Jacob Beck committed Apr 12, 2019
1 parent 1eb5003 commit ff94d87
Show file tree
Hide file tree
Showing 15 changed files with 497 additions and 511 deletions.
2 changes: 1 addition & 1 deletion core/dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ def valid_archive_target(self, relation):
expected_type=self.Relation)

columns = self.get_columns_in_relation(relation)
names = set(c.name for c in columns)
names = set(c.name.lower() for c in columns)
expanded_keys = ('scd_id', 'valid_from', 'valid_to')
extra = []
missing = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
{% endmacro %}

{% macro default__archive_hash_arguments(args) %}
md5({% for arg in args %}{{ adapter.quote(arg) }}{% if not loop.last %} || '|' || {% endif %}{% endfor %})
md5({% for arg in args %}{{ arg }}{% if not loop.last %} || '|' || {% endif %}{% endfor %})
{% endmacro %}

{% macro create_temporary_table(sql, relation) %}
Expand Down Expand Up @@ -48,10 +48,10 @@

{% macro default__archive_update(target_relation, tmp_relation) %}
update {{ target_relation }}
set {{ adapter.quote('dbt_valid_to') }} = tmp.{{ adapter.quote('dbt_valid_to') }}
set dbt_valid_to = tmp.dbt_valid_to
from {{ tmp_relation }} as tmp
where tmp.{{ adapter.quote('dbt_scd_id') }} = {{ target_relation }}.{{ adapter.quote('dbt_scd_id') }}
and {{ adapter.quote('change_type') }} = 'update';
where tmp.dbt_scd_id = {{ target_relation }}.dbt_scd_id
and change_type = 'update';
{% endmacro %}


Expand All @@ -67,20 +67,6 @@
to_timestamp_ntz({{ current_timestamp() }})
{%- endmacro %}

{#
{% macro archive_current_timestamp() -%}
{{ adapter_macro('archive_current_timestamp') }}
{%- endmacro %}

{% macro default__archive_current_timestamp() -%}
{{ current_timestamp() }}
{%- endmacro %}

{% macro snowflake__archive_current_timestamp() -%}
TO_TIMESTAMP_NTZ(archive_current_timestamp())
{%- endmacro %}
#}


{% macro archive_select_generic(source_sql, target_relation, transforms, scd_hash) -%}
with source as (
Expand All @@ -89,14 +75,14 @@
{{ transforms }}
merged as (

select *, 'update' as {{ adapter.quote('change_type') }} from updates
select *, 'update' as change_type from updates
union all
select *, 'insert' as {{ adapter.quote('change_type') }} from insertions
select *, 'insert' as change_type from insertions

)

select *,
{{ scd_hash }} as {{ adapter.quote('dbt_scd_id') }}
{{ scd_hash }} as dbt_scd_id
from merged

{%- endmacro %}
Expand All @@ -111,25 +97,25 @@

select
{% for col in source_columns %}
{{ adapter.quote(col.name) }} {% if not loop.last %},{% endif %}
{{ col.name }} {% if not loop.last %},{% endif %}
{% endfor %},
{{ updated_at }} as {{ adapter.quote('dbt_updated_at') }},
{{ unique_key }} as {{ adapter.quote('dbt_pk') }},
{{ updated_at }} as {{ adapter.quote('dbt_valid_from') }},
{{ timestamp_column.literal('null') }} as {{ adapter.quote('tmp_valid_to') }}
{{ updated_at }} as dbt_updated_at,
{{ unique_key }} as dbt_pk,
{{ updated_at }} as dbt_valid_from,
{{ timestamp_column.literal('null') }} as tmp_valid_to
from source
),

archived_data as (

select
{% for col in source_columns %}
{{ adapter.quote(col.name) }},
{{ col.name }},
{% endfor %}
{{ updated_at }} as {{ adapter.quote('dbt_updated_at') }},
{{ unique_key }} as {{ adapter.quote('dbt_pk') }},
{{ adapter.quote('dbt_valid_from') }},
{{ adapter.quote('dbt_valid_to') }} as {{ adapter.quote('tmp_valid_to') }}
{{ updated_at }} as dbt_updated_at,
{{ unique_key }} as dbt_pk,
dbt_valid_from,
dbt_valid_to as tmp_valid_to
from {{ target_relation }}

),
Expand All @@ -138,30 +124,30 @@

select
current_data.*,
{{ timestamp_column.literal('null') }} as {{ adapter.quote('dbt_valid_to') }}
{{ timestamp_column.literal('null') }} as dbt_valid_to
from current_data
left outer join archived_data
on archived_data.{{ adapter.quote('dbt_pk') }} = current_data.{{ adapter.quote('dbt_pk') }}
on archived_data.dbt_pk = current_data.dbt_pk
where
archived_data.{{ adapter.quote('dbt_pk') }} is null
archived_data.dbt_pk is null
or (
archived_data.{{ adapter.quote('dbt_pk') }} is not null
and archived_data.{{ adapter.quote('dbt_updated_at') }} < current_data.{{ adapter.quote('dbt_updated_at') }}
and archived_data.{{ adapter.quote('tmp_valid_to') }} is null
archived_data.dbt_pk is not null
and archived_data.dbt_updated_at < current_data.dbt_updated_at
and archived_data.tmp_valid_to is null
)
),

updates as (

select
archived_data.*,
current_data.{{ adapter.quote('dbt_updated_at') }} as {{ adapter.quote('dbt_valid_to') }}
current_data.dbt_updated_at as dbt_valid_to
from current_data
left outer join archived_data
on archived_data.{{ adapter.quote('dbt_pk') }} = current_data.{{ adapter.quote('dbt_pk') }}
where archived_data.{{ adapter.quote('dbt_pk') }} is not null
and archived_data.{{ adapter.quote('dbt_updated_at') }} < current_data.{{ adapter.quote('dbt_updated_at') }}
and archived_data.{{ adapter.quote('tmp_valid_to') }} is null
on archived_data.dbt_pk = current_data.dbt_pk
where archived_data.dbt_pk is not null
and archived_data.dbt_updated_at < current_data.dbt_updated_at
and archived_data.tmp_valid_to is null
),
{%- endset %}
{%- set scd_hash = archive_hash_arguments(['dbt_pk', 'dbt_updated_at']) -%}
Expand All @@ -174,40 +160,40 @@

{# if we recognize the primary key, it's the newest record, and anything we care about has changed, it's an update candidate #}
{%- set update_candidate -%}
archived_data.{{ adapter.quote('dbt_pk') }} is not null
archived_data.dbt_pk is not null
and (
{%- for col in check_cols %}
current_data.{{ adapter.quote(col) }} <> archived_data.{{ adapter.quote(col) }}
current_data.{{ col }} <> archived_data.{{ col }}
{%- if not loop.last %} or {% endif %}
{% endfor -%}
)
and archived_data.{{ adapter.quote('tmp_valid_to') }} is null
and archived_data.tmp_valid_to is null
{%- endset %}

{% set transforms -%}
current_data as (

select
{% for col in source_columns %}
{{ adapter.quote(col.name) }} {% if not loop.last %},{% endif %}
{{ col.name }} {% if not loop.last %},{% endif %}
{% endfor %},
{{ archive_get_time() }} as {{ adapter.quote('dbt_updated_at') }},
{{ unique_key }} as {{ adapter.quote('dbt_pk') }},
{{ archive_get_time() }} as {{ adapter.quote('dbt_valid_from') }},
{{ timestamp_column.literal('null') }} as {{ adapter.quote('tmp_valid_to') }}
{{ archive_get_time() }} as dbt_updated_at,
{{ unique_key }} as dbt_pk,
{{ archive_get_time() }} as dbt_valid_from,
{{ timestamp_column.literal('null') }} as tmp_valid_to
from source
),

archived_data as (

select
{% for col in source_columns %}
{{ adapter.quote(col.name) }},
{{ col.name }},
{% endfor %}
{{ adapter.quote('dbt_updated_at') }},
{{ unique_key }} as {{ adapter.quote('dbt_pk') }},
{{ adapter.quote('dbt_valid_from') }},
{{ adapter.quote('dbt_valid_to') }} as {{ adapter.quote('tmp_valid_to') }}
dbt_updated_at,
{{ unique_key }} as dbt_pk,
dbt_valid_from,
dbt_valid_to as tmp_valid_to
from {{ target_relation }}

),
Expand All @@ -216,23 +202,23 @@

select
current_data.*,
{{ timestamp_column.literal('null') }} as {{ adapter.quote('dbt_valid_to') }}
{{ timestamp_column.literal('null') }} as dbt_valid_to
from current_data
left outer join archived_data
on archived_data.{{ adapter.quote('dbt_pk') }} = current_data.{{ adapter.quote('dbt_pk') }}
on archived_data.dbt_pk = current_data.dbt_pk
where
archived_data.{{ adapter.quote('dbt_pk') }} is null
archived_data.dbt_pk is null
or ( {{ update_candidate }} )
),

updates as (

select
archived_data.*,
{{ archive_get_time() }} as {{ adapter.quote('dbt_valid_to') }}
{{ archive_get_time() }} as dbt_valid_to
from current_data
left outer join archived_data
on archived_data.{{ adapter.quote('dbt_pk') }} = current_data.{{ adapter.quote('dbt_pk') }}
on archived_data.dbt_pk = current_data.dbt_pk
where {{ update_candidate }}
),
{%- endset %}
Expand Down Expand Up @@ -345,7 +331,7 @@
{{ column_list(dest_columns) }}
)
select {{ column_list(dest_columns) }} from {{ tmp_relation }}
where {{ adapter.quote('change_type') }} = 'insert';
where change_type = 'insert';
{% endcall %}

{{ adapter.commit() }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@

{% macro column_list(columns) %}
{%- for col in columns %}
{{ adapter.quote(col.name) }} {% if not loop.last %},{% endif %}
{{ col.name }} {% if not loop.last %},{% endif %}
{% endfor -%}
{% endmacro %}


{% macro column_list_for_create_table(columns) %}
{%- for col in columns %}
{{ adapter.quote(col.name) }} {{ col.data_type }} {%- if not loop.last %},{% endif %}
{{ col.name }} {{ col.data_type }} {%- if not loop.last %},{% endif %}
{% endfor -%}
{% endmacro %}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@


{% macro bigquery__archive_hash_arguments(args) %}
to_hex(md5(concat({% for arg in args %}cast({{ adapter.quote(arg) }} as string){% if not loop.last %}, '|',{% endif %}{% endfor %})))
to_hex(md5(concat({% for arg in args %}cast({{ arg }} as string){% if not loop.last %}, '|',{% endif %}{% endfor %})))
{% endmacro %}

{% macro bigquery__create_columns(relation, columns) %}
Expand All @@ -15,8 +15,8 @@

{% macro bigquery__archive_update(target_relation, tmp_relation) %}
update {{ target_relation }} as dest
set dest.{{ adapter.quote('dbt_valid_to') }} = tmp.{{ adapter.quote('dbt_valid_to') }}
set dest.dbt_valid_to = tmp.dbt_valid_to
from {{ tmp_relation }} as tmp
where tmp.{{ adapter.quote('dbt_scd_id') }} = dest.{{ adapter.quote('dbt_scd_id') }}
and {{ adapter.quote('change_type') }} = 'update';
where tmp.dbt_scd_id = dest.dbt_scd_id
and change_type = 'update';
{% endmacro %}
22 changes: 11 additions & 11 deletions test/integration/004_simple_archive_test/invalidate_postgres.sql
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@

-- update records 11 - 21. Change email and updated_at field
update {schema}.seed set
"updated_at" = "updated_at" + interval '1 hour',
"email" = 'new_' || "email"
where "id" >= 10 and "id" <= 20;
updated_at = updated_at + interval '1 hour',
email = 'new_' || email
where id >= 10 and id <= 20;


-- invalidate records 11 - 21
update {schema}.archive_expected set
"dbt_valid_to" = "updated_at" + interval '1 hour'
where "id" >= 10 and "id" <= 20;
dbt_valid_to = updated_at + interval '1 hour'
where id >= 10 and id <= 20;


update {schema}.archive_castillo_expected set
"dbt_valid_to" = "updated_at" + interval '1 hour'
where "id" >= 10 and "id" <= 20;
dbt_valid_to = updated_at + interval '1 hour'
where id >= 10 and id <= 20;


update {schema}.archive_alvarez_expected set
"dbt_valid_to" = "updated_at" + interval '1 hour'
where "id" >= 10 and "id" <= 20;
dbt_valid_to = updated_at + interval '1 hour'
where id >= 10 and id <= 20;


update {schema}.archive_kelly_expected set
"dbt_valid_to" = "updated_at" + interval '1 hour'
where "id" >= 10 and "id" <= 20;
dbt_valid_to = updated_at + interval '1 hour'
where id >= 10 and id <= 20;
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@

-- update records 11 - 21. Change email and updated_at field
update {database}.{schema}.seed set
"updated_at" = DATEADD(hour, 1, "updated_at"),
"email" = 'new_' || "email"
where "id" >= 10 and "id" <= 20;
updated_at = DATEADD(hour, 1, updated_at),
email = 'new_' || email
where id >= 10 and id <= 20;


-- invalidate records 11 - 21
update {database}.{schema}.archive_expected set
"dbt_valid_to" = DATEADD(hour, 1, "updated_at")
where "id" >= 10 and "id" <= 20;
dbt_valid_to = DATEADD(hour, 1, updated_at)
where id >= 10 and id <= 20;
Loading

0 comments on commit ff94d87

Please sign in to comment.