diff --git a/celery_progress/backend.py b/celery_progress/backend.py index e9a051a..8e24620 100644 --- a/celery_progress/backend.py +++ b/celery_progress/backend.py @@ -59,7 +59,7 @@ def __init__(self, result): def get_info(self): response = {'state': self.result.state} - if self.result.ready(): + if self.result.state in ['SUCCESS', 'FAILURE']: success = self.result.successful() with allow_join_result(): response.update({ @@ -68,18 +68,19 @@ def get_info(self): 'progress': _get_completed_progress(), 'result': self.result.get(self.result.id) if success else str(self.result.info), }) - elif self.result.state == 'RETRY': - retry = self.result.info - when = str(retry.when) if isinstance(retry.when, datetime.datetime) else str( - datetime.datetime.now() + datetime.timedelta(seconds=retry.when)) + elif self.result.state in ['RETRY', 'REVOKED']: + if self.result.state == 'RETRY': + retry = self.result.info + when = str(retry.when) if isinstance(retry.when, datetime.datetime) else str( + datetime.datetime.now() + datetime.timedelta(seconds=retry.when)) + result = {'when': when, 'message': retry.message or str(retry.exc)} + else: + result = 'Task ' + str(self.result.info) response.update({ 'complete': True, 'success': False, 'progress': _get_completed_progress(), - 'result': { - 'when': when, - 'message': retry.message or str(retry.exc) - }, + 'result': result, }) elif self.result.state == PROGRESS_STATE: response.update({ diff --git a/celery_progress/websockets/tasks.py b/celery_progress/websockets/tasks.py index 1998167..4cadf4d 100644 --- a/celery_progress/websockets/tasks.py +++ b/celery_progress/websockets/tasks.py @@ -1,4 +1,4 @@ -from celery.signals import task_postrun +from celery.signals import task_postrun, task_revoked from .backend import WebSocketProgressRecorder from celery_progress.backend import KnownResult, Progress @@ -12,3 +12,15 @@ def task_postrun_handler(task_id, **kwargs): result = KnownResult(task_id, kwargs.pop('retval'), kwargs.pop('state')) data = Progress(result).get_info() WebSocketProgressRecorder.push_update(task_id, data=data, final=True) + + +@task_revoked.connect(retry=True) +def task_revoked_handler(request, **kwargs): + """Runs if a task has been revoked. This will be used to push a websocket update for revoked events. + + If the websockets version of this package is not installed, this will fail silently.""" + _result = ('terminated' if kwargs.pop('terminated') else None) or ('expired' if kwargs.pop('expired') else None) \ + or 'revoked' + result = KnownResult(request.id, _result, 'REVOKED') + data = Progress(result).get_info() + WebSocketProgressRecorder.push_update(request.id, data=data, final=True)