Skip to content

Commit

Permalink
[WRR] backport #33694 to 1.56 (#33698)
Browse files Browse the repository at this point in the history
  • Loading branch information
markdroth authored Jul 14, 2023
1 parent 6425ed2 commit 484820a
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
}

void DisableEjection() {
Uneject();
if (ejection_time_.has_value()) Uneject();
multiplier_ = 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -909,12 +909,21 @@ void WeightedRoundRobin::WeightedRoundRobinSubchannelData::
subchannel()->RequestConnection();
} else if (new_state == GRPC_CHANNEL_READY) {
// If we transition back to READY state, restart the blackout period.
// Skip this if this is the initial notification for this
// subchannel (which happens whenever we get updated addresses and
// create a new endpoint list). Also skip it if the previous state
// was READY (which should never happen in practice, but we've seen
// at least one bug that caused this in the outlier_detection
// policy, so let's be defensive here).
//
// Note that we cannot guarantee that we will never receive
// lingering callbacks for backend metric reports from the previous
// connection after the new connection has been established, but they
// should be masked by new backend metric reports from the new
// connection by the time the blackout period ends.
weight_->ResetNonEmptySince();
if (old_state.has_value() && old_state != GRPC_CHANNEL_READY) {
weight_->ResetNonEmptySince();
}
}
// Update logical connectivity state.
UpdateLogicalConnectivityStateLocked(new_state);
Expand Down
42 changes: 41 additions & 1 deletion test/core/client_channel/lb_policy/weighted_round_robin_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ class WeightedRoundRobinTest : public TimeAwareLoadBalancingPolicyTest {
backend_metrics,
std::map<absl::string_view /*address*/, size_t /*num_picks*/> expected,
absl::Duration timeout = absl::Seconds(5),
bool run_timer_callbacks = true,
SourceLocation location = SourceLocation()) {
gpr_log(GPR_INFO, "==> WaitForWeightedRoundRobinPicks(): Expecting %s",
PickMapString(expected).c_str());
Expand Down Expand Up @@ -308,7 +309,7 @@ class WeightedRoundRobinTest : public TimeAwareLoadBalancingPolicyTest {
EXPECT_NE(*picker, nullptr)
<< location.file() << ":" << location.line();
if (*picker == nullptr) return false;
} else {
} else if (run_timer_callbacks) {
gpr_log(GPR_INFO, "running timer callback...");
RunTimerCallback();
}
Expand Down Expand Up @@ -803,6 +804,45 @@ TEST_F(WeightedRoundRobinTest, BlackoutPeriodAfterDisconnect) {
{{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
}

TEST_F(WeightedRoundRobinTest, BlackoutPeriodDoesNotGetResetAfterUpdate) {
// Send address list to LB policy.
const std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
auto config_builder =
ConfigBuilder().SetWeightExpirationPeriod(Duration::Seconds(2));
auto picker =
SendInitialUpdateAndWaitForConnected(kAddresses, config_builder);
ASSERT_NE(picker, nullptr);
// All backends report weights.
WaitForWeightedRoundRobinPicks(
&picker,
{{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
/*qps=*/100.0, /*eps=*/0.0)},
{kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
/*qps=*/100.0, /*eps=*/0.0)},
{kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.3,
/*qps=*/100.0, /*eps=*/0.0)}},
{{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
// Send a duplicate update with the same addresses and config.
EXPECT_EQ(ApplyUpdate(BuildUpdate(kAddresses, config_builder.Build()),
lb_policy_.get()),
absl::OkStatus());
// Note that we have not advanced time, so if the update incorrectly
// triggers resetting the blackout period, none of the weights will
// actually be used.
picker = ExpectState(GRPC_CHANNEL_READY, absl::OkStatus());
WaitForWeightedRoundRobinPicks(
&picker,
{{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
/*qps=*/100.0, /*eps=*/0.0)},
{kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
/*qps=*/100.0, /*eps=*/0.0)},
{kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.3,
/*qps=*/100.0, /*eps=*/0.0)}},
{{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}},
/*timeout=*/absl::Seconds(5), /*run_timer_callbacks=*/false);
}

TEST_F(WeightedRoundRobinTest, ZeroErrorUtilPenalty) {
// Send address list to LB policy.
const std::array<absl::string_view, 3> kAddresses = {
Expand Down
78 changes: 76 additions & 2 deletions test/cpp/end2end/client_lb_end2end_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3174,13 +3174,27 @@ const char kServiceConfigOob[] =
" ]\n"
"}";

const char kServiceConfigWithOutlierDetection[] =
"{\n"
" \"loadBalancingConfig\": [\n"
" {\"outlier_detection_experimental\": {\n"
" \"childPolicy\": [\n"
" {\"weighted_round_robin\": {\n"
" \"blackoutPeriod\": \"%ds\",\n"
" \"weightUpdatePeriod\": \"0.1s\"\n"
" }}\n"
" ]\n"
" }}\n"
" ]\n"
"}";

class WeightedRoundRobinTest : public ClientLbEnd2endTest {
protected:
void ExpectWeightedRoundRobinPicks(
const grpc_core::DebugLocation& location,
const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
const std::vector<size_t>& expected_weights, size_t total_passes = 3,
EchoRequest* request_ptr = nullptr) {
EchoRequest* request_ptr = nullptr, int timeout_ms = 15000) {
GPR_ASSERT(expected_weights.size() == servers_.size());
size_t total_picks_per_pass = 0;
for (size_t picks : expected_weights) {
Expand Down Expand Up @@ -3210,7 +3224,7 @@ class WeightedRoundRobinTest : public ClientLbEnd2endTest {
}
return true;
},
request_ptr);
request_ptr, timeout_ms);
}
};

Expand Down Expand Up @@ -3254,6 +3268,66 @@ TEST_F(WeightedRoundRobinTest, CallAndServerMetric) {
EXPECT_EQ("weighted_round_robin", channel->GetLoadBalancingPolicyName());
}

// This tests a bug seen in production where the outlier_detection
// policy would incorrectly generate a duplicate READY notification on
// all of its subchannels every time it saw an update, thus causing the
// WRR policy to re-enter the blackout period for that address.
TEST_F(WeightedRoundRobinTest, WithOutlierDetection) {
const int kBlackoutPeriodSeconds = 5;
const int kNumServers = 3;
StartServers(kNumServers);
// Report server metrics that should give 6:4:3 WRR picks.
// weights = qps / (util + (eps/qps)) =
// 1/(0.2+0.2) : 1/(0.3+0.3) : 2/(1.5+0.1) = 6:4:3
// where util is app_util if set, or cpu_util.
servers_[0]->server_metric_recorder_->SetApplicationUtilization(0.2);
servers_[0]->server_metric_recorder_->SetEps(20);
servers_[0]->server_metric_recorder_->SetQps(100);
servers_[1]->server_metric_recorder_->SetApplicationUtilization(0.3);
servers_[1]->server_metric_recorder_->SetEps(30);
servers_[1]->server_metric_recorder_->SetQps(100);
servers_[2]->server_metric_recorder_->SetApplicationUtilization(1.5);
servers_[2]->server_metric_recorder_->SetEps(20);
servers_[2]->server_metric_recorder_->SetQps(200);
// Create channel.
// Initial blackout period is 0, so that we start seeing traffic in
// the right proportions right away.
auto response_generator = BuildResolverResponseGenerator();
auto channel = BuildChannel("", response_generator);
auto stub = BuildStub(channel);
response_generator.SetNextResolution(
GetServersPorts(),
absl::StrFormat(kServiceConfigWithOutlierDetection, 0).c_str());
// Send requests with per-call reported EPS/QPS set to 0/100.
// This should give 1/2:1/3:1/15 = 15:10:2 WRR picks.
// Keep sending RPCs long enough to go past the new blackout period
// that we're going to add later.
absl::Time deadline =
absl::Now() +
absl::Seconds(kBlackoutPeriodSeconds * grpc_test_slowdown_factor());
EchoRequest request;
// We cannot override with 0 with proto3, so setting it to almost 0.
request.mutable_param()->mutable_backend_metrics()->set_eps(
std::numeric_limits<double>::min());
request.mutable_param()->mutable_backend_metrics()->set_rps_fractional(100);
do {
ExpectWeightedRoundRobinPicks(DEBUG_LOCATION, stub,
/*expected_weights=*/{15, 10, 2},
/*total_passes=*/3, &request);
} while (absl::Now() < deadline);
// Send a new resolver response that increases blackout period.
response_generator.SetNextResolution(
GetServersPorts(),
absl::StrFormat(kServiceConfigWithOutlierDetection,
kBlackoutPeriodSeconds * grpc_test_slowdown_factor())
.c_str());
// Weights should be the same before the blackout period expires.
ExpectWeightedRoundRobinPicks(
DEBUG_LOCATION, stub, /*expected_weights=*/{15, 10, 2},
/*total_passes=*/3, &request,
/*timeout_ms=*/(kBlackoutPeriodSeconds - 1) * 1000);
}

class WeightedRoundRobinParamTest
: public WeightedRoundRobinTest,
public ::testing::WithParamInterface<const char*> {};
Expand Down

0 comments on commit 484820a

Please sign in to comment.