diff --git a/python/ray/autoscaler/v2/tests/test_e2e.py b/python/ray/autoscaler/v2/tests/test_e2e.py
index 0e47200b0223..01899cf4334a 100644
--- a/python/ray/autoscaler/v2/tests/test_e2e.py
+++ b/python/ray/autoscaler/v2/tests/test_e2e.py
@@ -3,7 +3,9 @@
 import time
 from typing import Dict
 
+import grpc
 import pytest
+from grpc._channel import _InactiveRpcError
 
 import ray
 from ray._private.resource_spec import HEAD_NODE_RESOURCE_NAME
@@ -12,6 +14,7 @@
 from ray._raylet import GcsClient
 from ray.autoscaler.v2.sdk import get_cluster_status
 from ray.cluster_utils import AutoscalingCluster
+from ray.core.generated import node_manager_pb2, node_manager_pb2_grpc
 from ray.core.generated.usage_pb2 import TagKey
 from ray.util.placement_group import placement_group, remove_placement_group
 from ray.util.state.api import list_placement_groups, list_tasks
@@ -323,6 +326,104 @@ def verify():
         cluster.shutdown()
 
 
+@pytest.mark.parametrize("autoscaler_v2", [False, True], ids=["v1", "v2"])
+def test_placement_group_reschedule_node_dead(autoscaler_v2):
+    # Test that the autoscaler reschedules a placement group when a node dies.
+    # It should only provision nodes for the bundles that have not been placed.
+
+    cluster = AutoscalingCluster(
+        head_resources={"CPU": 0},
+        worker_node_types={
+            "type-1": {
+                "resources": {"R1": 1},
+                "node_config": {},
+                "min_workers": 0,
+                "max_workers": 2,
+            },
+            "type-2": {
+                "resources": {"R2": 1},
+                "node_config": {},
+                "min_workers": 0,
+                "max_workers": 2,
+            },
+            "type-3": {
+                "resources": {"R3": 1},
+                "node_config": {},
+                "min_workers": 0,
+                "max_workers": 2,
+            },
+        },
+        autoscaler_v2=autoscaler_v2,
+    )
+
+    try:
+        cluster.start()
+        ray.init("auto")
+        gcs_address = ray.get_runtime_context().gcs_address
+
+        pg = placement_group([{"R1": 1}, {"R2": 1}, {"R3": 1}])
+
+        ray.get(pg.ready())
+
+        from ray.autoscaler.v2.sdk import get_cluster_status
+
+        def verify_nodes(active=3, idle=1):
+            cluster_state = get_cluster_status(gcs_address)
+            assert len(cluster_state.active_nodes) == active
+            assert len(cluster_state.idle_nodes) == idle
+            return True
+
+        # 3 worker nodes, 1 head node (idle).
+        wait_for_condition(lambda: verify_nodes(3, 1))
+
+        # Helpers for killing a node.
+        def kill_raylet(ip, port, graceful=True):
+            raylet_address = f"{ip}:{port}"
+            channel = grpc.insecure_channel(raylet_address)
+            stub = node_manager_pb2_grpc.NodeManagerServiceStub(channel)
+            print(f"Sending a shutdown request to {ip}:{port}")
+            try:
+                stub.ShutdownRaylet(
+                    node_manager_pb2.ShutdownRayletRequest(graceful=graceful)
+                )
+            except _InactiveRpcError:
+                assert not graceful
+
+        def kill_node(node_id):
+            # Forcefully terminate the node's process (kill -9).
+            import subprocess
+
+            cmd = f"ps aux | grep {node_id} | grep -v grep | awk '{{print $2}}'"
+            pid = subprocess.check_output(cmd, shell=True).decode("utf-8").strip()
+            print(f"Killing pid {pid}")
+            # Kill the pid.
+            cmd = f"kill -9 {pid}"
+            subprocess.check_output(cmd, shell=True)
+
+        # Kill the worker node that has 'R1' in its resources.
+        for n in ray.nodes():
+            if "R1" in n["Resources"]:
+                node = n
+                break
+
+        # TODO(mimi): kill_raylet won't trigger reschedule in autoscaler v1.
+        # kill_raylet(node["NodeManagerAddress"], node["NodeManagerPort"])
+        kill_node(node["NodeID"])
+
+        # Wait for the dead node to be removed.
+        wait_for_condition(lambda: verify_nodes(2, 1), 20)
+
+        # Check that the placement group is rescheduled.
+
+        # Only provision nodes for the unplaced bundles;
+        # avoid rescheduling the whole placement group.
+        wait_for_condition(lambda: verify_nodes(3, 1))
+
+    finally:
+        ray.shutdown()
+        cluster.shutdown()
+
+
 def test_object_store_memory_idle_node(shutdown_only):
     ray.init(