Skip to content

Commit

Permalink
Feature/bq improvements (#507)
Browse files Browse the repository at this point in the history
* request gdrive scope

* re-add missing lines?

* whitespace

* remove junk

* bq tables working

* optionally override query timeout

* bump requirements.txt

* pep8

* don't mutate global state
  • Loading branch information
drewbanin authored Aug 29, 2017
1 parent 6762fc2 commit 7a5670b
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 41 deletions.
122 changes: 86 additions & 36 deletions dbt/adapters/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,23 @@
import google.cloud.exceptions
import google.cloud.bigquery

import time
import uuid


class BigQueryAdapter(PostgresAdapter):
    """dbt adapter for Google BigQuery.

    Inherits the PostgresAdapter interface; BigQuery-specific behavior
    (datasets instead of schemas, async query jobs, no rename) is
    implemented by the overrides below.
    """

    # Adapter methods exposed to the materialization Jinja context.
    context_functions = [
        "query_for_existing",
        "execute_model",
        "drop",
    ]

    # OAuth scopes requested for credentials. The drive scope is included
    # so queries against Google-Sheets-backed (external) tables work.
    SCOPE = ('https://www.googleapis.com/auth/bigquery',
             'https://www.googleapis.com/auth/cloud-platform',
             'https://www.googleapis.com/auth/drive')

    # Default query timeout in seconds; profiles may override it via the
    # optional `timeout_seconds` credential (see get_timeout).
    QUERY_TIMEOUT = 300

@classmethod
def handle_error(cls, error, message, sql):
Expand Down Expand Up @@ -81,12 +88,12 @@ def get_bigquery_credentials(cls, config):
creds = google.oauth2.service_account.Credentials

if method == 'oauth':
credentials, project_id = google.auth.default()
credentials, project_id = google.auth.default(scopes=cls.SCOPE)
return credentials

elif method == 'service-account':
keyfile = config.get('keyfile')
return creds.from_service_account_file(keyfile)
return creds.from_service_account_file(keyfile, scopes=cls.SCOPE)

elif method == 'service-account-json':
details = config.get('keyfile_json')
Expand Down Expand Up @@ -141,7 +148,8 @@ def query_for_existing(cls, profile, schema, model_name=None):

relation_type_lookup = {
'TABLE': 'table',
'VIEW': 'view'
'VIEW': 'view',
'EXTERNAL': 'external'
}

existing = [(table.name, relation_type_lookup.get(table.table_type))
Expand All @@ -150,56 +158,98 @@ def query_for_existing(cls, profile, schema, model_name=None):
return dict(existing)

@classmethod
def drop(cls, profile, relation, relation_type, model_name=None):
    """Drop `relation` from the profile's default schema (dataset).

    `relation_type` is accepted but unused here — BigQuery's
    table.delete() removes tables and views alike; the parameter is
    presumably kept for signature parity with other adapters (TODO
    confirm against the base adapter interface).
    """
    schema = cls.get_default_schema(profile)
    dataset = cls.get_dataset(profile, schema, model_name)
    relation_object = dataset.table(relation)
    relation_object.delete()

@classmethod
def rename(cls, profile, from_name, to_name, model_name=None):
    """Renaming relations is unsupported on this adapter; always raises."""
    msg = '`rename` is not implemented for this adapter!'
    raise dbt.exceptions.NotImplementedException(msg)

@classmethod
def get_timeout(cls, conn):
    """Return the query timeout, in seconds, for a connection.

    Honors the optional `timeout_seconds` key of the connection's
    credentials; falls back to the class-level QUERY_TIMEOUT default.
    """
    credentials = conn['credentials']
    return credentials.get('timeout_seconds', cls.QUERY_TIMEOUT)

@classmethod
def materialize_as_view(cls, profile, dataset, model_name, model_sql):
    """Create `model_name` in `dataset` as a standard-SQL view.

    Returns the status string "CREATE VIEW" for the result context;
    raises a dbt RuntimeException if the view was not created.
    """
    view = dataset.table(model_name)
    view.view_query = model_sql
    # Use standard SQL rather than BigQuery legacy SQL.
    view.view_use_legacy_sql = False

    logger.debug("Model SQL ({}):\n{}".format(model_name, model_sql))

    with cls.exception_handler(profile, model_sql, model_name, model_name):
        view.create()

    # view.created stays None when the create call did not take effect.
    if view.created is None:
        msg = "Error creating view {}".format(model_name)
        raise dbt.exceptions.RuntimeException(msg)

    return "CREATE VIEW"

view = dataset.table(model_name)
view.view_query = model_sql
@classmethod
def poll_until_job_completes(cls, job, timeout):
    """Poll `job` once per second until it reaches the DONE state.

    Raises a dbt RuntimeException if the job is still running after
    `timeout` polls, and re-raises the job's own error if it failed.
    """
    for _ in range(timeout):
        if job.state == 'DONE':
            break
        time.sleep(1)
        job.reload()

    if job.state != 'DONE':
        raise dbt.exceptions.RuntimeException("BigQuery Timeout Exceeded")
    elif job.error_result:
        raise job.exception()

@classmethod
def materialize_as_table(cls, profile, dataset, model_name, model_sql):
    """Create `model_name` in `dataset` as a table via an async query job.

    Runs `model_sql` as a standard-SQL query whose destination is the
    target table, waits for the job to finish, and returns the status
    string "CREATE TABLE" for the result context.
    """
    conn = cls.get_connection(profile, model_name)
    client = conn.get('handle')

    table = dataset.table(model_name)
    # Unique job id: BigQuery job ids must not collide across runs.
    job_id = 'dbt-create-{}-{}'.format(model_name, uuid.uuid4())
    job = client.run_async_query(job_id, model_sql)
    job.use_legacy_sql = False
    job.destination = table
    # Replace any existing contents of the destination table.
    job.write_disposition = 'WRITE_TRUNCATE'
    job.begin()

    logger.debug("Model SQL ({}):\n{}".format(model_name, model_sql))

    # Wait for completion, honoring the profile's timeout override.
    with cls.exception_handler(profile, model_sql, model_name, model_name):
        cls.poll_until_job_completes(job, cls.get_timeout(conn))

    return "CREATE TABLE"

return "CREATE VIEW"
@classmethod
def execute_model(cls, profile, model, materialization, model_name=None):
    """Build `model` as the given materialization and return its status.

    Dispatches to materialize_as_view / materialize_as_table; raises a
    dbt RuntimeException for any other materialization value.
    """
    connection = cls.get_connection(profile, model.get('name'))
    if flags.STRICT_MODE:
        validate_connection(connection)

    model_name = model.get('name')
    model_sql = model.get('injected_sql')

    schema = cls.get_default_schema(profile)
    dataset = cls.get_dataset(profile, schema, model_name)

    materializers = {
        'view': cls.materialize_as_view,
        'table': cls.materialize_as_table,
    }
    materializer = materializers.get(materialization)
    if materializer is None:
        msg = "Invalid relation type: '{}'".format(materialization)
        raise dbt.exceptions.RuntimeException(msg, model)

    return materializer(profile, dataset, model_name, model_sql)

@classmethod
def fetch_query_results(cls, query):
Expand All @@ -220,12 +270,12 @@ def execute_and_fetch(cls, profile, sql, model_name=None, **kwargs):
conn = cls.get_connection(profile, model_name)
client = conn.get('handle')

formatted_sql = cls.format_sql_for_bigquery(sql)
query = client.run_sync_query(formatted_sql)
query.timeout_ms = cls.QUERY_TIMEOUT
query = client.run_sync_query(sql)
query.timeout_ms = cls.get_timeout(conn) * 1000
query.use_legacy_sql = False

debug_message = "Fetching data for query {}:\n{}"
logger.debug(debug_message.format(model_name, formatted_sql))
logger.debug(debug_message.format(model_name, sql))

query.run()

Expand Down
1 change: 1 addition & 0 deletions dbt/contracts/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
Required('schema'): basestring,
Optional('keyfile'): basestring,
Optional('keyfile_json'): object,
Optional('timeout_seconds'): int,
})

credentials_mapping = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,31 @@
{%- set existing = adapter.query_for_existing(schema) -%}
{%- set existing_type = existing.get(identifier) -%}

-- setup
{%- if existing_type is not none -%}
{{ adapter.drop(identifier, existing_type) }}
{%- endif -%}

-- build model
{% set result = adapter.execute_model(model) %}
{% set result = adapter.execute_model(model, 'view') %}
{{ store_result('main', status=result) }}

{%- endmaterialization %}

{% materialization table, adapter='bigquery' -%}

  {%- set identifier = model['name'] -%}
  {%- set tmp_identifier = identifier + '__dbt_tmp' -%}
  {%- set non_destructive_mode = (flags.NON_DESTRUCTIVE == True) -%}
  {%- set existing = adapter.query_for_existing(schema) -%}
  {%- set existing_type = existing.get(identifier) -%}

  {#- drop any existing relation so the table can be (re)created -#}
  {%- if existing_type is not none -%}
    {{ adapter.drop(identifier, existing_type) }}
  {%- endif -%}

  -- build model
  {% set result = adapter.execute_model(model, 'table') %}
  {{ store_result('main', status=result) }}

{% endmaterialization %}

Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ celery==3.1.23
voluptuous==0.10.5
snowflake-connector-python==1.4.0
colorama==0.3.9
google-cloud-bigquery==0.26.0
pyasn1==0.2.3
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
'voluptuous==0.10.5',
'snowflake-connector-python>=1.3.16',
'colorama==0.3.9',
'google-cloud-bigquery==0.24.0',
'google-cloud-bigquery==0.26.0',
'pyasn1==0.2.3',
]
)

0 comments on commit 7a5670b

Please sign in to comment.