Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] [2/N] Skip GCS health check if possible #49230

Merged
merged 12 commits into from
Dec 20, 2024
42 changes: 37 additions & 5 deletions src/ray/gcs/gcs_server/gcs_health_check_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ GcsHealthCheckManager::~GcsHealthCheckManager() = default;
void GcsHealthCheckManager::RemoveNode(const NodeID &node_id) {
io_service_.dispatch(
[this, node_id]() {
thread_checker_.IsOnSameThread();
RAY_CHECK(thread_checker_.IsOnSameThread());
auto iter = health_check_contexts_.find(node_id);
if (iter == health_check_contexts_.end()) {
return;
Expand All @@ -64,7 +64,7 @@ void GcsHealthCheckManager::RemoveNode(const NodeID &node_id) {

void GcsHealthCheckManager::FailNode(const NodeID &node_id) {
RAY_LOG(WARNING).WithField(node_id) << "Node is dead because the health check failed.";
thread_checker_.IsOnSameThread();
RAY_CHECK(thread_checker_.IsOnSameThread());
auto iter = health_check_contexts_.find(node_id);
if (iter != health_check_contexts_.end()) {
on_node_death_callback_(node_id);
Expand All @@ -73,7 +73,7 @@ void GcsHealthCheckManager::FailNode(const NodeID &node_id) {
}

std::vector<NodeID> GcsHealthCheckManager::GetAllNodes() const {
thread_checker_.IsOnSameThread();
RAY_CHECK(thread_checker_.IsOnSameThread());
std::vector<NodeID> nodes;
nodes.reserve(health_check_contexts_.size());
for (const auto &[node_id, _] : health_check_contexts_) {
Expand All @@ -82,21 +82,53 @@ std::vector<NodeID> GcsHealthCheckManager::GetAllNodes() const {
return nodes;
}

void GcsHealthCheckManager::MarkNodeHealthy(const NodeID &node_id) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this method is called from another thread (ray syncer's thread). do we need a mutex?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The call comes from io context passed down to syner:

if (on_rpc_completion_) {
on_rpc_completion_(NodeID::FromBinary(remote_node_id_));
}

Aren't health check manager and syncer shared the same io context?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I found they're actually different io contexts:

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated, I posted it to io context, which is the same implementation as AddNode.

void GcsHealthCheckManager::AddNode(const NodeID &node_id,
std::shared_ptr<grpc::Channel> channel) {
io_service_.dispatch(
[this, channel = std::move(channel), node_id]() {
thread_checker_.IsOnSameThread();
auto context = new HealthCheckContext(this, channel, node_id);
auto [_, is_new] = health_check_contexts_.emplace(node_id, context);
RAY_CHECK(is_new);
},
"GcsHealthCheckManager::AddNode");
}

io_service_.dispatch(
[this, node_id]() {
RAY_CHECK(thread_checker_.IsOnSameThread());

auto iter = health_check_contexts_.find(node_id);

// A small chance other components (i.e. ray syncer) are initialized before health
// manager.
if (iter == health_check_contexts_.end()) {
return;
}

auto *ctx = iter->second;
ctx->SetLatestHealthTimestamp(absl::Now());
},
"GcsHealthCheckManager::MarkNodeHealthy");
}

void GcsHealthCheckManager::HealthCheckContext::StartHealthCheck() {
using ::grpc::health::v1::HealthCheckResponse;

dentiny marked this conversation as resolved.
Show resolved Hide resolved
RAY_CHECK(manager_->thread_checker_.IsOnSameThread());

// If current context is requested to stop, directly destruct itself and exit.
if (stopped_) {
delete this;
return;
}

// Check latest health status, see whether a new rpc message is needed.
const auto now = absl::Now();
absl::Time next_check_time =
lastest_known_healthy_timestamp_ + absl::Milliseconds(manager_->period_ms_);
if (now <= next_check_time) {
// Update message is fresh enough, skip current check and schedule later.
int64_t next_schedule_millisec = (next_check_time - now) / absl::Milliseconds(1);
timer_.expires_from_now(boost::posix_time::milliseconds(next_schedule_millisec));
timer_.async_wait([this](auto) { StartHealthCheck(); });
return;
}

// Reset the context/request/response for the next request.
context_.~ClientContext();
new (&context_) grpc::ClientContext();
response_.Clear();

const auto now = absl::Now();
const auto deadline = now + absl::Milliseconds(manager_->timeout_ms_);
context_.set_deadline(absl::ToChronoTime(deadline));
stub_->async()->Check(
Expand Down Expand Up @@ -150,7 +182,7 @@ void GcsHealthCheckManager::AddNode(const NodeID &node_id,
std::shared_ptr<grpc::Channel> channel) {
io_service_.dispatch(
[this, channel = std::move(channel), node_id]() {
thread_checker_.IsOnSameThread();
RAY_CHECK(thread_checker_.IsOnSameThread());
auto context = new HealthCheckContext(this, channel, node_id);
auto [_, is_new] = health_check_contexts_.emplace(node_id, context);
RAY_CHECK(is_new);
Expand Down
13 changes: 13 additions & 0 deletions src/ray/gcs/gcs_server/gcs_health_check_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ class GcsHealthCheckManager {
/// \return A list of node id which are being monitored by this class.
std::vector<NodeID> GetAllNodes() const;

/// Mark the given node as healthy, so health check manager could save some checking
/// rpcs. Safe to call from non-io-context threads.
///
/// \param node_id The id of the node.
void MarkNodeHealthy(const NodeID &node_id);

private:
/// Fail a node when health check failed. It'll stop the health checking and
/// call `on_node_death_callback_`.
Expand Down Expand Up @@ -111,13 +117,20 @@ class GcsHealthCheckManager {

void Stop();

void SetLatestHealthTimestamp(absl::Time ts) {
lastest_known_healthy_timestamp_ = ts;
}

private:
void StartHealthCheck();

GcsHealthCheckManager *manager_;

NodeID node_id_;

// Timestamp for latest known status when node is healthy.
absl::Time lastest_known_healthy_timestamp_ = absl::InfinitePast();

// Whether the health check has stopped.
bool stopped_ = false;

Expand Down
12 changes: 8 additions & 4 deletions src/ray/gcs/gcs_server/gcs_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -173,12 +173,12 @@ void GcsServer::DoStart(const GcsInitData &gcs_init_data) {
// Init gcs resource manager.
InitGcsResourceManager(gcs_init_data);

// Init synchronization service
InitRaySyncer(gcs_init_data);

// Init gcs health check manager.
InitGcsHealthCheckManager(gcs_init_data);

// Init synchronization service
InitRaySyncer(gcs_init_data);

// Init KV service.
InitKVService();

Expand Down Expand Up @@ -522,7 +522,11 @@ GcsServer::StorageType GcsServer::GetStorageType() const {

void GcsServer::InitRaySyncer(const GcsInitData &gcs_init_data) {
ray_syncer_ = std::make_unique<syncer::RaySyncer>(
io_context_provider_.GetIOContext<syncer::RaySyncer>(), kGCSNodeID.Binary());
io_context_provider_.GetIOContext<syncer::RaySyncer>(),
kGCSNodeID.Binary(),
[this](const NodeID &node_id) {
gcs_healthcheck_manager_->MarkNodeHealthy(node_id);
});
ray_syncer_->Register(
syncer::MessageType::RESOURCE_VIEW, nullptr, gcs_resource_manager_.get());
ray_syncer_->Register(
Expand Down
13 changes: 13 additions & 0 deletions src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,19 @@ TEST_F(GcsHealthCheckManagerTest, TestBasic) {
ASSERT_TRUE(dead_nodes.count(node_id));
}

TEST_F(GcsHealthCheckManagerTest, MarkHealthAndSkipCheck) {
auto node_id = AddServer();
Run(0); // Initial run
ASSERT_TRUE(dead_nodes.empty());

// Run the first health check: even we mark node down, health check is skipped due to
// fresh enough information.
StopServing(node_id);
health_check->MarkNodeHealthy(node_id);
Run(0);
ASSERT_TRUE(dead_nodes.empty());
}

TEST_F(GcsHealthCheckManagerTest, StoppedAndResume) {
auto node_id = AddServer();
Run(0); // Initial run
Expand Down
2 changes: 1 addition & 1 deletion src/ray/util/thread_checker.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class ThreadChecker {
public:
// Return true at initialization, or current invocation happens on the same thread as
// initialization.
bool IsOnSameThread() const;
[[nodiscard]] bool IsOnSameThread() const;

private:
mutable std::atomic<std::thread::id> thread_id_{};
Expand Down
Loading