From f1f14d237ef6dcf0adaeeaf579ddedd05cdde45c Mon Sep 17 00:00:00 2001 From: Jacob Beck Date: Thu, 7 Nov 2019 10:12:41 -0700 Subject: [PATCH 1/3] log hook info in json logs --- core/dbt/logger.py | 50 +++++++++++++++++++++++++++++---------- core/dbt/task/run.py | 48 ++++++++++++++++++++++++++++++------- core/dbt/task/runnable.py | 6 ++++- core/dbt/utils.py | 16 +++++++++++-- 4 files changed, 96 insertions(+), 24 deletions(-) diff --git a/core/dbt/logger.py b/core/dbt/logger.py index 10d972373de..abab3e5df02 100644 --- a/core/dbt/logger.py +++ b/core/dbt/logger.py @@ -265,31 +265,55 @@ def process(self, record): record.extra['node_count'] = self.node_count -class NodeMetadata(logbook.Processor): - def __init__(self, node, node_index): - self.node = node - self.node_index = node_index +class ModelMetadata(logbook.Processor): + def __init__(self, model, index): + self.model = model + self.index = index super().__init__() + def mapping_keys(self): + return [] + + def process_keys(self, record): + for attr, key in self.mapping_keys(): + value = getattr(self.model, attr, None) + if value is not None: + record.extra[key] = value + def process(self, record): - keys = [ + self.process_keys(record) + record.extra['node_index'] = self.index + + +class NodeMetadata(ModelMetadata): + def mapping_keys(self): + return [ ('alias', 'node_alias'), ('schema', 'node_schema'), ('database', 'node_database'), - ('name', 'node_name'), ('original_file_path', 'node_path'), + ('name', 'node_name'), ('resource_type', 'resource_type'), ] - for attr, key in keys: - value = getattr(self.node, attr, None) - if value is not None: - record.extra[key] = value - record.extra['node_index'] = self.node_index - if hasattr(self.node, 'config'): - materialized = getattr(self.node.config, 'materialized', None) + + def process_config(self, record): + if hasattr(self.model, 'config'): + materialized = getattr(self.model.config, 'materialized', None) if materialized is not None: record.extra['node_materialized'] = materialized + def process(self, record): + super().process(record) + self.process_config(record) + + +class HookMetadata(ModelMetadata): + def mapping_keys(self): + return [ + ('name', 'node_name'), + ('resource_type', 'resource_type'), + ] + class TimestampNamed(JsonOnly): def __init__(self, name: str): diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 3382af4c16a..5dae7d17266 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -1,7 +1,15 @@ import functools import time - -from dbt.logger import GLOBAL_LOGGER as logger, TextOnly +from typing import List + +from dbt.logger import ( + GLOBAL_LOGGER as logger, + TextOnly, + HookMetadata, + UniqueID, + TimestampNamed, + DbtModelState, +) from dbt.node_types import NodeType, RunHookType from dbt.node_runners import ModelRunner @@ -16,6 +24,7 @@ get_counts from dbt.compilation import compile_node +from dbt.contracts.graph.parsed import ParsedHookNode from dbt.task.compile import CompileTask from dbt.utils import get_nodes_by_tags @@ -40,7 +49,7 @@ def __exit__(self, exc_type, exc_value, exc_tracebck): @functools.total_ordering -class BiggestName: +class BiggestName(str): def __lt__(self, other): return True @@ -48,10 +57,18 @@ def __eq__(self, other): return isinstance(other, self.__class__) +def _hook_list() -> List[ParsedHookNode]: + return [] + + class RunTask(CompileTask): def __init__(self, args, config): super().__init__(args, config) self.ran_hooks = [] + self._total_executed = 0 + + def index_offset(self, value: int) -> int: + return self._total_executed + value def raise_on_first_error(self): return False @@ -67,20 +84,22 @@ def get_hook_sql(self, adapter, hook, idx, num_hooks, extra_context): hook_obj = get_hook(statement, index=hook_index) return hook_obj.sql or '' - def _hook_keyfunc(self, hook): + def _hook_keyfunc(self, hook: ParsedHookNode): package_name = hook.package_name if package_name == self.config.project_name: - package_name = BiggestName() + package_name = BiggestName('') return package_name, hook.index - def get_hooks_by_type(self, hook_type): + def get_hooks_by_type( + self, hook_type: RunHookType + ) -> List[ParsedHookNode]: nodes = self.manifest.nodes.values() # find all hooks defined in the manifest (could be multiple projects) hooks = get_nodes_by_tags(nodes, {hook_type}, NodeType.Operation) hooks.sort(key=self._hook_keyfunc) return hooks - def run_hooks(self, adapter, hook_type, extra_context): + def run_hooks(self, adapter, hook_type: RunHookType, extra_context): ordered_hooks = self.get_hooks_by_type(hook_type) # on-run-* hooks should run outside of a transaction. This happens @@ -97,6 +116,8 @@ def run_hooks(self, adapter, hook_type, extra_context): print_timestamped_line( 'Running {} {} {}'.format(num_hooks, hook_type, plural) ) + startctx = TimestampNamed('node_started_at') + finishctx = TimestampNamed('node_finished_at') for idx, hook in enumerate(ordered_hooks, start=1): sql = self.get_hook_sql(adapter, hook, idx, num_hooks, @@ -105,6 +126,11 @@ def run_hooks(self, adapter, hook_type, extra_context): hook_text = '{}.{}.{}'.format(hook.package_name, hook_type, hook.index) print_hook_start_line(hook_text, idx, num_hooks) + + hook_meta_ctx = HookMetadata(hook, self.index_offset(idx)) + uid_ctx = UniqueID(hook.unique_id) + with uid_ctx, hook_meta_ctx, startctx: + logger.debug('on-run-hook starting') status = 'OK' with Timer() as timer: @@ -113,13 +139,18 @@ def run_hooks(self, adapter, hook_type, extra_context): fetch=False) self.ran_hooks.append(hook) + with uid_ctx, finishctx, DbtModelState({'node_status': status}): + logger.debug('on-run-hook complete') + print_hook_end_line(hook_text, status, idx, num_hooks, timer.elapsed) + self._total_executed += len(ordered_hooks) + with TextOnly(): print_timestamped_line("") - def safe_run_hooks(self, adapter, hook_type, extra_context): + def safe_run_hooks(self, adapter, hook_type: RunHookType, extra_context): try: self.run_hooks(adapter, hook_type, extra_context) except dbt.exceptions.RuntimeException: @@ -161,6 +192,7 @@ def after_run(self, adapter, results): {'schemas': schemas, 'results': results}) def after_hooks(self, adapter, results, elapsed): + self._total_executed += len(results) self.print_results_line(results, elapsed) def build_query(self): diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py index f7f25070887..5078fbbe54d 100644 --- a/core/dbt/task/runnable.py +++ b/core/dbt/task/runnable.py @@ -67,6 +67,9 @@ def __init__(self, args, config): self._skipped_children = {} self._raise_next_tick = None + def index_offset(self, value: int) -> int: + return value + def select_nodes(self): selector = dbt.graph.selector.NodeSelector( self.linker.graph, self.manifest @@ -120,7 +123,8 @@ def call_runner(self, runner): uid_context = UniqueID(runner.node.unique_id) with RUNNING_STATE, uid_context: startctx = TimestampNamed('node_started_at') - extended_metadata = NodeMetadata(runner.node, runner.node_index) + index = self.index_offset(runner.node_index) + extended_metadata = NodeMetadata(runner.node, index) with startctx, extended_metadata: logger.debug('Began running node {}'.format( runner.node.unique_id)) diff --git a/core/dbt/utils.py b/core/dbt/utils.py index d5d924947b9..35dbd63ca2c 100644 --- a/core/dbt/utils.py +++ b/core/dbt/utils.py @@ -8,7 +8,10 @@ import json import os from enum import Enum -from typing import Tuple, Type, Any, Optional, TypeVar, Dict +from typing import ( + Tuple, Type, Any, Optional, TypeVar, Dict, Iterable, Set, List +) +from typing_extensions import Protocol import dbt.exceptions @@ -314,7 +317,16 @@ def get_pseudo_hook_path(hook_name): return os.path.join(*path_parts) -def get_nodes_by_tags(nodes, match_tags, resource_type): +class _Tagged(Protocol): + tags: Iterable[str] + + +Tagged = TypeVar('Tagged', bound=_Tagged) + + +def get_nodes_by_tags( + nodes: Iterable[Tagged], match_tags: Set[str], resource_type: NodeType +) -> List[Tagged]: matched_nodes = [] for node in nodes: node_tags = node.tags From b9b4ce30e8a960056e11e81d97d582541b2acf05 Mon Sep 17 00:00:00 2001 From: Jacob Beck Date: Thu, 7 Nov 2019 15:01:19 -0700 Subject: [PATCH 2/3] PR feedback --- core/dbt/logger.py | 16 ++++++++-------- core/dbt/task/runnable.py | 4 ++-- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/core/dbt/logger.py b/core/dbt/logger.py index abab3e5df02..9343a0c631e 100644 --- a/core/dbt/logger.py +++ b/core/dbt/logger.py @@ -265,9 +265,9 @@ def process(self, record): record.extra['node_count'] = self.node_count -class ModelMetadata(logbook.Processor): - def __init__(self, model, index): - self.model = model +class NodeMetadata(logbook.Processor): + def __init__(self, node, index): + self.node = node self.index = index super().__init__() @@ -276,7 +276,7 @@ def mapping_keys(self): def process_keys(self, record): for attr, key in self.mapping_keys(): - value = getattr(self.model, attr, None) + value = getattr(self.node, attr, None) if value is not None: record.extra[key] = value @@ -285,7 +285,7 @@ def process(self, record): record.extra['node_index'] = self.index -class NodeMetadata(ModelMetadata): +class ModelMetadata(NodeMetadata): def mapping_keys(self): return [ ('alias', 'node_alias'), @@ -297,8 +297,8 @@ def mapping_keys(self): ] def process_config(self, record): - if hasattr(self.model, 'config'): - materialized = getattr(self.model.config, 'materialized', None) + if hasattr(self.node, 'config'): + materialized = getattr(self.node.config, 'materialized', None) if materialized is not None: record.extra['node_materialized'] = materialized @@ -307,7 +307,7 @@ def process(self, record): self.process_config(record) -class HookMetadata(ModelMetadata): +class HookMetadata(NodeMetadata): def mapping_keys(self): return [ ('name', 'node_name'), diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py index 5078fbbe54d..0efb782a1a0 100644 --- a/core/dbt/task/runnable.py +++ b/core/dbt/task/runnable.py @@ -12,7 +12,7 @@ UniqueID, TimestampNamed, DbtModelState, - NodeMetadata, + ModelMetadata, NodeCount, ) from dbt.compilation import compile_manifest @@ -124,7 +124,7 @@ def call_runner(self, runner): with RUNNING_STATE, uid_context: startctx = TimestampNamed('node_started_at') index = self.index_offset(runner.node_index) - extended_metadata = NodeMetadata(runner.node, index) + extended_metadata = ModelMetadata(runner.node, index) with startctx, extended_metadata: logger.debug('Began running node {}'.format( runner.node.unique_id)) From 08cb38b3425ba55d3b10f0169dbaf285a6eed095 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Thu, 7 Nov 2019 19:11:51 -0500 Subject: [PATCH 3/3] fix for manifest reload logic --- core/dbt/rpc/task_handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/dbt/rpc/task_handler.py b/core/dbt/rpc/task_handler.py index 9ddcdbdacfe..adfd0e5e7eb 100644 --- a/core/dbt/rpc/task_handler.py +++ b/core/dbt/rpc/task_handler.py @@ -179,7 +179,7 @@ def get_results_context( with manifest_blocking: yield - if RemoteMethodFlags.RequiresManifestReloadAfter: + if RemoteMethodFlags.RequiresManifestReloadAfter in flags: manager.parse_manifest()