Skip to content

Commit

Permalink
Fix #405 (forever PENDING publications).
Browse files Browse the repository at this point in the history
  • Loading branch information
jirik committed Jun 25, 2021
1 parent 482c026 commit d615ba0
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 14 deletions.
7 changes: 2 additions & 5 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
# Changelog

## v1.13.2
{release_date}
### Upgrade requirements
### Migrations and checks
#### Schema migrations
#### Data migrations
2021-06-25
### Changes
- Fix [#405](https://github.com/LayerManager/layman/issues/405). In some specific situations, [GET Workspace Layer](doc/rest.md#get-workspace-layer) and [GET Workspace Map](doc/rest.md#get-workspace-map) returned PENDING state although asynchronous tasks were already finished. Also PATCH request to these publications was not possible. It's fixed now.

## v1.13.1
2021-06-07
Expand Down
22 changes: 13 additions & 9 deletions src/layman/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@

import celery.exceptions
from flask import current_app
from celery.contrib.abortable import AbortableAsyncResult
from celery import states
from celery.contrib.abortable import AbortableAsyncResult, ABORTED

from layman.publication_relation.util import update_related_publications_after_change
from layman import settings, common, util as layman_util
Expand Down Expand Up @@ -33,7 +34,7 @@ def task_postrun(workspace, publication_type, publication_name, task_id, task_na
key = LAST_TASK_ID_IN_CHAIN_TO_PUBLICATION
hash = task_id
if rds.hexists(key, hash):
finish_publication_chain(task_id)
finish_publication_chain(task_id, task_state)
next_task = pop_step_to_run_after_chain(workspace, publication_type, publication_name)
if next_task:
module_name, method_name = next_task.split('::')
Expand All @@ -45,7 +46,7 @@ def task_postrun(workspace, publication_type, publication_name, task_id, task_na
chain_info = get_publication_chain_info_dict(workspace, publication_type, publication_name)
if chain_info is not None:
last_task_id = chain_info['last']
finish_publication_chain(last_task_id)
finish_publication_chain(last_task_id, task_state)
clear_steps_to_run_after_chain(workspace, publication_type, publication_name)
# Sometimes, when a delete request runs just after another request for the same publication (for example WFS-T),
# the aborted task keeps running and finishes after the end of the delete task for the same source. This part makes sure,
Expand Down Expand Up @@ -102,7 +103,7 @@ def clear_steps_to_run_after_chain(workspace, publication_type, publication_name
rds.hdel(key, hash)


def finish_publication_chain(last_task_id_in_chain):
def finish_publication_chain(last_task_id_in_chain, state):
rds = settings.LAYMAN_REDIS
key = LAST_TASK_ID_IN_CHAIN_TO_PUBLICATION
hash = last_task_id_in_chain
Expand All @@ -113,6 +114,7 @@ def finish_publication_chain(last_task_id_in_chain):

chain_info = get_publication_chain_info_dict(username, publication_type, publication_name)
chain_info['finished'] = True
chain_info['state'] = state
set_publication_chain_info_dict(username, publication_type, publication_name, chain_info)

rds.hdel(key, hash)
Expand Down Expand Up @@ -190,6 +192,7 @@ def set_publication_chain_info(workspace, publication_type, publication_name, ta
},
'by_order': [r.task_id for r in chained_results],
'finished': False,
'state': states.PENDING,
}
set_publication_chain_info_dict(workspace, publication_type, publication_name, chain_info)

Expand All @@ -205,7 +208,7 @@ def abort_chain(chain_info):
return

abort_task_chain(chain_info['by_order'], chain_info['by_name'])
finish_publication_chain(chain_info['last'].task_id)
finish_publication_chain(chain_info['last'].task_id, ABORTED)


def abort_publication_chain(workspace, publication_type, publication_name):
Expand All @@ -229,7 +232,7 @@ def abort_task_chain(results_by_order, results_by_name=None):
prev_task_state = task_result.state
current_app.logger.info(f'aborting result {task_name} {task_result.id} with state {task_result.state}')
task_result.abort()
assert task_result.state == 'ABORTED'
assert task_result.state == ABORTED
if prev_task_state == 'STARTED':
current_app.logger.info(
f'waiting for result of {task_name} {task_result.id} with state {task_result.state}')
Expand All @@ -242,15 +245,16 @@ def abort_task_chain(results_by_order, results_by_name=None):


def is_chain_successful(chain_info):
    """Return True if the publication chain finished successfully.

    Checks the explicitly stored chain state first (set by
    finish_publication_chain), then falls back to asking the last
    task result directly.
    """
    if chain_info['state'] == states.SUCCESS:
        return True
    return chain_info['last'].successful()


def is_chain_failed(chain_info):
    """Return True if the publication chain failed.

    The stored chain state takes precedence; otherwise the chain is
    considered failed as soon as any task result in the chain failed.
    """
    if chain_info['state'] == states.FAILURE:
        return True
    return any(result.failed() for result in chain_info['by_order'])


def is_chain_ready(chain_info):
    """Return True if the publication chain reached a terminal state.

    A chain is ready when its stored state is SUCCESS, FAILURE or
    ABORTED, or when the task results themselves report success or
    failure.
    """
    terminal_states = (states.SUCCESS, states.FAILURE, ABORTED)
    if chain_info['state'] in terminal_states:
        return True
    return is_chain_successful(chain_info) or is_chain_failed(chain_info)


def _get_publication_hash(workspace, publication_type, publication_name):
Expand Down

0 comments on commit d615ba0

Please sign in to comment.