Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

The insert by period materialization for TSQL #56

Merged
merged 1 commit into from
Oct 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 121 additions & 0 deletions macros/materializations/insert_by_period_helpers.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
{#-
    Resolve the date range an insert_by_period model should load.

    Args:
        timestamp_field: column expression selected from every source model
            when the range has to be derived from data.
        date_source_models: a single model name or a list of model names; only
            consulted when the model config has no 'start_date'.

    Returns:
        A dict with the keys 'start_date' and 'stop_date'.
        - When 'start_date' is configured, both values are read from the model
          config ('stop_date' falls back to none).
        - Otherwise they are the stringified MIN and MAX of timestamp_field
          over the UNION ALL of all source models.

    Raises:
        A compiler error (only while `execute` is true) when neither a
        configured 'start_date' nor date_source_models is available.
-#}
{% macro get_start_stop_dates(timestamp_field, date_source_models) %}

    {% set configured_start = config.get('start_date', default=none) %}

    {% if configured_start is not none %}

        {% do return({'start_date': configured_start,
                      'stop_date': config.get('stop_date', default=none)}) %}

    {% elif date_source_models is not none %}

        {#- A bare model name is shorthand for a one-element list. -#}
        {% set source_models = [date_source_models] if date_source_models is string else date_source_models %}

        {% set bounds_query %}
            WITH stage AS (
            {% for source_model in source_models %}
                {% if not loop.first %} UNION ALL {% endif %}
                SELECT {{ timestamp_field }} FROM {{ ref(source_model) }}
            {% endfor %})

            SELECT MIN({{ timestamp_field }}) AS MIN, MAX({{ timestamp_field }}) AS MAX
            FROM stage
        {% endset %}

        {% set bounds = dbt_utils.get_query_results_as_dict(bounds_query) %}

        {% do return({"start_date": bounds['MIN'][0] | string,
                      "stop_date": bounds['MAX'][0] | string}) %}

    {% else %}
        {%- if execute -%}
            {{ exceptions.raise_compiler_error("Invalid 'insert_by_period' configuration. Must provide 'start_date' and 'stop_date' and/or 'date_source_models' options.") }}
        {%- endif -%}
    {% endif %}

{% endmacro %}

{#-
    Abort compilation when the model SQL does not contain the period-filter
    placeholder.

    Args:
        model_sql: the model's SQL text to inspect.
        placeholder: the literal string that must occur in model_sql
            (defaults to '__PERIOD_FILTER__').

    Raises:
        A compiler error naming model.unique_id and the placeholder that was
        searched for. The message reports the `placeholder` argument, so it
        stays accurate when a caller passes a non-default placeholder.
-#}
{% macro check_placeholder(model_sql, placeholder='__PERIOD_FILTER__') %}

    {%- if placeholder not in model_sql -%}
        {%- set error_message -%}
            Model '{{ model.unique_id }}' does not include the required string '{{ placeholder }}' in its sql
        {%- endset -%}
        {{ exceptions.raise_compiler_error(error_message) }}
    {%- endif -%}

{% endmacro %}

{#-
    Substitute the literal '__PERIOD_FILTER__' in core_sql with a T-SQL
    predicate that selects a single period.

    The predicate keeps rows whose timestamp_field, cast to DATE, lies in the
    half-open window
        [start_timestamp + offset periods, start_timestamp + (offset + 1) periods)
    and is additionally not earlier than start_timestamp itself.

    Args:
        core_sql: SQL text containing the placeholder.
        timestamp_field: column expression the window is applied to.
        start_timestamp: anchor date, rendered as a quoted literal and cast to DATE.
        stop_timestamp: accepted for signature compatibility; not referenced
            by the generated predicate.
        offset: number of periods to shift the window from start_timestamp.
        period: DATEADD datepart (for example day or month).

    Returns:
        core_sql with every occurrence of the placeholder replaced.
-#}
{%- macro replace_placeholder_with_period_filter(core_sql, timestamp_field, start_timestamp, stop_timestamp, offset, period) -%}

    {%- set period_predicate -%}
        (CAST({{ timestamp_field }} AS DATE) >= DATEADD({{period}}, {{offset}}, CAST('{{ start_timestamp }}' AS DATE)) AND
        CAST({{ timestamp_field }} AS DATE) < DATEADD({{period}}, {{offset}} + 1, CAST('{{ start_timestamp }}' AS DATE))) AND
        (CAST({{ timestamp_field }} AS DATE) >= CAST('{{start_timestamp}}' AS DATE))
    {%- endset -%}

    {% do return(core_sql | replace("__PERIOD_FILTER__", period_predicate)) %}

{%- endmacro %}

{#-
    Query the target table for the window that still has to be loaded.

    Args:
        target_schema, target_table: rendered as schema.table in the FROM clause.
        timestamp_field: column whose MAX marks where loading resumes.
        start_date: fallback start when the target holds no timestamp yet.
        stop_date: configured end date; none (or the string 'none' in any
            letter case) means "load up to the current timestamp".
        period: datepart handed to dbt_utils.datediff.

    Returns:
        A dict with
        - 'start_timestamp': MAX(timestamp_field) of the target, or start_date (string)
        - 'stop_timestamp': stop_date plus 86399999 milliseconds, or the
          current timestamp when stop_date is absent (string)
        - 'num_periods': datediff(start, stop, period) + 1 (int)
-#}
{% macro get_period_boundaries(target_schema, target_table, timestamp_field, start_date, stop_date, period) -%}

    {#- Lower-casing turns a none stop_date into the text 'none', which NULLIF
        then maps to NULL so that COALESCE falls through to the current timestamp. -#}
    {% set stop_date_expr = "nullif('" ~ (stop_date | lower) ~ "','none')" %}

    {#- NOTE(review): 86399999 ms is one day minus one millisecond. If the
        adapter's dateadd yields a type with coarser-than-millisecond accuracy
        the result may round up to the next day; confirm against the target type. -#}
    {% set boundaries_query -%}
        with data as (
            select
                coalesce(max({{ timestamp_field }}), '{{ start_date }}') as start_timestamp,
                coalesce({{ dbt_utils.dateadd('millisecond', 86399999, stop_date_expr) }},
                         {{ dbt_utils.current_timestamp() }} ) as stop_timestamp
            from {{ target_schema }}.{{ target_table }}
        )
        select
            start_timestamp,
            stop_timestamp,
            {{ dbt_utils.datediff('start_timestamp', 'stop_timestamp', period) }} + 1 as num_periods
        from data
    {%- endset %}

    {% set boundary_rows = dbt_utils.get_query_results_as_dict(boundaries_query) %}

    {% do return({'start_timestamp': boundary_rows['start_timestamp'][0] | string,
                  'stop_timestamp': boundary_rows['stop_timestamp'][0] | string,
                  'num_periods': boundary_rows['num_periods'][0] | int}) %}
{%- endmacro %}

{#-
    Ask the database for the date that lies `offset` periods after
    start_timestamp.

    Args:
        period: DATEADD datepart (for example day or month).
        offset: number of periods to add.
        start_timestamp: anchor, rendered as a quoted literal and cast to DATE.

    Returns:
        The resulting date as a string.
-#}
{%- macro get_period_of_load(period, offset, start_timestamp) -%}

    {% set load_date_query -%}
        SELECT DATEADD({{ period }}, {{ offset }}, CAST('{{start_timestamp}}' AS DATE)) AS period_of_load
    {%- endset %}

    {% set load_date_rows = dbt_utils.get_query_results_as_dict(load_date_query) %}

    {% do return(load_date_rows['period_of_load'][0] | string) %}
{%- endmacro -%}

{#-
    Render base_sql with its period placeholder replaced by the filter for the
    period at `offset`.

    Delegates to replace_placeholder_with_period_filter and emits the result as
    the macro's text output.

    Args:
        target_cols_csv: accepted for signature compatibility; not referenced here.
        base_sql: SQL text containing the placeholder.
        timestamp_field, period, start_timestamp, stop_timestamp, offset:
            forwarded unchanged to replace_placeholder_with_period_filter.
-#}
{%- macro get_period_filter_sql(target_cols_csv, base_sql, timestamp_field, period, start_timestamp, stop_timestamp, offset) -%}

    {{- replace_placeholder_with_period_filter(base_sql,
                                               timestamp_field,
                                               start_timestamp,
                                               stop_timestamp,
                                               offset, period) -}}

{%- endmacro %}
179 changes: 179 additions & 0 deletions macros/materializations/insert_by_period_materialization.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@

{#-
    `insert_by_period` materialization for the sqlserver adapter.

    Required config: 'timestamp_field'.
    Optional config: 'start_date', 'stop_date', 'date_source_models',
    'unique_key', 'period' (defaults to 'day').
    The model SQL must contain the placeholder checked by check_placeholder.

    Three paths:
    1. Target does not exist: create it from the model SQL filtered to the
       period at offset 0 of the configured/derived start date.
    2. Target is a view, or full refresh was requested: rename the existing
       relation to a '__dbt_backup' relation, rebuild as in (1), then drop
       the backup.
    3. Otherwise: compute the period boundaries from the target table and,
       for each offset from 1 up to num_periods - 1, build a per-period
       staging table, optionally delete rows matching 'unique_key', insert
       the staged rows into the target and commit.

    Returns: {'relations': [target_relation]}.
-#}
{% materialization insert_by_period, adapter = 'sqlserver' -%}

    {%- set full_refresh_mode = flags.FULL_REFRESH -%}

    {%- set target_relation = this -%}
    {%- set existing_relation = load_relation(this) -%}
    {%- set tmp_relation = sqlserver__make_temp_relation(this) -%}

    {%- set target_relation = api.Relation.create(
        database = target_relation.database,
        schema = target_relation.schema,
        identifier = target_relation.identifier,
        type = 'table'
    ) -%}

    {%- set timestamp_field = config.require('timestamp_field') -%}
    {%- set date_source_models = config.get('date_source_models', default=none) -%}
    {%- set unique_key = config.get('unique_key', default=none) -%}

    {%- set start_stop_dates = get_start_stop_dates(timestamp_field, date_source_models) | as_native -%}

    {%- set period = config.get('period', default='day') -%}

    {#- Relations collected here are dropped after the post-hooks have run. -#}
    {%- set to_drop = [] -%}

    {%- do check_placeholder(sql) -%}

    {{ run_hooks(pre_hooks, inside_transaction=False) }}

    -- `BEGIN` happens here:
    {{ run_hooks(pre_hooks, inside_transaction=True) }}

    {% if existing_relation is none %}

        {% set filtered_sql = replace_placeholder_with_period_filter(sql, timestamp_field,
                                                                     start_stop_dates.start_date,
                                                                     start_stop_dates.stop_date,
                                                                     0, period) %}
        {% set build_sql = sqlserver__create_table_as(False, target_relation, filtered_sql) %}

        {% do to_drop.append(tmp_relation) %}

    {% elif existing_relation.is_view or full_refresh_mode %}
        {#-- Make sure the backup doesn't exist so we don't encounter issues with the rename below #}
        {% set backup_identifier = existing_relation.identifier ~ "__dbt_backup" %}
        {% set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) %}

        {% do adapter.drop_relation(backup_relation) %}
        {% do adapter.rename_relation(target_relation, backup_relation) %}

        {% set filtered_sql = replace_placeholder_with_period_filter(sql, timestamp_field,
                                                                     start_stop_dates.start_date,
                                                                     start_stop_dates.stop_date,
                                                                     0, period) %}
        {% set build_sql = sqlserver__create_table_as(False, target_relation, filtered_sql) %}

        {% do to_drop.append(tmp_relation) %}
        {% do to_drop.append(backup_relation) %}
    {% else %}

        {% set period_boundaries = get_period_boundaries(schema,
                                                         target_relation.name,
                                                         timestamp_field,
                                                         start_stop_dates.start_date,
                                                         start_stop_dates.stop_date,
                                                         period) %}

        {% set target_columns = adapter.get_columns_in_relation(target_relation) %}
        {%- set target_cols_csv = target_columns | map(attribute='quoted') | join(', ') -%}

        {#- Assignments made inside a Jinja for-loop are not visible after the
            loop, so everything needed afterwards (the running row count and
            the SQL of the last period) is carried out through this dict. -#}
        {%- set loop_vars = {'sum_rows_inserted': 0, 'last_period_sql': ''} -%}

        {% for i in range(1, period_boundaries.num_periods) -%}

            {%- set iteration_number = i + 1 -%}
            {%- set period_of_load = get_period_of_load(period, i, period_boundaries.start_timestamp) -%}

            {{ dbt_utils.log_info("Running for {} {} of {} ({}) [{}]".format(period, iteration_number, period_boundaries.num_periods, period_of_load, model.unique_id)) }}

            {%- set tmp_identifier = target_relation.identifier ~ '__dbt_incremental_period' ~ i ~ '_tmp' -%}
            {%- set tmp_relation = api.Relation.create(
                identifier=tmp_identifier,
                database = target_relation.database,
                schema = target_relation.schema,
                type='table') -%}

            {% set tmp_table_sql = get_period_filter_sql(target_cols_csv, sql, timestamp_field, period,
                                                         period_boundaries.start_timestamp,
                                                         period_boundaries.stop_timestamp, i) %}
            {% do loop_vars.update({'last_period_sql': tmp_table_sql}) %}

            {% call statement() -%}
                {{ sqlserver__create_table_as(True, tmp_relation, tmp_table_sql) }}
            {%- endcall %}

            {{ adapter.expand_target_column_types(from_relation=tmp_relation,
                                                  to_relation=target_relation) }}

            {%- set insert_query_name = 'main-' ~ i -%}
            {% call statement(insert_query_name, fetch_result=True) -%}

                {%- if unique_key is not none -%}
                delete
                from {{ target_relation }}
                where ({{ unique_key }}) in (
                    select ({{ unique_key }})
                    from {{ tmp_relation }}
                );
                {%- endif %}

                insert into {{ target_relation }} ({{ target_cols_csv }})
                (
                    select {{ target_cols_csv }}
                    from {{ tmp_relation.include(schema=True) }}
                );
            {%- endcall %}

            {% set result = load_result(insert_query_name) %}

            {% if 'response' in result.keys() %} {# added in v0.19.0 #}
                {% set rows_inserted = result['response']['rows_affected'] %}
            {% else %} {# older versions #}
                {% set rows_inserted = result['status'].split(" ")[2] | int %}
            {% endif %}

            {%- set sum_rows_inserted = loop_vars['sum_rows_inserted'] + rows_inserted -%}
            {%- do loop_vars.update({'sum_rows_inserted': sum_rows_inserted}) %}

            {{ dbt_utils.log_info("Ran for {} {} of {} ({}); {} records inserted [{}]".format(period, iteration_number,
                                                                                              period_boundaries.num_periods,
                                                                                              period_of_load, rows_inserted,
                                                                                              model.unique_id)) }}

            {% do to_drop.append(tmp_relation) %}
            {% do adapter.commit() %}

        {% endfor %}

        {#- Record the total under 'main'; the body is the SQL of the last
            period that ran, or empty when the loop had nothing to do. -#}
        {% call noop_statement('main', "INSERT {}".format(loop_vars['sum_rows_inserted']) ) -%}
            {{ loop_vars['last_period_sql'] }}
        {%- endcall %}

    {% endif %}

    {#- build_sql is only set by the two (re)build branches above. -#}
    {% if build_sql is defined %}
        {% call statement("main", fetch_result=True) %}
            {{ build_sql }}
        {% endcall %}

        {% set result = load_result('main') %}

        {% if 'response' in result.keys() %} {# added in v0.19.0 #}
            {% set rows_inserted = result['response']['rows_affected'] %}
        {% else %} {# older versions #}
            {% set rows_inserted = result['status'].split(" ")[2] | int %}
        {% endif %}

        {% call noop_statement('main', "BASE LOAD {}".format(rows_inserted)) -%}
            {{ build_sql }}
        {%- endcall %}

        {% do adapter.commit() %}
    {% endif %}

    {{ run_hooks(post_hooks, inside_transaction=True) }}

    {% for rel in to_drop %}
        {% if rel.type is not none %}
            {% do adapter.drop_relation(rel) %}
        {% endif %}
    {% endfor %}

    {{ run_hooks(post_hooks, inside_transaction=False) }}

    {{ return({'relations': [target_relation]}) }}

{%- endmaterialization %}