Skip to content

Commit

Permalink
comments
Browse files Browse the repository at this point in the history
Signed-off-by: vitsai <victoria@anyscale.com>
  • Loading branch information
vitsai committed Jun 29, 2023
1 parent 5cf182b commit bff887d
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 20 deletions.
4 changes: 2 additions & 2 deletions src/ray/gcs/gcs_server/gcs_node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,8 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
/// Raylet client pool.
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool_;
/// Cluster ID.
const ClusterID &cluster_id_;
/// Cluster ID to be shared with clients when connecting.
const ClusterID cluster_id_;

// Debug info.
enum CountType {
Expand Down
10 changes: 5 additions & 5 deletions src/ray/gcs/gcs_server/gcs_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -140,18 +140,18 @@ RedisClientOptions GcsServer::GetRedisClientOptions() const {
/// Starts the GCS server.
///
/// Creates the GcsInitData loader over `gcs_table_storage_`, initializes the
/// KV manager, obtains the cluster ID, hands that ID to `rpc_server_`, and
/// finally calls DoStart() with the init data.
///
/// NOTE(review): this listing is a rendered diff without +/- markers, so the
/// pre-change and post-change lines both appear below (e.g. InitKVManager()
/// is shown both outside and inside the AsyncLoad callback) -- confirm the
/// live ordering against the actual file.
void GcsServer::Start() {
// Load gcs tables data asynchronously.
auto gcs_init_data = std::make_shared<GcsInitData>(gcs_table_storage_);
// Init KV Manager. This needs to be initialized first here so that
// it can be used to retrieve the cluster ID.
InitKVManager();
// The callback captures gcs_init_data (a shared_ptr) by value.
gcs_init_data->AsyncLoad([this, gcs_init_data] {
// Init KV Manager. This needs to be initialized first here so that
// it can be used to retrieve the cluster ID.
InitKVManager();
RetrieveAndCacheClusterId([this, gcs_init_data](ClusterID cluster_id) {
GetOrGenerateClusterId([this, gcs_init_data](ClusterID cluster_id) {
// The RPC server receives the cluster ID before DoStart() is called.
rpc_server_.SetClusterId(cluster_id);
DoStart(*gcs_init_data);
});
});
}

void GcsServer::RetrieveAndCacheClusterId(
void GcsServer::GetOrGenerateClusterId(
std::function<void(ClusterID cluster_id)> &&continuation) {
static std::string const kTokenNamespace = "cluster";
kv_manager_->GetInstance().Get(
Expand Down
7 changes: 2 additions & 5 deletions src/ray/gcs/gcs_server/gcs_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

#pragma once

#include <atomic>

#include "ray/common/asio/instrumented_io_context.h"
#include "ray/common/ray_syncer/ray_syncer.h"
#include "ray/common/runtime_env_manager.h"
Expand Down Expand Up @@ -187,11 +185,10 @@ class GcsServer {
/// Collect stats from each module.
void RecordMetrics() const;

/// Get server token if persisted, otherwise generate
/// Get the cluster ID if persisted, otherwise generate
/// a new one and persist as necessary.
/// Expected to be idempotent while server is up.
void RetrieveAndCacheClusterId(
std::function<void(ClusterID cluster_id)> &&continuation);
void GetOrGenerateClusterId(std::function<void(ClusterID cluster_id)> &&continuation);

/// Print the asio event loop stats for debugging.
void PrintAsioStats();
Expand Down
4 changes: 2 additions & 2 deletions src/ray/rpc/grpc_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,10 @@ void GrpcServer::RegisterService(GrpcService &service, bool token_auth) {
services_.emplace_back(service.GetGrpcService());

for (int i = 0; i < num_threads_; i++) {
if (token_auth && cluster_id_.load().IsNil()) {
if (token_auth && cluster_id_.IsNil()) {
RAY_LOG(FATAL) << "Expected cluster ID for token auth!";
}
service.InitServerCallFactories(cqs_[i], &server_call_factories_, cluster_id_.load());
service.InitServerCallFactories(cqs_[i], &server_call_factories_, cluster_id_);
}
}

Expand Down
12 changes: 6 additions & 6 deletions src/ray/rpc/grpc_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,17 +118,17 @@ class GrpcServer {
grpc::Server &GetServer() { return *server_; }

/// Returns the cluster ID stored in this server.
///
/// RAY_CHECKs that the stored ID is not Nil, i.e. the ID must have been set
/// before it is fetched.
///
/// \return A copy of the stored cluster ID.
const ClusterID GetClusterId() {
RAY_CHECK(!cluster_id_.load().IsNil()) << "Cannot fetch cluster ID before it is set.";
return cluster_id_.load();
RAY_CHECK(!cluster_id_.IsNil()) << "Cannot fetch cluster ID before it is set.";
return cluster_id_;
}

/// Stores `cluster_id` as this server's cluster ID.
///
/// RAY_CHECKs that the new ID is not Nil, and logs FATAL if a different
/// non-Nil ID is already stored. Setting the same ID again is accepted.
///
/// \param cluster_id The cluster ID to store; must not be Nil.
void SetClusterId(const ClusterID &cluster_id) {
RAY_CHECK(!cluster_id.IsNil()) << "Cannot set cluster ID back to Nil!";
auto old_id = cluster_id_.exchange(cluster_id);
if (!old_id.IsNil() && old_id != cluster_id) {
if (!cluster_id_.IsNil() && cluster_id_ != cluster_id) {
RAY_LOG(FATAL) << "Resetting non-nil cluster ID! Setting to " << cluster_id
<< ", but old value is " << old_id;
<< ", but old value is " << cluster_id_;
}
cluster_id_ = cluster_id;
}

protected:
Expand All @@ -148,7 +148,7 @@ class GrpcServer {
/// interfaces (0.0.0.0)
const bool listen_to_localhost_only_;
/// Token representing ID of this cluster.
SafeClusterID cluster_id_;
ClusterID cluster_id_;
/// Indicates whether this server has been closed.
bool is_closed_;
/// The `grpc::Service` objects which should be registered to `ServerBuilder`.
Expand Down

0 comments on commit bff887d

Please sign in to comment.