diff --git a/luigi/scheduler.py b/luigi/scheduler.py index a94bd22d83..19a2c0c918 100644 --- a/luigi/scheduler.py +++ b/luigi/scheduler.py @@ -832,9 +832,12 @@ def add_task(self, task_id=None, status=PENDING, runnable=True, for batch_task in self._state.get_batch_running_tasks(task.batch_id): batch_task.expl = expl - if not (task.status in (RUNNING, BATCH_RUNNING) and (status not in (DONE, FAILED, RUNNING) or task.worker_running != worker_id)) or new_deps: + task_is_not_running = task.status not in (RUNNING, BATCH_RUNNING) + task_started_a_run = status in (DONE, FAILED, RUNNING) + running_on_this_worker = task.worker_running == worker_id + if task_is_not_running or (task_started_a_run and running_on_this_worker) or new_deps: # don't allow re-scheduling of task while it is running, it must either fail or succeed on the worker actually running it - if status == PENDING or status != task.status: + if status != task.status or status == PENDING: # Update the DB only if there was a acctual change, to prevent noise. # We also check for status == PENDING b/c that's the default value # (so checking for status != task.status woule lie) diff --git a/luigi/worker.py b/luigi/worker.py index cd40713c0a..20074a266e 100644 --- a/luigi/worker.py +++ b/luigi/worker.py @@ -488,8 +488,7 @@ def _add_task(self, *args, **kwargs): runnable = kwargs['runnable'] task = self._scheduled_tasks.get(task_id) if task: - msg = (task, status, runnable) - self._add_task_history.append(msg) + self._add_task_history.append((task, status, runnable)) kwargs['owners'] = task._owner_list() if task_id in self._batch_running_tasks: