🐛 Autoscaling: issues with labelled drained nodes #5348

Merged
2 changes: 1 addition & 1 deletion Makefile
@@ -397,7 +397,7 @@ leave: ## Forces to stop all services, networks, etc by the node leaving the swa
.PHONY: .init-swarm
.init-swarm:
# Ensures swarm is initialized
$(if $(SWARM_HOSTS),,docker swarm init --advertise-addr=$(get_my_ip))
$(if $(SWARM_HOSTS),,docker swarm init --advertise-addr=$(get_my_ip) --default-addr-pool 192.168.0.1/16)


## DOCKER TAGS -------------------------------
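The added --default-addr-pool pins the address range from which swarm allocates subnets for the overlay networks it creates. As a quick sanity check, here is a small sketch (not part of the diff, and it assumes the "docker" Python SDK is installed, which the repo itself does not use) that inspects the ingress network created by swarm init:

import ipaddress

import docker  # assumption: the "docker" SDK package

client = docker.from_env()
ingress = client.networks.get("ingress")
subnets = [cfg["Subnet"] for cfg in ingress.attrs["IPAM"]["Config"]]
print(subnets)
# once the pool applies, the ingress subnet is expected to fall inside 192.168.0.0/16
assert all(
    ipaddress.ip_network(subnet).subnet_of(ipaddress.ip_network("192.168.0.0/16"))
    for subnet in subnets
)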
10 changes: 10 additions & 0 deletions services/autoscaling/requirements/_base.txt
@@ -159,6 +159,7 @@ fastapi==0.99.1
# -c requirements/../../../requirements/constraints.txt
# -r requirements/../../../packages/service-library/requirements/_fastapi.in
# -r requirements/_base.in
# prometheus-fastapi-instrumentator
frozenlist==1.4.0
# via
# aiohttp
@@ -273,6 +274,12 @@ partd==1.4.0
# via
# -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt
# dask
prometheus-client==0.20.0
# via
# -r requirements/../../../packages/service-library/requirements/_fastapi.in
# prometheus-fastapi-instrumentator
prometheus-fastapi-instrumentator==6.1.0
# via -r requirements/../../../packages/service-library/requirements/_fastapi.in
psutil==5.9.5
# via
# -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt
@@ -452,6 +459,8 @@ types-aiobotocore==2.7.0
# via -r requirements/../../../packages/aws-library/requirements/_base.in
types-aiobotocore-ec2==2.7.0
# via types-aiobotocore
types-aiobotocore-s3==2.7.0
# via types-aiobotocore
types-awscrt==0.19.8
# via botocore-stubs
types-python-dateutil==2.8.19.14
@@ -465,6 +474,7 @@ typing-extensions==4.8.0
# typer
# types-aiobotocore
# types-aiobotocore-ec2
# types-aiobotocore-s3
# uvicorn
urllib3==1.26.16
# via
@@ -25,10 +25,25 @@
from ..modules.remote_debug import setup_remote_debugging
from .settings import ApplicationSettings

_LOG_LEVEL_STEP = logging.CRITICAL - logging.ERROR
_NOISY_LOGGERS = (
"aiobotocore",
"aio_pika",
"aiormq",
"botocore",
)

logger = logging.getLogger(__name__)


def create_app(settings: ApplicationSettings) -> FastAPI:
# keep noisy loggers mostly quiet
quiet_level: int = max(
min(logging.root.level + _LOG_LEVEL_STEP, logging.CRITICAL), logging.WARNING
)
for name in _NOISY_LOGGERS:
logging.getLogger(name).setLevel(quiet_level)

logger.info("app settings: %s", settings.json(indent=1))

app = FastAPI(
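For reference, a standalone sketch (not part of the diff) of what the quiet level works out to: with the root logger at INFO, the listed third-party loggers land one standard step higher, clamped between WARNING and CRITICAL.

import logging

_LOG_LEVEL_STEP = logging.CRITICAL - logging.ERROR  # 10, i.e. one standard level step

logging.basicConfig(level=logging.INFO)  # root logger at INFO (20)
quiet_level = max(
    min(logging.root.level + _LOG_LEVEL_STEP, logging.CRITICAL), logging.WARNING
)
assert quiet_level == logging.WARNING  # 20 + 10 = 30, clamped to [WARNING, CRITICAL]
for name in ("aiobotocore", "aio_pika", "aiormq", "botocore"):
    logging.getLogger(name).setLevel(quiet_level)  # these now emit only WARNING and above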
@@ -112,9 +112,19 @@ async def _analyze_current_cluster(
terminated_instances=terminated_ec2_instances,
disconnected_nodes=[n for n in docker_nodes if _node_not_ready(n)],
)
_logger.debug(
cluster_state = jsonable_encoder(
cluster,
include={
"active_nodes": True,
"pending_nodes": True,
"drained_nodes": "available_resources",
"reserve_drained_nodes": True,
"pending_ec2s": "ec2_instance",
},
)
_logger.warning(
"current state: %s",
f"{json.dumps(jsonable_encoder(cluster, include={'active_nodes', 'pending_nodes', 'drained_nodes', 'reserve_drained_nodes', 'pending_ec2s'}), indent=2)}",
f"{json.dumps(cluster_state, indent=2)}",
)
return cluster
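The include mapping above trims the logged cluster state so that, for instance, only available_resources is serialized for drained nodes. A standalone sketch of the underlying pydantic v1 include mechanism (the models below are made-up stand-ins, and the documented Ellipsis/__all__ form is used here rather than the exact spec from the diff):

from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel


class FakeNode(BaseModel):  # stand-in for the real node/instance models
    name: str
    available_resources: dict[str, float] = {}


class FakeCluster(BaseModel):
    active_nodes: list[FakeNode] = []
    drained_nodes: list[FakeNode] = []


cluster = FakeCluster(
    active_nodes=[FakeNode(name="worker-1", available_resources={"CPU": 4.0})],
    drained_nodes=[FakeNode(name="worker-2", available_resources={"CPU": 2.0})],
)
# keep active_nodes whole, but only the available_resources of each drained node
state = jsonable_encoder(
    cluster,
    include={
        "active_nodes": ...,
        "drained_nodes": {"__all__": {"available_resources"}},
    },
)
print(state)
# {'active_nodes': [{'name': 'worker-1', 'available_resources': {'CPU': 4.0}}],
#  'drained_nodes': [{'available_resources': {'CPU': 2.0}}]}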

@@ -744,11 +754,11 @@ async def _find_terminateable_instances(
terminateable_nodes: list[AssociatedInstance] = []

for instance in cluster.drained_nodes:
assert instance.node.UpdatedAt # nosec
node_last_updated = arrow.get(instance.node.UpdatedAt).datetime
node_last_updated = utils_docker.get_node_last_readyness_update(instance.node)
elapsed_time_since_drained = (
datetime.datetime.now(datetime.timezone.utc) - node_last_updated
)
_logger.warning("%s", f"{node_last_updated=}, {elapsed_time_since_drained=}")
if (
elapsed_time_since_drained
> app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION
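_find_terminateable_instances now derives the drain age from the readiness timestamp label written in utils_docker (below) rather than from node.UpdatedAt, which changes whenever the node object is updated and therefore cannot tell how long a node has actually been drained. A condensed sketch of the check, with a made-up grace period standing in for EC2_INSTANCES_TIME_BEFORE_TERMINATION:

import datetime

import arrow

# made-up grace period; the real value comes from the application settings
TIME_BEFORE_TERMINATION = datetime.timedelta(minutes=3)


def is_terminateable(last_readyness_change: str) -> bool:
    # the label holds an ISO timestamp written with arrow.utcnow().isoformat()
    node_last_updated = arrow.get(last_readyness_change).datetime  # timezone-aware
    elapsed = datetime.datetime.now(datetime.timezone.utc) - node_last_updated
    return elapsed > TIME_BEFORE_TERMINATION


print(is_terminateable(arrow.utcnow().shift(minutes=-5).isoformat()))  # True
print(is_terminateable(arrow.utcnow().isoformat()))  # False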
@@ -13,6 +13,7 @@
from pathlib import Path
from typing import Final, cast

import arrow
import yaml
from aws_library.ec2.models import EC2InstanceData, Resources
from models_library.docker import (
@@ -110,7 +111,7 @@ def _check_if_node_is_removable(node: Node) -> bool:
def _is_task_waiting_for_resources(task: Task) -> bool:
# NOTE: https://docs.docker.com/engine/swarm/how-swarm-mode-works/swarm-task-states/
with log_context(
logger, level=logging.DEBUG, msg=f"_is_task_waiting_for_resources: {task}"
logger, level=logging.DEBUG, msg=f"_is_task_waiting_for_resources: {task.ID}"
):
if (
not task.Status
@@ -550,7 +551,10 @@ def is_node_ready_and_available(node: Node, *, availability: Availability) -> bo


_OSPARC_SERVICE_READY_LABEL_KEY: Final[DockerLabelKey] = parse_obj_as(
DockerLabelKey, "osparc-services-ready"
DockerLabelKey, "io.simcore.osparc-services-ready"
)
_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY: Final[DockerLabelKey] = parse_obj_as(
DockerLabelKey, f"{_OSPARC_SERVICE_READY_LABEL_KEY}-last-changed"
)


@@ -575,6 +579,7 @@ async def set_node_osparc_ready(
assert node.Spec # nosec
new_tags = deepcopy(cast(dict[DockerLabelKey, str], node.Spec.Labels))
new_tags[_OSPARC_SERVICE_READY_LABEL_KEY] = "true" if ready else "false"
new_tags[_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY] = arrow.utcnow().isoformat()
# NOTE: docker drain sometimes impedes performance when undraining, see https://github.com/ITISFoundation/osparc-simcore/issues/5339
available = app_settings.AUTOSCALING_DRAIN_NODES_WITH_LABELS or ready
return await tag_node(
@@ -585,6 +590,15 @@
)


def get_node_last_readyness_update(node: Node) -> datetime.datetime:
assert node.Spec # nosec
assert node.Spec.Labels # nosec
return cast(
datetime.datetime,
arrow.get(node.Spec.Labels[_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY]).datetime,
) # mypy


async def attach_node(
app_settings: ApplicationSettings,
docker_client: AutoscalingDocker,
@@ -595,6 +609,7 @@ async def attach_node(
assert node.Spec # nosec
current_tags = cast(dict[DockerLabelKey, str], node.Spec.Labels or {})
new_tags = current_tags | tags | {_OSPARC_SERVICE_READY_LABEL_KEY: "false"}
new_tags[_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY] = arrow.utcnow().isoformat()
return await tag_node(
docker_client,
node,
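The ready flag now comes with a companion timestamp label recording when it last changed, and get_node_last_readyness_update parses it back into a timezone-aware datetime. A tiny standalone round trip, with the label keys copied from the diff:

import arrow

_OSPARC_SERVICE_READY_LABEL_KEY = "io.simcore.osparc-services-ready"
_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY = f"{_OSPARC_SERVICE_READY_LABEL_KEY}-last-changed"

labels = {
    _OSPARC_SERVICE_READY_LABEL_KEY: "false",  # what attach_node writes when a node joins
    _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY: arrow.utcnow().isoformat(),
}
last_changed = arrow.get(labels[_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY]).datetime
print(last_changed.tzinfo)  # UTC, so it compares safely with datetime.now(timezone.utc)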
30 changes: 28 additions & 2 deletions services/autoscaling/tests/unit/conftest.py
@@ -14,6 +14,7 @@
from unittest import mock

import aiodocker
import arrow
import distributed
import httpx
import psutil
@@ -50,6 +51,10 @@
)
from simcore_service_autoscaling.models import Cluster, DaskTaskResources
from simcore_service_autoscaling.modules.docker import AutoscalingDocker
from simcore_service_autoscaling.utils.utils_docker import (
_OSPARC_SERVICE_READY_LABEL_KEY,
_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY,
)
from tenacity import retry
from tenacity._asyncio import AsyncRetrying
from tenacity.retry import retry_if_exception_type
@@ -321,13 +326,34 @@ async def host_node(
) -> AsyncIterator[DockerNode]:
nodes = parse_obj_as(list[DockerNode], await async_docker_client.nodes.list())
assert len(nodes) == 1
# keep state of node for later revert
old_node = deepcopy(nodes[0])
assert old_node.ID
assert old_node.Spec
assert old_node.Spec.Role
assert old_node.Spec.Availability
yield nodes[0]
assert old_node.Version
assert old_node.Version.Index
labels = old_node.Spec.Labels or {}
# ensure we have the necessary labels
await async_docker_client.nodes.update(
node_id=old_node.ID,
version=old_node.Version.Index,
spec={
"Availability": old_node.Spec.Availability.value,
"Labels": labels
| {
_OSPARC_SERVICE_READY_LABEL_KEY: "true",
_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY: arrow.utcnow().isoformat(),
},
"Role": old_node.Spec.Role.value,
},
)
modified_host_node = parse_obj_as(
DockerNode, await async_docker_client.nodes.inspect(node_id=old_node.ID)
)
yield modified_host_node
# revert state
assert old_node.ID
current_node = parse_obj_as(
DockerNode, await async_docker_client.nodes.inspect(node_id=old_node.ID)
)
@@ -17,6 +17,7 @@
from typing import Any
from unittest import mock

import arrow
import distributed
import pytest
from aws_library.ec2.models import Resources
@@ -43,6 +44,7 @@
from simcore_service_autoscaling.modules.docker import get_docker_client
from simcore_service_autoscaling.utils.utils_docker import (
_OSPARC_SERVICE_READY_LABEL_KEY,
_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY,
)
from types_aiobotocore_ec2.client import EC2Client
from types_aiobotocore_ec2.literals import InstanceTypeType
@@ -468,29 +470,55 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915
)
assert fake_attached_node.Spec.Labels
fake_attached_node.Spec.Labels |= expected_docker_node_tags | {
_OSPARC_SERVICE_READY_LABEL_KEY: "false"
_OSPARC_SERVICE_READY_LABEL_KEY: "false",
}
mock_docker_tag_node.assert_has_calls(
(
# attach node call
mock.call(
get_docker_client(initialized_app),
fake_node,
tags=fake_node.Spec.Labels
| expected_docker_node_tags
| {_OSPARC_SERVICE_READY_LABEL_KEY: "false"},
available=with_drain_nodes_labelled,
),
mock.call(
get_docker_client(initialized_app),
fake_attached_node,
tags=fake_node.Spec.Labels
| expected_docker_node_tags
| {_OSPARC_SERVICE_READY_LABEL_KEY: "true"},
available=True,
),
)
# check attach call
assert mock_docker_tag_node.call_args_list[0] == mock.call(
get_docker_client(initialized_app),
fake_node,
tags=fake_node.Spec.Labels
| expected_docker_node_tags
| {
_OSPARC_SERVICE_READY_LABEL_KEY: "false",
_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY: mock.ANY,
},
available=with_drain_nodes_labelled,
)
# update our fake node
fake_attached_node.Spec.Labels[
_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY
] = mock_docker_tag_node.call_args_list[0][1]["tags"][
_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY
]
# check the activate time is later than attach time
assert arrow.get(
mock_docker_tag_node.call_args_list[1][1]["tags"][
_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY
]
) > arrow.get(
mock_docker_tag_node.call_args_list[0][1]["tags"][
_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY
]
)

# check activate call
assert mock_docker_tag_node.call_args_list[1] == mock.call(
get_docker_client(initialized_app),
fake_attached_node,
tags=fake_node.Spec.Labels
| expected_docker_node_tags
| {
_OSPARC_SERVICE_READY_LABEL_KEY: "true",
_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY: mock.ANY,
},
available=True,
)
# update our fake node
fake_attached_node.Spec.Labels[
_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY
] = mock_docker_tag_node.call_args_list[1][1]["tags"][
_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY
]
mock_docker_tag_node.reset_mock()
mock_docker_set_node_availability.assert_not_called()
mock_rabbitmq_post_message.assert_called_once()
@@ -567,9 +595,20 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915
get_docker_client(initialized_app),
fake_attached_node,
tags=fake_attached_node.Spec.Labels
| {_OSPARC_SERVICE_READY_LABEL_KEY: "false"},
| {
_OSPARC_SERVICE_READY_LABEL_KEY: "false",
_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY: mock.ANY,
},
available=with_drain_nodes_labelled,
)
# check the datetime was updated
assert arrow.get(
mock_docker_tag_node.call_args_list[0][1]["tags"][
_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY
]
) > arrow.get(
fake_attached_node.Spec.Labels[_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY]
)
mock_docker_tag_node.reset_mock()

await _assert_ec2_instances(
@@ -583,10 +622,11 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915
# we artificially set the node to drain
fake_attached_node.Spec.Availability = Availability.drain
fake_attached_node.Spec.Labels[_OSPARC_SERVICE_READY_LABEL_KEY] = "false"
fake_attached_node.UpdatedAt = datetime.datetime.now(
tz=datetime.timezone.utc
).isoformat()
# the node will be not be terminated beforet the timeout triggers
fake_attached_node.Spec.Labels[
_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY
] = datetime.datetime.now(tz=datetime.timezone.utc).isoformat()

# the node will not be terminated before the timeout triggers
assert app_settings.AUTOSCALING_EC2_INSTANCES
assert (
datetime.timedelta(seconds=5)
@@ -608,7 +648,7 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915
)

# now changing the last update timepoint will trigger the node removal and shut down the EC2 instance
fake_attached_node.UpdatedAt = (
fake_attached_node.Spec.Labels[_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY] = (
datetime.datetime.now(tz=datetime.timezone.utc)
- app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION
- datetime.timedelta(seconds=1)