[core][autoscaler] Autoscaler doesn't scale up correctly when the KubeRay RayCluster is not in the goal state (#48909)


## Why are these changes needed?

### Issue
  * Create an Autoscaler V2 RayCluster CR.
    * Head Pod: `num-cpus: 0`
    * Worker Pod: each worker Pod has 1 CPU, and the `maxReplicas` of the worker group is 10.
  * Run the following script in the head Pod (a minimal sketch of such a workload is shown below):
    https://gist.github.com/kevin85421/6f09368ba48572e28f53654dca854b57
  * The autoscaler issues 10 scale requests, each adding one node. However, only some of the worker Pods (e.g., 5) are actually created.
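
The gist's exact contents are not reproduced here; the following is an assumed minimal sketch of the kind of workload that triggers the issue: it submits 10 one-CPU tasks, so the autoscaler has to request 10 worker Pods (the head Pod advertises `num-cpus: 0`).

```python
import time

import ray

ray.init()


# Each task needs 1 CPU. The head Pod has 0 CPUs, so every task forces the
# autoscaler to request one more worker Pod, up to maxReplicas = 10.
@ray.remote(num_cpus=1)
def hold_cpu():
    time.sleep(600)


# Submit 10 tasks -> the autoscaler should scale the worker group to 10 Pods.
ray.get([hold_cpu.remote() for _ in range(10)])
```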

### Reason

In the reproduction above, the `cloud_instance_updater` sends 10 separate requests, each asking to scale up by one worker Pod, because the worker group's `maxReplicas` is set to 10.

However, the scale request is constructed from the Ray Pods that currently exist in the Kubernetes cluster rather than from the RayCluster CR. For example:

* Cluster state: RayCluster `replicas`: 2, Ray Pods: 1
* 1st scale request: launch 1 node --> goal state: RayCluster `replicas`: 2 (Ray Pods + 1)
* 2nd scale request: launch 1 node --> goal state: RayCluster `replicas`: 2 (Ray Pods + 1) --> **this should be 3!**

The example above should end up with 3 Pods, but it ultimately creates only 2: because the cluster has not yet reached the goal state (only 1 Pod exists for `replicas: 2`), each request recomputes the target from the live Pod count and overwrites the previous request.
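
A minimal illustration of the old Pod-based computation; the function and variable names here are illustrative, not the actual provider code:

```python
from collections import defaultdict


def desired_replicas_from_pods(ray_pods, to_launch):
    # Old behavior (simplified): base the target replica count on the Ray Pods
    # that already exist, then add the newly requested launches on top.
    num_workers = defaultdict(int)
    for pod in ray_pods:
        num_workers[pod["group"]] += 1
    for group, count in to_launch.items():
        num_workers[group] += count
    return dict(num_workers)


# The RayCluster CR already asks for replicas=2, but only 1 Pod exists so far.
pods = [{"group": "small-group"}]
print(desired_replicas_from_pods(pods, {"small-group": 1}))
# {'small-group': 2} -- the patch writes replicas back to 2, so the Pod that
# was still pending from the earlier request is effectively lost.
```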

### Solution

Build scale requests from the RayCluster CR (each worker group's `replicas` and `minReplicas`) instead of from the live Ray Pods, so that requests issued while the cluster has not yet reached its goal state are not lost.
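
A sketch of the new computation, using the same simplified names as above; the real logic lives in `_initialize_scale_request` in the diff below:

```python
def desired_replicas_from_cr(worker_group_specs, to_launch):
    # New behavior (simplified): base the target replica count on the CR.
    # `replicas` can lag behind a manually raised `minReplicas`, so take the max.
    num_workers = {}
    for group in worker_group_specs:
        num_workers[group["groupName"]] = max(
            group["replicas"], group["minReplicas"]
        )
    for group_name, count in to_launch.items():
        num_workers[group_name] = num_workers.get(group_name, 0) + count
    return num_workers


# CR: replicas=2, minReplicas=0. Only 1 Pod exists, but that no longer matters.
specs = [{"groupName": "small-group", "replicas": 2, "minReplicas": 0}]
print(desired_replicas_from_cr(specs, {"small-group": 1}))
# {'small-group': 3} -- the second launch request is preserved.
```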


## Related issue number

Closes #46473

## Checks

With this change, all 10 worker Pods from the reproduction above are created successfully.

<img width="1373" alt="Screenshot 2024-11-24 at 2 11 39 AM"
src="https://github.com/user-attachments/assets/c42c6cdd-3bf0-4aa9-a928-630c12ff5569">


- [ ] I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- [ ] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for
https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I added a
method in Tune, I've added it in `doc/source/tune/api/` under the
corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a
few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [ ] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(

---------

Signed-off-by: kaihsun <kaihsun@anyscale.com>
kevin85421 authored Nov 26, 2024
1 parent 0f2c62c commit ed3d48c
Showing 4 changed files with 152 additions and 16 deletions.
2 changes: 1 addition & 1 deletion python/ray/autoscaler/kuberay/ray-cluster.complete.yaml
@@ -108,7 +108,7 @@ spec:
workerGroupSpecs:
# the pod replicas in this group typed worker
- replicas: 1
minReplicas: 1
minReplicas: 0
maxReplicas: 300
# logical group name, for this called small-group, also can be functional
groupName: small-group
@@ -209,21 +209,25 @@ def _initialize_scale_request(
cur_instances = self.instances

# Get the worker groups that have pending deletes and the worker groups that
# have finished deletes.
# have finished deletes, and the set of workers included in the workersToDelete
# field of any worker group.
(
worker_groups_with_pending_deletes,
worker_groups_without_pending_deletes,
) = self._get_workers_groups_with_deletes(
ray_cluster, set(cur_instances.keys())
)
worker_to_delete_set,
) = self._get_workers_delete_info(ray_cluster, set(cur_instances.keys()))

# Calculate the desired number of workers by type.
num_workers_dict = defaultdict(int)
for _, cur_instance in cur_instances.items():
if cur_instance.node_kind == NodeKind.HEAD:
# Only track workers.
continue
num_workers_dict[cur_instance.node_type] += 1
worker_groups = ray_cluster["spec"].get("workerGroupSpecs", [])
for worker_group in worker_groups:
node_type = worker_group["groupName"]
# Handle the case where users manually increase `minReplicas`
# to scale up the number of worker Pods. In this scenario,
# `replicas` will be smaller than `minReplicas`.
num_workers_dict[node_type] = max(
worker_group["replicas"], worker_group["minReplicas"]
)

# Add to launch nodes.
for node_type, count in to_launch.items():
@@ -242,6 +246,11 @@ def _initialize_scale_request(
# Not possible to delete head node.
continue

if to_delete_instance.cloud_instance_id in worker_to_delete_set:
# If the instance is already in the workersToDelete field of
# any worker group, skip it.
continue

num_workers_dict[to_delete_instance.node_type] -= 1
assert num_workers_dict[to_delete_instance.node_type] >= 0
to_delete_instances_by_type[to_delete_instance.node_type].append(
@@ -321,6 +330,7 @@ def _submit_scale_request(
# No patch required.
return

logger.info(f"Submitting a scale request: {scale_request}")
self._patch(f"rayclusters/{self._cluster_name}", patch_payload)

def _add_launch_errors(
@@ -392,9 +402,9 @@ def instances(self) -> Dict[CloudInstanceId, CloudInstance]:
return copy.deepcopy(self._cached_instances)

@staticmethod
def _get_workers_groups_with_deletes(
def _get_workers_delete_info(
ray_cluster_spec: Dict[str, Any], node_set: Set[CloudInstanceId]
) -> Tuple[Set[NodeType], Set[NodeType]]:
) -> Tuple[Set[NodeType], Set[NodeType], Set[CloudInstanceId]]:
"""
Gets the worker groups that have pending deletes and the worker groups that
have finished deletes.
@@ -404,10 +414,13 @@ def _get_workers_groups_with_deletes(
deletes.
worker_groups_with_finished_deletes: The worker groups that have finished
deletes.
worker_to_delete_set: A set of Pods that are included in the workersToDelete
field of any worker group.
"""

worker_groups_with_pending_deletes = set()
worker_groups_with_deletes = set()
worker_to_delete_set = set()

worker_groups = ray_cluster_spec["spec"].get("workerGroupSpecs", [])
for worker_group in worker_groups:
@@ -422,14 +435,19 @@ def _get_workers_groups_with_deletes(
worker_groups_with_deletes.add(node_type)

for worker in workersToDelete:
worker_to_delete_set.add(worker)
if worker in node_set:
worker_groups_with_pending_deletes.add(node_type)
break

worker_groups_with_finished_deletes = (
worker_groups_with_deletes - worker_groups_with_pending_deletes
)
return worker_groups_with_pending_deletes, worker_groups_with_finished_deletes
return (
worker_groups_with_pending_deletes,
worker_groups_with_finished_deletes,
worker_to_delete_set,
)

def _fetch_instances(self) -> Dict[CloudInstanceId, CloudInstance]:
"""
118 changes: 118 additions & 0 deletions python/ray/autoscaler/v2/tests/test_node_provider.py
@@ -492,6 +492,124 @@ def test_pending_deletes(self):
},
]

def test_increase_min_replicas_to_scale_up(self):
# Simulate the case where users manually increase the `minReplicas` field
# from 0 to $num_pods. KubeRay will create $num_pods worker Pods to meet the new
# `minReplicas`, even though the `replicas` field is still 0.
small_group = "small-group"
num_pods = 0
assert (
self.mock_client._ray_cluster["spec"]["workerGroupSpecs"][0]["groupName"]
== small_group
)
for pod in self.mock_client._pod_list["items"]:
if pod["metadata"]["labels"]["ray.io/group"] == small_group:
num_pods += 1
assert num_pods > 0
self.mock_client._ray_cluster["spec"]["workerGroupSpecs"][0]["replicas"] = 0
self.mock_client._ray_cluster["spec"]["workerGroupSpecs"][0][
"minReplicas"
] = num_pods

# Launching a new node and `replicas` should be
# `max(replicas, minReplicas) + 1`.
self.provider.launch(shape={small_group: 1}, request_id="launch-1")
patches = self.mock_client.get_patches(
f"rayclusters/{self.provider._cluster_name}"
)
assert len(patches) == 1
assert patches[0] == {
"op": "replace",
"path": "/spec/workerGroupSpecs/0/replicas",
"value": num_pods + 1,
}

def test_inconsistent_pods_raycr_scale_up(self):
"""
Test the case where the cluster state has not yet reached the desired state.
Specifically, the replicas field in the RayCluster CR does not match the actual
number of Pods.
"""
# Check the assumptions of the test
small_group = "small-group"
num_pods = 0
for pod in self.mock_client._pod_list["items"]:
if pod["metadata"]["labels"]["ray.io/group"] == small_group:
num_pods += 1

assert (
self.mock_client._ray_cluster["spec"]["workerGroupSpecs"][0]["groupName"]
== small_group
)
desired_replicas = num_pods + 1
self.mock_client._ray_cluster["spec"]["workerGroupSpecs"][0][
"replicas"
] = desired_replicas

# Launch a new node. The replicas field should be incremented by 1, even though
# the cluster state has not yet reached the goal state.
launch_request = {"small-group": 1}
self.provider.launch(shape=launch_request, request_id="launch-1")

patches = self.mock_client.get_patches(
f"rayclusters/{self.provider._cluster_name}"
)
assert len(patches) == 1
assert patches[0] == {
"op": "replace",
"path": "/spec/workerGroupSpecs/0/replicas",
"value": desired_replicas + 1,
}

def test_inconsistent_pods_raycr_scale_down(self):
"""
Test the case where the cluster state has not yet reached the desired state.
Specifically, the replicas field in the RayCluster CR does not match the actual
number of Pods.
"""
# Check the assumptions of the test
small_group = "small-group"
num_pods = 0
pod_to_delete = None
for pod in self.mock_client._pod_list["items"]:
if pod["metadata"]["labels"]["ray.io/group"] == small_group:
num_pods += 1
pod_to_delete = pod["metadata"]["name"]
assert pod_to_delete is not None

assert (
self.mock_client._ray_cluster["spec"]["workerGroupSpecs"][0]["groupName"]
== small_group
)
desired_replicas = num_pods + 1
self.mock_client._ray_cluster["spec"]["workerGroupSpecs"][0][
"replicas"
] = desired_replicas

# Terminate a node. The replicas field should be decremented by 1, even though
# the cluster state has not yet reached the goal state.
self.provider.terminate(ids=[pod_to_delete], request_id="term-1")
patches = self.mock_client.get_patches(
f"rayclusters/{self.provider._cluster_name}"
)
assert len(patches) == 2
assert patches == [
{
"op": "replace",
"path": "/spec/workerGroupSpecs/0/replicas",
"value": desired_replicas - 1,
},
{
"op": "replace",
"path": "/spec/workerGroupSpecs/0/scaleStrategy",
"value": {
"workersToDelete": [
pod_to_delete,
]
},
},
]


if __name__ == "__main__":
if os.environ.get("PARALLEL_CI"):
6 changes: 3 additions & 3 deletions python/ray/tests/kuberay/test_autoscaling_config.py
@@ -82,7 +82,7 @@ def _get_basic_autoscaling_config() -> dict:
},
"small-group": {
"max_workers": 300,
"min_workers": 1,
"min_workers": 0,
"node_config": {},
"resources": {
"CPU": 1,
@@ -95,7 +95,7 @@ def _get_basic_autoscaling_config() -> dict:
# and modified max_workers.
"gpu-group": {
"max_workers": 200,
"min_workers": 1,
"min_workers": 0,
"node_config": {},
"resources": {
"CPU": 1,
@@ -109,7 +109,7 @@ def _get_basic_autoscaling_config() -> dict:
# and modified max_workers and node_config.
"tpu-group": {
"max_workers": 4,
"min_workers": 1,
"min_workers": 0,
"node_config": {},
"resources": {
"CPU": 1,