From 13c68c1b3cd52ea08baf0c2f71c968680771f9db Mon Sep 17 00:00:00 2001 From: Jacob Beck Date: Fri, 1 Nov 2019 10:29:40 -0600 Subject: [PATCH] so much for being a SQLAdapter --- core/dbt/adapters/sql/impl.py | 1 - .../bigquery/dbt/adapters/bigquery/impl.py | 45 +++++++++++++++++-- .../dbt/include/bigquery/macros/adapters.sql | 45 +++++-------------- 3 files changed, 52 insertions(+), 39 deletions(-) diff --git a/core/dbt/adapters/sql/impl.py b/core/dbt/adapters/sql/impl.py index 63ef8109b77..e15bc143a2c 100644 --- a/core/dbt/adapters/sql/impl.py +++ b/core/dbt/adapters/sql/impl.py @@ -227,7 +227,6 @@ def check_schema_exists(self, database, schema): identifier='INFORMATION_SCHEMA', quote_policy=self.config.quoting ).information_schema() - information_schema.render() kwargs = {'information_schema': information_schema, 'schema': schema} results = self.execute_macro( diff --git a/plugins/bigquery/dbt/adapters/bigquery/impl.py b/plugins/bigquery/dbt/adapters/bigquery/impl.py index fdf3c2f0374..2dda118ce26 100644 --- a/plugins/bigquery/dbt/adapters/bigquery/impl.py +++ b/plugins/bigquery/dbt/adapters/bigquery/impl.py @@ -4,8 +4,7 @@ import dbt.clients.gcloud import dbt.clients.agate_helper -from dbt.adapters.base import available, RelationType -from dbt.adapters.sql import SQLAdapter +from dbt.adapters.base import BaseAdapter, available, RelationType from dbt.adapters.bigquery.relation import ( BigQueryRelation ) @@ -34,7 +33,7 @@ def _stub_relation(*args, **kwargs): ) -class BigQueryAdapter(SQLAdapter): +class BigQueryAdapter(BaseAdapter): RELATION_TYPES = { 'TABLE': RelationType.Table, @@ -83,6 +82,17 @@ def rename_relation(self, from_relation, to_relation): '`rename_relation` is not implemented for this adapter!' ) + @available + def list_schemas(self, database): + conn = self.connections.get_thread_connection() + client = conn.handle + + with self.connections.exception_handler('list dataset'): + # this is similar to how we have to deal with listing tables + all_datasets = client.list_datasets(project=database, + max_results=10000) + return [ds.dataset_id for ds in all_datasets] + @available.parse(lambda *a, **k: False) def check_schema_exists(self, database: str, schema: str) -> bool: conn = self.connections.get_thread_connection() @@ -96,7 +106,7 @@ def check_schema_exists(self, database: str, schema: str) -> bool: # which appear in neither the information_schema.schemata view nor the # list_datasets method. try: - next(client.list_tables(bigquery_dataset, max_results=1)) + next(iter(client.list_tables(bigquery_dataset, max_results=1))) except StopIteration: pass except google.api_core.exceptions.NotFound: @@ -125,6 +135,33 @@ def expand_target_column_types(self, from_relation, to_relation): # This is a no-op on BigQuery pass + def list_relations_without_caching(self, information_schema, schema): + connection = self.connections.get_thread_connection() + client = connection.handle + + bigquery_dataset = self.connections.dataset( + information_schema.database, information_schema.schema, connection + ) + + all_tables = client.list_tables( + bigquery_dataset, + # BigQuery paginates tables by alphabetizing them, and using + # the name of the last table on a page as the key for the + # next page. If that key table gets dropped before we run + # list_relations, then this will 404. So, we avoid this + # situation by making the page size sufficiently large. + # see: https://github.com/fishtown-analytics/dbt/issues/726 + # TODO: cache the list of relations up front, and then we + # won't need to do this + max_results=100000) + + # This will 404 if the dataset does not exist. This behavior mirrors + # the implementation of list_relations for other adapters + try: + return [self._bq_table_to_relation(table) for table in all_tables] + except google.api_core.exceptions.NotFound: + return [] + def get_relation(self, database, schema, identifier): if self._schema_is_cached(database, schema): # if it's in the cache, use the parent's model of going through diff --git a/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql b/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql index 9589f148ba7..126e6d3cd0f 100644 --- a/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql +++ b/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql @@ -89,44 +89,21 @@ {% endmacro %} -{% macro bigquery__list_schemas(database) -%} - {% set sql %} - select distinct schema_name - from {{ information_schema_name(database) }}.SCHEMATA - where UPPER(catalog_name) like UPPER('{{ database }}') - {% endset %} - {{ return(run_query(sql)) }} -{% endmacro %} - -{% macro empty_table() %} - {# This is the only way I know in jinja to get an empty agate table #} - {% do store_result('_empty_table', '', None) %} - {{ return(load_result('_empty_table')['table']) }} -{% endmacro %} - - {% macro bigquery__list_relations_without_caching(information_schema, schema) -%} - {# In bigquery, you can't query the full information schema, you can only do so - by schema (so 'database.schema.information_schema.tables'). But our schema - value is case-insensitive for annoying reasons involving quoting. So you - have figure out what schemas match the given schema first, and query them each. - #} - {%- set query -%} - select - table_catalog as database, - table_name as name, - table_schema as schema, - case when table_type = 'BASE TABLE' then 'table' - when table_type = 'VIEW' then 'view' - when table_type = 'EXTERNAL TABLE' then 'external' - else table_type - end as table_type - from {{ information_schema.replace(information_schema_view='TABLES') }} - {%- endset -%} - {{ return(run_query(query)) }} + {{ return(adapter.list_relations_without_caching(information_schema, schema)) }} {%- endmacro %} {% macro bigquery__current_timestamp() -%} CURRENT_TIMESTAMP() {%- endmacro %} + + +{% macro bigquery__list_schemas(database) -%} + {{ return(adapter.list_schemas()) }} +{% endmacro %} + + +{% macro bigquery__check_schema_exists(information_schema, schema) %} + {{ return(adapter.check_schema_exists(information_schema.database, schema)) }} +{% endmacro %}