Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/snapshot staging is not atomic #2390

Merged
merged 6 commits into from
May 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
- Add support for `sql_header` config in incremental models ([#2136](https://github.com/fishtown-analytics/dbt/issues/2136), [#2200](https://github.com/fishtown-analytics/dbt/pull/2200))
- The ambiguous alias check now examines the node's database value as well as the schema/identifier ([#2326](https://github.com/fishtown-analytics/dbt/issues/2326), [#2387](https://github.com/fishtown-analytics/dbt/pull/2387))
- Postgres array types can now be returned via `run_query` macro calls ([#2337](https://github.com/fishtown-analytics/dbt/issues/2337), [#2376](https://github.com/fishtown-analytics/dbt/pull/2376))
- Fix for non-atomic snapshot staging table creation ([#1884](https://github.com/fishtown-analytics/dbt/issues/1884), [#2390](https://github.com/fishtown-analytics/dbt/pull/2390))
- Fix for snapshot errors when strategy changes from `check` to `timestamp` between runs ([#2350](https://github.com/fishtown-analytics/dbt/issues/2350), [#2391](https://github.com/fishtown-analytics/dbt/pull/2391))

### Under the hood
- Added more tests for source inheritance ([#2264](https://github.com/fishtown-analytics/dbt/issues/2264), [#2291](https://github.com/fishtown-analytics/dbt/pull/2291))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
{% endmacro %}


{% macro snapshot_staging_table_inserts(strategy, source_sql, target_relation) -%}
{% macro snapshot_staging_table(strategy, source_sql, target_relation) -%}

with snapshot_query as (

Expand All @@ -40,14 +40,27 @@

),

source_data as (
insertions_source_data as (

select *,
{{ strategy.scd_id }} as dbt_scd_id,
select
*,
{{ strategy.unique_key }} as dbt_unique_key,
{{ strategy.updated_at }} as dbt_updated_at,
{{ strategy.updated_at }} as dbt_valid_from,
nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to,
{{ strategy.scd_id }} as dbt_scd_id

from snapshot_query
),

updates_source_data as (

select
*,
{{ strategy.unique_key }} as dbt_unique_key,
{{ strategy.updated_at }} as dbt_updated_at,
{{ strategy.updated_at }} as dbt_valid_from,
nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to
{{ strategy.updated_at }} as dbt_valid_to

from snapshot_query
),
Expand All @@ -58,7 +71,7 @@
'insert' as dbt_change_type,
source_data.*

from source_data
from insertions_source_data as source_data
left outer join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
where snapshotted_data.dbt_unique_key is null
or (
Expand All @@ -69,58 +82,25 @@
)
)

)

select * from insertions

{%- endmacro %}


{% macro snapshot_staging_table_updates(strategy, source_sql, target_relation) -%}

with snapshot_query as (

{{ source_sql }}

),

snapshotted_data as (

select *,
{{ strategy.unique_key }} as dbt_unique_key

from {{ target_relation }}

),

source_data as (

select
*,
{{ strategy.scd_id }} as dbt_scd_id,
{{ strategy.unique_key }} as dbt_unique_key,
{{ strategy.updated_at }} as dbt_updated_at,
{{ strategy.updated_at }} as dbt_valid_from

from snapshot_query
),

updates as (

select
'update' as dbt_change_type,
snapshotted_data.dbt_scd_id,
source_data.dbt_valid_from as dbt_valid_to
source_data.*,
snapshotted_data.dbt_scd_id

from source_data
from updates_source_data as source_data
join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
where snapshotted_data.dbt_valid_to is null
and (
{{ strategy.row_changed }}
)

)

select * from insertions
union all
select * from updates

{%- endmacro %}
Expand Down Expand Up @@ -159,18 +139,10 @@
{% macro build_snapshot_staging_table(strategy, sql, target_relation) %}
{% set tmp_relation = make_temp_relation(target_relation) %}

{% set inserts_select = snapshot_staging_table_inserts(strategy, sql, target_relation) %}
{% set updates_select = snapshot_staging_table_updates(strategy, sql, target_relation) %}

{% call statement('build_snapshot_staging_relation_inserts') %}
{{ create_table_as(True, tmp_relation, inserts_select) }}
{% endcall %}
{% set select = snapshot_staging_table(strategy, sql, target_relation) %}

{% call statement('build_snapshot_staging_relation_updates') %}
insert into {{ tmp_relation }} (dbt_change_type, dbt_scd_id, dbt_valid_to)
select dbt_change_type, dbt_scd_id, dbt_valid_to from (
{{ updates_select }}
) dbt_sbq
{% call statement('build_snapshot_staging_relation') %}
{{ create_table_as(True, tmp_relation, select) }}
{% endcall %}

{% do return(tmp_relation) %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,17 @@
{% set primary_key = config['unique_key'] %}
{% set updated_at = config['updated_at'] %}

{#/*
The snapshot relation might not have an {{ updated_at }} value if the
snapshot strategy is changed from `check` to `timestamp`. We
should use a dbt-created column for the comparison in the snapshot
table instead of assuming that the user-supplied {{ updated_at }}
will be present in the historical data.

See https://github.com/fishtown-analytics/dbt/issues/2350
*/ #}
{% set row_changed_expr -%}
({{ snapshotted_rel }}.{{ updated_at }} < {{ current_rel }}.{{ updated_at }})
({{ snapshotted_rel }}.dbt_valid_from < {{ current_rel }}.{{ updated_at }})
{%- endset %}

{% set scd_id_expr = snapshot_hash_arguments([primary_key, updated_at]) %}
Expand Down Expand Up @@ -126,7 +135,7 @@
select {{ snapshot_get_time() }} as snapshot_start
{%- endset %}

{# don't access the column by name, to avoid dealing with casing issues on snowflake #}
{#-- don't access the column by name, to avoid dealing with casing issues on snowflake #}
{%- set now = run_query(select_current_time)[0][0] -%}
{% if now is none or now is undefined -%}
{%- do exceptions.raise_compiler_error('Could not get a snapshot start time from the database') -%}
Expand Down
44 changes: 44 additions & 0 deletions test/integration/004_simple_snapshot_test/models-slow/gen.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@

{{ config(materialized='ephemeral') }}


/*
Generates 50 rows that "appear" to update every
second to a query-er.
1 2020-04-21 20:44:00-04 0
2 2020-04-21 20:43:59-04 59
3 2020-04-21 20:43:58-04 58
4 2020-04-21 20:43:57-04 57
.... 1 second later ....
1 2020-04-21 20:44:01-04 1
2 2020-04-21 20:44:00-04 0
3 2020-04-21 20:43:59-04 59
4 2020-04-21 20:43:58-04 58
This view uses pg_sleep(2) to make queries against
the view take a non-trivial amount of time
Use statement_timestamp() as it changes during a transactions.
If we used now() or current_time or similar, then the timestamp
of the start of the transaction would be returned instead.
*/

with gen as (

select
id,
date_trunc('second', statement_timestamp()) - (interval '1 second' * id) as updated_at

from generate_series(1, 10) id

)

select
id,
updated_at,
extract(seconds from updated_at)::int as seconds

from gen, pg_sleep(2)
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@

{# /*
Given the repro case for the snapshot build, we'd
expect to see both records have color='pink'
in their most recent rows.
*/ #}

with expected as (

select 1 as id, 'pink' as color union all
select 2 as id, 'pink' as color

),

actual as (

select id, color
from {{ ref('my_snapshot') }}
where color = 'pink'
and dbt_valid_to is null

)

select * from expected
except
select * from actual

union all

select * from actual
except
select * from expected
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@

{#
REPRO:
1. Run with check strategy
2. Add a new ts column and run with check strategy
3. Run with timestamp strategy on new ts column

Expect: new entry is added for changed rows in (3)
#}


{% snapshot my_snapshot %}

{#--------------- Configuration ------------ #}

{{ config(
target_schema=schema,
unique_key='id'
) }}

{% if var('strategy') == 'timestamp' %}
{{ config(strategy='timestamp', updated_at='updated_at') }}
{% else %}
{{ config(strategy='check', check_cols=['color']) }}
{% endif %}

{#--------------- Test setup ------------ #}

{% if var('step') == 1 %}

select 1 as id, 'blue' as color
union all
select 2 as id, 'red' as color

{% elif var('step') == 2 %}

-- change id=1 color from blue to green
-- id=2 is unchanged when using the check strategy
select 1 as id, 'green' as color, '2020-01-01'::date as updated_at
union all
select 2 as id, 'red' as color, '2020-01-01'::date as updated_at

{% elif var('step') == 3 %}

-- bump timestamp for both records. Expect that after this runs
-- using the timestamp strategy, both ids should have the color
-- 'pink' in the database. This should be in the future b/c we're
-- going to compare to the check timestamp, which will be _now_
select 1 as id, 'pink' as color, (now() + interval '1 day')::date as updated_at
union all
select 2 as id, 'pink' as color, (now() + interval '1 day')::date as updated_at

{% endif %}

{% endsnapshot %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@

/*
Assert that the dbt_valid_from of the latest record
is equal to the dbt_valid_to of the previous record
*/

with snapshot as (

select * from {{ ref('my_slow_snapshot') }}

)

select
snap1.id,
snap1.dbt_valid_from as new_valid_from,
snap2.dbt_valid_from as old_valid_from,
snap2.dbt_valid_to as old_valid_to

from snapshot as snap1
join snapshot as snap2 on snap1.id = snap2.id
where snap1.dbt_valid_to is null
and snap2.dbt_valid_to is not null
and snap1.dbt_valid_from != snap2.dbt_valid_to
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@

{% snapshot my_slow_snapshot %}

{{
config(
target_database=var('target_database', database),
target_schema=schema,
unique_key='id',
strategy='timestamp',
updated_at='updated_at'
)
}}

select
id,
updated_at,
seconds

from {{ ref('gen') }}

{% endsnapshot %}
Loading