From 32854522848af045be1e113903cdb63461784785 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang <56065503+rynewang@users.noreply.github.com> Date: Thu, 26 Sep 2024 16:10:22 -0700 Subject: [PATCH] [core] move GetInternalConfig: NodeInfo -> InternalKV (#47755) Every time GetInternalConfig reads from table_storage, but it's never mutated. Moves to internal kv as a simple in-mem get (no more read from redis). This itself should slightly update performance. But with #47736 it should improve start up latency a lot in thousand-node clusters. In theory we can remove it all for good, instead just put it as an InternalKV entry. but let's do things step by step. Signed-off-by: Ruiyang Wang Signed-off-by: Ruiyang Wang <56065503+rynewang@users.noreply.github.com> --- src/mock/ray/gcs/gcs_client/accessor.h | 8 ++--- .../ray/gcs/gcs_server/gcs_node_manager.h | 6 ---- src/ray/gcs/gcs_client/accessor.cc | 30 +++++++++---------- src/ray/gcs/gcs_client/accessor.h | 14 ++++----- .../gcs/gcs_client/global_state_accessor.cc | 2 +- src/ray/gcs/gcs_server/gcs_kv_manager.cc | 8 +++++ src/ray/gcs/gcs_server/gcs_kv_manager.h | 13 ++++++-- src/ray/gcs/gcs_server/gcs_node_manager.cc | 20 +------------ src/ray/gcs/gcs_server/gcs_node_manager.h | 8 +---- src/ray/gcs/gcs_server/gcs_server.cc | 3 +- .../test/gcs_autoscaler_state_manager_test.cc | 16 +++++++++- src/ray/protobuf/gcs_service.proto | 3 +- src/ray/raylet/main.cc | 2 +- src/ray/rpc/gcs_server/gcs_rpc_client.h | 12 ++++---- src/ray/rpc/gcs_server/gcs_rpc_server.h | 10 +++---- 15 files changed, 77 insertions(+), 78 deletions(-) diff --git a/src/mock/ray/gcs/gcs_client/accessor.h b/src/mock/ray/gcs/gcs_client/accessor.h index 853f6bbae7f36..135b159a9f035 100644 --- a/src/mock/ray/gcs/gcs_client/accessor.h +++ b/src/mock/ray/gcs/gcs_client/accessor.h @@ -175,10 +175,6 @@ class MockNodeInfoAccessor : public NodeInfoAccessor { (override)); MOCK_METHOD(bool, IsRemoved, (const NodeID &node_id), (const, override)); MOCK_METHOD(void, AsyncResubscribe, (), (override)); - MOCK_METHOD(Status, - AsyncGetInternalConfig, - (const OptionalItemCallback &callback), - (override)); }; } // namespace gcs @@ -348,6 +344,10 @@ class MockInternalKVAccessor : public InternalKVAccessor { const int64_t timeout_ms, const OptionalItemCallback &callback), (override)); + MOCK_METHOD(Status, + AsyncGetInternalConfig, + (const OptionalItemCallback &callback), + (override)); }; } // namespace gcs diff --git a/src/mock/ray/gcs/gcs_server/gcs_node_manager.h b/src/mock/ray/gcs/gcs_server/gcs_node_manager.h index 2b77fe5fd1896..3a8f22949faeb 100644 --- a/src/mock/ray/gcs/gcs_server/gcs_node_manager.h +++ b/src/mock/ray/gcs/gcs_server/gcs_node_manager.h @@ -37,12 +37,6 @@ class MockGcsNodeManager : public GcsNodeManager { rpc::GetAllNodeInfoReply *reply, rpc::SendReplyCallback send_reply_callback), (override)); - MOCK_METHOD(void, - HandleGetInternalConfig, - (rpc::GetInternalConfigRequest request, - rpc::GetInternalConfigReply *reply, - rpc::SendReplyCallback send_reply_callback), - (override)); MOCK_METHOD(void, DrainNode, (const NodeID &node_id), (override)); }; diff --git a/src/ray/gcs/gcs_client/accessor.cc b/src/ray/gcs/gcs_client/accessor.cc index b9c8f07fdc818..5f7c8f8bcf516 100644 --- a/src/ray/gcs/gcs_client/accessor.cc +++ b/src/ray/gcs/gcs_client/accessor.cc @@ -803,21 +803,6 @@ void NodeInfoAccessor::AsyncResubscribe() { } } -Status NodeInfoAccessor::AsyncGetInternalConfig( - const OptionalItemCallback &callback) { - rpc::GetInternalConfigRequest request; - client_impl_->GetGcsRpcClient().GetInternalConfig( - request, [callback](const Status &status, rpc::GetInternalConfigReply &&reply) { - if (status.ok()) { - RAY_LOG(DEBUG) << "Fetched internal config: " << reply.config(); - } else { - RAY_LOG(ERROR) << "Failed to get internal config: " << status.message(); - } - callback(status, reply.config()); - }); - return Status::OK(); -} - NodeResourceInfoAccessor::NodeResourceInfoAccessor(GcsClient *client_impl) : client_impl_(client_impl) {} @@ -1414,6 +1399,21 @@ Status InternalKVAccessor::Exists(const std::string &ns, return ret_promise.get_future().get(); } +Status InternalKVAccessor::AsyncGetInternalConfig( + const OptionalItemCallback &callback) { + rpc::GetInternalConfigRequest request; + client_impl_->GetGcsRpcClient().GetInternalConfig( + request, [callback](const Status &status, rpc::GetInternalConfigReply &&reply) { + if (status.ok()) { + RAY_LOG(DEBUG) << "Fetched internal config: " << reply.config(); + } else { + RAY_LOG(ERROR) << "Failed to get internal config: " << status.message(); + } + callback(status, reply.config()); + }); + return Status::OK(); +} + RuntimeEnvAccessor::RuntimeEnvAccessor(GcsClient *client_impl) : client_impl_(client_impl) {} diff --git a/src/ray/gcs/gcs_client/accessor.h b/src/ray/gcs/gcs_client/accessor.h index b7d3c8f44c1e0..a863ad08cdacd 100644 --- a/src/ray/gcs/gcs_client/accessor.h +++ b/src/ray/gcs/gcs_client/accessor.h @@ -456,13 +456,6 @@ class NodeInfoAccessor { /// server. virtual void AsyncResubscribe(); - /// Get the internal config string from GCS. - /// - /// \param callback Processes a map of config options - /// \return Status - virtual Status AsyncGetInternalConfig( - const OptionalItemCallback &callback); - /// Add a node to accessor cache. virtual void HandleNotification(rpc::GcsNodeInfo &&node_info); @@ -939,6 +932,13 @@ class InternalKVAccessor { const int64_t timeout_ms, bool &exists); + /// Get the internal config string from GCS. + /// + /// \param callback Processes a map of config options + /// \return Status + virtual Status AsyncGetInternalConfig( + const OptionalItemCallback &callback); + private: GcsClient *client_impl_; }; diff --git a/src/ray/gcs/gcs_client/global_state_accessor.cc b/src/ray/gcs/gcs_client/global_state_accessor.cc index 0ad28e30e1aa9..cc482cfc672ed 100644 --- a/src/ray/gcs/gcs_client/global_state_accessor.cc +++ b/src/ray/gcs/gcs_client/global_state_accessor.cc @@ -380,7 +380,7 @@ std::string GlobalStateAccessor::GetSystemConfig() { std::promise promise; { absl::ReaderMutexLock lock(&mutex_); - RAY_CHECK_OK(gcs_client_->Nodes().AsyncGetInternalConfig( + RAY_CHECK_OK(gcs_client_->InternalKV().AsyncGetInternalConfig( [&promise](const Status &status, const std::optional &stored_raylet_config) { RAY_CHECK_OK(status); diff --git a/src/ray/gcs/gcs_server/gcs_kv_manager.cc b/src/ray/gcs/gcs_server/gcs_kv_manager.cc index b16a87da60211..47f7146710d24 100644 --- a/src/ray/gcs/gcs_server/gcs_kv_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_kv_manager.cc @@ -141,6 +141,14 @@ void GcsInternalKVManager::HandleInternalKVKeys( } } +void GcsInternalKVManager::HandleGetInternalConfig( + rpc::GetInternalConfigRequest request, + rpc::GetInternalConfigReply *reply, + rpc::SendReplyCallback send_reply_callback) { + reply->set_config(raylet_config_list_); + GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); +} + Status GcsInternalKVManager::ValidateKey(const std::string &key) const { constexpr std::string_view kNamespacePrefix = "@namespace_"; if (absl::StartsWith(key, kNamespacePrefix)) { diff --git a/src/ray/gcs/gcs_server/gcs_kv_manager.h b/src/ray/gcs/gcs_server/gcs_kv_manager.h index 47eab413f3718..9b8830102a915 100644 --- a/src/ray/gcs/gcs_server/gcs_kv_manager.h +++ b/src/ray/gcs/gcs_server/gcs_kv_manager.h @@ -94,14 +94,15 @@ class InternalKVInterface { const std::string &prefix, std::function)> callback) = 0; - virtual ~InternalKVInterface(){}; + virtual ~InternalKVInterface() = default; }; /// This implementation class of `InternalKVHandler`. class GcsInternalKVManager : public rpc::InternalKVHandler { public: - explicit GcsInternalKVManager(std::unique_ptr kv_instance) - : kv_instance_(std::move(kv_instance)) {} + explicit GcsInternalKVManager(std::unique_ptr kv_instance, + const std::string &raylet_config_list) + : kv_instance_(std::move(kv_instance)), raylet_config_list_(raylet_config_list) {} void HandleInternalKVGet(rpc::InternalKVGetRequest request, rpc::InternalKVGetReply *reply, @@ -127,10 +128,16 @@ class GcsInternalKVManager : public rpc::InternalKVHandler { rpc::InternalKVKeysReply *reply, rpc::SendReplyCallback send_reply_callback) override; + /// Handle get internal config. + void HandleGetInternalConfig(rpc::GetInternalConfigRequest request, + rpc::GetInternalConfigReply *reply, + rpc::SendReplyCallback send_reply_callback) override; + InternalKVInterface &GetInstance() { return *kv_instance_; } private: std::unique_ptr kv_instance_; + const std::string raylet_config_list_; Status ValidateKey(const std::string &key) const; }; diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.cc b/src/ray/gcs/gcs_server/gcs_node_manager.cc index 249b520091cda..7542deede15b0 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_node_manager.cc @@ -274,22 +274,6 @@ void GcsNodeManager::HandleGetAllNodeInfo(rpc::GetAllNodeInfoRequest request, ++counts_[CountType::GET_ALL_NODE_INFO_REQUEST]; } -void GcsNodeManager::HandleGetInternalConfig(rpc::GetInternalConfigRequest request, - rpc::GetInternalConfigReply *reply, - rpc::SendReplyCallback send_reply_callback) { - auto get_system_config = [reply, send_reply_callback]( - const ray::Status &status, - const std::optional &config) { - if (config.has_value()) { - reply->set_config(config->config()); - } - GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); - }; - RAY_CHECK_OK( - gcs_table_storage_->InternalConfigTable().Get(UniqueID::Nil(), get_system_config)); - ++counts_[CountType::GET_INTERNAL_CONFIG_REQUEST]; -} - absl::optional> GcsNodeManager::GetAliveNode( const ray::NodeID &node_id) const { auto iter = alive_nodes_.find(node_id); @@ -496,9 +480,7 @@ std::string GcsNodeManager::DebugString() const { << counts_[CountType::REGISTER_NODE_REQUEST] << "\n- DrainNode request count: " << counts_[CountType::DRAIN_NODE_REQUEST] << "\n- GetAllNodeInfo request count: " - << counts_[CountType::GET_ALL_NODE_INFO_REQUEST] - << "\n- GetInternalConfig request count: " - << counts_[CountType::GET_INTERNAL_CONFIG_REQUEST]; + << counts_[CountType::GET_ALL_NODE_INFO_REQUEST]; return stream.str(); } diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.h b/src/ray/gcs/gcs_server/gcs_node_manager.h index 7db037e767e59..db258d4cb00c0 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.h +++ b/src/ray/gcs/gcs_server/gcs_node_manager.h @@ -78,11 +78,6 @@ class GcsNodeManager : public rpc::NodeInfoHandler { rpc::GetAllNodeInfoReply *reply, rpc::SendReplyCallback send_reply_callback) override; - /// Handle get internal config. - void HandleGetInternalConfig(rpc::GetInternalConfigRequest request, - rpc::GetInternalConfigReply *reply, - rpc::SendReplyCallback send_reply_callback) override; - /// Handle check alive request for GCS. void HandleCheckAlive(rpc::CheckAliveRequest request, rpc::CheckAliveReply *reply, @@ -262,8 +257,7 @@ class GcsNodeManager : public rpc::NodeInfoHandler { REGISTER_NODE_REQUEST = 0, DRAIN_NODE_REQUEST = 1, GET_ALL_NODE_INFO_REQUEST = 2, - GET_INTERNAL_CONFIG_REQUEST = 3, - CountType_MAX = 4, + CountType_MAX = 3, }; uint64_t counts_[CountType::CountType_MAX] = {0}; diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index 88708b005e6ab..4affb289f42a9 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -575,7 +575,8 @@ void GcsServer::InitKVManager() { RAY_LOG(FATAL) << "Unexpected storage type! " << storage_type_; } - kv_manager_ = std::make_unique(std::move(instance)); + kv_manager_ = std::make_unique(std::move(instance), + config_.raylet_config_list); } void GcsServer::InitKVService() { diff --git a/src/ray/gcs/gcs_server/test/gcs_autoscaler_state_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_autoscaler_state_manager_test.cc index 913cce5a300c7..b0b31b182d77a 100644 --- a/src/ray/gcs/gcs_server/test/gcs_autoscaler_state_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_autoscaler_state_manager_test.cc @@ -45,6 +45,7 @@ class GcsAutoscalerStateManagerTest : public ::testing::Test { GcsAutoscalerStateManagerTest() {} protected: + static constexpr char kRayletConfig[] = R"({"raylet_config":"this is a config"})"; instrumented_io_context io_service_; std::shared_ptr raylet_client_; std::shared_ptr client_pool_; @@ -65,7 +66,8 @@ class GcsAutoscalerStateManagerTest : public ::testing::Test { cluster_resource_manager_ = std::make_unique(io_service_); gcs_node_manager_ = std::make_shared(); kv_manager_ = std::make_unique( - std::make_unique(std::make_unique())); + std::make_unique(std::make_unique()), + kRayletConfig); function_manager_ = std::make_unique(kv_manager_->GetInstance()); runtime_env_manager_ = std::make_unique( [](const std::string &, std::function) {}); @@ -831,6 +833,18 @@ TEST_F(GcsAutoscalerStateManagerTest, TestIdleTime) { } } +TEST_F(GcsAutoscalerStateManagerTest, TestGcsKvManagerInternalConfig) { + // This is really a test for GcsKvManager. However gcs_kv_manager_test.cc is a larger + // misnomer - it does not test that class at all; it only tests StoreClientInternalKV. + // We temporarily put this test here. + rpc::GetInternalConfigRequest request; + rpc::GetInternalConfigReply reply; + auto send_reply_callback = + [](ray::Status status, std::function f1, std::function f2) {}; + kv_manager_->HandleGetInternalConfig(request, &reply, send_reply_callback); + EXPECT_EQ(reply.config(), kRayletConfig); +} + } // namespace gcs } // namespace ray diff --git a/src/ray/protobuf/gcs_service.proto b/src/ray/protobuf/gcs_service.proto index 68e7c0ce776dd..af2d38ee2446d 100644 --- a/src/ray/protobuf/gcs_service.proto +++ b/src/ray/protobuf/gcs_service.proto @@ -602,6 +602,7 @@ service InternalKVGcsService { rpc InternalKVDel(InternalKVDelRequest) returns (InternalKVDelReply); rpc InternalKVExists(InternalKVExistsRequest) returns (InternalKVExistsReply); rpc InternalKVKeys(InternalKVKeysRequest) returns (InternalKVKeysReply); + rpc GetInternalConfig(GetInternalConfigRequest) returns (GetInternalConfigReply); } message PinRuntimeEnvURIRequest { @@ -750,8 +751,6 @@ service NodeInfoGcsService { rpc DrainNode(DrainNodeRequest) returns (DrainNodeReply); // Get information of all nodes from GCS Service. rpc GetAllNodeInfo(GetAllNodeInfoRequest) returns (GetAllNodeInfoReply); - // Get cluster internal config. - rpc GetInternalConfig(GetInternalConfigRequest) returns (GetInternalConfigReply); // Check alive. rpc CheckAlive(CheckAliveRequest) returns (CheckAliveReply); } diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index d5ebc7cf24171..712c43b705217 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -269,7 +269,7 @@ int main(int argc, char *argv[]) { "shutdown_raylet_gracefully_internal"); }; - RAY_CHECK_OK(gcs_client->Nodes().AsyncGetInternalConfig( + RAY_CHECK_OK(gcs_client->InternalKV().AsyncGetInternalConfig( [&](::ray::Status status, const std::optional &stored_raylet_config) { RAY_CHECK_OK(status); RAY_CHECK(stored_raylet_config.has_value()); diff --git a/src/ray/rpc/gcs_server/gcs_rpc_client.h b/src/ray/rpc/gcs_server/gcs_rpc_client.h index c46fbeb5059b6..a8fed2d20f272 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_client.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_client.h @@ -403,12 +403,6 @@ class GcsRpcClient { node_info_grpc_client_, /*method_timeout_ms*/ -1, ) - /// Get internal config of the node from the GCS Service. - VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService, - GetInternalConfig, - node_info_grpc_client_, - /*method_timeout_ms*/ -1, ) - /// Get available resources of all nodes from the GCS Service. VOID_GCS_RPC_CLIENT_METHOD(NodeResourceInfoGcsService, GetAllAvailableResources, @@ -541,6 +535,12 @@ class GcsRpcClient { internal_kv_grpc_client_, /*method_timeout_ms*/ -1, ) + /// Get internal config of the node from the GCS Service. + VOID_GCS_RPC_CLIENT_METHOD(InternalKVGcsService, + GetInternalConfig, + internal_kv_grpc_client_, + /*method_timeout_ms*/ -1, ) + /// Operations for pubsub VOID_GCS_RPC_CLIENT_METHOD(InternalPubSubGcsService, GcsPublish, diff --git a/src/ray/rpc/gcs_server/gcs_rpc_server.h b/src/ray/rpc/gcs_server/gcs_rpc_server.h index be7247bff4bf0..e2f684f6f71bc 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_server.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_server.h @@ -326,10 +326,6 @@ class NodeInfoGcsServiceHandler { virtual void HandleGetAllNodeInfo(GetAllNodeInfoRequest request, GetAllNodeInfoReply *reply, SendReplyCallback send_reply_callback) = 0; - - virtual void HandleGetInternalConfig(GetInternalConfigRequest request, - GetInternalConfigReply *reply, - SendReplyCallback send_reply_callback) = 0; }; /// The `GrpcService` for `NodeInfoGcsService`. @@ -360,7 +356,6 @@ class NodeInfoGrpcService : public GrpcService { NODE_INFO_SERVICE_RPC_HANDLER(UnregisterNode); NODE_INFO_SERVICE_RPC_HANDLER(DrainNode); NODE_INFO_SERVICE_RPC_HANDLER(GetAllNodeInfo); - NODE_INFO_SERVICE_RPC_HANDLER(GetInternalConfig); NODE_INFO_SERVICE_RPC_HANDLER(CheckAlive); } @@ -573,6 +568,10 @@ class InternalKVGcsServiceHandler { virtual void HandleInternalKVExists(InternalKVExistsRequest request, InternalKVExistsReply *reply, SendReplyCallback send_reply_callback) = 0; + + virtual void HandleGetInternalConfig(GetInternalConfigRequest request, + GetInternalConfigReply *reply, + SendReplyCallback send_reply_callback) = 0; }; class InternalKVGrpcService : public GrpcService { @@ -593,6 +592,7 @@ class InternalKVGrpcService : public GrpcService { INTERNAL_KV_SERVICE_RPC_HANDLER(InternalKVDel); INTERNAL_KV_SERVICE_RPC_HANDLER(InternalKVExists); INTERNAL_KV_SERVICE_RPC_HANDLER(InternalKVKeys); + INTERNAL_KV_SERVICE_RPC_HANDLER(GetInternalConfig); } private: