From d615ba072c3288dd44ba2bb1f547016ea9a4ddc0 Mon Sep 17 00:00:00 2001 From: Jiri Kozel Date: Fri, 25 Jun 2021 11:45:23 +0200 Subject: [PATCH] Fix #405 (forever PENDING publications). --- CHANGELOG.md | 7 ++----- src/layman/celery.py | 22 +++++++++++++--------- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 841d05953..2511cd836 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,12 +1,9 @@ # Changelog ## v1.13.2 - {release_date} -### Upgrade requirements -### Migrations and checks -#### Schema migrations -#### Data migrations + 2021-06-25 ### Changes +- Fix [#405](https://github.com/LayerManager/layman/issues/405). In some specific situations, [GET Workspace Layer](doc/rest.md#get-workspace-layer) and [GET Workspace Map](doc/rest.md#get-workspace-map) returned PENDING state although asynchronous tasks were already finished. Also PATCH request to these publications was not possible. It's fixed now. ## v1.13.1 2021-06-07 diff --git a/src/layman/celery.py b/src/layman/celery.py index 659ddaddd..79e81841f 100644 --- a/src/layman/celery.py +++ b/src/layman/celery.py @@ -3,7 +3,8 @@ import celery.exceptions from flask import current_app -from celery.contrib.abortable import AbortableAsyncResult +from celery import states +from celery.contrib.abortable import AbortableAsyncResult, ABORTED from layman.publication_relation.util import update_related_publications_after_change from layman import settings, common, util as layman_util @@ -33,7 +34,7 @@ def task_postrun(workspace, publication_type, publication_name, task_id, task_na key = LAST_TASK_ID_IN_CHAIN_TO_PUBLICATION hash = task_id if rds.hexists(key, hash): - finish_publication_chain(task_id) + finish_publication_chain(task_id, task_state) next_task = pop_step_to_run_after_chain(workspace, publication_type, publication_name) if next_task: module_name, method_name = next_task.split('::') @@ -45,7 +46,7 @@ def task_postrun(workspace, publication_type, publication_name, task_id, task_na chain_info = get_publication_chain_info_dict(workspace, publication_type, publication_name) if chain_info is not None: last_task_id = chain_info['last'] - finish_publication_chain(last_task_id) + finish_publication_chain(last_task_id, task_state) clear_steps_to_run_after_chain(workspace, publication_type, publication_name) # Sometimes, when delete request run just after other request for the same publication (for example WFS-T), # the aborted task keep running and finish after end of delete task for the same source. This part make sure, @@ -102,7 +103,7 @@ def clear_steps_to_run_after_chain(workspace, publication_type, publication_name rds.hdel(key, hash) -def finish_publication_chain(last_task_id_in_chain): +def finish_publication_chain(last_task_id_in_chain, state): rds = settings.LAYMAN_REDIS key = LAST_TASK_ID_IN_CHAIN_TO_PUBLICATION hash = last_task_id_in_chain @@ -113,6 +114,7 @@ def finish_publication_chain(last_task_id_in_chain): chain_info = get_publication_chain_info_dict(username, publication_type, publication_name) chain_info['finished'] = True + chain_info['state'] = state set_publication_chain_info_dict(username, publication_type, publication_name, chain_info) rds.hdel(key, hash) @@ -190,6 +192,7 @@ def set_publication_chain_info(workspace, publication_type, publication_name, ta }, 'by_order': [r.task_id for r in chained_results], 'finished': False, + 'state': states.PENDING, } set_publication_chain_info_dict(workspace, publication_type, publication_name, chain_info) @@ -205,7 +208,7 @@ def abort_chain(chain_info): return abort_task_chain(chain_info['by_order'], chain_info['by_name']) - finish_publication_chain(chain_info['last'].task_id) + finish_publication_chain(chain_info['last'].task_id, ABORTED) def abort_publication_chain(workspace, publication_type, publication_name): @@ -229,7 +232,7 @@ def abort_task_chain(results_by_order, results_by_name=None): prev_task_state = task_result.state current_app.logger.info(f'aborting result {task_name} {task_result.id} with state {task_result.state}') task_result.abort() - assert task_result.state == 'ABORTED' + assert task_result.state == ABORTED if prev_task_state == 'STARTED': current_app.logger.info( f'waiting for result of {task_name} {task_result.id} with state {task_result.state}') @@ -242,15 +245,16 @@ def abort_task_chain(results_by_order, results_by_name=None): def is_chain_successful(chain_info): - return chain_info['last'].successful() + return chain_info['state'] == states.SUCCESS or chain_info['last'].successful() def is_chain_failed(chain_info): - return any(tr.failed() for tr in chain_info['by_order']) + return chain_info['state'] == states.FAILURE or any(tr.failed() for tr in chain_info['by_order']) def is_chain_ready(chain_info): - return is_chain_successful(chain_info) or is_chain_failed(chain_info) + return chain_info['state'] in {states.SUCCESS, states.FAILURE, ABORTED} or is_chain_successful(chain_info) or \ + is_chain_failed(chain_info) def _get_publication_hash(workspace, publication_type, publication_name):