diff --git a/doc/tasks.rst b/doc/tasks.rst index 4304d99993..754550b51e 100644 --- a/doc/tasks.rst +++ b/doc/tasks.rst @@ -184,8 +184,8 @@ Task status tracking For long-running or remote tasks it is convenient to see extended status information not only on the command line or in your logs but also in the GUI of the central scheduler. Luigi implements -dynamic status messages and tracking urls which may point to an external monitoring system. You -can set this information using callbacks within Task.run_: +dynamic status messages, progress bar and tracking urls which may point to an external monitoring system. +You can set this information using callbacks within Task.run_: .. code:: python @@ -199,6 +199,8 @@ can set this information using callbacks within Task.run_: # do some hard work here if i % 10 == 0: self.set_status_message("Progress: %d / 100" % i) + # displays a progress bar in the scheduler UI + self.set_progress_percentage(i) .. _Events: diff --git a/luigi/scheduler.py b/luigi/scheduler.py index d2deda3131..03c4de0c85 100644 --- a/luigi/scheduler.py +++ b/luigi/scheduler.py @@ -275,7 +275,7 @@ def __eq__(self, other): class Task(object): def __init__(self, task_id, status, deps, resources=None, priority=0, family='', module=None, - params=None, tracking_url=None, status_message=None, retry_policy='notoptional'): + params=None, tracking_url=None, status_message=None, progress_percentage=None, retry_policy='notoptional'): self.id = task_id self.stakeholders = set() # workers ids that are somehow related to this task (i.e. don't prune while any of these workers are still active) self.workers = OrderedSet() # workers ids that can perform task - task is 'BROKEN' if none of these workers are active @@ -301,6 +301,7 @@ def __init__(self, task_id, status, deps, resources=None, priority=0, family='', self.failures = Failures(self.retry_policy.disable_window) self.tracking_url = tracking_url self.status_message = status_message + self.progress_percentage = progress_percentage self.scheduler_disable_time = None self.runnable = False self.batchable = False @@ -1195,7 +1196,8 @@ def _serialize_task(self, task_id, include_deps=True, deps=None): 'priority': task.priority, 'resources': task.resources, 'tracking_url': getattr(task, "tracking_url", None), - 'status_message': getattr(task, "status_message", None) + 'status_message': getattr(task, "status_message", None), + 'progress_percentage': getattr(task, "progress_percentage", None) } if task.status == DISABLED: ret['re_enable_able'] = task.scheduler_disable_time is not None @@ -1454,6 +1456,23 @@ def get_task_status_message(self, task_id): else: return {"taskId": task_id, "statusMessage": ""} + @rpc_method() + def set_task_progress_percentage(self, task_id, progress_percentage): + if self._state.has_task(task_id): + task = self._state.get_task(task_id) + task.progress_percentage = progress_percentage + if task.status == RUNNING and task.batch_id is not None: + for batch_task in self._state.get_batch_running_tasks(task.batch_id): + batch_task.progress_percentage = progress_percentage + + @rpc_method() + def get_task_progress_percentage(self, task_id): + if self._state.has_task(task_id): + task = self._state.get_task(task_id) + return {"taskId": task_id, "progressPercentage": task.progress_percentage} + else: + return {"taskId": task_id, "progressPercentage": None} + def _update_task_history(self, task, status, host=None): try: if status == DONE or status == FAILED: diff --git a/luigi/static/visualiser/index.html b/luigi/static/visualiser/index.html index d90efdc808..95b5911011 100644 --- a/luigi/static/visualiser/index.html +++ b/luigi/static/visualiser/index.html @@ -245,6 +245,10 @@ {{#re_enable}}Re-enable{{/re_enable}} {{#trackingUrl}}{{/trackingUrl}} {{#statusMessage}}{{/statusMessage}} + {{^statusMessage}} + {{#progressPercentage}} + {{/progressPercentage}} + {{/statusMessage}}