-
Notifications
You must be signed in to change notification settings - Fork 12
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Feature/38 incremental materialization strategies (#62)
* going back to incremental default implementation - splitting delete and insert into two statements * default implementation from core - bug in need_swap * parallel pytest worker and default python 3.10.9 * removing python versioning bc poetry is not able to manage it * new envlist format
- Loading branch information
Showing
10 changed files
with
178 additions
and
225 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
134 changes: 68 additions & 66 deletions
134
dbt/include/exasol/macros/materializations/incremental.sql
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,91 +1,93 @@ | ||
{% macro incremental_delete(target_relation, tmp_relation) -%} | ||
{%- set unique_key = config.get('unique_key') -%} | ||
|
||
{% if unique_key %} | ||
{% if unique_key is sequence and unique_key is not string %} | ||
delete from {{ target_relation }} | ||
where exists (select 1 from {{ tmp_relation }} | ||
where | ||
{% for key in unique_key %} | ||
{{ tmp_relation }}.{{ key }} = {{ target_relation }}.{{ key }} | ||
{{ "and " if not loop.last }} | ||
{% endfor %} | ||
); | ||
{% else %} | ||
delete from {{ target_relation }} | ||
where ( | ||
{{ unique_key }}) in ( | ||
select ({{ unique_key }}) | ||
from {{ tmp_relation }} | ||
); | ||
{% endif %} | ||
{%endif%} | ||
{%- endmacro %} | ||
|
||
{% macro incremental_insert(tmp_relation, target_relation, unique_key=none, statement_name="main") %} | ||
{%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%} | ||
{%- set dest_cols_csv = dest_columns | join(', ', attribute='name') -%} | ||
|
||
insert into {{ target_relation }} ({{ dest_cols_csv }}) | ||
( | ||
select {{ dest_cols_csv }} | ||
from {{ tmp_relation.schema }}.{{ tmp_relation.identifier }} | ||
); | ||
{%- endmacro %} | ||
{% materialization incremental, adapter='exasol' -%} | ||
|
||
-- relations | ||
{%- set existing_relation = load_relation(this) -%} | ||
|
||
{% materialization incremental, adapter='exasol' -%} | ||
{%- set target_relation = this.incorporate(type='table') -%} | ||
{%- set temp_relation = make_temp_relation(target_relation)-%} | ||
{%- set intermediate_relation = make_intermediate_relation(target_relation)-%} | ||
{%- set backup_relation_type = 'table' if existing_relation is none else existing_relation.type -%} | ||
{%- set backup_relation = make_backup_relation(target_relation, backup_relation_type) -%} | ||
|
||
-- configs | ||
{%- set unique_key = config.get('unique_key') -%} | ||
{%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%} | ||
{%- set identifier = model['alias'] -%} | ||
{%- set target_relation = api.Relation.create(identifier=identifier, schema=schema, database=database, type='table') -%} | ||
{% set existing_relation = adapter.get_relation(database=database, schema=schema, identifier = identifier) %} | ||
{% set tmp_relation = make_temp_relation(target_relation) %} | ||
{%- set full_refresh_mode = (should_full_refresh() or existing_relation.is_view) -%} | ||
{%- set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') -%} | ||
|
||
-- the temp_ and backup_ relations should not already exist in the database; get_relation | ||
-- will return None in that case. Otherwise, we get a relation that we can drop | ||
-- later, before we try to use this name for the current operation. This has to happen before | ||
-- BEGIN, in a separate transaction | ||
{%- set preexisting_intermediate_relation = load_cached_relation(intermediate_relation)-%} | ||
{%- set preexisting_backup_relation = load_cached_relation(backup_relation) -%} | ||
-- grab current tables grants config for comparision later on | ||
{% set grant_config = config.get('grants') %} | ||
{{ drop_relation_if_exists(preexisting_intermediate_relation) }} | ||
{{ drop_relation_if_exists(preexisting_backup_relation) }} | ||
|
||
-- grab current tables grants config for comparision later on | ||
{%- set grant_config = config.get('grants') -%} | ||
|
||
-- setup | ||
{{ run_hooks(pre_hooks, inside_transaction=False) }} | ||
|
||
-- `BEGIN` happens here: | ||
{{ run_hooks(pre_hooks, inside_transaction=True) }} | ||
|
||
{% set to_drop = [] %} | ||
|
||
{% if existing_relation is none %} | ||
{% set build_sql = create_table_as(False, target_relation, sql) %} | ||
{% elif existing_relation.is_view %} | ||
{% do adapter.drop_relation(existing_relation) %} | ||
{% set build_sql = create_table_as(False, target_relation, sql) %} | ||
{% set build_sql = get_create_table_as_sql(False, target_relation, sql) %} | ||
{% elif full_refresh_mode %} | ||
{% do drop_relation(existing_relation) %} | ||
{% set build_sql = create_table_as(False, target_relation, sql) %} | ||
{% set build_sql = get_create_table_as_sql(False, intermediate_relation, sql) %} | ||
{% set need_swap = true %} | ||
{% else %} | ||
{% do run_query(create_table_as(True, tmp_relation, sql)) %} | ||
{% do run_query(incremental_delete(target_relation, tmp_relation)) %} | ||
{% set build_sql = incremental_insert(tmp_relation, target_relation) %} | ||
{% do run_query(get_create_table_as_sql(True, temp_relation, sql)) %} | ||
{% do adapter.expand_target_column_types( | ||
from_relation=temp_relation, | ||
to_relation=target_relation) %} | ||
{#-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#} | ||
{% set dest_columns = process_schema_changes(on_schema_change, temp_relation, existing_relation) %} | ||
{% if not dest_columns %} | ||
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %} | ||
{% endif %} | ||
|
||
{#-- Get the incremental_strategy, the macro to use for the strategy, and build the sql --#} | ||
{% set incremental_strategy = config.get('incremental_strategy') or 'default' %} | ||
{% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %} | ||
{% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, incremental_strategy) %} | ||
{% set strategy_arg_dict = ({'target_relation': target_relation, 'temp_relation': temp_relation, 'unique_key': unique_key, 'dest_columns': dest_columns, 'incremental_predicates': incremental_predicates }) %} | ||
{% set build_sql = strategy_sql_macro_func(strategy_arg_dict) %} | ||
|
||
{% endif %} | ||
|
||
|
||
{%- call statement('main') -%} | ||
{{ build_sql }} | ||
{%- endcall -%} | ||
{% call statement("main") %} | ||
{{ build_sql }} | ||
{% endcall %} | ||
|
||
{% if tmp_relation is not none %} | ||
{% do adapter.drop_relation(tmp_relation) %} | ||
{% if need_swap %} | ||
{% do adapter.rename_relation(existing_relation, backup_relation) %} | ||
{% do adapter.rename_relation(intermediate_relation, target_relation) %} | ||
{% do to_drop.append(backup_relation) %} | ||
{% endif %} | ||
|
||
{{ run_hooks(post_hooks, inside_transaction=True) }} | ||
|
||
{% set should_revoke = should_revoke(existing_relation, full_refresh_mode=full_refresh_mode) %} | ||
{% do apply_grants(target_relation, grant_config, should_revoke) %} | ||
{% set should_revoke = should_revoke(existing_relation, full_refresh_mode) %} | ||
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %} | ||
|
||
{% do persist_docs(target_relation, model) %} | ||
|
||
{% if existing_relation is none or existing_relation.is_view or should_full_refresh() %} | ||
{% do create_indexes(target_relation) %} | ||
{% endif %} | ||
|
||
{{ run_hooks(post_hooks, inside_transaction=True) }} | ||
|
||
-- `COMMIT` happens here | ||
{{ adapter.commit() }} | ||
{% do adapter.commit() %} | ||
|
||
{{ run_hooks(post_hooks, inside_transaction=False) }} | ||
{% for rel in to_drop %} | ||
{% do adapter.drop_relation(rel) %} | ||
{% endfor %} | ||
|
||
{% do persist_docs(target_relation, model) %} | ||
{{ run_hooks(post_hooks, inside_transaction=False) }} | ||
|
||
{{ return({'relations': [target_relation]}) }} | ||
{%- endmaterialization %} | ||
|
||
{%- endmaterialization %} |
9 changes: 9 additions & 0 deletions
9
dbt/include/exasol/macros/materializations/incremental_strategies.sql
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
{% macro exasol__get_incremental_default_sql(arg_dict) %} | ||
|
||
{% if arg_dict["unique_key"] %} | ||
{% do return(get_incremental_delete_insert_sql(arg_dict)) %} | ||
{% else %} | ||
{% do return(get_incremental_append_sql(arg_dict)) %} | ||
{% endif %} | ||
|
||
{% endmacro %} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
{% macro exasol__get_delete_insert_merge_sql(target, source, unique_key, dest_columns) -%} | ||
|
||
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%} | ||
|
||
{% if unique_key %} | ||
{% if unique_key is sequence and unique_key is not string %} | ||
delete from {{ target }} | ||
where exists ( select 1 from {{ source }} | ||
where | ||
{% for key in unique_key %} | ||
{{ source }}.{{ key }} = {{ target }}.{{ key }} | ||
{{ "and " if not loop.last }} | ||
{% endfor %} | ||
) | ||
{% else %} | ||
delete from {{ target }} | ||
where ( | ||
{{ unique_key }}) in ( | ||
select ({{ unique_key }}) | ||
from {{ source }} | ||
) | ||
|
||
{% endif %} | ||
{% endif %} | ||
|
||
|SEPARATEMEPLEASE| | ||
|
||
insert into {{ target }} ({{ dest_cols_csv }}) | ||
( | ||
select {{ dest_cols_csv }} | ||
from {{ source }} | ||
) | ||
|
||
{%- endmacro %} |
This file was deleted.
Oops, something went wrong.
Oops, something went wrong.