From 6c841fcd6d17450b1bcccff0295e48a04b29069a Mon Sep 17 00:00:00 2001 From: Ricky Xu Date: Fri, 23 Jun 2023 19:32:22 -0700 Subject: [PATCH] [core][autoscaler] GCS Autoscaler V2: Handle ReportAutoscalingState (#36768) Why are these changes needed? Handle ReportAutoscalingState. --------- Signed-off-by: Ricky Xu Signed-off-by: rickyyx Signed-off-by: e428265 --- .../gcs_autoscaler_state_manager.cc | 28 +++++++++- .../gcs_server/gcs_autoscaler_state_manager.h | 5 ++ .../test/gcs_autoscaler_state_manager_test.cc | 51 +++++++++++++++++++ 3 files changed, 82 insertions(+), 2 deletions(-) diff --git a/src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.cc b/src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.cc index 8a6e835274e6..4ca598db8913 100644 --- a/src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.cc @@ -60,8 +60,32 @@ void GcsAutoscalerStateManager::HandleReportAutoscalingState( rpc::autoscaler::ReportAutoscalingStateRequest request, rpc::autoscaler::ReportAutoscalingStateReply *reply, rpc::SendReplyCallback send_reply_callback) { - // Unimplemented. - throw std::runtime_error("Unimplemented"); + // TODO(rickyx): We should handle the infeasible requests in the future. + // Right now, this info will only be used for observability, i.e. ray status. + + // No state cached yet: take this one (moved out of the by-value request). + if (!autoscaling_state_.has_value()) { + autoscaling_state_ = std::move(*request.mutable_autoscaling_state()); + send_reply_callback(ray::Status::OK(), nullptr, nullptr); + return; + } + + // We have a state cached. We discard the incoming state if it's older than the + // cached state. + if (request.autoscaling_state().autoscaler_state_version() < + autoscaling_state_->autoscaler_state_version()) { + RAY_LOG(INFO) << "Received an outdated autoscaling state. 
" + << "Current version: " << autoscaling_state_->autoscaler_state_version() + << ", received version: " + << request.autoscaling_state().autoscaler_state_version() + << ". Discarding incoming request."; + send_reply_callback(ray::Status::OK(), nullptr, nullptr); + return; + } + + // Not older than the cache: overwrite it (moved out of the by-value request). + autoscaling_state_ = std::move(*request.mutable_autoscaling_state()); + send_reply_callback(ray::Status::OK(), nullptr, nullptr); } void GcsAutoscalerStateManager::HandleRequestClusterResourceConstraint( diff --git a/src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.h b/src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.h index a1175d80f152..119ea55e65cd 100644 --- a/src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.h +++ b/src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.h @@ -131,6 +131,11 @@ class GcsAutoscalerStateManager : public rpc::AutoscalerStateHandler { /// This is requested through autoscaler SDK from request_resources(). absl::optional cluster_resource_constraint_ = absl::nullopt; + + /// Cached autoscaling state. 
+ absl::optional autoscaling_state_ = absl::nullopt; + + FRIEND_TEST(GcsAutoscalerStateManagerTest, TestReportAutoscalingState); }; } // namespace gcs diff --git a/src/ray/gcs/gcs_server/test/gcs_autoscaler_state_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_autoscaler_state_manager_test.cc index c2afdbf4f5d9..54a060829fc0 100644 --- a/src/ray/gcs/gcs_server/test/gcs_autoscaler_state_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_autoscaler_state_manager_test.cc @@ -137,6 +137,16 @@ class GcsAutoscalerStateManagerTest : public ::testing::Test { gcs_resource_manager_->UpdateResourceLoads(data); } + void ReportAutoscalingState(const rpc::autoscaler::AutoscalingState &state) { + rpc::autoscaler::ReportAutoscalingStateRequest request; + request.mutable_autoscaling_state()->CopyFrom(state); + rpc::autoscaler::ReportAutoscalingStateReply reply; + auto send_reply_callback = + [](ray::Status status, std::function f1, std::function f2) {}; + gcs_autoscaler_state_manager_->HandleReportAutoscalingState( + request, &reply, send_reply_callback); + } + std::string ShapeToString(const rpc::autoscaler::ResourceRequest &request) { // Ordered map with bundle name as the key std::map m; @@ -548,6 +558,47 @@ TEST_F(GcsAutoscalerStateManagerTest, TestClusterResourcesConstraint) { } } +TEST_F(GcsAutoscalerStateManagerTest, TestReportAutoscalingState) { + // Nothing is cached before the first report. + { + const auto &autoscaling_state = gcs_autoscaler_state_manager_->autoscaling_state_; + ASSERT_EQ(autoscaling_state, absl::nullopt); + } + + // The first reported state (version 1) is cached. + { + rpc::autoscaler::AutoscalingState actual_state; + actual_state.set_autoscaler_state_version(1); + ReportAutoscalingState(actual_state); + + const auto &autoscaling_state = gcs_autoscaler_state_manager_->autoscaling_state_; + ASSERT_NE(autoscaling_state, absl::nullopt); + ASSERT_EQ(autoscaling_state->autoscaler_state_version(), 1); + } + + // An older version (0) is discarded; version 1 stays cached. 
+ { + rpc::autoscaler::AutoscalingState state; + state.set_autoscaler_state_version(0); + ReportAutoscalingState(state); + + const auto &autoscaling_state = gcs_autoscaler_state_manager_->autoscaling_state_; + ASSERT_NE(autoscaling_state, absl::nullopt); + ASSERT_EQ(autoscaling_state->autoscaler_state_version(), 1); + } + + // A newer version (2) replaces the cached state. + { + rpc::autoscaler::AutoscalingState state; + state.set_autoscaler_state_version(2); + ReportAutoscalingState(state); + + const auto &autoscaling_state = gcs_autoscaler_state_manager_->autoscaling_state_; + ASSERT_NE(autoscaling_state, absl::nullopt); + ASSERT_EQ(autoscaling_state->autoscaler_state_version(), 2); + } +} + } // namespace gcs } // namespace ray