From 5c3e227bafb817adfc8d8416bfa05948e4309d44 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 4 Mar 2022 08:39:09 +0100 Subject: [PATCH 01/41] replace aiopg --- services/catalog/requirements/_base.in | 2 +- services/catalog/requirements/_base.txt | 10 ++-------- services/catalog/requirements/_test.txt | 4 +--- 3 files changed, 4 insertions(+), 12 deletions(-) diff --git a/services/catalog/requirements/_base.in b/services/catalog/requirements/_base.in index 1f8066416f0..c3b97677471 100644 --- a/services/catalog/requirements/_base.in +++ b/services/catalog/requirements/_base.in @@ -21,7 +21,7 @@ fastapi[all] pydantic[dotenv] # database -aiopg[sa] +sqlalchemy[asyncio] # web client httpx diff --git a/services/catalog/requirements/_base.txt b/services/catalog/requirements/_base.txt index 6c990bab139..12903ca411b 100644 --- a/services/catalog/requirements/_base.txt +++ b/services/catalog/requirements/_base.txt @@ -12,8 +12,6 @@ aiofiles==0.5.0 # via # -c requirements/../../../packages/service-library/requirements/./_base.in # -r requirements/../../../packages/service-library/requirements/_base.in -aiopg==1.3.3 - # via -r requirements/_base.in alembic==1.7.4 # via -r requirements/../../../packages/postgres-database/requirements/_base.in anyio==3.5.0 @@ -22,8 +20,6 @@ anyio==3.5.0 # starlette asgiref==3.4.1 # via uvicorn -async-timeout==4.0.2 - # via aiopg certifi==2020.12.5 # via # httpcore @@ -113,9 +109,7 @@ orjson==3.4.8 packaging==20.9 # via -r requirements/_base.in psycopg2-binary==2.8.6 - # via - # aiopg - # sqlalchemy + # via sqlalchemy pydantic==1.9.0 # via # -c requirements/../../../packages/models-library/requirements/../../../requirements/constraints.txt @@ -180,7 +174,7 @@ sqlalchemy==1.4.31 # -c requirements/../../../packages/settings-library/requirements/../../../requirements/constraints.txt # -c requirements/../../../requirements/constraints.txt # -r requirements/../../../packages/postgres-database/requirements/_base.in - # aiopg + # -r requirements/_base.in # alembic starlette==0.17.1 # via fastapi diff --git a/services/catalog/requirements/_test.txt b/services/catalog/requirements/_test.txt index bb18eb03a3c..f24a5ee60c6 100644 --- a/services/catalog/requirements/_test.txt +++ b/services/catalog/requirements/_test.txt @@ -21,9 +21,7 @@ anyio==3.5.0 astroid==2.9.3 # via pylint async-timeout==4.0.2 - # via - # -c requirements/_base.txt - # aiohttp + # via aiohttp attrs==21.4.0 # via # aiohttp From e9c9b6f6fcc48d130a9ce61e4260c4acc8849095 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 4 Mar 2022 08:45:13 +0100 Subject: [PATCH 02/41] catalog uses sqlalchemy in async mode --- .../utils_aiosqlalchemy.py | 49 +++++++++++++++++++ .../src/simcore_service_catalog/db/events.py | 16 +++--- .../db/repositories/_base.py | 4 +- .../db/repositories/dags.py | 18 +++---- .../db/repositories/groups.py | 15 +++--- .../db/repositories/projects.py | 2 +- .../db/repositories/services.py | 27 ++++------ 7 files changed, 86 insertions(+), 45 deletions(-) create mode 100644 packages/postgres-database/src/simcore_postgres_database/utils_aiosqlalchemy.py diff --git a/packages/postgres-database/src/simcore_postgres_database/utils_aiosqlalchemy.py b/packages/postgres-database/src/simcore_postgres_database/utils_aiosqlalchemy.py new file mode 100644 index 00000000000..90b03f67ac5 --- /dev/null +++ b/packages/postgres-database/src/simcore_postgres_database/utils_aiosqlalchemy.py @@ -0,0 +1,49 @@ +from typing 
import Any, Dict + + +from sqlalchemy.ext.asyncio import AsyncEngine + +from .utils_migration import get_current_head + +_ENGINE_ATTRS = "closed driver dsn freesize maxsize minsize name size timeout".split() + + +def get_pg_engine_info(engine: AsyncEngine) -> Dict[str, Any]: + return {attr: getattr(engine, attr, None) for attr in _ENGINE_ATTRS} + + +from sqlalchemy.pool import Pool + + +def get_pg_engine_stateinfo(engine: AsyncEngine) -> Dict[str, Any]: + engine_pool: Pool = engine.pool + return engine_pool.info + # return { + # "size": engine.size, + # "acquired": engine.size - engine.freesize, + # "free": engine.freesize, + # "reserved": {"min": engine.minsize, "max": engine.maxsize}, + # } + + +async def close_engine(engine: AsyncEngine) -> None: + await engine.dispose() + + +class DBMigrationError(RuntimeError): + pass + + +async def raise_if_migration_not_ready(engine: AsyncEngine): + """Ensures db migration is complete + + :raises DBMigrationError + :raises + """ + async with engine.connect() as conn: + version_num = await conn.scalar('SELECT "version_num" FROM "alembic_version"') + head_version_num = get_current_head() + if version_num != head_version_num: + raise DBMigrationError( + f"Migration is incomplete, expected {head_version_num} but got {version_num}" + ) diff --git a/services/catalog/src/simcore_service_catalog/db/events.py b/services/catalog/src/simcore_service_catalog/db/events.py index 181e4756469..aa822cec3f8 100644 --- a/services/catalog/src/simcore_service_catalog/db/events.py +++ b/services/catalog/src/simcore_service_catalog/db/events.py @@ -1,9 +1,8 @@ import logging -from aiopg.sa import Engine, create_engine from fastapi import FastAPI from servicelib.retry_policies import PostgresRetryPolicyUponInitialization -from simcore_postgres_database.utils_aiopg import ( +from simcore_postgres_database.utils_aiosqlalchemy import ( close_engine, get_pg_engine_info, raise_if_migration_not_ready, @@ -13,19 +12,20 @@ from ..core.settings import PostgresSettings logger = logging.getLogger(__name__) +from sqlalchemy.ext.asyncio import create_async_engine, AsyncEngine @retry(**PostgresRetryPolicyUponInitialization(logger).kwargs) async def connect_to_db(app: FastAPI) -> None: logger.debug("Connecting db ...") cfg: PostgresSettings = app.state.settings.CATALOG_POSTGRES - engine: Engine = await create_engine( - str(cfg.dsn), + engine: AsyncEngine = await create_async_engine( + cfg.dsn, application_name=f"{__name__}_{id(app)}", # unique identifier per app - minsize=cfg.POSTGRES_MINSIZE, - maxsize=cfg.POSTGRES_MAXSIZE, + pool_size=cfg.POSTGRES_MINSIZE, + max_overflow=cfg.POSTGRES_MAXSIZE - cfg.POSTGRES_MINSIZE, ) - logger.debug("Connected to %s", engine.dsn) + logger.debug("Connected to %s", engine.url) logger.debug("Checking db migrationn ...") try: @@ -50,4 +50,4 @@ async def close_db_connection(app: FastAPI) -> None: if engine := app.state.engine: await close_engine(engine) - logger.debug("Disconnected from %s", engine.dsn) + logger.debug("Disconnected from %s", engine.url) diff --git a/services/catalog/src/simcore_service_catalog/db/repositories/_base.py b/services/catalog/src/simcore_service_catalog/db/repositories/_base.py index 1f7cc57a069..4a20b37c735 100644 --- a/services/catalog/src/simcore_service_catalog/db/repositories/_base.py +++ b/services/catalog/src/simcore_service_catalog/db/repositories/_base.py @@ -1,6 +1,6 @@ from dataclasses import dataclass -from aiopg.sa import Engine +from sqlalchemy.ext.asyncio import AsyncEngine @dataclass @@ -9,4 +9,4 @@ class 
BaseRepository: Repositories are pulled at every request """ - db_engine: Engine = None + db_engine: AsyncEngine diff --git a/services/catalog/src/simcore_service_catalog/db/repositories/dags.py b/services/catalog/src/simcore_service_catalog/db/repositories/dags.py index 07397f61103..83f33da3e3d 100644 --- a/services/catalog/src/simcore_service_catalog/db/repositories/dags.py +++ b/services/catalog/src/simcore_service_catalog/db/repositories/dags.py @@ -2,7 +2,7 @@ from typing import List, Optional import sqlalchemy as sa -from aiopg.sa.result import RowProxy +from sqlalchemy.engine.result import Row from ...models.domain.dag import DAGAtDB from ...models.schemas.dag import DAGIn @@ -13,21 +13,21 @@ class DAGsRepository(BaseRepository): async def list_dags(self) -> List[DAGAtDB]: dagraphs = [] - async with self.db_engine.acquire() as conn: + async with self.db_engine.connect() as conn: async for row in conn.execute(dags.select()): - dagraphs.append(DAGAtDB(**row)) + dagraphs.append(DAGAtDB.parse_obj(row)) return dagraphs async def get_dag(self, dag_id: int) -> Optional[DAGAtDB]: - async with self.db_engine.acquire() as conn: - row: RowProxy = await ( + async with self.db_engine.connect() as conn: + row: Row = await ( await conn.execute(dags.select().where(dags.c.id == dag_id)) ).first() if row: return DAGAtDB(**row) async def create_dag(self, dag: DAGIn) -> int: - async with self.db_engine.acquire() as conn: + async with self.db_engine.connect() as conn: new_id: int = await ( await conn.execute( dags.insert().values( @@ -39,7 +39,7 @@ async def create_dag(self, dag: DAGIn) -> int: return new_id async def replace_dag(self, dag_id: int, dag: DAGIn): - async with self.db_engine.acquire() as conn: + async with self.db_engine.connect() as conn: await conn.execute( dags.update() .values( @@ -53,7 +53,7 @@ async def update_dag(self, dag_id: int, dag: DAGIn): patch = dag.dict(exclude_unset=True, exclude={"workbench"}) if "workbench" in dag.__fields_set__: patch["workbench"] = json.dumps(patch["workbench"]) - async with self.db_engine.acquire() as conn: + async with self.db_engine.connect() as conn: res = await conn.execute( sa.update(dags).values(**patch).where(dags.c.id == dag_id) ) @@ -62,5 +62,5 @@ async def update_dag(self, dag_id: int, dag: DAGIn): assert res.returns_rows == False # nosec async def delete_dag(self, dag_id: int): - async with self.db_engine.acquire() as conn: + async with self.db_engine.connect() as conn: await conn.execute(sa.delete(dags).where(dags.c.id == dag_id)) diff --git a/services/catalog/src/simcore_service_catalog/db/repositories/groups.py b/services/catalog/src/simcore_service_catalog/db/repositories/groups.py index eb44da3588f..2f2aacff6f5 100644 --- a/services/catalog/src/simcore_service_catalog/db/repositories/groups.py +++ b/services/catalog/src/simcore_service_catalog/db/repositories/groups.py @@ -1,7 +1,6 @@ from typing import Dict, List, Optional, Set import sqlalchemy as sa -from aiopg.sa.result import RowProxy from pydantic.networks import EmailStr from pydantic.types import PositiveInt @@ -14,7 +13,7 @@ class GroupsRepository(BaseRepository): async def list_user_groups(self, user_id: int) -> List[GroupAtDB]: groups_in_db = [] - async with self.db_engine.acquire() as conn: + async with self.db_engine.connect() as conn: async for row in conn.execute( sa.select([groups]) .select_from( @@ -26,8 +25,8 @@ async def list_user_groups(self, user_id: int) -> List[GroupAtDB]: return groups_in_db async def get_everyone_group(self) -> GroupAtDB: - async with 
self.db_engine.acquire() as conn: - row: RowProxy = await ( + async with self.db_engine.connect() as conn: + row: sa.engine.result.Row = await ( await conn.execute( sa.select([groups]).where(groups.c.type == GroupType.EVERYONE) ) @@ -39,19 +38,19 @@ async def get_everyone_group(self) -> GroupAtDB: async def get_user_gid_from_email( self, user_email: EmailStr ) -> Optional[PositiveInt]: - async with self.db_engine.acquire() as conn: + async with self.db_engine.connect() as conn: return await conn.scalar( sa.select([users.c.primary_gid]).where(users.c.email == user_email) ) async def get_gid_from_affiliation(self, affiliation: str) -> Optional[PositiveInt]: - async with self.db_engine.acquire() as conn: + async with self.db_engine.connect() as conn: return await conn.scalar( sa.select([groups.c.gid]).where(groups.c.name == affiliation) ) async def get_user_email_from_gid(self, gid: PositiveInt) -> Optional[EmailStr]: - async with self.db_engine.acquire() as conn: + async with self.db_engine.connect() as conn: return await conn.scalar( sa.select([users.c.email]).where(users.c.primary_gid == gid) ) @@ -60,7 +59,7 @@ async def list_user_emails_from_gids( self, gids: Set[PositiveInt] ) -> Dict[PositiveInt, Optional[EmailStr]]: service_owners = {} - async with self.db_engine.acquire() as conn: + async with self.db_engine.connect() as conn: async for row in conn.execute( sa.select([users.c.primary_gid, users.c.email]).where( users.c.primary_gid.in_(gids) diff --git a/services/catalog/src/simcore_service_catalog/db/repositories/projects.py b/services/catalog/src/simcore_service_catalog/db/repositories/projects.py index 78f2d07e1f7..efd90d9401f 100644 --- a/services/catalog/src/simcore_service_catalog/db/repositories/projects.py +++ b/services/catalog/src/simcore_service_catalog/db/repositories/projects.py @@ -14,7 +14,7 @@ class ProjectsRepository(BaseRepository): async def list_services_from_published_templates(self) -> List[ServiceKeyVersion]: list_of_published_services: List[ServiceKeyVersion] = [] - async with self.db_engine.acquire() as conn: + async with self.db_engine.connect() as conn: async for row in conn.execute( sa.select([projects]).where( (projects.c.type == ProjectType.TEMPLATE) diff --git a/services/catalog/src/simcore_service_catalog/db/repositories/services.py b/services/catalog/src/simcore_service_catalog/db/repositories/services.py index 0df83258785..a1b1d1ac045 100644 --- a/services/catalog/src/simcore_service_catalog/db/repositories/services.py +++ b/services/catalog/src/simcore_service_catalog/db/repositories/services.py @@ -74,7 +74,7 @@ async def list_services( ) -> List[ServiceMetaDataAtDB]: services_in_db = [] - async with self.db_engine.acquire() as conn: + async with self.db_engine.connect() as conn: async for row in conn.execute( _make_list_services_query( gids, @@ -124,7 +124,7 @@ async def list_service_releases( query = query.limit(limit_count) releases = [] - async with self.db_engine.acquire() as conn: + async with self.db_engine.connect() as conn: async for row in conn.execute(query): releases.append(ServiceMetaDataAtDB(**row)) @@ -170,7 +170,7 @@ async def get_service( ) ) ) - async with self.db_engine.acquire() as conn: + async with self.db_engine.connect() as conn: result = await conn.execute(query) row = await result.first() if row: @@ -191,7 +191,7 @@ async def create_service( f"{access_rights} does not correspond to service {new_service.key}:{new_service.version}" ) - async with self.db_engine.acquire() as conn: + async with self.db_engine.connect() as conn: 
# NOTE: this ensure proper rollback in case of issue async with conn.begin() as _transaction: result = await conn.execute( @@ -215,7 +215,7 @@ async def update_service( self, patched_service: ServiceMetaDataAtDB ) -> ServiceMetaDataAtDB: # update the services_meta_data table - async with self.db_engine.acquire() as conn: + async with self.db_engine.connect() as conn: result = await conn.execute( # pylint: disable=no-value-for-parameter services_meta_data.update() @@ -249,7 +249,7 @@ async def get_service_access_rights( query = sa.select([services_access_rights]).where(search_expression) - async with self.db_engine.acquire() as conn: + async with self.db_engine.connect() as conn: async for row in conn.execute(query): services_in_db.append(ServiceAccessRightsAtDB(**row)) return services_in_db @@ -271,15 +271,8 @@ async def list_services_access_rights( else True ) ) - async with self.db_engine.acquire() as conn: - # NOTE: this strange query compile is due to an incompatilility - # between aiopg.sa and sqlalchemy 1.4 - # One of the maintainer of aiopg says: - # https://github.com/aio-libs/aiopg/issues/798#issuecomment-934256346 - # we should drop aiopg.sa - async for row in conn.execute( - f"{(query.compile(compile_kwargs={'literal_binds': True}))}" - ): + async with self.db_engine.connect() as conn: + async for row in conn.execute(query): service_to_access_rights[ ( row[services_access_rights.c.key], @@ -306,7 +299,7 @@ async def upsert_service_access_rights( set_=rights.dict(by_alias=True, exclude_unset=True), ) try: - async with self.db_engine.acquire() as conn: + async with self.db_engine.connect() as conn: await conn.execute( # pylint: disable=no-value-for-parameter on_update_stmt @@ -321,7 +314,7 @@ async def upsert_service_access_rights( async def delete_service_access_rights( self, delete_access_rights: List[ServiceAccessRightsAtDB] ) -> None: - async with self.db_engine.acquire() as conn: + async with self.db_engine.connect() as conn: for rights in delete_access_rights: await conn.execute( # pylint: disable=no-value-for-parameter From 8a9a42a9affe3f86edb67700b403fa7f9667939e Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 4 Mar 2022 08:51:05 +0100 Subject: [PATCH 03/41] only allow aiopg in tests --- services/catalog/requirements/_test.in | 1 + services/catalog/requirements/_test.txt | 8 +++++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/services/catalog/requirements/_test.in b/services/catalog/requirements/_test.in index 644fe417545..29deeda5155 100644 --- a/services/catalog/requirements/_test.in +++ b/services/catalog/requirements/_test.in @@ -26,6 +26,7 @@ respx # migration due to pytest_simcore.postgres_service alembic +aiopg[sa] docker click diff --git a/services/catalog/requirements/_test.txt b/services/catalog/requirements/_test.txt index f24a5ee60c6..9e6073028e7 100644 --- a/services/catalog/requirements/_test.txt +++ b/services/catalog/requirements/_test.txt @@ -8,6 +8,8 @@ aiohttp==3.8.1 # via # -c requirements/../../../requirements/constraints.txt # pytest-aiohttp +aiopg==1.3.3 + # via -r requirements/_test.in aiosignal==1.2.0 # via aiohttp alembic==1.7.4 @@ -21,7 +23,9 @@ anyio==3.5.0 astroid==2.9.3 # via pylint async-timeout==4.0.2 - # via aiohttp + # via + # aiohttp + # aiopg attrs==21.4.0 # via # aiohttp @@ -159,6 +163,7 @@ pluggy==1.0.0 psycopg2-binary==2.8.6 # via # -c requirements/_base.txt + # aiopg # sqlalchemy ptvsd==4.3.2 # via -r requirements/_test.in @@ -238,6 +243,7 @@ sqlalchemy==1.4.31 # via 
# -c requirements/../../../requirements/constraints.txt # -c requirements/_base.txt + # aiopg # alembic texttable==1.6.4 # via docker-compose From ee2c755da3b7dff5137574f12c24bb98ba773128 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 4 Mar 2022 16:14:12 +0100 Subject: [PATCH 04/41] asyncpg needed by sqlalchemy --- services/catalog/requirements/_base.in | 1 + services/catalog/requirements/_base.txt | 2 ++ 2 files changed, 3 insertions(+) diff --git a/services/catalog/requirements/_base.in b/services/catalog/requirements/_base.in index c3b97677471..431962463f1 100644 --- a/services/catalog/requirements/_base.in +++ b/services/catalog/requirements/_base.in @@ -21,6 +21,7 @@ fastapi[all] pydantic[dotenv] # database +asyncpg sqlalchemy[asyncio] # web client diff --git a/services/catalog/requirements/_base.txt b/services/catalog/requirements/_base.txt index 12903ca411b..e02b4cd2bd8 100644 --- a/services/catalog/requirements/_base.txt +++ b/services/catalog/requirements/_base.txt @@ -20,6 +20,8 @@ anyio==3.5.0 # starlette asgiref==3.4.1 # via uvicorn +asyncpg==0.25.0 + # via -r requirements/_base.in certifi==2020.12.5 # via # httpcore From 2613623c94a2ae9b7d9e085b4431721f92066445 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 4 Mar 2022 16:14:36 +0100 Subject: [PATCH 05/41] added utils for async sqlalchemy --- .../utils_aiosqlalchemy.py | 43 +++++++++++-------- 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/packages/postgres-database/src/simcore_postgres_database/utils_aiosqlalchemy.py b/packages/postgres-database/src/simcore_postgres_database/utils_aiosqlalchemy.py index 90b03f67ac5..200a8237a4d 100644 --- a/packages/postgres-database/src/simcore_postgres_database/utils_aiosqlalchemy.py +++ b/packages/postgres-database/src/simcore_postgres_database/utils_aiosqlalchemy.py @@ -1,29 +1,36 @@ -from typing import Any, Dict - +from typing import Any, Dict, List +import sqlalchemy as sa from sqlalchemy.ext.asyncio import AsyncEngine from .utils_migration import get_current_head -_ENGINE_ATTRS = "closed driver dsn freesize maxsize minsize name size timeout".split() - -def get_pg_engine_info(engine: AsyncEngine) -> Dict[str, Any]: - return {attr: getattr(engine, attr, None) for attr in _ENGINE_ATTRS} +async def _get_connections(engine, db_name: str) -> List[Dict]: + """Return information about connections""" + sql = sa.DDL( + f""" + SELECT + pid, + state + FROM pg_stat_activity + WHERE datname = '{db_name}' + AND query NOT LIKE '%%FROM pg_stat_activity%%' + """ + ) + async with engine.connect() as conn: + result = await conn.execute(sql) + connections = [{"pid": r[0], "state": r[1]} for r in result.fetchall()] -from sqlalchemy.pool import Pool + return connections -def get_pg_engine_stateinfo(engine: AsyncEngine) -> Dict[str, Any]: - engine_pool: Pool = engine.pool - return engine_pool.info - # return { - # "size": engine.size, - # "acquired": engine.size - engine.freesize, - # "free": engine.freesize, - # "reserved": {"min": engine.minsize, "max": engine.maxsize}, - # } +async def get_pg_engine_stateinfo(engine: AsyncEngine, db_name: str) -> Dict[str, Any]: + return { + "pgserver stats": f"{await _get_connections(engine, db_name)}", + "current pool connections": f"{engine.pool.checkedin()=},{engine.pool.checkedout()=}", + } async def close_engine(engine: AsyncEngine) -> None: @@ -41,7 +48,9 @@ async def raise_if_migration_not_ready(engine: AsyncEngine): :raises """ async with 
engine.connect() as conn: - version_num = await conn.scalar('SELECT "version_num" FROM "alembic_version"') + version_num = await conn.scalar( + sa.DDL('SELECT "version_num" FROM "alembic_version"') + ) head_version_num = get_current_head() if version_num != head_version_num: raise DBMigrationError( From 3c361344b396c35d649d856933d6f65ca97421e8 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 4 Mar 2022 16:15:20 +0100 Subject: [PATCH 06/41] removed usage of aiopg --- .../api/dependencies/database.py | 22 +++++++------------ .../core/background_tasks.py | 8 +++---- .../src/simcore_service_catalog/db/events.py | 17 +++++++++----- .../db/repositories/dags.py | 7 ++---- .../db/repositories/groups.py | 13 +++++------ .../db/repositories/projects.py | 2 +- .../db/repositories/services.py | 14 ++++++------ .../services/access_rights.py | 4 ++-- 8 files changed, 41 insertions(+), 46 deletions(-) diff --git a/services/catalog/src/simcore_service_catalog/api/dependencies/database.py b/services/catalog/src/simcore_service_catalog/api/dependencies/database.py index 35d9d2c60a9..2b0118662f3 100644 --- a/services/catalog/src/simcore_service_catalog/api/dependencies/database.py +++ b/services/catalog/src/simcore_service_catalog/api/dependencies/database.py @@ -1,38 +1,32 @@ import logging from typing import AsyncGenerator, Callable, Type -from aiopg.sa import Engine from fastapi import Depends from fastapi.requests import Request +from sqlalchemy.ext.asyncio import AsyncEngine from ...db.repositories import BaseRepository logger = logging.getLogger(__name__) -def _get_db_engine(request: Request) -> Engine: +def _get_db_engine(request: Request) -> AsyncEngine: return request.app.state.engine def get_repository(repo_type: Type[BaseRepository]) -> Callable: async def _get_repo( - engine: Engine = Depends(_get_db_engine), + engine: AsyncEngine = Depends(_get_db_engine), ) -> AsyncGenerator[BaseRepository, None]: # NOTE: 2 different ideas were tried here with not so good # 1st one was acquiring a connection per repository which lead to the following issue https://github.com/ITISFoundation/osparc-simcore/pull/1966 # 2nd one was acquiring a connection per request which works but blocks the director-v2 responsiveness once # the max amount of connections is reached - # now the current solution is to acquire connection when needed. - available_engines = engine.maxsize - (engine.size - engine.freesize) - if available_engines <= 1: - logger.warning( - "Low pg connections available in pool: pool size=%d, acquired=%d, free=%d, reserved=[%d, %d]", - engine.size, - engine.size - engine.freesize, - engine.freesize, - engine.minsize, - engine.maxsize, - ) + # now the current solution is to connect connection when needed. 
+ logger.info( + "%s", + f"current pool connections {engine.pool.checkedin()=},{engine.pool.checkedout()=}", + ) yield repo_type(db_engine=engine) return _get_repo diff --git a/services/catalog/src/simcore_service_catalog/core/background_tasks.py b/services/catalog/src/simcore_service_catalog/core/background_tasks.py index 6f99d194c78..fe9f64cf84d 100644 --- a/services/catalog/src/simcore_service_catalog/core/background_tasks.py +++ b/services/catalog/src/simcore_service_catalog/core/background_tasks.py @@ -15,12 +15,12 @@ from pprint import pformat from typing import Dict, Set, Tuple -from aiopg.sa import Engine from fastapi import FastAPI from models_library.services import ServiceDockerData from models_library.services_db import ServiceAccessRightsAtDB, ServiceMetaDataAtDB from packaging.version import Version from pydantic import ValidationError +from sqlalchemy.ext.asyncio import AsyncEngine from ..api.dependencies.director import get_director_api from ..db.repositories.groups import GroupsRepository @@ -63,7 +63,7 @@ async def _list_registry_services( async def _list_db_services( - db_engine: Engine, + db_engine: AsyncEngine, ) -> Set[Tuple[ServiceKey, ServiceVersion]]: services_repo = ServicesRepository(db_engine=db_engine) return { @@ -138,7 +138,7 @@ async def _ensure_registry_insync_with_db(app: FastAPI) -> None: async def _ensure_published_templates_accessible( - db_engine: Engine, default_product_name: str + db_engine: AsyncEngine, default_product_name: str ) -> None: # Rationale: if a project template was published, its services must be available to everyone. # a published template has a column Published that is set to True @@ -180,7 +180,7 @@ async def _ensure_published_templates_accessible( async def sync_registry_task(app: FastAPI) -> None: default_product: str = app.state.settings.CATALOG_ACCESS_RIGHTS_DEFAULT_PRODUCT_NAME - engine: Engine = app.state.engine + engine: AsyncEngine = app.state.engine while app.state.registry_syncer_running: try: diff --git a/services/catalog/src/simcore_service_catalog/db/events.py b/services/catalog/src/simcore_service_catalog/db/events.py index aa822cec3f8..c6857253746 100644 --- a/services/catalog/src/simcore_service_catalog/db/events.py +++ b/services/catalog/src/simcore_service_catalog/db/events.py @@ -4,26 +4,31 @@ from servicelib.retry_policies import PostgresRetryPolicyUponInitialization from simcore_postgres_database.utils_aiosqlalchemy import ( close_engine, - get_pg_engine_info, + get_pg_engine_stateinfo, raise_if_migration_not_ready, ) +from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine from tenacity import retry from ..core.settings import PostgresSettings logger = logging.getLogger(__name__) -from sqlalchemy.ext.asyncio import create_async_engine, AsyncEngine @retry(**PostgresRetryPolicyUponInitialization(logger).kwargs) async def connect_to_db(app: FastAPI) -> None: logger.debug("Connecting db ...") cfg: PostgresSettings = app.state.settings.CATALOG_POSTGRES - engine: AsyncEngine = await create_async_engine( - cfg.dsn, - application_name=f"{__name__}_{id(app)}", # unique identifier per app + logger.debug(cfg.dsn) + modified_dsn = cfg.dsn.replace("postgresql", "postgresql+asyncpg") + logger.debug(modified_dsn) + engine: AsyncEngine = create_async_engine( + modified_dsn, pool_size=cfg.POSTGRES_MINSIZE, max_overflow=cfg.POSTGRES_MAXSIZE - cfg.POSTGRES_MINSIZE, + connect_args={ + "server_settings": {"application_name": cfg.POSTGRES_CLIENT_NAME} + }, ) logger.debug("Connected to %s", engine.url) @@ -40,7 +45,7 @@ 
async def connect_to_db(app: FastAPI) -> None: logger.debug( "Setup engine: %s", - get_pg_engine_info(engine), + await get_pg_engine_stateinfo(engine, cfg.POSTGRES_DB), ) diff --git a/services/catalog/src/simcore_service_catalog/db/repositories/dags.py b/services/catalog/src/simcore_service_catalog/db/repositories/dags.py index 83f33da3e3d..5a6837b20a7 100644 --- a/services/catalog/src/simcore_service_catalog/db/repositories/dags.py +++ b/services/catalog/src/simcore_service_catalog/db/repositories/dags.py @@ -2,7 +2,6 @@ from typing import List, Optional import sqlalchemy as sa -from sqlalchemy.engine.result import Row from ...models.domain.dag import DAGAtDB from ...models.schemas.dag import DAGIn @@ -14,15 +13,13 @@ class DAGsRepository(BaseRepository): async def list_dags(self) -> List[DAGAtDB]: dagraphs = [] async with self.db_engine.connect() as conn: - async for row in conn.execute(dags.select()): + async for row in await conn.stream(dags.select()): dagraphs.append(DAGAtDB.parse_obj(row)) return dagraphs async def get_dag(self, dag_id: int) -> Optional[DAGAtDB]: async with self.db_engine.connect() as conn: - row: Row = await ( - await conn.execute(dags.select().where(dags.c.id == dag_id)) - ).first() + row = await conn.execute(dags.select().where(dags.c.id == dag_id)).first() if row: return DAGAtDB(**row) diff --git a/services/catalog/src/simcore_service_catalog/db/repositories/groups.py b/services/catalog/src/simcore_service_catalog/db/repositories/groups.py index 2f2aacff6f5..0279da181b6 100644 --- a/services/catalog/src/simcore_service_catalog/db/repositories/groups.py +++ b/services/catalog/src/simcore_service_catalog/db/repositories/groups.py @@ -14,7 +14,7 @@ class GroupsRepository(BaseRepository): async def list_user_groups(self, user_id: int) -> List[GroupAtDB]: groups_in_db = [] async with self.db_engine.connect() as conn: - async for row in conn.execute( + async for row in await conn.stream( sa.select([groups]) .select_from( user_to_groups.join(groups, user_to_groups.c.gid == groups.c.gid), @@ -26,11 +26,10 @@ async def list_user_groups(self, user_id: int) -> List[GroupAtDB]: async def get_everyone_group(self) -> GroupAtDB: async with self.db_engine.connect() as conn: - row: sa.engine.result.Row = await ( - await conn.execute( - sa.select([groups]).where(groups.c.type == GroupType.EVERYONE) - ) - ).first() + result = await conn.execute( + sa.select([groups]).where(groups.c.type == GroupType.EVERYONE) + ) + row = result.first() if not row: raise RepositoryError(f"{GroupType.EVERYONE} groups was never initialized") return GroupAtDB(**row) @@ -60,7 +59,7 @@ async def list_user_emails_from_gids( ) -> Dict[PositiveInt, Optional[EmailStr]]: service_owners = {} async with self.db_engine.connect() as conn: - async for row in conn.execute( + async for row in await conn.stream( sa.select([users.c.primary_gid, users.c.email]).where( users.c.primary_gid.in_(gids) ) diff --git a/services/catalog/src/simcore_service_catalog/db/repositories/projects.py b/services/catalog/src/simcore_service_catalog/db/repositories/projects.py index efd90d9401f..9d8fa4e491f 100644 --- a/services/catalog/src/simcore_service_catalog/db/repositories/projects.py +++ b/services/catalog/src/simcore_service_catalog/db/repositories/projects.py @@ -15,7 +15,7 @@ class ProjectsRepository(BaseRepository): async def list_services_from_published_templates(self) -> List[ServiceKeyVersion]: list_of_published_services: List[ServiceKeyVersion] = [] async with self.db_engine.connect() as conn: - async for row in 
conn.execute( + async for row in await conn.stream( sa.select([projects]).where( (projects.c.type == ProjectType.TEMPLATE) & (projects.c.published == True) diff --git a/services/catalog/src/simcore_service_catalog/db/repositories/services.py b/services/catalog/src/simcore_service_catalog/db/repositories/services.py index a1b1d1ac045..b309d3e3f9a 100644 --- a/services/catalog/src/simcore_service_catalog/db/repositories/services.py +++ b/services/catalog/src/simcore_service_catalog/db/repositories/services.py @@ -75,7 +75,7 @@ async def list_services( services_in_db = [] async with self.db_engine.connect() as conn: - async for row in conn.execute( + async for row in await conn.stream( _make_list_services_query( gids, execute_access, @@ -125,7 +125,7 @@ async def list_service_releases( releases = [] async with self.db_engine.connect() as conn: - async for row in conn.execute(query): + async for row in await conn.stream(query): releases.append(ServiceMetaDataAtDB(**row)) return releases @@ -172,7 +172,7 @@ async def get_service( ) async with self.db_engine.connect() as conn: result = await conn.execute(query) - row = await result.first() + row = result.first() if row: return ServiceMetaDataAtDB(**row) @@ -200,7 +200,7 @@ async def create_service( .values(**new_service.dict(by_alias=True)) .returning(literal_column("*")) ) - row = await result.first() + row = result.first() assert row # nosec created_service = ServiceMetaDataAtDB(**row) @@ -226,7 +226,7 @@ async def update_service( .values(**patched_service.dict(by_alias=True, exclude_unset=True)) .returning(literal_column("*")) ) - row = await result.first() + row = result.first() assert row # nosec updated_service = ServiceMetaDataAtDB(**row) return updated_service @@ -250,7 +250,7 @@ async def get_service_access_rights( query = sa.select([services_access_rights]).where(search_expression) async with self.db_engine.connect() as conn: - async for row in conn.execute(query): + async for row in await conn.stream(query): services_in_db.append(ServiceAccessRightsAtDB(**row)) return services_in_db @@ -272,7 +272,7 @@ async def list_services_access_rights( ) ) async with self.db_engine.connect() as conn: - async for row in conn.execute(query): + async for row in await conn.stream(query): service_to_access_rights[ ( row[services_access_rights.c.key], diff --git a/services/catalog/src/simcore_service_catalog/services/access_rights.py b/services/catalog/src/simcore_service_catalog/services/access_rights.py index a7846e44429..bc373ac3912 100644 --- a/services/catalog/src/simcore_service_catalog/services/access_rights.py +++ b/services/catalog/src/simcore_service_catalog/services/access_rights.py @@ -7,12 +7,12 @@ from typing import Callable, Dict, List, Optional, Tuple, Union from urllib.parse import quote_plus -from aiopg.sa.engine import Engine from fastapi import FastAPI from models_library.services import ServiceDockerData from models_library.services_db import ServiceAccessRightsAtDB from packaging.version import Version from pydantic.types import PositiveInt +from sqlalchemy.ext.asyncio import AsyncEngine from ..api.dependencies.director import get_director_api from ..db.repositories.groups import GroupsRepository @@ -54,7 +54,7 @@ async def evaluate_default_policy( 2. Services published after 19.08.2020 will be visible ONLY to his/her owner 3. 
Front-end services are have execute-access to everyone """ - db_engine: Engine = app.state.engine + db_engine: AsyncEngine = app.state.engine groups_repo = GroupsRepository(db_engine) owner_gid = None From 2dc80203243efdc013f65ab5f9b14fd4365d493c Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 4 Mar 2022 17:14:48 +0100 Subject: [PATCH 07/41] use async sqlalchemy in tests too --- .../src/pytest_simcore/postgres_service.py | 18 ++++++++++++- .../src/simcore_service_catalog/db/events.py | 2 +- .../catalog/tests/unit/with_dbs/conftest.py | 26 ++++++++++++------- .../unit/with_dbs/test_db_repositories.py | 4 +-- .../with_dbs/test_services_access_rights.py | 6 ++--- 5 files changed, 39 insertions(+), 17 deletions(-) diff --git a/packages/pytest-simcore/src/pytest_simcore/postgres_service.py b/packages/pytest-simcore/src/pytest_simcore/postgres_service.py index 5930bd14afd..9b4d2a3b1c0 100644 --- a/packages/pytest-simcore/src/pytest_simcore/postgres_service.py +++ b/packages/pytest-simcore/src/pytest_simcore/postgres_service.py @@ -181,6 +181,22 @@ async def aiopg_engine( await engine.wait_closed() +@pytest.fixture(scope="function") +async def sqlalchemy_async_engine( + postgres_db: sa.engine.Engine, +) -> AsyncIterator: + # NOTE: prevent having to import this if latest sqlalchemy not installed (temp) + from sqlalchemy.ext.asyncio import create_async_engine + + engine = create_async_engine( + f"{postgres_db.url}".replace("postgresql", "postgresql+asyncpg") + ) + assert engine + yield engine + + await engine.dispose() + + @pytest.fixture(scope="function") def postgres_host_config(postgres_dsn: Dict[str, str], monkeypatch) -> Dict[str, str]: monkeypatch.setenv("POSTGRES_USER", postgres_dsn["user"]) @@ -195,7 +211,7 @@ def postgres_host_config(postgres_dsn: Dict[str, str], monkeypatch) -> Dict[str, @pytest.fixture(scope="module") -def postgres_session(postgres_db: sa.engine.Engine) -> sa.orm.session.Session: +def postgres_session(postgres_db: sa.engine.Engine) -> Iterator[sa.orm.session.Session]: from sqlalchemy.orm.session import Session Session_cls = sessionmaker(postgres_db) diff --git a/services/catalog/src/simcore_service_catalog/db/events.py b/services/catalog/src/simcore_service_catalog/db/events.py index c6857253746..d7f20b22d80 100644 --- a/services/catalog/src/simcore_service_catalog/db/events.py +++ b/services/catalog/src/simcore_service_catalog/db/events.py @@ -30,7 +30,7 @@ async def connect_to_db(app: FastAPI) -> None: "server_settings": {"application_name": cfg.POSTGRES_CLIENT_NAME} }, ) - logger.debug("Connected to %s", engine.url) + logger.debug("Connected to %s", cfg.dsn) logger.debug("Checking db migrationn ...") try: diff --git a/services/catalog/tests/unit/with_dbs/conftest.py b/services/catalog/tests/unit/with_dbs/conftest.py index 234bcc8ac9c..f98998875d1 100644 --- a/services/catalog/tests/unit/with_dbs/conftest.py +++ b/services/catalog/tests/unit/with_dbs/conftest.py @@ -11,7 +11,6 @@ import respx import sqlalchemy as sa from _pytest.monkeypatch import MonkeyPatch -from aiopg.sa.engine import Engine from faker import Faker from fastapi import FastAPI from pytest_mock.plugin import MockerFixture @@ -24,6 +23,7 @@ services_meta_data, ) from sqlalchemy.dialects.postgresql import insert as pg_insert +from sqlalchemy.ext.asyncio import AsyncEngine from starlette.testclient import TestClient @@ -83,7 +83,9 @@ def director_mockup(app: FastAPI) -> Iterator[MockRouter]: @pytest.fixture() -async def products_names(aiopg_engine: 
Engine) -> AsyncIterator[List[str]]: +async def products_names( + sqlalchemy_async_engine: AsyncEngine, +) -> AsyncIterator[List[str]]: """Inits products db table and returns product names""" data = [ # already upon creation: ("osparc", r"([\.-]{0,1}osparc[\.-])"), @@ -93,7 +95,7 @@ async def products_names(aiopg_engine: Engine) -> AsyncIterator[List[str]]: # pylint: disable=no-value-for-parameter - async with aiopg_engine.acquire() as conn: + async with sqlalchemy_async_engine.begin() as conn: # NOTE: The 'default' dialect with current database version settings does not support in-place multirow inserts for name, regex in data: stmt = products.insert().values(name=name, host_regex=regex) @@ -104,12 +106,14 @@ async def products_names(aiopg_engine: Engine) -> AsyncIterator[List[str]]: ] + [items[0] for items in data] yield names - async with aiopg_engine.acquire() as conn: + async with sqlalchemy_async_engine.begin() as conn: await conn.execute(products.delete()) @pytest.fixture() -async def user_groups_ids(aiopg_engine: Engine) -> AsyncIterator[List[int]]: +async def user_groups_ids( + sqlalchemy_async_engine: AsyncEngine, +) -> AsyncIterator[List[int]]: """Inits groups table and returns group identifiers""" cols = ("gid", "name", "description", "type", "thumbnail", "inclusion_rules") @@ -126,7 +130,7 @@ async def user_groups_ids(aiopg_engine: Engine) -> AsyncIterator[List[int]]: ] # pylint: disable=no-value-for-parameter - async with aiopg_engine.acquire() as conn: + async with sqlalchemy_async_engine.begin() as conn: for row in data: # NOTE: The 'default' dialect with current database version settings does not support in-place multirow inserts stmt = groups.insert().values(**dict(zip(cols, row))) @@ -138,13 +142,15 @@ async def user_groups_ids(aiopg_engine: Engine) -> AsyncIterator[List[int]]: yield gids - async with aiopg_engine.acquire() as conn: + async with sqlalchemy_async_engine.begin() as conn: await conn.execute(services_meta_data.delete()) await conn.execute(groups.delete().where(groups.c.gid.in_(gids[1:]))) @pytest.fixture() -async def services_db_tables_injector(aiopg_engine: Engine) -> AsyncIterator[Callable]: +async def services_db_tables_injector( + sqlalchemy_async_engine: AsyncEngine, +) -> AsyncIterator[Callable]: """Returns a helper function to init services_meta_data and services_access_rights tables @@ -175,7 +181,7 @@ async def services_db_tables_injector(aiopg_engine: Engine) -> AsyncIterator[Cal async def inject_in_db(fake_catalog: List[Tuple]): # [(service, ar1, ...), (service2, ar1, ...) 
] - async with aiopg_engine.acquire() as conn: + async with sqlalchemy_async_engine.begin() as conn: # NOTE: The 'default' dialect with current database version settings does not support in-place multirow inserts for service in [items[0] for items in fake_catalog]: insert_meta = pg_insert(services_meta_data).values(**service) @@ -194,7 +200,7 @@ async def inject_in_db(fake_catalog: List[Tuple]): yield inject_in_db - async with aiopg_engine.acquire() as conn: + async with sqlalchemy_async_engine.begin() as conn: await conn.execute(services_access_rights.delete()) await conn.execute(services_meta_data.delete()) diff --git a/services/catalog/tests/unit/with_dbs/test_db_repositories.py b/services/catalog/tests/unit/with_dbs/test_db_repositories.py index 52b5332d645..c7fa11550e6 100644 --- a/services/catalog/tests/unit/with_dbs/test_db_repositories.py +++ b/services/catalog/tests/unit/with_dbs/test_db_repositories.py @@ -20,8 +20,8 @@ @pytest.fixture -def services_repo(aiopg_engine): - repo = ServicesRepository(aiopg_engine) +def services_repo(sqlalchemy_async_engine): + repo = ServicesRepository(sqlalchemy_async_engine) return repo diff --git a/services/catalog/tests/unit/with_dbs/test_services_access_rights.py b/services/catalog/tests/unit/with_dbs/test_services_access_rights.py index 6796d8173ce..0b2f43e241d 100644 --- a/services/catalog/tests/unit/with_dbs/test_services_access_rights.py +++ b/services/catalog/tests/unit/with_dbs/test_services_access_rights.py @@ -4,7 +4,6 @@ from typing import Callable, List -from aiopg.sa.engine import Engine from fastapi import FastAPI from models_library.services import ServiceDockerData from models_library.services_db import ServiceAccessRightsAtDB @@ -15,6 +14,7 @@ evaluate_default_policy, reduce_access_rights, ) +from sqlalchemy.ext.asyncio import AsyncEngine pytest_simcore_core_services_selection = [ "postgres", @@ -80,7 +80,7 @@ def test_reduce_access_rights(): async def test_auto_upgrade_policy( - aiopg_engine: Engine, + sqlalchemy_async_engine: AsyncEngine, user_groups_ids: List[int], products_names: List[str], services_db_tables_injector: Callable, @@ -145,7 +145,7 @@ async def test_auto_upgrade_policy( # ------------ app = FastAPI() - app.state.engine = aiopg_engine + app.state.engine = sqlalchemy_async_engine app.state.settings = mocker.Mock() app.state.settings.CATALOG_ACCESS_RIGHTS_DEFAULT_PRODUCT_NAME = target_product From fb2d97c573fc008bb609d9c3cce2ec07288d193a Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Sun, 6 Mar 2022 22:12:06 +0100 Subject: [PATCH 08/41] a test for measuring catalog listing speed --- .../locust_files/catalog_servicesd.py | 60 +++++++++++++++++++ 1 file changed, 60 insertions(+) create mode 100644 tests/performance/locust_files/catalog_servicesd.py diff --git a/tests/performance/locust_files/catalog_servicesd.py b/tests/performance/locust_files/catalog_servicesd.py new file mode 100644 index 00000000000..62efb86c1e1 --- /dev/null +++ b/tests/performance/locust_files/catalog_servicesd.py @@ -0,0 +1,60 @@ +# +# SEE https://docs.locust.io/en/stable/quickstart.html +# + +import logging +from time import time + +import faker +from dotenv import load_dotenv +from locust import task +from locust.contrib.fasthttp import FastHttpUser + +logging.basicConfig(level=logging.INFO) + +fake = faker.Faker() + +load_dotenv() # take environment variables from .env + + +class WebApiUser(FastHttpUser): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + self.email = 
fake.email() + + @task() + def get_services_with_details(self): + start = time() + with self.client.get( + "/v0/services?user_id=1&details=true", + headers={ + "x-simcore-products-name": "osparc", + }, + catch_response=True, + ) as response: + response.raise_for_status() + num_services = len(response.json()) + print(f"got {num_services} WITH DETAILS in {time() - start}s") + response.success() + + @task() + def get_services_without_details(self): + start = time() + with self.client.get( + "/v0/services?user_id=1&details=false", + headers={ + "x-simcore-products-name": "osparc", + }, + catch_response=True, + ) as response: + response.raise_for_status() + num_services = len(response.json()) + print(f"got {num_services} in {time() - start}s") + response.success() + + def on_start(self): + print("Created User ", self.email) + + def on_stop(self): + print("Stopping", self.email) From afde986583a7c6bdbfbfe186bd21631306117815 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Sun, 6 Mar 2022 22:12:18 +0100 Subject: [PATCH 09/41] refactor --- .../src/simcore_service_catalog/db/repositories/services.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/catalog/src/simcore_service_catalog/db/repositories/services.py b/services/catalog/src/simcore_service_catalog/db/repositories/services.py index b309d3e3f9a..5ea179f21b5 100644 --- a/services/catalog/src/simcore_service_catalog/db/repositories/services.py +++ b/services/catalog/src/simcore_service_catalog/db/repositories/services.py @@ -39,8 +39,8 @@ def _make_list_services_query( query = ( sa.select( [services_meta_data], - distinct=[services_meta_data.c.key, services_meta_data.c.version], ) + .distinct(services_meta_data.c.key, services_meta_data.c.version) .select_from(services_meta_data.join(services_access_rights)) .where( and_( From e1bf7e179f5ad62ccb38ef8219fb1c994bde4c95 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Sun, 6 Mar 2022 22:12:41 +0100 Subject: [PATCH 10/41] improve construction of no detail service --- .../models/schemas/services.py | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/services/catalog/src/simcore_service_catalog/models/schemas/services.py b/services/catalog/src/simcore_service_catalog/models/schemas/services.py index 99c5bb2788f..5daaed206ee 100644 --- a/services/catalog/src/simcore_service_catalog/models/schemas/services.py +++ b/services/catalog/src/simcore_service_catalog/models/schemas/services.py @@ -1,6 +1,6 @@ from typing import Optional -from models_library.services import ServiceDockerData, ServiceMetaData +from models_library.services import ServiceDockerData, ServiceMetaData, ServiceType from models_library.services_access import ServiceAccessRights from pydantic import EmailStr, Extra from pydantic.main import BaseModel @@ -64,6 +64,10 @@ class ServiceOut( ): # pylint: disable=too-many-ancestors owner: Optional[EmailStr] + @classmethod + def no_detail_service(cls, key, version): + return NoDetailService.copy(update={"key": key, "version": version}) + class Config: extra = Extra.ignore schema_extra = { @@ -106,6 +110,20 @@ class Config: } +NoDetailService = ServiceOut.parse_obj( + { + "key": "simcore/services/dynamic/example", + "version": "0.0.0", + "name": "nodetails", + "description": "nodetails", + "type": ServiceType.COMPUTATIONAL, + "authors": [{"name": "nodetails", "email": "nodetails@nodetails.com"}], + "contact": "nodetails@nodetails.com", + "inputs": 
{}, + "outputs": {}, + } +) + # TODO: prototype for next iteration # Items are non-detailed version of resources listed class ServiceItem(BaseModel): From de25841833301c7bda37c98bfc67f71f9011f643 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Sun, 6 Mar 2022 22:13:41 +0100 Subject: [PATCH 11/41] no detail service improvement 20% --- .../simcore_service_catalog/api/routes/services.py | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/services/catalog/src/simcore_service_catalog/api/routes/services.py b/services/catalog/src/simcore_service_catalog/api/routes/services.py index 68244a637d2..13f8e7efab7 100644 --- a/services/catalog/src/simcore_service_catalog/api/routes/services.py +++ b/services/catalog/src/simcore_service_catalog/api/routes/services.py @@ -6,7 +6,7 @@ from typing import Any, Dict, List, Optional, Set, Tuple from fastapi import APIRouter, Depends, Header, HTTPException, status -from models_library.services import KEY_RE, VERSION_RE, ServiceType +from models_library.services import KEY_RE, VERSION_RE from models_library.services_db import ServiceAccessRightsAtDB, ServiceMetaDataAtDB from pydantic import ValidationError, constr from pydantic.types import PositiveInt @@ -102,16 +102,9 @@ async def list_services( # only return a stripped down version # FIXME: add name, ddescription, type, etc... services_overview = [ - ServiceOut( + ServiceOut.no_detail_service( key=key, version=version, - name="nodetails", - description="nodetails", - type=ServiceType.COMPUTATIONAL, - authors=[{"name": "nodetails", "email": "nodetails@nodetails.com"}], - contact="nodetails@nodetails.com", - inputs={}, - outputs={}, ) for key, version in services_in_db ] From cc770135764eac007712da0dfa2f0d1f175b6399 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Sun, 6 Mar 2022 22:14:00 +0100 Subject: [PATCH 12/41] TEMP: testing --- .../api/routes/services.py | 47 +++++++++---------- 1 file changed, 23 insertions(+), 24 deletions(-) diff --git a/services/catalog/src/simcore_service_catalog/api/routes/services.py b/services/catalog/src/simcore_service_catalog/api/routes/services.py index 13f8e7efab7..790f3059970 100644 --- a/services/catalog/src/simcore_service_catalog/api/routes/services.py +++ b/services/catalog/src/simcore_service_catalog/api/routes/services.py @@ -16,7 +16,6 @@ from ...db.repositories.services import ServicesRepository from ...models.schemas.services import ServiceOut, ServiceUpdate from ...services.function_services import get_function_service, is_function_service -from ...utils.pools import non_blocking_process_pool_executor from ...utils.requests_decorators import cancellable_request from ..dependencies.database import get_repository from ..dependencies.director import DirectorApi, get_director_api @@ -123,12 +122,13 @@ async def list_services( # getting services from director get_registry_services_task = director_client.get("/services") + services_owner_emails = {} ( - services_in_registry, + # services_in_registry, services_access_rights, services_owner_emails, ) = await asyncio.gather( - get_registry_services_task, + # get_registry_services_task, get_services_access_rights_task, get_services_owner_emails_task, ) @@ -139,27 +139,26 @@ async def list_services( # 3. 
then we compose the final service using as a base the registry service, overriding with the same # service from the database, adding also the access rights and the owner as email address instead of gid # NOTE: this final step runs in a process pool so that it runs asynchronously and does not block in any way - with non_blocking_process_pool_executor(max_workers=2) as pool: - _target_services = ( - request.app.state.frontend_services_catalog + services_in_registry - ) - services_details = await asyncio.gather( - *[ - asyncio.get_event_loop().run_in_executor( - pool, - _prepare_service_details, - s, - services_in_db[s["key"], s["version"]], - services_access_rights[s["key"], s["version"]], - services_owner_emails.get( - services_in_db[s["key"], s["version"]].owner - ), - ) - for s in _target_services - if (s.get("key"), s.get("version")) in services_in_db - ] - ) - return [s for s in services_details if s is not None] + # with non_blocking_process_pool_executor(max_workers=2) as pool: + # _target_services = ( + # request.app.state.frontend_services_catalog + services_in_registry + # ) + # services_details = await asyncio.gather( + # *[ + # asyncio.get_event_loop().run_in_executor( + # None, + # _prepare_service_details, + # s, + # services_in_db[s["key"], s["version"]], + # services_access_rights[s["key"], s["version"]], + # services_owner_emails.get(services_in_db[s["key"], s["version"]].owner), + # ) + # for s in _target_services + # if (s.get("key"), s.get("version")) in services_in_db + # ] + # ) + # return [s for s in services_details if s is not None] + return [] @router.get( From fbc9f9612768b5bfda7b0054711e2c67b6ad080a Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Sun, 6 Mar 2022 22:35:45 +0100 Subject: [PATCH 13/41] add caching --- .../api/routes/services.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/services/catalog/src/simcore_service_catalog/api/routes/services.py b/services/catalog/src/simcore_service_catalog/api/routes/services.py index 790f3059970..2e306052b3d 100644 --- a/services/catalog/src/simcore_service_catalog/api/routes/services.py +++ b/services/catalog/src/simcore_service_catalog/api/routes/services.py @@ -3,6 +3,8 @@ import asyncio import logging import urllib.parse +from functools import lru_cache +from time import time from typing import Any, Dict, List, Optional, Set, Tuple from fastapi import APIRouter, Depends, Header, HTTPException, status @@ -120,15 +122,24 @@ async def list_services( ) # getting services from director - get_registry_services_task = director_client.get("/services") + # get_registry_services_task = director_client.get("/services") + + def _get_cache_ttl(seconds=5 * 60): + return round(time() / seconds) + + @lru_cache(maxsize=2) + async def cached_registry_services(_cache_ttl): + return await director_client.get("/services") + + get_registry_services_task = cached_registry_services(_get_cache_ttl()) services_owner_emails = {} ( - # services_in_registry, + services_in_registry, services_access_rights, services_owner_emails, ) = await asyncio.gather( - # get_registry_services_task, + get_registry_services_task, get_services_access_rights_task, get_services_owner_emails_task, ) From c11f970b07c8188eb05ecd3f13078699a075b656 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Mon, 7 Mar 2022 09:24:03 +0100 Subject: [PATCH 14/41] small improvement --- .../src/simcore_service_catalog/db/repositories/services.py | 6 ++++-- 1 
file changed, 4 insertions(+), 2 deletions(-) diff --git a/services/catalog/src/simcore_service_catalog/db/repositories/services.py b/services/catalog/src/simcore_service_catalog/db/repositories/services.py index 5ea179f21b5..73e40e561c1 100644 --- a/services/catalog/src/simcore_service_catalog/db/repositories/services.py +++ b/services/catalog/src/simcore_service_catalog/db/repositories/services.py @@ -1,6 +1,6 @@ import logging from collections import defaultdict -from typing import Dict, List, Optional, Tuple +from typing import Dict, Iterable, List, Optional, Tuple import sqlalchemy as sa from models_library.services_db import ServiceAccessRightsAtDB, ServiceMetaDataAtDB @@ -255,7 +255,9 @@ async def get_service_access_rights( return services_in_db async def list_services_access_rights( - self, key_versions: List[Tuple[str, str]], product_name: Optional[str] = None + self, + key_versions: Iterable[Tuple[str, str]], + product_name: Optional[str] = None, ) -> Dict[Tuple[str, str], List[ServiceAccessRightsAtDB]]: """Batch version of get_service_access_rights""" service_to_access_rights = defaultdict(list) From f9576c8fde7a6638f44329f7ed651835e8e67fa6 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Mon, 7 Mar 2022 09:24:19 +0100 Subject: [PATCH 15/41] added async lru caching with ttl --- .../api/routes/services.py | 48 ++++++++++++------- 1 file changed, 30 insertions(+), 18 deletions(-) diff --git a/services/catalog/src/simcore_service_catalog/api/routes/services.py b/services/catalog/src/simcore_service_catalog/api/routes/services.py index 2e306052b3d..2421f64dc74 100644 --- a/services/catalog/src/simcore_service_catalog/api/routes/services.py +++ b/services/catalog/src/simcore_service_catalog/api/routes/services.py @@ -3,6 +3,7 @@ import asyncio import logging import urllib.parse +from collections import deque from functools import lru_cache from time import time from typing import Any, Dict, List, Optional, Set, Tuple @@ -12,6 +13,7 @@ from models_library.services_db import ServiceAccessRightsAtDB, ServiceMetaDataAtDB from pydantic import ValidationError, constr from pydantic.types import PositiveInt +from simcore_service_catalog.services.director import MINUTE from starlette.requests import Request from ...db.repositories.groups import GroupsRepository @@ -66,6 +68,18 @@ def _prepare_service_details( return validated_service +def async_lru_cache(*lru_cache_args, **lru_cache_kwargs): + def async_lru_cache_decorator(async_function): + @lru_cache(*lru_cache_args, **lru_cache_kwargs) + def cached_async_function(*args, **kwargs): + coroutine = async_function(*args, **kwargs) + return asyncio.ensure_future(coroutine) + + return cached_async_function + + return async_lru_cache_decorator + + @router.get("", response_model=List[ServiceOut], **RESPONSE_MODEL_POLICY) @cancellable_request async def list_services( @@ -111,39 +125,37 @@ async def list_services( ] return services_overview - # let's get all the services access rights - get_services_access_rights_task = services_repo.list_services_access_rights( - key_versions=list(services_in_db.keys()), product_name=x_simcore_products_name - ) - - # let's get the service owners - get_services_owner_emails_task = groups_repository.list_user_emails_from_gids( - {s.owner for s in services_in_db.values() if s.owner} - ) - # getting services from director # get_registry_services_task = director_client.get("/services") + DIRECTOR_CACHING_TTL = 5 * MINUTE - def _get_cache_ttl(seconds=5 * 60): + def 
_get_cache_ttl(seconds): return round(time() / seconds) - @lru_cache(maxsize=2) + @async_lru_cache(maxsize=1) async def cached_registry_services(_cache_ttl): return await director_client.get("/services") - get_registry_services_task = cached_registry_services(_get_cache_ttl()) - - services_owner_emails = {} ( services_in_registry, services_access_rights, services_owner_emails, ) = await asyncio.gather( - get_registry_services_task, - get_services_access_rights_task, - get_services_owner_emails_task, + cached_registry_services(_get_cache_ttl(DIRECTOR_CACHING_TTL)), + services_repo.list_services_access_rights( + key_versions=services_in_db, + product_name=x_simcore_products_name, + ), + groups_repository.list_user_emails_from_gids( + {s.owner for s in services_in_db.values() if s.owner} + ), ) + filtered_services = deque( + s + for s in (request.app.state.frontend_services_catalog + services_in_registry) + if (s.get("key"), s.get("version")) in services_in_db + ) # NOTE: for the details of the services: # 1. we get all the services from the director-v0 (TODO: move the registry to the catalog) # 2. we filter the services using the visible ones from the db From 198b07de98b81d476c7f0e8887486715805b8b36 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Mon, 7 Mar 2022 10:21:24 +0100 Subject: [PATCH 16/41] added async-lru --- services/catalog/requirements/_base.in | 1 + services/catalog/requirements/_base.txt | 2 ++ 2 files changed, 3 insertions(+) diff --git a/services/catalog/requirements/_base.in b/services/catalog/requirements/_base.in index 431962463f1..6f35d254ba9 100644 --- a/services/catalog/requirements/_base.in +++ b/services/catalog/requirements/_base.in @@ -28,6 +28,7 @@ sqlalchemy[asyncio] httpx # other +async-lru tenacity packaging pyyaml diff --git a/services/catalog/requirements/_base.txt b/services/catalog/requirements/_base.txt index e02b4cd2bd8..dbcee0f60ee 100644 --- a/services/catalog/requirements/_base.txt +++ b/services/catalog/requirements/_base.txt @@ -20,6 +20,8 @@ anyio==3.5.0 # starlette asgiref==3.4.1 # via uvicorn +async-lru==1.0.2 + # via -r requirements/_base.in asyncpg==0.25.0 # via -r requirements/_base.in certifi==2020.12.5 From d1ffa03a04a146bcb9a7f027f0ce199c7c8d4843 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Mon, 7 Mar 2022 11:30:42 +0100 Subject: [PATCH 17/41] added aiocache --- services/catalog/requirements/_base.in | 2 +- services/catalog/requirements/_base.txt | 19 +++++++++++++++++-- services/catalog/requirements/_test.txt | 1 + 3 files changed, 19 insertions(+), 3 deletions(-) diff --git a/services/catalog/requirements/_base.in b/services/catalog/requirements/_base.in index 6f35d254ba9..9d4caa6ec66 100644 --- a/services/catalog/requirements/_base.in +++ b/services/catalog/requirements/_base.in @@ -28,7 +28,7 @@ sqlalchemy[asyncio] httpx # other -async-lru +aiocache[redis,msgpack] tenacity packaging pyyaml diff --git a/services/catalog/requirements/_base.txt b/services/catalog/requirements/_base.txt index dbcee0f60ee..f1675f94241 100644 --- a/services/catalog/requirements/_base.txt +++ b/services/catalog/requirements/_base.txt @@ -4,6 +4,8 @@ # # pip-compile --output-file=requirements/_base.txt --strip-extras requirements/_base.in # +aiocache==0.11.1 + # via -r requirements/_base.in aiodebug==1.1.2 # via # -c requirements/../../../packages/service-library/requirements/./_base.in @@ -12,6 +14,15 @@ aiofiles==0.5.0 # via # -c 
requirements/../../../packages/service-library/requirements/./_base.in # -r requirements/../../../packages/service-library/requirements/_base.in +aioredis==1.3.1 + # via + # -c requirements/../../../packages/models-library/requirements/../../../requirements/constraints.txt + # -c requirements/../../../packages/postgres-database/requirements/../../../requirements/constraints.txt + # -c requirements/../../../packages/service-library/requirements/../../../requirements/constraints.txt + # -c requirements/../../../packages/service-library/requirements/./../../../requirements/constraints.txt + # -c requirements/../../../packages/settings-library/requirements/../../../requirements/constraints.txt + # -c requirements/../../../requirements/constraints.txt + # aiocache alembic==1.7.4 # via -r requirements/../../../packages/postgres-database/requirements/_base.in anyio==3.5.0 @@ -20,8 +31,8 @@ anyio==3.5.0 # starlette asgiref==3.4.1 # via uvicorn -async-lru==1.0.2 - # via -r requirements/_base.in +async-timeout==4.0.2 + # via aioredis asyncpg==0.25.0 # via -r requirements/_base.in certifi==2020.12.5 @@ -56,6 +67,8 @@ h11==0.12.0 # via # httpcore # uvicorn +hiredis==2.0.0 + # via aioredis httpcore==0.14.4 # via httpx httptools==0.2.0 @@ -102,6 +115,8 @@ markupsafe==1.1.1 # via # jinja2 # mako +msgpack==1.0.3 + # via aiocache multidict==5.1.0 # via yarl opentracing==2.4.0 diff --git a/services/catalog/requirements/_test.txt b/services/catalog/requirements/_test.txt index 9e6073028e7..f336623fdf6 100644 --- a/services/catalog/requirements/_test.txt +++ b/services/catalog/requirements/_test.txt @@ -24,6 +24,7 @@ astroid==2.9.3 # via pylint async-timeout==4.0.2 # via + # -c requirements/_base.txt # aiohttp # aiopg attrs==21.4.0 From d6c5c457f94a48d4969738ca153f12c184b58637 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Mon, 7 Mar 2022 11:31:11 +0100 Subject: [PATCH 18/41] small optimizations + refactoring --- .../api/routes/services.py | 97 ++++++++++--------- 1 file changed, 49 insertions(+), 48 deletions(-) diff --git a/services/catalog/src/simcore_service_catalog/api/routes/services.py b/services/catalog/src/simcore_service_catalog/api/routes/services.py index 2421f64dc74..6284d263685 100644 --- a/services/catalog/src/simcore_service_catalog/api/routes/services.py +++ b/services/catalog/src/simcore_service_catalog/api/routes/services.py @@ -5,13 +5,13 @@ import urllib.parse from collections import deque from functools import lru_cache -from time import time -from typing import Any, Dict, List, Optional, Set, Tuple +from typing import Any, Deque, Dict, List, Optional, Set, Tuple +from aiocache import cached from fastapi import APIRouter, Depends, Header, HTTPException, status -from models_library.services import KEY_RE, VERSION_RE +from models_library.services import ServiceKey, ServiceType, ServiceVersion from models_library.services_db import ServiceAccessRightsAtDB, ServiceMetaDataAtDB -from pydantic import ValidationError, constr +from pydantic import ValidationError from pydantic.types import PositiveInt from simcore_service_catalog.services.director import MINUTE from starlette.requests import Request @@ -39,6 +39,8 @@ "response_model_exclude_none": False, } +DIRECTOR_CACHING_TTL = 5 * MINUTE + def _prepare_service_details( service_in_registry: Dict[str, Any], @@ -111,37 +113,47 @@ async def list_services( product_name=x_simcore_products_name, ) } - # Non-detailed views from the services_repo database if not details: # only return a stripped down 
version # FIXME: add name, ddescription, type, etc... + # NOTE: here validation is not necessary since key,version were already validated + # in terms of time, this takes the most services_overview = [ - ServiceOut.no_detail_service( + ServiceOut.construct( key=key, version=version, + name="nodetails", + description="nodetails", + type=ServiceType.COMPUTATIONAL, + authors=[{"name": "nodetails", "email": "nodetails@nodetails.com"}], + contact="nodetails@nodetails.com", + inputs={}, + outputs={}, ) for key, version in services_in_db ] return services_overview - # getting services from director - # get_registry_services_task = director_client.get("/services") - DIRECTOR_CACHING_TTL = 5 * MINUTE - - def _get_cache_ttl(seconds): - return round(time() / seconds) - - @async_lru_cache(maxsize=1) - async def cached_registry_services(_cache_ttl): - return await director_client.get("/services") + # caching this steps brings down the time to generate it at the expense of being sometimes a bit out of date + @cached(ttl=DIRECTOR_CACHING_TTL) + async def cached_registry_services() -> Deque[Dict[str, Any]]: + services_in_registry = await director_client.get("/services") + filtered_services = deque( + s + for s in ( + request.app.state.frontend_services_catalog + services_in_registry + ) + if (s.get("key"), s.get("version")) in services_in_db + ) + return filtered_services ( - services_in_registry, + registry_filtered_services, services_access_rights, services_owner_emails, ) = await asyncio.gather( - cached_registry_services(_get_cache_ttl(DIRECTOR_CACHING_TTL)), + cached_registry_services(), services_repo.list_services_access_rights( key_versions=services_in_db, product_name=x_simcore_products_name, @@ -151,37 +163,26 @@ async def cached_registry_services(_cache_ttl): ), ) - filtered_services = deque( - s - for s in (request.app.state.frontend_services_catalog + services_in_registry) - if (s.get("key"), s.get("version")) in services_in_db - ) # NOTE: for the details of the services: # 1. we get all the services from the director-v0 (TODO: move the registry to the catalog) # 2. we filter the services using the visible ones from the db # 3. 
then we compose the final service using as a base the registry service, overriding with the same # service from the database, adding also the access rights and the owner as email address instead of gid - # NOTE: this final step runs in a process pool so that it runs asynchronously and does not block in any way - # with non_blocking_process_pool_executor(max_workers=2) as pool: - # _target_services = ( - # request.app.state.frontend_services_catalog + services_in_registry - # ) - # services_details = await asyncio.gather( - # *[ - # asyncio.get_event_loop().run_in_executor( - # None, - # _prepare_service_details, - # s, - # services_in_db[s["key"], s["version"]], - # services_access_rights[s["key"], s["version"]], - # services_owner_emails.get(services_in_db[s["key"], s["version"]].owner), - # ) - # for s in _target_services - # if (s.get("key"), s.get("version")) in services_in_db - # ] - # ) - # return [s for s in services_details if s is not None] - return [] + # NOTE: This step takes the bulk of the time to generate the list + services_details = await asyncio.gather( + *[ + asyncio.get_event_loop().run_in_executor( + None, + _prepare_service_details, + s, + services_in_db[s["key"], s["version"]], + services_access_rights[s["key"], s["version"]], + services_owner_emails.get(services_in_db[s["key"], s["version"]].owner), + ) + for s in registry_filtered_services + ] + ) + return [s for s in services_details if s is not None] @router.get( @@ -191,8 +192,8 @@ async def cached_registry_services(_cache_ttl): ) async def get_service( user_id: int, - service_key: constr(regex=KEY_RE), - service_version: constr(regex=VERSION_RE), + service_key: ServiceKey, + service_version: ServiceVersion, director_client: DirectorApi = Depends(get_director_api), groups_repository: GroupsRepository = Depends(get_repository(GroupsRepository)), services_repo: ServicesRepository = Depends(get_repository(ServicesRepository)), @@ -272,8 +273,8 @@ async def get_service( async def modify_service( # pylint: disable=too-many-arguments user_id: int, - service_key: constr(regex=KEY_RE), - service_version: constr(regex=VERSION_RE), + service_key: ServiceKey, + service_version: ServiceVersion, updated_service: ServiceUpdate, director_client: DirectorApi = Depends(get_director_api), groups_repository: GroupsRepository = Depends(get_repository(GroupsRepository)), From f065c9cdee8daa179f7724f1064d21d3c2a5c1aa Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Mon, 7 Mar 2022 11:31:56 +0100 Subject: [PATCH 19/41] refactor --- .../models/schemas/services.py | 20 +------------------ .../utils/requests_decorators.py | 2 +- 2 files changed, 2 insertions(+), 20 deletions(-) diff --git a/services/catalog/src/simcore_service_catalog/models/schemas/services.py b/services/catalog/src/simcore_service_catalog/models/schemas/services.py index 5daaed206ee..99c5bb2788f 100644 --- a/services/catalog/src/simcore_service_catalog/models/schemas/services.py +++ b/services/catalog/src/simcore_service_catalog/models/schemas/services.py @@ -1,6 +1,6 @@ from typing import Optional -from models_library.services import ServiceDockerData, ServiceMetaData, ServiceType +from models_library.services import ServiceDockerData, ServiceMetaData from models_library.services_access import ServiceAccessRights from pydantic import EmailStr, Extra from pydantic.main import BaseModel @@ -64,10 +64,6 @@ class ServiceOut( ): # pylint: disable=too-many-ancestors owner: Optional[EmailStr] - @classmethod - def 
no_detail_service(cls, key, version): - return NoDetailService.copy(update={"key": key, "version": version}) - class Config: extra = Extra.ignore schema_extra = { @@ -110,20 +106,6 @@ class Config: } -NoDetailService = ServiceOut.parse_obj( - { - "key": "simcore/services/dynamic/example", - "version": "0.0.0", - "name": "nodetails", - "description": "nodetails", - "type": ServiceType.COMPUTATIONAL, - "authors": [{"name": "nodetails", "email": "nodetails@nodetails.com"}], - "contact": "nodetails@nodetails.com", - "inputs": {}, - "outputs": {}, - } -) - # TODO: prototype for next iteration # Items are non-detailed version of resources listed class ServiceItem(BaseModel): diff --git a/services/catalog/src/simcore_service_catalog/utils/requests_decorators.py b/services/catalog/src/simcore_service_catalog/utils/requests_decorators.py index d35dd88f6a5..c53ead446d6 100644 --- a/services/catalog/src/simcore_service_catalog/utils/requests_decorators.py +++ b/services/catalog/src/simcore_service_catalog/utils/requests_decorators.py @@ -24,7 +24,7 @@ async def _cancel_task_if_client_disconnected( await asyncio.sleep(interval) -def cancellable_request(handler: Callable[[Any], Coroutine[Any, Any, Response]]): +def cancellable_request(handler: Callable[..., Coroutine[Any, Any, Response]]): """this decorator periodically checks if the client disconnected and then will cancel the request and return a 499 code (a la nginx).""" @wraps(handler) From 65ed60189b314dd9a8573f5ef1d887ba61e7d308 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Mon, 7 Mar 2022 11:40:53 +0100 Subject: [PATCH 20/41] small improvements --- .../simcore_service_catalog/api/routes/services.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/services/catalog/src/simcore_service_catalog/api/routes/services.py b/services/catalog/src/simcore_service_catalog/api/routes/services.py index 6284d263685..11439a8dd4c 100644 --- a/services/catalog/src/simcore_service_catalog/api/routes/services.py +++ b/services/catalog/src/simcore_service_catalog/api/routes/services.py @@ -137,10 +137,10 @@ async def list_services( # caching this steps brings down the time to generate it at the expense of being sometimes a bit out of date @cached(ttl=DIRECTOR_CACHING_TTL) - async def cached_registry_services() -> Deque[Dict[str, Any]]: + async def cached_registry_services() -> Deque[Tuple[str, str, Dict[str, Any]]]: services_in_registry = await director_client.get("/services") filtered_services = deque( - s + (s["key"], s["version"], s) for s in ( request.app.state.frontend_services_catalog + services_in_registry ) @@ -174,12 +174,12 @@ async def cached_registry_services() -> Deque[Dict[str, Any]]: asyncio.get_event_loop().run_in_executor( None, _prepare_service_details, - s, - services_in_db[s["key"], s["version"]], - services_access_rights[s["key"], s["version"]], - services_owner_emails.get(services_in_db[s["key"], s["version"]].owner), + details, + services_in_db[key, version], + services_access_rights[key, version], + services_owner_emails.get(services_in_db[key, version].owner), ) - for s in registry_filtered_services + for key, version, details in registry_filtered_services ] ) return [s for s in services_details if s is not None] From 737acdd3c6e9447d9fa9b0d4d8001e7f76b18c05 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Mon, 7 Mar 2022 13:27:19 +0100 Subject: [PATCH 21/41] removed aiopg --- services/catalog/requirements/_test.in | 1 - 
services/catalog/requirements/_test.txt | 5 ----- 2 files changed, 6 deletions(-) diff --git a/services/catalog/requirements/_test.in b/services/catalog/requirements/_test.in index 29deeda5155..644fe417545 100644 --- a/services/catalog/requirements/_test.in +++ b/services/catalog/requirements/_test.in @@ -26,7 +26,6 @@ respx # migration due to pytest_simcore.postgres_service alembic -aiopg[sa] docker click diff --git a/services/catalog/requirements/_test.txt b/services/catalog/requirements/_test.txt index f336623fdf6..bb18eb03a3c 100644 --- a/services/catalog/requirements/_test.txt +++ b/services/catalog/requirements/_test.txt @@ -8,8 +8,6 @@ aiohttp==3.8.1 # via # -c requirements/../../../requirements/constraints.txt # pytest-aiohttp -aiopg==1.3.3 - # via -r requirements/_test.in aiosignal==1.2.0 # via aiohttp alembic==1.7.4 @@ -26,7 +24,6 @@ async-timeout==4.0.2 # via # -c requirements/_base.txt # aiohttp - # aiopg attrs==21.4.0 # via # aiohttp @@ -164,7 +161,6 @@ pluggy==1.0.0 psycopg2-binary==2.8.6 # via # -c requirements/_base.txt - # aiopg # sqlalchemy ptvsd==4.3.2 # via -r requirements/_test.in @@ -244,7 +240,6 @@ sqlalchemy==1.4.31 # via # -c requirements/../../../requirements/constraints.txt # -c requirements/_base.txt - # aiopg # alembic texttable==1.6.4 # via docker-compose From bc2823d433dbc2b3798b4910eb5463fb91b19f65 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Mon, 7 Mar 2022 14:35:27 +0100 Subject: [PATCH 22/41] removed import --- packages/pytest-simcore/src/pytest_simcore/postgres_service.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/pytest-simcore/src/pytest_simcore/postgres_service.py b/packages/pytest-simcore/src/pytest_simcore/postgres_service.py index 9b4d2a3b1c0..6c5e776d26b 100644 --- a/packages/pytest-simcore/src/pytest_simcore/postgres_service.py +++ b/packages/pytest-simcore/src/pytest_simcore/postgres_service.py @@ -4,7 +4,6 @@ import logging from typing import AsyncIterator, Dict, Iterator, List -import aiopg.sa import pytest import sqlalchemy as sa import tenacity @@ -168,7 +167,7 @@ def postgres_db( @pytest.fixture(scope="function") async def aiopg_engine( postgres_db: sa.engine.Engine, -) -> AsyncIterator[aiopg.sa.engine.Engine]: +) -> AsyncIterator: """An aiopg engine connected to an initialized database""" from aiopg.sa import create_engine From 7a19a74133fe7d9b20b89502dafc43b02a299610 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Mon, 7 Mar 2022 14:35:37 +0100 Subject: [PATCH 23/41] set postgres username doc --- packages/settings-library/src/settings_library/postgres.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/packages/settings-library/src/settings_library/postgres.py b/packages/settings-library/src/settings_library/postgres.py index eda7d1d95ff..2fbaff316b1 100644 --- a/packages/settings-library/src/settings_library/postgres.py +++ b/packages/settings-library/src/settings_library/postgres.py @@ -33,7 +33,12 @@ class PostgresSettings(BaseCustomSettings): POSTGRES_CLIENT_NAME: Optional[str] = Field( None, description="Name of the application connecting the postgres database, will default to use the host hostname (hostname on linux)", - env=["HOST", "HOSTNAME", "POSTGRES_CLIENT_NAME"], + env=[ + "POSTGRES_CLIENT_NAME", + # This is useful when running inside a docker container, then the hostname is set each client gets a different name + "HOST", + "HOSTNAME", + ], ) @validator("POSTGRES_MAXSIZE") From 
a3fc426ca66e75e858c8bdf2e3c9fb5c18bc02c7 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Mon, 7 Mar 2022 14:35:52 +0100 Subject: [PATCH 24/41] typo --- services/catalog/src/simcore_service_catalog/db/events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/catalog/src/simcore_service_catalog/db/events.py b/services/catalog/src/simcore_service_catalog/db/events.py index d7f20b22d80..a38ccfd7e37 100644 --- a/services/catalog/src/simcore_service_catalog/db/events.py +++ b/services/catalog/src/simcore_service_catalog/db/events.py @@ -32,7 +32,7 @@ async def connect_to_db(app: FastAPI) -> None: ) logger.debug("Connected to %s", cfg.dsn) - logger.debug("Checking db migrationn ...") + logger.debug("Checking db migration...") try: await raise_if_migration_not_ready(engine) except Exception: From b72d44a7bfe0c297ddd265ede149bf501ba390de Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Mon, 7 Mar 2022 14:35:59 +0100 Subject: [PATCH 25/41] fix test env --- services/catalog/tests/unit/with_dbs/conftest.py | 1 + 1 file changed, 1 insertion(+) diff --git a/services/catalog/tests/unit/with_dbs/conftest.py b/services/catalog/tests/unit/with_dbs/conftest.py index f98998875d1..d2dee6630fc 100644 --- a/services/catalog/tests/unit/with_dbs/conftest.py +++ b/services/catalog/tests/unit/with_dbs/conftest.py @@ -37,6 +37,7 @@ def app( ) -> Iterable[FastAPI]: monkeypatch.setenv("CATALOG_TRACING", "null") monkeypatch.setenv("SC_BOOT_MODE", "local-development") + monkeypatch.setenv("POSTGRES_CLIENT_NAME", "pytest_client") app = init_app() yield app From 04a142b4dc4dfbe17b164513429de3bb3d88e164 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Mon, 7 Mar 2022 14:36:08 +0100 Subject: [PATCH 26/41] refactor --- services/catalog/tests/unit/test_services_director.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/services/catalog/tests/unit/test_services_director.py b/services/catalog/tests/unit/test_services_director.py index e365bf75c2b..cb6f458b3ff 100644 --- a/services/catalog/tests/unit/test_services_director.py +++ b/services/catalog/tests/unit/test_services_director.py @@ -4,7 +4,7 @@ # pylint:disable=protected-access -from typing import Dict, Iterable, Iterator +from typing import Dict, Iterator import pytest import respx @@ -32,7 +32,7 @@ def minimal_app( @pytest.fixture() -def client(minimal_app: FastAPI) -> Iterable[TestClient]: +def client(minimal_app: FastAPI) -> Iterator[TestClient]: # NOTE: this way we ensure the events are run in the application # since it starts the app on a test server with TestClient(minimal_app) as client: @@ -40,7 +40,7 @@ def client(minimal_app: FastAPI) -> Iterable[TestClient]: @pytest.fixture -def mocked_director_service_api(minimal_app: FastAPI) -> MockRouter: +def mocked_director_service_api(minimal_app: FastAPI) -> Iterator[MockRouter]: with respx.mock( base_url=minimal_app.state.settings.CATALOG_DIRECTOR.base_url, assert_all_called=False, From fd05b2be2835af280ae345aa55a2d4aff70b584f Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Mon, 7 Mar 2022 14:57:47 +0100 Subject: [PATCH 27/41] list services test goes through --- .../catalog/tests/unit/with_dbs/conftest.py | 34 ++++ .../unit/with_dbs/test_entrypoint_services.py | 153 +----------------- 2 files changed, 40 insertions(+), 147 deletions(-) diff --git 
a/services/catalog/tests/unit/with_dbs/conftest.py b/services/catalog/tests/unit/with_dbs/conftest.py index d2dee6630fc..2a16807ba7c 100644 --- a/services/catalog/tests/unit/with_dbs/conftest.py +++ b/services/catalog/tests/unit/with_dbs/conftest.py @@ -5,6 +5,7 @@ import itertools import random +from random import randint from typing import Any, AsyncIterator, Callable, Dict, Iterable, Iterator, List, Tuple import pytest @@ -13,9 +14,11 @@ from _pytest.monkeypatch import MonkeyPatch from faker import Faker from fastapi import FastAPI +from pydantic import PositiveInt from pytest_mock.plugin import MockerFixture from respx.router import MockRouter from simcore_postgres_database.models.products import products +from simcore_postgres_database.models.users import UserRole, UserStatus, users from simcore_service_catalog.core.application import init_app from simcore_service_catalog.db.tables import ( groups, @@ -83,6 +86,37 @@ def director_mockup(app: FastAPI) -> Iterator[MockRouter]: # +@pytest.fixture(scope="session") +def user_id() -> PositiveInt: + return randint(1, 10000) + + +@pytest.fixture() +def user_db(postgres_db: sa.engine.Engine, user_id: PositiveInt) -> Iterator[Dict]: + with postgres_db.connect() as con: + # removes all users before continuing + con.execute(users.delete()) + con.execute( + users.insert() + .values( + id=user_id, + name="test user", + email="test@user.com", + password_hash="testhash", + status=UserStatus.ACTIVE, + role=UserRole.USER, + ) + .returning(sa.literal_column("*")) + ) + # this is needed to get the primary_gid correctly + result = con.execute(sa.select([users]).where(users.c.id == user_id)) + user = result.first() + + yield dict(user) + + con.execute(users.delete().where(users.c.id == user_id)) + + @pytest.fixture() async def products_names( sqlalchemy_async_engine: AsyncEngine, diff --git a/services/catalog/tests/unit/with_dbs/test_entrypoint_services.py b/services/catalog/tests/unit/with_dbs/test_entrypoint_services.py index 8eaf3b896a7..efaaca4807c 100644 --- a/services/catalog/tests/unit/with_dbs/test_entrypoint_services.py +++ b/services/catalog/tests/unit/with_dbs/test_entrypoint_services.py @@ -5,22 +5,10 @@ # pylint: disable=unused-argument # pylint: disable=unused-variable -import asyncio -from datetime import datetime -from random import randint -from typing import List, Optional +from typing import Any, Dict, List -import pytest -import simcore_service_catalog.api.dependencies.director from fastapi import FastAPI -from models_library.services import ServiceDockerData, ServiceType -from models_library.services_db import ServiceAccessRightsAtDB -from pydantic.types import PositiveInt from respx.router import MockRouter -from simcore_service_catalog.api.routes import services -from simcore_service_catalog.db.repositories.groups import GroupsRepository -from simcore_service_catalog.models.domain.group import GroupAtDB, GroupType -from simcore_service_catalog.models.schemas.services import ServiceOut from starlette.testclient import TestClient from yarl import URL @@ -32,146 +20,17 @@ ] -@pytest.fixture(scope="session") -def user_id() -> int: - return randint(1, 10000) - - -@pytest.fixture(scope="session") -def user_groups(user_id: int) -> List[GroupAtDB]: - return [ - GroupAtDB( - gid=user_id, - name="my primary group", - description="it is primary", - type=GroupType.PRIMARY, - ), - GroupAtDB( - gid=randint(10001, 15000), - name="all group", - description="it is everyone", - type=GroupType.EVERYONE, - ), - GroupAtDB( - gid=randint(15001, 20000), - 
name="standard group", - description="it is standard", - type=GroupType.STANDARD, - ), - ] - - -@pytest.fixture(scope="session") -def registry_services() -> List[ServiceDockerData]: - NUMBER_OF_SERVICES = 5 - return [ - ServiceDockerData( - key="simcore/services/comp/my_comp_service", - version=f"{v}.{randint(0,20)}.{randint(0,20)}", - type=ServiceType.COMPUTATIONAL, - name=f"my service {v}", - description="a sleeping service version {v}", - authors=[{"name": "me", "email": "me@myself.com"}], - contact="me.myself@you.com", - inputs=[], - outputs=[], - ) - for v in range(NUMBER_OF_SERVICES) - ] - - -@pytest.fixture(scope="session") -def db_services( - registry_services: List[ServiceOut], user_groups: List[GroupAtDB] -) -> List[ServiceAccessRightsAtDB]: - return [ - ServiceAccessRightsAtDB( - key=s.key, - version=s.version, - gid=user_groups[0].gid, - execute_access=True, - product_name="osparc", - ) - for s in registry_services - ] - - -@pytest.fixture() -async def director_mockup( - monkeypatch, registry_services: List[ServiceOut], app: FastAPI -): - async def return_list_services(user_id: int) -> List[ServiceOut]: - return registry_services - - monkeypatch.setattr(services, "list_services", return_list_services) - - class FakeDirector: - @staticmethod - async def get(url: str): - if url == "/services": - return [s.dict(by_alias=True) for s in registry_services] - if "/service_extras/" in url: - return { - "build_date": f"{datetime.utcnow().isoformat(timespec='seconds')}Z" - } - - def fake_director_api(*args, **kwargs): - return FakeDirector() - - monkeypatch.setattr( - simcore_service_catalog.api.dependencies.director, - "get_director_api", - fake_director_api, - ) - - # Check mock - from simcore_service_catalog.api.dependencies.director import get_director_api - - assert isinstance(get_director_api(), FakeDirector) - yield - - -@pytest.fixture() -async def db_mockup( - monkeypatch, - app: FastAPI, - user_groups: List[GroupAtDB], - db_services: List[ServiceAccessRightsAtDB], -): - async def return_list_user_groups(self, user_id: int) -> List[GroupAtDB]: - return user_groups - - async def return_gid_from_email(*args, **kwargs) -> Optional[PositiveInt]: - return user_groups[0].gid - - monkeypatch.setattr(GroupsRepository, "list_user_groups", return_list_user_groups) - monkeypatch.setattr( - GroupsRepository, "get_user_gid_from_email", return_gid_from_email - ) - - -async def test_director_mockup( - director_mockup: MockRouter, - app: FastAPI, - registry_services: List[ServiceOut], - user_id: int, -): - assert await services.list_services(user_id) == registry_services - - -@pytest.mark.skip( - reason="Not ready, depency injection does not work, using monkeypatch. 
still issue with setting up database" -) async def test_list_services( director_mockup: MockRouter, - db_mockup: None, app: FastAPI, client: TestClient, user_id: int, + user_db: Dict[str, Any], + products_names: List[str], ): - await asyncio.sleep(10) - url = URL("/v0/services").with_query(user_id=user_id) - response = client.get(str(url)) + response = client.get( + f"{url}", headers={"x-simcore-products-name": products_names[0]} + ) assert response.status_code == 200 data = response.json() From 7669e6e56e5d3a07d32e56825d205d3da7ef907c Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Mon, 7 Mar 2022 15:12:02 +0100 Subject: [PATCH 28/41] fix fixtures --- .../catalog/tests/unit/with_dbs/conftest.py | 13 +++----- .../unit/with_dbs/test_entrypoint_services.py | 33 ++++++++++++++++--- 2 files changed, 33 insertions(+), 13 deletions(-) diff --git a/services/catalog/tests/unit/with_dbs/conftest.py b/services/catalog/tests/unit/with_dbs/conftest.py index 2a16807ba7c..1eb269f6d3e 100644 --- a/services/catalog/tests/unit/with_dbs/conftest.py +++ b/services/catalog/tests/unit/with_dbs/conftest.py @@ -147,13 +147,12 @@ async def products_names( @pytest.fixture() async def user_groups_ids( - sqlalchemy_async_engine: AsyncEngine, + sqlalchemy_async_engine: AsyncEngine, user_db: Dict[str, Any] ) -> AsyncIterator[List[int]]: """Inits groups table and returns group identifiers""" cols = ("gid", "name", "description", "type", "thumbnail", "inclusion_rules") data = [ - (34, "john.smith", "primary group for user", "PRIMARY", None, {}), ( 20001, "Team Black", @@ -164,22 +163,18 @@ async def user_groups_ids( ), ] # pylint: disable=no-value-for-parameter - async with sqlalchemy_async_engine.begin() as conn: for row in data: # NOTE: The 'default' dialect with current database version settings does not support in-place multirow inserts - stmt = groups.insert().values(**dict(zip(cols, row))) - await conn.execute(stmt) + await conn.execute(groups.insert().values(**dict(zip(cols, row)))) - gids = [ - 1, - ] + [items[0] for items in data] + gids = [1, user_db["primary_gid"]] + [items[0] for items in data] yield gids async with sqlalchemy_async_engine.begin() as conn: await conn.execute(services_meta_data.delete()) - await conn.execute(groups.delete().where(groups.c.gid.in_(gids[1:]))) + await conn.execute(groups.delete().where(groups.c.gid.in_(gids[2:]))) @pytest.fixture() diff --git a/services/catalog/tests/unit/with_dbs/test_entrypoint_services.py b/services/catalog/tests/unit/with_dbs/test_entrypoint_services.py index efaaca4807c..f218a12d1b9 100644 --- a/services/catalog/tests/unit/with_dbs/test_entrypoint_services.py +++ b/services/catalog/tests/unit/with_dbs/test_entrypoint_services.py @@ -5,7 +5,7 @@ # pylint: disable=unused-argument # pylint: disable=unused-variable -from typing import Any, Dict, List +from typing import Any, Callable, Dict, List from fastapi import FastAPI from respx.router import MockRouter @@ -27,10 +27,35 @@ async def test_list_services( user_id: int, user_db: Dict[str, Any], products_names: List[str], + service_catalog_faker: Callable, + services_db_tables_injector: Callable, ): - url = URL("/v0/services").with_query(user_id=user_id) - response = client.get( - f"{url}", headers={"x-simcore-products-name": products_names[0]} + target_product = products_names[-1] + # injects fake data in db + await services_db_tables_injector( + [ + service_catalog_faker( + "simcore/services/dynamic/jupyterlab", + "1.0.0", + team_access=None, + 
everyone_access=None, + product=target_product, + ), + service_catalog_faker( + "simcore/services/dynamic/jupyterlab", + "1.0.2", + team_access="x", + everyone_access=None, + product=target_product, + ), + ] ) + + url = URL("/v0/services").with_query(user_id=user_id) + response = client.get(f"{url}", headers={"x-simcore-products-name": target_product}) assert response.status_code == 200 data = response.json() + assert len(data) == 2 + import pdb + + pdb.set_trace() From 41507f244d2620aec50f7e6acd282b05cb614aaf Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Mon, 7 Mar 2022 16:01:20 +0100 Subject: [PATCH 29/41] fix testing of services list --- .../simcore_service_catalog/core/events.py | 1 + .../src/simcore_service_catalog/db/events.py | 1 + .../services/director.py | 4 +- .../catalog/tests/unit/with_dbs/conftest.py | 21 ++- .../unit/with_dbs/test_entrypoint_services.py | 122 ++++++++++++++++-- 5 files changed, 129 insertions(+), 20 deletions(-) diff --git a/services/catalog/src/simcore_service_catalog/core/events.py b/services/catalog/src/simcore_service_catalog/core/events.py index 04992589404..74b88d094da 100644 --- a/services/catalog/src/simcore_service_catalog/core/events.py +++ b/services/catalog/src/simcore_service_catalog/core/events.py @@ -57,6 +57,7 @@ async def start_app() -> None: # FIXME: check director service is in place and ready. Hand-shake?? # SEE https://github.com/ITISFoundation/osparc-simcore/issues/1728 await start_registry_sync_task(app) + ... if app.state.settings.CATALOG_TRACING: setup_tracing(app, app.state.settings.CATALOG_TRACING) diff --git a/services/catalog/src/simcore_service_catalog/db/events.py b/services/catalog/src/simcore_service_catalog/db/events.py index a38ccfd7e37..cd86fbb8d62 100644 --- a/services/catalog/src/simcore_service_catalog/db/events.py +++ b/services/catalog/src/simcore_service_catalog/db/events.py @@ -29,6 +29,7 @@ async def connect_to_db(app: FastAPI) -> None: connect_args={ "server_settings": {"application_name": cfg.POSTGRES_CLIENT_NAME} }, + echo=True, ) logger.debug("Connected to %s", cfg.dsn) diff --git a/services/catalog/src/simcore_service_catalog/services/director.py b/services/catalog/src/simcore_service_catalog/services/director.py index 95e1e665605..9f47721290a 100644 --- a/services/catalog/src/simcore_service_catalog/services/director.py +++ b/services/catalog/src/simcore_service_catalog/services/director.py @@ -48,8 +48,8 @@ async def setup_director(app: FastAPI) -> None: async def close_director(app: FastAPI) -> None: - if director_api := app.state.director_api: - await director_api.delete() + if director_client := app.state.director_api: + await director_client.delete() logger.debug("Director client closed successfully") diff --git a/services/catalog/tests/unit/with_dbs/conftest.py b/services/catalog/tests/unit/with_dbs/conftest.py index 1eb269f6d3e..fa0ae624fc1 100644 --- a/services/catalog/tests/unit/with_dbs/conftest.py +++ b/services/catalog/tests/unit/with_dbs/conftest.py @@ -16,7 +16,6 @@ from fastapi import FastAPI from pydantic import PositiveInt from pytest_mock.plugin import MockerFixture -from respx.router import MockRouter from simcore_postgres_database.models.products import products from simcore_postgres_database.models.users import UserRole, UserStatus, users from simcore_service_catalog.core.application import init_app @@ -53,8 +52,7 @@ def client(app: FastAPI) -> Iterator[TestClient]: @pytest.fixture() -def director_mockup(app: FastAPI) -> Iterator[MockRouter]: 
- +def director_mockup(app: FastAPI) -> Iterator: with respx.mock( base_url=app.state.settings.CATALOG_DIRECTOR.base_url, assert_all_called=False, @@ -64,7 +62,7 @@ def director_mockup(app: FastAPI) -> Iterator[MockRouter]: respx_mock.get("/services", name="list_services").respond( 200, json={"data": []} ) - yield respx_mock + yield # DATABASE tables fixtures ----------------------------------- @@ -327,3 +325,18 @@ def _fake_factory( return tuple(fakes) return _fake_factory + + +@pytest.fixture +def mock_catalog_background_task(mocker: MockerFixture): + """patch the setup of the background task so we can call it manually""" + mocker.patch( + "simcore_service_catalog.core.events.start_registry_sync_task", + return_value=None, + autospec=True, + ) + mocker.patch( + "simcore_service_catalog.core.events.stop_registry_sync_task", + return_value=None, + autospec=True, + ) diff --git a/services/catalog/tests/unit/with_dbs/test_entrypoint_services.py b/services/catalog/tests/unit/with_dbs/test_entrypoint_services.py index f218a12d1b9..1c713e58c2d 100644 --- a/services/catalog/tests/unit/with_dbs/test_entrypoint_services.py +++ b/services/catalog/tests/unit/with_dbs/test_entrypoint_services.py @@ -5,9 +5,8 @@ # pylint: disable=unused-argument # pylint: disable=unused-variable -from typing import Any, Callable, Dict, List +from typing import Callable, List -from fastapi import FastAPI from respx.router import MockRouter from starlette.testclient import TestClient from yarl import URL @@ -20,42 +19,137 @@ ] -async def test_list_services( +async def test_list_services_without_details( + mock_catalog_background_task, director_mockup: MockRouter, - app: FastAPI, client: TestClient, user_id: int, - user_db: Dict[str, Any], products_names: List[str], service_catalog_faker: Callable, services_db_tables_injector: Callable, ): target_product = products_names[-1] # injects fake data in db + NUM_SERVICES = 1000 await services_db_tables_injector( [ service_catalog_faker( "simcore/services/dynamic/jupyterlab", - "1.0.0", + f"1.0.{s}", team_access=None, everyone_access=None, product=target_product, - ), + ) + for s in range(NUM_SERVICES) + ] + ) + + url = URL("/v0/services").with_query({"user_id": user_id, "details": "false"}) + response = client.get(f"{url}", headers={"x-simcore-products-name": target_product}) + assert response.status_code == 200 + data = response.json() + assert len(data) == NUM_SERVICES + + +async def test_list_services_without_details_with_wrong_user_id_returns_403( + mock_catalog_background_task, + director_mockup: MockRouter, + client: TestClient, + user_id: int, + products_names: List[str], + service_catalog_faker: Callable, + services_db_tables_injector: Callable, +): + target_product = products_names[-1] + # injects fake data in db + NUM_SERVICES = 1 + await services_db_tables_injector( + [ service_catalog_faker( "simcore/services/dynamic/jupyterlab", - "1.0.2", - team_access="x", + f"1.0.{s}", + team_access=None, everyone_access=None, product=target_product, - ), + ) + for s in range(NUM_SERVICES) ] ) - url = URL("/v0/services").with_query(user_id=user_id) + url = URL("/v0/services").with_query({"user_id": user_id + 1, "details": "false"}) response = client.get(f"{url}", headers={"x-simcore-products-name": target_product}) + assert response.status_code == 403 + + +async def test_list_services_without_details_with_another_product_returns_other_services( + mock_catalog_background_task, + director_mockup: MockRouter, + client: TestClient, + user_id: int, + products_names: List[str], + 
service_catalog_faker: Callable, + services_db_tables_injector: Callable, +): + target_product = products_names[-1] + assert ( + len(products_names) > 1 + ), "please adjust the fixture to have the right number of products" + # injects fake data in db + NUM_SERVICES = 15 + await services_db_tables_injector( + [ + service_catalog_faker( + "simcore/services/dynamic/jupyterlab", + f"1.0.{s}", + team_access=None, + everyone_access=None, + product=target_product, + ) + for s in range(NUM_SERVICES) + ] + ) + + url = URL("/v0/services").with_query({"user_id": user_id, "details": "false"}) + response = client.get( + f"{url}", headers={"x-simcore-products-name": products_names[0]} + ) assert response.status_code == 200 data = response.json() - assert len(data) == 2 - import pdb + assert len(data) == 0 + + +async def test_list_services_without_details_with_wrong_product_returns_0_service( + mock_catalog_background_task, + director_mockup: MockRouter, + client: TestClient, + user_id: int, + products_names: List[str], + service_catalog_faker: Callable, + services_db_tables_injector: Callable, +): + target_product = products_names[-1] + assert ( + len(products_names) > 1 + ), "please adjust the fixture to have the right number of products" + # injects fake data in db + NUM_SERVICES = 1 + await services_db_tables_injector( + [ + service_catalog_faker( + "simcore/services/dynamic/jupyterlab", + f"1.0.{s}", + team_access=None, + everyone_access=None, + product=target_product, + ) + for s in range(NUM_SERVICES) + ] + ) - pdb.set_trace() + url = URL("/v0/services").with_query({"user_id": user_id, "details": "false"}) + response = client.get( + f"{url}", headers={"x-simcore-products-name": "no valid product"} + ) + assert response.status_code == 200 + data = response.json() + assert len(data) == 0 From 0456d6751234348081e271d42fc8a21b5125a70d Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Mon, 7 Mar 2022 16:50:46 +0100 Subject: [PATCH 30/41] list services with details tested --- .../catalog/tests/unit/with_dbs/conftest.py | 4 +- .../unit/with_dbs/test_entrypoint_services.py | 77 ++++++++++++++++++- 2 files changed, 78 insertions(+), 3 deletions(-) diff --git a/services/catalog/tests/unit/with_dbs/conftest.py b/services/catalog/tests/unit/with_dbs/conftest.py index fa0ae624fc1..995cbf28001 100644 --- a/services/catalog/tests/unit/with_dbs/conftest.py +++ b/services/catalog/tests/unit/with_dbs/conftest.py @@ -52,7 +52,7 @@ def client(app: FastAPI) -> Iterator[TestClient]: @pytest.fixture() -def director_mockup(app: FastAPI) -> Iterator: +def director_mockup(app: FastAPI) -> Iterator[respx.MockRouter]: with respx.mock( base_url=app.state.settings.CATALOG_DIRECTOR.base_url, assert_all_called=False, @@ -62,7 +62,7 @@ def director_mockup(app: FastAPI) -> Iterator: respx_mock.get("/services", name="list_services").respond( 200, json={"data": []} ) - yield + yield respx_mock # DATABASE tables fixtures ----------------------------------- diff --git a/services/catalog/tests/unit/with_dbs/test_entrypoint_services.py b/services/catalog/tests/unit/with_dbs/test_entrypoint_services.py index 1c713e58c2d..be0fd0581fe 100644 --- a/services/catalog/tests/unit/with_dbs/test_entrypoint_services.py +++ b/services/catalog/tests/unit/with_dbs/test_entrypoint_services.py @@ -5,8 +5,14 @@ # pylint: disable=unused-argument # pylint: disable=unused-variable +import re from typing import Callable, List +import httpx +import pytest +import respx +from fastapi import FastAPI +from 
models_library.services import ServiceDockerData from respx.router import MockRouter from starlette.testclient import TestClient from yarl import URL @@ -19,6 +25,68 @@ ] +@pytest.fixture +def mock_director_services( + director_mockup: respx.MockRouter, app: FastAPI +) -> MockRouter: + mock_route = director_mockup.get( + f"{app.state.settings.CATALOG_DIRECTOR.base_url}/services", + name="list_services", + ).respond(200, json={"data": ["blahblah"]}) + response = httpx.get(f"{app.state.settings.CATALOG_DIRECTOR.base_url}/services") + assert mock_route.called + assert response.json() == {"data": ["blahblah"]} + return director_mockup + + +async def test_list_services_with_details( + mock_catalog_background_task, + director_mockup: MockRouter, + client: TestClient, + user_id: int, + products_names: List[str], + service_catalog_faker: Callable, + services_db_tables_injector: Callable, +): + target_product = products_names[-1] + # create some fake services + NUM_SERVICES = 1000 + fake_services = [ + service_catalog_faker( + "simcore/services/dynamic/jupyterlab", + f"1.0.{s}", + team_access=None, + everyone_access=None, + product=target_product, + ) + for s in range(NUM_SERVICES) + ] + # injects fake data in db + await services_db_tables_injector(fake_services) + + url = URL("/v0/services").with_query({"user_id": user_id, "details": "true"}) + + # now fake the director such that it returns half the services + fake_registry_service_data = ServiceDockerData.Config.schema_extra["examples"][0] + + director_mockup.get("/services", name="list_services").respond( + 200, + json={ + "data": [ + { + **fake_registry_service_data, + **{"key": s[0]["key"], "version": s[0]["version"]}, + } + for s in fake_services[::2] + ] + }, + ) + response = client.get(f"{url}", headers={"x-simcore-products-name": target_product}) + assert response.status_code == 200 + data = response.json() + assert len(data) == round(NUM_SERVICES / 2) + + async def test_list_services_without_details( mock_catalog_background_task, director_mockup: MockRouter, @@ -31,10 +99,11 @@ async def test_list_services_without_details( target_product = products_names[-1] # injects fake data in db NUM_SERVICES = 1000 + SERVICE_KEY = "simcore/services/dynamic/jupyterlab" await services_db_tables_injector( [ service_catalog_faker( - "simcore/services/dynamic/jupyterlab", + SERVICE_KEY, f"1.0.{s}", team_access=None, everyone_access=None, @@ -49,6 +118,12 @@ async def test_list_services_without_details( assert response.status_code == 200 data = response.json() assert len(data) == NUM_SERVICES + for service in data: + assert service["key"] == SERVICE_KEY + assert re.match("1.0.[0-9]+", service["version"]) is not None + assert service["name"] == "nodetails" + assert service["description"] == "nodetails" + assert service["contact"] == "nodetails@nodetails.com" async def test_list_services_without_details_with_wrong_user_id_returns_403( From e40b23e3ea122b5118946fba31e79b8313cea0aa Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Mon, 7 Mar 2022 17:06:59 +0100 Subject: [PATCH 31/41] added pytest-benchmark --- services/catalog/requirements/_test.in | 3 ++- services/catalog/requirements/_test.txt | 5 +++++ .../tests/unit/with_dbs/test_entrypoint_services.py | 12 ++++++++++-- 3 files changed, 17 insertions(+), 3 deletions(-) diff --git a/services/catalog/requirements/_test.in b/services/catalog/requirements/_test.in index 644fe417545..b8e5d0adde8 100644 --- a/services/catalog/requirements/_test.in +++ 
b/services/catalog/requirements/_test.in @@ -14,7 +14,8 @@ jsonschema # testing pytest -pytest-aiohttp # incompatible with pytest-asyncio. See https://github.com/pytest-dev/pytest-asyncio/issues/76 +pytest-aiohttp +pytest-benchmark pytest-cov pytest-mock pytest-runner diff --git a/services/catalog/requirements/_test.txt b/services/catalog/requirements/_test.txt index bb18eb03a3c..9e2e76c9807 100644 --- a/services/catalog/requirements/_test.txt +++ b/services/catalog/requirements/_test.txt @@ -166,6 +166,8 @@ ptvsd==4.3.2 # via -r requirements/_test.in py==1.11.0 # via pytest +py-cpuinfo==8.0.0 + # via pytest-benchmark pycparser==2.21 # via cffi pylint==2.12.2 @@ -183,6 +185,7 @@ pytest==6.2.5 # -r requirements/_test.in # pytest-aiohttp # pytest-asyncio + # pytest-benchmark # pytest-cov # pytest-docker # pytest-mock @@ -190,6 +193,8 @@ pytest-aiohttp==1.0.4 # via -r requirements/_test.in pytest-asyncio==0.18.1 # via pytest-aiohttp +pytest-benchmark==3.4.1 + # via -r requirements/_test.in pytest-cov==3.0.0 # via -r requirements/_test.in pytest-docker==0.10.3 diff --git a/services/catalog/tests/unit/with_dbs/test_entrypoint_services.py b/services/catalog/tests/unit/with_dbs/test_entrypoint_services.py index be0fd0581fe..a51aabd07fa 100644 --- a/services/catalog/tests/unit/with_dbs/test_entrypoint_services.py +++ b/services/catalog/tests/unit/with_dbs/test_entrypoint_services.py @@ -47,6 +47,7 @@ async def test_list_services_with_details( products_names: List[str], service_catalog_faker: Callable, services_db_tables_injector: Callable, + benchmark, ): target_product = products_names[-1] # create some fake services @@ -81,7 +82,11 @@ async def test_list_services_with_details( ] }, ) - response = client.get(f"{url}", headers={"x-simcore-products-name": target_product}) + + response = benchmark( + client.get, f"{url}", headers={"x-simcore-products-name": target_product} + ) + assert response.status_code == 200 data = response.json() assert len(data) == round(NUM_SERVICES / 2) @@ -95,6 +100,7 @@ async def test_list_services_without_details( products_names: List[str], service_catalog_faker: Callable, services_db_tables_injector: Callable, + benchmark, ): target_product = products_names[-1] # injects fake data in db @@ -114,7 +120,9 @@ async def test_list_services_without_details( ) url = URL("/v0/services").with_query({"user_id": user_id, "details": "false"}) - response = client.get(f"{url}", headers={"x-simcore-products-name": target_product}) + response = benchmark( + client.get, f"{url}", headers={"x-simcore-products-name": target_product} + ) assert response.status_code == 200 data = response.json() assert len(data) == NUM_SERVICES From 1e2a083bbafee5b4955b5ed205e608f5837c7acb Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Mon, 7 Mar 2022 17:14:05 +0100 Subject: [PATCH 32/41] refactor --- .../settings-library/src/settings_library/postgres.py | 5 +++++ .../catalog/src/simcore_service_catalog/db/events.py | 11 +++++------ 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/packages/settings-library/src/settings_library/postgres.py b/packages/settings-library/src/settings_library/postgres.py index 2fbaff316b1..fc49d501f0b 100644 --- a/packages/settings-library/src/settings_library/postgres.py +++ b/packages/settings-library/src/settings_library/postgres.py @@ -62,6 +62,11 @@ def dsn(self) -> str: ) return dsn + @cached_property + def dsn_with_async_sqlalchemy(self) -> str: + dsn = self.dsn.replace("postgresql", "postgresql+asyncpg") + return 
dsn + @cached_property def dsn_with_query(self) -> str: """Some clients do not support queries in the dsn""" diff --git a/services/catalog/src/simcore_service_catalog/db/events.py b/services/catalog/src/simcore_service_catalog/db/events.py index cd86fbb8d62..fe77af11f74 100644 --- a/services/catalog/src/simcore_service_catalog/db/events.py +++ b/services/catalog/src/simcore_service_catalog/db/events.py @@ -19,11 +19,9 @@ async def connect_to_db(app: FastAPI) -> None: logger.debug("Connecting db ...") cfg: PostgresSettings = app.state.settings.CATALOG_POSTGRES - logger.debug(cfg.dsn) - modified_dsn = cfg.dsn.replace("postgresql", "postgresql+asyncpg") - logger.debug(modified_dsn) + engine: AsyncEngine = create_async_engine( - modified_dsn, + cfg.dsn_with_async_sqlalchemy, pool_size=cfg.POSTGRES_MINSIZE, max_overflow=cfg.POSTGRES_MAXSIZE - cfg.POSTGRES_MINSIZE, connect_args={ @@ -31,7 +29,8 @@ async def connect_to_db(app: FastAPI) -> None: }, echo=True, ) - logger.debug("Connected to %s", cfg.dsn) + + logger.debug("Connected to %s", engine.url) # pylint: disable=no-member logger.debug("Checking db migration...") try: @@ -56,4 +55,4 @@ async def close_db_connection(app: FastAPI) -> None: if engine := app.state.engine: await close_engine(engine) - logger.debug("Disconnected from %s", engine.url) + logger.debug("Disconnected from %s", engine.url) # pylint: disable=no-member From 968e2bc5c6268d2590f20cf15fc5c8091a3f72fc Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Mon, 7 Mar 2022 17:52:24 +0100 Subject: [PATCH 33/41] remove echoing --- services/catalog/src/simcore_service_catalog/db/events.py | 1 - 1 file changed, 1 deletion(-) diff --git a/services/catalog/src/simcore_service_catalog/db/events.py b/services/catalog/src/simcore_service_catalog/db/events.py index fe77af11f74..37d411307c2 100644 --- a/services/catalog/src/simcore_service_catalog/db/events.py +++ b/services/catalog/src/simcore_service_catalog/db/events.py @@ -27,7 +27,6 @@ async def connect_to_db(app: FastAPI) -> None: connect_args={ "server_settings": {"application_name": cfg.POSTGRES_CLIENT_NAME} }, - echo=True, ) logger.debug("Connected to %s", engine.url) # pylint: disable=no-member From 4142e2a46f4819a93074cf0824516f1ce5537c11 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Tue, 8 Mar 2022 14:25:23 +0100 Subject: [PATCH 34/41] cleanup --- .../simcore_service_catalog/api/routes/services.py | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/services/catalog/src/simcore_service_catalog/api/routes/services.py b/services/catalog/src/simcore_service_catalog/api/routes/services.py index 11439a8dd4c..257d073b0a0 100644 --- a/services/catalog/src/simcore_service_catalog/api/routes/services.py +++ b/services/catalog/src/simcore_service_catalog/api/routes/services.py @@ -4,7 +4,6 @@ import logging import urllib.parse from collections import deque -from functools import lru_cache from typing import Any, Deque, Dict, List, Optional, Set, Tuple from aiocache import cached @@ -70,18 +69,6 @@ def _prepare_service_details( return validated_service -def async_lru_cache(*lru_cache_args, **lru_cache_kwargs): - def async_lru_cache_decorator(async_function): - @lru_cache(*lru_cache_args, **lru_cache_kwargs) - def cached_async_function(*args, **kwargs): - coroutine = async_function(*args, **kwargs) - return asyncio.ensure_future(coroutine) - - return cached_async_function - - return async_lru_cache_decorator - - @router.get("", 
response_model=List[ServiceOut], **RESPONSE_MODEL_POLICY) @cancellable_request async def list_services( From d229e5b052b8025f05edc42f31047e174fa69fa2 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Tue, 8 Mar 2022 21:13:55 +0100 Subject: [PATCH 35/41] @pcrespov review: avoid sql injection --- .../utils_aiosqlalchemy.py | 25 ++----------------- .../src/simcore_service_catalog/db/events.py | 2 +- 2 files changed, 3 insertions(+), 24 deletions(-) diff --git a/packages/postgres-database/src/simcore_postgres_database/utils_aiosqlalchemy.py b/packages/postgres-database/src/simcore_postgres_database/utils_aiosqlalchemy.py index 200a8237a4d..e8ed22a3280 100644 --- a/packages/postgres-database/src/simcore_postgres_database/utils_aiosqlalchemy.py +++ b/packages/postgres-database/src/simcore_postgres_database/utils_aiosqlalchemy.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, List +from typing import Any, Dict import sqlalchemy as sa from sqlalchemy.ext.asyncio import AsyncEngine @@ -6,29 +6,8 @@ from .utils_migration import get_current_head -async def _get_connections(engine, db_name: str) -> List[Dict]: - """Return information about connections""" - sql = sa.DDL( - f""" - SELECT - pid, - state - FROM pg_stat_activity - WHERE datname = '{db_name}' - AND query NOT LIKE '%%FROM pg_stat_activity%%' - """ - ) - async with engine.connect() as conn: - result = await conn.execute(sql) - - connections = [{"pid": r[0], "state": r[1]} for r in result.fetchall()] - - return connections - - -async def get_pg_engine_stateinfo(engine: AsyncEngine, db_name: str) -> Dict[str, Any]: +async def get_pg_engine_stateinfo(engine: AsyncEngine) -> Dict[str, Any]: return { - "pgserver stats": f"{await _get_connections(engine, db_name)}", "current pool connections": f"{engine.pool.checkedin()=},{engine.pool.checkedout()=}", } diff --git a/services/catalog/src/simcore_service_catalog/db/events.py b/services/catalog/src/simcore_service_catalog/db/events.py index 37d411307c2..b1a929df714 100644 --- a/services/catalog/src/simcore_service_catalog/db/events.py +++ b/services/catalog/src/simcore_service_catalog/db/events.py @@ -44,7 +44,7 @@ async def connect_to_db(app: FastAPI) -> None: logger.debug( "Setup engine: %s", - await get_pg_engine_stateinfo(engine, cfg.POSTGRES_DB), + await get_pg_engine_stateinfo(engine), ) From b269305650deaa9a83e6844502b4011c38843579 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Tue, 8 Mar 2022 21:15:46 +0100 Subject: [PATCH 36/41] @pcrespov review: removed too simple function --- .../src/simcore_postgres_database/utils_aiosqlalchemy.py | 4 ---- services/catalog/src/simcore_service_catalog/db/events.py | 5 ++--- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/packages/postgres-database/src/simcore_postgres_database/utils_aiosqlalchemy.py b/packages/postgres-database/src/simcore_postgres_database/utils_aiosqlalchemy.py index e8ed22a3280..02db3fc9c10 100644 --- a/packages/postgres-database/src/simcore_postgres_database/utils_aiosqlalchemy.py +++ b/packages/postgres-database/src/simcore_postgres_database/utils_aiosqlalchemy.py @@ -12,10 +12,6 @@ async def get_pg_engine_stateinfo(engine: AsyncEngine) -> Dict[str, Any]: } -async def close_engine(engine: AsyncEngine) -> None: - await engine.dispose() - - class DBMigrationError(RuntimeError): pass diff --git a/services/catalog/src/simcore_service_catalog/db/events.py b/services/catalog/src/simcore_service_catalog/db/events.py index 
b1a929df714..af617015e7b 100644 --- a/services/catalog/src/simcore_service_catalog/db/events.py +++ b/services/catalog/src/simcore_service_catalog/db/events.py @@ -3,7 +3,6 @@ from fastapi import FastAPI from servicelib.retry_policies import PostgresRetryPolicyUponInitialization from simcore_postgres_database.utils_aiosqlalchemy import ( - close_engine, get_pg_engine_stateinfo, raise_if_migration_not_ready, ) @@ -36,7 +35,7 @@ async def connect_to_db(app: FastAPI) -> None: await raise_if_migration_not_ready(engine) except Exception: # NOTE: engine must be closed because retry will create a new engine - await close_engine(engine) + await engine.dispose() raise logger.debug("Migration up-to-date") @@ -52,6 +51,6 @@ async def close_db_connection(app: FastAPI) -> None: logger.debug("Disconnecting db ...") if engine := app.state.engine: - await close_engine(engine) + await engine.dispose() logger.debug("Disconnected from %s", engine.url) # pylint: disable=no-member From 041faff7f237aea35c5a77d8d2338ceb6b561493 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Tue, 8 Mar 2022 21:17:04 +0100 Subject: [PATCH 37/41] @pcrespov review: remove comment --- .../src/simcore_postgres_database/utils_aiosqlalchemy.py | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/postgres-database/src/simcore_postgres_database/utils_aiosqlalchemy.py b/packages/postgres-database/src/simcore_postgres_database/utils_aiosqlalchemy.py index 02db3fc9c10..2e7634dea28 100644 --- a/packages/postgres-database/src/simcore_postgres_database/utils_aiosqlalchemy.py +++ b/packages/postgres-database/src/simcore_postgres_database/utils_aiosqlalchemy.py @@ -20,7 +20,6 @@ async def raise_if_migration_not_ready(engine: AsyncEngine): """Ensures db migration is complete :raises DBMigrationError - :raises """ async with engine.connect() as conn: version_num = await conn.scalar( From cc08de365774e7c825059bbd1f6ecbd65f1aa538 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Tue, 8 Mar 2022 21:17:50 +0100 Subject: [PATCH 38/41] @pcrespov review: removed temp --- packages/pytest-simcore/src/pytest_simcore/postgres_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/pytest-simcore/src/pytest_simcore/postgres_service.py b/packages/pytest-simcore/src/pytest_simcore/postgres_service.py index 6c5e776d26b..9fce95293a5 100644 --- a/packages/pytest-simcore/src/pytest_simcore/postgres_service.py +++ b/packages/pytest-simcore/src/pytest_simcore/postgres_service.py @@ -184,7 +184,7 @@ async def aiopg_engine( async def sqlalchemy_async_engine( postgres_db: sa.engine.Engine, ) -> AsyncIterator: - # NOTE: prevent having to import this if latest sqlalchemy not installed (temp) + # NOTE: prevent having to import this if latest sqlalchemy not installed from sqlalchemy.ext.asyncio import create_async_engine engine = create_async_engine( From b5b209b6dca518de6b4829f1d19935d4955aa2e6 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Tue, 8 Mar 2022 21:19:25 +0100 Subject: [PATCH 39/41] renamed locustfile towards good naming --- .../locust_files/catalog_services.py | 52 +++++++++------- .../locust_files/catalog_servicesd.py | 60 ------------------- .../locust_files/webserver_services.py | 51 ++++++++++++++++ 3 files changed, 81 insertions(+), 82 deletions(-) delete mode 100644 tests/performance/locust_files/catalog_servicesd.py create mode 100644 tests/performance/locust_files/webserver_services.py 
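Patch 39 below splits the load test in two: catalog_services.py now hits the catalog API directly (a user_id query parameter plus the x-simcore-products-name header) and times each call by hand with time() and print(), while the old webserver login flow moves to webserver_services.py. An alternative sketch that relies on Locust's built-in statistics instead of manual prints is shown here; the endpoint, header and product name mirror the file below, while the user class name, wait time and the name= grouping labels are assumptions and not part of the patch.

# Sketch only: group the two catalog calls under named rows in Locust's stats
# table instead of printing elapsed time manually.
from locust import constant, task
from locust.contrib.fasthttp import FastHttpUser


class CatalogApiUser(FastHttpUser):
    # assumed pacing; the patched file defines no wait_time
    wait_time = constant(1)

    @task
    def list_services_with_details(self):
        self.client.get(
            "/v0/services?user_id=1&details=true",
            headers={"x-simcore-products-name": "osparc"},
            name="/v0/services?details=true",  # aggregated row in the stats report
        )

    @task
    def list_services_without_details(self):
        self.client.get(
            "/v0/services?user_id=1&details=false",
            headers={"x-simcore-products-name": "osparc"},
            name="/v0/services?details=false",
        )

Grouping requests under explicit name= entries lets Locust's percentile report separate the details=true and details=false variants without parsing stdout; the manual time()/print() approach in the patch yields the same numbers, but only as log lines.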
diff --git a/tests/performance/locust_files/catalog_services.py b/tests/performance/locust_files/catalog_services.py index 0a44dc63748..62efb86c1e1 100644 --- a/tests/performance/locust_files/catalog_services.py +++ b/tests/performance/locust_files/catalog_services.py @@ -3,7 +3,7 @@ # import logging -import os +from time import time import faker from dotenv import load_dotenv @@ -23,30 +23,38 @@ def __init__(self, *args, **kwargs): self.email = fake.email() - # @task - # def health_check(self): - # self.client.get("/v0/health") - - @task(weight=5) - def get_services(self): - self.client.get( - f"/v0/catalog/services", - ) + @task() + def get_services_with_details(self): + start = time() + with self.client.get( + "/v0/services?user_id=1&details=true", + headers={ + "x-simcore-products-name": "osparc", + }, + catch_response=True, + ) as response: + response.raise_for_status() + num_services = len(response.json()) + print(f"got {num_services} WITH DETAILS in {time() - start}s") + response.success() + + @task() + def get_services_without_details(self): + start = time() + with self.client.get( + "/v0/services?user_id=1&details=false", + headers={ + "x-simcore-products-name": "osparc", + }, + catch_response=True, + ) as response: + response.raise_for_status() + num_services = len(response.json()) + print(f"got {num_services} in {time() - start}s") + response.success() def on_start(self): print("Created User ", self.email) - self.client.post( - "/v0/auth/register", - json={ - "email": self.email, - "password": "my secret", - "confirm": "my secret", - }, - ) - self.client.post( - "/v0/auth/login", json={"email": self.email, "password": "my secret"} - ) def on_stop(self): - self.client.post("/v0/auth/logout") print("Stopping", self.email) diff --git a/tests/performance/locust_files/catalog_servicesd.py b/tests/performance/locust_files/catalog_servicesd.py deleted file mode 100644 index 62efb86c1e1..00000000000 --- a/tests/performance/locust_files/catalog_servicesd.py +++ /dev/null @@ -1,60 +0,0 @@ -# -# SEE https://docs.locust.io/en/stable/quickstart.html -# - -import logging -from time import time - -import faker -from dotenv import load_dotenv -from locust import task -from locust.contrib.fasthttp import FastHttpUser - -logging.basicConfig(level=logging.INFO) - -fake = faker.Faker() - -load_dotenv() # take environment variables from .env - - -class WebApiUser(FastHttpUser): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - - self.email = fake.email() - - @task() - def get_services_with_details(self): - start = time() - with self.client.get( - "/v0/services?user_id=1&details=true", - headers={ - "x-simcore-products-name": "osparc", - }, - catch_response=True, - ) as response: - response.raise_for_status() - num_services = len(response.json()) - print(f"got {num_services} WITH DETAILS in {time() - start}s") - response.success() - - @task() - def get_services_without_details(self): - start = time() - with self.client.get( - "/v0/services?user_id=1&details=false", - headers={ - "x-simcore-products-name": "osparc", - }, - catch_response=True, - ) as response: - response.raise_for_status() - num_services = len(response.json()) - print(f"got {num_services} in {time() - start}s") - response.success() - - def on_start(self): - print("Created User ", self.email) - - def on_stop(self): - print("Stopping", self.email) diff --git a/tests/performance/locust_files/webserver_services.py b/tests/performance/locust_files/webserver_services.py new file mode 100644 index 
00000000000..45432cfcd2e --- /dev/null +++ b/tests/performance/locust_files/webserver_services.py @@ -0,0 +1,51 @@ +# +# SEE https://docs.locust.io/en/stable/quickstart.html +# + +import logging + +import faker +from dotenv import load_dotenv +from locust import task +from locust.contrib.fasthttp import FastHttpUser + +logging.basicConfig(level=logging.INFO) + +fake = faker.Faker() + +load_dotenv() # take environment variables from .env + + +class WebApiUser(FastHttpUser): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + self.email = fake.email() + + # @task + # def health_check(self): + # self.client.get("/v0/health") + + @task(weight=5) + def get_services(self): + self.client.get( + f"/v0/catalog/services", + ) + + def on_start(self): + print("Created User ", self.email) + self.client.post( + "/v0/auth/register", + json={ + "email": self.email, + "password": "my secret", + "confirm": "my secret", + }, + ) + self.client.post( + "/v0/auth/login", json={"email": self.email, "password": "my secret"} + ) + + def on_stop(self): + self.client.post("/v0/auth/logout") + print("Stopping", self.email) From 9d735edc44eb046ac8e257a6c8b90559914797cb Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Thu, 10 Mar 2022 08:35:28 +0100 Subject: [PATCH 40/41] improve logging, increase timeout for CI --- services/director-v2/tests/unit/test_modules_dask_client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/director-v2/tests/unit/test_modules_dask_client.py b/services/director-v2/tests/unit/test_modules_dask_client.py index 0dbbf93d106..7fd45ddf617 100644 --- a/services/director-v2/tests/unit/test_modules_dask_client.py +++ b/services/director-v2/tests/unit/test_modules_dask_client.py @@ -95,7 +95,7 @@ async def _assert_wait_for_task_status( ) current_task_status = await dask_client.get_task_status(job_id) assert isinstance(current_task_status, RunningState) - print(f"{current_task_status=} vs {expected_status}") + print(f"{current_task_status=} vs {expected_status=}") assert current_task_status == expected_status @@ -637,7 +637,7 @@ def fake_remote_fct( # after releasing the results, the task shall be UNKNOWN await dask_client.release_task_result(job_id) await _assert_wait_for_task_status( - job_id, dask_client, RunningState.UNKNOWN, timeout=60 + job_id, dask_client, RunningState.UNKNOWN, timeout=120 ) From 66fcd7ec2a208717f95baa324833adc73e7c9e32 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Thu, 10 Mar 2022 17:15:30 +0100 Subject: [PATCH 41/41] add pessimistic handling of disconnections --- services/catalog/src/simcore_service_catalog/db/events.py | 1 + 1 file changed, 1 insertion(+) diff --git a/services/catalog/src/simcore_service_catalog/db/events.py b/services/catalog/src/simcore_service_catalog/db/events.py index af617015e7b..1be02440b2b 100644 --- a/services/catalog/src/simcore_service_catalog/db/events.py +++ b/services/catalog/src/simcore_service_catalog/db/events.py @@ -26,6 +26,7 @@ async def connect_to_db(app: FastAPI) -> None: connect_args={ "server_settings": {"application_name": cfg.POSTGRES_CLIENT_NAME} }, + pool_pre_ping=True, # https://docs.sqlalchemy.org/en/14/core/pooling.html#dealing-with-disconnects ) logger.debug("Connected to %s", engine.url) # pylint: disable=no-member
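Taken together, these patches leave the catalog service with a single asyncpg-backed SQLAlchemy engine: the DSN is rewritten to postgresql+asyncpg, pool_size/max_overflow replace aiopg's minsize/maxsize, the application name travels via connect_args server_settings, and pool_pre_ping guards against stale connections. A minimal self-contained sketch of that final configuration follows; the DSN, pool numbers and application name are placeholder values standing in for PostgresSettings, not the service's actual settings.

import asyncio

import sqlalchemy as sa
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine

# Placeholder DSN and pool settings -- the real values come from PostgresSettings.
DSN = "postgresql+asyncpg://scu:secret@localhost:5432/simcoredb"


def make_engine() -> AsyncEngine:
    return create_async_engine(
        DSN,
        pool_size=5,        # plays the role of POSTGRES_MINSIZE
        max_overflow=15,    # plays the role of POSTGRES_MAXSIZE - POSTGRES_MINSIZE
        connect_args={"server_settings": {"application_name": "catalog-example"}},
        pool_pre_ping=True,  # recycle stale connections before handing them out
    )


async def main() -> None:
    engine = make_engine()
    try:
        async with engine.connect() as conn:
            # cheap round-trip, analogous to the alembic_version check above
            print(await conn.scalar(sa.text("SELECT version()")))
        # pool counters as reported by get_pg_engine_stateinfo
        print(f"{engine.pool.checkedin()=}, {engine.pool.checkedout()=}")
    finally:
        await engine.dispose()


if __name__ == "__main__":
    asyncio.run(main())

pool_pre_ping issues a lightweight round-trip before each pooled connection is checked out, so connections dropped by the server are discarded and replaced transparently; that is the pessimistic disconnect handling the last patch refers to via the linked SQLAlchemy pooling documentation.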