Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(#1576) use the information schema on BigQuery #1795

Merged
merged 4 commits into from
Oct 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 6 additions & 125 deletions plugins/bigquery/dbt/adapters/bigquery/impl.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import copy

import dbt.deprecations
import dbt.exceptions
import dbt.flags as flags
Expand Down Expand Up @@ -387,127 +385,10 @@ def load_dataframe(self, database, schema, table_name, agate_table,
with self.connections.exception_handler("LOAD TABLE"):
self.poll_until_job_completes(job, timeout)

###
# The get_catalog implementation for bigquery
###
def _flat_columns_in_table(self, table):
"""An iterator over the flattened columns for a given schema and table.
Resolves child columns as having the name "parent.child".
"""
for col in self._get_dbt_columns_from_bq_table(table):
yield from col.flatten()

@classmethod
def _get_stats_column_names(cls):
"""Construct a tuple of the column names for stats. Each stat has 4
columns of data.
"""
columns = []
stats = ('num_bytes', 'num_rows', 'location', 'partitioning_type',
'clustering_fields')
stat_components = ('label', 'value', 'description', 'include')
for stat_id in stats:
for stat_component in stat_components:
columns.append('stats:{}:{}'.format(stat_id, stat_component))
return tuple(columns)

@classmethod
def _get_stats_columns(cls, table, relation_type):
"""Given a table, return an iterator of key/value pairs for stats
column names/values.
"""
column_names = cls._get_stats_column_names()

# agate does not handle the array of column names gracefully
clustering_value = None
if table.clustering_fields is not None:
clustering_value = ','.join(table.clustering_fields)
# cast num_bytes/num_rows to str before they get to agate, or else
# agate will incorrectly decide they are booleans.
column_values = (
'Number of bytes',
str(table.num_bytes),
'The number of bytes this table consumes',
relation_type == 'table',

'Number of rows',
str(table.num_rows),
'The number of rows in this table',
relation_type == 'table',

'Location',
table.location,
'The geographic location of this table',
True,

'Partitioning Type',
table.partitioning_type,
'The partitioning type used for this table',
relation_type == 'table',

'Clustering Fields',
clustering_value,
'The clustering fields for this table',
relation_type == 'table',
)
return zip(column_names, column_values)

def get_catalog(self, manifest):
connection = self.connections.get_thread_connection()
client = connection.handle

schemas = manifest.get_used_schemas()

column_names = (
'table_database',
'table_schema',
'table_name',
'table_type',
'table_comment',
# does not exist in bigquery, but included for consistency
'table_owner',
'column_name',
'column_index',
'column_type',
'column_comment',
)
all_names = column_names + self._get_stats_column_names()
columns = []
def _catalog_filter_table(self, table, manifest):
# BigQuery doesn't allow ":" chars in column names -- remap them here.
table = table.rename(column_names={
col.name: col.name.replace('__', ':') for col in table.columns
})

for database_name, schema_name in schemas:
relations = self.list_relations(database_name, schema_name)
for relation in relations:

# This relation contains a subset of the info we care about.
# Fetch the full table object here
table_ref = self.connections.table_ref(
database_name,
relation.schema,
relation.identifier,
connection
)
table = client.get_table(table_ref)

flattened = self._flat_columns_in_table(table)
relation_stats = dict(self._get_stats_columns(table,
relation.type))

for index, column in enumerate(flattened, start=1):
column_data = (
relation.database,
relation.schema,
relation.name,
relation.type,
None,
None,
column.name,
index,
column.data_type,
None,
)
column_dict = dict(zip(column_names, column_data))
column_dict.update(copy.deepcopy(relation_stats))

columns.append(column_dict)

return dbt.clients.agate_helper.table_from_data(columns, all_names)
return super()._catalog_filter_table(table, manifest)
34 changes: 34 additions & 0 deletions plugins/bigquery/dbt/adapters/bigquery/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
BaseRelation, ComponentName
)
from dbt.utils import filter_null_values
from typing import TypeVar


Self = TypeVar('Self', bound='BigQueryRelation')


@dataclass(frozen=True, eq=False, repr=False)
Expand Down Expand Up @@ -40,3 +44,33 @@ def project(self):
@property
def dataset(self):
return self.schema

def information_schema(self: Self, identifier=None) -> Self:
    """Return this relation rendered as its INFORMATION_SCHEMA relation.

    BigQuery (usually) addresses information schemas at the dataset
    level, so this overrides the BaseRelation method to produce a
    relation rendered as ``project.dataset.INFORMATION_SCHEMA``.

    NOTE(review): the ``identifier`` argument is currently ignored —
    confirm whether callers expect it to select a specific
    INFORMATION_SCHEMA view.
    """
    # Quote everything on BigQuery -- identifiers are case-sensitive,
    # even when quoted.
    quoting = self.quote_policy.replace(
        database=True,
        schema=True,
        identifier=True,
    )

    # Only render the path components that are actually populated.
    inclusion = self.include_policy.replace(
        database=self.database is not None,
        schema=self.schema is not None,
        identifier=True,
    )

    new_path = self.path.replace(
        schema=self.schema,
        identifier='INFORMATION_SCHEMA',
    )

    return self.replace(
        quote_policy=quoting,
        include_policy=inclusion,
        path=new_path,
    )
Loading