[core] C++ update on gcs health check (#48843)
As titled.


Signed-off-by: hjiang <hjiang@anyscale.com>
dentiny authored Nov 22, 2024
1 parent 335bd66 commit 8d35885
Showing 4 changed files with 55 additions and 29 deletions.
50 changes: 32 additions & 18 deletions src/ray/gcs/gcs_server/gcs_health_check_manager.cc
@@ -14,15 +14,17 @@

#include "ray/gcs/gcs_server/gcs_health_check_manager.h"

#include <string_view>

#include "ray/stats/metric.h"

DEFINE_stats(health_check_rpc_latency_ms,
"Latency of rpc request for health check.",
(),
({1, 10, 100, 1000, 10000}, ),
ray::stats::HISTOGRAM);

namespace ray {
namespace gcs {
namespace ray::gcs {

GcsHealthCheckManager::GcsHealthCheckManager(
instrumented_io_context &io_service,
@@ -38,17 +40,18 @@ GcsHealthCheckManager::GcsHealthCheckManager(
period_ms_(period_ms),
failure_threshold_(failure_threshold) {
RAY_CHECK(on_node_death_callback != nullptr);
RAY_CHECK(initial_delay_ms >= 0);
RAY_CHECK(timeout_ms >= 0);
RAY_CHECK(period_ms >= 0);
RAY_CHECK(failure_threshold >= 0);
RAY_CHECK_GE(initial_delay_ms, 0);
RAY_CHECK_GE(timeout_ms, 0);
RAY_CHECK_GE(period_ms, 0);
RAY_CHECK_GE(failure_threshold, 0);
}

GcsHealthCheckManager::~GcsHealthCheckManager() {}
GcsHealthCheckManager::~GcsHealthCheckManager() = default;

void GcsHealthCheckManager::RemoveNode(const NodeID &node_id) {
io_service_.dispatch(
[this, node_id]() {
thread_checker_.IsOnSameThread();
auto iter = health_check_contexts_.find(node_id);
if (iter == health_check_contexts_.end()) {
return;
@@ -61,6 +64,7 @@ void GcsHealthCheckManager::RemoveNode(const NodeID &node_id) {

void GcsHealthCheckManager::FailNode(const NodeID &node_id) {
RAY_LOG(WARNING).WithField(node_id) << "Node is dead because the health check failed.";
thread_checker_.IsOnSameThread();
auto iter = health_check_contexts_.find(node_id);
if (iter != health_check_contexts_.end()) {
on_node_death_callback_(node_id);
@@ -69,7 +73,9 @@ void GcsHealthCheckManager::FailNode(const NodeID &node_id) {
}

std::vector<NodeID> GcsHealthCheckManager::GetAllNodes() const {
thread_checker_.IsOnSameThread();
std::vector<NodeID> nodes;
nodes.reserve(health_check_contexts_.size());
for (const auto &[node_id, _] : health_check_contexts_) {
nodes.emplace_back(node_id);
}
@@ -84,24 +90,29 @@ void GcsHealthCheckManager::HealthCheckContext::StartHealthCheck() {
new (&context_) grpc::ClientContext();
response_.Clear();

auto deadline =
std::chrono::system_clock::now() + std::chrono::milliseconds(manager_->timeout_ms_);
const auto now = std::chrono::system_clock::now();
auto deadline = now + std::chrono::milliseconds(manager_->timeout_ms_);
context_.set_deadline(deadline);
stub_->async()->Check(
&context_, &request_, &response_, [this, now = absl::Now()](::grpc::Status status) {
&context_,
&request_,
&response_,
[this, start = absl::FromChrono(now)](::grpc::Status status) {
// This callback is done in gRPC's thread pool.
STATS_health_check_rpc_latency_ms.Record(
absl::ToInt64Milliseconds(absl::Now() - now));
absl::ToInt64Milliseconds(absl::Now() - start));
manager_->io_service_.post(
[this, status]() {
if (stopped_) {
delete this;
return;
}
RAY_LOG(DEBUG) << "Health check status: " << int(response_.status());
RAY_LOG(DEBUG) << "Health check status: "
<< HealthCheckResponse_ServingStatus_Name(
response_.status());

if (status.ok() && response_.status() == HealthCheckResponse::SERVING) {
// Health check passed
// Health check passed.
health_check_remaining_ = manager_->failure_threshold_;
} else {
--health_check_remaining_;
@@ -118,6 +129,9 @@ void GcsHealthCheckManager::HealthCheckContext::StartHealthCheck() {
delete this;
} else {
// Do another health check.
//
// TODO(hjiang): We could skip some health checks by leveraging the existing
// resource usage communication between GCS and the raylet.
timer_.expires_from_now(
boost::posix_time::milliseconds(manager_->period_ms_));
timer_.async_wait([this](auto) { StartHealthCheck(); });
Expand All @@ -132,13 +146,13 @@ void GcsHealthCheckManager::HealthCheckContext::Stop() { stopped_ = true; }
void GcsHealthCheckManager::AddNode(const NodeID &node_id,
std::shared_ptr<grpc::Channel> channel) {
io_service_.dispatch(
[this, channel, node_id]() {
RAY_CHECK(health_check_contexts_.count(node_id) == 0);
[this, channel = std::move(channel), node_id]() {
thread_checker_.IsOnSameThread();
auto context = new HealthCheckContext(this, channel, node_id);
health_check_contexts_.emplace(std::make_pair(node_id, context));
auto [_, is_new] = health_check_contexts_.emplace(node_id, context);
RAY_CHECK(is_new);
},
"GcsHealthCheckManager::AddNode");
}

} // namespace gcs
} // namespace ray
} // namespace ray::gcs
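
For illustration, the control flow in StartHealthCheck() above — send an asynchronous probe, post the result back onto the io_context thread, refill or decrement a failure budget, and re-arm a timer for the next round — can be modeled without Ray's gRPC plumbing. The standalone sketch below is only that simplified model: CheckLoop, RunCheckAsync, and all constants are invented names, and deadline/timeout handling is omitted.

// Standalone sketch (not Ray's gRPC code) of the periodic health-check loop:
// an asynchronous probe completes on another thread, its result is posted
// back onto the io_context, and the failure threshold decides when to give up.
#include <boost/asio.hpp>

#include <chrono>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <thread>

namespace {

// Stand-in for stub_->async()->Check(...): completes on a different thread,
// like gRPC's callback thread pool.
void RunCheckAsync(std::function<void(bool)> on_done) {
  std::thread([cb = std::move(on_done)]() {
    std::this_thread::sleep_for(std::chrono::milliseconds(20));
    cb(/*healthy=*/std::rand() % 4 != 0);  // fail roughly a quarter of the time
  }).detach();
}

class CheckLoop {
 public:
  CheckLoop(boost::asio::io_context &io, int failure_threshold, int period_ms,
            int max_checks)
      : io_(io),
        timer_(io),
        work_guard_(boost::asio::make_work_guard(io)),
        remaining_(failure_threshold),
        failure_threshold_(failure_threshold),
        period_ms_(period_ms),
        checks_left_(max_checks) {}

  void Start() {
    RunCheckAsync([this](bool healthy) {
      // As in the real code, hop back onto the io_context thread before
      // touching any bookkeeping state.
      boost::asio::post(io_, [this, healthy]() { OnResult(healthy); });
    });
  }

 private:
  void OnResult(bool healthy) {
    if (healthy) {
      remaining_ = failure_threshold_;  // success refills the failure budget
    } else if (--remaining_ == 0) {
      std::cout << "node declared dead\n";  // FailNode() in the real code
      return Finish();
    }
    if (--checks_left_ == 0) {
      return Finish();
    }
    // Do another health check after period_ms_, mirroring timer_.async_wait.
    timer_.expires_after(std::chrono::milliseconds(period_ms_));
    timer_.async_wait([this](boost::system::error_code) { Start(); });
  }

  void Finish() { work_guard_.reset(); }  // lets io_context::run() return

  boost::asio::io_context &io_;
  boost::asio::steady_timer timer_;
  boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work_guard_;
  int remaining_;
  const int failure_threshold_;
  const int period_ms_;
  int checks_left_;
};

}  // namespace

int main() {
  std::srand(42);
  boost::asio::io_context io;
  CheckLoop loop(io, /*failure_threshold=*/3, /*period_ms=*/100, /*max_checks=*/20);
  loop.Start();
  io.run();  // runs until the node is declared dead or max_checks is reached
}
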
28 changes: 20 additions & 8 deletions src/ray/gcs/gcs_server/gcs_health_check_manager.h
@@ -16,16 +16,19 @@

#include <grpcpp/grpcpp.h>

#include <cstdint>
#include <functional>
#include <memory>
#include <vector>

#include "absl/container/flat_hash_map.h"
#include "ray/common/asio/instrumented_io_context.h"
#include "ray/common/id.h"
#include "ray/common/ray_config.h"
#include "ray/util/thread_checker.h"
#include "src/proto/grpc/health/v1/health.grpc.pb.h"

class GcsHealthCheckManagerTest;

namespace ray {
namespace gcs {
namespace ray::gcs {

/// GcsHealthCheckManager is used to track the healthiness of the nodes in the ray
/// cluster. The health check is done in a pull-based way, which means this module will send
@@ -35,6 +38,9 @@ namespace gcs {
/// node will be removed from GcsHealthCheckManager. The node can be added into this class
/// later. Although the same node id is not supposed to be reused in ray cluster, this is
/// not enforced in this class.
///
/// All IO operations happen on the same thread, which is managed by the passed-in
/// [io_service].
/// TODO (iycheng): Move the GcsHealthCheckManager to ray/common.
class GcsHealthCheckManager {
public:
@@ -58,24 +64,27 @@ class GcsHealthCheckManager {
~GcsHealthCheckManager();

/// Start to track the healthiness of a node.
/// Safe to call from non-io-context threads.
///
/// \param node_id The id of the node.
/// \param channel The gRPC channel to the node.
void AddNode(const NodeID &node_id, std::shared_ptr<grpc::Channel> channel);

/// Stop tracking the healthiness of a node.
/// Safe to call from non-io-context threads.
///
/// \param node_id The id of the node to stop tracking.
void RemoveNode(const NodeID &node_id);

/// Return all the nodes monitored.
/// Return all the nodes monitored and alive.
/// Note: must be invoked from the io-context thread.
///
/// \return A list of node id which are being monitored by this class.
std::vector<NodeID> GetAllNodes() const;

private:
/// Fail a node when health check failed. It'll stop the health checking and
/// call on_node_death_callback.
/// call `on_node_death_callback_`.
///
/// \param node_id The id of the node.
void FailNode(const NodeID &node_id);
@@ -133,8 +142,12 @@ class GcsHealthCheckManager {
std::function<void(const NodeID &)> on_node_death_callback_;

/// The context of the health check for each nodes.
/// Only live nodes are bookkept; failed ones are removed.
absl::flat_hash_map<NodeID, HealthCheckContext *> health_check_contexts_;

/// Checker to make sure there's no concurrent access during node addition and removal.
const ThreadChecker thread_checker_;

/// The delay for the first health check request.
const int64_t initial_delay_ms_;
/// Timeout for each health check request.
Expand All @@ -145,5 +158,4 @@ class GcsHealthCheckManager {
const int64_t failure_threshold_;
};

} // namespace gcs
} // namespace ray
} // namespace ray::gcs
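
The threading contract documented in this header — AddNode() and RemoveNode() may be called from any thread because they hand the real work to the io_context, while the bookkeeping map and GetAllNodes() stay on that single thread — is an instance of the single-owner-thread pattern. Below is a standalone sketch of that pattern only, not Ray's API: NodeRegistry and its members are invented names, the sketch uses boost::asio::post where the real code uses dispatch (which may run inline when already on the io thread), and the real class also asserts the invariant with the ThreadChecker shown in the next two files.

// Standalone sketch of the single-owner-thread pattern: public methods are
// callable from any thread, but every touch of nodes_ is posted onto one
// io_context, so the map needs no mutex.
#include <boost/asio.hpp>

#include <iostream>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>

class NodeRegistry {
 public:
  explicit NodeRegistry(boost::asio::io_context &io) : io_(io) {}

  // Safe to call from any thread: the mutation runs on the io thread.
  void AddNode(std::string node_id, int port) {
    boost::asio::post(io_, [this, node_id = std::move(node_id), port]() {
      nodes_.emplace(node_id, port);
    });
  }

  void RemoveNode(std::string node_id) {
    boost::asio::post(io_, [this, node_id = std::move(node_id)]() {
      nodes_.erase(node_id);
    });
  }

  // Like GetAllNodes(): only valid when called on the io thread.
  void DumpNodes() const {
    for (const auto &[id, port] : nodes_) {
      std::cout << id << " -> " << port << "\n";
    }
  }

 private:
  boost::asio::io_context &io_;
  std::unordered_map<std::string, int> nodes_;  // touched only on the io thread
};

int main() {
  boost::asio::io_context io;
  auto guard = boost::asio::make_work_guard(io);
  std::thread io_thread([&io]() { io.run(); });

  NodeRegistry registry(io);
  // Calls from the main thread are fine; they are serialized onto io_thread.
  registry.AddNode("node-a", 7001);
  registry.AddNode("node-b", 7002);
  registry.RemoveNode("node-a");

  // Reads also go through the io thread, matching the contract above.
  boost::asio::post(io, [&registry]() { registry.DumpNodes(); });

  guard.reset();  // allow io.run() to return once the queue drains
  io_thread.join();
}
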
2 changes: 1 addition & 1 deletion src/ray/util/thread_checker.cc
@@ -16,7 +16,7 @@

namespace ray {

bool ThreadChecker::IsOnSameThread() {
bool ThreadChecker::IsOnSameThread() const {
const auto cur_id = std::this_thread::get_id();
std::thread::id uninitialized_id;
return thread_id_.compare_exchange_strong(uninitialized_id, cur_id) ||
4 changes: 2 additions & 2 deletions src/ray/util/thread_checker.h
@@ -34,10 +34,10 @@ class ThreadChecker {
public:
// Return true at initialization, or current invocation happens on the same thread as
// initialization.
bool IsOnSameThread();
bool IsOnSameThread() const;

private:
std::atomic<std::thread::id> thread_id_{};
mutable std::atomic<std::thread::id> thread_id_{};
};

} // namespace ray
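
For context on the const/mutable change, here is a minimal standalone re-creation of the lazy-binding check that IsOnSameThread() performs: the first caller atomically swaps its id into the default-constructed (unbound) std::thread::id, and every later call must come from that same thread. The class name LazyThreadChecker and the main() driver are invented for the example; the check itself mirrors the implementation in the diff above.

// Minimal re-creation of the ThreadChecker idea shown above, not Ray's header.
#include <atomic>
#include <cassert>
#include <thread>

class LazyThreadChecker {
 public:
  bool IsOnSameThread() const {
    const auto cur_id = std::this_thread::get_id();
    std::thread::id uninitialized_id;  // default id == "no thread bound yet"
    // Either we are the first caller and claim the slot, or we must match the
    // thread that claimed it earlier (compare_exchange_strong writes the
    // stored id into uninitialized_id when it fails).
    return thread_id_.compare_exchange_strong(uninitialized_id, cur_id) ||
           uninitialized_id == cur_id;
  }

 private:
  // mutable lets const methods of the owning class still run the check.
  mutable std::atomic<std::thread::id> thread_id_{};
};

int main() {
  LazyThreadChecker checker;
  assert(checker.IsOnSameThread());  // first call binds to the main thread
  assert(checker.IsOnSameThread());  // same thread again: still true

  bool other_thread_result = true;
  std::thread t([&]() { other_thread_result = checker.IsOnSameThread(); });
  t.join();
  assert(!other_thread_result);  // a different thread fails the check
  return 0;
}
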
