Skip to content

Commit

Permalink
Test
Browse files Browse the repository at this point in the history
  • Loading branch information
ulzha committed Jul 2, 2018
1 parent 8fb9488 commit fe52cf3
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 2 deletions.
2 changes: 1 addition & 1 deletion luigi/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ def __init__(self, context, *args, **kwargs):
def run(self):
if self.context:
logger.debug('Instantiating ' + self.context)
module_path, class_name = self.context.rsplit('.')
module_path, class_name = self.context.rsplit('.', 1)
module = importlib.import_module(module_path)
cls = getattr(module, class_name)

Expand Down
1 change: 0 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ def get_static_files(path):
install_requires = [
'tornado>=4.0,<5',
'python-daemon<3.0',
'humanfriendly>=2.1',
]

if os.environ.get('READTHEDOCS', None) == 'True':
Expand Down
70 changes: 70 additions & 0 deletions test/worker_task_process_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# -*- coding: utf-8 -*-
#
# Copyright 2012-2015 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from helpers import LuigiTestCase, temporary_unloaded_module
import luigi
from luigi.worker import Worker
import multiprocessing


class ContextManagedTaskProcessTest(LuigiTestCase):

def _test_context_manager(self, force_multiprocessing):
CONTEXT_MANAGER_MODULE = b'''
class MyContextManager(object):
def __init__(self, task_process):
self.task = task_process.task
def __enter__(self):
assert not self.task.run_event.is_set(), "the task should not have run yet"
self.task.enter_event.set()
return self
def __exit__(self, exc_type=None, exc_value=None, traceback=None):
assert self.task.run_event.is_set(), "the task should have run"
self.task.exit_event.set()
'''

class DummyEventRecordingTask(luigi.Task):
def __init__(self, *args, **kwargs):
self.enter_event = multiprocessing.Event()
self.exit_event = multiprocessing.Event()
self.run_event = multiprocessing.Event()
super(DummyEventRecordingTask, self).__init__(*args, **kwargs)

def run(self):
assert self.enter_event.is_set(), "the context manager should have been entered"
assert not self.exit_event.is_set(), "the context manager should not have been exited yet"
assert not self.run_event.is_set(), "the task should have run yet"
self.run_event.set()

def complete(self):
return self.run_event.is_set()

with temporary_unloaded_module(CONTEXT_MANAGER_MODULE) as module_name:
t = DummyEventRecordingTask()
w = Worker(task_process_context=module_name + '.MyContextManager',
force_multiprocessing=force_multiprocessing)
w.add(t)
self.assertTrue(w.run())
self.assertTrue(t.complete())
self.assertTrue(t.enter_event.is_set())
self.assertTrue(t.exit_event.is_set())

def test_context_manager_without_multiprocessing(self):
self._test_context_manager(False)

def test_context_manager_with_multiprocessing(self):
self._test_context_manager(True)

0 comments on commit fe52cf3

Please sign in to comment.