From 95a0587499daf897517e45d7887a2a6a690295e7 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Mon, 14 Oct 2019 22:48:52 -0400 Subject: [PATCH] handle changing partition/cluster configs on BQ --- .../bigquery/dbt/adapters/bigquery/impl.py | 42 +++++++++++++++++++ .../dbt/include/bigquery/macros/adapters.sql | 9 ++-- .../macros/materializations/incremental.sql | 8 ++++ 3 files changed, 56 insertions(+), 3 deletions(-) diff --git a/plugins/bigquery/dbt/adapters/bigquery/impl.py b/plugins/bigquery/dbt/adapters/bigquery/impl.py index 596aff2a66c..d9ce05f8cac 100644 --- a/plugins/bigquery/dbt/adapters/bigquery/impl.py +++ b/plugins/bigquery/dbt/adapters/bigquery/impl.py @@ -346,6 +346,48 @@ def execute_model(self, model, materialization, sql_override=None, return res + def _get_table(self, relation): + logger.debug('Fetching metadata for relation {}'.format(relation)) + conn = self.connections.get_thread_connection() + client = conn.handle + table_ref = self.connections.table_ref( + relation.database, + relation.schema, + relation.identifier, + conn + ) + + # Handle 404 + try: + return client.get_table(table_ref) + except (google.cloud.exceptions.NotFound) as e: + return None + + @available.parse_none + def is_replaceable(self, relation, conf_partition, conf_cluster): + """ + Check if a given partition and clustering column spec for a table + can replace an existing relation in the database. BigQuery does not + allow tables to be replaced with another table that has a different + partitioning spec. This method returns True if the given config spec is + identical to that of the existing table. + """ + table = self._get_table(relation) + if not table: + return True + + table_partition = table.time_partitioning + if table_partition is not None: + table_partition = table_partition.field + + table_cluster = table.clustering_fields + + if isinstance(conf_cluster, str): + conf_cluster = [conf_cluster] + + return table_partition == conf_partition \ + and table_cluster == conf_cluster + @available.parse_none def alter_table_add_columns(self, relation, columns): diff --git a/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql b/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql index 60d5d4e509e..719860ea40c 100644 --- a/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql +++ b/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql @@ -37,9 +37,12 @@ {% do opts.update({'expiration_timestamp': 'TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 12 hour)'}) %} {% endif %} - OPTIONS({% for opt_key, opt_val in opts.items() %} - {{ opt_key }}={{ opt_val }}{{ "," if not loop.last }} - {% endfor %}) + {% set options -%} + OPTIONS({% for opt_key, opt_val in opts.items() %} + {{ opt_key }}={{ opt_val }}{{ "," if not loop.last }} + {% endfor %}) + {%- endset %} + {% do return(options) %} {%- endmacro -%} {% macro bigquery__create_table_as(temporary, relation, sql) -%} diff --git a/plugins/bigquery/dbt/include/bigquery/macros/materializations/incremental.sql b/plugins/bigquery/dbt/include/bigquery/macros/materializations/incremental.sql index 9899eac8cc8..d2e62e4f8e6 100644 --- a/plugins/bigquery/dbt/include/bigquery/macros/materializations/incremental.sql +++ b/plugins/bigquery/dbt/include/bigquery/macros/materializations/incremental.sql @@ -8,6 +8,9 @@ {%- set existing_relation = load_relation(this) %} {%- set tmp_relation = make_temp_relation(this) %} + {%- set partition_by = config.get('partition_by', none) -%} + {%- set cluster_by = config.get('cluster_by', none) -%} + {{ run_hooks(pre_hooks) }} {% if existing_relation is none %} @@ -17,6 +20,11 @@ {{ adapter.drop_relation(existing_relation) }} {% set build_sql = create_table_as(False, target_relation, sql) %} {% elif full_refresh_mode %} + {#-- If the partition/cluster config has changed, then we must drop and recreate --#} + {% if not adapter.is_replaceable(existing_relation, partition_by, cluster_by) %} + {% do log("Hard refreshing " ~ existing_relation ~ " because it is not replaceable") %} + {{ adapter.drop_relation(existing_relation) }} + {% endif %} {% set build_sql = create_table_as(False, target_relation, sql) %} {% else %} {% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}