Skip to content

Commit

Permalink
handle changing partition/cluster configs on BQ
Browse files Browse the repository at this point in the history
  • Loading branch information
drewbanin committed Oct 15, 2019
1 parent 0f1693a commit 95a0587
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 3 deletions.
42 changes: 42 additions & 0 deletions plugins/bigquery/dbt/adapters/bigquery/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,48 @@ def execute_model(self, model, materialization, sql_override=None,

return res

def _get_table(self, relation):
    """Fetch the BigQuery table object for ``relation``.

    The lookup goes through the handle of the current thread's
    connection, using a table reference built from the relation's
    ``database``, ``schema`` and ``identifier`` attributes.

    :param relation: the relation to look up.
    :return: the value returned by ``client.get_table``, or ``None``
        when the lookup raises ``google.cloud.exceptions.NotFound``.
    """
    logger.debug('Fetching metadata for relation {}'.format(relation))
    conn = self.connections.get_thread_connection()
    client = conn.handle
    table_ref = self.connections.table_ref(
        relation.database,
        relation.schema,
        relation.identifier,
        conn
    )

    # A missing table (404) is reported to the caller as None rather
    # than as an exception, so callers can branch on "does not exist".
    try:
        return client.get_table(table_ref)
    except google.cloud.exceptions.NotFound:
        return None

@available.parse_none
def is_replaceable(self, relation, conf_partition, conf_cluster):
    """Report whether ``relation`` can be replaced in place.

    BigQuery does not allow a table to be replaced by one that has a
    different partitioning spec, so the configured partition and
    clustering settings are compared against those of the existing
    table.

    :param relation: the relation that would be replaced.
    :param conf_partition: the configured partition value; compared
        with the ``field`` of the table's ``time_partitioning`` (or
        with ``None`` when the table has no ``time_partitioning``).
    :param conf_cluster: the configured clustering value; a single
        string is treated as a one-element list before comparison with
        the table's ``clustering_fields``.
    :return: ``True`` when ``_get_table`` finds no table, or when both
        the partition and clustering settings match the existing table.
    """
    existing = self._get_table(relation)
    if not existing:
        # Nothing exists yet, so there is nothing to conflict with.
        return True

    partitioning = existing.time_partitioning
    current_partition = (
        None if partitioning is None else partitioning.field
    )
    current_cluster = existing.clustering_fields

    # Normalise a bare column name to the list form used for comparison.
    wanted_cluster = (
        [conf_cluster] if isinstance(conf_cluster, str) else conf_cluster
    )

    return (
        current_partition == conf_partition
        and current_cluster == wanted_cluster
    )

@available.parse_none
def alter_table_add_columns(self, relation, columns):

Expand Down
9 changes: 6 additions & 3 deletions plugins/bigquery/dbt/include/bigquery/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,12 @@
{% do opts.update({'expiration_timestamp': 'TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 12 hour)'}) %}
{% endif %}

OPTIONS({% for opt_key, opt_val in opts.items() %}
{{ opt_key }}={{ opt_val }}{{ "," if not loop.last }}
{% endfor %})
{% set options -%}
OPTIONS({% for opt_key, opt_val in opts.items() %}
{{ opt_key }}={{ opt_val }}{{ "," if not loop.last }}
{% endfor %})
{%- endset %}
{% do return(options) %}
{%- endmacro -%}

{% macro bigquery__create_table_as(temporary, relation, sql) -%}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
{%- set existing_relation = load_relation(this) %}
{%- set tmp_relation = make_temp_relation(this) %}

{%- set partition_by = config.get('partition_by', none) -%}
{%- set cluster_by = config.get('cluster_by', none) -%}

{{ run_hooks(pre_hooks) }}

{% if existing_relation is none %}
Expand All @@ -17,6 +20,11 @@
{{ adapter.drop_relation(existing_relation) }}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% elif full_refresh_mode %}
{#-- If the partition/cluster config has changed, then we must drop and recreate --#}
{% if not adapter.is_replaceable(existing_relation, partition_by, cluster_by) %}
{% do log("Hard refreshing " ~ existing_relation ~ " because it is not replaceable") %}
{{ adapter.drop_relation(existing_relation) }}
{% endif %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% else %}
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
Expand Down

0 comments on commit 95a0587

Please sign in to comment.