Skip to content

Commit

Permalink
Copy task resource dicts in scheduler.
Browse files Browse the repository at this point in the history
  • Loading branch information
riga committed Mar 3, 2018
1 parent e7967af commit 036b813
Showing 1 changed file with 6 additions and 2 deletions.
8 changes: 6 additions & 2 deletions luigi/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,8 @@ def add_task(self, task_id=None, status=PENDING, runnable=True,
worker_id = worker
worker = self._update_worker(worker_id)

resources = {} if resources is None else resources.copy()

if retry_policy_dict is None:
retry_policy_dict = {}

Expand Down Expand Up @@ -815,7 +817,9 @@ def add_task(self, task_id=None, status=PENDING, runnable=True,
if status == RUNNING and not task.worker_running:
task.worker_running = worker_id
if batch_id:
task.resources_running = self._state.get_batch_running_tasks(batch_id)[0].resources_running
# copy resources_running of the first batch task
batch_tasks = self._state.get_batch_running_tasks(batch_id)
task.resources_running = batch_tasks[0].resources_running.copy()
task.time_running = time.time()

if tracking_url is not None or task.status != RUNNING:
Expand Down Expand Up @@ -1176,7 +1180,7 @@ def get_work(self, host=None, assistant=False, current_tasks=None, worker=None,
elif best_task:
self._state.set_status(best_task, RUNNING, self._config)
best_task.worker_running = worker_id
best_task.resources_running = best_task.resources
best_task.resources_running = best_task.resources.copy()
best_task.time_running = time.time()
self._update_task_history(best_task, RUNNING, host=host)

Expand Down

0 comments on commit 036b813

Please sign in to comment.