so much for being a SQLAdapter
Jacob Beck committed Nov 1, 2019
1 parent 1315569 commit 13c68c1
Showing 3 changed files with 52 additions and 39 deletions.
core/dbt/adapters/sql/impl.py (1 change: 0 additions & 1 deletion)
@@ -227,7 +227,6 @@ def check_schema_exists(self, database, schema):
            identifier='INFORMATION_SCHEMA',
            quote_policy=self.config.quoting
        ).information_schema()
-        information_schema.render()

        kwargs = {'information_schema': information_schema, 'schema': schema}
        results = self.execute_macro(
plugins/bigquery/dbt/adapters/bigquery/impl.py (45 changes: 41 additions & 4 deletions)
@@ -4,8 +4,7 @@
import dbt.clients.gcloud
import dbt.clients.agate_helper

-from dbt.adapters.base import available, RelationType
-from dbt.adapters.sql import SQLAdapter
+from dbt.adapters.base import BaseAdapter, available, RelationType
from dbt.adapters.bigquery.relation import (
    BigQueryRelation
)
@@ -34,7 +33,7 @@ def _stub_relation(*args, **kwargs):
)


-class BigQueryAdapter(SQLAdapter):
+class BigQueryAdapter(BaseAdapter):

    RELATION_TYPES = {
        'TABLE': RelationType.Table,
@@ -83,6 +82,17 @@ def rename_relation(self, from_relation, to_relation):
            '`rename_relation` is not implemented for this adapter!'
        )

+    @available
+    def list_schemas(self, database):
+        conn = self.connections.get_thread_connection()
+        client = conn.handle
+
+        with self.connections.exception_handler('list dataset'):
+            # this is similar to how we have to deal with listing tables
+            all_datasets = client.list_datasets(project=database,
+                                                max_results=10000)
+            return [ds.dataset_id for ds in all_datasets]

    @available.parse(lambda *a, **k: False)
    def check_schema_exists(self, database: str, schema: str) -> bool:
        conn = self.connections.get_thread_connection()
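The `list_schemas` addition maps one-to-one onto the client library. A standalone sketch of the same call, assuming the google-cloud-bigquery package, ambient credentials, and a placeholder project name:

    from google.cloud import bigquery

    client = bigquery.Client()

    # list_datasets() yields DatasetListItem objects; in dbt's
    # database/schema vocabulary the GCP project is the database and
    # each dataset_id is a schema name.
    datasets = client.list_datasets(project='my-project',  # placeholder
                                    max_results=10000)
    print([ds.dataset_id for ds in datasets])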
@@ -96,7 +106,7 @@ def check_schema_exists(self, database: str, schema: str) -> bool:
        # which appear in neither the information_schema.schemata view nor the
        # list_datasets method.
        try:
-            next(client.list_tables(bigquery_dataset, max_results=1))
+            next(iter(client.list_tables(bigquery_dataset, max_results=1)))
        except StopIteration:
            pass
        except google.api_core.exceptions.NotFound:
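Two details in this hunk are easy to miss. The probe exists because a dataset can be queryable yet appear in neither the SCHEMATA view nor list_datasets, so the code asks the dataset for a single table instead. And `next(...)` now wraps its argument in `iter(...)`, presumably because the pager returned by `list_tables` is iterable but not itself an iterator, so calling `next()` on it directly raises a TypeError. A self-contained sketch of the probe, assuming google-cloud-bigquery; the collapsed hunk hides the final return, so the tail below is an inference:

    import google.api_core.exceptions
    from google.cloud import bigquery

    def dataset_exists(client: bigquery.Client, dataset) -> bool:
        # Ask for at most one table; this is cheap, and it works for
        # datasets that are hidden from SCHEMATA and list_datasets.
        try:
            next(iter(client.list_tables(dataset, max_results=1)))
        except StopIteration:
            pass  # the dataset exists but holds no tables
        except google.api_core.exceptions.NotFound:
            return False  # tables.list 404s when the dataset is missing
        return True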
@@ -125,6 +135,33 @@ def expand_target_column_types(self, from_relation, to_relation):
        # This is a no-op on BigQuery
        pass

+    def list_relations_without_caching(self, information_schema, schema):
+        connection = self.connections.get_thread_connection()
+        client = connection.handle
+
+        bigquery_dataset = self.connections.dataset(
+            information_schema.database, information_schema.schema, connection
+        )
+
+        all_tables = client.list_tables(
+            bigquery_dataset,
+            # BigQuery paginates tables by alphabetizing them, and using
+            # the name of the last table on a page as the key for the
+            # next page. If that key table gets dropped before we run
+            # list_relations, then this will 404. So, we avoid this
+            # situation by making the page size sufficiently large.
+            # see: https://github.com/fishtown-analytics/dbt/issues/726
+            # TODO: cache the list of relations up front, and then we
+            # won't need to do this
+            max_results=100000)
+
+        # This will 404 if the dataset does not exist. This behavior mirrors
+        # the implementation of list_relations for other adapters
+        try:
+            return [self._bq_table_to_relation(table) for table in all_tables]
+        except google.api_core.exceptions.NotFound:
+            return []

    def get_relation(self, database, schema, identifier):
        if self._schema_is_cached(database, schema):
            # if it's in the cache, use the parent's model of going through
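The long comment inside `list_tables` carries the key reasoning: BigQuery pages table listings alphabetically, keyed by the last name on the previous page, so a table dropped mid-listing can 404 the next page request, and an oversized page is the stopgap until relations are cached up front. The `_bq_table_to_relation` helper is defined elsewhere in this file; a rough sketch of the raw material it works from (the TableListItem attribute names are real, the dict shape is illustrative only):

    from google.cloud.bigquery.table import TableListItem

    def table_to_relation_dict(table: TableListItem) -> dict:
        # Illustrative only: dbt's actual helper builds a BigQueryRelation.
        return {
            'database': table.project,      # GCP project
            'schema': table.dataset_id,     # BigQuery dataset
            'identifier': table.table_id,   # table name
            'type': table.table_type,       # e.g. 'TABLE', 'VIEW', 'EXTERNAL'
        }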
plugins/bigquery/dbt/include/bigquery/macros/adapters.sql (45 changes: 11 additions & 34 deletions)
@@ -89,44 +89,21 @@
{% endmacro %}


-{% macro bigquery__list_schemas(database) -%}
-  {% set sql %}
-    select distinct schema_name
-    from {{ information_schema_name(database) }}.SCHEMATA
-    where UPPER(catalog_name) like UPPER('{{ database }}')
-  {% endset %}
-  {{ return(run_query(sql)) }}
-{% endmacro %}
-
-{% macro empty_table() %}
-{# This is the only way I know in jinja to get an empty agate table #}
-{% do store_result('_empty_table', '', None) %}
-{{ return(load_result('_empty_table')['table']) }}
-{% endmacro %}


{% macro bigquery__list_relations_without_caching(information_schema, schema) -%}
-  {# In bigquery, you can't query the full information schema, you can only do so
-  by schema (so 'database.schema.information_schema.tables'). But our schema
-  value is case-insensitive for annoying reasons involving quoting. So you
-  have to figure out what schemas match the given schema first, and query them each.
-  #}
-  {%- set query -%}
-    select
-      table_catalog as database,
-      table_name as name,
-      table_schema as schema,
-      case when table_type = 'BASE TABLE' then 'table'
-           when table_type = 'VIEW' then 'view'
-           when table_type = 'EXTERNAL TABLE' then 'external'
-           else table_type
-      end as table_type
-    from {{ information_schema.replace(information_schema_view='TABLES') }}
-  {%- endset -%}
-  {{ return(run_query(query)) }}
+  {{ return(adapter.list_relations_without_caching(information_schema, schema)) }}
{%- endmacro %}


{% macro bigquery__current_timestamp() -%}
  CURRENT_TIMESTAMP()
{%- endmacro %}


+{% macro bigquery__list_schemas(database) -%}
+  {{ return(adapter.list_schemas(database)) }}
+{% endmacro %}


+{% macro bigquery__check_schema_exists(information_schema, schema) %}
+  {{ return(adapter.check_schema_exists(information_schema.database, schema)) }}
+{% endmacro %}
