diff --git a/CHANGELOG.md b/CHANGELOG.md
index 37e18e2242b..869464ac31c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -38,6 +38,7 @@
 - On `run_cli` API calls that are passed `--vars` differing from the server's `--vars`, the RPC server rebuilds the manifest for that call. ([#2265](https://github.com/fishtown-analytics/dbt/issues/2265), [#2363](https://github.com/fishtown-analytics/dbt/pull/2363))
 - Fix "Object of type Decimal is not JSON serializable" error when BigQuery queries returned numeric types in nested data structures ([#2336](https://github.com/fishtown-analytics/dbt/issues/2336), [#2348](https://github.com/fishtown-analytics/dbt/pull/2348))
 - No longer query the information_schema.schemata view on bigquery ([#2320](https://github.com/fishtown-analytics/dbt/issues/2320), [#2382](https://github.com/fishtown-analytics/dbt/pull/2382))
+- Add support for `sql_header` config in incremental models ([#2136](https://github.com/fishtown-analytics/dbt/issues/2136), [#2200](https://github.com/fishtown-analytics/dbt/pull/2200))
 
 ### Under the hood
 - Added more tests for source inheritance ([#2264](https://github.com/fishtown-analytics/dbt/issues/2264), [#2291](https://github.com/fishtown-analytics/dbt/pull/2291))
diff --git a/core/dbt/include/global_project/macros/materializations/common/merge.sql b/core/dbt/include/global_project/macros/materializations/common/merge.sql
index dcbcc1a356d..778fdf8ac72 100644
--- a/core/dbt/include/global_project/macros/materializations/common/merge.sql
+++ b/core/dbt/include/global_project/macros/materializations/common/merge.sql
@@ -10,14 +10,15 @@
 {%- endmacro %}
 
 
-{% macro get_insert_overwrite_merge_sql(target, source, dest_columns, predicates) -%}
-  {{ adapter_macro('get_insert_overwrite_merge_sql', target, source, dest_columns, predicates) }}
+{% macro get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header=false) -%}
+  {{ adapter_macro('get_insert_overwrite_merge_sql', target, source, dest_columns, predicates, include_sql_header) }}
 {%- endmacro %}
 
 
 {% macro default__get_merge_sql(target, source, unique_key, dest_columns, predicates) -%}
     {%- set predicates = [] if predicates is none else [] + predicates -%}
     {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
+    {%- set sql_header = config.get('sql_header', none) -%}
 
     {% if unique_key %}
         {% set unique_key_match %}
@@ -28,6 +29,8 @@
         {% do predicates.append('FALSE') %}
     {% endif %}
 
+    {{ sql_header if sql_header is not none }}
+
     merge into {{ target }} as DBT_INTERNAL_DEST
         using {{ source }} as DBT_INTERNAL_SOURCE
         on {{ predicates | join(' and ') }}
@@ -84,9 +87,12 @@
 {% endmacro %}
 
 
-{% macro default__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates) -%}
+{% macro default__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header) -%}
     {%- set predicates = [] if predicates is none else [] + predicates -%}
     {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
+    {%- set sql_header = config.get('sql_header', none) -%}
+
+    {{ sql_header if sql_header is not none and include_sql_header }}
 
     merge into {{ target }} as DBT_INTERNAL_DEST
         using {{ source }} as DBT_INTERNAL_SOURCE
diff --git a/plugins/bigquery/dbt/include/bigquery/macros/materializations/incremental.sql b/plugins/bigquery/dbt/include/bigquery/macros/materializations/incremental.sql
index 18a0c0bc350..6aabf8d7521 100644
--- a/plugins/bigquery/dbt/include/bigquery/macros/materializations/incremental.sql
+++ b/plugins/bigquery/dbt/include/bigquery/macros/materializations/incremental.sql
@@ -34,7 +34,7 @@
         )
       {%- endset -%}
 
-      {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate]) }}
+      {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header=true) }}
 
   {% else %} {# dynamic #}
 
@@ -66,8 +66,12 @@
       from {{ tmp_relation }}
     );
 
+    {#
+      TODO: include_sql_header is a hack; consider a better approach that includes
+      the sql_header at the materialization-level instead
+    #}
     -- 3. run the merge statement
-    {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate]) }};
+    {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header=false) }};
 
     -- 4. clean up the temp table
     drop table if exists {{ tmp_relation }}
diff --git a/plugins/snowflake/dbt/include/snowflake/macros/materializations/merge.sql b/plugins/snowflake/dbt/include/snowflake/macros/materializations/merge.sql
index e3a5d5cd085..0c48eb9493a 100644
--- a/plugins/snowflake/dbt/include/snowflake/macros/materializations/merge.sql
+++ b/plugins/snowflake/dbt/include/snowflake/macros/materializations/merge.sql
@@ -7,9 +7,12 @@
     #}
 
     {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute='name')) -%}
+    {%- set sql_header = config.get('sql_header', none) -%}
 
     {%- if unique_key is none -%}
 
+        {{ sql_header if sql_header is not none }}
+
     insert into {{ target }} ({{ dest_cols_csv }})
     (
         select {{ dest_cols_csv }}
diff --git a/test/integration/022_bigquery_test/models/sql_header_model_incr.sql b/test/integration/022_bigquery_test/models/sql_header_model_incr.sql
new file mode 100644
index 00000000000..f93280a3bfd
--- /dev/null
+++ b/test/integration/022_bigquery_test/models/sql_header_model_incr.sql
@@ -0,0 +1,15 @@
+
+{{ config(materialized="incremental") }}
+
+{# This will fail if it is not extracted correctly #}
+{% call set_sql_header(config) %}
+    CREATE TEMPORARY FUNCTION a_to_b(str STRING)
+    RETURNS STRING AS (
+      CASE
+      WHEN LOWER(str) = 'a' THEN 'b'
+      ELSE str
+      END
+    );
+{% endcall %}
+
+select a_to_b(dupe) as dupe from {{ ref('view_model') }}
diff --git a/test/integration/022_bigquery_test/models/sql_header_model_incr_insert_overwrite.sql b/test/integration/022_bigquery_test/models/sql_header_model_incr_insert_overwrite.sql
new file mode 100644
index 00000000000..467a0d8d7b6
--- /dev/null
+++ b/test/integration/022_bigquery_test/models/sql_header_model_incr_insert_overwrite.sql
@@ -0,0 +1,31 @@
+
+{#
+    Ensure that the insert overwrite incremental strategy
+    works correctly when a UDF is used in a sql_header. The
+    failure mode here is that dbt might inject the UDF header
+    twice: once for the `create table` and then again for the
+    merge statement.
+#}
+
+{{ config(
+    materialized="incremental",
+    incremental_strategy='insert_overwrite',
+    partition_by={"field": "dt", "data_type": "date"}
+) }}
+
+{# This will fail if it is not extracted correctly #}
+{% call set_sql_header(config) %}
+    CREATE TEMPORARY FUNCTION a_to_b(str STRING)
+    RETURNS STRING AS (
+      CASE
+      WHEN LOWER(str) = 'a' THEN 'b'
+      ELSE str
+      END
+    );
+{% endcall %}
+
+select
+    current_date() as dt,
+    a_to_b(dupe) as dupe
+
+from {{ ref('view_model') }}
diff --git a/test/integration/022_bigquery_test/models/sql_header_model_incr_insert_overwrite_static.sql b/test/integration/022_bigquery_test/models/sql_header_model_incr_insert_overwrite_static.sql
new file mode 100644
index 00000000000..4d760a0fd96
--- /dev/null
+++ b/test/integration/022_bigquery_test/models/sql_header_model_incr_insert_overwrite_static.sql
@@ -0,0 +1,32 @@
+
+{#
+    Ensure that the insert overwrite incremental strategy
+    works correctly when a UDF is used in a sql_header. The
+    failure mode here is that dbt might inject the UDF header
+    twice: once for the `create table` and then again for the
+    merge statement.
+#}
+
+{{ config(
+    materialized="incremental",
+    incremental_strategy='insert_overwrite',
+    partition_by={"field": "dt", "data_type": "date"},
+    partitions=["'2020-01-1'"]
+) }}
+
+{# This will fail if it is not extracted correctly #}
+{% call set_sql_header(config) %}
+    CREATE TEMPORARY FUNCTION a_to_b(str STRING)
+    RETURNS STRING AS (
+      CASE
+      WHEN LOWER(str) = 'a' THEN 'b'
+      ELSE str
+      END
+    );
+{% endcall %}
+
+select
+    cast('2020-01-01' as date) as dt,
+    a_to_b(dupe) as dupe
+
+from {{ ref('view_model') }}
diff --git a/test/integration/022_bigquery_test/test_simple_bigquery_view.py b/test/integration/022_bigquery_test/test_simple_bigquery_view.py
index b4869e2bfa2..690955862a3 100644
--- a/test/integration/022_bigquery_test/test_simple_bigquery_view.py
+++ b/test/integration/022_bigquery_test/test_simple_bigquery_view.py
@@ -55,7 +55,7 @@ def test__bigquery_simple_run(self):
         self.run_dbt(['seed', '--full-refresh'])
         results = self.run_dbt()
         # Bump expected number of results when adding new model
-        self.assertEqual(len(results), 8)
+        self.assertEqual(len(results), 11)
 
         self.assert_nondupes_pass()
 
@@ -66,7 +66,7 @@ class TestUnderscoreBigQueryRun(TestBaseBigQueryRun):
     def test_bigquery_run_twice(self):
         self.run_dbt(['seed'])
         results = self.run_dbt()
-        self.assertEqual(len(results), 8)
+        self.assertEqual(len(results), 11)
         results = self.run_dbt()
-        self.assertEqual(len(results), 8)
+        self.assertEqual(len(results), 11)
         self.assert_nondupes_pass()
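
For illustration only, not part of the patch above: a minimal sketch of a user-facing incremental model that exercises the behavior this diff enables. The model and column names (`normalized_events`, `events`, `event_id`, `event_ts`, `status`) are assumptions. With this change, the temporary UDF declared via `set_sql_header` is emitted ahead of the `merge` statement generated on incremental runs, not only ahead of the initial `create table`.

-- hypothetical models/normalized_events.sql (illustrative sketch, not from this PR)
{{ config(materialized='incremental', unique_key='event_id') }}

{% call set_sql_header(config) %}
    CREATE TEMPORARY FUNCTION normalize_status(s STRING)
    RETURNS STRING AS (LOWER(TRIM(s)));
{% endcall %}

select
    event_id,
    event_ts,
    normalize_status(status) as status
from {{ ref('events') }}

{% if is_incremental() %}
  -- only process rows newer than what already exists in the target table
  where event_ts > (select max(event_ts) from {{ this }})
{% endif %}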