diff --git a/src/ray/gcs/gcs_server/gcs_health_check_manager.cc b/src/ray/gcs/gcs_server/gcs_health_check_manager.cc
index 4d68771cbc584..de064ccaac657 100644
--- a/src/ray/gcs/gcs_server/gcs_health_check_manager.cc
+++ b/src/ray/gcs/gcs_server/gcs_health_check_manager.cc
@@ -51,7 +51,7 @@ GcsHealthCheckManager::~GcsHealthCheckManager() = default;
 void GcsHealthCheckManager::RemoveNode(const NodeID &node_id) {
   io_service_.dispatch(
       [this, node_id]() {
-        thread_checker_.IsOnSameThread();
+        RAY_CHECK(thread_checker_.IsOnSameThread());
         auto iter = health_check_contexts_.find(node_id);
         if (iter == health_check_contexts_.end()) {
           return;
@@ -64,7 +64,7 @@ void GcsHealthCheckManager::RemoveNode(const NodeID &node_id) {
 void GcsHealthCheckManager::FailNode(const NodeID &node_id) {
   RAY_LOG(WARNING).WithField(node_id) << "Node is dead because the health check failed.";
-  thread_checker_.IsOnSameThread();
+  RAY_CHECK(thread_checker_.IsOnSameThread());
   auto iter = health_check_contexts_.find(node_id);
   if (iter != health_check_contexts_.end()) {
     on_node_death_callback_(node_id);
@@ -73,7 +73,7 @@ void GcsHealthCheckManager::FailNode(const NodeID &node_id) {
 }
 std::vector<NodeID> GcsHealthCheckManager::GetAllNodes() const {
-  thread_checker_.IsOnSameThread();
+  RAY_CHECK(thread_checker_.IsOnSameThread());
   std::vector<NodeID> nodes;
   nodes.reserve(health_check_contexts_.size());
   for (const auto &[node_id, _] : health_check_contexts_) {
@@ -82,21 +82,53 @@ std::vector<NodeID> GcsHealthCheckManager::GetAllNodes() const {
   return nodes;
 }
 
+void GcsHealthCheckManager::MarkNodeHealthy(const NodeID &node_id) {
+  io_service_.dispatch(
+      [this, node_id]() {
+        RAY_CHECK(thread_checker_.IsOnSameThread());
+
+        auto iter = health_check_contexts_.find(node_id);
+
+        // There is a small chance that other components (e.g. the ray syncer)
+        // are initialized before the health check manager.
+        if (iter == health_check_contexts_.end()) {
+          return;
+        }
+
+        auto *ctx = iter->second;
+        ctx->SetLatestHealthTimestamp(absl::Now());
+      },
+      "GcsHealthCheckManager::MarkNodeHealthy");
+}
+
 void GcsHealthCheckManager::HealthCheckContext::StartHealthCheck() {
   using ::grpc::health::v1::HealthCheckResponse;
 
+  RAY_CHECK(manager_->thread_checker_.IsOnSameThread());
+
+  // If the current context has been requested to stop, destruct itself and exit.
   if (stopped_) {
     delete this;
     return;
   }
 
+  // Check the latest known health status to see whether a new check RPC is needed.
+  const auto now = absl::Now();
+  absl::Time next_check_time =
+      latest_known_healthy_timestamp_ + absl::Milliseconds(manager_->period_ms_);
+  if (now <= next_check_time) {
+    // The known status is fresh enough; skip this check and schedule a later one.
+    int64_t next_schedule_millisec = (next_check_time - now) / absl::Milliseconds(1);
+    timer_.expires_from_now(boost::posix_time::milliseconds(next_schedule_millisec));
+    timer_.async_wait([this](auto) { StartHealthCheck(); });
+    return;
+  }
   // Reset the context/request/response for the next request.
   context_.~ClientContext();
   new (&context_) grpc::ClientContext();
   response_.Clear();
 
-  const auto now = absl::Now();
   const auto deadline = now + absl::Milliseconds(manager_->timeout_ms_);
   context_.set_deadline(absl::ToChronoTime(deadline));
   stub_->async()->Check(
@@ -150,7 +182,7 @@ void GcsHealthCheckManager::AddNode(const NodeID &node_id,
                                     std::shared_ptr<grpc::Channel> channel) {
   io_service_.dispatch(
       [this, channel = std::move(channel), node_id]() {
-        thread_checker_.IsOnSameThread();
+        RAY_CHECK(thread_checker_.IsOnSameThread());
         auto context = new HealthCheckContext(this, channel, node_id);
         auto [_, is_new] = health_check_contexts_.emplace(node_id, context);
         RAY_CHECK(is_new);
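Note on the hunk above: the heart of the patch is the freshness check added to
StartHealthCheck(). Each node's HealthCheckContext records the last moment the node
was known healthy, and a check whose period has not yet elapsed is replaced by a
timer rearm, so every fresh syncer message stands in for one health-check RPC. The
sketch below distills just that decision; MillisUntilNextCheck and the 3000 ms
period are hypothetical, while the patch keeps the logic inline and rearms a boost
deadline timer instead of returning a value.

  // Sketch only: the skip decision from StartHealthCheck(), extracted into a
  // hypothetical helper that is not part of the patch.
  #include <cstdint>
  #include <iostream>
  #include "absl/time/clock.h"
  #include "absl/time/time.h"

  int64_t MillisUntilNextCheck(absl::Time latest_healthy,
                               int64_t period_ms,
                               absl::Time now) {
    const absl::Time next_check_time = latest_healthy + absl::Milliseconds(period_ms);
    if (now > next_check_time) {
      return 0;  // Status is stale: issue the health check RPC immediately.
    }
    // Status is fresh: wait out the remainder of the period instead of an RPC.
    return (next_check_time - now) / absl::Milliseconds(1);
  }

  int main() {
    const absl::Time now = absl::Now();
    // Sync message arrived 1s ago with a 3s period: no RPC, re-check in ~2000 ms.
    std::cout << MillisUntilNextCheck(now - absl::Seconds(1), 3000, now) << "\n";
    // Nothing heard for 5s: returns 0, meaning send the RPC now.
    std::cout << MillisUntilNextCheck(now - absl::Seconds(5), 3000, now) << "\n";
  }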
diff --git a/src/ray/gcs/gcs_server/gcs_health_check_manager.h b/src/ray/gcs/gcs_server/gcs_health_check_manager.h
index a6e36d82972a1..4f369a814f32a 100644
--- a/src/ray/gcs/gcs_server/gcs_health_check_manager.h
+++ b/src/ray/gcs/gcs_server/gcs_health_check_manager.h
@@ -82,6 +82,12 @@ class GcsHealthCheckManager {
   /// \return A list of node id which are being monitored by this class.
   std::vector<NodeID> GetAllNodes() const;
 
+  /// Mark the given node as healthy, so the health check manager can skip some
+  /// health check RPCs. Safe to call from non-io-context threads.
+  ///
+  /// \param node_id The id of the node.
+  void MarkNodeHealthy(const NodeID &node_id);
+
  private:
   /// Fail a node when health check failed. It'll stop the health checking and
   /// call `on_node_death_callback_`.
@@ -111,6 +117,10 @@
     void Stop();
 
+    void SetLatestHealthTimestamp(absl::Time ts) {
+      latest_known_healthy_timestamp_ = ts;
+    }
+
   private:
     void StartHealthCheck();
 
@@ -118,6 +128,9 @@
     NodeID node_id_;
 
+    // Timestamp of the latest moment at which the node is known to be healthy.
+    absl::Time latest_known_healthy_timestamp_ = absl::InfinitePast();
+
     // Whether the health check has stopped.
     bool stopped_ = false;
 
diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc
index 416a4ec0a1505..5ccab554169bb 100644
--- a/src/ray/gcs/gcs_server/gcs_server.cc
+++ b/src/ray/gcs/gcs_server/gcs_server.cc
@@ -173,12 +173,12 @@ void GcsServer::DoStart(const GcsInitData &gcs_init_data) {
   // Init gcs resource manager.
   InitGcsResourceManager(gcs_init_data);
 
-  // Init synchronization service
-  InitRaySyncer(gcs_init_data);
-
   // Init gcs health check manager.
   InitGcsHealthCheckManager(gcs_init_data);
 
+  // Init synchronization service
+  InitRaySyncer(gcs_init_data);
+
   // Init KV service.
   InitKVService();
 
@@ -522,7 +522,11 @@ GcsServer::StorageType GcsServer::GetStorageType() const {
 
 void GcsServer::InitRaySyncer(const GcsInitData &gcs_init_data) {
   ray_syncer_ = std::make_unique<syncer::RaySyncer>(
-      io_context_provider_.GetIOContext<syncer::RaySyncer>(), kGCSNodeID.Binary());
+      io_context_provider_.GetIOContext<syncer::RaySyncer>(),
+      kGCSNodeID.Binary(),
+      [this](const NodeID &node_id) {
+        gcs_healthcheck_manager_->MarkNodeHealthy(node_id);
+      });
   ray_syncer_->Register(
       syncer::MessageType::RESOURCE_VIEW, nullptr, gcs_resource_manager_.get());
   ray_syncer_->Register(
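Note on the two gcs_server.cc hunks: the syncer now receives a callback that
dereferences gcs_healthcheck_manager_, so the health check manager has to exist
before the syncer can deliver messages, hence the reordering in DoStart(). The
header also documents MarkNodeHealthy() as safe to call from non-io-context
threads. That safety comes from the dispatch pattern used throughout the manager:
mutable state is confined to the io-context thread, and other threads only enqueue
a closure. A minimal standalone sketch with plain boost::asio follows; Ray's
instrumented_io_context wrapper and its name argument are left out.

  #include <boost/asio.hpp>
  #include <iostream>
  #include <thread>

  int main() {
    boost::asio::io_context io;
    auto guard = boost::asio::make_work_guard(io);  // keep run() alive while idle
    std::thread io_thread([&io] { io.run(); });

    int state = 0;  // confined: only ever touched from io_thread
    // dispatch() runs the closure inline when already on the io thread,
    // otherwise posts it to the queue; either way `state` stays single-threaded.
    boost::asio::dispatch(io, [&state] { ++state; });

    guard.reset();  // let run() return once the queue drains
    io_thread.join();
    std::cout << state << "\n";  // prints 1
  }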
diff --git a/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc
index 35fc308f28a9e..74e9f2d975716 100644
--- a/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc
+++ b/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc
@@ -162,6 +162,19 @@ TEST_F(GcsHealthCheckManagerTest, TestBasic) {
   ASSERT_TRUE(dead_nodes.count(node_id));
 }
 
+TEST_F(GcsHealthCheckManagerTest, MarkHealthAndSkipCheck) {
+  auto node_id = AddServer();
+  Run(0);  // Initial run
+  ASSERT_TRUE(dead_nodes.empty());
+
+  // Stop serving health checks, then mark the node healthy. The next health check
+  // is skipped because the marked status is fresh enough, so the node is not failed.
+  StopServing(node_id);
+  health_check->MarkNodeHealthy(node_id);
+  Run(0);
+  ASSERT_TRUE(dead_nodes.empty());
+}
+
 TEST_F(GcsHealthCheckManagerTest, StoppedAndResume) {
   auto node_id = AddServer();
   Run(0);  // Initial run
diff --git a/src/ray/util/thread_checker.h b/src/ray/util/thread_checker.h
index 2e3dcf1ed3df5..562e1d2de5033 100644
--- a/src/ray/util/thread_checker.h
+++ b/src/ray/util/thread_checker.h
@@ -34,7 +34,7 @@ class ThreadChecker {
  public:
   // Return true at initialization, or current invocation happens on the same thread as
   // initialization.
-  bool IsOnSameThread() const;
+  [[nodiscard]] bool IsOnSameThread() const;
 
  private:
   mutable std::atomic<std::thread::id> thread_id_{};
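Note on the thread_checker.h change: it works together with the RAY_CHECK call
sites in the first file. A bare `thread_checker_.IsOnSameThread();` statement
silently discards the result, so [[nodiscard]] turns that mistake into a compiler
warning and RAY_CHECK turns an actual violation into a crash. The checker's
definition (thread_checker.cc) is not part of this diff; a plausible
implementation, assuming the common latch-on-first-use pattern suggested by the
atomic member above, is a single atomic thread id:

  #include <atomic>
  #include <thread>

  class ThreadCheckerSketch {
   public:
    // True on the first call from any thread, and on every later call made
    // from that same thread; false when called from any other thread.
    [[nodiscard]] bool IsOnSameThread() const {
      std::thread::id expected{};  // default id: no thread latched yet
      const auto current = std::this_thread::get_id();
      // Latch `current` if nothing is stored; on failure `expected` holds
      // the previously latched id, which must match for the check to pass.
      return thread_id_.compare_exchange_strong(expected, current) ||
             expected == current;
    }

   private:
    mutable std::atomic<std::thread::id> thread_id_{};
  };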