From 80d5512cb7f4ff86b517de0e9ac3b53f72477134 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Thu, 16 May 2024 14:05:02 +0200 Subject: [PATCH] test both computational and dynamic --- services/autoscaling/tests/unit/conftest.py | 30 ++- ...test_modules_auto_scaling_computational.py | 197 ++++++++++++++++-- .../unit/test_modules_auto_scaling_dynamic.py | 40 +--- 3 files changed, 212 insertions(+), 55 deletions(-) diff --git a/services/autoscaling/tests/unit/conftest.py b/services/autoscaling/tests/unit/conftest.py index 87bd2ca2a41..450bd675669 100644 --- a/services/autoscaling/tests/unit/conftest.py +++ b/services/autoscaling/tests/unit/conftest.py @@ -7,7 +7,7 @@ import datetime import json import random -from collections.abc import AsyncIterator, Awaitable, Callable +from collections.abc import AsyncIterator, Awaitable, Callable, Iterator from copy import deepcopy from pathlib import Path from typing import Any, Final, cast @@ -843,3 +843,31 @@ def mock_machines_buffer(monkeypatch: pytest.MonkeyPatch) -> int: num_machines_in_buffer = 5 monkeypatch.setenv("EC2_INSTANCES_MACHINES_BUFFER", f"{num_machines_in_buffer}") return num_machines_in_buffer + + +@pytest.fixture +def mock_find_node_with_name_returns_none(mocker: MockerFixture) -> Iterator[mock.Mock]: + return mocker.patch( + "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.find_node_with_name", + autospec=True, + return_value=None, + ) + + +@pytest.fixture(scope="session") +def short_ec2_instance_max_start_time() -> datetime.timedelta: + return datetime.timedelta(seconds=10) + + +@pytest.fixture +def with_short_ec2_instances_max_start_time( + app_environment: EnvVarsDict, + monkeypatch: pytest.MonkeyPatch, + short_ec2_instance_max_start_time: datetime.timedelta, +) -> EnvVarsDict: + return app_environment | setenvs_from_dict( + monkeypatch, + { + "EC2_INSTANCES_MAX_START_TIME": f"{short_ec2_instance_max_start_time}", + }, + ) diff --git a/services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py b/services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py index bc60b9753fc..f51c36e191c 100644 --- a/services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py +++ b/services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py @@ -49,6 +49,7 @@ ) from types_aiobotocore_ec2.client import EC2Client from types_aiobotocore_ec2.literals import InstanceTypeType +from types_aiobotocore_ec2.type_defs import InstanceTypeDef @pytest.fixture @@ -98,11 +99,10 @@ async def _assert_ec2_instances( num_instances: int, instance_type: str, instance_state: str, -) -> list[str]: +) -> list[InstanceTypeDef]: + list_instances: list[InstanceTypeDef] = [] all_instances = await ec2_client.describe_instances() - assert len(all_instances["Reservations"]) == num_reservations - internal_dns_names = [] for reservation in all_instances["Reservations"]: assert "Instances" in reservation assert len(reservation["Instances"]) == num_instances @@ -127,7 +127,6 @@ async def _assert_ec2_instances( assert "PrivateDnsName" in instance instance_private_dns_name = instance["PrivateDnsName"] assert instance_private_dns_name.endswith(".ec2.internal") - internal_dns_names.append(instance_private_dns_name) assert "State" in instance state = instance["State"] assert "Name" in state @@ -141,7 +140,8 @@ async def _assert_ec2_instances( assert "Value" in user_data["UserData"] user_data = base64.b64decode(user_data["UserData"]["Value"]).decode() assert user_data.count("docker swarm join") == 1 - return internal_dns_names + list_instances.append(instance) + return list_instances def _assert_rabbit_autoscaling_message_sent( @@ -169,7 +169,7 @@ def _assert_rabbit_autoscaling_message_sent( @pytest.fixture -def mock_docker_find_node_with_name( +def mock_docker_find_node_with_name_returns_fake_node( mocker: MockerFixture, fake_node: DockerNode ) -> Iterator[mock.Mock]: return mocker.patch( @@ -376,7 +376,7 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 mock_docker_tag_node: mock.Mock, fake_node: DockerNode, mock_rabbitmq_post_message: mock.Mock, - mock_docker_find_node_with_name: mock.Mock, + mock_docker_find_node_with_name_returns_fake_node: mock.Mock, mock_docker_set_node_availability: mock.Mock, mock_docker_compute_node_used_resources: mock.Mock, mock_dask_get_worker_has_results_in_memory: mock.Mock, @@ -418,7 +418,7 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 ) # as the new node is already running, but is not yet connected, hence not tagged and drained - mock_docker_find_node_with_name.assert_not_called() + mock_docker_find_node_with_name_returns_fake_node.assert_not_called() mock_docker_tag_node.assert_not_called() mock_docker_set_node_availability.assert_not_called() mock_docker_compute_node_used_resources.assert_not_called() @@ -445,19 +445,20 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 mock_dask_get_worker_used_resources.assert_called_once() mock_dask_get_worker_used_resources.reset_mock() mock_dask_is_worker_connected.assert_not_called() - internal_dns_names = await _assert_ec2_instances( + instances = await _assert_ec2_instances( ec2_client, num_reservations=1, num_instances=1, instance_type=expected_ec2_type, instance_state="running", ) - assert len(internal_dns_names) == 1 - internal_dns_name = internal_dns_names[0].removesuffix(".ec2.internal") + assert len(instances) == 1 + assert "PrivateDnsName" in instances[0] + internal_dns_name = instances[0]["PrivateDnsName"].removesuffix(".ec2.internal") # the node is attached first and then tagged and made active right away since we still have the pending task - mock_docker_find_node_with_name.assert_called_once() - mock_docker_find_node_with_name.reset_mock() + mock_docker_find_node_with_name_returns_fake_node.assert_called_once() + mock_docker_find_node_with_name_returns_fake_node.reset_mock() expected_docker_node_tags = { DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY: expected_ec2_type } @@ -559,7 +560,7 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 mock_dask_get_worker_used_resources.assert_called() assert mock_dask_get_worker_used_resources.call_count == 2 * num_useless_calls mock_dask_get_worker_used_resources.reset_mock() - mock_docker_find_node_with_name.assert_not_called() + mock_docker_find_node_with_name_returns_fake_node.assert_not_called() mock_docker_tag_node.assert_not_called() mock_docker_set_node_availability.assert_not_called() # check the number of instances did not change and is still running @@ -801,7 +802,7 @@ async def test_cluster_scaling_up_starts_multiple_instances( mock_docker_tag_node: mock.Mock, scale_up_params: _ScaleUpParams, mock_rabbitmq_post_message: mock.Mock, - mock_docker_find_node_with_name: mock.Mock, + mock_docker_find_node_with_name_returns_fake_node: mock.Mock, mock_docker_set_node_availability: mock.Mock, dask_spec_local_cluster: distributed.SpecCluster, ): @@ -837,7 +838,7 @@ async def test_cluster_scaling_up_starts_multiple_instances( ) # as the new node is already running, but is not yet connected, hence not tagged and drained - mock_docker_find_node_with_name.assert_not_called() + mock_docker_find_node_with_name_returns_fake_node.assert_not_called() mock_docker_tag_node.assert_not_called() mock_docker_set_node_availability.assert_not_called() # check rabbit messages were sent @@ -862,7 +863,7 @@ async def test_cluster_scaling_up_more_than_allowed_max_starts_max_instances_and create_dask_task_resources: Callable[..., DaskTaskResources], mock_docker_tag_node: mock.Mock, mock_rabbitmq_post_message: mock.Mock, - mock_docker_find_node_with_name: mock.Mock, + mock_docker_find_node_with_name_returns_fake_node: mock.Mock, mock_docker_set_node_availability: mock.Mock, mock_docker_compute_node_used_resources: mock.Mock, mock_dask_get_worker_has_results_in_memory: mock.Mock, @@ -904,7 +905,7 @@ async def test_cluster_scaling_up_more_than_allowed_max_starts_max_instances_and instance_state="running", ) # as the new node is already running, but is not yet connected, hence not tagged and drained - mock_docker_find_node_with_name.assert_not_called() + mock_docker_find_node_with_name_returns_fake_node.assert_not_called() mock_docker_tag_node.assert_not_called() mock_docker_set_node_availability.assert_not_called() mock_docker_compute_node_used_resources.assert_not_called() @@ -947,7 +948,7 @@ async def test_cluster_scaling_up_more_than_allowed_with_multiple_types_max_star create_dask_task_resources: Callable[..., DaskTaskResources], mock_docker_tag_node: mock.Mock, mock_rabbitmq_post_message: mock.Mock, - mock_docker_find_node_with_name: mock.Mock, + mock_docker_find_node_with_name_returns_fake_node: mock.Mock, mock_docker_set_node_availability: mock.Mock, mock_docker_compute_node_used_resources: mock.Mock, mock_dask_get_worker_has_results_in_memory: mock.Mock, @@ -1002,7 +1003,7 @@ async def test_cluster_scaling_up_more_than_allowed_with_multiple_types_max_star ) # as the new node is already running, but is not yet connected, hence not tagged and drained - mock_docker_find_node_with_name.assert_not_called() + mock_docker_find_node_with_name_returns_fake_node.assert_not_called() mock_docker_tag_node.assert_not_called() mock_docker_set_node_availability.assert_not_called() mock_docker_compute_node_used_resources.assert_not_called() @@ -1029,3 +1030,159 @@ async def test_cluster_scaling_up_more_than_allowed_with_multiple_types_max_star assert len(all_instances["Reservations"]) == len( aws_allowed_ec2_instance_type_names ) + + +@pytest.mark.parametrize( + "dask_task_imposed_ec2_type, dask_ram, expected_ec2_type", + [ + pytest.param( + None, + parse_obj_as(ByteSize, "128Gib"), + "r5n.4xlarge", + id="No explicit instance defined", + ), + ], +) +async def test_long_pending_ec2_is_detected_as_broken_terminated_and_restarted( + with_short_ec2_instances_max_start_time: EnvVarsDict, + minimal_configuration: None, + app_settings: ApplicationSettings, + initialized_app: FastAPI, + create_dask_task: Callable[[DaskTaskResources], distributed.Future], + ec2_client: EC2Client, + dask_task_imposed_ec2_type: InstanceTypeType | None, + dask_ram: ByteSize | None, + create_dask_task_resources: Callable[..., DaskTaskResources], + dask_spec_local_cluster: distributed.SpecCluster, + expected_ec2_type: InstanceTypeType, + mock_find_node_with_name_returns_none: mock.Mock, + mock_docker_tag_node: mock.Mock, + mock_rabbitmq_post_message: mock.Mock, + short_ec2_instance_max_start_time: datetime.timedelta, +): + assert app_settings.AUTOSCALING_EC2_INSTANCES + assert ( + short_ec2_instance_max_start_time + == app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_START_TIME + ) + # we have nothing running now + all_instances = await ec2_client.describe_instances() + assert not all_instances["Reservations"] + # create a task that needs more power + dask_future = await _create_task_with_resources( + ec2_client, + dask_task_imposed_ec2_type, + dask_ram, + create_dask_task_resources, + create_dask_task, + ) + assert dask_future + # this should trigger a scaling up as we have no nodes + await auto_scale_cluster( + app=initialized_app, auto_scaling_mode=ComputationalAutoscaling() + ) + + # check the instance was started and we have exactly 1 + instances = await _assert_ec2_instances( + ec2_client, + num_reservations=1, + num_instances=1, + instance_type=expected_ec2_type, + instance_state="running", + ) + + # as the new node is already running, but is not yet connected, hence not tagged and drained + mock_find_node_with_name_returns_none.assert_not_called() + mock_docker_tag_node.assert_not_called() + # check rabbit messages were sent + _assert_rabbit_autoscaling_message_sent( + mock_rabbitmq_post_message, + app_settings, + initialized_app, + dask_spec_local_cluster.scheduler_address, + instances_running=0, + instances_pending=1, + ) + mock_rabbitmq_post_message.reset_mock() + + assert instances + assert "LaunchTime" in instances[0] + original_instance_launch_time: datetime.datetime = deepcopy( + instances[0]["LaunchTime"] + ) + await asyncio.sleep(1) # NOTE: we wait here since AWS does not keep microseconds + now = arrow.utcnow().datetime + + assert now > original_instance_launch_time + assert now < ( + original_instance_launch_time + + app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_START_TIME + ) + + # 2. running again several times the autoscaler, the node does not join + for i in range(7): + await auto_scale_cluster( + app=initialized_app, auto_scaling_mode=ComputationalAutoscaling() + ) + # there should be no scaling up, since there is already a pending instance + instances = await _assert_ec2_instances( + ec2_client, + num_reservations=1, + num_instances=1, + instance_type=expected_ec2_type, + instance_state="running", + ) + assert mock_find_node_with_name_returns_none.call_count == i + 1 + mock_docker_tag_node.assert_not_called() + _assert_rabbit_autoscaling_message_sent( + mock_rabbitmq_post_message, + app_settings, + initialized_app, + dask_spec_local_cluster.scheduler_address, + instances_running=0, + instances_pending=1, + ) + mock_rabbitmq_post_message.reset_mock() + assert instances + assert "LaunchTime" in instances[0] + assert instances[0]["LaunchTime"] == original_instance_launch_time + + # 3. wait for the instance max start time and try again, shall terminate the instance + now = arrow.utcnow().datetime + sleep_time = ( + original_instance_launch_time + + app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_START_TIME + - now + ).total_seconds() + 1 + print( + f"--> waiting now for {sleep_time}s for the pending EC2 to be deemed as unworthy" + ) + await asyncio.sleep(sleep_time) + now = arrow.utcnow().datetime + assert now > ( + original_instance_launch_time + + app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_START_TIME + ) + # scaling now will terminate the broken ec2 that did not connect, and directly create a replacement + await auto_scale_cluster( + app=initialized_app, auto_scaling_mode=ComputationalAutoscaling() + ) + # we have therefore 2 reservations, first instance is terminated and a second one started + all_instances = await ec2_client.describe_instances() + assert len(all_instances["Reservations"]) == 2 + assert "Instances" in all_instances["Reservations"][0] + assert len(all_instances["Reservations"][0]["Instances"]) == 1 + assert "State" in all_instances["Reservations"][0]["Instances"][0] + assert "Name" in all_instances["Reservations"][0]["Instances"][0]["State"] + assert ( + all_instances["Reservations"][0]["Instances"][0]["State"]["Name"] + == "terminated" + ) + + assert "Instances" in all_instances["Reservations"][1] + assert len(all_instances["Reservations"][1]["Instances"]) == 1 + assert "State" in all_instances["Reservations"][1]["Instances"][0] + assert "Name" in all_instances["Reservations"][1]["Instances"][0]["State"] + assert ( + all_instances["Reservations"][1]["Instances"][0]["State"]["Name"] == "running" + ) diff --git a/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py b/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py index 5e70e70031b..d7e76c8cb0f 100644 --- a/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py +++ b/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py @@ -11,7 +11,7 @@ from collections.abc import AsyncIterator, Awaitable, Callable, Iterator from copy import deepcopy from dataclasses import dataclass -from typing import Any, Final +from typing import Any from unittest import mock import aiodocker @@ -35,7 +35,7 @@ from models_library.rabbitmq_messages import RabbitAutoscalingStatusMessage from pydantic import ByteSize, parse_obj_as from pytest_mock.plugin import MockerFixture -from pytest_simcore.helpers.utils_envs import EnvVarsDict, setenvs_from_dict +from pytest_simcore.helpers.utils_envs import EnvVarsDict from simcore_service_autoscaling.core.settings import ApplicationSettings from simcore_service_autoscaling.models import AssociatedInstance, Cluster from simcore_service_autoscaling.modules.auto_scaling_core import ( @@ -89,15 +89,6 @@ def mock_rabbitmq_post_message(mocker: MockerFixture) -> Iterator[mock.Mock]: ) -@pytest.fixture -def mock_find_node_with_name_returns_none(mocker: MockerFixture) -> Iterator[mock.Mock]: - return mocker.patch( - "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.find_node_with_name", - autospec=True, - return_value=None, - ) - - @pytest.fixture def mock_find_node_with_name_returns_fake_node( mocker: MockerFixture, fake_node: Node @@ -363,7 +354,6 @@ async def _assert_ec2_instances( ) -> list[InstanceTypeDef]: list_instances: list[InstanceTypeDef] = [] all_instances = await ec2_client.describe_instances() - internal_dns_names = [] assert len(all_instances["Reservations"]) == num_reservations for reservation in all_instances["Reservations"]: assert "Instances" in reservation @@ -392,7 +382,6 @@ async def _assert_ec2_instances( assert "PrivateDnsName" in instance instance_private_dns_name = instance["PrivateDnsName"] assert instance_private_dns_name.endswith(".ec2.internal") - internal_dns_names.append(instance_private_dns_name) assert "State" in instance state = instance["State"] assert "Name" in state @@ -884,24 +873,6 @@ async def test_cluster_scaling_up_starts_multiple_instances( mock_rabbitmq_post_message.reset_mock() -_SHORT_EC2_INSTANCES_MAX_START_TIME: Final[datetime.timedelta] = datetime.timedelta( - seconds=10 -) - - -@pytest.fixture -def with_short_ec2_instances_max_start_time( - app_environment: EnvVarsDict, - monkeypatch: pytest.MonkeyPatch, -) -> EnvVarsDict: - return app_environment | setenvs_from_dict( - monkeypatch, - { - "EC2_INSTANCES_MAX_START_TIME": f"{_SHORT_EC2_INSTANCES_MAX_START_TIME}", - }, - ) - - @pytest.mark.parametrize( "docker_service_imposed_ec2_type, docker_service_ram, expected_ec2_type", [ @@ -913,7 +884,7 @@ def with_short_ec2_instances_max_start_time( ), ], ) -async def test_long_pending_ec2_is_detected_as_defect( +async def test_long_pending_ec2_is_detected_as_broken_terminated_and_restarted( with_short_ec2_instances_max_start_time: EnvVarsDict, minimal_configuration: None, service_monitored_labels: dict[DockerLabelKey, str], @@ -931,11 +902,12 @@ async def test_long_pending_ec2_is_detected_as_defect( mock_find_node_with_name_returns_none: mock.Mock, mock_docker_tag_node: mock.Mock, mock_rabbitmq_post_message: mock.Mock, + short_ec2_instance_max_start_time: datetime.timedelta, ): assert app_settings.AUTOSCALING_EC2_INSTANCES assert ( - app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_START_TIME - == _SHORT_EC2_INSTANCES_MAX_START_TIME + short_ec2_instance_max_start_time + == app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_START_TIME ) # we have nothing running now all_instances = await ec2_client.describe_instances()