Skip to content

Commit

Permalink
Fix error message naming / docs. Set task.expl earlier.
Browse files Browse the repository at this point in the history
  • Loading branch information
dhruvg committed Sep 3, 2015
1 parent f41397a commit f526751
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 12 deletions.
6 changes: 3 additions & 3 deletions luigi/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,9 @@ def add_task(self, task_id=None, status=PENDING, runnable=True,
if task.remove is not None:
task.remove = None # unmark task for removal so it isn't removed after being added

if expl is not None:
task.expl = expl

if not (task.status == RUNNING and status == PENDING):
# don't allow re-scheduling of task while it is running, it must either fail or succeed first
if status == PENDING or status != task.status:
Expand Down Expand Up @@ -649,9 +652,6 @@ def add_task(self, task_id=None, status=PENDING, runnable=True,
self._state.get_worker(worker_id).tasks.add(task)
task.runnable = runnable

if expl is not None:
task.expl = expl

def add_worker(self, worker, info, **kwargs):
self._state.get_worker(worker).add_info(info)

Expand Down
2 changes: 1 addition & 1 deletion luigi/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ def on_failure(self, exception):
Override for custom error handling.
This method gets called if an exception is raised in :py:meth:`run`.
Return value of this method is json encoded and sent to the scheduler
The returned value of this method is json encoded and sent to the scheduler
as the `expl` argument. Its string representation will be used as the
body of the error email sent out if any.
Expand Down
16 changes: 9 additions & 7 deletions luigi/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def run(self):
random.seed((os.getpid(), time.time()))

status = FAILED
error_message = ''
expl = ''
missing = []
new_deps = []
try:
Expand Down Expand Up @@ -146,7 +146,7 @@ def run(self):
elif status == DONE:
self.task.trigger_event(
Event.PROCESSING_TIME, self.task, time.time() - t0)
error_message = json.dumps(self.task.on_success())
expl = json.dumps(self.task.on_success())
logger.info('[pid %s] Worker %s done %s', os.getpid(),
self.worker_id, self.task.task_id)
self.task.trigger_event(Event.SUCCESS, self.task)
Expand All @@ -159,13 +159,15 @@ def run(self):
self.task.trigger_event(Event.FAILURE, self.task, ex)
subject = "Luigi: %s FAILED" % self.task

error_message = notifications.wrap_traceback(self.task.on_failure(ex))
raw_error_message = self.task.on_failure(ex)
notification_error_message = notifications.wrap_traceback(raw_error_message)
expl = json.dumps(raw_error_message)

This comment has been minimized.

Copy link
@stephenpascoe

stephenpascoe Sep 15, 2015

json(dumps(raw_error_message) causes new lines in tracebacks to be escaped. Is there a reason why we call json.dumps() here? I believe raw_error_message is a string. See PR #1086.

This comment has been minimized.

Copy link
@dhruvg

dhruvg Sep 17, 2015

Author Contributor

That is what the comment on the on_failure method has been communicating for a while. Also, there are cases when on_failure and on_success may not necessarily return strings, but dict or list instead. This should not effect the type of the expl attribute of a task, which should always be a string.

To get back the original unescaped new lines, you can do json.loads(your_string).

This comment has been minimized.

Copy link
@Tarrasch

Tarrasch Sep 17, 2015

Contributor

In retrospect, I'm a bit worried that this changes the api for add_task. Old luigi clients will communicate something the luigi server won't like.

This comment has been minimized.

Copy link
@stephenpascoe

stephenpascoe Sep 17, 2015

I see this does bring the code in line with the on_failure docs. However, if we json encode shouldn't we decode again in scheduler.py (https://github.com/spotify/luigi/blob/master/luigi/scheduler.py#L997)?

If we deal with this consumer-side we will have to wrap any json.loads(your_string) in try/except to support old clients.

This comment has been minimized.

Copy link
@stephenpascoe

stephenpascoe Sep 17, 2015

#1086 will now handle json encoded or non-encoded error messages.

formatted_error_message = notifications.format_task_error(subject, self.task,
formatted_exception=error_message)
formatted_exception=notification_error_message)
notifications.send_error_email(subject, formatted_error_message, self.task.owner_email)
finally:
self.result_queue.put(
(self.task.task_id, status, error_message, missing, new_deps))
(self.task.task_id, status, expl, missing, new_deps))


class SingleProcessPool(object):
Expand Down Expand Up @@ -635,7 +637,7 @@ def _handle_next_task(self):
self._purge_children() # Deal with subprocess failures

try:
task_id, status, error_message, missing, new_requirements = (
task_id, status, expl, missing, new_requirements = (
self._task_result_queue.get(
timeout=float(self._config.wait_interval)))
except Queue.Empty:
Expand All @@ -657,7 +659,7 @@ def _handle_next_task(self):
self._add_task(worker=self._id,
task_id=task_id,
status=status,
expl=error_message,
expl=expl,
resources=task.process_resources(),
runnable=None,
params=task.to_str_params(),
Expand Down
43 changes: 42 additions & 1 deletion test/worker_task_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import multiprocessing

from helpers import unittest
import mock
import json

import luigi
import luigi.date_interval
import luigi.notifications
from luigi.worker import TaskException
from luigi.worker import TaskException, TaskProcess
from luigi.scheduler import DONE, FAILED

luigi.notifications.DEBUG = True

Expand All @@ -33,6 +37,21 @@ def __init__(self):
pass


class SuccessTask(luigi.Task):

def on_success(self):
return "test success expl"


class FailTask(luigi.Task):

def run(self):
raise BaseException("Uh oh.")

def on_failure(self, exception):
return "test failure expl"


class WorkerTaskTest(unittest.TestCase):

def test_constructor(self):
Expand All @@ -45,5 +64,27 @@ def f():
luigi.build([None], local_scheduler=True)
self.assertRaises(TaskException, f)


class TaskProcessTest(unittest.TestCase):

def test_update_result_queue_on_success(self):
task = SuccessTask()
result_queue = multiprocessing.Queue()
task_process = TaskProcess(task, 1, result_queue)

with mock.patch.object(result_queue, 'put') as mock_put:
task_process.run()
mock_put.assert_called_once_with((task.task_id, DONE, json.dumps("test success expl"), [], None))

def test_update_result_queue_on_failure(self):
task = FailTask()
result_queue = multiprocessing.Queue()
task_process = TaskProcess(task, 1, result_queue)

with mock.patch.object(result_queue, 'put') as mock_put:
task_process.run()
mock_put.assert_called_once_with((task.task_id, FAILED, json.dumps("test failure expl"), [], []))


if __name__ == '__main__':
unittest.main()

0 comments on commit f526751

Please sign in to comment.