Skip to content

Commit

Permalink
[Core][Autoscaler] Configure idleTimeoutSeconds per node type (#48813)
Browse files Browse the repository at this point in the history
<!-- Thank you for your contribution! Please review
https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before
opening a pull request. -->

<!-- Please add a reviewer to the assignee section when you create a PR.
If you don't have the access to it, we will shortly find a reviewer and
assign them to your PR. -->

## Why are these changes needed?

Adds `idle_timeout_s` as a field to `node_type_configs`, enabling the v2
autoscaler to configure idle termination per worker type.

This PR depends on a change in KubeRay to the RayCluster CRD, since we
want to support passing `idleTimeoutSeconds` to individual worker groups
such that they can specify a custom idle duration:
ray-project/kuberay#2558

## Related issue number

Closes #36888

<!-- For example: "Closes #1234" -->

## Checks

- [x] I've signed off every commit(by using the -s flag, i.e., `git
commit -s`) in this PR.
- [ ] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for
https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I
added a
method in Tune, I've added it in `doc/source/tune/api/` under the
           corresponding `.rst` file.
- [x] I've made sure the tests are passing. Note that there might be a
few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [x] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(

---------

Signed-off-by: ryanaoleary <ryanaoleary@google.com>
Signed-off-by: ryanaoleary <113500783+ryanaoleary@users.noreply.github.com>
Co-authored-by: Kai-Hsun Chen <kaihsun@apache.org>
Co-authored-by: Ricky Xu <xuchen727@hotmail.com>
  • Loading branch information
3 people authored Nov 24, 2024
1 parent 4345c6c commit 4a29571
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 2 deletions.
8 changes: 7 additions & 1 deletion python/ray/autoscaler/_private/kuberay/autoscaling_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ def _node_type_from_group_spec(

resources = _get_ray_resources_from_group_spec(group_spec, is_head)

return {
node_type = {
"min_workers": min_workers,
"max_workers": max_workers,
# `node_config` is a legacy field required for compatibility.
Expand All @@ -228,6 +228,12 @@ def _node_type_from_group_spec(
"resources": resources,
}

idle_timeout_s = group_spec.get(IDLE_SECONDS_KEY)
if idle_timeout_s is not None:
node_type["idle_timeout_s"] = float(idle_timeout_s)

return node_type


def _get_ray_resources_from_group_spec(
group_spec: Dict[str, Any], is_head: bool
Expand Down
1 change: 1 addition & 0 deletions python/ray/autoscaler/ray-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@
},
"min_workers": {"type": "integer"},
"max_workers": {"type": "integer"},
"idle_timeout_s": {"type": "number", "nullable": true},
"resources": {
"type": "object",
"patternProperties": {
Expand Down
3 changes: 3 additions & 0 deletions python/ray/autoscaler/v2/instance_manager/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ class NodeTypeConfig:
min_worker_nodes: int
# The maximal number of worker nodes can be launched for this node type.
max_worker_nodes: int
# Idle timeout seconds for worker nodes of this node type.
idle_timeout_s: Optional[float] = None
# The total resources on the node.
resources: Dict[str, float] = field(default_factory=dict)
# The labels on the node.
Expand Down Expand Up @@ -346,6 +348,7 @@ def get_node_type_configs(self) -> Dict[NodeType, NodeTypeConfig]:
name=node_type,
min_worker_nodes=node_config.get("min_workers", 0),
max_worker_nodes=max_workers_nodes,
idle_timeout_s=node_config.get("idle_timeout_s", None),
resources=node_config.get("resources", {}),
labels=node_config.get("labels", {}),
launch_config_hash=launch_config_hash,
Expand Down
6 changes: 5 additions & 1 deletion python/ray/autoscaler/v2/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1584,6 +1584,11 @@ def _enforce_idle_termination(
continue

idle_timeout_s = ctx.get_idle_timeout_s()
# Override the scheduler idle_timeout_s if set for this node_type.
node_type = node.node_type
if node_type in node_type_configs:
if node_type_configs[node_type].idle_timeout_s is not None:
idle_timeout_s = node_type_configs[node_type].idle_timeout_s
if idle_timeout_s is None:
# No idle timeout is set, skip the idle termination.
continue
Expand All @@ -1606,7 +1611,6 @@ def _enforce_idle_termination(

# Honor the min_worker_nodes setting for the node type.
min_count = 0
node_type = node.node_type
if node_type in node_type_configs:
min_count = node_type_configs[node_type].min_worker_nodes
if (
Expand Down
76 changes: 76 additions & 0 deletions python/ray/autoscaler/v2/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1434,6 +1434,82 @@ def test_idle_termination_with_min_worker(min_workers):
assert len(to_terminate) == 0


@pytest.mark.parametrize("node_type_idle_timeout_s", [1, 2, 10])
def test_idle_termination_with_node_type_idle_timeout(node_type_idle_timeout_s):
"""
Test that idle nodes are terminated when idle_timeout_s is set for node type.
"""
scheduler = ResourceDemandScheduler(event_logger)

node_type_configs = {
"type_cpu_with_idle_timeout": NodeTypeConfig(
name="type_cpu",
resources={"CPU": 1},
min_worker_nodes=0,
max_worker_nodes=5,
idle_timeout_s=node_type_idle_timeout_s,
launch_config_hash="hash1",
),
}

idle_time_s = 5
constraints = []

request = sched_request(
node_type_configs=node_type_configs,
instances=[
make_autoscaler_instance(
im_instance=Instance(
instance_type="type_cpu_with_idle_timeout",
status=Instance.RAY_RUNNING,
launch_config_hash="hash1",
instance_id="i-1",
node_id="r-1",
),
ray_node=NodeState(
node_id=b"r-1",
ray_node_type_name="type_cpu_with_idle_timeout",
available_resources={"CPU": 0},
total_resources={"CPU": 1},
idle_duration_ms=0, # Non idle
status=NodeStatus.RUNNING,
),
cloud_instance_id="c-1",
),
make_autoscaler_instance(
im_instance=Instance(
instance_id="i-2",
instance_type="type_cpu_with_idle_timeout",
status=Instance.RAY_RUNNING,
launch_config_hash="hash1",
node_id="r-2",
),
ray_node=NodeState(
ray_node_type_name="type_cpu_with_idle_timeout",
node_id=b"r-2",
available_resources={"CPU": 1},
total_resources={"CPU": 1},
idle_duration_ms=idle_time_s * 1000,
status=NodeStatus.IDLE,
),
cloud_instance_id="c-2",
),
],
# Set autoscaler idle_timeout_s to a value greater than
# node_type_idle_timeout_s and idle_time_s.
idle_timeout_s=idle_time_s * 1000,
cluster_resource_constraints=constraints,
)

reply = scheduler.schedule(request)
_, to_terminate = _launch_and_terminate(reply)
if node_type_idle_timeout_s <= idle_time_s:
assert len(to_terminate) == 1
assert to_terminate == [("i-2", "r-2", TerminationRequest.Cause.IDLE)]
else:
assert len(to_terminate) == 0


def test_gang_scheduling():
"""
Test that gang scheduling works.
Expand Down

0 comments on commit 4a29571

Please sign in to comment.