✨ Autoscaling: terminate long-pending EC2s #5832

@@ -75,8 +75,10 @@ class EC2InstancesSettings(BaseCustomSettings):
     )
     EC2_INSTANCES_MAX_START_TIME: datetime.timedelta = Field(
         default=datetime.timedelta(minutes=1),
-        description="Usual time taken an EC2 instance with the given AMI takes to be in 'running' mode "
-        "(default to seconds, or see https://pydantic-docs.helpmanual.io/usage/types/#datetime-types for string formating)",
+        description="Usual time an EC2 instance with the given AMI takes to join the cluster "
+        "(defaults to seconds, or see https://pydantic-docs.helpmanual.io/usage/types/#datetime-types for string formatting). "
+        "NOTE: keep this a factor larger than the real start time, since EC2 instances "
+        "that take longer than this are terminated (EC2 machines sometimes fail on start).",
     )
     EC2_INSTANCES_NAME_PREFIX: str = Field(
         default="autoscaling",
@@ -125,11 +127,8 @@ def check_valid_instance_names(
     ) -> dict[str, EC2InstanceBootSpecific]:
         # NOTE: needed because of a flaw in BaseCustomSettings
         # issubclass raises TypeError if used on Aliases
-        if all(parse_obj_as(InstanceTypeType, key) for key in value):
-            return value
-
-        msg = "Invalid instance type name"
-        raise ValueError(msg)
+        parse_obj_as(list[InstanceTypeType], list(value))
+        return value


 class NodesMonitoringSettings(BaseCustomSettings):
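Reviewer note: the previous `all(parse_obj_as(...) for key in value)` made the `ValueError` branch effectively dead code, since `parse_obj_as` itself raises `ValidationError` on a bad key. Validating the whole list in one call is both correct and reports every offending entry at once. A standalone sketch of the pattern, using a small stand-in `Literal` since the real `InstanceTypeType` is a large literal union from the EC2 type stubs:

```python
from typing import Literal

from pydantic import ValidationError, parse_obj_as  # pydantic v1

# stand-in for the EC2 stubs' InstanceTypeType (illustration only)
_FakeInstanceType = Literal["t2.micro", "t2.small"]

parse_obj_as(list[_FakeInstanceType], ["t2.micro", "t2.small"])  # passes silently

try:
    parse_obj_as(list[_FakeInstanceType], ["t2.micro", "not-a-type"])
except ValidationError as err:
    print(err)  # reports the invalid entry together with its index in the list
```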
@@ -72,6 +72,11 @@ class Cluster:
"description": "This is an EC2 instance that is not yet associated to a docker node"
}
)
broken_ec2s: list[NonAssociatedInstance] = field(
metadata={
"description": "This is an existing EC2 instance that never properly joined the cluster and is deemed as broken and will be terminated"
}
)
disconnected_nodes: list[Node] = field(
metadata={
"description": "This is a docker node which is not backed by a running EC2 instance"
@@ -94,6 +99,7 @@ def total_number_of_machines(self) -> int:
             + len(self.drained_nodes)
             + len(self.reserve_drained_nodes)
             + len(self.pending_ec2s)
+            + len(self.broken_ec2s)
         )
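Reviewer note: `Cluster` documents each bucket through dataclass `field(metadata={"description": ...})`; the metadata mapping is free-form, has no runtime effect, and stays introspectable next to the field definition. A tiny self-contained sketch of the convention (the class is made up):

```python
import dataclasses


@dataclasses.dataclass(frozen=True)
class _ClusterDoc:  # hypothetical, for illustration only
    broken_ec2s: list[str] = dataclasses.field(
        default_factory=list,
        metadata={"description": "instances that never joined the cluster"},
    )


(broken_field,) = dataclasses.fields(_ClusterDoc)
assert broken_field.metadata["description"].startswith("instances")
```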
@@ -85,6 +85,28 @@ async def _analyze_current_cluster(
         docker_nodes, existing_ec2_instances
     )

+    # analyse pending EC2s: check whether they have been pending for too long
+    now = arrow.utcnow().datetime
+    broken_ec2s = [
+        instance
+        for instance in pending_ec2s
+        if (now - instance.launch_time)
+        > app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_START_TIME
+    ]
+    if broken_ec2s:
+        _logger.error(
+            "Detected broken EC2 instances that never joined the cluster after %s: %s\n"
+            "TIP: if this happens often, the time an EC2 instance needs to start may have increased, "
+            "or something may be wrong with the used AMI and/or boot script (in which case it "
+            "would happen every time). Please check.",
+            app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_START_TIME,
+            f"{[_.id for _ in broken_ec2s]}",
+        )
+    # remove the broken EC2s from the pending ones
+    pending_ec2s = [
+        instance for instance in pending_ec2s if instance not in broken_ec2s
+    ]
+
     # analyse attached ec2s
     active_nodes, pending_nodes, all_drained_nodes = [], [], []
     for instance in attached_ec2s:
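Reviewer note: the `now - instance.launch_time` subtraction only works because both datetimes are timezone-aware (`arrow.utcnow().datetime` is UTC-aware, and boto3 returns `launch_time` as an aware datetime); a naive `datetime.datetime.utcnow()` here would raise `TypeError`. A minimal sketch of the predicate with hypothetical values:

```python
import datetime

import arrow

# hypothetical instance that has been pending for 5 minutes (boto3's
# launch_time is timezone-aware, which arrow reproduces here)
launch_time = arrow.utcnow().shift(minutes=-5).datetime
max_start_time = datetime.timedelta(minutes=1)

now = arrow.utcnow().datetime
assert (now - launch_time) > max_start_time  # -> classified as broken
```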
@@ -113,6 +135,7 @@ async def _analyze_current_cluster(
         drained_nodes=drained_nodes,
         reserve_drained_nodes=reserve_drained_nodes,
         pending_ec2s=[NonAssociatedInstance(ec2_instance=i) for i in pending_ec2s],
+        broken_ec2s=[NonAssociatedInstance(ec2_instance=i) for i in broken_ec2s],
         terminated_instances=terminated_ec2_instances,
         disconnected_nodes=[n for n in docker_nodes if _node_not_ready(n)],
     )
@@ -124,6 +147,7 @@ async def _analyze_current_cluster(
"drained_nodes": "available_resources",
"reserve_drained_nodes": True,
"pending_ec2s": "ec2_instance",
"broken_ec2s": "ec2_instance",
},
)
_logger.info(
@@ -152,6 +176,25 @@ async def _cleanup_disconnected_nodes(app: FastAPI, cluster: Cluster) -> Cluster
     return dataclasses.replace(cluster, disconnected_nodes=[])


+async def _terminate_broken_ec2s(app: FastAPI, cluster: Cluster) -> Cluster:
+    broken_instances = [i.ec2_instance for i in cluster.broken_ec2s]
+    if broken_instances:
+        with log_context(
+            _logger, logging.WARNING, msg="terminate broken EC2 instances"
+        ):
+            await get_ec2_client(app).terminate_instances(broken_instances)
+            if has_instrumentation(app):
+                instrumentation = get_instrumentation(app)
+                for i in cluster.broken_ec2s:
+                    instrumentation.instance_terminated(i.ec2_instance.type)
+
+    return dataclasses.replace(
+        cluster,
+        broken_ec2s=[],
+        terminated_instances=cluster.terminated_instances + broken_instances,
+    )
+
+
 async def _try_attach_pending_ec2s(
     app: FastAPI,
     cluster: Cluster,
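Reviewer note: like the other passes, `_terminate_broken_ec2s` never mutates the incoming cluster; it returns a copy via `dataclasses.replace` with `broken_ec2s` emptied and the terminated instances appended. A minimal sketch of that pattern on a made-up frozen dataclass:

```python
import dataclasses


@dataclasses.dataclass(frozen=True)
class _MiniCluster:  # hypothetical stand-in for the real Cluster
    broken_ec2s: list[str]
    terminated_instances: list[str]


before = _MiniCluster(broken_ec2s=["i-0abc"], terminated_instances=[])
after = dataclasses.replace(
    before,
    broken_ec2s=[],
    terminated_instances=before.terminated_instances + before.broken_ec2s,
)
assert after.terminated_instances == ["i-0abc"]
assert before.broken_ec2s == ["i-0abc"]  # the input cluster is left untouched
```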
@@ -976,6 +1019,7 @@ async def auto_scale_cluster(
         app, auto_scaling_mode, allowed_instance_types
     )
     cluster = await _cleanup_disconnected_nodes(app, cluster)
+    cluster = await _terminate_broken_ec2s(app, cluster)
     cluster = await _try_attach_pending_ec2s(
         app, cluster, auto_scaling_mode, allowed_instance_types
     )
31 changes: 30 additions & 1 deletion services/autoscaling/tests/unit/conftest.py
@@ -7,7 +7,7 @@
 import datetime
 import json
 import random
-from collections.abc import AsyncIterator, Awaitable, Callable
+from collections.abc import AsyncIterator, Awaitable, Callable, Iterator
 from copy import deepcopy
 from pathlib import Path
 from typing import Any, Final, cast
@@ -684,6 +684,7 @@ def _creator(**cluter_overrides) -> Cluster:
             drained_nodes=[],
             reserve_drained_nodes=[],
             pending_ec2s=[],
+            broken_ec2s=[],
             disconnected_nodes=[],
             terminated_instances=[],
         ),
@@ -842,3 +843,31 @@ def mock_machines_buffer(monkeypatch: pytest.MonkeyPatch) -> int:
     num_machines_in_buffer = 5
     monkeypatch.setenv("EC2_INSTANCES_MACHINES_BUFFER", f"{num_machines_in_buffer}")
     return num_machines_in_buffer
+
+
+@pytest.fixture
+def mock_find_node_with_name_returns_none(mocker: MockerFixture) -> Iterator[mock.Mock]:
+    return mocker.patch(
+        "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.find_node_with_name",
+        autospec=True,
+        return_value=None,
+    )
+
+
+@pytest.fixture(scope="session")
+def short_ec2_instance_max_start_time() -> datetime.timedelta:
+    return datetime.timedelta(seconds=10)
+
+
+@pytest.fixture
+def with_short_ec2_instances_max_start_time(
+    app_environment: EnvVarsDict,
+    monkeypatch: pytest.MonkeyPatch,
+    short_ec2_instance_max_start_time: datetime.timedelta,
+) -> EnvVarsDict:
+    return app_environment | setenvs_from_dict(
+        monkeypatch,
+        {
+            "EC2_INSTANCES_MAX_START_TIME": f"{short_ec2_instance_max_start_time}",
+        },
+    )
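Reviewer note: the fixture writes the env var as `f"{short_ec2_instance_max_start_time}"`, i.e. Python's `str()` form of a `timedelta`, which pydantic v1 parses back unchanged. A quick sanity sketch:

```python
import datetime

from pydantic import parse_obj_as  # pydantic v1

short = datetime.timedelta(seconds=10)
assert f"{short}" == "0:00:10"  # what the fixture puts into the environment
assert parse_obj_as(datetime.timedelta, "0:00:10") == short  # round-trips
```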
19 changes: 19 additions & 0 deletions services/autoscaling/tests/unit/test_core_settings.py
@@ -162,3 +162,22 @@ def test_EC2_INSTANCES_PRE_PULL_IMAGES(  # noqa: N802
     ] == next(
         iter(settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES.values())
     ).pre_pull_images
+
+
+def test_invalid_instance_names(
+    app_environment: EnvVarsDict, monkeypatch: pytest.MonkeyPatch, faker: Faker
+):
+    settings = ApplicationSettings.create_from_envs()
+    assert settings.AUTOSCALING_EC2_INSTANCES
+
+    # passing an invalid instance type name fails validation
+    setenvs_from_dict(
+        monkeypatch,
+        {
+            "EC2_INSTANCES_ALLOWED_TYPES": json.dumps(
+                {faker.pystr(): {"ami_id": faker.pystr(), "pre_pull_images": []}}
+            )
+        },
+    )
+    with pytest.raises(ValidationError):
+        ApplicationSettings.create_from_envs()