Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RPC: Error handling improvements #1341

Merged
merged 10 commits into from
Mar 12, 2019
5 changes: 2 additions & 3 deletions core/dbt/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import abc
import codecs
import json
import warnings
import decimal

Expand Down Expand Up @@ -35,12 +34,12 @@
if WHICH_PYTHON == 2:
from SimpleHTTPServer import SimpleHTTPRequestHandler
from SocketServer import TCPServer
from Queue import PriorityQueue
from Queue import PriorityQueue, Empty as QueueEmpty
from thread import get_ident
else:
from http.server import SimpleHTTPRequestHandler
from socketserver import TCPServer
from queue import PriorityQueue
from queue import PriorityQueue, Empty as QueueEmpty
from threading import get_ident


Expand Down
2 changes: 1 addition & 1 deletion core/dbt/compilation.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def print_compile_stats(stats):
stat_line = ", ".join(
["{} {}".format(ct, names.get(t)) for t, ct in results.items()])

logger.info("Found {}".format(stat_line))
logger.notice("Found {}".format(stat_line))


def _add_prepended_cte(prepended_ctes, new_cte):
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/context/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ def generate_base(model, model_dict, config, manifest, source_config,
"config": provider.Config(model_dict, source_config),
"database": config.credentials.database,
"env_var": env_var,
"exceptions": dbt.exceptions.CONTEXT_EXPORTS,
"exceptions": dbt.exceptions.wrapped_exports(model),
"execute": provider.execute,
"flags": dbt.flags,
# TODO: Do we have to leave this in?
Expand Down
85 changes: 72 additions & 13 deletions core/dbt/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,22 @@
from dbt.compat import basestring, builtins
import sys
import six
import functools

from dbt.compat import builtins
from dbt.logger import GLOBAL_LOGGER as logger
import dbt.flags
import re


class Exception(builtins.Exception):
pass
CODE = -32000
MESSAGE = "Server Error"

def data(self):
# if overriding, make sure the result is json-serializable.
return {
'type': self.__class__.__name__,
'message': str(self),
}


class MacroReturn(builtins.BaseException):
Expand All @@ -21,11 +32,10 @@ class InternalException(Exception):
pass


class RPCException(Exception):
pass


class RuntimeException(RuntimeError, Exception):
CODE = 10001
MESSAGE = "Runtime error"

def __init__(self, msg, node=None):
self.stack = []
self.node = node
Expand Down Expand Up @@ -86,7 +96,28 @@ def __str__(self, prefix="! "):
[" " + line for line in lines[1:]])


class RPCFailureResult(RuntimeException):
CODE = 10002
MESSAGE = "RPC execution error"


class RPCTimeoutException(RuntimeException):
CODE = 10008
MESSAGE = 'RPC timeout error'

def __init__(self, timeout):
self.timeout = timeout

def data(self):
return {
'timeout': self.timeout,
'message': 'RPC timed out after {}s'.format(self.timeout),
}


class DatabaseException(RuntimeException):
CODE = 10003
MESSAGE = "Database Error"

def process_stack(self):
lines = []
Expand All @@ -103,6 +134,9 @@ def type(self):


class CompilationException(RuntimeException):
CODE = 10004
MESSAGE = "Compilation Error"

@property
def type(self):
return 'Compilation'
Expand All @@ -113,7 +147,8 @@ class RecursionException(RuntimeException):


class ValidationException(RuntimeException):
pass
CODE = 10005
MESSAGE = "Validation Error"


class JSONValidationException(ValidationException):
Expand All @@ -134,15 +169,16 @@ class AliasException(ValidationException):
pass


class ParsingException(Exception):
pass


class DependencyException(Exception):
pass
# this can happen due to raise_dependency_error and its callers
CODE = 10006
MESSAGE = "Dependency Error"


class DbtConfigError(RuntimeException):
CODE = 10007
MESSAGE = "DBT Configuration Error"

def __init__(self, message, project=None, result_type='invalid_project'):
self.project = project
super(DbtConfigError, self).__init__(message)
Expand Down Expand Up @@ -616,3 +652,26 @@ def warn_or_error(msg, node=None, log_fmt=None):
relation_wrong_type,
]
}


def wrapper(model):
def wrap(func):
@functools.wraps(func)
def inner(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception:
exc_type, exc, exc_tb = sys.exc_info()
if hasattr(exc, 'node') and exc.node is None:
exc.node = model
six.reraise(exc_type, exc, exc_tb)

return inner
return wrap


def wrapped_exports(model):
wrap = wrapper(model)
return {
name: wrap(export) for name, export in CONTEXT_EXPORTS.items()
}
112 changes: 93 additions & 19 deletions core/dbt/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@
import logging.handlers
import os
import sys
import warnings

import colorama


# Colorama needs some help on windows because we're using logger.info
# intead of print(). If the Windows env doesn't have a TERM var set,
# then we should override the logging stream to use the colorama
Expand All @@ -17,6 +15,27 @@
colorama_stdout = sys.stdout
colorama_wrap = True

colorama.init(wrap=colorama_wrap)

DEBUG = logging.DEBUG
NOTICE = 15
INFO = logging.INFO
WARNING = logging.WARNING
ERROR = logging.ERROR
CRITICAL = logging.CRITICAL

logging.addLevelName(NOTICE, 'NOTICE')


class Logger(logging.Logger):
def notice(self, msg, *args, **kwargs):
if self.isEnabledFor(NOTICE):
self._log(NOTICE, msg, args, **kwargs)


logging.setLoggerClass(Logger)


if sys.platform == 'win32' and not os.environ.get('TERM'):
colorama_wrap = False
colorama_stdout = colorama.AnsiToWin32(sys.stdout).stream
Expand All @@ -29,22 +48,22 @@
# create a global console logger for dbt
stdout_handler = logging.StreamHandler(colorama_stdout)
stdout_handler.setFormatter(logging.Formatter('%(message)s'))
stdout_handler.setLevel(logging.INFO)
stdout_handler.setLevel(NOTICE)

logger = logging.getLogger('dbt')
logger.addHandler(stdout_handler)
logger.setLevel(logging.DEBUG)
logging.getLogger().setLevel(logging.CRITICAL)
logger.setLevel(DEBUG)
logging.getLogger().setLevel(CRITICAL)

# Quiet these down in the logs
logging.getLogger('botocore').setLevel(logging.INFO)
logging.getLogger('requests').setLevel(logging.INFO)
logging.getLogger('urllib3').setLevel(logging.INFO)
logging.getLogger('google').setLevel(logging.INFO)
logging.getLogger('snowflake.connector').setLevel(logging.INFO)
logging.getLogger('parsedatetime').setLevel(logging.INFO)
logging.getLogger('botocore').setLevel(INFO)
logging.getLogger('requests').setLevel(INFO)
logging.getLogger('urllib3').setLevel(INFO)
logging.getLogger('google').setLevel(INFO)
logging.getLogger('snowflake.connector').setLevel(INFO)
logging.getLogger('parsedatetime').setLevel(INFO)
# we never want to seek werkzeug logs
logging.getLogger('werkzeug').setLevel(logging.CRITICAL)
logging.getLogger('werkzeug').setLevel(CRITICAL)

# provide this for the cache.
CACHE_LOGGER = logging.getLogger('dbt.cache')
Expand Down Expand Up @@ -75,16 +94,19 @@ def filter(self, record):
return True


def default_formatter():
return logging.Formatter('%(asctime)-18s (%(threadName)s): %(message)s')


def initialize_logger(debug_mode=False, path=None):
global initialized, logger, stdout_handler

if initialized:
return

if debug_mode:
stdout_handler.setFormatter(
logging.Formatter('%(asctime)-18s (%(threadName)s): %(message)s'))
stdout_handler.setLevel(logging.DEBUG)
stdout_handler.setFormatter(default_formatter())
stdout_handler.setLevel(DEBUG)

if path is not None:
make_log_dir_if_missing(path)
Expand All @@ -101,16 +123,15 @@ def initialize_logger(debug_mode=False, path=None):
color_filter = ColorFilter()
logdir_handler.addFilter(color_filter)

logdir_handler.setFormatter(
logging.Formatter('%(asctime)-18s (%(threadName)s): %(message)s'))
logdir_handler.setLevel(logging.DEBUG)
logdir_handler.setFormatter(default_formatter())
logdir_handler.setLevel(DEBUG)

logger.addHandler(logdir_handler)

# Log Python warnings to file
warning_logger = logging.getLogger('py.warnings')
warning_logger.addHandler(logdir_handler)
warning_logger.setLevel(logging.DEBUG)
warning_logger.setLevel(DEBUG)

initialized = True

Expand All @@ -126,3 +147,56 @@ def log_cache_events(flag):


GLOBAL_LOGGER = logger


class QueueFormatter(logging.Formatter):
def formatMessage(self, record):
superself = super(QueueFormatter, self)
if hasattr(superself, 'formatMessage'):
# python 3.x
return superself.formatMessage(record)

# python 2.x, handling weird unicode things
try:
return self._fmt % record.__dict__
except UnicodeDecodeError as e:
try:
record.name = record.name.decode('utf-8')
return self._fmt % record.__dict__
except UnicodeDecodeError as e:
raise e

def format(self, record):
record.message = record.getMessage()
record.asctime = self.formatTime(record, self.datefmt)
formatted = self.formatMessage(record)

output = {
'message': formatted,
'timestamp': record.asctime,
'levelname': record.levelname,
'level': record.levelno,
}
if record.exc_info:
if not record.exc_text:
record.exc_text = self.formatException(record.exc_info)
output['exc_info'] = record.exc_text
return output


class QueueLogHandler(logging.Handler):
def __init__(self, queue):
super(QueueLogHandler, self).__init__()
self.queue = queue

def emit(self, record):
msg = self.format(record)
self.queue.put_nowait(['log', msg])


def add_queue_handler(queue):
"""Add a queue log handler to the global logger."""
handler = QueueLogHandler(queue)
handler.setFormatter(QueueFormatter())
handler.setLevel(DEBUG)
GLOBAL_LOGGER.addHandler(handler)
2 changes: 1 addition & 1 deletion core/dbt/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def track_run(task):
)
except (dbt.exceptions.NotImplementedException,
dbt.exceptions.FailedToConnectException) as e:
logger.info('ERROR: {}'.format(e))
logger.error('ERROR: {}'.format(e))
dbt.tracking.track_invocation_end(
config=task.config, args=task.args, result_type="error"
)
Expand Down
Loading