Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transactional json logging #1806

Merged
merged 4 commits into from
Oct 10, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions core/dbt/contracts/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@
from dbt.contracts.graph.unparsed import Time, FreshnessStatus
from dbt.contracts.graph.parsed import ParsedSourceDefinition
from dbt.contracts.util import Writable, Replaceable
from dbt.logger import LogMessage
from dbt.logger import (
LogMessage,
TimingProcessor,
JsonOnly,
GLOBAL_LOGGER as logger,
)
from hologram.helpers import StrEnum
from hologram import JsonSchemaMixin

Expand All @@ -28,7 +33,7 @@ def end(self):


class collect_timing_info:
def __init__(self, name):
def __init__(self, name: str):
self.timing_info = TimingInfo(name=name)

def __enter__(self):
Expand All @@ -37,6 +42,8 @@ def __enter__(self):

def __exit__(self, exc_type, exc_value, traceback):
self.timing_info.end()
with JsonOnly(), TimingProcessor(self.timing_info):
logger.debug('finished collecting timing info')


@dataclass
Expand Down
25 changes: 14 additions & 11 deletions core/dbt/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import dbt.exceptions
import dbt.flags

from dbt.logger import GLOBAL_LOGGER as logger
from dbt.logger import GLOBAL_LOGGER as logger, DbtProcessState
from dbt.node_types import NodeType
from dbt.clients.system import make_directory
from dbt.config import Project, RuntimeConfig
Expand All @@ -31,6 +31,7 @@


PARTIAL_PARSE_FILE_NAME = 'partial_parse.pickle'
PARSING_STATE = DbtProcessState('parsing')


_parser_types = [
Expand Down Expand Up @@ -284,19 +285,21 @@ def load_all(
root_config: RuntimeConfig,
internal_manifest: Optional[Manifest] = None
) -> Manifest:
projects = load_all_projects(root_config)
loader = cls(root_config, projects)
loader.load(internal_manifest=internal_manifest)
loader.write_parse_results()
manifest = loader.create_manifest()
_check_manifest(manifest, root_config)
return manifest
with PARSING_STATE:
projects = load_all_projects(root_config)
loader = cls(root_config, projects)
loader.load(internal_manifest=internal_manifest)
loader.write_parse_results()
manifest = loader.create_manifest()
_check_manifest(manifest, root_config)
return manifest

@classmethod
def load_internal(cls, root_config: RuntimeConfig) -> Manifest:
projects = load_internal_projects(root_config)
loader = cls(root_config, projects)
return loader.load_only_macros()
with PARSING_STATE:
projects = load_internal_projects(root_config)
loader = cls(root_config, projects)
return loader.load_only_macros()


def _check_resource_uniqueness(manifest):
Expand Down
107 changes: 106 additions & 1 deletion core/dbt/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,10 @@ def format_record(self, record, handler):
class JsonFormatter(LogMessageFormatter):
def __call__(self, record, handler):
"""Return a the record converted to LogMessage's JSON form"""
# utils imports exceptions which imports logger...
import dbt.utils
log_message = super().__call__(record, handler)
return json.dumps(log_message.to_dict())
return json.dumps(log_message.to_dict(), cls=dbt.utils.JSONEncoder)


class FormatterMixin:
Expand Down Expand Up @@ -152,6 +154,17 @@ def reset(self):
self._text_format_string = self._default_format
self.format_text()

def should_handle(self, record):
    """Decide whether this handler should emit the given record.

    Records below the handler's level are always dropped. Beyond that,
    a text-format handler drops records tagged 'json_only' and a
    JSON-format handler drops records tagged 'text_only'.
    """
    if record.level < self.level:
        return False
    is_text = self.formatter_class is logbook.StringFormatter
    blocked_tag = 'json_only' if is_text else 'text_only'
    return not record.extra.get(blocked_tag, False)


def _redirect_std_logging():
logbook.compat.redirect_logging()
Expand Down Expand Up @@ -186,6 +199,96 @@ def process(self, record):
record.level = self.target_level


class JsonOnly(logbook.Processor):
    """Tag records so that only JSON-format handlers will emit them."""

    def process(self, record):
        record.extra.update(json_only=True)


class TextOnly(logbook.Processor):
    """Tag records so that only text-format handlers will emit them."""

    def process(self, record):
        record.extra.update(text_only=True)


class TimingProcessor(logbook.Processor):
    """Attach serialized timing information to log records.

    When constructed without timing info, this behaves as a no-op
    processor.
    """

    def __init__(self, timing_info: Optional[JsonSchemaMixin] = None):
        self.timing_info = timing_info
        super().__init__()

    def process(self, record):
        if self.timing_info is None:
            return
        record.extra['timing_info'] = self.timing_info.to_dict()


class DbtProcessState(logbook.Processor):
    """Stamp records with a 'run_state' value.

    An existing state is only replaced when it is the generic
    'internal' default, so a more specific state set by an outer
    processor wins.
    """

    def __init__(self, value: str):
        self.value = value
        super().__init__()

    def process(self, record):
        # missing or 'internal' -> overwrite; anything else is kept
        if record.extra.get('run_state', 'internal') == 'internal':
            record.extra['run_state'] = self.value


class DbtModelState(logbook.Processor):
    """Stamp records with the state of the model being processed."""

    def __init__(self, state):
        super().__init__()
        self.state = state

    def process(self, record):
        record.extra.update(model_state=self.state)


class DbtStatusMessage(logbook.Processor):
    """Flag records as status messages (sets is_status_message=True)."""

    def process(self, record):
        record.extra.update(is_status_message=True)


class UniqueID(logbook.Processor):
    """Attach the unique ID of the node being processed to records."""

    def __init__(self, unique_id: str):
        super().__init__()
        self.unique_id = unique_id

    def process(self, record):
        record.extra.update(unique_id=self.unique_id)


class NodeMetadata(logbook.Processor):
    """Attach metadata about the node being processed to log records.

    Copies selected node attributes into ``record.extra`` under
    ``node_*`` keys, silently skipping attributes the node does not
    have. If the node has a config, its materialization is included as
    well.
    """

    # (node attribute, record.extra key) pairs. 'resource_type' added
    # per review request so consumers can tell models/seeds/tests etc.
    # apart; like the others it is skipped when absent.
    ATTR_KEYS = (
        ('alias', 'node_alias'),
        ('schema', 'node_schema'),
        ('database', 'node_database'),
        ('name', 'node_name'),
        ('resource_type', 'node_resource_type'),
        ('original_file_path', 'node_path'),
    )

    def __init__(self, node):
        self.node = node
        super().__init__()

    def process(self, record):
        for attr, key in self.ATTR_KEYS:
            # getattr with a None default keeps this safe for node
            # types that lack some of these attributes
            value = getattr(self.node, attr, None)
            if value is not None:
                record.extra[key] = value
        if hasattr(self.node, 'config'):
            materialized = getattr(self.node.config, 'materialized', None)
            if materialized is not None:
                record.extra['node_materialized'] = materialized


class TimestampNamed(JsonOnly):
    """JSON-only processor that stamps records with the current UTC
    time (ISO 8601) under a caller-supplied key name."""

    def __init__(self, name: str):
        self.name = name
        super().__init__()

    def process(self, record):
        # mark the record json_only first, then attach the timestamp
        super().process(record)
        stamp = datetime.utcnow().isoformat()
        record.extra[self.name] = stamp


logger = logbook.Logger('dbt')
# provide this for the cache, disabled by default
CACHE_LOGGER = logbook.Logger('dbt.cache')
Expand Down Expand Up @@ -318,6 +421,7 @@ def __init__(self, stdout=colorama_stdout, stderr=sys.stderr):
self._output_handler = OutputHandler(self.stdout)
self._file_handler = DelayedFileHandler()
self._relevel_processor = Relevel(allowed=['dbt', 'werkzeug'])
self._state_processor = DbtProcessState('internal')
# keep track of wheter we've already entered to decide if we should
# be actually pushing. This allows us to log in main() and also
# support entering dbt execution via handle_and_check.
Expand All @@ -327,6 +431,7 @@ def __init__(self, stdout=colorama_stdout, stderr=sys.stderr):
self._output_handler,
self._file_handler,
self._relevel_processor,
self._state_processor,
])

def push_application(self):
Expand Down
18 changes: 18 additions & 0 deletions core/dbt/node_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,18 @@ def __init__(self, config, adapter, node, node_index, num_nodes):
self.skip = False
self.skip_cause = None

def get_result_status(self, result) -> str:
    """Render a short human-readable status string for a run result.

    Error takes precedence, then skip, fail, and warn; a result with
    none of those set counts as passed.
    """
    if result.error:
        return 'error: {}'.format(result.error)
    checks = (
        ('skip', 'skipped'),
        ('fail', 'failed'),
        ('warn', 'warn'),
    )
    for attr, status in checks:
        if getattr(result, attr):
            return status
    return 'passed'

def run_with_hooks(self, manifest):
if self.skip:
return self.on_skip()
Expand Down Expand Up @@ -436,6 +448,12 @@ def on_skip(self):
'Freshness: nodes cannot be skipped!'
)

def get_result_status(self, result) -> str:
    """Freshness results have no pass/fail notion: report the error if
    there is one, otherwise the stringified freshness status."""
    return (
        'error: {}'.format(result.error)
        if result.error
        else str(result.status)
    )

def before_execute(self):
description = 'freshness of {0.source_name}.{0.name}'.format(self.node)
dbt.ui.printer.print_start_line(description, self.node_index,
Expand Down
11 changes: 7 additions & 4 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import functools
import time

from dbt.logger import GLOBAL_LOGGER as logger
from dbt.logger import GLOBAL_LOGGER as logger, TextOnly
from dbt.node_types import NodeType, RunHookType
from dbt.node_runners import ModelRunner

Expand Down Expand Up @@ -92,7 +92,8 @@ def run_hooks(self, adapter, hook_type, extra_context):
num_hooks = len(ordered_hooks)

plural = 'hook' if num_hooks == 1 else 'hooks'
print_timestamped_line("")
with TextOnly():
print_timestamped_line("")
print_timestamped_line(
'Running {} {} {}'.format(num_hooks, hook_type, plural)
)
Expand All @@ -115,7 +116,8 @@ def run_hooks(self, adapter, hook_type, extra_context):
print_hook_end_line(hook_text, status, idx, num_hooks,
timer.elapsed)

print_timestamped_line("")
with TextOnly():
print_timestamped_line("")

def safe_run_hooks(self, adapter, hook_type, extra_context):
try:
Expand All @@ -134,7 +136,8 @@ def print_results_line(self, results, execution_time):
execution = " in {execution_time:0.2f}s".format(
execution_time=execution_time)

print_timestamped_line("")
with TextOnly():
print_timestamped_line("")
print_timestamped_line(
"Finished running {stat_line}{execution}."
.format(stat_line=stat_line, execution=execution))
Expand Down
27 changes: 22 additions & 5 deletions core/dbt/task/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,15 @@

from dbt.task.base import ConfiguredTask
from dbt.adapters.factory import get_adapter
from dbt.logger import GLOBAL_LOGGER as logger
from dbt.logger import (
GLOBAL_LOGGER as logger,
DbtProcessState,
TextOnly,
UniqueID,
TimestampNamed,
DbtModelState,
NodeMetadata,
)
from dbt.compilation import compile_manifest
from dbt.contracts.results import ExecutionResult
from dbt.loader import GraphLoader
Expand All @@ -19,6 +27,7 @@

RESULT_FILE_NAME = 'run_results.json'
MANIFEST_FILE_NAME = 'manifest.json'
RUNNING_STATE = DbtProcessState('running')


def write_manifest(manifest, config):
Expand Down Expand Up @@ -120,9 +129,16 @@ def get_runner(self, node):
return cls(self.config, adapter, node, run_count, num_nodes)

def call_runner(self, runner):
# TODO: create+enforce an actual contracts for what `result` is instead
# of the current free-for-all
result = runner.run_with_hooks(self.manifest)
with RUNNING_STATE, UniqueID(runner.node.unique_id):
with TimestampNamed('node_started_at'), NodeMetadata(runner.node):
logger.info('Began running model')
status = 'error' # we must have an error if we don't see this
try:
result = runner.run_with_hooks(self.manifest)
status = runner.get_result_status(result)
finally:
with TimestampNamed('node_finished_at'), DbtModelState(status):
logger.info('Finished running model')
if result.error is not None and self.raise_on_first_error():
# if we raise inside a thread, it'll just get silently swallowed.
# stash the error message we want here, and it will check the
Expand Down Expand Up @@ -292,7 +308,8 @@ def run(self):
elapsed_time=0.0,
)
else:
logger.info("")
with TextOnly():
logger.info("")

selected_uids = frozenset(n.unique_id for n in self._flattened_nodes)
result = self.execute_with_hooks(selected_uids)
Expand Down
8 changes: 5 additions & 3 deletions core/dbt/task/seed.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import random

from dbt.logger import GLOBAL_LOGGER as logger
from dbt.logger import GLOBAL_LOGGER as logger, TextOnly
from dbt.node_runners import SeedRunner
from dbt.node_types import NodeType
from dbt.task.run import RunTask
Expand Down Expand Up @@ -35,11 +35,13 @@ def show_table(self, result):
alias = result.node.alias

header = "Random sample of table: {}.{}".format(schema, alias)
logger.info("")
with TextOnly():
logger.info("")
logger.info(header)
logger.info("-" * len(header))
rand_table.print_table(max_rows=10, max_columns=None)
logger.info("")
with TextOnly():
logger.info("")

def show_tables(self, results):
for result in results:
Expand Down
12 changes: 12 additions & 0 deletions core/dbt/tracking.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from snowplow_tracker import SelfDescribingJson
from datetime import datetime

import logbook
import pytz
import platform
import uuid
Expand Down Expand Up @@ -336,3 +337,14 @@ def initialize_tracking(cookie_dir):
logger.debug('Got an exception trying to initialize tracking',
exc_info=True)
active_user = User(None)


class InvocationProcessor(logbook.Processor):
    """Attach invocation tracking info (run start time and invocation
    ID) from the module-level ``active_user`` to every log record.

    The redundant ``__init__`` that only delegated to ``super()`` has
    been removed; the inherited constructor is sufficient.
    """

    def process(self, record):
        # NOTE(review): assumes initialize_tracking() has already run so
        # that active_user is a User instance -- confirm call ordering.
        record.extra.update({
            "run_started_at": active_user.run_started_at.isoformat(),
            "invocation_id": active_user.invocation_id,
        })
Loading