[core] Add ClusterID to ClientCallManager [2/n] (ray-project#36526)
This change adds a cluster ID to the gRPC client layer (client call, client call manager) and attaches it to the metadata of each ClientCall, provided the ID is non-nil.
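
For context, here is roughly how the receiving side can read that metadata back out of a call. This is a hedged sketch, not code from this PR; the helper name is hypothetical, but grpc::ServerContext::client_metadata() is the standard gRPC API for it:

#include <grpcpp/grpcpp.h>

#include <string>

// Sketch: extract the "ray_cluster_id" entry that ClientCallImpl attaches.
// The key matches kClusterIdKey from src/ray/common/constants.h.
std::string GetClusterIdFromMetadata(const grpc::ServerContext &context) {
  const auto &metadata = context.client_metadata();
  auto it = metadata.find("ray_cluster_id");
  if (it == metadata.end()) {
    return "";  // The client's cluster ID was Nil, so nothing was attached.
  }
  return std::string(it->second.data(), it->second.size());
}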

Previous PR (GRPC server): ray-project#36517
Next PR (GCS server): ray-project#36535

Part 2 of breaking down ray-project#35014 into more digestible parts.

Related issue number
ray-project#34763

Signed-off-by: e428265 <arvind.chandramouli@lmco.com>
vitsai authored and arvind-chandra committed Aug 31, 2023
1 parent 6616443 commit 2af1e4c
Showing 7 changed files with 55 additions and 21 deletions.
2 changes: 2 additions & 0 deletions src/ray/common/constants.h
@@ -39,6 +39,8 @@ constexpr int kRayletStoreErrorExitCode = 100;
/// Prefix for the object table keys in redis.
constexpr char kObjectTablePrefix[] = "ObjectTable";

constexpr char kClusterIdKey[] = "ray_cluster_id";

constexpr char kWorkerDynamicOptionPlaceholder[] =
"RAY_WORKER_DYNAMIC_OPTION_PLACEHOLDER";

21 changes: 21 additions & 0 deletions src/ray/common/id.h
@@ -414,6 +414,27 @@ std::ostream &operator<<(std::ostream &os, const PlacementGroupID &id);
// Restore the compiler alignment to default (8 bytes).
#pragma pack(pop)

struct SafeClusterID {
private:
mutable absl::Mutex m_;
ClusterID id_ GUARDED_BY(m_);

public:
SafeClusterID(const ClusterID &id) : id_(id) {}

const ClusterID load() const {
absl::MutexLock l(&m_);
return id_;
}

ClusterID exchange(const ClusterID &newId) {
absl::MutexLock l(&m_);
ClusterID old = id_;
id_ = newId;
return old;
}
};

template <typename T>
BaseID<T>::BaseID() {
// Using const_cast to directly change data is dangerous. The cached
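A note on the SafeClusterID helper added above: load() returns a copy taken under the mutex, and exchange() installs a new value while handing back the old one, which lets callers detect double-initialization. A minimal usage sketch (assuming ClusterID::FromRandom() and RAY_CHECK as used elsewhere in this diff):

// Sketch: snapshot-and-swap semantics of SafeClusterID.
SafeClusterID safe_id(ClusterID::Nil());

// Readers take a consistent copy under the mutex.
ClusterID snapshot = safe_id.load();
RAY_CHECK(snapshot.IsNil());

// A writer installs the real ID once it is known; the return value is the
// previous ID, so the caller can check it was not already set differently.
ClusterID previous = safe_id.exchange(ClusterID::FromRandom());
RAY_CHECK(previous.IsNil());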
1 change: 1 addition & 0 deletions src/ray/gcs/gcs_server/gcs_server.cc
@@ -61,6 +61,7 @@ GcsServer::GcsServer(const ray::gcs::GcsServerConfig &config,
config.grpc_server_thread_num,
/*keepalive_time_ms=*/RayConfig::instance().grpc_keepalive_time_ms()),
client_call_manager_(main_service,
ClusterID::Nil(),
RayConfig::instance().gcs_server_rpc_client_thread_num()),
raylet_client_pool_(
std::make_shared<rpc::NodeManagerClientPool>(client_call_manager_)),
3 changes: 2 additions & 1 deletion src/ray/object_manager/object_manager.cc
@@ -102,7 +102,8 @@ ObjectManager::ObjectManager(
config_.object_manager_address == "127.0.0.1",
config_.rpc_service_threads_number),
object_manager_service_(rpc_service_, *this),
client_call_manager_(main_service, config_.rpc_service_threads_number),
client_call_manager_(
main_service, ClusterID::Nil(), config_.rpc_service_threads_number),
restore_spilled_object_(restore_spilled_object),
get_spilled_object_url_(get_spilled_object_url),
pull_retry_timer_(*main_service_,
25 changes: 23 additions & 2 deletions src/ray/rpc/client_call.h
@@ -16,12 +16,14 @@

#include <grpcpp/grpcpp.h>

#include <atomic>
#include <boost/asio.hpp>
#include <chrono>

#include "absl/synchronization/mutex.h"
#include "ray/common/asio/instrumented_io_context.h"
#include "ray/common/grpc_util.h"
#include "ray/common/id.h"
#include "ray/common/status.h"
#include "ray/util/util.h"

@@ -67,6 +69,7 @@ class ClientCallImpl : public ClientCall {
///
/// \param[in] callback The callback function to handle the reply.
explicit ClientCallImpl(const ClientCallback<Reply> &callback,
const ClusterID &cluster_id,
std::shared_ptr<StatsHandle> stats_handle,
int64_t timeout_ms = -1)
: callback_(std::move(const_cast<ClientCallback<Reply> &>(callback))),
@@ -76,6 +79,9 @@
std::chrono::system_clock::now() + std::chrono::milliseconds(timeout_ms);
context_.set_deadline(deadline);
}
if (!cluster_id.IsNil()) {
context_.AddMetadata(kClusterIdKey, cluster_id.Hex());
}
}

Status GetStatus() override {
@@ -185,9 +191,11 @@ class ClientCallManager {
/// \param[in] main_service The main event loop, to which the callback functions will be
/// posted.
explicit ClientCallManager(instrumented_io_context &main_service,
const ClusterID &cluster_id = ClusterID::Nil(),
int num_threads = 1,
int64_t call_timeout_ms = -1)
: main_service_(main_service),
: cluster_id_(cluster_id),
main_service_(main_service),
num_threads_(num_threads),
shutdown_(false),
call_timeout_ms_(call_timeout_ms) {
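
Since the new cluster_id parameter defaults to ClusterID::Nil(), existing call sites keep compiling unchanged; only sites that know the ID need to pass it. A sketch of both forms (io_service is a stand-in name):

// Sketch: constructing a ClientCallManager before and after this change.
instrumented_io_context io_service;

// Existing call sites still compile: cluster_id defaults to ClusterID::Nil().
ClientCallManager default_manager(io_service);

// Updated call sites (e.g., gcs_server.cc and object_manager.cc in this diff)
// pass the cluster ID explicitly as the second argument.
ClientCallManager manager(io_service,
                          ClusterID::Nil(),
                          /*num_threads=*/1,
                          /*call_timeout_ms=*/-1);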
@@ -239,8 +247,9 @@
if (method_timeout_ms == -1) {
method_timeout_ms = call_timeout_ms_;
}

auto call = std::make_shared<ClientCallImpl<Reply>>(
callback, std::move(stats_handle), method_timeout_ms);
callback, cluster_id_.load(), std::move(stats_handle), method_timeout_ms);
// Send request.
// Find the next completion queue to wait for response.
call->response_reader_ = (stub.*prepare_async_function)(
@@ -258,6 +267,14 @@
return call;
}

void SetClusterId(const ClusterID &cluster_id) {
auto old_id = cluster_id_.exchange(cluster_id);
if (!old_id.IsNil() && (old_id != cluster_id)) {
RAY_LOG(FATAL) << "Expected cluster ID to be Nil or " << cluster_id << ", but got "
<< old_id;
}
}

/// Get the main service of this rpc.
instrumented_io_context &GetMainService() { return main_service_; }

@@ -309,6 +326,10 @@
}
}

/// UUID of the cluster. Mutex-guarded because of a potential race between
/// creating a ClientCall object and setting the cluster ID.
SafeClusterID cluster_id_;

/// The main event loop, to which the callback functions will be posted.
instrumented_io_context &main_service_;

19 changes: 2 additions & 17 deletions src/ray/rpc/grpc_server.h
@@ -87,7 +87,7 @@ class GrpcServer {
: name_(std::move(name)),
port_(port),
listen_to_localhost_only_(listen_to_localhost_only),
cluster_id_{absl::Mutex{}, ClusterID::Nil()},
cluster_id_(ClusterID::Nil()),
is_closed_(true),
num_threads_(num_threads),
keepalive_time_ms_(keepalive_time_ms) {
@@ -148,22 +148,7 @@
/// interfaces (0.0.0.0)
const bool listen_to_localhost_only_;
/// Token representing ID of this cluster.
struct SafeClusterID {
absl::Mutex m_;
ClusterID id GUARDED_BY(m_);

const ClusterID load() {
absl::MutexLock l(&m_);
return id;
}

ClusterID exchange(const ClusterID &newId) {
absl::MutexLock l(&m_);
ClusterID old = id;
id = newId;
return old;
}
} cluster_id_;
SafeClusterID cluster_id_;
/// Indicates whether this server has been closed.
bool is_closed_;
/// The `grpc::Service` objects which should be registered to `ServerBuilder`.
5 changes: 4 additions & 1 deletion src/ray/rpc/test/grpc_server_client_test.cc
@@ -211,6 +211,7 @@ TEST_F(TestGrpcServerClientFixture, TestClientCallManagerTimeout) {
grpc_client_.reset();
client_call_manager_.reset();
client_call_manager_.reset(new ClientCallManager(client_io_service_,
ClusterID::Nil(),
/*num_thread=*/1,
/*call_timeout_ms=*/100));
grpc_client_.reset(new GrpcClient<TestService>(
@@ -244,6 +245,7 @@ TEST_F(TestGrpcServerClientFixture, TestClientDiedBeforeReply) {
grpc_client_.reset();
client_call_manager_.reset();
client_call_manager_.reset(new ClientCallManager(client_io_service_,
ClusterID::Nil(),
/*num_thread=*/1,
/*call_timeout_ms=*/100));
grpc_client_.reset(new GrpcClient<TestService>(
@@ -273,7 +275,8 @@
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
// Reinit client with infinite timeout.
client_call_manager_.reset(new ClientCallManager(client_io_service_));
client_call_manager_.reset(
new ClientCallManager(client_io_service_, ClusterID::FromRandom()));
grpc_client_.reset(new GrpcClient<TestService>(
"127.0.0.1", grpc_server_->GetPort(), *client_call_manager_));
// Send again, this request should be replied. If any leaking happened, this call won't
