✨ Store computational services progress in comp_tasks table, decouple webserver from api-server 🗃️ #4197

Merged

Changes from all commits (71 commits)
a4d2336
add progress to comp_tasks table
sanderegg May 5, 2023
eedaaa1
add progress to director
sanderegg May 5, 2023
4999e96
use a float type
sanderegg May 8, 2023
de01b8d
increment example
sanderegg May 8, 2023
1298c3d
typing and add function to update progress if needed
sanderegg May 8, 2023
5b50800
typing
sanderegg May 9, 2023
fe63f8a
use factory
sanderegg May 9, 2023
ad40225
refactor
sanderegg May 9, 2023
19ca083
more refactoring
sanderegg May 9, 2023
de8da5b
add progress to NodeState
sanderegg May 9, 2023
b99189f
now we pass the progress back
sanderegg May 9, 2023
4410da0
ensure progress is reset when published
sanderegg May 9, 2023
7953b79
ongoing
sanderegg May 10, 2023
34e4472
waiting for a test
sanderegg May 10, 2023
7a89804
fix examples
sanderegg May 10, 2023
f5e7e15
missing progress in pipeline details mock
sanderegg May 10, 2023
caf17d1
revert
sanderegg May 11, 2023
636a069
fix will come from @GitHK
sanderegg May 11, 2023
cf1d64c
make progress optional
sanderegg May 11, 2023
0fb1c5e
revert
sanderegg May 11, 2023
0604a6f
revert
sanderegg May 11, 2023
3d088b1
refactor
sanderegg May 11, 2023
fa2248c
sqlalchemy 2.0
sanderegg May 11, 2023
303ae87
refactor
sanderegg May 11, 2023
0e1fbf6
fix typing
sanderegg May 11, 2023
831389f
types
sanderegg May 11, 2023
60c8e61
set progress to 1 when aborting
sanderegg May 11, 2023
3b6b648
refactor testing
sanderegg May 11, 2023
192da72
improve
sanderegg May 11, 2023
1aa7ed2
clean
sanderegg May 11, 2023
0500297
cleanup
sanderegg May 11, 2023
ded4af6
cleanup
sanderegg May 11, 2023
c406597
adding a test for testing progress handler
sanderegg May 11, 2023
7ebc4f7
try to refactor
sanderegg May 11, 2023
69cad2c
more refactoring
sanderegg May 11, 2023
57f44a5
prevent crashes
sanderegg May 11, 2023
3b5cd05
ensure progress is rightfully transmitted and set in the database
sanderegg May 11, 2023
75150f2
notes
sanderegg May 11, 2023
0231cd4
fix sqlalchemy 2.0 change in code
sanderegg May 11, 2023
3a8f5ce
obvious fix
sanderegg May 11, 2023
7b69c92
another obvious fix
sanderegg May 11, 2023
60ec861
ongoing fix of test
sanderegg May 12, 2023
ae02664
fix double restart of migration service
sanderegg May 12, 2023
7438685
improve error
sanderegg May 12, 2023
bbb7a62
add a note
sanderegg May 12, 2023
64969fd
fixes show of asserts
sanderegg May 12, 2023
44bd2b9
wait less
sanderegg May 12, 2023
0794ce1
ongoing
sanderegg May 12, 2023
ec1f073
properly wait for catalog to be ready
sanderegg May 12, 2023
09de27a
partial run almost fixed
sanderegg May 12, 2023
3cb8060
types
sanderegg May 16, 2023
d9913f5
run_partial_computation fixed
sanderegg May 16, 2023
dbdd461
sql 2.0
sanderegg May 16, 2023
4b0fd45
fixes tests
sanderegg May 16, 2023
a7dbd7e
pylance
sanderegg May 16, 2023
bbe0ae9
mypy
sanderegg May 16, 2023
9b01cc0
pylance
sanderegg May 16, 2023
de29c4f
correct mocks
sanderegg May 16, 2023
5a16b7e
pylance
sanderegg May 16, 2023
e981e31
test fixed
sanderegg May 16, 2023
fc76423
sql 2.0
sanderegg May 16, 2023
585225b
sql 2.0
sanderegg May 16, 2023
9ee66fa
flake
sanderegg May 16, 2023
2cb2e98
convert float to percentage
sanderegg May 16, 2023
b2d5f24
no need to keep the progress anymore
sanderegg May 16, 2023
bb7e694
revert
sanderegg May 16, 2023
6e0076b
add a note
sanderegg May 16, 2023
cacd77d
revert
sanderegg May 16, 2023
a2907c4
@pcrespov review: be careful with tab and spaces
sanderegg May 16, 2023
0abf5dd
mypy
sanderegg May 17, 2023
07b4d8d
mypy
sanderegg May 17, 2023
@@ -3,7 +3,7 @@
from typing import TypeAlias, Union

from distributed.worker import get_worker
from pydantic import BaseModel, Extra, NonNegativeFloat
from pydantic import BaseModel, Extra, validator


class BaseTaskEvent(BaseModel, ABC):
@@ -20,7 +20,7 @@ class Config:


class TaskProgressEvent(BaseTaskEvent):
progress: NonNegativeFloat
progress: float

@staticmethod
def topic_name() -> str:
@@ -44,6 +44,13 @@ class Config(BaseTaskEvent.Config):
]
}

@validator("progress", always=True)
@classmethod
def ensure_between_0_1(cls, v):
if 0 <= v <= 1:
return v
return min(max(0, v), 1)


LogMessageStr: TypeAlias = str
LogLevelInt: TypeAlias = int
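The new validator clamps out-of-range values into [0, 1] instead of rejecting them. A minimal sketch of that behavior with a stripped-down pydantic model (the real `TaskProgressEvent` carries additional task metadata):

```python
from pydantic import BaseModel, validator


class _ProgressOnly(BaseModel):
    # stripped-down stand-in for TaskProgressEvent, for illustration only
    progress: float

    @validator("progress", always=True)
    @classmethod
    def ensure_between_0_1(cls, v):
        # clamp rather than raise: callers may report slightly off-range values
        if 0 <= v <= 1:
            return v
        return min(max(0, v), 1)


assert _ProgressOnly(progress=0.5).progress == 0.5
assert _ProgressOnly(progress=1.5).progress == 1.0   # clamped down
assert _ProgressOnly(progress=-2.0).progress == 0.0  # clamped up
```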
12 changes: 11 additions & 1 deletion packages/models-library/src/models_library/projects_nodes.py
@@ -85,6 +85,12 @@ class NodeState(BaseModel):
description="the node's current state",
alias="currentStatus",
)
progress: float | None = Field(
default=0,
ge=0.0,
le=1.0,
description="current progress of the task if available (None if not started or not a computational task)",
)

class Config:
extra = Extra.forbid
@@ -133,7 +139,11 @@ class Node(BaseModel):
..., description="The short name of the node", examples=["JupyterLab"]
)
progress: float | None = Field(
default=None, ge=0, le=100, description="the node progress value"
default=None,
ge=0,
le=100,
description="the node progress value",
deprecated=True,
)
thumbnail: HttpUrlWithCustomMinLength | None = Field(
default=None,
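`NodeState.progress` is normalized to [0, 1] (with `None` for non-computational or not-yet-started nodes), while the old `Node.progress` keeps the legacy 0–100 scale and is now flagged deprecated. A short sketch, assuming the remaining `NodeState` fields keep their defaults:

```python
from models_library.projects_nodes import NodeState

state = NodeState(progress=0.42)  # normalized: 0.0 == not started, 1.0 == done
assert state.progress == 0.42

state = NodeState(progress=None)  # e.g. a non-computational (dynamic) node
assert state.progress is None
```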
25 changes: 17 additions & 8 deletions packages/models-library/src/models_library/projects_pipeline.py
@@ -1,4 +1,3 @@
from typing import Dict, List, Optional
from uuid import UUID

from pydantic import BaseModel, Field, PositiveInt
@@ -9,11 +8,17 @@


class PipelineDetails(BaseModel):
adjacency_list: Dict[NodeID, List[NodeID]] = Field(
adjacency_list: dict[NodeID, list[NodeID]] = Field(
...,
description="The adjacency list of the current pipeline in terms of {NodeID: [successor NodeID]}",
)
node_states: Dict[NodeID, NodeState] = Field(
progress: float | None = Field(
...,
ge=0,
le=1.0,
description="the progress of the pipeline (None if there are no computational tasks)",
)
node_states: dict[NodeID, NodeState] = Field(
..., description="The states of each of the computational nodes in the pipeline"
)

@@ -24,17 +29,15 @@ class PipelineDetails(BaseModel):
class ComputationTask(BaseModel):
id: TaskID = Field(..., description="the id of the computation task")
state: RunningState = Field(..., description="the state of the computational task")
result: Optional[str] = Field(
None, description="the result of the computational task"
)
result: str | None = Field(None, description="the result of the computational task")
pipeline_details: PipelineDetails = Field(
..., description="the details of the generated pipeline"
)
iteration: Optional[PositiveInt] = Field(
iteration: PositiveInt | None = Field(
...,
description="the iteration id of the computation task (none if no task ran yet)",
)
cluster_id: Optional[ClusterID] = Field(
cluster_id: ClusterID | None = Field(
...,
description="the cluster on which the computaional task runs/ran (none if no task ran yet)",
)
@@ -56,14 +59,17 @@ class Config:
"2fb4808a-e403-4a46-b52c-892560d27862": {
"modified": True,
"dependencies": [],
"progress": 0.0,
},
"19a40c7b-0a40-458a-92df-c77a5df7c886": {
"modified": False,
"dependencies": [
"2fb4808a-e403-4a46-b52c-892560d27862"
],
"progress": 0.0,
},
},
"progress": 0.0,
},
"iteration": None,
"cluster_id": None,
Expand All @@ -82,14 +88,17 @@ class Config:
"2fb4808a-e403-4a46-b52c-892560d27862": {
"modified": False,
"dependencies": [],
"progress": 1.0,
},
"19a40c7b-0a40-458a-92df-c77a5df7c886": {
"modified": False,
"dependencies": [
"2fb4808a-e403-4a46-b52c-892560d27862"
],
"progress": 1.0,
},
},
"progress": 1.0,
},
"iteration": 2,
"cluster_id": 0,
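`PipelineDetails.progress` aggregates the per-node values into a single pipeline-level figure, and is `None` when the pipeline has no computational tasks. A hypothetical aggregation helper illustrating that contract (the actual computation lives in director-v2 and may weight nodes differently):

```python
def pipeline_progress(node_progress: dict[str, float | None]) -> float | None:
    """Hypothetical: average the progress of all computational nodes."""
    values = [p for p in node_progress.values() if p is not None]
    if not values:
        return None  # no computational tasks -> pipeline progress undefined
    return sum(values) / len(values)


assert pipeline_progress({"a": 0.0, "b": 1.0}) == 0.5
assert pipeline_progress({}) is None
```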
@@ -0,0 +1,30 @@
"""add progress to comp_tasks

Revision ID: 0c084cb1091c
Revises: 432aa859098b
Create Date: 2023-05-05 08:00:18.951040+00:00

"""
import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "0c084cb1091c"
down_revision = "432aa859098b"
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column(
"comp_tasks",
sa.Column("progress", sa.Numeric(precision=3, scale=2), nullable=True),
)
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column("comp_tasks", "progress")
# ### end Alembic commands ###
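The column is nullable, so existing `comp_tasks` rows need no backfill; the migration is applied with `alembic upgrade head` and undone with `alembic downgrade -1`. `NUMERIC(3, 2)` stores three significant digits with two after the decimal point, exactly enough for the normalized 0.00–1.00 range; a sketch of the quantization a value undergoes on its way into the column (assuming the usual round-half-away-from-zero behavior of PostgreSQL's numeric type):

```python
from decimal import ROUND_HALF_UP, Decimal

# values written with more precision come back rounded to two decimals
stored = Decimal("0.4567").quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
assert stored == Decimal("0.46")
```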
@@ -56,7 +56,7 @@ class NodeClass(enum.Enum):
sa.Enum(StateType),
nullable=False,
server_default=StateType.NOT_STARTED.value,
doc="Current state in the task lifecicle",
doc="Current state in the task lifecycle",
),
sa.Column(
"errors",
@@ -65,6 +65,12 @@
doc="List[models_library.errors.ErrorDict] with error information"
" for a failing state, otherwise set to None",
),
sa.Column(
"progress",
sa.Numeric(precision=3, scale=2), # numbers from 0.00 and 1.00
nullable=True,
doc="current progress of the task if available",
),
# utc timestamps for submission/start/end
sa.Column("submit", sa.DateTime, doc="UTC timestamp for task submission"),
sa.Column("start", sa.DateTime, doc="UTC timestamp when task started"),
@@ -39,13 +39,13 @@ def creator(**user_kwargs) -> dict[str, Any]:
)
# this is needed to get the primary_gid correctly
result = con.execute(
sa.select([users]).where(users.c.id == user_config["id"])
sa.select(users).where(users.c.id == user_config["id"])
)
user = result.first()
assert user
print(f"--> created {user=}")
created_user_ids.append(user["id"])
return dict(user)
return dict(user._asdict())

yield creator

@@ -81,7 +81,7 @@ def creator(user: dict[str, Any], **overrides) -> ProjectAtDB:
.returning(sa.literal_column("*"))
)

inserted_project = ProjectAtDB.parse_obj(result.first())
inserted_project = ProjectAtDB.from_orm(result.first())
print(f"--> created {inserted_project=}")
created_project_ids.append(f"{inserted_project.uuid}")
return inserted_project
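Two SQLAlchemy 2.0 migrations show up in these fixtures: `select()` now takes the table directly instead of a list, and `Row` objects are no longer dict-like, so they must be converted explicitly via `_asdict()`. A self-contained sketch against an in-memory SQLite database (table and data are illustrative only):

```python
import sqlalchemy as sa

metadata = sa.MetaData()
users = sa.Table(  # minimal stand-in for the real users table
    "users",
    metadata,
    sa.Column("id", sa.Integer, primary_key=True),
    sa.Column("name", sa.String),
)

engine = sa.create_engine("sqlite://")
metadata.create_all(engine)

with engine.connect() as conn:
    conn.execute(sa.insert(users).values(id=42, name="alice"))
    # 2.0 style: sa.select(users), not sa.select([users])
    row = conn.execute(sa.select(users).where(users.c.id == 42)).first()
    assert row is not None
    user = row._asdict()  # Row is not a mapping; convert explicitly
    assert user["name"] == "alice"
```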
27 changes: 14 additions & 13 deletions packages/pytest-simcore/src/pytest_simcore/docker_swarm.py
@@ -44,7 +44,7 @@ def _is_docker_swarm_init(docker_client: docker.client.DockerClient) -> bool:


@retry(
wait=wait_fixed(5),
wait=wait_fixed(1),
stop=stop_after_delay(8 * MINUTE),
before_sleep=before_sleep_log(log, logging.WARNING),
reraise=True,
@@ -183,25 +183,26 @@ def docker_swarm(
retry=retry_if_exception_type(AssertionError),
stop=stop_after_delay(30),
)
def _wait_for_new_task_to_be_started(service: Any, old_task_ids: set[str]) -> None:
service.reload()
new_task_ids = {t["ID"] for t in service.tasks()}
assert len(new_task_ids.difference(old_task_ids)) == 1
def _wait_for_migration_service_to_be_removed(
docker_client: docker.client.DockerClient,
) -> None:
for service in docker_client.services.list():
if "migration" in service.name: # type: ignore
raise TryAgain


def _force_restart_migration_service(docker_client: docker.client.DockerClient) -> None:
def _force_remove_migration_service(docker_client: docker.client.DockerClient) -> None:
for migration_service in (
service
for service in docker_client.services.list()
if "migration" in service.name
if "migration" in service.name # type: ignore
):
print(
"WARNING: migration service detected before updating stack, it will be force-updated"
"WARNING: migration service detected before updating stack, it will be force-removed now and re-deployed to ensure DB update"
)
before_update_task_ids = {t["ID"] for t in migration_service.tasks()}
migration_service.force_update()
_wait_for_new_task_to_be_started(migration_service, before_update_task_ids)
print(f"forced updated {migration_service.name}.")
migration_service.remove() # type: ignore
_wait_for_migration_service_to_be_removed(docker_client)
print(f"forced updated {migration_service.name}.") # type: ignore


def _deploy_stack(compose_file: Path, stack_name: str) -> None:
@@ -273,7 +274,7 @@ def docker_stack(

# NOTE: if the migration service was already running prior to this call it must
# be force updated so that it does its job. else it remains and tests will fail
_force_restart_migration_service(docker_client)
_force_remove_migration_service(docker_client)

# make up-version
stacks_deployed: dict[str, dict] = {}
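Instead of force-updating a leftover migration service and waiting for a new task to start, the fixture now removes the service outright and polls until Docker no longer lists it, so a fresh stack deployment re-runs the DB migration from scratch. The polling pattern with tenacity, sketched in isolation (assumes a reachable Docker daemon; names are illustrative):

```python
import docker
from tenacity import TryAgain, retry, stop_after_delay, wait_fixed


@retry(wait=wait_fixed(1), stop=stop_after_delay(30), reraise=True)
def wait_until_service_gone(client: docker.DockerClient, name_part: str) -> None:
    # raising TryAgain makes tenacity re-invoke this function until no
    # matching service remains (or the 30 s deadline expires)
    for service in client.services.list():
        if name_part in service.name:
            raise TryAgain


# usage sketch:
# client = docker.from_env()
# wait_until_service_gone(client, "migration")
```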
@@ -70,7 +70,6 @@


def create_computation_cb(url, **kwargs) -> CallbackResult:

assert "json" in kwargs, f"missing body in call to {url}"
body = kwargs["json"]
for param in ["user_id", "project_id"]:
@@ -113,6 +112,7 @@ def create_computation_cb(url, **kwargs) -> CallbackResult:
"pipeline_details": {
"adjacency_list": pipeline,
"node_states": node_states,
"progress": 0,
},
},
)
@@ -131,6 +131,7 @@ def get_computation_cb(url, **kwargs) -> CallbackResult:
"pipeline_details": {
"adjacency_list": pipeline,
"node_states": node_states,
"progress": 0,
},
"iteration": 2,
"cluster_id": 23,
@@ -350,7 +351,6 @@ def get_upload_link_cb(url: URL, **kwargs) -> CallbackResult:
scheme = {LinkType.PRESIGNED: "http", LinkType.S3: "s3"}

if file_size := kwargs["params"].get("file_size") is not None:

upload_schema = FileUploadSchema(
chunk_size=parse_obj_as(ByteSize, "5GiB"),
urls=[parse_obj_as(AnyUrl, f"{scheme[link_type]}://{file_id}")],
@@ -2,13 +2,13 @@
import urllib.parse
from asyncio import Task
from datetime import datetime
from typing import Any, Awaitable, Callable, Coroutine, Optional
from typing import Any, Awaitable, Callable, Coroutine

from pydantic import (
BaseModel,
ConstrainedFloat,
Field,
PositiveFloat,
confloat,
validate_arguments,
validator,
)
@@ -20,7 +20,13 @@
TaskType = Callable[..., Coroutine[Any, Any, Any]]

ProgressMessage = str
ProgressPercent = confloat(ge=0.0, le=1.0)


class ProgressPercent(ConstrainedFloat):
ge = 0.0
le = 1.0


ProgressCallback = Callable[[ProgressMessage, ProgressPercent, TaskId], Awaitable[None]]


@@ -41,8 +47,8 @@ class TaskProgress(BaseModel):
def update(
self,
*,
message: Optional[ProgressMessage] = None,
percent: Optional[ProgressPercent] = None,
message: ProgressMessage | None = None,
percent: ProgressPercent | None = None,
) -> None:
"""`percent` must be between 0.0 and 1.0 otherwise ValueError is raised"""
if message:
@@ -77,7 +83,7 @@ class TrackedTask(BaseModel):
)

started: datetime = Field(default_factory=datetime.utcnow)
last_status_check: Optional[datetime] = Field(
last_status_check: datetime | None = Field(
default=None,
description=(
"used to detect when if the task is not actively "
@@ -96,8 +102,8 @@ class TaskStatus(BaseModel):


class TaskResult(BaseModel):
result: Optional[Any]
error: Optional[Any]
result: Any | None
error: Any | None


class ClientConfiguration(BaseModel):
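Swapping `confloat(ge=0.0, le=1.0)` for an explicit `ConstrainedFloat` subclass keeps the same runtime validation but gives static type checkers a real class to resolve, whereas `confloat` builds a type at runtime that pylance/mypy cannot follow. A minimal sketch; `_Status` is a hypothetical model for illustration:

```python
from pydantic import BaseModel, ConstrainedFloat, ValidationError


class ProgressPercent(ConstrainedFloat):
    ge = 0.0
    le = 1.0


class _Status(BaseModel):  # hypothetical model, for illustration only
    percent: ProgressPercent


assert _Status(percent=0.5).percent == 0.5
try:
    _Status(percent=1.5)  # constraints are enforced when used in a model
except ValidationError:
    pass
```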
@@ -46,7 +46,7 @@ def _compose_job_resource_name(solver_key, solver_version, job_id) -> str:
)


## JOBS ---------------
# JOBS ---------------
#
# - Similar to docker container's API design (container = job and image = solver)
#
@@ -224,7 +224,7 @@ async def inspect_job(
job_id: UUID,
user_id: PositiveInt = Depends(get_current_user_id),
director2_api: DirectorV2Api = Depends(get_api_client(DirectorV2Api)),
):
) -> JobStatus:
job_name = _compose_job_resource_name(solver_key, version, job_id)
_logger.debug("Inspecting Job '%s'", job_name)
