Skip to content

Commit

Permalink
Merge pull request #1994 from fishtown-analytics/fix/overlapping-valid-timestamps
Browse files Browse the repository at this point in the history

Avoid overlapping validity timestamps (#1736)
  • Loading branch information
beckjake authored Dec 13, 2019
2 parents 89dd04d + 382a993 commit a702f58
Show file tree
Hide file tree
Showing 9 changed files with 154 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,30 @@
{% endmacro %}


{# Dispatch to the adapter-specific <adapter>__snapshot_string_as_time macro,
   which converts a timestamp value read back from the database into a SQL
   expression that parses it back into a timestamp. The rendered text is
   inlined directly into snapshot SQL. #}
{% macro snapshot_string_as_time(timestamp) -%}
{{ adapter_macro('snapshot_string_as_time', timestamp) }}
{%- endmacro %}


{% macro default__snapshot_string_as_time(timestamp) %}
    {#- There is no portable SQL syntax for parsing a timestamp literal, so
        every adapter plugin must supply its own implementation; the default
        simply reports which adapter is missing one. -#}
    {%- set message = 'snapshot_string_as_time macro not implemented for adapter ' ~ adapter.type() -%}
    {% do exceptions.raise_not_implemented(message) %}
{% endmacro %}

{% macro snapshot_check_strategy(node, snapshotted_rel, current_rel, config, target_exists) %}
{% set check_cols_config = config['check_cols'] %}
{% set primary_key = config['unique_key'] %}
{% set updated_at = snapshot_get_time() %}
{% set select_current_time -%}
select {{ snapshot_get_time() }} as snapshot_start
{%- endset %}

{# don't access the column by name, to avoid dealing with casing issues on snowflake #}
{%- set now = run_query(select_current_time)[0][0] -%}
{% if now is none or now is undefined -%}
{%- do exceptions.raise_compiler_error('Could not get a snapshot start time from the database') -%}
{%- endif %}
{% set updated_at = snapshot_string_as_time(now) %}

{% if check_cols_config == 'all' %}
{% set check_cols = get_columns_in_query(node['injected_sql']) %}
Expand Down
6 changes: 6 additions & 0 deletions plugins/bigquery/dbt/include/bigquery/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@
{%- endmacro %}


{% macro bigquery__snapshot_string_as_time(timestamp) -%}
    {#- Wrap the value in TIMESTAMP("...") so BigQuery parses it back into a
        timestamp; return() short-circuits, so surrounding whitespace is moot. -#}
    {%- do return('TIMESTAMP("' ~ timestamp ~ '")') -%}
{%- endmacro %}


{% macro bigquery__list_schemas(database) -%}
    {#- Delegate to the adapter; note the database argument is not forwarded
        by this implementation. -#}
    {% do return(adapter.list_schemas()) %}
{% endmacro %}
Expand Down
6 changes: 6 additions & 0 deletions plugins/postgres/dbt/include/postgres/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@
now()
{%- endmacro %}

{% macro postgres__snapshot_string_as_time(timestamp) -%}
    {#- Quote the value and cast it back to a timezone-naive timestamp,
        matching the type postgres__snapshot_get_time produces. -#}
    {%- do return("'" ~ timestamp ~ "'::timestamp without time zone") -%}
{%- endmacro %}


{# Current database time cast to a timezone-naive timestamp, consistent with
   the '::timestamp without time zone' cast in postgres__snapshot_string_as_time. #}
{% macro postgres__snapshot_get_time() -%}
{{ current_timestamp() }}::timestamp without time zone
{%- endmacro %}
Expand Down
6 changes: 6 additions & 0 deletions plugins/redshift/dbt/include/redshift/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,12 @@
{{ current_timestamp() }}::timestamp
{%- endmacro %}


{% macro redshift__snapshot_string_as_time(timestamp) -%}
    {#- Quote the value and cast it back to a timestamp, matching the
        '::timestamp' cast used by redshift__snapshot_get_time. -#}
    {%- do return("'" ~ timestamp ~ "'::timestamp") -%}
{%- endmacro %}

{% macro redshift__make_temp_relation(base_relation, suffix) %}
    {#- Redshift reuses the Postgres temp-relation logic unchanged. -#}
    {{ return(postgres__make_temp_relation(base_relation, suffix)) }}
{% endmacro %}
7 changes: 7 additions & 0 deletions plugins/snowflake/dbt/include/snowflake/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,13 @@
convert_timezone('UTC', current_timestamp())
{%- endmacro %}


{% macro snowflake__snapshot_string_as_time(timestamp) -%}
    {#- Parse the string back with to_timestamp_ntz, matching the NTZ type
        produced by snowflake__snapshot_get_time. -#}
    {%- do return("to_timestamp_ntz('" ~ timestamp ~ "')") -%}
{%- endmacro %}


{# Current time converted to TIMESTAMP_NTZ, consistent with the
   to_timestamp_ntz parsing in snowflake__snapshot_string_as_time. #}
{% macro snowflake__snapshot_get_time() -%}
to_timestamp_ntz({{ current_timestamp() }})
{%- endmacro %}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
{# Dispatch to the adapter-specific SQL expression used as the unique id for
   snapshot rows in these integration-test macros. #}
{% macro get_snapshot_unique_id() -%}
    {#- fix: removed the stray trailing comma from the argument list -#}
    {{ return(adapter_macro('get_snapshot_unique_id')) }}
{%- endmacro %}

{% macro default__get_snapshot_unique_id() -%}
    {#- ANSI string concatenation of the id and first_name columns. -#}
    {{ return("id || '-' || first_name") }}
{%- endmacro %}


{% macro bigquery__get_snapshot_unique_id() -%}
    {#- BigQuery variant: concat() with id explicitly cast to string. -#}
    {% do return('concat(cast(id as string), "-", first_name)') %}
{%- endmacro %}

{#
    mostly copy+pasted from dbt_utils, but I removed some parameters and added
    a query that calls get_snapshot_unique_id
#}
{% macro test_mutually_exclusive_ranges(model) %}

with base as (
    -- Tag every snapshot row with the adapter-specific unique id expression.
    select {{ get_snapshot_unique_id() }} as dbt_unique_id,
    *
    from {{ model }}
),
window_functions as (

    select
    dbt_valid_from as lower_bound,
    -- Open-ended (current) rows have a null dbt_valid_to; substitute a
    -- far-future sentinel so range comparisons still work.
    coalesce(dbt_valid_to, '2099-1-1T00:00:01') as upper_bound,

    lead(dbt_valid_from) over (
        partition by dbt_unique_id
        order by dbt_valid_from
    ) as next_lower_bound,

    row_number() over (
        partition by dbt_unique_id
        order by dbt_valid_from desc
    ) = 1 as is_last_record

    from base

),

calc as (
    -- We want to return records where one of our assumptions fails, so we'll use
    -- the `not` function with `and` statements so we can write our assumptions more cleanly
    select
    *,

    -- For each record: lower_bound should be < upper_bound.
    -- Coalesce it to return an error on the null case (implicit assumption
    -- these columns are not_null)
    coalesce(
        lower_bound < upper_bound,
        is_last_record
    ) as lower_bound_less_than_upper_bound,

    -- For each record: upper_bound should equal the next lower_bound
    -- (no gaps and no overlaps between validity ranges).
    -- Coalesce it to handle null cases for the last record.
    coalesce(
        upper_bound = next_lower_bound,
        is_last_record,
        false
    ) as upper_bound_equal_to_next_lower_bound

    from window_functions

),

validation_errors as (

    select
    *
    from calc

    where not(
    -- THE FOLLOWING SHOULD BE TRUE --
    lower_bound_less_than_upper_bound
    and upper_bound_equal_to_next_lower_bound
    )
)

select count(*) from validation_errors
{% endmacro %}
5 changes: 5 additions & 0 deletions test/integration/004_simple_snapshot_test/models/schema.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
version: 2
models:
- name: snapshot_actual
tests:
- mutually_exclusive_ranges
29 changes: 18 additions & 11 deletions test/integration/004_simple_snapshot_test/test_simple_snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def assert_case_tables_equal(self, actual, expected):
self.assertTablesEqual(actual, expected)

def assert_expected(self):
self.run_dbt(['test'])
self.assert_case_tables_equal('snapshot_actual', 'snapshot_expected')


Expand All @@ -42,6 +43,7 @@ def project_config(self):
return {
"data-paths": ['data'],
"snapshot-paths": ['test-snapshots-pg'],
'macro-paths': ['macros'],
}

@use_profile('postgres')
Expand Down Expand Up @@ -98,7 +100,7 @@ class TestCustomSnapshotFiles(BaseSimpleSnapshotTest):
def project_config(self):
return {
'data-paths': ['data'],
'macro-paths': ['custom-snapshot-macros'],
'macro-paths': ['custom-snapshot-macros', 'macros'],
'snapshot-paths': ['test-snapshots-pg-custom'],
}

Expand Down Expand Up @@ -128,7 +130,7 @@ class TestNamespacedCustomSnapshotFiles(BaseSimpleSnapshotTest):
def project_config(self):
return {
'data-paths': ['data'],
'macro-paths': ['custom-snapshot-macros'],
'macro-paths': ['custom-snapshot-macros', 'macros'],
'snapshot-paths': ['test-snapshots-pg-custom-namespaced'],
}

Expand All @@ -152,7 +154,7 @@ class TestInvalidNamespacedCustomSnapshotFiles(BaseSimpleSnapshotTest):
def project_config(self):
return {
'data-paths': ['data'],
'macro-paths': ['custom-snapshot-macros'],
'macro-paths': ['custom-snapshot-macros', 'macros'],
'snapshot-paths': ['test-snapshots-pg-custom-invalid'],
}

Expand All @@ -179,6 +181,7 @@ def project_config(self):
"data-paths": ['data'],
"snapshot-paths": ['test-snapshots-select',
'test-snapshots-pg'],
'macro-paths': ['macros'],
}

@use_profile('postgres')
Expand Down Expand Up @@ -236,7 +239,8 @@ def project_config(self):
'strategy': 'timestamp',
'updated_at': 'updated_at',
}
}
},
'macro-paths': ['macros'],
}


Expand All @@ -253,16 +257,15 @@ def models(self):
def project_config(self):
return {
"snapshot-paths": ['test-snapshots-bq'],
'macro-paths': ['macros'],
}

def assert_expected(self):
self.run_dbt(['test'])
self.assertTablesEqual('snapshot_actual', 'snapshot_expected')

@use_profile('bigquery')
def test__bigquery__simple_snapshot(self):
self.use_default_project()
self.use_profile('bigquery')

self.run_sql_file("seed_bq.sql")

self.run_dbt(["snapshot"])
Expand All @@ -276,11 +279,8 @@ def test__bigquery__simple_snapshot(self):

self.assert_expected()


@use_profile('bigquery')
def test__bigquery__snapshot_with_new_field(self):
self.use_default_project()
self.use_profile('bigquery')

self.run_sql_file("seed_bq.sql")

Expand Down Expand Up @@ -341,6 +341,7 @@ def project_config(self):
paths = ['test-snapshots-bq']
return {
'snapshot-paths': paths,
'macro-paths': ['macros'],
}

def run_snapshot(self):
Expand Down Expand Up @@ -395,6 +396,7 @@ def project_config(self):
paths = ['test-snapshots-pg']
return {
'snapshot-paths': paths,
'macro-paths': ['macros'],
}

def target_schema(self):
Expand Down Expand Up @@ -427,6 +429,7 @@ def models(self):
def project_config(self):
return {
"snapshot-paths": ['test-snapshots-invalid'],
'macro-paths': ['macros'],
}

@use_profile('postgres')
Expand Down Expand Up @@ -456,6 +459,7 @@ def project_config(self):
return {
"data-paths": ['data'],
"snapshot-paths": ['test-check-col-snapshots'],
'macro-paths': ['macros'],
}


Expand All @@ -472,7 +476,8 @@ def project_config(self):
"strategy": "check",
"check_cols": ["email"],
}
}
},
'macro-paths': ['macros'],
}


Expand All @@ -493,6 +498,7 @@ def project_config(self):
return {
"data-paths": ['data'],
"snapshot-paths": ['test-check-col-snapshots-bq'],
'macro-paths': ['macros'],
}

@use_profile('bigquery')
Expand Down Expand Up @@ -562,6 +568,7 @@ def run_snapshot(self):
def project_config(self):
return {
"snapshot-paths": ['test-snapshots-longtext'],
'macro-paths': ['macros'],
}

@use_profile('postgres')
Expand Down
4 changes: 0 additions & 4 deletions test/integration/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -545,10 +545,6 @@ def run_dbt_and_check(self, args=None, strict=True, parser=False, profiles_dir=T
final_args.append('--log-cache-events')

logger.info("Invoking dbt with {}".format(final_args))
if args is None:
args = ["run"]

logger.info("Invoking dbt with {}".format(args))
return dbt.handle_and_check(final_args)

def run_sql_file(self, path, kwargs=None):
Expand Down

0 comments on commit a702f58

Please sign in to comment.