Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛Bugfix/autoscaling does not scale above limit #5129

Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from typing import Any, TypeAlias

from .constants import DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY

DaskTaskResources: TypeAlias = dict[str, Any]


def create_ec2_resource_constraint_key(ec2_instance_type: str) -> str:
return f"{DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY}:{ec2_instance_type}"


def get_ec2_instance_type_from_resources(
task_resources: DaskTaskResources,
) -> str | None:
for resource_name in task_resources:
if resource_name.startswith(DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY):
return resource_name.split(":")[-1]
return None
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from dask_task_models_library.constants import DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY
from dask_task_models_library.resource_constraints import (
create_ec2_resource_constraint_key,
get_ec2_instance_type_from_resources,
)
from faker import Faker


def test_create_ec2_resource_constraint_key(faker: Faker):
faker_instance_type = faker.pystr()
assert (
create_ec2_resource_constraint_key(faker_instance_type)
== f"{DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY}:{faker_instance_type}"
)

empty_instance_type = ""
assert (
create_ec2_resource_constraint_key(empty_instance_type)
== f"{DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY}:"
)


def test_get_ec2_instance_type_from_resources(faker: Faker):
empty_task_resources = {}
assert get_ec2_instance_type_from_resources(empty_task_resources) is None
no_ec2_types_in_resources = {"blahblah": 1}
assert get_ec2_instance_type_from_resources(no_ec2_types_in_resources) is None

faker_instance_type = faker.pystr()
ec2_type_in_resources = {create_ec2_resource_constraint_key(faker_instance_type): 1}
assert (
get_ec2_instance_type_from_resources(ec2_type_in_resources)
== faker_instance_type
)
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from dataclasses import dataclass, field
from typing import Any, TypeAlias
from typing import TypeAlias

from aws_library.ec2.models import EC2InstanceData, EC2InstanceType, Resources
from dask_task_models_library.resource_constraints import DaskTaskResources
from models_library.generated_models.docker_rest_api import Node


Expand Down Expand Up @@ -55,7 +56,6 @@ class Cluster:


DaskTaskId: TypeAlias = str
DaskTaskResources: TypeAlias = dict[str, Any]


@dataclass(frozen=True, kw_only=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
EC2InstanceConfig,
EC2InstanceData,
EC2InstanceType,
EC2Tags,
Resources,
)
from fastapi import FastAPI
Expand Down Expand Up @@ -441,6 +442,73 @@ async def _find_needed_instances(
return num_instances_per_type


async def _cap_needed_instances(
app: FastAPI, needed_instances: dict[EC2InstanceType, int], ec2_tags: EC2Tags
) -> dict[EC2InstanceType, int]:
"""caps the needed instances dict[EC2InstanceType, int] to the maximal allowed number of instances by
1. limiting to 1 per asked type
2. increasing each by 1 until the maximum allowed number of instances is reached
NOTE: the maximum allowed number of instances contains the current number of running/pending machines

Raises:
Ec2TooManyInstancesError: raised when the maximum of machines is already running/pending
"""
ec2_client = get_ec2_client(app)
app_settings = get_application_settings(app)
assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec
current_instances = await ec2_client.get_instances(
key_names=[app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_KEY_NAME],
tags=ec2_tags,
)
current_number_of_instances = len(current_instances)
if (
current_number_of_instances
>= app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES
):
# ok that is already too much
raise Ec2TooManyInstancesError(
num_instances=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES
)

total_number_of_needed_instances = sum(needed_instances.values())
if (
current_number_of_instances + total_number_of_needed_instances
<= app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES
):
# ok that fits no need to do anything here
return needed_instances

# this is asking for too many, so let's cap them
max_number_of_creatable_instances = (
app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES
- current_number_of_instances
)

# we start with 1 machine of each type until the max
capped_needed_instances = {
k: 1
for count, k in enumerate(needed_instances)
if (count + 1) <= max_number_of_creatable_instances
}

if len(capped_needed_instances) < len(needed_instances):
# there were too many types for the number of possible instances
return capped_needed_instances

# all instance types were added, now create more of them if possible
while sum(capped_needed_instances.values()) < max_number_of_creatable_instances:
for instance_type, num_to_create in needed_instances.items():
if (
sum(capped_needed_instances.values())
== max_number_of_creatable_instances
):
break
if num_to_create > capped_needed_instances[instance_type]:
capped_needed_instances[instance_type] += 1

return capped_needed_instances


async def _start_instances(
app: FastAPI,
needed_instances: dict[EC2InstanceType, int],
Expand All @@ -450,14 +518,28 @@ async def _start_instances(
ec2_client = get_ec2_client(app)
app_settings = get_application_settings(app)
assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec
new_instance_tags = auto_scaling_mode.get_ec2_tags(app)
capped_needed_machines = {}
try:
capped_needed_machines = await _cap_needed_instances(
app, needed_instances, new_instance_tags
)
except Ec2TooManyInstancesError:
await auto_scaling_mode.log_message_from_tasks(
app,
tasks,
"The maximum number of machines in the cluster was reached. Please wait for your running jobs "
"to complete and try again later or contact osparc support if this issue does not resolve.",
level=logging.ERROR,
)
return []

instance_tags = auto_scaling_mode.get_ec2_tags(app)
results = await asyncio.gather(
*[
ec2_client.start_aws_instance(
EC2InstanceConfig(
type=instance_type,
tags=instance_tags,
tags=new_instance_tags,
startup_script=await ec2_startup_script(
app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES[
instance_type.name
Expand All @@ -474,7 +556,7 @@ async def _start_instances(
number_of_instances=instance_num,
max_number_of_instances=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES,
)
for instance_type, instance_num in needed_instances.items()
for instance_type, instance_num in capped_needed_machines.items()
],
return_exceptions=True,
)
Expand All @@ -497,7 +579,10 @@ async def _start_instances(
else:
new_pending_instances.append(r)

log_message = f"{sum(n for n in needed_instances.values())} new machines launched, it might take up to 3 minutes to start, Please wait..."
log_message = (
f"{sum(n for n in capped_needed_machines.values())} new machines launched"
", it might take up to 3 minutes to start, Please wait..."
)
await auto_scaling_mode.log_message_from_tasks(
app, tasks, log_message, level=logging.INFO
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@

import distributed
from aws_library.ec2.models import EC2InstanceData, Resources
from dask_task_models_library.resource_constraints import DaskTaskResources
from pydantic import AnyUrl, ByteSize, parse_obj_as

from ..core.errors import (
DaskNoWorkersError,
DaskSchedulerNotFoundError,
DaskWorkerNotFoundError,
)
from ..models import AssociatedInstance, DaskTask, DaskTaskId, DaskTaskResources
from ..models import AssociatedInstance, DaskTask, DaskTaskId
from ..utils.auto_scaling_core import (
node_host_name_from_ec2_private_dns,
node_ip_from_ec2_private_dns,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
from typing import Final

from aws_library.ec2.models import Resources
from dask_task_models_library.constants import DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY
from dask_task_models_library.resource_constraints import (
get_ec2_instance_type_from_resources,
)
from fastapi import FastAPI
from servicelib.utils_formatting import timedelta_as_minute_second
from types_aiobotocore_ec2.literals import InstanceTypeType

from ..core.settings import get_application_settings
from ..models import (
Expand All @@ -30,8 +31,11 @@ def get_max_resources_from_dask_task(task: DaskTask) -> Resources:
)


def get_task_instance_restriction(task: DaskTask) -> InstanceTypeType | None:
return task.required_resources.get(DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY)
def get_task_instance_restriction(task: DaskTask) -> str | None:
instance_ec2_type: str | None = get_ec2_instance_type_from_resources(
task.required_resources
)
return instance_ec2_type


def _compute_tasks_needed_resources(tasks: list[DaskTask]) -> Resources:
Expand Down
Loading