🐛 Fix file-picker downstream service notification issues #3058

Merged: 36 commits, May 25, 2022
Changes from 12 commits

Commits
8f005dc
fix example
May 20, 2022
4c307fa
dynamic-sidecar now purges files when downloading incoming data and fix…
May 20, 2022
d6145bc
fixed side effect due to multiple types of storage supported
May 20, 2022
8d629ad
Merge remote-tracking branch 'upstream/master' into fix-file-picker-l…
May 20, 2022
925b868
file-picker events are now sent when files change
May 20, 2022
4d69aca
Merge remote-tracking branch 'upstream/master' into fix-file-picker-l…
May 20, 2022
e91cd8c
codestyle
May 20, 2022
ce92d38
reduce deep nesting
May 20, 2022
2199ffa
refactor reducing complexity
May 20, 2022
7452360
fixed broken test
May 20, 2022
409498f
fixed typing
May 20, 2022
fe4a47e
Merge remote-tracking branch 'upstream/master' into fix-file-picker-l…
May 20, 2022
6489e74
Merge remote-tracking branch 'upstream/master' into fix-file-picker-l…
May 23, 2022
b233621
refactor signatures
May 23, 2022
89bb6b2
fixed typing
May 23, 2022
58b00c7
remove old typing style
May 23, 2022
1d43f53
refactor typing and naming
May 23, 2022
6a72749
moved get_node_outputs_changes and typing changes
May 23, 2022
a257419
refactor moved find_changed_dict_keys
May 23, 2022
b9c86db
moved tests
May 23, 2022
5e9c03d
renaming functions to make it more obvious
May 23, 2022
ee30e3d
replacing old style typing
May 23, 2022
7e35f35
fixed broken
May 23, 2022
8fd5553
Merge remote-tracking branch 'upstream/master' into fix-file-picker-l…
May 24, 2022
f7f88e9
using simpler patching strategy
May 24, 2022
fbeded1
replaced base types
May 24, 2022
10ca0e7
fixed test styling
May 24, 2022
d05fe3c
adds fixme and issue notes
May 24, 2022
f4fd642
fixed function, added more test cases
May 24, 2022
e73c572
removed frontend keys argument
May 24, 2022
8668697
reduces dependency with projects_utils (#32)
pcrespov May 24, 2022
1b7721a
refactor removing cyclic dependency
May 24, 2022
899d3f0
Merge remote-tracking branch 'upstream/master' into fix-file-picker-l…
May 25, 2022
72ac601
moved test module
May 25, 2022
b85becb
refactor typing
May 25, 2022
a0e1311
refactor function name
May 25, 2022
4 changes: 2 additions & 2 deletions scripts/pydeps.bash
@@ -39,8 +39,8 @@ Run()
# Examples:
# - SEE https://pydeps.readthedocs.io/en/latest/#usage
#
# pydeps.bash services/web/server/src/simcore_service_webserver --cluster
# pydeps.bash services/web/server/src/simcore_service_webserver --only "simcore_service_webserver.projects" --cluster
# ./scripts/pydeps.bash services/web/server/src/simcore_service_webserver --cluster
# ./scripts/pydeps.bash services/web/server/src/simcore_service_webserver --only "simcore_service_webserver.projects" --cluster
#
#

@@ -129,7 +129,7 @@ async def pull_input_ports(
PortTypeName.INPUTS, mounted_volumes.disk_inputs_path, port_keys=port_keys
)
await send_message(rabbitmq, "Finished pulling inputs")
return transferred_bytes
return int(transferred_bytes)


@containers_router.patch(
@@ -187,7 +187,7 @@ async def pull_output_ports(
PortTypeName.OUTPUTS, mounted_volumes.disk_outputs_path, port_keys=port_keys
)
await send_message(rabbitmq, "Finished pulling output")
return transferred_bytes
return int(transferred_bytes)


@containers_router.post(
@@ -7,7 +7,7 @@
import time
from collections import deque
from pathlib import Path
from typing import Coroutine, Deque, Dict, List, Optional, Set, Tuple, cast
from typing import Any, Coroutine, Deque, Dict, List, Optional, Set, Tuple, cast

import magic
from pydantic import ByteSize
@@ -170,6 +170,76 @@ async def _get_data_from_port(port: Port) -> Tuple[Port, ItemConcreteValue]:
return (port, ret)


async def _download_files(
target_path: Path, download_tasks: Deque[Coroutine[Any, int, Any]]
) -> Tuple[dict[str, Any], ByteSize]:
sanderegg (Member) commented on May 22, 2022:

  1. Could you create a TypedDict at least, so we know what is returned?
  2. Check whether you can use deque instead of Deque.
  3. You can use tuple instead of Tuple.
  4. If you bother putting a deque here, why not just pass a Sequence type? The best and safest/fastest way is to pass a tuple, because then you also convey that the function will not modify the argument, and a tuple is faster than a deque... Also, I doubt that going from list to deque makes much of a difference, especially when downloading GBs...

PR author (Contributor) replied:

  1. I've found out we already have something similar defined, OutputsDict. Will be using that.
  2. Sorry, I don't understand the suggestion.
  3. 👍
  4. There is no life-changing improvement. The deque is just better suited for appending data than a list; the change from list to deque brings no real-life benefit. I do not agree on the use of tuples, since they are immutable, and when constructing the sequence you need to append data to it.

PR author (Contributor) replied:

  1. deque is usable for typing

transferred_bytes = 0
data: dict[str, Any] = {}

if not download_tasks:
return data, ByteSize(transferred_bytes)

# TODO: limit concurrency to avoid saturating storage+db??
results: List[Tuple[Port, ItemConcreteValue]] = cast(
List[Tuple[Port, ItemConcreteValue]], await logged_gather(*download_tasks)
)
logger.info("completed download %s", results)
for port, value in results:

data[port.key] = {"key": port.key, "value": value}

if _FILE_TYPE_PREFIX in port.property_type:

# if there are files, move them to the final destination
downloaded_file: Optional[Path] = cast(Optional[Path], value)
dest_path: Path = target_path / port.key

if not downloaded_file or not downloaded_file.exists():
# the link may be empty
# remove all files from disk when disconnecting the port
logger.info("removing contents of dir %s", dest_path)
await remove_directory(
dest_path, only_children=True, ignore_errors=True
)
continue

transferred_bytes = transferred_bytes + downloaded_file.stat().st_size

# in the case of a valid file, it is uncompressed and/or moved to the final directory
logger.info("creating directory %s", dest_path)
dest_path.mkdir(exist_ok=True, parents=True)
data[port.key] = {"key": port.key, "value": str(dest_path)}

dest_folder = PrunableFolder(dest_path)

if _is_zip_file(downloaded_file):
# unzip updated data to dest_path
logger.info("unzipping %s", downloaded_file)
unarchived: Set[Path] = await unarchive_dir(
archive_to_extract=downloaded_file, destination_folder=dest_path
)

dest_folder.prune(exclude=unarchived)

logger.info("all unzipped in %s", dest_path)
else:
logger.info("moving %s", downloaded_file)
dest_path = dest_path / Path(downloaded_file).name
await async_on_threadpool(
# pylint: disable=cell-var-from-loop
lambda: shutil.move(str(downloaded_file), dest_path)
)

dest_folder.prune(exclude={dest_path})

logger.info("all moved to %s", dest_path)
else:
transferred_bytes = transferred_bytes + sys.getsizeof(value)

return data, ByteSize(transferred_bytes)


@run_sequentially_in_context()
async def download_target_ports(
port_type_name: PortTypeName, target_path: Path, port_keys: List[str]
Reviewer (Member) commented:

use list[str]

PR author (Contributor) replied:

Let's decide that, from now on, if I remove Dict or List from a function in a module, I will also remove it from the entire module.
@pcrespov could we agree on this?
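
(Editorial illustration, not from the PR: a tiny hedged sketch of that module-wide cleanup, dropping the typing.List/typing.Dict imports in favour of the built-in generics available since Python 3.9. The name download_sketch is hypothetical.)

# Old style:
#   from typing import Dict, List
#   async def download(port_keys: List[str]) -> Dict[str, Any]: ...

# New style (PEP 585): no Dict/List imports needed
from typing import Any


async def download_sketch(port_keys: list[str]) -> dict[str, Any]:
    # hypothetical stand-in for download_target_ports, showing list[str] / dict[str, Any]
    return {key: None for key in port_keys}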

) -> ByteSize:
@@ -183,10 +253,9 @@ async def download_target_ports(
node_uuid=str(settings.DY_SIDECAR_NODE_ID),
r_clone_settings=settings.DY_SIDECAR_R_CLONE_SETTINGS,
)
data = {}

# let's gather all the data
download_tasks = []
download_tasks: Deque[Coroutine[Any, int, Any]] = deque()
for port_value in (await getattr(PORTS, port_type_name.value)).values():
# if port_keys contains some keys only download them
logger.info("Checking node %s", port_value.key)
@@ -196,61 +265,7 @@ async def download_target_ports(
download_tasks.append(_get_data_from_port(port_value))
logger.info("retrieving %s data", len(download_tasks))

transfer_bytes = 0
if download_tasks:
# TODO: limit concurrency to avoid saturating storage+db??
results: List[Tuple[Port, ItemConcreteValue]] = cast(
List[Tuple[Port, ItemConcreteValue]], await logged_gather(*download_tasks)
)
logger.info("completed download %s", results)
for port, value in results:

data[port.key] = {"key": port.key, "value": value}

if _FILE_TYPE_PREFIX in port.property_type:

# if there are files, move them to the final destination
downloaded_file: Optional[Path] = cast(Optional[Path], value)
dest_path: Path = target_path / port.key

if not downloaded_file or not downloaded_file.exists():
# the link may be empty
# remove files all files from disk when disconnecting port
await remove_directory(
dest_path, only_children=True, ignore_errors=True
)
continue

transfer_bytes = transfer_bytes + downloaded_file.stat().st_size

# in case of valid file, it is either uncompressed and/or moved to the final directory
logger.info("creating directory %s", dest_path)
dest_path.mkdir(exist_ok=True, parents=True)
data[port.key] = {"key": port.key, "value": str(dest_path)}

if _is_zip_file(downloaded_file):

dest_folder = PrunableFolder(dest_path)

# unzip updated data to dest_path
logger.info("unzipping %s", downloaded_file)
unarchived: Set[Path] = await unarchive_dir(
archive_to_extract=downloaded_file, destination_folder=dest_path
)

dest_folder.prune(exclude=unarchived)

logger.info("all unzipped in %s", dest_path)
else:
logger.info("moving %s", downloaded_file)
dest_path = dest_path / Path(downloaded_file).name
await async_on_threadpool(
# pylint: disable=cell-var-from-loop
lambda: shutil.move(str(downloaded_file), dest_path)
)
logger.info("all moved to %s", dest_path)
else:
transfer_bytes = transfer_bytes + sys.getsizeof(value)
data, transferred_bytes = await _download_files(target_path, download_tasks)

# create/update the json file with the new values
if data:
@@ -261,15 +276,13 @@ async def download_target_ports(
data = {**current_data, **data}
data_file.write_text(json.dumps(data))

transferred = ByteSize(transfer_bytes)
elapsed_time = time.perf_counter() - start_time
logger.info(
"Downloaded %s in %s seconds",
transferred.human_readable(decimal=True),
transferred_bytes.human_readable(decimal=True),
elapsed_time,
)

return transferred
return transferred_bytes


__all__ = ["dispatch_update_for_directory", "download_target_ports"]
@@ -7,13 +7,13 @@
from typing import Any, Dict, List, Mapping, Optional

from aiohttp import ClientSession, ClientTimeout, web
from pydantic import parse_obj_as
from aiohttp.client_exceptions import (
ClientConnectionError,
ClientResponseError,
InvalidURL,
)
from models_library.services_resources import ServiceResourcesDict
from pydantic import parse_obj_as
from servicelib.aiohttp.client_session import get_client_session
from servicelib.aiohttp.rest_responses import wrap_as_envelope
from servicelib.json_serialization import json_dumps
@@ -17,13 +17,12 @@
from pydantic.types import PositiveInt
from servicelib.aiohttp.application_keys import APP_DB_ENGINE_KEY
from servicelib.logging_utils import log_decorator
from servicelib.utils import logged_gather
from simcore_postgres_database.webserver_models import DB_CHANNEL_NAME, projects
from sqlalchemy.sql import select

from .computation_utils import convert_state_from_db
from .projects import projects_api, projects_exceptions
from .projects.projects_utils import project_get_depending_nodes
from .projects.projects_nodes_utils import update_node_outputs

log = logging.getLogger(__name__)

@@ -54,43 +53,6 @@ async def _update_project_state(
await projects_api.notify_project_state_update(app, project)


@log_decorator(logger=log)
async def _update_project_outputs(
app: web.Application,
user_id: PositiveInt,
project_uuid: str,
node_uuid: str,
outputs: Dict,
run_hash: Optional[str],
node_errors: Optional[List[ErrorDict]],
) -> None:
# the new outputs might be {}, or {key_name: payload}
project, changed_keys = await projects_api.update_project_node_outputs(
app,
user_id,
project_uuid,
node_uuid,
new_outputs=outputs,
new_run_hash=run_hash,
)

await projects_api.notify_project_node_update(
app, project, f"{node_uuid}", errors=node_errors
)
# get depending node and notify for these ones as well
depending_node_uuids = await project_get_depending_nodes(project, f"{node_uuid}")
await logged_gather(
*[
projects_api.notify_project_node_update(app, project, nid, errors=None)
for nid in depending_node_uuids
]
)
# notify
await projects_api.post_trigger_connected_service_retrieve(
app=app, project=project, updated_node_uuid=node_uuid, changed_keys=changed_keys
)


async def listen(app: web.Application, db_engine: Engine):
listen_query = f"LISTEN {DB_CHANNEL_NAME};"
_LISTENING_TASK_BASE_SLEEPING_TIME_S = 1
@@ -138,14 +100,15 @@ async def listen(app: web.Application, db_engine: Engine):
if any(f in task_changes for f in ["outputs", "run_hash"]):
new_outputs = task_data.get("outputs", {})
new_run_hash = task_data.get("run_hash", None)
await _update_project_outputs(
await update_node_outputs(
app,
the_project_owner,
project_uuid,
node_uuid,
new_outputs,
new_run_hash,
node_errors=task_data.get("errors", None),
ui_changed_keys=None,
)

if "state" in task_changes: