Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add bigquery hooks (#779) #836

Merged
merged 20 commits into from
Jul 16, 2018
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
68015d8
Add a first attempt at pre/post hooks in bigquery
jwerderits Jul 6, 2018
7f65ce3
Add some tests that I assume won't/can't work
jwerderits Jul 6, 2018
1e0f0fd
Log the actual sql query in BigQuery when we have it instead of alway…
jwerderits Jul 9, 2018
ed191c2
Bring back add_query
jwerderits Jul 9, 2018
f842420
Have node_runners call execute(..., fetch=False) instead of execute_one
jwerderits Jul 9, 2018
9e0c8a1
improve integration test support for bigquery
jwerderits Jul 10, 2018
c3856d0
Split out bigquery tests into their own files
jwerderits Jul 10, 2018
31cf513
Re-order arguments, oops
jwerderits Jul 10, 2018
55f4b79
fix logger import issue
jwerderits Jul 10, 2018
a1e987d
Tests expect tuples, not dbt.schema.Column objects
jwerderits Jul 10, 2018
a1f2135
Some tests care about order here
jwerderits Jul 10, 2018
7ef42ac
For these tests, we want 'dtype', not 'data_type'
jwerderits Jul 10, 2018
83f3cd1
Merge branch 'development' into bigquery-hooks
jwerderits Jul 11, 2018
4f7644c
Make it so dbt can actually find the seed file so we can actually tes…
jwerderits Jul 11, 2018
e0b5112
Added checks to make sure the test actually does anything, fixed the …
jwerderits Jul 11, 2018
8d0bb7c
Update changelog
jwerderits Jul 13, 2018
db880ea
Merge branch 'development' into bigquery-hooks
jwerderits Jul 13, 2018
8440f01
Make seed pre-run hooks happen, add tests
jwerderits Jul 13, 2018
cdd6ed1
Add some random numbers so our tests don't collide
jwerderits Jul 13, 2018
fdd0306
Take up to an hour with no output on snowflake tests before failing
jwerderits Jul 16, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions dbt/adapters/bigquery/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ def execute(cls, profile, sql, model_name=None, fetch=False, **kwargs):
query_job = client.query(sql, job_config)

# this blocks until the query has completed
with cls.exception_handler(profile, 'create dataset', model_name):
with cls.exception_handler(profile, sql, model_name):
iterator = query_job.result()

if fetch:
Expand All @@ -373,10 +373,15 @@ def get_table_from_response(cls, resp):
rows = [dict(row.items()) for row in resp]
return dbt.clients.agate_helper.table_from_data(rows, column_names)

# BigQuery doesn't support BEGIN/COMMIT, so stub these out.

@classmethod
def add_begin_query(cls, profile, name):
raise dbt.exceptions.NotImplementedException(
'`add_begin_query` is not implemented for this adapter!')
pass

@classmethod
def add_commit_query(cls, profile, name):
pass

@classmethod
def create_schema(cls, profile, project_cfg, schema, model_name=None):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
{% endif %}
{% endif %}

{{ run_hooks(pre_hooks) }}

{#
Since dbt uses WRITE_TRUNCATE mode for tables, we only need to drop this thing
if it is not a table. If it _is_ already a table, then we can overwrite it without downtime
Expand All @@ -71,5 +73,6 @@
{% endcall -%}
{% endif %}

{{ run_hooks(post_hooks) }}

{% endmaterialization %}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
identifier=identifier, schema=schema,
type='view') -%}

{{ run_hooks(pre_hooks) }}

-- drop if exists
{%- if old_relation is not none -%}
{%- if old_relation.is_table and not flags.FULL_REFRESH -%}
Expand All @@ -36,4 +38,6 @@
{%- endcall %}
{%- endif %}

{{ run_hooks(post_hooks) }}

{%- endmaterialization %}
4 changes: 2 additions & 2 deletions dbt/node_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,8 +326,8 @@ def run_hooks(cls, project, adapter, flat_graph, hook_type):
sql = hook_dict.get('sql', '')

if len(sql.strip()) > 0:
adapter.execute_one(profile, sql, model_name=model_name,
auto_begin=False)
adapter.execute(profile, sql, model_name=model_name,
auto_begin=False, fetch=False)

adapter.release_connection(profile, model_name)

Expand Down
30 changes: 30 additions & 0 deletions test/integration/014_hook_tests/macros/before-and-after-bq.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@

{% macro custom_run_hook_bq(state, target, run_started_at, invocation_id) %}

insert into {{ target.schema }}.on_run_hook (
state,
target_dbname,
target_host,
target_name,
target_schema,
target_type,
target_user,
target_pass,
target_threads,
run_started_at,
invocation_id
) VALUES (
'{{ state }}',
'{{ target.dbname }}',
'{{ target.host }}',
'{{ target.name }}',
'{{ target.schema }}',
'{{ target.type }}',
'{{ target.user }}',
'{{ target.pass }}',
{{ target.threads }},
'{{ run_started_at }}',
'{{ invocation_id }}'
)

{% endmacro %}
5 changes: 5 additions & 0 deletions test/integration/014_hook_tests/seed-models-bq/schema.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@

example_seed:
constraints:
not_null:
- a
18 changes: 18 additions & 0 deletions test/integration/014_hook_tests/seed_model_bigquery.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@

drop table if exists {schema}.on_model_hook;

create table {schema}.on_model_hook (
state STRING, -- start|end

target_dbname STRING,
target_host STRING,
target_name STRING,
target_schema STRING,
target_type STRING,
target_user STRING,
target_pass STRING,
target_threads INT64,

run_started_at STRING,
invocation_id STRING
);
18 changes: 18 additions & 0 deletions test/integration/014_hook_tests/seed_run_bigquery.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@

drop table if exists {schema}.on_run_hook;

create table {schema}.on_run_hook (
state STRING, -- start|end

target_dbname STRING,
target_host STRING,
target_name STRING,
target_schema STRING,
target_type STRING,
target_user STRING,
target_pass STRING,
target_threads INT64,

run_started_at STRING,
invocation_id STRING
);
8 changes: 5 additions & 3 deletions test/integration/014_hook_tests/test_model_hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@


class TestPrePostModelHooks(DBTIntegrationTest):

def setUp(self):
self.adapter_type = 'bigquery'
DBTIntegrationTest.setUp(self)

self.run_sql_file("test/integration/014_hook_tests/seed_model.sql")
Expand Down Expand Up @@ -178,6 +178,8 @@ def project_config(self):

@attr(type='postgres')
def test_hooks_on_seeds(self):
self.run_dbt(['seed'])
self.run_dbt(['test'])
res = self.run_dbt(['seed'])
self.assertEqual(len(res), 1, 'Expected exactly one item')
res = self.run_dbt(['test'])
self.assertEqual(len(res), 1, 'Expected exactly one item')

156 changes: 156 additions & 0 deletions test/integration/014_hook_tests/test_model_hooks_bq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
from nose.plugins.attrib import attr
from test.integration.base import DBTIntegrationTest

MODEL_PRE_HOOK = """
insert into {{this.schema}}.on_model_hook (
state,
target_name,
target_schema,
target_type,
target_threads,
run_started_at,
invocation_id
) VALUES (
'start',
'{{ target.name }}',
'{{ target.schema }}',
'{{ target.type }}',
{{ target.threads }},
'{{ run_started_at }}',
'{{ invocation_id }}'
)
"""


MODEL_POST_HOOK = """
insert into {{this.schema}}.on_model_hook (
state,
target_name,
target_schema,
target_type,
target_threads,
run_started_at,
invocation_id
) VALUES (
'end',
'{{ target.name }}',
'{{ target.schema }}',
'{{ target.type }}',
{{ target.threads }},
'{{ run_started_at }}',
'{{ invocation_id }}'
)
"""

class TestBigqueryPrePostModelHooks(DBTIntegrationTest):
def setUp(self):
DBTIntegrationTest.setUp(self)
self.use_profile('bigquery')
self.use_default_project()
self.run_sql_file("test/integration/014_hook_tests/seed_model_bigquery.sql")

self.fields = [
'state',
'target_name',
'target_schema',
'target_threads',
'target_type',
'run_started_at',
'invocation_id'
]

@property
def schema(self):
return "model_hooks_014"

@property
def profile_config(self):
profile = self.bigquery_profile()
profile['test']['outputs']['default2']['threads'] = 3
return profile

@property
def project_config(self):
return {
'macro-paths': ['test/integration/014_hook_tests/macros'],
'models': {
'test': {
'pre-hook': [MODEL_PRE_HOOK],

'post-hook':[MODEL_POST_HOOK]
}
}
}

@property
def models(self):
return "test/integration/014_hook_tests/models"

def get_ctx_vars(self, state):
field_list = ", ".join(self.fields)
query = "select {field_list} from `{schema}.on_model_hook` where state = '{state}'".format(field_list=field_list, schema=self.unique_schema(), state=state)

vals = self.run_sql(query, fetch='all')
self.assertFalse(len(vals) == 0, 'nothing inserted into hooks table')
self.assertFalse(len(vals) > 1, 'too many rows in hooks table')
ctx = dict(zip(self.fields, vals[0]))

return ctx

def check_hooks(self, state):
ctx = self.get_ctx_vars(state)

self.assertEqual(ctx['state'], state)
self.assertEqual(ctx['target_name'], 'default2')
self.assertEqual(ctx['target_schema'], self.unique_schema())
self.assertEqual(ctx['target_threads'], 3)
self.assertEqual(ctx['target_type'], 'bigquery')
self.assertTrue(ctx['run_started_at'] is not None and len(ctx['run_started_at']) > 0, 'run_started_at was not set')
self.assertTrue(ctx['invocation_id'] is not None and len(ctx['invocation_id']) > 0, 'invocation_id was not set')

@attr(type='bigquery')
def test_pre_and_post_model_hooks(self):
self.run_dbt(['run'])

self.check_hooks('start')
self.check_hooks('end')


class TestBigqueryPrePostModelHooksOnSeeds(DBTIntegrationTest):
def setUp(self):
DBTIntegrationTest.setUp(self)
self.use_profile('bigquery')
self.use_default_project()

@property
def schema(self):
return "model_hooks_014"

@property
def models(self):
return "test/integration/014_hook_tests/seed-models-bq"

@property
def project_config(self):
return {
'data-paths': ['test/integration/014_hook_tests/data'],
'models': {},
'seeds': {
'post-hook': [
'insert into {{ this }} (a, b, c) VALUES (10, 11, 12)',
]
}
}

@attr(type='bigquery')
def test_hooks_on_seeds(self):
res = self.run_dbt(['seed'])
self.assertEqual(len(res), 1, 'Expected exactly one item')
res = self.run_dbt(['test'])
self.assertEqual(len(res), 1, 'Expected exactly one item')
result = self.run_sql(
'select a, b, c from `{schema}`.`example_seed` where a = 10',
fetch='all'
)
self.assertFalse(len(result) == 0, 'nothing inserted into table by hook')
self.assertFalse(len(result) > 1, 'too many rows in table')
3 changes: 0 additions & 3 deletions test/integration/014_hook_tests/test_run_hooks.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
from nose.plugins.attrib import attr
from test.integration.base import DBTIntegrationTest

import os.path

class TestPrePostRunHooks(DBTIntegrationTest):

def setUp(self):
Expand Down Expand Up @@ -88,6 +86,5 @@ def test_pre_and_post_run_hooks(self):
self.check_hooks('start')
self.check_hooks('end')


self.assertTableDoesNotExist("start_hook_order_test")
self.assertTableDoesNotExist("end_hook_order_test")
Loading