Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for re-used check cols in snapshots #1614

Merged
merged 6 commits into from
Jul 24, 2019

Conversation

drewbanin
Copy link
Contributor

@drewbanin drewbanin commented Jul 17, 2019

Related to #1590

An issue existed in Snapshots on 0.14.0 for BigQuery and Snowflake which caused dbt snapshot to work incorrectly when the check strategy was used in a specific scenario. If the check cols cycled between values and ended up in a previously known state, then dbt would set the dbt_valid_to date to be non-null for the "current" record, but no corresponding "new" record would be inserted.

This is a function of how dbt merges snapshot changes into the destination table (pseudocode):

merge into dest
using source
on scd_id

when matched then update
when not matched then insert

The problem here is that when not matched wouldn't be evaluated for these records, because there was a match on the dbt_scd_id field. This PR mitigates that problem by including a "row version" in the dbt_scd_id hash when the check_cols strategy is specified. For incremental snapshot builds, the check strategy will incorporate the number of instances of the specified dbt_unique_key into the dbt_scd_id hash. So, a set of reused values for the check_cols will result in a different hash on a subsequent invocation of dbt snapshot.

This PR also includes improvements to the status shown for BigQuery create/insert/merge log lines, as it was helpful in debugging.

Edit: See also #1709 for an improved approach to mitigating this problem (thanks to @mikaelene for the suggestion and sample code!)

@drewbanin drewbanin added the snapshots Issues related to dbt's snapshot functionality label Jul 17, 2019
@drewbanin drewbanin added this to the 0.14.1 milestone Jul 17, 2019
@drewbanin drewbanin requested a review from beckjake July 22, 2019 13:56
Copy link
Contributor

@beckjake beckjake left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great



{% macro default__snapshot_hash_arguments(args) %}
{% macro default__snapshot_hash_arguments(args) -%}
md5({% for arg in args %}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want to make this {%- for arg in args -%}...{%- endfor %}, or does that shove all of this onto one line?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah - i was trying to make the whitespace a little cleaner here - it was pretty hard to find what I was looking for in the logs while it was all on one line (especially with the new subquery).

This output looks something like:

    source_data as (

        select *,
            md5(
        coalesce(cast(number as varchar ), '')
         || '|' || 
    
        coalesce(cast((
             select count(*) from snapshotted_data
             where snapshotted_data.dbt_unique_key = number
            ) as varchar ), '')
         || '|' || 
    
        coalesce(cast(name as varchar ), '')
        
    ) as dbt_scd_id,

Which isn't the most beautiful SQL I've ever seen, but should be ok

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding the whitespace controls makes it look like this:

    source_data as (

        select *,
            md5(coalesce(cast(number as varchar ), '')
         || '|' || coalesce(cast((
             select count(*) from snapshotted_data
             where snapshotted_data.dbt_unique_key = number
            ) as varchar ), '')
         || '|' || coalesce(cast(name as varchar ), '')
        ) as dbt_scd_id,
    ),

I will update!

{% if target_exists %}
{% set row_version -%}
(
select count(*) from {{ snapshotted_rel }}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if there are two rows with the same primary key in one snapshot operation? Obviously user error, but do we do anything like error about it, or just give bad results?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that will result in "undefined behavior" -- the exact outcome depends on:

  1. if the specified check_cols values are duplicated as well, or if they differ across the duplicated primary key
  2. the exact state of the database

Whichever way you slice it, dbt is going to do something undesirable if the specified primary key is duplicated in the snapshot operation though. On pg/redshift, you'll end up with dupes in your snapshot table. On snowflake/bq, you should see a nondeterministic merge error.

We could:

  1. run a uniqueness check on the staging temp table before loading into the destination table
  2. use a row_number() (or similar) to dedupe records when building the staging table
  • this would solve some problems but create others!
  1. leave it to the user to verify that their snapshot unique_keys are unique
  • updating documentation
  • making it possible to specify a schema.yml spec for snapshots?

@@ -202,15 +202,31 @@ def raw_execute(self, sql, fetch=False):

def execute(self, sql, auto_begin=False, fetch=None):
# auto_begin is ignored on bigquery, and only included for consistency
_, iterator = self.raw_execute(sql, fetch=fetch)
query_job, iterator = self.raw_execute(sql, fetch=fetch)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

to_hex(md5(concat({% for arg in args %}coalesce(cast({{ arg }} as string), ''){% if not loop.last %}, '|',{% endif %}{% endfor %})))
{% endmacro %}
{% macro bigquery__snapshot_hash_arguments(args) -%}
to_hex(md5(concat({% for arg in args %}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have similar for-loop whitespace formatting questions as I did on snowflake, here

Copy link
Contributor Author

@drewbanin drewbanin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really good points @beckjake - thanks for the feedback! Let me know if you have any thoughts about the right path forward



{% macro default__snapshot_hash_arguments(args) %}
{% macro default__snapshot_hash_arguments(args) -%}
md5({% for arg in args %}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah - i was trying to make the whitespace a little cleaner here - it was pretty hard to find what I was looking for in the logs while it was all on one line (especially with the new subquery).

This output looks something like:

    source_data as (

        select *,
            md5(
        coalesce(cast(number as varchar ), '')
         || '|' || 
    
        coalesce(cast((
             select count(*) from snapshotted_data
             where snapshotted_data.dbt_unique_key = number
            ) as varchar ), '')
         || '|' || 
    
        coalesce(cast(name as varchar ), '')
        
    ) as dbt_scd_id,

Which isn't the most beautiful SQL I've ever seen, but should be ok



{% macro default__snapshot_hash_arguments(args) %}
{% macro default__snapshot_hash_arguments(args) -%}
md5({% for arg in args %}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding the whitespace controls makes it look like this:

    source_data as (

        select *,
            md5(coalesce(cast(number as varchar ), '')
         || '|' || coalesce(cast((
             select count(*) from snapshotted_data
             where snapshotted_data.dbt_unique_key = number
            ) as varchar ), '')
         || '|' || coalesce(cast(name as varchar ), '')
        ) as dbt_scd_id,
    ),

I will update!

{% if target_exists %}
{% set row_version -%}
(
select count(*) from {{ snapshotted_rel }}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that will result in "undefined behavior" -- the exact outcome depends on:

  1. if the specified check_cols values are duplicated as well, or if they differ across the duplicated primary key
  2. the exact state of the database

Whichever way you slice it, dbt is going to do something undesirable if the specified primary key is duplicated in the snapshot operation though. On pg/redshift, you'll end up with dupes in your snapshot table. On snowflake/bq, you should see a nondeterministic merge error.

We could:

  1. run a uniqueness check on the staging temp table before loading into the destination table
  2. use a row_number() (or similar) to dedupe records when building the staging table
  • this would solve some problems but create others!
  1. leave it to the user to verify that their snapshot unique_keys are unique
  • updating documentation
  • making it possible to specify a schema.yml spec for snapshots?

@beckjake
Copy link
Contributor

@drewbanin Given those failure modes, I think it's fine to just document this pattern and leave it to the user. It sounds like there won't be any loss of data: on BigQuery/Snowflake you'll get an error and on Postgres/Redshift you'll get a pretty dumb database state, but that's fine. If users want to run a snapshot and ensure uniqueness before running, they can always set the table up as a source or model and provide appropriate schema tests on the source, right?

@drewbanin drewbanin merged commit b12484b into dev/0.14.1 Jul 24, 2019
@drewbanin drewbanin deleted the fix/snapshot-check-cols-cycle branch July 24, 2019 16:56
@drewbanin drewbanin changed the title possible fix for re-used check cols on BQ Fix for re-used check cols in snapshots Aug 5, 2019
@mikaelene
Copy link
Contributor

I am late in the game here. But why can't you just use {% set scd_id_expr = snapshot_hash_arguments([primary_key, updated_at]) %} in macro snapshot_check_strategy ?

Then:

  • The updates would have [primaryKey and null] as scd_id and have a match.
  • The inserts would have [primaryKey and current_timestamp] as scd_id and not be matched.

Or am I missing something here?

@drewbanin
Copy link
Contributor Author

Hey @mikaelene - the check strategy is intended to be used when the source table does not have a reliable updated_at field. If you do have a reliable updated_at timestamp, then I'd always encourage you to use the timestamp strategy instead!

@mikaelene
Copy link
Contributor

Hi @drewbanin . I totally got that. But in the checkstrategy:

  • we are using row_changed to verify if any row have changed since last by looking att the check_cols.

  • scd_id is used as a unique identifier for each snapshooted row. If you used [primary_key, updated_at], that would be a unique identifier and could be used instead of generating a row_version in the snapshot_check_strategy

  • You then generate inserts and updates based on the row_changed flag.

      select
          'insert' as dbt_change_type,
          source_data.*
    
      from source_data
      left outer join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
      where snapshotted_data.dbt_unique_key is null
         or (
              snapshotted_data.dbt_unique_key is not null
          and snapshotted_data.dbt_valid_to is null
          and (
              {{ strategy.row_changed }}
          )
      )
    
    
      select
          'update' as dbt_change_type,
          snapshotted_data.dbt_scd_id,
          source_data.dbt_valid_from as dbt_valid_to
    
      from source_data
      join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
      where snapshotted_data.dbt_valid_to is null
      and (
          {{ strategy.row_changed }}
      )
    

If you merge themn on scd_id_expr = snapshot_hash_arguments([primary_key, updated_at])

You will end up with the right inserts for not matched and the right updates for matched and you would not need to:

  • generate a large hash with all columns
  • Loop through rows to generate a unique_id with a rownumber (cause you already have one in scd_id)

@drewbanin
Copy link
Contributor Author

scd_id is used as a unique identifier for each snapshooted row. If you used [primary_key, updated_at], that would be a unique identifier and could be used instead of generating a row_version in the snapshot_check_strategy

What's value are you thinking we'd use for the updated_at shown here? The dbt_valid_from / dbt_valid_to timestamps that dbt uses are generated by using the current_timestamp() value from the database. If we tried to use the wall clock time as the updated_at value, then every row would look like it had changed on every invocation of dbt snapshot!

@mikaelene
Copy link
Contributor

updated_at will be current_timestamp() but it is not used to identify changed rows.

If you read the logs of a check_cols run you see that the first SQL executed is to identify changed rows. The select 'insert' as dbt_change_type,... and the select 'update' as dbt_change_type,... macros above.

  • They takes the last row of each dbt_unique_key (in the snapshotted table) snapshotted_data.dbt_valid_to is null.
  • Joins them with the current condition of the source table on the columns to check for changes (eg. snapshotted_data.ATTEMPTS != source_data.ATTEMPTS). Excluding not changed rows.
  • The scd_id_expr is not used in the join! Only rows changed will be added to the staging table regardless of what we use for scd_id_expr.

The 'insert'-rows will have an scd_id_expr of hash(primary_key, current_timestamp())
The 'update'-rows will have the scd_id_expr from the last snapshotted set of rows. snapshotted_data.dbt_scd_id

The merge will then insert the changed rows with NULL in dbt_valid_to and update only the last version of the changed rows.

I'm on a business trip and don't have my dev-pc for running the tests. But if you checkout 0.14.0 and change row 110 to {% set scd_id_expr = snapshot_hash_arguments([primary_key, updated_at]) %}. I think the new tests for this issue will pass. I'll see if I can find time do that next week.

Sorry for bothering you on this old fix! I stumbled over it on a link somewhere and couldn't help to notice that there might be an easier solution. Even if there are a possibillity that I am missing something vital :-)

@drewbanin
Copy link
Contributor Author

Hey @mikaelene - thanks for this! I think the key thing here is that dbt performs this merge in two different ways:

  • On Postgres and Redshift, dbt runs an update + insert
  • On BigQuery and Snowflake, dbt runs a merge statement

You can check out the source code for the postgres + redshift approach as well as the bigquery + snowflake approach in the dbt source code.

In the BigQuery + Snowflake approach (the default!) the merge statement does indeed merge on the dbt_scd_id column. On Snowflake and BigQuery, the database will raise an error if more than one record in the "source" dataset matches a single row in the "target" dataset. This PR ensures that the dbt_scd_id will not be duplicated for "check" snapshots even if a set of check values is recycled across snapshots.

We pursued the approach presented in this PR in order to keep the core snapshot logic consistent across adapters while still supporting each of postgres, redshift, snowflake, and bigquery.

I do think you're right: the logic here isn't strictly necessary on redshift and postgres. It may turn out that in the future, it will make sense to let the implementation for snapshots vary more across databases. For now though, there's a ton of merit to keeping things consistent.

Thanks for taking the time to write this up! Happy to answer any further questions if you have them.

@mikaelene
Copy link
Contributor

Hi @drewbanin ,Thanks for getting back to me, event if i am annoying and stubborn :-).

I am using the default (merge) approach in my sqlserver-adapter and it has been working for us since 0.14.0RC. I don't have access to Snowflake or BigQuery so I can't test on them.

I made that change in my adapter after noticing this on 0.14.0 before you had started working on this PR.
https://github.com/mikaelene/dbt-sqlserver/blob/master/dbt/include/sqlserver/macros/materializations/snapshot/strategies.sql

The dbt_scd_id will be unique in the source-merge-table (and in the target), even if they are recycled.

Consider this table to be snappshotted:

Primary_Key Color
1 Red

The first run it will only have an insert-record in the source-table for the merge with a dbt_scd_id of hash(1, 2019-08-28 06:14:00:00) regardless of which checkcols and values we provide.
The source-table in the merge will be

dbt_change_type Primary_Key Color dbt_scd_id dbt_unique_key dbt_updated_at dbt_valid_from dbt_valid_to
insert 1 Red hash(1, 2019-08-28 06:14:00:00) 1 2019-08-28 06:14:00:00 2019-08-28 06:14:00:00

If the table change to

Primary_Key Color
1 Blue

The second run it will have below in the source-merge-table:

dbt_change_type Primary_Key Color dbt_scd_id dbt_unique_key dbt_updated_at dbt_valid_from dbt_valid_to
insert 1 Blue hash(1, 2019-08-29 06:14:00:00) 1 2019-08-29 06:14:00:00 2019-08-29 06:14:00:00)
update hash(1, 2019-08-28 06:14:00:00) 2019-08-29 06:14:00:00 2019-08-29 06:14:00:00

If it change back to

Primary_Key Color
1 Red

The third run it will have below in the source-merge-table:

dbt_change_type Primary_Key Color dbt_scd_id dbt_unique_key dbt_updated_at dbt_valid_from dbt_valid_to
insert 1 Red hash(1, 2019-08-30 06:14:00:00) 1 2019-08-30 06:14:00:00 2019-08-30 06:14:00:00
update hash(1, 2019-08-29 06:14:00:00) 2019-08-30 06:14:00:00 2019-08-30 06:14:00:00

The resulting snappshotted table will have something like:

Primary_Key Color dbt_scd_id dbt_unique_key dbt_updated_at dbt_valid_from dbt_valid_to
1 Red hash(1, 2019-08-28 06:14:00:00) 1 2019-08-29 06:14:00:00 2019-08-28 06:14:00:00 2019-08-29 06:14:00:00
1 Blue hash(1, 2019-08-29 06:14:00:00) 1 2019-08-30 06:14:00:00 2019-08-29 06:14:00:00 2019-08-30 06:14:00:00
1 Red hash(1, 2019-08-30 06:14:00:00) 1 2019-08-30 06:14:00:00 2019-08-30 06:14:00:00)

This approach will be database agnostic as well.

@drewbanin
Copy link
Contributor Author

Hey @mikaelene - thanks for going really deep here!

I think you're totally right! Sorry for being obtuse earlier - the design you're advocating for is both simpler and faster than the approach implemented in this PR! I needed to play around with this a lot to convince myself that you are right... but it definitely appears to me that you are :)

We're deep in the RC period for the 0.14.1 release, but I'm going to see if we can slip this update in. The simpler we can keep the snapshot code, the better IMO. Thanks so much for taking the time!

@drewbanin
Copy link
Contributor Author

The scd_id_expr is not used in the join! Only rows changed will be added to the staging table regardless of what we use for scd_id_expr.

This was the key thing I didn't internalize when reading your comments. I had it in my head that the scd_id_expr needed to be reproducible for historical records, but that's not necessary! Because we used the snapshotted value of the dbt_scd_id in the update, the insert is free to create its own dbt_scd_id. So cool, so good. Thanks again :)

@drewbanin
Copy link
Contributor Author

@mikaelene
Copy link
Contributor

Great to hear @drewbanin!

I saw you already merged the changes. I'll give it a spin shortly :-)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
snapshots Issues related to dbt's snapshot functionality
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants