Skip to content

Commit

Permalink
Address feedback, appease flake8/mypy/unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Jacob Beck committed Oct 4, 2019
1 parent 4025556 commit 4b227a6
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 26 deletions.
2 changes: 0 additions & 2 deletions core/dbt/contracts/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,13 @@
from hologram import JsonSchemaMixin

import agate
import logbook

from dataclasses import dataclass, field
from datetime import datetime
from typing import Union, Dict, List, Optional, Any, NamedTuple
from numbers import Real



@dataclass
class TimingInfo(JsonSchemaMixin):
name: str
Expand Down
43 changes: 29 additions & 14 deletions core/dbt/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,25 +219,18 @@ def process(self, record):
record.extra['timing_info'] = self.timing_info.to_dict()


class PushIfNonDefault(logbook.Processor):
KEY: str = NotImplemented
DEFAULT: Any = NotImplemented
class DbtProcessState(logbook.Processor):
def __init__(self, value: str):
self.value = value
super().__init__()

def process(self, record):
overwrite = (
self.KEY not in record.extra or
record.extra[self.KEY] == self.DEFAULT
'run_state' not in record.extra or
record.extra['run_state'] == 'internal'
)
if overwrite:
record.extra[self.KEY] = self.value


class DbtProcessState(PushIfNonDefault):
KEY = 'run_state'
DEFAULT = 'internal'
record.extra['run_state'] = self.value


class DbtModelState(logbook.Processor):
Expand All @@ -254,7 +247,7 @@ def process(self, record):
record.extra['is_status_message'] = True


class NodeInfo(logbook.Processor):
class UniqueID(logbook.Processor):
def __init__(self, unique_id: str):
self.unique_id = unique_id
super().__init__()
Expand All @@ -263,6 +256,29 @@ def process(self, record):
record.extra['unique_id'] = self.unique_id


class NodeMetadata(logbook.Processor):
def __init__(self, node):
self.node = node
super().__init__()

def process(self, record):
keys = [
('alias', 'node_alias'),
('schema', 'node_schema'),
('database', 'node_database'),
('name', 'node_name'),
('original_file_path', 'node_path')
]
for attr, key in keys:
value = getattr(self.node, attr, None)
if value is not None:
record.extra[key] = value
if hasattr(self.node, 'config'):
materialized = getattr(self.node.config, 'materialized', None)
if materialized is not None:
record.extra['node_materialized'] = materialized


class TimestampNamed(JsonOnly):
def __init__(self, name: str):
self.name = name
Expand All @@ -273,7 +289,6 @@ def process(self, record):
record.extra[self.name] = datetime.utcnow().isoformat()



logger = logbook.Logger('dbt')
# provide this for the cache, disabled by default
CACHE_LOGGER = logbook.Logger('dbt.cache')
Expand Down Expand Up @@ -406,7 +421,7 @@ def __init__(self, stdout=colorama_stdout, stderr=sys.stderr):
self._output_handler = OutputHandler(self.stdout)
self._file_handler = DelayedFileHandler()
self._relevel_processor = Relevel(allowed=['dbt', 'werkzeug'])
self._state_processor = DbtProcessState(DbtProcessState.DEFAULT)
self._state_processor = DbtProcessState('internal')
# keep track of wheter we've already entered to decide if we should
# be actually pushing. This allows us to log in main() and also
# support entering dbt execution via handle_and_check.
Expand Down
6 changes: 1 addition & 5 deletions core/dbt/node_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,7 @@

from dbt import deprecations
from dbt.adapters.base import BaseRelation
from dbt.logger import (
GLOBAL_LOGGER as logger,
JsonOnly,
DbtModelState,
)
from dbt.logger import GLOBAL_LOGGER as logger
from dbt.exceptions import (
NotImplementedException, CompilationException, RuntimeException,
InternalException, missing_materialization
Expand Down
1 change: 0 additions & 1 deletion core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ def run_hooks(self, adapter, hook_type, extra_context):
print_hook_end_line(hook_text, status, idx, num_hooks,
timer.elapsed)


with TextOnly():
print_timestamped_line("")

Expand Down
8 changes: 4 additions & 4 deletions core/dbt/task/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
GLOBAL_LOGGER as logger,
DbtProcessState,
TextOnly,
NodeInfo,
UniqueID,
TimestampNamed,
JsonOnly,
DbtModelState,
NodeMetadata,
)
from dbt.compilation import compile_manifest
from dbt.contracts.results import ExecutionResult
Expand Down Expand Up @@ -127,8 +127,8 @@ def get_runner(self, node):
return cls(self.config, adapter, node, run_count, num_nodes)

def call_runner(self, runner):
with RUNNING_STATE, NodeInfo(runner.node.unique_id):
with TimestampNamed('node_started_at'):
with RUNNING_STATE, UniqueID(runner.node.unique_id):
with TimestampNamed('node_started_at'), NodeMetadata(runner.node):
logger.info('Began running model')
status = 'error' # we must have an error if we don't see this
try:
Expand Down

0 comments on commit 4b227a6

Please sign in to comment.