Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add UUID/invocation ID to temporary table names #373

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 20 additions & 5 deletions dbt/include/clickhouse/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,27 @@
{%- endcall %}
{% endmacro %}

{% macro clickhouse__make_temp_relation(base_relation, suffix) %}
{% set tmp_identifier = base_relation.identifier ~ suffix %}
{% set tmp_relation = base_relation.incorporate(
{% macro clickhouse__make_intermediate_relation(base_relation, suffix='__dbt_tmp') %}
{%- set intermediate_identifier = base_relation.identifier ~ suffix ~ '_' ~ invocation_id.replace('-', '_') -%}
{%- set intermediate_relation = base_relation.incorporate(path={"identifier": intermediate_identifier}) -%}
{{ return(intermediate_relation) }}
{%- endmacro %}

{% macro clickhouse__make_backup_relation(base_relation, backup_relation_type, suffix='__dbt_backup') %}
{%- set backup_identifier = base_relation.identifier ~ suffix ~ '_' ~ invocation_id.replace('-', '_') -%}
{%- set backup_relation = base_relation.incorporate(
path={"identifier": backup_identifier},
type=backup_relation_type
) -%}
{{ return(backup_relation) }}
{%- endmacro %}

{% macro clickhouse__make_temp_relation(base_relation, suffix='__dbt_tmp') %}
{%- set tmp_identifier = base_relation.identifier ~ suffix ~ '_' ~ invocation_id.replace('-', '_') -%}
{%- set tmp_relation = base_relation.incorporate(
path={"identifier": tmp_identifier, "schema": None}) -%}
{% do return(tmp_relation) %}
{% endmacro %}
{{ return(tmp_relation) }}
{%- endmacro %}


{% macro clickhouse__generate_database_name(custom_database_name=none, node=none) -%}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
{%- set preexisting_intermediate_relation = load_cached_relation(intermediate_relation) -%}
{% endif %}
{% endif %}
{% set view_relation = default__make_temp_relation(target_relation, '__dbt_tmp') %}
{%- set view_relation = make_intermediate_relation(target_relation, '__dbt_view_tmp') -%}
-- drop the temp relations if they exist already in the database
{{ drop_relation_if_exists(preexisting_intermediate_relation) }}
{{ drop_relation_if_exists(preexisting_backup_relation) }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,12 @@
{{ create_schema(target_relation_local) }}
{%- set intermediate_relation = make_intermediate_relation(target_relation_local)-%}
{%- set distributed_intermediate_relation = make_intermediate_relation(target_relation)-%}
{%- set backup_relation_type = 'table' if existing_relation is none else existing_relation.type -%}
{%- set backup_relation = make_backup_relation(target_relation_local, backup_relation_type) -%}
{%- set distributed_backup_relation = make_backup_relation(target_relation, backup_relation_type) -%}
{%- set preexisting_intermediate_relation = load_cached_relation(intermediate_relation)-%}
{%- set preexisting_backup_relation = load_cached_relation(backup_relation) -%}
{%- set view_relation = default__make_temp_relation(target_relation, '__dbt_view_tmp') -%}

{{ drop_relation_if_exists(preexisting_intermediate_relation) }}
{{ drop_relation_if_exists(preexisting_backup_relation) }}
{{ drop_relation_if_exists(view_relation) }}
{{ drop_relation_if_exists(distributed_intermediate_relation) }}

{%- set view_relation = make_intermediate_relation(target_relation, '__dbt_view_tmp') -%}
{{ drop_relation_if_exists(view_relation) }}

{{ run_hooks(pre_hooks, inside_transaction=False) }}
{{ run_hooks(pre_hooks, inside_transaction=True) }}
Expand Down Expand Up @@ -74,6 +69,12 @@
{% endcall %}

{% else %}
{% if existing_relation is none %}
{{ drop_relation_if_exists(existing_relation) }}
{% do run_query(create_distributed_table(target_relation, target_relation_local)) %}
{% set existing_relation = target_relation %}
{% endif %}

{% set incremental_strategy = adapter.calculate_incremental_strategy(config.get('incremental_strategy')) %}
{% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %}
{%- if on_schema_change != 'ignore' %}
Expand All @@ -99,6 +100,11 @@
{% endif %}

{% if need_swap %}
{%- set backup_relation_type = 'table' if existing_relation is none else existing_relation.type -%}
{%- set backup_relation = make_backup_relation(target_relation_local, backup_relation_type) -%}
{%- set distributed_backup_relation = make_backup_relation(target_relation, backup_relation_type) -%}
{%- set preexisting_backup_relation = load_cached_relation(backup_relation) -%}
{{ drop_relation_if_exists(preexisting_backup_relation) }}
{% if False %}
{% do adapter.rename_relation(intermediate_relation, backup_relation) %}
{% do exchange_tables_atomic(backup_relation, target_relation_local) %}
Expand All @@ -108,7 +114,7 @@
{% endif %}

-- Structure could have changed, need to update distributed table from replaced local table
{% set target_relation_new = target_relation.incorporate(path={"identifier": target_relation.identifier + '_temp'}) %}
{%- set target_relation_new = make_intermediate_relation(target_relation) -%}
{{ drop_relation_if_exists(target_relation_new) }}
{% do run_query(create_distributed_table(target_relation_new, target_relation_local)) %}

Expand All @@ -124,6 +130,8 @@
{% do to_drop.append(distributed_backup_relation) %}
{% endif %}

{{ drop_relation_if_exists(view_relation) }}

{% set should_revoke = should_revoke(existing_relation, full_refresh_mode) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}
{% do apply_grants(target_relation_local, grant_config, should_revoke=should_revoke) %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,9 @@


{% macro clickhouse__incremental_delete_insert(existing_relation, unique_key, incremental_predicates, is_distributed=False) %}
{% set new_data_relation = existing_relation.incorporate(path={"identifier": existing_relation.identifier
+ '__dbt_new_data_' + invocation_id.replace('-', '_')}) %}
{%- set new_data_relation = make_intermediate_relation(existing_relation, '__dbt_new_data') -%}
{{ drop_relation_if_exists(new_data_relation) }}
{%- set distributed_new_data_relation = existing_relation.incorporate(path={"identifier": existing_relation.identifier + '__dbt_distributed_new_data'}) -%}
{%- set distributed_new_data_relation = make_intermediate_relation(existing_relation, '__dbt_distributed_new_data') -%}

{%- set inserting_relation = new_data_relation -%}

Expand All @@ -223,18 +222,18 @@
{% call statement('delete_existing_data') %}
{% if is_distributed %}
{% set existing_local = existing_relation.incorporate(path={"identifier": this.identifier + local_suffix, "schema": local_db_prefix + this.schema}) if existing_relation is not none else none %}
delete from {{ existing_local }} {{ on_cluster_clause(existing_relation) }} where ({{ unique_key }}) in (select {{ unique_key }}
from {{ inserting_relation }})
delete from {{ existing_local }} {{ on_cluster_clause(existing_relation) }} where ({{ unique_key }}) in (select {{ unique_key }}
from {{ inserting_relation }})
{% else %}
delete from {{ existing_relation }} where ({{ unique_key }}) in (select {{ unique_key }}
from {{ inserting_relation }})
delete from {{ existing_relation }} where ({{ unique_key }}) in (select {{ unique_key }}
from {{ inserting_relation }})
{% endif %}
{%- if incremental_predicates %}
{% for predicate in incremental_predicates %}
and {{ predicate }}
{% endfor %}
{%- endif -%}
{{ adapter.get_model_query_settings(model) }}
{{ adapter.get_model_query_settings(model) }}
{% endcall %}

{%- set dest_columns = adapter.get_columns_in_relation(existing_relation) -%}
Expand All @@ -247,8 +246,7 @@
{% endmacro %}

{% macro clickhouse__incremental_insert_overwrite(existing_relation, intermediate_relation, partition_by) %}
{% set new_data_relation = existing_relation.incorporate(path={"identifier": model['name']
+ '__dbt_new_data_' + invocation_id.replace('-', '_')}) %}
{%- set new_data_relation = make_intermediate_relation(existing_relation, '__dbt_new_data') -%}
{{ drop_relation_if_exists(new_data_relation) }}
{% call statement('create_new_data_temp') -%}
{{ get_create_table_as_sql(False, new_data_relation, sql) }}
Expand Down
2 changes: 1 addition & 1 deletion dbt/include/clickhouse/macros/materializations/table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@
{%- endmacro %}

{% macro clickhouse__insert_into(target_relation, sql, has_contract) %}
{%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
{%- set dest_columns = adapter.get_column_schema_from_query(sql) -%}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}

insert into {{ target_relation }}
Expand Down