Skip to content

Commit

Permalink
Error handling improvements
Browse files Browse the repository at this point in the history
All dbt errors now have proper error codes/messages
The raised message at runtime ends up in result.error.data.message
The raised message type at runtime ends up in result.error.data.typename
result.error.message is a plaintext name for result.error.code
dbt.exceptions.Exception.data() becomes result.error.data
  • Loading branch information
Jacob Beck committed Mar 7, 2019
1 parent 2ad1166 commit b397868
Show file tree
Hide file tree
Showing 7 changed files with 215 additions and 102 deletions.
50 changes: 40 additions & 10 deletions core/dbt/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
from dbt.compat import basestring, builtins
from dbt.compat import builtins
from dbt.logger import GLOBAL_LOGGER as logger
import dbt.flags
import re


class Exception(builtins.Exception):
pass
CODE = -32000
MESSAGE = "Server Error"

def data(self):
# if overriding, make sure the result is json-serializable.
return {
'type': self.__class__.__name__,
'message': str(self),
}


class MacroReturn(builtins.BaseException):
Expand All @@ -22,10 +29,21 @@ class InternalException(Exception):


class RPCException(Exception):
pass
def __init__(self, message, code, data):
self.CODE = code
self.MESSAGE = message
self._data = data

def data(self):
existing = super(RPCException, self).data()
existing.update(self._data)
return existing


class RuntimeException(RuntimeError, Exception):
CODE = 10001
MESSAGE = "Runtime error"

def __init__(self, msg, node=None):
self.stack = []
self.node = node
Expand Down Expand Up @@ -86,7 +104,14 @@ def __str__(self, prefix="! "):
[" " + line for line in lines[1:]])


class RPCFailureResult(RuntimeException):
CODE = 10002
MESSAGE = "RPC execution error"


class DatabaseException(RuntimeException):
CODE = 10003
MESSAGE = "Database Error"

def process_stack(self):
lines = []
Expand All @@ -103,6 +128,9 @@ def type(self):


class CompilationException(RuntimeException):
CODE = 10004
MESSAGE = "Compilation Error"

@property
def type(self):
return 'Compilation'
Expand All @@ -113,7 +141,8 @@ class RecursionException(RuntimeException):


class ValidationException(RuntimeException):
pass
CODE = 10005
MESSAGE = "Validation Error"


class JSONValidationException(ValidationException):
Expand All @@ -134,15 +163,16 @@ class AliasException(ValidationException):
pass


class ParsingException(Exception):
pass


class DependencyException(Exception):
pass
# this can happen due to raise_dependency_error and its callers
CODE = 10006
MESSAGE = "Dependency Error"


class DbtConfigError(RuntimeException):
CODE = 10007
MESSAGE = "DBT Configuration Error"

def __init__(self, message, project=None, result_type='invalid_project'):
self.project = project
super(DbtConfigError, self).__init__(message)
Expand Down
138 changes: 85 additions & 53 deletions core/dbt/node_runners.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from dbt.logger import GLOBAL_LOGGER as logger
from dbt.exceptions import NotImplementedException, CompilationException, \
RuntimeException, InternalException, missing_materialization
RuntimeException, InternalException, missing_materialization, RPCException
from dbt.utils import get_nodes_by_tags
from dbt.node_types import NodeType, RunHookType
from dbt.adapters.factory import get_adapter
Expand All @@ -18,6 +18,7 @@
import dbt.flags
import dbt.schema
import dbt.writer
from dbt import rpc

import six
import sys
Expand Down Expand Up @@ -49,6 +50,15 @@ def track_model_run(index, num_nodes, run_model_result):
})


class ExecutionContext(object):
"""During execution and error handling, dbt makes use of mutable state:
timing information and the newest (compiled vs executed) form of the node.
"""
def __init__(self, node):
self.timing = []
self.node = node


class BaseRunner(object):
def __init__(self, config, adapter, node, node_index, num_nodes):
self.config = config
Expand Down Expand Up @@ -120,67 +130,75 @@ def from_run_result(self, result, start_time, timing_info):
timing_info=timing_info
)

def safe_run(self, manifest):
catchable_errors = (CompilationException, RuntimeException)

# result = self.DefaultResult(self.node)
started = time.time()
timing = []
error = None
node = self.node
def compile_and_execute(self, manifest, ctx):
result = None
with collect_timing_info('compile') as timing_info:
# if we fail here, we still have a compiled node to return
# this has the benefit of showing a build path for the errant
# model
ctx.node = self.compile(manifest)

try:
with collect_timing_info('compile') as timing_info:
# if we fail here, we still have a compiled node to return
# this has the benefit of showing a build path for the errant
# model
node = self.compile(manifest)
ctx.timing.append(timing_info)

timing.append(timing_info)
# for ephemeral nodes, we only want to compile, not run
if not ctx.node.is_ephemeral_model:
with collect_timing_info('execute') as timing_info:
result = self.run(ctx.node, manifest)
ctx.node = result.node

# for ephemeral nodes, we only want to compile, not run
if not node.is_ephemeral_model:
with collect_timing_info('execute') as timing_info:
result = self.run(node, manifest)
node = result.node
ctx.timing.append(timing_info)

timing.append(timing_info)
return result

# result.extend(item.serialize() for item in timing)
def _handle_catchable_exception(self, e, ctx):
if e.node is None:
e.node = ctx.node

except catchable_errors as e:
if e.node is None:
e.node = node
return dbt.compat.to_string(e)

error = dbt.compat.to_string(e)
def _handle_internal_exception(self, e, ctx):
build_path = self.node.build_path
prefix = 'Internal error executing {}'.format(build_path)

except InternalException as e:
build_path = self.node.build_path
prefix = 'Internal error executing {}'.format(build_path)
error = "{prefix}\n{error}\n\n{note}".format(
prefix=dbt.ui.printer.red(prefix),
error=str(e).strip(),
note=INTERNAL_ERROR_STRING)
logger.debug(error)
return dbt.compat.to_string(e)

error = "{prefix}\n{error}\n\n{note}".format(
prefix=dbt.ui.printer.red(prefix),
error=str(e).strip(),
note=INTERNAL_ERROR_STRING)
logger.debug(error)
error = dbt.compat.to_string(e)
def _handle_generic_exception(self, e, ctx):
node_description = self.node.get('build_path')
if node_description is None:
node_description = self.node.unique_id
prefix = "Unhandled error while executing {description}".format(
description=node_description)

except Exception as e:
node_description = self.node.get('build_path')
if node_description is None:
node_description = self.node.unique_id
prefix = "Unhandled error while executing {description}".format(
description=node_description)
error = "{prefix}\n{error}".format(
prefix=dbt.ui.printer.red(prefix),
error=str(e).strip())

error = "{prefix}\n{error}".format(
prefix=dbt.ui.printer.red(prefix),
error=str(e).strip())
logger.error(error)
logger.debug('', exc_info=True)
return dbt.compat.to_string(e)

logger.error(error)
logger.debug('', exc_info=True)
error = dbt.compat.to_string(e)
def safe_run(self, manifest):
catchable_errors = (CompilationException, RuntimeException)

# result = self.DefaultResult(self.node)
started = time.time()
ctx = ExecutionContext(self.node)
error = None
result = None

try:
result = self.compile_and_execute(manifest, ctx)
except catchable_errors as e:
error = self._handle_catchable_exception(e, ctx)
except InternalException as e:
error = self._handle_internal_exception(e, ctx)
except Exception as e:
error = self._handle_generic_exception(e, ctx)
finally:
exc_str = self._safe_release_connection()

Expand All @@ -191,11 +209,11 @@ def safe_run(self, manifest):

if error is not None:
# we could include compile time for runtime errors here
result = self.error_result(node, error, started, [])
result = self.error_result(ctx.node, error, started, [])
elif result is not None:
result = self.from_run_result(result, started, timing)
result = self.from_run_result(result, started, ctx.timing)
else:
result = self.ephemeral_result(node, started, timing)
result = self.ephemeral_result(ctx.node, started, ctx.timing)
return result

def _safe_release_connection(self):
Expand Down Expand Up @@ -507,6 +525,15 @@ def __init__(self, config, adapter, node, node_index, num_nodes):
super(RPCCompileRunner, self).__init__(config, adapter, node,
node_index, num_nodes)

def compile_and_execute(self, manifest, ctx):
superself = super(RPCCompileRunner, self)
try:
return superself.compile_and_execute(manifest, ctx)
except dbt.exceptions.Exception as exc:
# we want to convert any errors we know how to handle before
# safe_run sees them and stringifies them and eats our types
raise RPCException(exc.MESSAGE, exc.CODE, exc.data())

def before_execute(self):
pass

Expand All @@ -523,11 +550,16 @@ def execute(self, compiled_node, manifest):
compiled_sql=compiled_node.injected_sql
)

def _handle_generic_exception(self, e, ctx):
if isinstance(e, RPCException):
raise e
return super(RPCCompileRunner, self)._handle_generic_exception(e, ctx)

def error_result(self, node, error, start_time, timing_info):
raise dbt.exceptions.RPCException(error)
raise RPCException(error)

def ephemeral_result(self, node, start_time, timing_info):
raise dbt.exceptions.NotImplementedException(
raise NotImplementedException(
'cannot execute ephemeral nodes remotely!'
)

Expand Down
32 changes: 32 additions & 0 deletions core/dbt/rpc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from jsonrpc.exceptions import JSONRPCError, JSONRPCServerError

import dbt.exceptions


class RPCError(JSONRPCError):
def __init__(self, exc):
super(RPCError, self).__init__(exc.CODE, exc.MESSAGE, data=exc.data())


def server_error(exc):
return JSONRPCServerError(data={'message': str(exc)})


def timeout_error(timeout_value):
return JSONRPCError(
code=10008,
message='RPC timeout error',
data={
'timeout': timeout_value,
'message': 'RPC timed out after {}s'.format(timeout_value),
}
)


def handle_error(exc):
if exc is None:
return None
if isinstance(exc, dbt.exceptions.Exception):
return RPCError(exc)
else:
return server_error(exc)
1 change: 1 addition & 0 deletions core/dbt/task/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from dbt import flags
from dbt import tracking
from dbt.logger import GLOBAL_LOGGER as logger
from dbt.compat import to_string
import dbt.exceptions


Expand Down
24 changes: 19 additions & 5 deletions core/dbt/task/rpc_server.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import json
import os

from jsonrpc import Dispatcher, JSONRPCResponseManager
from jsonrpc.exceptions import JSONRPCError

from werkzeug.wrappers import Request, Response
from werkzeug.serving import run_simple
Expand All @@ -13,6 +13,23 @@
from dbt.utils import JSONEncoder


class RPCManager(JSONRPCResponseManager):
# the base class doesn't let us create/return our own errors, as we'd like
# rework the `_get_responses()` method to allow that

@classmethod
def _get_responses(cls, requests, dispatcher):
print('getting responses')
parent = super(RPCManager, cls)._get_responses(requests, dispatcher)
for response in parent:
if isinstance(response.result, JSONRPCError):
err = response.result
del response._data['result']
response.error = err._data

yield response


class RPCServerTask(ConfiguredTask):
def __init__(self, args, config, tasks=None):
super(RPCServerTask, self).__init__(args, config)
Expand Down Expand Up @@ -60,10 +77,7 @@ def run(self):
def handle_request(self, request):
msg = 'Received request ({0}) from {0.remote_addr}, data={0.data}'
logger.info(msg.format(request))
# request_data is the request as a parsedjson object
response = JSONRPCResponseManager.handle(
request.data, self.dispatcher
)
response = RPCManager.handle(request.data, self.dispatcher)
json_data = json.dumps(response.data, cls=JSONEncoder)
response = Response(json_data, mimetype='application/json')
# this looks and feels dumb, but our json encoder converts decimals and
Expand Down
Loading

0 comments on commit b397868

Please sign in to comment.