Skip to content

Commit

Permalink
rename RPC
Browse files Browse the repository at this point in the history
Signed-off-by: Ruiyang Wang <rywang014@gmail.com>
  • Loading branch information
rynewang committed Jun 11, 2024
1 parent d83ad39 commit 547658e
Show file tree
Hide file tree
Showing 16 changed files with 94 additions and 89 deletions.
4 changes: 2 additions & 2 deletions src/mock/ray/gcs/gcs_server/gcs_actor_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class MockGcsActorSchedulerInterface : public GcsActorSchedulerInterface {
(override));
MOCK_METHOD(
void,
ReleaseUnusedWorkers,
ReleaseUnusedActorWorkers,
((const absl::flat_hash_map<NodeID, std::vector<WorkerID>> &node_to_workers)),
(override));
};
Expand Down Expand Up @@ -70,7 +70,7 @@ class MockGcsActorScheduler : public GcsActorScheduler {
(override));
MOCK_METHOD(
void,
ReleaseUnusedWorkers,
ReleaseUnusedActorWorkers,
((const absl::flat_hash_map<NodeID, std::vector<WorkerID>> &node_to_workers)),
(override));
MOCK_METHOD(void,
Expand Down
4 changes: 2 additions & 2 deletions src/mock/ray/raylet_client/raylet_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ class MockRayletClientInterface : public RayletClientInterface {
const rpc::ClientCallback<rpc::GetTaskFailureCauseReply> &callback),
(override));
MOCK_METHOD(void,
ReleaseUnusedWorkers,
ReleaseUnusedActorWorkers,
(const std::vector<WorkerID> &workers_in_use,
const rpc::ClientCallback<rpc::ReleaseUnusedWorkersReply> &callback),
const rpc::ClientCallback<rpc::ReleaseUnusedActorWorkersReply> &callback),
(override));
MOCK_METHOD(void,
CancelWorkerLease,
Expand Down
5 changes: 3 additions & 2 deletions src/ray/core_worker/test/direct_task_transport_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,10 @@ class MockRayletClient : public WorkerLeaseInterface {
callbacks.push_back(callback);
}

void ReleaseUnusedWorkers(
void ReleaseUnusedActorWorkers(
const std::vector<WorkerID> &workers_in_use,
const rpc::ClientCallback<rpc::ReleaseUnusedWorkersReply> &callback) override {}
const rpc::ClientCallback<rpc::ReleaseUnusedActorWorkersReply> &callback) override {
}

void CancelWorkerLease(
const TaskID &task_id,
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_server/gcs_actor_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1480,7 +1480,7 @@ void GcsActorManager::Initialize(const GcsInitData &gcs_init_data) {
});

// Notify raylets to release unused workers.
gcs_actor_scheduler_->ReleaseUnusedWorkers(node_to_workers);
gcs_actor_scheduler_->ReleaseUnusedActorWorkers(node_to_workers);

RAY_LOG(DEBUG) << "The number of registered actors is " << registered_actors_.size()
<< ", and the number of created actors is " << created_actors_.size();
Expand Down
9 changes: 5 additions & 4 deletions src/ray/gcs/gcs_server/gcs_actor_scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ ActorID GcsActorScheduler::CancelOnWorker(const NodeID &node_id,
return assigned_actor_id;
}

void GcsActorScheduler::ReleaseUnusedWorkers(
void GcsActorScheduler::ReleaseUnusedActorWorkers(
const absl::flat_hash_map<NodeID, std::vector<WorkerID>> &node_to_workers) {
// The purpose of this function is to release leased workers that may be leaked.
// When GCS restarts, it doesn't know which workers it has leased in the previous
Expand All @@ -291,7 +291,7 @@ void GcsActorScheduler::ReleaseUnusedWorkers(
auto lease_client = GetOrConnectLeaseClient(address);
auto release_unused_workers_callback =
[this, node_id](const Status &status,
const rpc::ReleaseUnusedWorkersReply &reply) {
const rpc::ReleaseUnusedActorWorkersReply &reply) {
nodes_of_releasing_unused_workers_.erase(node_id);
};
auto iter = node_to_workers.find(alive_node.first);
Expand All @@ -300,7 +300,8 @@ void GcsActorScheduler::ReleaseUnusedWorkers(
// nodes do not have leased workers. In this case, GCS will send an empty list.
auto workers_in_use =
iter != node_to_workers.end() ? iter->second : std::vector<WorkerID>{};
lease_client->ReleaseUnusedWorkers(workers_in_use, release_unused_workers_callback);
lease_client->ReleaseUnusedActorWorkers(workers_in_use,
release_unused_workers_callback);
}
}

Expand All @@ -313,7 +314,7 @@ void GcsActorScheduler::LeaseWorkerFromNode(std::shared_ptr<GcsActor> actor,
<< actor->GetActorID() << ", job id = " << actor->GetActorID().JobId();

// We need to ensure that the RequestWorkerLease won't be sent before the reply of
// ReleaseUnusedWorkers is returned.
// ReleaseUnusedActorWorkers is returned.
if (nodes_of_releasing_unused_workers_.contains(node_id)) {
RetryLeasingWorkerFromNode(actor, node);
return;
Expand Down
8 changes: 4 additions & 4 deletions src/ray/gcs/gcs_server/gcs_actor_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class GcsActorSchedulerInterface {
/// Notify raylets to release unused workers.
///
/// \param node_to_workers Workers used by each node.
virtual void ReleaseUnusedWorkers(
virtual void ReleaseUnusedActorWorkers(
const absl::flat_hash_map<NodeID, std::vector<WorkerID>> &node_to_workers) = 0;

/// Handle the destruction of an actor.
Expand Down Expand Up @@ -178,7 +178,7 @@ class GcsActorScheduler : public GcsActorSchedulerInterface {
/// Notify raylets to release unused workers.
///
/// \param node_to_workers Workers used by each node.
void ReleaseUnusedWorkers(
void ReleaseUnusedActorWorkers(
const absl::flat_hash_map<NodeID, std::vector<WorkerID>> &node_to_workers) override;

/// Handle the destruction of an actor.
Expand Down Expand Up @@ -418,7 +418,7 @@ class GcsActorScheduler : public GcsActorSchedulerInterface {
FRIEND_TEST(GcsActorSchedulerTest, TestWorkerFailedWhenCreating);
FRIEND_TEST(GcsActorSchedulerTest, TestSpillback);
FRIEND_TEST(GcsActorSchedulerTest, TestReschedule);
FRIEND_TEST(GcsActorSchedulerTest, TestReleaseUnusedWorkers);
FRIEND_TEST(GcsActorSchedulerTest, TestReleaseUnusedActorWorkers);
FRIEND_TEST(GcsActorSchedulerTest, TestScheduleFailedWithZeroNodeByGcs);
FRIEND_TEST(GcsActorSchedulerTest, TestNotEnoughClusterResources);
FRIEND_TEST(GcsActorSchedulerTest, TestScheduleAndDestroyOneActor);
Expand All @@ -431,7 +431,7 @@ class GcsActorScheduler : public GcsActorSchedulerInterface {
FRIEND_TEST(GcsActorSchedulerTest, TestNodeFailedWhenCreatingByGcs);
FRIEND_TEST(GcsActorSchedulerTest, TestWorkerFailedWhenCreatingByGcs);
FRIEND_TEST(GcsActorSchedulerTest, TestRescheduleByGcs);
FRIEND_TEST(GcsActorSchedulerTest, TestReleaseUnusedWorkersByGcs);
FRIEND_TEST(GcsActorSchedulerTest, TestReleaseUnusedActorWorkersByGcs);

friend class GcsActorSchedulerMockTest;
FRIEND_TEST(GcsActorSchedulerMockTest, KillWorkerLeak1);
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class MockActorScheduler : public gcs::GcsActorSchedulerInterface {

void Schedule(std::shared_ptr<gcs::GcsActor> actor) { actors.push_back(actor); }
void Reschedule(std::shared_ptr<gcs::GcsActor> actor) {}
void ReleaseUnusedWorkers(
void ReleaseUnusedActorWorkers(
const absl::flat_hash_map<NodeID, std::vector<WorkerID>> &node_to_workers) {}
void OnActorDestruction(std::shared_ptr<gcs::GcsActor> actor) {
const auto &actor_id = actor->GetActorID();
Expand Down
38 changes: 19 additions & 19 deletions src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -601,38 +601,38 @@ TEST_F(GcsActorSchedulerTest, TestReschedule) {
ASSERT_EQ(2, success_actors_.size());
}

TEST_F(GcsActorSchedulerTest, TestReleaseUnusedWorkers) {
TEST_F(GcsActorSchedulerTest, TestReleaseUnusedActorWorkers) {
// Test the case that GCS won't send `RequestWorkerLease` request to the raylet,
// if there is still a pending `ReleaseUnusedWorkers` request.
// if there is still a pending `ReleaseUnusedActorWorkers` request.

// Add a node to the cluster.
auto node = Mocker::GenNodeInfo();
auto node_id = NodeID::FromBinary(node->node_id());
gcs_node_manager_->AddNode(node);
ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size());

// Send a `ReleaseUnusedWorkers` request to the node.
// Send a `ReleaseUnusedActorWorkers` request to the node.
absl::flat_hash_map<NodeID, std::vector<WorkerID>> node_to_workers;
node_to_workers[node_id].push_back({WorkerID::FromRandom()});
gcs_actor_scheduler_->ReleaseUnusedWorkers(node_to_workers);
gcs_actor_scheduler_->ReleaseUnusedActorWorkers(node_to_workers);
ASSERT_EQ(1, raylet_client_->num_release_unused_workers);
ASSERT_EQ(1, raylet_client_->release_callbacks.size());

// Schedule an actor which is not tied to a worker, this should invoke the
// `LeaseWorkerFromNode` method.
// But since the `ReleaseUnusedWorkers` request hasn't finished, `GcsActorScheduler`
// won't send `RequestWorkerLease` request to node immediately. But instead, it will
// invoke the `RetryLeasingWorkerFromNode` to retry later.
// But since the `ReleaseUnusedActorWorkers` request hasn't finished,
// `GcsActorScheduler` won't send `RequestWorkerLease` request to node immediately. But
// instead, it will invoke the `RetryLeasingWorkerFromNode` to retry later.
auto job_id = JobID::FromInt(1);
auto request = Mocker::GenCreateActorRequest(job_id);
auto actor = std::make_shared<gcs::GcsActor>(request.task_spec(), "", counter);
gcs_actor_scheduler_->ScheduleByRaylet(actor);
ASSERT_EQ(2, gcs_actor_scheduler_->num_retry_leasing_count_);
ASSERT_EQ(raylet_client_->num_workers_requested, 0);

// When `GcsActorScheduler` receives the `ReleaseUnusedWorkers` reply, it will send
// When `GcsActorScheduler` receives the `ReleaseUnusedActorWorkers` reply, it will send
// out the `RequestWorkerLease` request.
ASSERT_TRUE(raylet_client_->ReplyReleaseUnusedWorkers());
ASSERT_TRUE(raylet_client_->ReplyReleaseUnusedActorWorkers());
gcs_actor_scheduler_->TryLeaseWorkerFromNodeAgain(actor, node);
ASSERT_EQ(raylet_client_->num_workers_requested, 1);
}
Expand Down Expand Up @@ -1150,9 +1150,9 @@ TEST_F(GcsActorSchedulerTest, TestRescheduleByGcs) {
ASSERT_EQ(2, success_actors_.size());
}

TEST_F(GcsActorSchedulerTest, TestReleaseUnusedWorkersByGcs) {
TEST_F(GcsActorSchedulerTest, TestReleaseUnusedActorWorkersByGcs) {
// Test the case that GCS won't send `RequestWorkerLease` request to the raylet,
// if there is still a pending `ReleaseUnusedWorkers` request.
// if there is still a pending `ReleaseUnusedActorWorkers` request.

// Add a node to the cluster.
// Add a node with 64 memory units and 8 CPU.
Expand All @@ -1162,29 +1162,29 @@ TEST_F(GcsActorSchedulerTest, TestReleaseUnusedWorkersByGcs) {
auto node_id = NodeID::FromBinary(node->node_id());
ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size());

// Send a `ReleaseUnusedWorkers` request to the node.
// Send a `ReleaseUnusedActorWorkers` request to the node.
absl::flat_hash_map<NodeID, std::vector<WorkerID>> node_to_workers;
node_to_workers[node_id].push_back({WorkerID::FromRandom()});
gcs_actor_scheduler_->ReleaseUnusedWorkers(node_to_workers);
gcs_actor_scheduler_->ReleaseUnusedActorWorkers(node_to_workers);
ASSERT_EQ(1, raylet_client_->num_release_unused_workers);
ASSERT_EQ(1, raylet_client_->release_callbacks.size());

// Schedule an actor which is not tied to a worker, this should invoke the
// `LeaseWorkerFromNode` method.
// But since the `ReleaseUnusedWorkers` request hasn't finished, `GcsActorScheduler`
// won't send `RequestWorkerLease` request to node immediately. But instead, it will
// invoke the `RetryLeasingWorkerFromNode` to retry later.
// Schedule a actor (requiring 32 memory units and 4 CPU).
// But since the `ReleaseUnusedActorWorkers` request hasn't finished,
// `GcsActorScheduler` won't send `RequestWorkerLease` request to node immediately. But
// instead, it will invoke the `RetryLeasingWorkerFromNode` to retry later. Schedule a
// actor (requiring 32 memory units and 4 CPU).
std::unordered_map<std::string, double> required_placement_resources = {
{kMemory_ResourceLabel, 32}, {kCPU_ResourceLabel, 4}};
auto actor = NewGcsActor(required_placement_resources);
gcs_actor_scheduler_->ScheduleByGcs(actor);
ASSERT_EQ(2, gcs_actor_scheduler_->num_retry_leasing_count_);
ASSERT_EQ(raylet_client_->num_workers_requested, 0);

// When `GcsActorScheduler` receives the `ReleaseUnusedWorkers` reply, it will send
// When `GcsActorScheduler` receives the `ReleaseUnusedActorWorkers` reply, it will send
// out the `RequestWorkerLease` request.
ASSERT_TRUE(raylet_client_->ReplyReleaseUnusedWorkers());
ASSERT_TRUE(raylet_client_->ReplyReleaseUnusedActorWorkers());
gcs_actor_scheduler_->TryLeaseWorkerFromNodeAgain(actor, node);
ASSERT_EQ(raylet_client_->num_workers_requested, 1);
}
Expand Down
12 changes: 7 additions & 5 deletions src/ray/gcs/gcs_server/test/gcs_server_test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,10 @@ struct GcsServerMocker {
}

/// WorkerLeaseInterface
void ReleaseUnusedWorkers(
void ReleaseUnusedActorWorkers(
const std::vector<WorkerID> &workers_in_use,
const rpc::ClientCallback<rpc::ReleaseUnusedWorkersReply> &callback) override {
const rpc::ClientCallback<rpc::ReleaseUnusedActorWorkersReply> &callback)
override {
num_release_unused_workers += 1;
release_callbacks.push_back(callback);
}
Expand Down Expand Up @@ -188,8 +189,8 @@ struct GcsServerMocker {
}
}

bool ReplyReleaseUnusedWorkers() {
rpc::ReleaseUnusedWorkersReply reply;
bool ReplyReleaseUnusedActorWorkers() {
rpc::ReleaseUnusedActorWorkersReply reply;
if (release_callbacks.size() == 0) {
return false;
} else {
Expand Down Expand Up @@ -335,7 +336,8 @@ struct GcsServerMocker {
std::list<rpc::ClientCallback<rpc::DrainRayletReply>> drain_raylet_callbacks = {};
std::list<rpc::ClientCallback<rpc::RequestWorkerLeaseReply>> callbacks = {};
std::list<rpc::ClientCallback<rpc::CancelWorkerLeaseReply>> cancel_callbacks = {};
std::list<rpc::ClientCallback<rpc::ReleaseUnusedWorkersReply>> release_callbacks = {};
std::list<rpc::ClientCallback<rpc::ReleaseUnusedActorWorkersReply>>
release_callbacks = {};
int num_lease_requested = 0;
int num_return_requested = 0;
int num_commit_requested = 0;
Expand Down
8 changes: 4 additions & 4 deletions src/ray/protobuf/node_manager.proto
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,11 @@ message ReturnWorkerRequest {
message ReturnWorkerReply {
}

message ReleaseUnusedWorkersRequest {
message ReleaseUnusedActorWorkersRequest {
repeated bytes worker_ids_in_use = 1;
}

message ReleaseUnusedWorkersReply {
message ReleaseUnusedActorWorkersReply {
}

message ShutdownRayletRequest {
Expand Down Expand Up @@ -383,8 +383,8 @@ service NodeManagerService {
// that may be leaked. When GCS restarts, it doesn't know which workers it has leased
// in the previous lifecycle. In this case, GCS will send a list of worker ids that
// are still needed. And Raylet will release other leased workers.
rpc ReleaseUnusedWorkers(ReleaseUnusedWorkersRequest)
returns (ReleaseUnusedWorkersReply);
rpc ReleaseUnusedActorWorkers(ReleaseUnusedActorWorkersRequest)
returns (ReleaseUnusedActorWorkersReply);
/// Shutdown the raylet (node manager).
rpc ShutdownRaylet(ShutdownRayletRequest) returns (ShutdownRayletReply);
// Request to drain the raylet.
Expand Down
10 changes: 5 additions & 5 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2020,9 +2020,10 @@ void NodeManager::HandleShutdownRaylet(rpc::ShutdownRayletRequest request,
send_reply_callback(Status::OK(), shutdown_after_reply, shutdown_after_reply);
}

void NodeManager::HandleReleaseUnusedWorkers(rpc::ReleaseUnusedWorkersRequest request,
rpc::ReleaseUnusedWorkersReply *reply,
rpc::SendReplyCallback send_reply_callback) {
void NodeManager::HandleReleaseUnusedActorWorkers(
rpc::ReleaseUnusedActorWorkersRequest request,
rpc::ReleaseUnusedActorWorkersReply *reply,
rpc::SendReplyCallback send_reply_callback) {
std::unordered_set<WorkerID> in_use_worker_ids;
for (int index = 0; index < request.worker_ids_in_use_size(); ++index) {
auto worker_id = WorkerID::FromBinary(request.worker_ids_in_use(index));
Expand All @@ -2031,8 +2032,7 @@ void NodeManager::HandleReleaseUnusedWorkers(rpc::ReleaseUnusedWorkersRequest re

std::vector<std::shared_ptr<WorkerInterface>> unused_workers;
for (auto &iter : leased_workers_) {
// We need to exclude workers used by common tasks.
// Because they are not used by GCS.
// We only kill *actor* workers.
if (!iter.second->GetActorId().IsNil() && !in_use_worker_ids.count(iter.first)) {
unused_workers.push_back(iter.second);
}
Expand Down
9 changes: 5 additions & 4 deletions src/ray/raylet/node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -533,12 +533,13 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
rpc::ReturnWorkerReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

/// Handle a `ReleaseUnusedWorkers` request.
/// Handle a `ReleaseUnusedActorWorkers` request.
// On GCS restart, there's a pruning effort. GCS sends raylet a list of workers it still
// wants (that it keeps tracks of); and the raylet destroys all other workers.
void HandleReleaseUnusedWorkers(rpc::ReleaseUnusedWorkersRequest request,
rpc::ReleaseUnusedWorkersReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
void HandleReleaseUnusedActorWorkers(
rpc::ReleaseUnusedActorWorkersRequest request,
rpc::ReleaseUnusedActorWorkersReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

/// Handle a `ShutdownRaylet` request.
void HandleShutdownRaylet(rpc::ShutdownRayletRequest request,
Expand Down
10 changes: 5 additions & 5 deletions src/ray/raylet_client/raylet_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -441,16 +441,16 @@ void raylet::RayletClient::PushMutableObject(
});
}

void raylet::RayletClient::ReleaseUnusedWorkers(
void raylet::RayletClient::ReleaseUnusedActorWorkers(
const std::vector<WorkerID> &workers_in_use,
const rpc::ClientCallback<rpc::ReleaseUnusedWorkersReply> &callback) {
rpc::ReleaseUnusedWorkersRequest request;
const rpc::ClientCallback<rpc::ReleaseUnusedActorWorkersReply> &callback) {
rpc::ReleaseUnusedActorWorkersRequest request;
for (auto &worker_id : workers_in_use) {
request.add_worker_ids_in_use(worker_id.Binary());
}
grpc_client_->ReleaseUnusedWorkers(
grpc_client_->ReleaseUnusedActorWorkers(
request,
[callback](const Status &status, const rpc::ReleaseUnusedWorkersReply &reply) {
[callback](const Status &status, const rpc::ReleaseUnusedActorWorkersReply &reply) {
if (!status.ok()) {
RAY_LOG(WARNING)
<< "Error releasing workers from raylet, the raylet may have died:"
Expand Down
8 changes: 4 additions & 4 deletions src/ray/raylet_client/raylet_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ class WorkerLeaseInterface {
/// \param workers_in_use Workers currently in use.
/// \param callback Callback that will be called after raylet completes the release of
/// unused workers. \return ray::Status
virtual void ReleaseUnusedWorkers(
virtual void ReleaseUnusedActorWorkers(
const std::vector<WorkerID> &workers_in_use,
const rpc::ClientCallback<rpc::ReleaseUnusedWorkersReply> &callback) = 0;
const rpc::ClientCallback<rpc::ReleaseUnusedActorWorkersReply> &callback) = 0;

virtual void CancelWorkerLease(
const TaskID &task_id,
Expand Down Expand Up @@ -476,9 +476,9 @@ class RayletClient : public RayletClientInterface {
const std::vector<rpc::WorkerBacklogReport> &backlog_reports) override;

/// Implements WorkerLeaseInterface.
void ReleaseUnusedWorkers(
void ReleaseUnusedActorWorkers(
const std::vector<WorkerID> &workers_in_use,
const rpc::ClientCallback<rpc::ReleaseUnusedWorkersReply> &callback) override;
const rpc::ClientCallback<rpc::ReleaseUnusedActorWorkersReply> &callback) override;

void CancelWorkerLease(
const TaskID &task_id,
Expand Down
2 changes: 1 addition & 1 deletion src/ray/rpc/node_manager/node_manager_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ class NodeManagerWorkerClient

/// Release unused workers.
VOID_RPC_CLIENT_METHOD(NodeManagerService,
ReleaseUnusedWorkers,
ReleaseUnusedActorWorkers,
grpc_client_,
/*method_timeout_ms*/ -1, )

Expand Down
Loading

0 comments on commit 547658e

Please sign in to comment.