[CT-1646] [CT-1170] [Bug] Insert overwrite incremental models not using dbt_tmp table #427

Closed
naveen-shankar opened this issue Sep 13, 2022 · 2 comments · Fixed by #630

Labels
bug Something isn't working help_wanted Extra attention is needed

Comments

@naveen-shankar

Is this a new bug in dbt-core?

  • I believe this is a new bug in dbt-core
  • I have searched the existing issues, and I could not find an existing issue for this bug

Current Behavior

When I run an incremental table update using the insert_overwrite strategy, the created dbt_tmp table isn't used to update the destination table. Instead, the model code is rerun from scratch.

Expected Behavior

Rather than rerunning the model code from scratch, I expected the destination table update to use the created dbt_tmp table.

Steps To Reproduce

The specific model is called gross_and_net_churn.sql. I'll exclude the main body of the model for brevity (I've confirmed this is happening with multiple models), but the config params are:

{% set partitions_to_replace = days_between_two_dates(var("subscriptions_crawler_start_date"), var("subscriptions_crawler_end_date")) %}

{{
    config(
        materialized="incremental",
        incremental_strategy="insert_overwrite",
        partition_by = {"field": "report_date", "data_type": "timestamp", "granularity": "day"},
        partitions = partitions_to_replace,
        cluster_by = ["report_date", "client"]
    )
}}

On dbt CLI, I ran:

dbt run --select models/subscription_retention/gross_and_net_churn.sql  

I then looked at which queries got executed in the BigQuery console.

The first query run created the dbt_tmp table:

/* {"app": "dbt", "dbt_version": "1.2.1", "profile_name": "data_science", "target_name": "dev", "node_id": "model.subscriptions.gross_and_net_churn"} */

  create or replace table `duolingo-data-science`.`dbt_naveen`.`gross_and_net_churn__dbt_tmp`
  partition by timestamp_trunc(report_date, day)
  cluster by report_date, client
  OPTIONS(
      description="""""",
    
      expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 12 hour)
    )
  as (

[Main body of model]    

  );

The second query run did the actual update:

/* {"app": "dbt", "dbt_version": "1.2.1", "profile_name": "data_science", "target_name": "dev", "node_id": "model.subscriptions.gross_and_net_churn"} */

    merge into `duolingo-data-science`.`dbt_naveen`.`gross_and_net_churn` as DBT_INTERNAL_DEST
        using (

[Main body of model]          

        ) as DBT_INTERNAL_SOURCE
        on FALSE

    when not matched by source
         and timestamp_trunc(DBT_INTERNAL_DEST.report_date, day) in (
              '2022-08-13', '2022-08-14', '2022-08-15', '2022-08-16', '2022-08-17', '2022-08-18', '2022-08-19', '2022-08-20', '2022-08-21', '2022-08-22', '2022-08-23', '2022-08-24', '2022-08-25', '2022-08-26', '2022-08-27', '2022-08-28', '2022-08-29', '2022-08-30', '2022-08-31', '2022-09-01', '2022-09-02', '2022-09-03', '2022-09-04', '2022-09-05', '2022-09-06', '2022-09-07', '2022-09-08', '2022-09-09', '2022-09-10', '2022-09-11', '2022-09-12'
          ) 
        then delete

    when not matched then insert
        (`report_date`, `client`, `subscription_type`, `is_new_years`, `subscription_tier`, `country_code`, `active_subscriptions`, `expirations`, `expirations_and_winbacks`, `winbacks`)
    values
        (`report_date`, `client`, `subscription_type`, `is_new_years`, `subscription_tier`, `country_code`, `active_subscriptions`, `expirations`, `expirations_and_winbacks`, `winbacks`)

As the documentation suggests, I would've expected the second query to use the gross_and_net_churn__dbt_tmp table rather than rerunning the entire main body of the model. In other words, it seems like the second query should be:

using (gross_and_net_churn__dbt_tmp)

rather than

using ([Main body of model])
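
To make this concrete, here is roughly what I'd have expected the second query to look like (sketched by hand from the queries above; the partition list is abridged):

    merge into `duolingo-data-science`.`dbt_naveen`.`gross_and_net_churn` as DBT_INTERNAL_DEST
        using `duolingo-data-science`.`dbt_naveen`.`gross_and_net_churn__dbt_tmp` as DBT_INTERNAL_SOURCE
        on FALSE

    when not matched by source
         and timestamp_trunc(DBT_INTERNAL_DEST.report_date, day) in (
              '2022-08-13', '2022-08-14' -- same partition list as in the query above, abridged here
          )
        then delete

    when not matched then insert
        (`report_date`, `client`, `subscription_type`, `is_new_years`, `subscription_tier`, `country_code`, `active_subscriptions`, `expirations`, `expirations_and_winbacks`, `winbacks`)
    values
        (`report_date`, `client`, `subscription_type`, `is_new_years`, `subscription_tier`, `country_code`, `active_subscriptions`, `expirations`, `expirations_and_winbacks`, `winbacks`)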

As far as I can tell, the created gross_and_net_churn__dbt_tmp table isn't used at all, anywhere.

So the output is correct, but dbt appears to be running the same query twice unnecessarily, which is a real performance hit.

Relevant log output

No response

Environment

- OS: Monterey 12.4
- Python: 3.8.0
- dbt: 1.2.1

Which database adapter are you using with dbt?

bigquery

Additional Context

I've seen this happen with a couple models. In one example, the first query took 17 minutes and the second query took 23 minutes.

@naveen-shankar naveen-shankar added bug Something isn't working triage labels Sep 13, 2022
@github-actions github-actions bot changed the title [Bug] <title> [CT-1170] [Bug] <title> Sep 13, 2022
@naveen-shankar naveen-shankar changed the title [CT-1170] [Bug] <title> [CT-1170] [Bug] Insert overwrite incremental models not using dbt_tmp table Sep 13, 2022
@jtcohen6 jtcohen6 self-assigned this Sep 21, 2022
@dbeatty10 (Contributor)

Thanks for noticing and reporting this, @naveen-shankar!

Reproduction case

Using dbt=1.3.1, I was able to reproduce the behavior you described using the following example (which I tried to make similar to #424):

models/incremental_example.sql

{% set partitions_to_replace = [
  "date_sub(current_date, interval 1 day)",
  "date_sub(current_date, interval 2 day)"
] %}

{{
    config(
        materialized="incremental",
        incremental_strategy="insert_overwrite",
        cluster_by="id",
        partition_by={
            "field": "date_time",
            "data_type": "datetime",
            "granularity": "day"
        },
        partitions=partitions_to_replace,
        on_schema_change="sync_all_columns"
    )
}}


with data as (

    {% if not is_incremental() %}

        select 1 as id, cast('2020-01-01' as datetime) as date_time union all
        select 2 as id, cast('2020-01-01' as datetime) as date_time union all
        select 3 as id, cast('2020-01-01' as datetime) as date_time union all
        select 4 as id, cast('2020-01-01' as datetime) as date_time

    {% else %}

        -- we want to overwrite the 4 records in the 2020-01-01 partition
        -- with the 2 records below, but add two more in the 2020-01-02 partition
        select 10 as id, cast('2020-01-01' as datetime) as date_time union all
        select 20 as id, cast('2020-01-01' as datetime) as date_time union all
        select 30 as id, cast('2020-01-02' as datetime) as date_time union all
        select 40 as id, cast('2020-01-02' as datetime) as date_time

    {% endif %}

)

select * from data

After running the following commands, the logs in logs/dbt.log contain the output you described:

dbt run -s incremental_example.sql --full-refresh
dbt run -s incremental_example.sql

What's going on

I needed to do a deep dive to inspect what is happening here. Although I don't yet understand all the moving pieces, there are a couple of things I can say with confidence:

  • where the dbt_tmp table is used
  • where it isn't used

"As far as I can tell, the created gross_and_net_churn__dbt_tmp table isn't used at all, anywhere."

Your gross_and_net_churn__dbt_tmp table is actually used when looking for schema changes. The relevant output in the logs is something like this:

    In `your-project`.`your_schema`.`gross_and_net_churn`:
        Schema changed: False
        Source columns not in target: []
        Target columns not in source: []
        New column types: []

It uses the BigQuery Python API under the hood, which is why you don't see any SQL in the console that reads from the dbt_tmp table.
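
For reference, the schema comparison comes from dbt-core's on_schema_change helpers, which look roughly like this (a paraphrased sketch, not the exact source):

    {# paraphrased from check_for_schema_changes(source_relation, target_relation) #}
    {%- set source_columns = adapter.get_columns_in_relation(source_relation) -%}  {# the __dbt_tmp table #}
    {%- set target_columns = adapter.get_columns_in_relation(target_relation) -%}  {# the existing model table #}
    {# the two column lists are then diffed to produce the "Source columns not in target",
       "Target columns not in source", and "New column types" entries in the log output above #}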

"the created dbt_tmp table isn't used to update the destination table. Instead, the model code is rerun from scratch"

You're absolutely right -- it is rerunning it from scratch. And this is in contradiction to the docs, which currently state that the expected SQL is something like:

...
merge into {{ destination_table }} DEST
using {{ model_name }}__dbt_tmp SRC
on FALSE
...

I don't know why it rebuilds the model rather than using the _dbt_tmp table, though -- it might be intentional, but it might also be an accidental oversight.

Someone on our end will need to inspect this more deeply to determine if it can just reuse the temp table or if it must not use it for some reason.

Since this behavior appears to be specific to BigQuery, I'm going to move this issue to the relevant repository.

Acceptance criteria

  • We know whether the _dbt_tmp temp table can be safely reused in the using clause of the merge into statement
  • If it can be reused, then do it
  • If it can't be reused, then document why not

@dbeatty10 dbeatty10 removed the triage label Dec 12, 2022
@dbeatty10 dbeatty10 transferred this issue from dbt-labs/dbt-core Dec 12, 2022
@github-actions github-actions bot changed the title [CT-1170] [Bug] Insert overwrite incremental models not using dbt_tmp table [CT-1646] [CT-1170] [Bug] Insert overwrite incremental models not using dbt_tmp table Dec 12, 2022
@jtcohen6 jtcohen6 removed their assignment Jan 12, 2023
@jtcohen6 (Contributor) commented Jan 26, 2023

Going to mark this one as help_wanted. It would be a performance improvement, at parity with existing functionality. I've outlined below the code paths that a contributor would need to follow for the fix.


Doug is spot on that, when on_schema_change is enabled, we need to first create the model in a temp table that we can use to detect schema changes.

In that case, we should be setting tmp_relation_exists to True:
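
Roughly speaking, that part of the incremental materialization looks like this (a paraphrased sketch; the exact macro calls in the codebase differ a bit):

    {% set tmp_relation_exists = false %}
    {% if on_schema_change != 'ignore' or copy_partitions %}
      {# build the model into <model>__dbt_tmp so its schema can be compared against the target #}
      {% do run_query(create_table_as(True, tmp_relation, sql)) %}
      {% set tmp_relation_exists = true %}
      {% do process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
    {% endif %}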

We pass that tmp_relation_exists argument into bq_generate_incremental_insert_overwrite_build_sql:

{% set build_sql = bq_generate_incremental_insert_overwrite_build_sql(
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
) %}


[Aside: For some reason, the next two macros don't have a keyword argument named tmp_relation_exists — instead, it's called on_schema_change. This doesn't actually break the functionality, but it does make it much more confusing to follow the code path:]

{% macro bq_generate_incremental_insert_overwrite_build_sql(
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, on_schema_change, copy_partitions
) %}

{% set build_sql = bq_insert_overwrite_sql(
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, on_schema_change, copy_partitions
) %}


Finally, we see it clearly here: "dynamic" insert_overwrite accepts a boolean argument for tmp_relation_exists, but "static" insert_overwrite doesn't:

{% if partitions is not none and partitions != [] %} {# static #}
{{ bq_static_insert_overwrite_sql(tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, copy_partitions) }}
{% else %} {# dynamic #}
{{ bq_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions) }}
{% endif %}

So I think all we'd need to do to fix this is have the "static" version start accepting the tmp_relation_exists argument, and update this conditional logic to use it instead of just repeating {{ sql }}:

{%- set source_sql -%}
(
{%- if partition_by.time_ingestion_partitioning -%}
{{ wrap_with_time_ingestion_partitioning_sql(build_partition_time_exp(partition_by), sql, True) }}
{%- else -%}
{{sql}}
{%- endif -%}
)
{%- endset -%}
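
For illustration, the end state might look something like this (an untested sketch, not a patch):

    {% if partitions is not none and partitions != [] %} {# static #}
        {{ bq_static_insert_overwrite_sql(tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions) }}
    {% else %} {# dynamic #}
        {{ bq_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions) }}
    {% endif %}

    {# ...and inside bq_static_insert_overwrite_sql, select from the temp table when it already exists, #}
    {# instead of re-templating the model SQL: #}
    {%- set source_sql -%}
    (
    {%- if tmp_relation_exists -%}
      select * from {{ tmp_relation }}
    {%- elif partition_by.time_ingestion_partitioning -%}
      {{ wrap_with_time_ingestion_partitioning_sql(build_partition_time_exp(partition_by), sql, True) }}
    {%- else -%}
      {{ sql }}
    {%- endif -%}
    )
    {%- endset -%}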
