Skip to content

Commit

Permalink
[core][autoscaler] GCS Autoscaler V2: Handle ReportAutoscalingState (r…
Browse files Browse the repository at this point in the history
…ay-project#36768)

Why are these changes needed?
Handle ReportAutoscalingState.

---------

Signed-off-by: Ricky Xu <xuchen727@hotmail.com>
Signed-off-by: rickyyx <rickyx@anyscale.com>
Signed-off-by: e428265 <arvind.chandramouli@lmco.com>
  • Loading branch information
rickyyx authored and arvind-chandra committed Aug 31, 2023
1 parent b9da642 commit 6c841fc
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 2 deletions.
28 changes: 26 additions & 2 deletions src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,32 @@ void GcsAutoscalerStateManager::HandleReportAutoscalingState(
rpc::autoscaler::ReportAutoscalingStateRequest request,
rpc::autoscaler::ReportAutoscalingStateReply *reply,
rpc::SendReplyCallback send_reply_callback) {
// Unimplemented.
throw std::runtime_error("Unimplemented");
// TODO(rickyx): We should handle the infeasible requests in the future.
// Right now, this info will only be used for observability, i.e. ray status.

// Never seen any autoscaling state before - so just takes this.
if (!autoscaling_state_.has_value()) {
autoscaling_state_ = std::move(request.autoscaling_state());
send_reply_callback(ray::Status::OK(), nullptr, nullptr);
return;
}

// We have a state cached. We discard the incoming state if it's older than the
// cached state.
if (request.autoscaling_state().autoscaler_state_version() <
autoscaling_state_->autoscaler_state_version()) {
RAY_LOG(INFO) << "Received an outdated autoscaling state. "
<< "Current version: " << autoscaling_state_->autoscaler_state_version()
<< ", received version: "
<< request.autoscaling_state().autoscaler_state_version()
<< ". Discarding incoming request.";
send_reply_callback(ray::Status::OK(), nullptr, nullptr);
return;
}

// We should overwrite the cache version.
autoscaling_state_ = std::move(request.autoscaling_state());
send_reply_callback(ray::Status::OK(), nullptr, nullptr);
}

void GcsAutoscalerStateManager::HandleRequestClusterResourceConstraint(
Expand Down
5 changes: 5 additions & 0 deletions src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ class GcsAutoscalerStateManager : public rpc::AutoscalerStateHandler {
/// This is requested through autoscaler SDK from request_resources().
absl::optional<rpc::ClusterResourceConstraint> cluster_resource_constraint_ =
absl::nullopt;

/// Cached autoscaling state.
absl::optional<rpc::AutoscalingState> autoscaling_state_ = absl::nullopt;

FRIEND_TEST(GcsAutoscalerStateManagerTest, TestReportAutoscalingState);
};

} // namespace gcs
Expand Down
51 changes: 51 additions & 0 deletions src/ray/gcs/gcs_server/test/gcs_autoscaler_state_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,16 @@ class GcsAutoscalerStateManagerTest : public ::testing::Test {
gcs_resource_manager_->UpdateResourceLoads(data);
}

/// Test helper: wraps `state` in a ReportAutoscalingState request and feeds it to the
/// manager under test. The reply and the reply callback's arguments are ignored.
void ReportAutoscalingState(const rpc::autoscaler::AutoscalingState &state) {
  rpc::autoscaler::ReportAutoscalingStateRequest report_request;
  *report_request.mutable_autoscaling_state() = state;

  rpc::autoscaler::ReportAutoscalingStateReply ignored_reply;
  gcs_autoscaler_state_manager_->HandleReportAutoscalingState(
      report_request,
      &ignored_reply,
      // No-op callback: the tests inspect the manager's cached state directly.
      [](ray::Status, std::function<void()>, std::function<void()>) {});
}

std::string ShapeToString(const rpc::autoscaler::ResourceRequest &request) {
// Ordered map with bundle name as the key
std::map<std::string, double> m;
Expand Down Expand Up @@ -548,6 +558,47 @@ TEST_F(GcsAutoscalerStateManagerTest, TestClusterResourcesConstraint) {
}
}

// Verifies the version-gated caching of reported autoscaling states: the first report
// is cached, an older version is rejected, and a newer version replaces the cache.
TEST_F(GcsAutoscalerStateManagerTest, TestReportAutoscalingState) {
  // The cache is a member of the manager, so a single reference observes every update.
  const auto &cached_state = gcs_autoscaler_state_manager_->autoscaling_state_;

  // Nothing has been reported yet, so nothing is cached.
  ASSERT_EQ(cached_state, absl::nullopt);

  // The first report (version 1) is accepted.
  rpc::autoscaler::AutoscalingState first_report;
  first_report.set_autoscaler_state_version(1);
  ReportAutoscalingState(first_report);
  ASSERT_NE(cached_state, absl::nullopt);
  ASSERT_EQ(cached_state->autoscaler_state_version(), 1);

  // A stale report (version 0) leaves the cached version untouched.
  rpc::autoscaler::AutoscalingState stale_report;
  stale_report.set_autoscaler_state_version(0);
  ReportAutoscalingState(stale_report);
  ASSERT_NE(cached_state, absl::nullopt);
  ASSERT_EQ(cached_state->autoscaler_state_version(), 1);

  // A newer report (version 2) replaces the cached state.
  rpc::autoscaler::AutoscalingState newer_report;
  newer_report.set_autoscaler_state_version(2);
  ReportAutoscalingState(newer_report);
  ASSERT_NE(cached_state, absl::nullopt);
  ASSERT_EQ(cached_state->autoscaler_state_version(), 2);
}

} // namespace gcs
} // namespace ray

Expand Down

0 comments on commit 6c841fc

Please sign in to comment.