From 5f88358a9b7dbcccbcf86be68de1bc4db1220738 Mon Sep 17 00:00:00 2001 From: hjiang Date: Fri, 6 Dec 2024 01:32:48 +0000 Subject: [PATCH 1/8] ray syncer set observer Signed-off-by: hjiang --- src/ray/common/ray_syncer/common.h | 7 +++ src/ray/common/ray_syncer/node_state.cc | 8 ++++ src/ray/common/ray_syncer/node_state.h | 11 +++++ src/ray/common/ray_syncer/ray_syncer.cc | 6 ++- src/ray/common/ray_syncer/ray_syncer.h | 4 ++ src/ray/common/test/ray_syncer_test.cc | 60 ++++++++++++++++++++----- 6 files changed, 84 insertions(+), 12 deletions(-) diff --git a/src/ray/common/ray_syncer/common.h b/src/ray/common/ray_syncer/common.h index 9022514a1f60..52e3e74dfae6 100644 --- a/src/ray/common/ray_syncer/common.h +++ b/src/ray/common/ray_syncer/common.h @@ -16,11 +16,18 @@ #pragma once +#include + #include "src/ray/protobuf/ray_syncer.grpc.pb.h" +#include "src/ray/protobuf/ray_syncer.pb.h" namespace ray::syncer { inline constexpr size_t kComponentArraySize = static_cast(ray::rpc::syncer::MessageType_ARRAYSIZE); +// Observer callable for sync message response. +using RaySyncMsgObserver = + std::function; + } // namespace ray::syncer diff --git a/src/ray/common/ray_syncer/node_state.cc b/src/ray/common/ray_syncer/node_state.cc index 4db242274582..793b1852d08f 100644 --- a/src/ray/common/ray_syncer/node_state.cc +++ b/src/ray/common/ray_syncer/node_state.cc @@ -21,6 +21,11 @@ namespace ray::syncer { NodeState::NodeState() { sync_message_versions_taken_.fill(-1); } +void NodeState::SetRaySyncMsgObserverForOnce(RaySyncMsgObserver ray_sync_msg_observer) { + RAY_CHECK(!ray_sync_msg_observer_); + ray_sync_msg_observer_ = std::move(ray_sync_msg_observer); +} + bool NodeState::SetComponent(MessageType message_type, const ReporterInterface *reporter, ReceiverInterface *receiver) { @@ -67,6 +72,9 @@ bool NodeState::ConsumeSyncMessage(std::shared_ptr message } current = message; + if (ray_sync_msg_observer_) { + ray_sync_msg_observer_(*message); + } auto receiver = receivers_[message->message_type()]; if (receiver != nullptr) { RAY_LOG(DEBUG).WithField(NodeID::FromBinary(message->node_id())) diff --git a/src/ray/common/ray_syncer/node_state.h b/src/ray/common/ray_syncer/node_state.h index 65b140dec334..4dd88e549813 100644 --- a/src/ray/common/ray_syncer/node_state.h +++ b/src/ray/common/ray_syncer/node_state.h @@ -16,6 +16,7 @@ #include #include +#include #include #include @@ -74,6 +75,10 @@ class NodeState { /// \return true if the local node doesn't have message with newer version. bool ConsumeSyncMessage(std::shared_ptr message); + /// Set the observer callable for sync message response for once. + /// This function is expected to call only once, repeated invocations throws exception. + void SetRaySyncMsgObserverForOnce(RaySyncMsgObserver ray_sync_msg_observer); + /// Return the cluster view of this local node. const absl::flat_hash_map< std::string, @@ -99,6 +104,12 @@ class NodeState { std::string, std::array, kComponentArraySize>> cluster_view_; + + /// Sync message observer, which is a callback on received message response. + /// + /// As of now we only have one single usage for health check status update, update to + /// vector if we have more observers. + RaySyncMsgObserver ray_sync_msg_observer_; }; } // namespace ray::syncer diff --git a/src/ray/common/ray_syncer/ray_syncer.cc b/src/ray/common/ray_syncer/ray_syncer.cc index 73743b67f78c..477fda6f40cc 100644 --- a/src/ray/common/ray_syncer/ray_syncer.cc +++ b/src/ray/common/ray_syncer/ray_syncer.cc @@ -72,7 +72,7 @@ void RaySyncer::Connect(const std::string &node_id, boost::asio::dispatch( io_context_.get_executor(), std::packaged_task([=]() { auto stub = ray::rpc::syncer::RaySyncer::NewStub(channel); - auto reactor = new RayClientBidiReactor( + auto *reactor = new RayClientBidiReactor( /* remote_node_id */ node_id, /* local_node_id */ GetLocalNodeID(), /* io_context */ io_context_, @@ -144,6 +144,10 @@ void RaySyncer::Disconnect(const std::string &node_id) { boost::asio::dispatch(io_context_.get_executor(), std::move(task)).get(); } +void RaySyncer::SetRaySyncMsgObserverForOnce(RaySyncMsgObserver ray_sync_msg_observer) { + node_state_->SetRaySyncMsgObserverForOnce(std::move(ray_sync_msg_observer)); +} + void RaySyncer::Register(MessageType message_type, const ReporterInterface *reporter, ReceiverInterface *receiver, diff --git a/src/ray/common/ray_syncer/ray_syncer.h b/src/ray/common/ray_syncer/ray_syncer.h index 49928ff7fc59..ca11b27d4e52 100644 --- a/src/ray/common/ray_syncer/ray_syncer.h +++ b/src/ray/common/ray_syncer/ray_syncer.h @@ -145,6 +145,10 @@ class RaySyncer { std::vector GetAllConnectedNodeIDs() const; + /// Set the observer callable for sync message response for once. + /// This function is expected to call only once, repeated invocations throws exception. + void SetRaySyncMsgObserverForOnce(RaySyncMsgObserver ray_sync_msg_observer); + private: void Connect(RaySyncerBidiReactor *connection); diff --git a/src/ray/common/test/ray_syncer_test.cc b/src/ray/common/test/ray_syncer_test.cc index c76dda5bee95..c3e06e6fecae 100644 --- a/src/ray/common/test/ray_syncer_test.cc +++ b/src/ray/common/test/ray_syncer_test.cc @@ -12,20 +12,21 @@ // See the License for the specific language governing permissions and // limitations under the License. -// clang-format off -#include "gmock/gmock.h" -#include "gtest/gtest.h" -#include -#include +#include +#include +#include #include #include -#include -#include #include #include #include #include +#include +#include +#include + +// clang-format off #include "ray/common/ray_syncer/node_state.h" #include "ray/common/ray_syncer/ray_syncer.h" #include "ray/common/ray_syncer/ray_syncer_client.h" @@ -203,15 +204,23 @@ TEST_F(RaySyncerTest, RaySyncerBidiReactorBase) { } struct SyncerServerTest { - SyncerServerTest(std::string port) : work_guard(io_context.get_executor()) { + SyncerServerTest(std::string port) + : SyncerServerTest( + std::move(port), /*node_id=*/NodeID::FromRandom(), /*ray_sync_observer=*/{}) { + } + + SyncerServerTest(std::string port, NodeID node_id, RaySyncMsgObserver ray_sync_observer) + : work_guard(io_context.get_executor()) { this->server_port = port; // Setup io context - auto node_id = NodeID::FromRandom(); for (auto &v : local_versions) { v = 0; } // Setup syncer and grpc server syncer = std::make_unique(io_context, node_id.Binary()); + if (ray_sync_observer) { + syncer->SetRaySyncMsgObserverForOnce(std::move(ray_sync_observer)); + } thread = std::make_unique([this] { io_context.run(); }); auto server_address = std::string("0.0.0.0:") + port; @@ -421,6 +430,14 @@ class SyncerTest : public ::testing::Test { return *servers.back(); } + SyncerServerTest &MakeServer(std::string port, + NodeID node_id, + RaySyncMsgObserver ran_sync_observer) { + servers.emplace_back(std::make_unique( + port, std::move(node_id), std::move(ran_sync_observer))); + return *servers.back(); + } + protected: void TearDown() override { // Drain all grpc requests. @@ -434,9 +451,26 @@ class SyncerTest : public ::testing::Test { }; TEST_F(SyncerTest, Test1To1) { - auto &s1 = MakeServer("19990"); + // Generate node ids for checking. + NodeID node_id1 = NodeID::FromRandom(); + NodeID node_id2 = NodeID::FromRandom(); + + // Used to check the number of messages consumed for two servers. + int s1_observer_cb_call_cnt = 0; + int s2_observer_cb_call_cnt = 0; + + // Register observer callback for syncers. + auto syncer_observer_cb = [&](const ::ray::rpc::syncer::RaySyncMessage &msg) { + const auto &cur_node_id = NodeID::FromBinary(msg.node_id()); + if (cur_node_id == node_id1) { + ++s1_observer_cb_call_cnt; + } else if (cur_node_id == node_id2) { + ++s2_observer_cb_call_cnt; + } + }; - auto &s2 = MakeServer("19991"); + auto &s1 = MakeServer("19990", node_id1, syncer_observer_cb); + auto &s2 = MakeServer("19991", node_id2, syncer_observer_cb); // Make sure the setup is correct ASSERT_NE(nullptr, s1.receivers[MessageType::RESOURCE_VIEW]); @@ -538,6 +572,10 @@ TEST_F(SyncerTest, Test1To1) { ASSERT_LE(s1.GetNumConsumedMessages(s2.syncer->GetLocalNodeID()), max_sends * 2 + 3); // s1 has one reporter + 1 for the one send before the measure ASSERT_LE(s2.GetNumConsumedMessages(s1.syncer->GetLocalNodeID()), max_sends + 3); + + // Make sure registered callbacks have been called. + ASSERT_GT(s1_observer_cb_call_cnt, 0); + ASSERT_GT(s2_observer_cb_call_cnt, 0); } TEST_F(SyncerTest, Reconnect) { From 29657420ea502f3aba8cc37a02a6db92b501c151 Mon Sep 17 00:00:00 2001 From: hjiang Date: Thu, 12 Dec 2024 00:39:46 +0000 Subject: [PATCH 2/8] rename callback and doc Signed-off-by: hjiang --- src/ray/common/ray_syncer/common.h | 10 +++++++--- src/ray/common/ray_syncer/node_state.cc | 11 ++++++----- src/ray/common/ray_syncer/node_state.h | 5 +++-- src/ray/common/ray_syncer/ray_syncer.cc | 5 +++-- src/ray/common/ray_syncer/ray_syncer.h | 3 ++- src/ray/common/test/ray_syncer_test.cc | 17 +++++++++-------- 6 files changed, 30 insertions(+), 21 deletions(-) diff --git a/src/ray/common/ray_syncer/common.h b/src/ray/common/ray_syncer/common.h index 52e3e74dfae6..3a4e936e414b 100644 --- a/src/ray/common/ray_syncer/common.h +++ b/src/ray/common/ray_syncer/common.h @@ -18,6 +18,7 @@ #include +#include "ray/common/id.h" #include "src/ray/protobuf/ray_syncer.grpc.pb.h" #include "src/ray/protobuf/ray_syncer.pb.h" @@ -26,8 +27,11 @@ namespace ray::syncer { inline constexpr size_t kComponentArraySize = static_cast(ray::rpc::syncer::MessageType_ARRAYSIZE); -// Observer callable for sync message response. -using RaySyncMsgObserver = - std::function; +// TODO(hjiang): As of now, only ray syncer uses it so we put it under `ray_syncer` +// folder, better to place it into other common folders if uses elsewhere. +// +// A callback, which is called whenever a rpc completes between the current process and +// raylet, which represents the node. +using RayletCompletedRpcCallback = std::function; } // namespace ray::syncer diff --git a/src/ray/common/ray_syncer/node_state.cc b/src/ray/common/ray_syncer/node_state.cc index 793b1852d08f..4ad4a463c95c 100644 --- a/src/ray/common/ray_syncer/node_state.cc +++ b/src/ray/common/ray_syncer/node_state.cc @@ -21,9 +21,10 @@ namespace ray::syncer { NodeState::NodeState() { sync_message_versions_taken_.fill(-1); } -void NodeState::SetRaySyncMsgObserverForOnce(RaySyncMsgObserver ray_sync_msg_observer) { - RAY_CHECK(!ray_sync_msg_observer_); - ray_sync_msg_observer_ = std::move(ray_sync_msg_observer); +void NodeState::SetRayletCompletedRpcCallbackForOnce( + RayletCompletedRpcCallback on_raylet_rpc_completion) { + RAY_CHECK(on_raylet_rpc_completion); + on_raylet_rpc_completion_ = std::move(on_raylet_rpc_completion); } bool NodeState::SetComponent(MessageType message_type, @@ -72,8 +73,8 @@ bool NodeState::ConsumeSyncMessage(std::shared_ptr message } current = message; - if (ray_sync_msg_observer_) { - ray_sync_msg_observer_(*message); + if (on_raylet_rpc_completion_) { + on_raylet_rpc_completion_(NodeID::FromBinary(message->node_id())); } auto receiver = receivers_[message->message_type()]; if (receiver != nullptr) { diff --git a/src/ray/common/ray_syncer/node_state.h b/src/ray/common/ray_syncer/node_state.h index 4dd88e549813..03dc44e7eefc 100644 --- a/src/ray/common/ray_syncer/node_state.h +++ b/src/ray/common/ray_syncer/node_state.h @@ -77,7 +77,8 @@ class NodeState { /// Set the observer callable for sync message response for once. /// This function is expected to call only once, repeated invocations throws exception. - void SetRaySyncMsgObserverForOnce(RaySyncMsgObserver ray_sync_msg_observer); + void SetRayletCompletedRpcCallbackForOnce( + RayletCompletedRpcCallback on_raylet_rpc_completion); /// Return the cluster view of this local node. const absl::flat_hash_map< @@ -109,7 +110,7 @@ class NodeState { /// /// As of now we only have one single usage for health check status update, update to /// vector if we have more observers. - RaySyncMsgObserver ray_sync_msg_observer_; + RayletCompletedRpcCallback on_raylet_rpc_completion_; }; } // namespace ray::syncer diff --git a/src/ray/common/ray_syncer/ray_syncer.cc b/src/ray/common/ray_syncer/ray_syncer.cc index 345ec15173e3..5abcf63a4458 100644 --- a/src/ray/common/ray_syncer/ray_syncer.cc +++ b/src/ray/common/ray_syncer/ray_syncer.cc @@ -147,8 +147,9 @@ void RaySyncer::Disconnect(const std::string &node_id) { boost::asio::dispatch(io_context_.get_executor(), std::move(task)).get(); } -void RaySyncer::SetRaySyncMsgObserverForOnce(RaySyncMsgObserver ray_sync_msg_observer) { - node_state_->SetRaySyncMsgObserverForOnce(std::move(ray_sync_msg_observer)); +void RaySyncer::SetRayletCompletedRpcCallbackForOnce( + RayletCompletedRpcCallback on_raylet_rpc_completion) { + node_state_->SetRayletCompletedRpcCallbackForOnce(std::move(on_raylet_rpc_completion)); } void RaySyncer::Register(MessageType message_type, diff --git a/src/ray/common/ray_syncer/ray_syncer.h b/src/ray/common/ray_syncer/ray_syncer.h index d73c27dbf799..5fa2774788ea 100644 --- a/src/ray/common/ray_syncer/ray_syncer.h +++ b/src/ray/common/ray_syncer/ray_syncer.h @@ -147,7 +147,8 @@ class RaySyncer { /// Set the observer callable for sync message response for once. /// This function is expected to call only once, repeated invocations throws exception. - void SetRaySyncMsgObserverForOnce(RaySyncMsgObserver ray_sync_msg_observer); + void SetRayletCompletedRpcCallbackForOnce( + RayletCompletedRpcCallback on_raylet_rpc_completion); private: void Connect(RaySyncerBidiReactor *connection); diff --git a/src/ray/common/test/ray_syncer_test.cc b/src/ray/common/test/ray_syncer_test.cc index c3e06e6fecae..e96c95dc5c3d 100644 --- a/src/ray/common/test/ray_syncer_test.cc +++ b/src/ray/common/test/ray_syncer_test.cc @@ -209,7 +209,9 @@ struct SyncerServerTest { std::move(port), /*node_id=*/NodeID::FromRandom(), /*ray_sync_observer=*/{}) { } - SyncerServerTest(std::string port, NodeID node_id, RaySyncMsgObserver ray_sync_observer) + SyncerServerTest(std::string port, + NodeID node_id, + RayletCompletedRpcCallback ray_sync_observer) : work_guard(io_context.get_executor()) { this->server_port = port; // Setup io context @@ -219,7 +221,7 @@ struct SyncerServerTest { // Setup syncer and grpc server syncer = std::make_unique(io_context, node_id.Binary()); if (ray_sync_observer) { - syncer->SetRaySyncMsgObserverForOnce(std::move(ray_sync_observer)); + syncer->SetRayletCompletedRpcCallbackForOnce(std::move(ray_sync_observer)); } thread = std::make_unique([this] { io_context.run(); }); @@ -432,9 +434,9 @@ class SyncerTest : public ::testing::Test { SyncerServerTest &MakeServer(std::string port, NodeID node_id, - RaySyncMsgObserver ran_sync_observer) { + RayletCompletedRpcCallback on_raylet_rpc_completion) { servers.emplace_back(std::make_unique( - port, std::move(node_id), std::move(ran_sync_observer))); + port, std::move(node_id), std::move(on_raylet_rpc_completion))); return *servers.back(); } @@ -460,11 +462,10 @@ TEST_F(SyncerTest, Test1To1) { int s2_observer_cb_call_cnt = 0; // Register observer callback for syncers. - auto syncer_observer_cb = [&](const ::ray::rpc::syncer::RaySyncMessage &msg) { - const auto &cur_node_id = NodeID::FromBinary(msg.node_id()); - if (cur_node_id == node_id1) { + auto syncer_observer_cb = [&](const NodeID &node_id) { + if (node_id == node_id1) { ++s1_observer_cb_call_cnt; - } else if (cur_node_id == node_id2) { + } else if (node_id == node_id2) { ++s2_observer_cb_call_cnt; } }; From 30671655d0d4b1a9b5f701ae7c3eded8de32ef71 Mon Sep 17 00:00:00 2001 From: hjiang Date: Thu, 12 Dec 2024 05:17:14 +0000 Subject: [PATCH 3/8] skip health check rpc if possible Signed-off-by: hjiang --- .../gcs_server/gcs_health_check_manager.cc | 26 ++++++++++++++++++- .../gcs/gcs_server/gcs_health_check_manager.h | 13 ++++++++++ src/ray/gcs/gcs_server/gcs_server.cc | 12 ++++++--- .../test/gcs_health_check_manager_test.cc | 13 ++++++++++ 4 files changed, 60 insertions(+), 4 deletions(-) diff --git a/src/ray/gcs/gcs_server/gcs_health_check_manager.cc b/src/ray/gcs/gcs_server/gcs_health_check_manager.cc index d6e858482185..f6229c6dc86a 100644 --- a/src/ray/gcs/gcs_server/gcs_health_check_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_health_check_manager.cc @@ -82,15 +82,39 @@ std::vector GcsHealthCheckManager::GetAllNodes() const { return nodes; } +void GcsHealthCheckManager::MarkNodeHealthy(const NodeID &node_id) { + auto iter = health_check_contexts_.find(node_id); + + // A small chance other components (i.e. ray syncer) are initialized before health + // manager. + if (iter == health_check_contexts_.end()) { + return; + } + + auto *ctx = iter->second; + ctx->SetLatestHealthTimestamp(absl::Now()); +} + void GcsHealthCheckManager::HealthCheckContext::StartHealthCheck() { using ::grpc::health::v1::HealthCheckResponse; + // Check latest health status, see whether a new rpc message is needed. + const auto now = absl::Now(); + absl::Time next_check_time = + lastest_known_healthy_timestamp_ + absl::Milliseconds(manager_->period_ms_); + if (now <= next_check_time) { + // Update message is fresh enough, skip current check and schedule later. + int64_t next_schedule_millisec = (next_check_time - now) / absl::Milliseconds(1); + timer_.expires_from_now(boost::posix_time::milliseconds(next_schedule_millisec)); + timer_.async_wait([this](auto) { StartHealthCheck(); }); + return; + } + // Reset the context/request/response for the next request. context_.~ClientContext(); new (&context_) grpc::ClientContext(); response_.Clear(); - const auto now = absl::Now(); const auto deadline = now + absl::Milliseconds(manager_->timeout_ms_); context_.set_deadline(absl::ToChronoTime(deadline)); stub_->async()->Check( diff --git a/src/ray/gcs/gcs_server/gcs_health_check_manager.h b/src/ray/gcs/gcs_server/gcs_health_check_manager.h index a6e36d82972a..fe98272fc21a 100644 --- a/src/ray/gcs/gcs_server/gcs_health_check_manager.h +++ b/src/ray/gcs/gcs_server/gcs_health_check_manager.h @@ -82,6 +82,12 @@ class GcsHealthCheckManager { /// \return A list of node id which are being monitored by this class. std::vector GetAllNodes() const; + /// Mark the given node as healthy, so health check manager could save some checking + /// rpcs. Notice: have to invoke from io-context thread. + /// + /// \param node_id The id of the node. + void MarkNodeHealthy(const NodeID &node_id); + private: /// Fail a node when health check failed. It'll stop the health checking and /// call `on_node_death_callback_`. @@ -111,6 +117,10 @@ class GcsHealthCheckManager { void Stop(); + void SetLatestHealthTimestamp(absl::Time ts) { + lastest_known_healthy_timestamp_ = ts; + } + private: void StartHealthCheck(); @@ -118,6 +128,9 @@ class GcsHealthCheckManager { NodeID node_id_; + // Timestamp for latest known status when node is healthy. + absl::Time lastest_known_healthy_timestamp_ = absl::InfinitePast(); + // Whether the health check has stopped. bool stopped_ = false; diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index 416a4ec0a150..eee19f2d7b9a 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -173,12 +173,12 @@ void GcsServer::DoStart(const GcsInitData &gcs_init_data) { // Init gcs resource manager. InitGcsResourceManager(gcs_init_data); - // Init synchronization service - InitRaySyncer(gcs_init_data); - // Init gcs health check manager. InitGcsHealthCheckManager(gcs_init_data); + // Init synchronization service + InitRaySyncer(gcs_init_data); + // Init KV service. InitKVService(); @@ -527,6 +527,12 @@ void GcsServer::InitRaySyncer(const GcsInitData &gcs_init_data) { syncer::MessageType::RESOURCE_VIEW, nullptr, gcs_resource_manager_.get()); ray_syncer_->Register( syncer::MessageType::COMMANDS, nullptr, gcs_resource_manager_.get()); + + // Register completion callback on health check. + ray_syncer_->SetRayletCompletedRpcCallbackForOnce([this](const NodeID &node_id) { + gcs_healthcheck_manager_->MarkNodeHealthy(node_id); + }); + ray_syncer_service_ = std::make_unique(*ray_syncer_); rpc_server_.RegisterService(*ray_syncer_service_); } diff --git a/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc index 35fc308f28a9..74e9f2d97571 100644 --- a/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc @@ -162,6 +162,19 @@ TEST_F(GcsHealthCheckManagerTest, TestBasic) { ASSERT_TRUE(dead_nodes.count(node_id)); } +TEST_F(GcsHealthCheckManagerTest, MarkHealthAndSkipCheck) { + auto node_id = AddServer(); + Run(0); // Initial run + ASSERT_TRUE(dead_nodes.empty()); + + // Run the first health check: even we mark node down, health check is skipped due to + // fresh enough information. + StopServing(node_id); + health_check->MarkNodeHealthy(node_id); + Run(0); + ASSERT_TRUE(dead_nodes.empty()); +} + TEST_F(GcsHealthCheckManagerTest, StoppedAndResume) { auto node_id = AddServer(); Run(0); // Initial run From 7ad2ddfa7afa6d9e05bc60606eaaca26b4d0ab6f Mon Sep 17 00:00:00 2001 From: dentiny Date: Mon, 16 Dec 2024 23:37:02 +0000 Subject: [PATCH 4/8] fix conflict Signed-off-by: dentiny --- src/ray/common/ray_syncer/node_state.cc | 9 --------- src/ray/common/ray_syncer/node_state.h | 11 ----------- src/ray/common/ray_syncer/ray_syncer.cc | 5 ----- src/ray/common/ray_syncer/ray_syncer.h | 5 ----- src/ray/gcs/gcs_server/gcs_server.cc | 12 +++++------- 5 files changed, 5 insertions(+), 37 deletions(-) diff --git a/src/ray/common/ray_syncer/node_state.cc b/src/ray/common/ray_syncer/node_state.cc index 4ad4a463c95c..4db242274582 100644 --- a/src/ray/common/ray_syncer/node_state.cc +++ b/src/ray/common/ray_syncer/node_state.cc @@ -21,12 +21,6 @@ namespace ray::syncer { NodeState::NodeState() { sync_message_versions_taken_.fill(-1); } -void NodeState::SetRayletCompletedRpcCallbackForOnce( - RayletCompletedRpcCallback on_raylet_rpc_completion) { - RAY_CHECK(on_raylet_rpc_completion); - on_raylet_rpc_completion_ = std::move(on_raylet_rpc_completion); -} - bool NodeState::SetComponent(MessageType message_type, const ReporterInterface *reporter, ReceiverInterface *receiver) { @@ -73,9 +67,6 @@ bool NodeState::ConsumeSyncMessage(std::shared_ptr message } current = message; - if (on_raylet_rpc_completion_) { - on_raylet_rpc_completion_(NodeID::FromBinary(message->node_id())); - } auto receiver = receivers_[message->message_type()]; if (receiver != nullptr) { RAY_LOG(DEBUG).WithField(NodeID::FromBinary(message->node_id())) diff --git a/src/ray/common/ray_syncer/node_state.h b/src/ray/common/ray_syncer/node_state.h index 03dc44e7eefc..906a1d385ffe 100644 --- a/src/ray/common/ray_syncer/node_state.h +++ b/src/ray/common/ray_syncer/node_state.h @@ -75,11 +75,6 @@ class NodeState { /// \return true if the local node doesn't have message with newer version. bool ConsumeSyncMessage(std::shared_ptr message); - /// Set the observer callable for sync message response for once. - /// This function is expected to call only once, repeated invocations throws exception. - void SetRayletCompletedRpcCallbackForOnce( - RayletCompletedRpcCallback on_raylet_rpc_completion); - /// Return the cluster view of this local node. const absl::flat_hash_map< std::string, @@ -105,12 +100,6 @@ class NodeState { std::string, std::array, kComponentArraySize>> cluster_view_; - - /// Sync message observer, which is a callback on received message response. - /// - /// As of now we only have one single usage for health check status update, update to - /// vector if we have more observers. - RayletCompletedRpcCallback on_raylet_rpc_completion_; }; } // namespace ray::syncer diff --git a/src/ray/common/ray_syncer/ray_syncer.cc b/src/ray/common/ray_syncer/ray_syncer.cc index 6d059c4df265..1969d7f0f575 100644 --- a/src/ray/common/ray_syncer/ray_syncer.cc +++ b/src/ray/common/ray_syncer/ray_syncer.cc @@ -154,11 +154,6 @@ void RaySyncer::Disconnect(const std::string &node_id) { boost::asio::dispatch(io_context_.get_executor(), std::move(task)).get(); } -void RaySyncer::SetRayletCompletedRpcCallbackForOnce( - RayletCompletedRpcCallback on_raylet_rpc_completion) { - node_state_->SetRayletCompletedRpcCallbackForOnce(std::move(on_raylet_rpc_completion)); -} - void RaySyncer::Register(MessageType message_type, const ReporterInterface *reporter, ReceiverInterface *receiver, diff --git a/src/ray/common/ray_syncer/ray_syncer.h b/src/ray/common/ray_syncer/ray_syncer.h index 889dc2d495f5..d5dc657f46cd 100644 --- a/src/ray/common/ray_syncer/ray_syncer.h +++ b/src/ray/common/ray_syncer/ray_syncer.h @@ -148,11 +148,6 @@ class RaySyncer { std::vector GetAllConnectedNodeIDs() const; - /// Set the observer callable for sync message response for once. - /// This function is expected to call only once, repeated invocations throws exception. - void SetRayletCompletedRpcCallbackForOnce( - RayletCompletedRpcCallback on_raylet_rpc_completion); - private: void Connect(RaySyncerBidiReactor *connection); diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index eee19f2d7b9a..5ccab554169b 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -522,17 +522,15 @@ GcsServer::StorageType GcsServer::GetStorageType() const { void GcsServer::InitRaySyncer(const GcsInitData &gcs_init_data) { ray_syncer_ = std::make_unique( - io_context_provider_.GetIOContext(), kGCSNodeID.Binary()); + io_context_provider_.GetIOContext(), + kGCSNodeID.Binary(), + [this](const NodeID &node_id) { + gcs_healthcheck_manager_->MarkNodeHealthy(node_id); + }); ray_syncer_->Register( syncer::MessageType::RESOURCE_VIEW, nullptr, gcs_resource_manager_.get()); ray_syncer_->Register( syncer::MessageType::COMMANDS, nullptr, gcs_resource_manager_.get()); - - // Register completion callback on health check. - ray_syncer_->SetRayletCompletedRpcCallbackForOnce([this](const NodeID &node_id) { - gcs_healthcheck_manager_->MarkNodeHealthy(node_id); - }); - ray_syncer_service_ = std::make_unique(*ray_syncer_); rpc_server_.RegisterService(*ray_syncer_service_); } From fa5f85e4528d8242c428e930d060fc6ebca87215 Mon Sep 17 00:00:00 2001 From: dentiny Date: Wed, 18 Dec 2024 22:32:19 +0000 Subject: [PATCH 5/8] mark health in io context Signed-off-by: dentiny --- .../gcs_server/gcs_health_check_manager.cc | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/src/ray/gcs/gcs_server/gcs_health_check_manager.cc b/src/ray/gcs/gcs_server/gcs_health_check_manager.cc index f6229c6dc86a..4607e6726eca 100644 --- a/src/ray/gcs/gcs_server/gcs_health_check_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_health_check_manager.cc @@ -83,16 +83,22 @@ std::vector GcsHealthCheckManager::GetAllNodes() const { } void GcsHealthCheckManager::MarkNodeHealthy(const NodeID &node_id) { - auto iter = health_check_contexts_.find(node_id); + io_service_.dispatch( + [this, node_id]() { + thread_checker_.IsOnSameThread(); - // A small chance other components (i.e. ray syncer) are initialized before health - // manager. - if (iter == health_check_contexts_.end()) { - return; - } + auto iter = health_check_contexts_.find(node_id); + + // A small chance other components (i.e. ray syncer) are initialized before health + // manager. + if (iter == health_check_contexts_.end()) { + return; + } - auto *ctx = iter->second; - ctx->SetLatestHealthTimestamp(absl::Now()); + auto *ctx = iter->second; + ctx->SetLatestHealthTimestamp(absl::Now()); + }, + "GcsHealthCheckManager::MarkNodeHealthy"); } void GcsHealthCheckManager::HealthCheckContext::StartHealthCheck() { From 4cc7819543f0bcbd328a80d948df3c2b0cd8cfd3 Mon Sep 17 00:00:00 2001 From: dentiny Date: Thu, 19 Dec 2024 01:20:29 +0000 Subject: [PATCH 6/8] check health check on same thread Signed-off-by: dentiny --- doc/source/data/api/input_output.rst | 2 +- doc/source/rllib/new-api-stack-migration-guide.rst | 6 +++--- src/ray/gcs/gcs_server/gcs_health_check_manager.cc | 4 +++- src/ray/util/thread_checker.h | 2 +- 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/doc/source/data/api/input_output.rst b/doc/source/data/api/input_output.rst index c4a4325b75f9..21790e8982ff 100644 --- a/doc/source/data/api/input_output.rst +++ b/doc/source/data/api/input_output.rst @@ -359,4 +359,4 @@ Shuffling API :nosignatures: :toctree: doc/ - FileShuffleConfig \ No newline at end of file + FileShuffleConfig diff --git a/doc/source/rllib/new-api-stack-migration-guide.rst b/doc/source/rllib/new-api-stack-migration-guide.rst index 1bebe30385f4..b06b91c6b39e 100644 --- a/doc/source/rllib/new-api-stack-migration-guide.rst +++ b/doc/source/rllib/new-api-stack-migration-guide.rst @@ -17,7 +17,7 @@ RLlib classes and code to RLlib's new API stack. .. note:: Even though the new API stack still provides rudimentary support for `TensorFlow `__, - RLlib supports a single deep learning framework, the `PyTorch `__ + RLlib supports a single deep learning framework, the `PyTorch `__ framework, dropping TensorFlow support entirely. Note, though, that the Ray team continues to design RLlib to be framework-agnostic. @@ -330,7 +330,7 @@ The following is a one-to-one translation guide for these types of Callbacks met **kwargs, ): # The `SingleAgentEpisode` or `MultiAgentEpisode` that RLlib has just started. - # See https://docs.ray.io/en/latest/rllib/single-agent-episode.html for more details: + # See https://docs.ray.io/en/latest/rllib/single-agent-episode.html for more details: print(episode) # The `EnvRunner` class that collects the episode in question. @@ -418,7 +418,7 @@ It also provides superior scalability, allowing training in a multi-GPU setup in and multi-node with multi-GPU training on the `Anyscale `__ platform. -Custom connectors (old-stack) +Custom connectors (old-stack) ----------------------------- If you're using custom connectors from the old API stack, move your logic into the diff --git a/src/ray/gcs/gcs_server/gcs_health_check_manager.cc b/src/ray/gcs/gcs_server/gcs_health_check_manager.cc index 4607e6726eca..173f6683e749 100644 --- a/src/ray/gcs/gcs_server/gcs_health_check_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_health_check_manager.cc @@ -85,7 +85,7 @@ std::vector GcsHealthCheckManager::GetAllNodes() const { void GcsHealthCheckManager::MarkNodeHealthy(const NodeID &node_id) { io_service_.dispatch( [this, node_id]() { - thread_checker_.IsOnSameThread(); + RAY_CHECK(thread_checker_.IsOnSameThread()); auto iter = health_check_contexts_.find(node_id); @@ -104,6 +104,8 @@ void GcsHealthCheckManager::MarkNodeHealthy(const NodeID &node_id) { void GcsHealthCheckManager::HealthCheckContext::StartHealthCheck() { using ::grpc::health::v1::HealthCheckResponse; + RAY_CHECK(thread_checker_.IsOnSameThread()); + // Check latest health status, see whether a new rpc message is needed. const auto now = absl::Now(); absl::Time next_check_time = diff --git a/src/ray/util/thread_checker.h b/src/ray/util/thread_checker.h index 2e3dcf1ed3df..562e1d2de503 100644 --- a/src/ray/util/thread_checker.h +++ b/src/ray/util/thread_checker.h @@ -34,7 +34,7 @@ class ThreadChecker { public: // Return true at initialization, or current invocation happens on the same thread as // initialization. - bool IsOnSameThread() const; + [[nodiscard]] bool IsOnSameThread() const; private: mutable std::atomic thread_id_{}; From 79bdc14df0b32c8c09b05a2c6552f94a68a08e5f Mon Sep 17 00:00:00 2001 From: dentiny Date: Thu, 19 Dec 2024 04:58:51 +0000 Subject: [PATCH 7/8] fix compilation Signed-off-by: dentiny --- src/ray/gcs/gcs_server/gcs_health_check_manager.cc | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/ray/gcs/gcs_server/gcs_health_check_manager.cc b/src/ray/gcs/gcs_server/gcs_health_check_manager.cc index 173f6683e749..d0c310a06bc3 100644 --- a/src/ray/gcs/gcs_server/gcs_health_check_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_health_check_manager.cc @@ -51,7 +51,7 @@ GcsHealthCheckManager::~GcsHealthCheckManager() = default; void GcsHealthCheckManager::RemoveNode(const NodeID &node_id) { io_service_.dispatch( [this, node_id]() { - thread_checker_.IsOnSameThread(); + RAY_CHECK(thread_checker_.IsOnSameThread()); auto iter = health_check_contexts_.find(node_id); if (iter == health_check_contexts_.end()) { return; @@ -64,7 +64,7 @@ void GcsHealthCheckManager::RemoveNode(const NodeID &node_id) { void GcsHealthCheckManager::FailNode(const NodeID &node_id) { RAY_LOG(WARNING).WithField(node_id) << "Node is dead because the health check failed."; - thread_checker_.IsOnSameThread(); + RAY_CHECK(thread_checker_.IsOnSameThread()); auto iter = health_check_contexts_.find(node_id); if (iter != health_check_contexts_.end()) { on_node_death_callback_(node_id); @@ -73,7 +73,7 @@ void GcsHealthCheckManager::FailNode(const NodeID &node_id) { } std::vector GcsHealthCheckManager::GetAllNodes() const { - thread_checker_.IsOnSameThread(); + RAY_CHECK(thread_checker_.IsOnSameThread()); std::vector nodes; nodes.reserve(health_check_contexts_.size()); for (const auto &[node_id, _] : health_check_contexts_) { @@ -104,7 +104,7 @@ void GcsHealthCheckManager::MarkNodeHealthy(const NodeID &node_id) { void GcsHealthCheckManager::HealthCheckContext::StartHealthCheck() { using ::grpc::health::v1::HealthCheckResponse; - RAY_CHECK(thread_checker_.IsOnSameThread()); + RAY_CHECK(manager_->thread_checker_.IsOnSameThread()); // Check latest health status, see whether a new rpc message is needed. const auto now = absl::Now(); @@ -176,7 +176,7 @@ void GcsHealthCheckManager::AddNode(const NodeID &node_id, std::shared_ptr channel) { io_service_.dispatch( [this, channel = std::move(channel), node_id]() { - thread_checker_.IsOnSameThread(); + RAY_CHECK(thread_checker_.IsOnSameThread()); auto context = new HealthCheckContext(this, channel, node_id); auto [_, is_new] = health_check_contexts_.emplace(node_id, context); RAY_CHECK(is_new); From f05ecb5c79e422d64203a8809f042472b1101628 Mon Sep 17 00:00:00 2001 From: dentiny Date: Thu, 19 Dec 2024 21:54:56 +0000 Subject: [PATCH 8/8] fix doc on io context invocation Signed-off-by: dentiny --- src/ray/gcs/gcs_server/gcs_health_check_manager.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/gcs/gcs_server/gcs_health_check_manager.h b/src/ray/gcs/gcs_server/gcs_health_check_manager.h index fe98272fc21a..4f369a814f32 100644 --- a/src/ray/gcs/gcs_server/gcs_health_check_manager.h +++ b/src/ray/gcs/gcs_server/gcs_health_check_manager.h @@ -83,7 +83,7 @@ class GcsHealthCheckManager { std::vector GetAllNodes() const; /// Mark the given node as healthy, so health check manager could save some checking - /// rpcs. Notice: have to invoke from io-context thread. + /// rpcs. Safe to call from non-io-context threads. /// /// \param node_id The id of the node. void MarkNodeHealthy(const NodeID &node_id);