Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(#525) drop existing relation at end of full-refresh incremental build #1682

Merged
merged 4 commits into from
Oct 15, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,18 @@
{{ make_hook_config(sql, inside_transaction=False) }}
{% endmacro %}


{#-- Guarded drop: issues adapter.drop_relation(relation) only when `relation`
    is not none, so callers can pass the (possibly missing) result of a
    relation lookup without checking for existence themselves. --#}
{% macro drop_relation_if_exists(relation) %}
{% if relation is not none %}
{{ adapter.drop_relation(relation) }}
{% endif %}
{% endmacro %}


{#-- Re-resolves `relation` against the adapter by database/schema/identifier
    and returns the result. NOTE(review): presumably returns none when no such
    relation exists in the warehouse — confirm against adapter.get_relation's
    contract; callers here (e.g. the incremental materializations) rely on
    that none-on-missing behavior. --#}
{% macro load_relation(relation) %}
{% do return(adapter.get_relation(
database=relation.database,
schema=relation.schema,
identifier=relation.identifier
)) -%}
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@

{#-- Upsert helper for incremental models: when `unique_key` is provided,
    first deletes rows in `target_relation` whose key appears in
    `tmp_relation`, then inserts all rows from `tmp_relation`. With no
    unique_key it is insert-only (append). Column list is taken from the
    target so the insert is schema-ordered and explicit. --#}
{% macro incremental_upsert(tmp_relation, target_relation, unique_key=none, statement_name="main") %}
{#-- dest_cols_csv: quoted, comma-joined column names of the target table --#}
{%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}

{%- if unique_key is not none -%}
delete
from {{ target_relation }}
where ({{ unique_key }}) in (
select ({{ unique_key }})
{#-- NOTE(review): include(schema=False, database=False) renders the temp
    relation unqualified — the PR discussion below questions whether that is
    correct for all adapters; the temp relation's own include policy should
    already be valid for the target database. Confirm before relying on it. --#}
from {{ tmp_relation.include(schema=False, database=False) }}
{#-- (PR review thread from the page scrape is interleaved below; it is not
    part of the macro source.) --#}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this include() necessary/even correct for all databases? I think whatever made your tmp_relation should be giving you the correct include policy.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is such a great catch! Yes - this macro should definitely expect the tmp_relation to already have a valid include policy for the given database.

);
{%- endif %}

{#-- Explicit column list on both sides keeps the insert robust to column
    ordering differences between tmp and target. --#}
insert into {{ target_relation }} ({{ dest_cols_csv }})
(
select {{ dest_cols_csv }}
from {{ tmp_relation }}
);
{%- endmacro %}
Original file line number Diff line number Diff line change
@@ -1,81 +1,50 @@
{#-- (Pre-refactor helper, removed by this PR in favor of incremental_upsert.)
    Deletes rows from `target_relation` whose `unique_key` value appears in
    `tmp_relation`, so the subsequent insert replaces rather than duplicates
    them. Requires a `unique_key` in the model config (config.require raises
    if absent). NOTE(review): the unqualified rendering via
    include(schema=False, database=False) was flagged in this PR as possibly
    incorrect for some adapters. --#}
{% macro dbt__incremental_delete(target_relation, tmp_relation) -%}

{%- set unique_key = config.require('unique_key') -%}

delete
from {{ target_relation }}
where ({{ unique_key }}) in (
select ({{ unique_key }})
from {{ tmp_relation.include(schema=False, database=False) }}
);

{%- endmacro %}

{% materialization incremental, default -%}
{%- set unique_key = config.get('unique_key') -%}

{%- set identifier = model['alias'] -%}
{%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
{%- set target_relation = api.Relation.create(identifier=identifier, schema=schema, database=database, type='table') -%}
{%- set tmp_relation = make_temp_relation(target_relation) %}

{%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%}
{% set unique_key = config.get('unique_key') %}
{% set full_refresh_mode = flags.FULL_REFRESH %}

{%- set exists_as_table = (old_relation is not none and old_relation.is_table) -%}
{%- set exists_not_as_table = (old_relation is not none and not old_relation.is_table) -%}
{% set target_relation = this %}
{% set existing_relation = load_relation(this) %}
{% set tmp_relation = make_temp_relation(this) %}

{%- set should_drop = (full_refresh_mode or exists_not_as_table) -%}

-- setup
{% if old_relation is none -%}
-- noop
{%- elif should_drop -%}
{{ adapter.drop_relation(old_relation) }}
{%- set old_relation = none -%}
{%- endif %}
{# -- set the type so our rename / drop uses the correct syntax #}
{% set backup_type = existing_relation.type | default("table") %}
{% set backup_relation = make_temp_relation(this, "__dbt_backup").incorporate(type=backup_type) %}

{{ run_hooks(pre_hooks, inside_transaction=False) }}

-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}

-- build model
{% if full_refresh_mode or old_relation is none -%}
{%- call statement('main') -%}
{{ create_table_as(False, target_relation, sql) }}
{%- endcall -%}
{%- else -%}
{%- call statement() -%}

{{ dbt.create_table_as(True, tmp_relation, sql) }}

{%- endcall -%}

{{ adapter.expand_target_column_types(from_relation=tmp_relation,
to_relation=target_relation) }}

{%- call statement('main') -%}
{% set dest_columns = adapter.get_columns_in_relation(target_relation) %}
{% set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') %}

{% if unique_key is not none -%}

{{ dbt__incremental_delete(target_relation, tmp_relation) }}

{%- endif %}

insert into {{ target_relation }} ({{ dest_cols_csv }})
(
select {{ dest_cols_csv }}
from {{ tmp_relation }}
);
{% endcall %}
{%- endif %}
{% set to_drop = [] %}
{% if existing_relation is none %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% elif existing_relation.is_view or full_refresh_mode %}
{% do adapter.rename_relation(target_relation, backup_relation) %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% do to_drop.append(backup_relation) %}
{% else %}
{% set tmp_relation = make_temp_relation(target_relation) %}
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{% do adapter.expand_target_column_types(
from_relation=tmp_relation,
to_relation=target_relation) %}
{% set build_sql = incremental_upsert(tmp_relation, target_relation, unique_key=unique_key) %}
{% endif %}

{% call statement("main") %}
{{ build_sql }}
{% endcall %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

-- `COMMIT` happens here
{{ adapter.commit() }}
{% do adapter.commit() %}

{% for rel in to_drop %}
{% do drop_relation(rel) %}
{% endfor %}

{{ run_hooks(post_hooks, inside_transaction=False) }}

Expand Down
30 changes: 30 additions & 0 deletions plugins/bigquery/dbt/adapters/bigquery/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,36 @@ def execute_model(self, model, materialization, sql_override=None,

return res

@available.parse_none
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to be pedantic about types here: This should probably be @available.parse(lambda *a, **k: True) (or False).

# NOTE(review): original indentation was lost in the page scrape; comments
# below annotate the logic as displayed.
def is_replaceable(self, relation, conf_partition, conf_cluster):
"""
Check if a given partition and clustering column spec for a table
can replace an existing relation in the database. BigQuery does not
allow tables to be replaced with another table that has a different
partitioning spec. This method returns True if the given config spec is
identical to that of the existing table.

:param relation: the existing relation to compare against (database/
    schema/identifier are used to look up the live BigQuery table).
:param conf_partition: the configured partitioning field name (or None).
:param conf_cluster: the configured clustering spec — a single column
    name (str) or a list of column names.
:returns: True if the table is absent or its partition/cluster spec
    matches the config exactly; False otherwise.
"""
try:
table = self.connections.get_bq_table(
database=relation.database,
schema=relation.schema,
identifier=relation.identifier
)
# Table does not exist yet, so there is nothing that could conflict —
# any spec can "replace" it.
except google.cloud.exceptions.NotFound:
return True

# Reduce the table's TimePartitioning object to just its field name so it
# compares directly against the configured partition column.
table_partition = table.time_partitioning
if table_partition is not None:
table_partition = table_partition.field

table_cluster = table.clustering_fields

# Normalize a single-column cluster config to a list to match the
# list-of-fields shape returned by BigQuery.
if isinstance(conf_cluster, str):
conf_cluster = [conf_cluster]

# Ordered list comparison is intentional: per the PR discussion, clustering
# key order is significant in BigQuery, so ['a', 'b'] != ['b', 'a'].
return table_partition == conf_partition \
and table_cluster == conf_cluster
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does order matter? If not, we should compare set(table_cluster) == set(conf_cluster).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah -- the order is significant -- clustering works like ordering whereby the table is clustered by the first clustering key, then the second, and so on.

This query fails if you run it twice, swapping the order of the clustering keys on the second run:

create or replace table dbt_dbanin.debug_clustering
partition by date_day
cluster by id, name
as (
  select current_date as date_day, 1 as id, 'drew' as name
);


@available.parse_none
def alter_table_add_columns(self, relation, columns):

Expand Down
9 changes: 6 additions & 3 deletions plugins/bigquery/dbt/include/bigquery/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,12 @@
{% do opts.update({'expiration_timestamp': 'TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 12 hour)'}) %}
{% endif %}

OPTIONS({% for opt_key, opt_val in opts.items() %}
{{ opt_key }}={{ opt_val }}{{ "," if not loop.last }}
{% endfor %})
{% set options -%}
OPTIONS({% for opt_key, opt_val in opts.items() %}
{{ opt_key }}={{ opt_val }}{{ "," if not loop.last }}
{% endfor %})
{%- endset %}
{% do return(options) %}
{%- endmacro -%}

{% macro bigquery__create_table_as(temporary, relation, sql) -%}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,50 +2,45 @@
{% materialization incremental, adapter='bigquery' -%}

{%- set unique_key = config.get('unique_key') -%}

{%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%}

{%- set identifier = model['alias'] -%}

{%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
{%- set target_relation = this %}
{%- set existing_relation = load_relation(this) %}
{%- set tmp_relation = make_temp_relation(this) %}

{%- set target_relation = api.Relation.create(database=database, identifier=identifier, schema=schema, type='table') -%}
{%- set partition_by = config.get('partition_by', none) -%}
{%- set cluster_by = config.get('cluster_by', none) -%}

{%- set exists_as_table = (old_relation is not none and old_relation.is_table) -%}
{%- set exists_not_as_table = (old_relation is not none and not old_relation.is_table) -%}

{%- set should_drop = (full_refresh_mode or exists_not_as_table) -%}
{%- set force_create = (full_refresh_mode) -%}
{{ run_hooks(pre_hooks) }}

-- setup
{% if old_relation is none -%}
-- noop
{%- elif should_drop -%}
{{ adapter.drop_relation(old_relation) }}
{%- set old_relation = none -%}
{%- endif %}
{% if existing_relation is none %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% elif existing_relation.is_view %}
{#-- There's no way to atomically replace a view with a table on BQ --#}
{{ adapter.drop_relation(existing_relation) }}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% elif full_refresh_mode %}
{#-- If the partition/cluster config has changed, then we must drop and recreate --#}
{% if not adapter.is_replaceable(existing_relation, partition_by, cluster_by) %}
{% do log("Hard refreshing " ~ existing_relation ~ " because it is not replaceable") %}
{{ adapter.drop_relation(existing_relation) }}
{% endif %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% else %}
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}

{% set source_sql -%}
{#-- wrap sql in parens to make it a subquery --#}
(
{{ sql }}
)
{%- endset -%}


{{ run_hooks(pre_hooks) }}

-- build model
{% if force_create or old_relation is none -%}
{%- call statement('main') -%}
{{ create_table_as(False, target_relation, sql) }}
{%- endcall -%}
{%- else -%}
{% set dest_columns = adapter.get_columns_in_relation(target_relation) %}
{%- call statement('main') -%}
{{ get_merge_sql(target_relation, source_sql, unique_key, dest_columns) }}
{% endcall %}
{%- endif %}
{% set source_sql -%}
(
{{ sql }}
)
{%- endset -%}
{% set build_sql = get_merge_sql(target_relation, source_sql, unique_key, dest_columns) %}
{% endif %}

{%- call statement('main') -%}
{{ build_sql }}
{% endcall %}

{{ run_hooks(post_hooks) }}

Expand Down
Loading