Feature/bq incremental and archive #856

Merged Jul 19, 2018 (12 commits)
dbt/adapters/bigquery/impl.py (69 additions, 5 deletions)

@@ -16,6 +16,7 @@
from dbt.logger import GLOBAL_LOGGER as logger

import google.auth
import google.api_core
import google.oauth2
import google.cloud.exceptions
import google.cloud.bigquery
@@ -30,13 +31,18 @@ class BigQueryAdapter(PostgresAdapter):
# deprecated -- use versions that take relations instead
"query_for_existing",
"execute_model",
"create_temporary_table",
"drop",
"execute",
"quote_schema_and_table",
"make_date_partitioned_table",
"already_exists",
"expand_target_column_types",
"load_dataframe",
"get_missing_columns",

"create_schema",
"alter_table_add_column",

# versions of adapter functions that take / return Relations
"list_relations",
@@ -183,7 +189,6 @@ def close(cls, connection):
@classmethod
def list_relations(cls, profile, project_cfg, schema, model_name=None):
connection = cls.get_connection(profile, model_name)
- credentials = connection.get('credentials', {})
client = connection.get('handle')

bigquery_dataset = cls.get_dataset(
Expand All @@ -201,6 +206,15 @@ def list_relations(cls, profile, project_cfg, schema, model_name=None):
# won't need to do this
max_results=100000)

# This will 404 if the dataset does not exist. It's lazily
# evaluated, so wrap it in a list to raise _now_. This behavior
# mirrors the implementation of list_relations for other adapters.

try:
all_tables = list(all_tables)
Contributor: Have you considered consuming the iterator here by moving the list comprehension up? As it is, this is going to make two copies of each table (one BQ API result and one Relation). If we're actually pulling up to 100k entries, doubling that could be a lot!

Author: I think 100k would be super excessive! But agreed, happy to slide the comprehension up into the try/except.

except google.api_core.exceptions.NotFound as e:
all_tables = []

return [cls.bq_table_to_relation(table) for table in all_tables]
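
A minimal sketch of the refactor the reviewer is suggesting: build the Relations while the iterator is being consumed, so only one copy of each entry is materialized. This illustrates the suggestion inside list_relations, not the code as merged:

    try:
        # Consume the lazy iterator and convert in a single pass.
        relations = [cls.bq_table_to_relation(table)
                     for table in all_tables]
    except google.api_core.exceptions.NotFound:
        # Dataset does not exist; report no relations.
        relations = []

    return relations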

@classmethod
@@ -357,7 +371,7 @@ def execute_model(cls, profile, project_cfg, model,
return res

@classmethod
- def execute(cls, profile, sql, model_name=None, fetch=False, **kwargs):
+ def raw_execute(cls, profile, sql, model_name=None, fetch=False, **kwargs):
conn = cls.get_connection(profile, model_name)
client = conn.get('handle')

@@ -372,6 +386,49 @@ def execute(cls, profile, sql, model_name=None, fetch=False, **kwargs):
with cls.exception_handler(profile, sql, model_name):
iterator = query_job.result()

return query_job, iterator
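
Returning the QueryJob alongside the row iterator is the point of this split: the job object carries metadata the iterator does not, notably the destination table that create_temporary_table reads below. A standalone sketch of that behavior against the google-cloud-bigquery client (the query is illustrative):

    import google.cloud.bigquery

    client = google.cloud.bigquery.Client()

    # Every query writes its results to a destination table; when none
    # is configured, BigQuery uses an anonymous cached table. The
    # QueryJob exposes a reference to it.
    query_job = client.query('select 1 as id')
    iterator = query_job.result()
    dest = query_job.destination
    print(dest.project, dest.dataset_id, dest.table_id)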

@classmethod
def create_temporary_table(cls, profile, project, sql, model_name=None,
**kwargs):

# BQ queries always return a temp table with their results
query_job, _ = cls.raw_execute(profile, sql, model_name)
bq_table = query_job.destination

return cls.Relation.create(
project=bq_table.project,
schema=bq_table.dataset_id,
identifier=bq_table.table_id,
quote_policy={
'schema': True,
'identifier': True
},
type=BigQueryRelation.Table)
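
Because the destination table is a real (if short-lived) table, the Relation returned here can be interpolated into follow-up SQL. A hypothetical usage sketch; the query, the model name, and the assumption that the Relation renders as its fully qualified backtick-quoted path are all illustrative:

    tmp = cls.create_temporary_table(
        profile, project, 'select 1 as id', model_name='my_model')

    # Assumes str(tmp) renders `project`.`dataset`.`identifier` under
    # the quote_policy above, so it can be dropped into a later query.
    sql = 'select * from {}'.format(tmp)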

@classmethod
def alter_table_add_column(cls, profile, project, relation, column,
model_name=None):

conn = cls.get_connection(profile, model_name)
client = conn.get('handle')

dataset = cls.get_dataset(profile, project, relation.schema,
model_name)

table_ref = dataset.table(relation.name)
table = client.get_table(table_ref)

new_schema = table.schema + [column.to_bq_schema_object()]

new_table = google.cloud.bigquery.Table(table_ref, schema=new_schema)
client.update_table(new_table, ['schema'])
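
For reference, this follows the stock google-cloud-bigquery pattern for widening a schema in place, which BigQuery only allows for NULLABLE (or REPEATED) columns. A standalone sketch with hypothetical names, using SchemaField where the diff calls column.to_bq_schema_object():

    import google.cloud.bigquery

    client = google.cloud.bigquery.Client()
    table_ref = google.cloud.bigquery.DatasetReference(
        'my-project', 'my_dataset').table('my_table')
    table = client.get_table(table_ref)

    # Append the new column; it must be NULLABLE to be added in place.
    new_schema = table.schema + [
        google.cloud.bigquery.SchemaField(
            'new_col', 'STRING', mode='NULLABLE'),
    ]

    new_table = google.cloud.bigquery.Table(table_ref, schema=new_schema)
    client.update_table(new_table, ['schema'])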

@classmethod
def execute(cls, profile, sql, model_name=None, fetch=None, **kwargs):
_, iterator = cls.raw_execute(profile, sql, model_name, fetch,
**kwargs)

if fetch:
res = cls.get_table_from_response(iterator)
else:
@@ -410,8 +467,13 @@ def create_schema(cls, profile, project_cfg, schema, model_name=None):
client = conn.get('handle')

dataset = cls.get_dataset(profile, project_cfg, schema, model_name)
- with cls.exception_handler(profile, 'create dataset', model_name):
-     client.create_dataset(dataset)

+ # Emulate 'create schema if not exists ...'
+ try:
+     client.get_dataset(dataset)
+ except google.api_core.exceptions.NotFound:
+     with cls.exception_handler(profile, 'create dataset', model_name):
+         client.create_dataset(dataset)
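
One caveat: the get-then-create pair is not atomic, so a dataset created by another caller between the two calls would still raise. A stricter variant (an assumption, not part of this PR) would also tolerate that race:

    try:
        client.get_dataset(dataset)
    except google.api_core.exceptions.NotFound:
        try:
            client.create_dataset(dataset)
        except google.api_core.exceptions.Conflict:
            # Another caller created the dataset first; treat as success.
            pass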

@classmethod
def drop_tables_in_schema(cls, profile, project_cfg, dataset):
@@ -468,8 +530,10 @@ def get_columns_in_table(cls, profile, project_cfg,

columns = []
for col in table_schema:
# BigQuery returns type labels that are not valid type specifiers
dtype = cls.Column.translate_type(col.field_type)
column = cls.Column(
- col.name, col.field_type, col.fields, col.mode)
+ col.name, dtype, col.fields, col.mode)
columns.append(column)

return columns
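
The motivation for translate_type: the BigQuery API reports legacy type labels such as FLOAT, INTEGER, and BOOLEAN, which are not valid standard SQL type specifiers. A minimal sketch of such a mapping; the exact table dbt ships is an assumption here:

    def translate_type(dtype):
        # Map legacy API labels to standard SQL specifiers; anything
        # already valid (STRING, TIMESTAMP, RECORD, ...) passes through.
        mapping = {
            'FLOAT': 'FLOAT64',
            'INTEGER': 'INT64',
            'BOOLEAN': 'BOOL',
        }
        return mapping.get(dtype, dtype)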
dbt/adapters/default/impl.py (1 addition, 1 deletion)

@@ -719,7 +719,7 @@ def already_exists(cls, profile, project_cfg,

@classmethod
def quote(cls, identifier):
return '"{}"'.format(identifier.replace('"', '""'))
return '"{}"'.format(identifier)

@classmethod
def quote_schema_and_table(cls, profile, project_cfg,
Expand Down
dbt/include/global_project/macros/adapters/bigquery.sql (4 additions, 0 deletions)

@@ -27,3 +27,7 @@
{{ sql }}
);
{% endmacro %}

{% macro bigquery__create_schema(schema_name) %}
{{ adapter.create_schema(schema_name) }}
{% endmacro %}
dbt/include/global_project/macros/adapters/common.sql (7 additions, 1 deletion)

@@ -29,7 +29,13 @@
{%- endmacro %}

{% macro create_schema(schema_name) %}
- create schema if not exists {{ schema_name }};
+ {{ adapter_macro('create_schema', schema_name) }}
{% endmacro %}

{% macro default__create_schema(schema_name) %}
{% call statement() %}
create schema if not exists {{ schema_name }};
{% endcall %}
Member: Get rid of this create_schema macro; use adapter.create_schema instead.

{% endmacro %}

