From 2e21663ec6051a9dfedb29bcab2cd458c7c287ec Mon Sep 17 00:00:00 2001
From: Dave Buchfuhrer
Date: Thu, 11 Dec 2014 11:25:50 -0800
Subject: [PATCH 001/169] Times out long-running workers

I woke up this morning to find that very few jobs had run overnight
because something weird happened with our hadoop cluster that caused
all of our jobs to hang permanently. When I checked in the morning, all
workers were busy running jobs that had been going for about 10 hours,
and thousands of pending jobs that would normally have been run
overnight had piled up.

After killing and restarting all of my workers, everything started
running fine again because whatever weird issue had caused everything
to hang was over.

In order to automate this fix, I've added a worker_timeout property to
tasks that tells the worker how long to attempt to run the task before
killing it. This property defaults to 0, meaning do not time tasks out.
It can be changed individually for a task by override or globally for
all non-overridden tasks via client.cfg.

This only applies when running with more than 1 worker process, as we
need to run the task in a separate process in order to be able to kill
it after the timeout.
---
 doc/configuration.rst |  9 +++++++++
 luigi/task.py         |  4 ++++
 luigi/worker.py       | 27 +++++++++++++++++++------
 test/worker_test.py   | 46 +++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 80 insertions(+), 6 deletions(-)

diff --git a/doc/configuration.rst b/doc/configuration.rst
index 2e7e473a35..cbb15fbb31 100644
--- a/doc/configuration.rst
+++ b/doc/configuration.rst
@@ -126,6 +126,15 @@ worker-ping-interval
     Number of seconds to wait between pinging scheduler to let it know
     that the worker is still alive. Defaults to 1.0.
 
+worker-timeout
+    .. versionadded:: 1.0.20
+    Number of seconds after which to kill a task which has been running
+    for too long. This provides a default value for all tasks, which can
+    be overridden by setting the worker-timeout property in any task. This
+    only works when using multiple workers, as the timeout is implemented
+    by killing worker subprocesses. Default value is 0, meaning no
+    timeout.
+
 worker-wait-interval
     Number of seconds for the worker to wait before asking the scheduler
     for another job after the scheduler has said that it does not have any
diff --git a/luigi/task.py b/luigi/task.py
index 25d613bc8d..2b5a76e2c8 100644
--- a/luigi/task.py
+++ b/luigi/task.py
@@ -258,6 +258,10 @@ class MyTask(luigi.Task):
     # task requires 1 unit of the scp resource.
     resources = {}
 
+    # Number of seconds after which to time out the run function. No timeout if set to 0. Defaults
+    # to 0 or value in client.cfg
+    worker_timeout = None
+
     @classmethod
     def event_handler(cls, event):
         """ Decorator for adding event handlers """
diff --git a/luigi/worker.py b/luigi/worker.py
index 74678bc907..00e76f6ff2 100644
--- a/luigi/worker.py
+++ b/luigi/worker.py
@@ -53,12 +53,15 @@ class TaskProcess(multiprocessing.Process):
     ''' Wrap all task execution in this class. Mainly for convenience since this is run in a separate process.
     '''
 
-    def __init__(self, task, worker_id, result_queue, random_seed=False):
+    def __init__(self, task, worker_id, result_queue, random_seed=False, worker_timeout=0):
         super(TaskProcess, self).__init__()
         self.task = task
         self.worker_id = worker_id
         self.result_queue = result_queue
         self.random_seed = random_seed
+        if task.worker_timeout is not None:
+            worker_timeout = task.worker_timeout
+        self.timeout_time = time.time() + worker_timeout if worker_timeout else None
 
     def run(self):
         logger.info('[pid %s] Worker %s running %s', os.getpid(), self.worker_id, self.task.task_id)
@@ -181,7 +184,8 @@ class Worker(object):
 
     def __init__(self, scheduler=CentralPlannerScheduler(), worker_id=None,
                  worker_processes=1, ping_interval=None, keep_alive=None,
-                 wait_interval=None, max_reschedules=None, count_uniques=None):
+                 wait_interval=None, max_reschedules=None, count_uniques=None,
+                 worker_timeout=None):
         self.worker_processes = int(worker_processes)
         self._worker_info = self._generate_worker_info()
@@ -211,6 +215,10 @@ def __init__(self, scheduler=CentralPlannerScheduler(), worker_id=None,
             max_reschedules = config.getint('core', 'max-reschedules', 1)
         self.__max_reschedules = max_reschedules
 
+        if worker_timeout is None:
+            worker_timeout = configuration.get_config().getint('core', 'worker-timeout', 0)
+        self.__worker_timeout = worker_timeout
+
         self._id = worker_id
         self._scheduler = scheduler
@@ -471,7 +479,8 @@ def _get_work(self):
     def _run_task(self, task_id):
         task = self._scheduled_tasks[task_id]
         p = TaskProcess(task, self._id, self._task_result_queue,
-                        random_seed=bool(self.worker_processes > 1))
+                        random_seed=bool(self.worker_processes > 1),
+                        worker_timeout=self.__worker_timeout)
         self._running_tasks[task_id] = p
 
         if self.worker_processes > 1:
@@ -489,9 +498,15 @@ def _purge_children(self):
         for task_id, p in self._running_tasks.iteritems():
             if not p.is_alive() and p.exitcode:
                 error_msg = 'Worker task %s died unexpectedly with exit code %s' % (task_id, p.exitcode)
-                logger.info(error_msg)
-                self._task_result_queue.put(
-                    (task_id, FAILED, error_msg, [], []))
+            elif p.timeout_time is not None and time.time() > p.timeout_time and p.is_alive():
+                p.terminate()
+                error_msg = 'Worker task %s timed out and was terminated.' % task_id
+            else:
+                continue
+
+            logger.info(error_msg)
+            self._task_result_queue.put((task_id, FAILED, error_msg, [], []))
+
     def _handle_next_task(self):
         ''' We have to catch three ways a task can be "done"
diff --git a/test/worker_test.py b/test/worker_test.py
index c0dc920773..51f59fc4ba 100644
--- a/test/worker_test.py
+++ b/test/worker_test.py
@@ -12,6 +12,7 @@
 # License for the specific language governing permissions and limitations under
 # the License.
 
+import mock
 import shutil
 import time
 from luigi.scheduler import CentralPlannerScheduler
@@ -650,6 +651,17 @@ def run(self):
         os.kill(os.getpid(), self.signal)
 
 
+class HungWorker(luigi.Task):
+    worker_timeout = luigi.IntParameter(default=None)
+
+    def run(self):
+        while True:
+            pass
+
+    def complete(self):
+        return False
+
+
 class MultipleWorkersTest(unittest.TestCase):
     def test_multiple_workers(self):
         # Test using multiple workers
@@ -692,5 +704,39 @@ def test_purge_multiple_workers(self):
         w._handle_next_task()
         w._handle_next_task()
 
+    def test_time_out_hung_worker(self):
+        luigi.build([HungWorker(0.1)], workers=2, local_scheduler=True)
+
+    @mock.patch('luigi.worker.time')
+    def test_purge_hung_worker_default_timeout_time(self, mock_time):
+        w = Worker(worker_processes=2, wait_interval=0.01, worker_timeout=5)
+        mock_time.time.return_value = 0
+        w.add(HungWorker())
+        w._run_task('HungWorker(worker_timeout=None)')
+
+        mock_time.time.return_value = 5
+        w._handle_next_task()
+        self.assertEqual(1, len(w._running_tasks))
+
+        mock_time.time.return_value = 6
+        w._handle_next_task()
+        self.assertEqual(0, len(w._running_tasks))
+
+    @mock.patch('luigi.worker.time')
+    def test_purge_hung_worker_override_timeout_time(self, mock_time):
+        w = Worker(worker_processes=2, wait_interval=0.01, worker_timeout=5)
+        mock_time.time.return_value = 0
+        w.add(HungWorker(10))
+        w._run_task('HungWorker(worker_timeout=10)')
+
+        mock_time.time.return_value = 10
+        w._handle_next_task()
+        self.assertEqual(1, len(w._running_tasks))
+
+        mock_time.time.return_value = 11
+        w._handle_next_task()
+        self.assertEqual(0, len(w._running_tasks))
+
+
 if __name__ == '__main__':
     unittest.main()
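As a usage sketch of the behaviour this patch describes (the task name and the
two-hour figure below are made up for illustration and are not part of the
patch): a task can pin its own timeout by overriding the new property, while
tasks that leave it unset fall back to the worker-timeout value read from the
[core] section of client.cfg.

    import luigi


    class LongHadoopJob(luigi.Task):
        # Ask the worker to kill run() after two hours. Leaving this as None
        # (the default) means "use worker-timeout from client.cfg"; a value
        # of 0 disables the timeout for this task.
        worker_timeout = 7200

        def run(self):
            pass  # long-running work would go here

As the commit message notes, the timeout is only enforced when the worker runs
with more than one worker process.
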
From 9f76b3135f331d76c5c4858917d7a9687fed3a3d Mon Sep 17 00:00:00 2001
From: Sriram Malladi
Date: Fri, 19 Dec 2014 02:34:59 +0000
Subject: [PATCH 002/169] [scalding] allow single jobs in requires as opposed
 to expecting lists always

also cleared up the docs a bit
---
 luigi/scalding.py | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/luigi/scalding.py b/luigi/scalding.py
index ebb22aa6ce..a23a6bfa14 100644
--- a/luigi/scalding.py
+++ b/luigi/scalding.py
@@ -4,6 +4,7 @@
 import subprocess
 
 from luigi import LocalTarget
+from luigi.task import flatten
 import configuration
 import hadoop
 import hadoop_jar
@@ -205,8 +206,9 @@ class ScaldingJobTask(hadoop.BaseHadoopJobTask):
     method
 
     requires() should return a dictionary where the keys are Scalding argument
-    names and values are lists of paths. For example:
-    {'input1': ['A', 'B'], 'input2': ['C']} => --input1 A B --input2 C
+    names and values are sub tasks or lists of subtasks. For example:
+    {'input1': A, 'input2': C} => --input1 --input2
+    {'input1': [A, B], 'input2': [C]} => --input1 --input2
     """
 
     def relpath(self, current_file, rel_path):
@@ -255,7 +257,7 @@ def args(self):
         arglist = []
         for k, v in self.requires_hadoop().iteritems():
             arglist.append('--' + k)
-            arglist.extend([t.output().path for t in v])
+            arglist.extend([t.output().path for t in flatten(v)])
         arglist.extend(['--output', self.output()])
         arglist.extend(self.job_args())
         return arglist
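A small sketch of the relaxed requires() contract (the string values below
stand in for real sub tasks; flatten comes from luigi.task exactly as in the
patch):

    from luigi.task import flatten

    # Values in the requires() dictionary may now be either a single job or a
    # list of jobs; flatten() normalizes both shapes before their output paths
    # are appended as --input1 / --input2 style arguments.
    print(flatten('A'))          # -> ['A']
    print(flatten(['A', 'B']))   # -> ['A', 'B']
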
From 9330bdeb289f14bcbb851efa4a5fbdd0c42f2eae Mon Sep 17 00:00:00 2001
From: Dave Buchfuhrer
Date: Fri, 26 Dec 2014 12:01:32 -0800
Subject: [PATCH 003/169] Limit number of shown tasks in visualiser

I've noticed that my visualiser has become unusable recently when it
has too many done tasks scheduled. The tab freezes up for a long time
while processing all the tasks, and then is very sluggish afterward.

When I have this many done tasks, it's too much to really browse
through anyway, so this limits the information shown to just the count.
This trades off available information with usability of the tool.

The maximum number of shown tasks is controlled by the scheduler and
can be adjusted in client.cfg. It may be better to make this part of
the query so that it could potentially be controlled by the end user.
We may also in the future want to add additional server queries to
update larger sections for searches, as this removes useful search
information.
---
 doc/configuration.rst                       |  7 +++++++
 luigi/scheduler.py                          | 15 ++++++++++----
 luigi/server.py                             |  4 +++-
 luigi/static/visualiser/index.html          |  3 +++
 luigi/static/visualiser/js/visualiserApp.js | 14 ++++++++++---
 test/central_planner_test.py                | 22 +++++++++++++++++++++
 6 files changed, 57 insertions(+), 8 deletions(-)

diff --git a/doc/configuration.rst b/doc/configuration.rst
index cbb15fbb31..9eac4968f9 100644
--- a/doc/configuration.rst
+++ b/doc/configuration.rst
@@ -71,6 +71,13 @@ max-reschedules
     reschedule a job if it is found to not be done when attempting to run
     a dependent job. This defaults to 1.
 
+max-shown-tasks
+    .. versionadded:: 1.0.20
+    The maximum number of tasks returned in a task_list api call. This
+    will restrict the number of tasks shown in any section in the
+    visualiser. Small values can alleviate frozen browsers when there are
+    too many done tasks. This defaults to 100000 (one hundred thousand).
+
 no_configure_logging
     If true, logging is not configured. Defaults to false.
 
diff --git a/luigi/scheduler.py b/luigi/scheduler.py
index bfefc61ded..50f0370143 100644
--- a/luigi/scheduler.py
+++ b/luigi/scheduler.py
@@ -59,7 +59,9 @@ class Scheduler(object):
 # We're passing around this config a lot, so let's put it on an object
 SchedulerConfig = collections.namedtuple('SchedulerConfig', [
     'retry_delay', 'remove_delay', 'worker_disconnect_delay',
-    'disable_failures', 'disable_window', 'disable_persist', 'disable_time'])
+    'disable_failures', 'disable_window', 'disable_persist', 'disable_time',
+    'max_shown_tasks',
+])
 
 
 def fix_time(x):
@@ -342,7 +344,8 @@ class CentralPlannerScheduler(Scheduler):
     def __init__(self, retry_delay=900.0, remove_delay=600.0, worker_disconnect_delay=60.0,
                  state_path='/var/lib/luigi-server/state.pickle', task_history=None,
-                 resources=None, disable_persist=0, disable_window=0, disable_failures=None):
+                 resources=None, disable_persist=0, disable_window=0, disable_failures=None,
+                 max_shown_tasks=100000):
         '''
         (all arguments are in seconds)
         Keyword Arguments:
@@ -358,7 +361,9 @@ def __init__(self, retry_delay=900.0, remove_delay=600.0, worker_disconnect_dela
             disable_failures=disable_failures,
             disable_window=disable_window,
             disable_persist=disable_persist,
-            disable_time=disable_persist)
+            disable_time=disable_persist,
+            max_shown_tasks=max_shown_tasks,
+        )
         self._task_history = task_history or history.NopHistory()
         self._state = SimpleTaskState(state_path)
 
@@ -685,7 +690,7 @@ def dep_graph(self, task_id):
         self._recurse_deps(task_id, serialized)
         return serialized
 
-    def task_list(self, status, upstream_status):
+    def task_list(self, status, upstream_status, limit=True):
         ''' query for a subset of tasks by status '''
         self.prune()
         result = {}
@@ -696,6 +701,8 @@ def task_list(self, status, upstream_status):
                     upstream_status == self._upstream_status(task.id, upstream_status_table)):
                 serialized = self._serialize_task(task.id, False)
                 result[task.id] = serialized
+        if limit and len(result) > self._config.max_shown_tasks:
+            return {'num_tasks': len(result)}
         return result
 
     def worker_list(self, include_running=True):
diff --git a/luigi/server.py b/luigi/server.py
index cddbfa71cd..a72cb0ff07 100644
--- a/luigi/server.py
+++ b/luigi/server.py
@@ -45,6 +45,7 @@ def _create_scheduler():
     disable_window = config.getint('scheduler', 'disable-window-seconds', 3600)
     disable_failures = config.getint('scheduler', 'disable-num-failures', None)
     disable_persist = config.getint('scheduler', 'disable-persist-seconds', 86400)
+    max_shown_tasks = config.getint('scheduler', 'max-shown-tasks', 100000)
     resources = config.getintdict('resources')
 
     if config.getboolean('scheduler', 'record_task_history', False):
@@ -54,7 +55,8 @@ def _create_scheduler():
         task_history_impl = task_history.NopHistory()
     return scheduler.CentralPlannerScheduler(
         retry_delay, remove_delay, worker_disconnect_delay, state_path, task_history_impl,
-        resources, disable_persist, disable_window, disable_failures)
+        resources, disable_persist, disable_window, disable_failures, max_shown_tasks,
+    )
 
 
 class RPCHandler(tornado.web.RequestHandler):
diff --git a/luigi/static/visualiser/index.html b/luigi/static/visualiser/index.html
index f479865c6a..151ac630b5 100644
--- a/luigi/static/visualiser/index.html
+++ b/luigi/static/visualiser/index.html
@@ -82,6 +82,9 @@
 {{/tasks}}
+
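
For completeness, a sketch of picking a smaller limit than the shipped default
of 100000 (the 10000 figure is only an example): the server reads
max-shown-tasks from the [scheduler] section of client.cfg, and the same value
can be passed straight to the constructor argument added in this patch.

    from luigi import scheduler

    # Once any status section holds more than 10000 tasks, task_list() returns
    # only {'num_tasks': <count>}, so the visualiser shows a count instead of
    # freezing while it renders every task.
    sch = scheduler.CentralPlannerScheduler(max_shown_tasks=10000)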