✨Computational backend: connect dv2 to clusters keeper for on-demand clusters (🗃️ + ⚠️devops) #4703

Merged
83 commits
9b895c4  no json (sanderegg, Sep 11, 2023)
00a323d  fix test (sanderegg, Sep 11, 2023)
6c5bbf2  add boolean in comp_runs (sanderegg, Sep 4, 2023)
a17aa80  refactoring (sanderegg, Sep 4, 2023)
45cd4dc  pass pipeline params (sanderegg, Sep 4, 2023)
b1c74be  typing (sanderegg, Sep 4, 2023)
9d65028  refactor (sanderegg, Sep 4, 2023)
71e3705  added RPC client (sanderegg, Sep 4, 2023)
671c5c1  pass rabbitmq rpc client into scheduler (sanderegg, Sep 4, 2023)
77ea2d4  moved cluster model into rpc_schemas (sanderegg, Sep 4, 2023)
48590ba  add temporary variable to simulate default wallet (sanderegg, Sep 4, 2023)
649baed  moved cluster definition (sanderegg, Sep 4, 2023)
f72285c  added ad-hoc errors for on demand backend (sanderegg, Sep 4, 2023)
99d5c46  initial connections to on demand backend (sanderegg, Sep 4, 2023)
2b0e7a3  keep the timeout for now (sanderegg, Sep 4, 2023)
6ac329a  remove confusing log (sanderegg, Sep 4, 2023)
c8961a9  fixed returned url (sanderegg, Sep 4, 2023)
8b41403  fake cluster creation (sanderegg, Sep 4, 2023)
c21d736  properly count tasks (sanderegg, Sep 4, 2023)
8e1c252  moved calls to publish messages (sanderegg, Sep 4, 2023)
e3279b1  refactor (sanderegg, Sep 4, 2023)
b6b502f  pass some info (sanderegg, Sep 4, 2023)
b00a421  fix merge (sanderegg, Sep 4, 2023)
9986e07  testing body parameters (sanderegg, Sep 4, 2023)
114b829  refactor (sanderegg, Sep 4, 2023)
8ecbd5f  rename (sanderegg, Sep 4, 2023)
8d1f7c6  force usage of on-demand clusters for testing (sanderegg, Sep 4, 2023)
06c6cee  added ENV variables (sanderegg, Sep 4, 2023)
0f50722  mypy (sanderegg, Sep 4, 2023)
eeb3af5  fixed unit tests (sanderegg, Sep 4, 2023)
1f72299  fix test (sanderegg, Sep 4, 2023)
90d368e  hack 1 (sanderegg, Sep 4, 2023)
781a721  hack 2 (sanderegg, Sep 4, 2023)
404fa9c  return waiting for resources if the cluster is not ready yet (sanderegg, Sep 4, 2023)
4f8337c  fix test (sanderegg, Sep 4, 2023)
cbb7ced  fix (sanderegg, Sep 11, 2023)
563d79a  fix merge (sanderegg, Sep 11, 2023)
afa9dda  causes problem as this is already installed by the clusters-keeper in… (sanderegg, Sep 11, 2023)
7183f2d  better logs (sanderegg, Sep 11, 2023)
8c0b014  also check for retrials from the dask-gateway (sanderegg, Sep 11, 2023)
6fac408  ensure the configuration is done correctly (sanderegg, Sep 11, 2023)
7f3c516  correct merge (sanderegg, Sep 11, 2023)
cbaa122  fixed last merge (sanderegg, Sep 11, 2023)
f2aaceb  ruff (sanderegg, Sep 11, 2023)
37d0e16  ruff (sanderegg, Sep 11, 2023)
4a6203a  ruff (sanderegg, Sep 11, 2023)
41cec54  add new cluster type (sanderegg, Sep 11, 2023)
d58036a  without wallet (sanderegg, Sep 11, 2023)
a68a99e  remove wallet hack (sanderegg, Sep 11, 2023)
1849e7b  Revert "add new cluster type" (sanderegg, Sep 11, 2023)
bd7b482  use the url as unique identifier in the pool (sanderegg, Sep 11, 2023)
61c8d43  we do not require the ID anymore (sanderegg, Sep 11, 2023)
8aab819  refactor (sanderegg, Sep 11, 2023)
259b96e  ruff (sanderegg, Sep 11, 2023)
245bafd  move out clusters-keeper call (sanderegg, Sep 11, 2023)
6eb94ad  ruff (sanderegg, Sep 11, 2023)
1bdc68c  more logs (sanderegg, Sep 11, 2023)
19293a2  fixed logo (sanderegg, Sep 11, 2023)
4a0e7fc  reduce noisy libs (sanderegg, Sep 11, 2023)
2f0571d  improve logs (sanderegg, Sep 11, 2023)
c18b150  improve logs (sanderegg, Sep 11, 2023)
b1fac88  added function to wait for workers (sanderegg, Sep 11, 2023)
4e593e9  wait for workers after maximizing (sanderegg, Sep 11, 2023)
2c01155  set back to published (sanderegg, Sep 11, 2023)
e8a90f5  added waiting for cluster state (sanderegg, Sep 11, 2023)
26a0ff0  remove hack (sanderegg, Sep 11, 2023)
5a1b314  disable clusters-keeper by default (sanderegg, Sep 11, 2023)
57dcf09  ruff (sanderegg, Sep 11, 2023)
dce333f  linter (sanderegg, Sep 11, 2023)
95fd892  removed probably unnecessary method (sanderegg, Sep 11, 2023)
7623c5c  reverted (sanderegg, Sep 11, 2023)
81915a2  fix tests (sanderegg, Sep 11, 2023)
2a82edf  reverted (sanderegg, Sep 11, 2023)
13cc941  fix test (sanderegg, Sep 11, 2023)
0e460e8  fix variables (sanderegg, Sep 11, 2023)
4e07125  mypy (sanderegg, Sep 11, 2023)
bc97113  fix tests (sanderegg, Sep 11, 2023)
4c9e1a5  test fixed (sanderegg, Sep 11, 2023)
4e0629e  test fix? (sanderegg, Sep 11, 2023)
6a28e80  clean docker-compose (sanderegg, Sep 12, 2023)
31038e7  removed temporary variable (sanderegg, Sep 12, 2023)
5d58d9b  added test for rpc calls (sanderegg, Sep 12, 2023)
1af9f61  better handling of issues in comp scheduler (sanderegg, Sep 12, 2023)

3 changes: 3 additions & 0 deletions .env-devel
@@ -25,6 +25,9 @@ CATALOG_DEV_FEATURES_ENABLED=0
CATALOG_SERVICES_DEFAULT_RESOURCES='{"CPU": {"limit": 0.1, "reservation": 0.1}, "RAM": {"limit": 2147483648, "reservation": 2147483648}}'
CATALOG_SERVICES_DEFAULT_SPECIFICATIONS='{}'

CLUSTERS_KEEPER_MAX_MISSED_HEARTBEATS_BEFORE_CLUSTER_TERMINATION=5
CLUSTERS_KEEPER_TASK_INTERVAL=60

DASK_SCHEDULER_HOST=dask-scheduler
DASK_SCHEDULER_PORT=8786

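A minimal sketch of how these two new variables could be consumed, assuming a pydantic v1 BaseSettings class (pydantic v1 is what the validators below suggest); the class name and the unit comment are illustrative, not the actual clusters-keeper settings module:

from pydantic import BaseSettings, NonNegativeInt, PositiveInt

class _ClustersKeeperSettingsSketch(BaseSettings):
    # how many heartbeats a cluster may miss before the keeper terminates it
    CLUSTERS_KEEPER_MAX_MISSED_HEARTBEATS_BEFORE_CLUSTER_TERMINATION: NonNegativeInt = 5
    # interval between two runs of the keeper's periodic task (presumably seconds)
    CLUSTERS_KEEPER_TASK_INTERVAL: PositiveInt = 60

settings = _ClustersKeeperSettingsSketch()  # values are read from the environment
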
@@ -36,6 +36,10 @@ class ComputationCreate(BaseModel):
description="the computation shall use the cluster described by its id, 0 is the default cluster",
)
simcore_user_agent: str = ""
use_on_demand_clusters: bool = Field(
default=False,
description="if True, a cluster will be created as necessary (wallet_id cannot be None, and cluster_id must be None)",
)

@validator("product_name", always=True)
@classmethod
@@ -45,6 +49,14 @@ def ensure_product_name_defined_if_computation_starts(cls, v, values):
raise ValueError(msg)
return v

@validator("use_on_demand_clusters", always=True)
@classmethod
def ensure_expected_options(cls, v, values):
if v is True and ("cluster_id" in values and values["cluster_id"] is not None):
msg = "cluster_id cannot be set if use_on_demand_clusters is set"
raise ValueError(msg)
return v


class ComputationStop(BaseModel):
user_id: UserID
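The new validator on ComputationCreate makes cluster_id and use_on_demand_clusters mutually exclusive. A standalone repro of just that rule (pydantic v1, hypothetical model name):

from pydantic import BaseModel, ValidationError, validator

class _ComputationCreateSketch(BaseModel):
    cluster_id: int | None = None
    use_on_demand_clusters: bool = False

    @validator("use_on_demand_clusters", always=True)
    @classmethod
    def _ensure_exclusive_options(cls, v, values):
        # mirrors ensure_expected_options in the diff above
        if v is True and values.get("cluster_id") is not None:
            msg = "cluster_id cannot be set if use_on_demand_clusters is set"
            raise ValueError(msg)
        return v

_ComputationCreateSketch(use_on_demand_clusters=True)  # valid: on-demand, no cluster_id
try:
    _ComputationCreateSketch(cluster_id=0, use_on_demand_clusters=True)
except ValidationError as exc:
    print(exc)  # rejected: cluster_id cannot be set if use_on_demand_clusters is set
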
2 changes: 1 addition & 1 deletion packages/models-library/src/models_library/clusters.py
@@ -109,7 +109,7 @@ class BaseCluster(BaseModel):
type: ClusterTypeInModel
owner: GroupID
thumbnail: HttpUrl | None = Field(
None,
default=None,
description="url to the image describing this cluster",
examples=["https://placeimg.com/171/96/tech/grayscale/?0.jpg"],
)
@@ -26,13 +26,15 @@ class RunningState(str, Enum):
SUCCESS = "SUCCESS"
FAILED = "FAILED"
ABORTED = "ABORTED"
WAITING_FOR_CLUSTER = "WAITING_FOR_CLUSTER"

def is_running(self) -> bool:
return self in (
RunningState.PUBLISHED,
RunningState.PENDING,
RunningState.WAITING_FOR_RESOURCES,
RunningState.STARTED,
RunningState.WAITING_FOR_CLUSTER,
)


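The practical effect of the new member: a pipeline parked on WAITING_FOR_CLUSTER still counts as running, so clients keep polling while the on-demand cluster spins up. A condensed, runnable check (members as visible in this diff; the real enum may define more):

from enum import Enum

class RunningState(str, Enum):
    PUBLISHED = "PUBLISHED"
    PENDING = "PENDING"
    WAITING_FOR_RESOURCES = "WAITING_FOR_RESOURCES"
    STARTED = "STARTED"
    SUCCESS = "SUCCESS"
    FAILED = "FAILED"
    ABORTED = "ABORTED"
    WAITING_FOR_CLUSTER = "WAITING_FOR_CLUSTER"

    def is_running(self) -> bool:
        return self in (
            RunningState.PUBLISHED,
            RunningState.PENDING,
            RunningState.WAITING_FOR_RESOURCES,
            RunningState.STARTED,
            RunningState.WAITING_FOR_CLUSTER,
        )

assert RunningState.WAITING_FOR_CLUSTER.is_running()
assert not RunningState.FAILED.is_running()
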
@@ -0,0 +1,23 @@
from enum import auto

from pydantic import AnyUrl, BaseModel

from ..clusters import ClusterAuthentication
from ..users import UserID
from ..utils.enums import StrAutoEnum
from ..wallets import WalletID


class ClusterState(StrAutoEnum):
STARTED = auto()
RUNNING = auto()
STOPPED = auto()


class OnDemandCluster(BaseModel):
endpoint: AnyUrl
authentication: ClusterAuthentication
state: ClusterState
user_id: UserID
wallet_id: WalletID
gateway_ready: bool
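
For illustration, the kind of payload the clusters-keeper could hand back over RPC while the EC2 instance is booted but its embedded dask-gateway is not answering yet. All values here are made up, and SimpleAuthentication is one concrete ClusterAuthentication taken from models_library.clusters:

from models_library.clusters import SimpleAuthentication
from pydantic import AnyUrl, SecretStr, parse_obj_as

cluster = OnDemandCluster(
    endpoint=parse_obj_as(AnyUrl, "http://10.0.0.1"),
    authentication=SimpleAuthentication(username="42", password=SecretStr("a-password")),
    state=ClusterState.RUNNING,  # the EC2 instance reports "running"
    user_id=42,
    wallet_id=1,
    gateway_ready=False,  # caller should keep waiting before submitting tasks
)
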
@@ -0,0 +1,58 @@
"""add use on demand clusters in comp_runs

Revision ID: 2cd329e47ea1
Revises: 763666c698fb
Create Date: 2023-09-04 06:57:51.291084+00:00

"""
import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "2cd329e47ea1"
down_revision = "f53806935760"
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column(
"comp_runs", sa.Column("use_on_demand_clusters", sa.Boolean(), nullable=True)
)
# ### end Alembic commands ###
op.execute(
sa.DDL(
"UPDATE comp_runs SET use_on_demand_clusters = false WHERE use_on_demand_clusters IS NULL"
)
)

op.alter_column(
"comp_runs",
"use_on_demand_clusters",
existing_type=sa.Boolean(),
nullable=False,
)

# new statetype
op.execute(
sa.DDL("ALTER TYPE statetype ADD VALUE IF NOT EXISTS 'WAITING_FOR_CLUSTER'")
)


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column("comp_runs", "use_on_demand_clusters")
# ### end Alembic commands ###

# no need to downgrade the enum type: Postgres does not allow removing a value
# from an enum type; instead, the rows in the tables that use it are updated
op.execute(
sa.DDL(
"""
UPDATE comp_tasks SET state = 'PUBLISHED' WHERE state = 'WAITING_FOR_CLUSTER';
UPDATE comp_pipeline SET state = 'PUBLISHED' WHERE state = 'WAITING_FOR_CLUSTER';
UPDATE comp_runs SET result = 'PUBLISHED' WHERE result = 'WAITING_FOR_CLUSTER';
"""
)
)
@@ -23,6 +23,7 @@ class StateType(enum.Enum):
FAILED = "FAILED"
ABORTED = "ABORTED"
WAITING_FOR_RESOURCES = "WAITING_FOR_RESOURCES"
WAITING_FOR_CLUSTER = "WAITING_FOR_CLUSTER"


def _new_uuid():
@@ -100,5 +100,11 @@
doc="When the run was finished",
),
sa.Column("metadata", JSONB, nullable=True, doc="the run optional metadata"),
sa.Column(
"use_on_demand_clusters",
sa.Boolean(),
nullable=False,
doc="the run uses on demand clusters",
),
sa.UniqueConstraint("project_uuid", "user_id", "iteration"),
)
4 changes: 0 additions & 4 deletions services/clusters-keeper/docker/entrypoint.sh
@@ -63,10 +63,6 @@ if [ "${SC_BUILD_TARGET}" = "development" ]; then
fi
fi

if [ "${SC_BOOT_MODE}" = "debug-ptvsd" ]; then
# NOTE: production does NOT pre-installs ptvsd
pip install --no-cache-dir debugpy
fi

# Appends docker group if socket is mounted
DOCKER_MOUNT=/var/run/docker.sock
@@ -43,12 +43,12 @@ def get_summary() -> str:
_______ _ _______ _________ _______ _______ _______ _ _______ _______ _______ _______ _______
( ____ \( \ |\ /|( ____ \\__ __/( ____ \( ____ )( ____ \ | \ /\( ____ \( ____ \( ____ )( ____ \( ____ )
| ( \/| ( | ) ( || ( \/ ) ( | ( \/| ( )|| ( \/ | \ / /| ( \/| ( \/| ( )|| ( \/| ( )|
| | | | | | | || (_____ | | | (__ | (____)|| (_____ | (_/ / | (__ | (__ | (____)|| (__ | (____)|
| | | | | | | |(_____ ) | | | __) | __)(_____ ) | _ ( | __) | __) | _____)| __) | __)
| | | | | | | || (_____ | | | (__ | (____)|| (_____ _____ | (_/ / | (__ | (__ | (____)|| (__ | (____)|
| | | | | | | |(_____ ) | | | __) | __)(_____ )(_____)| _ ( | __) | __) | _____)| __) | __)
| | | | | | | | ) | | | | ( | (\ ( ) | | ( \ \ | ( | ( | ( | ( | (\ (
| (____/\| (____/\| (___) |/\____) | | | | (____/\| ) \ \__/\____) | | / \ \| (____/\| (____/\| ) | (____/\| ) \ \__
(_______/(_______/(_______)\_______) )_( (_______/|/ \__/\_______) _____ |_/ \/(_______/(_______/|/ (_______/|/ \__/
(_____) {}
(_______/(_______/(_______)\_______) )_( (_______/|/ \__/\_______) |_/ \/(_______/(_______/|/ (_______/|/ \__/
{}
""".format(
f"v{__version__}"
)
@@ -1,13 +1,8 @@
import datetime
from dataclasses import dataclass
from enum import auto
from typing import TypeAlias

from models_library.clusters import ClusterAuthentication, SimpleAuthentication
from models_library.users import UserID
from models_library.utils.enums import StrAutoEnum
from models_library.wallets import WalletID
from pydantic import AnyUrl, BaseModel, ByteSize, PositiveInt, SecretStr, parse_obj_as
from pydantic import ByteSize, PositiveInt
from types_aiobotocore_ec2.literals import InstanceStateNameType, InstanceTypeType


@@ -31,48 +26,3 @@ class EC2InstanceData:
type: InstanceTypeType # noqa: A003
state: InstanceStateNameType
tags: EC2Tags


class ClusterState(StrAutoEnum):
STARTED = auto()
RUNNING = auto()
STOPPED = auto()


def _convert_ec2_state_to_cluster_state(
ec2_state: InstanceStateNameType,
) -> ClusterState:
match ec2_state:
case "pending":
return ClusterState.STARTED # type: ignore
case "running":
return ClusterState.RUNNING # type: ignore
case _:
return ClusterState.STOPPED # type: ignore


class ClusterGet(BaseModel):
endpoint: AnyUrl
authentication: ClusterAuthentication
state: ClusterState
user_id: UserID
wallet_id: WalletID
gateway_ready: bool = False

@classmethod
def from_ec2_instance_data(
cls,
instance: EC2InstanceData,
user_id: UserID,
wallet_id: WalletID,
gateway_password: SecretStr,
) -> "ClusterGet":
return cls(
endpoint=parse_obj_as(AnyUrl, f"http://{instance.aws_public_ip}"),
authentication=SimpleAuthentication(
username=f"{user_id}", password=gateway_password
),
state=_convert_ec2_state_to_cluster_state(instance.state),
user_id=user_id,
wallet_id=wallet_id,
)
@@ -80,7 +80,7 @@ async def cluster_heartbeat(

async def set_instance_heartbeat(app: FastAPI, *, instance: EC2InstanceData) -> None:
with log_context(
_logger, logging.INFO, msg=f"set instance heartbeat for {instance.id}"
_logger, logging.DEBUG, msg=f"set instance heartbeat for {instance.id}"
):
ec2_client = get_ec2_client(app)
await ec2_client.set_instances_tags(
@@ -30,17 +30,22 @@ async def _find_terminateable_instances(
# get the corresponding ec2 instance data
terminateable_instances: list[EC2InstanceData] = []

time_to_wait_before_termination = (
app_settings.CLUSTERS_KEEPER_MAX_MISSED_HEARTBEATS_BEFORE_CLUSTER_TERMINATION
* app_settings.SERVICE_TRACKING_HEARTBEAT
)
for instance in instances:
last_heartbeat = _get_instance_last_heartbeat(instance)

elapsed_time_since_heartbeat = (
datetime.datetime.now(datetime.timezone.utc) - last_heartbeat
)

if elapsed_time_since_heartbeat >= (
app_settings.CLUSTERS_KEEPER_MAX_MISSED_HEARTBEATS_BEFORE_CLUSTER_TERMINATION
* app_settings.SERVICE_TRACKING_HEARTBEAT
):
_logger.info(
"%s has still %ss before being terminateable",
f"{instance.id=}",
f"{(time_to_wait_before_termination - elapsed_time_since_heartbeat).total_seconds()}",
)
if elapsed_time_since_heartbeat >= time_to_wait_before_termination:
# let's terminate that one
terminateable_instances.append(instance)

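The refactor hoists the grace-period computation out of the loop and logs, per instance, how long until it becomes terminateable. In numbers, assuming SERVICE_TRACKING_HEARTBEAT is 60 seconds (its actual default lives in the service settings, not in this diff):

import datetime

max_missed = 5  # CLUSTERS_KEEPER_MAX_MISSED_HEARTBEATS_BEFORE_CLUSTER_TERMINATION
heartbeat_interval = datetime.timedelta(seconds=60)  # SERVICE_TRACKING_HEARTBEAT (assumed)
time_to_wait_before_termination = max_missed * heartbeat_interval  # 5 minutes

now = datetime.datetime.now(datetime.timezone.utc)
last_heartbeat = now - datetime.timedelta(minutes=3)
elapsed_time_since_heartbeat = now - last_heartbeat

# 3 minutes elapsed < 5 minutes grace period: not terminateable for another 120s
assert elapsed_time_since_heartbeat < time_to_wait_before_termination
remaining = time_to_wait_before_termination - elapsed_time_since_heartbeat
print(f"{remaining.total_seconds()}s before being terminateable")
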
@@ -1,6 +1,7 @@
import asyncio
import logging
from typing import Any, Coroutine, Final
from collections.abc import Coroutine
from typing import Any, Final

import dask_gateway
from aiohttp.client_exceptions import ClientError
@@ -22,15 +23,19 @@ async def ping_gateway(*, url: AnyUrl, password: SecretStr) -> bool:
auth=basic_auth,
asynchronous=True,
) as gateway:
cluster_reports = await asyncio.wait_for(gateway.list_clusters(), timeout=5)
_logger.info("found %s clusters", len(cluster_reports))
await asyncio.wait_for(gateway.list_clusters(), timeout=5)
return True
except asyncio.TimeoutError:
_logger.debug("gateway ping timed-out, it is still starting...")
except ClientError:
_logger.info(
"osparc-gateway %s ping timed-out, the machine is likely still starting...",
url,
)
except (ClientError, ValueError):
# this could happen if the gateway is not properly started, but it should not last
# unless the wrong password is used.
_logger.info("dask-gateway is not reachable", exc_info=True)
_logger.info(
"Machine is up but osparc-gateway %s is not reachable...yet?!", url
)

return False
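
A possible caller-side pattern (hypothetical helper, not part of this diff): since ping_gateway swallows timeouts and client errors and simply returns False while the machine or its gateway is still starting, callers can poll it until it flips to True:

import asyncio
from pydantic import AnyUrl, SecretStr

async def wait_until_gateway_ready(url: AnyUrl, password: SecretStr) -> None:
    # ping_gateway (above) returns False while the gateway is still starting
    while not await ping_gateway(url=url, password=password):
        await asyncio.sleep(5)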

@@ -66,9 +71,20 @@ async def is_gateway_busy(*, url: AnyUrl, gateway_auth: SimpleAuthentication) ->
client.list_datasets()
)
_logger.info(
"cluster currently has %s datasets, it is %s",
len(datasets_on_scheduler),
"BUSY" if len(datasets_on_scheduler) > 0 else "NOT BUSY",
"cluster currently has %s datasets", len(datasets_on_scheduler)
)
currently_processing = await _wrap_client_async_routine(client.processing())
return bool(datasets_on_scheduler or currently_processing)
num_processing_tasks = 0
if worker_to_processing_tasks := await _wrap_client_async_routine(
client.processing()
):
_logger.info(
"cluster current workers: %s", worker_to_processing_tasks.keys()
)
num_processing_tasks = sum(
len(tasks) for tasks in worker_to_processing_tasks.values()
)
_logger.info(
"cluster currently processes %s tasks", num_processing_tasks
)

return bool(datasets_on_scheduler or num_processing_tasks)
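
The busy test now counts in-flight tasks across all workers instead of logging only dataset counts. A toy check of the final expression:

# no datasets published, but one worker still has two tasks in processing
datasets_on_scheduler = ()
worker_to_processing_tasks = {"tcp://10.0.0.2:40001": ["task-a", "task-b"]}
num_processing_tasks = sum(len(tasks) for tasks in worker_to_processing_tasks.values())
assert bool(datasets_on_scheduler or num_processing_tasks)  # busy: keep the cluster alive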