Bugfix: updating the project database atomically (#2174)
* sequentially update the database
* patch the workbench instead of replacing it
* name sidecar workers in a human-readable way
* improve coverage of the conversion to schema names
sanderegg authored Mar 15, 2021
1 parent d53b9a8 commit 21c9504
Showing 26 changed files with 1,128 additions and 424 deletions.
16 changes: 9 additions & 7 deletions packages/service-library/src/servicelib/async_utils.py
@@ -1,8 +1,9 @@
import asyncio
from functools import wraps
from collections import deque
from functools import wraps
from typing import Dict, List

import attr
from typing import List, Dict


@attr.s(auto_attribs=True)
@@ -12,6 +13,9 @@ class Context:
initialized: bool


sequential_jobs_contexts = {}


def run_sequentially_in_context(target_args: List[str] = None):
"""All request to function with same calling context will be run sequentially.
@@ -49,8 +53,6 @@ async def func(param1, param2, param3):
target_args = [] if target_args is None else target_args

def internal(decorated_function):
contexts = {}

def get_context(args, kwargs: Dict) -> Context:
arg_names = decorated_function.__code__.co_varnames[
: decorated_function.__code__.co_argcount
@@ -70,14 +72,14 @@ def get_context(args, kwargs: Dict) -> Context:

key = ":".join(map(str, key_parts))

if key not in contexts:
contexts[key] = Context(
if key not in sequential_jobs_contexts:
sequential_jobs_contexts[key] = Context(
in_queue=asyncio.Queue(),
out_queue=asyncio.Queue(),
initialized=False,
)

return contexts[key]
return sequential_jobs_contexts[key]

@wraps(decorated_function)
async def wrapper(*args, **kwargs):
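For orientation, a minimal usage sketch of the decorator modified above (function and argument names are illustrative, not part of this commit): calls that share the same values for the listed target_args are queued into a single Context and executed one after the other, while the module-level sequential_jobs_contexts now lets tests inspect or clear the accumulated contexts, as the fixture in the next file does.

from servicelib.async_utils import run_sequentially_in_context

# Illustrative only: concurrent calls with the same project_uuid are serialized,
# while calls for different project_uuid values still run concurrently.
@run_sequentially_in_context(target_args=["project_uuid"])
async def patch_project(project_uuid: str, partial_data: dict) -> None:
    ...  # at most one patch per project runs at any given time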
12 changes: 10 additions & 2 deletions packages/service-library/tests/test_async_utils.py
@@ -1,3 +1,5 @@
# pylint: disable=redefined-outer-name

import asyncio
import copy
import random
@@ -6,8 +8,14 @@
from typing import Any, Dict, List

import pytest
from servicelib.async_utils import run_sequentially_in_context, sequential_jobs_contexts


from servicelib.async_utils import run_sequentially_in_context
@pytest.fixture(autouse=True)
def ensure_run_in_sequence_context_is_empty():
# NOTE: since the contexts variable is initialized at import time, when several test run
# the import happens only once and is rendered invalid, therefore explicit clearance is necessary
sequential_jobs_contexts.clear()


class LockedStore:
@@ -147,4 +155,4 @@ async def sleep_for(sleep_interval: float, control: Any) -> Any:

minimum_timelapse = (sleep_duration) * len(control_sequence)
assert elapsed > minimum_timelapse
assert control_sequence == result
assert control_sequence == result
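A minimal sketch of the failure mode the new autouse fixture guards against (illustrative names; it assumes the asyncio queues inside a Context stay bound to the event loop that first created them, so a context surviving from a previous test would be unusable):

import asyncio

from servicelib.async_utils import run_sequentially_in_context, sequential_jobs_contexts

@run_sequentially_in_context(target_args=["name"])
async def job(name: str) -> str:
    return name

# First event loop (e.g. the first test): creates the context and its queues for key "a".
asyncio.run(job("a"))

# A later event loop (e.g. the next test) must not reuse queues bound to the closed loop,
# hence the explicit clearing performed by the fixture.
sequential_jobs_contexts.clear()
asyncio.run(job("a"))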
2 changes: 2 additions & 0 deletions services/docker-compose.devel.yml
@@ -122,6 +122,7 @@ services:
- REGISTRY_USER=${REGISTRY_USER}
- REGISTRY_PW=${REGISTRY_PW}
- SWARM_STACK_NAME=${SWARM_STACK_NAME:-simcore}
- SIDECAR_HOST_HOSTNAME_PATH=${SIDECAR_HOST_HOSTNAME_PATH:-/home/scu/hostname}
networks:
- computational_services_subnet

@@ -177,6 +178,7 @@ services:
- REGISTRY_USER=${REGISTRY_USER}
- REGISTRY_PW=${REGISTRY_PW}
- SWARM_STACK_NAME=${SWARM_STACK_NAME:-simcore}
- SIDECAR_HOST_HOSTNAME_PATH=${SIDECAR_HOST_HOSTNAME_PATH:-/home/scu/hostname}
- TARGET_MPI_NODE_CPU_COUNT=${DEV_PC_CPU_COUNT:-0} # development computer CPU count, if env var is missing put to 0 to disable
networks:
- computational_services_subnet
1 change: 1 addition & 0 deletions services/docker-compose.yml
@@ -194,6 +194,7 @@ services:
- REGISTRY_PW=${REGISTRY_PW}
- SWARM_STACK_NAME=${SWARM_STACK_NAME:-simcore}
- SIDECAR_LOGLEVEL=${LOG_LEVEL:-WARNING}
- SIDECAR_HOST_HOSTNAME_PATH=${SIDECAR_HOST_HOSTNAME_PATH:-/home/scu/hostname}
- START_AS_MODE_CPU=${SIDECAR_FORCE_CPU_NODE:-0}

networks:
4 changes: 2 additions & 2 deletions services/sidecar/Dockerfile
@@ -119,7 +119,7 @@ HEALTHCHECK --interval=60s \
--timeout=60s \
--start-period=30s \
--retries=3 \
CMD celery inspect ping -b amqp://${RABBIT_USER}:${RABBIT_PASSWORD}@${RABBIT_HOST}:${RABBIT_PORT} -d celery@${HOSTNAME}
CMD celery inspect ping -b amqp://${RABBIT_USER}:${RABBIT_PASSWORD}@${RABBIT_HOST}:${RABBIT_PORT} -d sidecar."$(cat ${SIDECAR_HOST_HOSTNAME_PATH})"@${HOSTNAME}

ENTRYPOINT [ "/bin/sh", "services/sidecar/docker/entrypoint.sh" ]
CMD ["/bin/sh", "services/sidecar/docker/boot.sh"]
@@ -145,7 +145,7 @@ HEALTHCHECK --interval=60s \
--timeout=60s \
--start-period=120s \
--retries=3 \
CMD celery inspect ping -b amqp://${RABBIT_USER}:${RABBIT_PASSWORD}@${RABBIT_HOST}:${RABBIT_PORT} -d celery@${HOSTNAME}
CMD celery inspect ping -b amqp://${RABBIT_USER}:${RABBIT_PASSWORD}@${RABBIT_HOST}:${RABBIT_PORT} -d sidecar."$(cat ${SIDECAR_HOST_HOSTNAME_PATH})"@${HOSTNAME}

ENTRYPOINT [ "/bin/sh", "services/sidecar/docker/entrypoint.sh" ]
CMD ["/bin/sh", "services/sidecar/docker/boot.sh"]
6 changes: 4 additions & 2 deletions services/sidecar/docker/boot.sh
@@ -39,11 +39,13 @@ if [ "${SC_BOOT_MODE}" = "debug-ptvsd" ]; then
--app simcore_service_sidecar.celery:app \
--concurrency ${CONCURRENCY} \
--loglevel="${SIDECAR_LOGLEVEL-WARNING}" \
--pool=${POOL}
--pool=${POOL} \
--hostname=sidecar."$(cat ${SIDECAR_HOST_HOSTNAME_PATH})"@%h
else
exec celery worker \
--app simcore_service_sidecar.celery:app \
--concurrency ${CONCURRENCY} \
--loglevel="${SIDECAR_LOGLEVEL-WARNING}" \
--pool=${POOL}
--pool=${POOL} \
--hostname=sidecar."$(cat ${SIDECAR_HOST_HOSTNAME_PATH})"@%h
fi
(next changed file; file path not shown in this excerpt)
@@ -50,7 +50,6 @@ def _shared_task_dispatch(
return
raise

# this needs to be done here since the tasks are created recursively and the state might not be upgraded yet
log.info("Sidecar successfuly completed run.")
if not celery_request.is_aborted():
# the task may be aborted already...
(next changed file; file path not shown in this excerpt)
@@ -126,7 +126,7 @@ async def listen(app: web.Application):
# find the user(s) linked to that project
the_project_owner = await _get_project_owner(conn, project_uuid)

if "outputs" in task_changes:
if any(f in task_changes for f in ["outputs", "run_hash"]):
new_outputs = task_data.get("outputs", {})
new_run_hash = task_data.get("run_hash", None)
await _update_project_outputs(
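For context, the effect of the widened condition above (values are illustrative): previously only a change of the outputs column triggered a project update, whereas now a change of run_hash alone is propagated as well.

# Illustrative only: a comp_tasks change set where just the run hash was recomputed.
task_changes = ["run_hash"]
if any(f in task_changes for f in ["outputs", "run_hash"]):
    ...  # _update_project_outputs is now also reached in this case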
(next changed file; file path not shown in this excerpt)
@@ -48,7 +48,6 @@
from ..users_api import get_user_name
from .config import CONFIG_SECTION_NAME
from .projects_db import APP_PROJECT_DBAPI
from .projects_exceptions import NodeNotFoundError
from .projects_utils import clone_project_document

log = logging.getLogger(__name__)
@@ -297,25 +296,24 @@ async def update_project_node_state(
project_id,
user_id,
)
project = await get_project_for_user(app, project_id, user_id)
if not node_id in project["workbench"]:
raise NodeNotFoundError(project_id, node_id)
if project["workbench"][node_id].get("state", {}).get("currentStatus") == new_state:
# nothing to do here
return project
project["workbench"][node_id].setdefault("state", {}).update(
{"currentStatus": new_state}
)
partial_workbench_data = {
node_id: {"state": {"currentStatus": new_state}},
}
if RunningState(new_state) in [
RunningState.PUBLISHED,
RunningState.PENDING,
RunningState.STARTED,
]:
project["workbench"][node_id]["progress"] = 0
partial_workbench_data[node_id]["progress"] = 0
elif RunningState(new_state) in [RunningState.SUCCESS, RunningState.FAILED]:
project["workbench"][node_id]["progress"] = 100
partial_workbench_data[node_id]["progress"] = 100

db = app[APP_PROJECT_DBAPI]
updated_project = await db.update_user_project(project, user_id, project_id)
updated_project, _ = await db.patch_user_project_workbench(
partial_workbench_data=partial_workbench_data,
user_id=user_id,
project_uuid=project_id,
)
updated_project = await add_project_states_for_user(
user_id=user_id, project=updated_project, is_template=False, app=app
)
@@ -332,13 +330,15 @@ async def update_project_node_progress(
user_id,
progress,
)
project = await get_project_for_user(app, project_id, user_id)
if not node_id in project["workbench"]:
raise NodeNotFoundError(project_id, node_id)

project["workbench"][node_id]["progress"] = int(100.0 * float(progress) + 0.5)
partial_workbench_data = {
node_id: {"progress": int(100.0 * float(progress) + 0.5)},
}
db = app[APP_PROJECT_DBAPI]
updated_project = await db.update_user_project(project, user_id, project_id)
updated_project, _ = await db.patch_user_project_workbench(
partial_workbench_data=partial_workbench_data,
user_id=user_id,
project_uuid=project_id,
)
updated_project = await add_project_states_for_user(
user_id=user_id, project=updated_project, is_template=False, app=app
)
@@ -365,29 +365,29 @@ async def update_project_node_outputs(
new_run_hash,
)
new_outputs: Dict[str, Any] = new_outputs or {}
project = await get_project_for_user(app, project_id, user_id)

if not node_id in project["workbench"]:
raise NodeNotFoundError(project_id, node_id)

# NOTE: update outputs (not required) if necessary as the UI expects a
# dataset/label field that is missing
current_outputs = project["workbench"][node_id].setdefault("outputs", {})
project["workbench"][node_id]["outputs"] = new_outputs
project["workbench"][node_id]["runHash"] = new_run_hash

# find changed keys (the ones that appear or disapppear for sure)
changed_keys = list(current_outputs.keys() ^ new_outputs.keys())
# now check the ones that are in both object
for key in current_outputs.keys() & new_outputs.keys():
if current_outputs[key] != new_outputs[key]:
changed_keys.append(key)
partial_workbench_data = {
node_id: {"outputs": new_outputs, "runHash": new_run_hash},
}

db = app[APP_PROJECT_DBAPI]
updated_project = await db.update_user_project(project, user_id, project_id)
updated_project, changed_entries = await db.patch_user_project_workbench(
partial_workbench_data=partial_workbench_data,
user_id=user_id,
project_uuid=project_id,
)
log.debug(
"patched project %s, following entries changed: %s",
project_id,
pformat(changed_entries),
)
updated_project = await add_project_states_for_user(
user_id=user_id, project=updated_project, is_template=False, app=app
)

# changed entries come in the form of {node_uuid: {outputs: {changed_key1: value1, changed_key2: value2}}}
# we do want only the key names
changed_keys = changed_entries.get(node_id, {}).get("outputs", {}).keys()
return updated_project, changed_keys


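The three call sites above all funnel into the new patch_user_project_workbench method of the project DB API, whose implementation is not shown in this excerpt. A rough sketch of the intended pattern, with a hypothetical class name and body, assuming it combines the run_sequentially_in_context decorator with a per-node merge so that concurrent updates to the same project are applied one after the other instead of overwriting each other's workbench:

from typing import Any, Dict, Tuple

from servicelib.async_utils import run_sequentially_in_context

class ProjectDBAPI:  # hypothetical name for the object stored under APP_PROJECT_DBAPI
    @run_sequentially_in_context(target_args=["project_uuid"])
    async def patch_user_project_workbench(
        self, partial_workbench_data: Dict[str, Any], user_id: int, project_uuid: str
    ) -> Tuple[Dict[str, Any], Dict[str, Any]]:
        # Sketch only:
        # 1. read the current workbench of project_uuid inside a transaction
        # 2. merge partial_workbench_data into it node by node, recording what changed
        # 3. write the merged workbench back and return (updated_project, changed_entries)
        ...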
(diffs for the remaining changed files are not shown)