Skip to content

Commit

Permalink
Merge pull request #2200 from fishtown-analytics/fix/sql-header-in-incrementals
Browse files Browse the repository at this point in the history

Fix missing sql_header for incremental models
  • Loading branch information
drewbanin authored May 1, 2020
2 parents 6e1665d + f3d4377 commit 68babfb
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
- On `run_cli` API calls that are passed `--vars` differing from the server's `--vars`, the RPC server rebuilds the manifest for that call. ([#2265](https://github.com/fishtown-analytics/dbt/issues/2265), [#2363](https://github.com/fishtown-analytics/dbt/pull/2363))
- Fix "Object of type Decimal is not JSON serializable" error when BigQuery queries returned numeric types in nested data structures ([#2336](https://github.com/fishtown-analytics/dbt/issues/2336), [#2348](https://github.com/fishtown-analytics/dbt/pull/2348))
- No longer query the information_schema.schemata view on bigquery ([#2320](https://github.com/fishtown-analytics/dbt/issues/2320), [#2382](https://github.com/fishtown-analytics/dbt/pull/2382))
- Add support for `sql_header` config in incremental models ([#2136](https://github.com/fishtown-analytics/dbt/issues/2136), [#2200](https://github.com/fishtown-analytics/dbt/pull/2200))

### Under the hood
- Added more tests for source inheritance ([#2264](https://github.com/fishtown-analytics/dbt/issues/2264), [#2291](https://github.com/fishtown-analytics/dbt/pull/2291))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@
{%- endmacro %}


{% macro get_insert_overwrite_merge_sql(target, source, dest_columns, predicates) -%}
{{ adapter_macro('get_insert_overwrite_merge_sql', target, source, dest_columns, predicates) }}
{# Dispatch macro: routes to the adapter-specific implementation.
   include_sql_header (default false) controls whether the model's
   configured sql_header is prepended to the merge statement — the
   default implementation only emits it when this flag is true, so a
   caller that has already emitted the header (e.g. for a preceding
   `create table`) can suppress a duplicate. #}
{% macro get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header=false) -%}
{{ adapter_macro('get_insert_overwrite_merge_sql', target, source, dest_columns, predicates, include_sql_header) }}
{%- endmacro %}


{% macro default__get_merge_sql(target, source, unique_key, dest_columns, predicates) -%}
{%- set predicates = [] if predicates is none else [] + predicates -%}
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
{%- set sql_header = config.get('sql_header', none) -%}

{% if unique_key %}
{% set unique_key_match %}
Expand All @@ -28,6 +29,8 @@
{% do predicates.append('FALSE') %}
{% endif %}

{{ sql_header if sql_header is not none }}

merge into {{ target }} as DBT_INTERNAL_DEST
using {{ source }} as DBT_INTERNAL_SOURCE
on {{ predicates | join(' and ') }}
Expand Down Expand Up @@ -84,9 +87,12 @@
{% endmacro %}


{% macro default__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates) -%}
{% macro default__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header) -%}
{%- set predicates = [] if predicates is none else [] + predicates -%}
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
{%- set sql_header = config.get('sql_header', none) -%}

{{ sql_header if sql_header is not none and include_sql_header }}

merge into {{ target }} as DBT_INTERNAL_DEST
using {{ source }} as DBT_INTERNAL_SOURCE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
)
{%- endset -%}

{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate]) }}
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header=true) }}

{% else %} {# dynamic #}

Expand Down Expand Up @@ -66,8 +66,12 @@
from {{ tmp_relation }}
);

{#
TODO: include_sql_header is a hack; consider a better approach that includes
the sql_header at the materialization-level instead
#}
-- 3. run the merge statement
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate]) }};
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header=false) }};

-- 4. clean up the temp table
drop table if exists {{ tmp_relation }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@
#}

{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute='name')) -%}
{%- set sql_header = config.get('sql_header', none) -%}

{%- if unique_key is none -%}

{{ sql_header if sql_header is not none }}

insert into {{ target }} ({{ dest_cols_csv }})
(
select {{ dest_cols_csv }}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@

{{ config(materialized="incremental") }}

{# Regression test model: the sql_header (a BigQuery temporary UDF here)
   must be extracted from the model body and emitted ahead of the SQL that
   dbt generates for the incremental materialization — otherwise a_to_b()
   is undefined when the select below runs. #}
{# This will fail if it is not extracted correctly #}
{% call set_sql_header(config) %}
CREATE TEMPORARY FUNCTION a_to_b(str STRING)
RETURNS STRING AS (
CASE
WHEN LOWER(str) = 'a' THEN 'b'
ELSE str
END
);
{% endcall %}

{# The header's UDF is referenced here; view_model is a sibling model in the test project. #}
select a_to_b(dupe) as dupe from {{ ref('view_model') }}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@

{#
    Ensure that the insert overwrite incremental strategy
    works correctly when a UDF is used in a sql_header. The
    failure mode here is that dbt might inject the UDF header
    twice: once for the `create table` and then again for the
    merge statement.
#}

{# No `partitions` list supplied, so this exercises the *dynamic*
   insert_overwrite path (partitions discovered from the temp table). #}
{{ config(
    materialized="incremental",
    incremental_strategy='insert_overwrite',
    partition_by={"field": "dt", "data_type": "date"}
) }}

{# This will fail if it is not extracted correctly #}
{% call set_sql_header(config) %}
CREATE TEMPORARY FUNCTION a_to_b(str STRING)
RETURNS STRING AS (
CASE
WHEN LOWER(str) = 'a' THEN 'b'
ELSE str
END
);
{% endcall %}

{# dt partitions the table; a_to_b comes from the sql_header above. #}
select
current_date() as dt,
a_to_b(dupe) as dupe

from {{ ref('view_model') }}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@

{#
    Ensure that the insert overwrite incremental strategy
    works correctly when a UDF is used in a sql_header. The
    failure mode here is that dbt might inject the UDF header
    twice: once for the `create table` and then again for the
    merge statement.
#}

{# A static `partitions` list is supplied, so this exercises the *static*
   insert_overwrite path (no partition discovery step).
   NOTE(review): the partition literal '2020-01-1' is not zero-padded,
   while the select below produces '2020-01-01' — confirm BigQuery
   canonicalizes the day component, or these won't match. #}
{{ config(
    materialized="incremental",
    incremental_strategy='insert_overwrite',
    partition_by={"field": "dt", "data_type": "date"},
    partitions=["'2020-01-1'"]
) }}

{# This will fail if it is not extracted correctly #}
{% call set_sql_header(config) %}
CREATE TEMPORARY FUNCTION a_to_b(str STRING)
RETURNS STRING AS (
CASE
WHEN LOWER(str) = 'a' THEN 'b'
ELSE str
END
);
{% endcall %}

{# Fixed date matches the configured static partition; a_to_b comes from the sql_header. #}
select
cast('2020-01-01' as date) as dt,
a_to_b(dupe) as dupe

from {{ ref('view_model') }}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def test__bigquery_simple_run(self):
self.run_dbt(['seed', '--full-refresh'])
results = self.run_dbt()
# Bump expected number of results when adding new model
self.assertEqual(len(results), 8)
self.assertEqual(len(results), 11)
self.assert_nondupes_pass()


Expand All @@ -66,7 +66,7 @@ class TestUnderscoreBigQueryRun(TestBaseBigQueryRun):
def test_bigquery_run_twice(self):
self.run_dbt(['seed'])
results = self.run_dbt()
self.assertEqual(len(results), 8)
self.assertEqual(len(results), 11)
results = self.run_dbt()
self.assertEqual(len(results), 8)
self.assertEqual(len(results), 11)
self.assert_nondupes_pass()

0 comments on commit 68babfb

Please sign in to comment.