(#1884) Fix for non-atomic snapshot staging tables
drewbanin committed May 2, 2020
1 parent 68babfb commit 8681dd8
Showing 6 changed files with 149 additions and 55 deletions.
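In brief: dbt's snapshot materialization stages insertions and updates in a temporary table before merging them into the snapshot. That staging table used to be built by two separate statements — one macro rendered the insertions, another rendered the updates, and build_snapshot_staging_table ran a create table as followed by an insert. If the source relation changed between those two statements, the updates could be computed against different data than the insertions, leaving a new record's dbt_valid_from out of step with the superseded record's dbt_valid_to (#1884). This commit merges the two macros into a single snapshot_staging_table macro whose insertions and updates are unioned and materialized in one statement, and adds a Postgres integration test built around a deliberately slow, time-dependent source.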
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -39,6 +39,7 @@
- Fix "Object of type Decimal is not JSON serializable" error when BigQuery queries returned numeric types in nested data structures ([#2336](https://github.com/fishtown-analytics/dbt/issues/2336), [#2348](https://github.com/fishtown-analytics/dbt/pull/2348))
- No longer query the information_schema.schemata view on bigquery ([#2320](https://github.com/fishtown-analytics/dbt/issues/2320), [#2382](https://github.com/fishtown-analytics/dbt/pull/2382))
- Add support for `sql_header` config in incremental models ([#2136](https://github.com/fishtown-analytics/dbt/issues/2136), [#2200](https://github.com/fishtown-analytics/dbt/pull/2200))
+- Fix for non-atomic snapshot staging table creation ([#1884](https://github.com/fishtown-analytics/dbt/issues/1884), [#2390](https://github.com/fishtown-analytics/dbt/pull/2390))

### Under the hood
- Added more tests for source inheritance ([#2264](https://github.com/fishtown-analytics/dbt/issues/2264), [#2291](https://github.com/fishtown-analytics/dbt/pull/2291))
@@ -23,7 +23,7 @@
{% endmacro %}


-{% macro snapshot_staging_table_inserts(strategy, source_sql, target_relation) -%}
+{% macro snapshot_staging_table(strategy, source_sql, target_relation) -%}

    with snapshot_query as (

@@ -40,14 +40,27 @@

    ),

-    source_data as (
+    insertions_source_data as (

-        select *,
-            {{ strategy.scd_id }} as dbt_scd_id,
+        select
+            *,
            {{ strategy.unique_key }} as dbt_unique_key,
            {{ strategy.updated_at }} as dbt_updated_at,
            {{ strategy.updated_at }} as dbt_valid_from,
+            nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to,
+            {{ strategy.scd_id }} as dbt_scd_id

+        from snapshot_query
+    ),

+    updates_source_data as (

+        select
+            *,
+            {{ strategy.unique_key }} as dbt_unique_key,
+            {{ strategy.updated_at }} as dbt_updated_at,
+            {{ strategy.updated_at }} as dbt_valid_from,
-            nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to
+            {{ strategy.updated_at }} as dbt_valid_to

        from snapshot_query
    ),
@@ -58,7 +71,7 @@
            'insert' as dbt_change_type,
            source_data.*

-        from source_data
+        from insertions_source_data as source_data
        left outer join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
        where snapshotted_data.dbt_unique_key is null
           or (
@@ -69,58 +82,25 @@
            )
        )

-    )

-    select * from insertions

-{%- endmacro %}


-{% macro snapshot_staging_table_updates(strategy, source_sql, target_relation) -%}

-    with snapshot_query as (

-        {{ source_sql }}

-    ),

-    snapshotted_data as (

-        select *,
-            {{ strategy.unique_key }} as dbt_unique_key

-        from {{ target_relation }}

-    ),

-    source_data as (

-        select
-            *,
-            {{ strategy.scd_id }} as dbt_scd_id,
-            {{ strategy.unique_key }} as dbt_unique_key,
-            {{ strategy.updated_at }} as dbt_updated_at,
-            {{ strategy.updated_at }} as dbt_valid_from

-        from snapshot_query
    ),

    updates as (

        select
            'update' as dbt_change_type,
-            snapshotted_data.dbt_scd_id,
-            source_data.dbt_valid_from as dbt_valid_to
+            source_data.*,
+            snapshotted_data.dbt_scd_id

-        from source_data
+        from updates_source_data as source_data
        join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
        where snapshotted_data.dbt_valid_to is null
        and (
            {{ strategy.row_changed }}
        )

    )

+    select * from insertions
+    union all
    select * from updates

{%- endmacro %}
@@ -159,18 +139,10 @@
{% macro build_snapshot_staging_table(strategy, sql, target_relation) %}
    {% set tmp_relation = make_temp_relation(target_relation) %}

-    {% set inserts_select = snapshot_staging_table_inserts(strategy, sql, target_relation) %}
-    {% set updates_select = snapshot_staging_table_updates(strategy, sql, target_relation) %}

-    {% call statement('build_snapshot_staging_relation_inserts') %}
-        {{ create_table_as(True, tmp_relation, inserts_select) }}
-    {% endcall %}
+    {% set select = snapshot_staging_table(strategy, sql, target_relation) %}

-    {% call statement('build_snapshot_staging_relation_updates') %}
-        insert into {{ tmp_relation }} (dbt_change_type, dbt_scd_id, dbt_valid_to)
-        select dbt_change_type, dbt_scd_id, dbt_valid_to from (
-            {{ updates_select }}
-        ) dbt_sbq
+    {% call statement('build_snapshot_staging_relation') %}
+        {{ create_table_as(True, tmp_relation, select) }}
    {% endcall %}

    {% do return(tmp_relation) %}
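In short: build_snapshot_staging_table previously populated the staging temp table with two separate statements — a create table as for the insertions, then an insert for the updates — so the two halves could be computed from different reads of a changing source. The merged snapshot_staging_table macro renders both halves from one query, and the staging table is now built by a single create table as call. The following is a minimal, self-contained Postgres illustration of why that matters; all names here are made up for the sketch, and this is not the SQL dbt renders:

    -- a "source" whose value changes from second to second, in the spirit of models-slow/gen.sql below
    create temp view moving_source as
    select date_trunc('second', statement_timestamp()) as updated_at
    from pg_sleep(2);

    -- old shape: two statements mean two reads of the source, seconds apart
    create temp table staged_two_steps as select updated_at from moving_source;
    insert into staged_two_steps select updated_at from moving_source;
    select count(distinct updated_at) from staged_two_steps;   -- 2: the reads saw different timestamps

    -- new shape: one statement, so every branch shares a single statement_timestamp()
    create temp table staged_one_step as
    select updated_at from moving_source
    union all
    select updated_at from moving_source;
    select count(distinct updated_at) from staged_one_step;    -- 1: both branches saw the same timestamp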
44 changes: 44 additions & 0 deletions test/integration/004_simple_snapshot_test/models-slow/gen.sql
@@ -0,0 +1,44 @@

{{ config(materialized='ephemeral') }}


/*
    Generates 10 rows that "appear" to update every
    second to anyone querying them:

        id   updated_at               seconds
        1    2020-04-21 20:44:00-04   0
        2    2020-04-21 20:43:59-04   59
        3    2020-04-21 20:43:58-04   58
        4    2020-04-21 20:43:57-04   57

        .... 1 second later ....

        1    2020-04-21 20:44:01-04   1
        2    2020-04-21 20:44:00-04   0
        3    2020-04-21 20:43:59-04   59
        4    2020-04-21 20:43:58-04   58

    This model uses pg_sleep(2) to make queries against it
    take a non-trivial amount of time.

    statement_timestamp() is used because it changes during a
    transaction. If we used now(), current_time, or similar, the
    timestamp of the start of the transaction would be returned
    instead.
*/

with gen as (

    select
        id,
        date_trunc('second', statement_timestamp()) - (interval '1 second' * id) as updated_at

    from generate_series(1, 10) id

)

select
    id,
    updated_at,
    extract(seconds from updated_at)::int as seconds

from gen, pg_sleep(2)
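The distinction the comment draws between statement_timestamp() and now() is easy to check in any Postgres session (a standalone illustration, not part of the test suite):

    begin;
    select now() as txn_time, statement_timestamp() as stmt_time;  -- both show (roughly) the current time
    select pg_sleep(2);
    select now() as txn_time, statement_timestamp() as stmt_time;  -- now() has not moved; statement_timestamp() is ~2 seconds later
    commit;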
@@ -0,0 +1,23 @@

/*
    Assert that the dbt_valid_from of the latest record
    is equal to the dbt_valid_to of the previous record
*/

with snapshot as (

    select * from {{ ref('my_slow_snapshot') }}

)

select
    snap1.id,
    snap1.dbt_valid_from as new_valid_from,
    snap2.dbt_valid_from as old_valid_from,
    snap2.dbt_valid_to as old_valid_to

from snapshot as snap1
join snapshot as snap2 on snap1.id = snap2.id
where snap1.dbt_valid_to is null
  and snap2.dbt_valid_to is not null
  and snap1.dbt_valid_from != snap2.dbt_valid_to
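This file sits under the project's test-paths, so dbt runs it as a data test: it passes only when the query returns zero rows. The kind of row it is meant to surface looks roughly like this (values invented for illustration):

    -- id | new_valid_from         | old_valid_from         | old_valid_to
    --  7 | 2020-04-21 20:44:01-04 | 2020-04-21 20:43:56-04 | 2020-04-21 20:44:03-04
    -- the superseded record was closed with a timestamp taken a couple of seconds
    -- apart from the dbt_valid_from stamped onto the record that replaced it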
@@ -0,0 +1,21 @@

{% snapshot my_slow_snapshot %}

    {{
        config(
            target_database=var('target_database', database),
            target_schema=schema,
            unique_key='id',
            strategy='timestamp',
            updated_at='updated_at'
        )
    }}

    select
        id,
        updated_at,
        seconds

    from {{ ref('gen') }}

{% endsnapshot %}
33 changes: 33 additions & 0 deletions test/integration/004_simple_snapshot_test/test_simple_snapshot.py
@@ -677,3 +677,36 @@ def test__postgres__long_text(self):
        self.assertEqual(len(results), 2)
        got_names = set(r.get('longstring') for r in results)
        self.assertEqual(got_names, {'a' * 500, 'short'})


class TestSlowQuery(DBTIntegrationTest):

    @property
    def schema(self):
        return "simple_snapshot_004"

    @property
    def models(self):
        return "models-slow"

    def run_snapshot(self):
        return self.run_dbt(['snapshot'])

    @property
    def project_config(self):
        return {
            "config-version": 2,
            "snapshot-paths": ['test-snapshots-slow'],
            "test-paths": ["test-snapshots-slow-tests"],
        }

    @use_profile('postgres')
    def test__postgres__slow(self):
        results = self.run_dbt(['snapshot'])
        self.assertEqual(len(results), 1)

        results = self.run_dbt(['snapshot'])
        self.assertEqual(len(results), 1)

        results = self.run_dbt(['test'])
        self.assertEqual(len(results), 1)
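Taken together: every query against the models-slow source takes a couple of seconds because of pg_sleep(2), and its generated updated_at values track statement_timestamp(), so the source effectively changes while dbt is reading it. The first dbt snapshot run seeds the snapshot table, the second has to close out rows that changed in the meantime, and the final dbt test run executes the assertion above, which is intended to catch the dbt_valid_from/dbt_valid_to mismatch from #1884 that a non-atomic staging build could produce.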
