Skip to content

Commit

Permalink
[core] [2/N] Skip GCS health check if possible (#49230)
Browse files Browse the repository at this point in the history
Signed-off-by: hjiang <dentinyhao@gmail.com>
  • Loading branch information
dentiny authored Dec 20, 2024
1 parent 3396c12 commit 47ae84e
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 10 deletions.
42 changes: 37 additions & 5 deletions src/ray/gcs/gcs_server/gcs_health_check_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ GcsHealthCheckManager::~GcsHealthCheckManager() = default;
void GcsHealthCheckManager::RemoveNode(const NodeID &node_id) {
io_service_.dispatch(
[this, node_id]() {
thread_checker_.IsOnSameThread();
RAY_CHECK(thread_checker_.IsOnSameThread());
auto iter = health_check_contexts_.find(node_id);
if (iter == health_check_contexts_.end()) {
return;
Expand All @@ -64,7 +64,7 @@ void GcsHealthCheckManager::RemoveNode(const NodeID &node_id) {

void GcsHealthCheckManager::FailNode(const NodeID &node_id) {
RAY_LOG(WARNING).WithField(node_id) << "Node is dead because the health check failed.";
thread_checker_.IsOnSameThread();
RAY_CHECK(thread_checker_.IsOnSameThread());
auto iter = health_check_contexts_.find(node_id);
if (iter != health_check_contexts_.end()) {
on_node_death_callback_(node_id);
Expand All @@ -73,7 +73,7 @@ void GcsHealthCheckManager::FailNode(const NodeID &node_id) {
}

std::vector<NodeID> GcsHealthCheckManager::GetAllNodes() const {
thread_checker_.IsOnSameThread();
RAY_CHECK(thread_checker_.IsOnSameThread());
std::vector<NodeID> nodes;
nodes.reserve(health_check_contexts_.size());
for (const auto &[node_id, _] : health_check_contexts_) {
Expand All @@ -82,21 +82,53 @@ std::vector<NodeID> GcsHealthCheckManager::GetAllNodes() const {
return nodes;
}

void GcsHealthCheckManager::MarkNodeHealthy(const NodeID &node_id) {
  // The actual update is dispatched onto `io_service_`, so the context map is only
  // touched from the thread verified by `thread_checker_`.
  io_service_.dispatch(
      [this, node_id]() {
        RAY_CHECK(thread_checker_.IsOnSameThread());

        const auto entry = health_check_contexts_.find(node_id);

        // The node may be unknown here: there is a small chance that other components
        // (e.g. the ray syncer) are initialized before the health check manager. In
        // that case there is nothing to refresh.
        if (entry != health_check_contexts_.end()) {
          // Record "now" as the latest moment this node was known to be healthy.
          entry->second->SetLatestHealthTimestamp(absl::Now());
        }
      },
      "GcsHealthCheckManager::MarkNodeHealthy");
}

void GcsHealthCheckManager::HealthCheckContext::StartHealthCheck() {
using ::grpc::health::v1::HealthCheckResponse;

RAY_CHECK(manager_->thread_checker_.IsOnSameThread());

// If current context is requested to stop, directly destruct itself and exit.
if (stopped_) {
delete this;
return;
}

// Check latest health status, see whether a new rpc message is needed.
const auto now = absl::Now();
absl::Time next_check_time =
lastest_known_healthy_timestamp_ + absl::Milliseconds(manager_->period_ms_);
if (now <= next_check_time) {
// Update message is fresh enough, skip current check and schedule later.
int64_t next_schedule_millisec = (next_check_time - now) / absl::Milliseconds(1);
timer_.expires_from_now(boost::posix_time::milliseconds(next_schedule_millisec));
timer_.async_wait([this](auto) { StartHealthCheck(); });
return;
}

// Reset the context/request/response for the next request.
context_.~ClientContext();
new (&context_) grpc::ClientContext();
response_.Clear();

const auto now = absl::Now();
const auto deadline = now + absl::Milliseconds(manager_->timeout_ms_);
context_.set_deadline(absl::ToChronoTime(deadline));
stub_->async()->Check(
Expand Down Expand Up @@ -150,7 +182,7 @@ void GcsHealthCheckManager::AddNode(const NodeID &node_id,
std::shared_ptr<grpc::Channel> channel) {
io_service_.dispatch(
[this, channel = std::move(channel), node_id]() {
thread_checker_.IsOnSameThread();
RAY_CHECK(thread_checker_.IsOnSameThread());
auto context = new HealthCheckContext(this, channel, node_id);
auto [_, is_new] = health_check_contexts_.emplace(node_id, context);
RAY_CHECK(is_new);
Expand Down
13 changes: 13 additions & 0 deletions src/ray/gcs/gcs_server/gcs_health_check_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ class GcsHealthCheckManager {
/// \return A list of node id which are being monitored by this class.
std::vector<NodeID> GetAllNodes() const;

/// Mark the given node as healthy, so the health check manager can skip some health
/// check RPCs. Safe to call from non-io-context threads.
///
/// \param node_id The id of the node.
void MarkNodeHealthy(const NodeID &node_id);

private:
/// Fail a node when health check failed. It'll stop the health checking and
/// call `on_node_death_callback_`.
Expand Down Expand Up @@ -111,13 +117,20 @@ class GcsHealthCheckManager {

void Stop();

/// Record `ts` as the most recent time at which this node was known to be healthy.
/// `StartHealthCheck()` compares this timestamp against the check period to decide
/// whether a health check can be skipped and rescheduled.
///
/// \param ts The time at which the node was observed to be healthy.
void SetLatestHealthTimestamp(absl::Time ts) {
lastest_known_healthy_timestamp_ = ts;
}

private:
void StartHealthCheck();

GcsHealthCheckManager *manager_;

NodeID node_id_;

// Timestamp of the most recent moment at which the node was known to be healthy.
absl::Time lastest_known_healthy_timestamp_ = absl::InfinitePast();

// Whether the health check has stopped.
bool stopped_ = false;

Expand Down
12 changes: 8 additions & 4 deletions src/ray/gcs/gcs_server/gcs_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -173,12 +173,12 @@ void GcsServer::DoStart(const GcsInitData &gcs_init_data) {
// Init gcs resource manager.
InitGcsResourceManager(gcs_init_data);

// Init synchronization service
InitRaySyncer(gcs_init_data);

// Init gcs health check manager.
InitGcsHealthCheckManager(gcs_init_data);

// Init synchronization service
InitRaySyncer(gcs_init_data);

// Init KV service.
InitKVService();

Expand Down Expand Up @@ -522,7 +522,11 @@ GcsServer::StorageType GcsServer::GetStorageType() const {

void GcsServer::InitRaySyncer(const GcsInitData &gcs_init_data) {
ray_syncer_ = std::make_unique<syncer::RaySyncer>(
io_context_provider_.GetIOContext<syncer::RaySyncer>(), kGCSNodeID.Binary());
io_context_provider_.GetIOContext<syncer::RaySyncer>(),
kGCSNodeID.Binary(),
[this](const NodeID &node_id) {
gcs_healthcheck_manager_->MarkNodeHealthy(node_id);
});
ray_syncer_->Register(
syncer::MessageType::RESOURCE_VIEW, nullptr, gcs_resource_manager_.get());
ray_syncer_->Register(
Expand Down
13 changes: 13 additions & 0 deletions src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,19 @@ TEST_F(GcsHealthCheckManagerTest, TestBasic) {
ASSERT_TRUE(dead_nodes.count(node_id));
}

// Checks that a node which was just marked healthy is not reported dead on the next
// run, even after its server has been stopped.
TEST_F(GcsHealthCheckManagerTest, MarkHealthAndSkipCheck) {
auto node_id = AddServer();
Run(0); // Initial run
ASSERT_TRUE(dead_nodes.empty());

// Run the first health check: even though the node's server has been stopped, the
// health check is skipped because the node was just marked healthy, i.e. the health
// information is fresh enough. The node must therefore not be reported dead.
StopServing(node_id);
health_check->MarkNodeHealthy(node_id);
Run(0);
ASSERT_TRUE(dead_nodes.empty());
}

TEST_F(GcsHealthCheckManagerTest, StoppedAndResume) {
auto node_id = AddServer();
Run(0); // Initial run
Expand Down
2 changes: 1 addition & 1 deletion src/ray/util/thread_checker.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class ThreadChecker {
public:
// Returns true at initialization, or when the current invocation happens on the same
// thread as initialization.
bool IsOnSameThread() const;
[[nodiscard]] bool IsOnSameThread() const;

private:
mutable std::atomic<std::thread::id> thread_id_{};
Expand Down

0 comments on commit 47ae84e

Please sign in to comment.