Skip to content

Commit

Permalink
✨Enable on-demand clusters based on groups extra properties (⚠️ devops) (#4738)
Browse files Browse the repository at this point in the history
  • Loading branch information
sanderegg authored Sep 13, 2023
1 parent ab5fa9a commit 0394294
Show file tree
Hide file tree
Showing 19 changed files with 170 additions and 74 deletions.
11 changes: 10 additions & 1 deletion .env-devel
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,17 @@ CATALOG_DEV_FEATURES_ENABLED=0
CATALOG_SERVICES_DEFAULT_RESOURCES='{"CPU": {"limit": 0.1, "reservation": 0.1}, "RAM": {"limit": 2147483648, "reservation": 2147483648}}'
CATALOG_SERVICES_DEFAULT_SPECIFICATIONS='{}'

CLUSTERS_KEEPER_EC2_ACCESS_KEY_ID=XXXXXXXXXXXX
CLUSTERS_KEEPER_EC2_ENDPOINT=https://ec2.amazonaws.com
CLUSTERS_KEEPER_EC2_INSTANCES_ALLOWED_TYPES="[\"t2.micro\"]"
CLUSTERS_KEEPER_EC2_INSTANCES_AMI_ID=XXXXXXXXXXXXX
CLUSTERS_KEEPER_EC2_INSTANCES_KEY_NAME=XXXXXXXXXXX
CLUSTERS_KEEPER_EC2_INSTANCES_SECURITY_GROUP_IDS="[\"\"]"
CLUSTERS_KEEPER_EC2_INSTANCES_SUBNET_ID=XXXXXXXXXX
CLUSTERS_KEEPER_EC2_REGION_NAME=us-east-1
CLUSTERS_KEEPER_EC2_SECRET_ACCESS_KEY=XXXXXXXXXXXXXXX
CLUSTERS_KEEPER_MAX_MISSED_HEARTBEATS_BEFORE_CLUSTER_TERMINATION=5
CLUSTERS_KEEPER_TASK_INTERVAL=60
CLUSTERS_KEEPER_TASK_INTERVAL=30

DASK_SCHEDULER_HOST=dask-scheduler
DASK_SCHEDULER_PORT=8786
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class NodeMessageBase(ProjectMessageBase):

class LoggerRabbitMessage(RabbitMessageBase, NodeMessageBase):
channel_name: Literal["simcore.services.logs.v2"] = "simcore.services.logs.v2"
node_id: NodeID | None
messages: list[LogMessageStr]
log_level: LogLevelInt = logging.INFO

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
from enum import auto

from pydantic import AnyUrl, BaseModel
Expand All @@ -21,3 +22,4 @@ class OnDemandCluster(BaseModel):
user_id: UserID
wallet_id: WalletID
gateway_ready: bool
eta: datetime.timedelta
14 changes: 7 additions & 7 deletions services/clusters-keeper/.env-devel
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ CLUSTERS_KEEPER_DEBUG=true
CLUSTERS_KEEPER_LOGLEVEL=INFO
CLUSTERS_KEEPER_MAX_MISSED_HEARTBEATS_BEFORE_CLUSTER_TERMINATION=60
CLUSTERS_KEEPER_TASK_INTERVAL=30
EC2_ACCESS_KEY_ID=XXXXXXXXXX
EC2_INSTANCES_ALLOWED_TYPES="[\"t2.micro\"]"
EC2_INSTANCES_AMI_ID=XXXXXXXXXX
EC2_INSTANCES_KEY_NAME=XXXXXXXXXX
EC2_INSTANCES_SECURITY_GROUP_IDS=XXXXXXXXXX
EC2_INSTANCES_SUBNET_ID=XXXXXXXXXX
EC2_SECRET_ACCESS_KEY=XXXXXXXXXX
CLUSTERS_KEEPER_EC2_ACCESS_KEY_ID=XXXXXXXXXX
CLUSTERS_KEEPER_EC2_INSTANCES_ALLOWED_TYPES="[\"t2.micro\"]"
CLUSTERS_KEEPER_EC2_INSTANCES_AMI_ID=XXXXXXXXXX
CLUSTERS_KEEPER_EC2_INSTANCES_KEY_NAME=XXXXXXXXXX
CLUSTERS_KEEPER_EC2_INSTANCES_SECURITY_GROUP_IDS=XXXXXXXXXX
CLUSTERS_KEEPER_EC2_INSTANCES_SUBNET_ID=XXXXXXXXXX
CLUSTERS_KEEPER_EC2_SECRET_ACCESS_KEY=XXXXXXXXXX
LOG_FORMAT_LOCAL_DEV_ENABLED=True
RABBIT_HOST=rabbit
RABBIT_PASSWORD=test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,63 +28,63 @@


class EC2Settings(BaseCustomSettings):
EC2_ACCESS_KEY_ID: str
EC2_ENDPOINT: str | None = Field(
CLUSTERS_KEEPER_EC2_ACCESS_KEY_ID: str
CLUSTERS_KEEPER_EC2_ENDPOINT: str | None = Field(
default=None, description="do not define if using standard AWS"
)
EC2_REGION_NAME: str = "us-east-1"
EC2_SECRET_ACCESS_KEY: str
CLUSTERS_KEEPER_EC2_REGION_NAME: str = "us-east-1"
CLUSTERS_KEEPER_EC2_SECRET_ACCESS_KEY: str


class EC2InstancesSettings(BaseCustomSettings):
EC2_INSTANCES_ALLOWED_TYPES: list[str] = Field(
CLUSTERS_KEEPER_EC2_INSTANCES_ALLOWED_TYPES: list[str] = Field(
...,
min_items=1,
unique_items=True,
description="Defines which EC2 instances are considered as candidates for new EC2 instance",
)
EC2_INSTANCES_AMI_ID: str = Field(
CLUSTERS_KEEPER_EC2_INSTANCES_AMI_ID: str = Field(
...,
min_length=1,
description="Defines the AMI (Amazon Machine Image) ID used to start a new EC2 instance",
)
EC2_INSTANCES_MAX_INSTANCES: int = Field(
CLUSTERS_KEEPER_EC2_INSTANCES_MAX_INSTANCES: int = Field(
default=10,
description="Defines the maximum number of instances the clusters_keeper app may create",
)
EC2_INSTANCES_SECURITY_GROUP_IDS: list[str] = Field(
CLUSTERS_KEEPER_EC2_INSTANCES_SECURITY_GROUP_IDS: list[str] = Field(
...,
min_items=1,
description="A security group acts as a virtual firewall for your EC2 instances to control incoming and outgoing traffic"
" (https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-security-groups.html), "
" this is required to start a new EC2 instance",
)
EC2_INSTANCES_SUBNET_ID: str = Field(
CLUSTERS_KEEPER_EC2_INSTANCES_SUBNET_ID: str = Field(
...,
min_length=1,
description="A subnet is a range of IP addresses in your VPC "
" (https://docs.aws.amazon.com/vpc/latest/userguide/configure-subnets.html), "
"this is required to start a new EC2 instance",
)
EC2_INSTANCES_KEY_NAME: str = Field(
CLUSTERS_KEEPER_EC2_INSTANCES_KEY_NAME: str = Field(
...,
min_length=1,
description="SSH key filename (without ext) to access the instance through SSH"
" (https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html),"
"this is required to start a new EC2 instance",
)

EC2_INSTANCES_MAX_START_TIME: datetime.timedelta = Field(
CLUSTERS_KEEPER_EC2_INSTANCES_MAX_START_TIME: datetime.timedelta = Field(
default=datetime.timedelta(minutes=3),
description="Usual time taken an EC2 instance with the given AMI takes to be in 'running' mode",
)

EC2_INSTANCES_CUSTOM_BOOT_SCRIPTS: list[str] = Field(
CLUSTERS_KEEPER_EC2_INSTANCES_CUSTOM_BOOT_SCRIPTS: list[str] = Field(
default_factory=list,
description="script(s) to run on EC2 instance startup (be careful!), each entry is run one after the other using '&&' operator",
)

@validator("EC2_INSTANCES_ALLOWED_TYPES")
@validator("CLUSTERS_KEEPER_EC2_INSTANCES_ALLOWED_TYPES")
@classmethod
def check_valid_intance_names(cls, value):
# NOTE: needed because of a flaw in BaseCustomSettings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ async def create_cluster(
InstanceTypeType,
next(
iter(
app_settings.CLUSTERS_KEEPER_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES
app_settings.CLUSTERS_KEEPER_EC2_INSTANCES.CLUSTERS_KEEPER_EC2_INSTANCES_ALLOWED_TYPES
)
),
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ async def create(cls, settings: EC2Settings) -> "ClustersKeeperEC2":
session = aioboto3.Session()
session_client = session.client(
"ec2",
endpoint_url=settings.EC2_ENDPOINT,
aws_access_key_id=settings.EC2_ACCESS_KEY_ID,
aws_secret_access_key=settings.EC2_SECRET_ACCESS_KEY,
region_name=settings.EC2_REGION_NAME,
endpoint_url=settings.CLUSTERS_KEEPER_EC2_ENDPOINT,
aws_access_key_id=settings.CLUSTERS_KEEPER_EC2_ACCESS_KEY_ID,
aws_secret_access_key=settings.CLUSTERS_KEEPER_EC2_SECRET_ACCESS_KEY,
region_name=settings.CLUSTERS_KEEPER_EC2_REGION_NAME,
)
assert isinstance(session_client, ClientCreatorContext) # nosec
exit_stack = contextlib.AsyncExitStack()
Expand Down Expand Up @@ -102,19 +102,19 @@ async def start_aws_instance(
current_instances = await self.get_instances(instance_settings, tags=tags)
if (
len(current_instances) + number_of_instances
> instance_settings.EC2_INSTANCES_MAX_INSTANCES
> instance_settings.CLUSTERS_KEEPER_EC2_INSTANCES_MAX_INSTANCES
):
raise Ec2TooManyInstancesError(
num_instances=instance_settings.EC2_INSTANCES_MAX_INSTANCES
num_instances=instance_settings.CLUSTERS_KEEPER_EC2_INSTANCES_MAX_INSTANCES
)

instances = await self.client.run_instances(
ImageId=instance_settings.EC2_INSTANCES_AMI_ID,
ImageId=instance_settings.CLUSTERS_KEEPER_EC2_INSTANCES_AMI_ID,
MinCount=number_of_instances,
MaxCount=number_of_instances,
InstanceType=instance_type,
InstanceInitiatedShutdownBehavior="terminate",
KeyName=instance_settings.EC2_INSTANCES_KEY_NAME,
KeyName=instance_settings.CLUSTERS_KEEPER_EC2_INSTANCES_KEY_NAME,
TagSpecifications=[
{
"ResourceType": "instance",
Expand All @@ -129,8 +129,8 @@ async def start_aws_instance(
{
"AssociatePublicIpAddress": True,
"DeviceIndex": 0,
"SubnetId": instance_settings.EC2_INSTANCES_SUBNET_ID,
"Groups": instance_settings.EC2_INSTANCES_SECURITY_GROUP_IDS,
"SubnetId": instance_settings.CLUSTERS_KEEPER_EC2_INSTANCES_SUBNET_ID,
"Groups": instance_settings.CLUSTERS_KEEPER_EC2_INSTANCES_SECURITY_GROUP_IDS,
}
],
)
Expand Down Expand Up @@ -183,7 +183,7 @@ async def get_instances(
filters: list[FilterTypeDef] = [
{
"Name": "key-name",
"Values": [instance_settings.EC2_INSTANCES_KEY_NAME],
"Values": [instance_settings.CLUSTERS_KEEPER_EC2_INSTANCES_KEY_NAME],
},
{"Name": "instance-state-name", "Values": state_names},
]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import datetime
from typing import Final

from models_library.clusters import SimpleAuthentication
from models_library.rpc_schemas_clusters_keeper.clusters import (
ClusterState,
Expand Down Expand Up @@ -39,6 +42,27 @@ def _convert_ec2_state_to_cluster_state(
return ClusterState.STOPPED


_EC2_INSTANCE_MAX_START_TIME: Final[datetime.timedelta] = datetime.timedelta(minutes=3)
_GATEWAY_READYNESS_MAX_TIME: Final[datetime.timedelta] = datetime.timedelta(minutes=3)


def _create_eta(
instance_launch_time: datetime.datetime,
*,
gateway_ready: bool,
) -> datetime.timedelta:
now = datetime.datetime.now(datetime.timezone.utc)
estimated_time_to_running = (
instance_launch_time
+ _EC2_INSTANCE_MAX_START_TIME
+ _GATEWAY_READYNESS_MAX_TIME
- now
)
if gateway_ready is True:
estimated_time_to_running = datetime.timedelta(seconds=0)
return estimated_time_to_running


def create_cluster_from_ec2_instance(
instance: EC2InstanceData,
user_id: UserID,
Expand All @@ -56,4 +80,5 @@ def create_cluster_from_ec2_instance(
user_id=user_id,
wallet_id=wallet_id,
gateway_ready=gateway_ready,
eta=_create_eta(instance.launch_time, gateway_ready=gateway_ready),
)
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def creation_ec2_tags(
assert app_settings.CLUSTERS_KEEPER_EC2_INSTANCES # nosec
return _DEFAULT_CLUSTERS_KEEPER_TAGS | {
# NOTE: this one gets special treatment in AWS GUI and is applied to the name of the instance
"Name": f"osparc-gateway-server-{app_settings.CLUSTERS_KEEPER_EC2_INSTANCES.EC2_INSTANCES_KEY_NAME}-user_id:{user_id}-wallet_id:{wallet_id}",
"Name": f"osparc-gateway-server-{app_settings.CLUSTERS_KEEPER_EC2_INSTANCES.CLUSTERS_KEEPER_EC2_INSTANCES_KEY_NAME}-user_id:{user_id}-wallet_id:{wallet_id}",
"user_id": f"{user_id}",
"wallet_id": f"{wallet_id}",
}
Expand Down
31 changes: 16 additions & 15 deletions services/clusters-keeper/tests/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,15 @@ def app_environment(
envs = setenvs_from_dict(
monkeypatch,
{
"EC2_ACCESS_KEY_ID": faker.pystr(),
"EC2_SECRET_ACCESS_KEY": faker.pystr(),
"EC2_INSTANCES_KEY_NAME": faker.pystr(),
"EC2_INSTANCES_SECURITY_GROUP_IDS": json.dumps(
"CLUSTERS_KEEPER_EC2_ACCESS_KEY_ID": faker.pystr(),
"CLUSTERS_KEEPER_EC2_SECRET_ACCESS_KEY": faker.pystr(),
"CLUSTERS_KEEPER_EC2_INSTANCES_KEY_NAME": faker.pystr(),
"CLUSTERS_KEEPER_EC2_INSTANCES_SECURITY_GROUP_IDS": json.dumps(
faker.pylist(allowed_types=(str,))
),
"EC2_INSTANCES_SUBNET_ID": faker.pystr(),
"EC2_INSTANCES_AMI_ID": faker.pystr(),
"EC2_INSTANCES_ALLOWED_TYPES": json.dumps(ec2_instances),
"CLUSTERS_KEEPER_EC2_INSTANCES_SUBNET_ID": faker.pystr(),
"CLUSTERS_KEEPER_EC2_INSTANCES_AMI_ID": faker.pystr(),
"CLUSTERS_KEEPER_EC2_INSTANCES_ALLOWED_TYPES": json.dumps(ec2_instances),
},
)
return mock_env_devel_environment | envs
Expand Down Expand Up @@ -123,7 +123,7 @@ def disabled_rabbitmq(app_environment: EnvVarsDict, monkeypatch: pytest.MonkeyPa

@pytest.fixture
def disabled_ec2(app_environment: EnvVarsDict, monkeypatch: pytest.MonkeyPatch):
monkeypatch.delenv("EC2_ACCESS_KEY_ID")
monkeypatch.delenv("CLUSTERS_KEEPER_EC2_ACCESS_KEY_ID")


@pytest.fixture
Expand Down Expand Up @@ -197,9 +197,9 @@ def mocked_aws_server_envs(
monkeypatch: pytest.MonkeyPatch,
) -> EnvVarsDict:
changed_envs = {
"EC2_ENDPOINT": f"http://{mocked_aws_server._ip_address}:{mocked_aws_server._port}", # pylint: disable=protected-access # noqa: SLF001
"EC2_ACCESS_KEY_ID": "xxx",
"EC2_SECRET_ACCESS_KEY": "xxx",
"CLUSTERS_KEEPER_EC2_ENDPOINT": f"http://{mocked_aws_server._ip_address}:{mocked_aws_server._port}", # pylint: disable=protected-access # noqa: SLF001
"CLUSTERS_KEEPER_EC2_ACCESS_KEY_ID": "xxx",
"CLUSTERS_KEEPER_EC2_SECRET_ACCESS_KEY": "xxx",
}
return app_environment | setenvs_from_dict(monkeypatch, changed_envs)

Expand All @@ -210,7 +210,7 @@ def aws_allowed_ec2_instance_type_names(
monkeypatch: pytest.MonkeyPatch,
) -> EnvVarsDict:
changed_envs = {
"EC2_INSTANCES_ALLOWED_TYPES": json.dumps(
"CLUSTERS_KEEPER_EC2_INSTANCES_ALLOWED_TYPES": json.dumps(
[
"t2.xlarge",
"t2.2xlarge",
Expand Down Expand Up @@ -267,7 +267,7 @@ async def aws_subnet_id(
subnet_id = subnet["Subnet"]["SubnetId"]
print(f"--> Created Subnet in AWS with {subnet_id=}")

monkeypatch.setenv("EC2_INSTANCES_SUBNET_ID", subnet_id)
monkeypatch.setenv("CLUSTERS_KEEPER_EC2_INSTANCES_SUBNET_ID", subnet_id)
yield subnet_id

# all the instances in the subnet must be terminated before that works
Expand Down Expand Up @@ -303,7 +303,8 @@ async def aws_security_group_id(
security_group_id = security_group["GroupId"]
print(f"--> Created Security Group in AWS with {security_group_id=}")
monkeypatch.setenv(
"EC2_INSTANCES_SECURITY_GROUP_IDS", json.dumps([security_group_id])
"CLUSTERS_KEEPER_EC2_INSTANCES_SECURITY_GROUP_IDS",
json.dumps([security_group_id]),
)
yield security_group_id
await ec2_client.delete_security_group(GroupId=security_group_id)
Expand All @@ -320,7 +321,7 @@ async def aws_ami_id(
images = await ec2_client.describe_images()
image = random.choice(images["Images"]) # noqa S311
ami_id = image["ImageId"] # type: ignore
monkeypatch.setenv("EC2_INSTANCES_AMI_ID", ami_id)
monkeypatch.setenv("CLUSTERS_KEEPER_EC2_INSTANCES_AMI_ID", ami_id)
return ami_id


Expand Down
19 changes: 13 additions & 6 deletions services/clusters-keeper/tests/unit/test_modules_ec2.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,26 +118,33 @@ async def test_get_ec2_instance_capabilities(
clusters_keeper_ec2: ClustersKeeperEC2,
):
assert app_settings.CLUSTERS_KEEPER_EC2_INSTANCES
assert app_settings.CLUSTERS_KEEPER_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES
assert (
app_settings.CLUSTERS_KEEPER_EC2_INSTANCES.CLUSTERS_KEEPER_EC2_INSTANCES_ALLOWED_TYPES
)
instance_types = await clusters_keeper_ec2.get_ec2_instance_capabilities(
cast(
set[InstanceTypeType],
set(app_settings.CLUSTERS_KEEPER_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES),
set(
app_settings.CLUSTERS_KEEPER_EC2_INSTANCES.CLUSTERS_KEEPER_EC2_INSTANCES_ALLOWED_TYPES
),
)
)
assert instance_types
assert len(instance_types) == len(
app_settings.CLUSTERS_KEEPER_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES
app_settings.CLUSTERS_KEEPER_EC2_INSTANCES.CLUSTERS_KEEPER_EC2_INSTANCES_ALLOWED_TYPES
)

# all the instance names are found and valid
assert all(
i.name in app_settings.CLUSTERS_KEEPER_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES
i.name
in app_settings.CLUSTERS_KEEPER_EC2_INSTANCES.CLUSTERS_KEEPER_EC2_INSTANCES_ALLOWED_TYPES
for i in instance_types
)
for (
instance_type_name
) in app_settings.CLUSTERS_KEEPER_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES:
) in (
app_settings.CLUSTERS_KEEPER_EC2_INSTANCES.CLUSTERS_KEEPER_EC2_INSTANCES_ALLOWED_TYPES
):
assert any(i.name == instance_type_name for i in instance_types)


Expand Down Expand Up @@ -207,7 +214,7 @@ async def test_start_aws_instance_is_limited_in_number_of_instances(
tags = faker.pydict(allowed_types=(str,))
startup_script = faker.pystr()
for _ in range(
app_settings.CLUSTERS_KEEPER_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES
app_settings.CLUSTERS_KEEPER_EC2_INSTANCES.CLUSTERS_KEEPER_EC2_INSTANCES_MAX_INSTANCES
):
await clusters_keeper_ec2.start_aws_instance(
app_settings.CLUSTERS_KEEPER_EC2_INSTANCES,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,9 @@ class ComputationalBackendOnDemandClustersKeeperNotReadyError(

class ComputationalBackendOnDemandNotReadyError(PydanticErrorMixin, SchedulerError):
code = "computational_backend.on_demand_cluster.not_ready"
msg_template = "The on demand computational cluster is not ready"
msg_template = (
"The on demand computational cluster is not ready 'est. remaining time: {eta}'"
)


#
Expand Down
Loading

0 comments on commit 0394294

Please sign in to comment.