xds/clusterimpl: update UpdateClientConnState to handle updates synchronously #7533

Merged (3 commits, Aug 30, 2024)
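
This PR removes the clusterImplBalancer's unbounded `pickerUpdateCh` and `run()` goroutine, and instead serializes config and picker updates through a `grpcsync.CallbackSerializer` (`ScheduleOr` plus a `Done` channel, as used in the diff below). The sketch below is a rough illustration of that pattern only, not the actual `grpcsync` implementation; internals such as the buffered channel size and the drain-on-cancel behavior are assumptions made for the example.

```go
package main

import (
	"context"
	"fmt"
)

// callbackSerializer runs scheduled callbacks one at a time on a single
// goroutine. This is the pattern the PR adopts to replace the buffered
// pickerUpdateCh and the run() loop; it is an illustrative sketch, not the
// grpc-go internal implementation.
type callbackSerializer struct {
	ctx       context.Context
	callbacks chan func(context.Context)
	done      chan struct{}
}

func newCallbackSerializer(ctx context.Context) *callbackSerializer {
	s := &callbackSerializer{
		ctx:       ctx,
		callbacks: make(chan func(context.Context), 16),
		done:      make(chan struct{}),
	}
	go s.run()
	return s
}

// scheduleOr enqueues f to run on the serializer goroutine, or calls
// onFailure if the serializer has already been shut down.
func (s *callbackSerializer) scheduleOr(f func(context.Context), onFailure func()) {
	select {
	case <-s.ctx.Done():
		onFailure()
	case s.callbacks <- f:
	}
}

func (s *callbackSerializer) run() {
	defer close(s.done)
	for {
		select {
		case <-s.ctx.Done():
			// Drain callbacks that were scheduled before cancellation.
			for {
				select {
				case cb := <-s.callbacks:
					cb(s.ctx)
				default:
					return
				}
			}
		case cb := <-s.callbacks:
			cb(s.ctx)
		}
	}
}

// Done returns a channel that is closed once the serializer goroutine exits.
func (s *callbackSerializer) Done() <-chan struct{} { return s.done }

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	s := newCallbackSerializer(ctx)

	// Updates from different callers execute one at a time, so state touched
	// inside the callbacks needs no extra locking.
	s.scheduleOr(func(context.Context) { fmt.Println("apply config update") }, func() {})
	s.scheduleOr(func(context.Context) { fmt.Println("apply picker update") }, func() {})

	// Shutdown mirrors the new Close(): cancel, then wait for the goroutine.
	cancel()
	<-s.Done()
}
```

The key property is that all callbacks run on one goroutine, so fields like `childState`, `drops`, and `requestCounter` no longer need the `mu`/`closed` bookkeeping that the old `run()` loop required.
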
146 changes: 51 additions & 95 deletions xds/internal/balancer/clusterimpl/clusterimpl.go
@@ -24,6 +24,7 @@
package clusterimpl

import (
"context"
"encoding/json"
"fmt"
"sync"
@@ -33,7 +34,6 @@
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/gracefulswitch"
"google.golang.org/grpc/internal/buffer"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/pretty"
@@ -62,18 +62,18 @@
type bb struct{}

func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
ctx, cancel := context.WithCancel(context.Background())

b := &clusterImplBalancer{
ClientConn: cc,
bOpts: bOpts,
closed: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
loadWrapper: loadstore.NewWrapper(),
pickerUpdateCh: buffer.NewUnbounded(),
requestCountMax: defaultRequestCountMax,
ClientConn: cc,
bOpts: bOpts,
loadWrapper: loadstore.NewWrapper(),
requestCountMax: defaultRequestCountMax,
serializer: grpcsync.NewCallbackSerializer(ctx),
serializerCancel: cancel,
}
b.logger = prefixLogger(b)
b.child = gracefulswitch.NewBalancer(b, bOpts)
go b.run()
b.logger.Infof("Created")
return b
}
@@ -89,18 +89,6 @@
type clusterImplBalancer struct {
balancer.ClientConn

// mu guarantees mutual exclusion between Close() and handling of picker
// update to the parent ClientConn in run(). It's to make sure that the
// run() goroutine doesn't send picker update to parent after the balancer
// is closed.
//
// It's only used by the run() goroutine, but not the other exported
// functions. Because the exported functions are guaranteed to be
// synchronized with Close().
mu sync.Mutex
closed *grpcsync.Event
done *grpcsync.Event

bOpts balancer.BuildOptions
logger *grpclog.PrefixLogger
xdsClient xdsclient.XDSClient
@@ -115,10 +103,11 @@
clusterNameMu sync.Mutex
clusterName string

serializer *grpcsync.CallbackSerializer
serializerCancel context.CancelFunc

// childState/drops/requestCounter keeps the state used by the most recently
// generated picker. All fields can only be accessed in run(). And run() is
// the only goroutine that sends picker to the parent ClientConn. All
// requests to update picker need to be sent to pickerUpdateCh.
// generated picker.
childState balancer.State
dropCategories []DropConfig // The categories for drops.
drops []*dropper
@@ -127,7 +116,6 @@
requestCounter *xdsclient.ClusterRequestsCounter
requestCountMax uint32
telemetryLabels map[string]string
pickerUpdateCh *buffer.Unbounded
}

// updateLoadStore checks the config for load store, and decides whether it
@@ -209,11 +197,6 @@
}

func (b *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
if b.closed.HasFired() {
b.logger.Warningf("xds: received ClientConnState {%+v} after clusterImplBalancer was closed", s)
return nil
}

if b.logger.V(2) {
b.logger.Infof("Received update from resolver, balancer config: %s", pretty.ToJSON(s.BalancerConfig))
}
@@ -253,9 +236,23 @@
}
b.config = newConfig

// Notify run() of this new config, in case drop and request counter need
// update (which means a new picker needs to be generated).
b.pickerUpdateCh.Put(newConfig)
callback := func(context.Context) {
b.telemetryLabels = newConfig.TelemetryLabels
dc := b.handleDropAndRequestCount(newConfig)
if dc != nil && b.childState.Picker != nil {
b.ClientConn.UpdateState(balancer.State{
ConnectivityState: b.childState.ConnectivityState,
Picker: b.newPicker(dc),
})
}
}

onFailure := func() {
// Handle the case where scheduling fails (e.g., if the serializer is closed)
b.logger.Warningf("xds: failed to schedule config update, balancer might be closed")

}
// Schedule the config update on the serializer
b.serializer.ScheduleOr(callback, onFailure)

// Addresses and sub-balancer config are sent to sub-balancer.
return b.child.UpdateClientConnState(balancer.ClientConnState{
@@ -265,19 +262,10 @@
}

func (b *clusterImplBalancer) ResolverError(err error) {
if b.closed.HasFired() {
b.logger.Warningf("xds: received resolver error {%+v} after clusterImplBalancer was closed", err)
return
}
b.child.ResolverError(err)
}

func (b *clusterImplBalancer) updateSubConnState(sc balancer.SubConn, s balancer.SubConnState, cb func(balancer.SubConnState)) {
if b.closed.HasFired() {
b.logger.Warningf("xds: received subconn state change {%+v, %+v} after clusterImplBalancer was closed", sc, s)
return
}

// Trigger re-resolution when a SubConn turns transient failure. This is
// necessary for the LogicalDNS in cluster_resolver policy to re-resolve.
//
@@ -299,14 +287,15 @@
}

func (b *clusterImplBalancer) Close() {
b.mu.Lock()
b.closed.Fire()
b.mu.Unlock()
if b.cancelLoadReport != nil {
b.cancelLoadReport()
b.cancelLoadReport = nil
}

b.serializerCancel()
<-b.serializer.Done()
b.child.Close()
b.childState = balancer.State{}
b.pickerUpdateCh.Close()
<-b.done.Done()
b.logger.Infof("Shutdown")
}

@@ -317,8 +306,21 @@
// Override methods to accept updates from the child LB.

func (b *clusterImplBalancer) UpdateState(state balancer.State) {
// Instead of updating parent ClientConn inline, send state to run().
b.pickerUpdateCh.Put(state)
// Schedule picker update on the serializer using ScheduleOr
b.serializer.ScheduleOr(func(context.Context) {
b.childState = state
b.ClientConn.UpdateState(balancer.State{
ConnectivityState: b.childState.ConnectivityState,
Picker: b.newPicker(&dropConfigs{
drops: b.drops,
requestCounter: b.requestCounter,
requestCountMax: b.requestCountMax,
}),
})
}, func() {
// Handle the case where scheduling fails
b.logger.Warningf("xds: failed to schedule picker update, balancer might be closed")
})

}

func (b *clusterImplBalancer) setClusterName(n string) {
@@ -464,49 +466,3 @@
requestCountMax: b.requestCountMax,
}
}

func (b *clusterImplBalancer) run() {
defer b.done.Fire()
for {
select {
case update, ok := <-b.pickerUpdateCh.Get():
if !ok {
return
}
b.pickerUpdateCh.Load()
b.mu.Lock()
if b.closed.HasFired() {
b.mu.Unlock()
return
}
switch u := update.(type) {
case balancer.State:
b.childState = u
b.ClientConn.UpdateState(balancer.State{
ConnectivityState: b.childState.ConnectivityState,
Picker: b.newPicker(&dropConfigs{
drops: b.drops,
requestCounter: b.requestCounter,
requestCountMax: b.requestCountMax,
}),
})
case *LBConfig:
b.telemetryLabels = u.TelemetryLabels
dc := b.handleDropAndRequestCount(u)
if dc != nil && b.childState.Picker != nil {
b.ClientConn.UpdateState(balancer.State{
ConnectivityState: b.childState.ConnectivityState,
Picker: b.newPicker(dc),
})
}
}
b.mu.Unlock()
case <-b.closed.Done():
if b.cancelLoadReport != nil {
b.cancelLoadReport()
b.cancelLoadReport = nil
}
return
}
}
}