Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

♻️On-demand clusters: remove dask-gateway #4760

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
70edd93
adding docker-compose
sanderegg Sep 15, 2023
befa276
add infos
sanderegg Sep 15, 2023
6937052
ensure docker-compose.yml is installed
sanderegg Sep 15, 2023
cd63cc6
ensure config folder is read/write
sanderegg Sep 15, 2023
1efa285
improve boot log
sanderegg Sep 15, 2023
18a46b1
more explicit banner
sanderegg Sep 15, 2023
9c72f67
prepare docker-compose
sanderegg Sep 15, 2023
b2f571f
we show some sensible startup
sanderegg Sep 15, 2023
31088d4
have some tests
sanderegg Sep 15, 2023
6fc0a11
improve for testing
sanderegg Sep 15, 2023
1cef68d
added test to check for docker-compose presence
sanderegg Sep 15, 2023
959de93
deploy a dask scheduler stack
sanderegg Sep 15, 2023
5e0fe36
return the scheduler url
sanderegg Sep 15, 2023
e89f8de
pass the scheduler
sanderegg Sep 15, 2023
af6d4c4
use dask scheduler
sanderegg Sep 15, 2023
c393259
add functions for scheduler
sanderegg Sep 15, 2023
78403b1
removing dask-gateway
sanderegg Sep 15, 2023
ec98a03
removed dask-gateway
sanderegg Sep 17, 2023
da83b00
removed dask-gateway-server
sanderegg Sep 17, 2023
de7a45c
en voie
sanderegg Sep 17, 2023
b1b0f41
moving fixtures
sanderegg Sep 17, 2023
a1b6f2c
moving fixtures
sanderegg Sep 17, 2023
aa5baa9
testing with dask scheduler now works
sanderegg Sep 18, 2023
a876757
renaming from gateway to dask-scheduler
sanderegg Sep 18, 2023
8d7c382
renaming
sanderegg Sep 18, 2023
1c6aed9
renaming
sanderegg Sep 18, 2023
22c852c
might work out of the box
sanderegg Sep 18, 2023
95d4200
give it a name
sanderegg Sep 18, 2023
1c2a591
ensure we have access
sanderegg Sep 18, 2023
f59ead8
pass the swarm stack name and better name for server
sanderegg Sep 18, 2023
ccbf177
adjusted time
sanderegg Sep 18, 2023
e4bf114
@pcrespov review
sanderegg Sep 18, 2023
5a0086a
@pcrespov review
sanderegg Sep 18, 2023
fa33094
cleanup
sanderegg Sep 18, 2023
c4fc956
ensure we do not stop the pipeline if it is an on-demand cluster
sanderegg Sep 19, 2023
0d504b8
that should be correct
sanderegg Sep 19, 2023
545012c
final fix
sanderegg Sep 19, 2023
49eea61
ensure call into RPC is safe
sanderegg Sep 19, 2023
114bfd2
somehow the enum is in str
sanderegg Sep 19, 2023
cb3f577
fixed tests
sanderegg Sep 19, 2023
3df48a9
cleanup
sanderegg Sep 19, 2023
90e37ce
testing coverage is now good
sanderegg Sep 19, 2023
94de411
fixed test
sanderegg Sep 19, 2023
e7b2482
fix test
sanderegg Sep 19, 2023
28a4870
fix test
sanderegg Sep 19, 2023
dcbb5cf
typo
sanderegg Sep 19, 2023
6a43354
ensure stopping works
sanderegg Sep 19, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ class OnDemandCluster(BaseModel):
state: ClusterState
user_id: UserID
wallet_id: WalletID | None
gateway_ready: bool
dask_scheduler_ready: bool
eta: datetime.timedelta
90 changes: 90 additions & 0 deletions packages/pytest-simcore/src/pytest_simcore/dask_scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# pylint: disable=redefined-outer-name
# pylint: disable=unused-argument
# pylint: disable=unused-variable


from collections.abc import AsyncIterable, Callable
from typing import Any, AsyncIterator

import distributed
import pytest
from yarl import URL


@pytest.fixture
def dask_workers_config() -> dict[str, Any]:
    """Worker specs for the test cluster: a CPU worker, a GPU worker and a
    large-RAM worker, each with explicit dask resource limits."""
    worker_cls = distributed.Worker
    return {
        "cpu-worker": {
            "cls": worker_cls,
            "options": {"nthreads": 2, "resources": {"CPU": 2, "RAM": 48e9}},
        },
        "gpu-worker": {
            "cls": worker_cls,
            "options": {
                "nthreads": 1,
                "resources": {"CPU": 1, "GPU": 1, "RAM": 48e9},
            },
        },
        "large-ram-worker": {
            "cls": worker_cls,
            "options": {
                "nthreads": 1,
                "resources": {"CPU": 8, "RAM": 768e9},
            },
        },
    }


@pytest.fixture
def dask_scheduler_config(
    unused_tcp_port_factory: Callable,
) -> dict[str, Any]:
    """Scheduler spec bound to two free TCP ports (scheduler + dashboard)."""
    scheduler_port = unused_tcp_port_factory()
    dashboard_port = unused_tcp_port_factory()
    return {
        "cls": distributed.Scheduler,
        "options": {
            "port": scheduler_port,
            "dashboard_address": f":{dashboard_port}",
        },
    }


@pytest.fixture
async def dask_spec_local_cluster(
    monkeypatch: pytest.MonkeyPatch,
    dask_workers_config: dict[str, Any],
    dask_scheduler_config: dict[str, Any],
) -> AsyncIterable[distributed.SpecCluster]:
    """Starts an in-process dask cluster with exactly the requested
    scheduler and workers, and points COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_URL
    at the new scheduler for the duration of the test.

    Yields the running ``distributed.SpecCluster``.
    """
    # in this mode we can precisely create a specific cluster
    async with distributed.SpecCluster(
        workers=dask_workers_config,
        scheduler=dask_scheduler_config,
        asynchronous=True,
        name="pytest_cluster",
    ) as cluster:
        # NOTE: the previous `f"{url}" or "invalid"` relied on string
        # truthiness; spell out the fallback for an empty address instead
        scheduler_url = (
            f"{URL(cluster.scheduler_address)}"
            if cluster.scheduler_address
            else "invalid"
        )
        monkeypatch.setenv(
            "COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_URL",
            scheduler_url,
        )
        yield cluster


@pytest.fixture
async def dask_spec_cluster_client(
    dask_spec_local_cluster: distributed.SpecCluster,
) -> AsyncIterator[distributed.Client]:
    """Yields an asynchronous dask client connected to the spec cluster."""
    scheduler_address = dask_spec_local_cluster.scheduler_address
    async with distributed.Client(scheduler_address, asynchronous=True) as the_client:
        yield the_client
21 changes: 20 additions & 1 deletion services/clusters-keeper/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,29 @@ include ../../scripts/common-service.Makefile



# NOTE: dropped stale `.PHONY: test-local` — no such target exists in this file
.PHONY: up-devel down
up-devel: .env ## starts local test application (running bare metal against AWS)
	# setting up dependencies
	@docker compose up

down: .env ## stops local test app dependencies (running bare metal against AWS)
	-@docker compose down


# FIX: was `PHONY: .init-swarm` (missing leading dot), which defines a target
# literally named `PHONY` instead of marking .init-swarm as phony
.PHONY: .init-swarm
.init-swarm:
	# Ensures swarm is initialized
	$(if $(SWARM_HOSTS),,docker swarm init --advertise-addr=$(get_my_ip))


# FIX: `.PHONY: test-dask-schduler-deploy` was typo'd and named a nonexistent
# target; declare the actual targets phony so they always run
.PHONY: deploy-dask-stack down-dask-stack
deploy-dask-stack: .init-swarm ## deploy the dask stack for local testing
	# using local/dask-sidecar:production images
	@DOCKER_IMAGE_TAG=production \
	DOCKER_REGISTRY=local \
	LOG_LEVEL=INFO \
	docker stack deploy --with-registry-auth --compose-file=src/simcore_service_clusters_keeper/data/docker-compose.yml osparc_dask_stack

down-dask-stack: ## removes the dask stack
	# stopping dask stack
	-@docker stack rm osparc_dask_stack
1 change: 0 additions & 1 deletion services/clusters-keeper/requirements/_base.in
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

aioboto3
dask[distributed]
dask-gateway
fastapi
packaging
types-aiobotocore[ec2]
8 changes: 0 additions & 8 deletions services/clusters-keeper/requirements/_base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ aiohttp==3.8.5
# -c requirements/../../../requirements/constraints.txt
# aiobotocore
# aiodocker
# dask-gateway
aioitertools==0.11.0
# via aiobotocore
aiormq==6.7.7
Expand Down Expand Up @@ -92,7 +91,6 @@ click==8.1.7
# via
# -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt
# dask
# dask-gateway
# distributed
# typer
# uvicorn
Expand All @@ -105,15 +103,11 @@ dask==2023.3.2
# via
# -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt
# -r requirements/_base.in
# dask-gateway
# distributed
dask-gateway==2023.1.1
# via -r requirements/_base.in
distributed==2023.3.2
# via
# -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt
# dask
# dask-gateway
dnspython==2.4.2
# via email-validator
email-validator==2.0.0.post2
Expand Down Expand Up @@ -280,7 +274,6 @@ pyyaml==6.0.1
# -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt
# -r requirements/../../../packages/service-library/requirements/_base.in
# dask
# dask-gateway
# distributed
redis==5.0.0
# via
Expand Down Expand Up @@ -354,7 +347,6 @@ toolz==0.12.0
tornado==6.3.3
# via
# -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt
# dask-gateway
# distributed
tqdm==4.66.1
# via
Expand Down
1 change: 0 additions & 1 deletion services/clusters-keeper/requirements/_test.in
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
aiodocker
asgi-lifespan
coverage
dask-gateway-server[local]
debugpy
deepdiff
docker
Expand Down
14 changes: 0 additions & 14 deletions services/clusters-keeper/requirements/_test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ aiohttp==3.8.5
# -c requirements/../../../requirements/constraints.txt
# -c requirements/_base.txt
# aiodocker
# dask-gateway-server
aiosignal==1.3.1
# via
# -c requirements/_base.txt
Expand Down Expand Up @@ -75,21 +74,16 @@ click==8.1.7
# via
# -c requirements/_base.txt
# flask
colorlog==6.7.0
# via dask-gateway-server
coverage==7.3.1
# via
# -r requirements/_test.in
# pytest-cov
cryptography==41.0.3
# via
# -c requirements/../../../requirements/constraints.txt
# dask-gateway-server
# moto
# python-jose
# sshpubkeys
dask-gateway-server==2023.1.1
# via -r requirements/_test.in
debugpy==1.8.0
# via -r requirements/_test.in
deepdiff==6.5.0
Expand Down Expand Up @@ -125,8 +119,6 @@ frozenlist==1.4.0
# aiosignal
graphql-core==3.2.3
# via moto
greenlet==2.0.2
# via sqlalchemy
h11==0.14.0
# via
# -c requirements/_base.txt
Expand Down Expand Up @@ -339,10 +331,6 @@ sortedcontainers==2.4.0
# via
# -c requirements/_base.txt
# fakeredis
sqlalchemy==1.4.49
# via
# -c requirements/../../../requirements/constraints.txt
# dask-gateway-server
sshpubkeys==3.3.1
# via moto
sympy==1.12
Expand All @@ -351,8 +339,6 @@ tomli==2.0.1
# via
# coverage
# pytest
traitlets==5.9.0
# via dask-gateway-server
types-pyyaml==6.0.12.11
# via responses
typing-extensions==4.7.1
Expand Down
1 change: 1 addition & 0 deletions services/clusters-keeper/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def read_reqs(reqs_path: Path) -> set[str]:
package_dir={
"": "src",
},
package_data={"": ["data/*.yml"]},
include_package_data=True,
install_requires=PROD_REQUIREMENTS,
test_suite="tests",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

"""
from contextlib import suppress
from pathlib import Path
from typing import Final

import pkg_resources
Expand Down Expand Up @@ -35,7 +36,11 @@ def get_summary() -> str:


SUMMARY: Final[str] = get_summary()

PACKAGE_DATA_FOLDER: Final[Path] = Path(
pkg_resources.resource_filename(
_current_distribution.project_name.replace("-", "_"), "data"
)
)

# https://patorjk.com/software/taag/#p=testall&f=Avatar&t=clusters_keeper
APP_STARTED_BANNER_MSG = r"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,7 @@
LogLevel,
VersionTag,
)
from pydantic import (
Field,
NonNegativeInt,
PositiveInt,
SecretStr,
parse_obj_as,
validator,
)
from pydantic import Field, NonNegativeInt, PositiveInt, parse_obj_as, validator
from settings_library.base import BaseCustomSettings
from settings_library.docker_registry import RegistrySettings
from settings_library.rabbit import RabbitSettings
Expand Down Expand Up @@ -168,13 +161,12 @@ class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings):
description="defines the image tag to use for the computational backend",
)

CLUSTERS_KEEPER_COMPUTATIONAL_BACKEND_GATEWAY_PASSWORD: SecretStr = Field(
default=SecretStr("my_secure_P1ssword"),
description="very secure password, should change soon",
SWARM_STACK_NAME: str = Field(
..., description="Stack name defined upon deploy (see main Makefile)"
)

@cached_property
def LOG_LEVEL(self): # noqa: N802
def LOG_LEVEL(self) -> LogLevel: # noqa: N802
return self.CLUSTERS_KEEPER_LOGLEVEL

@validator("CLUSTERS_KEEPER_LOGLEVEL")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
version: "3.8"
services:
  # one dask-sidecar worker per swarm node (deploy mode: global)
  dask-sidecar:
    image: ${DOCKER_REGISTRY:-itisfoundation}/dask-sidecar:${DOCKER_IMAGE_TAG}
    init: true
    # templated hostname: one distinct name per node/service instance
    hostname: "{{.Node.Hostname}}-{{.Service.Name}}"
    volumes:
      - computational_shared_data:${SIDECAR_COMP_SERVICES_SHARED_FOLDER:-/home/scu/computational_shared_data}
      # read-only docker socket so the sidecar can inspect the host docker engine
      - /var/run/docker.sock:/var/run/docker.sock:ro
    environment:
      DASK_LOG_FORMAT_LOCAL_DEV_ENABLED: 1
      DASK_NPROCS: 1
      # defaults to the scheduler service of this same stack
      DASK_SCHEDULER_URL: ${DASK_SCHEDULER_URL:-tcp://dask-scheduler:8786}
      DASK_SIDECAR_NON_USABLE_RAM: 0
      DASK_SIDECAR_NUM_NON_USABLE_CPUS: 0
      LOG_LEVEL: ${LOG_LEVEL:-WARNING}
      SIDECAR_COMP_SERVICES_SHARED_FOLDER: ${SIDECAR_COMP_SERVICES_SHARED_FOLDER:-/home/scu/computational_shared_data}
      SIDECAR_COMP_SERVICES_SHARED_VOLUME_NAME: computational_shared_data
    networks:
      - default
    deploy:
      mode: global

  # single dask scheduler (same image as the sidecar, started in scheduler mode)
  dask-scheduler:
    image: ${DOCKER_REGISTRY:-itisfoundation}/dask-sidecar:${DOCKER_IMAGE_TAG}
    init: true
    hostname: "{{.Node.Hostname}}-{{.Service.Name}}-{{.Task.Slot}}"
    environment:
      # switches the dask-sidecar image into scheduler mode
      DASK_START_AS_SCHEDULER: 1
      LOG_LEVEL: ${LOG_LEVEL:-WARNING}
    ports:
      # 8786: scheduler TCP endpoint, 8787: dashboard
      - 8786:8786
      - 8787:8787
    networks:
      - default

volumes:
  # shared scratch space between scheduler and all sidecars
  computational_shared_data:
    name: computational_shared_data

networks:
  default:
    # attachable so other containers can join for debugging/testing
    attachable: true
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
from ..core.settings import get_application_settings
from ..models import EC2InstanceData
from ..modules.clusters import delete_clusters, get_all_clusters, set_instance_heartbeat
from ..utils.dask import get_gateway_authentication, get_gateway_url
from ..utils.ec2 import HEARTBEAT_TAG_KEY, get_user_id_from_tags
from .dask import is_gateway_busy, ping_gateway
from ..utils.dask import get_scheduler_url
from ..utils.ec2 import HEARTBEAT_TAG_KEY
from .dask import is_scheduler_busy, ping_scheduler

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -54,23 +54,13 @@ async def _find_terminateable_instances(

async def check_clusters(app: FastAPI) -> None:
instances = await get_all_clusters(app)
app_settings = get_application_settings(app)
connected_intances = [
instance
for instance in instances
if await ping_gateway(
url=get_gateway_url(instance),
password=app_settings.CLUSTERS_KEEPER_COMPUTATIONAL_BACKEND_GATEWAY_PASSWORD,
)
if await ping_scheduler(get_scheduler_url(instance))
]
for instance in connected_intances:
is_busy = await is_gateway_busy(
url=get_gateway_url(instance),
gateway_auth=get_gateway_authentication(
user_id=get_user_id_from_tags(instance.tags),
password=app_settings.CLUSTERS_KEEPER_COMPUTATIONAL_BACKEND_GATEWAY_PASSWORD,
),
)
is_busy = await is_scheduler_busy(get_scheduler_url(instance))
_logger.info(
"%s currently %s",
f"{instance.id=} for {instance.tags=}",
Expand Down
Loading