Skip to content

Commit

Permalink
PR feedback: QueueMessageType class, remove extra assignments
Browse files Browse the repository at this point in the history
  • Loading branch information
Jacob Beck committed Mar 8, 2019
1 parent 3f948ae commit 7e18128
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 8 deletions.
13 changes: 13 additions & 0 deletions core/dbt/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,16 @@ def dbt_error(exc, logs=None):
exc = RPCException(code=exc.CODE, message=exc.MESSAGE, data=exc.data(),
logs=logs)
return exc


class QueueMessageType(object):
Error = 'error'
Result = 'result'
Log = 'log'

@classmethod
def terminating(cls):
return [
cls.Error,
cls.Result
]
17 changes: 9 additions & 8 deletions core/dbt/task/rpc_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,14 @@ def _wait_for_results(self):
except QueueEmpty:
raise dbt.exceptions.RPCTimeoutException(self.timeout)

if msgtype == 'log':
if msgtype == rpc.QueueMessageType.Log:
self.logs.append(value)
else:
elif msgtype in rpc.QueueMessageType.terminating():
return msgtype, value
else:
raise dbt.exceptions.InternalException(
'Got invalid queue message type {}'.format(msgtype)
)

def _join_process(self):
try:
Expand All @@ -64,10 +68,7 @@ def _join_process(self):
finally:
self.process.join()

self.process = None
self.queue = None

if msgtype == 'error':
if msgtype == rpc.QueueMessageType.Error:
raise rpc.RPCException.from_error(result)

return result
Expand Down Expand Up @@ -101,9 +102,9 @@ def task_bootstrap(self, kwargs):

# put whatever result we got onto the queue as well.
if error is not None:
self.queue.put(['error', error.error])
self.queue.put([rpc.QueueMessageType.Error, error.error])
else:
self.queue.put(['result', result])
self.queue.put([rpc.QueueMessageType.Result, result])

def handle(self, kwargs):
self.started = time.time()
Expand Down

0 comments on commit 7e18128

Please sign in to comment.