diff --git a/.buildkite/core.rayci.yml b/.buildkite/core.rayci.yml index 98b25c6a3822..dcbebd735c3e 100644 --- a/.buildkite/core.rayci.yml +++ b/.buildkite/core.rayci.yml @@ -61,7 +61,7 @@ steps: - label: ":ray: core: python {{matrix.python}} tests ({{matrix.worker_id}})" if: build.pull_request.labels includes "continuous-build" || pipeline.id == "0189e759-8c96-4302-b6b5-b4274406bf89" || pipeline.id == "018f4f1e-1b73-4906-9802-92422e3badaa" - tags: + tags: - python - dashboard instance_type: large diff --git a/python/ray/tests/test_network_failure_e2e.py b/python/ray/tests/test_network_failure_e2e.py index 7a70b45e4a16..02e493acbc89 100644 --- a/python/ray/tests/test_network_failure_e2e.py +++ b/python/ray/tests/test_network_failure_e2e.py @@ -3,6 +3,7 @@ from time import sleep import pytest +import threading from ray._private.test_utils import wait_for_condition from ray.tests.conftest_docker import * # noqa from ray.tests.conftest_docker import gen_head_node, gen_worker_node @@ -157,7 +158,6 @@ def test_transient_network_error(head2, worker2, gcs_network): network = gcs_network check_two_nodes = """ -import sys import ray from ray._private.test_utils import wait_for_condition @@ -196,6 +196,124 @@ def ping(self): assert result.exit_code == 0, result.output.decode("utf-8") +head3 = gen_head_node( + { + "RAY_grpc_keepalive_time_ms": "1000", + "RAY_grpc_client_keepalive_time_ms": "1000", + "RAY_grpc_client_keepalive_timeout_ms": "1000", + "RAY_health_check_initial_delay_ms": "1000", + "RAY_health_check_period_ms": "1000", + "RAY_health_check_timeout_ms": "100000", + "RAY_health_check_failure_threshold": "20", + } +) + +worker3 = gen_worker_node( + envs={ + "RAY_grpc_keepalive_time_ms": "1000", + "RAY_grpc_client_keepalive_time_ms": "1000", + "RAY_grpc_client_keepalive_timeout_ms": "1000", + "RAY_health_check_initial_delay_ms": "1000", + "RAY_health_check_period_ms": "1000", + "RAY_health_check_timeout_ms": "100000", + "RAY_health_check_failure_threshold": "20", + }, 
+ num_cpus=2, +) + + +def test_async_actor_task_retry(head3, worker3, gcs_network): + # Test that if transient network error happens + # after an async actor task is submitted and being executed, + # a second attempt will be submitted and executed after the + # first attempt finishes. + network = gcs_network + + driver = """ +import asyncio +import ray +from ray.util.state import list_tasks + +ray.init(namespace="test") + +@ray.remote(num_cpus=0.1, name="counter", lifetime="detached") +class Counter: + def __init__(self): + self.count = 0 + + def inc(self): + self.count = self.count + 1 + return self.count + + @ray.method(max_task_retries=-1) + def get(self): + return self.count + +@ray.remote(num_cpus=0.1, max_task_retries=-1) +class AsyncActor: + def __init__(self, counter): + self.counter = counter + + async def run(self): + count = await self.counter.get.remote() + if count == 0: + # first attempt + await self.counter.inc.remote() + while len(list_tasks( + filters=[("name", "=", "AsyncActor.run")])) != 2: + # wait for second attempt to be made + await asyncio.sleep(1) + # wait until the second attempt reaches the actor + await asyncio.sleep(2) + await self.counter.inc.remote() + return "first" + else: + # second attempt + # make sure second attempt only runs + # after first attempt finishes + assert count == 2 + return "second" + +counter = Counter.remote() +async_actor = AsyncActor.remote(counter) +assert ray.get(async_actor.run.remote()) == "second" +""" + + check_async_actor_run_is_called = """ +import ray +from ray._private.test_utils import wait_for_condition +ray.init(namespace="test") + +wait_for_condition(lambda: ray.get_actor("counter") is not None) +counter = ray.get_actor("counter") +wait_for_condition(lambda: ray.get(counter.get.remote()) == 1) +""" + + def inject_transient_network_failure(): + try: + result = head3.exec_run( + cmd=f"python -c '{check_async_actor_run_is_called}'" + ) + assert result.exit_code == 0, result.output.decode("utf-8") + + 
worker_ip = worker3._container.attrs["NetworkSettings"]["Networks"][ + network.name + ]["IPAddress"] + network.disconnect(worker3.name, force=True) + sleep(2) + network.connect(worker3.name, ipv4_address=worker_ip) + except Exception as e: + print(f"Network failure injection failed {e}") + + t = threading.Thread(target=inject_transient_network_failure, daemon=True) + t.start() + + result = head3.exec_run( + cmd=f"python -c '{driver}'", + ) + assert result.exit_code == 0, result.output.decode("utf-8") + + if __name__ == "__main__": import os diff --git a/src/ray/common/ray_syncer/ray_syncer.cc b/src/ray/common/ray_syncer/ray_syncer.cc index fcde9d4defba..f303893eab99 100644 --- a/src/ray/common/ray_syncer/ray_syncer.cc +++ b/src/ray/common/ray_syncer/ray_syncer.cc @@ -233,8 +233,8 @@ void RaySyncer::Connect(const std::string &node_id, execute_after( io_context_, [this, node_id, channel]() { - RAY_LOG(INFO) << "Connection is broken. Reconnect to node: " - << NodeID::FromBinary(node_id); + RAY_LOG(INFO).WithField(NodeID::FromBinary(node_id)) + << "Connection is broken. Reconnect to node."; Connect(node_id, channel); }, /* delay_microseconds = */ std::chrono::milliseconds(2000)); @@ -370,10 +370,11 @@ ServerBidiReactor *RaySyncerService::StartSync(grpc::CallbackServerContext *cont // 4. OnDone method of the old reactor is called which calls this cleanup_cb_ return; } + RAY_LOG(INFO).WithField(NodeID::FromBinary(node_id)) << "Connection is broken."; syncer_.sync_reactors_.erase(node_id); syncer_.node_state_->RemoveNode(node_id); }); - RAY_LOG(INFO).WithField(kLogKeyNodeID, NodeID::FromBinary(reactor->GetRemoteNodeID())) + RAY_LOG(INFO).WithField(NodeID::FromBinary(reactor->GetRemoteNodeID())) << "Get connection"; // Disconnect exiting connection if there is any. 
// This can happen when there is transient network error diff --git a/src/ray/core_worker/test/scheduling_queue_test.cc b/src/ray/core_worker/test/scheduling_queue_test.cc index ec6d99688a9a..9b06408714a7 100644 --- a/src/ray/core_worker/test/scheduling_queue_test.cc +++ b/src/ray/core_worker/test/scheduling_queue_test.cc @@ -19,46 +19,11 @@ #include "ray/common/test_util.h" #include "ray/core_worker/transport/task_receiver.h" +using namespace std::chrono_literals; + namespace ray { namespace core { -class MockActorSchedulingQueue { - public: - MockActorSchedulingQueue(instrumented_io_context &main_io_service, - DependencyWaiter &waiter) - : queue_(main_io_service, - waiter, - /*pool_manager=*/ - std::make_shared>(), - /*fiber_state_manager=*/ - std::make_shared>(), - /*is_asyncio=*/false, - /*fiber_max_concurrency=*/1, - /*concurrency_groups=*/{}) {} - void Add(int64_t seq_no, - int64_t client_processed_up_to, - std::function accept_request, - std::function reject_request, - rpc::SendReplyCallback send_reply_callback = nullptr, - TaskID task_id = TaskID::Nil(), - const std::vector &dependencies = {}) { - queue_.Add(seq_no, - client_processed_up_to, - std::move(accept_request), - std::move(reject_request), - send_reply_callback, - "", - FunctionDescriptorBuilder::Empty(), - task_id, - dependencies); - } - - ~MockActorSchedulingQueue() { queue_.Stop(); } - - private: - ActorSchedulingQueue queue_; -}; - class MockWaiter : public DependencyWaiter { public: MockWaiter() {} @@ -77,20 +42,64 @@ class MockWaiter : public DependencyWaiter { TEST(SchedulingQueueTest, TestInOrder) { instrumented_io_context io_service; MockWaiter waiter; - MockActorSchedulingQueue queue(io_service, waiter); + ActorSchedulingQueue queue(io_service, + waiter, + std::make_shared>(), + std::make_shared>(), + /*is_asyncio=*/false, + /*fiber_max_concurrency=*/1, + /*concurrency_groups=*/{}); int n_ok = 0; int n_rej = 0; auto fn_ok = [&n_ok](rpc::SendReplyCallback callback) { n_ok++; }; auto fn_rej 
= [&n_rej](const Status &status, rpc::SendReplyCallback callback) { n_rej++; }; - queue.Add(0, -1, fn_ok, fn_rej, nullptr); - queue.Add(1, -1, fn_ok, fn_rej, nullptr); - queue.Add(2, -1, fn_ok, fn_rej, nullptr); - queue.Add(3, -1, fn_ok, fn_rej, nullptr); + queue.Add(0, + -1, + fn_ok, + fn_rej, + nullptr, + "", + FunctionDescriptorBuilder::Empty(), + TaskID::Nil(), + /*attempt_number=*/0, + {}); + queue.Add(1, + -1, + fn_ok, + fn_rej, + nullptr, + "", + FunctionDescriptorBuilder::Empty(), + TaskID::Nil(), + /*attempt_number=*/0, + {}); + queue.Add(2, + -1, + fn_ok, + fn_rej, + nullptr, + "", + FunctionDescriptorBuilder::Empty(), + TaskID::Nil(), + /*attempt_number=*/0, + {}); + queue.Add(3, + -1, + fn_ok, + fn_rej, + nullptr, + "", + FunctionDescriptorBuilder::Empty(), + TaskID::Nil(), + /*attempt_number=*/0, + {}); io_service.run(); ASSERT_EQ(n_ok, 4); ASSERT_EQ(n_rej, 0); + + queue.Stop(); } TEST(SchedulingQueueTest, TestWaitForObjects) { @@ -99,7 +108,13 @@ TEST(SchedulingQueueTest, TestWaitForObjects) { ObjectID obj3 = ObjectID::FromRandom(); instrumented_io_context io_service; MockWaiter waiter; - MockActorSchedulingQueue queue(io_service, waiter); + ActorSchedulingQueue queue(io_service, + waiter, + std::make_shared>(), + std::make_shared>(), + /*is_asyncio=*/false, + /*fiber_max_concurrency=*/1, + /*concurrency_groups=*/{}); int n_ok = 0; int n_rej = 0; @@ -107,10 +122,46 @@ TEST(SchedulingQueueTest, TestWaitForObjects) { auto fn_rej = [&n_rej](const Status &status, rpc::SendReplyCallback callback) { n_rej++; }; - queue.Add(0, -1, fn_ok, fn_rej, nullptr); - queue.Add(1, -1, fn_ok, fn_rej, nullptr, TaskID::Nil(), ObjectIdsToRefs({obj1})); - queue.Add(2, -1, fn_ok, fn_rej, nullptr, TaskID::Nil(), ObjectIdsToRefs({obj2})); - queue.Add(3, -1, fn_ok, fn_rej, nullptr, TaskID::Nil(), ObjectIdsToRefs({obj3})); + queue.Add(0, + -1, + fn_ok, + fn_rej, + nullptr, + "", + FunctionDescriptorBuilder::Empty(), + TaskID::Nil(), + /*attempt_number=*/0, + {}); + queue.Add(1, 
+ -1, + fn_ok, + fn_rej, + nullptr, + "", + FunctionDescriptorBuilder::Empty(), + TaskID::Nil(), + /*attempt_number=*/0, + ObjectIdsToRefs({obj1})); + queue.Add(2, + -1, + fn_ok, + fn_rej, + nullptr, + "", + FunctionDescriptorBuilder::Empty(), + TaskID::Nil(), + /*attempt_number=*/0, + ObjectIdsToRefs({obj2})); + queue.Add(3, + -1, + fn_ok, + fn_rej, + nullptr, + "", + FunctionDescriptorBuilder::Empty(), + TaskID::Nil(), + /*attempt_number=*/0, + ObjectIdsToRefs({obj3})); ASSERT_EQ(n_ok, 1); @@ -122,13 +173,21 @@ TEST(SchedulingQueueTest, TestWaitForObjects) { waiter.Complete(1); ASSERT_EQ(n_ok, 4); + + queue.Stop(); } TEST(SchedulingQueueTest, TestWaitForObjectsNotSubjectToSeqTimeout) { ObjectID obj1 = ObjectID::FromRandom(); instrumented_io_context io_service; MockWaiter waiter; - MockActorSchedulingQueue queue(io_service, waiter); + ActorSchedulingQueue queue(io_service, + waiter, + std::make_shared>(), + std::make_shared>(), + /*is_asyncio=*/false, + /*fiber_max_concurrency=*/1, + /*concurrency_groups=*/{}); int n_ok = 0; int n_rej = 0; @@ -136,75 +195,227 @@ TEST(SchedulingQueueTest, TestWaitForObjectsNotSubjectToSeqTimeout) { auto fn_rej = [&n_rej](const Status &status, rpc::SendReplyCallback callback) { n_rej++; }; - queue.Add(0, -1, fn_ok, fn_rej, nullptr); - queue.Add(1, -1, fn_ok, fn_rej, nullptr, TaskID::Nil(), ObjectIdsToRefs({obj1})); + queue.Add(0, + -1, + fn_ok, + fn_rej, + nullptr, + "", + FunctionDescriptorBuilder::Empty(), + TaskID::Nil(), + /*attempt_number=*/0, + {}); + queue.Add(1, + -1, + fn_ok, + fn_rej, + nullptr, + "", + FunctionDescriptorBuilder::Empty(), + TaskID::Nil(), + /*attempt_number=*/0, + ObjectIdsToRefs({obj1})); ASSERT_EQ(n_ok, 1); io_service.run(); ASSERT_EQ(n_rej, 0); waiter.Complete(0); ASSERT_EQ(n_ok, 2); + + queue.Stop(); } TEST(SchedulingQueueTest, TestOutOfOrder) { instrumented_io_context io_service; MockWaiter waiter; - MockActorSchedulingQueue queue(io_service, waiter); + ActorSchedulingQueue queue(io_service, + waiter, 
+ std::make_shared>(), + std::make_shared>(), + /*is_asyncio=*/false, + /*fiber_max_concurrency=*/1, + /*concurrency_groups=*/{}); int n_ok = 0; int n_rej = 0; auto fn_ok = [&n_ok](rpc::SendReplyCallback callback) { n_ok++; }; auto fn_rej = [&n_rej](const Status &status, rpc::SendReplyCallback callback) { n_rej++; }; - queue.Add(2, -1, fn_ok, fn_rej, nullptr); - queue.Add(0, -1, fn_ok, fn_rej, nullptr); - queue.Add(3, -1, fn_ok, fn_rej, nullptr); - queue.Add(1, -1, fn_ok, fn_rej, nullptr); + queue.Add(2, + -1, + fn_ok, + fn_rej, + nullptr, + "", + FunctionDescriptorBuilder::Empty(), + TaskID::Nil(), + /*attempt_number=*/0, + {}); + queue.Add(0, + -1, + fn_ok, + fn_rej, + nullptr, + "", + FunctionDescriptorBuilder::Empty(), + TaskID::Nil(), + /*attempt_number=*/0, + {}); + queue.Add(3, + -1, + fn_ok, + fn_rej, + nullptr, + "", + FunctionDescriptorBuilder::Empty(), + TaskID::Nil(), + /*attempt_number=*/0, + {}); + queue.Add(1, + -1, + fn_ok, + fn_rej, + nullptr, + "", + FunctionDescriptorBuilder::Empty(), + TaskID::Nil(), + /*attempt_number=*/0, + {}); io_service.run(); ASSERT_EQ(n_ok, 4); ASSERT_EQ(n_rej, 0); + + queue.Stop(); } TEST(SchedulingQueueTest, TestSeqWaitTimeout) { instrumented_io_context io_service; MockWaiter waiter; - MockActorSchedulingQueue queue(io_service, waiter); + ActorSchedulingQueue queue(io_service, + waiter, + std::make_shared>(), + std::make_shared>(), + /*is_asyncio=*/false, + /*fiber_max_concurrency=*/1, + /*concurrency_groups=*/{}); int n_ok = 0; int n_rej = 0; auto fn_ok = [&n_ok](rpc::SendReplyCallback callback) { n_ok++; }; auto fn_rej = [&n_rej](const Status &status, rpc::SendReplyCallback callback) { n_rej++; }; - queue.Add(2, -1, fn_ok, fn_rej, nullptr); - queue.Add(0, -1, fn_ok, fn_rej, nullptr); - queue.Add(3, -1, fn_ok, fn_rej, nullptr); + queue.Add(2, + -1, + fn_ok, + fn_rej, + nullptr, + "", + FunctionDescriptorBuilder::Empty(), + TaskID::Nil(), + /*attempt_number=*/0, + {}); + queue.Add(0, + -1, + fn_ok, + fn_rej, + nullptr, 
+ "", + FunctionDescriptorBuilder::Empty(), + TaskID::Nil(), + /*attempt_number=*/0, + {}); + queue.Add(3, + -1, + fn_ok, + fn_rej, + nullptr, + "", + FunctionDescriptorBuilder::Empty(), + TaskID::Nil(), + /*attempt_number=*/0, + {}); ASSERT_EQ(n_ok, 1); ASSERT_EQ(n_rej, 0); io_service.run(); // immediately triggers timeout ASSERT_EQ(n_ok, 1); ASSERT_EQ(n_rej, 2); - queue.Add(4, -1, fn_ok, fn_rej, nullptr); - queue.Add(5, -1, fn_ok, fn_rej, nullptr); + queue.Add(4, + -1, + fn_ok, + fn_rej, + nullptr, + "", + FunctionDescriptorBuilder::Empty(), + TaskID::Nil(), + /*attempt_number=*/0, + {}); + queue.Add(5, + -1, + fn_ok, + fn_rej, + nullptr, + "", + FunctionDescriptorBuilder::Empty(), + TaskID::Nil(), + /*attempt_number=*/0, + {}); ASSERT_EQ(n_ok, 3); ASSERT_EQ(n_rej, 2); + + queue.Stop(); } TEST(SchedulingQueueTest, TestSkipAlreadyProcessedByClient) { instrumented_io_context io_service; MockWaiter waiter; - MockActorSchedulingQueue queue(io_service, waiter); + ActorSchedulingQueue queue(io_service, + waiter, + std::make_shared>(), + std::make_shared>(), + /*is_asyncio=*/false, + /*fiber_max_concurrency=*/1, + /*concurrency_groups=*/{}); int n_ok = 0; int n_rej = 0; auto fn_ok = [&n_ok](rpc::SendReplyCallback callback) { n_ok++; }; auto fn_rej = [&n_rej](const Status &status, rpc::SendReplyCallback callback) { n_rej++; }; - queue.Add(2, 2, fn_ok, fn_rej, nullptr); - queue.Add(3, 2, fn_ok, fn_rej, nullptr); - queue.Add(1, 2, fn_ok, fn_rej, nullptr); + queue.Add(2, + 2, + fn_ok, + fn_rej, + nullptr, + "", + FunctionDescriptorBuilder::Empty(), + TaskID::Nil(), + /*attempt_number=*/0, + {}); + queue.Add(3, + 2, + fn_ok, + fn_rej, + nullptr, + "", + FunctionDescriptorBuilder::Empty(), + TaskID::Nil(), + /*attempt_number=*/0, + {}); + queue.Add(1, + 2, + fn_ok, + fn_rej, + nullptr, + "", + FunctionDescriptorBuilder::Empty(), + TaskID::Nil(), + /*attempt_number=*/0, + {}); io_service.run(); ASSERT_EQ(n_ok, 1); ASSERT_EQ(n_rej, 2); + + queue.Stop(); } 
TEST(SchedulingQueueTest, TestCancelQueuedTask) { @@ -226,6 +437,200 @@ TEST(SchedulingQueueTest, TestCancelQueuedTask) { queue->ScheduleRequests(); ASSERT_EQ(n_ok, 4); ASSERT_EQ(n_rej, 1); + + queue->Stop(); +} + +TEST(OutOfOrderActorSchedulingQueueTest, TestSameTaskMultipleAttempts) { + // Test that if multiple attempts of the same task are received, + // the next attempt only runs after the previous attempt finishes. + instrumented_io_context io_service; + MockWaiter waiter; + OutOfOrderActorSchedulingQueue queue( + io_service, + waiter, + std::make_shared>( + std::vector(), + /*max_concurrency_for_default_concurrency_group=*/100), + std::make_shared>(), + /*is_asyncio=*/false, + /*fiber_max_concurrency=*/1, + /*concurrency_groups=*/{}); + JobID job_id = JobID::FromInt(1); + TaskID task_id = TaskID::FromRandom(job_id); + + std::promise attempt_1_start_promise; + std::promise attempt_1_finish_promise; + auto fn_ok_1 = [&attempt_1_start_promise, + &attempt_1_finish_promise](rpc::SendReplyCallback callback) { + attempt_1_start_promise.set_value(); + attempt_1_finish_promise.get_future().wait(); + }; + std::promise attempt_2_start_promise; + auto fn_ok_2 = [&attempt_2_start_promise](rpc::SendReplyCallback callback) { + attempt_2_start_promise.set_value(); + }; + int n_rej = 0; + auto fn_rej = [&n_rej](const Status &status, rpc::SendReplyCallback callback) { + n_rej++; + }; + queue.Add(-1, + -1, + fn_ok_1, + fn_rej, + nullptr, + "", + FunctionDescriptorBuilder::Empty(), + task_id, + /*attempt_number=*/1, + {}); + attempt_1_start_promise.get_future().wait(); + queue.Add(-1, + -1, + fn_ok_2, + fn_rej, + nullptr, + "", + FunctionDescriptorBuilder::Empty(), + task_id, + /*attempt_number=*/2, + {}); + io_service.poll(); + // Attempt 2 should only start after attempt 1 finishes. 
+ auto attempt_2_start_future = attempt_2_start_promise.get_future(); + ASSERT_TRUE(attempt_2_start_future.wait_for(1s) == std::future_status::timeout); + + // Finish attempt 1 so attempt 2 can run. + attempt_1_finish_promise.set_value(); + while (attempt_2_start_future.wait_for(1s) != std::future_status::ready) { + io_service.restart(); + io_service.poll(); + } + + ASSERT_EQ(n_rej, 0); + auto no_leak = [&queue] { + absl::MutexLock lock(&queue.mu_); + return queue.queued_actor_tasks_.empty() && + queue.pending_task_id_to_is_canceled.empty(); + }; + ASSERT_TRUE(WaitForCondition(no_leak, 10000)); + + queue.Stop(); +} + +TEST(OutOfOrderActorSchedulingQueueTest, TestSameTaskMultipleAttemptsCancellation) { + instrumented_io_context io_service; + MockWaiter waiter; + OutOfOrderActorSchedulingQueue queue( + io_service, + waiter, + std::make_shared>( + std::vector(), + /*max_concurrency_for_default_concurrency_group=*/100), + std::make_shared>(), + /*is_asyncio=*/false, + /*fiber_max_concurrency=*/1, + /*concurrency_groups=*/{}); + JobID job_id = JobID::FromInt(1); + TaskID task_id = TaskID::FromRandom(job_id); + + std::promise attempt_1_start_promise; + std::promise attempt_1_finish_promise; + auto fn_ok_1 = [&attempt_1_start_promise, + &attempt_1_finish_promise](rpc::SendReplyCallback callback) { + attempt_1_start_promise.set_value(); + attempt_1_finish_promise.get_future().wait(); + }; + auto fn_rej_1 = [](const Status &status, rpc::SendReplyCallback callback) { + ASSERT_FALSE(true); + }; + queue.Add(-1, + -1, + fn_ok_1, + fn_rej_1, + nullptr, + "", + FunctionDescriptorBuilder::Empty(), + task_id, + /*attempt_number=*/1, + {}); + attempt_1_start_promise.get_future().wait(); + + auto fn_ok_2 = [](rpc::SendReplyCallback callback) { ASSERT_FALSE(true); }; + std::atomic attempt_2_cancelled = false; + auto fn_rej_2 = [&attempt_2_cancelled](const Status &status, + rpc::SendReplyCallback callback) { + ASSERT_TRUE(status.IsSchedulingCancelled()); + 
attempt_2_cancelled.store(true); + }; + queue.Add(-1, + -1, + fn_ok_2, + fn_rej_2, + nullptr, + "", + FunctionDescriptorBuilder::Empty(), + task_id, + /*attempt_number=*/2, + {}); + + auto fn_ok_4 = [](rpc::SendReplyCallback callback) { ASSERT_FALSE(true); }; + std::atomic attempt_4_cancelled = false; + auto fn_rej_4 = [&attempt_4_cancelled](const Status &status, + rpc::SendReplyCallback callback) { + ASSERT_TRUE(status.IsSchedulingCancelled()); + attempt_4_cancelled.store(true); + }; + // Adding attempt 4 should cancel the old attempt 2 + queue.Add(-1, + -1, + fn_ok_4, + fn_rej_4, + nullptr, + "", + FunctionDescriptorBuilder::Empty(), + task_id, + /*attempt_number=*/4, + {}); + ASSERT_TRUE(attempt_2_cancelled.load()); + + auto fn_ok_3 = [](rpc::SendReplyCallback callback) { ASSERT_FALSE(true); }; + std::atomic attempt_3_cancelled = false; + auto fn_rej_3 = [&attempt_3_cancelled](const Status &status, + rpc::SendReplyCallback callback) { + ASSERT_TRUE(status.IsSchedulingCancelled()); + attempt_3_cancelled.store(true); + }; + // Attempt 3 should be cancelled immediately since there is attempt 4 + // in the queue. + queue.Add(-1, + -1, + fn_ok_3, + fn_rej_3, + nullptr, + "", + FunctionDescriptorBuilder::Empty(), + task_id, + /*attempt_number=*/3, + {}); + ASSERT_TRUE(attempt_3_cancelled.load()); + + // Attempt 4 should be cancelled. 
+ queue.CancelTaskIfFound(task_id); + attempt_1_finish_promise.set_value(); + while (!attempt_4_cancelled.load()) { + io_service.restart(); + io_service.poll(); + } + + auto no_leak = [&queue] { + absl::MutexLock lock(&queue.mu_); + return queue.queued_actor_tasks_.empty() && + queue.pending_task_id_to_is_canceled.empty(); + }; + ASSERT_TRUE(WaitForCondition(no_leak, 10000)); + + queue.Stop(); } } // namespace core diff --git a/src/ray/core_worker/transport/actor_scheduling_queue.cc b/src/ray/core_worker/transport/actor_scheduling_queue.cc index 47e7af1a4c8c..4b3e74248324 100644 --- a/src/ray/core_worker/transport/actor_scheduling_queue.cc +++ b/src/ray/core_worker/transport/actor_scheduling_queue.cc @@ -77,6 +77,7 @@ void ActorSchedulingQueue::Add( const std::string &concurrency_group_name, const ray::FunctionDescriptor &function_descriptor, TaskID task_id, + uint64_t attempt_number, const std::vector &dependencies) { // A seq_no of -1 means no ordering constraint. Actor tasks must be executed in order. RAY_CHECK(seq_no != -1); @@ -93,7 +94,8 @@ void ActorSchedulingQueue::Add( std::move(reject_request), std::move(send_reply_callback), task_id, - dependencies.size() > 0, + attempt_number, + dependencies, concurrency_group_name, function_descriptor); { diff --git a/src/ray/core_worker/transport/actor_scheduling_queue.h b/src/ray/core_worker/transport/actor_scheduling_queue.h index f73d80fbae55..a62c96632073 100644 --- a/src/ray/core_worker/transport/actor_scheduling_queue.h +++ b/src/ray/core_worker/transport/actor_scheduling_queue.h @@ -64,6 +64,7 @@ class ActorSchedulingQueue : public SchedulingQueue { const std::string &concurrency_group_name, const ray::FunctionDescriptor &function_descriptor, TaskID task_id = TaskID::Nil(), + uint64_t attempt_number = 0, const std::vector &dependencies = {}) override; /// Cancel the actor task in the queue. 
diff --git a/src/ray/core_worker/transport/actor_scheduling_util.cc b/src/ray/core_worker/transport/actor_scheduling_util.cc index 217214f5d688..50e11eb764d2 100644 --- a/src/ray/core_worker/transport/actor_scheduling_util.cc +++ b/src/ray/core_worker/transport/actor_scheduling_util.cc @@ -24,31 +24,37 @@ InboundRequest::InboundRequest( std::function reject_callback, rpc::SendReplyCallback send_reply_callback, class TaskID task_id, - bool has_dependencies, + uint64_t attempt_number, + const std::vector &dependencies, const std::string &concurrency_group_name, const ray::FunctionDescriptor &function_descriptor) : accept_callback_(std::move(accept_callback)), reject_callback_(std::move(reject_callback)), send_reply_callback_(std::move(send_reply_callback)), task_id_(task_id), + attempt_number_(attempt_number), concurrency_group_name_(concurrency_group_name), function_descriptor_(function_descriptor), - has_pending_dependencies_(has_dependencies) {} + pending_dependencies_(dependencies) {} void InboundRequest::Accept() { accept_callback_(std::move(send_reply_callback_)); } void InboundRequest::Cancel(const Status &status) { reject_callback_(status, std::move(send_reply_callback_)); } -bool InboundRequest::CanExecute() const { return !has_pending_dependencies_; } +bool InboundRequest::CanExecute() const { return pending_dependencies_.empty(); } ray::TaskID InboundRequest::TaskID() const { return task_id_; } +uint64_t InboundRequest::AttemptNumber() const { return attempt_number_; } const std::string &InboundRequest::ConcurrencyGroupName() const { return concurrency_group_name_; } const ray::FunctionDescriptor &InboundRequest::FunctionDescriptor() const { return function_descriptor_; } -void InboundRequest::MarkDependenciesSatisfied() { has_pending_dependencies_ = false; } +const std::vector &InboundRequest::PendingDependencies() const { + return pending_dependencies_; +}; +void InboundRequest::MarkDependenciesSatisfied() { pending_dependencies_.clear(); } 
DependencyWaiterImpl::DependencyWaiterImpl(DependencyWaiterInterface &dependency_client) : dependency_client_(dependency_client) {} diff --git a/src/ray/core_worker/transport/actor_scheduling_util.h b/src/ray/core_worker/transport/actor_scheduling_util.h index bfa7cad2c08d..0488d0243a94 100644 --- a/src/ray/core_worker/transport/actor_scheduling_util.h +++ b/src/ray/core_worker/transport/actor_scheduling_util.h @@ -32,7 +32,8 @@ class InboundRequest { std::function reject_callback, rpc::SendReplyCallback send_reply_callback, TaskID task_id, - bool has_dependencies, + uint64_t attempt_number, + const std::vector &dependencies, const std::string &concurrency_group_name, const ray::FunctionDescriptor &function_descriptor); @@ -40,9 +41,11 @@ class InboundRequest { void Cancel(const Status &status); bool CanExecute() const; ray::TaskID TaskID() const; + uint64_t AttemptNumber() const; const std::string &ConcurrencyGroupName() const; const ray::FunctionDescriptor &FunctionDescriptor() const; void MarkDependenciesSatisfied(); + const std::vector &PendingDependencies() const; private: std::function accept_callback_; @@ -50,9 +53,10 @@ class InboundRequest { rpc::SendReplyCallback send_reply_callback_; ray::TaskID task_id_; + uint64_t attempt_number_; std::string concurrency_group_name_; ray::FunctionDescriptor function_descriptor_; - bool has_pending_dependencies_; + std::vector pending_dependencies_; }; /// Waits for an object dependency to become available. Abstract for testing. 
diff --git a/src/ray/core_worker/transport/normal_scheduling_queue.cc b/src/ray/core_worker/transport/normal_scheduling_queue.cc index 93505b942325..0b94ebdb61a8 100644 --- a/src/ray/core_worker/transport/normal_scheduling_queue.cc +++ b/src/ray/core_worker/transport/normal_scheduling_queue.cc @@ -44,6 +44,7 @@ void NormalSchedulingQueue::Add( const std::string &concurrency_group_name, const FunctionDescriptor &function_descriptor, TaskID task_id, + uint64_t attempt_number, const std::vector &dependencies) { absl::MutexLock lock(&mu_); // Normal tasks should not have ordering constraints. @@ -54,7 +55,8 @@ void NormalSchedulingQueue::Add( std::move(reject_request), std::move(send_reply_callback), task_id, - dependencies.size() > 0, + attempt_number, + dependencies, /*concurrency_group_name=*/"", function_descriptor)); } diff --git a/src/ray/core_worker/transport/normal_scheduling_queue.h b/src/ray/core_worker/transport/normal_scheduling_queue.h index 7d5c7d2464b0..3e0af1126a08 100644 --- a/src/ray/core_worker/transport/normal_scheduling_queue.h +++ b/src/ray/core_worker/transport/normal_scheduling_queue.h @@ -46,6 +46,7 @@ class NormalSchedulingQueue : public SchedulingQueue { const std::string &concurrency_group_name, const FunctionDescriptor &function_descriptor, TaskID task_id, + uint64_t attempt_number, const std::vector &dependencies) override; // Search for an InboundRequest associated with the task that we are trying to cancel. 
diff --git a/src/ray/core_worker/transport/out_of_order_actor_scheduling_queue.cc b/src/ray/core_worker/transport/out_of_order_actor_scheduling_queue.cc index acb6e96a2fae..40c8703a8a27 100644 --- a/src/ray/core_worker/transport/out_of_order_actor_scheduling_queue.cc +++ b/src/ray/core_worker/transport/out_of_order_actor_scheduling_queue.cc @@ -25,7 +25,8 @@ OutOfOrderActorSchedulingQueue::OutOfOrderActorSchedulingQueue( bool is_asyncio, int fiber_max_concurrency, const std::vector &concurrency_groups) - : main_thread_id_(boost::this_thread::get_id()), + : io_service_(main_io_service), + main_thread_id_(boost::this_thread::get_id()), waiter_(waiter), pool_manager_(pool_manager), fiber_state_manager_(fiber_state_manager), @@ -51,15 +52,19 @@ void OutOfOrderActorSchedulingQueue::Stop() { } bool OutOfOrderActorSchedulingQueue::TaskQueueEmpty() const { - RAY_CHECK(false) << "TaskQueueEmpty() not implemented for actor queues"; + RAY_LOG(FATAL) << "TaskQueueEmpty() not implemented for actor queues"; return false; } size_t OutOfOrderActorSchedulingQueue::Size() const { - RAY_CHECK(false) << "Size() not implemented for actor queues"; + RAY_LOG(FATAL) << "Size() not implemented for actor queues"; return 0; } +void OutOfOrderActorSchedulingQueue::ScheduleRequests() { + RAY_LOG(FATAL) << "ScheduleRequests() not implemented for actor queues"; +} + void OutOfOrderActorSchedulingQueue::Add( int64_t seq_no, int64_t client_processed_up_to, @@ -69,31 +74,61 @@ void OutOfOrderActorSchedulingQueue::Add( const std::string &concurrency_group_name, const ray::FunctionDescriptor &function_descriptor, TaskID task_id, + uint64_t attempt_number, const std::vector &dependencies) { + // Add and execute a task. For different attempts of the same + // task id, if an attempt is running, the other attempt will + // wait until the first attempt finishes so that no more + // than one attempt of the same task run at the same time. 
+ // The reason why we don't run multiple attempts of the same + // task concurrently is that it's not safe to assume user's + // code can handle concurrent execution of the same actor method. RAY_CHECK(boost::this_thread::get_id() == main_thread_id_); auto request = InboundRequest(std::move(accept_request), std::move(reject_request), std::move(send_reply_callback), task_id, - dependencies.size() > 0, + attempt_number, + dependencies, concurrency_group_name, function_descriptor); + bool run_request = true; + std::optional request_to_cancel; { absl::MutexLock lock(&mu_); - pending_task_id_to_is_canceled.emplace(task_id, false); + if (pending_task_id_to_is_canceled.contains(task_id)) { + // There is a previous attempt of the same task running, + // queue the current attempt. + run_request = false; + + if (queued_actor_tasks_.contains(task_id)) { + // There is already an attempt of the same task queued, + // keep the one with larger attempt number and cancel the other one. + RAY_CHECK_NE(queued_actor_tasks_[task_id].AttemptNumber(), + request.AttemptNumber()); + if (queued_actor_tasks_[task_id].AttemptNumber() > request.AttemptNumber()) { + // This can happen if the PushTaskRequest arrives out of order. 
+ request_to_cancel = request; + } else { + request_to_cancel = queued_actor_tasks_[task_id]; + queued_actor_tasks_[task_id] = request; + } + } else { + queued_actor_tasks_[task_id] = request; + } + } else { + pending_task_id_to_is_canceled.emplace(task_id, false); + run_request = true; + } } - if (dependencies.size() > 0) { - waiter_.Wait(dependencies, [this, request = std::move(request)]() mutable { - RAY_CHECK(boost::this_thread::get_id() == main_thread_id_); - request.MarkDependenciesSatisfied(); - pending_actor_tasks_.push_back(std::move(request)); - ScheduleRequests(); - }); - } else { - request.MarkDependenciesSatisfied(); - pending_actor_tasks_.push_back(std::move(request)); - ScheduleRequests(); + if (run_request) { + RunRequest(std::move(request)); + } + + if (request_to_cancel.has_value()) { + request_to_cancel->Cancel(Status::SchedulingCancelled( + "In favor of the same task with larger attempt number")); } } @@ -109,32 +144,44 @@ bool OutOfOrderActorSchedulingQueue::CancelTaskIfFound(TaskID task_id) { } } -/// Schedules as many requests as possible in sequence. -void OutOfOrderActorSchedulingQueue::ScheduleRequests() { - while (!pending_actor_tasks_.empty()) { - auto request = pending_actor_tasks_.front(); - const auto task_id = request.TaskID(); - if (is_asyncio_) { - // Process async actor task. - auto fiber = fiber_state_manager_->GetExecutor(request.ConcurrencyGroupName(), - request.FunctionDescriptor()); - fiber->EnqueueFiber([this, request, task_id]() mutable { - AcceptRequestOrRejectIfCanceled(task_id, request); - }); +void OutOfOrderActorSchedulingQueue::RunRequestWithSatisfiedDependencies( + InboundRequest &request) { + RAY_CHECK(request.CanExecute()); + const auto task_id = request.TaskID(); + if (is_asyncio_) { + // Process async actor task. 
+ auto fiber = fiber_state_manager_->GetExecutor(request.ConcurrencyGroupName(), + request.FunctionDescriptor()); + fiber->EnqueueFiber([this, request, task_id]() mutable { + AcceptRequestOrRejectIfCanceled(task_id, request); + }); + } else { + // Process actor tasks. + RAY_CHECK(pool_manager_ != nullptr); + auto pool = pool_manager_->GetExecutor(request.ConcurrencyGroupName(), + request.FunctionDescriptor()); + if (pool == nullptr) { + AcceptRequestOrRejectIfCanceled(task_id, request); } else { - // Process actor tasks. - RAY_CHECK(pool_manager_ != nullptr); - auto pool = pool_manager_->GetExecutor(request.ConcurrencyGroupName(), - request.FunctionDescriptor()); - if (pool == nullptr) { + pool->Post([this, request, task_id]() mutable { AcceptRequestOrRejectIfCanceled(task_id, request); - } else { - pool->Post([this, request, task_id]() mutable { - AcceptRequestOrRejectIfCanceled(task_id, request); - }); - } + }); } - pending_actor_tasks_.pop_front(); + } +} + +void OutOfOrderActorSchedulingQueue::RunRequest(InboundRequest request) { + if (!request.PendingDependencies().empty()) { + // Make a copy since request is going to be moved. 
+ auto dependencies = request.PendingDependencies(); + waiter_.Wait(dependencies, [this, request = std::move(request)]() mutable { + RAY_CHECK_EQ(boost::this_thread::get_id(), main_thread_id_); + request.MarkDependenciesSatisfied(); + RunRequestWithSatisfiedDependencies(request); + }); + } else { + request.MarkDependenciesSatisfied(); + RunRequestWithSatisfiedDependencies(request); } } @@ -157,8 +204,24 @@ void OutOfOrderActorSchedulingQueue::AcceptRequestOrRejectIfCanceled( request.Accept(); } - absl::MutexLock lock(&mu_); - pending_task_id_to_is_canceled.erase(task_id); + std::optional request_to_run; + { + absl::MutexLock lock(&mu_); + if (queued_actor_tasks_.contains(task_id)) { + request_to_run = queued_actor_tasks_[task_id]; + queued_actor_tasks_.erase(task_id); + } else { + pending_task_id_to_is_canceled.erase(task_id); + } + } + + if (request_to_run.has_value()) { + io_service_.post( + [this, request = std::move(*request_to_run)]() mutable { + RunRequest(std::move(request)); + }, + "OutOfOrderActorSchedulingQueue.RunRequest"); + } } } // namespace core diff --git a/src/ray/core_worker/transport/out_of_order_actor_scheduling_queue.h b/src/ray/core_worker/transport/out_of_order_actor_scheduling_queue.h index acbf17e2af90..ba7876c6b272 100644 --- a/src/ray/core_worker/transport/out_of_order_actor_scheduling_queue.h +++ b/src/ray/core_worker/transport/out_of_order_actor_scheduling_queue.h @@ -60,6 +60,7 @@ class OutOfOrderActorSchedulingQueue : public SchedulingQueue { const std::string &concurrency_group_name, const ray::FunctionDescriptor &function_descriptor, TaskID task_id = TaskID::Nil(), + uint64_t attempt_number = 0, const std::vector &dependencies = {}) override; /// Cancel the actor task in the queue. 
@@ -72,12 +73,15 @@ class OutOfOrderActorSchedulingQueue : public SchedulingQueue { void ScheduleRequests() override; private: + void RunRequest(InboundRequest request); + + void RunRequestWithSatisfiedDependencies(InboundRequest &request); + /// Accept the given InboundRequest or reject it if a task id is canceled via /// CancelTaskIfFound. void AcceptRequestOrRejectIfCanceled(TaskID task_id, InboundRequest &request); - /// The queue stores all the pending tasks. - std::deque pending_actor_tasks_; + instrumented_io_context &io_service_; /// The id of the thread that constructed this scheduling queue. boost::thread::id main_thread_id_; /// Reference to the waiter owned by the task receiver. @@ -92,11 +96,19 @@ class OutOfOrderActorSchedulingQueue : public SchedulingQueue { bool is_asyncio_ = false; /// Mutext to protect attributes used for thread safe APIs. absl::Mutex mu_; + /// This stores all the tasks that have previous attempts that are pending. + /// They are queued and will be executed after the previous attempt finishes. + /// This can happen if a transient network error happens after an actor + /// task is submitted and received by the actor and the caller retries + /// the same task. + absl::flat_hash_map queued_actor_tasks_ ABSL_GUARDED_BY(mu_); /// A map of actor task IDs -> is_canceled. // Pending means tasks are queued or running. 
absl::flat_hash_map pending_task_id_to_is_canceled ABSL_GUARDED_BY(mu_); - friend class SchedulingQueueTest; + FRIEND_TEST(OutOfOrderActorSchedulingQueueTest, TestSameTaskMultipleAttempts); + FRIEND_TEST(OutOfOrderActorSchedulingQueueTest, + TestSameTaskMultipleAttemptsCancellation); }; } // namespace core diff --git a/src/ray/core_worker/transport/scheduling_queue.h b/src/ray/core_worker/transport/scheduling_queue.h index c5133831a36b..0fc56918669f 100644 --- a/src/ray/core_worker/transport/scheduling_queue.h +++ b/src/ray/core_worker/transport/scheduling_queue.h @@ -36,6 +36,7 @@ class SchedulingQueue { const std::string &concurrency_group_name, const ray::FunctionDescriptor &function_descriptor, TaskID task_id = TaskID::Nil(), + uint64_t attempt_number = 0, const std::vector &dependencies = {}) = 0; virtual void ScheduleRequests() = 0; virtual bool TaskQueueEmpty() const = 0; diff --git a/src/ray/core_worker/transport/task_receiver.cc b/src/ray/core_worker/transport/task_receiver.cc index 580e7b70768e..a6b4ab9a293a 100644 --- a/src/ray/core_worker/transport/task_receiver.cc +++ b/src/ray/core_worker/transport/task_receiver.cc @@ -270,6 +270,7 @@ void TaskReceiver::HandleTask(const rpc::PushTaskRequest &request, task_spec.ConcurrencyGroupName(), task_spec.FunctionDescriptor(), task_spec.TaskId(), + task_spec.AttemptNumber(), dependencies); } else { // Add the normal task's callbacks to the non-actor scheduling queue. @@ -283,6 +284,7 @@ void TaskReceiver::HandleTask(const rpc::PushTaskRequest &request, "", task_spec.FunctionDescriptor(), task_spec.TaskId(), + task_spec.AttemptNumber(), dependencies); } }