[Core] Introduce spill_on_unavailable option for soft NodeAffinitySchedulingStrategy (#34224) (#34285)

Introduce a private _spill_on_unavailable semantic for soft NodeAffinitySchedulingStrategy.

Signed-off-by: Jiajun Yao <jeromeyjj@gmail.com>
jjyao authored Apr 12, 2023
1 parent 127e007 commit d44d689
Showing 14 changed files with 101 additions and 96 deletions.
11 changes: 7 additions & 4 deletions doc/source/ray-core/scheduling/index.rst
@@ -78,10 +78,13 @@ NodeAffinitySchedulingStrategy
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

:py:class:`~ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy` is a low-level strategy that allows a task or actor to be scheduled onto a particular node specified by its node id.
-The ``soft`` flag specifies whether the task or actor is allowed to run somewhere else. If ``soft`` is True, the task or actor will be scheduled onto the node if it is alive, feasible, and has
-sufficient resources at the time of scheduling. Otherwise, it will find a different node using the default scheduling strategy. If ``soft`` is False, the task or actor will be scheduled
-onto the node if it is alive and feasible. It may be queued and wait on the node until it has resources to execute. If the node is infeasible, the task or actor will fail with the
-:py:class:`~ray.exceptions.TaskUnschedulableError` or :py:class:`~ray.exceptions.ActorUnschedulableError`.
+The ``soft`` flag specifies whether the task or actor is allowed to run somewhere else if the specified node doesn't exist (e.g. if the node dies)
+or is infeasible because it does not have the resources required to run the task or actor.
+In these cases, if ``soft`` is True, the task or actor will be scheduled onto a different feasible node.
+Otherwise, the task or actor will fail with :py:class:`~ray.exceptions.TaskUnschedulableError` or :py:class:`~ray.exceptions.ActorUnschedulableError`.
+As long as the specified node is alive and feasible, the task or actor will only run there
+regardless of the ``soft`` flag. This means if the node currently has no available resources, the task or actor will wait until resources
+become available.
This strategy should *only* be used if other high level scheduling strategies (e.g. :ref:`placement group <ray-placement-group-doc-ref>`) cannot give the
desired task or actor placements. It has the following known limitations:

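To make the documented behavior concrete, here is a minimal usage sketch of the public API described above (assumes a running Ray cluster; the task simply reports the node it ran on):

```python
import ray
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

ray.init()

@ray.remote
def where_am_i():
    return ray.get_runtime_context().get_node_id()

# Pin the task to the driver's node. With soft=True the task falls back to
# the default scheduling strategy only if this node dies or is infeasible;
# if the node is merely busy, the task waits for resources there.
node_id = ray.get_runtime_context().get_node_id()
ref = where_am_i.options(
    scheduling_strategy=NodeAffinitySchedulingStrategy(node_id=node_id, soft=True)
).remote()
assert ray.get(ref) == node_id
```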
2 changes: 2 additions & 0 deletions python/ray/_raylet.pyx
@@ -1954,6 +1954,8 @@ cdef class CoreWorker:
NodeID.from_hex(python_scheduling_strategy.node_id).binary())
c_node_affinity_scheduling_strategy[0].set_soft(
python_scheduling_strategy.soft)
+c_node_affinity_scheduling_strategy[0].set_spill_on_unavailable(
+    python_scheduling_strategy._spill_on_unavailable)
else:
raise ValueError(
f"Invalid scheduling_strategy value "
@@ -96,6 +96,7 @@ def get_or_create_autoscaling_requester_actor():
scheduling_strategy = NodeAffinitySchedulingStrategy(
ray.get_runtime_context().get_node_id(),
soft=True,
+    _spill_on_unavailable=True,
)
return AutoscalingRequester.options(
name="AutoscalingRequester",
@@ -152,6 +152,7 @@ def __call__(self, args):
args["scheduling_strategy"] = NodeAffinitySchedulingStrategy(
self.locs[self.i],
soft=True,
+    _spill_on_unavailable=True,
)
self.i += 1
self.i %= len(self.locs)
1 change: 1 addition & 0 deletions python/ray/includes/common.pxd
@@ -168,6 +168,7 @@ cdef extern from "src/ray/protobuf/common.pb.h" nogil:
CNodeAffinitySchedulingStrategy()
void set_node_id(const c_string& node_id)
void set_soft(c_bool soft)
+        void set_spill_on_unavailable(c_bool spill_on_unavailable)
cdef cppclass CSchedulingStrategy "ray::rpc::SchedulingStrategy":
CSchedulingStrategy()
void clear_scheduling_strategy()
95 changes: 33 additions & 62 deletions python/ray/tests/test_scheduling_2.py
@@ -245,68 +245,6 @@ def func():
func.options(scheduling_strategy="XXX").remote()


-def test_soft_chooses_another_node_if_not_available(ray_start_cluster):
-    cluster = ray_start_cluster
-    cluster.add_node(num_cpus=3)
-    cluster.add_node(num_cpus=3)
-    ray.init(address=cluster.address)
-    cluster.wait_for_nodes()
-
-    @ray.remote
-    def get_node_id_task(sleep_s=0):
-        time.sleep(sleep_s)
-        return ray.get_runtime_context().get_node_id()
-
-    target_node_id = ray.get(get_node_id_task.remote())
-
-    _ = [
-        get_node_id_task.options(
-            scheduling_strategy=NodeAffinitySchedulingStrategy(
-                target_node_id, soft=False
-            )
-        ).remote(1000)
-        for _ in range(3)
-    ]
-
-    soft_ref = get_node_id_task.options(
-        scheduling_strategy=NodeAffinitySchedulingStrategy(target_node_id, soft=True)
-    ).remote()
-
-    soft_node_id = ray.get(soft_ref, timeout=3)
-    assert target_node_id != soft_node_id
-
-
-def test_not_soft_queues_if_not_available(ray_start_cluster):
-    cluster = ray_start_cluster
-    cluster.add_node(num_cpus=3)
-    cluster.add_node(num_cpus=3)
-    ray.init(address=cluster.address)
-    cluster.wait_for_nodes()
-
-    @ray.remote
-    def get_node_id_task(sleep_s=0):
-        time.sleep(sleep_s)
-        return ray.get_runtime_context().get_node_id()
-
-    target_node_id = ray.get(get_node_id_task.remote())
-
-    _ = [
-        get_node_id_task.options(
-            scheduling_strategy=NodeAffinitySchedulingStrategy(
-                target_node_id, soft=False
-            )
-        ).remote(1000)
-        for _ in range(3)
-    ]
-
-    hard_ref = get_node_id_task.options(
-        scheduling_strategy=NodeAffinitySchedulingStrategy(target_node_id, soft=False)
-    ).remote()
-
-    with pytest.raises(ray.exceptions.GetTimeoutError):
-        ray.get(hard_ref, timeout=3)
-

@pytest.mark.parametrize("connect_to_client", [True, False])
def test_node_affinity_scheduling_strategy(
monkeypatch, ray_start_cluster, connect_to_client
@@ -496,6 +434,39 @@ def get_node_id(self):
ray.get(actor.get_node_id.remote())


+def test_node_affinity_scheduling_strategy_spill_on_unavailable(ray_start_cluster):
+    cluster = ray_start_cluster
+    cluster.add_node(num_cpus=3)
+    ray.init(address=cluster.address)
+    cluster.add_node(num_cpus=3)
+    cluster.wait_for_nodes()
+
+    @ray.remote
+    def get_node_id_task(sleep_s=0):
+        time.sleep(sleep_s)
+        return ray.get_runtime_context().get_node_id()
+
+    target_node_id = ray.get(get_node_id_task.remote())
+
+    _ = [
+        get_node_id_task.options(
+            scheduling_strategy=NodeAffinitySchedulingStrategy(
+                target_node_id, soft=False
+            )
+        ).remote(1000)
+        for _ in range(3)
+    ]
+
+    soft_ref = get_node_id_task.options(
+        scheduling_strategy=NodeAffinitySchedulingStrategy(
+            target_node_id, soft=True, _spill_on_unavailable=True
+        )
+    ).remote()
+
+    soft_node_id = ray.get(soft_ref, timeout=3)
+    assert target_node_id != soft_node_id


@pytest.mark.parametrize("connect_to_client", [True, False])
def test_spread_scheduling_strategy(ray_start_cluster, connect_to_client):
cluster = ray_start_cluster
22 changes: 12 additions & 10 deletions python/ray/util/scheduling_strategies.py
@@ -43,24 +43,26 @@ class NodeAffinitySchedulingStrategy:
Attributes:
node_id: the hex id of the node where the task or actor should run.
-        soft: If set to true, the task or actor will be scheduled on the node, unless
-            the target does not exist (e.g. the node dies), is infeasible,
-            or has insufficient resources at the time of scheduling. If it does
-            not schedule on the node it will find another node using the default
-            scheduling policy.
-            If set to false, the task or actor will be scheduled on the node, unless
-            the target doesn't exist (e.g. the node dies) or is infeasible.
-            If it does not schedule on the node the task will fail with
-            TaskUnschedulableError or ActorUnschedulableError.
+        soft: whether the scheduler should run the task or actor somewhere else
+            if the target node doesn't exist (e.g. the node dies) or is infeasible
+            during scheduling.
+            If the node exists and is feasible, the task or actor
+            will only be scheduled there.
+            This means if the node doesn't have the available resources,
+            the task or actor will wait indefinitely until resources become available.
+            If the node doesn't exist or is infeasible, the task or actor
+            will fail if soft is False
+            or be scheduled somewhere else if soft is True.
"""

-    def __init__(self, node_id: str, soft: bool):
+    def __init__(self, node_id: str, soft: bool, _spill_on_unavailable: bool = False):
# This will be removed once we standardize on node id being hex string.
if not isinstance(node_id, str):
node_id = node_id.hex()

self.node_id = node_id
self.soft = soft
+        self._spill_on_unavailable = _spill_on_unavailable


SchedulingStrategyT = Union[
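A short sketch of how the two constructor flags combine, using only the signature shown above (`_spill_on_unavailable` is private and underscore-prefixed, so it may change without notice):

```python
import ray
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

ray.init()
node_id = ray.get_runtime_context().get_node_id()

# soft=True alone: prefer the node; if it is alive and feasible but busy,
# the task waits there for resources.
wait_on_node = NodeAffinitySchedulingStrategy(node_id, soft=True)

# The private flag added in this commit: still prefer the node, but if it
# has no available resources at scheduling time, spill to another feasible
# node instead of queueing.
spill_if_busy = NodeAffinitySchedulingStrategy(
    node_id, soft=True, _spill_on_unavailable=True
)
```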
6 changes: 5 additions & 1 deletion src/ray/common/task/task_spec.h
@@ -45,7 +45,9 @@ inline bool operator==(const ray::rpc::SchedulingStrategy &lhs,
return (lhs.node_affinity_scheduling_strategy().node_id() ==
rhs.node_affinity_scheduling_strategy().node_id()) &&
(lhs.node_affinity_scheduling_strategy().soft() ==
-              rhs.node_affinity_scheduling_strategy().soft());
+              rhs.node_affinity_scheduling_strategy().soft()) &&
+           (lhs.node_affinity_scheduling_strategy().spill_on_unavailable() ==
+              rhs.node_affinity_scheduling_strategy().spill_on_unavailable());
}
case ray::rpc::SchedulingStrategy::kPlacementGroupSchedulingStrategy: {
return (lhs.placement_group_scheduling_strategy().placement_group_id() ==
@@ -114,6 +116,8 @@ struct hash<ray::rpc::SchedulingStrategy> {
// soft returns a bool
hash ^= static_cast<size_t>(
scheduling_strategy.node_affinity_scheduling_strategy().soft());
+      hash ^= static_cast<size_t>(
+          scheduling_strategy.node_affinity_scheduling_strategy().spill_on_unavailable());
} else if (scheduling_strategy.scheduling_strategy_case() ==
ray::rpc::SchedulingStrategy::kPlacementGroupSchedulingStrategy) {
hash ^= std::hash<std::string>()(
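The operator== and hash changes above keep the scheduling-class key consistent: two strategies that differ only in spill_on_unavailable must not collapse into one scheduling class. A hypothetical Python analogy of the idea (NodeAffinityKey is illustrative, not a Ray type):

```python
from dataclasses import dataclass

@dataclass(frozen=True)  # frozen dataclasses derive __eq__ and __hash__ from all fields
class NodeAffinityKey:
    node_id: str
    soft: bool
    spill_on_unavailable: bool  # newly included in equality and hashing

a = NodeAffinityKey("n1", soft=True, spill_on_unavailable=False)
b = NodeAffinityKey("n1", soft=True, spill_on_unavailable=True)
assert a != b  # distinct scheduling classes, as the C++ test below asserts
```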
9 changes: 9 additions & 0 deletions src/ray/common/test/task_spec_test.cc
@@ -32,6 +32,9 @@ TEST(TaskSpecTest, TestSchedulingClassDescriptor) {
scheduling_strategy.mutable_node_affinity_scheduling_strategy()->set_node_id("y");
SchedulingClassDescriptor descriptor5(resources, descriptor, 0, scheduling_strategy);
SchedulingClassDescriptor descriptor6(resources, descriptor, 0, scheduling_strategy);
+  scheduling_strategy.mutable_node_affinity_scheduling_strategy()
+      ->set_spill_on_unavailable(true);
+  SchedulingClassDescriptor descriptor10(resources, descriptor, 0, scheduling_strategy);
scheduling_strategy.mutable_placement_group_scheduling_strategy()
->set_placement_group_id("o");
scheduling_strategy.mutable_placement_group_scheduling_strategy()
@@ -81,6 +84,12 @@ TEST(TaskSpecTest, TestSchedulingClassDescriptor) {
ASSERT_TRUE(TaskSpecification::GetSchedulingClass(descriptor5) ==
TaskSpecification::GetSchedulingClass(descriptor6));

+  ASSERT_FALSE(descriptor6 == descriptor10);
+  ASSERT_FALSE(std::hash<SchedulingClassDescriptor>()(descriptor6) ==
+               std::hash<SchedulingClassDescriptor>()(descriptor10));
+  ASSERT_FALSE(TaskSpecification::GetSchedulingClass(descriptor6) ==
+               TaskSpecification::GetSchedulingClass(descriptor10));
+
ASSERT_FALSE(descriptor6 == descriptor7);
ASSERT_FALSE(std::hash<SchedulingClassDescriptor>()(descriptor6) ==
std::hash<SchedulingClassDescriptor>()(descriptor7));
1 change: 1 addition & 0 deletions src/ray/protobuf/common.proto
@@ -52,6 +52,7 @@ enum TaskType {
message NodeAffinitySchedulingStrategy {
bytes node_id = 1;
bool soft = 2;
+  bool spill_on_unavailable = 3;
}

message PlacementGroupSchedulingStrategy {
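For reference, adding the new field is wire-compatible: a bool left unset defaults to false in proto3. A small sketch, assuming Ray's generated Python bindings live at ray.core.generated.common_pb2:

```python
from ray.core.generated.common_pb2 import NodeAffinitySchedulingStrategy

msg = NodeAffinitySchedulingStrategy(
    node_id=b"\x01\x02", soft=True, spill_on_unavailable=True
)
# Round-trips through serialization; a reader whose schema lacks field 3
# simply does not observe it and behaves as before.
assert NodeAffinitySchedulingStrategy.FromString(msg.SerializeToString()) == msg
```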
4 changes: 3 additions & 1 deletion src/ray/raylet/scheduling/cluster_resource_scheduler.cc
@@ -150,7 +150,9 @@ scheduling::NodeID ClusterResourceScheduler::GetBestSchedulableNode(
force_spillback,
force_spillback,
scheduling_strategy.node_affinity_scheduling_strategy().node_id(),
-        scheduling_strategy.node_affinity_scheduling_strategy().soft()));
+        scheduling_strategy.node_affinity_scheduling_strategy().soft(),
+        scheduling_strategy.node_affinity_scheduling_strategy()
+            .spill_on_unavailable()));
} else if (IsAffinityWithBundleSchedule(scheduling_strategy) &&
!is_local_node_with_raylet_) {
// This scheduling strategy is only used for gcs scheduling for the time being.
26 changes: 11 additions & 15 deletions src/ray/raylet/scheduling/policy/node_affinity_scheduling_policy.cc
@@ -22,25 +22,21 @@ scheduling::NodeID NodeAffinitySchedulingPolicy::Schedule(
RAY_CHECK(options.scheduling_type == SchedulingType::NODE_AFFINITY);

scheduling::NodeID target_node_id = scheduling::NodeID(options.node_affinity_node_id);
-  bool target_feasible =
-      nodes_.contains(target_node_id) && is_node_alive_(target_node_id) &&
-      nodes_.at(target_node_id).GetLocalView().IsFeasible(resource_request);
-
-  if (options.node_affinity_soft) {
-    if (target_feasible &&
-        nodes_.at(target_node_id).GetLocalView().IsAvailable(resource_request)) {
+  if (nodes_.contains(target_node_id) && is_node_alive_(target_node_id) &&
+      nodes_.at(target_node_id).GetLocalView().IsFeasible(resource_request)) {
+    if (!options.node_affinity_spill_on_unavailable) {
      return target_node_id;
-    } else {
-      options.scheduling_type = SchedulingType::HYBRID;
-      return hybrid_policy_.Schedule(resource_request, options);
-    }
-  } else {
-    if (target_feasible) {
+    } else if (nodes_.at(target_node_id).GetLocalView().IsAvailable(resource_request)) {
      return target_node_id;
-    } else {
-      return scheduling::NodeID::Nil();
    }
  }
+
+  if (!options.node_affinity_soft) {
+    return scheduling::NodeID::Nil();
+  }
+
+  options.scheduling_type = SchedulingType::HYBRID;
+  return hybrid_policy_.Schedule(resource_request, options);
}

} // namespace raylet_scheduling_policy
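Paraphrasing the rewritten policy above as plain Python (a readability sketch, not Ray code; note that spill_on_unavailable is only meaningful together with soft=True, which SchedulingOptions::NodeAffinity below enforces with a RAY_CHECK):

```python
from collections import namedtuple

Node = namedtuple("Node", ["alive", "feasible", "available"])
NIL = None  # stands in for scheduling::NodeID::Nil()

def schedule(target, soft, spill_on_unavailable, hybrid_schedule):
    assert not spill_on_unavailable or soft  # mirrors the RAY_CHECK
    if target.alive and target.feasible:
        if not spill_on_unavailable:
            return target  # run (or queue) on the target even if it is busy
        if target.available:
            return target  # target can run the task right now
        # target is busy and spilling is requested: fall through to hybrid
    if not soft:
        return NIL  # surfaces as Task/ActorUnschedulableError upstream
    return hybrid_schedule()  # pick some other feasible node

busy = Node(alive=True, feasible=True, available=False)
assert schedule(busy, True, False, lambda: "other") == busy     # waits on node
assert schedule(busy, True, True, lambda: "other") == "other"   # spills
```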
8 changes: 7 additions & 1 deletion src/ray/raylet/scheduling/policy/scheduling_options.h
@@ -75,12 +75,17 @@ struct SchedulingOptions {
static SchedulingOptions NodeAffinity(bool avoid_local_node,
bool require_node_available,
std::string node_id,
-                                        bool soft) {
+                                        bool soft,
+                                        bool spill_on_unavailable = false) {
+    if (spill_on_unavailable) {
+      RAY_CHECK(soft);
+    }
SchedulingOptions scheduling_options =
Hybrid(avoid_local_node, require_node_available);
scheduling_options.scheduling_type = SchedulingType::NODE_AFFINITY;
scheduling_options.node_affinity_node_id = node_id;
scheduling_options.node_affinity_soft = soft;
+    scheduling_options.node_affinity_spill_on_unavailable = spill_on_unavailable;
return scheduling_options;
}

@@ -159,6 +164,7 @@ struct SchedulingOptions {
std::shared_ptr<SchedulingContext> scheduling_context;
std::string node_affinity_node_id;
bool node_affinity_soft = false;
+  bool node_affinity_spill_on_unavailable = false;
// The node where the task is preferred to be placed. By default, this node id
// is empty, which means no preferred node.
std::string preferred_node_id;
10 changes: 8 additions & 2 deletions src/ray/raylet/scheduling/policy/scheduling_policy_test.cc
@@ -99,8 +99,14 @@ TEST_F(SchedulingPolicyTest, NodeAffinityPolicyTest) {

to_schedule = scheduling_policy.Schedule(
req, SchedulingOptions::NodeAffinity(false, false, "unavailable", true));
-  // Choose a different node if it's not available right now.
-  ASSERT_NE(to_schedule, scheduling::NodeID("unavailable"));
+  // Prefer the specified node even if it's not available right now.
+  ASSERT_EQ(to_schedule, scheduling::NodeID("unavailable"));
+
+  to_schedule = scheduling_policy.Schedule(
+      req, SchedulingOptions::NodeAffinity(false, false, "unavailable", true, true));
+  // The task is scheduled somewhere else since soft is true and spill_on_unavailable is
+  // also true.
+  ASSERT_EQ(to_schedule, scheduling::NodeID("local"));

to_schedule = scheduling_policy.Schedule(
req, SchedulingOptions::NodeAffinity(false, false, "infeasible", false));
