
Operation result #774

Merged: 43 commits, May 21, 2018

Changes from 40 commits

Commits:
e09f8bf
First pass, just dump generated catalog data to stdout
jwerderits May 8, 2018
89ad5d6
Actually write structured data, write it to disk
jwerderits May 9, 2018
2ae45ab
Fix release_connection issues in dbt catalog generate
jwerderits May 9, 2018
8cb3091
clean up TODOs
jwerderits May 9, 2018
da107f3
Fix bugs, add unit tests
jwerderits May 10, 2018
b29fe7f
ok, I guess accepting None is pretty important here
jwerderits May 10, 2018
1e95657
Finally get a (bad) integration test going
jwerderits May 10, 2018
612d88b
Cleanup the tests, verify the entire schema
jwerderits May 10, 2018
223a764
turn this into a constant
jwerderits May 10, 2018
b232d7f
Comment / import cleanup
jwerderits May 10, 2018
4f6cd0a
Add an operations config entry and an OperationLoader
jwerderits May 11, 2018
0c5e214
Rearranged stuff, added an OperationRunner that I'm not sure about
jwerderits May 11, 2018
7abb9d9
Add functions for operating on operations inside the graph
jwerderits May 11, 2018
0c62326
Fix wrong name
jwerderits May 11, 2018
512493f
Rearrange where the agate table -> dict conversion happens, add new r…
jwerderits May 11, 2018
9c74375
Add 'operation' type
jwerderits May 11, 2018
2db41c5
Add get_catalog macro with postgres implementation
jwerderits May 11, 2018
7d27c1b
make operations loader more macro-like rather than node-like
jwerderits May 11, 2018
2c47a31
Tell jinja to use the new OperationExtension
jwerderits May 11, 2018
ef22e70
Now instead of import errors I have ones I don't understand, which is…
jwerderits May 11, 2018
4f39ef6
pep8
jwerderits May 11, 2018
8ca876a
Remove RunManager to avoid compiling I hope
jwerderits May 12, 2018
e2c1cde
Make the parser pass the macro type along
jwerderits May 14, 2018
6f78e63
Allow operations in the macro subgraph
jwerderits May 14, 2018
fcbc9d7
I don't understand why, but my test passes now
jwerderits May 14, 2018
b26ac2c
Removed a bunch of TODOs/etc from development
jwerderits May 14, 2018
2622194
More todos/bad file adds
jwerderits May 14, 2018
1b03e36
Remove this method as it is now handled in sql/operations
jwerderits May 14, 2018
9ec5d70
forgot 'yield from' is py3 only
jwerderits May 14, 2018
f1a6461
Remove useless return statement, add a comment instead
jwerderits May 14, 2018
d0a7ee0
Add a note about resource type overlaps in parsed nodes/macros
jwerderits May 14, 2018
3d7ed8e
add newline
jwerderits May 14, 2018
55d98f7
Didn't need this, I guess
jwerderits May 14, 2018
9324a2c
Move some things around and break out some methods
jwerderits May 15, 2018
5216b51
More refactoring, I do not believe we need OperationRunner here
jwerderits May 15, 2018
726aaad
move operations into macros
jwerderits May 15, 2018
fda7013
first part of making the adapter responsible for this stuff
jwerderits May 16, 2018
15fe9a3
Make the adapter operate on an actual ParsedManifest instead of the f…
jwerderits May 16, 2018
f6fb149
Have result data key be a variable
jwerderits May 16, 2018
70da428
Make contexts accept ParsedMacro
jwerderits May 16, 2018
0dd2375
PR feedback
jwerderits May 17, 2018
9b432b8
Fix file renames
jwerderits May 17, 2018
04ffaba
update changelog
jwerderits May 21, 2018
44 changes: 43 additions & 1 deletion dbt/adapters/default/impl.py
@@ -471,7 +471,7 @@ def acquire_connection(cls, profile, name):
lock.release()

@classmethod
def release_connection(cls, profile, name):
def release_connection(cls, profile, name='master'):
global connections_in_use, connections_available, lock

if connections_in_use.get(name) is None:
@@ -773,3 +773,45 @@ def convert_agate_type(cls, agate_table, col_idx):
for agate_cls, func in conversions:
if isinstance(agate_type, agate_cls):
return func(agate_table, col_idx)

###
# Operations involving the manifest
###
@classmethod
def run_operation(cls, profile, project_cfg, manifest, operation_name,
result_key):
"""Look the operation identified by operation_name up in the manifest
and run it.

Return an AttrDict with three attributes: 'table', 'data', and
'status'. 'table' is an agate.Table.
"""
operation = manifest.find_operation_by_name(operation_name, None)
[Review comment, Member] i believe you can pass "dbt" as the package name here instead of None


# This causes a reference cycle, as dbt.context.runtime.generate()
# ends up calling get_adapter, so the import has to be here.
import dbt.context.runtime
context = dbt.context.runtime.generate(
operation,
project_cfg,
manifest.to_flat_graph(),
)

# TODO: should I get the return value here in case future operations
# want to return some string? Jinja (I think) stringifies the results
# so it's not super useful. Status, I guess?
[Review comment, Member] no, i think this is fine as is. can you remove this comment?

operation.generator(context)()

# This is a lot of magic, have to know the magic name is 'catalog'.
# TODO: How can we make this part of the data set? Could we make it
# the operation's name/unique ID somehow instead?
[Review comment, Member] this comment is out of date

result = context['load_result'](result_key)
return result

###
# Abstract methods involving the flat graph
###
@classmethod
def get_catalog(cls, profile, project_cfg, run_operation):
[Review comment, Member] there's some naming inconsistency here. the comment should read manifest instead of flat graph, run_operation should be manifest to be consistent with the underlying implementations

raise dbt.exceptions.NotImplementedException(
'`get_catalog` is not implemented for this adapter!')
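
For context, here is a rough usage sketch of the `run_operation` entry point added above, based on its docstring. The way `profile`, `project_cfg`, and `manifest` are obtained is illustrative, not dbt's real bootstrap path:

```python
# Hypothetical caller sketch; only run_operation and get_adapter come from
# dbt, the other names are illustrative.
from dbt.adapters.factory import get_adapter


def fetch_catalog_table(profile, project_cfg, manifest):
    adapter = get_adapter(profile)
    # run_operation returns an AttrDict with 'table', 'data', and 'status';
    # 'table' is the agate.Table produced by the statement() call inside
    # the operation's macro.
    result = adapter.run_operation(profile, project_cfg, manifest,
                                   'get_catalog_data', 'catalog')
    return result.table
```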
14 changes: 14 additions & 0 deletions dbt/adapters/postgres/impl.py
@@ -9,6 +9,9 @@

from dbt.logger import GLOBAL_LOGGER as logger

GET_CATALOG_OPERATION_NAME = 'get_catalog_data'
GET_CATALOG_RESULT_KEY = 'catalog' # defined in get_catalog() macro


class PostgresAdapter(dbt.adapters.default.DefaultAdapter):

@@ -201,3 +204,14 @@ def convert_date_type(cls, agate_table, col_idx):
@classmethod
def convert_time_type(cls, agate_table, col_idx):
return "time"

@classmethod
def get_catalog(cls, profile, project_cfg, manifest):
results = cls.run_operation(profile, project_cfg, manifest,
GET_CATALOG_OPERATION_NAME,
GET_CATALOG_RESULT_KEY)

schemas = cls.get_existing_schemas(profile, project_cfg)
results = results.table.where(lambda r: r['table_schema'] in schemas)

return results
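
The value returned here is a filtered agate.Table. As an aside, a small illustrative helper (not part of this PR) for flattening that table into plain dicts, roughly what a catalog/docs task would want before writing JSON to disk:

```python
# Illustrative only: convert an agate.Table (e.g. the result of
# PostgresAdapter.get_catalog) into a list of plain dicts keyed by column name.
def catalog_rows(agate_table):
    names = agate_table.column_names
    return [dict(zip(names, row)) for row in agate_table.rows]
```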
23 changes: 23 additions & 0 deletions dbt/clients/jinja.py
@@ -100,6 +100,28 @@ def parse(self, parser):
return node


class OperationExtension(jinja2.ext.Extension):
tags = ['operation']

def parse(self, parser):
node = jinja2.nodes.Macro(lineno=next(parser.stream).lineno)
operation_name = \
parser.parse_assign_target(name_only=True).name

node.args = []
node.defaults = []

while parser.stream.skip_if('comma'):
target = parser.parse_assign_target(name_only=True)

node.name = dbt.utils.get_operation_macro_name(operation_name)

node.body = parser.parse_statements(('name:endoperation',),
drop_needle=True)

return node


def create_macro_capture_env(node):

class ParserMacroCapture(jinja2.Undefined):
@@ -142,6 +164,7 @@ def get_template(string, ctx, node=None, capture_macros=False):
args['undefined'] = create_macro_capture_env(node)

args['extensions'].append(MaterializationExtension)
args['extensions'].append(OperationExtension)

env = MacroFuzzEnvironment(**args)

2 changes: 2 additions & 0 deletions dbt/compat.py
@@ -10,8 +10,10 @@

if WHICH_PYTHON == 2:
basestring = basestring
bigint = long
else:
basestring = str
bigint = int


def to_unicode(s):
12 changes: 11 additions & 1 deletion dbt/context/common.py
@@ -4,6 +4,8 @@

from dbt.adapters.factory import get_adapter
from dbt.compat import basestring, to_string
from dbt.node_types import NodeType
from dbt.contracts.graph.parsed import ParsedMacro, ParsedNode

import dbt.clients.jinja
import dbt.clients.agate_helper
@@ -75,6 +77,8 @@ def _add_macros(context, model, flat_graph):
macros_to_add = {'global': [], 'local': []}

for unique_id, macro in flat_graph.get('macros', {}).items():
if macro.get('resource_type') != NodeType.Macro:
continue
package_name = macro.get('package_name')

macro_map = {
@@ -201,8 +205,14 @@ def __init__(self, model, context, overrides):
self.overrides = overrides

if isinstance(model, dict) and model.get('unique_id'):
local_vars = model.get('config', {}).get('vars')
local_vars = model.get('config', {}).get('vars', {})
self.model_name = model.get('name')
elif isinstance(model, ParsedMacro):
local_vars = {} # macros have no config
self.model_name = model.name
elif isinstance(model, ParsedNode):
local_vars = model.config.get('vars', {})
self.model_name = model.name
else:
# still used for wrapping
self.model_name = model.nice_name
28 changes: 27 additions & 1 deletion dbt/contracts/graph/parsed.py
@@ -182,7 +182,10 @@
'maxLength': 127,
},
'resource_type': {
'enum': [NodeType.Macro],
'enum': [
NodeType.Macro,
NodeType.Operation,
],
},
'unique_id': {
'type': 'string',
@@ -344,6 +347,29 @@ def serialize(self):
'child_map': forward_edges,
}

def _find_by_name(self, name, package, subgraph, nodetype):
"""

Find a node by its given name in the appropriate subgraph.
"""
if subgraph == 'nodes':
search = self.nodes
elif subgraph == 'macros':
search = self.macros
else:
raise NotImplementedError(
'subgraph search for {} not implemented'.format(subgraph)
)
return dbt.utils.find_in_subgraph_by_name(
search,
name,
package,
nodetype)

def find_operation_by_name(self, name, package):
return self._find_by_name(name, package, 'macros',
[NodeType.Operation])
[Review comment, Member] love this


def to_flat_graph(self):
"""Convert the parsed manifest to the 'flat graph' that the compiler
expects.
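
`find_in_subgraph_by_name` itself is not part of this diff; as a rough standalone sketch of the matching `_find_by_name` presumably delegates to (name plus resource type, with an optional package filter, over dict-shaped nodes):

```python
# Assumed semantics only; this is not dbt's actual
# dbt.utils.find_in_subgraph_by_name implementation.
def find_in_subgraph_by_name(subgraph, name, package, nodetypes):
    for unique_id, node in subgraph.items():
        if node.get('name') != name:
            continue
        if node.get('resource_type') not in nodetypes:
            continue
        if package is not None and node.get('package_name') != package:
            continue
        return node
    return None
```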
3 changes: 3 additions & 0 deletions dbt/contracts/graph/unparsed.py
@@ -63,6 +63,9 @@
NodeType.Model,
NodeType.Test,
NodeType.Analysis,
# Note: Hooks fail if you remove this, even though it's
# also allowed in ParsedMacro, which seems wrong.
# Maybe need to move hook operations into macros?
NodeType.Operation,
NodeType.Seed,
# we need this if parse_node is going to handle archives.
58 changes: 58 additions & 0 deletions dbt/include/global_project/macros/adapters/common.sql
@@ -66,3 +66,61 @@
{{ column_list_for_create_table(columns) }}
);
{% endmacro %}


{% macro get_catalog() -%}
{{ adapter_macro('get_catalog') }}
{%- endmacro %}


{% macro default__get_catalog() -%}

{% set typename = adapter.type() %}
{% set msg -%}
get_catalog not implemented for {{ typename }}
{%- endset %}

{{ exceptions.raise_compiler_error(msg) }}
{% endmacro %}


{% macro postgres__get_catalog() -%}
{%- call statement('catalog', fetch_result=True) -%}
with tables as (
select
table_schema,
table_name,
table_type

from information_schema.tables

),

columns as (

select
table_schema,
table_name,
null as table_comment,

column_name,
ordinal_position as column_index,
data_type as column_type,
null as column_comment


from information_schema.columns

)

select *
from tables
join columns using (table_schema, table_name)

where table_schema != 'information_schema'
and table_schema not like 'pg_%'
{%- endcall -%}
{# There's no point in returning anything as the jinja macro stuff calls #}
{# str() on all returns. To get the results, you'll need to use #}
{# context['load_result']('catalog') #}
{%- endmacro %}
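
The `adapter_macro('get_catalog')` call above dispatches to an adapter-specific macro. A conceptual Python sketch of that dispatch, under the assumption (not shown in this diff) that it prefers `<adapter_type>__get_catalog` and falls back to `default__get_catalog`:

```python
# Conceptual sketch of adapter_macro-style dispatch; 'macros' maps macro
# names to callables and 'adapter_type' would be e.g. 'postgres'.
def resolve_adapter_macro(macros, adapter_type, name):
    specific = '{}__{}'.format(adapter_type, name)
    fallback = 'default__{}'.format(name)
    return macros.get(specific, macros[fallback])
```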
@@ -0,0 +1,4 @@
{% operation get_catalog_data %}
{% set catalog = dbt.get_catalog() %}
{{ return(catalog) }}
{% endoperation %}
15 changes: 15 additions & 0 deletions dbt/loader.py
@@ -12,6 +12,7 @@ class GraphLoader(object):
@classmethod
def load_all(cls, root_project, all_projects):
macros = MacroLoader.load_all(root_project, all_projects)
macros.update(OperationLoader.load_all(root_project, all_projects))
nodes = {}
for loader in cls._LOADERS:
nodes.update(loader.load_all(root_project, all_projects, macros))
@@ -85,6 +86,20 @@ def load_project(cls, root_project, all_projects, project, project_name,
macros=macros)


class OperationLoader(ResourceLoader):

@classmethod
def load_project(cls, root_project, all_projects, project, project_name,
macros):
return dbt.parser.load_and_parse_macros(
package_name=project_name,
root_project=root_project,
all_projects=all_projects,
root_dir=project.get('project-root'),
relative_dirs=project.get('macro-paths', []),
resource_type=NodeType.Operation)


class AnalysisLoader(ResourceLoader):

@classmethod
10 changes: 10 additions & 0 deletions dbt/main.py
@@ -17,6 +17,7 @@
import dbt.task.seed as seed_task
import dbt.task.test as test_task
import dbt.task.archive as archive_task
import dbt.task.generate as generate_task

import dbt.tracking
import dbt.config as config
@@ -414,6 +415,15 @@ def parse_args(args):
)
seed_sub.set_defaults(cls=seed_task.SeedTask, which='seed')

catalog_sub = subs.add_parser('catalog', parents=[base_subparser])
catalog_subs = catalog_sub.add_subparsers()
# it might look like catalog_sub is the correct parents entry, but that
# will cause weird errors about 'conflicting option strings'.
[Review comment, Member] @drewbanin should this be docs? dbt catalog generate vs dbt docs generate?

[Review comment, Contributor] Yeah, I was thinking dbt docs generate here. Might be worth changing the GenerateTask in generate.py to DocsTask and docs.py?

[Review comment, Contributor Author] Yup, I'll fix the catalog -> docs.

Is this actually a "docs task"? I assume the point of having generate be a subcommand of docs is that there are other docs tasks planned, right? If not, maybe this could just be dbt docs?

generate_sub = catalog_subs.add_parser('generate',
parents=[base_subparser])
generate_sub.set_defaults(cls=generate_task.GenerateTask,
which='generate')

sub = subs.add_parser('test', parents=[base_subparser])
sub.add_argument(
'--data',
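
The 'conflicting option strings' caveat in the comment above can be reproduced outside dbt; a minimal standalone sketch of the nested "dbt catalog generate" subcommand wiring (the flag name is illustrative):

```python
import argparse

# Shared options live on a helper parser reused via parents=[...].
base = argparse.ArgumentParser(add_help=False)
base.add_argument('--profiles-dir')  # hypothetical shared flag

top = argparse.ArgumentParser(prog='dbt')
subs = top.add_subparsers(dest='command')

catalog = subs.add_parser('catalog', parents=[base])
catalog_subs = catalog.add_subparsers(dest='subcommand')

# parents must be [base], not [catalog]: reusing the subcommand parser as a
# parent duplicates options such as -h/--help, and argparse raises
# "conflicting option strings".
generate = catalog_subs.add_parser('generate', parents=[base])

args = top.parse_args(['catalog', 'generate'])
# args.command == 'catalog', args.subcommand == 'generate'
```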
21 changes: 21 additions & 0 deletions dbt/node_runners.py
@@ -3,6 +3,7 @@
from dbt.exceptions import NotImplementedException
from dbt.utils import get_nodes_by_tags
from dbt.node_types import NodeType, RunHookType
from dbt.adapters.factory import get_adapter

import dbt.clients.jinja
import dbt.context.runtime
@@ -61,6 +62,26 @@ def skipped(self):
return self.skip


class RunOperationResult(RunModelResult):
def __init__(self, node, error=None, skip=False, status=None,
failed=None, execution_time=0, returned=None):
super(RunOperationResult, self).__init__(node, error, skip, status,
failed, execution_time)
self.returned = returned

@property
def errored(self):
return self.error is not None

@property
def failed(self):
return self.fail

@property
def skipped(self):
return self.skip
[Review comment, Member] it looks to me like you don't need to override errored, failed, or skipped



class BaseRunner(object):
print_header = True

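
Per the review comment on RunOperationResult, a slimmed-down sketch of what the class could look like if the duplicated property overrides were dropped (assuming, as the reviewer suggests, that RunModelResult already provides errored, failed, and skipped):

```python
# Sketch reflecting the review suggestion; intended as an in-place rewrite
# inside dbt/node_runners.py, so RunModelResult is already defined above it.
class RunOperationResult(RunModelResult):
    def __init__(self, node, error=None, skip=False, status=None,
                 failed=None, execution_time=0, returned=None):
        super(RunOperationResult, self).__init__(node, error, skip, status,
                                                 failed, execution_time)
        self.returned = returned
```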