From d5c9765f7020c326c759ee37b3d7106994aeb444 Mon Sep 17 00:00:00 2001 From: Marcel R Date: Mon, 30 Jul 2018 14:16:24 +0200 Subject: [PATCH 1/3] Add test case to check forwarded task attributes. --- test/task_forwarded_attributes_test.py | 76 ++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) create mode 100644 test/task_forwarded_attributes_test.py diff --git a/test/task_forwarded_attributes_test.py b/test/task_forwarded_attributes_test.py new file mode 100644 index 0000000000..7aaef36904 --- /dev/null +++ b/test/task_forwarded_attributes_test.py @@ -0,0 +1,76 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2012-2015 Spotify AB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +from helpers import LuigiTestCase, RunOnceTask + +import luigi +import luigi.scheduler +import luigi.worker + + +forwarded_attributes = set(luigi.worker.TaskProcess.forward_reporter_attributes.values()) + + +class NonYieldingTask(RunOnceTask): + + # need to accept messages in order for the "scheduler_message" attribute to be not None + accepts_messages = True + + def gather_forwarded_attributes(self): + attrs = set() + for attr in forwarded_attributes: + if getattr(self, attr, None) is not None: + attrs.add(attr) + return attrs + + def run(self): + self.attributes_while_running = self.gather_forwarded_attributes() + + RunOnceTask.run(self) + + +class YieldingTask(NonYieldingTask): + + def run(self): + self.attributes_before_yield = self.gather_forwarded_attributes() + + yield RunOnceTask() + + self.attributes_after_yield = self.gather_forwarded_attributes() + + RunOnceTask.run(self) + + +class TaskForwardedAttributesTest(LuigiTestCase): + + def run_task(self, task): + sch = luigi.scheduler.Scheduler() + with luigi.worker.Worker(scheduler=sch) as w: + w.add(task) + w.run() + return task + + def test_non_yielding_task(self): + task = self.run_task(NonYieldingTask()) + + self.assertEqual(task.attributes_while_running, forwarded_attributes) + + def test_yielding_task(self): + task = self.run_task(YieldingTask()) + + self.assertEqual(task.attributes_before_yield, forwarded_attributes) + self.assertEqual(task.attributes_after_yield, forwarded_attributes) From 05158a7895bf8f5fc38134c4f2791dea66225a50 Mon Sep 17 00:00:00 2001 From: Marcel R Date: Mon, 30 Jul 2018 14:16:42 +0200 Subject: [PATCH 2/3] Fix attribute forwarding for tasks yielding deps. 
--- luigi/worker.py | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/luigi/worker.py b/luigi/worker.py index 5c76bbc3de..b118c4e80f 100644 --- a/luigi/worker.py +++ b/luigi/worker.py @@ -37,6 +37,7 @@ import signal import subprocess import sys +import contextlib try: import Queue @@ -135,16 +136,8 @@ def __init__(self, task, worker_id, result_queue, status_reporter, self.check_unfulfilled_deps = check_unfulfilled_deps def _run_get_new_deps(self): - # forward some attributes before running - for reporter_attr, task_attr in six.iteritems(self.forward_reporter_attributes): - setattr(self.task, task_attr, getattr(self.status_reporter, reporter_attr)) - task_gen = self.task.run() - # reset attributes again - for reporter_attr, task_attr in six.iteritems(self.forward_reporter_attributes): - setattr(self.task, task_attr, None) - if not isinstance(task_gen, types.GeneratorType): return None @@ -202,7 +195,8 @@ def run(self): expl = 'Task is an external data dependency ' \ 'and data does not exist (yet?).' else: - new_deps = self._run_get_new_deps() + with self._forward_attributes(): + new_deps = self._run_get_new_deps() status = DONE if not new_deps else PENDING if new_deps: @@ -258,6 +252,20 @@ def terminate(self): except ImportError: return super(TaskProcess, self).terminate() + @contextlib.contextmanager + def _forward_attributes(self): + # forward configured attributes to the task + for reporter_attr, task_attr in six.iteritems(self.forward_reporter_attributes): + setattr(self.task, task_attr, getattr(self.status_reporter, reporter_attr)) + + try: + yield self + + finally: + # reset attributes again + for reporter_attr, task_attr in six.iteritems(self.forward_reporter_attributes): + setattr(self.task, task_attr, None) + # This code and the task_process_context config key currently feels a bit ad-hoc. 
# Discussion on generalizing it into a plugin system: https://github.com/spotify/luigi/issues/1897 From e70c98d3673506bc8030b06165704b2fec6e25c9 Mon Sep 17 00:00:00 2001 From: Marcel R Date: Tue, 31 Jul 2018 11:23:55 +0200 Subject: [PATCH 3/3] Address review comments on code style and comments. --- luigi/worker.py | 2 -- test/task_forwarded_attributes_test.py | 23 ++++++++++++++++------- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/luigi/worker.py b/luigi/worker.py index b118c4e80f..81aa4f39ff 100644 --- a/luigi/worker.py +++ b/luigi/worker.py @@ -257,10 +257,8 @@ def _forward_attributes(self): # forward configured attributes to the task for reporter_attr, task_attr in six.iteritems(self.forward_reporter_attributes): setattr(self.task, task_attr, getattr(self.status_reporter, reporter_attr)) - try: yield self - finally: # reset attributes again for reporter_attr, task_attr in six.iteritems(self.forward_reporter_attributes): diff --git a/test/task_forwarded_attributes_test.py b/test/task_forwarded_attributes_test.py index 7aaef36904..48ef319136 100644 --- a/test/task_forwarded_attributes_test.py +++ b/test/task_forwarded_attributes_test.py @@ -22,7 +22,7 @@ import luigi.worker -forwarded_attributes = set(luigi.worker.TaskProcess.forward_reporter_attributes.values()) +FORWARDED_ATTRIBUTES = set(luigi.worker.TaskProcess.forward_reporter_attributes.values()) class NonYieldingTask(RunOnceTask): @@ -31,27 +31,36 @@ class NonYieldingTask(RunOnceTask): accepts_messages = True def gather_forwarded_attributes(self): + """ + Returns a set of names of attributes that are forwarded by the TaskProcess and that are not + *None*. The tests in this file check if and which attributes are present at different times, + e.g. while running, or before and after a dynamic dependency was yielded. 
+ """ attrs = set() - for attr in forwarded_attributes: + for attr in FORWARDED_ATTRIBUTES: if getattr(self, attr, None) is not None: attrs.add(attr) return attrs def run(self): + # store names of forwarded attributes which are only available within the run method self.attributes_while_running = self.gather_forwarded_attributes() + # invoke the run method of the RunOnceTask which marks this task as complete RunOnceTask.run(self) class YieldingTask(NonYieldingTask): def run(self): + # as TaskProcess._run_get_new_deps handles generators in a specific way, store names of + # forwarded attributes before and after yielding a dynamic dependency, so we can explicitly + # validate the attribute forwarding implementation self.attributes_before_yield = self.gather_forwarded_attributes() - yield RunOnceTask() - self.attributes_after_yield = self.gather_forwarded_attributes() + # invoke the run method of the RunOnceTask which marks this task as complete RunOnceTask.run(self) @@ -67,10 +76,10 @@ def run_task(self, task): def test_non_yielding_task(self): task = self.run_task(NonYieldingTask()) - self.assertEqual(task.attributes_while_running, forwarded_attributes) + self.assertEqual(task.attributes_while_running, FORWARDED_ATTRIBUTES) def test_yielding_task(self): task = self.run_task(YieldingTask()) - self.assertEqual(task.attributes_before_yield, forwarded_attributes) - self.assertEqual(task.attributes_after_yield, forwarded_attributes) + self.assertEqual(task.attributes_before_yield, FORWARDED_ATTRIBUTES) + self.assertEqual(task.attributes_after_yield, FORWARDED_ATTRIBUTES)