Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wfst in celery III #361

Merged
merged 6 commits into from
May 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions src/layman/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
from layman.common import redis as redis_util

REDIS_CURRENT_TASK_NAMES = f"{__name__}:CURRENT_TASK_NAMES"
PUBLICATION_CHAIN_INFOS = f'{__name__}:PUBLICATION_TASK_INFOS'
LAST_TASK_ID_IN_CHAIN_TO_PUBLICATION = f'{__name__}:TASK_ID_TO_PUBLICATION'
PUBLICATION_CHAIN_INFOS = f'{__name__}:PUBLICATION_CHAIN_INFOS'
LAST_TASK_ID_IN_CHAIN_TO_PUBLICATION = f'{__name__}:LAST_TASK_ID_IN_CHAIN_TO_PUBLICATION'
RUN_AFTER_CHAIN = f'{__name__}:RUN_AFTER_CHAIN'


Expand All @@ -30,7 +30,7 @@ def task_postrun(workspace, publication_type, publication_name, task_id, task_na
key = LAST_TASK_ID_IN_CHAIN_TO_PUBLICATION
hash = task_id
if rds.hexists(key, hash):
finnish_publication_task(task_id)
finnish_publication_chain(task_id)
next_task = pop_step_to_run_after_chain(workspace, publication_type, publication_name)
if next_task:
module_name, method_name = next_task.split('::')
Expand All @@ -41,7 +41,7 @@ def task_postrun(workspace, publication_type, publication_name, task_id, task_na
chain_info = get_publication_chain_info_dict(workspace, publication_type, publication_name)
if chain_info is not None:
last_task_id = chain_info['last']
finnish_publication_task(last_task_id)
finnish_publication_chain(last_task_id)


def _get_task_hash(task_name, workspace, publication_name):
Expand Down Expand Up @@ -73,17 +73,26 @@ def pop_step_to_run_after_chain(workspace, publication_type, publication_name, )
return result


def get_run_after_chain_queue(workspace, publication_type, publication_name, ):
rds = settings.LAYMAN_REDIS
key = RUN_AFTER_CHAIN
hash = _get_publication_hash(workspace, publication_type, publication_name)
val = rds.hget(key, hash)
queue = json.loads(val) if val is not None else list()
return queue


def clear_steps_to_run_after_chain(workspace, publication_type, publication_name, ):
rds = settings.LAYMAN_REDIS
key = RUN_AFTER_CHAIN
hash = _get_publication_hash(workspace, publication_type, publication_name)
rds.hdel(key, hash)


def finnish_publication_task(task_id):
def finnish_publication_chain(last_task_id_in_chain):
rds = settings.LAYMAN_REDIS
key = LAST_TASK_ID_IN_CHAIN_TO_PUBLICATION
hash = task_id
hash = last_task_id_in_chain
publ_hash = rds.hget(key, hash)
if publ_hash is None:
return
Expand Down Expand Up @@ -183,7 +192,7 @@ def abort_chain(chain_info):
return

abort_task_chain(chain_info['by_order'], chain_info['by_name'])
finnish_publication_task(chain_info['last'].task_id)
finnish_publication_chain(chain_info['last'].task_id)


def abort_publication_chain(workspace, publication_type, publication_name):
Expand Down
4 changes: 2 additions & 2 deletions src/layman/common/output_srs_list_test.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import math
from test import process, process_client, geoserver_client, util
from test import process, process_client, geoserver_client, assert_util
import pytest

from layman import settings, app
Expand Down Expand Up @@ -171,4 +171,4 @@ def test_spatial_precision_wms(ensure_layer, epsg_code, extent, img_size, style_
num_circles = 5
pixel_diff_limit = circle_perimeter * num_circles * diff_line_width

util.assert_same_images(url, obtained_file, expected_file, pixel_diff_limit)
assert_util.assert_same_images(url, obtained_file, expected_file, pixel_diff_limit)
32 changes: 5 additions & 27 deletions src/layman/geoserver_proxy_test.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import time
from test import process_client as client_util, geoserver_client, util as test_util

from test import process_client as client_util, geoserver_client, util as test_util, assert_util
from test.process_client import get_authz_headers
from test.data import wfs as data_wfs, SMALL_LAYER_BBOX
import requests
from owslib.feature.schema import get_schema as get_wfs_schema
import pytest

from geoserver.util import get_layer_thumbnail, get_square_bbox
from layman import app, settings, util as layman_util
from layman.common import bbox as bbox_util
from layman import app, settings
from layman.layer import db, util as layer_util
from layman.layer.filesystem import thumbnail
from layman.layer.geoserver import wfs as geoserver_wfs
Expand Down Expand Up @@ -373,28 +373,6 @@ def do_test(wfs_query, attribute_names):
client_util.delete_workspace_layer(username, layername1, headers=headers1)


def assert_all_sources_bbox(workspace, layer, expected_bbox):
with app.app_context():
bbox = tuple(layman_util.get_publication_info(workspace, client_util.LAYER_TYPE, layer,
context={'key': ['bounding_box']})['bounding_box'])
test_util.assert_same_bboxes(expected_bbox, bbox, 0)
test_util.assert_wfs_bbox(workspace, layer, expected_bbox)
test_util.assert_wms_bbox(workspace, layer, expected_bbox)

with app.app_context():
expected_bbox_4326 = bbox_util.transform(expected_bbox, 3857, 4326, )
md_comparison = client_util.get_workspace_layer_metadata_comparison(workspace, layer)
csw_prefix = settings.CSW_PROXY_URL
csw_src_key = client_util.get_source_key_from_metadata_comparison(md_comparison, csw_prefix)
assert csw_src_key is not None
prop_key = 'extent'
md_props = md_comparison['metadata_properties']
assert md_props[prop_key]['equal'] is True
assert md_props[prop_key]['equal_or_null'] is True
csw_bbox_4326 = tuple(md_props[prop_key]['values'][csw_src_key])
test_util.assert_same_bboxes(expected_bbox_4326, csw_bbox_4326, 0.001)


@pytest.mark.parametrize('style_file, thumbnail_style_postfix', [
(None, '_sld'),
('sample/style/small_layer.qml', '_qml'),
Expand All @@ -406,7 +384,7 @@ def test_wfs_bbox(style_file, thumbnail_style_postfix):

client_util.publish_workspace_layer(workspace, layer, style_file=style_file, )

assert_all_sources_bbox(workspace, layer, SMALL_LAYER_BBOX)
assert_util.assert_all_sources_bbox(workspace, layer, SMALL_LAYER_BBOX)

rest_url = f"http://{settings.LAYMAN_SERVER_NAME}/geoserver/{workspace}/wfs?request=Transaction"
headers = {
Expand All @@ -431,7 +409,7 @@ def test_wfs_bbox(style_file, thumbnail_style_postfix):
# until there is way to check end of asynchronous task after WFS-T
time.sleep(5)

assert_all_sources_bbox(workspace, layer, exp_bbox)
assert_util.assert_all_sources_bbox(workspace, layer, exp_bbox)

expected_thumbnail_path = f'/code/sample/style/{layer}{thumbnail_style_postfix}{thumbnail_bbox_postfix}.png'
with app.app_context():
Expand Down
12 changes: 6 additions & 6 deletions src/layman/layer/geoserver/geoserver_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from test import process_client, util as test_util, data as test_data
from test import process_client, assert_util, data as test_data
import pytest

from layman import app, settings
Expand Down Expand Up @@ -46,8 +46,8 @@ def test_geoserver_bbox():

process_client.publish_workspace_layer(workspace, layer, style_file='sample/style/small_layer.qml')

test_util.assert_wfs_bbox(workspace, layer, expected_bbox_1)
test_util.assert_wms_bbox(workspace, layer, expected_bbox_1)
assert_util.assert_wfs_bbox(workspace, layer, expected_bbox_1)
assert_util.assert_wms_bbox(workspace, layer, expected_bbox_1)

# test WFS
for bbox, expected_bbox in expected_bboxes:
Expand All @@ -61,7 +61,7 @@ def test_geoserver_bbox():
ensure_user=False,
access_rights=None,
)
test_util.assert_wfs_bbox(workspace, layer, expected_bbox)
assert_util.assert_wfs_bbox(workspace, layer, expected_bbox)

# test WMS
for bbox, expected_bbox in expected_bboxes:
Expand All @@ -74,7 +74,7 @@ def test_geoserver_bbox():
ensure_user=False,
access_rights=None,
)
test_util.assert_wms_bbox(workspace, layer, expected_bbox)
assert_util.assert_wms_bbox(workspace, layer, expected_bbox)

# test cascade WMS from QGIS
for bbox, expected_bbox in expected_bboxes:
Expand All @@ -88,6 +88,6 @@ def test_geoserver_bbox():
ensure_user=False,
access_rights=None,
)
test_util.assert_wms_bbox(workspace, layer, expected_bbox)
assert_util.assert_wms_bbox(workspace, layer, expected_bbox)

process_client.delete_workspace_layer(workspace, layer)
4 changes: 2 additions & 2 deletions src/layman/layer/geoserver/sld_test.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from urllib.parse import urljoin
from test import process_client, util
from test import process_client, assert_util
import requests
import pytest

Expand Down Expand Up @@ -53,7 +53,7 @@ def test_sld_style_applied_in_wms():

url = f"http://{settings.LAYMAN_SERVER_NAME}/geoserver/{workspace}_wms/wms?SERVICE=WMS&VERSION=1.1.1&REQUEST=GetMap&FORMAT=image/png&TRANSPARENT=true&STYLES=&LAYERS={workspace}:{layer}&SRS=EPSG:3857&WIDTH=768&HEIGHT=752&BBOX=-30022616.05686392,-30569903.32873383,30022616.05686392,28224386.44929134"

util.assert_same_images(url, obtained_file, expected_file, 2000)
assert_util.assert_same_images(url, obtained_file, expected_file, 2000)

process_client.delete_workspace_layer(workspace,
layer)
6 changes: 3 additions & 3 deletions src/layman/layer/prime_db_schema/bbox_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from test import process_client, util as test_util, data as test_data
from test import process_client, assert_util, data as test_data
import pytest
from layman import app
from .. import util
Expand All @@ -13,12 +13,12 @@ def test_bbox():

with app.app_context():
info = util.get_layer_info(workspace, layer)
test_util.assert_same_bboxes(info['bounding_box'], test_data.SMALL_LAYER_BBOX, 0.00001)
assert_util.assert_same_bboxes(info['bounding_box'], test_data.SMALL_LAYER_BBOX, 0.00001)

process_client.patch_workspace_layer(workspace, layer, file_paths=['test/data/bbox/layer_3_3-5_5.geojson', ])

with app.app_context():
info = util.get_layer_info(workspace, layer)
test_util.assert_same_bboxes(info['bounding_box'], [3000, 3000, 5000, 5000], 0.1)
assert_util.assert_same_bboxes(info['bounding_box'], [3000, 3000, 5000, 5000], 0.1)

process_client.delete_workspace_layer(workspace, layer)
6 changes: 1 addition & 5 deletions src/layman/layer/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,11 +194,7 @@ def patch_layer(workspace, layername, task_options, stop_sync_at, start_async_at


def patch_after_wfst(workspace, layername, **kwargs):
task_methods = tasks_util.get_source_task_methods(get_layer_type_def(), 'patch_after_wfst')
patch_chain = tasks_util.get_chain_of_methods(workspace, layername, task_methods, kwargs, 'layername')
res = patch_chain()

celery_util.set_publication_chain_info(workspace, LAYER_TYPE, layername, task_methods, res)
layman_util.patch_after_wfst(workspace, LAYER_TYPE, layername, **kwargs)


def delete_layer(workspace, layername, source=None, http_method='delete'):
Expand Down
6 changes: 3 additions & 3 deletions src/layman/map/prime_db_schema/bbox_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from test import process_client, util as test_util, data as test_data
from test import process_client, assert_util, data as test_data
import pytest
from layman import app
from .. import util
Expand All @@ -13,12 +13,12 @@ def test_bbox():

with app.app_context():
info = util.get_map_info(workspace, map)
test_util.assert_same_bboxes(info['bounding_box'], test_data.SMALL_MAP_BBOX, 0.00001)
assert_util.assert_same_bboxes(info['bounding_box'], test_data.SMALL_MAP_BBOX, 0.00001)

process_client.patch_workspace_map(workspace, map, file_paths=['test/data/bbox/map_3_3-5_5.json', ])

with app.app_context():
info = util.get_map_info(workspace, map)
test_util.assert_same_bboxes(info['bounding_box'], [3000, 3000, 5000, 5000], 0.1)
assert_util.assert_same_bboxes(info['bounding_box'], [3000, 3000, 5000, 5000], 0.1)

process_client.delete_workspace_map(workspace, map)
33 changes: 30 additions & 3 deletions src/layman/requests_concurrency_test.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from test import process_client
import time

from test import process_client, assert_util
from test.data import wfs as data_wfs
import requests
import pytest

from layman import settings
from layman import celery, settings
from layman.common import empty_method_returns_true


Expand All @@ -19,23 +21,48 @@ def test_wfst_concurrency():
'Content-type': 'text/xml',
}

process_client.publish_workspace_layer(workspace, layer,)
process_client.publish_workspace_layer(workspace, layer, )

queue = celery.get_run_after_chain_queue(workspace, process_client.LAYER_TYPE, layer)
assert not queue

r = requests.post(rest_url,
data=data_xml,
headers=headers)
assert r.status_code == 200, r.text

queue = celery.get_run_after_chain_queue(workspace, process_client.LAYER_TYPE, layer)
assert len(queue) == 0, queue

process_client.patch_workspace_layer(workspace, layer, title='New title', check_response_fn=empty_method_returns_true)
queue = celery.get_run_after_chain_queue(workspace, process_client.LAYER_TYPE, layer)
assert len(queue) == 1, queue
assert queue == ['layman.util::patch_after_wfst', ]

r = requests.post(rest_url,
data=data_xml,
headers=headers)
assert r.status_code == 200, r.text

queue = celery.get_run_after_chain_queue(workspace, process_client.LAYER_TYPE, layer)
assert len(queue) == 1, queue
assert queue == ['layman.util::patch_after_wfst', ]

r = requests.post(rest_url,
data=data_xml,
headers=headers)
assert r.status_code == 200, r.text

queue = celery.get_run_after_chain_queue(workspace, process_client.LAYER_TYPE, layer)
assert len(queue) == 1, queue
assert queue == ['layman.util::patch_after_wfst', ]

time.sleep(3)

expected_bbox = (1571000.0, 6268800.0, 1572590.8542062, 6269876.33561699)
assert_util.assert_all_sources_bbox(workspace, layer, expected_bbox)

process_client.delete_workspace_layer(workspace, layer, )

queue = celery.get_run_after_chain_queue(workspace, process_client.LAYER_TYPE, layer)
assert not queue, queue
6 changes: 3 additions & 3 deletions src/layman/upgrade/upgrade_v1_10_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import shutil
import os
from collections import namedtuple
from test import process_client, util
from test import process_client, util, assert_util
from test.util import url_for
import pytest

Expand Down Expand Up @@ -142,7 +142,7 @@ def test_migrate_layers_to_wms_workspace(ensure_layer):
f"BBOX=-30022616.05686392,-30569903.32873383,30022616.05686392,28224386.44929134"

obtained_file = 'tmp/artifacts/test_migrate_layers_to_wms_workspace_before_migration.png'
util.assert_same_images(old_wms_url, obtained_file, expected_file, 2000)
assert_util.assert_same_images(old_wms_url, obtained_file, expected_file, 2000)

with app.app_context():
upgrade_v1_10.migrate_layers_to_wms_workspace(workspace)
Expand Down Expand Up @@ -170,7 +170,7 @@ def test_migrate_layers_to_wms_workspace(ensure_layer):
f"LAYERS={wms_workspace}:{layer}&SRS=EPSG:3857&WIDTH=768&HEIGHT=752&" \
f"BBOX=-30022616.05686392,-30569903.32873383,30022616.05686392,28224386.44929134"
obtained_file2 = 'tmp/artifacts/test_migrate_layers_to_wms_workspace_after_migration.png'
util.assert_same_images(new_wms_url, obtained_file2, expected_file, 2000)
assert_util.assert_same_images(new_wms_url, obtained_file2, expected_file, 2000)

process_client.delete_workspace_layer(workspace, layer)

Expand Down
4 changes: 2 additions & 2 deletions src/layman/upgrade/upgrade_v1_12_test.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import datetime
from test import process_client, util as test_util, data as test_data
from test import process_client, assert_util, data as test_data
import pytest

from db import util as db_util
Expand Down Expand Up @@ -188,6 +188,6 @@ def test_adjust_prime_db_schema_for_bbox_search():
results = db_util.run_query(query, (workspace, publication_type, publication))
assert len(results) == 1 and len(results[0]) == 4, results
bbox = results[0]
test_util.assert_same_bboxes(bbox, expected_bbox, 0.000001)
assert_util.assert_same_bboxes(bbox, expected_bbox, 0.000001)

process_client.delete_workspace_layer(workspace, layer)
12 changes: 7 additions & 5 deletions src/layman/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
from flask import current_app, request, url_for as flask_url_for, jsonify
from unidecode import unidecode

from layman import settings
from layman import settings, celery as celery_util
from layman.common import tasks as tasks_util
from layman.http import LaymanError

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -389,7 +390,8 @@ def delete_publications(user,
return jsonify(infos)


def patch_after_wfst(workspace, publication_type, layername, ):
from layman.layer import LAYER_TYPE, util as layer_util
if publication_type == LAYER_TYPE:
layer_util.patch_after_wfst(workspace, layername)
def patch_after_wfst(workspace, publication_type, publication, **kwargs):
task_methods = tasks_util.get_source_task_methods(get_publication_types()[publication_type], 'patch_after_wfst')
patch_chain = tasks_util.get_chain_of_methods(workspace, publication, task_methods, kwargs, 'layername')
res = patch_chain()
celery_util.set_publication_chain_info(workspace, publication_type, publication, task_methods, res)
Loading