
[CT-2280] Tracking changes in the non-chronological insertions of data in dbt snapshots (check strategy) #7138

Open · Tracked by #10151
HansalShah007 opened this issue Mar 8, 2023 · 6 comments
Labels: enhancement (New feature or request), snapshots (Issues related to dbt's snapshot functionality)

HansalShah007 commented Mar 8, 2023

Is this your first time submitting a feature request?

  • I have read the expectations for open source contributors
  • I have searched the existing issues, and I could not find an existing issue for this feature
  • I am requesting a straightforward extension of existing dbt functionality, rather than a Big Idea better suited to a discussion

Describe the feature

Current Behaviour

The check strategy for snapshots can only correctly store changes in the data when they arrive in chronological order; it cannot handle non-chronological insertion of data. For instance, assume we have an entry for a unique_id in the snapshot table as shown below.

Current Snapshot Table

| unique_id | check_cols | dbt_scd_id | dbt_updated_at | dbt_valid_from | dbt_valid_to |
|---|---|---|---|---|---|
| 1 | X | checksum1 | 2022-07-16T03:58:00.000+0000 | 2022-07-16T03:58:00.000+0000 | null |

When a new row with the same unique_key arrives with timestamp 2022-07-15T03:58:00.000+0000 and a value for one of the check_cols that differs from the currently valid row, dbt changes the snapshot table as follows.

New row in the source table

| unique_id | check_cols | as_of_datetime |
|---|---|---|
| 1 | Y | 2022-07-15T03:58:00.000+0000 |

Updated snapshot table

| unique_id | check_cols | dbt_scd_id | dbt_updated_at | dbt_valid_from | dbt_valid_to |
|---|---|---|---|---|---|
| 1 | X | checksum1 | 2022-07-16T03:58:00.000+0000 | 2022-07-16T03:58:00.000+0000 | 2022-07-15T03:58:00.000+0000 |
| 1 | Y | checksum2 | 2022-07-15T03:58:00.000+0000 | 2022-07-15T03:58:00.000+0000 | null |

As is apparent from the changes made to the snapshot table, the check strategy cannot handle non-chronological changes to the data.
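The mishandling above can be reproduced with a minimal Python model of the current logic (`SnapRow` and `snapshot_check` are hypothetical names, for illustration only): the incoming row is compared solely against the currently valid record, so its timestamp is never checked against that record's dbt_valid_from.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class SnapRow:
    unique_id: int
    check_cols: str
    valid_from: str
    valid_to: Optional[str] = None

def snapshot_check(table, unique_id, check_cols, as_of):
    # Compare only against the currently valid record (valid_to is null),
    # mirroring the existing check-strategy behavior.
    current = next((r for r in table
                    if r.unique_id == unique_id and r.valid_to is None), None)
    if current is None:
        table.append(SnapRow(unique_id, check_cols, as_of))
    elif current.check_cols != check_cols:
        # The current row is closed at the incoming timestamp, even when that
        # timestamp is EARLIER than the row's own valid_from.
        current.valid_to = as_of
        table.append(SnapRow(unique_id, check_cols, as_of))

table = [SnapRow(1, "X", "2022-07-16T03:58:00")]
snapshot_check(table, 1, "Y", "2022-07-15T03:58:00")  # out-of-order arrival
# Row X now has valid_from > valid_to: an inverted validity interval.
```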

Expected Behaviour

For the above example, the expected output for the snapshot table should have been as shown in the table below.

Expected snapshot table

| unique_id | check_cols | dbt_scd_id | dbt_updated_at | dbt_valid_from | dbt_valid_to |
|---|---|---|---|---|---|
| 1 | Y | checksum2 | 2022-07-15T03:58:00.000+0000 | 2022-07-15T03:58:00.000+0000 | 2022-07-16T03:58:00.000+0000 |
| 1 | X | checksum1 | 2022-07-16T03:58:00.000+0000 | 2022-07-16T03:58:00.000+0000 | null |
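A sketch of the expected routing in Python (hypothetical names; comparisons are on ISO-8601 strings for brevity): a row older than some stored version of its key is inserted with its valid_to clipped to the valid_from of its nearest future version, while newer rows follow the existing path.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class SnapRow:
    unique_id: int
    check_cols: str
    valid_from: str
    valid_to: Optional[str] = None

def snapshot_any_order(table, unique_id, check_cols, as_of):
    versions = sorted((r for r in table if r.unique_id == unique_id),
                      key=lambda r: r.valid_from)
    future = [r for r in versions if r.valid_from > as_of]
    if future:
        if future[0].check_cols != check_cols:
            # backfill: valid_to clipped to the nearest future valid_from
            table.append(SnapRow(unique_id, check_cols, as_of,
                                 future[0].valid_from))
    else:
        current = versions[-1] if versions else None
        if current is None:
            table.append(SnapRow(unique_id, check_cols, as_of))
        elif current.check_cols != check_cols:
            current.valid_to = as_of
            table.append(SnapRow(unique_id, check_cols, as_of))

table = [SnapRow(1, "X", "2022-07-16T03:58:00")]
snapshot_any_order(table, 1, "Y", "2022-07-15T03:58:00")
# X stays the open current record; Y is valid from 07-15 to 07-16.
```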

Describe alternatives you've considered

Updates in the source code

To solve this problem, I have made some changes to the source code of the snapshots, specifically in the following macros and the snapshot materialization strategy:

  • default__snapshot_merge_sql
  • default__snapshot_staging_table

I have changed the way inserts and updates are identified in the default__snapshot_staging_table macro so that changes in non-chronologically arriving data are tracked.

The new code introduces the following dbt_change_types.

Types of Inserts

Insertion Type 1

When the latest record for an already existing unique_id arrives with changes, or a previously unseen unique_id is encountered.

Scenario 1

Data already existing in snapshot table for unique_id 1

| unique_id | some_columns… | dbt_valid_from | dbt_valid_to |
|---|---|---|---|
| 1 | X | 15-07-2022 | null |

New Data (with some changes)

| unique_id | some_columns… | as_of_date |
|---|---|---|
| 1 | Y | 16-07-2022 |

Final Snapshot (Insertion Type 1)

| unique_id | some_columns… | dbt_valid_from | dbt_valid_to | dbt_change_type |
|---|---|---|---|---|
| 1 | X | 15-07-2022 | 16-07-2022 | Update 1 |
| 1 | Y | 16-07-2022 | null | Insertion 1 |

Scenario 2

Data already existing in snapshot table

| unique_id | some_columns… | dbt_valid_from | dbt_valid_to |
|---|---|---|---|
| 1 | X | 15-07-2022 | null |

New Data (with some changes)

| unique_id | some_columns… | as_of_date |
|---|---|---|
| 2 | Z | 16-07-2022 |

Final Snapshot (Insertion Type 1)

| unique_id | some_columns… | dbt_valid_from | dbt_valid_to | dbt_change_type |
|---|---|---|---|---|
| 1 | X | 15-07-2022 | null | - |
| 2 | Z | 16-07-2022 | null | Insertion 1 |

Insertion Type 2

When an older record of the same unique_id arrives that differs from its nearest future version.

Data already existing in snapshot table for unique_id 1

| unique_id | some_columns… | dbt_valid_from | dbt_valid_to |
|---|---|---|---|
| 1 | X | 15-07-2022 | 17-07-2022 |
| 1 | Y | 17-07-2022 | null |

New Data (with some changes)

| unique_id | some_columns… | as_of_date |
|---|---|---|
| 1 | Z | 14-07-2022 |

Final Snapshot (Insertion Type 2)

| unique_id | some_columns… | dbt_valid_from | dbt_valid_to | dbt_change_type |
|---|---|---|---|---|
| 1 | Z | 14-07-2022 | 15-07-2022 | Insertion 2 |
| 1 | X | 15-07-2022 | 17-07-2022 | - |
| 1 | Y | 17-07-2022 | null | - |
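Under the proposed logic, Insertion Type 2 amounts to: find the incoming key's nearest future version, and if it differs, insert the old record with dbt_valid_to set to that version's dbt_valid_from. A minimal sketch (hypothetical helper; rows are tuples of (check_cols, valid_from, valid_to)):

```python
rows = [("X", "2022-07-15", "2022-07-17"),
        ("Y", "2022-07-17", None)]

def backfill(rows, check_cols, as_of):
    # nearest future version of the incoming key
    future = [r for r in rows if r[1] > as_of]
    nearest = min(future, key=lambda r: r[1], default=None)
    if nearest is None or nearest[0] == check_cols:
        return rows  # nothing to insert (the unchanged case is Update Type 2)
    # Insertion Type 2: valid_to clipped to the nearest future valid_from
    return sorted(rows + [(check_cols, as_of, nearest[1])],
                  key=lambda r: r[1])

result = backfill(rows, "Z", "2022-07-14")
# Z is inserted as valid from 14-07 until X takes over on 15-07.
```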

Types of Updates

Update Type 1

When a later record of the same unique_id arrives that differs from its nearest past version and has dbt_valid_from < dbt_valid_to (of the snapshot table record).

Update Type 2

When an older record of the same unique_id arrives that does not differ from its nearest future version.

Data already existing in snapshot table for unique_id 1

| unique_id | some_columns… | dbt_valid_from | dbt_valid_to |
|---|---|---|---|
| 1 | X | 15-07-2022 | 17-07-2022 |
| 1 | Y | 17-07-2022 | null |

New Data (with some changes)

| unique_id | some_columns… | as_of_date |
|---|---|---|
| 1 | Y | 16-07-2022 |

Final Snapshot (Update Type 1 and 2 performed simultaneously)

| unique_id | some_columns… | dbt_valid_from | dbt_valid_to | dbt_change_type |
|---|---|---|---|---|
| 1 | X | 15-07-2022 | 16-07-2022 | Update 1 |
| 1 | Y | 16-07-2022 | null | Update 2 |
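Both update types can be sketched together in Python (hypothetical helper; rows are tuples of (check_cols, valid_from, valid_to)): the incoming row closes its changed nearest past version (Update Type 1) and pulls the valid_from of its unchanged nearest future version backwards (Update Type 2).

```python
rows = [("X", "2022-07-15", "2022-07-17"),
        ("Y", "2022-07-17", None)]

def apply_updates(rows, check_cols, as_of):
    past = max((r for r in rows if r[1] < as_of),
               key=lambda r: r[1], default=None)
    future = min((r for r in rows if r[1] > as_of),
                 key=lambda r: r[1], default=None)
    out = []
    for r in rows:
        if r is past and r[0] != check_cols and (r[2] is None or r[2] > as_of):
            # Update Type 1: close the changed nearest past version
            out.append((r[0], r[1], as_of))
        elif r is future and r[0] == check_cols:
            # Update Type 2: extend the unchanged future version backwards
            out.append((r[0], as_of, r[2]))
        else:
            out.append(r)
    return out

result = apply_updates(rows, "Y", "2022-07-16")
# X is closed at 16-07 and Y now starts at 16-07, matching the table above.
```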

Source Code Changes

Snapshot Materialization

{% materialization snapshot, default %}
  {%- set config = model['config'] -%}

  {%- set target_table = model.get('alias', model.get('name')) -%}

  {%- set strategy_name = config.get('strategy') -%}
  {%- set unique_key = config.get('unique_key') %}
  -- grab the current table's grants config for comparison later on
  {%- set grant_config = config.get('grants') -%}

  {% set target_relation_exists, target_relation = get_or_create_relation(
          database=model.database,
          schema=model.schema,
          identifier=target_table,
          type='table') -%}

  {%- if not target_relation.is_table -%}
    {% do exceptions.relation_wrong_type(target_relation, 'table') %}
  {%- endif -%}


  {{ run_hooks(pre_hooks, inside_transaction=False) }}

  {{ run_hooks(pre_hooks, inside_transaction=True) }}

  {% set strategy_macro = strategy_dispatch(strategy_name) %}
  {% set strategy = strategy_macro(model, "snapshotted_data", "source_data", config, target_relation_exists) %}

  {% if not target_relation_exists %}

      {% set build_sql = build_snapshot_table(strategy, model['compiled_code']) %}
      {% set final_sql = create_table_as(False, target_relation, build_sql) %}

  {% else %}

      {{ adapter.valid_snapshot_target(target_relation) }}

      {% set staging_table = build_snapshot_staging_table(strategy, sql, target_relation) %}

      -- this may no-op if the database does not require column expansion
      {% do adapter.expand_target_column_types(from_relation=staging_table,
                                               to_relation=target_relation) %}

      {% set missing_columns = adapter.get_missing_columns(staging_table, target_relation)
                                   | rejectattr('name', 'equalto', 'dbt_change_type')
                                   | rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE')
                                   | rejectattr('name', 'equalto', 'dbt_unique_key')
                                   | rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY')
                                   | rejectattr('name', 'equalto', 'dbt_new_scd_id')
                                   | rejectattr('name', 'equalto', 'DBT_NEW_SCD_ID')
                                   | list %}

      {% do create_columns(target_relation, missing_columns) %}

      {% set source_columns = adapter.get_columns_in_relation(staging_table)
                                   | rejectattr('name', 'equalto', 'dbt_change_type')
                                   | rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE')
                                   | rejectattr('name', 'equalto', 'dbt_unique_key')
                                   | rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY')
                                   | rejectattr('name', 'equalto', 'dbt_new_scd_id')
                                   | rejectattr('name', 'equalto', 'DBT_NEW_SCD_ID')
                                   | list %}
     
      {% set quoted_source_columns = [] %}
      {% for column in source_columns %}
        {% do quoted_source_columns.append(adapter.quote(column.name)) %}
      {% endfor %}
      
      {% set final_sql = snapshot_merge_sql(
            target = target_relation,
            source = staging_table,
            insert_cols = quoted_source_columns
         )
      %}

  {% endif %}

  {% call statement('main') %}
      {{ final_sql }}
  {% endcall %}

  {% set should_revoke = should_revoke(target_relation_exists, full_refresh_mode=False) %}
  {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

  {% do persist_docs(target_relation, model) %}

  {% if not target_relation_exists %}
    {% do create_indexes(target_relation) %}
  {% endif %}

  {{ run_hooks(post_hooks, inside_transaction=True) }}

  {{ adapter.commit() }}

  {% if staging_table is defined %}
      {% do post_snapshot(staging_table) %}
  {% endif %}

  {{ run_hooks(post_hooks, inside_transaction=False) }}

  {{ return({'relations': [target_relation]}) }}

{% endmaterialization %}

Snapshot Staging Table

{% macro default__snapshot_staging_table(strategy, source_sql, target_relation) -%}

    with snapshot_query as (

        {{ source_sql }}

    ),

    snapshotted_data as (

        select *,
            {{ strategy.unique_key }} as dbt_unique_key

        from {{ target_relation }}
        where dbt_valid_to is null

    ),

    insertions_source_data as (

        select
            *,
            {{ strategy.unique_key }} as dbt_unique_key,
            {{ strategy.updated_at }} as dbt_updated_at,
            {{ strategy.updated_at }} as dbt_valid_from,
            nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to,
            {{ strategy.scd_id }} as dbt_scd_id

        from snapshot_query
    ),

    updates_source_data as (

        select
            *,
            {{ strategy.unique_key }} as dbt_unique_key,
            {{ strategy.updated_at }} as dbt_updated_at,
            {{ strategy.updated_at }} as dbt_valid_from,
            {{ strategy.updated_at }} as dbt_valid_to

        from snapshot_query
    ),

    {%- if strategy.invalidate_hard_deletes %}

    deletes_source_data as (

        select
            *,
            {{ strategy.updated_at }} as dbt_valid_from,
            {{ strategy.unique_key }} as dbt_unique_key
        from snapshot_query
    ),
    {% endif %}

    insertions1 as (

        select
            'insert1' as dbt_change_type,
            source_data.*
            
        from insertions_source_data as source_data
        left outer join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
        where snapshotted_data.dbt_unique_key is null
           or (
                snapshotted_data.dbt_unique_key is not null
            and (
                {{ strategy.row_changed }}
            )
            and snapshotted_data.dbt_valid_from < source_data.dbt_valid_from
        )

    ),
    
    insertions2 as (

        select
            'insert2' as dbt_change_type,
            source_data.* except(source_data.dbt_valid_to, source_data.dbt_scd_id),
            snapshotted_data.dbt_valid_from as dbt_valid_to,
            source_data.dbt_scd_id

        from insertions_source_data as source_data
        inner join (
          select
            t.* except(t.rn)
          from (
            select
              snap_data.*,
              row_number() over(partition by snap_data.dbt_unique_key order by snap_data.dbt_valid_from asc) as rn
            from (
              select
                *,
                {{ strategy.unique_key }} as dbt_unique_key
              from {{ target_relation }}
            ) snap_data inner join insertions_source_data as source_data
            on
              snap_data.dbt_unique_key = source_data.dbt_unique_key
            where snap_data.dbt_valid_from > source_data.dbt_valid_from
          ) t
          where t.rn = 1
        ) snapshotted_data 
        on 
          source_data.dbt_unique_key = snapshotted_data.dbt_unique_key
        where (
          {{ strategy.row_changed }}
        )

    ),
    
    updates1 as (
      
      select
        'update1' as dbt_change_type,
        snapshotted_data.* except(snapshotted_data.dbt_valid_to, snapshotted_data.dbt_scd_id, snapshotted_data.dbt_valid_from),
        snapshotted_data.dbt_valid_from as dbt_valid_from, 
        source_data.dbt_valid_to as dbt_valid_to,
        snapshotted_data.dbt_scd_id
      from (
        select
          t.* except(t.rn)
        from (
          select
            snap_data.*,
            row_number() over(partition by snap_data.dbt_unique_key order by snap_data.dbt_valid_from desc) as rn
          from (
            select
              *,
              {{ strategy.unique_key }} as dbt_unique_key
            from {{ target_relation }}
          ) snap_data inner join updates_source_data as source_data
          on
            snap_data.dbt_unique_key = source_data.dbt_unique_key
          where snap_data.dbt_valid_from < source_data.dbt_valid_from
        ) t
        where t.rn = 1
      ) snapshotted_data inner join updates_source_data as source_data
      on
        snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
      where snapshotted_data.dbt_valid_to > source_data.dbt_valid_from 
      and (
        {{ strategy.row_changed }}
      )
    ),
    
     updates2 as (
      
      select
        'update2' as dbt_change_type,
        snapshotted_data.* except(snapshotted_data.dbt_valid_from, snapshotted_data.dbt_scd_id, snapshotted_data.dbt_valid_to),
        source_data.dbt_valid_from as dbt_valid_from,
        snapshotted_data.dbt_valid_to as dbt_valid_to,
        snapshotted_data.dbt_scd_id as dbt_scd_id,
        source_data.dbt_scd_id as dbt_new_scd_id
        
      from (
        select
          t.* except(t.rn)
        from (
          select
            snap_data.*,
            row_number() over(partition by snap_data.dbt_unique_key order by snap_data.dbt_valid_from asc) as rn
          from (
            select
              *,
              {{ strategy.unique_key }} as dbt_unique_key
            from {{ target_relation }}
          ) snap_data inner join insertions_source_data as source_data
          on
            snap_data.dbt_unique_key = source_data.dbt_unique_key
          where snap_data.dbt_valid_from > source_data.dbt_valid_from
        ) t
        where t.rn = 1
      ) snapshotted_data inner join insertions_source_data as source_data
      on
        snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
      where not (
        {{ strategy.row_changed }}
      )
    )
      

    {%- if strategy.invalidate_hard_deletes -%}
    ,
    
    deletes as (

        select
            'delete' as dbt_change_type,
            source_data.*,
            {{ snapshot_get_time() }} as dbt_valid_from,
            {{ snapshot_get_time() }} as dbt_updated_at,
            {{ snapshot_get_time() }} as dbt_valid_to,
            snapshotted_data.dbt_scd_id

        from snapshotted_data
        left join deletes_source_data as source_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
        where source_data.dbt_unique_key is null
    )
    
    {%- endif %}
    
    select *, null as dbt_new_scd_id from insertions1
    union all
    select *, null as dbt_new_scd_id from insertions2
    union all
    select *, null as dbt_new_scd_id from updates1
    union all
    select * from updates2
    {%- if strategy.invalidate_hard_deletes %}
    union all
    select *, null as dbt_new_scd_id from deletes
    {% endif %}
    
{%- endmacro %}
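Several of the CTEs above share one pattern: for each dbt_unique_key, pick the nearest snapshot version before or after the incoming row via row_number() over (partition by dbt_unique_key order by dbt_valid_from) and keep rn = 1. The equivalent selection in Python (hypothetical row shapes, for illustration):

```python
def nearest_future_version(snapshot_rows, key, as_of):
    # Equivalent of: row_number() over (partition by dbt_unique_key
    #                                   order by dbt_valid_from asc) = 1,
    # restricted to versions strictly after the incoming row.
    candidates = [r for r in snapshot_rows
                  if r["dbt_unique_key"] == key and r["dbt_valid_from"] > as_of]
    return min(candidates, key=lambda r: r["dbt_valid_from"], default=None)

rows = [
    {"dbt_unique_key": 1, "dbt_valid_from": "2022-07-15"},
    {"dbt_unique_key": 1, "dbt_valid_from": "2022-07-17"},
    {"dbt_unique_key": 2, "dbt_valid_from": "2022-07-16"},
]
nearest = nearest_future_version(rows, 1, "2022-07-14")
```

Ordering descending and taking rn = 1 gives the nearest past version instead, which is what the updates1 CTE does.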

Snapshot Merge SQL

{% macro default__snapshot_merge_sql(target, source, insert_cols) -%}
    {% set insert_cols_csv = insert_cols | join(', ') %}

    merge into {{ target }} as DBT_INTERNAL_DEST
    using {{ source }} as DBT_INTERNAL_SOURCE
    on DBT_INTERNAL_SOURCE.dbt_scd_id = DBT_INTERNAL_DEST.dbt_scd_id

    when matched
     and DBT_INTERNAL_DEST.dbt_valid_to is null
     and DBT_INTERNAL_SOURCE.dbt_change_type in ('delete')
        then update
        set dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to
    
    when matched
      and DBT_INTERNAL_SOURCE.dbt_change_type in ('update1')
          then update
          set 
              dbt_valid_from = DBT_INTERNAL_SOURCE.dbt_valid_from,
              dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to
    
    when matched
      and DBT_INTERNAL_SOURCE.dbt_change_type in ('update2')
          then update
          set 
              dbt_valid_from = DBT_INTERNAL_SOURCE.dbt_valid_from,
              dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to,
              dbt_scd_id = DBT_INTERNAL_SOURCE.dbt_new_scd_id
              
    when not matched
     and DBT_INTERNAL_SOURCE.dbt_change_type in ('insert1', 'insert2')
        then insert ({{ insert_cols_csv }})
        values ({{ insert_cols_csv }})
    
{% endmacro %}
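A rough Python analogue of the merge dispatch (semantics inferred from the macro above; dictionary shapes are hypothetical): staging rows match target rows on dbt_scd_id, the two insert change types create new rows, update1 (and delete) rewrite the validity window in place, and update2 additionally swaps in the newly computed dbt_new_scd_id.

```python
target = {"s1": {"dbt_scd_id": "s1", "dbt_valid_from": "2022-07-16",
                 "dbt_valid_to": None}}

def merge(target, staged):
    for row in staged:
        ct = row["dbt_change_type"]
        dest = target.get(row["dbt_scd_id"])
        if dest is None:
            if ct in ("insert1", "insert2"):
                target[row["dbt_scd_id"]] = {k: v for k, v in row.items()
                                             if k != "dbt_change_type"}
        elif ct in ("delete", "update1"):
            # rewrite the validity window in place
            dest["dbt_valid_from"] = row.get("dbt_valid_from",
                                             dest["dbt_valid_from"])
            dest["dbt_valid_to"] = row["dbt_valid_to"]
        elif ct == "update2":
            dest["dbt_valid_from"] = row["dbt_valid_from"]
            dest["dbt_valid_to"] = row["dbt_valid_to"]
            # the row's validity changed: rekey it to the new scd_id
            moved = target.pop(row["dbt_scd_id"])
            moved["dbt_scd_id"] = row["dbt_new_scd_id"]
            target[row["dbt_new_scd_id"]] = moved
    return target

merge(target, [{"dbt_change_type": "insert2", "dbt_scd_id": "s2",
                "dbt_valid_from": "2022-07-15", "dbt_valid_to": "2022-07-16"}])
merge(target, [{"dbt_change_type": "update2", "dbt_scd_id": "s1",
                "dbt_new_scd_id": "s3", "dbt_valid_from": "2022-07-14",
                "dbt_valid_to": None}])
```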

Who will this benefit?

This feature makes it possible to snapshot historical data regardless of the order in which it is fed to dbt snapshots. There is always a possibility that an organization gains access to data older than the data with which it started generating snapshots. This feature would let them feed that historical data in any order and still produce snapshots as if the changes had been captured chronologically.

Are you interested in contributing this feature?

I have already made some changes to the source code, as described above.

Anything else?

No response

@HansalShah007 HansalShah007 added enhancement New feature or request triage labels Mar 8, 2023
@github-actions github-actions bot changed the title Tracking changes in the non-chronological insertions of data in dbt snapshots (check strategy) [CT-2280] Tracking changes in the non-chronological insertions of data in dbt snapshots (check strategy) Mar 8, 2023
@dbeatty10 dbeatty10 added the snapshots Issues related to dbt's snapshot functionality label Mar 8, 2023
@dbeatty10 dbeatty10 self-assigned this Mar 8, 2023
@dbeatty10 (Contributor)
Thanks for opening this @HansalShah007 ! And thanks for a fantastic write-up 🤩

You are very correct that non-chronological snapshots are not currently supported and will lead to undesirable output 😢

We'd be open to considering an update to snapshots that could support any order. The ideal implementation would be backwards-compatible, work across (most) databases, and have a similar (or better) execution time to the current implementation.

Questions

How are you feeding the data to the dbt snapshots in your example? Do you have one table per day similar to the following (with a date or timestamp at the end of the table name)?

  • customers_2023_03_01
  • customers_2023_03_02
  • customers_2023_03_03

If so, presumably you'd have some way of parsing the applicable datetime and passing it to the updated_at snapshot config?

Or is there some other way you are feeding the data in?

An alternative

This is the 2nd problem listed in the discussion in #7018 -- one alternative would be to detect if a snapshot is out-of-order and raise an exception rather than inserting. (Would love your feedback on any of the unrelated problems/solutions too!)

The upside of raising an error is that no snapshots are inserted out-of-order. But the downside is that adding data that is older isn't possible (which would preclude the situation you described where an organization gets access to older data).

Something related

You discussed the check strategy specifically, but the timestamp strategy has a similar chronological requirement when invalidate_hard_deletes: True. We'd need something like a new hard_deletes_updated_at parameter discussed here in order to support non-chronological snapshots for the timestamp strategy.

@HansalShah007 (Author) commented Mar 12, 2023

Hey @dbeatty10! Thanks for responding back.

Answers

  • Yes, I have one table per day with a similar filename format. I generate an as_of_datetime column for all rows in the file using the date at the end of the filename and pass that as the updated_at parameter in the snapshot configuration.

Views on the alternative

  • The solution (updates in the source code) already handles the out-of-order check and updates the existing snapshot table rows accordingly. Instead of raising an error, it updates the rows per the logic described in my initial issue. As for time complexity, the solution is O(r*c), where r = max(number_of_rows_in_source_table, number_of_rows_in_snapshot_table) and c = the number of columns in the check_cols parameter. This matches the time complexity of the current source code.
  • Ideally the solution should work for all the databases that currently support snapshot check strategy. However, I am not very sure about what you mean by backwards-compatibility here.

Updates made in solution since the issue was opened

  • I have made some changes to the default__snapshot_staging_table since the issue was opened. I have also introduced a way to handle hard deletes for the case where one batch of data belongs to a single date, i.e., all rows in the source table share the same timestamp. So, if the snapshot table contains the entries shown below for a particular unique_id, and it is later found that a row for this unique_id was missing at some timestamp in the history, the records will be invalidated accordingly. The example below should make things clear.

Snapshot Table

| unique_id | check_cols | dbt_scd_id | dbt_updated_at | dbt_valid_from | dbt_valid_to |
|---|---|---|---|---|---|
| 1 | X | checksum1 | 2022-07-16T03:58:00.000+0000 | 2022-07-15T03:58:00.000+0000 | 2022-07-17T03:58:00.000+0000 |
| 1 | Y | checksum2 | 2022-07-15T03:58:00.000+0000 | 2022-07-17T03:58:00.000+0000 | null |

Now, if the data for unique_id 1 was missing from the batch of data belonging to the 2022-07-16T03:58:00.000+0000 timestamp, then the snapshot table should now look something like this.

| unique_id | check_cols | dbt_scd_id | dbt_updated_at | dbt_valid_from | dbt_valid_to |
|---|---|---|---|---|---|
| 1 | X | checksum1 | 2022-07-16T03:58:00.000+0000 | 2022-07-15T03:58:00.000+0000 | 2022-07-16T03:58:00.000+0000 |
| 1 | Y | checksum2 | 2022-07-15T03:58:00.000+0000 | 2022-07-17T03:58:00.000+0000 | null |

I am not very sure about how this logic will work out for timestamp strategy.
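The invalidation described above can be sketched in Python (hypothetical structure; one batch per timestamp is assumed): for every unique_id absent from the batch, the latest version starting before the batch timestamp is closed at that timestamp, mirroring the deletes CTE in the updated macro.

```python
def invalidate_missing(snapshot, batch_ids, batch_ts):
    # snapshot: unique_id -> list of [valid_from, valid_to] version windows
    for uid, versions in snapshot.items():
        if uid in batch_ids:
            continue  # the key is present in this batch; nothing to do
        prior = [v for v in versions if v[0] < batch_ts]
        if prior:
            latest = max(prior, key=lambda v: v[0])
            if latest[1] is None or latest[1] > batch_ts:
                latest[1] = batch_ts  # close the window at the missing batch
    return snapshot

snap = {1: [["2022-07-15T03:58:00", "2022-07-17T03:58:00"],
            ["2022-07-17T03:58:00", None]]}
# unique_id 1 is missing from the batch timestamped 2022-07-16
invalidate_missing(snap, batch_ids=set(), batch_ts="2022-07-16T03:58:00")
```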

Updated Snapshot Staging Table

{% macro default__snapshot_staging_table(strategy, source_sql, target_relation) -%}

  {%- call statement('batch_datetime', fetch_result=True) -%}
      select 
        max({{ strategy.updated_at }}) as batch_datetime 
      from (
        {{ source_sql }}
      ) source_table
  {%- endcall -%}

  {%- set batch_datetime = load_result('batch_datetime')['data'][0][0] -%}

    with snapshot_query as (

        {{ source_sql }}

    ),

    snapshotted_data as (

        select *,
            {{ strategy.unique_key }} as dbt_unique_key

        from {{ target_relation }}
        where dbt_valid_to is null

    ),

    insertions_source_data as (

        select
            *,
            {{ strategy.unique_key }} as dbt_unique_key,
            {{ strategy.updated_at }} as dbt_updated_at,
            {{ strategy.updated_at }} as dbt_valid_from,
            nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to,
            {{ strategy.scd_id }} as dbt_scd_id

        from snapshot_query
    ),

    updates_source_data as (

        select
            *,
            {{ strategy.unique_key }} as dbt_unique_key,
            {{ strategy.updated_at }} as dbt_updated_at,
            {{ strategy.updated_at }} as dbt_valid_from,
            {{ strategy.updated_at }} as dbt_valid_to

        from snapshot_query
    ),

    {%- if strategy.invalidate_hard_deletes %}

    deletes_source_data as (

        select
            *,
            {{ strategy.updated_at }} as dbt_valid_from,
            {{ strategy.unique_key }} as dbt_unique_key
        from snapshot_query
    ),
    {% endif %}

    inserts1 as (

        select
            'insert1' as dbt_change_type,
            source_data.*
            
        from insertions_source_data as source_data
        left outer join (
          select
            t.* except(t.rn)
          from (
            select
              snap_data.*,
              row_number() over(partition by snap_data.dbt_unique_key order by snap_data.dbt_valid_from desc) as rn
            from (
              select
                *,
                {{ strategy.unique_key }} as dbt_unique_key
              from {{ target_relation }}
            ) snap_data
          ) t
          where t.rn = 1
        ) snapshotted_data 
        on 
          snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
        where snapshotted_data.dbt_unique_key is null
           or (
                snapshotted_data.dbt_unique_key is not null
            and (
                {{ strategy.row_changed }}
            )
            and coalesce(snapshotted_data.dbt_valid_to,snapshotted_data.dbt_valid_from) < source_data.dbt_valid_from
        )

    ),
    
    inserts2 as (

        select
            'insert2' as dbt_change_type,
            source_data.* except(source_data.dbt_valid_to, source_data.dbt_scd_id),
            snapshotted_data.dbt_valid_from as dbt_valid_to,
            source_data.dbt_scd_id

        from insertions_source_data as source_data
        inner join (
          select
            t.* except(t.rn)
          from (
            select
              snap_data.*,
              row_number() over(partition by snap_data.dbt_unique_key order by snap_data.dbt_valid_from asc) as rn
            from (
              select
                *,
                {{ strategy.unique_key }} as dbt_unique_key
              from {{ target_relation }}
            ) snap_data inner join insertions_source_data as source_data
            on
              snap_data.dbt_unique_key = source_data.dbt_unique_key
            where snap_data.dbt_valid_from > source_data.dbt_valid_from
          ) t
          where t.rn = 1
        ) snapshotted_data 
        on 
          source_data.dbt_unique_key = snapshotted_data.dbt_unique_key
        where (
          {{ strategy.row_changed }}
        )

    ),
    
    updates2 as (
      
      select
        'update2' as dbt_change_type,
        snapshotted_data.* except(snapshotted_data.dbt_valid_from, snapshotted_data.dbt_scd_id, snapshotted_data.dbt_valid_to),
        source_data.dbt_valid_from as dbt_valid_from,
        snapshotted_data.dbt_valid_to as dbt_valid_to,
        snapshotted_data.dbt_scd_id as dbt_scd_id,
        source_data.dbt_scd_id as dbt_new_scd_id
        
      from (
        select
          t.* except(t.rn)
        from (
          select
            snap_data.*,
            row_number() over(partition by snap_data.dbt_unique_key order by snap_data.dbt_valid_from asc) as rn
          from (
            select
              *,
              {{ strategy.unique_key }} as dbt_unique_key
            from {{ target_relation }}
          ) snap_data inner join insertions_source_data as source_data
          on
            snap_data.dbt_unique_key = source_data.dbt_unique_key
          where snap_data.dbt_valid_from > source_data.dbt_valid_from
        ) t
        where t.rn = 1
      ) snapshotted_data inner join insertions_source_data as source_data
      on
        snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
      where not (
        {{ strategy.row_changed }}
      )
    ),
    
    updates1 as (
      
      select
        'update1' as dbt_change_type,
        snapshotted_data.* except(snapshotted_data.dbt_valid_to, snapshotted_data.dbt_scd_id, snapshotted_data.dbt_valid_from),
        snapshotted_data.dbt_valid_from as dbt_valid_from, 
        source_data.dbt_valid_to as dbt_valid_to,
        snapshotted_data.dbt_scd_id
      from (
        select
          t.* except(t.rn)
        from (
          select
            snap_data.*,
            row_number() over(partition by snap_data.dbt_unique_key order by snap_data.dbt_valid_from desc) as rn
          from (
            select
              *,
              {{ strategy.unique_key }} as dbt_unique_key
            from {{ target_relation }}
          ) snap_data inner join updates_source_data as source_data
          on
            snap_data.dbt_unique_key = source_data.dbt_unique_key
          where snap_data.dbt_valid_from < source_data.dbt_valid_from
        ) t
        where t.rn = 1
      ) snapshotted_data inner join updates_source_data as source_data
      on
        snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
      where coalesce(snapshotted_data.dbt_valid_to, cast("9999-12-31T00:00:00.000+0000" as TIMESTAMP)) > source_data.dbt_valid_from 
      and (
        {{ strategy.row_changed }}
      )
    )
  
    {%- if strategy.invalidate_hard_deletes -%}
    ,
    
    deletes as (
      
      select
        'delete' as dbt_change_type,
        snapshotted_data.* except(snapshotted_data.dbt_valid_to, snapshotted_data.dbt_scd_id, snapshotted_data.dbt_valid_from, snapshotted_data.dbt_updated_at),
        snapshotted_data.dbt_updated_at as dbt_updated_at,
        snapshotted_data.dbt_valid_from as dbt_valid_from, 
        cast("{{batch_datetime}}" as TIMESTAMP) as dbt_valid_to,
        snapshotted_data.dbt_scd_id
      
      from (
        select
          t.* except(t.rn)
        from (
          select
            snap_data.*,
            row_number() over(partition by snap_data.dbt_unique_key order by snap_data.dbt_valid_from desc) as rn
          from (
            select
              *,
              {{ strategy.unique_key }} as dbt_unique_key
            from {{ target_relation }}
          ) snap_data
          where snap_data.dbt_valid_from < cast("{{batch_datetime}}" as TIMESTAMP)
        ) t
        where t.rn = 1
      ) snapshotted_data left join deletes_source_data as source_data
      on
        snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
      where source_data.dbt_unique_key is null
    )
    
    {%- endif %}
    
    select *, null as dbt_new_scd_id from inserts1
    union all
    select *, null as dbt_new_scd_id from inserts2
    union all
    select *, null as dbt_new_scd_id from updates1
    union all
    select * from updates2
    {%- if strategy.invalidate_hard_deletes %}
    union all
    select *, null as dbt_new_scd_id from deletes
    {% endif %}
    
{%- endmacro %}

@github-actions (bot) commented Sep 9, 2023

This issue has been marked as Stale because it has been open for 180 days with no activity. If you would like the issue to remain open, please comment on the issue or else it will be closed in 7 days.

@github-actions github-actions bot added the stale Issues that have gone stale label Sep 9, 2023
@sdesmarteau
Any updates on this issue?

@HansalShah77
No updates yet from the dbt team. However, I have successfully implemented the changes mentioned in my replies above and everything seems to work fine. I have been using the modified macro code for quite a while now and have not come across a logical error.

@github-actions github-actions bot removed the stale Issues that have gone stale label Sep 13, 2023
@dbeatty10 (Contributor)

Sorry for the radio silence here!

We'd definitely be interested in supporting non-chronological snapshots.

⚠️ The main difficulties are avoiding accidental breaking changes and uncovered edge cases along the way. We only get one shot to do things right when running dbt snapshot, so we want to be confident in any code changes.

The considerations are complicated enough that we're likely to take this work on ourselves rather than accept a community-submitted PR, so I'm not going to label this as help_wanted at this time.

See below for some of the most important acceptance criteria for this feature.

🏆 @HansalShah007 has done some pioneering work described here that can inform the implementation 🏆

Acceptance criteria

  • backwards-compatible with pre-existing tables that have been created via snapshots
  • work for both the check and timestamp strategies
  • non-chronological snapshots yield identical output to chronological snapshots
    • support non-chronological hard deletes for both strategies (potentially via a new config parameter like hard_deletes_updated_at discussed here)
  • similar (or better) execution time to the current default implementation
  • (ideally) work within dbt-bigquery, dbt-redshift, and dbt-snowflake "as-is" without needing an adapter-specific override of default__snapshot_staging_table
