Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rewrite materializations with statement block #466

Merged
merged 43 commits into from
Jul 21, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
6da8662
first pass
Jun 21, 2017
7a632c6
fix table for tests. add view
Jun 21, 2017
a41e184
incremental
Jun 21, 2017
6cc5ad3
capture statement results
Jun 24, 2017
f6c3c19
remove extraneous log.info statement in node_runners.py
Jun 24, 2017
3770897
remove irrelevant unit test (test_runner.py)
Jun 24, 2017
8be612b
bigquery working
Jun 26, 2017
a0c67bf
fix bq
Jun 26, 2017
e2b2d6b
fix unit
Jun 26, 2017
9d21d97
macro mania
Jun 27, 2017
b671307
integration tests passing
Jun 27, 2017
d19c271
fix unit tests
Jun 29, 2017
61ace92
combine compiler get_compiler_context and wrapper get_materialization…
Jun 29, 2017
b618727
remove dbt.wrapper for real
Jun 29, 2017
7258665
cleanup
Jun 30, 2017
f1973cb
more cleanup
Jun 30, 2017
2746df7
dry up dbwrapper
Jun 30, 2017
da4ea7a
more cleanup, allow adapters to define their own context functions
Jun 30, 2017
01f6457
archive as materialization
Jul 3, 2017
7078fb6
cleanup: remove dbt/targets.py, unused utils
Jul 3, 2017
e22c218
cleanup: remove unused vars in compilation, linker, model, schema, pa…
Jul 3, 2017
7cbc924
materializations w custom configs
Jul 6, 2017
3e64775
fix tests, isolate dist/sort to redshift adapter
Jul 6, 2017
ca783ba
archive
Jul 14, 2017
1f062f3
re-add env_var
Jul 14, 2017
7a3de02
Merge branch 'rewrite-materializations-1' of github.com:fishtown-anal…
drewbanin Jul 16, 2017
d62486e
remove log stmt
Jul 17, 2017
ddfdbc0
schema tests in packages
Jul 18, 2017
f6cdf63
fix context merging issue
Jul 19, 2017
75896e5
state is a pain
Jul 19, 2017
17661a6
cleanup, ok to use ref and var in macros
Jul 19, 2017
3f60e0a
fix all codeclimates
Jul 19, 2017
403bbdd
another round of codeclimates
Jul 19, 2017
876417d
Merge branch 'rewrite-materializations-1' of github.com:fishtown-anal…
drewbanin Jul 19, 2017
10b3dbb
Fix for missing config error, sort updates
drewbanin Jul 19, 2017
a9d7ee7
default data
Jul 19, 2017
a99fbfe
Merge branch 'rewrite-materializations-1' of github.com:fishtown-anal…
Jul 19, 2017
e1bcf95
add exceptions to context, use them for bigquery
Jul 19, 2017
ae848c3
do not translate jinja syntax errors (i.e. print the whole thing)
Jul 19, 2017
8a1698b
default to compound sortkey
drewbanin Jul 20, 2017
ef06431
make statement a macro
Jul 20, 2017
fc54094
log is a fn, not a block
Jul 20, 2017
4f207d4
fix test
Jul 20, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 7 additions & 10 deletions dbt/adapters/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

import dbt.exceptions
import dbt.flags as flags
import dbt.materializers
import dbt.clients.gcloud

from dbt.adapters.postgres import PostgresAdapter
Expand All @@ -19,15 +18,13 @@

class BigQueryAdapter(PostgresAdapter):

QUERY_TIMEOUT = 60 * 1000
context_functions = [
"query_for_existing",
"drop_view",
"execute_model",
]

@classmethod
def get_materializer(cls, node, existing):
materializer = dbt.materializers.NonDDLMaterializer
return dbt.materializers.make_materializer(materializer,
cls,
node,
existing)
QUERY_TIMEOUT = 60 * 1000

@classmethod
def handle_error(cls, error, message, sql):
Expand Down Expand Up @@ -171,7 +168,7 @@ def format_sql_for_bigquery(cls, sql):
return "#standardSQL\n{}".format(sql)

@classmethod
def execute_model(cls, profile, model):
def execute_model(cls, profile, model, model_name=None):
connection = cls.get_connection(profile, model.get('name'))

if flags.STRICT_MODE:
Expand Down
102 changes: 29 additions & 73 deletions dbt/adapters/default.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
import copy
import itertools
import multiprocessing
import re
import time
import yaml

from contextlib import contextmanager

import dbt.exceptions
import dbt.flags
import dbt.materializers

from dbt.contracts.connection import validate_connection
from dbt.logger import GLOBAL_LOGGER as logger
Expand All @@ -24,9 +22,22 @@ class DefaultAdapter(object):

requires = {}

@classmethod
def get_materializer(cls, model, existing):
return dbt.materializers.get_materializer(cls, model, existing)
context_functions = [
"already_exists",
"get_columns_in_table",
"get_missing_columns",
"query_for_existing",
"rename",
"drop",
"truncate",
"add_query",
"expand_target_column_types",
]

raw_functions = [
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the difference between a raw func and a context func?

"get_status",
"get_result_from_cursor"
]

###
# ADAPTER-SPECIFIC FUNCTIONS -- each of these must be overridden in
Expand All @@ -49,16 +60,6 @@ def date_function(cls):
raise dbt.exceptions.NotImplementedException(
'`date_function` is not implemented for this adapter!')

@classmethod
def dist_qualifier(cls):
raise dbt.exceptions.NotImplementedException(
'`dist_qualifier` is not implemented for this adapter!')

@classmethod
def sort_qualifier(cls):
raise dbt.exceptions.NotImplementedException(
'`sort_qualifier` is not implemented for this adapter!')

@classmethod
def get_status(cls, cursor):
raise dbt.exceptions.NotImplementedException(
Expand Down Expand Up @@ -88,6 +89,18 @@ def cancel_connection(cls, project, connection):
###
# FUNCTIONS THAT SHOULD BE ABSTRACT
###
@classmethod
def get_result_from_cursor(cls, cursor):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpicky, but this probably doesn't belong under FUNCTIONS THAT SHOULD BE ABSTRACT right?

data = []

if cursor.description is not None:
column_names = [col[0] for col in cursor.description]
raw_results = cursor.fetchall()
data = [dict(zip(column_names, row))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this could so readily be a DataFrame. Don't think we're there yet... but this is great

for row in raw_results]

return data

@classmethod
def drop(cls, profile, relation, relation_type, model_name=None):
if relation_type == 'view':
Expand Down Expand Up @@ -144,35 +157,6 @@ def rename(cls, profile, from_name, to_name, model_name=None):
def is_cancelable(cls):
return True

@classmethod
def execute_model(cls, profile, model):
parts = re.split(r'-- (DBT_OPERATION .*)', model.get('wrapped_sql'))

for i, part in enumerate(parts):
matches = re.match(r'^DBT_OPERATION ({.*})$', part)
if matches is not None:
instruction_string = matches.groups()[0]
instruction = yaml.safe_load(instruction_string)
function = instruction['function']
kwargs = instruction['args']

def call_expand_target_column_types(kwargs):
kwargs.update({'profile': profile,
'model_name': model.get('name')})
return cls.expand_target_column_types(**kwargs)

func_map = {
'expand_column_types_if_needed':
call_expand_target_column_types
}

func_map[function](kwargs)
else:
connection, cursor = cls.add_query(
profile, part, model.get('name'))

return cls.get_status(cursor)

@classmethod
def get_missing_columns(cls, profile,
from_schema, from_table,
Expand Down Expand Up @@ -259,27 +243,6 @@ def get_drop_schema_sql(cls, schema):
return ('drop schema if exists "{schema} cascade"'
.format(schema=schema))

@classmethod
def get_create_table_sql(cls, schema, table, columns, sort, dist):
fields = ['"{field}" {data_type}'.format(
field=column.name, data_type=column.data_type
) for column in columns]
fields_csv = ",\n ".join(fields)
dist = cls.dist_qualifier(dist)
sort = cls.sort_qualifier('compound', sort)
sql = """
create table if not exists "{schema}"."{table}" (
{fields}
)
{dist} {sort}
""".format(
schema=schema,
table=table,
fields=fields_csv,
sort=sort,
dist=dist).strip()
return sql

###
# ODBC FUNCTIONS -- these should not need to change for every adapter,
# although some adapters may override them
Expand Down Expand Up @@ -595,13 +558,6 @@ def drop_schema(cls, profile, schema, model_name=None):
sql = cls.get_drop_schema_sql(schema)
return cls.add_query(profile, sql, model_name)

@classmethod
def create_table(cls, profile, schema, table, columns, sort, dist,
model_name=None):
logger.debug('Creating table "%s"."%s".', schema, table)
sql = cls.get_create_table_sql(schema, table, columns, sort, dist)
return cls.add_query(profile, sql, model_name)

@classmethod
def table_exists(cls, profile, schema, table, model_name=None):
tables = cls.query_for_existing(profile, schema, model_name)
Expand Down
8 changes: 0 additions & 8 deletions dbt/adapters/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,6 @@ def type(cls):
def date_function(cls):
return 'datenow()'

@classmethod
def dist_qualifier(cls, dist):
return ''

@classmethod
def sort_qualifier(cls, sort_type, sort):
return ''

@classmethod
def get_status(cls, cursor):
return cursor.statusmessage
Expand Down
31 changes: 0 additions & 31 deletions dbt/adapters/redshift.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,37 +18,6 @@ def type(cls):
def date_function(cls):
return 'getdate()'

@classmethod
def dist_qualifier(cls, dist):
dist_key = dist.strip().lower()

if dist_key in ['all', 'even']:
return 'diststyle {}'.format(dist_key)
else:
return 'diststyle key distkey("{}")'.format(dist_key)

@classmethod
def sort_qualifier(cls, sort_type, sort):
valid_sort_types = ['compound', 'interleaved']
if sort_type not in valid_sort_types:
raise RuntimeError(
"Invalid sort_type given: {} -- must be one of {}"
.format(sort_type, valid_sort_types)
)

if isinstance(sort, basestring):
sort_keys = [sort]
else:
sort_keys = sort

formatted_sort_keys = ['"{}"'.format(sort_key)
for sort_key in sort_keys]
keys_csv = ', '.join(formatted_sort_keys)

return "{sort_type} sortkey({keys_csv})".format(
sort_type=sort_type, keys_csv=keys_csv
)

@classmethod
def drop(cls, profile, relation, relation_type, model_name=None):
global drop_lock
Expand Down
10 changes: 0 additions & 10 deletions dbt/adapters/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,16 +132,6 @@ def rename(cls, profile, from_name, to_name, model_name=None):

connection, cursor = cls.add_query(profile, sql, model_name)

@classmethod
def execute_model(cls, profile, model):
connection = cls.get_connection(profile, model.get('name'))

if flags.STRICT_MODE:
validate_connection(connection)

return super(PostgresAdapter, cls).execute_model(
profile, model)

@classmethod
def add_begin_query(cls, profile, name):
return cls.add_query(profile, 'BEGIN', name, auto_begin=False,
Expand Down
Loading