Skip to content

Commit

Permalink
Error handling improvements
Browse files Browse the repository at this point in the history
All dbt errors now have proper error codes/messages
The raised message at runtime ends up in result.error.data.message
The raised message type at runtime ends up in result.error.data.typename
result.error.message is a plaintext name for result.error.code
dbt.exceptions.Exception.data() becomes result.error.data
Collect dbt logs and make them available to requests/responses
  • Loading branch information
Jacob Beck committed Mar 7, 2019
1 parent 2ad1166 commit 84dad1b
Show file tree
Hide file tree
Showing 10 changed files with 378 additions and 145 deletions.
5 changes: 2 additions & 3 deletions core/dbt/compat.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import abc
import codecs
import json
import warnings
import decimal

Expand Down Expand Up @@ -33,11 +32,11 @@
if WHICH_PYTHON == 2:
from SimpleHTTPServer import SimpleHTTPRequestHandler
from SocketServer import TCPServer
from Queue import PriorityQueue
from Queue import PriorityQueue, Empty as QueueEmpty
else:
from http.server import SimpleHTTPRequestHandler
from socketserver import TCPServer
from queue import PriorityQueue
from queue import PriorityQueue, Empty as QueueEmpty


def to_unicode(s):
Expand Down
64 changes: 54 additions & 10 deletions core/dbt/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
from dbt.compat import basestring, builtins
from dbt.compat import builtins
from dbt.logger import GLOBAL_LOGGER as logger
import dbt.flags
import re


class Exception(builtins.Exception):
    """Root of the dbt exception hierarchy, carrying RPC error metadata.

    ``CODE`` is the error code and ``MESSAGE`` its plaintext name; both are
    class-level defaults that subclasses override.
    """
    CODE = -32000
    MESSAGE = "Server Error"

    def data(self):
        """Return a dict holding the exception's class name and message.

        Overrides must keep the result json-serializable.
        """
        payload = dict()
        payload['type'] = type(self).__name__
        payload['message'] = str(self)
        return payload


class MacroReturn(builtins.BaseException):
Expand All @@ -22,10 +29,21 @@ class InternalException(Exception):


class RPCException(Exception):
    """A dbt error expressed in its RPC (message, code, data) form.

    :param message: plaintext name of the error; stored as ``MESSAGE`` and
        used as the exception's own message.
    :param code: numeric error code, stored as ``CODE``. When omitted the
        class-level ``CODE`` stays in effect.
    :param data: dict of extra json-serializable details that ``data()``
        overlays on the base error data. When omitted nothing is overlaid.
    """
    def __init__(self, message, code=None, data=None):
        # Initialize the base exception so str(exc) and exc.args carry the
        # message instead of being empty.
        super(RPCException, self).__init__(message)
        self.MESSAGE = message
        if code is not None:
            self.CODE = code
        # Copy so later changes to the caller's dict cannot leak in here.
        self._data = {} if data is None else dict(data)

    def data(self):
        """Return the base error data overlaid with the supplied details."""
        merged = super(RPCException, self).data()
        merged.update(self._data)
        return merged


class RuntimeException(RuntimeError, Exception):
CODE = 10001
MESSAGE = "Runtime error"

def __init__(self, msg, node=None):
self.stack = []
self.node = node
Expand Down Expand Up @@ -86,7 +104,28 @@ def __str__(self, prefix="! "):
[" " + line for line in lines[1:]])


class RPCFailureResult(RuntimeException):
    """Runtime error reported with code 10002, "RPC execution error"."""
    CODE = 10002
    MESSAGE = "RPC execution error"


class RPCTimeoutException(RuntimeException):
    """Runtime error reported when an RPC call times out.

    :param timeout: the timeout that elapsed; rendered with an ``s`` suffix
        in the error message.
    """
    CODE = 10008
    MESSAGE = 'RPC timeout error'

    def __init__(self, timeout):
        self.timeout = timeout
        # Run RuntimeException.__init__ so the state it sets up (stack,
        # node) exists on this instance as well.
        super(RPCTimeoutException, self).__init__(self._describe())

    def _describe(self):
        """Return the human-readable timeout message."""
        return 'RPC timed out after {}s'.format(self.timeout)

    def data(self):
        """Return json-serializable error data, including the timeout."""
        return {
            # Same 'type' key the base Exception.data() provides.
            'type': self.__class__.__name__,
            'timeout': self.timeout,
            'message': self._describe(),
        }


class DatabaseException(RuntimeException):
CODE = 10003
MESSAGE = "Database Error"

def process_stack(self):
lines = []
Expand All @@ -103,6 +142,9 @@ def type(self):


class CompilationException(RuntimeException):
CODE = 10004
MESSAGE = "Compilation Error"

@property
def type(self):
return 'Compilation'
Expand All @@ -113,7 +155,8 @@ class RecursionException(RuntimeException):


class ValidationException(RuntimeException):
    """Runtime error reported with code 10005, "Validation Error"."""
    CODE = 10005
    MESSAGE = "Validation Error"


class JSONValidationException(ValidationException):
Expand All @@ -134,15 +177,16 @@ class AliasException(ValidationException):
pass


class ParsingException(Exception):
pass


class DependencyException(Exception):
    """Error reported with code 10006, "Dependency Error"."""
    # this can happen due to raise_dependency_error and its callers
    CODE = 10006
    MESSAGE = "Dependency Error"


class DbtConfigError(RuntimeException):
CODE = 10007
MESSAGE = "DBT Configuration Error"

def __init__(self, message, project=None, result_type='invalid_project'):
self.project = project
super(DbtConfigError, self).__init__(message)
Expand Down
28 changes: 24 additions & 4 deletions core/dbt/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,18 @@ def filter(self, record):
return True


def default_formatter():
    """Build the formatter shared by dbt's debug and file log handlers.

    Each record renders as its timestamp, the emitting thread's name in
    parentheses, and the message.
    """
    layout = '%(asctime)-18s (%(threadName)s): %(message)s'
    return logging.Formatter(layout)


def initialize_logger(debug_mode=False, path=None):
global initialized, logger, stdout_handler

if initialized:
return

if debug_mode:
stdout_handler.setFormatter(
logging.Formatter('%(asctime)-18s (%(threadName)s): %(message)s'))
stdout_handler.setFormatter(default_formatter())
stdout_handler.setLevel(logging.DEBUG)

if path is not None:
Expand All @@ -101,8 +104,7 @@ def initialize_logger(debug_mode=False, path=None):
color_filter = ColorFilter()
logdir_handler.addFilter(color_filter)

logdir_handler.setFormatter(
logging.Formatter('%(asctime)-18s (%(threadName)s): %(message)s'))
logdir_handler.setFormatter(default_formatter())
logdir_handler.setLevel(logging.DEBUG)

logger.addHandler(logdir_handler)
Expand All @@ -126,3 +128,21 @@ def log_cache_events(flag):


GLOBAL_LOGGER = logger


class QueueLogHandler(logging.Handler):
    """Logging handler that forwards formatted records onto a queue.

    Each record is placed on the queue as the two-item list
    ``['log', <formatted message>]``.

    :param queue: object providing ``put_nowait``.
    """
    def __init__(self, queue):
        super(QueueLogHandler, self).__init__()
        self.queue = queue

    def emit(self, record):
        """Format ``record`` and enqueue it without blocking.

        Failures (for example a full bounded queue) are routed through
        ``Handler.handleError`` rather than raised into the code that
        happened to be logging.
        """
        try:
            msg = self.format(record)
            self.queue.put_nowait(['log', msg])
        except Exception:
            self.handleError(record)


def add_queue_handler(queue):
    """Attach a DEBUG-level QueueLogHandler for ``queue`` to the global logger.

    The handler formats records with the default dbt log format.
    """
    queue_handler = QueueLogHandler(queue)
    queue_handler.setLevel(logging.DEBUG)
    queue_handler.setFormatter(default_formatter())
    GLOBAL_LOGGER.addHandler(queue_handler)
138 changes: 85 additions & 53 deletions core/dbt/node_runners.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from dbt.logger import GLOBAL_LOGGER as logger
from dbt.exceptions import NotImplementedException, CompilationException, \
RuntimeException, InternalException, missing_materialization
RuntimeException, InternalException, missing_materialization, RPCException
from dbt.utils import get_nodes_by_tags
from dbt.node_types import NodeType, RunHookType
from dbt.adapters.factory import get_adapter
Expand All @@ -18,6 +18,7 @@
import dbt.flags
import dbt.schema
import dbt.writer
from dbt import rpc

import six
import sys
Expand Down Expand Up @@ -49,6 +50,15 @@ def track_model_run(index, num_nodes, run_model_result):
})


class ExecutionContext(object):
    """Mutable state shared between execution and error handling.

    dbt tracks two things while running a node: the timing information
    collected so far, and the newest (compiled vs executed) form of the
    node.
    """
    def __init__(self, node):
        self.node = node
        self.timing = list()


class BaseRunner(object):
def __init__(self, config, adapter, node, node_index, num_nodes):
self.config = config
Expand Down Expand Up @@ -120,67 +130,75 @@ def from_run_result(self, result, start_time, timing_info):
timing_info=timing_info
)

def safe_run(self, manifest):
catchable_errors = (CompilationException, RuntimeException)

# result = self.DefaultResult(self.node)
started = time.time()
timing = []
error = None
node = self.node
def compile_and_execute(self, manifest, ctx):
result = None
with collect_timing_info('compile') as timing_info:
# if we fail here, we still have a compiled node to return
# this has the benefit of showing a build path for the errant
# model
ctx.node = self.compile(manifest)

try:
with collect_timing_info('compile') as timing_info:
# if we fail here, we still have a compiled node to return
# this has the benefit of showing a build path for the errant
# model
node = self.compile(manifest)
ctx.timing.append(timing_info)

timing.append(timing_info)
# for ephemeral nodes, we only want to compile, not run
if not ctx.node.is_ephemeral_model:
with collect_timing_info('execute') as timing_info:
result = self.run(ctx.node, manifest)
ctx.node = result.node

# for ephemeral nodes, we only want to compile, not run
if not node.is_ephemeral_model:
with collect_timing_info('execute') as timing_info:
result = self.run(node, manifest)
node = result.node
ctx.timing.append(timing_info)

timing.append(timing_info)
return result

# result.extend(item.serialize() for item in timing)
def _handle_catchable_exception(self, e, ctx):
if e.node is None:
e.node = ctx.node

except catchable_errors as e:
if e.node is None:
e.node = node
return dbt.compat.to_string(e)

error = dbt.compat.to_string(e)
def _handle_internal_exception(self, e, ctx):
build_path = self.node.build_path
prefix = 'Internal error executing {}'.format(build_path)

except InternalException as e:
build_path = self.node.build_path
prefix = 'Internal error executing {}'.format(build_path)
error = "{prefix}\n{error}\n\n{note}".format(
prefix=dbt.ui.printer.red(prefix),
error=str(e).strip(),
note=INTERNAL_ERROR_STRING)
logger.debug(error)
return dbt.compat.to_string(e)

error = "{prefix}\n{error}\n\n{note}".format(
prefix=dbt.ui.printer.red(prefix),
error=str(e).strip(),
note=INTERNAL_ERROR_STRING)
logger.debug(error)
error = dbt.compat.to_string(e)
def _handle_generic_exception(self, e, ctx):
node_description = self.node.get('build_path')
if node_description is None:
node_description = self.node.unique_id
prefix = "Unhandled error while executing {description}".format(
description=node_description)

except Exception as e:
node_description = self.node.get('build_path')
if node_description is None:
node_description = self.node.unique_id
prefix = "Unhandled error while executing {description}".format(
description=node_description)
error = "{prefix}\n{error}".format(
prefix=dbt.ui.printer.red(prefix),
error=str(e).strip())

error = "{prefix}\n{error}".format(
prefix=dbt.ui.printer.red(prefix),
error=str(e).strip())
logger.error(error)
logger.debug('', exc_info=True)
return dbt.compat.to_string(e)

logger.error(error)
logger.debug('', exc_info=True)
error = dbt.compat.to_string(e)
def safe_run(self, manifest):
catchable_errors = (CompilationException, RuntimeException)

# result = self.DefaultResult(self.node)
started = time.time()
ctx = ExecutionContext(self.node)
error = None
result = None

try:
result = self.compile_and_execute(manifest, ctx)
except catchable_errors as e:
error = self._handle_catchable_exception(e, ctx)
except InternalException as e:
error = self._handle_internal_exception(e, ctx)
except Exception as e:
error = self._handle_generic_exception(e, ctx)
finally:
exc_str = self._safe_release_connection()

Expand All @@ -191,11 +209,11 @@ def safe_run(self, manifest):

if error is not None:
# we could include compile time for runtime errors here
result = self.error_result(node, error, started, [])
result = self.error_result(ctx.node, error, started, [])
elif result is not None:
result = self.from_run_result(result, started, timing)
result = self.from_run_result(result, started, ctx.timing)
else:
result = self.ephemeral_result(node, started, timing)
result = self.ephemeral_result(ctx.node, started, ctx.timing)
return result

def _safe_release_connection(self):
Expand Down Expand Up @@ -507,6 +525,15 @@ def __init__(self, config, adapter, node, node_index, num_nodes):
super(RPCCompileRunner, self).__init__(config, adapter, node,
node_index, num_nodes)

def compile_and_execute(self, manifest, ctx):
    """Run the parent implementation, raising dbt errors as RPCExceptions.

    Known dbt exceptions are converted here, before safe_run can see them,
    stringify them and discard their types.
    """
    parent = super(RPCCompileRunner, self)
    try:
        outcome = parent.compile_and_execute(manifest, ctx)
    except dbt.exceptions.Exception as exc:
        raise RPCException(exc.MESSAGE, exc.CODE, exc.data())
    return outcome

def before_execute(self):
pass

Expand All @@ -523,11 +550,16 @@ def execute(self, compiled_node, manifest):
compiled_sql=compiled_node.injected_sql
)

def _handle_generic_exception(self, e, ctx):
    """Re-raise RPCExceptions; defer every other error to the parent."""
    if not isinstance(e, RPCException):
        parent = super(RPCCompileRunner, self)
        return parent._handle_generic_exception(e, ctx)
    raise e

def error_result(self, node, error, start_time, timing_info):
    """Raise the error text as an RPCException; no result is returned.

    RPCException takes a message, a code and a data dict. The class-level
    server error name and code are used, and the error text travels in the
    data under 'message'.
    """
    raise RPCException(
        RPCException.MESSAGE,
        RPCException.CODE,
        {'message': error},
    )

def ephemeral_result(self, node, start_time, timing_info):
    """Always raise: ephemeral nodes cannot be executed remotely."""
    message = 'cannot execute ephemeral nodes remotely!'
    raise NotImplementedException(message)

Expand Down
Loading

0 comments on commit 84dad1b

Please sign in to comment.