From 5d507ee62337d209153120cfd913e1e1039414a6 Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Wed, 24 Mar 2021 23:58:31 +0100 Subject: [PATCH] =?UTF-8?q?Fixes=20#2041:=20adds=20access-rights=20layer?= =?UTF-8?q?=20to=20storage=20(=E2=9A=A0=EF=B8=8F=20deploy=20&=20upgrade=20?= =?UTF-8?q?patch)=20(#2223)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit FIXES: Study collaborators have no access to the Study files #2041 FIXES: Older study overwrites more recent one ITISFoundation/osparc-issues#435 (when study is shared, the non-owner had no access to stored data, downloaded "nothing" and would upload "nothing", overriding real stored data) FIXES: simcore-sdk multiline error messages * Adding access layer to storage * Added doc Fixed linter import errors * Adds access layer on all dsm operations * Agreggate access rights including ONLY user's groups * Adds debug messages to trace access rights * Fixes and doc * Adapted tests * Fixing copy_folders_from_project handler - refactor mapping of location_ids * Added access layer to folder copy * Sorted members in dsm class * Refactor upload/download so it can be mocked for tests * Mocks datcore downloads * @GitHK review: on style * @GitHK review: download is streamed and saved in chuncks * Fixes sidecar tests: fixture injects in database user and owned project * Fixing simcore-sdk tests: adding user and project in db and establising ownership * Fixes minor typo in webserver API export operation * Fixds handling unformatted file-ids * Adapted simcore-sdk test: responses are more consistent * Started test-access-layer * Fixes simcore-sdk * fixed failing test in webserver-export (#33) * Minor fix prjowner is nullable * Fixes projects cloning * Fixes tests_access_to_studies by extending storage mock Co-authored-by: @sanderegg @odeimaiz @GitHK --- api/specs/webserver/openapi.yaml | 2 +- packages/simcore-sdk/requirements/_test.in | 1 + packages/simcore-sdk/requirements/_test.txt | 5 + .../src/simcore_sdk/node_ports/exceptions.py | 4 +- .../simcore-sdk/tests/integration/conftest.py | 58 ++- .../tests/integration/test_filemanager.py | 34 +- .../tests/integration/test_nodeports2.py | 11 +- services/sidecar/requirements/_test.in | 1 + services/sidecar/requirements/_test.txt | 7 +- .../sidecar/tests/integration/test_sidecar.py | 45 +- services/storage/requirements/_test.in | 1 + services/storage/requirements/_test.txt | 5 + .../simcore_service_storage/access_layer.py | 287 ++++++++++ .../datcore_wrapper.py | 23 +- .../src/simcore_service_storage/db_tokens.py | 15 +- .../src/simcore_service_storage/dsm.py | 488 +++++++++++------- .../src/simcore_service_storage/handlers.py | 330 ++++++------ .../src/simcore_service_storage/models.py | 42 +- .../simcore_service_storage/rest_routes.py | 1 + .../src/simcore_service_storage/utils.py | 40 +- services/storage/tests/conftest.py | 76 +-- services/storage/tests/docker-compose.yml | 9 + services/storage/tests/test_access_layer.py | 83 +++ services/storage/tests/test_dsm.py | 22 +- services/storage/tests/test_rest.py | 47 +- services/storage/tests/test_utils.py | 18 + services/storage/tests/utils.py | 120 ++--- .../api/v0/openapi.yaml | 2 +- .../exporter/formatters/formatter_v1.py | 67 ++- .../projects/projects_api.py | 52 +- .../projects/projects_db.py | 37 +- .../projects/projects_handlers.py | 31 +- .../simcore_service_webserver/storage_api.py | 6 +- .../studies_access.py | 23 +- .../server/tests/integration/test_exporter.py 
| 2 +- .../integration/test_project_workflow.py | 3 +- .../server/tests/unit/with_dbs/conftest.py | 2 +- .../with_dbs/fast/test_access_to_studies.py | 25 +- .../tests/unit/with_dbs/slow/test_projects.py | 3 - 39 files changed, 1363 insertions(+), 665 deletions(-) create mode 100644 services/storage/src/simcore_service_storage/access_layer.py create mode 100644 services/storage/tests/test_access_layer.py create mode 100644 services/storage/tests/test_utils.py diff --git a/api/specs/webserver/openapi.yaml b/api/specs/webserver/openapi.yaml index 2e886294c73..acc8271acd8 100644 --- a/api/specs/webserver/openapi.yaml +++ b/api/specs/webserver/openapi.yaml @@ -168,7 +168,7 @@ paths: /projects/{project_id}/state: $ref: "./openapi-projects.yaml#/paths/~1projects~1{project_id}~1state" - /projects/{project_id}:xport: + /projects/{project_id}:export: $ref: "./openapi-projects.yaml#/paths/~1projects~1{project_id}~1xport" /projects/{project_id}:duplicate: diff --git a/packages/simcore-sdk/requirements/_test.in b/packages/simcore-sdk/requirements/_test.in index 3f06bfbcc25..e13c8cd7fcf 100644 --- a/packages/simcore-sdk/requirements/_test.in +++ b/packages/simcore-sdk/requirements/_test.in @@ -25,6 +25,7 @@ aioresponses requests docker python-dotenv +faker # tools for CI pylint diff --git a/packages/simcore-sdk/requirements/_test.txt b/packages/simcore-sdk/requirements/_test.txt index 76767b6705c..e6bce6db2d9 100644 --- a/packages/simcore-sdk/requirements/_test.txt +++ b/packages/simcore-sdk/requirements/_test.txt @@ -48,6 +48,8 @@ docopt==0.6.2 # via coveralls execnet==1.8.0 # via pytest-xdist +faker==6.6.2 + # via -r requirements/_test.in icdiff==1.9.1 # via pytest-icdiff idna-ssl==1.1.0 @@ -140,6 +142,7 @@ python-dateutil==2.8.1 # via # -c requirements/_base.txt # alembic + # faker python-dotenv==0.15.0 # via -r requirements/_test.in python-editor==1.0.4 @@ -161,6 +164,8 @@ sqlalchemy[postgresql_psycopg2binary]==1.3.23 # alembic termcolor==1.1.0 # via pytest-sugar +text-unidecode==1.3 + # via faker toml==0.10.2 # via # pylint diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports/exceptions.py b/packages/simcore-sdk/src/simcore_sdk/node_ports/exceptions.py index d0bbeb7e83e..6564f26d3ae 100644 --- a/packages/simcore-sdk/src/simcore_sdk/node_ports/exceptions.py +++ b/packages/simcore-sdk/src/simcore_sdk/node_ports/exceptions.py @@ -60,11 +60,11 @@ def __init__(self, dct, msg: Optional[str] = None): class StorageInvalidCall(NodeportsException): - """S3 transfer error""" + """Storage returned an error 400<=status<500""" class StorageServerIssue(NodeportsException): - """S3 transfer error""" + """Storage returned an error status>=500""" class S3TransferError(NodeportsException): diff --git a/packages/simcore-sdk/tests/integration/conftest.py b/packages/simcore-sdk/tests/integration/conftest.py index 5eaf5f32946..3360c0aee8c 100644 --- a/packages/simcore-sdk/tests/integration/conftest.py +++ b/packages/simcore-sdk/tests/integration/conftest.py @@ -5,30 +5,60 @@ import asyncio import json -import sys import uuid from pathlib import Path -from typing import Any, Callable, Dict, List, Tuple +from typing import Any, Callable, Dict, Iterable, List, Tuple import np_helpers import pytest import sqlalchemy as sa +from pytest_simcore.helpers.rawdata_fakers import random_project, random_user +from simcore_postgres_database.storage_models import projects, users from simcore_sdk.models.pipeline_models import ComputationalPipeline, ComputationalTask from simcore_sdk.node_ports import node_config from yarl import 
URL -current_dir = Path(sys.argv[0] if __name__ == "__main__" else __file__).resolve().parent - @pytest.fixture -def user_id() -> int: - # see fixtures/postgres.py - yield 1258 +def user_id(postgres_engine: sa.engine.Engine) -> Iterable[int]: + # inject user in db + + # NOTE: Ideally this (and next fixture) should be done via webserver API but at this point + # in time, the webserver service would bring more dependencies to other services + # which would turn this test too complex. + + # pylint: disable=no-value-for-parameter + stmt = users.insert().values(**random_user(name="test")).returning(users.c.id) + print(str(stmt)) + with postgres_engine.connect() as conn: + result = conn.execute(stmt) + [usr_id] = result.fetchone() + + yield usr_id + + with postgres_engine.connect() as conn: + conn.execute(users.delete().where(users.c.id == usr_id)) @pytest.fixture -def project_id() -> str: - return str(uuid.uuid4()) +def project_id(user_id: int, postgres_engine: sa.engine.Engine) -> Iterable[str]: + # inject project for user in db. This will give user_id, the full project's ownership + + # pylint: disable=no-value-for-parameter + stmt = ( + projects.insert() + .values(**random_project(prj_owner=user_id)) + .returning(projects.c.uuid) + ) + print(str(stmt)) + with postgres_engine.connect() as conn: + result = conn.execute(stmt) + [prj_uuid] = result.fetchone() + + yield prj_uuid + + with postgres_engine.connect() as conn: + conn.execute(projects.delete().where(projects.c.uuid == prj_uuid)) @pytest.fixture @@ -57,7 +87,7 @@ async def filemanager_cfg( @pytest.fixture -def file_uuid(project_id: str, node_uuid: str) -> Callable: +def create_valid_file_uuid(project_id: str, node_uuid: str) -> Callable: def create(file_path: Path, project: str = None, node: str = None): if project is None: project = project_id @@ -65,7 +95,7 @@ def create(file_path: Path, project: str = None, node: str = None): node = node_uuid return np_helpers.file_uuid(file_path, project, node) - yield create + return create @pytest.fixture() @@ -100,13 +130,15 @@ def create_node_link(key: str) -> Dict[str, str]: @pytest.fixture() -def store_link(minio_service, bucket, file_uuid, s3_simcore_location) -> Callable: +def store_link( + minio_service, bucket, create_valid_file_uuid, s3_simcore_location +) -> Callable: def create_store_link( file_path: Path, project_id: str = None, node_id: str = None ) -> Dict[str, str]: # upload the file to S3 assert Path(file_path).exists() - file_id = file_uuid(file_path, project_id, node_id) + file_id = create_valid_file_uuid(file_path, project_id, node_id) # using the s3 client the path must be adapted # TODO: use the storage sdk instead s3_object = Path(project_id, node_id, Path(file_path).name).as_posix() diff --git a/packages/simcore-sdk/tests/integration/test_filemanager.py b/packages/simcore-sdk/tests/integration/test_filemanager.py index b618ebdac91..c2420e357a9 100644 --- a/packages/simcore-sdk/tests/integration/test_filemanager.py +++ b/packages/simcore-sdk/tests/integration/test_filemanager.py @@ -5,7 +5,10 @@ import filecmp from pathlib import Path +from typing import Callable +from uuid import uuid4 +import np_helpers import pytest from simcore_sdk.node_ports import exceptions, filemanager @@ -19,14 +22,14 @@ async def test_valid_upload_download( bucket: str, filemanager_cfg: None, user_id: str, - file_uuid: str, + create_valid_file_uuid: Callable, s3_simcore_location: str, ): file_path = Path(tmpdir) / "test.test" file_path.write_text("I am a test file") assert file_path.exists() - 
file_id = file_uuid(file_path) + file_id = create_valid_file_uuid(file_path) store_id, e_tag = await filemanager.upload_file( store_id=s3_simcore_location, s3_object=file_id, local_file_path=file_path ) @@ -47,14 +50,14 @@ async def test_invalid_file_path( bucket: str, filemanager_cfg: None, user_id: str, - file_uuid: str, + create_valid_file_uuid: Callable, s3_simcore_location: str, ): file_path = Path(tmpdir) / "test.test" file_path.write_text("I am a test file") assert file_path.exists() - file_id = file_uuid(file_path) + file_id = create_valid_file_uuid(file_path) store = s3_simcore_location with pytest.raises(FileNotFoundError): await filemanager.upload_file( @@ -70,11 +73,12 @@ async def test_invalid_file_path( ) -async def test_invalid_fileid( +async def test_errors_upon_invalid_file_identifiers( tmpdir: Path, bucket: str, filemanager_cfg: None, user_id: str, + project_id: str, s3_simcore_location: str, ): file_path = Path(tmpdir) / "test.test" @@ -86,7 +90,8 @@ async def test_invalid_fileid( await filemanager.upload_file( store_id=store, s3_object="", local_file_path=file_path ) - with pytest.raises(exceptions.StorageServerIssue): + + with pytest.raises(exceptions.StorageInvalidCall): await filemanager.upload_file( store_id=store, s3_object="file_id", local_file_path=file_path ) @@ -96,9 +101,12 @@ async def test_invalid_fileid( await filemanager.download_file_from_s3( store_id=store, s3_object="", local_folder=download_folder ) + with pytest.raises(exceptions.InvalidDownloadLinkError): await filemanager.download_file_from_s3( - store_id=store, s3_object="file_id", local_folder=download_folder + store_id=store, + s3_object=np_helpers.file_uuid("invisible.txt", project_id, uuid4()), + local_folder=download_folder, ) @@ -107,14 +115,14 @@ async def test_invalid_store( bucket: str, filemanager_cfg: None, user_id: str, - file_uuid: str, + create_valid_file_uuid: Callable, s3_simcore_location: str, ): file_path = Path(tmpdir) / "test.test" file_path.write_text("I am a test file") assert file_path.exists() - file_id = file_uuid(file_path) + file_id = create_valid_file_uuid(file_path) store = "somefunkystore" with pytest.raises(exceptions.S3InvalidStore): await filemanager.upload_file( @@ -133,14 +141,14 @@ async def test_valid_metadata( bucket: str, filemanager_cfg: None, user_id: str, - file_uuid: str, + create_valid_file_uuid: Callable, s3_simcore_location: str, ): file_path = Path(tmpdir) / "test.test" file_path.write_text("I am a test file") assert file_path.exists() - file_id = file_uuid(file_path) + file_id = create_valid_file_uuid(file_path) store_id, e_tag = await filemanager.upload_file( store_id=s3_simcore_location, s3_object=file_id, local_file_path=file_path ) @@ -167,11 +175,11 @@ async def test_invalid_metadata( bucket: str, filemanager_cfg: None, user_id: str, - file_uuid: str, + create_valid_file_uuid: Callable, s3_simcore_location: str, ): file_path = Path(tmpdir) / "test.test" - file_id = file_uuid(file_path) + file_id = create_valid_file_uuid(file_path) assert file_path.exists() is False with pytest.raises(exceptions.NodeportsException) as exc_info: diff --git a/packages/simcore-sdk/tests/integration/test_nodeports2.py b/packages/simcore-sdk/tests/integration/test_nodeports2.py index c343aee9e93..9a27b8a6380 100644 --- a/packages/simcore-sdk/tests/integration/test_nodeports2.py +++ b/packages/simcore-sdk/tests/integration/test_nodeports2.py @@ -174,12 +174,21 @@ async def test_port_file_accessors( item_value: str, item_pytype: Type, config_value: Dict[str, str], + 
project_id, + node_uuid, e_tag: str, ): # pylint: disable=W0613, W0621 - config_dict, project_id, node_uuid = special_configuration( + + config_value["path"] = f"{project_id}/{node_uuid}/{Path(config_value['path']).name}" + + config_dict, _project_id, _node_uuid = special_configuration( inputs=[("in_1", item_type, config_value)], outputs=[("out_34", item_type, None)], ) + + assert _project_id == project_id + assert _node_uuid == node_uuid + PORTS = await node_ports_v2.ports() await check_config_valid(PORTS, config_dict) assert await (await PORTS.outputs)["out_34"].get() is None # check emptyness diff --git a/services/sidecar/requirements/_test.in b/services/sidecar/requirements/_test.in index 65065a30521..16316a981f0 100644 --- a/services/sidecar/requirements/_test.in +++ b/services/sidecar/requirements/_test.in @@ -25,6 +25,7 @@ pytest-sugar aiopg docker python-dotenv +faker # tools for CI pylint diff --git a/services/sidecar/requirements/_test.txt b/services/sidecar/requirements/_test.txt index 8a25a455f6e..b5d01dfe782 100644 --- a/services/sidecar/requirements/_test.txt +++ b/services/sidecar/requirements/_test.txt @@ -9,7 +9,7 @@ aiohttp==3.7.3 # -c requirements/_base.txt # -c requirements/_packages.txt # pytest-aiohttp -aiopg[sa]==1.0.0 +aiopg==1.0.0 # via # -c requirements/_base.txt # -c requirements/_packages.txt @@ -50,6 +50,8 @@ docker==4.4.4 # via -r requirements/_test.in docopt==0.6.2 # via coveralls +faker==6.6.2 + # via -r requirements/_test.in icdiff==1.9.1 # via pytest-icdiff idna-ssl==1.1.0 @@ -144,6 +146,7 @@ python-dateutil==2.8.1 # via # -c requirements/_packages.txt # alembic + # faker python-dotenv==0.15.0 # via -r requirements/_test.in python-editor==1.0.4 @@ -169,6 +172,8 @@ sqlalchemy[postgresql_psycopg2binary]==1.3.19 # alembic termcolor==1.1.0 # via pytest-sugar +text-unidecode==1.3 + # via faker toml==0.10.2 # via # pylint diff --git a/services/sidecar/tests/integration/test_sidecar.py b/services/sidecar/tests/integration/test_sidecar.py index bccf21ab2f2..bf21d5af8d0 100644 --- a/services/sidecar/tests/integration/test_sidecar.py +++ b/services/sidecar/tests/integration/test_sidecar.py @@ -8,14 +8,15 @@ from collections import deque from pathlib import Path from pprint import pformat -from typing import Any, Dict, List, Tuple -from uuid import uuid4 +from typing import Any, Dict, Iterable, List, Tuple import aio_pika import pytest import sqlalchemy as sa from models_library.settings.celery import CeleryConfig from models_library.settings.rabbit import RabbitConfig +from pytest_simcore.helpers.rawdata_fakers import random_project, random_user +from simcore_postgres_database.storage_models import projects, users from simcore_sdk.models.pipeline_models import ComputationalPipeline, ComputationalTask from simcore_service_sidecar import config, utils from yarl import URL @@ -35,13 +36,45 @@ @pytest.fixture -def project_id() -> str: - return str(uuid4()) +def user_id(postgres_engine: sa.engine.Engine) -> Iterable[int]: + # inject user in db + + # NOTE: Ideally this (and next fixture) should be done via webserver API but at this point + # in time, the webserver service would bring more dependencies to other services + # which would turn this test too complex. 
+ + # pylint: disable=no-value-for-parameter + stmt = users.insert().values(**random_user(name="test")).returning(users.c.id) + print(str(stmt)) + with postgres_engine.connect() as conn: + result = conn.execute(stmt) + [usr_id] = result.fetchone() + + yield usr_id + + with postgres_engine.connect() as conn: + conn.execute(users.delete().where(users.c.id == usr_id)) @pytest.fixture -def user_id() -> int: - return 1 +def project_id(user_id: int, postgres_engine: sa.engine.Engine) -> Iterable[str]: + # inject project for user in db. This will give user_id, the full project's ownership + + # pylint: disable=no-value-for-parameter + stmt = ( + projects.insert() + .values(**random_project(prj_owner=user_id)) + .returning(projects.c.uuid) + ) + print(str(stmt)) + with postgres_engine.connect() as conn: + result = conn.execute(stmt) + [prj_uuid] = result.fetchone() + + yield prj_uuid + + with postgres_engine.connect() as conn: + conn.execute(projects.delete().where(projects.c.uuid == prj_uuid)) @pytest.fixture diff --git a/services/storage/requirements/_test.in b/services/storage/requirements/_test.in index a2743cf861d..882b65c3ef4 100644 --- a/services/storage/requirements/_test.in +++ b/services/storage/requirements/_test.in @@ -22,6 +22,7 @@ pylint coverage coveralls codecov +faker # remote debugging ptvsd diff --git a/services/storage/requirements/_test.txt b/services/storage/requirements/_test.txt index 9e1375e08af..a1148416562 100644 --- a/services/storage/requirements/_test.txt +++ b/services/storage/requirements/_test.txt @@ -68,6 +68,8 @@ docopt==0.6.2 # -c requirements/_base.txt # coveralls # docker-compose +faker==6.6.2 + # via -r requirements/_test.in idna-ssl==1.1.0 # via # -c requirements/_base.txt @@ -158,6 +160,7 @@ pytest==6.2.2 python-dateutil==2.8.1 # via # -c requirements/_base.txt + # faker # pandas python-dotenv==0.15.0 # via docker-compose @@ -189,6 +192,8 @@ six==1.15.0 # websocket-client termcolor==1.1.0 # via pytest-sugar +text-unidecode==1.3 + # via faker texttable==1.6.3 # via docker-compose toml==0.10.2 diff --git a/services/storage/src/simcore_service_storage/access_layer.py b/services/storage/src/simcore_service_storage/access_layer.py new file mode 100644 index 00000000000..c453708aa20 --- /dev/null +++ b/services/storage/src/simcore_service_storage/access_layer.py @@ -0,0 +1,287 @@ +""" Helper functions to determin access-rights on stored data + +# DRAFT Rationale: + + osparc-simcore defines TWO authorization methods: i.e. a set of rules on what, + how and when any resource can be accessed or operated by a user + + ## ROLE-BASED METHOD: + In this method, a user is assigned a role (user/tester/admin) upon registration. Each role is + system-wide and defines a set of operations that the user *can* perform + - Every operation is named as a resource and an action (e.g. ) + - Resource is named hierarchically + - Roles can inherit permitted operations from other role + This method is static because is system-wide and it is defined directly in the + code at services/web/server/src/simcore_service_webserver/security_roles.py + It is defined on top of every API entrypoint and applied just after authentication of the user. + + ## GROUP-BASED METHOD: + The second method is designed to dynamically share a resource among groups of users. A group + defines a set of rules that apply to a resource and users can be added to the group dynamically. 
+ So far, there are two resources that define access rights (AR): + - one applies to projects (read/write/delete) and + - the other to services (execute/write) + The project access rights are set in the column "access_rights" of the "projects" table . + The service access rights has its own table: service_access_rights + + Access rights apply hierarchically, meaning that the access granted to a project applies + to all nodes inside and stored data in nodes. + + How do these two AR coexist?: Access to read, write or delete a project are defined in the project AR but execution + will depend on the service AR attached to nodes inside. + + What about stored data? + - data generated in nodes inherits the AR from the associated project + - data generated in API uses full AR provided by ownership (i.e. user_id in files_meta_data table) + +""" + + +import logging +from dataclasses import dataclass +from typing import Dict, List, Optional +from uuid import UUID + +import sqlalchemy as sa +from aiopg.sa.connection import SAConnection +from aiopg.sa.result import ResultProxy, RowProxy +from simcore_postgres_database.storage_models import file_meta_data, user_to_groups +from sqlalchemy.sql import text + +logger = logging.getLogger(__name__) + + +ProjectID = str + + +@dataclass +class AccessRights: + read: bool + write: bool + delete: bool + + @classmethod + def all(cls) -> "AccessRights": + return cls(True, True, True) + + @classmethod + def none(cls) -> "AccessRights": + return cls(False, False, False) + + +class AccessLayerError(Exception): + """ Base class for access-layer related errors """ + + +class InvalidFileIdentifier(AccessLayerError): + """Identifier does not follow the criteria to + be a file identifier (see naming criteria below) + """ + + def __init__(self, identifier, reason=None, details=None): + self.identifier = identifier + self.reason = reason or "Invalid file identifier" + self.details = details + + super().__init__(self.reason, self.details) + + def __str__(self): + return "Error in {}: {} [{}]".format(self.identifier, self.reason, self.details) + + +async def _get_user_groups_ids(conn: SAConnection, user_id: int) -> List[int]: + stmt = sa.select([user_to_groups.c.gid]).where(user_to_groups.c.uid == user_id) + rows = await (await conn.execute(stmt)).fetchall() + user_group_ids = [g.gid for g in rows] + return user_group_ids + + +def _aggregate_access_rights( + access_rights: Dict[str, Dict], group_ids: List[int] +) -> AccessRights: + try: + prj_access = {"read": False, "write": False, "delete": False} + for gid, grp_access in access_rights.items(): + if int(gid) in group_ids: + for operation in grp_access: + prj_access[operation] |= grp_access[operation] + + return AccessRights(**prj_access) + except KeyError: + # NOTE: database does NOT include schema for json access_rights column! + logger.warning( + "Invalid entry in projects.access_rights. 
Revoking all rights [%s]", + access_rights, + ) + return AccessRights.none() + + +async def list_projects_access_rights( + conn: SAConnection, user_id: int +) -> Dict[ProjectID, AccessRights]: + """ + Returns access-rights of user (user_id) over all OWNED or SHARED projects + """ + + user_group_ids: List[int] = await _get_user_groups_ids(conn, user_id) + + smt = text( + f"""\ + SELECT uuid, access_rights + FROM projects + WHERE ( + prj_owner = {user_id} + OR jsonb_exists_any( access_rights, ( + SELECT ARRAY( SELECT gid::TEXT FROM user_to_groups WHERE uid = {user_id} ) + ) + ) + ) + """ + ) + projects_access_rights = {} + + async for row in conn.execute(smt): + assert isinstance(row.access_rights, dict) + assert isinstance(row.uuid, ProjectID) + + if row.access_rights: + # TODO: access_rights should be direclty filtered from result in stm instead calling again user_group_ids + projects_access_rights[row.uuid] = _aggregate_access_rights( + row.access_rights, user_group_ids + ) + + else: + # backwards compatibility + # - no access_rights defined BUT project is owned + projects_access_rights[row.uuid] = AccessRights.all() + + return projects_access_rights + + +async def get_project_access_rights( + conn: SAConnection, user_id: int, project_id: ProjectID +) -> AccessRights: + """ + Returns access-rights of user (user_id) over a project resource (project_id) + """ + user_group_ids: List[int] = await _get_user_groups_ids(conn, user_id) + + stmt = text( + f"""\ + SELECT prj_owner, access_rights + FROM projects + WHERE ( + ( uuid = '{project_id}' ) AND ( + prj_owner = {user_id} + OR jsonb_exists_any( access_rights, ( + SELECT ARRAY( SELECT gid::TEXT FROM user_to_groups WHERE uid = {user_id} ) + ) + ) + ) + ) + """ + ) + + result: ResultProxy = await conn.execute(stmt) + row: Optional[RowProxy] = await result.first() + + if not row: + # Either project does not exists OR user_id has NO access + return AccessRights.none() + + assert row.prj_owner is None or isinstance(row.prj_owner, int) + assert isinstance(row.access_rights, dict) + + if row.prj_owner == user_id: + return AccessRights.all() + + # determine user's access rights by aggregating AR of all groups + prj_access = _aggregate_access_rights(row.access_rights, user_group_ids) + return prj_access + + +async def get_file_access_rights( + conn: SAConnection, user_id: int, file_uuid: str +) -> AccessRights: + """ + Returns access-rights of user (user_id) over data file resource (file_uuid) + + raises InvalidFileIdentifier + """ + + # + # 1. file registered in file_meta_data table + # + stmt = sa.select([file_meta_data.c.project_id, file_meta_data.c.user_id]).where( + file_meta_data.c.file_uuid == file_uuid + ) + result: ResultProxy = await conn.execute(stmt) + row: Optional[RowProxy] = await result.first() + + if row: + if int(row.user_id) == user_id: + # is owner + return AccessRights.all() + + if not row.project_id: + # not owner and not shared via project + return AccessRights.none() + + # has associated project + access_rights = await get_project_access_rights( + conn, user_id, project_id=row.project_id + ) + if not access_rights: + logger.warning( + "File %s references a project %s that does not exists in db." + "TIP: Audit sync between files_meta_data and projects tables", + file_uuid, + row.project_id, + ) + return AccessRights.none() + + else: + # + # 2. file is NOT registered in meta-data table e.g. 
it is about to be uploaded or it was deleted + # We rely on the assumption that file_uuid is formatted either as + # + # - project's data: {project_id}/{node_id}/{filename} + # - API data: api/{file_id}/{filename} + # + try: + parent, _, _ = file_uuid.split("/") + + if parent == "api": + # FIXME: this is wrong, all api data must be registered and OWNED + # ownership still not defined, so we assume it is user_id + return AccessRights.all() + + # otherwise assert 'parent' string corresponds to a valid UUID + UUID(parent) # raises ValueError + access_rights = await get_project_access_rights( + conn, user_id, project_id=parent + ) + if not access_rights: + logger.warning( + "File %s references a project %s that does not exists in db", + file_uuid, + row.project_id, + ) + return AccessRights.none() + + except (ValueError, AttributeError) as err: + raise InvalidFileIdentifier( + identifier=file_uuid, + details=str(err), + ) from err + + return access_rights + + +# HELPERS ----------------------------------------------- + + +async def get_readable_project_ids(conn: SAConnection, user_id: int) -> List[ProjectID]: + """ Returns a list of projects where user has granted read-access """ + projects_access_rights = await list_projects_access_rights(conn, int(user_id)) + return [pid for pid, access in projects_access_rights.items() if access.read] diff --git a/services/storage/src/simcore_service_storage/datcore_wrapper.py b/services/storage/src/simcore_service_storage/datcore_wrapper.py index 36c4604a6bc..b7c3f021ad0 100644 --- a/services/storage/src/simcore_service_storage/datcore_wrapper.py +++ b/services/storage/src/simcore_service_storage/datcore_wrapper.py @@ -4,7 +4,6 @@ from concurrent.futures import ThreadPoolExecutor from contextlib import contextmanager from functools import wraps -from pathlib import Path from typing import List, Optional, Tuple import attr @@ -12,14 +11,8 @@ from .datcore import DatcoreClient from .models import FileMetaData, FileMetaDataEx -FileMetaDataVec = List[FileMetaData] -FileMetaDataExVec = List[FileMetaDataEx] - -CURRENT_DIR = Path(__file__).resolve().parent logger = logging.getLogger(__name__) -# pylint: disable=W0703 - @contextmanager def safe_call(error_msg: str = "", *, skip_logs: bool = False): @@ -49,11 +42,11 @@ async def async_wrapper(self, *args, **kwargs): class DatcoreWrapper: - """ Wrapper to call the python2 api from datcore + """Wrapper to call the python2 api from datcore - This can go away now. Next cleanup round... + This can go away now. Next cleanup round... - NOTE: Auto-disables client + NOTE: Auto-disables client """ @@ -72,13 +65,13 @@ def __init__( api_secret=api_secret, host="https://api.blackfynn.io", ) - except Exception: + except Exception: # pylint: disable=broad-except self.d_client = None # Disabled: any call will raise AttributeError logger.warning("Failed to setup datcore. Disabling client.", exc_info=True) @property def is_communication_enabled(self) -> bool: - """ Wrapper class auto-disables if client cannot be created + """Wrapper class auto-disables if client cannot be created e.g. 
if endpoint service is down @@ -88,7 +81,7 @@ def is_communication_enabled(self) -> bool: return self.d_client is not None @make_async - def list_files_recursively(self) -> FileMetaDataVec: # pylint: disable=W0613 + def list_files_recursively(self) -> List[FileMetaData]: # pylint: disable=W0613 files = [] with safe_call(error_msg="Error listing datcore files"): @@ -97,7 +90,7 @@ def list_files_recursively(self) -> FileMetaDataVec: # pylint: disable=W0613 return files @make_async - def list_files_raw(self) -> FileMetaDataExVec: # pylint: disable=W0613 + def list_files_raw(self) -> List[FileMetaDataEx]: # pylint: disable=W0613 files = [] with safe_call(error_msg="Error listing datcore files"): @@ -108,7 +101,7 @@ def list_files_raw(self) -> FileMetaDataExVec: # pylint: disable=W0613 @make_async def list_files_raw_dataset( self, dataset_id: str - ) -> FileMetaDataExVec: # pylint: disable=W0613 + ) -> List[FileMetaDataEx]: # pylint: disable=W0613 files = [] with safe_call(error_msg="Error listing datcore files"): files = self.d_client.list_files_raw_dataset(dataset_id) diff --git a/services/storage/src/simcore_service_storage/db_tokens.py b/services/storage/src/simcore_service_storage/db_tokens.py index df42c73f1e4..321300c4c4d 100644 --- a/services/storage/src/simcore_service_storage/db_tokens.py +++ b/services/storage/src/simcore_service_storage/db_tokens.py @@ -4,9 +4,8 @@ import sqlalchemy as sa from aiohttp import web from psycopg2 import Error as DbApiError -from tenacity import retry - from servicelib.aiopg_utils import PostgresRetryPolicyUponOperation +from tenacity import retry from .models import tokens from .settings import APP_CONFIG_KEY, APP_DB_ENGINE_KEY @@ -17,20 +16,24 @@ @retry(**PostgresRetryPolicyUponOperation(log).kwargs) async def _get_tokens_from_db(engine, userid): async with engine.acquire() as conn: - stmt = sa.select([tokens,]).where(tokens.c.user_id == userid) + stmt = sa.select( + [ + tokens, + ] + ).where(tokens.c.user_id == userid) result = await conn.execute(stmt) row = await result.first() data = dict(row) if row else {} return data -async def get_api_token_and_secret(request: web.Request, userid) -> Tuple[str, str]: +async def get_api_token_and_secret(app: web.Application, userid) -> Tuple[str, str]: # FIXME: this is a temporary solution. This information should be sent in some form # from the client side together with the userid? 
- engine = request.app.get(APP_DB_ENGINE_KEY, None) + engine = app.get(APP_DB_ENGINE_KEY, None) # defaults from config if any, othewise None - defaults = request.app[APP_CONFIG_KEY].get("test_datcore", {}) + defaults = app[APP_CONFIG_KEY].get("test_datcore", {}) api_token, api_secret = defaults.get("api_token"), defaults.get("api_secret") if engine: diff --git a/services/storage/src/simcore_service_storage/dsm.py b/services/storage/src/simcore_service_storage/dsm.py index 20dd75f3e26..8eed08c094c 100644 --- a/services/storage/src/simcore_service_storage/dsm.py +++ b/services/storage/src/simcore_service_storage/dsm.py @@ -1,16 +1,19 @@ +# pylint: disable=no-value-for-parameter +# FIXME: E1120:No value for argument 'dml' in method call +# pylint: disable=protected-access +# FIXME: Access to a protected member _result_proxy of a client class + import asyncio import logging import os import re -import shutil import tempfile from collections import deque from concurrent.futures import ThreadPoolExecutor from pathlib import Path -from typing import Dict, List, Optional, Tuple +from typing import Dict, List, Optional, Tuple, Union import aiobotocore -import aiofiles import attr import sqlalchemy as sa from aiohttp import web @@ -20,17 +23,22 @@ from servicelib.aiopg_utils import DBAPIError, PostgresRetryPolicyUponOperation from servicelib.client_session import get_client_session from servicelib.utils import fire_and_forget_task -from sqlalchemy.sql import and_ from tenacity import retry from yarl import URL +from .access_layer import ( + AccessRights, + get_file_access_rights, + get_project_access_rights, + get_readable_project_ids, +) from .datcore_wrapper import DatcoreWrapper from .models import ( DatasetMetaData, FileMetaData, FileMetaDataEx, - _location_from_id, file_meta_data, + get_location_from_id, projects, ) from .s3 import get_config_s3 @@ -44,58 +52,47 @@ SIMCORE_S3_ID, SIMCORE_S3_STR, ) -from .utils import expo - -# pylint: disable=no-value-for-parameter -# FIXME: E1120:No value for argument 'dml' in method call - -# pylint: disable=protected-access -# FIXME: Access to a protected member _result_proxy of a client class - +from .utils import download_to_file_or_raise, expo logger = logging.getLogger(__name__) postgres_service_retry_policy_kwargs = PostgresRetryPolicyUponOperation(logger).kwargs -FileMetaDataVec = List[FileMetaData] -FileMetaDataExVec = List[FileMetaDataEx] -DatasetMetaDataVec = List[DatasetMetaData] - -async def _setup_dsm(app: web.Application): - cfg = app[APP_CONFIG_KEY] +def setup_dsm(app: web.Application): + async def _cleanup_context(app: web.Application): + cfg = app[APP_CONFIG_KEY] - main_cfg = cfg + main_cfg = cfg - engine = app.get(APP_DB_ENGINE_KEY) - loop = asyncio.get_event_loop() - s3_client = app.get(APP_S3_KEY) + engine = app.get(APP_DB_ENGINE_KEY) + loop = asyncio.get_event_loop() + s3_client = app.get(APP_S3_KEY) - max_workers = main_cfg["max_workers"] - pool = ThreadPoolExecutor(max_workers=max_workers) + max_workers = main_cfg["max_workers"] + pool = ThreadPoolExecutor(max_workers=max_workers) - s3_cfg = get_config_s3(app) - bucket_name = s3_cfg["bucket_name"] + s3_cfg = get_config_s3(app) + bucket_name = s3_cfg["bucket_name"] - testing = main_cfg["testing"] - dsm = DataStorageManager( - s3_client, engine, loop, pool, bucket_name, not testing, app - ) + testing = main_cfg["testing"] + dsm = DataStorageManager( + s3_client, engine, loop, pool, bucket_name, not testing, app + ) - app[APP_DSM_KEY] = dsm + app[APP_DSM_KEY] = dsm - yield - # clean up + 
yield + # NOTE: write here clean up -def setup_dsm(app: web.Application): - app.cleanup_ctx.append(_setup_dsm) + app.cleanup_ctx.append(_cleanup_context) @attr.s(auto_attribs=True) class DatCoreApiToken: - api_token: str = None - api_secret: str = None + api_token: Optional[str] = None + api_secret: Optional[str] = None def to_tuple(self): return (self.api_token, self.api_secret) @@ -138,7 +135,7 @@ class DataStorageManager: pool: ThreadPoolExecutor simcore_bucket_name: str has_project_db: bool - app: web.Application = None + app: Optional[web.Application] = None datcore_tokens: Dict[str, DatCoreApiToken] = attr.Factory(dict) # TODO: perhaps can be used a cache? add a lifetime? @@ -162,7 +159,7 @@ async def locations(self, user_id: str): @classmethod def location_from_id(cls, location_id: str): - return _location_from_id(location_id) + return get_location_from_id(location_id) async def ping_datcore(self, user_id: str) -> bool: """Checks whether user account in datcore is accesible @@ -186,31 +183,39 @@ async def ping_datcore(self, user_id: str) -> bool: return False + # LIST/GET --------------------------- + # pylint: disable=too-many-arguments # pylint: disable=too-many-branches # pylint: disable=too-many-statements async def list_files( self, user_id: str, location: str, uuid_filter: str = "", regex: str = "" - ) -> FileMetaDataExVec: + ) -> List[FileMetaDataEx]: """Returns a list of file paths - Works for simcore.s3 and datcore - - Can filter on uuid: useful to filter on project_id/node_id - - Can filter upon regular expression (for now only on key: value pairs of the FileMetaData) + - Works for simcore.s3 and datcore + - Can filter on uuid: useful to filter on project_id/node_id + - Can filter upon regular expression (for now only on key: value pairs of the FileMetaData) """ data = deque() if location == SIMCORE_S3_STR: + accesible_projects_ids = [] async with self.engine.acquire() as conn: - query = sa.select([file_meta_data]).where( - file_meta_data.c.user_id == user_id + + accesible_projects_ids = await get_readable_project_ids( + conn, int(user_id) ) + has_read_access = ( + file_meta_data.c.user_id == user_id + ) | file_meta_data.c.project_id.in_(accesible_projects_ids) + + query = sa.select([file_meta_data]).where(has_read_access) + async for row in conn.execute(query): - result_dict = dict(zip(row._result_proxy.keys, row._row)) - d = FileMetaData(**result_dict) - parent_id = str(Path(d.object_name).parent) - dex = FileMetaDataEx(fmd=d, parent_id=parent_id) + d = FileMetaData(**dict(row)) + dex = FileMetaDataEx( + fmd=d, parent_id=str(Path(d.object_name).parent) + ) data.append(dex) if self.has_project_db: @@ -219,7 +224,7 @@ async def list_files( try: async with self.engine.acquire() as conn: query = sa.select([projects]).where( - projects.c.prj_owner == user_id + projects.c.uuid.in_(accesible_projects_ids) ) async for row in conn.execute(query): @@ -270,6 +275,7 @@ async def list_files( data = await dcw.list_files_raw() if uuid_filter: + # TODO: incorporate this in db query! 
_query = re.compile(uuid_filter, re.IGNORECASE) filtered_data = deque() for dx in data: @@ -295,21 +301,22 @@ async def list_files( async def list_files_dataset( self, user_id: str, location: str, dataset_id: str - ) -> FileMetaDataVec: + ) -> Union[List[FileMetaData], List[FileMetaDataEx]]: # this is a cheap shot, needs fixing once storage/db is in sync data = [] if location == SIMCORE_S3_STR: - data = await self.list_files( + data: List[FileMetaDataEx] = await self.list_files( user_id, location, uuid_filter=dataset_id + "/" ) + elif location == DATCORE_STR: api_token, api_secret = self._get_datcore_tokens(user_id) dcw = DatcoreWrapper(api_token, api_secret, self.loop, self.pool) - data = await dcw.list_files_raw_dataset(dataset_id) + data: List[FileMetaData] = await dcw.list_files_raw_dataset(dataset_id) return data - async def list_datasets(self, user_id: str, location: str) -> DatasetMetaDataVec: + async def list_datasets(self, user_id: str, location: str) -> List[DatasetMetaData]: """Returns a list of top level datasets Works for simcore.s3 and datcore @@ -318,22 +325,27 @@ async def list_datasets(self, user_id: str, location: str) -> DatasetMetaDataVec data = [] if location == SIMCORE_S3_STR: - # get lis of all projects belonging to user if self.has_project_db: try: async with self.engine.acquire() as conn: - query = sa.select([projects]).where( - projects.c.prj_owner == user_id + readable_projects_ids = await get_readable_project_ids( + conn, int(user_id) + ) + has_read_access = projects.c.uuid.in_(readable_projects_ids) + + # FIXME: this DOES NOT read from file-metadata table!!! + query = sa.select([projects.c.uuid, projects.c.name]).where( + has_read_access ) async for row in conn.execute(query): - proj_data = dict(row.items()) dmd = DatasetMetaData( - dataset_id=proj_data["uuid"], - display_name=proj_data["name"], + dataset_id=row.uuid, + display_name=row.name, ) data.append(dmd) except DBAPIError as _err: logger.exception("Error querying database for project names") + elif location == DATCORE_STR: api_token, api_secret = self._get_datcore_tokens(user_id) dcw = DatcoreWrapper(api_token, api_secret, self.loop, self.pool) @@ -343,79 +355,43 @@ async def list_datasets(self, user_id: str, location: str) -> DatasetMetaDataVec async def list_file( self, user_id: str, location: str, file_uuid: str - ) -> FileMetaDataEx: + ) -> Optional[FileMetaDataEx]: + if location == SIMCORE_S3_STR: - # TODO: get engine from outside + async with self.engine.acquire() as conn: - query = sa.select([file_meta_data]).where( - and_( - file_meta_data.c.user_id == user_id, - file_meta_data.c.file_uuid == file_uuid, - ) + can: Optional[AccessRights] = await get_file_access_rights( + conn, int(user_id), file_uuid ) - async for row in conn.execute(query): - result_dict = dict(zip(row._result_proxy.keys, row._row)) - d = FileMetaData(**result_dict) - dx = FileMetaDataEx(fmd=d, parent_id="") - return dx + if can.read: + query = sa.select([file_meta_data]).where( + file_meta_data.c.file_uuid == file_uuid + ) + async for row in conn.execute(query): + d = FileMetaData(**dict(row)) + dx = FileMetaDataEx(fmd=d, parent_id="") + return dx + else: + logger.debug("User %s was not read file %s", user_id, file_uuid) + elif location == DATCORE_STR: + # FIXME: review return inconsistencies api_token, api_secret = self._get_datcore_tokens(user_id) _dcw = DatcoreWrapper(api_token, api_secret, self.loop, self.pool) data = [] # await _dcw.list_file(file_uuid) return data - async def delete_file(self, user_id: str, location: str, 
file_uuid: str): - """Deletes a file given its fmd and location - - Additionally requires a user_id for 3rd party auth - - For internal storage, the db state should be updated upon completion via - Notification mechanism - - For simcore.s3 we can use the file_name - For datcore we need the full path - """ - if location == SIMCORE_S3_STR: - to_delete = [] - async with self.engine.acquire() as conn: - query = sa.select([file_meta_data]).where( - file_meta_data.c.file_uuid == file_uuid - ) - async for row in conn.execute(query): - result_dict = dict(zip(row._result_proxy.keys, row._row)) - d = FileMetaData(**result_dict) - # make sure this is the current user - if d.user_id == user_id: - if self.s3_client.remove_objects( - d.bucket_name, [d.object_name] - ): - stmt = file_meta_data.delete().where( - file_meta_data.c.file_uuid == file_uuid - ) - to_delete.append(stmt) - - async with self.engine.acquire() as conn: - for stmt in to_delete: - await conn.execute(stmt) - - elif location == DATCORE_STR: - api_token, api_secret = self._get_datcore_tokens(user_id) - dcw = DatcoreWrapper(api_token, api_secret, self.loop, self.pool) - # destination, filename = _parse_datcore(file_uuid) - file_id = file_uuid - return await dcw.delete_file_by_id(file_id) + # UPLOAD/DOWNLOAD LINKS --------------------------- async def upload_file_to_datcore( self, user_id: str, local_file_path: str, destination_id: str - ): # pylint: disable=W0613 + ): # uploads a locally available file to dat core given the storage path, optionally attached some meta data api_token, api_secret = self._get_datcore_tokens(user_id) dcw = DatcoreWrapper(api_token, api_secret, self.loop, self.pool) await dcw.upload_file_to_id(destination_id, local_file_path) - # actually we have to query the master db - - async def metadata_file_updater( + async def _metadata_file_updater( self, file_uuid: str, bucket_name: str, @@ -491,18 +467,37 @@ async def metadata_file_updater( logger.error("Could not update file metadata for '%s'", file_uuid) async def upload_link(self, user_id: str, file_uuid: str): + """ + Creates pre-signed upload link and updates metadata table when + link is used and upload is successfuly completed + + SEE _metadata_file_updater + """ + + async with self.engine.acquire() as conn: + can: Optional[AccessRights] = await get_file_access_rights( + conn, int(user_id), file_uuid + ) + if not can.write: + logger.debug( + "User %s was not allowed to upload file %s", user_id, file_uuid + ) + raise web.HTTPForbidden( + reason=f"User does not have enough access rights to upload file {file_uuid}" + ) + @retry(**postgres_service_retry_policy_kwargs) async def _init_metadata() -> Tuple[int, str]: async with self.engine.acquire() as conn: fmd = FileMetaData() fmd.simcore_from_uuid(file_uuid, self.simcore_bucket_name) - fmd.user_id = user_id + fmd.user_id = user_id # NOTE: takes ownership of uploaded data + query = sa.select([file_meta_data]).where( file_meta_data.c.file_uuid == file_uuid ) # if file already exists, we might want to update a time-stamp - rows = await conn.execute(query) - exists = await rows.scalar() + exists = await (await conn.execute(query)).scalar() if exists is None: ins = file_meta_data.insert().values(**vars(fmd)) await conn.execute(ins) @@ -516,7 +511,7 @@ async def _init_metadata() -> Tuple[int, str]: # a parallel task is tarted which will update the metadata of the updated file # once the update has finished. 
fire_and_forget_task( - self.metadata_file_updater( + self._metadata_file_updater( file_uuid=file_uuid, bucket_name=bucket_name, object_name=object_name, @@ -526,7 +521,43 @@ async def _init_metadata() -> Tuple[int, str]: ) return self.s3_client.create_presigned_put_url(bucket_name, object_name) + async def download_link_s3(self, file_uuid: str, user_id: int) -> str: + + # access layer + async with self.engine.acquire() as conn: + can: Optional[AccessRights] = await get_file_access_rights( + conn, int(user_id), file_uuid + ) + if not can.read: + # NOTE: this is tricky. A user with read access can download and data! + # If write permission would be required, then shared projects as views cannot + # recover data in nodes (e.g. jupyter cannot pull work data) + # + logger.debug( + "User %s was not allowed to download file %s", user_id, file_uuid + ) + raise web.HTTPForbidden( + reason=f"User does not have enough rights to download {file_uuid}" + ) + + link = None + bucket_name = self.simcore_bucket_name + object_name = file_uuid + link = self.s3_client.create_presigned_get_url(bucket_name, object_name) + return link + + async def download_link_datcore(self, user_id: str, file_id: str) -> Dict[str, str]: + link = "" + api_token, api_secret = self._get_datcore_tokens(user_id) + dcw = DatcoreWrapper(api_token, api_secret, self.loop, self.pool) + link, filename = await dcw.download_link_by_id(file_id) + return link, filename + + # COPY ----------------------------- + async def copy_file_s3_s3(self, user_id: str, dest_uuid: str, source_uuid: str): + # FIXME: operation MUST be atomic + # source is s3, location is s3 to_bucket_name = self.simcore_bucket_name to_object_name = dest_uuid @@ -537,6 +568,7 @@ async def copy_file_s3_s3(self, user_id: str, dest_uuid: str, source_uuid: str): self.s3_client.copy_object( to_bucket_name, to_object_name, from_bucket_object_name ) + # update db async with self.engine.acquire() as conn: fmd = FileMetaData() @@ -548,26 +580,30 @@ async def copy_file_s3_s3(self, user_id: str, dest_uuid: str, source_uuid: str): async def copy_file_s3_datcore( self, user_id: str, dest_uuid: str, source_uuid: str ): + session = get_client_session(self.app) + # source is s3, get link and copy to datcore bucket_name = self.simcore_bucket_name object_name = source_uuid filename = source_uuid.split("/")[-1] - tmp_dirpath = tempfile.mkdtemp() - local_file_path = os.path.join(tmp_dirpath, filename) - url = self.s3_client.create_presigned_get_url(bucket_name, object_name) - session = get_client_session(self.app) - async with session.get(url) as resp: - if resp.status == 200: - f = await aiofiles.open(local_file_path, mode="wb") - await f.write(await resp.read()) - await f.close() - # and then upload - await self.upload_file_to_datcore( - user_id=user_id, - local_file_path=local_file_path, - destination_id=dest_uuid, - ) - shutil.rmtree(tmp_dirpath) + + s3_dowload_link = self.s3_client.create_presigned_get_url( + bucket_name, object_name + ) + + with tempfile.TemporaryDirectory() as tmpdir: + # FIXME: connect download and upload streams + local_file_path = os.path.join(tmpdir, filename) + + # Downloads S3 -> local + await download_to_file_or_raise(session, s3_dowload_link, local_file_path) + + # Uploads local -> DATCore + await self.upload_file_to_datcore( + user_id=user_id, + local_file_path=local_file_path, + destination_id=dest_uuid, + ) async def copy_file_datcore_s3( self, @@ -576,6 +612,8 @@ async def copy_file_datcore_s3( source_uuid: str, filename_missing: bool = False, ): + session = 
get_client_session(self.app) + # 2 steps: Get download link for local copy, the upload link to s3 # TODO: This should be a redirect stream! dc_link, filename = await self.download_link_datcore( @@ -586,22 +624,26 @@ async def copy_file_datcore_s3( s3_upload_link = await self.upload_link(user_id, dest_uuid) - # FIXME: user of mkdtemp is RESPONSIBLE to deleting it https://docs.python.org/3/library/tempfile.html#tempfile.mkdtemp - tmp_dirpath = tempfile.mkdtemp() - local_file_path = os.path.join(tmp_dirpath, filename) - session = get_client_session(self.app) - - async with session.get(dc_link) as resp: - if resp.status == 200: - f = await aiofiles.open(local_file_path, mode="wb") - await f.write(await resp.read()) - await f.close() - s3_upload_link = URL(s3_upload_link) - async with session.put( - s3_upload_link, data=Path(local_file_path).open("rb") - ) as resp: - if resp.status > 299: - _response_text = await resp.text() + with tempfile.TemporaryDirectory() as tmpdir: + # FIXME: connect download and upload streams + + local_file_path = os.path.join(tmpdir, filename) + + # Downloads DATCore -> local + await download_to_file_or_raise(session, dc_link, local_file_path) + + # Uploads local -> S3 + s3_upload_link = URL(s3_upload_link) + async with session.put( + s3_upload_link, + data=Path(local_file_path).open("rb"), + raise_for_status=True, + ) as resp: + logger.debug( + "Uploaded local -> SIMCore %s . Status %s", + s3_upload_link, + resp.status, + ) return dest_uuid @@ -624,20 +666,6 @@ async def copy_file( if dest_location == SIMCORE_S3_STR: await self.copy_file_datcore_s3(user_id, dest_uuid, source_uuid) - async def download_link_s3(self, file_uuid: str) -> str: - link = None - bucket_name = self.simcore_bucket_name - object_name = file_uuid - link = self.s3_client.create_presigned_get_url(bucket_name, object_name) - return link - - async def download_link_datcore(self, user_id: str, file_id: str) -> Dict[str, str]: - link = "" - api_token, api_secret = self._get_datcore_tokens(user_id) - dcw = DatcoreWrapper(api_token, api_secret, self.loop, self.pool) - link, filename = await dcw.download_link_by_id(file_id) - return link, filename - async def deep_copy_project_simcore_s3( self, user_id: str, source_project, destination_project, node_mapping ): @@ -661,6 +689,29 @@ async def deep_copy_project_simcore_s3( source_folder = source_project["uuid"] dest_folder = destination_project["uuid"] + # access layer + async with self.engine.acquire() as conn: + can = await get_project_access_rights( + conn, int(user_id), project_id=source_folder + ) + if not can.read: + logger.debug( + "User %s was not allowed to copy project %s", user_id, source_folder + ) + raise web.HTTPForbidden( + reason=f"User does not have enough access rights to copy project '{source_folder}'" + ) + can = await get_project_access_rights( + conn, int(user_id), project_id=dest_folder + ) + if not can.write: + logger.debug( + "User %s was not allowed to copy project %s", user_id, dest_folder + ) + raise web.HTTPForbidden( + reason=f"User does not have enough access rights to copy project '{dest_folder}'" + ) + # build up naming map based on labels uuid_name_dict = {} uuid_name_dict[dest_folder] = destination_project["name"] @@ -761,6 +812,8 @@ async def deep_copy_project_simcore_s3( # step 4 sync db async with self.engine.acquire() as conn: + + # TODO: upsert in one statment of ALL for fmd in fmds: query = sa.select([file_meta_data]).where( file_meta_data.c.file_uuid == fmd.file_uuid @@ -776,19 +829,84 @@ async def 
deep_copy_project_simcore_s3( ins = file_meta_data.insert().values(**vars(fmd)) await conn.execute(ins) + # DELETE ------------------------------------- + + async def delete_file(self, user_id: str, location: str, file_uuid: str): + """Deletes a file given its fmd and location + + Additionally requires a user_id for 3rd party auth + + For internal storage, the db state should be updated upon completion via + Notification mechanism + + For simcore.s3 we can use the file_name + For datcore we need the full path + """ + if location == SIMCORE_S3_STR: + # FIXME: operation MUST be atomic, transaction?? + + to_delete = [] + async with self.engine.acquire() as conn: + can: Optional[AccessRights] = await get_file_access_rights( + conn, int(user_id), file_uuid + ) + if not can.delete: + logger.debug( + "User %s was not allowed to delete file %s", user_id, file_uuid + ) + raise web.HTTPForbidden( + reason=f"User '{user_id}' does not have enough access rights to delete file {file_uuid}" + ) + + query = sa.select( + [file_meta_data.c.bucket_name, file_meta_data.c.object_name] + ).where(file_meta_data.c.file_uuid == file_uuid) + + async for row in conn.execute(query): + if self.s3_client.remove_objects( + row.bucket_name, [row.object_name] + ): + to_delete.append(file_uuid) + + await conn.execute( + file_meta_data.delete().where( + file_meta_data.c.file_uuid.in_(to_delete) + ) + ) + + elif location == DATCORE_STR: + # FIXME: review return inconsistencies + api_token, api_secret = self._get_datcore_tokens(user_id) + dcw = DatcoreWrapper(api_token, api_secret, self.loop, self.pool) + # destination, filename = _parse_datcore(file_uuid) + file_id = file_uuid + return await dcw.delete_file_by_id(file_id) + async def delete_project_simcore_s3( self, user_id: str, project_id: str, node_id: Optional[str] = None ) -> web.Response: + """Deletes all files from a given node in a project in simcore.s3 and updated db accordingly. If node_id is not given, then all the project files db entries are deleted. """ + # FIXME: operation MUST be atomic. 
Mark for deletion and remove from db when deletion fully confirmed + async with self.engine.acquire() as conn: - delete_me = file_meta_data.delete().where( - and_( - file_meta_data.c.user_id == user_id, - file_meta_data.c.project_id == project_id, + # access layer + can: Optional[AccessRights] = await get_project_access_rights( + conn, int(user_id), project_id + ) + if not can.delete: + logger.debug( + "User %s was not allowed to delete project %s", user_id, project_id + ) + raise web.HTTPForbidden( + reason=f"User does not have delete access for {project_id}" ) + + delete_me = file_meta_data.delete().where( + file_meta_data.c.project_id == project_id, ) if node_id: delete_me = delete_me.where(file_meta_data.c.node_id == node_id) @@ -817,23 +935,31 @@ async def delete_project_simcore_s3( ) return response + # SEARCH ------------------------------------- + async def search_files_starting_with( self, user_id: int, prefix: str ) -> List[FileMetaDataEx]: # Avoids using list_files since it accounts for projects/nodes # Storage should know NOTHING about those concepts - data = deque() + files_meta = deque() + async with self.engine.acquire() as conn: + # access layer + can_read_projects_ids = await get_readable_project_ids(conn, int(user_id)) + has_read_access = ( + file_meta_data.c.user_id == str(user_id) + ) | file_meta_data.c.project_id.in_(can_read_projects_ids) + stmt = sa.select([file_meta_data]).where( - (file_meta_data.c.user_id == str(user_id)) - & file_meta_data.c.file_uuid.startswith(prefix) + file_meta_data.c.file_uuid.startswith(prefix) & has_read_access ) async for row in conn.execute(stmt): meta = FileMetaData(**dict(row)) - data.append( - FileMetaDataEx( - fmd=meta, parent_id=str(Path(meta.object_name).parent) - ) + meta_extended = FileMetaDataEx( + fmd=meta, + parent_id=str(Path(meta.object_name).parent), ) - return list(data) + files_meta.append(meta_extended) + return list(files_meta) diff --git a/services/storage/src/simcore_service_storage/handlers.py b/services/storage/src/simcore_service_storage/handlers.py index fed29199a56..71ba7d712d0 100644 --- a/services/storage/src/simcore_service_storage/handlers.py +++ b/services/storage/src/simcore_service_storage/handlers.py @@ -1,11 +1,13 @@ import json import logging +from contextlib import contextmanager from typing import Dict import attr from aiohttp import web from servicelib.rest_utils import extract_and_validate +from .access_layer import InvalidFileIdentifier from .db_tokens import get_api_token_and_secret from .dsm import DataStorageManager, DatCoreApiToken from .meta import __version__ @@ -14,6 +16,52 @@ log = logging.getLogger(__name__) +async def _prepare_storage_manager( + params: Dict, query: Dict, request: web.Request +) -> DataStorageManager: + # FIXME: scope properly, either request or app level!! + # Notice that every request is changing tokens! + # I would rather store tokens in request instead of in dsm + # or creating an different instance of dsm per request + + INIT_STR = "init" + dsm: DataStorageManager = request.app[APP_DSM_KEY] + + user_id = query.get("user_id") + location_id = params.get("location_id") + location = ( + dsm.location_from_id(location_id) if location_id is not None else INIT_STR + ) + + if user_id and location in (INIT_STR, DATCORE_STR): + # TODO: notify from db instead when tokens changed, then invalidate resource which enforces + # re-query when needed. 
+ + # updates from db + token_info = await get_api_token_and_secret(request.app, user_id) + if all(token_info): + dsm.datcore_tokens[user_id] = DatCoreApiToken(*token_info) + else: + dsm.datcore_tokens.pop(user_id, None) + return dsm + + +@contextmanager +def handle_storage_errors(): + """Basic policies to translate low-level errors into HTTP errors""" + # TODO: include _prepare_storage_manager? + # TODO: middleware? decorator? + try: + + yield + + except InvalidFileIdentifier as err: + raise web.HTTPUnprocessableEntity( + reason=f"{err.identifier} is an invalid file identifier" + ) from err + + +# HANDLERS --------------------------------------------------- async def check_health(request: web.Request): log.debug("CHECK HEALTH INCOMING PATH %s", request.path) await extract_and_validate(request) @@ -37,7 +85,7 @@ async def check_action(request: web.Request): # echo's input FIXME: convert to dic # FIXME: output = fake_schema.dump(body) - output = { + return { "path_value": params.get("action"), "query_value": query.get("data"), "body_value": { @@ -45,7 +93,6 @@ async def check_action(request: web.Request): "key2": 0, # body.body_value.key2, }, } - return output async def get_storage_locations(request: web.Request): @@ -58,12 +105,14 @@ async def get_storage_locations(request: web.Request): assert not body, "body %s" % body # nosec assert query["user_id"] # nosec - user_id = query["user_id"] - dsm = await _prepare_storage_manager(params, query, request) - locs = await dsm.locations(user_id) + with handle_storage_errors(): + user_id = query["user_id"] + + dsm = await _prepare_storage_manager(params, query, request) + locs = await dsm.locations(user_id) - return {"error": None, "data": locs} + return {"error": None, "data": locs} async def get_datasets_metadata(request: web.Request): @@ -78,16 +127,18 @@ async def get_datasets_metadata(request: web.Request): assert params["location_id"] # nosec assert query["user_id"] # nosec - location_id = params["location_id"] - user_id = query["user_id"] + with handle_storage_errors(): - dsm = await _prepare_storage_manager(params, query, request) + location_id = params["location_id"] + user_id = query["user_id"] - location = dsm.location_from_id(location_id) - # To implement - data = await dsm.list_datasets(user_id, location) + dsm = await _prepare_storage_manager(params, query, request) - return {"error": None, "data": data} + location = dsm.location_from_id(location_id) + # To implement + data = await dsm.list_datasets(user_id, location) + + return {"error": None, "data": data} async def get_files_metadata(request: web.Request): @@ -102,27 +153,26 @@ async def get_files_metadata(request: web.Request): assert params["location_id"] # nosec assert query["user_id"] # nosec - location_id = params["location_id"] - user_id = query["user_id"] - uuid_filter = query.get("uuid_filter", "") - - dsm = await _prepare_storage_manager(params, query, request) - location = dsm.location_from_id(location_id) + with handle_storage_errors(): + location_id = params["location_id"] + user_id = query["user_id"] + uuid_filter = query.get("uuid_filter", "") - log.debug("list files %s %s %s", user_id, location, uuid_filter) + dsm = await _prepare_storage_manager(params, query, request) + location = dsm.location_from_id(location_id) - data = await dsm.list_files( - user_id=user_id, location=location, uuid_filter=uuid_filter - ) + log.debug("list files %s %s %s", user_id, location, uuid_filter) - data_as_dict = [] - for d in data: - log.info("DATA %s", attr.asdict(d.fmd)) - 
data_as_dict.append({**attr.asdict(d.fmd), "parent_id": d.parent_id}) + data = await dsm.list_files( + user_id=user_id, location=location, uuid_filter=uuid_filter + ) - envelope = {"error": None, "data": data_as_dict} + data_as_dict = [] + for d in data: + log.info("DATA %s", attr.asdict(d.fmd)) + data_as_dict.append({**attr.asdict(d.fmd), "parent_id": d.parent_id}) - return envelope + return {"error": None, "data": data_as_dict} async def get_files_metadata_dataset(request: web.Request): @@ -138,28 +188,27 @@ async def get_files_metadata_dataset(request: web.Request): assert params["dataset_id"] # nosec assert query["user_id"] # nosec - location_id = params["location_id"] - user_id = query["user_id"] - dataset_id = params["dataset_id"] + with handle_storage_errors(): + location_id = params["location_id"] + user_id = query["user_id"] + dataset_id = params["dataset_id"] - dsm = await _prepare_storage_manager(params, query, request) + dsm = await _prepare_storage_manager(params, query, request) - location = dsm.location_from_id(location_id) + location = dsm.location_from_id(location_id) - log.debug("list files %s %s %s", user_id, location, dataset_id) + log.debug("list files %s %s %s", user_id, location, dataset_id) - data = await dsm.list_files_dataset( - user_id=user_id, location=location, dataset_id=dataset_id - ) - - data_as_dict = [] - for d in data: - log.info("DATA %s", attr.asdict(d.fmd)) - data_as_dict.append({**attr.asdict(d.fmd), "parent_id": d.parent_id}) + data = await dsm.list_files_dataset( + user_id=user_id, location=location, dataset_id=dataset_id + ) - envelope = {"error": None, "data": data_as_dict} + data_as_dict = [] + for d in data: + log.info("DATA %s", attr.asdict(d.fmd)) + data_as_dict.append({**attr.asdict(d.fmd), "parent_id": d.parent_id}) - return envelope + return {"error": None, "data": data_as_dict} async def get_file_metadata(request: web.Request): @@ -173,24 +222,25 @@ async def get_file_metadata(request: web.Request): assert params["fileId"] # nosec assert query["user_id"] # nosec - location_id = params["location_id"] - user_id = query["user_id"] - file_uuid = params["fileId"] - - dsm = await _prepare_storage_manager(params, query, request) - location = dsm.location_from_id(location_id) + with handle_storage_errors(): + location_id = params["location_id"] + user_id = query["user_id"] + file_uuid = params["fileId"] - data = await dsm.list_file(user_id=user_id, location=location, file_uuid=file_uuid) - # when no metadata is found - if data is None: - return {"error": "No result found", "data": {}} + dsm = await _prepare_storage_manager(params, query, request) + location = dsm.location_from_id(location_id) - envelope = { - "error": None, - "data": {**attr.asdict(data.fmd), "parent_id": data.parent_id}, - } + data = await dsm.list_file( + user_id=user_id, location=location, file_uuid=file_uuid + ) + # when no metadata is found + if data is None: + return {"error": "No result found", "data": {}} - return envelope + return { + "error": None, + "data": {**attr.asdict(data.fmd), "parent_id": data.parent_id}, + } async def update_file_meta_data(request: web.Request): @@ -204,12 +254,13 @@ async def update_file_meta_data(request: web.Request): assert params["fileId"] # nosec assert query["user_id"] # nosec - location_id = params["location_id"] - _user_id = query["user_id"] - _file_uuid = params["fileId"] + with handle_storage_errors(): + location_id = params["location_id"] + _user_id = query["user_id"] + _file_uuid = params["fileId"] - dsm = await 
_prepare_storage_manager(params, query, request) - _location = dsm.location_from_id(location_id) + dsm = await _prepare_storage_manager(params, query, request) + _location = dsm.location_from_id(location_id) async def download_file(request: web.Request): @@ -223,18 +274,19 @@ async def download_file(request: web.Request): assert params["fileId"] # nosec assert query["user_id"] # nosec - location_id = params["location_id"] - user_id = query["user_id"] - file_uuid = params["fileId"] + with handle_storage_errors(): + location_id = params["location_id"] + user_id = query["user_id"] + file_uuid = params["fileId"] - dsm = await _prepare_storage_manager(params, query, request) - location = dsm.location_from_id(location_id) - if location == SIMCORE_S3_STR: - link = await dsm.download_link_s3(file_uuid=file_uuid) - else: - link, _filename = await dsm.download_link_datcore(user_id, file_uuid) + dsm = await _prepare_storage_manager(params, query, request) + location = dsm.location_from_id(location_id) + if location == SIMCORE_S3_STR: + link = await dsm.download_link_s3(file_uuid, user_id) + else: + link, _filename = await dsm.download_link_datcore(user_id, file_uuid) - return {"error": None, "data": {"link": link}} + return {"error": None, "data": {"link": link}} async def upload_file(request: web.Request): @@ -244,26 +296,27 @@ async def upload_file(request: web.Request): assert query, "query %s" % query # nosec assert not body, "body %s" % body # nosec - location_id = params["location_id"] - user_id = query["user_id"] - file_uuid = params["fileId"] - - dsm = await _prepare_storage_manager(params, query, request) - location = dsm.location_from_id(location_id) - - if query.get("extra_source") and query.get("extra_location"): - source_uuid = query["extra_source"] - source_id = query["extra_location"] - source_location = dsm.location_from_id(source_id) - link = await dsm.copy_file( - user_id=user_id, - dest_location=location, - dest_uuid=file_uuid, - source_location=source_location, - source_uuid=source_uuid, - ) - else: - link = await dsm.upload_link(user_id=user_id, file_uuid=file_uuid) + with handle_storage_errors(): + location_id = params["location_id"] + user_id = query["user_id"] + file_uuid = params["fileId"] + + dsm = await _prepare_storage_manager(params, query, request) + location = dsm.location_from_id(location_id) + + if query.get("extra_source") and query.get("extra_location"): + source_uuid = query["extra_source"] + source_id = query["extra_location"] + source_location = dsm.location_from_id(source_id) + link = await dsm.copy_file( + user_id=user_id, + dest_location=location, + dest_uuid=file_uuid, + source_location=source_location, + source_uuid=source_uuid, + ) + else: + link = await dsm.upload_link(user_id=user_id, file_uuid=file_uuid) return {"error": None, "data": {"link": link}} @@ -279,26 +332,25 @@ async def delete_file(request: web.Request): assert params["fileId"] # nosec assert query["user_id"] # nosec - location_id = params["location_id"] - user_id = query["user_id"] - file_uuid = params["fileId"] + with handle_storage_errors(): + location_id = params["location_id"] + user_id = query["user_id"] + file_uuid = params["fileId"] - dsm = await _prepare_storage_manager(params, query, request) - location = dsm.location_from_id(location_id) - _discard = await dsm.delete_file( - user_id=user_id, location=location, file_uuid=file_uuid - ) + dsm = await _prepare_storage_manager(params, query, request) + location = dsm.location_from_id(location_id) + await dsm.delete_file(user_id=user_id, 
location=location, file_uuid=file_uuid) - return {"error": None, "data": None} + return {"error": None, "data": None} # Exclusive for simcore-s3 storage ----------------------- +# POST /simcore-s3/folders: copy_folders_from_project async def create_folders_from_project(request: web.Request): # FIXME: Update openapi-core. Fails with additionalProperties https://github.com/p1c2u/openapi-core/issues/124. Fails with project # params, query, body = await extract_and_validate(request) - user_id = request.query.get("user_id") body = await request.json() @@ -312,12 +364,15 @@ async def create_folders_from_project(request: web.Request): ) # nosec # TODO: validate project with jsonschema instead?? - params = {"location_id": SIMCORE_S3_ID} - query = {"user_id": user_id} - dsm = await _prepare_storage_manager(params, query, request) - await dsm.deep_copy_project_simcore_s3( - user_id, source_project, destination_project, nodes_map - ) + with handle_storage_errors(): + dsm = await _prepare_storage_manager( + params={"location_id": SIMCORE_S3_ID}, + query={"user_id": user_id}, + request=request, + ) + await dsm.deep_copy_project_simcore_s3( + user_id, source_project, destination_project, nodes_map + ) raise web.HTTPCreated( text=json.dumps(destination_project), content_type="application/json" @@ -329,10 +384,13 @@ async def delete_folders_of_project(request: web.Request): user_id = request.query.get("user_id") node_id = request.query.get("node_id", None) - params = {"location_id": SIMCORE_S3_ID} - query = {"user_id": user_id} - dsm = await _prepare_storage_manager(params, query, request) - await dsm.delete_project_simcore_s3(user_id, folder_id, node_id) + with handle_storage_errors(): + dsm = await _prepare_storage_manager( + params={"location_id": SIMCORE_S3_ID}, + query={"user_id": user_id}, + request=request, + ) + await dsm.delete_project_simcore_s3(user_id, folder_id, node_id) raise web.HTTPNoContent(content_type="application/json") @@ -346,40 +404,16 @@ async def search_files_starting_with(request: web.Request): assert query["user_id"] # nosec assert query["startswith"] # nosec - user_id = int(query["user_id"]) - startswith = query["startswith"] - - dsm = await _prepare_storage_manager( - {"location_id": SIMCORE_S3_ID}, {"user_id": user_id}, request - ) - - data = await dsm.search_files_starting_with(int(user_id), prefix=startswith) - log.debug("Found %d files starting with '%s'", len(data), startswith) - - return [{**attr.asdict(d.fmd), "parent_id": d.parent_id} for d in data] - + with handle_storage_errors(): -# HELPERS ----------------------------------------------------- -INIT_STR = "init" + user_id = int(query["user_id"]) + startswith = query["startswith"] + dsm = await _prepare_storage_manager( + {"location_id": SIMCORE_S3_ID}, {"user_id": user_id}, request + ) -async def _prepare_storage_manager( - params: Dict, query: Dict, request: web.Request -) -> DataStorageManager: - dsm = request.app[APP_DSM_KEY] + data = await dsm.search_files_starting_with(int(user_id), prefix=startswith) + log.debug("Found %d files starting with '%s'", len(data), startswith) - user_id = query.get("user_id") - location_id = params.get("location_id") - location = dsm.location_from_id(location_id) if location_id else INIT_STR - - if user_id and location in (INIT_STR, DATCORE_STR): - # TODO: notify from db instead when tokens changed, then invalidate resource which enforces - # re-query when needed. 
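# --- illustrative aside (not part of the patch) --------------------------------------
# The refactored handlers above all share one shape: wrap the body in
# handle_storage_errors() so low-level failures such as InvalidFileIdentifier are
# translated into proper HTTP errors, delegate to a DataStorageManager obtained via
# _prepare_storage_manager(), and return the usual {"error": ..., "data": ...} envelope.
# A minimal sketch of a new handler following that convention, written as if it lived in
# this handlers module (so handle_storage_errors, _prepare_storage_manager and
# SIMCORE_S3_ID are the module-level names defined or imported above); "example_handler"
# itself is an assumption for illustration only:
from aiohttp import web


async def example_handler(request: web.Request):
    user_id = request.query["user_id"]

    with handle_storage_errors():
        dsm = await _prepare_storage_manager(
            params={"location_id": SIMCORE_S3_ID},
            query={"user_id": user_id},
            request=request,
        )
        locations = await dsm.locations(user_id)

    return {"error": None, "data": locations}
# --------------------------------------------------------------------------------------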
- - # updates from db - token_info = await get_api_token_and_secret(request, user_id) - if all(token_info): - dsm.datcore_tokens[user_id] = DatCoreApiToken(*token_info) - else: - dsm.datcore_tokens.pop(user_id, None) - return dsm + return [{**attr.asdict(d.fmd), "parent_id": d.parent_id} for d in data] diff --git a/services/storage/src/simcore_service_storage/models.py b/services/storage/src/simcore_service_storage/models.py index 75c80e189bc..a4479ad6f72 100644 --- a/services/storage/src/simcore_service_storage/models.py +++ b/services/storage/src/simcore_service_storage/models.py @@ -3,7 +3,7 @@ """ import datetime from pathlib import Path -from typing import Tuple +from typing import Tuple, Union from uuid import UUID import attr @@ -25,42 +25,26 @@ # pylint: disable=R0902 +_LOCATION_ID_TO_TAG_MAP = {0: SIMCORE_S3_STR, 1: DATCORE_STR} +UNDEFINED_LOCATION_TAG: str = "undefined" + + def _parse_datcore(file_uuid: str) -> Tuple[str, str]: # we should have 12/123123123/111.txt and return (12/123123123, 111.txt) file_path = Path(file_uuid) - destination = file_path.parent - file_name = file_path.name + destination = str(file_path.parent) + file_name = str(file_path.name) return destination, file_name -def _locations(): - # TODO: so far this is hardcoded - simcore_s3 = {"name": SIMCORE_S3_STR, "id": 0} - datcore = {"name": DATCORE_STR, "id": 1} - return [simcore_s3, datcore] - - -def _location_from_id(location_id: str) -> str: - # TODO create a map to sync _location_from_id and _location_from_str - loc_str = "undefined" - if location_id == "0": - loc_str = SIMCORE_S3_STR - elif location_id == "1": - loc_str = DATCORE_STR - - return loc_str - - -def _location_from_str(location: str) -> str: - intstr = "undefined" - if location == SIMCORE_S3_STR: - intstr = "0" - elif location == DATCORE_STR: - intstr = "1" - - return intstr +def get_location_from_id(location_id: Union[str, int]) -> str: + try: + loc_id = int(location_id) + return _LOCATION_ID_TO_TAG_MAP[loc_id] + except (ValueError, KeyError): + return UNDEFINED_LOCATION_TAG @attr.s(auto_attribs=True) diff --git a/services/storage/src/simcore_service_storage/rest_routes.py b/services/storage/src/simcore_service_storage/rest_routes.py index 6c5cbab6602..62619b72925 100644 --- a/services/storage/src/simcore_service_storage/rest_routes.py +++ b/services/storage/src/simcore_service_storage/rest_routes.py @@ -87,6 +87,7 @@ def create(specs: OpenApiSpec) -> List[web.RouteDef]: operation_id = specs.paths[path].operations["post"].operation_id routes.append(web.post(BASEPATH + path, handle, name=operation_id)) + # copy_folders_from_project path, handle = "/simcore-s3/folders", handlers.create_folders_from_project operation_id = specs.paths[path].operations["post"].operation_id routes.append(web.post(BASEPATH + path, handle, name=operation_id)) diff --git a/services/storage/src/simcore_service_storage/utils.py b/services/storage/src/simcore_service_storage/utils.py index 7eaaefc358b..37a6b308145 100644 --- a/services/storage/src/simcore_service_storage/utils.py +++ b/services/storage/src/simcore_service_storage/utils.py @@ -1,12 +1,17 @@ import logging +from pathlib import Path +from typing import Union +import aiofiles import tenacity from aiohttp import ClientSession +from aiohttp.typedefs import StrOrURL from yarl import URL logger = logging.getLogger(__name__) +MAX_CHUNK_SIZE = 1024 RETRY_WAIT_SECS = 2 RETRY_COUNT = 20 CONNECT_TIMEOUT_SECS = 30 @@ -20,7 +25,7 @@ async def assert_enpoint_is_ok( session: ClientSession, url: URL, expected_response: int 
= 200 ): - """ Tenace check to GET given url endpoint + """Tenace check to GET given url endpoint Typically used to check connectivity to a given service @@ -55,4 +60,35 @@ def expo(base=1.2, factor=0.1, max_value=2): yield a n += 1 else: - yield max_value \ No newline at end of file + yield max_value + + +async def download_to_file_or_raise( + session: ClientSession, + url: StrOrURL, + destination_path: Union[str, Path], + *, + chunk_size=MAX_CHUNK_SIZE, +) -> int: + """ + Downloads content from url into destination_path + + Returns downloaded file size + + May raise aiohttp.ClientErrors: + - aiohttp.ClientResponseError if not 2XX + - aiohttp.ClientPayloadError while streaming chunks + """ + # SEE Streaming API: https://docs.aiohttp.org/en/stable/streams.html + + dest_file = Path(destination_path) + + total_size = 0 + async with session.get(url, raise_for_status=True) as response: + dest_file.parent.mkdir(parents=True, exist_ok=True) + async with aiofiles.open(dest_file, mode="wb") as fh: + async for chunk in response.content.iter_chunked(chunk_size): + await fh.write(chunk) + total_size += len(chunk) + + return total_size diff --git a/services/storage/tests/conftest.py b/services/storage/tests/conftest.py index c367d2bd863..1bcbe1e4029 100644 --- a/services/storage/tests/conftest.py +++ b/services/storage/tests/conftest.py @@ -26,16 +26,26 @@ from simcore_service_storage.dsm import DataStorageManager, DatCoreApiToken from simcore_service_storage.models import FileMetaData from simcore_service_storage.settings import SIMCORE_S3_STR -from utils import ACCESS_KEY, BUCKET_NAME, DATABASE, PASS, SECRET_KEY, USER, USER_ID +from utils import ( + ACCESS_KEY, + BUCKET_NAME, + DATA_DIR, + DATABASE, + PASS, + SECRET_KEY, + USER, + USER_ID, +) + +CURRENT_DIR = Path(sys.argv[0] if __name__ == "__main__" else __file__).resolve().parent -current_dir = Path(sys.argv[0] if __name__ == "__main__" else __file__).resolve().parent # TODO: replace by pytest_simcore -sys.path.append(str(current_dir / "helpers")) +sys.path.append(str(CURRENT_DIR / "helpers")) @pytest.fixture(scope="session") def here() -> Path: - return current_dir + return CURRENT_DIR @pytest.fixture(scope="session") @@ -105,6 +115,9 @@ def docker_compose_file(here) -> Iterator[str]: os.environ = old +# POSTGRES SERVICES FIXTURES--------------------- + + @pytest.fixture(scope="session") def postgres_service(docker_services, docker_ip): url = "postgresql://{user}:{password}@{host}:{port}/{database}".format( @@ -136,30 +149,35 @@ def postgres_service(docker_services, docker_ip): return postgres_service -@pytest.fixture(scope="session") +@pytest.fixture(scope="function") def postgres_service_url(postgres_service, docker_services, docker_ip): - postgres_service_url = ( - "postgresql://{user}:{password}@{host}:{port}/{database}".format( - user=USER, - password=PASS, - database=DATABASE, - host=docker_ip, - port=docker_services.port_for("postgres", 5432), - ) + url = "postgresql://{user}:{password}@{host}:{port}/{database}".format( + user=USER, + password=PASS, + database=DATABASE, + host=docker_ip, + port=docker_services.port_for("postgres", 5432), ) - return postgres_service_url + utils.create_tables(url) + + yield url + + utils.drop_tables(url) @pytest.fixture(scope="function") async def postgres_engine(loop, postgres_service_url): - postgres_engine = await create_engine(postgres_service_url) + pg_engine = await create_engine(postgres_service_url) + + yield pg_engine - yield postgres_engine + if pg_engine: + pg_engine.close() + await 
pg_engine.wait_closed() - if postgres_engine: - postgres_engine.close() - await postgres_engine.wait_closed() + +## MINIO SERVICE FIXTURES ---------------------------------------------- @pytest.fixture(scope="session") @@ -201,6 +219,9 @@ def s3_client(minio_service): return s3_client +## FAKE DATA FIXTURES ---------------------------------------------- + + @pytest.fixture(scope="function") def mock_files_factory(tmpdir_factory): def _create_files(count): @@ -220,8 +241,10 @@ def _create_files(count): @pytest.fixture(scope="function") -def dsm_mockup_complete_db(postgres_service_url, s3_client) -> Tuple[str, str]: - utils.create_full_tables(url=postgres_service_url) +def dsm_mockup_complete_db(postgres_service_url, s3_client) -> Tuple[Dict, Dict]: + + utils.fill_tables_from_csv_files(url=postgres_service_url) + bucket_name = BUCKET_NAME s3_client.create_bucket(bucket_name, delete_contents_if_exists=True) file_1 = { @@ -229,7 +252,7 @@ def dsm_mockup_complete_db(postgres_service_url, s3_client) -> Tuple[str, str]: "node_id": "ad9bda7f-1dc5-5480-ab22-5fef4fc53eac", "filename": "outputController.dat", } - f = utils.data_dir() / Path("outputController.dat") + f = DATA_DIR / "outputController.dat" object_name = "{project_id}/{node_id}/{filename}".format(**file_1) s3_client.upload_file(bucket_name, object_name, f) @@ -238,17 +261,14 @@ def dsm_mockup_complete_db(postgres_service_url, s3_client) -> Tuple[str, str]: "node_id": "a3941ea0-37c4-5c1d-a7b3-01b5fd8a80c8", "filename": "notebooks.zip", } - f = utils.data_dir() / Path("notebooks.zip") + f = DATA_DIR / "notebooks.zip" object_name = "{project_id}/{node_id}/{filename}".format(**file_2) s3_client.upload_file(bucket_name, object_name, f) yield (file_1, file_2) - utils.drop_all_tables(url=postgres_service_url) @pytest.fixture(scope="function") def dsm_mockup_db(postgres_service_url, s3_client, mock_files_factory): - # db - utils.create_tables(url=postgres_service_url) # s3 client bucket_name = BUCKET_NAME @@ -326,14 +346,12 @@ def dsm_mockup_db(postgres_service_url, s3_client, mock_files_factory): total_count = total_count + 1 assert total_count == N + yield data # s3 client s3_client.remove_bucket(bucket_name, delete_contents=True) - # db - utils.drop_tables(url=postgres_service_url) - @pytest.fixture(scope="function") async def datcore_testbucket(loop, mock_files_factory): diff --git a/services/storage/tests/docker-compose.yml b/services/storage/tests/docker-compose.yml index 5e347c4b466..c8711c7a66e 100644 --- a/services/storage/tests/docker-compose.yml +++ b/services/storage/tests/docker-compose.yml @@ -28,3 +28,12 @@ services: ports: - "9001:9000" command: server /data + adminer: + image: adminer:4.7.6 + init: true + environment: + - ADMINER_DEFAULT_SERVER=postgres + - ADMINER_DESIGN=nette + - ADMINER_PLUGINS=json-column + ports: + - "18080:8080" diff --git a/services/storage/tests/test_access_layer.py b/services/storage/tests/test_access_layer.py new file mode 100644 index 00000000000..9793ca77b4a --- /dev/null +++ b/services/storage/tests/test_access_layer.py @@ -0,0 +1,83 @@ +# pylint:disable=unused-variable +# pylint:disable=unused-argument +# pylint:disable=redefined-outer-name + + +from typing import Iterable +from uuid import UUID + +import pytest +from aiopg.sa.engine import Engine +from pytest_simcore.helpers.rawdata_fakers import random_project, random_user +from simcore_postgres_database.storage_models import projects, users +from simcore_service_storage.access_layer import ( + AccessRights, + get_file_access_rights, + 
get_project_access_rights, +) + + +@pytest.fixture +async def user_id(postgres_engine: Engine) -> Iterable[int]: + # inject a random user in db + + # NOTE: Ideally this (and next fixture) should be done via webserver API but at this point + # in time, the webserver service would bring more dependencies to other services + # which would turn this test too complex. + + # pylint: disable=no-value-for-parameter + stmt = users.insert().values(**random_user(name="test")).returning(users.c.id) + print(str(stmt)) + async with postgres_engine.acquire() as conn: + result = await conn.execute(stmt) + row = await result.fetchone() + + assert isinstance(row.id, int) + yield row.id + + async with postgres_engine.acquire() as conn: + conn.execute(users.delete().where(users.c.id == row.id)) + + +@pytest.fixture +async def project_id(user_id: int, postgres_engine: Engine) -> Iterable[UUID]: + # inject a random project for user in db. This will give user_id, the full project's ownership + + # pylint: disable=no-value-for-parameter + stmt = ( + projects.insert() + .values(**random_project(prj_owner=user_id)) + .returning(projects.c.uuid) + ) + print(str(stmt)) + async with postgres_engine.acquire() as conn: + result = await conn.execute(stmt) + [prj_uuid] = (await result.fetchone()).as_tuple() + + yield UUID(prj_uuid) + + async with postgres_engine.acquire() as conn: + conn.execute(projects.delete().where(projects.c.uuid == prj_uuid)) + + +@pytest.fixture +async def filemeta_id( + user_id: int, project_id: str, postgres_engine: Engine +) -> Iterable[str]: + raise NotImplementedError() + + +async def test_access_rights_on_owned_project( + user_id: int, project_id: UUID, postgres_engine: Engine +): + + async with postgres_engine.acquire() as conn: + + access = await get_project_access_rights(conn, user_id, str(project_id)) + assert access == AccessRights.all() + + # still NOT registered in file_meta_data BUT with prefix {project_id} owned by user + access = await get_file_access_rights( + conn, user_id, f"{project_id}/node_id/not-in-file-metadata-table.txt" + ) + assert access == AccessRights.all() diff --git a/services/storage/tests/test_dsm.py b/services/storage/tests/test_dsm.py index 8574bf14269..d9f7e7ae46c 100644 --- a/services/storage/tests/test_dsm.py +++ b/services/storage/tests/test_dsm.py @@ -109,16 +109,16 @@ async def test_dsm_s3(dsm_mockup_db, dsm_fixture): def _create_file_meta_for_s3(postgres_url, s3_client, tmp_file): - utils.create_tables(url=postgres_url) + bucket_name = BUCKET_NAME s3_client.create_bucket(bucket_name, delete_contents_if_exists=True) # create file and upload filename = os.path.basename(tmp_file) - project_id = "22" + project_id = "api" # "357879cc-f65d-48b2-ad6c-074e2b9aa1c7" project_name = "battlestar" node_name = "galactica" - node_id = "1006" + node_id = "b423b654-686d-4157-b74b-08fa9d90b36e" file_name = filename file_uuid = os.path.join(str(project_id), str(node_id), str(file_name)) display_name = os.path.join(str(project_name), str(node_name), str(file_name)) @@ -154,7 +154,6 @@ def _create_file_meta_for_s3(postgres_url, s3_client, tmp_file): async def test_links_s3( postgres_service_url, s3_client, mock_files_factory, dsm_fixture ): - utils.create_tables(url=postgres_service_url) tmp_file = mock_files_factory(1)[0] fmd = _create_file_meta_for_s3(postgres_service_url, s3_client, tmp_file) @@ -170,7 +169,7 @@ async def test_links_s3( tmp_file2 = tmp_file + ".rec" user_id = 0 - down_url = await dsm.download_link_s3(fmd.file_uuid) + down_url = await 
dsm.download_link_s3(fmd.file_uuid, user_id) urllib.request.urlretrieve(down_url, tmp_file2) @@ -180,7 +179,6 @@ async def test_links_s3( async def test_copy_s3_s3( postgres_service_url, s3_client, mock_files_factory, dsm_fixture ): - utils.create_tables(url=postgres_service_url) tmp_file = mock_files_factory(1)[0] fmd = _create_file_meta_for_s3(postgres_service_url, s3_client, tmp_file) @@ -229,8 +227,6 @@ async def test_dsm_datcore( if not has_datcore_tokens(): return - utils.create_tables(url=postgres_service_url) - dsm = dsm_fixture user_id = "0" data = await dsm.list_files( @@ -264,7 +260,7 @@ async def test_dsm_s3_to_datcore( ): if not has_datcore_tokens(): return - utils.create_tables(url=postgres_service_url) + tmp_file = mock_files_factory(1)[0] fmd = _create_file_meta_for_s3(postgres_service_url, s3_client, tmp_file) @@ -314,7 +310,7 @@ async def test_dsm_datcore_to_local( ): if not has_datcore_tokens(): return - utils.create_tables(url=postgres_service_url) + dsm = dsm_fixture user_id = USER_ID data = await dsm.list_files( @@ -343,7 +339,7 @@ async def test_dsm_datcore_to_S3( ): if not has_datcore_tokens(): return - utils.create_tables(url=postgres_service_url) + # create temporary file tmp_file = mock_files_factory(1)[0] dest_fmd = _create_file_meta_for_s3(postgres_service_url, s3_client, tmp_file) @@ -396,7 +392,6 @@ async def test_copy_datcore( ): if not has_datcore_tokens(): return - utils.create_tables(url=postgres_service_url) # the fixture should provide 3 files dsm = dsm_fixture @@ -501,7 +496,8 @@ async def test_deep_copy_project_simcore_s3( if not has_datcore_tokens(): return dsm = dsm_fixture - utils.create_full_tables(url=postgres_service_url) + + utils.fill_tables_from_csv_files(url=postgres_service_url) path_in_datcore = datcore_structured_testbucket["file_id3"] file_name_in_datcore = Path(datcore_structured_testbucket["filename3"]).name diff --git a/services/storage/tests/test_rest.py b/services/storage/tests/test_rest.py index 717c8808bc3..9e498c95dcf 100644 --- a/services/storage/tests/test_rest.py +++ b/services/storage/tests/test_rest.py @@ -11,8 +11,9 @@ import pytest from aiohttp import web +from simcore_service_storage.access_layer import AccessRights from simcore_service_storage.db import setup_db -from simcore_service_storage.dsm import APP_DSM_KEY, setup_dsm +from simcore_service_storage.dsm import APP_DSM_KEY, DataStorageManager, setup_dsm from simcore_service_storage.rest import setup_rest from simcore_service_storage.s3 import setup_s3 from simcore_service_storage.settings import APP_CONFIG_KEY, SIMCORE_S3_ID @@ -292,20 +293,52 @@ def get_project_with_data(): return projects +@pytest.fixture +def mock_datcore_download(mocker, client): + # Use to mock downloading from DATCore + async def _fake_download_to_file_or_raise(session, url, dest_path): + print(f"Faking download: {url} -> {dest_path}") + Path(dest_path).write_text("FAKE: test_create_and_delete_folders_from_project") + + mocker.patch( + "simcore_service_storage.dsm.download_to_file_or_raise", + side_effect=_fake_download_to_file_or_raise, + ) + + dsm = client.app[APP_DSM_KEY] + assert dsm + assert isinstance(dsm, DataStorageManager) + + mock = mocker.patch.object(dsm, "download_link_datcore") + mock.return_value = Future() + mock.return_value.set_result(("https://httpbin.org/image", "foo.txt")) + + +@pytest.fixture +def mock_get_project_access_rights(mocker): + # NOTE: this avoid having to inject project in database + for module in ("dsm", "access_layer"): + mock = mocker.patch( + 
f"simcore_service_storage.{module}.get_project_access_rights" + ) + mock.return_value = Future() + mock.return_value.set_result(AccessRights.all()) + + @pytest.mark.parametrize( "project_name,project", [(prj["name"], prj) for prj in get_project_with_data()] ) async def test_create_and_delete_folders_from_project( - client, dsm_mockup_db, project_name, project, mocker + client, + dsm_mockup_db, + project_name, + project, + mock_get_project_access_rights, + mock_datcore_download, ): source_project = project destination_project, nodes_map = clone_project_data(source_project) - dsm = client.app[APP_DSM_KEY] - mock_dsm = mocker.patch.object(dsm, "copy_file_datcore_s3") - mock_dsm.return_value = Future() - mock_dsm.return_value.set_result("Howdie") - # CREATING url = ( client.app.router["copy_folders_from_project"].url_for().with_query(user_id="1") diff --git a/services/storage/tests/test_utils.py b/services/storage/tests/test_utils.py new file mode 100644 index 00000000000..82aa06a4517 --- /dev/null +++ b/services/storage/tests/test_utils.py @@ -0,0 +1,18 @@ +from pathlib import Path + +from aiohttp import ClientSession +from simcore_service_storage.utils import MAX_CHUNK_SIZE, download_to_file_or_raise + + +async def test_download_files(tmpdir): + + destination = Path(tmpdir) / "data" + expected_size = MAX_CHUNK_SIZE * 3 + 1000 + + async with ClientSession() as session: + total_size = await download_to_file_or_raise( + session, f"https://httpbin.org/bytes/{expected_size}", destination + ) + assert destination.exists() + assert expected_size == total_size + assert destination.stat().st_size == total_size diff --git a/services/storage/tests/utils.py b/services/storage/tests/utils.py index 40cf9f359f1..94e0a3c930b 100644 --- a/services/storage/tests/utils.py +++ b/services/storage/tests/utils.py @@ -11,6 +11,7 @@ FileMetaData, file_meta_data, groups, + metadata, projects, user_to_groups, users, @@ -29,13 +30,16 @@ BUCKET_NAME = "simcore-testing-bucket" USER_ID = "0" +PG_TABLES_NEEDED_FOR_STORAGE = [ + user_to_groups, + file_meta_data, + projects, + users, + groups, +] -def current_dir() -> Path: - return Path(sys.argv[0] if __name__ == "__main__" else __file__).resolve().parent - - -def data_dir() -> Path: - return current_dir() / Path("data") +CURRENT_DIR = Path(sys.argv[0] if __name__ == "__main__" else __file__).resolve().parent +DATA_DIR = CURRENT_DIR / "data" def has_datcore_tokens() -> bool: @@ -73,20 +77,19 @@ def is_postgres_responsive(url) -> bool: def create_tables(url, engine=None): - meta = sa.MetaData() if not engine: engine = sa.create_engine(url) - meta.drop_all(bind=engine, tables=[file_meta_data]) - meta.create_all(bind=engine, tables=[file_meta_data]) + metadata.drop_all(bind=engine) + metadata.create_all(bind=engine, tables=PG_TABLES_NEEDED_FOR_STORAGE) + return engine def drop_tables(url, engine=None): - meta = sa.MetaData() if not engine: engine = sa.create_engine(url) - meta.drop_all(bind=engine, tables=[file_meta_data]) + metadata.drop_all(bind=engine) def insert_metadata(url: str, fmd: FileMetaData): @@ -114,87 +117,22 @@ def insert_metadata(url: str, fmd: FileMetaData): ) engine = sa.create_engine(url) - conn = engine.connect() - conn.execute(ins) - engine.dispose() - - -def create_full_tables(url): - meta = sa.MetaData() - engine = sa.create_engine(url) + try: + conn = engine.connect() + conn.execute(ins) + finally: + engine.dispose() - meta.drop_all( - bind=engine, - tables=[ - user_to_groups, - file_meta_data, - projects, - users, - groups, - ], - checkfirst=True, - ) 
- meta.create_all( - bind=engine, - tables=[ - file_meta_data, - projects, - users, - groups, - user_to_groups, - ], - ) - for t in ["users", "file_meta_data", "projects"]: - filename = t + ".csv" - csv_file = str(data_dir() / Path(filename)) - with open(csv_file, "r") as file: - data_df = pd.read_csv(file) - data_df.to_sql( - t, con=engine, index=False, index_label="id", if_exists="append" - ) - - # NOTE: Leave here as a reference - # import psycopg2 - # conn = psycopg2.connect(url) - # cur = conn.cursor() - # columns = [["file_uuid","location_id","location","bucket_name","object_name","project_id","project_name","node_id","node_name","file_name","user_id","user_name"],[],[],[]] - # if False: - # for t in ["file_meta_data", "projects", "users"]: - # filename = t + ".sql" - # sqlfile = str(data_dir() / Path(filename)) - # cur.execute(open(sqlfile, "r").read()) - # else: - # for t in ["file_meta_data", "projects", "users"]: - # filename = t + ".csv" - # csv_file = str(data_dir() / Path(filename)) - # if False: - # with open(csv_file, 'r') as file: - # next(file) - # if t == "file_meta_data": - # cur.copy_from(file, t, sep=',', columns=columns[0]) - # else: - # cur.copy_from(file, t, sep=',') - # conn.commit() - # else: - # with open(csv_file, 'r') as file: - # data_df = pd.read_csv(file) - # data_df.to_sql(t, con=engine, index=False, index_label="id", if_exists='append') - engine.dispose() - - -def drop_all_tables(url): - meta = sa.MetaData() +def fill_tables_from_csv_files(url): engine = sa.create_engine(url) - meta.drop_all( - bind=engine, - tables=[ - file_meta_data, - projects, - users, - groups, - user_to_groups, - ], - ) - engine.dispose() + try: + for table in ["users", "file_meta_data", "projects"]: + with open(DATA_DIR / f"{table}.csv", "r") as file: + data_df = pd.read_csv(file) + data_df.to_sql( + table, con=engine, index=False, index_label="id", if_exists="append" + ) + finally: + engine.dispose() diff --git a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml index 655e4091bf8..bb2afafff83 100644 --- a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml +++ b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml @@ -10743,7 +10743,7 @@ paths: message: Password is not secure field: pasword status: 400 - '/projects/{project_id}:xport': + '/projects/{project_id}:export': parameters: - name: project_id in: path diff --git a/services/web/server/src/simcore_service_webserver/exporter/formatters/formatter_v1.py b/services/web/server/src/simcore_service_webserver/exporter/formatters/formatter_v1.py index d27c9c4a435..3f515d2af56 100644 --- a/services/web/server/src/simcore_service_webserver/exporter/formatters/formatter_v1.py +++ b/services/web/server/src/simcore_service_webserver/exporter/formatters/formatter_v1.py @@ -293,26 +293,13 @@ async def _remove_runtime_states(project: Project): node_data.state.dependencies = None -async def import_files_and_validate_project( - app: web.Application, user_id: int, root_folder: Path -) -> str: - project_file = await ProjectFile.model_from_file(root_dir=root_folder) - shuffled_data: ShuffledData = project_file.get_shuffled_uuids() - - # replace shuffled_data in project - # NOTE: there is no reason to write the shuffled data to file - log.debug("Loaded project data: %s", project_file) - shuffled_project_file = project_file.new_instance_from_shuffled_data( - shuffled_data=shuffled_data - ) - - log.debug("Shuffled project 
data: %s", shuffled_project_file) - - # NOTE: it is not necessary to apply data shuffling to the manifest - manifest_file = await ManifestFile.model_from_file(root_dir=root_folder) - - user: Dict = await get_user(app=app, user_id=user_id) - +async def _upload_files_to_storage( + app: web.Application, + user_id: int, + root_folder: Path, + manifest_file: ManifestFile, + shuffled_data: ShuffledData, +) -> List[Tuple[LinkAndPath2, ETag]]: # check all attachments are present client_timeout = ClientTimeout(total=UPLOAD_HTTP_TIMEOUT, connect=5, sock_connect=5) async with ClientSession(timeout=client_timeout) as session: @@ -347,7 +334,30 @@ async def import_files_and_validate_project( ) links_to_new_e_tags = await asyncio.gather(*run_in_parallel) - # finally create and add the project + return links_to_new_e_tags + + +async def import_files_and_validate_project( + app: web.Application, user_id: int, root_folder: Path +) -> str: + project_file = await ProjectFile.model_from_file(root_dir=root_folder) + shuffled_data: ShuffledData = project_file.get_shuffled_uuids() + + # replace shuffled_data in project + # NOTE: there is no reason to write the shuffled data to file + log.debug("Loaded project data: %s", project_file) + shuffled_project_file = project_file.new_instance_from_shuffled_data( + shuffled_data=shuffled_data + ) + + log.debug("Shuffled project data: %s", shuffled_project_file) + + # NOTE: it is not necessary to apply data shuffling to the manifest + manifest_file = await ManifestFile.model_from_file(root_dir=root_folder) + + user: Dict = await get_user(app=app, user_id=user_id) + + # create and add the project project = Project( uuid=shuffled_project_file.uuid, name=shuffled_project_file.name, @@ -367,14 +377,23 @@ async def import_files_and_validate_project( project_uuid = str(project.uuid) try: + await _remove_runtime_states(project) + await add_new_project(app, project, user_id) + + # upload files to storage + links_to_new_e_tags = await _upload_files_to_storage( + app=app, + user_id=user_id, + root_folder=root_folder, + manifest_file=manifest_file, + shuffled_data=shuffled_data, + ) + # fix etags await _fix_file_e_tags(project, links_to_new_e_tags) # NOTE: first fix the file eTags, and then the run hashes await _fix_node_run_hashes_based_on_old_project( project, project_file, shuffled_data ) - - await _remove_runtime_states(project) - await add_new_project(app, project, user_id) except Exception as e: log.warning( "The below error occurred during import\n%s", traceback.format_exc() diff --git a/services/web/server/src/simcore_service_webserver/projects/projects_api.py b/services/web/server/src/simcore_service_webserver/projects/projects_api.py index d6829dce217..71fb8ae0804 100644 --- a/services/web/server/src/simcore_service_webserver/projects/projects_api.py +++ b/services/web/server/src/simcore_service_webserver/projects/projects_api.py @@ -40,7 +40,6 @@ SOCKET_IO_PROJECT_UPDATED_EVENT, post_group_messages, ) -from ..storage_api import copy_data_folders_from_project # mocked in unit-tests from ..storage_api import ( delete_data_folders_of_project, delete_data_folders_of_project_node, @@ -48,7 +47,6 @@ from ..users_api import get_user_name from .config import CONFIG_SECTION_NAME from .projects_db import APP_PROJECT_DBAPI -from .projects_utils import clone_project_document log = logging.getLogger(__name__) @@ -97,34 +95,28 @@ async def get_project_for_user( return project -async def clone_project( - request: web.Request, project: Dict, user_id: int, forced_copy_project_id: str = "" 
-) -> Dict: - """Clones both document and data folders of a project - - - document - - get new identifiers for project and nodes - - data folders - - folder name composes as project_uuid/node_uuid - - data is deep-copied to new folder corresponding to new identifiers - - managed by storage uservice - - TODO: request to application - - :param request: http request - :type request: web.Request - :param project: source project document - :type project: Dict - :return: project document with updated data links - :rtype: Dict - """ - cloned_project, nodes_map = clone_project_document(project, forced_copy_project_id) - - updated_project = await copy_data_folders_from_project( - request.app, project, cloned_project, nodes_map, user_id - ) - - return updated_project +# NOTE: Needs refactoring after access-layer in storage. DO NOT USE but keep +# here since it documents well the concept +# +# async def clone_project( +# request: web.Request, project: Dict, user_id: int, forced_copy_project_id: str = "" +# ) -> Dict: +# """Clones both document and data folders of a project +# +# - document +# - get new identifiers for project and nodes +# - data folders +# - folder name composes as project_uuid/node_uuid +# - data is deep-copied to new folder corresponding to new identifiers +# - managed by storage uservice +# """ +# cloned_project, nodes_map = clone_project_document(project, forced_copy_project_id) +# +# updated_project = await copy_data_folders_from_project( +# request.app, project, cloned_project, nodes_map, user_id +# ) +# +# return updated_project async def start_project_interactive_services( diff --git a/services/web/server/src/simcore_service_webserver/projects/projects_db.py b/services/web/server/src/simcore_service_webserver/projects/projects_db.py index d3c27e77591..0d73c4f729a 100644 --- a/services/web/server/src/simcore_service_webserver/projects/projects_db.py +++ b/services/web/server/src/simcore_service_webserver/projects/projects_db.py @@ -319,22 +319,13 @@ async def load_user_projects( log.info("Loading projects for user %s", user_id) async with self.engine.acquire() as conn: user_groups: List[RowProxy] = await self.__load_user_groups(conn, user_id) - # HOTFIX: disabled for now as there is an issue with shared data - # query = textwrap.dedent( - # f"""\ - # SELECT * - # FROM projects - # WHERE projects.type != 'TEMPLATE' - # AND (jsonb_exists_any(projects.access_rights, array[{', '.join(f"'{group.gid}'" for group in user_groups)}]) - # OR prj_owner = {user_id}) - # """ - # ) query = textwrap.dedent( f"""\ SELECT * FROM projects WHERE projects.type != 'TEMPLATE' - AND (prj_owner = {user_id}) + AND (jsonb_exists_any(projects.access_rights, array[{', '.join(f"'{group.gid}'" for group in user_groups)}]) + OR prj_owner = {user_id}) """ ) projects_list = await self.__load_projects( @@ -450,19 +441,6 @@ async def _get_project( user_groups: List[RowProxy] = await self.__load_user_groups(connection, user_id) # NOTE: in order to use specific postgresql function jsonb_exists_any we use raw call here - # HOTFIX: disabled for now as there is an issue with shared data - # query = textwrap.dedent( - # f"""\ - # SELECT * - # FROM projects - # WHERE - # {"" if include_templates else "projects.type != 'TEMPLATE' AND"} - # uuid = '{project_uuid}' - # AND (jsonb_exists_any(projects.access_rights, array[{', '.join(f"'{group.gid}'" for group in user_groups)}]) - # OR prj_owner = {user_id}) - # {"FOR UPDATE" if for_update else ""} - # """ - # ) query = textwrap.dedent( f"""\ SELECT * @@ -470,7 +448,8 @@ 
async def _get_project( WHERE {"" if include_templates else "projects.type != 'TEMPLATE' AND"} uuid = '{project_uuid}' - AND (prj_owner = {user_id}) + AND (jsonb_exists_any(projects.access_rights, array[{', '.join(f"'{group.gid}'" for group in user_groups)}]) + OR prj_owner = {user_id}) {"FOR UPDATE" if for_update else ""} """ ) @@ -495,9 +474,7 @@ async def _get_project( async def add_tag(self, user_id: int, project_uuid: str, tag_id: int) -> Dict: async with self.engine.acquire() as conn: - project = await self._get_project( - conn, user_id, project_uuid, include_templates=True - ) + project = await self._get_project(conn, user_id, project_uuid, include_templates=True) # pylint: disable=no-value-for-parameter query = study_tags.insert().values(study_id=project["id"], tag_id=tag_id) user_email = await self._get_user_email(conn, user_id) @@ -509,9 +486,7 @@ async def add_tag(self, user_id: int, project_uuid: str, tag_id: int) -> Dict: async def remove_tag(self, user_id: int, project_uuid: str, tag_id: int) -> Dict: async with self.engine.acquire() as conn: - project = await self._get_project( - conn, user_id, project_uuid, include_templates=True - ) + project = await self._get_project(conn, user_id, project_uuid, include_templates=True) user_email = await self._get_user_email(conn, user_id) # pylint: disable=no-value-for-parameter query = study_tags.delete().where( diff --git a/services/web/server/src/simcore_service_webserver/projects/projects_handlers.py b/services/web/server/src/simcore_service_webserver/projects/projects_handlers.py index d159bf11c06..6eb582f30fe 100644 --- a/services/web/server/src/simcore_service_webserver/projects/projects_handlers.py +++ b/services/web/server/src/simcore_service_webserver/projects/projects_handlers.py @@ -3,7 +3,7 @@ """ import json import logging -from typing import Any, Dict, List, Optional, Set +from typing import Any, Coroutine, Dict, List, Optional, Set import aioredlock from aiohttp import web @@ -17,11 +17,12 @@ from ..resource_manager.websocket_manager import managed_resource from ..security_api import check_permission from ..security_decorators import permission_required +from ..storage_api import copy_data_folders_from_project from ..users_api import get_user_name from . 
import projects_api from .projects_db import APP_PROJECT_DBAPI from .projects_exceptions import ProjectInvalidRightsError, ProjectNotFoundError -from .projects_utils import project_uses_available_services +from .projects_utils import clone_project_document, project_uses_available_services OVERRIDABLE_DOCUMENT_KEYS = [ "name", @@ -50,6 +51,7 @@ async def create_projects(request: web.Request): try: project = {} + clone_data_coro: Optional[Coroutine] = None if as_template: # create template from await check_permission(request, "project.template.create") @@ -60,16 +62,28 @@ async def create_projects(request: web.Request): user_id=user_id, include_templates=False, ) - project = await projects_api.clone_project(request, source_project, user_id) + # clone user project as tempalte + project, nodes_map = clone_project_document( + source_project, forced_copy_project_id=False + ) + clone_data_coro = copy_data_folders_from_project( + request.app, source_project, project, nodes_map, user_id + ) elif template_uuid: # create from template - template_prj = await db.get_template_project(template_uuid) - if not template_prj: + source_project = await db.get_template_project(template_uuid) + if not source_project: raise web.HTTPNotFound( reason="Invalid template uuid {}".format(template_uuid) ) + # clone template as user project + project, nodes_map = clone_project_document( + source_project, forced_copy_project_id=False + ) + clone_data_coro = copy_data_folders_from_project( + request.app, source_project, project, nodes_map, user_id + ) - project = await projects_api.clone_project(request, template_prj, user_id) # remove template access rights project["accessRights"] = {} # FIXME: parameterized inputs should get defaults provided by service @@ -93,6 +107,11 @@ async def create_projects(request: web.Request): project = await db.add_project( project, user_id, force_as_template=as_template is not None ) + + # copies the project's DATA IF cloned + if clone_data_coro: + await clone_data_coro + # This is a new project and every new graph needs to be reflected in the pipeline tables await director_v2.create_or_update_pipeline( request.app, user_id, project["uuid"] diff --git a/services/web/server/src/simcore_service_webserver/storage_api.py b/services/web/server/src/simcore_service_webserver/storage_api.py index ca41f3d07dc..a5722ebd03f 100644 --- a/services/web/server/src/simcore_service_webserver/storage_api.py +++ b/services/web/server/src/simcore_service_webserver/storage_api.py @@ -5,9 +5,8 @@ from pprint import pformat from aiohttp import web -from yarl import URL - from servicelib.rest_responses import unwrap_envelope +from yarl import URL from .storage_config import get_client_session, get_storage_config @@ -44,6 +43,9 @@ async def copy_data_folders_from_project( ssl=False, ) as resp: payload = await resp.json() + + # FIXME: relying on storage to change the project is not a good idea since + # it is not storage responsibility to deal with projects updated_project, error = unwrap_envelope(payload) if error: msg = "Cannot copy project data in storage: %s" % pformat(error) diff --git a/services/web/server/src/simcore_service_webserver/studies_access.py b/services/web/server/src/simcore_service_webserver/studies_access.py index 5d190d24b97..a53b18ae80b 100644 --- a/services/web/server/src/simcore_service_webserver/studies_access.py +++ b/services/web/server/src/simcore_service_webserver/studies_access.py @@ -17,7 +17,6 @@ from aiohttp import web from aioredlock import Aioredlock - from 
servicelib.application_keys import APP_CONFIG_KEY from servicelib.application_setup import ModuleCategory, app_module_setup @@ -28,6 +27,7 @@ ) from .security_api import is_anonymous, remember from .statics import INDEX_RESOURCE_NAME +from .storage_api import copy_data_folders_from_project from .utils import compose_error_msg log = logging.getLogger(__name__) @@ -149,8 +149,10 @@ async def copy_study_to_account( """ from .projects.projects_db import APP_PROJECT_DBAPI from .projects.projects_exceptions import ProjectNotFoundError - from .projects.projects_utils import substitute_parameterized_inputs - from .projects.projects_api import clone_project + from .projects.projects_utils import ( + clone_project_document, + substitute_parameterized_inputs, + ) # FIXME: ONLY projects should have access to db since it avoids access layer # TODO: move to project_api and add access layer @@ -169,10 +171,11 @@ async def copy_study_to_account( # FIXME: if template is parametrized and user has already a copy, then delete it and create a new one?? except ProjectNotFoundError: - # new project from template - project = await clone_project( - request, template_project, user["id"], forced_copy_project_id=project_uuid + # New project cloned from template + project, nodes_map = clone_project_document( + template_project, forced_copy_project_id=project_uuid ) + # remove template access rights # FIXME: temporary fix until. Unify access management while cloning a project. Right not, at least two workflows have different implementations project["accessRights"] = {} @@ -184,7 +187,15 @@ async def copy_study_to_account( substitute_parameterized_inputs(project, template_parameters) or project ) + # add project model + copy data TODO: guarantee order and atomicity await db.add_project(project, user["id"], force_project_uuid=True) + await copy_data_folders_from_project( + request.app, + template_project, + project, + nodes_map, + user["id"], + ) return project_uuid diff --git a/services/web/server/tests/integration/test_exporter.py b/services/web/server/tests/integration/test_exporter.py index eed45e77972..02d307c2d0b 100644 --- a/services/web/server/tests/integration/test_exporter.py +++ b/services/web/server/tests/integration/test_exporter.py @@ -467,7 +467,7 @@ async def test_import_export_import_duplicate( url_export = client.app.router["export_project"].url_for( project_id=imported_project_uuid ) - assert url_export == URL(API_PREFIX + f"/projects/{imported_project_uuid}:xport") + assert url_export == URL(API_PREFIX + f"/projects/{imported_project_uuid}:export") async with await client.post(url_export, timeout=10) as export_response: assert export_response.status == 200, await export_response.text() diff --git a/services/web/server/tests/integration/test_project_workflow.py b/services/web/server/tests/integration/test_project_workflow.py index b19260802b4..883f2055f6e 100644 --- a/services/web/server/tests/integration/test_project_workflow.py +++ b/services/web/server/tests/integration/test_project_workflow.py @@ -132,8 +132,9 @@ async def storage_subsystem_mock(loop, mocker): Patched functions are exposed within projects but call storage subsystem """ # requests storage to copy data + mock = mocker.patch( - "simcore_service_webserver.projects.projects_api.copy_data_folders_from_project" + "simcore_service_webserver.projects.projects_handlers.copy_data_folders_from_project" ) async def _mock_copy_data_from_project(*args): diff --git a/services/web/server/tests/unit/with_dbs/conftest.py 
b/services/web/server/tests/unit/with_dbs/conftest.py index 76f69a82ae9..141d0bc6fbc 100644 --- a/services/web/server/tests/unit/with_dbs/conftest.py +++ b/services/web/server/tests/unit/with_dbs/conftest.py @@ -208,7 +208,7 @@ async def storage_subsystem_mock(loop, mocker): """ # requests storage to copy data mock = mocker.patch( - "simcore_service_webserver.projects.projects_api.copy_data_folders_from_project" + "simcore_service_webserver.projects.projects_handlers.copy_data_folders_from_project" ) async def _mock_copy_data_from_project(*args): diff --git a/services/web/server/tests/unit/with_dbs/fast/test_access_to_studies.py b/services/web/server/tests/unit/with_dbs/fast/test_access_to_studies.py index 49fa60a8161..2eca3723589 100644 --- a/services/web/server/tests/unit/with_dbs/fast/test_access_to_studies.py +++ b/services/web/server/tests/unit/with_dbs/fast/test_access_to_studies.py @@ -187,7 +187,7 @@ async def mocked_get_services_for_user(*args, **kwargs): @pytest.fixture -def mocks_on_projects_api(mocker) -> Dict: +def mocks_on_projects_api(mocker) -> None: """ All projects in this module are UNLOCKED """ @@ -197,6 +197,29 @@ def mocks_on_projects_api(mocker) -> Dict: ) +@pytest.fixture +async def storage_subsystem_mock(storage_subsystem_mock, mocker): + """ + Mocks functions that require storage client + """ + # Overrides + extends fixture in services/web/server/tests/unit/with_dbs/conftest.py + # SEE https://docs.pytest.org/en/stable/fixture.html#override-a-fixture-on-a-folder-conftest-level + + # Mocks copy_data_folders_from_project BUT under studies_access + mock = mocker.patch( + "simcore_service_webserver.studies_access.copy_data_folders_from_project" + ) + + async def _mock_copy_data_from_project(app, src_prj, dst_prj, nodes_map, user_id): + print( + f"MOCK copying data project {src_prj['uuid']} -> {dst_prj['uuid']} " + f"with {len(nodes_map)} s3 objects by user={user_id}" + ) + return dst_prj + + mock.side_effect = _mock_copy_data_from_project + + # TESTS ---------------------------------------------------------------------------------------------- diff --git a/services/web/server/tests/unit/with_dbs/slow/test_projects.py b/services/web/server/tests/unit/with_dbs/slow/test_projects.py index 7ecbccade09..31a8b71077c 100644 --- a/services/web/server/tests/unit/with_dbs/slow/test_projects.py +++ b/services/web/server/tests/unit/with_dbs/slow/test_projects.py @@ -796,7 +796,6 @@ async def test_new_template_from_project( pytest.fail("Invalid uuid in workbench node {}".format(node_name)) -@pytest.mark.skip(reason="https://github.com/ITISFoundation/osparc-simcore/issues/2041") @pytest.mark.parametrize(*standard_role_response()) @pytest.mark.parametrize( "share_rights", @@ -1375,7 +1374,6 @@ async def test_tags_to_studies( await assert_status(resp, web.HTTPNoContent) -@pytest.mark.skip(reason="https://github.com/ITISFoundation/osparc-simcore/issues/2041") @pytest.mark.parametrize(*standard_role_response()) async def test_open_shared_project_2_users_locked( client, @@ -1523,7 +1521,6 @@ async def test_open_shared_project_2_users_locked( ) -@pytest.mark.skip(reason="https://github.com/ITISFoundation/osparc-simcore/issues/2041") @pytest.mark.parametrize(*standard_role_response()) async def test_open_shared_project_at_same_time( loop,