(#1576) use the information schema on BigQuery
drewbanin committed Oct 15, 2019
1 parent 6287d6d commit 4c624d0
Showing 4 changed files with 266 additions and 150 deletions.
153 changes: 29 additions & 124 deletions plugins/bigquery/dbt/adapters/bigquery/impl.py
@@ -1,5 +1,3 @@
import copy

import dbt.deprecations
import dbt.exceptions
import dbt.flags as flags
@@ -25,6 +23,9 @@
import agate


GET_CATALOG_MACRO_NAME = 'get_catalog'


def _stub_relation(*args, **kwargs):
return BigQueryRelation.create(
database='',
@@ -387,127 +388,31 @@ def load_dataframe(self, database, schema, table_name, agate_table,
with self.connections.exception_handler("LOAD TABLE"):
self.poll_until_job_completes(job, timeout)

###
# The get_catalog implementation for bigquery
###
def _flat_columns_in_table(self, table):
"""An iterator over the flattened columns for a given schema and table.
Resolves child columns as having the name "parent.child".
"""
for col in self._get_dbt_columns_from_bq_table(table):
yield from col.flatten()

@classmethod
def _get_stats_column_names(cls):
"""Construct a tuple of the column names for stats. Each stat has 4
columns of data.
"""
columns = []
stats = ('num_bytes', 'num_rows', 'location', 'partitioning_type',
'clustering_fields')
stat_components = ('label', 'value', 'description', 'include')
for stat_id in stats:
for stat_component in stat_components:
columns.append('stats:{}:{}'.format(stat_id, stat_component))
return tuple(columns)

@classmethod
def _get_stats_columns(cls, table, relation_type):
"""Given a table, return an iterator of key/value pairs for stats
column names/values.
"""
column_names = cls._get_stats_column_names()

# agate does not handle the array of column names gracefully
clustering_value = None
if table.clustering_fields is not None:
clustering_value = ','.join(table.clustering_fields)
# cast num_bytes/num_rows to str before they get to agate, or else
# agate will incorrectly decide they are booleans.
column_values = (
'Number of bytes',
str(table.num_bytes),
'The number of bytes this table consumes',
relation_type == 'table',

'Number of rows',
str(table.num_rows),
'The number of rows in this table',
relation_type == 'table',

'Location',
table.location,
'The geographic location of this table',
True,

'Partitioning Type',
table.partitioning_type,
'The partitioning type used for this table',
relation_type == 'table',

'Clustering Fields',
clustering_value,
'The clustering fields for this table',
relation_type == 'table',
)
return zip(column_names, column_values)

def get_catalog(self, manifest):
connection = self.connections.get_thread_connection()
client = connection.handle

schemas = manifest.get_used_schemas()

column_names = (
'table_database',
'table_schema',
'table_name',
'table_type',
'table_comment',
# does not exist in bigquery, but included for consistency
'table_owner',
'column_name',
'column_index',
'column_type',
'column_comment',
)
all_names = column_names + self._get_stats_column_names()
columns = []
"""Get the catalog for this manifest by running the get catalog macro.
Returns an agate.Table of catalog information.
"""

for database_name, schema_name in schemas:
relations = self.list_relations(database_name, schema_name)
for relation in relations:

# This relation contains a subset of the info we care about.
# Fetch the full table object here
table_ref = self.connections.table_ref(
database_name,
relation.schema,
relation.identifier,
connection
)
table = client.get_table(table_ref)

flattened = self._flat_columns_in_table(table)
relation_stats = dict(self._get_stats_columns(table,
relation.type))

for index, column in enumerate(flattened, start=1):
column_data = (
relation.database,
relation.schema,
relation.name,
relation.type,
None,
None,
column.name,
index,
column.data_type,
None,
)
column_dict = dict(zip(column_names, column_data))
column_dict.update(copy.deepcopy(relation_stats))

columns.append(column_dict)

return dbt.clients.agate_helper.table_from_data(columns, all_names)
information_schemas = []
for database, schema in manifest.get_used_schemas():
information_schemas.append(self.Relation.create(
database=database,
schema=schema,
quote_policy={
'database': True,
'schema': True
}
))

# make it a list so macros can index into it.
kwargs = {'information_schemas': information_schemas}
table = self.execute_macro(GET_CATALOG_MACRO_NAME,
kwargs=kwargs,
release=True)

# BigQuery doesn't allow ":" chars in column names -- remap them here.
table = table.rename(column_names={
col.name: col.name.replace('__', ':') for col in table.columns
})

return self._catalog_filter_table(table, manifest)
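
Note (illustrative, not part of the diff): the catalog macro below names its stat columns with double underscores because BigQuery column names cannot contain ":". A minimal Python sketch of the rename above, using column names that appear in the macro, shows how they map back to the "stats:<id>:<component>" form the previous implementation produced:

    # sketch only: emulate the '__' -> ':' remap applied to the macro's output columns
    macro_columns = [
        'table_name',                 # unchanged, contains no '__'
        'stats__num_rows__label',
        'stats__num_bytes__value',
    ]
    renamed = {name: name.replace('__', ':') for name in macro_columns}
    # renamed['stats__num_rows__label'] == 'stats:num_rows:label'
    # renamed['stats__num_bytes__value'] == 'stats:num_bytes:value'
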
202 changes: 202 additions & 0 deletions plugins/bigquery/dbt/include/bigquery/macros/catalog.sql
@@ -0,0 +1,202 @@

{% macro bigquery__get_catalog(information_schemas) -%}

{%- call statement('catalog', fetch_result=True) -%}
{% for information_schema in information_schemas %}
(
with tables as (
select
project_id as table_database,
dataset_id as table_schema,
table_id as original_table_name,

concat(project_id, '.', dataset_id, '.', table_id) as relation_id,

row_count,
size_bytes as size_bytes,
case
when type = 1 then 'table'
when type = 2 then 'view'
else concat('unknown (', cast(type as string), ')')
end as table_type,

REGEXP_CONTAINS(table_id, '^.+[0-9]{8}$') and type = 1 as is_date_shard,
REGEXP_EXTRACT(table_id, '^(.+)[0-9]{8}$') as shard_base_name,
REGEXP_EXTRACT(table_id, '^.+([0-9]{8})$') as shard_name

from {{ information_schema }}.__TABLES__

),

extracted as (

select *,
case
when is_date_shard then shard_base_name
else original_table_name
end as table_name

from tables

),

unsharded_tables as (

select
table_database,
table_schema,
table_name,
table_type,
is_date_shard,

struct(
min(shard_name) as shard_min,
max(shard_name) as shard_max,
count(*) as shard_count
) as table_shards,

sum(size_bytes) as size_bytes,
sum(row_count) as row_count,

max(relation_id) as relation_id

from extracted
group by 1,2,3,4,5

),

info_schema_columns as (

select
concat(table_catalog, '.', table_schema, '.', table_name) as relation_id,
table_catalog as table_database,
table_schema,
table_name,

-- use the "real" column name from the paths query below
column_name as base_column_name,
ordinal_position as column_index,
cast(null as string) as column_comment,

is_partitioning_column,
clustering_ordinal_position

from {{ information_schema }}.INFORMATION_SCHEMA.COLUMNS
where ordinal_position is not null

),

info_schema_column_paths as (

select
concat(table_catalog, '.', table_schema, '.', table_name) as relation_id,
field_path as column_name,
data_type as column_type,
column_name as base_column_name

from {{ information_schema }}.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS
where data_type not like 'STRUCT%'

),

columns as (

select * except (base_column_name)
from info_schema_columns
join info_schema_column_paths using (relation_id, base_column_name)

),

column_stats as (

select
table_database,
table_schema,
table_name,
max(relation_id) as relation_id,
max(case when is_partitioning_column = 'YES' then 1 else 0 end) = 1 as is_partitioned,
max(case when is_partitioning_column = 'YES' then column_name else null end) as partition_column,
max(case when clustering_ordinal_position is not null then 1 else 0 end) = 1 as is_clustered,
array_to_string(
array_agg(
case
when clustering_ordinal_position is not null then column_name
else null
end ignore nulls
order by clustering_ordinal_position
), ', '
) as clustering_columns

from columns
group by 1,2,3

)

select
unsharded_tables.table_database,
unsharded_tables.table_schema,
case
when is_date_shard then concat(unsharded_tables.table_name, '*')
else unsharded_tables.table_name
end as table_name,
unsharded_tables.table_type,

columns.column_name,
-- invent a row number to account for nested fields -- BQ does
-- not treat these nested properties as independent fields
row_number() over (
partition by relation_id
order by columns.column_index, columns.column_name
) as column_index,
columns.column_type,
columns.column_comment,

'Shard count' as `stats__date_shards__label`,
table_shards.shard_count as `stats__date_shards__value`,
'The number of date shards in this table' as `stats__date_shards__description`,
is_date_shard as `stats__date_shards__include`,

'Shard (min)' as `stats__date_shard_min__label`,
table_shards.shard_min as `stats__date_shard_min__value`,
'The first date shard in this table' as `stats__date_shard_min__description`,
is_date_shard as `stats__date_shard_min__include`,

'Shard (max)' as `stats__date_shard_max__label`,
table_shards.shard_max as `stats__date_shard_max__value`,
'The last date shard in this table' as `stats__date_shard_max__description`,
is_date_shard as `stats__date_shard_max__include`,

'# Rows' as `stats__num_rows__label`,
row_count as `stats__num_rows__value`,
'Approximate count of rows in this table' as `stats__num_rows__description`,
(unsharded_tables.table_type = 'table') as `stats__num_rows__include`,

'Approximate Size' as `stats__num_bytes__label`,
size_bytes as `stats__num_bytes__value`,
'Approximate size of table as reported by BigQuery' as `stats__num_bytes__description`,
(unsharded_tables.table_type = 'table') as `stats__num_bytes__include`,

'Partitioned By' as `stats__partitioning_type__label`,
partition_column as `stats__partitioning_type__value`,
'The partitioning column for this table' as `stats__partitioning_type__description`,
is_partitioned as `stats__partitioning_type__include`,

'Clustered By' as `stats__clustering_fields__label`,
clustering_columns as `stats__clustering_fields__value`,
'The clustering columns for this table' as `stats__clustering_fields__description`,
is_clustered as `stats__clustering_fields__include`

-- join using relation_id (an actual relation, not a shard prefix) to make
-- sure that column metadata is picked up through the join. This will only
-- return the column information for the "max" table in a date-sharded table set
from unsharded_tables
left join columns using (relation_id)
left join column_stats using (relation_id)
)

{% if not loop.last %} union all {% endif %}
{% endfor %}
{%- endcall -%}
{{ return(load_result('catalog').table) }}

{% endmacro %}
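
Note (illustrative, not part of the diff): the shard detection above keys off a trailing 8-digit suffix in the table_id. A small sketch, using a hypothetical table name, of how those regexes classify a date-sharded table; in the macro itself, is_date_shard additionally requires type = 1, i.e. a base table:

    -- sketch only: 'events_20191015' is a hypothetical date-sharded table_id
    select
        regexp_contains('events_20191015', '^.+[0-9]{8}$')  as is_date_shard,   -- true
        regexp_extract('events_20191015', '^(.+)[0-9]{8}$') as shard_base_name, -- 'events_'
        regexp_extract('events_20191015', '^.+([0-9]{8})$') as shard_name       -- '20191015'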