From dcad2db54e53406c7129579fa28a9afbc7c95181 Mon Sep 17 00:00:00 2001 From: Sylvain <35365065+sanderegg@users.noreply.github.com> Date: Thu, 10 Mar 2022 17:40:27 +0100 Subject: [PATCH] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20Replace=20aiopg=20in=20cat?= =?UTF-8?q?alog=20service=20(#2869)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * catalog uses sqlalchemy in async mode * use async sqlalchemy in tests too * added caching of director-v0 services --- .../utils_aiosqlalchemy.py | 32 ++ .../src/pytest_simcore/postgres_service.py | 21 +- .../src/settings_library/postgres.py | 12 +- services/catalog/requirements/_base.in | 4 +- services/catalog/requirements/_base.txt | 27 +- services/catalog/requirements/_test.in | 3 +- services/catalog/requirements/_test.txt | 5 + .../api/dependencies/database.py | 22 +- .../api/routes/services.py | 99 +++--- .../core/background_tasks.py | 8 +- .../simcore_service_catalog/core/events.py | 1 + .../src/simcore_service_catalog/db/events.py | 34 +- .../db/repositories/_base.py | 4 +- .../db/repositories/dags.py | 21 +- .../db/repositories/groups.py | 26 +- .../db/repositories/projects.py | 4 +- .../db/repositories/services.py | 47 ++- .../services/access_rights.py | 4 +- .../services/director.py | 4 +- .../utils/requests_decorators.py | 2 +- .../tests/unit/test_services_director.py | 6 +- .../catalog/tests/unit/with_dbs/conftest.py | 91 ++++-- .../unit/with_dbs/test_db_repositories.py | 4 +- .../unit/with_dbs/test_entrypoint_services.py | 309 +++++++++++------- .../with_dbs/test_services_access_rights.py | 6 +- .../tests/unit/test_modules_dask_client.py | 4 +- .../locust_files/catalog_services.py | 52 +-- .../locust_files/webserver_services.py | 51 +++ 28 files changed, 571 insertions(+), 332 deletions(-) create mode 100644 packages/postgres-database/src/simcore_postgres_database/utils_aiosqlalchemy.py create mode 100644 tests/performance/locust_files/webserver_services.py diff --git a/packages/postgres-database/src/simcore_postgres_database/utils_aiosqlalchemy.py b/packages/postgres-database/src/simcore_postgres_database/utils_aiosqlalchemy.py new file mode 100644 index 00000000000..2e7634dea28 --- /dev/null +++ b/packages/postgres-database/src/simcore_postgres_database/utils_aiosqlalchemy.py @@ -0,0 +1,32 @@ +from typing import Any, Dict + +import sqlalchemy as sa +from sqlalchemy.ext.asyncio import AsyncEngine + +from .utils_migration import get_current_head + + +async def get_pg_engine_stateinfo(engine: AsyncEngine) -> Dict[str, Any]: + return { + "current pool connections": f"{engine.pool.checkedin()=},{engine.pool.checkedout()=}", + } + + +class DBMigrationError(RuntimeError): + pass + + +async def raise_if_migration_not_ready(engine: AsyncEngine): + """Ensures db migration is complete + + :raises DBMigrationError + """ + async with engine.connect() as conn: + version_num = await conn.scalar( + sa.DDL('SELECT "version_num" FROM "alembic_version"') + ) + head_version_num = get_current_head() + if version_num != head_version_num: + raise DBMigrationError( + f"Migration is incomplete, expected {head_version_num} but got {version_num}" + ) diff --git a/packages/pytest-simcore/src/pytest_simcore/postgres_service.py b/packages/pytest-simcore/src/pytest_simcore/postgres_service.py index 5930bd14afd..9fce95293a5 100644 --- a/packages/pytest-simcore/src/pytest_simcore/postgres_service.py +++ b/packages/pytest-simcore/src/pytest_simcore/postgres_service.py @@ -4,7 +4,6 @@ import logging from typing import AsyncIterator, Dict, Iterator, List -import aiopg.sa import pytest import sqlalchemy as sa import tenacity @@ -168,7 +167,7 @@ def postgres_db( @pytest.fixture(scope="function") async def aiopg_engine( postgres_db: sa.engine.Engine, -) -> AsyncIterator[aiopg.sa.engine.Engine]: +) -> AsyncIterator: """An aiopg engine connected to an initialized database""" from aiopg.sa import create_engine @@ -181,6 +180,22 @@ async def aiopg_engine( await engine.wait_closed() +@pytest.fixture(scope="function") +async def sqlalchemy_async_engine( + postgres_db: sa.engine.Engine, +) -> AsyncIterator: + # NOTE: prevent having to import this if latest sqlalchemy not installed + from sqlalchemy.ext.asyncio import create_async_engine + + engine = create_async_engine( + f"{postgres_db.url}".replace("postgresql", "postgresql+asyncpg") + ) + assert engine + yield engine + + await engine.dispose() + + @pytest.fixture(scope="function") def postgres_host_config(postgres_dsn: Dict[str, str], monkeypatch) -> Dict[str, str]: monkeypatch.setenv("POSTGRES_USER", postgres_dsn["user"]) @@ -195,7 +210,7 @@ def postgres_host_config(postgres_dsn: Dict[str, str], monkeypatch) -> Dict[str, @pytest.fixture(scope="module") -def postgres_session(postgres_db: sa.engine.Engine) -> sa.orm.session.Session: +def postgres_session(postgres_db: sa.engine.Engine) -> Iterator[sa.orm.session.Session]: from sqlalchemy.orm.session import Session Session_cls = sessionmaker(postgres_db) diff --git a/packages/settings-library/src/settings_library/postgres.py b/packages/settings-library/src/settings_library/postgres.py index eda7d1d95ff..fc49d501f0b 100644 --- a/packages/settings-library/src/settings_library/postgres.py +++ b/packages/settings-library/src/settings_library/postgres.py @@ -33,7 +33,12 @@ class PostgresSettings(BaseCustomSettings): POSTGRES_CLIENT_NAME: Optional[str] = Field( None, description="Name of the application connecting the postgres database, will default to use the host hostname (hostname on linux)", - env=["HOST", "HOSTNAME", "POSTGRES_CLIENT_NAME"], + env=[ + "POSTGRES_CLIENT_NAME", + # This is useful when running inside a docker container, then the hostname is set each client gets a different name + "HOST", + "HOSTNAME", + ], ) @validator("POSTGRES_MAXSIZE") @@ -57,6 +62,11 @@ def dsn(self) -> str: ) return dsn + @cached_property + def dsn_with_async_sqlalchemy(self) -> str: + dsn = self.dsn.replace("postgresql", "postgresql+asyncpg") + return dsn + @cached_property def dsn_with_query(self) -> str: """Some clients do not support queries in the dsn""" diff --git a/services/catalog/requirements/_base.in b/services/catalog/requirements/_base.in index 1f8066416f0..9d4caa6ec66 100644 --- a/services/catalog/requirements/_base.in +++ b/services/catalog/requirements/_base.in @@ -21,12 +21,14 @@ fastapi[all] pydantic[dotenv] # database -aiopg[sa] +asyncpg +sqlalchemy[asyncio] # web client httpx # other +aiocache[redis,msgpack] tenacity packaging pyyaml diff --git a/services/catalog/requirements/_base.txt b/services/catalog/requirements/_base.txt index 6c990bab139..f1675f94241 100644 --- a/services/catalog/requirements/_base.txt +++ b/services/catalog/requirements/_base.txt @@ -4,6 +4,8 @@ # # pip-compile --output-file=requirements/_base.txt --strip-extras requirements/_base.in # +aiocache==0.11.1 + # via -r requirements/_base.in aiodebug==1.1.2 # via # -c requirements/../../../packages/service-library/requirements/./_base.in @@ -12,8 +14,15 @@ aiofiles==0.5.0 # via # -c requirements/../../../packages/service-library/requirements/./_base.in # -r requirements/../../../packages/service-library/requirements/_base.in -aiopg==1.3.3 - # via -r requirements/_base.in +aioredis==1.3.1 + # via + # -c requirements/../../../packages/models-library/requirements/../../../requirements/constraints.txt + # -c requirements/../../../packages/postgres-database/requirements/../../../requirements/constraints.txt + # -c requirements/../../../packages/service-library/requirements/../../../requirements/constraints.txt + # -c requirements/../../../packages/service-library/requirements/./../../../requirements/constraints.txt + # -c requirements/../../../packages/settings-library/requirements/../../../requirements/constraints.txt + # -c requirements/../../../requirements/constraints.txt + # aiocache alembic==1.7.4 # via -r requirements/../../../packages/postgres-database/requirements/_base.in anyio==3.5.0 @@ -23,7 +32,9 @@ anyio==3.5.0 asgiref==3.4.1 # via uvicorn async-timeout==4.0.2 - # via aiopg + # via aioredis +asyncpg==0.25.0 + # via -r requirements/_base.in certifi==2020.12.5 # via # httpcore @@ -56,6 +67,8 @@ h11==0.12.0 # via # httpcore # uvicorn +hiredis==2.0.0 + # via aioredis httpcore==0.14.4 # via httpx httptools==0.2.0 @@ -102,6 +115,8 @@ markupsafe==1.1.1 # via # jinja2 # mako +msgpack==1.0.3 + # via aiocache multidict==5.1.0 # via yarl opentracing==2.4.0 @@ -113,9 +128,7 @@ orjson==3.4.8 packaging==20.9 # via -r requirements/_base.in psycopg2-binary==2.8.6 - # via - # aiopg - # sqlalchemy + # via sqlalchemy pydantic==1.9.0 # via # -c requirements/../../../packages/models-library/requirements/../../../requirements/constraints.txt @@ -180,7 +193,7 @@ sqlalchemy==1.4.31 # -c requirements/../../../packages/settings-library/requirements/../../../requirements/constraints.txt # -c requirements/../../../requirements/constraints.txt # -r requirements/../../../packages/postgres-database/requirements/_base.in - # aiopg + # -r requirements/_base.in # alembic starlette==0.17.1 # via fastapi diff --git a/services/catalog/requirements/_test.in b/services/catalog/requirements/_test.in index 644fe417545..b8e5d0adde8 100644 --- a/services/catalog/requirements/_test.in +++ b/services/catalog/requirements/_test.in @@ -14,7 +14,8 @@ jsonschema # testing pytest -pytest-aiohttp # incompatible with pytest-asyncio. See https://github.com/pytest-dev/pytest-asyncio/issues/76 +pytest-aiohttp +pytest-benchmark pytest-cov pytest-mock pytest-runner diff --git a/services/catalog/requirements/_test.txt b/services/catalog/requirements/_test.txt index bb18eb03a3c..9e2e76c9807 100644 --- a/services/catalog/requirements/_test.txt +++ b/services/catalog/requirements/_test.txt @@ -166,6 +166,8 @@ ptvsd==4.3.2 # via -r requirements/_test.in py==1.11.0 # via pytest +py-cpuinfo==8.0.0 + # via pytest-benchmark pycparser==2.21 # via cffi pylint==2.12.2 @@ -183,6 +185,7 @@ pytest==6.2.5 # -r requirements/_test.in # pytest-aiohttp # pytest-asyncio + # pytest-benchmark # pytest-cov # pytest-docker # pytest-mock @@ -190,6 +193,8 @@ pytest-aiohttp==1.0.4 # via -r requirements/_test.in pytest-asyncio==0.18.1 # via pytest-aiohttp +pytest-benchmark==3.4.1 + # via -r requirements/_test.in pytest-cov==3.0.0 # via -r requirements/_test.in pytest-docker==0.10.3 diff --git a/services/catalog/src/simcore_service_catalog/api/dependencies/database.py b/services/catalog/src/simcore_service_catalog/api/dependencies/database.py index 35d9d2c60a9..2b0118662f3 100644 --- a/services/catalog/src/simcore_service_catalog/api/dependencies/database.py +++ b/services/catalog/src/simcore_service_catalog/api/dependencies/database.py @@ -1,38 +1,32 @@ import logging from typing import AsyncGenerator, Callable, Type -from aiopg.sa import Engine from fastapi import Depends from fastapi.requests import Request +from sqlalchemy.ext.asyncio import AsyncEngine from ...db.repositories import BaseRepository logger = logging.getLogger(__name__) -def _get_db_engine(request: Request) -> Engine: +def _get_db_engine(request: Request) -> AsyncEngine: return request.app.state.engine def get_repository(repo_type: Type[BaseRepository]) -> Callable: async def _get_repo( - engine: Engine = Depends(_get_db_engine), + engine: AsyncEngine = Depends(_get_db_engine), ) -> AsyncGenerator[BaseRepository, None]: # NOTE: 2 different ideas were tried here with not so good # 1st one was acquiring a connection per repository which lead to the following issue https://github.com/ITISFoundation/osparc-simcore/pull/1966 # 2nd one was acquiring a connection per request which works but blocks the director-v2 responsiveness once # the max amount of connections is reached - # now the current solution is to acquire connection when needed. - available_engines = engine.maxsize - (engine.size - engine.freesize) - if available_engines <= 1: - logger.warning( - "Low pg connections available in pool: pool size=%d, acquired=%d, free=%d, reserved=[%d, %d]", - engine.size, - engine.size - engine.freesize, - engine.freesize, - engine.minsize, - engine.maxsize, - ) + # now the current solution is to connect connection when needed. + logger.info( + "%s", + f"current pool connections {engine.pool.checkedin()=},{engine.pool.checkedout()=}", + ) yield repo_type(db_engine=engine) return _get_repo diff --git a/services/catalog/src/simcore_service_catalog/api/routes/services.py b/services/catalog/src/simcore_service_catalog/api/routes/services.py index 68244a637d2..257d073b0a0 100644 --- a/services/catalog/src/simcore_service_catalog/api/routes/services.py +++ b/services/catalog/src/simcore_service_catalog/api/routes/services.py @@ -3,20 +3,22 @@ import asyncio import logging import urllib.parse -from typing import Any, Dict, List, Optional, Set, Tuple +from collections import deque +from typing import Any, Deque, Dict, List, Optional, Set, Tuple +from aiocache import cached from fastapi import APIRouter, Depends, Header, HTTPException, status -from models_library.services import KEY_RE, VERSION_RE, ServiceType +from models_library.services import ServiceKey, ServiceType, ServiceVersion from models_library.services_db import ServiceAccessRightsAtDB, ServiceMetaDataAtDB -from pydantic import ValidationError, constr +from pydantic import ValidationError from pydantic.types import PositiveInt +from simcore_service_catalog.services.director import MINUTE from starlette.requests import Request from ...db.repositories.groups import GroupsRepository from ...db.repositories.services import ServicesRepository from ...models.schemas.services import ServiceOut, ServiceUpdate from ...services.function_services import get_function_service, is_function_service -from ...utils.pools import non_blocking_process_pool_executor from ...utils.requests_decorators import cancellable_request from ..dependencies.database import get_repository from ..dependencies.director import DirectorApi, get_director_api @@ -36,6 +38,8 @@ "response_model_exclude_none": False, } +DIRECTOR_CACHING_TTL = 5 * MINUTE + def _prepare_service_details( service_in_registry: Dict[str, Any], @@ -96,13 +100,14 @@ async def list_services( product_name=x_simcore_products_name, ) } - # Non-detailed views from the services_repo database if not details: # only return a stripped down version # FIXME: add name, ddescription, type, etc... + # NOTE: here validation is not necessary since key,version were already validated + # in terms of time, this takes the most services_overview = [ - ServiceOut( + ServiceOut.construct( key=key, version=version, name="nodetails", @@ -117,27 +122,32 @@ async def list_services( ] return services_overview - # let's get all the services access rights - get_services_access_rights_task = services_repo.list_services_access_rights( - key_versions=list(services_in_db.keys()), product_name=x_simcore_products_name - ) - - # let's get the service owners - get_services_owner_emails_task = groups_repository.list_user_emails_from_gids( - {s.owner for s in services_in_db.values() if s.owner} - ) - - # getting services from director - get_registry_services_task = director_client.get("/services") + # caching this steps brings down the time to generate it at the expense of being sometimes a bit out of date + @cached(ttl=DIRECTOR_CACHING_TTL) + async def cached_registry_services() -> Deque[Tuple[str, str, Dict[str, Any]]]: + services_in_registry = await director_client.get("/services") + filtered_services = deque( + (s["key"], s["version"], s) + for s in ( + request.app.state.frontend_services_catalog + services_in_registry + ) + if (s.get("key"), s.get("version")) in services_in_db + ) + return filtered_services ( - services_in_registry, + registry_filtered_services, services_access_rights, services_owner_emails, ) = await asyncio.gather( - get_registry_services_task, - get_services_access_rights_task, - get_services_owner_emails_task, + cached_registry_services(), + services_repo.list_services_access_rights( + key_versions=services_in_db, + product_name=x_simcore_products_name, + ), + groups_repository.list_user_emails_from_gids( + {s.owner for s in services_in_db.values() if s.owner} + ), ) # NOTE: for the details of the services: @@ -145,28 +155,21 @@ async def list_services( # 2. we filter the services using the visible ones from the db # 3. then we compose the final service using as a base the registry service, overriding with the same # service from the database, adding also the access rights and the owner as email address instead of gid - # NOTE: this final step runs in a process pool so that it runs asynchronously and does not block in any way - with non_blocking_process_pool_executor(max_workers=2) as pool: - _target_services = ( - request.app.state.frontend_services_catalog + services_in_registry - ) - services_details = await asyncio.gather( - *[ - asyncio.get_event_loop().run_in_executor( - pool, - _prepare_service_details, - s, - services_in_db[s["key"], s["version"]], - services_access_rights[s["key"], s["version"]], - services_owner_emails.get( - services_in_db[s["key"], s["version"]].owner - ), - ) - for s in _target_services - if (s.get("key"), s.get("version")) in services_in_db - ] - ) - return [s for s in services_details if s is not None] + # NOTE: This step takes the bulk of the time to generate the list + services_details = await asyncio.gather( + *[ + asyncio.get_event_loop().run_in_executor( + None, + _prepare_service_details, + details, + services_in_db[key, version], + services_access_rights[key, version], + services_owner_emails.get(services_in_db[key, version].owner), + ) + for key, version, details in registry_filtered_services + ] + ) + return [s for s in services_details if s is not None] @router.get( @@ -176,8 +179,8 @@ async def list_services( ) async def get_service( user_id: int, - service_key: constr(regex=KEY_RE), - service_version: constr(regex=VERSION_RE), + service_key: ServiceKey, + service_version: ServiceVersion, director_client: DirectorApi = Depends(get_director_api), groups_repository: GroupsRepository = Depends(get_repository(GroupsRepository)), services_repo: ServicesRepository = Depends(get_repository(ServicesRepository)), @@ -257,8 +260,8 @@ async def get_service( async def modify_service( # pylint: disable=too-many-arguments user_id: int, - service_key: constr(regex=KEY_RE), - service_version: constr(regex=VERSION_RE), + service_key: ServiceKey, + service_version: ServiceVersion, updated_service: ServiceUpdate, director_client: DirectorApi = Depends(get_director_api), groups_repository: GroupsRepository = Depends(get_repository(GroupsRepository)), diff --git a/services/catalog/src/simcore_service_catalog/core/background_tasks.py b/services/catalog/src/simcore_service_catalog/core/background_tasks.py index 6f99d194c78..fe9f64cf84d 100644 --- a/services/catalog/src/simcore_service_catalog/core/background_tasks.py +++ b/services/catalog/src/simcore_service_catalog/core/background_tasks.py @@ -15,12 +15,12 @@ from pprint import pformat from typing import Dict, Set, Tuple -from aiopg.sa import Engine from fastapi import FastAPI from models_library.services import ServiceDockerData from models_library.services_db import ServiceAccessRightsAtDB, ServiceMetaDataAtDB from packaging.version import Version from pydantic import ValidationError +from sqlalchemy.ext.asyncio import AsyncEngine from ..api.dependencies.director import get_director_api from ..db.repositories.groups import GroupsRepository @@ -63,7 +63,7 @@ async def _list_registry_services( async def _list_db_services( - db_engine: Engine, + db_engine: AsyncEngine, ) -> Set[Tuple[ServiceKey, ServiceVersion]]: services_repo = ServicesRepository(db_engine=db_engine) return { @@ -138,7 +138,7 @@ async def _ensure_registry_insync_with_db(app: FastAPI) -> None: async def _ensure_published_templates_accessible( - db_engine: Engine, default_product_name: str + db_engine: AsyncEngine, default_product_name: str ) -> None: # Rationale: if a project template was published, its services must be available to everyone. # a published template has a column Published that is set to True @@ -180,7 +180,7 @@ async def _ensure_published_templates_accessible( async def sync_registry_task(app: FastAPI) -> None: default_product: str = app.state.settings.CATALOG_ACCESS_RIGHTS_DEFAULT_PRODUCT_NAME - engine: Engine = app.state.engine + engine: AsyncEngine = app.state.engine while app.state.registry_syncer_running: try: diff --git a/services/catalog/src/simcore_service_catalog/core/events.py b/services/catalog/src/simcore_service_catalog/core/events.py index 04992589404..74b88d094da 100644 --- a/services/catalog/src/simcore_service_catalog/core/events.py +++ b/services/catalog/src/simcore_service_catalog/core/events.py @@ -57,6 +57,7 @@ async def start_app() -> None: # FIXME: check director service is in place and ready. Hand-shake?? # SEE https://github.com/ITISFoundation/osparc-simcore/issues/1728 await start_registry_sync_task(app) + ... if app.state.settings.CATALOG_TRACING: setup_tracing(app, app.state.settings.CATALOG_TRACING) diff --git a/services/catalog/src/simcore_service_catalog/db/events.py b/services/catalog/src/simcore_service_catalog/db/events.py index 181e4756469..1be02440b2b 100644 --- a/services/catalog/src/simcore_service_catalog/db/events.py +++ b/services/catalog/src/simcore_service_catalog/db/events.py @@ -1,13 +1,12 @@ import logging -from aiopg.sa import Engine, create_engine from fastapi import FastAPI from servicelib.retry_policies import PostgresRetryPolicyUponInitialization -from simcore_postgres_database.utils_aiopg import ( - close_engine, - get_pg_engine_info, +from simcore_postgres_database.utils_aiosqlalchemy import ( + get_pg_engine_stateinfo, raise_if_migration_not_ready, ) +from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine from tenacity import retry from ..core.settings import PostgresSettings @@ -19,20 +18,25 @@ async def connect_to_db(app: FastAPI) -> None: logger.debug("Connecting db ...") cfg: PostgresSettings = app.state.settings.CATALOG_POSTGRES - engine: Engine = await create_engine( - str(cfg.dsn), - application_name=f"{__name__}_{id(app)}", # unique identifier per app - minsize=cfg.POSTGRES_MINSIZE, - maxsize=cfg.POSTGRES_MAXSIZE, + + engine: AsyncEngine = create_async_engine( + cfg.dsn_with_async_sqlalchemy, + pool_size=cfg.POSTGRES_MINSIZE, + max_overflow=cfg.POSTGRES_MAXSIZE - cfg.POSTGRES_MINSIZE, + connect_args={ + "server_settings": {"application_name": cfg.POSTGRES_CLIENT_NAME} + }, + pool_pre_ping=True, # https://docs.sqlalchemy.org/en/14/core/pooling.html#dealing-with-disconnects ) - logger.debug("Connected to %s", engine.dsn) - logger.debug("Checking db migrationn ...") + logger.debug("Connected to %s", engine.url) # pylint: disable=no-member + + logger.debug("Checking db migration...") try: await raise_if_migration_not_ready(engine) except Exception: # NOTE: engine must be closed because retry will create a new engine - await close_engine(engine) + await engine.dispose() raise logger.debug("Migration up-to-date") @@ -40,7 +44,7 @@ async def connect_to_db(app: FastAPI) -> None: logger.debug( "Setup engine: %s", - get_pg_engine_info(engine), + await get_pg_engine_stateinfo(engine), ) @@ -48,6 +52,6 @@ async def close_db_connection(app: FastAPI) -> None: logger.debug("Disconnecting db ...") if engine := app.state.engine: - await close_engine(engine) + await engine.dispose() - logger.debug("Disconnected from %s", engine.dsn) + logger.debug("Disconnected from %s", engine.url) # pylint: disable=no-member diff --git a/services/catalog/src/simcore_service_catalog/db/repositories/_base.py b/services/catalog/src/simcore_service_catalog/db/repositories/_base.py index 1f7cc57a069..4a20b37c735 100644 --- a/services/catalog/src/simcore_service_catalog/db/repositories/_base.py +++ b/services/catalog/src/simcore_service_catalog/db/repositories/_base.py @@ -1,6 +1,6 @@ from dataclasses import dataclass -from aiopg.sa import Engine +from sqlalchemy.ext.asyncio import AsyncEngine @dataclass @@ -9,4 +9,4 @@ class BaseRepository: Repositories are pulled at every request """ - db_engine: Engine = None + db_engine: AsyncEngine diff --git a/services/catalog/src/simcore_service_catalog/db/repositories/dags.py b/services/catalog/src/simcore_service_catalog/db/repositories/dags.py index 07397f61103..5a6837b20a7 100644 --- a/services/catalog/src/simcore_service_catalog/db/repositories/dags.py +++ b/services/catalog/src/simcore_service_catalog/db/repositories/dags.py @@ -2,7 +2,6 @@ from typing import List, Optional import sqlalchemy as sa -from aiopg.sa.result import RowProxy from ...models.domain.dag import DAGAtDB from ...models.schemas.dag import DAGIn @@ -13,21 +12,19 @@ class DAGsRepository(BaseRepository): async def list_dags(self) -> List[DAGAtDB]: dagraphs = [] - async with self.db_engine.acquire() as conn: - async for row in conn.execute(dags.select()): - dagraphs.append(DAGAtDB(**row)) + async with self.db_engine.connect() as conn: + async for row in await conn.stream(dags.select()): + dagraphs.append(DAGAtDB.parse_obj(row)) return dagraphs async def get_dag(self, dag_id: int) -> Optional[DAGAtDB]: - async with self.db_engine.acquire() as conn: - row: RowProxy = await ( - await conn.execute(dags.select().where(dags.c.id == dag_id)) - ).first() + async with self.db_engine.connect() as conn: + row = await conn.execute(dags.select().where(dags.c.id == dag_id)).first() if row: return DAGAtDB(**row) async def create_dag(self, dag: DAGIn) -> int: - async with self.db_engine.acquire() as conn: + async with self.db_engine.connect() as conn: new_id: int = await ( await conn.execute( dags.insert().values( @@ -39,7 +36,7 @@ async def create_dag(self, dag: DAGIn) -> int: return new_id async def replace_dag(self, dag_id: int, dag: DAGIn): - async with self.db_engine.acquire() as conn: + async with self.db_engine.connect() as conn: await conn.execute( dags.update() .values( @@ -53,7 +50,7 @@ async def update_dag(self, dag_id: int, dag: DAGIn): patch = dag.dict(exclude_unset=True, exclude={"workbench"}) if "workbench" in dag.__fields_set__: patch["workbench"] = json.dumps(patch["workbench"]) - async with self.db_engine.acquire() as conn: + async with self.db_engine.connect() as conn: res = await conn.execute( sa.update(dags).values(**patch).where(dags.c.id == dag_id) ) @@ -62,5 +59,5 @@ async def update_dag(self, dag_id: int, dag: DAGIn): assert res.returns_rows == False # nosec async def delete_dag(self, dag_id: int): - async with self.db_engine.acquire() as conn: + async with self.db_engine.connect() as conn: await conn.execute(sa.delete(dags).where(dags.c.id == dag_id)) diff --git a/services/catalog/src/simcore_service_catalog/db/repositories/groups.py b/services/catalog/src/simcore_service_catalog/db/repositories/groups.py index eb44da3588f..0279da181b6 100644 --- a/services/catalog/src/simcore_service_catalog/db/repositories/groups.py +++ b/services/catalog/src/simcore_service_catalog/db/repositories/groups.py @@ -1,7 +1,6 @@ from typing import Dict, List, Optional, Set import sqlalchemy as sa -from aiopg.sa.result import RowProxy from pydantic.networks import EmailStr from pydantic.types import PositiveInt @@ -14,8 +13,8 @@ class GroupsRepository(BaseRepository): async def list_user_groups(self, user_id: int) -> List[GroupAtDB]: groups_in_db = [] - async with self.db_engine.acquire() as conn: - async for row in conn.execute( + async with self.db_engine.connect() as conn: + async for row in await conn.stream( sa.select([groups]) .select_from( user_to_groups.join(groups, user_to_groups.c.gid == groups.c.gid), @@ -26,12 +25,11 @@ async def list_user_groups(self, user_id: int) -> List[GroupAtDB]: return groups_in_db async def get_everyone_group(self) -> GroupAtDB: - async with self.db_engine.acquire() as conn: - row: RowProxy = await ( - await conn.execute( - sa.select([groups]).where(groups.c.type == GroupType.EVERYONE) - ) - ).first() + async with self.db_engine.connect() as conn: + result = await conn.execute( + sa.select([groups]).where(groups.c.type == GroupType.EVERYONE) + ) + row = result.first() if not row: raise RepositoryError(f"{GroupType.EVERYONE} groups was never initialized") return GroupAtDB(**row) @@ -39,19 +37,19 @@ async def get_everyone_group(self) -> GroupAtDB: async def get_user_gid_from_email( self, user_email: EmailStr ) -> Optional[PositiveInt]: - async with self.db_engine.acquire() as conn: + async with self.db_engine.connect() as conn: return await conn.scalar( sa.select([users.c.primary_gid]).where(users.c.email == user_email) ) async def get_gid_from_affiliation(self, affiliation: str) -> Optional[PositiveInt]: - async with self.db_engine.acquire() as conn: + async with self.db_engine.connect() as conn: return await conn.scalar( sa.select([groups.c.gid]).where(groups.c.name == affiliation) ) async def get_user_email_from_gid(self, gid: PositiveInt) -> Optional[EmailStr]: - async with self.db_engine.acquire() as conn: + async with self.db_engine.connect() as conn: return await conn.scalar( sa.select([users.c.email]).where(users.c.primary_gid == gid) ) @@ -60,8 +58,8 @@ async def list_user_emails_from_gids( self, gids: Set[PositiveInt] ) -> Dict[PositiveInt, Optional[EmailStr]]: service_owners = {} - async with self.db_engine.acquire() as conn: - async for row in conn.execute( + async with self.db_engine.connect() as conn: + async for row in await conn.stream( sa.select([users.c.primary_gid, users.c.email]).where( users.c.primary_gid.in_(gids) ) diff --git a/services/catalog/src/simcore_service_catalog/db/repositories/projects.py b/services/catalog/src/simcore_service_catalog/db/repositories/projects.py index 78f2d07e1f7..9d8fa4e491f 100644 --- a/services/catalog/src/simcore_service_catalog/db/repositories/projects.py +++ b/services/catalog/src/simcore_service_catalog/db/repositories/projects.py @@ -14,8 +14,8 @@ class ProjectsRepository(BaseRepository): async def list_services_from_published_templates(self) -> List[ServiceKeyVersion]: list_of_published_services: List[ServiceKeyVersion] = [] - async with self.db_engine.acquire() as conn: - async for row in conn.execute( + async with self.db_engine.connect() as conn: + async for row in await conn.stream( sa.select([projects]).where( (projects.c.type == ProjectType.TEMPLATE) & (projects.c.published == True) diff --git a/services/catalog/src/simcore_service_catalog/db/repositories/services.py b/services/catalog/src/simcore_service_catalog/db/repositories/services.py index 0df83258785..73e40e561c1 100644 --- a/services/catalog/src/simcore_service_catalog/db/repositories/services.py +++ b/services/catalog/src/simcore_service_catalog/db/repositories/services.py @@ -1,6 +1,6 @@ import logging from collections import defaultdict -from typing import Dict, List, Optional, Tuple +from typing import Dict, Iterable, List, Optional, Tuple import sqlalchemy as sa from models_library.services_db import ServiceAccessRightsAtDB, ServiceMetaDataAtDB @@ -39,8 +39,8 @@ def _make_list_services_query( query = ( sa.select( [services_meta_data], - distinct=[services_meta_data.c.key, services_meta_data.c.version], ) + .distinct(services_meta_data.c.key, services_meta_data.c.version) .select_from(services_meta_data.join(services_access_rights)) .where( and_( @@ -74,8 +74,8 @@ async def list_services( ) -> List[ServiceMetaDataAtDB]: services_in_db = [] - async with self.db_engine.acquire() as conn: - async for row in conn.execute( + async with self.db_engine.connect() as conn: + async for row in await conn.stream( _make_list_services_query( gids, execute_access, @@ -124,8 +124,8 @@ async def list_service_releases( query = query.limit(limit_count) releases = [] - async with self.db_engine.acquire() as conn: - async for row in conn.execute(query): + async with self.db_engine.connect() as conn: + async for row in await conn.stream(query): releases.append(ServiceMetaDataAtDB(**row)) return releases @@ -170,9 +170,9 @@ async def get_service( ) ) ) - async with self.db_engine.acquire() as conn: + async with self.db_engine.connect() as conn: result = await conn.execute(query) - row = await result.first() + row = result.first() if row: return ServiceMetaDataAtDB(**row) @@ -191,7 +191,7 @@ async def create_service( f"{access_rights} does not correspond to service {new_service.key}:{new_service.version}" ) - async with self.db_engine.acquire() as conn: + async with self.db_engine.connect() as conn: # NOTE: this ensure proper rollback in case of issue async with conn.begin() as _transaction: result = await conn.execute( @@ -200,7 +200,7 @@ async def create_service( .values(**new_service.dict(by_alias=True)) .returning(literal_column("*")) ) - row = await result.first() + row = result.first() assert row # nosec created_service = ServiceMetaDataAtDB(**row) @@ -215,7 +215,7 @@ async def update_service( self, patched_service: ServiceMetaDataAtDB ) -> ServiceMetaDataAtDB: # update the services_meta_data table - async with self.db_engine.acquire() as conn: + async with self.db_engine.connect() as conn: result = await conn.execute( # pylint: disable=no-value-for-parameter services_meta_data.update() @@ -226,7 +226,7 @@ async def update_service( .values(**patched_service.dict(by_alias=True, exclude_unset=True)) .returning(literal_column("*")) ) - row = await result.first() + row = result.first() assert row # nosec updated_service = ServiceMetaDataAtDB(**row) return updated_service @@ -249,13 +249,15 @@ async def get_service_access_rights( query = sa.select([services_access_rights]).where(search_expression) - async with self.db_engine.acquire() as conn: - async for row in conn.execute(query): + async with self.db_engine.connect() as conn: + async for row in await conn.stream(query): services_in_db.append(ServiceAccessRightsAtDB(**row)) return services_in_db async def list_services_access_rights( - self, key_versions: List[Tuple[str, str]], product_name: Optional[str] = None + self, + key_versions: Iterable[Tuple[str, str]], + product_name: Optional[str] = None, ) -> Dict[Tuple[str, str], List[ServiceAccessRightsAtDB]]: """Batch version of get_service_access_rights""" service_to_access_rights = defaultdict(list) @@ -271,15 +273,8 @@ async def list_services_access_rights( else True ) ) - async with self.db_engine.acquire() as conn: - # NOTE: this strange query compile is due to an incompatilility - # between aiopg.sa and sqlalchemy 1.4 - # One of the maintainer of aiopg says: - # https://github.com/aio-libs/aiopg/issues/798#issuecomment-934256346 - # we should drop aiopg.sa - async for row in conn.execute( - f"{(query.compile(compile_kwargs={'literal_binds': True}))}" - ): + async with self.db_engine.connect() as conn: + async for row in await conn.stream(query): service_to_access_rights[ ( row[services_access_rights.c.key], @@ -306,7 +301,7 @@ async def upsert_service_access_rights( set_=rights.dict(by_alias=True, exclude_unset=True), ) try: - async with self.db_engine.acquire() as conn: + async with self.db_engine.connect() as conn: await conn.execute( # pylint: disable=no-value-for-parameter on_update_stmt @@ -321,7 +316,7 @@ async def upsert_service_access_rights( async def delete_service_access_rights( self, delete_access_rights: List[ServiceAccessRightsAtDB] ) -> None: - async with self.db_engine.acquire() as conn: + async with self.db_engine.connect() as conn: for rights in delete_access_rights: await conn.execute( # pylint: disable=no-value-for-parameter diff --git a/services/catalog/src/simcore_service_catalog/services/access_rights.py b/services/catalog/src/simcore_service_catalog/services/access_rights.py index a7846e44429..bc373ac3912 100644 --- a/services/catalog/src/simcore_service_catalog/services/access_rights.py +++ b/services/catalog/src/simcore_service_catalog/services/access_rights.py @@ -7,12 +7,12 @@ from typing import Callable, Dict, List, Optional, Tuple, Union from urllib.parse import quote_plus -from aiopg.sa.engine import Engine from fastapi import FastAPI from models_library.services import ServiceDockerData from models_library.services_db import ServiceAccessRightsAtDB from packaging.version import Version from pydantic.types import PositiveInt +from sqlalchemy.ext.asyncio import AsyncEngine from ..api.dependencies.director import get_director_api from ..db.repositories.groups import GroupsRepository @@ -54,7 +54,7 @@ async def evaluate_default_policy( 2. Services published after 19.08.2020 will be visible ONLY to his/her owner 3. Front-end services are have execute-access to everyone """ - db_engine: Engine = app.state.engine + db_engine: AsyncEngine = app.state.engine groups_repo = GroupsRepository(db_engine) owner_gid = None diff --git a/services/catalog/src/simcore_service_catalog/services/director.py b/services/catalog/src/simcore_service_catalog/services/director.py index 95e1e665605..9f47721290a 100644 --- a/services/catalog/src/simcore_service_catalog/services/director.py +++ b/services/catalog/src/simcore_service_catalog/services/director.py @@ -48,8 +48,8 @@ async def setup_director(app: FastAPI) -> None: async def close_director(app: FastAPI) -> None: - if director_api := app.state.director_api: - await director_api.delete() + if director_client := app.state.director_api: + await director_client.delete() logger.debug("Director client closed successfully") diff --git a/services/catalog/src/simcore_service_catalog/utils/requests_decorators.py b/services/catalog/src/simcore_service_catalog/utils/requests_decorators.py index d35dd88f6a5..c53ead446d6 100644 --- a/services/catalog/src/simcore_service_catalog/utils/requests_decorators.py +++ b/services/catalog/src/simcore_service_catalog/utils/requests_decorators.py @@ -24,7 +24,7 @@ async def _cancel_task_if_client_disconnected( await asyncio.sleep(interval) -def cancellable_request(handler: Callable[[Any], Coroutine[Any, Any, Response]]): +def cancellable_request(handler: Callable[..., Coroutine[Any, Any, Response]]): """this decorator periodically checks if the client disconnected and then will cancel the request and return a 499 code (a la nginx).""" @wraps(handler) diff --git a/services/catalog/tests/unit/test_services_director.py b/services/catalog/tests/unit/test_services_director.py index e365bf75c2b..cb6f458b3ff 100644 --- a/services/catalog/tests/unit/test_services_director.py +++ b/services/catalog/tests/unit/test_services_director.py @@ -4,7 +4,7 @@ # pylint:disable=protected-access -from typing import Dict, Iterable, Iterator +from typing import Dict, Iterator import pytest import respx @@ -32,7 +32,7 @@ def minimal_app( @pytest.fixture() -def client(minimal_app: FastAPI) -> Iterable[TestClient]: +def client(minimal_app: FastAPI) -> Iterator[TestClient]: # NOTE: this way we ensure the events are run in the application # since it starts the app on a test server with TestClient(minimal_app) as client: @@ -40,7 +40,7 @@ def client(minimal_app: FastAPI) -> Iterable[TestClient]: @pytest.fixture -def mocked_director_service_api(minimal_app: FastAPI) -> MockRouter: +def mocked_director_service_api(minimal_app: FastAPI) -> Iterator[MockRouter]: with respx.mock( base_url=minimal_app.state.settings.CATALOG_DIRECTOR.base_url, assert_all_called=False, diff --git a/services/catalog/tests/unit/with_dbs/conftest.py b/services/catalog/tests/unit/with_dbs/conftest.py index 234bcc8ac9c..995cbf28001 100644 --- a/services/catalog/tests/unit/with_dbs/conftest.py +++ b/services/catalog/tests/unit/with_dbs/conftest.py @@ -5,18 +5,19 @@ import itertools import random +from random import randint from typing import Any, AsyncIterator, Callable, Dict, Iterable, Iterator, List, Tuple import pytest import respx import sqlalchemy as sa from _pytest.monkeypatch import MonkeyPatch -from aiopg.sa.engine import Engine from faker import Faker from fastapi import FastAPI +from pydantic import PositiveInt from pytest_mock.plugin import MockerFixture -from respx.router import MockRouter from simcore_postgres_database.models.products import products +from simcore_postgres_database.models.users import UserRole, UserStatus, users from simcore_service_catalog.core.application import init_app from simcore_service_catalog.db.tables import ( groups, @@ -24,6 +25,7 @@ services_meta_data, ) from sqlalchemy.dialects.postgresql import insert as pg_insert +from sqlalchemy.ext.asyncio import AsyncEngine from starlette.testclient import TestClient @@ -37,6 +39,7 @@ def app( ) -> Iterable[FastAPI]: monkeypatch.setenv("CATALOG_TRACING", "null") monkeypatch.setenv("SC_BOOT_MODE", "local-development") + monkeypatch.setenv("POSTGRES_CLIENT_NAME", "pytest_client") app = init_app() yield app @@ -49,8 +52,7 @@ def client(app: FastAPI) -> Iterator[TestClient]: @pytest.fixture() -def director_mockup(app: FastAPI) -> Iterator[MockRouter]: - +def director_mockup(app: FastAPI) -> Iterator[respx.MockRouter]: with respx.mock( base_url=app.state.settings.CATALOG_DIRECTOR.base_url, assert_all_called=False, @@ -82,8 +84,41 @@ def director_mockup(app: FastAPI) -> Iterator[MockRouter]: # +@pytest.fixture(scope="session") +def user_id() -> PositiveInt: + return randint(1, 10000) + + +@pytest.fixture() +def user_db(postgres_db: sa.engine.Engine, user_id: PositiveInt) -> Iterator[Dict]: + with postgres_db.connect() as con: + # removes all users before continuing + con.execute(users.delete()) + con.execute( + users.insert() + .values( + id=user_id, + name="test user", + email="test@user.com", + password_hash="testhash", + status=UserStatus.ACTIVE, + role=UserRole.USER, + ) + .returning(sa.literal_column("*")) + ) + # this is needed to get the primary_gid correctly + result = con.execute(sa.select([users]).where(users.c.id == user_id)) + user = result.first() + + yield dict(user) + + con.execute(users.delete().where(users.c.id == user_id)) + + @pytest.fixture() -async def products_names(aiopg_engine: Engine) -> AsyncIterator[List[str]]: +async def products_names( + sqlalchemy_async_engine: AsyncEngine, +) -> AsyncIterator[List[str]]: """Inits products db table and returns product names""" data = [ # already upon creation: ("osparc", r"([\.-]{0,1}osparc[\.-])"), @@ -93,7 +128,7 @@ async def products_names(aiopg_engine: Engine) -> AsyncIterator[List[str]]: # pylint: disable=no-value-for-parameter - async with aiopg_engine.acquire() as conn: + async with sqlalchemy_async_engine.begin() as conn: # NOTE: The 'default' dialect with current database version settings does not support in-place multirow inserts for name, regex in data: stmt = products.insert().values(name=name, host_regex=regex) @@ -104,17 +139,18 @@ async def products_names(aiopg_engine: Engine) -> AsyncIterator[List[str]]: ] + [items[0] for items in data] yield names - async with aiopg_engine.acquire() as conn: + async with sqlalchemy_async_engine.begin() as conn: await conn.execute(products.delete()) @pytest.fixture() -async def user_groups_ids(aiopg_engine: Engine) -> AsyncIterator[List[int]]: +async def user_groups_ids( + sqlalchemy_async_engine: AsyncEngine, user_db: Dict[str, Any] +) -> AsyncIterator[List[int]]: """Inits groups table and returns group identifiers""" cols = ("gid", "name", "description", "type", "thumbnail", "inclusion_rules") data = [ - (34, "john.smith", "primary group for user", "PRIMARY", None, {}), ( 20001, "Team Black", @@ -125,26 +161,24 @@ async def user_groups_ids(aiopg_engine: Engine) -> AsyncIterator[List[int]]: ), ] # pylint: disable=no-value-for-parameter - - async with aiopg_engine.acquire() as conn: + async with sqlalchemy_async_engine.begin() as conn: for row in data: # NOTE: The 'default' dialect with current database version settings does not support in-place multirow inserts - stmt = groups.insert().values(**dict(zip(cols, row))) - await conn.execute(stmt) + await conn.execute(groups.insert().values(**dict(zip(cols, row)))) - gids = [ - 1, - ] + [items[0] for items in data] + gids = [1, user_db["primary_gid"]] + [items[0] for items in data] yield gids - async with aiopg_engine.acquire() as conn: + async with sqlalchemy_async_engine.begin() as conn: await conn.execute(services_meta_data.delete()) - await conn.execute(groups.delete().where(groups.c.gid.in_(gids[1:]))) + await conn.execute(groups.delete().where(groups.c.gid.in_(gids[2:]))) @pytest.fixture() -async def services_db_tables_injector(aiopg_engine: Engine) -> AsyncIterator[Callable]: +async def services_db_tables_injector( + sqlalchemy_async_engine: AsyncEngine, +) -> AsyncIterator[Callable]: """Returns a helper function to init services_meta_data and services_access_rights tables @@ -175,7 +209,7 @@ async def services_db_tables_injector(aiopg_engine: Engine) -> AsyncIterator[Cal async def inject_in_db(fake_catalog: List[Tuple]): # [(service, ar1, ...), (service2, ar1, ...) ] - async with aiopg_engine.acquire() as conn: + async with sqlalchemy_async_engine.begin() as conn: # NOTE: The 'default' dialect with current database version settings does not support in-place multirow inserts for service in [items[0] for items in fake_catalog]: insert_meta = pg_insert(services_meta_data).values(**service) @@ -194,7 +228,7 @@ async def inject_in_db(fake_catalog: List[Tuple]): yield inject_in_db - async with aiopg_engine.acquire() as conn: + async with sqlalchemy_async_engine.begin() as conn: await conn.execute(services_access_rights.delete()) await conn.execute(services_meta_data.delete()) @@ -291,3 +325,18 @@ def _fake_factory( return tuple(fakes) return _fake_factory + + +@pytest.fixture +def mock_catalog_background_task(mocker: MockerFixture): + """patch the setup of the background task so we can call it manually""" + mocker.patch( + "simcore_service_catalog.core.events.start_registry_sync_task", + return_value=None, + autospec=True, + ) + mocker.patch( + "simcore_service_catalog.core.events.stop_registry_sync_task", + return_value=None, + autospec=True, + ) diff --git a/services/catalog/tests/unit/with_dbs/test_db_repositories.py b/services/catalog/tests/unit/with_dbs/test_db_repositories.py index 52b5332d645..c7fa11550e6 100644 --- a/services/catalog/tests/unit/with_dbs/test_db_repositories.py +++ b/services/catalog/tests/unit/with_dbs/test_db_repositories.py @@ -20,8 +20,8 @@ @pytest.fixture -def services_repo(aiopg_engine): - repo = ServicesRepository(aiopg_engine) +def services_repo(sqlalchemy_async_engine): + repo = ServicesRepository(sqlalchemy_async_engine) return repo diff --git a/services/catalog/tests/unit/with_dbs/test_entrypoint_services.py b/services/catalog/tests/unit/with_dbs/test_entrypoint_services.py index 8eaf3b896a7..a51aabd07fa 100644 --- a/services/catalog/tests/unit/with_dbs/test_entrypoint_services.py +++ b/services/catalog/tests/unit/with_dbs/test_entrypoint_services.py @@ -5,22 +5,15 @@ # pylint: disable=unused-argument # pylint: disable=unused-variable -import asyncio -from datetime import datetime -from random import randint -from typing import List, Optional +import re +from typing import Callable, List +import httpx import pytest -import simcore_service_catalog.api.dependencies.director +import respx from fastapi import FastAPI -from models_library.services import ServiceDockerData, ServiceType -from models_library.services_db import ServiceAccessRightsAtDB -from pydantic.types import PositiveInt +from models_library.services import ServiceDockerData from respx.router import MockRouter -from simcore_service_catalog.api.routes import services -from simcore_service_catalog.db.repositories.groups import GroupsRepository -from simcore_service_catalog.models.domain.group import GroupAtDB, GroupType -from simcore_service_catalog.models.schemas.services import ServiceOut from starlette.testclient import TestClient from yarl import URL @@ -32,146 +25,214 @@ ] -@pytest.fixture(scope="session") -def user_id() -> int: - return randint(1, 10000) - - -@pytest.fixture(scope="session") -def user_groups(user_id: int) -> List[GroupAtDB]: - return [ - GroupAtDB( - gid=user_id, - name="my primary group", - description="it is primary", - type=GroupType.PRIMARY, - ), - GroupAtDB( - gid=randint(10001, 15000), - name="all group", - description="it is everyone", - type=GroupType.EVERYONE, - ), - GroupAtDB( - gid=randint(15001, 20000), - name="standard group", - description="it is standard", - type=GroupType.STANDARD, - ), - ] +@pytest.fixture +def mock_director_services( + director_mockup: respx.MockRouter, app: FastAPI +) -> MockRouter: + mock_route = director_mockup.get( + f"{app.state.settings.CATALOG_DIRECTOR.base_url}/services", + name="list_services", + ).respond(200, json={"data": ["blahblah"]}) + response = httpx.get(f"{app.state.settings.CATALOG_DIRECTOR.base_url}/services") + assert mock_route.called + assert response.json() == {"data": ["blahblah"]} + return director_mockup -@pytest.fixture(scope="session") -def registry_services() -> List[ServiceDockerData]: - NUMBER_OF_SERVICES = 5 - return [ - ServiceDockerData( - key="simcore/services/comp/my_comp_service", - version=f"{v}.{randint(0,20)}.{randint(0,20)}", - type=ServiceType.COMPUTATIONAL, - name=f"my service {v}", - description="a sleeping service version {v}", - authors=[{"name": "me", "email": "me@myself.com"}], - contact="me.myself@you.com", - inputs=[], - outputs=[], +async def test_list_services_with_details( + mock_catalog_background_task, + director_mockup: MockRouter, + client: TestClient, + user_id: int, + products_names: List[str], + service_catalog_faker: Callable, + services_db_tables_injector: Callable, + benchmark, +): + target_product = products_names[-1] + # create some fake services + NUM_SERVICES = 1000 + fake_services = [ + service_catalog_faker( + "simcore/services/dynamic/jupyterlab", + f"1.0.{s}", + team_access=None, + everyone_access=None, + product=target_product, ) - for v in range(NUMBER_OF_SERVICES) + for s in range(NUM_SERVICES) ] + # injects fake data in db + await services_db_tables_injector(fake_services) + url = URL("/v0/services").with_query({"user_id": user_id, "details": "true"}) -@pytest.fixture(scope="session") -def db_services( - registry_services: List[ServiceOut], user_groups: List[GroupAtDB] -) -> List[ServiceAccessRightsAtDB]: - return [ - ServiceAccessRightsAtDB( - key=s.key, - version=s.version, - gid=user_groups[0].gid, - execute_access=True, - product_name="osparc", - ) - for s in registry_services - ] + # now fake the director such that it returns half the services + fake_registry_service_data = ServiceDockerData.Config.schema_extra["examples"][0] - -@pytest.fixture() -async def director_mockup( - monkeypatch, registry_services: List[ServiceOut], app: FastAPI -): - async def return_list_services(user_id: int) -> List[ServiceOut]: - return registry_services - - monkeypatch.setattr(services, "list_services", return_list_services) - - class FakeDirector: - @staticmethod - async def get(url: str): - if url == "/services": - return [s.dict(by_alias=True) for s in registry_services] - if "/service_extras/" in url: - return { - "build_date": f"{datetime.utcnow().isoformat(timespec='seconds')}Z" + director_mockup.get("/services", name="list_services").respond( + 200, + json={ + "data": [ + { + **fake_registry_service_data, + **{"key": s[0]["key"], "version": s[0]["version"]}, } - - def fake_director_api(*args, **kwargs): - return FakeDirector() - - monkeypatch.setattr( - simcore_service_catalog.api.dependencies.director, - "get_director_api", - fake_director_api, + for s in fake_services[::2] + ] + }, ) - # Check mock - from simcore_service_catalog.api.dependencies.director import get_director_api + response = benchmark( + client.get, f"{url}", headers={"x-simcore-products-name": target_product} + ) - assert isinstance(get_director_api(), FakeDirector) - yield + assert response.status_code == 200 + data = response.json() + assert len(data) == round(NUM_SERVICES / 2) -@pytest.fixture() -async def db_mockup( - monkeypatch, - app: FastAPI, - user_groups: List[GroupAtDB], - db_services: List[ServiceAccessRightsAtDB], +async def test_list_services_without_details( + mock_catalog_background_task, + director_mockup: MockRouter, + client: TestClient, + user_id: int, + products_names: List[str], + service_catalog_faker: Callable, + services_db_tables_injector: Callable, + benchmark, ): - async def return_list_user_groups(self, user_id: int) -> List[GroupAtDB]: - return user_groups + target_product = products_names[-1] + # injects fake data in db + NUM_SERVICES = 1000 + SERVICE_KEY = "simcore/services/dynamic/jupyterlab" + await services_db_tables_injector( + [ + service_catalog_faker( + SERVICE_KEY, + f"1.0.{s}", + team_access=None, + everyone_access=None, + product=target_product, + ) + for s in range(NUM_SERVICES) + ] + ) - async def return_gid_from_email(*args, **kwargs) -> Optional[PositiveInt]: - return user_groups[0].gid + url = URL("/v0/services").with_query({"user_id": user_id, "details": "false"}) + response = benchmark( + client.get, f"{url}", headers={"x-simcore-products-name": target_product} + ) + assert response.status_code == 200 + data = response.json() + assert len(data) == NUM_SERVICES + for service in data: + assert service["key"] == SERVICE_KEY + assert re.match("1.0.[0-9]+", service["version"]) is not None + assert service["name"] == "nodetails" + assert service["description"] == "nodetails" + assert service["contact"] == "nodetails@nodetails.com" - monkeypatch.setattr(GroupsRepository, "list_user_groups", return_list_user_groups) - monkeypatch.setattr( - GroupsRepository, "get_user_gid_from_email", return_gid_from_email + +async def test_list_services_without_details_with_wrong_user_id_returns_403( + mock_catalog_background_task, + director_mockup: MockRouter, + client: TestClient, + user_id: int, + products_names: List[str], + service_catalog_faker: Callable, + services_db_tables_injector: Callable, +): + target_product = products_names[-1] + # injects fake data in db + NUM_SERVICES = 1 + await services_db_tables_injector( + [ + service_catalog_faker( + "simcore/services/dynamic/jupyterlab", + f"1.0.{s}", + team_access=None, + everyone_access=None, + product=target_product, + ) + for s in range(NUM_SERVICES) + ] ) + url = URL("/v0/services").with_query({"user_id": user_id + 1, "details": "false"}) + response = client.get(f"{url}", headers={"x-simcore-products-name": target_product}) + assert response.status_code == 403 + -async def test_director_mockup( +async def test_list_services_without_details_with_another_product_returns_other_services( + mock_catalog_background_task, director_mockup: MockRouter, - app: FastAPI, - registry_services: List[ServiceOut], + client: TestClient, user_id: int, + products_names: List[str], + service_catalog_faker: Callable, + services_db_tables_injector: Callable, ): - assert await services.list_services(user_id) == registry_services + target_product = products_names[-1] + assert ( + len(products_names) > 1 + ), "please adjust the fixture to have the right number of products" + # injects fake data in db + NUM_SERVICES = 15 + await services_db_tables_injector( + [ + service_catalog_faker( + "simcore/services/dynamic/jupyterlab", + f"1.0.{s}", + team_access=None, + everyone_access=None, + product=target_product, + ) + for s in range(NUM_SERVICES) + ] + ) + + url = URL("/v0/services").with_query({"user_id": user_id, "details": "false"}) + response = client.get( + f"{url}", headers={"x-simcore-products-name": products_names[0]} + ) + assert response.status_code == 200 + data = response.json() + assert len(data) == 0 -@pytest.mark.skip( - reason="Not ready, depency injection does not work, using monkeypatch. still issue with setting up database" -) -async def test_list_services( +async def test_list_services_without_details_with_wrong_product_returns_0_service( + mock_catalog_background_task, director_mockup: MockRouter, - db_mockup: None, - app: FastAPI, client: TestClient, user_id: int, + products_names: List[str], + service_catalog_faker: Callable, + services_db_tables_injector: Callable, ): - await asyncio.sleep(10) + target_product = products_names[-1] + assert ( + len(products_names) > 1 + ), "please adjust the fixture to have the right number of products" + # injects fake data in db + NUM_SERVICES = 1 + await services_db_tables_injector( + [ + service_catalog_faker( + "simcore/services/dynamic/jupyterlab", + f"1.0.{s}", + team_access=None, + everyone_access=None, + product=target_product, + ) + for s in range(NUM_SERVICES) + ] + ) - url = URL("/v0/services").with_query(user_id=user_id) - response = client.get(str(url)) + url = URL("/v0/services").with_query({"user_id": user_id, "details": "false"}) + response = client.get( + f"{url}", headers={"x-simcore-products-name": "no valid product"} + ) assert response.status_code == 200 data = response.json() + assert len(data) == 0 diff --git a/services/catalog/tests/unit/with_dbs/test_services_access_rights.py b/services/catalog/tests/unit/with_dbs/test_services_access_rights.py index 6796d8173ce..0b2f43e241d 100644 --- a/services/catalog/tests/unit/with_dbs/test_services_access_rights.py +++ b/services/catalog/tests/unit/with_dbs/test_services_access_rights.py @@ -4,7 +4,6 @@ from typing import Callable, List -from aiopg.sa.engine import Engine from fastapi import FastAPI from models_library.services import ServiceDockerData from models_library.services_db import ServiceAccessRightsAtDB @@ -15,6 +14,7 @@ evaluate_default_policy, reduce_access_rights, ) +from sqlalchemy.ext.asyncio import AsyncEngine pytest_simcore_core_services_selection = [ "postgres", @@ -80,7 +80,7 @@ def test_reduce_access_rights(): async def test_auto_upgrade_policy( - aiopg_engine: Engine, + sqlalchemy_async_engine: AsyncEngine, user_groups_ids: List[int], products_names: List[str], services_db_tables_injector: Callable, @@ -145,7 +145,7 @@ async def test_auto_upgrade_policy( # ------------ app = FastAPI() - app.state.engine = aiopg_engine + app.state.engine = sqlalchemy_async_engine app.state.settings = mocker.Mock() app.state.settings.CATALOG_ACCESS_RIGHTS_DEFAULT_PRODUCT_NAME = target_product diff --git a/services/director-v2/tests/unit/test_modules_dask_client.py b/services/director-v2/tests/unit/test_modules_dask_client.py index 0dbbf93d106..7fd45ddf617 100644 --- a/services/director-v2/tests/unit/test_modules_dask_client.py +++ b/services/director-v2/tests/unit/test_modules_dask_client.py @@ -95,7 +95,7 @@ async def _assert_wait_for_task_status( ) current_task_status = await dask_client.get_task_status(job_id) assert isinstance(current_task_status, RunningState) - print(f"{current_task_status=} vs {expected_status}") + print(f"{current_task_status=} vs {expected_status=}") assert current_task_status == expected_status @@ -637,7 +637,7 @@ def fake_remote_fct( # after releasing the results, the task shall be UNKNOWN await dask_client.release_task_result(job_id) await _assert_wait_for_task_status( - job_id, dask_client, RunningState.UNKNOWN, timeout=60 + job_id, dask_client, RunningState.UNKNOWN, timeout=120 ) diff --git a/tests/performance/locust_files/catalog_services.py b/tests/performance/locust_files/catalog_services.py index 0a44dc63748..62efb86c1e1 100644 --- a/tests/performance/locust_files/catalog_services.py +++ b/tests/performance/locust_files/catalog_services.py @@ -3,7 +3,7 @@ # import logging -import os +from time import time import faker from dotenv import load_dotenv @@ -23,30 +23,38 @@ def __init__(self, *args, **kwargs): self.email = fake.email() - # @task - # def health_check(self): - # self.client.get("/v0/health") - - @task(weight=5) - def get_services(self): - self.client.get( - f"/v0/catalog/services", - ) + @task() + def get_services_with_details(self): + start = time() + with self.client.get( + "/v0/services?user_id=1&details=true", + headers={ + "x-simcore-products-name": "osparc", + }, + catch_response=True, + ) as response: + response.raise_for_status() + num_services = len(response.json()) + print(f"got {num_services} WITH DETAILS in {time() - start}s") + response.success() + + @task() + def get_services_without_details(self): + start = time() + with self.client.get( + "/v0/services?user_id=1&details=false", + headers={ + "x-simcore-products-name": "osparc", + }, + catch_response=True, + ) as response: + response.raise_for_status() + num_services = len(response.json()) + print(f"got {num_services} in {time() - start}s") + response.success() def on_start(self): print("Created User ", self.email) - self.client.post( - "/v0/auth/register", - json={ - "email": self.email, - "password": "my secret", - "confirm": "my secret", - }, - ) - self.client.post( - "/v0/auth/login", json={"email": self.email, "password": "my secret"} - ) def on_stop(self): - self.client.post("/v0/auth/logout") print("Stopping", self.email) diff --git a/tests/performance/locust_files/webserver_services.py b/tests/performance/locust_files/webserver_services.py new file mode 100644 index 00000000000..45432cfcd2e --- /dev/null +++ b/tests/performance/locust_files/webserver_services.py @@ -0,0 +1,51 @@ +# +# SEE https://docs.locust.io/en/stable/quickstart.html +# + +import logging + +import faker +from dotenv import load_dotenv +from locust import task +from locust.contrib.fasthttp import FastHttpUser + +logging.basicConfig(level=logging.INFO) + +fake = faker.Faker() + +load_dotenv() # take environment variables from .env + + +class WebApiUser(FastHttpUser): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + self.email = fake.email() + + # @task + # def health_check(self): + # self.client.get("/v0/health") + + @task(weight=5) + def get_services(self): + self.client.get( + f"/v0/catalog/services", + ) + + def on_start(self): + print("Created User ", self.email) + self.client.post( + "/v0/auth/register", + json={ + "email": self.email, + "password": "my secret", + "confirm": "my secret", + }, + ) + self.client.post( + "/v0/auth/login", json={"email": self.email, "password": "my secret"} + ) + + def on_stop(self): + self.client.post("/v0/auth/logout") + print("Stopping", self.email)