-
Notifications
You must be signed in to change notification settings - Fork 179
/
merge.sql
81 lines (60 loc) · 3.21 KB
/
merge.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
{% macro snowflake__get_merge_sql(target, source_sql, unique_key, dest_columns, incremental_predicates) -%}

    {#
       Snowflake-specific builder for the merge step.

       Snowflake does not accept a merge on a constant-false predicate, so when
       no unique_key is given this macro emits a plain insert of the destination
       columns instead. When a unique_key is given, the statement comes from
       default__get_merge_sql, called with the same five arguments.

       Arguments:
         target, source_sql      interpolated as-is into the insert statement.
         unique_key              none selects the insert path.
         dest_columns            items exposing a `name` attribute; the names are
                                 handed to get_quoted_csv to build the column list.
         incremental_predicates  only forwarded to default__get_merge_sql.

       The configured `sql_header` (if any) is prepended on the insert path only.
       Returns whatever snowflake_dml_explicit_transaction yields for the statement.
    #}

    {%- set column_names = dest_columns | map(attribute='name') -%}
    {%- set quoted_columns = get_quoted_csv(column_names) -%}
    {%- set header = config.get('sql_header', none) -%}

    {%- set statement -%}
        {%- if unique_key is not none -%}
            {{ default__get_merge_sql(target, source_sql, unique_key, dest_columns, incremental_predicates) }}
        {%- else -%}
            {%- if header is not none %}{{ header }}{% endif %}
            insert into {{ target }} ({{ quoted_columns }})
            (
                select {{ quoted_columns }}
                from {{ source_sql }}
            )
        {%- endif -%}
    {%- endset -%}

    {% do return(snowflake_dml_explicit_transaction(statement)) %}

{% endmacro %}
{% macro snowflake__get_delete_insert_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) %}
    {#
       Builds the statement with default__get_delete_insert_merge_sql (all five
       arguments forwarded unchanged) and returns the result of passing it
       through snowflake_dml_explicit_transaction.
    #}
    {% do return(
        snowflake_dml_explicit_transaction(
            default__get_delete_insert_merge_sql(target, source, unique_key, dest_columns, incremental_predicates)
        )
    ) %}
{% endmacro %}
{% macro snowflake__snapshot_merge_sql(target, source, insert_cols) %}
    {#
       Builds the statement with default__snapshot_merge_sql (target, source and
       insert_cols forwarded unchanged) and returns the result of passing it
       through snowflake_dml_explicit_transaction.
    #}
    {% do return(
        snowflake_dml_explicit_transaction(
            default__snapshot_merge_sql(target, source, insert_cols)
        )
    ) %}
{% endmacro %}
{% macro snowflake__get_incremental_append_sql(get_incremental_append_sql) %}
    {#
       Builds the statement with default__get_incremental_append_sql and returns
       the result of passing it through snowflake_dml_explicit_transaction.

       The single argument is forwarded untouched to the default implementation.
       NOTE(review): it is named `get_incremental_append_sql`, which shadows a
       macro-like name inside this body; it is presumably the strategy argument
       dict -- confirm against the caller before renaming.
    #}
    {% do return(
        snowflake_dml_explicit_transaction(
            default__get_incremental_append_sql(get_incremental_append_sql)
        )
    ) %}
{% endmacro %}
{% macro snowflake__get_incremental_microbatch_sql(arg_dict) %}
    {#
       Renders the statements for one microbatch: a delete from the target
       relation restricted by the incremental predicates, followed by an insert
       of the destination columns selected from the temp relation.

       Reads from arg_dict:
         target_relation         relation deleted from and inserted into.
         temp_relation           relation the insert selects from.
         dest_columns            items exposing a `name` attribute.
         incremental_predicates  optional list of predicate strings.

       Reads from model.config:
         event_time                                  column compared against the batch bounds.
         __dbt_internal_microbatch_event_time_start  optional lower bound (inclusive).
         __dbt_internal_microbatch_event_time_end    optional upper bound (exclusive).

       Side effect: arg_dict['incremental_predicates'] is replaced with the
       extended predicate list. The list object originally supplied by the
       caller is not modified.

       NOTE(review): unlike the sibling macros in this file, the statements are
       not passed through snowflake_dml_explicit_transaction -- confirm whether
       the delete and insert are meant to run outside an explicit transaction.
    #}
    {%- set target = arg_dict["target_relation"] -%}
    {%- set source = arg_dict["temp_relation"] -%}
    {%- set dest_columns = arg_dict["dest_columns"] -%}
    {%- set configured_predicates = arg_dict.get('incremental_predicates') -%}
    {#-- `[] + ...` copies the list so the appends below never mutate the caller's object --#}
    {%- set incremental_predicates = [] if configured_predicates is none else [] + configured_predicates -%}

    {#-- Add additional incremental_predicates to filter for batch --#}
    {#-- Both bounds are looked up with .get() so an absent key is handled the same way for each --#}
    {%- set batch_start = model.config.get("__dbt_internal_microbatch_event_time_start") -%}
    {%- set batch_end = model.config.get("__dbt_internal_microbatch_event_time_end") -%}
    {% if batch_start -%}
        {% do incremental_predicates.append("DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " >= TIMESTAMP '" ~ batch_start ~ "'") %}
    {% endif %}
    {% if batch_end -%}
        {% do incremental_predicates.append("DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " < TIMESTAMP '" ~ batch_end ~ "'") %}
    {% endif %}
    {% do arg_dict.update({'incremental_predicates': incremental_predicates}) %}

    {#-- NOTE(review): with no predicates at all the clause below renders as `where ( )` --#}
    delete from {{ target }} DBT_INTERNAL_TARGET
    where (
    {% for predicate in incremental_predicates %}
        {%- if not loop.first %}and {% endif -%} {{ predicate }}
    {% endfor %}
    );

    {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
    insert into {{ target }} ({{ dest_cols_csv }})
    (
        select {{ dest_cols_csv }}
        from {{ source }}
    )
{% endmacro %}