diff --git a/core/dbt/logger.py b/core/dbt/logger.py index 10d972373de..9343a0c631e 100644 --- a/core/dbt/logger.py +++ b/core/dbt/logger.py @@ -266,30 +266,54 @@ def process(self, record): class NodeMetadata(logbook.Processor): - def __init__(self, node, node_index): + def __init__(self, node, index): self.node = node - self.node_index = node_index + self.index = index super().__init__() + def mapping_keys(self): + return [] + + def process_keys(self, record): + for attr, key in self.mapping_keys(): + value = getattr(self.node, attr, None) + if value is not None: + record.extra[key] = value + def process(self, record): - keys = [ + self.process_keys(record) + record.extra['node_index'] = self.index + + +class ModelMetadata(NodeMetadata): + def mapping_keys(self): + return [ ('alias', 'node_alias'), ('schema', 'node_schema'), ('database', 'node_database'), - ('name', 'node_name'), ('original_file_path', 'node_path'), + ('name', 'node_name'), ('resource_type', 'resource_type'), ] - for attr, key in keys: - value = getattr(self.node, attr, None) - if value is not None: - record.extra[key] = value - record.extra['node_index'] = self.node_index + + def process_config(self, record): if hasattr(self.node, 'config'): materialized = getattr(self.node.config, 'materialized', None) if materialized is not None: record.extra['node_materialized'] = materialized + def process(self, record): + super().process(record) + self.process_config(record) + + +class HookMetadata(NodeMetadata): + def mapping_keys(self): + return [ + ('name', 'node_name'), + ('resource_type', 'resource_type'), + ] + class TimestampNamed(JsonOnly): def __init__(self, name: str): diff --git a/core/dbt/rpc/task_handler.py b/core/dbt/rpc/task_handler.py index 9ddcdbdacfe..adfd0e5e7eb 100644 --- a/core/dbt/rpc/task_handler.py +++ b/core/dbt/rpc/task_handler.py @@ -179,7 +179,7 @@ def get_results_context( with manifest_blocking: yield - if RemoteMethodFlags.RequiresManifestReloadAfter: + if RemoteMethodFlags.RequiresManifestReloadAfter in flags: manager.parse_manifest() diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 3382af4c16a..5dae7d17266 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -1,7 +1,15 @@ import functools import time - -from dbt.logger import GLOBAL_LOGGER as logger, TextOnly +from typing import List + +from dbt.logger import ( + GLOBAL_LOGGER as logger, + TextOnly, + HookMetadata, + UniqueID, + TimestampNamed, + DbtModelState, +) from dbt.node_types import NodeType, RunHookType from dbt.node_runners import ModelRunner @@ -16,6 +24,7 @@ get_counts from dbt.compilation import compile_node +from dbt.contracts.graph.parsed import ParsedHookNode from dbt.task.compile import CompileTask from dbt.utils import get_nodes_by_tags @@ -40,7 +49,7 @@ def __exit__(self, exc_type, exc_value, exc_tracebck): @functools.total_ordering -class BiggestName: +class BiggestName(str): def __lt__(self, other): return True @@ -48,10 +57,18 @@ def __eq__(self, other): return isinstance(other, self.__class__) +def _hook_list() -> List[ParsedHookNode]: + return [] + + class RunTask(CompileTask): def __init__(self, args, config): super().__init__(args, config) self.ran_hooks = [] + self._total_executed = 0 + + def index_offset(self, value: int) -> int: + return self._total_executed + value def raise_on_first_error(self): return False @@ -67,20 +84,22 @@ def get_hook_sql(self, adapter, hook, idx, num_hooks, extra_context): hook_obj = get_hook(statement, index=hook_index) return hook_obj.sql or '' - def _hook_keyfunc(self, hook): + def _hook_keyfunc(self, hook: ParsedHookNode): package_name = hook.package_name if package_name == self.config.project_name: - package_name = BiggestName() + package_name = BiggestName('') return package_name, hook.index - def get_hooks_by_type(self, hook_type): + def get_hooks_by_type( + self, hook_type: RunHookType + ) -> List[ParsedHookNode]: nodes = self.manifest.nodes.values() # find all hooks defined in the manifest (could be multiple projects) hooks = get_nodes_by_tags(nodes, {hook_type}, NodeType.Operation) hooks.sort(key=self._hook_keyfunc) return hooks - def run_hooks(self, adapter, hook_type, extra_context): + def run_hooks(self, adapter, hook_type: RunHookType, extra_context): ordered_hooks = self.get_hooks_by_type(hook_type) # on-run-* hooks should run outside of a transaction. This happens @@ -97,6 +116,8 @@ def run_hooks(self, adapter, hook_type, extra_context): print_timestamped_line( 'Running {} {} {}'.format(num_hooks, hook_type, plural) ) + startctx = TimestampNamed('node_started_at') + finishctx = TimestampNamed('node_finished_at') for idx, hook in enumerate(ordered_hooks, start=1): sql = self.get_hook_sql(adapter, hook, idx, num_hooks, @@ -105,6 +126,11 @@ def run_hooks(self, adapter, hook_type, extra_context): hook_text = '{}.{}.{}'.format(hook.package_name, hook_type, hook.index) print_hook_start_line(hook_text, idx, num_hooks) + + hook_meta_ctx = HookMetadata(hook, self.index_offset(idx)) + uid_ctx = UniqueID(hook.unique_id) + with uid_ctx, hook_meta_ctx, startctx: + logger.debug('on-run-hook starting') status = 'OK' with Timer() as timer: @@ -113,13 +139,18 @@ def run_hooks(self, adapter, hook_type, extra_context): fetch=False) self.ran_hooks.append(hook) + with uid_ctx, finishctx, DbtModelState({'node_status': status}): + logger.debug('on-run-hook complete') + print_hook_end_line(hook_text, status, idx, num_hooks, timer.elapsed) + self._total_executed += len(ordered_hooks) + with TextOnly(): print_timestamped_line("") - def safe_run_hooks(self, adapter, hook_type, extra_context): + def safe_run_hooks(self, adapter, hook_type: RunHookType, extra_context): try: self.run_hooks(adapter, hook_type, extra_context) except dbt.exceptions.RuntimeException: @@ -161,6 +192,7 @@ def after_run(self, adapter, results): {'schemas': schemas, 'results': results}) def after_hooks(self, adapter, results, elapsed): + self._total_executed += len(results) self.print_results_line(results, elapsed) def build_query(self): diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py index f7f25070887..0efb782a1a0 100644 --- a/core/dbt/task/runnable.py +++ b/core/dbt/task/runnable.py @@ -12,7 +12,7 @@ UniqueID, TimestampNamed, DbtModelState, - NodeMetadata, + ModelMetadata, NodeCount, ) from dbt.compilation import compile_manifest @@ -67,6 +67,9 @@ def __init__(self, args, config): self._skipped_children = {} self._raise_next_tick = None + def index_offset(self, value: int) -> int: + return value + def select_nodes(self): selector = dbt.graph.selector.NodeSelector( self.linker.graph, self.manifest @@ -120,7 +123,8 @@ def call_runner(self, runner): uid_context = UniqueID(runner.node.unique_id) with RUNNING_STATE, uid_context: startctx = TimestampNamed('node_started_at') - extended_metadata = NodeMetadata(runner.node, runner.node_index) + index = self.index_offset(runner.node_index) + extended_metadata = ModelMetadata(runner.node, index) with startctx, extended_metadata: logger.debug('Began running node {}'.format( runner.node.unique_id)) diff --git a/core/dbt/utils.py b/core/dbt/utils.py index d5d924947b9..35dbd63ca2c 100644 --- a/core/dbt/utils.py +++ b/core/dbt/utils.py @@ -8,7 +8,10 @@ import json import os from enum import Enum -from typing import Tuple, Type, Any, Optional, TypeVar, Dict +from typing import ( + Tuple, Type, Any, Optional, TypeVar, Dict, Iterable, Set, List +) +from typing_extensions import Protocol import dbt.exceptions @@ -314,7 +317,16 @@ def get_pseudo_hook_path(hook_name): return os.path.join(*path_parts) -def get_nodes_by_tags(nodes, match_tags, resource_type): +class _Tagged(Protocol): + tags: Iterable[str] + + +Tagged = TypeVar('Tagged', bound=_Tagged) + + +def get_nodes_by_tags( + nodes: Iterable[Tagged], match_tags: Set[str], resource_type: NodeType +) -> List[Tagged]: matched_nodes = [] for node in nodes: node_tags = node.tags