From c45cff9d5762a56bf2e7203f10ed144afbcb985e Mon Sep 17 00:00:00 2001 From: index-git Date: Fri, 6 Nov 2020 13:23:35 +0100 Subject: [PATCH] Access rights are checked before storing --- .../common/prime_db_schema/publications.py | 75 ++++++++++ .../prime_db_schema/publications_test.py | 131 ++++++++++++++++++ src/layman/error_list.py | 1 + src/layman/gs_wfs_proxy_test.py | 5 - 4 files changed, 207 insertions(+), 5 deletions(-) diff --git a/src/layman/common/prime_db_schema/publications.py b/src/layman/common/prime_db_schema/publications.py index f60fd0399..7807cda61 100644 --- a/src/layman/common/prime_db_schema/publications.py +++ b/src/layman/common/prime_db_schema/publications.py @@ -64,6 +64,79 @@ def get_publication_infos(workspace_name=None, pub_type=None): return infos +def only_valid_names(users_list): + usernames_for_chesk = users_list.copy() + usernames_for_chesk.discard(ROLE_EVERYONE) + for username in usernames_for_chesk: + info = users.get_user_infos(username) + if not info: + raise LaymanError(43, {f'Not existing username. Username={username}'}) + + +def at_least_one_can_write(can_write): + if not can_write: + raise LaymanError(43, {f'At least one user have to have write rights.'}) + + +def who_can_write_can_read(can_read, can_write): + if ROLE_EVERYONE not in can_read and set(can_write).difference(can_read): + raise LaymanError(43, {f'All users who have write rights have to have also read rights. Who is missing={set(can_write).difference(can_read)}'}) + + +def i_can_still_write(actor_name, can_write): + if ROLE_EVERYONE not in can_write and actor_name not in can_write: + raise LaymanError(43, {f'After the operation, the actor has to have write right.'}) + + +def owner_can_still_write(owner, + can_write, + ): + if owner and ROLE_EVERYONE not in can_write and owner not in can_write: + raise LaymanError(43, {f'Owner of the personal workspace have to keep write right.'}) + + +def check_rights_axioms(can_read, + can_write, + actor_name, + owner, + can_read_old=None, + can_write_old=None): + if can_read: + only_valid_names(can_read) + if can_write: + only_valid_names(can_write) + owner_can_still_write(owner, can_write) + at_least_one_can_write(can_write) + i_can_still_write(actor_name, can_write) + if can_read or can_write: + can_read_check = can_read or can_read_old + can_write_check = can_write or can_write_old + who_can_write_can_read(can_read_check, can_write_check) + + +def check_publication_info(workspace_name, info): + owner_info = users.get_user_infos(workspace_name).get(workspace_name) + info["owner"] = owner_info and owner_info["username"] + try: + check_rights_axioms(info['access_rights'].get('read'), + info['access_rights'].get('write'), + info["actor_name"], + info["owner"], + info['access_rights'].get('read_old'), + info['access_rights'].get('write_old'), + ) + except LaymanError as exc_info: + raise LaymanError(43, {'workspace_name': workspace_name, + 'publication_name': info.get("name"), + 'access_rights': { + 'read': info['access_rights'].get('read'), + 'write': info['access_rights'].get('write'), }, + 'actor_name': info.get("actor_name"), + 'owner': info["owner"], + 'message': exc_info.data, + }) + + def clear_roles(users_list, workspace_name): result_set = set(users_list) result_set.discard(ROLE_EVERYONE) @@ -75,6 +148,7 @@ def clear_roles(users_list, workspace_name): def insert_publication(workspace_name, info): id_workspace = workspaces.ensure_workspace(workspace_name) + check_publication_info(workspace_name, info) insert_publications_sql = f'''insert into {DB_SCHEMA}.publications as p (id_workspace, name, title, type, uuid, everyone_can_read, everyone_can_write) values @@ -122,6 +196,7 @@ def update_publication(workspace_name, info): for right_type in right_type_list: access_rights_det[right_type]['username_list_old'] = info_old["access_rights"][right_type] info["access_rights"][right_type + "_old"] = access_rights_det[right_type]['username_list_old'] + check_publication_info(workspace_name, info) for right_type in right_type_list: if info['access_rights'].get(right_type): diff --git a/src/layman/common/prime_db_schema/publications_test.py b/src/layman/common/prime_db_schema/publications_test.py index 22237ceec..252ae2b2e 100644 --- a/src/layman/common/prime_db_schema/publications_test.py +++ b/src/layman/common/prime_db_schema/publications_test.py @@ -115,6 +115,137 @@ def test_select_publications(): assert len(pubs) == 0, pubs +def test_only_valid_names(): + workspace_name = 'test_only_valid_names_workspace' + username = 'test_only_valid_names_user' + + with app.app_context(): + workspaces.ensure_workspace(workspace_name) + id_workspace_user = workspaces.ensure_workspace(username) + users.ensure_user(id_workspace_user, userinfo) + + publications.only_valid_names(set()) + publications.only_valid_names({username, }) + publications.only_valid_names({settings.RIGHTS_EVERYONE_ROLE, }) + publications.only_valid_names({settings.RIGHTS_EVERYONE_ROLE, username, }) + publications.only_valid_names({username, settings.RIGHTS_EVERYONE_ROLE, }) + + with pytest.raises(LaymanError) as exc_info: + publications.only_valid_names({username, workspace_name}) + assert exc_info.value.code == 43 + + with pytest.raises(LaymanError) as exc_info: + publications.only_valid_names({workspace_name, username}) + assert exc_info.value.code == 43 + + with pytest.raises(LaymanError) as exc_info: + publications.only_valid_names({workspace_name, settings.RIGHTS_EVERYONE_ROLE, }) + assert exc_info.value.code == 43 + + with pytest.raises(LaymanError) as exc_info: + publications.only_valid_names({settings.RIGHTS_EVERYONE_ROLE, 'skaljgdalskfglshfgd', }) + assert exc_info.value.code == 43 + + +def test_at_least_one_can_write(): + workspace_name = 'test_at_least_one_can_write_workspace' + username = 'test_at_least_one_can_write_user' + + publications.at_least_one_can_write({username, }) + publications.at_least_one_can_write({settings.RIGHTS_EVERYONE_ROLE, }) + publications.at_least_one_can_write({username, settings.RIGHTS_EVERYONE_ROLE, }) + publications.at_least_one_can_write({workspace_name, }) + publications.at_least_one_can_write({'lusfjdiaurghalskug', }) + + with pytest.raises(LaymanError) as exc_info: + publications.at_least_one_can_write(set()) + assert exc_info.value.code == 43 + + +def test_who_can_write_can_read(): + workspace_name = 'test_who_can_write_can_read_workspace' + username = 'test_who_can_write_can_read_user' + + publications.who_can_write_can_read(set(), set()) + publications.who_can_write_can_read({username, }, {username, }) + publications.who_can_write_can_read({username, workspace_name}, {username, }) + publications.who_can_write_can_read({username, settings.RIGHTS_EVERYONE_ROLE}, {username, }) + publications.who_can_write_can_read({username, settings.RIGHTS_EVERYONE_ROLE}, {username, settings.RIGHTS_EVERYONE_ROLE, }) + publications.who_can_write_can_read({settings.RIGHTS_EVERYONE_ROLE, }, {settings.RIGHTS_EVERYONE_ROLE, }) + publications.who_can_write_can_read({settings.RIGHTS_EVERYONE_ROLE, }, {settings.RIGHTS_EVERYONE_ROLE, username, }) + publications.who_can_write_can_read({settings.RIGHTS_EVERYONE_ROLE, }, {settings.RIGHTS_EVERYONE_ROLE, workspace_name, }) + publications.who_can_write_can_read({settings.RIGHTS_EVERYONE_ROLE, username, }, {settings.RIGHTS_EVERYONE_ROLE, }) + publications.who_can_write_can_read({settings.RIGHTS_EVERYONE_ROLE, username, }, set()) + publications.who_can_write_can_read({workspace_name, }, {workspace_name, }) + + with pytest.raises(LaymanError) as exc_info: + publications.who_can_write_can_read(set(), {workspace_name, }) + assert exc_info.value.code == 43 + + with pytest.raises(LaymanError) as exc_info: + publications.who_can_write_can_read(set(), {username, }) + assert exc_info.value.code == 43 + + with pytest.raises(LaymanError) as exc_info: + publications.who_can_write_can_read(set(), {settings.RIGHTS_EVERYONE_ROLE, }) + assert exc_info.value.code == 43 + + with pytest.raises(LaymanError) as exc_info: + publications.who_can_write_can_read(username, {settings.RIGHTS_EVERYONE_ROLE, }) + assert exc_info.value.code == 43 + + with pytest.raises(LaymanError) as exc_info: + publications.who_can_write_can_read(username, {workspace_name, }) + assert exc_info.value.code == 43 + + +def test_i_can_still_write(): + workspace_name = 'test_i_can_still_write_workspace' + username = 'test_who_can_write_can_read_user' + + publications.i_can_still_write(None, {settings.RIGHTS_EVERYONE_ROLE, }) + publications.i_can_still_write(None, {username, settings.RIGHTS_EVERYONE_ROLE, }) + publications.i_can_still_write(username, {settings.RIGHTS_EVERYONE_ROLE, }) + publications.i_can_still_write(username, {workspace_name, settings.RIGHTS_EVERYONE_ROLE, }) + publications.i_can_still_write(username, {workspace_name, username, }) + + with pytest.raises(LaymanError) as exc_info: + publications.i_can_still_write(None, set()) + assert exc_info.value.code == 43 + + with pytest.raises(LaymanError) as exc_info: + publications.i_can_still_write(None, {workspace_name, }) + assert exc_info.value.code == 43 + + with pytest.raises(LaymanError) as exc_info: + publications.i_can_still_write(username, set()) + assert exc_info.value.code == 43 + + with pytest.raises(LaymanError) as exc_info: + publications.i_can_still_write(username, {workspace_name, }) + assert exc_info.value.code == 43 + + +def test_owner_can_still_write(): + workspace_name = 'test_owner_can_still_write_workspace' + username = 'test_owner_can_still_write_user' + + publications.owner_can_still_write(None, set()) + publications.owner_can_still_write(None, {settings.RIGHTS_EVERYONE_ROLE, }) + publications.owner_can_still_write(None, {username, }) + publications.owner_can_still_write(username, {settings.RIGHTS_EVERYONE_ROLE, }) + publications.owner_can_still_write(username, {username, }) + publications.owner_can_still_write(username, {username, workspace_name, }) + + with pytest.raises(LaymanError) as exc_info: + publications.owner_can_still_write(username, set()) + assert exc_info.value.code == 43 + + with pytest.raises(LaymanError) as exc_info: + publications.owner_can_still_write(username, {workspace_name, }) + assert exc_info.value.code == 43 + + def test_clear_roles(): workspace_name = 'test_clear_roles_workspace' username = 'test_clear_roles_user' diff --git a/src/layman/error_list.py b/src/layman/error_list.py index 594509c3a..4f6260f31 100644 --- a/src/layman/error_list.py +++ b/src/layman/error_list.py @@ -43,4 +43,5 @@ 40: (404, 'Username does not exist.'), 41: (409, 'Username is in conflict with LAYMAN_GS_USER. To resolve this conflict, you can create new GeoServer user with another name to become new LAYMAN_GS_USER, give him LAYMAN_GS_ROLE and ADMIN roles, remove the old LAYMAN_GS_USER user at GeoServer, change environment settings LAYMAN_GS_USER and LAYMAN_GS_PASSWORD, and restart Layman'), 42: (409, 'LAYMAN_PRIME_SCHEMA is in conflict with existing username.'), + 43: (400, 'Wrong access rights.') } diff --git a/src/layman/gs_wfs_proxy_test.py b/src/layman/gs_wfs_proxy_test.py index 1f815dbad..d8ac18eba 100644 --- a/src/layman/gs_wfs_proxy_test.py +++ b/src/layman/gs_wfs_proxy_test.py @@ -1,11 +1,6 @@ import requests from urllib.parse import urljoin -import sys - - -del sys.modules['layman'] - from layman import app from layman import settings from layman.layer import db