Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wfst in celery iii #360

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions src/layman/authz/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from functools import wraps
from flask import after_this_request, request

from layman import LaymanError, settings, authn, util as layman_util
from layman import LaymanError, settings, authn, util as layman_util, common
from layman.common.prime_db_schema import workspaces, users
from layman.common.rest import parse_request_path

Expand All @@ -16,11 +16,11 @@ def authorize(workspace, publication_type, publication_name, request_method, act
}[publication_type]

if is_multi_publication_request:
if request_method in ['GET', 'DELETE']:
if request_method.lower() in [common.REQUEST_METHOD_GET, common.REQUEST_METHOD_DELETE]:
if not workspaces.get_workspace_infos(workspace):
raise LaymanError(40) # Workspace not found
return
if request_method in ['POST']:
if request_method.lower() in [common.REQUEST_METHOD_POST]:
if actor_name == workspace:
return
if ((not users.get_user_infos(workspace)) # public workspace
Expand All @@ -45,11 +45,12 @@ def authorize(workspace, publication_type, publication_name, request_method, act
if not publ_info:
raise LaymanError(publication_not_found_code)
user_can_read = is_user_in_access_rule(actor_name, publ_info['access_rights']['read'])
if request_method in ['GET']:
if request_method.lower() in [common.REQUEST_METHOD_GET]:
if user_can_read:
return
raise LaymanError(publication_not_found_code)
if request_method in ['POST', 'PUT', 'PATCH', 'DELETE']:
if request_method.lower() in [common.REQUEST_METHOD_PATCH, common.REQUEST_METHOD_DELETE,
common.REQUEST_METHOD_POST, common.REQUEST_METHOD_PUT, ]:
if is_user_in_access_rule(actor_name, publ_info['access_rights']['write']):
return
if user_can_read:
Expand Down Expand Up @@ -113,7 +114,7 @@ def decorated_function(*args, **kwargs):
actor_name = authn.get_authn_username()
# raises exception in case of unauthorized request
authorize(workspace, publication_type, publication_name, request.method, actor_name)
if workspace and publication_type and not publication_name and request.method == 'GET':
if workspace and publication_type and not publication_name and request.method == common.REQUEST_METHOD_GET:
# pylint: disable=unused-variable
@after_this_request
def authorize_after_request_tmp(response):
Expand All @@ -132,7 +133,7 @@ def decorated_function(*args, **kwargs):
if publication_type is None or workspace or publication_name:
raise Exception(f"Authorization module is unable to authorize path {req_path}")
actor_name = authn.get_authn_username()
if request.method == 'GET':
if request.method == common.REQUEST_METHOD_GET:
# pylint: disable=unused-variable
@after_this_request
def authorize_after_request_tmp(response):
Expand Down
81 changes: 68 additions & 13 deletions src/layman/celery.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import json
import importlib
from flask import current_app
from celery.contrib.abortable import AbortableAsyncResult

from layman import settings
from layman import settings, common
from layman.common import redis as redis_util

REDIS_CURRENT_TASK_NAMES = f"{__name__}:CURRENT_TASK_NAMES"
PUBLICATION_CHAIN_INFOS = f'{__name__}:PUBLICATION_TASK_INFOS'
TASK_ID_TO_PUBLICATION = f'{__name__}:TASK_ID_TO_PUBLICATION'
PUBLICATION_CHAIN_INFOS = f'{__name__}:PUBLICATION_CHAIN_INFOS'
LAST_TASK_ID_IN_CHAIN_TO_PUBLICATION = f'{__name__}:LAST_TASK_ID_IN_CHAIN_TO_PUBLICATION'
RUN_AFTER_CHAIN = f'{__name__}:RUN_AFTER_CHAIN'


def task_prerun(workspace, _publication_type, publication_name, _task_id, task_name):
Expand All @@ -25,25 +27,72 @@ def task_postrun(workspace, publication_type, publication_name, task_id, task_na
task_hash = _get_task_hash(task_name, workspace, publication_name)
rds.srem(key, task_hash)

key = TASK_ID_TO_PUBLICATION
key = LAST_TASK_ID_IN_CHAIN_TO_PUBLICATION
hash = task_id
if rds.hexists(key, hash):
finnish_publication_task(task_id)
finnish_publication_chain(task_id)
next_task = pop_step_to_run_after_chain(workspace, publication_type, publication_name)
if next_task:
module_name, method_name = next_task.split('::')
module = importlib.import_module(module_name)
method = getattr(module, method_name)
method(workspace, publication_type, publication_name)
elif task_state == 'FAILURE':
chain_info = get_publication_chain_info_dict(workspace, publication_type, publication_name)
if chain_info is not None:
last_task_id = chain_info['last']
finnish_publication_task(last_task_id)
finnish_publication_chain(last_task_id)


def _get_task_hash(task_name, workspace, publication_name):
return f"{task_name}:{workspace}:{publication_name}"


def finnish_publication_task(task_id):
def push_step_to_run_after_chain(workspace, publication_type, publication_name, step_code, ):
rds = settings.LAYMAN_REDIS
key = TASK_ID_TO_PUBLICATION
hash = task_id
key = LAST_TASK_ID_IN_CHAIN_TO_PUBLICATION
hash = _get_publication_hash(workspace, publication_type, publication_name)
val = rds.hget(key, hash)
queue = json.loads(val) if val is not None else list()
if step_code not in queue:
queue.append(step_code)
rds.hset(key, hash, json.dumps(queue))


def pop_step_to_run_after_chain(workspace, publication_type, publication_name, ):
rds = settings.LAYMAN_REDIS
key = LAST_TASK_ID_IN_CHAIN_TO_PUBLICATION
hash = _get_publication_hash(workspace, publication_type, publication_name)
val = rds.hget(key, hash)
result = None
if val:
queue = json.loads(val)
if len(queue) > 0:
result = queue.pop(0)
rds.hset(key, hash, json.dumps(queue))
return result


def get_run_after_chain_queue(workspace, publication_type, publication_name, ):
rds = settings.LAYMAN_REDIS
key = LAST_TASK_ID_IN_CHAIN_TO_PUBLICATION
index-git marked this conversation as resolved.
Show resolved Hide resolved
hash = _get_publication_hash(workspace, publication_type, publication_name)
val = rds.hget(key, hash)
queue = json.loads(val) if val is not None else list()
return queue


def clear_steps_to_run_after_chain(workspace, publication_type, publication_name, ):
rds = settings.LAYMAN_REDIS
key = LAST_TASK_ID_IN_CHAIN_TO_PUBLICATION
hash = _get_publication_hash(workspace, publication_type, publication_name)
rds.hdel(key, hash)


def finnish_publication_chain(last_task_id_in_chain):
rds = settings.LAYMAN_REDIS
key = LAST_TASK_ID_IN_CHAIN_TO_PUBLICATION
hash = last_task_id_in_chain
publ_hash = rds.hget(key, hash)
if publ_hash is None:
return
Expand All @@ -56,7 +105,7 @@ def finnish_publication_task(task_id):
rds.hdel(key, hash)

lock = redis_util.get_publication_lock(username, publication_type, publication_name)
if lock in ['patch', 'post', 'wfst', ]:
if lock in [common.REQUEST_METHOD_PATCH, common.REQUEST_METHOD_POST, common.PUBLICATION_LOCK_CODE_WFST, ]:
redis_util.unlock_publication(username, publication_type, publication_name)


Expand Down Expand Up @@ -132,7 +181,7 @@ def set_publication_chain_info(workspace, publication_type, publication_name, ta
set_publication_chain_info_dict(workspace, publication_type, publication_name, chain_info)

rds = settings.LAYMAN_REDIS
key = TASK_ID_TO_PUBLICATION
key = LAST_TASK_ID_IN_CHAIN_TO_PUBLICATION
val = _get_publication_hash(workspace, publication_type, publication_name)
hash = chain_info['last']
rds.hset(key, hash, val)
Expand All @@ -143,7 +192,13 @@ def abort_chain(chain_info):
return

abort_task_chain(chain_info['by_order'], chain_info['by_name'])
finnish_publication_task(chain_info['last'].task_id)
finnish_publication_chain(chain_info['last'].task_id)


def abort_publication_chain(workspace, publication_type, publication_name):
chain_info = get_publication_chain_info(workspace, publication_type, publication_name)
abort_chain(chain_info)
clear_steps_to_run_after_chain(workspace, publication_type, publication_name)


def abort_task_chain(results_by_order, results_by_name=None):
Expand Down Expand Up @@ -198,7 +253,7 @@ def delete_publication(workspace, publication_type, publication_name):
hash = _get_publication_hash(workspace, publication_type, publication_name)
rds.hdel(key, hash)

key = TASK_ID_TO_PUBLICATION
key = LAST_TASK_ID_IN_CHAIN_TO_PUBLICATION
rds.hdel(key, task_id)


Expand Down
10 changes: 10 additions & 0 deletions src/layman/common/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
from collections import namedtuple

REQUEST_METHOD_DELETE = 'delete'
REQUEST_METHOD_GET = 'get'
REQUEST_METHOD_PATCH = 'patch'
REQUEST_METHOD_POST = 'post'
REQUEST_METHOD_PUT = 'put'
PUBLICATION_LOCK_CODE_POST = 'post'
PUBLICATION_LOCK_CODE_PATCH = 'patch'
PUBLICATION_LOCK_CODE_DELETE = 'delete'
PUBLICATION_LOCK_CODE_WFST = 'wfst'

InternalSourceTypeDef = namedtuple('InternalSourceTypeDef', ['info_items',
])

Expand Down
4 changes: 2 additions & 2 deletions src/layman/common/output_srs_list_test.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import math
from test import process, process_client, geoserver_client, util
from test import process, process_client, geoserver_client, assert_util
import pytest

from layman import settings, app
Expand Down Expand Up @@ -171,4 +171,4 @@ def test_spatial_precision_wms(ensure_layer, epsg_code, extent, img_size, style_
num_circles = 5
pixel_diff_limit = circle_perimeter * num_circles * diff_line_width

util.assert_same_images(url, obtained_file, expected_file, pixel_diff_limit)
assert_util.assert_same_images(url, obtained_file, expected_file, pixel_diff_limit)
32 changes: 17 additions & 15 deletions src/layman/common/redis.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from functools import wraps
from flask import request, current_app

from layman import settings, celery as celery_util
from layman import settings, celery as celery_util, common
from layman import LaymanError

PUBLICATION_LOCKS_KEY = f'{__name__}:PUBLICATION_LOCKS'
Expand Down Expand Up @@ -62,32 +62,34 @@ def unlock_publication(workspace, publication_type, publication_name):
rds.hdel(key, hash)


def solve_locks(workspace, publication_type, publication_name, error_code, method):
def solve_locks(workspace, publication_type, publication_name, error_code, requested_lock):
current_lock = get_publication_lock(
workspace,
publication_type,
publication_name,
)
if current_lock is None:
return
if method not in ['patch', 'delete', 'wfst', ]:
raise Exception(f"Unknown method to check: {method}")
if current_lock not in ['patch', 'delete', 'post', 'wfst', ]:
if requested_lock not in [common.PUBLICATION_LOCK_CODE_PATCH, common.PUBLICATION_LOCK_CODE_DELETE,
common.PUBLICATION_LOCK_CODE_WFST, ]:
raise Exception(f"Unknown method to check: {requested_lock}")
if current_lock not in [common.PUBLICATION_LOCK_CODE_PATCH, common.PUBLICATION_LOCK_CODE_DELETE,
common.PUBLICATION_LOCK_CODE_POST,
common.PUBLICATION_LOCK_CODE_WFST, ]:
raise Exception(f"Unknown current lock: {current_lock}")
if current_lock in ['patch', 'post']:
if method in ['patch', 'post']:
if current_lock in [common.PUBLICATION_LOCK_CODE_PATCH, common.PUBLICATION_LOCK_CODE_POST, ]:
if requested_lock in [common.PUBLICATION_LOCK_CODE_PATCH, common.PUBLICATION_LOCK_CODE_POST, ]:
raise LaymanError(error_code)
elif current_lock in ['delete']:
if method in ['patch', 'post']:
elif current_lock in [common.PUBLICATION_LOCK_CODE_DELETE, ]:
if requested_lock in [common.PUBLICATION_LOCK_CODE_PATCH, common.PUBLICATION_LOCK_CODE_POST, common.PUBLICATION_LOCK_CODE_WFST, ]:
raise LaymanError(error_code)
if method not in ['delete']:
if (current_lock, method) == ('wfst', 'wfst'):
if requested_lock not in [common.PUBLICATION_LOCK_CODE_DELETE, ]:
if requested_lock == common.PUBLICATION_LOCK_CODE_WFST:
raise LaymanError(19, private_data={'can_run_later': True})
if current_lock == common.PUBLICATION_LOCK_CODE_WFST and requested_lock in [common.REQUEST_METHOD_PATCH, common.REQUEST_METHOD_POST, ]:
chain_info = celery_util.get_publication_chain_info(workspace, publication_type, publication_name)
celery_util.abort_chain(chain_info)
else:
assert current_lock not in ['wfst', ] and method not in ['wfst', ],\
f'current_lock={current_lock}, method={method},' \
f'workspace, publication_type, publication_name={(workspace, publication_type, publication_name)}'
celery_util.push_step_to_run_after_chain(workspace, publication_type, publication_name, 'layman.util::patch_after_wfst')


def _get_publication_hash(workspace, publication_type, publication_name):
Expand Down
20 changes: 13 additions & 7 deletions src/layman/geoserver_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from flask import Blueprint, g, current_app as app, request, Response

from geoserver.util import reset as gs_reset
from layman import authn, authz, settings
from layman import authn, authz, common, settings, LaymanError, celery as celery_util
from layman.authn import authenticate, is_user_with_name
from layman.common import redis
from layman.layer import db, LAYER_TYPE, util as layer_util
Expand Down Expand Up @@ -179,13 +179,10 @@ def proxy(subpath):
headers_req[settings.LAYMAN_GS_AUTHN_HTTP_HEADER_ATTRIBUTE] = authn_username

app.logger.info(f"{request.method} GeoServer proxy, headers_req={headers_req}, url={url}")
changing_wfs_t_layers = set()
wfs_t_layers = set()
if data is not None and len(data) > 0:
try:
wfs_t_attribs, wfs_t_layers = extract_attributes_and_layers_from_wfs_t(data)
changing_wfs_t_layers = {(workspace, layer) for workspace, layer in wfs_t_layers if authz.can_i_edit(LAYER_TYPE, workspace, layer)}
for workspace, layer in changing_wfs_t_layers:
redis.create_lock(workspace, LAYER_TYPE, layer, 19, 'wfst')
if wfs_t_attribs:
ensure_wfs_t_attributes(wfs_t_attribs)
except BaseException as err:
Expand All @@ -198,8 +195,17 @@ def proxy(subpath):
allow_redirects=False
)

for workspace, layername in changing_wfs_t_layers:
patch_after_wfst(workspace, layername)
for workspace, layername in wfs_t_layers:
if authz.can_i_edit(LAYER_TYPE, workspace, layername):
try:
redis.create_lock(workspace, LAYER_TYPE, layername, 19, common.PUBLICATION_LOCK_CODE_WFST)
patch_after_wfst(workspace, layername)
except LaymanError as exc:
if exc.code == 19 and exc.private_data.get('can_run_later', False):
celery_util.push_step_to_run_after_chain(workspace, LAYER_TYPE, layername,
'layman.util::patch_after_wfst')
else:
raise exc

excluded_headers = ['content-encoding', 'content-length', 'transfer-encoding', 'connection']
headers = {key: value for (key, value) in response.headers.items() if key.lower() not in excluded_headers}
Expand Down
Loading