From bff887dee540d860eff5776799bfe246a5e52a42 Mon Sep 17 00:00:00 2001 From: vitsai Date: Thu, 29 Jun 2023 13:58:27 -0700 Subject: [PATCH] comments Signed-off-by: vitsai --- src/ray/gcs/gcs_server/gcs_node_manager.h | 4 ++-- src/ray/gcs/gcs_server/gcs_server.cc | 10 +++++----- src/ray/gcs/gcs_server/gcs_server.h | 7 ++----- src/ray/rpc/grpc_server.cc | 4 ++-- src/ray/rpc/grpc_server.h | 12 ++++++------ 5 files changed, 17 insertions(+), 20 deletions(-) diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.h b/src/ray/gcs/gcs_server/gcs_node_manager.h index 888ec43c0098..21db16c5da23 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.h +++ b/src/ray/gcs/gcs_server/gcs_node_manager.h @@ -173,8 +173,8 @@ class GcsNodeManager : public rpc::NodeInfoHandler { std::shared_ptr gcs_table_storage_; /// Raylet client pool. std::shared_ptr raylet_client_pool_; - /// Cluster ID. - const ClusterID &cluster_id_; + /// Cluster ID to be shared with clients when connecting. + const ClusterID cluster_id_; // Debug info. enum CountType { diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index 4a780ba87e5d..090c5d3d3e8d 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -140,18 +140,18 @@ RedisClientOptions GcsServer::GetRedisClientOptions() const { void GcsServer::Start() { // Load gcs tables data asynchronously. auto gcs_init_data = std::make_shared(gcs_table_storage_); + // Init KV Manager. This needs to be initialized first here so that + // it can be used to retrieve the cluster ID. + InitKVManager(); gcs_init_data->AsyncLoad([this, gcs_init_data] { - // Init KV Manager. This needs to be initialized first here so that - // it can be used to retrieve the cluster ID. - InitKVManager(); - RetrieveAndCacheClusterId([this, gcs_init_data](ClusterID cluster_id) { + GetOrGenerateClusterId([this, gcs_init_data](ClusterID cluster_id) { rpc_server_.SetClusterId(cluster_id); DoStart(*gcs_init_data); }); }); } -void GcsServer::RetrieveAndCacheClusterId( +void GcsServer::GetOrGenerateClusterId( std::function &&continuation) { static std::string const kTokenNamespace = "cluster"; kv_manager_->GetInstance().Get( diff --git a/src/ray/gcs/gcs_server/gcs_server.h b/src/ray/gcs/gcs_server/gcs_server.h index 90b6e9169a2f..b80f1f906f6d 100644 --- a/src/ray/gcs/gcs_server/gcs_server.h +++ b/src/ray/gcs/gcs_server/gcs_server.h @@ -14,8 +14,6 @@ #pragma once -#include - #include "ray/common/asio/instrumented_io_context.h" #include "ray/common/ray_syncer/ray_syncer.h" #include "ray/common/runtime_env_manager.h" @@ -187,11 +185,10 @@ class GcsServer { /// Collect stats from each module. void RecordMetrics() const; - /// Get server token if persisted, otherwise generate + /// Get cluster id if persisted, otherwise generate /// a new one and persist as necessary. /// Expected to be idempotent while server is up. - void RetrieveAndCacheClusterId( - std::function &&continuation); + void GetOrGenerateClusterId(std::function &&continuation); /// Print the asio event loop stats for debugging. void PrintAsioStats(); diff --git a/src/ray/rpc/grpc_server.cc b/src/ray/rpc/grpc_server.cc index 2fd9e37772f1..0143bc39ee94 100644 --- a/src/ray/rpc/grpc_server.cc +++ b/src/ray/rpc/grpc_server.cc @@ -159,10 +159,10 @@ void GrpcServer::RegisterService(GrpcService &service, bool token_auth) { services_.emplace_back(service.GetGrpcService()); for (int i = 0; i < num_threads_; i++) { - if (token_auth && cluster_id_.load().IsNil()) { + if (token_auth && cluster_id_.IsNil()) { RAY_LOG(FATAL) << "Expected cluster ID for token auth!"; } - service.InitServerCallFactories(cqs_[i], &server_call_factories_, cluster_id_.load()); + service.InitServerCallFactories(cqs_[i], &server_call_factories_, cluster_id_); } } diff --git a/src/ray/rpc/grpc_server.h b/src/ray/rpc/grpc_server.h index 7e7cfa7dbdba..89ce79db734e 100644 --- a/src/ray/rpc/grpc_server.h +++ b/src/ray/rpc/grpc_server.h @@ -118,17 +118,17 @@ class GrpcServer { grpc::Server &GetServer() { return *server_; } const ClusterID GetClusterId() { - RAY_CHECK(!cluster_id_.load().IsNil()) << "Cannot fetch cluster ID before it is set."; - return cluster_id_.load(); + RAY_CHECK(!cluster_id_.IsNil()) << "Cannot fetch cluster ID before it is set."; + return cluster_id_; } void SetClusterId(const ClusterID &cluster_id) { RAY_CHECK(!cluster_id.IsNil()) << "Cannot set cluster ID back to Nil!"; - auto old_id = cluster_id_.exchange(cluster_id); - if (!old_id.IsNil() && old_id != cluster_id) { + if (!cluster_id_.IsNil() && cluster_id_ != cluster_id) { RAY_LOG(FATAL) << "Resetting non-nil cluster ID! Setting to " << cluster_id - << ", but old value is " << old_id; + << ", but old value is " << cluster_id_; } + cluster_id_ = cluster_id; } protected: @@ -148,7 +148,7 @@ class GrpcServer { /// interfaces (0.0.0.0) const bool listen_to_localhost_only_; /// Token representing ID of this cluster. - SafeClusterID cluster_id_; + ClusterID cluster_id_; /// Indicates whether this server has been closed. bool is_closed_; /// The `grpc::Service` objects which should be registered to `ServerBuilder`.