Auto retrieve inputs of dynamic services (#1999)
Showing 27 changed files with 925 additions and 185 deletions.
...ostgres_database/migration/versions/a23183ac1742_always_trigger_when_comp_task_changes.py (126 additions, 0 deletions)
"""always trigger when comp_task changes | ||
Revision ID: a23183ac1742 | ||
Revises: cfd1c43b5d33 | ||
Create Date: 2020-11-27 12:30:13.836161+00:00 | ||
""" | ||
import sqlalchemy as sa | ||
from alembic import op | ||
|
||
# revision identifiers, used by Alembic. | ||
revision = "a23183ac1742" | ||
down_revision = "cfd1c43b5d33" | ||
branch_labels = None | ||
depends_on = None | ||
|
||
DB_PROCEDURE_NAME: str = "notify_comp_tasks_changed" | ||
DB_TRIGGER_NAME: str = f"{DB_PROCEDURE_NAME}_event" | ||
DB_CHANNEL_NAME: str = "comp_tasks_output_events" | ||
|
||
|
||
def upgrade(): | ||
# ### commands auto generated by Alembic - please adjust! ### | ||
drop_trigger = sa.DDL( | ||
f""" | ||
DROP TRIGGER IF EXISTS {DB_TRIGGER_NAME} on comp_tasks; | ||
""" | ||
) | ||
|
||
task_output_changed_procedure = sa.DDL( | ||
f""" | ||
CREATE OR REPLACE FUNCTION {DB_PROCEDURE_NAME}() RETURNS TRIGGER AS $$ | ||
DECLARE | ||
record RECORD; | ||
payload JSON; | ||
changes JSONB; | ||
BEGIN | ||
IF (TG_OP = 'DELETE') THEN | ||
record = OLD; | ||
ELSE | ||
record = NEW; | ||
END IF; | ||
SELECT jsonb_agg(pre.key ORDER BY pre.key) INTO changes | ||
FROM jsonb_each(to_jsonb(OLD)) AS pre, jsonb_each(to_jsonb(NEW)) AS post | ||
WHERE pre.key = post.key AND pre.value IS DISTINCT FROM post.value; | ||
payload = json_build_object('table', TG_TABLE_NAME, | ||
'changes', changes, | ||
'action', TG_OP, | ||
'data', row_to_json(record)); | ||
PERFORM pg_notify('{DB_CHANNEL_NAME}', payload::text); | ||
RETURN NULL; | ||
END; | ||
$$ LANGUAGE plpgsql; | ||
""" | ||
) | ||
|
||
task_output_changed_trigger = sa.DDL( | ||
f""" | ||
DROP TRIGGER IF EXISTS {DB_TRIGGER_NAME} on comp_tasks; | ||
CREATE TRIGGER {DB_TRIGGER_NAME} | ||
AFTER UPDATE OF outputs,state ON comp_tasks | ||
FOR EACH ROW | ||
WHEN ((OLD.outputs::jsonb IS DISTINCT FROM NEW.outputs::jsonb OR OLD.state IS DISTINCT FROM NEW.state)) | ||
EXECUTE PROCEDURE {DB_PROCEDURE_NAME}(); | ||
""" | ||
) | ||
|
||
op.execute(drop_trigger) | ||
op.execute(task_output_changed_procedure) | ||
op.execute(task_output_changed_trigger) | ||
# ### end Alembic commands ### | ||
|
||
|
||
def downgrade(): | ||
# ### commands auto generated by Alembic - please adjust! ### | ||
drop_trigger = sa.DDL( | ||
f""" | ||
DROP TRIGGER IF EXISTS {DB_TRIGGER_NAME} on comp_tasks; | ||
""" | ||
) | ||
|
||
task_output_changed_procedure = sa.DDL( | ||
f""" | ||
CREATE OR REPLACE FUNCTION {DB_PROCEDURE_NAME}() RETURNS TRIGGER AS $$ | ||
DECLARE | ||
record RECORD; | ||
payload JSON; | ||
BEGIN | ||
IF (TG_OP = 'DELETE') THEN | ||
record = OLD; | ||
ELSE | ||
record = NEW; | ||
END IF; | ||
payload = json_build_object('table', TG_TABLE_NAME, | ||
'action', TG_OP, | ||
'data', row_to_json(record)); | ||
PERFORM pg_notify('{DB_CHANNEL_NAME}', payload::text); | ||
RETURN NULL; | ||
END; | ||
$$ LANGUAGE plpgsql; | ||
""" | ||
) | ||
|
||
task_output_changed_trigger = sa.DDL( | ||
f""" | ||
DROP TRIGGER IF EXISTS {DB_TRIGGER_NAME} on comp_tasks; | ||
CREATE TRIGGER {DB_TRIGGER_NAME} | ||
AFTER UPDATE OF outputs,state ON comp_tasks | ||
FOR EACH ROW | ||
WHEN ((OLD.outputs::jsonb IS DISTINCT FROM NEW.outputs::jsonb OR OLD.state IS DISTINCT FROM NEW.state) | ||
AND NEW.node_class <> 'FRONTEND') | ||
EXECUTE PROCEDURE {DB_PROCEDURE_NAME}(); | ||
""" | ||
) | ||
|
||
op.execute(drop_trigger) | ||
op.execute(task_output_changed_procedure) | ||
op.execute(task_output_changed_trigger) | ||
# ### end Alembic commands ### |
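The procedure publishes serialized JSON on the comp_tasks_output_events channel. As a rough sketch of the shape (the field values below are illustrative, not taken from this commit), a notification for an outputs/state update decodes to something like:

# hypothetical decoded payload, for illustration only
payload = {
    "table": "comp_tasks",            # TG_TABLE_NAME
    "changes": ["outputs", "state"],  # keys whose values differ between OLD and NEW, sorted
    "action": "UPDATE",               # TG_OP; the trigger only fires on UPDATE of outputs/state
    "data": {"task_id": 42, "outputs": {}, "state": "ABORTED"},  # row_to_json(record); other columns omitted here
}

Note the asymmetry between the two directions: upgrade installs the trigger without the NEW.node_class <> 'FRONTEND' guard, so it fires for every node class (hence "always trigger"), whereas downgrade restores the earlier version, which skips frontend nodes and omits the changes key from the payload.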
New test module (139 additions, 0 deletions)
# pylint: disable=no-value-for-parameter
# pylint:disable=unused-variable
# pylint:disable=unused-argument
# pylint:disable=redefined-outer-name

import asyncio
import json
from typing import Dict, List

import pytest
from aiopg.sa.engine import Engine, SAConnection
from aiopg.sa.result import RowProxy
from simcore_postgres_database.models.comp_pipeline import StateType
from simcore_postgres_database.models.comp_tasks import (
    DB_CHANNEL_NAME,
    NodeClass,
    comp_tasks,
)
from sqlalchemy.sql.elements import literal_column


@pytest.fixture()
async def db_connection(pg_engine: Engine) -> SAConnection:
    async with pg_engine.acquire() as conn:
        yield conn


@pytest.fixture()
async def db_notification_queue(db_connection: SAConnection) -> asyncio.Queue:
    listen_query = f"LISTEN {DB_CHANNEL_NAME};"
    await db_connection.execute(listen_query)
    notifications_queue: asyncio.Queue = db_connection.connection.notifies
    assert notifications_queue.empty()
    yield notifications_queue

    assert (
        notifications_queue.empty()
    ), f"the notification queue was not emptied: {notifications_queue.qsize()} remaining notifications"


@pytest.fixture()
async def task(
    db_connection: SAConnection,
    db_notification_queue: asyncio.Queue,
    task_class: NodeClass,
) -> Dict:
    result = await db_connection.execute(
        comp_tasks.insert()
        .values(outputs=json.dumps({}), node_class=task_class)
        .returning(literal_column("*"))
    )
    row: RowProxy = await result.fetchone()
    task = dict(row)

    assert (
        db_notification_queue.empty()
    ), "database triggered change although it should only trigger on updates!"

    yield task


async def _assert_notification_queue_status(
    notification_queue: asyncio.Queue, num_exp_messages: int
) -> List[Dict]:
    if num_exp_messages > 0:
        assert not notification_queue.empty()

    tasks = []
    for n in range(num_exp_messages):
        msg = await notification_queue.get()

        assert msg, "notification msg from postgres is empty!"
        task_data = json.loads(msg.payload)

        for k in ["table", "changes", "action", "data"]:
            assert k in task_data, f"invalid structure, expected [{k}] in {task_data}"

        tasks.append(task_data)

    assert (
        notification_queue.empty()
    ), f"there are {notification_queue.qsize()} remaining messages in the queue"

    return tasks


async def _update_comp_task_with(conn: SAConnection, task: Dict, **kwargs):
    await conn.execute(
        comp_tasks.update()
        .values(**kwargs)
        .where(comp_tasks.c.task_id == task["task_id"])
    )


@pytest.mark.parametrize(
    "task_class",
    [(NodeClass.COMPUTATIONAL), (NodeClass.INTERACTIVE), (NodeClass.FRONTEND)],
)
async def test_listen_query(
    db_notification_queue: asyncio.Queue,
    db_connection: SAConnection,
    task: Dict,
):
    """this tests how the postgres LISTEN query, and in particular the aiopg implementation of it, works"""
    # let's test the trigger
    updated_output = {"some new stuff": "it is new"}
    await _update_comp_task_with(
        db_connection, task, outputs=updated_output, state=StateType.ABORTED
    )
    tasks = await _assert_notification_queue_status(db_notification_queue, 1)
    assert tasks[0]["changes"] == ["outputs", "state"]
    assert (
        tasks[0]["data"]["outputs"] == updated_output
    ), f"the data received from the database is {tasks[0]}, expected new output is {updated_output}"

    # setting the exact same data twice triggers only ONCE
    updated_output = {"some new stuff": "it is newer"}
    await _update_comp_task_with(db_connection, task, outputs=updated_output)
    await _update_comp_task_with(db_connection, task, outputs=updated_output)
    tasks = await _assert_notification_queue_status(db_notification_queue, 1)
    assert tasks[0]["changes"] == ["outputs"]
    assert (
        tasks[0]["data"]["outputs"] == updated_output
    ), f"the data received from the database is {tasks[0]}, expected new output is {updated_output}"

    # updating a number of times with different stuff comes out in FIFO order
    NUM_CALLS = 20
    update_outputs = []
    for n in range(NUM_CALLS):
        new_output = {"some new stuff": f"a {n} time"}
        await _update_comp_task_with(db_connection, task, outputs=new_output)
        update_outputs.append(new_output)

    tasks = await _assert_notification_queue_status(db_notification_queue, NUM_CALLS)

    for n, output in enumerate(update_outputs):
        assert tasks[n]["changes"] == ["outputs"]
        assert (
            tasks[n]["data"]["outputs"] == output
        ), f"the data received from the database is {tasks[n]}, expected new output is {output}"
services/director-v2/src/simcore_service_director_v2/api/dependencies/dynamic_services.py (38 additions, 0 deletions)
import logging

from fastapi import Depends, Request
from models_library.projects_nodes import NodeID
from starlette.datastructures import URL

from ...models.schemas.services import RunningServiceDetails
from ...modules.dynamic_services import ServicesClient
from ...utils.logging_utils import log_decorator
from .director_v0 import DirectorV0Client, get_director_v0_client

logger = logging.getLogger(__name__)


@log_decorator(logger=logger)
async def get_service_base_url(
    node_uuid: NodeID,
    director_v0_client: DirectorV0Client = Depends(get_director_v0_client),
) -> URL:
    # get the service details
    service_details: RunningServiceDetails = (
        await director_v0_client.get_running_service_details(node_uuid)
    )
    # compute the service url
    service_url = URL(
        f"http://{service_details.service_host}:{service_details.service_port}{service_details.service_basepath}"
    )
    return service_url


@log_decorator(logger=logger)
def get_services_client(
    request: Request,
) -> ServicesClient:
    client = ServicesClient.instance(request.app)
    return client
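Both functions are FastAPI dependencies, so a route can inject them with Depends. A hypothetical endpoint wiring them together (the route path and the services_client.request call are assumptions for illustration, not taken from this commit):

from fastapi import APIRouter, Depends
from models_library.projects_nodes import NodeID
from starlette.datastructures import URL

router = APIRouter()


@router.post("/dynamic_services/{node_uuid}:retrieve")
async def service_retrieve(
    node_uuid: NodeID,
    service_base_url: URL = Depends(get_service_base_url),
    services_client: ServicesClient = Depends(get_services_client),
):
    # forward the call to the running dynamic service behind its base url;
    # the request(...) method name is hypothetical, not from this commit
    return await services_client.request("POST", f"{service_base_url}/retrieve")

FastAPI resolves node_uuid from the path and passes it through to get_service_base_url (which in turn depends on get_director_v0_client) before the route body runs.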