Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transactional json logging #1806

Merged
merged 4 commits into from
Oct 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions core/dbt/contracts/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@
from dbt.contracts.graph.unparsed import Time, FreshnessStatus
from dbt.contracts.graph.parsed import ParsedSourceDefinition
from dbt.contracts.util import Writable, Replaceable
from dbt.logger import LogMessage
from dbt.logger import (
LogMessage,
TimingProcessor,
JsonOnly,
GLOBAL_LOGGER as logger,
)
from hologram.helpers import StrEnum
from hologram import JsonSchemaMixin

Expand All @@ -28,7 +33,7 @@ def end(self):


class collect_timing_info:
def __init__(self, name):
def __init__(self, name: str):
self.timing_info = TimingInfo(name=name)

def __enter__(self):
Expand All @@ -37,6 +42,8 @@ def __enter__(self):

def __exit__(self, exc_type, exc_value, traceback):
    """Stop the timer and emit a json-only debug record carrying it."""
    self.timing_info.end()
    timing_ctx = TimingProcessor(self.timing_info)
    with JsonOnly():
        with timing_ctx:
            logger.debug('finished collecting timing info')


@dataclass
Expand Down
25 changes: 14 additions & 11 deletions core/dbt/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import dbt.exceptions
import dbt.flags

from dbt.logger import GLOBAL_LOGGER as logger
from dbt.logger import GLOBAL_LOGGER as logger, DbtProcessState
from dbt.node_types import NodeType
from dbt.clients.system import make_directory
from dbt.config import Project, RuntimeConfig
Expand All @@ -31,6 +31,7 @@


PARTIAL_PARSE_FILE_NAME = 'partial_parse.pickle'
PARSING_STATE = DbtProcessState('parsing')


_parser_types = [
Expand Down Expand Up @@ -284,19 +285,21 @@ def load_all(
root_config: RuntimeConfig,
internal_manifest: Optional[Manifest] = None
) -> Manifest:
projects = load_all_projects(root_config)
loader = cls(root_config, projects)
loader.load(internal_manifest=internal_manifest)
loader.write_parse_results()
manifest = loader.create_manifest()
_check_manifest(manifest, root_config)
return manifest
with PARSING_STATE:
projects = load_all_projects(root_config)
loader = cls(root_config, projects)
loader.load(internal_manifest=internal_manifest)
loader.write_parse_results()
manifest = loader.create_manifest()
_check_manifest(manifest, root_config)
return manifest

@classmethod
def load_internal(cls, root_config: RuntimeConfig) -> Manifest:
projects = load_internal_projects(root_config)
loader = cls(root_config, projects)
return loader.load_only_macros()
with PARSING_STATE:
projects = load_internal_projects(root_config)
loader = cls(root_config, projects)
return loader.load_only_macros()


def _check_resource_uniqueness(manifest):
Expand Down
119 changes: 118 additions & 1 deletion core/dbt/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,10 @@ def format_record(self, record, handler):
class JsonFormatter(LogMessageFormatter):
def __call__(self, record, handler):
"""Return a the record converted to LogMessage's JSON form"""
# utils imports exceptions which imports logger...
import dbt.utils
log_message = super().__call__(record, handler)
return json.dumps(log_message.to_dict())
return json.dumps(log_message.to_dict(), cls=dbt.utils.JSONEncoder)


class FormatterMixin:
Expand Down Expand Up @@ -152,6 +154,17 @@ def reset(self):
self._text_format_string = self._default_format
self.format_text()

def should_handle(self, record):
    """Accept a record only if it clears the level and matches this
    handler's output mode (text vs. json).

    Records tagged ``json_only`` are rejected by text-mode handlers;
    records tagged ``text_only`` are rejected by non-text handlers.
    """
    if record.level < self.level:
        return False
    is_text = self.formatter_class is logbook.StringFormatter
    if is_text:
        return not record.extra.get('json_only', False)
    return not record.extra.get('text_only', False)


def _redirect_std_logging():
logbook.compat.redirect_logging()
Expand Down Expand Up @@ -186,6 +199,108 @@ def process(self, record):
record.level = self.target_level


class JsonOnly(logbook.Processor):
    """Tag records as json-only so text-mode handlers can skip them."""

    def process(self, record):
        record.extra.update(json_only=True)


class TextOnly(logbook.Processor):
    """Tag records as text-only so non-text handlers can skip them."""

    def process(self, record):
        record.extra.update(text_only=True)


class TimingProcessor(logbook.Processor):
    """Attach serialized timing information to every record in scope."""

    def __init__(self, timing_info: Optional[JsonSchemaMixin] = None):
        self.timing_info = timing_info
        super().__init__()

    def process(self, record):
        # nothing to attach when no timing info was supplied
        if self.timing_info is None:
            return
        record.extra['timing_info'] = self.timing_info.to_dict()


class DbtProcessState(logbook.Processor):
    """Stamp records with a ``run_state``, without clobbering a real one.

    Only a missing state or the placeholder ``'internal'`` state gets
    overwritten by this processor's value.
    """

    def __init__(self, value: str):
        self.value = value
        super().__init__()

    def process(self, record):
        extra = record.extra
        if 'run_state' not in extra or extra['run_state'] == 'internal':
            extra['run_state'] = self.value


class DbtModelState(logbook.Processor):
    """Merge a fixed dict of state key/value pairs into each record's extras."""

    def __init__(self, state: Dict[str, str]):
        self.state = state
        super().__init__()

    def process(self, record):
        # copies every key from the configured state into the record
        record.extra.update(self.state)


class DbtStatusMessage(logbook.Processor):
    """Flag records as status messages via ``is_status_message``."""

    def process(self, record):
        record.extra.update(is_status_message=True)


class UniqueID(logbook.Processor):
    """Attach a node's unique id to every record in scope."""

    def __init__(self, unique_id: str):
        self.unique_id = unique_id
        super().__init__()

    def process(self, record):
        record.extra.update(unique_id=self.unique_id)


class NodeCount(logbook.Processor):
    """Attach the total node count to every record in scope."""

    def __init__(self, node_count: int):
        self.node_count = node_count
        super().__init__()

    def process(self, record):
        record.extra.update(node_count=self.node_count)


class NodeMetadata(logbook.Processor):
    """Attach a node's descriptive attributes and index to records.

    Each node attribute in the mapping below is copied into the record's
    extras under its logging key, skipping attributes the node lacks.
    """

    def __init__(self, node, node_index):
        self.node = node
        self.node_index = node_index
        super().__init__()

    def process(self, record):
        # (node attribute, extras key) pairs to copy when present
        mapping = (
            ('alias', 'node_alias'),
            ('schema', 'node_schema'),
            ('database', 'node_database'),
            ('name', 'node_name'),
            ('original_file_path', 'node_path'),
            ('resource_type', 'resource_type'),
        )
        extra = record.extra
        for source_attr, extra_key in mapping:
            attr_value = getattr(self.node, source_attr, None)
            if attr_value is not None:
                extra[extra_key] = attr_value
        extra['node_index'] = self.node_index
        # materialization lives on the node's config, when one exists
        node_config = getattr(self.node, 'config', None)
        materialized = getattr(node_config, 'materialized', None)
        if materialized is not None:
            extra['node_materialized'] = materialized


class TimestampNamed(JsonOnly):
    """A json-only processor that also stamps records with a named
    UTC timestamp (ISO-8601 string) under the given extras key.
    """

    def __init__(self, name: str):
        self.name = name
        super().__init__()

    def process(self, record):
        # first apply the json-only tag from the parent processor
        super().process(record)
        record.extra[self.name] = datetime.utcnow().isoformat()


logger = logbook.Logger('dbt')
# provide this for the cache, disabled by default
CACHE_LOGGER = logbook.Logger('dbt.cache')
Expand Down Expand Up @@ -318,6 +433,7 @@ def __init__(self, stdout=colorama_stdout, stderr=sys.stderr):
self._output_handler = OutputHandler(self.stdout)
self._file_handler = DelayedFileHandler()
self._relevel_processor = Relevel(allowed=['dbt', 'werkzeug'])
self._state_processor = DbtProcessState('internal')
# keep track of wheter we've already entered to decide if we should
# be actually pushing. This allows us to log in main() and also
# support entering dbt execution via handle_and_check.
Expand All @@ -327,6 +443,7 @@ def __init__(self, stdout=colorama_stdout, stderr=sys.stderr):
self._output_handler,
self._file_handler,
self._relevel_processor,
self._state_processor,
])

def push_application(self):
Expand Down
18 changes: 18 additions & 0 deletions core/dbt/node_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,18 @@ def __init__(self, config, adapter, node, node_index, num_nodes):
self.skip = False
self.skip_cause = None

def get_result_status(self, result) -> Dict[str, str]:
if result.error:
return {'node_status': 'error', 'node_error': str(result.error)}
elif result.skip:
return {'node_status': 'skipped'}
elif result.fail:
return {'node_status': 'failed'}
elif result.warn:
return {'node_status': 'warn'}
else:
return {'node_status': 'passed'}

def run_with_hooks(self, manifest):
if self.skip:
return self.on_skip()
Expand Down Expand Up @@ -436,6 +448,12 @@ def on_skip(self):
'Freshness: nodes cannot be skipped!'
)

def get_result_status(self, result) -> Dict[str, str]:
if result.error:
return {'node_status': 'error', 'node_error': str(result.error)}
else:
return {'node_status': str(result.status)}

def before_execute(self):
description = 'freshness of {0.source_name}.{0.name}'.format(self.node)
dbt.ui.printer.print_start_line(description, self.node_index,
Expand Down
11 changes: 7 additions & 4 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import functools
import time

from dbt.logger import GLOBAL_LOGGER as logger
from dbt.logger import GLOBAL_LOGGER as logger, TextOnly
from dbt.node_types import NodeType, RunHookType
from dbt.node_runners import ModelRunner

Expand Down Expand Up @@ -92,7 +92,8 @@ def run_hooks(self, adapter, hook_type, extra_context):
num_hooks = len(ordered_hooks)

plural = 'hook' if num_hooks == 1 else 'hooks'
print_timestamped_line("")
with TextOnly():
print_timestamped_line("")
print_timestamped_line(
'Running {} {} {}'.format(num_hooks, hook_type, plural)
)
Expand All @@ -115,7 +116,8 @@ def run_hooks(self, adapter, hook_type, extra_context):
print_hook_end_line(hook_text, status, idx, num_hooks,
timer.elapsed)

print_timestamped_line("")
with TextOnly():
print_timestamped_line("")

def safe_run_hooks(self, adapter, hook_type, extra_context):
try:
Expand All @@ -134,7 +136,8 @@ def print_results_line(self, results, execution_time):
execution = " in {execution_time:0.2f}s".format(
execution_time=execution_time)

print_timestamped_line("")
with TextOnly():
print_timestamped_line("")
print_timestamped_line(
"Finished running {stat_line}{execution}."
.format(stat_line=stat_line, execution=execution))
Expand Down
38 changes: 31 additions & 7 deletions core/dbt/task/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,16 @@

from dbt.task.base import ConfiguredTask
from dbt.adapters.factory import get_adapter
from dbt.logger import GLOBAL_LOGGER as logger
from dbt.logger import (
GLOBAL_LOGGER as logger,
DbtProcessState,
TextOnly,
UniqueID,
TimestampNamed,
DbtModelState,
NodeMetadata,
NodeCount,
)
from dbt.compilation import compile_manifest
from dbt.contracts.results import ExecutionResult
from dbt.loader import GraphLoader
Expand All @@ -19,6 +28,7 @@

RESULT_FILE_NAME = 'run_results.json'
MANIFEST_FILE_NAME = 'manifest.json'
RUNNING_STATE = DbtProcessState('running')


def write_manifest(manifest, config):
Expand Down Expand Up @@ -120,9 +130,20 @@ def get_runner(self, node):
return cls(self.config, adapter, node, run_count, num_nodes)

def call_runner(self, runner):
# TODO: create+enforce an actual contracts for what `result` is instead
# of the current free-for-all
result = runner.run_with_hooks(self.manifest)
uid_context = UniqueID(runner.node.unique_id)
with RUNNING_STATE, uid_context:
startctx = TimestampNamed('node_started_at')
extended_metadata = NodeMetadata(runner.node, runner.node_index)
with startctx, extended_metadata:
logger.info('Began running model')
status = 'error' # we must have an error if we don't see this
try:
result = runner.run_with_hooks(self.manifest)
status = runner.get_result_status(result)
finally:
finishctx = TimestampNamed('node_finished_at')
with finishctx, DbtModelState(status):
logger.info('Finished running model')
if result.error is not None and self.raise_on_first_error():
# if we raise inside a thread, it'll just get silently swallowed.
# stash the error message we want here, and it will check the
Expand Down Expand Up @@ -203,8 +224,10 @@ def execute_nodes(self):

text = "Concurrency: {} threads (target='{}')"
concurrency_line = text.format(num_threads, target_name)
dbt.ui.printer.print_timestamped_line(concurrency_line)
dbt.ui.printer.print_timestamped_line("")
with NodeCount(self.num_nodes):
dbt.ui.printer.print_timestamped_line(concurrency_line)
with TextOnly():
dbt.ui.printer.print_timestamped_line("")

pool = ThreadPool(num_threads)
try:
Expand Down Expand Up @@ -292,7 +315,8 @@ def run(self):
elapsed_time=0.0,
)
else:
logger.info("")
with TextOnly():
logger.info("")

selected_uids = frozenset(n.unique_id for n in self._flattened_nodes)
result = self.execute_with_hooks(selected_uids)
Expand Down
Loading