Skip to content

Commit

Permalink
weightedroundrobin: prefer application_utilization to cpu_utilization
Browse files
  • Loading branch information
dfawley committed Jun 7, 2023
1 parent 7aeea8f commit 2ac1aae
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 44 deletions.
8 changes: 6 additions & 2 deletions balancer/weightedroundrobin/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,11 @@ func (w *weightedSubConn) OnLoadReport(load *v3orcapb.OrcaLoadReport) {
w.logger.Infof("Received load report for subchannel %v: %v", w.SubConn, load)
}
// Update weights of this subchannel according to the reported load
if load.CpuUtilization == 0 || load.RpsFractional == 0 {
utilization := load.ApplicationUtilization
if utilization == 0 {
utilization = load.CpuUtilization
}
if utilization == 0 || load.RpsFractional == 0 {
if w.logger.V(2) {
w.logger.Infof("Ignoring empty load report for subchannel %v", w.SubConn)
}
Expand All @@ -430,7 +434,7 @@ func (w *weightedSubConn) OnLoadReport(load *v3orcapb.OrcaLoadReport) {
defer w.mu.Unlock()

errorRate := load.Eps / load.RpsFractional
w.weightVal = load.RpsFractional / (load.CpuUtilization + errorRate*w.cfg.ErrorUtilizationPenalty)
w.weightVal = load.RpsFractional / (utilization + errorRate*w.cfg.ErrorUtilizationPenalty)
if w.logger.V(2) {
w.logger.Infof("New weight for subchannel %v: %v", w.SubConn, w.weightVal)
}
Expand Down
109 changes: 67 additions & 42 deletions balancer/weightedroundrobin/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func startServer(t *testing.T, r reportType) *testServer {
if r := orca.CallMetricsRecorderFromContext(ctx); r != nil {
// Copy metrics from what the test set in cmr into r.
sm := cmr.(orca.ServerMetricsProvider).ServerMetrics()
r.SetCPUUtilization(sm.CPUUtilization)
r.SetApplicationUtilization(sm.AppUtilization)
r.SetQPS(sm.QPS)
r.SetEPS(sm.EPS)
}
Expand Down Expand Up @@ -230,10 +230,10 @@ func (s) TestBalancer_TwoAddresses_ReportingEnabledPerCall(t *testing.T) {
// srv1 starts loaded and srv2 starts without load; ensure RPCs are routed
// disproportionately to srv2 (10:1).
srv1.callMetrics.SetQPS(10.0)
srv1.callMetrics.SetCPUUtilization(1.0)
srv1.callMetrics.SetApplicationUtilization(1.0)

srv2.callMetrics.SetQPS(10.0)
srv2.callMetrics.SetCPUUtilization(.1)
srv2.callMetrics.SetApplicationUtilization(.1)

sc := svcConfig(t, perCallConfig)
if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
Expand All @@ -253,33 +253,58 @@ func (s) TestBalancer_TwoAddresses_ReportingEnabledPerCall(t *testing.T) {
// Tests two addresses with OOB ORCA reporting enabled. Checks the backends
// are called in the appropriate ratios.
func (s) TestBalancer_TwoAddresses_ReportingEnabledOOB(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
testCases := []struct {
name string
utilSetter func(orca.ServerMetricsRecorder, float64)
}{{
name: "application_utilization",
utilSetter: func(smr orca.ServerMetricsRecorder, val float64) {
smr.SetApplicationUtilization(val)
},
}, {
name: "cpu_utilization",
utilSetter: func(smr orca.ServerMetricsRecorder, val float64) {
smr.SetCPUUtilization(val)
},
}, {
name: "application over cpu",
utilSetter: func(smr orca.ServerMetricsRecorder, val float64) {
smr.SetApplicationUtilization(val)
smr.SetCPUUtilization(2.0) // ignored because ApplicationUtilization is set
},
}}

srv1 := startServer(t, reportOOB)
srv2 := startServer(t, reportOOB)
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

// srv1 starts loaded and srv2 starts without load; ensure RPCs are routed
// disproportionately to srv2 (10:1).
srv1.oobMetrics.SetQPS(10.0)
srv1.oobMetrics.SetCPUUtilization(1.0)
srv1 := startServer(t, reportOOB)
srv2 := startServer(t, reportOOB)

srv2.oobMetrics.SetQPS(10.0)
srv2.oobMetrics.SetCPUUtilization(.1)
// srv1 starts loaded and srv2 starts without load; ensure RPCs are routed
// disproportionately to srv2 (10:1).
srv1.oobMetrics.SetQPS(10.0)
tc.utilSetter(srv1.oobMetrics, 1.0)

sc := svcConfig(t, oobConfig)
if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
t.Fatalf("Error starting client: %v", err)
}
addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}}
srv1.R.UpdateState(resolver.State{Addresses: addrs})
srv2.oobMetrics.SetQPS(10.0)
tc.utilSetter(srv2.oobMetrics, 0.1)

// Call each backend once to ensure the weights have been received.
ensureReached(ctx, t, srv1.Client, 2)
sc := svcConfig(t, oobConfig)
if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
t.Fatalf("Error starting client: %v", err)
}
addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}}
srv1.R.UpdateState(resolver.State{Addresses: addrs})

// Wait for the weight update period to allow the new weights to be processed.
time.Sleep(weightUpdatePeriod)
checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10})
// Call each backend once to ensure the weights have been received.
ensureReached(ctx, t, srv1.Client, 2)

// Wait for the weight update period to allow the new weights to be processed.
time.Sleep(weightUpdatePeriod)
checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10})
})
}
}

// Tests two addresses with OOB ORCA reporting enabled, where the reports
Expand All @@ -295,10 +320,10 @@ func (s) TestBalancer_TwoAddresses_UpdateLoads(t *testing.T) {
// srv1 starts loaded and srv2 starts without load; ensure RPCs are routed
// disproportionately to srv2 (10:1).
srv1.oobMetrics.SetQPS(10.0)
srv1.oobMetrics.SetCPUUtilization(1.0)
srv1.oobMetrics.SetApplicationUtilization(1.0)

srv2.oobMetrics.SetQPS(10.0)
srv2.oobMetrics.SetCPUUtilization(.1)
srv2.oobMetrics.SetApplicationUtilization(.1)

sc := svcConfig(t, oobConfig)
if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
Expand All @@ -317,10 +342,10 @@ func (s) TestBalancer_TwoAddresses_UpdateLoads(t *testing.T) {
// Update the loads so srv2 is loaded and srv1 is not; ensure RPCs are
// routed disproportionately to srv1.
srv1.oobMetrics.SetQPS(10.0)
srv1.oobMetrics.SetCPUUtilization(.1)
srv1.oobMetrics.SetApplicationUtilization(.1)

srv2.oobMetrics.SetQPS(10.0)
srv2.oobMetrics.SetCPUUtilization(1.0)
srv2.oobMetrics.SetApplicationUtilization(1.0)

// Wait for the weight update period to allow the new weights to be processed.
time.Sleep(weightUpdatePeriod + oobReportingInterval)
Expand All @@ -340,19 +365,19 @@ func (s) TestBalancer_TwoAddresses_OOBThenPerCall(t *testing.T) {
// srv1 starts loaded and srv2 starts without load; ensure RPCs are routed
// disproportionately to srv2 (10:1).
srv1.oobMetrics.SetQPS(10.0)
srv1.oobMetrics.SetCPUUtilization(1.0)
srv1.oobMetrics.SetApplicationUtilization(1.0)

srv2.oobMetrics.SetQPS(10.0)
srv2.oobMetrics.SetCPUUtilization(.1)
srv2.oobMetrics.SetApplicationUtilization(.1)

// For per-call metrics (not used initially), srv2 reports that it is
// loaded and srv1 reports low load. After confirming OOB works, switch to
// per-call and confirm the new routing weights are applied.
srv1.callMetrics.SetQPS(10.0)
srv1.callMetrics.SetCPUUtilization(.1)
srv1.callMetrics.SetApplicationUtilization(.1)

srv2.callMetrics.SetQPS(10.0)
srv2.callMetrics.SetCPUUtilization(1.0)
srv2.callMetrics.SetApplicationUtilization(1.0)

sc := svcConfig(t, oobConfig)
if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
Expand Down Expand Up @@ -396,13 +421,13 @@ func (s) TestBalancer_TwoAddresses_ErrorPenalty(t *testing.T) {
// to 0.9 which will cause the weights to be equal and RPCs to be routed
// 50/50.
srv1.oobMetrics.SetQPS(10.0)
srv1.oobMetrics.SetCPUUtilization(1.0)
srv1.oobMetrics.SetApplicationUtilization(1.0)
srv1.oobMetrics.SetEPS(0)
// srv1 weight before: 10.0 / 1.0 = 10.0
// srv1 weight after: 10.0 / 1.0 = 10.0

srv2.oobMetrics.SetQPS(10.0)
srv2.oobMetrics.SetCPUUtilization(.1)
srv2.oobMetrics.SetApplicationUtilization(.1)
srv2.oobMetrics.SetEPS(10.0)
// srv2 weight before: 10.0 / 0.1 = 100.0
// srv2 weight after: 10.0 / 1.0 = 10.0
Expand Down Expand Up @@ -476,10 +501,10 @@ func (s) TestBalancer_TwoAddresses_BlackoutPeriod(t *testing.T) {
// srv1 starts loaded and srv2 starts without load; ensure RPCs are routed
// disproportionately to srv2 (10:1).
srv1.oobMetrics.SetQPS(10.0)
srv1.oobMetrics.SetCPUUtilization(1.0)
srv1.oobMetrics.SetApplicationUtilization(1.0)

srv2.oobMetrics.SetQPS(10.0)
srv2.oobMetrics.SetCPUUtilization(.1)
srv2.oobMetrics.SetApplicationUtilization(.1)

cfg := oobConfig
cfg.BlackoutPeriod = tc.blackoutPeriodCfg
Expand Down Expand Up @@ -544,10 +569,10 @@ func (s) TestBalancer_TwoAddresses_WeightExpiration(t *testing.T) {
// is 1 minute but the weights expire in 1 second, routing will go to 50/50
// after the weights expire.
srv1.oobMetrics.SetQPS(10.0)
srv1.oobMetrics.SetCPUUtilization(1.0)
srv1.oobMetrics.SetApplicationUtilization(1.0)

srv2.oobMetrics.SetQPS(10.0)
srv2.oobMetrics.SetCPUUtilization(.1)
srv2.oobMetrics.SetApplicationUtilization(.1)

cfg := oobConfig
cfg.OOBReportingPeriod = stringp("60s")
Expand Down Expand Up @@ -594,16 +619,16 @@ func (s) TestBalancer_AddressesChanging(t *testing.T) {

// srv1: weight 10
srv1.oobMetrics.SetQPS(10.0)
srv1.oobMetrics.SetCPUUtilization(1.0)
srv1.oobMetrics.SetApplicationUtilization(1.0)
// srv2: weight 100
srv2.oobMetrics.SetQPS(10.0)
srv2.oobMetrics.SetCPUUtilization(.1)
srv2.oobMetrics.SetApplicationUtilization(.1)
// srv3: weight 20
srv3.oobMetrics.SetQPS(20.0)
srv3.oobMetrics.SetCPUUtilization(1.0)
srv3.oobMetrics.SetApplicationUtilization(1.0)
// srv4: weight 200
srv4.oobMetrics.SetQPS(20.0)
srv4.oobMetrics.SetCPUUtilization(.1)
srv4.oobMetrics.SetApplicationUtilization(.1)

sc := svcConfig(t, oobConfig)
if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
Expand Down

0 comments on commit 2ac1aae

Please sign in to comment.