Skip to content

Commit

Permalink
Get latest changes. Merge branch 'dev/louisa-may-alcott' of https://g…
Browse files Browse the repository at this point in the history
  • Loading branch information
kconvey committed Nov 8, 2019
2 parents 563dfc1 + 1f1d100 commit 83de6b1
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 22 deletions.
42 changes: 33 additions & 9 deletions core/dbt/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,30 +266,54 @@ def process(self, record):


class NodeMetadata(logbook.Processor):
def __init__(self, node, node_index):
def __init__(self, node, index):
self.node = node
self.node_index = node_index
self.index = index
super().__init__()

def mapping_keys(self):
return []

def process_keys(self, record):
for attr, key in self.mapping_keys():
value = getattr(self.node, attr, None)
if value is not None:
record.extra[key] = value

def process(self, record):
keys = [
self.process_keys(record)
record.extra['node_index'] = self.index


class ModelMetadata(NodeMetadata):
def mapping_keys(self):
return [
('alias', 'node_alias'),
('schema', 'node_schema'),
('database', 'node_database'),
('name', 'node_name'),
('original_file_path', 'node_path'),
('name', 'node_name'),
('resource_type', 'resource_type'),
]
for attr, key in keys:
value = getattr(self.node, attr, None)
if value is not None:
record.extra[key] = value
record.extra['node_index'] = self.node_index

def process_config(self, record):
if hasattr(self.node, 'config'):
materialized = getattr(self.node.config, 'materialized', None)
if materialized is not None:
record.extra['node_materialized'] = materialized

def process(self, record):
super().process(record)
self.process_config(record)


class HookMetadata(NodeMetadata):
def mapping_keys(self):
return [
('name', 'node_name'),
('resource_type', 'resource_type'),
]


class TimestampNamed(JsonOnly):
def __init__(self, name: str):
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/rpc/task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ def get_results_context(

with manifest_blocking:
yield
if RemoteMethodFlags.RequiresManifestReloadAfter:
if RemoteMethodFlags.RequiresManifestReloadAfter in flags:
manager.parse_manifest()


Expand Down
48 changes: 40 additions & 8 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
import functools
import time

from dbt.logger import GLOBAL_LOGGER as logger, TextOnly
from typing import List

from dbt.logger import (
GLOBAL_LOGGER as logger,
TextOnly,
HookMetadata,
UniqueID,
TimestampNamed,
DbtModelState,
)
from dbt.node_types import NodeType, RunHookType
from dbt.node_runners import ModelRunner

Expand All @@ -16,6 +24,7 @@
get_counts

from dbt.compilation import compile_node
from dbt.contracts.graph.parsed import ParsedHookNode
from dbt.task.compile import CompileTask
from dbt.utils import get_nodes_by_tags

Expand All @@ -40,18 +49,26 @@ def __exit__(self, exc_type, exc_value, exc_tracebck):


@functools.total_ordering
class BiggestName:
class BiggestName(str):
def __lt__(self, other):
return True

def __eq__(self, other):
return isinstance(other, self.__class__)


def _hook_list() -> List[ParsedHookNode]:
return []


class RunTask(CompileTask):
def __init__(self, args, config):
super().__init__(args, config)
self.ran_hooks = []
self._total_executed = 0

def index_offset(self, value: int) -> int:
return self._total_executed + value

def raise_on_first_error(self):
return False
Expand All @@ -67,20 +84,22 @@ def get_hook_sql(self, adapter, hook, idx, num_hooks, extra_context):
hook_obj = get_hook(statement, index=hook_index)
return hook_obj.sql or ''

def _hook_keyfunc(self, hook):
def _hook_keyfunc(self, hook: ParsedHookNode):
package_name = hook.package_name
if package_name == self.config.project_name:
package_name = BiggestName()
package_name = BiggestName('')
return package_name, hook.index

def get_hooks_by_type(self, hook_type):
def get_hooks_by_type(
self, hook_type: RunHookType
) -> List[ParsedHookNode]:
nodes = self.manifest.nodes.values()
# find all hooks defined in the manifest (could be multiple projects)
hooks = get_nodes_by_tags(nodes, {hook_type}, NodeType.Operation)
hooks.sort(key=self._hook_keyfunc)
return hooks

def run_hooks(self, adapter, hook_type, extra_context):
def run_hooks(self, adapter, hook_type: RunHookType, extra_context):
ordered_hooks = self.get_hooks_by_type(hook_type)

# on-run-* hooks should run outside of a transaction. This happens
Expand All @@ -97,6 +116,8 @@ def run_hooks(self, adapter, hook_type, extra_context):
print_timestamped_line(
'Running {} {} {}'.format(num_hooks, hook_type, plural)
)
startctx = TimestampNamed('node_started_at')
finishctx = TimestampNamed('node_finished_at')

for idx, hook in enumerate(ordered_hooks, start=1):
sql = self.get_hook_sql(adapter, hook, idx, num_hooks,
Expand All @@ -105,6 +126,11 @@ def run_hooks(self, adapter, hook_type, extra_context):
hook_text = '{}.{}.{}'.format(hook.package_name, hook_type,
hook.index)
print_hook_start_line(hook_text, idx, num_hooks)

hook_meta_ctx = HookMetadata(hook, self.index_offset(idx))
uid_ctx = UniqueID(hook.unique_id)
with uid_ctx, hook_meta_ctx, startctx:
logger.debug('on-run-hook starting')
status = 'OK'

with Timer() as timer:
Expand All @@ -113,13 +139,18 @@ def run_hooks(self, adapter, hook_type, extra_context):
fetch=False)
self.ran_hooks.append(hook)

with uid_ctx, finishctx, DbtModelState({'node_status': status}):
logger.debug('on-run-hook complete')

print_hook_end_line(hook_text, status, idx, num_hooks,
timer.elapsed)

self._total_executed += len(ordered_hooks)

with TextOnly():
print_timestamped_line("")

def safe_run_hooks(self, adapter, hook_type, extra_context):
def safe_run_hooks(self, adapter, hook_type: RunHookType, extra_context):
try:
self.run_hooks(adapter, hook_type, extra_context)
except dbt.exceptions.RuntimeException:
Expand Down Expand Up @@ -161,6 +192,7 @@ def after_run(self, adapter, results):
{'schemas': schemas, 'results': results})

def after_hooks(self, adapter, results, elapsed):
self._total_executed += len(results)
self.print_results_line(results, elapsed)

def build_query(self):
Expand Down
8 changes: 6 additions & 2 deletions core/dbt/task/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
UniqueID,
TimestampNamed,
DbtModelState,
NodeMetadata,
ModelMetadata,
NodeCount,
)
from dbt.compilation import compile_manifest
Expand Down Expand Up @@ -67,6 +67,9 @@ def __init__(self, args, config):
self._skipped_children = {}
self._raise_next_tick = None

def index_offset(self, value: int) -> int:
return value

def select_nodes(self):
selector = dbt.graph.selector.NodeSelector(
self.linker.graph, self.manifest
Expand Down Expand Up @@ -120,7 +123,8 @@ def call_runner(self, runner):
uid_context = UniqueID(runner.node.unique_id)
with RUNNING_STATE, uid_context:
startctx = TimestampNamed('node_started_at')
extended_metadata = NodeMetadata(runner.node, runner.node_index)
index = self.index_offset(runner.node_index)
extended_metadata = ModelMetadata(runner.node, index)
with startctx, extended_metadata:
logger.debug('Began running node {}'.format(
runner.node.unique_id))
Expand Down
16 changes: 14 additions & 2 deletions core/dbt/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
import json
import os
from enum import Enum
from typing import Tuple, Type, Any, Optional, TypeVar, Dict
from typing import (
Tuple, Type, Any, Optional, TypeVar, Dict, Iterable, Set, List
)
from typing_extensions import Protocol

import dbt.exceptions

Expand Down Expand Up @@ -314,7 +317,16 @@ def get_pseudo_hook_path(hook_name):
return os.path.join(*path_parts)


def get_nodes_by_tags(nodes, match_tags, resource_type):
class _Tagged(Protocol):
tags: Iterable[str]


Tagged = TypeVar('Tagged', bound=_Tagged)


def get_nodes_by_tags(
nodes: Iterable[Tagged], match_tags: Set[str], resource_type: NodeType
) -> List[Tagged]:
matched_nodes = []
for node in nodes:
node_tags = node.tags
Expand Down

0 comments on commit 83de6b1

Please sign in to comment.