diff --git a/.vscode/settings.template.json b/.vscode/settings.template.json index 8a23cbf4584..86d0e0550d6 100644 --- a/.vscode/settings.template.json +++ b/.vscode/settings.template.json @@ -46,14 +46,14 @@ "[makefile]": { "editor.insertSpaces": false }, - "python.testing.pyTestEnabled": true, + "python.testing.pytestEnabled": true, "autoDocstring.docstringFormat": "sphinx", "hadolint.hadolintPath": "${workspaceFolder}/scripts/hadolint.bash", "shellcheck.executablePath": "${workspaceFolder}/scripts/shellcheck.bash", "shellcheck.run": "onSave", "shellcheck.enableQuickFix": true, "python.formatting.provider": "black", - "python.sortImports.path": "${VIRTUAL_ENV}/bin/isort", + "python.sortImports.path": "${workspaceFolder}/.venv/bin/isort", "python.sortImports.args": [ "--settings-path=${workspaceFolder}/.isort.cfg" ] diff --git a/packages/models-library/src/models_library/projects_nodes_io.py b/packages/models-library/src/models_library/projects_nodes_io.py index de59cbcbac0..305176b8ad3 100644 --- a/packages/models-library/src/models_library/projects_nodes_io.py +++ b/packages/models-library/src/models_library/projects_nodes_io.py @@ -14,9 +14,10 @@ # Pydantic does not support exporting a jsonschema with Dict keys being something else than a str # this is a regex for having uuids of type: 8-4-4-4-12 digits -NodeID_AsDictKey = constr( - regex=r"^[0-9a-fA-F]{8}-?[0-9a-fA-F]{4}-?[0-9a-fA-F]{4}-?[0-9a-fA-F]{4}-?[0-9a-fA-F]{12}$" +UUID_REGEX = ( + r"^[0-9a-fA-F]{8}-?[0-9a-fA-F]{4}-?[0-9a-fA-F]{4}-?[0-9a-fA-F]{4}-?[0-9a-fA-F]{12}$" ) +NodeID_AsDictKey = constr(regex=UUID_REGEX) class PortLink(BaseModel): diff --git a/packages/models-library/src/models_library/services.py b/packages/models-library/src/models_library/services.py index 878911cd3b8..dddefc4a936 100644 --- a/packages/models-library/src/models_library/services.py +++ b/packages/models-library/src/models_library/services.py @@ -16,7 +16,7 @@ KEY_RE = SERVICE_KEY_RE # TODO: deprecate this global constant by SERVICE_KEY_RE PROPERTY_TYPE_RE = r"^(number|integer|boolean|string|data:([^/\s,]+/[^/\s,]+|\[[^/\s,]+/[^/\s,]+(,[^/\s]+/[^/,\s]+)*\]))$" -PROPERTY_KEY_RE = r"^[-_a-zA-Z0-9]+$" # TODO: should be a UUID_RE instead?? +PROPERTY_KEY_RE = r"^[-_a-zA-Z0-9]+$" FILENAME_RE = r".+" diff --git a/packages/postgres-database/src/simcore_postgres_database/migration/versions/a23183ac1742_always_trigger_when_comp_task_changes.py b/packages/postgres-database/src/simcore_postgres_database/migration/versions/a23183ac1742_always_trigger_when_comp_task_changes.py new file mode 100644 index 00000000000..4dfbf34cdc5 --- /dev/null +++ b/packages/postgres-database/src/simcore_postgres_database/migration/versions/a23183ac1742_always_trigger_when_comp_task_changes.py @@ -0,0 +1,126 @@ +"""always trigger when comp_task changes + +Revision ID: a23183ac1742 +Revises: cfd1c43b5d33 +Create Date: 2020-11-27 12:30:13.836161+00:00 + +""" +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "a23183ac1742" +down_revision = "cfd1c43b5d33" +branch_labels = None +depends_on = None + +DB_PROCEDURE_NAME: str = "notify_comp_tasks_changed" +DB_TRIGGER_NAME: str = f"{DB_PROCEDURE_NAME}_event" +DB_CHANNEL_NAME: str = "comp_tasks_output_events" + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! 
### + drop_trigger = sa.DDL( + f""" +DROP TRIGGER IF EXISTS {DB_TRIGGER_NAME} on comp_tasks; +""" + ) + + task_output_changed_procedure = sa.DDL( + f""" + CREATE OR REPLACE FUNCTION {DB_PROCEDURE_NAME}() RETURNS TRIGGER AS $$ + DECLARE + record RECORD; + payload JSON; + changes JSONB; + BEGIN + IF (TG_OP = 'DELETE') THEN + record = OLD; + ELSE + record = NEW; + END IF; + + SELECT jsonb_agg(pre.key ORDER BY pre.key) INTO changes + FROM jsonb_each(to_jsonb(OLD)) AS pre, jsonb_each(to_jsonb(NEW)) AS post + WHERE pre.key = post.key AND pre.value IS DISTINCT FROM post.value; + + payload = json_build_object('table', TG_TABLE_NAME, + 'changes', changes, + 'action', TG_OP, + 'data', row_to_json(record)); + + PERFORM pg_notify('{DB_CHANNEL_NAME}', payload::text); + + RETURN NULL; + END; + $$ LANGUAGE plpgsql; + """ + ) + + task_output_changed_trigger = sa.DDL( + f""" +DROP TRIGGER IF EXISTS {DB_TRIGGER_NAME} on comp_tasks; +CREATE TRIGGER {DB_TRIGGER_NAME} +AFTER UPDATE OF outputs,state ON comp_tasks + FOR EACH ROW + WHEN ((OLD.outputs::jsonb IS DISTINCT FROM NEW.outputs::jsonb OR OLD.state IS DISTINCT FROM NEW.state)) + EXECUTE PROCEDURE {DB_PROCEDURE_NAME}(); +""" + ) + + op.execute(drop_trigger) + op.execute(task_output_changed_procedure) + op.execute(task_output_changed_trigger) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + drop_trigger = sa.DDL( + f""" +DROP TRIGGER IF EXISTS {DB_TRIGGER_NAME} on comp_tasks; +""" + ) + + task_output_changed_procedure = sa.DDL( + f""" + CREATE OR REPLACE FUNCTION {DB_PROCEDURE_NAME}() RETURNS TRIGGER AS $$ + DECLARE + record RECORD; + payload JSON; + BEGIN + IF (TG_OP = 'DELETE') THEN + record = OLD; + ELSE + record = NEW; + END IF; + + payload = json_build_object('table', TG_TABLE_NAME, + 'action', TG_OP, + 'data', row_to_json(record)); + + PERFORM pg_notify('{DB_CHANNEL_NAME}', payload::text); + + RETURN NULL; + END; + $$ LANGUAGE plpgsql; + """ + ) + + task_output_changed_trigger = sa.DDL( + f""" +DROP TRIGGER IF EXISTS {DB_TRIGGER_NAME} on comp_tasks; +CREATE TRIGGER {DB_TRIGGER_NAME} +AFTER UPDATE OF outputs,state ON comp_tasks + FOR EACH ROW + WHEN ((OLD.outputs::jsonb IS DISTINCT FROM NEW.outputs::jsonb OR OLD.state IS DISTINCT FROM NEW.state) + AND NEW.node_class <> 'FRONTEND') + EXECUTE PROCEDURE {DB_PROCEDURE_NAME}(); +""" + ) + + op.execute(drop_trigger) + op.execute(task_output_changed_procedure) + op.execute(task_output_changed_trigger) + # ### end Alembic commands ### diff --git a/packages/postgres-database/src/simcore_postgres_database/models/comp_tasks.py b/packages/postgres-database/src/simcore_postgres_database/models/comp_tasks.py index 3e8118adfc4..6d743c03e96 100644 --- a/packages/postgres-database/src/simcore_postgres_database/models/comp_tasks.py +++ b/packages/postgres-database/src/simcore_postgres_database/models/comp_tasks.py @@ -33,7 +33,10 @@ class NodeClass(enum.Enum): sa.Column("outputs", sa.JSON), sa.Column("image", sa.JSON), sa.Column( - "state", sa.Enum(StateType), nullable=False, server_default=StateType.NOT_STARTED.value + "state", + sa.Enum(StateType), + nullable=False, + server_default=StateType.NOT_STARTED.value, ), # utc timestamps for submission/start/end sa.Column("submit", sa.DateTime), @@ -55,8 +58,7 @@ class NodeClass(enum.Enum): CREATE TRIGGER {DB_TRIGGER_NAME} AFTER UPDATE OF outputs,state ON comp_tasks FOR EACH ROW - WHEN ((OLD.outputs::jsonb IS DISTINCT FROM NEW.outputs::jsonb OR OLD.state IS DISTINCT FROM NEW.state) - AND NEW.node_class <> 
'FRONTEND') + WHEN ((OLD.outputs::jsonb IS DISTINCT FROM NEW.outputs::jsonb OR OLD.state IS DISTINCT FROM NEW.state)) EXECUTE PROCEDURE {DB_PROCEDURE_NAME}(); """ ) @@ -69,6 +71,7 @@ class NodeClass(enum.Enum): DECLARE record RECORD; payload JSON; + changes JSONB; BEGIN IF (TG_OP = 'DELETE') THEN record = OLD; @@ -76,7 +79,12 @@ class NodeClass(enum.Enum): record = NEW; END IF; + SELECT jsonb_agg(pre.key ORDER BY pre.key) INTO changes + FROM jsonb_each(to_jsonb(OLD)) AS pre, jsonb_each(to_jsonb(NEW)) AS post + WHERE pre.key = post.key AND pre.value IS DISTINCT FROM post.value; + payload = json_build_object('table', TG_TABLE_NAME, + 'changes', changes, 'action', TG_OP, 'data', row_to_json(record)); diff --git a/packages/postgres-database/tests/test_comp_tasks.py b/packages/postgres-database/tests/test_comp_tasks.py new file mode 100644 index 00000000000..9899e882f84 --- /dev/null +++ b/packages/postgres-database/tests/test_comp_tasks.py @@ -0,0 +1,139 @@ +# pylint: disable=no-value-for-parameter +# pylint:disable=unused-variable +# pylint:disable=unused-argument +# pylint:disable=redefined-outer-name + +import asyncio +import json +from typing import Dict, List + +import pytest +from aiopg.sa.engine import Engine, SAConnection +from aiopg.sa.result import RowProxy +from simcore_postgres_database.models.comp_pipeline import StateType +from simcore_postgres_database.models.comp_tasks import ( + DB_CHANNEL_NAME, + NodeClass, + comp_tasks, +) +from sqlalchemy.sql.elements import literal_column + + +@pytest.fixture() +async def db_connection(pg_engine: Engine) -> SAConnection: + async with pg_engine.acquire() as conn: + yield conn + + +@pytest.fixture() +async def db_notification_queue(db_connection: SAConnection) -> asyncio.Queue: + listen_query = f"LISTEN {DB_CHANNEL_NAME};" + await db_connection.execute(listen_query) + notifications_queue: asyncio.Queue = db_connection.connection.notifies + assert notifications_queue.empty() + yield notifications_queue + + assert ( + notifications_queue.empty() + ), f"the notification queue was not emptied: {notifications_queue.qsize()} remaining notifications" + + +@pytest.fixture() +async def task( + db_connection: SAConnection, + db_notification_queue: asyncio.Queue, + task_class: NodeClass, +) -> Dict: + result = await db_connection.execute( + comp_tasks.insert() + .values(outputs=json.dumps({}), node_class=task_class) + .returning(literal_column("*")) + ) + row: RowProxy = await result.fetchone() + task = dict(row) + + assert ( + db_notification_queue.empty() + ), "database triggered change although it should only trigger on updates!" + + yield task + + +async def _assert_notification_queue_status( + notification_queue: asyncio.Queue, num_exp_messages: int +) -> List[Dict]: + if num_exp_messages > 0: + assert not notification_queue.empty() + + tasks = [] + for n in range(num_exp_messages): + msg = await notification_queue.get() + + assert msg, "notification msg from postgres is empty!" 
+ task_data = json.loads(msg.payload) + + for k in ["table", "changes", "action", "data"]: + assert k in task_data, f"invalid structure, expected [{k}] in {task_data}" + + tasks.append(task_data) + assert ( + notification_queue.empty() + ), f"there are {notification_queue.qsize()} remaining messages in the queue" + + return tasks + + +async def _update_comp_task_with(conn: SAConnection, task: Dict, **kwargs): + await conn.execute( + comp_tasks.update() + .values(**kwargs) + .where(comp_tasks.c.task_id == task["task_id"]) + ) + + +@pytest.mark.parametrize( + "task_class", + [(NodeClass.COMPUTATIONAL), (NodeClass.INTERACTIVE), (NodeClass.FRONTEND)], +) +async def test_listen_query( + db_notification_queue: asyncio.Queue, + db_connection: SAConnection, + task: Dict, +): + """this tests how the postgres LISTEN query and in particular the aiopg implementation of it works""" + # let's test the trigger + updated_output = {"some new stuff": "it is new"} + await _update_comp_task_with( + db_connection, task, outputs=updated_output, state=StateType.ABORTED + ) + tasks = await _assert_notification_queue_status(db_notification_queue, 1) + assert tasks[0]["changes"] == ["outputs", "state"] + assert ( + tasks[0]["data"]["outputs"] == updated_output + ), f"the data received from the database is {tasks[0]}, expected new output is {updated_output}" + + # setting the exact same data twice triggers only ONCE + updated_output = {"some new stuff": "it is newer"} + await _update_comp_task_with(db_connection, task, outputs=updated_output) + await _update_comp_task_with(db_connection, task, outputs=updated_output) + tasks = await _assert_notification_queue_status(db_notification_queue, 1) + assert tasks[0]["changes"] == ["outputs"] + assert ( + tasks[0]["data"]["outputs"] == updated_output + ), f"the data received from the database is {tasks[0]}, expected new output is {updated_output}" + + # updating a number of times with different stuff comes out in FIFO order + NUM_CALLS = 20 + update_outputs = [] + for n in range(NUM_CALLS): + new_output = {"some new stuff": f"a {n} time"} + await _update_comp_task_with(db_connection, task, outputs=new_output) + update_outputs.append(new_output) + + tasks = await _assert_notification_queue_status(db_notification_queue, NUM_CALLS) + + for n, output in enumerate(update_outputs): + assert tasks[n]["changes"] == ["outputs"] + assert ( + tasks[n]["data"]["outputs"] == output + ), f"the data received from the database is {tasks[n]}, expected new output is {output}" diff --git a/scripts/json-schema-faker.bash b/scripts/json-schema-faker.bash index cf8abcea101..5cc2349d152 100755 --- a/scripts/json-schema-faker.bash +++ b/scripts/json-schema-faker.bash @@ -33,7 +33,7 @@ FROM node:12.18.2 COPY ./generate.js /app/generate.js WORKDIR /app RUN npm install \ - json-schema-faker@0.5.0-rcv.29 \ + json-schema-faker@0.5.0-rcv.30 \ jsonfile@6.1.0 && \ npm list --depth=0 diff --git a/services/director-v2/src/simcore_service_director_v2/api/dependencies/dynamic_services.py b/services/director-v2/src/simcore_service_director_v2/api/dependencies/dynamic_services.py new file mode 100644 index 00000000000..4f9c627a5ad --- /dev/null +++ b/services/director-v2/src/simcore_service_director_v2/api/dependencies/dynamic_services.py @@ -0,0 +1,38 @@ +import logging + +from fastapi import Depends, Request +from models_library.projects_nodes import NodeID +from starlette.datastructures import URL + +from ...models.schemas.services import RunningServiceDetails +from ...modules.dynamic_services import 
ServicesClient +from ...utils.logging_utils import log_decorator +from .director_v0 import DirectorV0Client, get_director_v0_client + +logger = logging.getLogger(__name__) + + +@log_decorator(logger=logger) +async def get_service_base_url( + node_uuid: NodeID, + director_v0_client: DirectorV0Client = Depends(get_director_v0_client), +) -> URL: + + # get the service details + service_details: RunningServiceDetails = ( + await director_v0_client.get_running_service_details(node_uuid) + ) + # compute service url + service_url = URL( + f"http://{service_details.service_host}:{service_details.service_port}{service_details.service_basepath}" + ) + return service_url + + +@log_decorator(logger=logger) +def get_services_client( + request: Request, +) -> ServicesClient: + + client = ServicesClient.instance(request.app) + return client diff --git a/services/director-v2/src/simcore_service_director_v2/api/entrypoints.py b/services/director-v2/src/simcore_service_director_v2/api/entrypoints.py index cdb7abb6d29..5447a7d5b87 100644 --- a/services/director-v2/src/simcore_service_director_v2/api/entrypoints.py +++ b/services/director-v2/src/simcore_service_director_v2/api/entrypoints.py @@ -1,7 +1,14 @@ from fastapi import APIRouter from ..meta import api_vtag -from .routes import computations, health, meta, running_interactive, services +from .routes import ( + computations, + dynamic_services, + health, + meta, + running_interactive, + services, +) # Info meta_router = APIRouter() @@ -23,6 +30,9 @@ v2_router.include_router( computations.router, tags=["computations"], prefix="/computations" ) +v2_router.include_router( + dynamic_services.router, tags=["dynamic services"], prefix="/dynamic_services" +) # root api_router = APIRouter() diff --git a/services/director-v2/src/simcore_service_director_v2/api/routes/dynamic_services.py b/services/director-v2/src/simcore_service_director_v2/api/routes/dynamic_services.py new file mode 100644 index 00000000000..dee9c30c371 --- /dev/null +++ b/services/director-v2/src/simcore_service_director_v2/api/routes/dynamic_services.py @@ -0,0 +1,38 @@ +import logging + +from fastapi import APIRouter, Depends +from starlette import status +from starlette.datastructures import URL + +from ...models.domains.dynamic_services import RetrieveDataIn, RetrieveDataOutEnveloped +from ...utils.logging_utils import log_decorator +from ..dependencies.dynamic_services import ( + ServicesClient, + get_service_base_url, + get_services_client, +) + +router = APIRouter() +logger = logging.getLogger(__file__) + + +@router.post( + "/{node_uuid}:retrieve", + summary="Calls the dynamic service's retrieve endpoint with optional port_keys", + response_model=RetrieveDataOutEnveloped, + status_code=status.HTTP_200_OK, +) +@log_decorator(logger=logger) +async def service_retrieve_data_on_ports( + retrieve_settings: RetrieveDataIn, + service_base_url: URL = Depends(get_service_base_url), + services_client: ServicesClient = Depends(get_services_client), +): + # the handling of client/server errors is already encapsulated in the call to request + resp = await services_client.request( + "POST", + f"{service_base_url}/retrieve", + data=retrieve_settings.json(by_alias=True), + ) + # validate and return + return RetrieveDataOutEnveloped.parse_obj(resp.json()) diff --git a/services/director-v2/src/simcore_service_director_v2/api/routes/running_interactive.py b/services/director-v2/src/simcore_service_director_v2/api/routes/running_interactive.py index 2eab7dbb469..ec2ae183f7d 100644 --- 
a/services/director-v2/src/simcore_service_director_v2/api/routes/running_interactive.py +++ b/services/director-v2/src/simcore_service_director_v2/api/routes/running_interactive.py @@ -2,7 +2,7 @@ from fastapi import APIRouter, Depends, Query, Response, status from models_library.services import KEY_RE, VERSION_RE -from ...models.schemas.services import RunningServicesEnveloped +from ...models.schemas.services import RunningServicesDetailsArrayEnveloped from ..dependencies.director_v0 import forward_to_director_v0 router = APIRouter() @@ -20,7 +20,7 @@ @router.get( "", description="Lists of running interactive services", - response_model=RunningServicesEnveloped, + response_model=RunningServicesDetailsArrayEnveloped, ) async def list_running_interactive_services( user_id: str = UserIdQuery, diff --git a/services/director-v2/src/simcore_service_director_v2/core/application.py b/services/director-v2/src/simcore_service_director_v2/core/application.py index 2f24b3b5925..7674f3d91f1 100644 --- a/services/director-v2/src/simcore_service_director_v2/core/application.py +++ b/services/director-v2/src/simcore_service_director_v2/core/application.py @@ -13,7 +13,14 @@ ) from ..api.errors.validation_error import http422_error_handler from ..meta import api_version, api_vtag, project_name, summary -from ..modules import celery, db, director_v0, docker_registry, remote_debug +from ..modules import ( + celery, + db, + director_v0, + docker_registry, + dynamic_services, + remote_debug, +) from ..utils.logging_utils import config_all_loggers from .events import on_shutdown, on_startup from .settings import AppSettings, BootModeEnum @@ -47,6 +54,9 @@ def init_app(settings: Optional[AppSettings] = None) -> FastAPI: if settings.director_v0.enabled: director_v0.setup(app, settings.director_v0) + if settings.dynamic_services.enabled: + dynamic_services.setup(app, settings.dynamic_services) + if settings.postgres.enabled: db.setup(app, settings.postgres) @@ -65,11 +75,15 @@ def init_app(settings: Optional[AppSettings] = None) -> FastAPI: # SEE https://docs.python.org/3/library/exceptions.html#exception-hierarchy app.add_exception_handler( NotImplementedError, - make_http_error_handler_for_exception(status.HTTP_501_NOT_IMPLEMENTED, NotImplementedError), + make_http_error_handler_for_exception( + status.HTTP_501_NOT_IMPLEMENTED, NotImplementedError + ), ) app.add_exception_handler( Exception, - make_http_error_handler_for_exception(status.HTTP_500_INTERNAL_SERVER_ERROR, Exception), + make_http_error_handler_for_exception( + status.HTTP_500_INTERNAL_SERVER_ERROR, Exception + ), ) app.include_router(api_router) diff --git a/services/director-v2/src/simcore_service_director_v2/core/settings.py b/services/director-v2/src/simcore_service_director_v2/core/settings.py index 6742226b03f..a2321ed7025 100644 --- a/services/director-v2/src/simcore_service_director_v2/core/settings.py +++ b/services/director-v2/src/simcore_service_director_v2/core/settings.py @@ -65,6 +65,13 @@ class Config(CommonConfig): env_prefix = "DIRECTOR_" +class DynamicServicesSettings(BaseSettings): + enabled: bool = Field(True, description="Enables/Disables connection with service") + + class Config(CommonConfig): + pass + + class PGSettings(PostgresSettings): enabled: bool = Field(True, description="Enables/Disables connection with service") @@ -119,6 +126,7 @@ def create_from_env(cls, **settings_kwargs) -> "AppSettings": director_v0=DirectorV0Settings(), registry=RegistrySettings(), celery=CelerySettings.create_from_env(), + 
dynamic_services=DynamicServicesSettings(), **settings_kwargs, ) @@ -129,6 +137,7 @@ def create_from_env(cls, **settings_kwargs) -> "AppSettings": log_level_name: str = Field("DEBUG", env="LOG_LEVEL") @validator("log_level_name") + @classmethod def match_logging_level(cls, value) -> str: try: getattr(logging, value.upper()) @@ -146,6 +155,9 @@ def loglevel(self) -> int: # DIRECTOR submodule director_v0: DirectorV0Settings + # Dynamic Services submodule + dynamic_services: DynamicServicesSettings + # REGISTRY submodule registry: RegistrySettings diff --git a/services/director-v2/src/simcore_service_director_v2/models/domains/dynamic_services.py b/services/director-v2/src/simcore_service_director_v2/models/domains/dynamic_services.py new file mode 100644 index 00000000000..f43f535eeec --- /dev/null +++ b/services/director-v2/src/simcore_service_director_v2/models/domains/dynamic_services.py @@ -0,0 +1,20 @@ +from typing import List + +from models_library.services import PropertyName +from pydantic import BaseModel, Field + + +class RetrieveDataIn(BaseModel): + port_keys: List[PropertyName] = Field( + ..., description="The port keys to retrieve data from" + ) + + +class RetrieveDataOut(BaseModel): + size_bytes: int = Field( + ..., description="The amount of data transferred by the retrieve call" + ) + + +class RetrieveDataOutEnveloped(BaseModel): + data: RetrieveDataOut diff --git a/services/director-v2/src/simcore_service_director_v2/models/schemas/services.py b/services/director-v2/src/simcore_service_director_v2/models/schemas/services.py index 56bbe39e340..b6e2bb4866d 100644 --- a/services/director-v2/src/simcore_service_director_v2/models/schemas/services.py +++ b/services/director-v2/src/simcore_service_director_v2/models/schemas/services.py @@ -1,9 +1,10 @@ from enum import Enum from typing import List, Optional +from models_library.basic_types import PortInt +from models_library.projects_nodes_io import UUID_REGEX from models_library.services import KEY_RE, VERSION_RE, ServiceDockerData from pydantic import BaseModel, Field, constr -from pydantic.types import UUID4, PositiveInt class NodeRequirement(str, Enum): @@ -36,15 +37,19 @@ class ServiceState(str, Enum): FAILED = "failed" -class RunningServiceType(BaseModel): - published_port: PositiveInt = Field( - ..., description="The ports where the service provides its interface" +class RunningServiceDetails(BaseModel): + published_port: Optional[PortInt] = Field( + ..., + description="The ports where the service provides its interface on the docker swarm", + deprecated=True, + ) + entry_point: str = Field( + ..., + description="The entry point where the service provides its interface", ) - entry_point: Optional[str] = Field( - None, - description="The entry point where the service provides its interface if specified", + service_uuid: str = Field( + ..., regex=UUID_REGEX, description="The node UUID attached to the service" ) - service_uuid: UUID4 = Field(..., description="The UUID attached to this service") service_key: constr(regex=KEY_RE) = Field( ..., description="distinctive name for the node based on the docker registry path", @@ -59,26 +64,30 @@ class RunningServiceType(BaseModel): example=["1.0.0", "0.0.1"], ) service_host: str = Field(..., description="service host name within the network") - service_port: PositiveInt = Field( - ..., description="port to access the service within the network" + service_port: PortInt = Field( + 80, description="port to access the service within the network" ) - service_basepath: Optional[str] = 
Field( - "", - description="different base path where current service is mounted otherwise defaults to root", + service_basepath: str = Field( + ..., + description="the service base entrypoint where the service serves its contents", ) service_state: ServiceState = Field( ..., description="the service state * 'pending' - The service is waiting for resources to start * 'pulling' - The service is being pulled from the registry * 'starting' - The service is starting * 'running' - The service is running * 'complete' - The service completed * 'failed' - The service failed to start\n", ) - service_message: Optional[str] = Field(None, description="the service message") + service_message: str = Field(..., description="the service message") + + +class RunningServicesDetailsArray(BaseModel): + __root__: List[RunningServiceDetails] -class RunningServicesArray(BaseModel): - __root__: List[RunningServiceType] +class RunningServicesDetailsArrayEnveloped(BaseModel): + data: RunningServicesDetailsArray -class RunningServicesEnveloped(BaseModel): - data: RunningServicesArray +class RunningServiceDetailsEnveloped(BaseModel): + data: RunningServiceDetails class ServicesArrayEnveloped(BaseModel): diff --git a/services/director-v2/src/simcore_service_director_v2/modules/director_v0.py b/services/director-v2/src/simcore_service_director_v2/modules/director_v0.py index 2a235cc51dc..1bc7836fc66 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/director_v0.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/director_v0.py @@ -5,11 +5,10 @@ import logging import urllib from dataclasses import dataclass -from typing import Dict import httpx from fastapi import FastAPI, HTTPException, Request, Response -from httpx import codes +from models_library.projects_nodes import NodeID from models_library.services import ServiceDockerData, ServiceKeyVersion # Module's business logic --------------------------------------------- @@ -17,12 +16,13 @@ from starlette.datastructures import URL from ..core.settings import DirectorV0Settings -from ..models.schemas.services import ServiceExtras +from ..models.schemas.services import RunningServiceDetails, ServiceExtras from ..utils.client_decorators import handle_errors, handle_retry +from ..utils.clients import unenvelope_or_raise_error +from ..utils.logging_utils import log_decorator logger = logging.getLogger(__name__) - # Module's setup logic --------------------------------------------- @@ -45,34 +45,6 @@ async def on_shutdown() -> None: app.add_event_handler("shutdown", on_shutdown) -def _unenvelope_or_raise_error(resp: httpx.Response) -> Dict: - """ - Director responses are enveloped - If successful response, we un-envelop it and return data as a dict - If error, it raise an HTTPException - """ - body = resp.json() - - assert "data" in body or "error" in body # nosec - data = body.get("data") - error = body.get("error") - - if codes.is_server_error(resp.status_code): - logger.error( - "director error %d [%s]: %s", - resp.status_code, - resp.reason_phrase, - error, - ) - raise HTTPException(status.HTTP_503_SERVICE_UNAVAILABLE) - - if codes.is_client_error(resp.status_code): - msg = error or resp.reason_phrase - raise HTTPException(resp.status_code, detail=msg) - - return data or {} - - @dataclass class DirectorV0Client: client: httpx.AsyncClient @@ -114,6 +86,7 @@ async def forward(self, request: Request, response: Response) -> Response: # NOTE: the response is NOT validated! 
return response + @log_decorator(logger=logger) async def get_service_details( self, service: ServiceKeyVersion ) -> ServiceDockerData: @@ -121,14 +94,24 @@ async def get_service_details( "GET", f"services/{urllib.parse.quote_plus(service.key)}/{service.version}" ) if resp.status_code == status.HTTP_200_OK: - return ServiceDockerData.parse_obj(_unenvelope_or_raise_error(resp)[0]) + return ServiceDockerData.parse_obj(unenvelope_or_raise_error(resp)[0]) raise HTTPException(status_code=resp.status_code, detail=resp.content) + @log_decorator(logger=logger) async def get_service_extras(self, service: ServiceKeyVersion) -> ServiceExtras: resp = await self.request( "GET", f"service_extras/{urllib.parse.quote_plus(service.key)}/{service.version}", ) if resp.status_code == status.HTTP_200_OK: - return ServiceExtras.parse_obj(_unenvelope_or_raise_error(resp)) + return ServiceExtras.parse_obj(unenvelope_or_raise_error(resp)) + raise HTTPException(status_code=resp.status_code, detail=resp.content) + + @log_decorator(logger=logger) + async def get_running_service_details( + self, service_uuid: NodeID + ) -> RunningServiceDetails: + resp = await self.request("GET", f"running_interactive_services/{service_uuid}") + if resp.status_code == status.HTTP_200_OK: + return RunningServiceDetails.parse_obj(unenvelope_or_raise_error(resp)) raise HTTPException(status_code=resp.status_code, detail=resp.content) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_services.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_services.py new file mode 100644 index 00000000000..c43cb5680a0 --- /dev/null +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_services.py @@ -0,0 +1,56 @@ +""" Module that takes care of communications with dynamic services v0 + + +""" +import logging +from dataclasses import dataclass + +import httpx +from fastapi import FastAPI, Response + +# Module's business logic --------------------------------------------- +from ..core.settings import DynamicServicesSettings +from ..utils.client_decorators import handle_errors, handle_retry + +logger = logging.getLogger(__name__) + + +# Module's setup logic --------------------------------------------- + + +def setup(app: FastAPI, settings: DynamicServicesSettings): + if not settings: + settings = DynamicServicesSettings() + + def on_startup() -> None: + ServicesClient.create( + app, + client=httpx.AsyncClient(), + ) + + async def on_shutdown() -> None: + client = ServicesClient.instance(app).client + await client.aclose() + del app.state.dynamic_services_client + + app.add_event_handler("startup", on_startup) + app.add_event_handler("shutdown", on_shutdown) + + +@dataclass +class ServicesClient: + client: httpx.AsyncClient + + @classmethod + def create(cls, app: FastAPI, **kwargs): + app.state.dynamic_services_client = cls(**kwargs) + return cls.instance(app) + + @classmethod + def instance(cls, app: FastAPI): + return app.state.dynamic_services_client + + @handle_errors("DynamicService", logger) + @handle_retry(logger) + async def request(self, method: str, tail_path: str, **kwargs) -> Response: + return await self.client.request(method, tail_path, **kwargs) diff --git a/services/director-v2/src/simcore_service_director_v2/utils/clients.py b/services/director-v2/src/simcore_service_director_v2/utils/clients.py new file mode 100644 index 00000000000..7a5fc764fb2 --- /dev/null +++ b/services/director-v2/src/simcore_service_director_v2/utils/clients.py @@ -0,0 +1,37 @@ +import 
logging
+from typing import Dict
+
+import httpx
+from fastapi import HTTPException
+from httpx import codes
+from starlette import status
+
+logger = logging.getLogger(__name__)
+
+
+def unenvelope_or_raise_error(resp: httpx.Response) -> Dict:
+    """
+    Director responses are enveloped.
+    If the response is successful, we un-envelop it and return the data as a dict.
+    If it is an error, it raises an HTTPException.
+    """
+    body = resp.json()
+
+    assert "data" in body or "error" in body  # nosec
+    data = body.get("data")
+    error = body.get("error")
+
+    if codes.is_server_error(resp.status_code):
+        logger.error(
+            "director error %d [%s]: %s",
+            resp.status_code,
+            resp.reason_phrase,
+            error,
+        )
+        raise HTTPException(status.HTTP_503_SERVICE_UNAVAILABLE)
+
+    if codes.is_client_error(resp.status_code):
+        msg = error or resp.reason_phrase
+        raise HTTPException(resp.status_code, detail=msg)
+
+    return data or {}
diff --git a/services/director-v2/tests/unit/test_modules_director_v0.py b/services/director-v2/tests/unit/test_modules_director_v0.py
index 2158b15a80a..790c8fb53fe 100644
--- a/services/director-v2/tests/unit/test_modules_director_v0.py
+++ b/services/director-v2/tests/unit/test_modules_director_v0.py
@@ -4,16 +4,22 @@
 # pylint:disable=protected-access
 
 import json
+import re
 import urllib.parse
 from collections import namedtuple
 from pathlib import Path
+from random import randint
 from typing import Callable, List
+from uuid import uuid4
 
 import pytest
 import respx
 from fastapi import FastAPI, status
 from models_library.services import ServiceDockerData, ServiceKeyVersion
-from simcore_service_director_v2.models.schemas.services import ServiceExtras
+from simcore_service_director_v2.models.schemas.services import (
+    RunningServiceDetails,
+    ServiceExtras,
+)
 from simcore_service_director_v2.modules.director_v0 import DirectorV0Client
 
 
@@ -127,9 +133,27 @@ def fake_service_extras(random_json_from_schema: Callable) -> ServiceExtras:
     return random_extras
 
+
+@pytest.fixture
+def fake_running_service_details(
+    random_json_from_schema: Callable,
+) -> RunningServiceDetails:
+    random_data = random_json_from_schema(RunningServiceDetails.schema_json(indent=2))
+    # fix the port fields: the randomiser does not honor the positive-int constraint
+    KEYS_TO_FIX = ["published_port", "service_port"]
+    for k in KEYS_TO_FIX:
+        if k in random_data:
+            random_data[k] = randint(1, 50000)
+    random_details = RunningServiceDetails(**random_data)
+
+    return random_details
+
+
 @pytest.fixture
 def mocked_director_service_fcts(
-    minimal_app: FastAPI, fake_service_details, fake_service_extras
+    minimal_app: FastAPI,
+    fake_service_details: ServiceDockerData,
+    fake_service_extras: ServiceExtras,
+    fake_running_service_details: RunningServiceDetails,
 ):
     with respx.mock(
         base_url=minimal_app.state.settings.director_v0.base_url(include_tag=False),
@@ -147,6 +171,14 @@ def mocked_director_service_fcts(
             content={"data": fake_service_extras.dict(by_alias=True)},
             alias="get_service_extras",
         )
+        pattern = re.compile(
+            r"v0/running_interactive_services/[0-9a-fA-F]{8}-?[0-9a-fA-F]{4}-?[0-9a-fA-F]{4}-?[0-9a-fA-F]{4}-?[0-9a-fA-F]{12}$"
+        )
+        respx_mock.get(
+            pattern,
+            content={"data": fake_running_service_details.dict(by_alias=True)},
+            alias="get_running_service_details",
+        )
 
         yield respx_mock
 
@@ -179,3 +211,18 @@ async def test_get_service_extras(
     service_extras: ServiceExtras = await director_client.get_service_extras(service)
     assert mocked_director_service_fcts["get_service_extras"].called
     assert fake_service_extras == service_extras
+
+
+async def 
test_get_running_service_details( + minimal_app: FastAPI, + mocked_director_service_fcts, + fake_running_service_details: RunningServiceDetails, +): + + director_client: DirectorV0Client = minimal_app.state.director_v0_client + + service_details: RunningServiceDetails = ( + await director_client.get_running_service_details(str(uuid4())) + ) + assert mocked_director_service_fcts["get_running_service_details"].called + assert fake_running_service_details == service_details diff --git a/services/docker-compose.devel.yml b/services/docker-compose.devel.yml index f11f2dbd000..cb99877d303 100644 --- a/services/docker-compose.devel.yml +++ b/services/docker-compose.devel.yml @@ -36,7 +36,7 @@ services: director-v2: environment: - SC_BOOT_MODE=debug-ptvsd - - LOGLEVEL=debug + - LOG_LEVEL=debug - DEBUG=true volumes: - ./director-v2:/devel/services/director-v2 diff --git a/services/web/server/src/simcore_service_webserver/computation_comp_tasks_listening_task.py b/services/web/server/src/simcore_service_webserver/computation_comp_tasks_listening_task.py index 951c371e1fc..4d128fd3b60 100644 --- a/services/web/server/src/simcore_service_webserver/computation_comp_tasks_listening_task.py +++ b/services/web/server/src/simcore_service_webserver/computation_comp_tasks_listening_task.py @@ -6,11 +6,17 @@ import json import logging from pprint import pformat -from typing import Dict +from typing import Dict, List from aiohttp import web from aiopg.sa import Engine +from aiopg.sa.connection import SAConnection +from models_library.projects import ProjectID +from models_library.projects_nodes import NodeID +from models_library.projects_state import RunningState +from pydantic.types import PositiveInt from servicelib.application_keys import APP_DB_ENGINE_KEY +from servicelib.logging_utils import log_decorator from simcore_postgres_database.webserver_models import DB_CHANNEL_NAME, projects from sqlalchemy.sql import select @@ -19,75 +25,58 @@ log = logging.getLogger(__name__) -OUTPUT_KEYS_TO_COMPARE = ["path", "store"] - -def _is_output_changed(current_outputs: Dict, new_outputs: Dict) -> bool: - if new_outputs != current_outputs: - if not current_outputs: - return True - for port_key in new_outputs: - if port_key not in current_outputs: - return True - if isinstance(new_outputs[port_key], dict): - # file type output - if any( - current_outputs[port_key][x] != new_outputs[port_key][x] - for x in OUTPUT_KEYS_TO_COMPARE - ): - return True - elif new_outputs[port_key] != current_outputs[port_key]: - # value output - return True - return False - - -def _is_state_changed(current_state: str, new_state: str) -> bool: - return current_state != new_state +@log_decorator(logger=log) +async def _get_project_owner( + conn: SAConnection, project_uuid: ProjectID +) -> PositiveInt: + the_project_owner = await conn.scalar( + select([projects.c.prj_owner]).where(projects.c.uuid == project_uuid) + ) + if not the_project_owner: + raise projects_exceptions.ProjectOwnerNotFoundError(project_uuid) + return the_project_owner + + +@log_decorator(logger=log) +async def _update_project_state( + app: web.Application, + user_id: PositiveInt, + project_uuid: ProjectID, + node_uuid: NodeID, + new_state: RunningState, +) -> None: + project = await projects_api.update_project_node_state( + app, user_id, project_uuid, node_uuid, new_state + ) + await projects_api.notify_project_node_update(app, project, node_uuid) + await projects_api.notify_project_state_update(app, project) -async def _update_project_node_and_notify_if_needed( - app: 
web.Application, project: Dict, new_node_data: Dict, user_id: int +@log_decorator(logger=log) +async def _update_project_outputs( + app: web.Application, + user_id: PositiveInt, + project_uuid: ProjectID, + node_uuid: NodeID, + outputs: Dict, ) -> None: - log.debug( - "Received update from comp_task update from DB: %s", pformat(new_node_data) + changed_keys: List[str] = list(outputs.keys()) + if not changed_keys: + return + + project = await projects_api.update_project_node_outputs( + app, + user_id, + project_uuid, + node_uuid, + data=outputs, + ) + + await projects_api.notify_project_node_update(app, project, node_uuid) + await projects_api.trigger_connected_service_retrieve( + app, project, node_uuid, changed_keys ) - node_uuid = new_node_data["node_id"] - project_uuid = new_node_data["project_id"] - - if node_uuid not in project["workbench"]: - raise projects_exceptions.NodeNotFoundError(project_uuid, node_uuid) - - current_outputs = project["workbench"][node_uuid].get("outputs") - if _is_output_changed(current_outputs, new_node_data["outputs"]): - project = await projects_api.update_project_node_outputs( - app, - user_id, - project_uuid, - node_uuid, - data=new_node_data["outputs"], - ) - log.debug( - "Updated node outputs from\n %s\n to\n %s", - pformat(current_outputs or ""), - pformat(new_node_data["outputs"]), - ) - await projects_api.notify_project_node_update(app, project, node_uuid) - - current_state = project["workbench"][node_uuid].get("state") - new_state = convert_state_from_db(new_node_data["state"]).value - if _is_state_changed(current_state, new_state): - project = await projects_api.update_project_node_state( - app, user_id, project_uuid, node_uuid, new_state - ) - log.debug( - "Updated node %s state from %s to %s", - node_uuid, - pformat(current_state or ""), - pformat(new_state), - ) - await projects_api.notify_project_node_update(app, project, node_uuid) - await projects_api.notify_project_state_update(app, project) async def listen(app: web.Application): @@ -97,55 +86,62 @@ async def listen(app: web.Application): await conn.execute(listen_query) while True: - msg = await conn.connection.notifies.get() - - # Changes on comp_tasks.outputs of non-frontend task - log.debug("DB comp_tasks.outputs/state updated: <- %s", msg.payload) - task_data = json.loads(msg.payload)["data"] - log.debug( - "node %s new outputs: %s", - task_data["node_id"], - pformat(task_data["outputs"]), - ) + # NOTE: this waits for a new notification so the engine is locked here + notification = await conn.connection.notifies.get() log.debug( - "node %s new state: %s", - task_data["node_id"], - pformat(task_data["state"]), + "received update from database: %s", pformat(notification.payload) ) - project_uuid = task_data["project_id"] + # get the data and the info on what changed + payload: Dict = json.loads(notification.payload) + + # FIXME: this part should be replaced by a pydantic CompTaskAtDB once it moves to director-v2 + task_data = payload.get("data", {}) + task_changes = payload.get("changes", []) + + if not task_data: + log.error("task data invalid: %s", pformat(payload)) + continue + + if not task_changes: + log.error("no changes but still triggered: %s", pformat(payload)) + + project_uuid = task_data.get("project_id", None) + node_uuid = task_data.get("node_id", None) + outputs = task_data.get("outputs", {}) + state = convert_state_from_db(task_data.get("state")).value # FIXME: we do not know who triggered these changes. 
we assume the user had the rights to do so
             # therefore we'll use the prj_owner user id. This should be fixed when the new sidecar comes in
             # and comp_tasks/comp_pipeline get deprecated.
+            try:
+                # find the user(s) linked to that project
+                the_project_owner = await _get_project_owner(conn, project_uuid)
-            # find the user(s) linked to that project
-            the_project_owner = await conn.scalar(
-                select([projects.c.prj_owner]).where(projects.c.uuid == project_uuid)
-            )
-            if not the_project_owner:
-                log.warning(
-                    "Project %s was not found and cannot be updated", project_uuid
-                )
-                continue
+                if "outputs" in task_changes:
+                    await _update_project_outputs(
+                        app, the_project_owner, project_uuid, node_uuid, outputs
+                    )
-            # update the project if necessary
-            project = await projects_api.get_project_for_user(
-                app, project_uuid, the_project_owner, include_state=True
-            )
-            # Update the project outputs
-            try:
-                await _update_project_node_and_notify_if_needed(
-                    app, project, task_data, the_project_owner
-                )
+                if "state" in task_changes:
+                    await _update_project_state(
+                        app, the_project_owner, project_uuid, node_uuid, state
+                    )
             except projects_exceptions.ProjectNotFoundError as exc:
                 log.warning(
-                    "Project %s was not found and cannot be updated", exc.project_uuid
+                    "Project %s was not found and cannot be updated. Maybe it was deleted?",
+                    exc.project_uuid,
                 )
                 continue
+            except projects_exceptions.ProjectOwnerNotFoundError as exc:
+                log.warning(
+                    "Project owner of project %s could not be found, is the project valid?",
+                    exc.project_uuid,
+                )
+                continue
             except projects_exceptions.NodeNotFoundError as exc:
                 log.warning(
-                    "Node %s ib project %s not found and cannot be updated",
+                    "Node %s of project %s not found and cannot be updated. Maybe it was deleted?",
                     exc.node_uuid,
                     exc.project_uuid,
                 )
diff --git a/services/web/server/src/simcore_service_webserver/director/director_api.py b/services/web/server/src/simcore_service_webserver/director/director_api.py
index cd7f52363c5..5cb0e38762b 100644
--- a/services/web/server/src/simcore_service_webserver/director/director_api.py
+++ b/services/web/server/src/simcore_service_webserver/director/director_api.py
@@ -84,7 +84,9 @@ async def stop_service(app: web.Application, service_uuid: str) -> None:
     # this will allow to sava bigger datasets from the services
     url = api_endpoint / "running_interactive_services" / service_uuid
     async with session.delete(
-        url, ssl=False, timeout=ServicesCommonSettings().webserver_director_stop_service_timeout
+        url,
+        ssl=False,
+        timeout=ServicesCommonSettings().webserver_director_stop_service_timeout,
     ) as resp:
         if resp.status == 404:
             raise director_exceptions.ServiceNotFoundError(service_uuid)
diff --git a/services/web/server/src/simcore_service_webserver/director_v2.py b/services/web/server/src/simcore_service_webserver/director_v2.py
index 65e5878ee03..9a82daa7186 100644
--- a/services/web/server/src/simcore_service_webserver/director_v2.py
+++ b/services/web/server/src/simcore_service_webserver/director_v2.py
@@ -1,17 +1,16 @@
 import logging
 from asyncio import CancelledError
-from typing import Any, Dict, Optional, Tuple
+from typing import Any, Dict, List, Optional, Tuple
 from uuid import UUID
 
 from aiohttp import web
-from pydantic.types import PositiveInt
-from yarl import URL
-
 from models_library.projects_state import RunningState
+from pydantic.types import PositiveInt
 from servicelib.application_setup import ModuleCategory, app_module_setup
 from servicelib.logging_utils import log_decorator
 from servicelib.rest_responses import 
wrap_as_envelope from servicelib.rest_routing import iter_path_operations, map_handlers_with_operations +from yarl import URL from .director_v2_settings import ( CONFIG_SECTION_NAME, @@ -176,6 +175,29 @@ async def stop_pipeline(request: web.Request) -> web.Response: ) +@log_decorator(logger=log) +async def request_retrieve_dyn_service( + app: web.Application, service_uuid: str, port_keys: List[str] +) -> None: + director2_settings: Directorv2Settings = get_settings(app) + backend_url = URL( + f"{director2_settings.endpoint}/dynamic_services/{service_uuid}:retrieve" + ) + body = {"port_keys": port_keys} + + try: + # request to director-v2 + await _request_director_v2(app, "POST", backend_url, data=body) + except _DirectorServiceError as exc: + log.warning( + "Unable to call :retrieve endpoint on service %s, keys: [%s]: error: [%s:%s]", + service_uuid, + port_keys, + exc.status, + exc.reason, + ) + + @app_module_setup( __name__, ModuleCategory.ADDON, diff --git a/services/web/server/src/simcore_service_webserver/projects/projects_api.py b/services/web/server/src/simcore_service_webserver/projects/projects_api.py index bdb1fdba752..4e0428b3597 100644 --- a/services/web/server/src/simcore_service_webserver/projects/projects_api.py +++ b/services/web/server/src/simcore_service_webserver/projects/projects_api.py @@ -9,8 +9,9 @@ # pylint: disable=too-many-arguments import logging +from collections import defaultdict from pprint import pformat -from typing import Any, Dict, Optional, Set +from typing import Any, Dict, List, Optional, Set from uuid import uuid4 from aiohttp import web @@ -28,7 +29,11 @@ from servicelib.utils import fire_and_forget_task, logged_gather from ..director import director_api -from ..director_v2 import delete_pipeline, get_pipeline_state +from ..director_v2 import ( + delete_pipeline, + get_pipeline_state, + request_retrieve_dyn_service, +) from ..resource_manager.websocket_manager import managed_resource from ..socketio.events import ( SOCKET_IO_NODE_UPDATED_EVENT, @@ -420,8 +425,43 @@ async def notify_project_node_update( await post_group_messages(app, room, messages) +async def trigger_connected_service_retrieve( + app: web.Application, project: Dict, updated_node_uuid: str, changed_keys: List[str] +) -> None: + workbench = project["workbench"] + nodes_keys_to_update: Dict[str, List[str]] = defaultdict(list) + # find the nodes that need to retrieve data + for node_uuid, node in workbench.items(): + # check this node is dynamic + if not _is_node_dynamic(node["key"]): + continue + + # check whether this node has our updated node as linked inputs + node_inputs = node.get("inputs", {}) + for port_key, port_value in node_inputs.items(): + # we look for node port links, not values + if not isinstance(port_value, dict): + continue + + input_node_uuid = port_value.get("nodeUuid") + if input_node_uuid != updated_node_uuid: + continue + # so this node is linked to the updated one, now check if the port was changed? 
+            linked_input_port = port_value.get("output")
+            if linked_input_port in changed_keys:
+                nodes_keys_to_update[node_uuid].append(port_key)
+
+    # call /retrieve on the nodes
+    update_tasks = [
+        request_retrieve_dyn_service(app, node, keys)
+        for node, keys in nodes_keys_to_update.items()
+    ]
+    await logged_gather(*update_tasks)
+
+
 # PROJECT STATE -------------------------------------------------------------------
+
 async def _get_project_lock_state(
     user_id: int, project_uuid: str, app: web.Application
 ) -> ProjectLocked:
diff --git a/services/web/server/src/simcore_service_webserver/projects/projects_exceptions.py b/services/web/server/src/simcore_service_webserver/projects/projects_exceptions.py
index c79c1154009..565f63ddcf5 100644
--- a/services/web/server/src/simcore_service_webserver/projects/projects_exceptions.py
+++ b/services/web/server/src/simcore_service_webserver/projects/projects_exceptions.py
@@ -19,6 +19,14 @@ def __init__(self, user_id, project_uuid):
         self.project_uuid = project_uuid
 
 
+class ProjectOwnerNotFoundError(ProjectsException):
+    """Project owner was not found"""
+
+    def __init__(self, project_uuid):
+        super().__init__(f"Project with uuid {project_uuid} has no project owner")
+        self.project_uuid = project_uuid
+
+
 class ProjectNotFoundError(ProjectsException):
     """Project was not found in DB"""
 
diff --git a/services/web/server/tests/unit/with_dbs/fast/test_comp_tasks_listening_task.py b/services/web/server/tests/unit/with_dbs/fast/test_comp_tasks_listening_task.py
new file mode 100644
index 00000000000..6365d801266
--- /dev/null
+++ b/services/web/server/tests/unit/with_dbs/fast/test_comp_tasks_listening_task.py
@@ -0,0 +1,142 @@
+# pylint:disable=unused-variable
+# pylint:disable=unused-argument
+# pylint:disable=redefined-outer-name
+# pylint:disable=no-value-for-parameter
+
+import asyncio
+import json
+import logging
+from asyncio import Future
+from typing import Any, Dict, List
+from unittest.mock import MagicMock
+
+import aiopg.sa
+import pytest
+import tenacity
+from aiopg.sa.result import RowProxy
+from servicelib.application_keys import APP_DB_ENGINE_KEY
+from simcore_postgres_database.models.comp_pipeline import StateType
+from simcore_postgres_database.models.comp_tasks import NodeClass, comp_tasks
+from simcore_service_webserver.computation_comp_tasks_listening_task import (
+    comp_tasks_listening_task,
+)
+from sqlalchemy.sql.elements import literal_column
+
+
+def future_with_result(result: Any) -> asyncio.Future:
+    f = Future()
+    f.set_result(result)
+    return f
+
+
+@pytest.fixture
+async def mock_project_subsystem(mocker) -> Dict:
+    mocked_project_calls = {
+        "_get_project_owner": mocker.patch(
+            "simcore_service_webserver.computation_comp_tasks_listening_task._get_project_owner",
+            return_value=future_with_result(""),
+        ),
+        "_update_project_state": mocker.patch(
+            "simcore_service_webserver.computation_comp_tasks_listening_task._update_project_state",
+            return_value=future_with_result(""),
+        ),
+        "_update_project_outputs": mocker.patch(
+            "simcore_service_webserver.computation_comp_tasks_listening_task._update_project_outputs",
+            return_value=future_with_result(""),
+        ),
+    }
+    yield mocked_project_calls
+
+
+async def test_mock_project_api(loop, mock_project_subsystem: Dict):
+    from simcore_service_webserver.computation_comp_tasks_listening_task import (
+        _get_project_owner,
+        _update_project_outputs,
+        _update_project_state,
+    )
+
+    assert isinstance(_get_project_owner, MagicMock)
+    assert isinstance(_update_project_state, MagicMock)
+    assert 
isinstance(_update_project_outputs, MagicMock) + + +@pytest.fixture +async def comp_task_listening_task( + loop, mock_project_subsystem: Dict, client +) -> asyncio.Task: + listening_task = loop.create_task(comp_tasks_listening_task(client.app)) + yield listening_task + + listening_task.cancel() + await listening_task + + +MAX_TIMEOUT_S = 10 +logger = logging.getLogger(__name__) + + +@tenacity.retry( + wait=tenacity.wait_fixed(1), + stop=tenacity.stop_after_delay(MAX_TIMEOUT_S), + retry=tenacity.retry_if_exception_type(AssertionError), + before=tenacity.before_log(logger, logging.INFO), + reraise=True, +) +async def _wait_for_call(mock_fct): + mock_fct.assert_called() + + +@pytest.mark.parametrize( + "task_class", [NodeClass.COMPUTATIONAL, NodeClass.INTERACTIVE, NodeClass.FRONTEND] +) +@pytest.mark.parametrize( + "upd_value, exp_calls", + [ + ( + {"outputs": {"some new stuff": "it is new"}}, + ["_get_project_owner", "_update_project_outputs"], + ), + ( + {"state": StateType.ABORTED}, + ["_get_project_owner", "_update_project_state"], + ), + ( + {"outputs": {"some new stuff": "it is new"}, "state": StateType.ABORTED}, + ["_get_project_owner", "_update_project_outputs", "_update_project_state"], + ), + ( + {"inputs": {"should not trigger": "right?"}}, + [], + ), + ], +) +async def test_listen_comp_tasks_task( + mock_project_subsystem: Dict, + comp_task_listening_task: asyncio.Task, + client, + upd_value: Dict[str, Any], + exp_calls: List[str], + task_class: NodeClass, +): + db_engine: aiopg.sa.Engine = client.app[APP_DB_ENGINE_KEY] + async with db_engine.acquire() as conn: + # let's put some stuff in there now + result = await conn.execute( + comp_tasks.insert() + .values(outputs=json.dumps({}), node_class=task_class) + .returning(literal_column("*")) + ) + row: RowProxy = await result.fetchone() + task = dict(row) + + # let's update some values + await conn.execute( + comp_tasks.update() + .values(**upd_value) + .where(comp_tasks.c.task_id == task["task_id"]) + ) + for key, mock_fct in mock_project_subsystem.items(): + if key in exp_calls: + await _wait_for_call(mock_fct) + else: + mock_fct.assert_not_called() diff --git a/services/web/server/tests/unit/with_dbs/fast/test_computation_api.py b/services/web/server/tests/unit/with_dbs/fast/test_computation_api.py index 1dd63f9fb5d..df63ab843c2 100644 --- a/services/web/server/tests/unit/with_dbs/fast/test_computation_api.py +++ b/services/web/server/tests/unit/with_dbs/fast/test_computation_api.py @@ -3,18 +3,10 @@ # pylint:disable=redefined-outer-name -import faker import pytest - from models_library.projects_state import RunningState from simcore_postgres_database.models.comp_pipeline import StateType -from simcore_service_webserver.computation_api import ( - convert_state_from_db, -) - -fake = faker.Faker() - -NodeID = str +from simcore_service_webserver.computation_api import convert_state_from_db @pytest.mark.parametrize(
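---

Two usage sketches for the pieces introduced in this patch.

The a23183ac1742 migration re-creates the notify_comp_tasks_changed trigger so that any update of comp_tasks.outputs or comp_tasks.state is published, together with the sorted list of changed columns, on the comp_tasks_output_events channel. A minimal standalone aiopg consumer in the spirit of the webserver's listen() loop; the DSN and the printed fields are assumptions for illustration, not part of the patch:

import asyncio
import json

import aiopg

# channel created by the migration/trigger above
DB_CHANNEL_NAME = "comp_tasks_output_events"
# assumption: DSN of a postgres instance running the migrated schema
DSN = "postgresql://scu:adminadmin@localhost:5432/simcoredb"


async def consume_comp_task_events() -> None:
    async with aiopg.create_pool(DSN) as pool:
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute(f"LISTEN {DB_CHANNEL_NAME};")
                while True:
                    # aiopg queues NOTIFY messages on the raw connection
                    msg = await conn.notifies.get()
                    payload = json.loads(msg.payload)
                    # payload layout, as built by notify_comp_tasks_changed():
                    # {"table": "comp_tasks", "changes": [..column names..],
                    #  "action": "UPDATE", "data": {..the full row..}}
                    print(payload["action"], payload["changes"], payload["data"]["node_id"])


asyncio.run(consume_comp_task_events())

The "changes" array is exactly what the webserver's listen() loop inspects to decide between _update_project_outputs and _update_project_state.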
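The new director-v2 route can be exercised directly over HTTP. A hypothetical client call, assuming the service is reachable under http://director-v2:8000/v2 (host, port and node UUID are made-up values; the request body mirrors RetrieveDataIn and the response mirrors RetrieveDataOutEnveloped):

import httpx

DIRECTOR_V2_BASE = "http://director-v2:8000/v2"  # assumption: depends on deployment
node_uuid = "12345678-1234-1234-1234-123456789012"  # hypothetical node UUID

# Ask the dynamic service attached to node_uuid to pull the listed input ports.
resp = httpx.post(
    f"{DIRECTOR_V2_BASE}/dynamic_services/{node_uuid}:retrieve",
    json={"port_keys": ["input_1"]},
)
resp.raise_for_status()
print(resp.json()["data"]["size_bytes"])  # amount of data transferred by the retrieve call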