Skip to content

Commit

Permalink
[core] move GetInternalConfig: NodeInfo -> InternalKV (#47755)
Browse files Browse the repository at this point in the history
GetInternalConfig reads from table_storage on every call, even though the
value is never mutated. This change moves it to internal KV as a simple
in-memory get (no more reads from Redis). On its own this should slightly
improve performance, but together with #47736 it should improve start-up
latency a lot in thousand-node clusters. In theory we could remove the RPC
for good and just store the config as an InternalKV entry, but let's do
things step by step.

Signed-off-by: Ruiyang Wang <rywang014@gmail.com>
Signed-off-by: Ruiyang Wang <56065503+rynewang@users.noreply.github.com>
  • Loading branch information
rynewang committed Sep 26, 2024
1 parent 0fd0437 commit 3285452
Show file tree
Hide file tree
Showing 15 changed files with 77 additions and 78 deletions.
8 changes: 4 additions & 4 deletions src/mock/ray/gcs/gcs_client/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,6 @@ class MockNodeInfoAccessor : public NodeInfoAccessor {
(override));
MOCK_METHOD(bool, IsRemoved, (const NodeID &node_id), (const, override));
MOCK_METHOD(void, AsyncResubscribe, (), (override));
MOCK_METHOD(Status,
AsyncGetInternalConfig,
(const OptionalItemCallback<std::string> &callback),
(override));
};

} // namespace gcs
Expand Down Expand Up @@ -348,6 +344,10 @@ class MockInternalKVAccessor : public InternalKVAccessor {
const int64_t timeout_ms,
const OptionalItemCallback<int> &callback),
(override));
MOCK_METHOD(Status,
AsyncGetInternalConfig,
(const OptionalItemCallback<std::string> &callback),
(override));
};

} // namespace gcs
Expand Down
6 changes: 0 additions & 6 deletions src/mock/ray/gcs/gcs_server/gcs_node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,6 @@ class MockGcsNodeManager : public GcsNodeManager {
rpc::GetAllNodeInfoReply *reply,
rpc::SendReplyCallback send_reply_callback),
(override));
MOCK_METHOD(void,
HandleGetInternalConfig,
(rpc::GetInternalConfigRequest request,
rpc::GetInternalConfigReply *reply,
rpc::SendReplyCallback send_reply_callback),
(override));
MOCK_METHOD(void, DrainNode, (const NodeID &node_id), (override));
};

Expand Down
30 changes: 15 additions & 15 deletions src/ray/gcs/gcs_client/accessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -803,21 +803,6 @@ void NodeInfoAccessor::AsyncResubscribe() {
}
}

/// Issue a GetInternalConfig RPC to GCS through the GCS RPC client.
///
/// When the reply arrives, the outcome is logged (DEBUG on success, ERROR on
/// failure) and `callback` is invoked with the RPC status and the config string
/// carried by the reply.
///
/// \param callback Receives the RPC status and the reply's config string.
/// \return Status::OK() once the request has been handed to the RPC client.
Status NodeInfoAccessor::AsyncGetInternalConfig(
    const OptionalItemCallback<std::string> &callback) {
  rpc::GetInternalConfigRequest request;
  client_impl_->GetGcsRpcClient().GetInternalConfig(
      request, [callback](const Status &status, rpc::GetInternalConfigReply &&reply) {
        const std::string &fetched_config = reply.config();
        if (!status.ok()) {
          RAY_LOG(ERROR) << "Failed to get internal config: " << status.message();
        } else {
          RAY_LOG(DEBUG) << "Fetched internal config: " << fetched_config;
        }
        // The callback is invoked on both paths; callers inspect `status`.
        callback(status, fetched_config);
      });
  return Status::OK();
}

NodeResourceInfoAccessor::NodeResourceInfoAccessor(GcsClient *client_impl)
: client_impl_(client_impl) {}

Expand Down Expand Up @@ -1414,6 +1399,21 @@ Status InternalKVAccessor::Exists(const std::string &ns,
return ret_promise.get_future().get();
}

/// Issue a GetInternalConfig RPC to GCS through the GCS RPC client.
///
/// The reply handler logs the outcome (DEBUG on success, ERROR on failure) and
/// then forwards the RPC status and the reply's config string to `callback`.
///
/// \param callback Receives the RPC status and the reply's config string.
/// \return Status::OK() once the request has been handed to the RPC client.
Status InternalKVAccessor::AsyncGetInternalConfig(
    const OptionalItemCallback<std::string> &callback) {
  auto on_reply = [callback](const Status &status,
                             rpc::GetInternalConfigReply &&reply) {
    if (!status.ok()) {
      RAY_LOG(ERROR) << "Failed to get internal config: " << status.message();
    } else {
      RAY_LOG(DEBUG) << "Fetched internal config: " << reply.config();
    }
    // Forward the result on both paths; callers inspect `status`.
    callback(status, reply.config());
  };

  rpc::GetInternalConfigRequest request;
  client_impl_->GetGcsRpcClient().GetInternalConfig(request, on_reply);
  return Status::OK();
}

RuntimeEnvAccessor::RuntimeEnvAccessor(GcsClient *client_impl)
: client_impl_(client_impl) {}

Expand Down
14 changes: 7 additions & 7 deletions src/ray/gcs/gcs_client/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -456,13 +456,6 @@ class NodeInfoAccessor {
/// server.
virtual void AsyncResubscribe();

/// Get the internal config string from GCS.
///
/// \param callback Invoked with the status of the request and the internal
/// config as a single string (not a parsed map of options).
/// \return Status
virtual Status AsyncGetInternalConfig(
const OptionalItemCallback<std::string> &callback);

/// Add a node to accessor cache.
virtual void HandleNotification(rpc::GcsNodeInfo &&node_info);

Expand Down Expand Up @@ -939,6 +932,13 @@ class InternalKVAccessor {
const int64_t timeout_ms,
bool &exists);

/// Get the internal config string from GCS.
///
/// \param callback Invoked with the status of the request and the internal
/// config as a single string (not a parsed map of options).
/// \return Status of issuing the request; the request's own outcome is
/// reported through `callback`.
virtual Status AsyncGetInternalConfig(
const OptionalItemCallback<std::string> &callback);

private:
GcsClient *client_impl_;
};
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_client/global_state_accessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ std::string GlobalStateAccessor::GetSystemConfig() {
std::promise<std::string> promise;
{
absl::ReaderMutexLock lock(&mutex_);
RAY_CHECK_OK(gcs_client_->Nodes().AsyncGetInternalConfig(
RAY_CHECK_OK(gcs_client_->InternalKV().AsyncGetInternalConfig(
[&promise](const Status &status,
const std::optional<std::string> &stored_raylet_config) {
RAY_CHECK_OK(status);
Expand Down
8 changes: 8 additions & 0 deletions src/ray/gcs/gcs_server/gcs_kv_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,14 @@ void GcsInternalKVManager::HandleInternalKVKeys(
}
}

/// Handle a GetInternalConfig RPC.
///
/// Copies `raylet_config_list_` into the reply and sends it with an OK status.
/// No storage lookup is performed here; the value comes straight from the
/// member string.
///
/// \param request The incoming request; not read by this handler.
/// \param reply Reply message whose config field is set to `raylet_config_list_`.
/// \param send_reply_callback Passed to GCS_RPC_SEND_REPLY to send the reply.
void GcsInternalKVManager::HandleGetInternalConfig(
rpc::GetInternalConfigRequest request,
rpc::GetInternalConfigReply *reply,
rpc::SendReplyCallback send_reply_callback) {
reply->set_config(raylet_config_list_);
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
}

Status GcsInternalKVManager::ValidateKey(const std::string &key) const {
constexpr std::string_view kNamespacePrefix = "@namespace_";
if (absl::StartsWith(key, kNamespacePrefix)) {
Expand Down
13 changes: 10 additions & 3 deletions src/ray/gcs/gcs_server/gcs_kv_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,15 @@ class InternalKVInterface {
const std::string &prefix,
std::function<void(std::vector<std::string>)> callback) = 0;

virtual ~InternalKVInterface(){};
virtual ~InternalKVInterface() = default;
};

/// This implementation class of `InternalKVHandler`.
class GcsInternalKVManager : public rpc::InternalKVHandler {
public:
explicit GcsInternalKVManager(std::unique_ptr<InternalKVInterface> kv_instance)
: kv_instance_(std::move(kv_instance)) {}
explicit GcsInternalKVManager(std::unique_ptr<InternalKVInterface> kv_instance,
const std::string &raylet_config_list)
: kv_instance_(std::move(kv_instance)), raylet_config_list_(raylet_config_list) {}

void HandleInternalKVGet(rpc::InternalKVGetRequest request,
rpc::InternalKVGetReply *reply,
Expand All @@ -127,10 +128,16 @@ class GcsInternalKVManager : public rpc::InternalKVHandler {
rpc::InternalKVKeysReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

/// Handle a GetInternalConfig request by replying with the raylet config list
/// string held by this manager.
///
/// \param request The incoming request.
/// \param reply Reply message that receives the config string.
/// \param send_reply_callback Callback used to send the reply.
void HandleGetInternalConfig(rpc::GetInternalConfigRequest request,
rpc::GetInternalConfigReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

InternalKVInterface &GetInstance() { return *kv_instance_; }

private:
std::unique_ptr<InternalKVInterface> kv_instance_;
const std::string raylet_config_list_;
Status ValidateKey(const std::string &key) const;
};

Expand Down
20 changes: 1 addition & 19 deletions src/ray/gcs/gcs_server/gcs_node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -274,22 +274,6 @@ void GcsNodeManager::HandleGetAllNodeInfo(rpc::GetAllNodeInfoRequest request,
++counts_[CountType::GET_ALL_NODE_INFO_REQUEST];
}

/// Handle a GetInternalConfig RPC by reading the stored config from table storage.
///
/// Looks up the entry keyed by UniqueID::Nil() in the internal config table.
/// When the lookup completes, the stored config (if any) is copied into the
/// reply and the reply is sent with the lookup status. The request counter is
/// bumped after the lookup has been issued.
///
/// \param request The incoming request; not read by this handler.
/// \param reply Reply message that receives the stored config, if present.
/// \param send_reply_callback Passed to GCS_RPC_SEND_REPLY to send the reply.
void GcsNodeManager::HandleGetInternalConfig(rpc::GetInternalConfigRequest request,
                                             rpc::GetInternalConfigReply *reply,
                                             rpc::SendReplyCallback send_reply_callback) {
  auto on_config_loaded = [reply, send_reply_callback](
                              const ray::Status &status,
                              const std::optional<rpc::StoredConfig> &stored_config) {
    // Leave the reply's config untouched when nothing is stored.
    if (stored_config) {
      reply->set_config(stored_config->config());
    }
    GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
  };

  RAY_CHECK_OK(gcs_table_storage_->InternalConfigTable().Get(UniqueID::Nil(),
                                                             on_config_loaded));
  ++counts_[CountType::GET_INTERNAL_CONFIG_REQUEST];
}

absl::optional<std::shared_ptr<rpc::GcsNodeInfo>> GcsNodeManager::GetAliveNode(
const ray::NodeID &node_id) const {
auto iter = alive_nodes_.find(node_id);
Expand Down Expand Up @@ -496,9 +480,7 @@ std::string GcsNodeManager::DebugString() const {
<< counts_[CountType::REGISTER_NODE_REQUEST]
<< "\n- DrainNode request count: " << counts_[CountType::DRAIN_NODE_REQUEST]
<< "\n- GetAllNodeInfo request count: "
<< counts_[CountType::GET_ALL_NODE_INFO_REQUEST]
<< "\n- GetInternalConfig request count: "
<< counts_[CountType::GET_INTERNAL_CONFIG_REQUEST];
<< counts_[CountType::GET_ALL_NODE_INFO_REQUEST];
return stream.str();
}

Expand Down
8 changes: 1 addition & 7 deletions src/ray/gcs/gcs_server/gcs_node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,6 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
rpc::GetAllNodeInfoReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

/// Handle a GetInternalConfig request by replying with the config stored in
/// the internal config table.
///
/// \param request The incoming request.
/// \param reply Reply message that receives the stored config, if present.
/// \param send_reply_callback Callback used to send the reply.
void HandleGetInternalConfig(rpc::GetInternalConfigRequest request,
rpc::GetInternalConfigReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

/// Handle check alive request for GCS.
void HandleCheckAlive(rpc::CheckAliveRequest request,
rpc::CheckAliveReply *reply,
Expand Down Expand Up @@ -262,8 +257,7 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
REGISTER_NODE_REQUEST = 0,
DRAIN_NODE_REQUEST = 1,
GET_ALL_NODE_INFO_REQUEST = 2,
GET_INTERNAL_CONFIG_REQUEST = 3,
CountType_MAX = 4,
CountType_MAX = 3,
};
uint64_t counts_[CountType::CountType_MAX] = {0};

Expand Down
3 changes: 2 additions & 1 deletion src/ray/gcs/gcs_server/gcs_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,8 @@ void GcsServer::InitKVManager() {
RAY_LOG(FATAL) << "Unexpected storage type! " << storage_type_;
}

kv_manager_ = std::make_unique<GcsInternalKVManager>(std::move(instance));
kv_manager_ = std::make_unique<GcsInternalKVManager>(std::move(instance),
config_.raylet_config_list);
}

void GcsServer::InitKVService() {
Expand Down
16 changes: 15 additions & 1 deletion src/ray/gcs/gcs_server/test/gcs_autoscaler_state_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class GcsAutoscalerStateManagerTest : public ::testing::Test {
GcsAutoscalerStateManagerTest() {}

protected:
static constexpr char kRayletConfig[] = R"({"raylet_config":"this is a config"})";
instrumented_io_context io_service_;
std::shared_ptr<GcsServerMocker::MockRayletClient> raylet_client_;
std::shared_ptr<rpc::NodeManagerClientPool> client_pool_;
Expand All @@ -65,7 +66,8 @@ class GcsAutoscalerStateManagerTest : public ::testing::Test {
cluster_resource_manager_ = std::make_unique<ClusterResourceManager>(io_service_);
gcs_node_manager_ = std::make_shared<MockGcsNodeManager>();
kv_manager_ = std::make_unique<GcsInternalKVManager>(
std::make_unique<StoreClientInternalKV>(std::make_unique<MockStoreClient>()));
std::make_unique<StoreClientInternalKV>(std::make_unique<MockStoreClient>()),
kRayletConfig);
function_manager_ = std::make_unique<GcsFunctionManager>(kv_manager_->GetInstance());
runtime_env_manager_ = std::make_unique<RuntimeEnvManager>(
[](const std::string &, std::function<void(bool)>) {});
Expand Down Expand Up @@ -831,6 +833,18 @@ TEST_F(GcsAutoscalerStateManagerTest, TestIdleTime) {
}
}

TEST_F(GcsAutoscalerStateManagerTest, TestGcsKvManagerInternalConfig) {
  // NOTE: this exercises GcsInternalKVManager, not the autoscaler state manager.
  // It lives here temporarily because gcs_kv_manager_test.cc, despite its name,
  // does not test that class at all; it only tests StoreClientInternalKV.
  const auto noop_send_reply =
      [](ray::Status, std::function<void()>, std::function<void()>) {};

  rpc::GetInternalConfigReply reply;
  kv_manager_->HandleGetInternalConfig(
      rpc::GetInternalConfigRequest(), &reply, noop_send_reply);

  // The handler must echo back the config string the fixture passed to the
  // GcsInternalKVManager constructor.
  EXPECT_EQ(reply.config(), kRayletConfig);
}

} // namespace gcs
} // namespace ray

Expand Down
3 changes: 1 addition & 2 deletions src/ray/protobuf/gcs_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,7 @@ service InternalKVGcsService {
rpc InternalKVDel(InternalKVDelRequest) returns (InternalKVDelReply);
rpc InternalKVExists(InternalKVExistsRequest) returns (InternalKVExistsReply);
rpc InternalKVKeys(InternalKVKeysRequest) returns (InternalKVKeysReply);
rpc GetInternalConfig(GetInternalConfigRequest) returns (GetInternalConfigReply);
}

message PinRuntimeEnvURIRequest {
Expand Down Expand Up @@ -750,8 +751,6 @@ service NodeInfoGcsService {
rpc DrainNode(DrainNodeRequest) returns (DrainNodeReply);
// Get information of all nodes from GCS Service.
rpc GetAllNodeInfo(GetAllNodeInfoRequest) returns (GetAllNodeInfoReply);
// Get cluster internal config.
rpc GetInternalConfig(GetInternalConfigRequest) returns (GetInternalConfigReply);
// Check alive.
rpc CheckAlive(CheckAliveRequest) returns (CheckAliveReply);
}
Expand Down
2 changes: 1 addition & 1 deletion src/ray/raylet/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ int main(int argc, char *argv[]) {
"shutdown_raylet_gracefully_internal");
};

RAY_CHECK_OK(gcs_client->Nodes().AsyncGetInternalConfig(
RAY_CHECK_OK(gcs_client->InternalKV().AsyncGetInternalConfig(
[&](::ray::Status status, const std::optional<std::string> &stored_raylet_config) {
RAY_CHECK_OK(status);
RAY_CHECK(stored_raylet_config.has_value());
Expand Down
12 changes: 6 additions & 6 deletions src/ray/rpc/gcs_server/gcs_rpc_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -403,12 +403,6 @@ class GcsRpcClient {
node_info_grpc_client_,
/*method_timeout_ms*/ -1, )

/// Get internal config of the node from the GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService,
GetInternalConfig,
node_info_grpc_client_,
/*method_timeout_ms*/ -1, )

/// Get available resources of all nodes from the GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(NodeResourceInfoGcsService,
GetAllAvailableResources,
Expand Down Expand Up @@ -541,6 +535,12 @@ class GcsRpcClient {
internal_kv_grpc_client_,
/*method_timeout_ms*/ -1, )

/// Get internal config of the node from the GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(InternalKVGcsService,
GetInternalConfig,
internal_kv_grpc_client_,
/*method_timeout_ms*/ -1, )

/// Operations for pubsub
VOID_GCS_RPC_CLIENT_METHOD(InternalPubSubGcsService,
GcsPublish,
Expand Down
10 changes: 5 additions & 5 deletions src/ray/rpc/gcs_server/gcs_rpc_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -326,10 +326,6 @@ class NodeInfoGcsServiceHandler {
virtual void HandleGetAllNodeInfo(GetAllNodeInfoRequest request,
GetAllNodeInfoReply *reply,
SendReplyCallback send_reply_callback) = 0;

virtual void HandleGetInternalConfig(GetInternalConfigRequest request,
GetInternalConfigReply *reply,
SendReplyCallback send_reply_callback) = 0;
};

/// The `GrpcService` for `NodeInfoGcsService`.
Expand Down Expand Up @@ -360,7 +356,6 @@ class NodeInfoGrpcService : public GrpcService {
NODE_INFO_SERVICE_RPC_HANDLER(UnregisterNode);
NODE_INFO_SERVICE_RPC_HANDLER(DrainNode);
NODE_INFO_SERVICE_RPC_HANDLER(GetAllNodeInfo);
NODE_INFO_SERVICE_RPC_HANDLER(GetInternalConfig);
NODE_INFO_SERVICE_RPC_HANDLER(CheckAlive);
}

Expand Down Expand Up @@ -573,6 +568,10 @@ class InternalKVGcsServiceHandler {
virtual void HandleInternalKVExists(InternalKVExistsRequest request,
InternalKVExistsReply *reply,
SendReplyCallback send_reply_callback) = 0;

virtual void HandleGetInternalConfig(GetInternalConfigRequest request,
GetInternalConfigReply *reply,
SendReplyCallback send_reply_callback) = 0;
};

class InternalKVGrpcService : public GrpcService {
Expand All @@ -593,6 +592,7 @@ class InternalKVGrpcService : public GrpcService {
INTERNAL_KV_SERVICE_RPC_HANDLER(InternalKVDel);
INTERNAL_KV_SERVICE_RPC_HANDLER(InternalKVExists);
INTERNAL_KV_SERVICE_RPC_HANDLER(InternalKVKeys);
INTERNAL_KV_SERVICE_RPC_HANDLER(GetInternalConfig);
}

private:
Expand Down

0 comments on commit 3285452

Please sign in to comment.