add autoscaler test for placement group reschedule when node dies
Signed-off-by: Mimi Liao <mimiliao2000@gmail.com>
mimiliaogo committed Dec 13, 2024
1 parent 5de1e9b commit 6353847
Showing 1 changed file with 101 additions and 0 deletions.
101 changes: 101 additions & 0 deletions python/ray/autoscaler/v2/tests/test_e2e.py
@@ -3,7 +3,9 @@
import time
from typing import Dict

import grpc
import pytest
from grpc._channel import _InactiveRpcError

import ray
from ray._private.resource_spec import HEAD_NODE_RESOURCE_NAME
@@ -12,6 +14,7 @@
from ray._raylet import GcsClient
from ray.autoscaler.v2.sdk import get_cluster_status
from ray.cluster_utils import AutoscalingCluster
from ray.core.generated import node_manager_pb2, node_manager_pb2_grpc
from ray.core.generated.usage_pb2 import TagKey
from ray.util.placement_group import placement_group, remove_placement_group
from ray.util.state.api import list_placement_groups, list_tasks
@@ -323,6 +326,104 @@ def verify():
        cluster.shutdown()


@pytest.mark.parametrize("autoscaler_v2", [False, True], ids=["v1", "v2"])
def test_placement_group_reschedule_node_dead(autoscaler_v2):
    # Test that the autoscaler reschedules a placement group when a node dies.
    # It should only provision nodes for the bundles that haven't been placed.

    cluster = AutoscalingCluster(
        head_resources={"CPU": 0},
        worker_node_types={
            "type-1": {
                "resources": {"R1": 1},
                "node_config": {},
                "min_workers": 0,
                "max_workers": 2,
            },
            "type-2": {
                "resources": {"R2": 1},
                "node_config": {},
                "min_workers": 0,
                "max_workers": 2,
            },
            "type-3": {
                "resources": {"R3": 1},
                "node_config": {},
                "min_workers": 0,
                "max_workers": 2,
            },
        },
        autoscaler_v2=autoscaler_v2,
    )

    try:
        cluster.start()
        ray.init("auto")
        gcs_address = ray.get_runtime_context().gcs_address

        pg = placement_group([{"R1": 1}, {"R2": 1}, {"R3": 1}])

        ray.get(pg.ready())

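        # Check the number of active and idle nodes reported by
        # get_cluster_status (the head node counts as idle here).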
        def verify_nodes(active=3, idle=1):
            cluster_state = get_cluster_status(gcs_address)
            assert len(cluster_state.active_nodes) == active
            assert len(cluster_state.idle_nodes) == idle
            return True

        # 3 worker nodes, 1 head node (idle)
        wait_for_condition(lambda: verify_nodes(3, 1))

        # Kill a node
        def kill_raylet(ip, port, graceful=True):
            raylet_address = f"{ip}:{port}"
            channel = grpc.insecure_channel(raylet_address)
            stub = node_manager_pb2_grpc.NodeManagerServiceStub(channel)
            print(f"Sending a shutdown request to {ip}:{port}")
            try:
                stub.ShutdownRaylet(
                    node_manager_pb2.ShutdownRayletRequest(graceful=graceful)
                )
            except _InactiveRpcError:
                assert not graceful

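        # kill_node force-kills (kill -9) the process whose command line
        # contains the given node ID; it shells out to `ps`/`grep`, so it
        # assumes a POSIX environment.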
        def kill_node(node_id):
            # kill -9
            import subprocess

            cmd = f"ps aux | grep {node_id} | grep -v grep | awk '{{print $2}}'"
            pid = subprocess.check_output(cmd, shell=True).decode("utf-8").strip()
            print(f"Killing pid {pid}")
            # kill the pid
            cmd = f"kill -9 {pid}"
            subprocess.check_output(cmd, shell=True)

        # Kill a worker node with 'R1' in its resources.
        for n in ray.nodes():
            if "R1" in n["Resources"]:
                node = n
                break

        # TODO(mimi): kill_raylet won't trigger reschedule in autoscaler v1
        # kill_raylet(node["NodeManagerAddress"], node["NodeManagerPort"])
        kill_node(node["NodeID"])

        # Wait for the node to be removed
        wait_for_condition(lambda: verify_nodes(2, 1), 20)

        # Check that the placement group is rescheduled: the autoscaler should
        # only provision a node for the unplaced bundle instead of
        # rescheduling the whole placement group.
        wait_for_condition(lambda: verify_nodes(3, 1))

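        # A possible extra check (assumes this is the only placement group and
        # that the state API reports it as CREATED once all bundles are placed
        # again after rescheduling):
        wait_for_condition(
            lambda: list_placement_groups()[0].state == "CREATED", 20
        )
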
    finally:
        ray.shutdown()
        cluster.shutdown()


def test_object_store_memory_idle_node(shutdown_only):

    ray.init()