diff --git a/src/mock/ray/gcs/gcs_server/gcs_actor_scheduler.h b/src/mock/ray/gcs/gcs_server/gcs_actor_scheduler.h
index 4ae0a06ad97f..2715c57849eb 100644
--- a/src/mock/ray/gcs/gcs_server/gcs_actor_scheduler.h
+++ b/src/mock/ray/gcs/gcs_server/gcs_actor_scheduler.h
@@ -30,7 +30,7 @@ class MockGcsActorSchedulerInterface : public GcsActorSchedulerInterface {
       (override));
   MOCK_METHOD(
       void,
-      ReleaseUnusedWorkers,
+      ReleaseUnusedActorWorkers,
       ((const absl::flat_hash_map<NodeID, std::vector<WorkerID>> &node_to_workers)),
       (override));
 };
@@ -70,7 +70,7 @@ class MockGcsActorScheduler : public GcsActorScheduler {
       (override));
   MOCK_METHOD(
       void,
-      ReleaseUnusedWorkers,
+      ReleaseUnusedActorWorkers,
       ((const absl::flat_hash_map<NodeID, std::vector<WorkerID>> &node_to_workers)),
       (override));
   MOCK_METHOD(void,
diff --git a/src/mock/ray/raylet_client/raylet_client.h b/src/mock/ray/raylet_client/raylet_client.h
index 7d566d91897e..de30333bdf78 100644
--- a/src/mock/ray/raylet_client/raylet_client.h
+++ b/src/mock/ray/raylet_client/raylet_client.h
@@ -49,9 +49,9 @@ class MockRayletClientInterface : public RayletClientInterface {
               const rpc::ClientCallback &callback),
              (override));
  MOCK_METHOD(void,
-             ReleaseUnusedWorkers,
+             ReleaseUnusedActorWorkers,
              (const std::vector<WorkerID> &workers_in_use,
-              const rpc::ClientCallback<rpc::ReleaseUnusedWorkersReply> &callback),
+              const rpc::ClientCallback<rpc::ReleaseUnusedActorWorkersReply> &callback),
              (override));
  MOCK_METHOD(void,
              CancelWorkerLease,
diff --git a/src/ray/core_worker/test/direct_task_transport_test.cc b/src/ray/core_worker/test/direct_task_transport_test.cc
index 8cc2961dc0fd..8f11897ad1ca 100644
--- a/src/ray/core_worker/test/direct_task_transport_test.cc
+++ b/src/ray/core_worker/test/direct_task_transport_test.cc
@@ -233,9 +233,10 @@ class MockRayletClient : public WorkerLeaseInterface {
     callbacks.push_back(callback);
   }

-  void ReleaseUnusedWorkers(
+  void ReleaseUnusedActorWorkers(
       const std::vector<WorkerID> &workers_in_use,
-      const rpc::ClientCallback<rpc::ReleaseUnusedWorkersReply> &callback) override {}
+      const rpc::ClientCallback<rpc::ReleaseUnusedActorWorkersReply> &callback) override {
+  }

   void CancelWorkerLease(
       const TaskID &task_id,
diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc
index a0b743e353bd..196c9c8d3fa4 100644
--- a/src/ray/gcs/gcs_server/gcs_actor_manager.cc
+++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc
@@ -1480,7 +1480,7 @@ void GcsActorManager::Initialize(const GcsInitData &gcs_init_data) {
   });

   // Notify raylets to release unused workers.
-  gcs_actor_scheduler_->ReleaseUnusedWorkers(node_to_workers);
+  gcs_actor_scheduler_->ReleaseUnusedActorWorkers(node_to_workers);

   RAY_LOG(DEBUG) << "The number of registered actors is " << registered_actors_.size()
                  << ", and the number of created actors is " << created_actors_.size();
diff --git a/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc b/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc
index 4d4ba0812c2d..b4279d74f0e3 100644
--- a/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc
+++ b/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc
@@ -271,7 +271,7 @@ ActorID GcsActorScheduler::CancelOnWorker(const NodeID &node_id,
   return assigned_actor_id;
 }

-void GcsActorScheduler::ReleaseUnusedWorkers(
+void GcsActorScheduler::ReleaseUnusedActorWorkers(
     const absl::flat_hash_map<NodeID, std::vector<WorkerID>> &node_to_workers) {
   // The purpose of this function is to release leased workers that may be leaked.
   // When GCS restarts, it doesn't know which workers it has leased in the previous
@@ -291,7 +291,7 @@ void GcsActorScheduler::ReleaseUnusedWorkers(
     auto lease_client = GetOrConnectLeaseClient(address);
     auto release_unused_workers_callback =
         [this, node_id](const Status &status,
-                        const rpc::ReleaseUnusedWorkersReply &reply) {
+                        const rpc::ReleaseUnusedActorWorkersReply &reply) {
           nodes_of_releasing_unused_workers_.erase(node_id);
         };
     auto iter = node_to_workers.find(alive_node.first);
@@ -300,7 +300,8 @@ void GcsActorScheduler::ReleaseUnusedWorkers(
     // nodes do not have leased workers. In this case, GCS will send an empty list.
     auto workers_in_use =
         iter != node_to_workers.end() ? iter->second : std::vector<WorkerID>{};
-    lease_client->ReleaseUnusedWorkers(workers_in_use, release_unused_workers_callback);
+    lease_client->ReleaseUnusedActorWorkers(workers_in_use,
+                                            release_unused_workers_callback);
   }
 }
@@ -313,7 +314,7 @@ void GcsActorScheduler::LeaseWorkerFromNode(std::shared_ptr<GcsActor> actor,
                  << actor->GetActorID() << ", job id = " << actor->GetActorID().JobId();

   // We need to ensure that the RequestWorkerLease won't be sent before the reply of
-  // ReleaseUnusedWorkers is returned.
+  // ReleaseUnusedActorWorkers is returned.
   if (nodes_of_releasing_unused_workers_.contains(node_id)) {
     RetryLeasingWorkerFromNode(actor, node);
     return;
diff --git a/src/ray/gcs/gcs_server/gcs_actor_scheduler.h b/src/ray/gcs/gcs_server/gcs_actor_scheduler.h
index eeb51ee2686d..25ee4b7687b0 100644
--- a/src/ray/gcs/gcs_server/gcs_actor_scheduler.h
+++ b/src/ray/gcs/gcs_server/gcs_actor_scheduler.h
@@ -82,7 +82,7 @@ class GcsActorSchedulerInterface {
   /// Notify raylets to release unused workers.
   ///
   /// \param node_to_workers Workers used by each node.
-  virtual void ReleaseUnusedWorkers(
+  virtual void ReleaseUnusedActorWorkers(
       const absl::flat_hash_map<NodeID, std::vector<WorkerID>> &node_to_workers) = 0;

   /// Handle the destruction of an actor.
@@ -178,7 +178,7 @@ class GcsActorScheduler : public GcsActorSchedulerInterface {
   /// Notify raylets to release unused workers.
   ///
   /// \param node_to_workers Workers used by each node.
-  void ReleaseUnusedWorkers(
+  void ReleaseUnusedActorWorkers(
       const absl::flat_hash_map<NodeID, std::vector<WorkerID>> &node_to_workers) override;

   /// Handle the destruction of an actor.
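Note on the scheduler change above: the important invariant this rename preserves is the ordering between the two RPCs. `ReleaseUnusedActorWorkers` records each alive node in `nodes_of_releasing_unused_workers_`, and `LeaseWorkerFromNode` keeps retrying until the reply for that node clears the set. Below is a minimal standalone sketch of that gating logic; plain `std::string` ids and an explicit `CompleteRelease()` step are stand-ins for the real `NodeID`/`WorkerID` types and the RPC callback, so it illustrates the control flow only, not Ray's actual API.

```cpp
// Standalone sketch (not the real GcsActorScheduler): a pending
// ReleaseUnusedActorWorkers reply gates RequestWorkerLease for that node.
#include <iostream>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

class FakeActorScheduler {
 public:
  // Ask every listed node to drop leased actor workers that GCS no longer tracks.
  void ReleaseUnusedActorWorkers(
      const std::unordered_map<std::string, std::vector<std::string>> &node_to_workers) {
    for (const auto &[node_id, workers] : node_to_workers) {
      nodes_of_releasing_unused_workers_.insert(node_id);
      std::cout << "release sent to " << node_id << ", keeping " << workers.size()
                << " worker(s)\n";
    }
  }

  // A lease is deferred while a release reply is still outstanding for the node.
  bool TryLeaseWorkerFromNode(const std::string &node_id) {
    if (nodes_of_releasing_unused_workers_.count(node_id) > 0) {
      std::cout << "lease to " << node_id << " deferred (release still in flight)\n";
      return false;
    }
    std::cout << "RequestWorkerLease sent to " << node_id << "\n";
    return true;
  }

  // Simulates the ReleaseUnusedActorWorkers reply arriving from the raylet.
  void CompleteRelease(const std::string &node_id) {
    nodes_of_releasing_unused_workers_.erase(node_id);
  }

 private:
  std::unordered_set<std::string> nodes_of_releasing_unused_workers_;
};

int main() {
  FakeActorScheduler scheduler;
  scheduler.ReleaseUnusedActorWorkers({{"node-1", {"worker-a"}}});
  scheduler.TryLeaseWorkerFromNode("node-1");  // deferred, would be retried later
  scheduler.CompleteRelease("node-1");
  scheduler.TryLeaseWorkerFromNode("node-1");  // now the lease request goes out
  return 0;
}
```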
@@ -418,7 +418,7 @@ class GcsActorScheduler : public GcsActorSchedulerInterface {
   FRIEND_TEST(GcsActorSchedulerTest, TestWorkerFailedWhenCreating);
   FRIEND_TEST(GcsActorSchedulerTest, TestSpillback);
   FRIEND_TEST(GcsActorSchedulerTest, TestReschedule);
-  FRIEND_TEST(GcsActorSchedulerTest, TestReleaseUnusedWorkers);
+  FRIEND_TEST(GcsActorSchedulerTest, TestReleaseUnusedActorWorkers);
   FRIEND_TEST(GcsActorSchedulerTest, TestScheduleFailedWithZeroNodeByGcs);
   FRIEND_TEST(GcsActorSchedulerTest, TestNotEnoughClusterResources);
   FRIEND_TEST(GcsActorSchedulerTest, TestScheduleAndDestroyOneActor);
@@ -431,7 +431,7 @@ class GcsActorScheduler : public GcsActorSchedulerInterface {
   FRIEND_TEST(GcsActorSchedulerTest, TestNodeFailedWhenCreatingByGcs);
   FRIEND_TEST(GcsActorSchedulerTest, TestWorkerFailedWhenCreatingByGcs);
   FRIEND_TEST(GcsActorSchedulerTest, TestRescheduleByGcs);
-  FRIEND_TEST(GcsActorSchedulerTest, TestReleaseUnusedWorkersByGcs);
+  FRIEND_TEST(GcsActorSchedulerTest, TestReleaseUnusedActorWorkersByGcs);

   friend class GcsActorSchedulerMockTest;
   FRIEND_TEST(GcsActorSchedulerMockTest, KillWorkerLeak1);
diff --git a/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc
index 9a069e208303..b58ca7f6abe3 100644
--- a/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc
+++ b/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc
@@ -37,7 +37,7 @@ class MockActorScheduler : public gcs::GcsActorSchedulerInterface {
   void Schedule(std::shared_ptr<gcs::GcsActor> actor) { actors.push_back(actor); }
   void Reschedule(std::shared_ptr<gcs::GcsActor> actor) {}

-  void ReleaseUnusedWorkers(
+  void ReleaseUnusedActorWorkers(
       const absl::flat_hash_map<NodeID, std::vector<WorkerID>> &node_to_workers) {}
   void OnActorDestruction(std::shared_ptr<gcs::GcsActor> actor) {
     const auto &actor_id = actor->GetActorID();
diff --git a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc
index 557a23c7fddc..c14497db7eaa 100644
--- a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc
+++ b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc
@@ -601,9 +601,9 @@ TEST_F(GcsActorSchedulerTest, TestReschedule) {
   ASSERT_EQ(2, success_actors_.size());
 }

-TEST_F(GcsActorSchedulerTest, TestReleaseUnusedWorkers) {
+TEST_F(GcsActorSchedulerTest, TestReleaseUnusedActorWorkers) {
   // Test the case that GCS won't send `RequestWorkerLease` request to the raylet,
-  // if there is still a pending `ReleaseUnusedWorkers` request.
+  // if there is still a pending `ReleaseUnusedActorWorkers` request.

   // Add a node to the cluster.
   auto node = Mocker::GenNodeInfo();
@@ -611,18 +611,18 @@ TEST_F(GcsActorSchedulerTest, TestReleaseUnusedWorkers) {
   gcs_node_manager_->AddNode(node);
   ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size());

-  // Send a `ReleaseUnusedWorkers` request to the node.
+  // Send a `ReleaseUnusedActorWorkers` request to the node.
   absl::flat_hash_map<NodeID, std::vector<WorkerID>> node_to_workers;
   node_to_workers[node_id].push_back({WorkerID::FromRandom()});
-  gcs_actor_scheduler_->ReleaseUnusedWorkers(node_to_workers);
+  gcs_actor_scheduler_->ReleaseUnusedActorWorkers(node_to_workers);
   ASSERT_EQ(1, raylet_client_->num_release_unused_workers);
   ASSERT_EQ(1, raylet_client_->release_callbacks.size());

   // Schedule an actor which is not tied to a worker, this should invoke the
   // `LeaseWorkerFromNode` method.
-  // But since the `ReleaseUnusedWorkers` request hasn't finished, `GcsActorScheduler`
-  // won't send `RequestWorkerLease` request to node immediately. But instead, it will
-  // invoke the `RetryLeasingWorkerFromNode` to retry later.
+  // But since the `ReleaseUnusedActorWorkers` request hasn't finished,
+  // `GcsActorScheduler` won't send `RequestWorkerLease` request to node immediately. But
+  // instead, it will invoke the `RetryLeasingWorkerFromNode` to retry later.
   auto job_id = JobID::FromInt(1);
   auto request = Mocker::GenCreateActorRequest(job_id);
   auto actor = std::make_shared<gcs::GcsActor>(request.task_spec(), "", counter);
@@ -630,9 +630,9 @@ TEST_F(GcsActorSchedulerTest, TestReleaseUnusedWorkers) {
   ASSERT_EQ(2, gcs_actor_scheduler_->num_retry_leasing_count_);
   ASSERT_EQ(raylet_client_->num_workers_requested, 0);

-  // When `GcsActorScheduler` receives the `ReleaseUnusedWorkers` reply, it will send
+  // When `GcsActorScheduler` receives the `ReleaseUnusedActorWorkers` reply, it will send
   // out the `RequestWorkerLease` request.
-  ASSERT_TRUE(raylet_client_->ReplyReleaseUnusedWorkers());
+  ASSERT_TRUE(raylet_client_->ReplyReleaseUnusedActorWorkers());
   gcs_actor_scheduler_->TryLeaseWorkerFromNodeAgain(actor, node);
   ASSERT_EQ(raylet_client_->num_workers_requested, 1);
 }
@@ -1150,9 +1150,9 @@ TEST_F(GcsActorSchedulerTest, TestRescheduleByGcs) {
   ASSERT_EQ(2, success_actors_.size());
 }

-TEST_F(GcsActorSchedulerTest, TestReleaseUnusedWorkersByGcs) {
+TEST_F(GcsActorSchedulerTest, TestReleaseUnusedActorWorkersByGcs) {
   // Test the case that GCS won't send `RequestWorkerLease` request to the raylet,
-  // if there is still a pending `ReleaseUnusedWorkers` request.
+  // if there is still a pending `ReleaseUnusedActorWorkers` request.

   // Add a node to the cluster.
   // Add a node with 64 memory units and 8 CPU.
   auto node = Mocker::GenNodeInfo();
@@ -1162,19 +1162,19 @@ TEST_F(GcsActorSchedulerTest, TestReleaseUnusedWorkersByGcs) {
   auto node_id = NodeID::FromBinary(node->node_id());
   ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size());

-  // Send a `ReleaseUnusedWorkers` request to the node.
+  // Send a `ReleaseUnusedActorWorkers` request to the node.
   absl::flat_hash_map<NodeID, std::vector<WorkerID>> node_to_workers;
   node_to_workers[node_id].push_back({WorkerID::FromRandom()});
-  gcs_actor_scheduler_->ReleaseUnusedWorkers(node_to_workers);
+  gcs_actor_scheduler_->ReleaseUnusedActorWorkers(node_to_workers);
   ASSERT_EQ(1, raylet_client_->num_release_unused_workers);
   ASSERT_EQ(1, raylet_client_->release_callbacks.size());

   // Schedule an actor which is not tied to a worker, this should invoke the
   // `LeaseWorkerFromNode` method.
-  // But since the `ReleaseUnusedWorkers` request hasn't finished, `GcsActorScheduler`
-  // won't send `RequestWorkerLease` request to node immediately. But instead, it will
-  // invoke the `RetryLeasingWorkerFromNode` to retry later.
-  // Schedule a actor (requiring 32 memory units and 4 CPU).
+  // But since the `ReleaseUnusedActorWorkers` request hasn't finished,
+  // `GcsActorScheduler` won't send `RequestWorkerLease` request to node immediately. But
+  // instead, it will invoke the `RetryLeasingWorkerFromNode` to retry later. Schedule a
+  // actor (requiring 32 memory units and 4 CPU).
   std::unordered_map<std::string, double> required_placement_resources = {
       {kMemory_ResourceLabel, 32}, {kCPU_ResourceLabel, 4}};
   auto actor = NewGcsActor(required_placement_resources);
@@ -1182,9 +1182,9 @@ TEST_F(GcsActorSchedulerTest, TestReleaseUnusedWorkersByGcs) {
   ASSERT_EQ(2, gcs_actor_scheduler_->num_retry_leasing_count_);
   ASSERT_EQ(raylet_client_->num_workers_requested, 0);

-  // When `GcsActorScheduler` receives the `ReleaseUnusedWorkers` reply, it will send
+  // When `GcsActorScheduler` receives the `ReleaseUnusedActorWorkers` reply, it will send
   // out the `RequestWorkerLease` request.
-  ASSERT_TRUE(raylet_client_->ReplyReleaseUnusedWorkers());
+  ASSERT_TRUE(raylet_client_->ReplyReleaseUnusedActorWorkers());
   gcs_actor_scheduler_->TryLeaseWorkerFromNodeAgain(actor, node);
   ASSERT_EQ(raylet_client_->num_workers_requested, 1);
 }
diff --git a/src/ray/gcs/gcs_server/test/gcs_server_test_util.h b/src/ray/gcs/gcs_server/test/gcs_server_test_util.h
index fcd6b6bf6a3a..5c954bedb5d7 100644
--- a/src/ray/gcs/gcs_server/test/gcs_server_test_util.h
+++ b/src/ray/gcs/gcs_server/test/gcs_server_test_util.h
@@ -100,9 +100,10 @@ struct GcsServerMocker {
     }

     /// WorkerLeaseInterface
-    void ReleaseUnusedWorkers(
+    void ReleaseUnusedActorWorkers(
         const std::vector<WorkerID> &workers_in_use,
-        const rpc::ClientCallback<rpc::ReleaseUnusedWorkersReply> &callback) override {
+        const rpc::ClientCallback<rpc::ReleaseUnusedActorWorkersReply> &callback)
+        override {
       num_release_unused_workers += 1;
       release_callbacks.push_back(callback);
     }
@@ -188,8 +189,8 @@ struct GcsServerMocker {
       }
     }

-    bool ReplyReleaseUnusedWorkers() {
-      rpc::ReleaseUnusedWorkersReply reply;
+    bool ReplyReleaseUnusedActorWorkers() {
+      rpc::ReleaseUnusedActorWorkersReply reply;
       if (release_callbacks.size() == 0) {
         return false;
       } else {
@@ -335,7 +336,8 @@ struct GcsServerMocker {
     std::list<rpc::ClientCallback<rpc::DrainRayletReply>> drain_raylet_callbacks = {};
     std::list<rpc::ClientCallback<rpc::RequestWorkerLeaseReply>> callbacks = {};
    std::list<rpc::ClientCallback<rpc::CancelWorkerLeaseReply>> cancel_callbacks = {};
-    std::list<rpc::ClientCallback<rpc::ReleaseUnusedWorkersReply>> release_callbacks = {};
+    std::list<rpc::ClientCallback<rpc::ReleaseUnusedActorWorkersReply>>
+        release_callbacks = {};
     int num_lease_requested = 0;
     int num_return_requested = 0;
     int num_commit_requested = 0;
diff --git a/src/ray/protobuf/node_manager.proto b/src/ray/protobuf/node_manager.proto
index 925c227fa349..082660b2ba0a 100644
--- a/src/ray/protobuf/node_manager.proto
+++ b/src/ray/protobuf/node_manager.proto
@@ -138,11 +138,11 @@ message ReturnWorkerRequest {
 message ReturnWorkerReply {
 }

-message ReleaseUnusedWorkersRequest {
+message ReleaseUnusedActorWorkersRequest {
   repeated bytes worker_ids_in_use = 1;
 }

-message ReleaseUnusedWorkersReply {
+message ReleaseUnusedActorWorkersReply {
 }

 message ShutdownRayletRequest {
@@ -383,8 +383,8 @@ service NodeManagerService {
   // that may be leaked. When GCS restarts, it doesn't know which workers it has leased
   // in the previous lifecycle. In this case, GCS will send a list of worker ids that
   // are still needed. And Raylet will release other leased workers.
-  rpc ReleaseUnusedWorkers(ReleaseUnusedWorkersRequest)
-      returns (ReleaseUnusedWorkersReply);
+  rpc ReleaseUnusedActorWorkers(ReleaseUnusedActorWorkersRequest)
+      returns (ReleaseUnusedActorWorkersReply);
   /// Shutdown the raylet (node manager).
   rpc ShutdownRaylet(ShutdownRayletRequest) returns (ShutdownRayletReply);
   // Request to drain the raylet.
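Note on the proto change above: the renamed messages keep the original shape. `worker_ids_in_use` is a keep-list of binary worker ids, and an empty list is still a valid request (the node simply has no leased actor workers that GCS tracks). A standalone sketch of the client-side request construction, mirroring what `RayletClient::ReleaseUnusedActorWorkers` does further down in this diff; the struct is a stand-in for the protobuf-generated class, not the real API.

```cpp
// Standalone sketch: building a ReleaseUnusedActorWorkersRequest-like message.
// Only the field name mirrors node_manager.proto; everything else is a stand-in.
#include <iostream>
#include <string>
#include <vector>

struct FakeReleaseUnusedActorWorkersRequest {
  std::vector<std::string> worker_ids_in_use;  // `repeated bytes` in the real proto
};

// Every in-use worker id goes into the keep-list; an empty keep-list tells the
// raylet that none of its leased actor workers are still tracked by GCS.
FakeReleaseUnusedActorWorkersRequest MakeRequest(
    const std::vector<std::string> &workers_in_use) {
  FakeReleaseUnusedActorWorkersRequest request;
  for (const auto &worker_id : workers_in_use) {
    request.worker_ids_in_use.push_back(worker_id);  // Binary() in the real code
  }
  return request;
}

int main() {
  auto keep_some = MakeRequest({"worker-a", "worker-b"});
  auto keep_none = MakeRequest({});  // still a valid, meaningful request
  std::cout << keep_some.worker_ids_in_use.size() << " ids kept, "
            << keep_none.worker_ids_in_use.size() << " ids kept\n";
  return 0;
}
```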
diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index a39eb4f2789c..1f87409ec830 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -2020,9 +2020,10 @@ void NodeManager::HandleShutdownRaylet(rpc::ShutdownRayletRequest request,
   send_reply_callback(Status::OK(), shutdown_after_reply, shutdown_after_reply);
 }

-void NodeManager::HandleReleaseUnusedWorkers(rpc::ReleaseUnusedWorkersRequest request,
-                                             rpc::ReleaseUnusedWorkersReply *reply,
-                                             rpc::SendReplyCallback send_reply_callback) {
+void NodeManager::HandleReleaseUnusedActorWorkers(
+    rpc::ReleaseUnusedActorWorkersRequest request,
+    rpc::ReleaseUnusedActorWorkersReply *reply,
+    rpc::SendReplyCallback send_reply_callback) {
   std::unordered_set<WorkerID> in_use_worker_ids;
   for (int index = 0; index < request.worker_ids_in_use_size(); ++index) {
     auto worker_id = WorkerID::FromBinary(request.worker_ids_in_use(index));
@@ -2031,8 +2032,7 @@ void NodeManager::HandleReleaseUnusedWorkers(rpc::ReleaseUnusedWorkersRequest re
   std::vector<std::shared_ptr<WorkerInterface>> unused_workers;
   for (auto &iter : leased_workers_) {
-    // We need to exclude workers used by common tasks.
-    // Because they are not used by GCS.
+    // We only kill *actor* workers.
     if (!iter.second->GetActorId().IsNil() && !in_use_worker_ids.count(iter.first)) {
       unused_workers.push_back(iter.second);
     }
diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h
index 74b95332fc77..2be12bdf7bab 100644
--- a/src/ray/raylet/node_manager.h
+++ b/src/ray/raylet/node_manager.h
@@ -533,12 +533,13 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
                           rpc::ReturnWorkerReply *reply,
                           rpc::SendReplyCallback send_reply_callback) override;

-  /// Handle a `ReleaseUnusedWorkers` request.
+  /// Handle a `ReleaseUnusedActorWorkers` request.
   // On GCS restart, there's a pruning effort. GCS sends raylet a list of workers it still
   // wants (that it keeps tracks of); and the raylet destroys all other workers.
-  void HandleReleaseUnusedWorkers(rpc::ReleaseUnusedWorkersRequest request,
-                                  rpc::ReleaseUnusedWorkersReply *reply,
-                                  rpc::SendReplyCallback send_reply_callback) override;
+  void HandleReleaseUnusedActorWorkers(
+      rpc::ReleaseUnusedActorWorkersRequest request,
+      rpc::ReleaseUnusedActorWorkersReply *reply,
+      rpc::SendReplyCallback send_reply_callback) override;

   /// Handle a `ShutdownRaylet` request.
   void HandleShutdownRaylet(rpc::ShutdownRayletRequest request,
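Note on the handler change above: the release is scoped to actor workers only. A leased worker is destroyed only when its actor id is non-nil and its worker id is absent from the keep-list; workers leased for ordinary tasks are never touched by this RPC. Below is a standalone sketch of that selection rule, with strings and a plain struct standing in for `WorkerID`/`ActorID` and `WorkerInterface` (assumptions for illustration, not the raylet's types).

```cpp
// Standalone sketch of the selection rule in HandleReleaseUnusedActorWorkers.
#include <iostream>
#include <string>
#include <unordered_set>
#include <vector>

struct LeasedWorker {
  std::string worker_id;
  std::string actor_id;  // empty string stands in for ActorID::Nil()
};

// Returns the workers the raylet should release: leased *actor* workers whose
// ids GCS did not list as still in use. Task workers (empty actor_id) are kept.
std::vector<LeasedWorker> SelectWorkersToRelease(
    const std::vector<LeasedWorker> &leased_workers,
    const std::unordered_set<std::string> &in_use_worker_ids) {
  std::vector<LeasedWorker> unused;
  for (const auto &worker : leased_workers) {
    const bool is_actor_worker = !worker.actor_id.empty();
    const bool still_in_use = in_use_worker_ids.count(worker.worker_id) > 0;
    if (is_actor_worker && !still_in_use) {
      unused.push_back(worker);
    }
  }
  return unused;
}

int main() {
  std::vector<LeasedWorker> leased = {
      {"w1", "actor-1"},  // actor worker still tracked by GCS -> kept
      {"w2", "actor-2"},  // actor worker GCS no longer tracks -> released
      {"w3", ""},         // task worker -> never released by this RPC
  };
  for (const auto &worker : SelectWorkersToRelease(leased, {"w1"})) {
    std::cout << "release " << worker.worker_id << "\n";
  }
  return 0;
}
```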
diff --git a/src/ray/raylet_client/raylet_client.cc b/src/ray/raylet_client/raylet_client.cc
index 45d1c5dbae33..dea1a7c3e38b 100644
--- a/src/ray/raylet_client/raylet_client.cc
+++ b/src/ray/raylet_client/raylet_client.cc
@@ -441,16 +441,16 @@ void raylet::RayletClient::PushMutableObject(
       });
 }

-void raylet::RayletClient::ReleaseUnusedWorkers(
+void raylet::RayletClient::ReleaseUnusedActorWorkers(
     const std::vector<WorkerID> &workers_in_use,
-    const rpc::ClientCallback<rpc::ReleaseUnusedWorkersReply> &callback) {
-  rpc::ReleaseUnusedWorkersRequest request;
+    const rpc::ClientCallback<rpc::ReleaseUnusedActorWorkersReply> &callback) {
+  rpc::ReleaseUnusedActorWorkersRequest request;
   for (auto &worker_id : workers_in_use) {
     request.add_worker_ids_in_use(worker_id.Binary());
   }
-  grpc_client_->ReleaseUnusedWorkers(
+  grpc_client_->ReleaseUnusedActorWorkers(
       request,
-      [callback](const Status &status, const rpc::ReleaseUnusedWorkersReply &reply) {
+      [callback](const Status &status, const rpc::ReleaseUnusedActorWorkersReply &reply) {
         if (!status.ok()) {
           RAY_LOG(WARNING)
               << "Error releasing workers from raylet, the raylet may have died:"
diff --git a/src/ray/raylet_client/raylet_client.h b/src/ray/raylet_client/raylet_client.h
index 0c090506a8fd..f40c97edf620 100644
--- a/src/ray/raylet_client/raylet_client.h
+++ b/src/ray/raylet_client/raylet_client.h
@@ -90,9 +90,9 @@ class WorkerLeaseInterface {
   /// \param workers_in_use Workers currently in use.
   /// \param callback Callback that will be called after raylet completes the release of
   /// unused workers. \return ray::Status
-  virtual void ReleaseUnusedWorkers(
+  virtual void ReleaseUnusedActorWorkers(
       const std::vector<WorkerID> &workers_in_use,
-      const rpc::ClientCallback<rpc::ReleaseUnusedWorkersReply> &callback) = 0;
+      const rpc::ClientCallback<rpc::ReleaseUnusedActorWorkersReply> &callback) = 0;

   virtual void CancelWorkerLease(
       const TaskID &task_id,
@@ -476,9 +476,9 @@ class RayletClient : public RayletClientInterface {
       const std::vector<rpc::WorkerBacklogReport> &backlog_reports) override;

   /// Implements WorkerLeaseInterface.
-  void ReleaseUnusedWorkers(
+  void ReleaseUnusedActorWorkers(
       const std::vector<WorkerID> &workers_in_use,
-      const rpc::ClientCallback<rpc::ReleaseUnusedWorkersReply> &callback) override;
+      const rpc::ClientCallback<rpc::ReleaseUnusedActorWorkersReply> &callback) override;

   void CancelWorkerLease(
       const TaskID &task_id,
diff --git a/src/ray/rpc/node_manager/node_manager_client.h b/src/ray/rpc/node_manager/node_manager_client.h
index a8ff72e876c8..95ca4846c3f5 100644
--- a/src/ray/rpc/node_manager/node_manager_client.h
+++ b/src/ray/rpc/node_manager/node_manager_client.h
@@ -111,7 +111,7 @@ class NodeManagerWorkerClient
   /// Release unused workers.
   VOID_RPC_CLIENT_METHOD(NodeManagerService,
-                         ReleaseUnusedWorkers,
+                         ReleaseUnusedActorWorkers,
                          grpc_client_,
                          /*method_timeout_ms*/ -1, )

diff --git a/src/ray/rpc/node_manager/node_manager_server.h b/src/ray/rpc/node_manager/node_manager_server.h
index 3333aa6716cd..bb11333ae35d 100644
--- a/src/ray/rpc/node_manager/node_manager_server.h
+++ b/src/ray/rpc/node_manager/node_manager_server.h
@@ -28,29 +28,29 @@ namespace rpc {
 RPC_SERVICE_HANDLER_CUSTOM_AUTH(NodeManagerService, METHOD, -1, AuthType::NO_AUTH)

 /// NOTE: See src/ray/core_worker/core_worker.h on how to add a new grpc handler.
-#define RAY_NODE_MANAGER_RPC_HANDLERS                          \
-  RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(GetResourceLoad)        \
-  RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(NotifyGCSRestart)       \
-  RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(RequestWorkerLease)     \
-  RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(ReportWorkerBacklog)    \
-  RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(ReturnWorker)           \
-  RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(ReleaseUnusedWorkers)   \
-  RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(CancelWorkerLease)      \
-  RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(PinObjectIDs)           \
-  RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(GetNodeStats)           \
-  RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(GlobalGC)               \
-  RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(FormatGlobalMemoryInfo) \
-  RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(PrepareBundleResources) \
-  RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(CommitBundleResources)  \
-  RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(CancelResourceReserve)  \
-  RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(ReleaseUnusedBundles)   \
-  RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(GetSystemConfig)        \
-  RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(ShutdownRaylet)         \
-  RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(DrainRaylet)            \
-  RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(GetTasksInfo)           \
-  RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(GetObjectsInfo)         \
-  RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(GetTaskFailureCause)    \
-  RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(RegisterMutableObject)  \
+#define RAY_NODE_MANAGER_RPC_HANDLERS                               \
+  RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(GetResourceLoad)             \
+  RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(NotifyGCSRestart)            \
+  RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(RequestWorkerLease)          \
+  RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(ReportWorkerBacklog)         \
+  RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(ReturnWorker)                \
+  RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(ReleaseUnusedActorWorkers)   \
+  RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(CancelWorkerLease)           \
+  RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(PinObjectIDs)                \
+  RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(GetNodeStats)                \
+  RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(GlobalGC)                    \
+  RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(FormatGlobalMemoryInfo)      \
+  RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(PrepareBundleResources)      \
+  RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(CommitBundleResources)       \
+  RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(CancelResourceReserve)       \
+  RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(ReleaseUnusedBundles)        \
+  RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(GetSystemConfig)             \
+  RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(ShutdownRaylet)              \
+  RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(DrainRaylet)                 \
+  RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(GetTasksInfo)                \
+  RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(GetObjectsInfo)              \
+  RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(GetTaskFailureCause)         \
+  RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(RegisterMutableObject)       \
   RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(PushMutableObject)

 /// Interface of the `NodeManagerService`, see `src/ray/protobuf/node_manager.proto`.
@@ -87,9 +87,9 @@ class NodeManagerServiceHandler {
                                   ReturnWorkerReply *reply,
                                   SendReplyCallback send_reply_callback) = 0;

-  virtual void HandleReleaseUnusedWorkers(ReleaseUnusedWorkersRequest request,
-                                          ReleaseUnusedWorkersReply *reply,
-                                          SendReplyCallback send_reply_callback) = 0;
+  virtual void HandleReleaseUnusedActorWorkers(ReleaseUnusedActorWorkersRequest request,
+                                               ReleaseUnusedActorWorkersReply *reply,
+                                               SendReplyCallback send_reply_callback) = 0;

   virtual void HandleShutdownRaylet(ShutdownRayletRequest request,
                                     ShutdownRayletReply *reply,
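Closing note: once the rename is applied everywhere, test doubles and expectations must also refer to the new method name, as the mock updates at the top of this diff show. The following is a self-contained gmock sketch of what such an expectation looks like; the interface and types are simplified stand-ins, not the real `RayletClientInterface` (link against gtest/gmock main to run it).

```cpp
// Standalone gmock sketch: expectations reference ReleaseUnusedActorWorkers.
#include <gmock/gmock.h>
#include <gtest/gtest.h>

#include <functional>
#include <string>
#include <vector>

// Stand-in for the worker-lease interface touched by this diff.
class WorkerLeaseInterface {
 public:
  virtual ~WorkerLeaseInterface() = default;
  virtual void ReleaseUnusedActorWorkers(
      const std::vector<std::string> &workers_in_use,
      const std::function<void(bool)> &callback) = 0;
};

class MockRayletClient : public WorkerLeaseInterface {
 public:
  MOCK_METHOD(void,
              ReleaseUnusedActorWorkers,
              (const std::vector<std::string> &workers_in_use,
               const std::function<void(bool)> &callback),
              (override));
};

TEST(ReleaseUnusedActorWorkersRename, ExpectationUsesNewName) {
  MockRayletClient client;
  std::vector<std::string> keep = {"worker-1"};
  // After the rename, the expectation must name ReleaseUnusedActorWorkers.
  EXPECT_CALL(client, ReleaseUnusedActorWorkers(keep, ::testing::_)).Times(1);
  client.ReleaseUnusedActorWorkers(keep, [](bool ok) { (void)ok; });
}
```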