diff --git a/doc/source/ray-core/scheduling/index.rst b/doc/source/ray-core/scheduling/index.rst
index 59d8e5fe4275..16b28525e83b 100644
--- a/doc/source/ray-core/scheduling/index.rst
+++ b/doc/source/ray-core/scheduling/index.rst
@@ -78,10 +78,13 @@ NodeAffinitySchedulingStrategy
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 :py:class:`~ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy` is a low-level strategy that allows a task or actor to be scheduled onto a particular node specified by its node id.
-The ``soft`` flag specifies whether the task or actor is allowed to run somewhere else. If ``soft`` is True, the task or actor will be scheduled onto the node if it is alive, feasible, and has
-sufficient resources at the time of scheduling. Otherwise, it will find a different node using the default scheduling strategy. If ``soft`` is False, the task or actor will be scheduled
-onto the node if it is alive and feasible. It may be queued and wait on the node until it has resources to execute. If the node is infeasible, the task or actor will fail with the
-:py:class:`~ray.exceptions.TaskUnschedulableError` or :py:class:`~ray.exceptions.ActorUnschedulableError`.
+The ``soft`` flag specifies whether the task or actor is allowed to run somewhere else if the specified node doesn't exist (e.g. if the node dies)
+or is infeasible because it does not have the resources required to run the task or actor.
+In these cases, if ``soft`` is True, the task or actor will be scheduled onto a different feasible node.
+Otherwise, the task or actor will fail with :py:class:`~ray.exceptions.TaskUnschedulableError` or :py:class:`~ray.exceptions.ActorUnschedulableError`.
+As long as the specified node is alive and feasible, the task or actor will only run there
+regardless of the ``soft`` flag. This means if the node currently has no available resources, the task or actor will wait until resources
+become available.
 
 This strategy should *only* be used if other high level scheduling strategies (e.g. :ref:`placement group `) cannot give the desired task or actor placements.
 
 It has the following known limitations:
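The reworded paragraph above is easier to follow with a concrete example. Below is a minimal usage sketch (editorial, not part of the patch) of the documented `soft` semantics; it assumes a running Ray cluster and uses only APIs that already appear elsewhere in this diff (`NodeAffinitySchedulingStrategy`, `ray.get_runtime_context().get_node_id()`):

```python
import ray
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

ray.init()


@ray.remote
def get_node_id():
    return ray.get_runtime_context().get_node_id()


# The node this driver is running on.
target_node_id = ray.get_runtime_context().get_node_id()

# soft=False: run only on the target node, queueing there if it is busy;
# fails with TaskUnschedulableError if the node dies or can never fit the task.
pinned_ref = get_node_id.options(
    scheduling_strategy=NodeAffinitySchedulingStrategy(target_node_id, soft=False)
).remote()

# soft=True: prefer the target node, but fall back to another feasible node
# if the target node dies or is infeasible for this task's resource request.
preferred_ref = get_node_id.options(
    scheduling_strategy=NodeAffinitySchedulingStrategy(target_node_id, soft=True)
).remote()

assert ray.get(pinned_ref) == target_node_id
print(ray.get(preferred_ref))
```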
diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx
index e84ae5bb5567..c34179589cec 100644
--- a/python/ray/_raylet.pyx
+++ b/python/ray/_raylet.pyx
@@ -1954,6 +1954,8 @@ cdef class CoreWorker:
                     NodeID.from_hex(python_scheduling_strategy.node_id).binary())
                 c_node_affinity_scheduling_strategy[0].set_soft(
                     python_scheduling_strategy.soft)
+                c_node_affinity_scheduling_strategy[0].set_spill_on_unavailable(
+                    python_scheduling_strategy._spill_on_unavailable)
             else:
                 raise ValueError(
                     f"Invalid scheduling_strategy value "
diff --git a/python/ray/data/_internal/execution/autoscaling_requester.py b/python/ray/data/_internal/execution/autoscaling_requester.py
index 15893a19a1a9..85720043bd0a 100644
--- a/python/ray/data/_internal/execution/autoscaling_requester.py
+++ b/python/ray/data/_internal/execution/autoscaling_requester.py
@@ -96,6 +96,7 @@ def get_or_create_autoscaling_requester_actor():
     scheduling_strategy = NodeAffinitySchedulingStrategy(
         ray.get_runtime_context().get_node_id(),
         soft=True,
+        _spill_on_unavailable=True,
     )
     return AutoscalingRequester.options(
         name="AutoscalingRequester",
diff --git a/python/ray/data/_internal/execution/operators/map_operator.py b/python/ray/data/_internal/execution/operators/map_operator.py
index 6115bdf02f53..39cdbde9dc7a 100644
--- a/python/ray/data/_internal/execution/operators/map_operator.py
+++ b/python/ray/data/_internal/execution/operators/map_operator.py
@@ -152,6 +152,7 @@ def __call__(self, args):
             args["scheduling_strategy"] = NodeAffinitySchedulingStrategy(
                 self.locs[self.i],
                 soft=True,
+                _spill_on_unavailable=True,
             )
             self.i += 1
             self.i %= len(self.locs)
diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd
index c9f26db37678..f08c75a6d5d7 100644
--- a/python/ray/includes/common.pxd
+++ b/python/ray/includes/common.pxd
@@ -168,6 +168,7 @@ cdef extern from "src/ray/protobuf/common.pb.h" nogil:
         CNodeAffinitySchedulingStrategy()
         void set_node_id(const c_string& node_id)
         void set_soft(c_bool soft)
+        void set_spill_on_unavailable(c_bool spill_on_unavailable)
     cdef cppclass CSchedulingStrategy "ray::rpc::SchedulingStrategy":
         CSchedulingStrategy()
         void clear_scheduling_strategy()
diff --git a/python/ray/tests/test_scheduling_2.py b/python/ray/tests/test_scheduling_2.py
index 832e972edf11..f9a92f347eff 100644
--- a/python/ray/tests/test_scheduling_2.py
+++ b/python/ray/tests/test_scheduling_2.py
@@ -245,68 +245,6 @@ def func():
         func.options(scheduling_strategy="XXX").remote()
 
 
-def test_soft_chooses_another_node_if_not_available(ray_start_cluster):
-    cluster = ray_start_cluster
-    cluster.add_node(num_cpus=3)
-    cluster.add_node(num_cpus=3)
-    ray.init(address=cluster.address)
-    cluster.wait_for_nodes()
-
-    @ray.remote
-    def get_node_id_task(sleep_s=0):
-        time.sleep(sleep_s)
-        return ray.get_runtime_context().get_node_id()
-
-    target_node_id = ray.get(get_node_id_task.remote())
-
-    _ = [
-        get_node_id_task.options(
-            scheduling_strategy=NodeAffinitySchedulingStrategy(
-                target_node_id, soft=False
-            )
-        ).remote(1000)
-        for _ in range(3)
-    ]
-
-    soft_ref = get_node_id_task.options(
-        scheduling_strategy=NodeAffinitySchedulingStrategy(target_node_id, soft=True)
-    ).remote()
-
-    soft_node_id = ray.get(soft_ref, timeout=3)
-    assert target_node_id != soft_node_id
-
-
-def test_not_soft_queues_if_not_available(ray_start_cluster):
-    cluster = ray_start_cluster
-    cluster.add_node(num_cpus=3)
-    cluster.add_node(num_cpus=3)
-    ray.init(address=cluster.address)
-    cluster.wait_for_nodes()
-
-    @ray.remote
-    def get_node_id_task(sleep_s=0):
-        time.sleep(sleep_s)
-        return ray.get_runtime_context().get_node_id()
-
-    target_node_id = ray.get(get_node_id_task.remote())
-
-    _ = [
-        get_node_id_task.options(
-            scheduling_strategy=NodeAffinitySchedulingStrategy(
-                target_node_id, soft=False
-            )
-        ).remote(1000)
-        for _ in range(3)
-    ]
-
-    hard_ref = get_node_id_task.options(
-        scheduling_strategy=NodeAffinitySchedulingStrategy(target_node_id, soft=False)
-    ).remote()
-
-    with pytest.raises(ray.exceptions.GetTimeoutError):
-        ray.get(hard_ref, timeout=3)
-
-
 @pytest.mark.parametrize("connect_to_client", [True, False])
 def test_node_affinity_scheduling_strategy(
     monkeypatch, ray_start_cluster, connect_to_client
@@ -496,6 +434,39 @@ def get_node_id(self):
         ray.get(actor.get_node_id.remote())
 
 
+def test_node_affinity_scheduling_strategy_spill_on_unavailable(ray_start_cluster):
+    cluster = ray_start_cluster
+    cluster.add_node(num_cpus=3)
+    ray.init(address=cluster.address)
+    cluster.add_node(num_cpus=3)
+    cluster.wait_for_nodes()
+
+    @ray.remote
+    def get_node_id_task(sleep_s=0):
+        time.sleep(sleep_s)
+        return ray.get_runtime_context().get_node_id()
+
+    target_node_id = ray.get(get_node_id_task.remote())
+
+    _ = [
+        get_node_id_task.options(
+            scheduling_strategy=NodeAffinitySchedulingStrategy(
+                target_node_id, soft=False
+            )
+        ).remote(1000)
+        for _ in range(3)
+    ]
+
+    soft_ref = get_node_id_task.options(
+        scheduling_strategy=NodeAffinitySchedulingStrategy(
+            target_node_id, soft=True, _spill_on_unavailable=True
+        )
+    ).remote()
+
+    soft_node_id = ray.get(soft_ref, timeout=3)
+    assert target_node_id != soft_node_id
+
+
 @pytest.mark.parametrize("connect_to_client", [True, False])
 def test_spread_scheduling_strategy(ray_start_cluster, connect_to_client):
     cluster = ray_start_cluster
diff --git a/python/ray/util/scheduling_strategies.py b/python/ray/util/scheduling_strategies.py
index 9be82552472b..c966348858c2 100644
--- a/python/ray/util/scheduling_strategies.py
+++ b/python/ray/util/scheduling_strategies.py
@@ -43,24 +43,26 @@ class NodeAffinitySchedulingStrategy:
 
     Attributes:
         node_id: the hex id of the node where the task or actor should run.
-        soft: If set to true, the task or actor will be scheduled on the node, unless
-            the target does not exist (e.g. the node dies), is infeasible,
-            or has insufficient resources at the time of scheduling. If it does
-            not schedule on the node it will find another node using the default
-            scheduling policy.
-            If set to false, the task or actor will be scheduled on the node, unless
-            the target doesn't exist (e.g. the node dies) or is infeasible.
-            If it does not schedule on the node the task will fail with
-            TaskUnschedulableError or ActorUnschedulableError.
+        soft: whether the scheduler should run the task or actor somewhere else
+            if the target node doesn't exist (e.g. the node dies) or is infeasible
+            during scheduling.
+            If the node exists and is feasible, the task or actor
+            will only be scheduled there.
+            This means if the node doesn't have available resources,
+            the task or actor will wait indefinitely until resources become available.
+            If the node doesn't exist or is infeasible, the task or actor
+            will fail if soft is False,
+            or be scheduled somewhere else if soft is True.
     """
 
-    def __init__(self, node_id: str, soft: bool):
+    def __init__(self, node_id: str, soft: bool, _spill_on_unavailable: bool = False):
         # This will be removed once we standardize on node id being hex string.
         if not isinstance(node_id, str):
             node_id = node_id.hex()
         self.node_id = node_id
         self.soft = soft
+        self._spill_on_unavailable = _spill_on_unavailable
 
 
 SchedulingStrategyT = Union[
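For reference, here is a short sketch (editorial, not part of the patch) of how the new private `_spill_on_unavailable` argument is meant to be combined with `soft=True`, mirroring the Ray Data call sites earlier in this diff. The flag is an internal, underscore-prefixed knob, and per the `RAY_CHECK` added in `scheduling_options.h` further below it is only valid together with `soft=True`:

```python
import ray
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

ray.init()


@ray.remote
def where_am_i():
    return ray.get_runtime_context().get_node_id()


# Prefer the node this driver runs on, but spill to another node right away
# if the preferred node has no free resources, instead of queueing there.
# _spill_on_unavailable is internal and must be paired with soft=True.
strategy = NodeAffinitySchedulingStrategy(
    ray.get_runtime_context().get_node_id(),
    soft=True,
    _spill_on_unavailable=True,
)
print(ray.get(where_am_i.options(scheduling_strategy=strategy).remote()))
```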
diff --git a/src/ray/common/task/task_spec.h b/src/ray/common/task/task_spec.h
index e1aea6fc4459..3b29d2aadb3b 100644
--- a/src/ray/common/task/task_spec.h
+++ b/src/ray/common/task/task_spec.h
@@ -45,7 +45,9 @@ inline bool operator==(const ray::rpc::SchedulingStrategy &lhs,
     return (lhs.node_affinity_scheduling_strategy().node_id() ==
             rhs.node_affinity_scheduling_strategy().node_id()) &&
            (lhs.node_affinity_scheduling_strategy().soft() ==
-            rhs.node_affinity_scheduling_strategy().soft());
+            rhs.node_affinity_scheduling_strategy().soft()) &&
+           (lhs.node_affinity_scheduling_strategy().spill_on_unavailable() ==
+            rhs.node_affinity_scheduling_strategy().spill_on_unavailable());
   }
   case ray::rpc::SchedulingStrategy::kPlacementGroupSchedulingStrategy: {
     return (lhs.placement_group_scheduling_strategy().placement_group_id() ==
@@ -114,6 +116,8 @@ struct hash<ray::SchedulingClassDescriptor> {
       // soft returns a bool
       hash ^= static_cast<size_t>(
          scheduling_strategy.node_affinity_scheduling_strategy().soft());
+      hash ^= static_cast<size_t>(
+          scheduling_strategy.node_affinity_scheduling_strategy().spill_on_unavailable());
     } else if (scheduling_strategy.scheduling_strategy_case() ==
                ray::rpc::SchedulingStrategy::kPlacementGroupSchedulingStrategy) {
       hash ^= std::hash<std::string>()(
diff --git a/src/ray/common/test/task_spec_test.cc b/src/ray/common/test/task_spec_test.cc
index 8b72a73b3b17..6a7d1bfe65a3 100644
--- a/src/ray/common/test/task_spec_test.cc
+++ b/src/ray/common/test/task_spec_test.cc
@@ -32,6 +32,9 @@ TEST(TaskSpecTest, TestSchedulingClassDescriptor) {
   scheduling_strategy.mutable_node_affinity_scheduling_strategy()->set_node_id("y");
   SchedulingClassDescriptor descriptor5(resources, descriptor, 0, scheduling_strategy);
   SchedulingClassDescriptor descriptor6(resources, descriptor, 0, scheduling_strategy);
+  scheduling_strategy.mutable_node_affinity_scheduling_strategy()
+      ->set_spill_on_unavailable(true);
+  SchedulingClassDescriptor descriptor10(resources, descriptor, 0, scheduling_strategy);
   scheduling_strategy.mutable_placement_group_scheduling_strategy()
       ->set_placement_group_id("o");
   scheduling_strategy.mutable_placement_group_scheduling_strategy()
@@ -81,6 +84,12 @@ TEST(TaskSpecTest, TestSchedulingClassDescriptor) {
   ASSERT_TRUE(TaskSpecification::GetSchedulingClass(descriptor5) ==
               TaskSpecification::GetSchedulingClass(descriptor6));
 
+  ASSERT_FALSE(descriptor6 == descriptor10);
+  ASSERT_FALSE(std::hash<ray::SchedulingClassDescriptor>()(descriptor6) ==
+               std::hash<ray::SchedulingClassDescriptor>()(descriptor10));
+  ASSERT_FALSE(TaskSpecification::GetSchedulingClass(descriptor6) ==
+               TaskSpecification::GetSchedulingClass(descriptor10));
+
   ASSERT_FALSE(descriptor6 == descriptor7);
   ASSERT_FALSE(std::hash<ray::SchedulingClassDescriptor>()(descriptor6) ==
               std::hash<ray::SchedulingClassDescriptor>()(descriptor7));
diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto
index fc436fd71430..751194bc8f0d 100644
--- a/src/ray/protobuf/common.proto
+++ b/src/ray/protobuf/common.proto
@@ -52,6 +52,7 @@ enum TaskType {
 message NodeAffinitySchedulingStrategy {
   bytes node_id = 1;
   bool soft = 2;
+  bool spill_on_unavailable = 3;
 }
 
 message PlacementGroupSchedulingStrategy {
diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc
index 35afa17f94b1..2ad785a3cf9b 100644
--- a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc
+++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc
@@ -150,7 +150,9 @@ scheduling::NodeID ClusterResourceScheduler::GetBestSchedulableNode(
             force_spillback,
             force_spillback,
             scheduling_strategy.node_affinity_scheduling_strategy().node_id(),
-            scheduling_strategy.node_affinity_scheduling_strategy().soft()));
+            scheduling_strategy.node_affinity_scheduling_strategy().soft(),
+            scheduling_strategy.node_affinity_scheduling_strategy()
+                .spill_on_unavailable()));
   } else if (IsAffinityWithBundleSchedule(scheduling_strategy) &&
              !is_local_node_with_raylet_) {
     // This scheduling strategy is only used for gcs scheduling for the time being.
diff --git a/src/ray/raylet/scheduling/policy/node_affinity_scheduling_policy.cc b/src/ray/raylet/scheduling/policy/node_affinity_scheduling_policy.cc
index 152449449961..1f948ef07010 100644
--- a/src/ray/raylet/scheduling/policy/node_affinity_scheduling_policy.cc
+++ b/src/ray/raylet/scheduling/policy/node_affinity_scheduling_policy.cc
@@ -22,25 +22,21 @@ scheduling::NodeID NodeAffinitySchedulingPolicy::Schedule(
   RAY_CHECK(options.scheduling_type == SchedulingType::NODE_AFFINITY);
 
   scheduling::NodeID target_node_id =
      scheduling::NodeID(options.node_affinity_node_id);
-  bool target_feasible =
-      nodes_.contains(target_node_id) && is_node_alive_(target_node_id) &&
-      nodes_.at(target_node_id).GetLocalView().IsFeasible(resource_request);
-
-  if (options.node_affinity_soft) {
-    if (target_feasible &&
-        nodes_.at(target_node_id).GetLocalView().IsAvailable(resource_request)) {
+  if (nodes_.contains(target_node_id) && is_node_alive_(target_node_id) &&
+      nodes_.at(target_node_id).GetLocalView().IsFeasible(resource_request)) {
+    if (!options.node_affinity_spill_on_unavailable) {
       return target_node_id;
-    } else {
-      options.scheduling_type = SchedulingType::HYBRID;
-      return hybrid_policy_.Schedule(resource_request, options);
-    }
-  } else {
-    if (target_feasible) {
+    } else if (nodes_.at(target_node_id).GetLocalView().IsAvailable(resource_request)) {
       return target_node_id;
-    } else {
-      return scheduling::NodeID::Nil();
     }
   }
+
+  if (!options.node_affinity_soft) {
+    return scheduling::NodeID::Nil();
+  }
+
+  options.scheduling_type = SchedulingType::HYBRID;
+  return hybrid_policy_.Schedule(resource_request, options);
 }
 }  // namespace raylet_scheduling_policy
diff --git a/src/ray/raylet/scheduling/policy/scheduling_options.h b/src/ray/raylet/scheduling/policy/scheduling_options.h
index cea8254ec9e1..ddfc377694ca 100644
--- a/src/ray/raylet/scheduling/policy/scheduling_options.h
+++ b/src/ray/raylet/scheduling/policy/scheduling_options.h
@@ -75,12 +75,17 @@ struct SchedulingOptions {
   static SchedulingOptions NodeAffinity(bool avoid_local_node,
                                         bool require_node_available,
                                         std::string node_id,
-                                        bool soft) {
+                                        bool soft,
+                                        bool spill_on_unavailable = false) {
+    if (spill_on_unavailable) {
+      RAY_CHECK(soft);
+    }
     SchedulingOptions scheduling_options =
         Hybrid(avoid_local_node, require_node_available);
     scheduling_options.scheduling_type = SchedulingType::NODE_AFFINITY;
     scheduling_options.node_affinity_node_id = node_id;
     scheduling_options.node_affinity_soft = soft;
+    scheduling_options.node_affinity_spill_on_unavailable = spill_on_unavailable;
     return scheduling_options;
   }
 
@@ -159,6 +164,7 @@ struct SchedulingOptions {
   std::shared_ptr<SchedulingContext> scheduling_context;
   std::string node_affinity_node_id;
   bool node_affinity_soft = false;
+  bool node_affinity_spill_on_unavailable = false;
   // The node where the task is preferred to be placed. By default, this node id
   // is empty, which means no preferred node.
   std::string preferred_node_id;
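To make the revised control flow in `node_affinity_scheduling_policy.cc` easier to scan, here is a simplified Python restatement (editorial, not part of the patch). The boolean parameters are stand-ins for the node-table, `IsFeasible`, and `IsAvailable` checks in the C++ code:

```python
def node_affinity_schedule(target_alive, target_feasible, target_available,
                           soft, spill_on_unavailable):
    """Simplified restatement of the revised NodeAffinitySchedulingPolicy::Schedule.

    Returns "target" (the specified node), "other" (fall through to the hybrid
    policy), or None (unschedulable).
    """
    if target_alive and target_feasible:
        if not spill_on_unavailable:
            # Stick to the target node and queue there even if it is busy.
            return "target"
        elif target_available:
            # With spill_on_unavailable, only pick the target if it can run now.
            return "target"
    if not soft:
        # Hard affinity and the target is gone or infeasible: fail.
        return None
    # Soft affinity: fall back to the default (hybrid) policy.
    return "other"


# Plain soft=True now waits on a busy-but-feasible target node; spilling when
# the node is merely busy is opt-in via spill_on_unavailable.
assert node_affinity_schedule(True, True, False, soft=True, spill_on_unavailable=False) == "target"
assert node_affinity_schedule(True, True, False, soft=True, spill_on_unavailable=True) == "other"
assert node_affinity_schedule(False, False, False, soft=False, spill_on_unavailable=False) is None
```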
diff --git a/src/ray/raylet/scheduling/policy/scheduling_policy_test.cc b/src/ray/raylet/scheduling/policy/scheduling_policy_test.cc
index 3ed31b1ebb12..86d32b1d547d 100644
--- a/src/ray/raylet/scheduling/policy/scheduling_policy_test.cc
+++ b/src/ray/raylet/scheduling/policy/scheduling_policy_test.cc
@@ -99,8 +99,14 @@ TEST_F(SchedulingPolicyTest, NodeAffinityPolicyTest) {
 
   to_schedule = scheduling_policy.Schedule(
       req, SchedulingOptions::NodeAffinity(false, false, "unavailable", true));
-  // Choose a different node if it's not available right now.
-  ASSERT_NE(to_schedule, scheduling::NodeID("unavailable"));
+  // Prefer the specified node even if it's not available right now.
+  ASSERT_EQ(to_schedule, scheduling::NodeID("unavailable"));
+
+  to_schedule = scheduling_policy.Schedule(
+      req, SchedulingOptions::NodeAffinity(false, false, "unavailable", true, true));
+  // The task is scheduled somewhere else since soft is true and spill_on_unavailable is
+  // also true.
+  ASSERT_EQ(to_schedule, scheduling::NodeID("local"));
 
   to_schedule = scheduling_policy.Schedule(
       req, SchedulingOptions::NodeAffinity(false, false, "infeasible", false));
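Taken together, the user-visible effect of this patch is that a busy-but-alive target node now makes a plain `soft=True` task wait there, while `_spill_on_unavailable=True` opts back into spilling. A rough end-to-end sketch (editorial, not part of the patch), assuming an already running cluster with at least two nodes of 3 CPUs each, as in the updated Python test:

```python
import time

import ray
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

ray.init(address="auto")


@ray.remote
def get_node_id_task(sleep_s=0):
    time.sleep(sleep_s)
    return ray.get_runtime_context().get_node_id()


target_node_id = ray.get(get_node_id_task.remote())

# Saturate the target node with long-running pinned tasks (3 CPUs, 3 tasks).
_ = [
    get_node_id_task.options(
        scheduling_strategy=NodeAffinitySchedulingStrategy(target_node_id, soft=False)
    ).remote(1000)
    for _ in range(3)
]

# New behavior: plain soft=True queues on the busy target node, so this
# ray.get times out instead of returning a result from another node.
waiting_ref = get_node_id_task.options(
    scheduling_strategy=NodeAffinitySchedulingStrategy(target_node_id, soft=True)
).remote()
try:
    ray.get(waiting_ref, timeout=3)
except ray.exceptions.GetTimeoutError:
    print("soft=True task is still queued on the busy target node")

# Opting into _spill_on_unavailable restores the old "spill when busy" behavior.
spilled_ref = get_node_id_task.options(
    scheduling_strategy=NodeAffinitySchedulingStrategy(
        target_node_id, soft=True, _spill_on_unavailable=True
    )
).remote()
assert ray.get(spilled_ref, timeout=3) != target_node_id
```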