test both computational and dynamic
sanderegg committed May 16, 2024
1 parent a9ea5ec commit 80d5512
Showing 3 changed files with 212 additions and 55 deletions.
30 changes: 29 additions & 1 deletion services/autoscaling/tests/unit/conftest.py
@@ -7,7 +7,7 @@
import datetime
import json
import random
from collections.abc import AsyncIterator, Awaitable, Callable
from collections.abc import AsyncIterator, Awaitable, Callable, Iterator
from copy import deepcopy
from pathlib import Path
from typing import Any, Final, cast
@@ -843,3 +843,31 @@ def mock_machines_buffer(monkeypatch: pytest.MonkeyPatch) -> int:
    num_machines_in_buffer = 5
    monkeypatch.setenv("EC2_INSTANCES_MACHINES_BUFFER", f"{num_machines_in_buffer}")
    return num_machines_in_buffer


@pytest.fixture
def mock_find_node_with_name_returns_none(mocker: MockerFixture) -> Iterator[mock.Mock]:
    return mocker.patch(
        "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.find_node_with_name",
        autospec=True,
        return_value=None,
    )


@pytest.fixture(scope="session")
def short_ec2_instance_max_start_time() -> datetime.timedelta:
    return datetime.timedelta(seconds=10)


@pytest.fixture
def with_short_ec2_instances_max_start_time(
    app_environment: EnvVarsDict,
    monkeypatch: pytest.MonkeyPatch,
    short_ec2_instance_max_start_time: datetime.timedelta,
) -> EnvVarsDict:
    return app_environment | setenvs_from_dict(
        monkeypatch,
        {
            "EC2_INSTANCES_MAX_START_TIME": f"{short_ec2_instance_max_start_time}",
        },
    )
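
Together, these fixtures let a test shrink EC2_INSTANCES_MAX_START_TIME to 10 seconds and make the autoscaler see an EC2 machine that never joins the docker swarm. A minimal sketch of how they compose (hypothetical test name and body, not part of this commit; the env-var fixture is requested before app_settings, as in the real test below, so the shorter timeout is already set when the settings are built):

async def test_sketch(
    with_short_ec2_instances_max_start_time: EnvVarsDict,
    mock_find_node_with_name_returns_none: mock.Mock,
    app_settings: ApplicationSettings,
):
    assert app_settings.AUTOSCALING_EC2_INSTANCES
    # the autoscaler now gives up on a pending instance after 10 seconds
    assert (
        app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_START_TIME
        == datetime.timedelta(seconds=10)
    )
    # meanwhile, any lookup of the freshly started docker node returns None, i.e. it never joined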
@@ -49,6 +49,7 @@
)
from types_aiobotocore_ec2.client import EC2Client
from types_aiobotocore_ec2.literals import InstanceTypeType
from types_aiobotocore_ec2.type_defs import InstanceTypeDef
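
InstanceTypeDef is the TypedDict with which types-aiobotocore describes a single EC2 instance. A sketch of the keys this diff relies on, with illustrative values:

example_instance: InstanceTypeDef = {
    "PrivateDnsName": "ip-10-0-0-1.ec2.internal",  # illustrative
    "LaunchTime": datetime.datetime.now(datetime.timezone.utc),
    "State": {"Name": "running"},
}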


@pytest.fixture
@@ -98,11 +99,10 @@ async def _assert_ec2_instances(
    num_instances: int,
    instance_type: str,
    instance_state: str,
) -> list[str]:
) -> list[InstanceTypeDef]:
    list_instances: list[InstanceTypeDef] = []
    all_instances = await ec2_client.describe_instances()

    assert len(all_instances["Reservations"]) == num_reservations
    internal_dns_names = []
    for reservation in all_instances["Reservations"]:
        assert "Instances" in reservation
        assert len(reservation["Instances"]) == num_instances
@@ -127,7 +127,6 @@ async def _assert_ec2_instances(
assert "PrivateDnsName" in instance
instance_private_dns_name = instance["PrivateDnsName"]
assert instance_private_dns_name.endswith(".ec2.internal")
internal_dns_names.append(instance_private_dns_name)
assert "State" in instance
state = instance["State"]
assert "Name" in state
@@ -141,7 +140,8 @@ async def _assert_ec2_instances(
assert "Value" in user_data["UserData"]
user_data = base64.b64decode(user_data["UserData"]["Value"]).decode()
assert user_data.count("docker swarm join") == 1
return internal_dns_names
list_instances.append(instance)
return list_instances
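
Since _assert_ec2_instances now returns the full instance dicts instead of pre-extracted DNS names, callers pick out whatever fields they need. A sketch of the caller-side pattern used in the tests below (instance type illustrative):

instances = await _assert_ec2_instances(
    ec2_client,
    num_reservations=1,
    num_instances=1,
    instance_type="r5n.4xlarge",  # illustrative
    instance_state="running",
)
internal_dns_name = instances[0]["PrivateDnsName"].removesuffix(".ec2.internal")
launch_time = instances[0]["LaunchTime"]  # needed by the new long-pending test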


def _assert_rabbit_autoscaling_message_sent(
@@ -169,7 +169,7 @@ def _assert_rabbit_autoscaling_message_sent(


@pytest.fixture
def mock_docker_find_node_with_name(
def mock_docker_find_node_with_name_returns_fake_node(
    mocker: MockerFixture, fake_node: DockerNode
) -> Iterator[mock.Mock]:
    return mocker.patch(
@@ -376,7 +376,7 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915
    mock_docker_tag_node: mock.Mock,
    fake_node: DockerNode,
    mock_rabbitmq_post_message: mock.Mock,
    mock_docker_find_node_with_name: mock.Mock,
    mock_docker_find_node_with_name_returns_fake_node: mock.Mock,
    mock_docker_set_node_availability: mock.Mock,
    mock_docker_compute_node_used_resources: mock.Mock,
    mock_dask_get_worker_has_results_in_memory: mock.Mock,
@@ -418,7 +418,7 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915
    )

    # the new node is already running but not yet connected, hence it is neither tagged nor drained
    mock_docker_find_node_with_name.assert_not_called()
    mock_docker_find_node_with_name_returns_fake_node.assert_not_called()
    mock_docker_tag_node.assert_not_called()
    mock_docker_set_node_availability.assert_not_called()
    mock_docker_compute_node_used_resources.assert_not_called()
@@ -445,19 +445,20 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915
    mock_dask_get_worker_used_resources.assert_called_once()
    mock_dask_get_worker_used_resources.reset_mock()
    mock_dask_is_worker_connected.assert_not_called()
    internal_dns_names = await _assert_ec2_instances(
    instances = await _assert_ec2_instances(
        ec2_client,
        num_reservations=1,
        num_instances=1,
        instance_type=expected_ec2_type,
        instance_state="running",
    )
    assert len(internal_dns_names) == 1
    internal_dns_name = internal_dns_names[0].removesuffix(".ec2.internal")
    assert len(instances) == 1
    assert "PrivateDnsName" in instances[0]
    internal_dns_name = instances[0]["PrivateDnsName"].removesuffix(".ec2.internal")

    # the node is attached first, then tagged and made active right away since we still have the pending task
    mock_docker_find_node_with_name.assert_called_once()
    mock_docker_find_node_with_name.reset_mock()
    mock_docker_find_node_with_name_returns_fake_node.assert_called_once()
    mock_docker_find_node_with_name_returns_fake_node.reset_mock()
    expected_docker_node_tags = {
        DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY: expected_ec2_type
    }
@@ -559,7 +560,7 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915
    mock_dask_get_worker_used_resources.assert_called()
    assert mock_dask_get_worker_used_resources.call_count == 2 * num_useless_calls
    mock_dask_get_worker_used_resources.reset_mock()
    mock_docker_find_node_with_name.assert_not_called()
    mock_docker_find_node_with_name_returns_fake_node.assert_not_called()
    mock_docker_tag_node.assert_not_called()
    mock_docker_set_node_availability.assert_not_called()
    # check the number of instances did not change and the instance is still running
@@ -801,7 +802,7 @@ async def test_cluster_scaling_up_starts_multiple_instances(
    mock_docker_tag_node: mock.Mock,
    scale_up_params: _ScaleUpParams,
    mock_rabbitmq_post_message: mock.Mock,
    mock_docker_find_node_with_name: mock.Mock,
    mock_docker_find_node_with_name_returns_fake_node: mock.Mock,
    mock_docker_set_node_availability: mock.Mock,
    dask_spec_local_cluster: distributed.SpecCluster,
):
@@ -837,7 +838,7 @@ async def test_cluster_scaling_up_starts_multiple_instances(
    )

    # the new node is already running but not yet connected, hence it is neither tagged nor drained
    mock_docker_find_node_with_name.assert_not_called()
    mock_docker_find_node_with_name_returns_fake_node.assert_not_called()
    mock_docker_tag_node.assert_not_called()
    mock_docker_set_node_availability.assert_not_called()
    # check rabbit messages were sent
@@ -862,7 +863,7 @@ async def test_cluster_scaling_up_more_than_allowed_max_starts_max_instances_and
    create_dask_task_resources: Callable[..., DaskTaskResources],
    mock_docker_tag_node: mock.Mock,
    mock_rabbitmq_post_message: mock.Mock,
    mock_docker_find_node_with_name: mock.Mock,
    mock_docker_find_node_with_name_returns_fake_node: mock.Mock,
    mock_docker_set_node_availability: mock.Mock,
    mock_docker_compute_node_used_resources: mock.Mock,
    mock_dask_get_worker_has_results_in_memory: mock.Mock,
@@ -904,7 +905,7 @@ async def test_cluster_scaling_up_more_than_allowed_max_starts_max_instances_and
instance_state="running",
)
# as the new node is already running, but is not yet connected, hence not tagged and drained
mock_docker_find_node_with_name.assert_not_called()
mock_docker_find_node_with_name_returns_fake_node.assert_not_called()
mock_docker_tag_node.assert_not_called()
mock_docker_set_node_availability.assert_not_called()
mock_docker_compute_node_used_resources.assert_not_called()
@@ -947,7 +948,7 @@ async def test_cluster_scaling_up_more_than_allowed_with_multiple_types_max_star
    create_dask_task_resources: Callable[..., DaskTaskResources],
    mock_docker_tag_node: mock.Mock,
    mock_rabbitmq_post_message: mock.Mock,
    mock_docker_find_node_with_name: mock.Mock,
    mock_docker_find_node_with_name_returns_fake_node: mock.Mock,
    mock_docker_set_node_availability: mock.Mock,
    mock_docker_compute_node_used_resources: mock.Mock,
    mock_dask_get_worker_has_results_in_memory: mock.Mock,
@@ -1002,7 +1003,7 @@ async def test_cluster_scaling_up_more_than_allowed_with_multiple_types_max_star
    )

    # the new node is already running but not yet connected, hence it is neither tagged nor drained
    mock_docker_find_node_with_name.assert_not_called()
    mock_docker_find_node_with_name_returns_fake_node.assert_not_called()
    mock_docker_tag_node.assert_not_called()
    mock_docker_set_node_availability.assert_not_called()
    mock_docker_compute_node_used_resources.assert_not_called()
@@ -1029,3 +1030,159 @@ async def test_cluster_scaling_up_more_than_allowed_with_multiple_types_max_star
assert len(all_instances["Reservations"]) == len(
aws_allowed_ec2_instance_type_names
)


@pytest.mark.parametrize(
    "dask_task_imposed_ec2_type, dask_ram, expected_ec2_type",
    [
        pytest.param(
            None,
            parse_obj_as(ByteSize, "128Gib"),
            "r5n.4xlarge",
            id="No explicit instance defined",
        ),
    ],
)
async def test_long_pending_ec2_is_detected_as_broken_terminated_and_restarted(
    with_short_ec2_instances_max_start_time: EnvVarsDict,
    minimal_configuration: None,
    app_settings: ApplicationSettings,
    initialized_app: FastAPI,
    create_dask_task: Callable[[DaskTaskResources], distributed.Future],
    ec2_client: EC2Client,
    dask_task_imposed_ec2_type: InstanceTypeType | None,
    dask_ram: ByteSize | None,
    create_dask_task_resources: Callable[..., DaskTaskResources],
    dask_spec_local_cluster: distributed.SpecCluster,
    expected_ec2_type: InstanceTypeType,
    mock_find_node_with_name_returns_none: mock.Mock,
    mock_docker_tag_node: mock.Mock,
    mock_rabbitmq_post_message: mock.Mock,
    short_ec2_instance_max_start_time: datetime.timedelta,
):
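    # the test proceeds in three phases:
    # 1. a scale-up starts one EC2 instance that never joins the swarm
    # 2. repeated autoscaler runs within EC2_INSTANCES_MAX_START_TIME change nothing
    # 3. once EC2_INSTANCES_MAX_START_TIME is exceeded, the instance is terminated and replaced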
    assert app_settings.AUTOSCALING_EC2_INSTANCES
    assert (
        short_ec2_instance_max_start_time
        == app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_START_TIME
    )
    # we have nothing running now
    all_instances = await ec2_client.describe_instances()
    assert not all_instances["Reservations"]
    # create a task that needs more power
    dask_future = await _create_task_with_resources(
        ec2_client,
        dask_task_imposed_ec2_type,
        dask_ram,
        create_dask_task_resources,
        create_dask_task,
    )
    assert dask_future
    # 1. this should trigger a scale-up since there are no nodes yet
    await auto_scale_cluster(
        app=initialized_app, auto_scaling_mode=ComputationalAutoscaling()
    )

    # check that the instance was started and that we have exactly 1
    instances = await _assert_ec2_instances(
        ec2_client,
        num_reservations=1,
        num_instances=1,
        instance_type=expected_ec2_type,
        instance_state="running",
    )

    # the new node is already running but not yet connected, hence it is neither tagged nor drained
    mock_find_node_with_name_returns_none.assert_not_called()
    mock_docker_tag_node.assert_not_called()
    # check rabbit messages were sent
    _assert_rabbit_autoscaling_message_sent(
        mock_rabbitmq_post_message,
        app_settings,
        initialized_app,
        dask_spec_local_cluster.scheduler_address,
        instances_running=0,
        instances_pending=1,
    )
    mock_rabbitmq_post_message.reset_mock()

    assert instances
    assert "LaunchTime" in instances[0]
    original_instance_launch_time: datetime.datetime = deepcopy(
        instances[0]["LaunchTime"]
    )
    await asyncio.sleep(1)  # NOTE: we wait here since AWS does not keep microseconds
    now = arrow.utcnow().datetime

    assert now > original_instance_launch_time
    assert now < (
        original_instance_launch_time
        + app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_START_TIME
    )
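    # with the 10s EC2_INSTANCES_MAX_START_TIME fixture, now is roughly 1s after
    # launch, well inside the window: the instance still counts as legitimately pending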

    # 2. running the autoscaler several more times, the node still does not join
    for i in range(7):
        await auto_scale_cluster(
            app=initialized_app, auto_scaling_mode=ComputationalAutoscaling()
        )
        # there should be no scaling up, since there is already a pending instance
        instances = await _assert_ec2_instances(
            ec2_client,
            num_reservations=1,
            num_instances=1,
            instance_type=expected_ec2_type,
            instance_state="running",
        )
        assert mock_find_node_with_name_returns_none.call_count == i + 1
        mock_docker_tag_node.assert_not_called()
        _assert_rabbit_autoscaling_message_sent(
            mock_rabbitmq_post_message,
            app_settings,
            initialized_app,
            dask_spec_local_cluster.scheduler_address,
            instances_running=0,
            instances_pending=1,
        )
        mock_rabbitmq_post_message.reset_mock()
        assert instances
        assert "LaunchTime" in instances[0]
        assert instances[0]["LaunchTime"] == original_instance_launch_time

    # 3. wait out the instance max start time and try again; this shall terminate the instance
    now = arrow.utcnow().datetime
    sleep_time = (
        original_instance_launch_time
        + app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_START_TIME
        - now
    ).total_seconds() + 1
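    # worked example, assuming the 10s fixture and ~2s elapsed since launch:
    # sleep_time = (launch_time + 10s - now).total_seconds() + 1 ≈ 9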
    print(
        f"--> waiting now for {sleep_time}s for the pending EC2 to be deemed unworthy"
    )
    await asyncio.sleep(sleep_time)
    now = arrow.utcnow().datetime
    assert now > (
        original_instance_launch_time
        + app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_START_TIME
    )
    # scaling now will terminate the broken EC2 instance that never connected and directly create a replacement
    await auto_scale_cluster(
        app=initialized_app, auto_scaling_mode=ComputationalAutoscaling()
    )
    # we therefore have 2 reservations: the first instance is terminated and a second one is started
    all_instances = await ec2_client.describe_instances()
    assert len(all_instances["Reservations"]) == 2
    assert "Instances" in all_instances["Reservations"][0]
    assert len(all_instances["Reservations"][0]["Instances"]) == 1
    assert "State" in all_instances["Reservations"][0]["Instances"][0]
    assert "Name" in all_instances["Reservations"][0]["Instances"][0]["State"]
    assert (
        all_instances["Reservations"][0]["Instances"][0]["State"]["Name"]
        == "terminated"
    )

    assert "Instances" in all_instances["Reservations"][1]
    assert len(all_instances["Reservations"][1]["Instances"]) == 1
    assert "State" in all_instances["Reservations"][1]["Instances"][0]
    assert "Name" in all_instances["Reservations"][1]["Instances"][0]["State"]
    assert (
        all_instances["Reservations"][1]["Instances"][0]["State"]["Name"] == "running"
    )