[Autoscaler][Placement Group] Skip placed bundle when requesting resource #48924

Open. mimiliaogo wants to merge 4 commits into base: master.

Changes from 1 commit
Commit 415dcf8967cbfc89faab55eefada429b16af737b: fix review comments
mimiliaogo committed Dec 16, 2024
Signed-off-by: Mimi Liao <mimiliao2000@gmail.com>
python/ray/autoscaler/v2/tests/test_e2e.py: 27 changes (2 additions, 25 deletions)
@@ -1,11 +1,10 @@
 import os
+import subprocess
 import sys
 import time
 from typing import Dict
 
-import grpc
 import pytest
-from grpc._channel import _InactiveRpcError
 
 import ray
 from ray._private.resource_spec import HEAD_NODE_RESOURCE_NAME
@@ -14,7 +13,6 @@
from ray._raylet import GcsClient
from ray.autoscaler.v2.sdk import get_cluster_status
from ray.cluster_utils import AutoscalingCluster
from ray.core.generated import node_manager_pb2, node_manager_pb2_grpc
from ray.core.generated.usage_pb2 import TagKey
from ray.util.placement_group import placement_group, remove_placement_group
from ray.util.state.api import list_placement_groups, list_tasks
@@ -365,9 +363,7 @@ def test_placement_group_reschedule_node_dead(autoscaler_v2):
 
     ray.get(pg.ready())
 
-    from ray.autoscaler.v2.sdk import get_cluster_status
-
-    def verify_nodes(active=3, idle=1):
+    def verify_nodes(active, idle):
         cluster_state = get_cluster_status(gcs_address)
         assert len(cluster_state.active_nodes) == active
         assert len(cluster_state.idle_nodes) == idle
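verify_nodes now takes explicit counts, so each call site documents the cluster shape it expects. The helper works as a retried predicate: wait_for_condition keeps re-evaluating it until it succeeds or times out. A minimal sketch of that contract (assuming, as the test does, that Ray's wait_for_condition retries when the predicate raises or returns a falsy value):

    import time

    def wait_for_condition(predicate, timeout=10, retry_interval_ms=100):
        # Sketch of the retry loop the test relies on; the real helper
        # lives in ray._private.test_utils (its behavior is assumed here).
        deadline = time.time() + timeout
        last_err = None
        while time.time() < deadline:
            try:
                if predicate():  # e.g. lambda: verify_nodes(3, 1)
                    return
            except Exception as err:  # asserts inside verify_nodes
                last_err = err
            time.sleep(retry_interval_ms / 1000)
        raise RuntimeError(f"Condition was never met: {last_err}")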
@@ -376,23 +372,7 @@ def verify_nodes(active=3, idle=1):
     # 3 worker nodes, 1 head node (idle)
     wait_for_condition(lambda: verify_nodes(3, 1))
-
-    # Kill a node
-    def kill_raylet(ip, port, graceful=True):
-        raylet_address = f"{ip}:{port}"
-        channel = grpc.insecure_channel(raylet_address)
-        stub = node_manager_pb2_grpc.NodeManagerServiceStub(channel)
-        print(f"Sending a shutdown request to {ip}:{port}")
-        try:
-            stub.ShutdownRaylet(
-                node_manager_pb2.ShutdownRayletRequest(graceful=graceful)
-            )
-        except _InactiveRpcError:
-            assert not graceful
-
     def kill_node(node_id):
         # kill -9
-        import subprocess
-
         cmd = f"ps aux | grep {node_id} | grep -v grep | awk '{{print $2}}'"
         pid = subprocess.check_output(cmd, shell=True).decode("utf-8").strip()
         print(f"Killing pid {pid}")
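With kill_raylet gone, node failure is simulated only by kill_node, which shells out to ps/grep/awk to find the raylet pid whose command line contains the node ID and sends it kill -9. For reference, a stdlib-only equivalent of that pipeline (kill_node_by_id is a hypothetical name, not a Ray API; it assumes, as the test does, that the node ID appears on the raylet's command line):

    import os
    import signal
    import subprocess

    def kill_node_by_id(node_id: str) -> None:
        # List every process as "<pid> <command>" and scan for node_id
        # (assumption: Ray embeds the node ID in the raylet command line).
        out = subprocess.check_output(["ps", "axo", "pid,command"]).decode()
        for line in out.splitlines():
            pid_str, _, command = line.strip().partition(" ")
            if pid_str.isdigit() and node_id in command:
                print(f"Killing pid {pid_str}")
                os.kill(int(pid_str), signal.SIGKILL)  # the "kill -9"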
@@ -407,14 +387,11 @@ def kill_node(node_id):
             break
 
     # TODO(mimi): kill_raylet won't trigger reschedule in autoscaler v1
-    # kill_raylet(node["NodeManagerAddress"], node["NodeManagerPort"])
     kill_node(node["NodeID"])
 
     # Wait for the node to be removed
     wait_for_condition(lambda: verify_nodes(2, 1), 20)
 
-    # Check that the placement group is rescheduled
-
     # Only provision nodes for unplaced bundles;
     # avoid rescheduling the whole placement group.
     wait_for_condition(lambda: verify_nodes(3, 1))
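The final comment states the behavior this PR is after: once a node dies, only the bundles that lost their placement should produce new resource demand, so the cluster returns to 3 active nodes rather than being re-provisioned for the whole placement group. A minimal sketch of that filtering idea (field names loosely follow Ray's placement-group bundle data; treat the exact schema as an assumption):

    from typing import Dict, List

    def bundles_to_request(bundles: List[dict]) -> List[Dict[str, float]]:
        # A bundle whose node_id is empty has lost its placement
        # (assumption: this mirrors how unplaced bundles are detected).
        return [b["unit_resources"] for b in bundles if not b.get("node_id")]

    # One bundle survived on node "abc123", one lost its node: only the
    # unplaced bundle's shape is surfaced to the autoscaler as demand.
    bundles = [
        {"node_id": "abc123", "unit_resources": {"CPU": 1.0}},
        {"node_id": "", "unit_resources": {"CPU": 1.0}},
    ]
    assert bundles_to_request(bundles) == [{"CPU": 1.0}]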