Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨Enable on-demand clusters based on groups extra properties (⚠️ devops) #4738

11 changes: 10 additions & 1 deletion .env-devel
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,17 @@ CATALOG_DEV_FEATURES_ENABLED=0
CATALOG_SERVICES_DEFAULT_RESOURCES='{"CPU": {"limit": 0.1, "reservation": 0.1}, "RAM": {"limit": 2147483648, "reservation": 2147483648}}'
CATALOG_SERVICES_DEFAULT_SPECIFICATIONS='{}'

CLUSTERS_KEEPER_EC2_ACCESS_KEY_ID=XXXXXXXXXXXX
CLUSTERS_KEEPER_EC2_ENDPOINT=https://ec2.amazonaws.com
CLUSTERS_KEEPER_EC2_INSTANCES_ALLOWED_TYPES="[\"t2.micro\"]"
CLUSTERS_KEEPER_EC2_INSTANCES_AMI_ID=XXXXXXXXXXXXX
CLUSTERS_KEEPER_EC2_INSTANCES_KEY_NAME=XXXXXXXXXXX
CLUSTERS_KEEPER_EC2_INSTANCES_SECURITY_GROUP_IDS="[\"\"]"
CLUSTERS_KEEPER_EC2_INSTANCES_SUBNET_ID=XXXXXXXXXX
CLUSTERS_KEEPER_EC2_REGION_NAME=us-east-1
CLUSTERS_KEEPER_EC2_SECRET_ACCESS_KEY=XXXXXXXXXXXXXXX
CLUSTERS_KEEPER_MAX_MISSED_HEARTBEATS_BEFORE_CLUSTER_TERMINATION=5
CLUSTERS_KEEPER_TASK_INTERVAL=60
CLUSTERS_KEEPER_TASK_INTERVAL=30

DASK_SCHEDULER_HOST=dask-scheduler
DASK_SCHEDULER_PORT=8786
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class NodeMessageBase(ProjectMessageBase):

class LoggerRabbitMessage(RabbitMessageBase, NodeMessageBase):
channel_name: Literal["simcore.services.logs.v2"] = "simcore.services.logs.v2"
node_id: NodeID | None
messages: list[LogMessageStr]
log_level: LogLevelInt = logging.INFO

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
from enum import auto

from pydantic import AnyUrl, BaseModel
Expand All @@ -21,3 +22,4 @@ class OnDemandCluster(BaseModel):
user_id: UserID
wallet_id: WalletID
gateway_ready: bool
eta: datetime.timedelta
14 changes: 7 additions & 7 deletions services/clusters-keeper/.env-devel
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ CLUSTERS_KEEPER_DEBUG=true
CLUSTERS_KEEPER_LOGLEVEL=INFO
CLUSTERS_KEEPER_MAX_MISSED_HEARTBEATS_BEFORE_CLUSTER_TERMINATION=60
CLUSTERS_KEEPER_TASK_INTERVAL=30
EC2_ACCESS_KEY_ID=XXXXXXXXXX
EC2_INSTANCES_ALLOWED_TYPES="[\"t2.micro\"]"
EC2_INSTANCES_AMI_ID=XXXXXXXXXX
EC2_INSTANCES_KEY_NAME=XXXXXXXXXX
EC2_INSTANCES_SECURITY_GROUP_IDS=XXXXXXXXXX
EC2_INSTANCES_SUBNET_ID=XXXXXXXXXX
EC2_SECRET_ACCESS_KEY=XXXXXXXXXX
CLUSTERS_KEEPER_EC2_ACCESS_KEY_ID=XXXXXXXXXX
CLUSTERS_KEEPER_EC2_INSTANCES_ALLOWED_TYPES="[\"t2.micro\"]"
CLUSTERS_KEEPER_EC2_INSTANCES_AMI_ID=XXXXXXXXXX
CLUSTERS_KEEPER_EC2_INSTANCES_KEY_NAME=XXXXXXXXXX
CLUSTERS_KEEPER_EC2_INSTANCES_SECURITY_GROUP_IDS=XXXXXXXXXX
CLUSTERS_KEEPER_EC2_INSTANCES_SUBNET_ID=XXXXXXXXXX
CLUSTERS_KEEPER_EC2_SECRET_ACCESS_KEY=XXXXXXXXXX
LOG_FORMAT_LOCAL_DEV_ENABLED=True
RABBIT_HOST=rabbit
RABBIT_PASSWORD=test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,63 +28,63 @@


class EC2Settings(BaseCustomSettings):
EC2_ACCESS_KEY_ID: str
EC2_ENDPOINT: str | None = Field(
CLUSTERS_KEEPER_EC2_ACCESS_KEY_ID: str
CLUSTERS_KEEPER_EC2_ENDPOINT: str | None = Field(
default=None, description="do not define if using standard AWS"
)
EC2_REGION_NAME: str = "us-east-1"
EC2_SECRET_ACCESS_KEY: str
CLUSTERS_KEEPER_EC2_REGION_NAME: str = "us-east-1"
CLUSTERS_KEEPER_EC2_SECRET_ACCESS_KEY: str


class EC2InstancesSettings(BaseCustomSettings):
EC2_INSTANCES_ALLOWED_TYPES: list[str] = Field(
CLUSTERS_KEEPER_EC2_INSTANCES_ALLOWED_TYPES: list[str] = Field(
...,
min_items=1,
unique_items=True,
description="Defines which EC2 instances are considered as candidates for new EC2 instance",
)
EC2_INSTANCES_AMI_ID: str = Field(
CLUSTERS_KEEPER_EC2_INSTANCES_AMI_ID: str = Field(
...,
min_length=1,
description="Defines the AMI (Amazon Machine Image) ID used to start a new EC2 instance",
)
EC2_INSTANCES_MAX_INSTANCES: int = Field(
CLUSTERS_KEEPER_EC2_INSTANCES_MAX_INSTANCES: int = Field(
default=10,
description="Defines the maximum number of instances the clusters_keeper app may create",
)
EC2_INSTANCES_SECURITY_GROUP_IDS: list[str] = Field(
CLUSTERS_KEEPER_EC2_INSTANCES_SECURITY_GROUP_IDS: list[str] = Field(
...,
min_items=1,
description="A security group acts as a virtual firewall for your EC2 instances to control incoming and outgoing traffic"
" (https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-security-groups.html), "
" this is required to start a new EC2 instance",
)
EC2_INSTANCES_SUBNET_ID: str = Field(
CLUSTERS_KEEPER_EC2_INSTANCES_SUBNET_ID: str = Field(
...,
min_length=1,
description="A subnet is a range of IP addresses in your VPC "
" (https://docs.aws.amazon.com/vpc/latest/userguide/configure-subnets.html), "
"this is required to start a new EC2 instance",
)
EC2_INSTANCES_KEY_NAME: str = Field(
CLUSTERS_KEEPER_EC2_INSTANCES_KEY_NAME: str = Field(
...,
min_length=1,
description="SSH key filename (without ext) to access the instance through SSH"
" (https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html),"
"this is required to start a new EC2 instance",
)

EC2_INSTANCES_MAX_START_TIME: datetime.timedelta = Field(
CLUSTERS_KEEPER_EC2_INSTANCES_MAX_START_TIME: datetime.timedelta = Field(
default=datetime.timedelta(minutes=3),
description="Usual time taken an EC2 instance with the given AMI takes to be in 'running' mode",
)

EC2_INSTANCES_CUSTOM_BOOT_SCRIPTS: list[str] = Field(
CLUSTERS_KEEPER_EC2_INSTANCES_CUSTOM_BOOT_SCRIPTS: list[str] = Field(
default_factory=list,
description="script(s) to run on EC2 instance startup (be careful!), each entry is run one after the other using '&&' operator",
)

@validator("EC2_INSTANCES_ALLOWED_TYPES")
@validator("CLUSTERS_KEEPER_EC2_INSTANCES_ALLOWED_TYPES")
@classmethod
def check_valid_intance_names(cls, value):
# NOTE: needed because of a flaw in BaseCustomSettings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ async def create_cluster(
InstanceTypeType,
next(
iter(
app_settings.CLUSTERS_KEEPER_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES
app_settings.CLUSTERS_KEEPER_EC2_INSTANCES.CLUSTERS_KEEPER_EC2_INSTANCES_ALLOWED_TYPES
)
),
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ async def create(cls, settings: EC2Settings) -> "ClustersKeeperEC2":
session = aioboto3.Session()
session_client = session.client(
"ec2",
endpoint_url=settings.EC2_ENDPOINT,
aws_access_key_id=settings.EC2_ACCESS_KEY_ID,
aws_secret_access_key=settings.EC2_SECRET_ACCESS_KEY,
region_name=settings.EC2_REGION_NAME,
endpoint_url=settings.CLUSTERS_KEEPER_EC2_ENDPOINT,
aws_access_key_id=settings.CLUSTERS_KEEPER_EC2_ACCESS_KEY_ID,
aws_secret_access_key=settings.CLUSTERS_KEEPER_EC2_SECRET_ACCESS_KEY,
region_name=settings.CLUSTERS_KEEPER_EC2_REGION_NAME,
)
assert isinstance(session_client, ClientCreatorContext) # nosec
exit_stack = contextlib.AsyncExitStack()
Expand Down Expand Up @@ -102,19 +102,19 @@ async def start_aws_instance(
current_instances = await self.get_instances(instance_settings, tags=tags)
if (
len(current_instances) + number_of_instances
> instance_settings.EC2_INSTANCES_MAX_INSTANCES
> instance_settings.CLUSTERS_KEEPER_EC2_INSTANCES_MAX_INSTANCES
):
raise Ec2TooManyInstancesError(
num_instances=instance_settings.EC2_INSTANCES_MAX_INSTANCES
num_instances=instance_settings.CLUSTERS_KEEPER_EC2_INSTANCES_MAX_INSTANCES
)

instances = await self.client.run_instances(
ImageId=instance_settings.EC2_INSTANCES_AMI_ID,
ImageId=instance_settings.CLUSTERS_KEEPER_EC2_INSTANCES_AMI_ID,
MinCount=number_of_instances,
MaxCount=number_of_instances,
InstanceType=instance_type,
InstanceInitiatedShutdownBehavior="terminate",
KeyName=instance_settings.EC2_INSTANCES_KEY_NAME,
KeyName=instance_settings.CLUSTERS_KEEPER_EC2_INSTANCES_KEY_NAME,
TagSpecifications=[
{
"ResourceType": "instance",
Expand All @@ -129,8 +129,8 @@ async def start_aws_instance(
{
"AssociatePublicIpAddress": True,
"DeviceIndex": 0,
"SubnetId": instance_settings.EC2_INSTANCES_SUBNET_ID,
"Groups": instance_settings.EC2_INSTANCES_SECURITY_GROUP_IDS,
"SubnetId": instance_settings.CLUSTERS_KEEPER_EC2_INSTANCES_SUBNET_ID,
"Groups": instance_settings.CLUSTERS_KEEPER_EC2_INSTANCES_SECURITY_GROUP_IDS,
}
],
)
Expand Down Expand Up @@ -183,7 +183,7 @@ async def get_instances(
filters: list[FilterTypeDef] = [
{
"Name": "key-name",
"Values": [instance_settings.EC2_INSTANCES_KEY_NAME],
"Values": [instance_settings.CLUSTERS_KEEPER_EC2_INSTANCES_KEY_NAME],
},
{"Name": "instance-state-name", "Values": state_names},
]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import datetime
from typing import Final

from models_library.clusters import SimpleAuthentication
from models_library.rpc_schemas_clusters_keeper.clusters import (
ClusterState,
Expand Down Expand Up @@ -39,6 +42,27 @@ def _convert_ec2_state_to_cluster_state(
return ClusterState.STOPPED


# Time budget added on top of the instance launch time when estimating readiness.
# NOTE(review): presumably an upper bound for an EC2 instance to reach 'running' --
# confirm it should stay in sync with the EC2 instances settings default.
_EC2_INSTANCE_MAX_START_TIME: Final[datetime.timedelta] = datetime.timedelta(minutes=3)
# NOTE(review): presumably an upper bound for the gateway to answer once the
# instance is up -- confirm against observed start-up times.
_GATEWAY_READYNESS_MAX_TIME: Final[datetime.timedelta] = datetime.timedelta(minutes=3)


def _create_eta(
    instance_launch_time: datetime.datetime,
    *,
    gateway_ready: bool,
) -> datetime.timedelta:
    """Estimate the remaining time until the cluster can be used.

    The estimate is ``instance_launch_time`` plus the two module-level time
    budgets, minus the current UTC time.

    Args:
        instance_launch_time: when the instance was launched. It is compared
            with a timezone-aware UTC "now", so it must be timezone-aware
            (subtracting a naive datetime raises ``TypeError``).
        gateway_ready: whether the gateway is already ready.

    Returns:
        ``timedelta(0)`` when the gateway is ready or when the estimated
        deadline has already passed, otherwise the positive remaining time.
    """
    no_wait = datetime.timedelta(0)
    if gateway_ready:
        # nothing left to wait for, skip the estimate altogether
        return no_wait

    expected_ready_at = (
        instance_launch_time
        + _EC2_INSTANCE_MAX_START_TIME
        + _GATEWAY_READYNESS_MAX_TIME
    )
    remaining = expected_ready_at - datetime.datetime.now(datetime.timezone.utc)
    # a negative "remaining time" is meaningless for consumers (it would be
    # rendered as e.g. "-1 day, 23:58:12"), so an overdue estimate reports zero
    return max(remaining, no_wait)


def create_cluster_from_ec2_instance(
instance: EC2InstanceData,
user_id: UserID,
Expand All @@ -56,4 +80,5 @@ def create_cluster_from_ec2_instance(
user_id=user_id,
wallet_id=wallet_id,
gateway_ready=gateway_ready,
eta=_create_eta(instance.launch_time, gateway_ready=gateway_ready),
)
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def creation_ec2_tags(
assert app_settings.CLUSTERS_KEEPER_EC2_INSTANCES # nosec
return _DEFAULT_CLUSTERS_KEEPER_TAGS | {
# NOTE: this one gets special treatment in AWS GUI and is applied to the name of the instance
"Name": f"osparc-gateway-server-{app_settings.CLUSTERS_KEEPER_EC2_INSTANCES.EC2_INSTANCES_KEY_NAME}-user_id:{user_id}-wallet_id:{wallet_id}",
"Name": f"osparc-gateway-server-{app_settings.CLUSTERS_KEEPER_EC2_INSTANCES.CLUSTERS_KEEPER_EC2_INSTANCES_KEY_NAME}-user_id:{user_id}-wallet_id:{wallet_id}",
"user_id": f"{user_id}",
"wallet_id": f"{wallet_id}",
}
Expand Down
31 changes: 16 additions & 15 deletions services/clusters-keeper/tests/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,15 @@ def app_environment(
envs = setenvs_from_dict(
monkeypatch,
{
"EC2_ACCESS_KEY_ID": faker.pystr(),
"EC2_SECRET_ACCESS_KEY": faker.pystr(),
"EC2_INSTANCES_KEY_NAME": faker.pystr(),
"EC2_INSTANCES_SECURITY_GROUP_IDS": json.dumps(
"CLUSTERS_KEEPER_EC2_ACCESS_KEY_ID": faker.pystr(),
"CLUSTERS_KEEPER_EC2_SECRET_ACCESS_KEY": faker.pystr(),
"CLUSTERS_KEEPER_EC2_INSTANCES_KEY_NAME": faker.pystr(),
"CLUSTERS_KEEPER_EC2_INSTANCES_SECURITY_GROUP_IDS": json.dumps(
faker.pylist(allowed_types=(str,))
),
"EC2_INSTANCES_SUBNET_ID": faker.pystr(),
"EC2_INSTANCES_AMI_ID": faker.pystr(),
"EC2_INSTANCES_ALLOWED_TYPES": json.dumps(ec2_instances),
"CLUSTERS_KEEPER_EC2_INSTANCES_SUBNET_ID": faker.pystr(),
"CLUSTERS_KEEPER_EC2_INSTANCES_AMI_ID": faker.pystr(),
"CLUSTERS_KEEPER_EC2_INSTANCES_ALLOWED_TYPES": json.dumps(ec2_instances),
},
)
return mock_env_devel_environment | envs
Expand Down Expand Up @@ -123,7 +123,7 @@ def disabled_rabbitmq(app_environment: EnvVarsDict, monkeypatch: pytest.MonkeyPa

@pytest.fixture
def disabled_ec2(app_environment: EnvVarsDict, monkeypatch: pytest.MonkeyPatch):
monkeypatch.delenv("EC2_ACCESS_KEY_ID")
monkeypatch.delenv("CLUSTERS_KEEPER_EC2_ACCESS_KEY_ID")


@pytest.fixture
Expand Down Expand Up @@ -197,9 +197,9 @@ def mocked_aws_server_envs(
monkeypatch: pytest.MonkeyPatch,
) -> EnvVarsDict:
changed_envs = {
"EC2_ENDPOINT": f"http://{mocked_aws_server._ip_address}:{mocked_aws_server._port}", # pylint: disable=protected-access # noqa: SLF001
"EC2_ACCESS_KEY_ID": "xxx",
"EC2_SECRET_ACCESS_KEY": "xxx",
"CLUSTERS_KEEPER_EC2_ENDPOINT": f"http://{mocked_aws_server._ip_address}:{mocked_aws_server._port}", # pylint: disable=protected-access # noqa: SLF001
"CLUSTERS_KEEPER_EC2_ACCESS_KEY_ID": "xxx",
"CLUSTERS_KEEPER_EC2_SECRET_ACCESS_KEY": "xxx",
}
return app_environment | setenvs_from_dict(monkeypatch, changed_envs)

Expand All @@ -210,7 +210,7 @@ def aws_allowed_ec2_instance_type_names(
monkeypatch: pytest.MonkeyPatch,
) -> EnvVarsDict:
changed_envs = {
"EC2_INSTANCES_ALLOWED_TYPES": json.dumps(
"CLUSTERS_KEEPER_EC2_INSTANCES_ALLOWED_TYPES": json.dumps(
[
"t2.xlarge",
"t2.2xlarge",
Expand Down Expand Up @@ -267,7 +267,7 @@ async def aws_subnet_id(
subnet_id = subnet["Subnet"]["SubnetId"]
print(f"--> Created Subnet in AWS with {subnet_id=}")

monkeypatch.setenv("EC2_INSTANCES_SUBNET_ID", subnet_id)
monkeypatch.setenv("CLUSTERS_KEEPER_EC2_INSTANCES_SUBNET_ID", subnet_id)
yield subnet_id

# all the instances in the subnet must be terminated before that works
Expand Down Expand Up @@ -303,7 +303,8 @@ async def aws_security_group_id(
security_group_id = security_group["GroupId"]
print(f"--> Created Security Group in AWS with {security_group_id=}")
monkeypatch.setenv(
"EC2_INSTANCES_SECURITY_GROUP_IDS", json.dumps([security_group_id])
"CLUSTERS_KEEPER_EC2_INSTANCES_SECURITY_GROUP_IDS",
json.dumps([security_group_id]),
)
yield security_group_id
await ec2_client.delete_security_group(GroupId=security_group_id)
Expand All @@ -320,7 +321,7 @@ async def aws_ami_id(
images = await ec2_client.describe_images()
image = random.choice(images["Images"]) # noqa S311
ami_id = image["ImageId"] # type: ignore
monkeypatch.setenv("EC2_INSTANCES_AMI_ID", ami_id)
monkeypatch.setenv("CLUSTERS_KEEPER_EC2_INSTANCES_AMI_ID", ami_id)
return ami_id


Expand Down
19 changes: 13 additions & 6 deletions services/clusters-keeper/tests/unit/test_modules_ec2.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,26 +118,33 @@ async def test_get_ec2_instance_capabilities(
clusters_keeper_ec2: ClustersKeeperEC2,
):
assert app_settings.CLUSTERS_KEEPER_EC2_INSTANCES
assert app_settings.CLUSTERS_KEEPER_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES
assert (
app_settings.CLUSTERS_KEEPER_EC2_INSTANCES.CLUSTERS_KEEPER_EC2_INSTANCES_ALLOWED_TYPES
)
instance_types = await clusters_keeper_ec2.get_ec2_instance_capabilities(
cast(
set[InstanceTypeType],
set(app_settings.CLUSTERS_KEEPER_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES),
set(
app_settings.CLUSTERS_KEEPER_EC2_INSTANCES.CLUSTERS_KEEPER_EC2_INSTANCES_ALLOWED_TYPES
),
)
)
assert instance_types
assert len(instance_types) == len(
app_settings.CLUSTERS_KEEPER_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES
app_settings.CLUSTERS_KEEPER_EC2_INSTANCES.CLUSTERS_KEEPER_EC2_INSTANCES_ALLOWED_TYPES
)

# all the instance names are found and valid
assert all(
i.name in app_settings.CLUSTERS_KEEPER_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES
i.name
in app_settings.CLUSTERS_KEEPER_EC2_INSTANCES.CLUSTERS_KEEPER_EC2_INSTANCES_ALLOWED_TYPES
for i in instance_types
)
for (
instance_type_name
) in app_settings.CLUSTERS_KEEPER_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES:
) in (
app_settings.CLUSTERS_KEEPER_EC2_INSTANCES.CLUSTERS_KEEPER_EC2_INSTANCES_ALLOWED_TYPES
):
assert any(i.name == instance_type_name for i in instance_types)


Expand Down Expand Up @@ -207,7 +214,7 @@ async def test_start_aws_instance_is_limited_in_number_of_instances(
tags = faker.pydict(allowed_types=(str,))
startup_script = faker.pystr()
for _ in range(
app_settings.CLUSTERS_KEEPER_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES
app_settings.CLUSTERS_KEEPER_EC2_INSTANCES.CLUSTERS_KEEPER_EC2_INSTANCES_MAX_INSTANCES
):
await clusters_keeper_ec2.start_aws_instance(
app_settings.CLUSTERS_KEEPER_EC2_INSTANCES,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,9 @@ class ComputationalBackendOnDemandClustersKeeperNotReadyError(

class ComputationalBackendOnDemandNotReadyError(PydanticErrorMixin, SchedulerError):
code = "computational_backend.on_demand_cluster.not_ready"
msg_template = "The on demand computational cluster is not ready"
msg_template = (
"The on demand computational cluster is not ready 'est. remaining time: {eta}'"
)


#
Expand Down
Loading