From 2af1e4c4603a65279b888d7911dad9f4ac169bb2 Mon Sep 17 00:00:00 2001 From: Victoria Tsai Date: Thu, 22 Jun 2023 13:03:07 -0700 Subject: [PATCH] [core] Add ClusterID to ClientCallManager [2/n] (#36526) This change adds a Cluster ID to GRPC client (client call, client call manager), and attaches it to the metadata of each ClientCall provided it is non-nil. Previous PR (GRPC server): #36517 Next PR (GCS server): #36535 Part 2 of breaking down #35014 into more digestible parts. Related issue number #34763 Signed-off-by: e428265 --- src/ray/common/constants.h | 2 ++ src/ray/common/id.h | 21 +++++++++++++++++ src/ray/gcs/gcs_server/gcs_server.cc | 1 + src/ray/object_manager/object_manager.cc | 3 ++- src/ray/rpc/client_call.h | 25 +++++++++++++++++++-- src/ray/rpc/grpc_server.h | 19 ++-------------- src/ray/rpc/test/grpc_server_client_test.cc | 5 ++++- 7 files changed, 55 insertions(+), 21 deletions(-) diff --git a/src/ray/common/constants.h b/src/ray/common/constants.h index 3b4c8e9282bd..4667d8841d82 100644 --- a/src/ray/common/constants.h +++ b/src/ray/common/constants.h @@ -39,6 +39,8 @@ constexpr int kRayletStoreErrorExitCode = 100; /// Prefix for the object table keys in redis. constexpr char kObjectTablePrefix[] = "ObjectTable"; +constexpr char kClusterIdKey[] = "ray_cluster_id"; + constexpr char kWorkerDynamicOptionPlaceholder[] = "RAY_WORKER_DYNAMIC_OPTION_PLACEHOLDER"; diff --git a/src/ray/common/id.h b/src/ray/common/id.h index a6c753a1de35..7c5f430830ab 100644 --- a/src/ray/common/id.h +++ b/src/ray/common/id.h @@ -414,6 +414,27 @@ std::ostream &operator<<(std::ostream &os, const PlacementGroupID &id); // Restore the compiler alignment to default (8 bytes). #pragma pack(pop) +struct SafeClusterID { + private: + mutable absl::Mutex m_; + ClusterID id_ GUARDED_BY(m_); + + public: + SafeClusterID(const ClusterID &id) : id_(id) {} + + const ClusterID load() const { + absl::MutexLock l(&m_); + return id_; + } + + ClusterID exchange(const ClusterID &newId) { + absl::MutexLock l(&m_); + ClusterID old = id_; + id_ = newId; + return old; + } +}; + template BaseID::BaseID() { // Using const_cast to directly change data is dangerous. The cached diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index c731eb343654..0b4710ff04e1 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -61,6 +61,7 @@ GcsServer::GcsServer(const ray::gcs::GcsServerConfig &config, config.grpc_server_thread_num, /*keepalive_time_ms=*/RayConfig::instance().grpc_keepalive_time_ms()), client_call_manager_(main_service, + ClusterID::Nil(), RayConfig::instance().gcs_server_rpc_client_thread_num()), raylet_client_pool_( std::make_shared(client_call_manager_)), diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index 18d3e76eb297..cf054aecfe42 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -102,7 +102,8 @@ ObjectManager::ObjectManager( config_.object_manager_address == "127.0.0.1", config_.rpc_service_threads_number), object_manager_service_(rpc_service_, *this), - client_call_manager_(main_service, config_.rpc_service_threads_number), + client_call_manager_( + main_service, ClusterID::Nil(), config_.rpc_service_threads_number), restore_spilled_object_(restore_spilled_object), get_spilled_object_url_(get_spilled_object_url), pull_retry_timer_(*main_service_, diff --git a/src/ray/rpc/client_call.h b/src/ray/rpc/client_call.h index d39d4373b83a..98c1e519d3c5 100644 --- a/src/ray/rpc/client_call.h +++ b/src/ray/rpc/client_call.h @@ -16,12 +16,14 @@ #include +#include #include #include #include "absl/synchronization/mutex.h" #include "ray/common/asio/instrumented_io_context.h" #include "ray/common/grpc_util.h" +#include "ray/common/id.h" #include "ray/common/status.h" #include "ray/util/util.h" @@ -67,6 +69,7 @@ class ClientCallImpl : public ClientCall { /// /// \param[in] callback The callback function to handle the reply. explicit ClientCallImpl(const ClientCallback &callback, + const ClusterID &cluster_id, std::shared_ptr stats_handle, int64_t timeout_ms = -1) : callback_(std::move(const_cast &>(callback))), @@ -76,6 +79,9 @@ class ClientCallImpl : public ClientCall { std::chrono::system_clock::now() + std::chrono::milliseconds(timeout_ms); context_.set_deadline(deadline); } + if (!cluster_id.IsNil()) { + context_.AddMetadata(kClusterIdKey, cluster_id.Hex()); + } } Status GetStatus() override { @@ -185,9 +191,11 @@ class ClientCallManager { /// \param[in] main_service The main event loop, to which the callback functions will be /// posted. explicit ClientCallManager(instrumented_io_context &main_service, + const ClusterID &cluster_id = ClusterID::Nil(), int num_threads = 1, int64_t call_timeout_ms = -1) - : main_service_(main_service), + : cluster_id_(ClusterID::Nil()), + main_service_(main_service), num_threads_(num_threads), shutdown_(false), call_timeout_ms_(call_timeout_ms) { @@ -239,8 +247,9 @@ class ClientCallManager { if (method_timeout_ms == -1) { method_timeout_ms = call_timeout_ms_; } + auto call = std::make_shared>( - callback, std::move(stats_handle), method_timeout_ms); + callback, cluster_id_.load(), std::move(stats_handle), method_timeout_ms); // Send request. // Find the next completion queue to wait for response. call->response_reader_ = (stub.*prepare_async_function)( @@ -258,6 +267,14 @@ class ClientCallManager { return call; } + void SetClusterId(const ClusterID &cluster_id) { + auto old_id = cluster_id_.exchange(ClusterID::Nil()); + if (!old_id.IsNil() && (old_id != cluster_id)) { + RAY_LOG(FATAL) << "Expected cluster ID to be Nil or " << cluster_id << ", but got" + << old_id; + } + } + /// Get the main service of this rpc. instrumented_io_context &GetMainService() { return main_service_; } @@ -309,6 +326,10 @@ class ClientCallManager { } } + /// UUID of the cluster. Potential race between creating a ClientCall object + /// and setting the cluster ID. + SafeClusterID cluster_id_; + /// The main event loop, to which the callback functions will be posted. instrumented_io_context &main_service_; diff --git a/src/ray/rpc/grpc_server.h b/src/ray/rpc/grpc_server.h index 1044f8ab777c..7e7cfa7dbdba 100644 --- a/src/ray/rpc/grpc_server.h +++ b/src/ray/rpc/grpc_server.h @@ -87,7 +87,7 @@ class GrpcServer { : name_(std::move(name)), port_(port), listen_to_localhost_only_(listen_to_localhost_only), - cluster_id_{absl::Mutex{}, ClusterID::Nil()}, + cluster_id_(ClusterID::Nil()), is_closed_(true), num_threads_(num_threads), keepalive_time_ms_(keepalive_time_ms) { @@ -148,22 +148,7 @@ class GrpcServer { /// interfaces (0.0.0.0) const bool listen_to_localhost_only_; /// Token representing ID of this cluster. - struct SafeClusterID { - absl::Mutex m_; - ClusterID id GUARDED_BY(m_); - - const ClusterID load() { - absl::MutexLock l(&m_); - return id; - } - - ClusterID exchange(const ClusterID &newId) { - absl::MutexLock l(&m_); - ClusterID old = id; - id = newId; - return old; - } - } cluster_id_; + SafeClusterID cluster_id_; /// Indicates whether this server has been closed. bool is_closed_; /// The `grpc::Service` objects which should be registered to `ServerBuilder`. diff --git a/src/ray/rpc/test/grpc_server_client_test.cc b/src/ray/rpc/test/grpc_server_client_test.cc index e1e8e9d6c0f5..5670725437da 100644 --- a/src/ray/rpc/test/grpc_server_client_test.cc +++ b/src/ray/rpc/test/grpc_server_client_test.cc @@ -211,6 +211,7 @@ TEST_F(TestGrpcServerClientFixture, TestClientCallManagerTimeout) { grpc_client_.reset(); client_call_manager_.reset(); client_call_manager_.reset(new ClientCallManager(client_io_service_, + ClusterID::Nil(), /*num_thread=*/1, /*call_timeout_ms=*/100)); grpc_client_.reset(new GrpcClient( @@ -244,6 +245,7 @@ TEST_F(TestGrpcServerClientFixture, TestClientDiedBeforeReply) { grpc_client_.reset(); client_call_manager_.reset(); client_call_manager_.reset(new ClientCallManager(client_io_service_, + ClusterID::Nil(), /*num_thread=*/1, /*call_timeout_ms=*/100)); grpc_client_.reset(new GrpcClient( @@ -273,7 +275,8 @@ TEST_F(TestGrpcServerClientFixture, TestClientDiedBeforeReply) { std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } // Reinit client with infinite timeout. - client_call_manager_.reset(new ClientCallManager(client_io_service_)); + client_call_manager_.reset( + new ClientCallManager(client_io_service_, ClusterID::FromRandom())); grpc_client_.reset(new GrpcClient( "127.0.0.1", grpc_server_->GetPort(), *client_call_manager_)); // Send again, this request should be replied. If any leaking happened, this call won't