From 24ba62a6b198f1e2d2ee9ceb029e2bbb612d3849 Mon Sep 17 00:00:00 2001
From: kaihsun
Date: Mon, 25 Nov 2024 04:52:12 +0000
Subject: [PATCH] Rename _get_workers_groups_with_deletes to
 _get_workers_delete_info and add a scale-down test

Signed-off-by: kaihsun
---
 .../cloud_providers/kuberay/cloud_provider.py |  6 +--
 .../autoscaler/v2/tests/test_node_provider.py | 51 ++++++++++++++++++-
 2 files changed, 52 insertions(+), 5 deletions(-)

diff --git a/python/ray/autoscaler/v2/instance_manager/cloud_providers/kuberay/cloud_provider.py b/python/ray/autoscaler/v2/instance_manager/cloud_providers/kuberay/cloud_provider.py
index 500bcd6a7188..342a1238e2ce 100644
--- a/python/ray/autoscaler/v2/instance_manager/cloud_providers/kuberay/cloud_provider.py
+++ b/python/ray/autoscaler/v2/instance_manager/cloud_providers/kuberay/cloud_provider.py
@@ -215,9 +215,7 @@ def _initialize_scale_request(
             worker_groups_with_pending_deletes,
             worker_groups_without_pending_deletes,
             worker_to_delete_set,
-        ) = self._get_workers_groups_with_deletes(
-            ray_cluster, set(cur_instances.keys())
-        )
+        ) = self._get_workers_delete_info(ray_cluster, set(cur_instances.keys()))
 
         # Calculate the desired number of workers by type.
         num_workers_dict = defaultdict(int)
@@ -399,7 +397,7 @@ def instances(self) -> Dict[CloudInstanceId, CloudInstance]:
         return copy.deepcopy(self._cached_instances)
 
     @staticmethod
-    def _get_workers_groups_with_deletes(
+    def _get_workers_delete_info(
         ray_cluster_spec: Dict[str, Any], node_set: Set[CloudInstanceId]
     ) -> Tuple[Set[NodeType], Set[NodeType]]:
         """
diff --git a/python/ray/autoscaler/v2/tests/test_node_provider.py b/python/ray/autoscaler/v2/tests/test_node_provider.py
index a8e010f58f65..e3ab30cf5787 100644
--- a/python/ray/autoscaler/v2/tests/test_node_provider.py
+++ b/python/ray/autoscaler/v2/tests/test_node_provider.py
@@ -492,7 +492,7 @@ def test_pending_deletes(self):
             },
         ]
 
-    def test_inconsistent_pods_raycr(self):
+    def test_inconsistent_pods_raycr_scale_up(self):
         """
         Test the case where the cluster state has not yet reached the desired state.
         Specifically, the replicas field in the RayCluster CR does not match the actual
         number of Pods.
@@ -529,6 +529,55 @@
                 "value": desired_replicas + 1,
             }
 
+    def test_inconsistent_pods_raycr_scale_down(self):
+        """
+        Test the case where the cluster state has not yet reached the desired state
+        during a scale down. Specifically, the replicas field in the RayCluster CR
+        is larger than the actual number of Pods in the worker group.
+        """
+        # Check the assumptions of the test
+        small_group = "small-group"
+        num_pods = 0
+        pod_to_delete = None
+        for pod in self.mock_client._pod_list["items"]:
+            if pod["metadata"]["labels"]["ray.io/group"] == small_group:
+                num_pods += 1
+                pod_to_delete = pod["metadata"]["name"]
+        assert pod_to_delete is not None
+
+        assert (
+            self.mock_client._ray_cluster["spec"]["workerGroupSpecs"][0]["groupName"]
+            == small_group
+        )
+        desired_replicas = num_pods + 1
+        self.mock_client._ray_cluster["spec"]["workerGroupSpecs"][0][
+            "replicas"
+        ] = desired_replicas
+
+        # Terminate a node. The replicas field should be decremented by 1, even though
+        # the cluster state has not yet reached the goal state.
+        self.provider.terminate(ids=[pod_to_delete], request_id="term-1")
+        patches = self.mock_client.get_patches(
+            f"rayclusters/{self.provider._cluster_name}"
+        )
+        assert len(patches) == 2
+        assert patches == [
+            {
+                "op": "replace",
+                "path": "/spec/workerGroupSpecs/0/replicas",
+                "value": desired_replicas - 1,
+            },
+            {
+                "op": "replace",
+                "path": "/spec/workerGroupSpecs/0/scaleStrategy",
+                "value": {
+                    "workersToDelete": [
+                        pod_to_delete,
+                    ]
+                },
+            },
+        ]
+
 
 if __name__ == "__main__":
     if os.environ.get("PARALLEL_CI"):
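
For readers unfamiliar with the scaleStrategy mechanics exercised above, here is a minimal,
self-contained sketch of how the two JSON patches asserted by the new test can be built:
decrement replicas on the worker group and list the terminated Pod under
scaleStrategy.workersToDelete. This is illustration only, not code from this patch or from
Ray; the helper name and the sample Pod name are made up.

# Illustrative sketch only: build_scale_down_patches is a hypothetical helper,
# not part of the autoscaler code touched by this patch.
from typing import Any, Dict, List


def build_scale_down_patches(
    group_index: int, current_replicas: int, pods_to_delete: List[str]
) -> List[Dict[str, Any]]:
    """Return JSON patches that shrink one worker group by the given Pods."""
    prefix = f"/spec/workerGroupSpecs/{group_index}"
    return [
        {
            "op": "replace",
            "path": f"{prefix}/replicas",
            "value": current_replicas - len(pods_to_delete),
        },
        {
            "op": "replace",
            "path": f"{prefix}/scaleStrategy",
            "value": {"workersToDelete": pods_to_delete},
        },
    ]


if __name__ == "__main__":
    # Mirrors the test scenario: the CR requests num_pods + 1 replicas, one Pod is
    # terminated, so the patched replicas value is desired_replicas - 1.
    patches = build_scale_down_patches(
        group_index=0, current_replicas=2, pods_to_delete=["small-group-pod-0"]
    )
    print(patches)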