xds/clusterimpl: update UpdateClientConnState to handle updates synchronously #7533

Merged
merged 3 commits on Aug 30, 2024
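In brief: this change removes the clusterImplBalancer's run() goroutine, the unbounded pickerUpdateCh, and the closed/done events, and instead funnels every operation (UpdateClientConnState, ResolverError, SubConn state changes, the child's UpdateState, ExitIdle, Close) through a grpcsync.CallbackSerializer, so the balancer's fields are only ever touched from serialized callbacks. UpdateClientConnState additionally blocks on an error channel until its callback has run, using ScheduleOr so that a balancer that is already closed fails fast with errBalancerClosed instead of hanging. The sketch below illustrates the pattern in isolation; the serializer and fakeBalancer types here are simplified stand-ins (not grpc-go's internal grpcsync.CallbackSerializer API), but the ScheduleOr/error-channel and close-then-drain shapes follow the diff.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errBalancerClosed = errors.New("cluster_impl LB policy is closed")

// serializer is a simplified stand-in for a callback serializer: a single
// goroutine runs queued callbacks one at a time, in order.
type serializer struct {
	mu     sync.Mutex
	closed bool
	cbs    chan func()
	done   chan struct{}
}

func newSerializer() *serializer {
	s := &serializer{cbs: make(chan func(), 16), done: make(chan struct{})}
	go func() {
		defer close(s.done)
		for cb := range s.cbs { // drains everything queued before close()
			cb()
		}
	}()
	return s
}

// trySchedule queues cb and reports false once the serializer is closed.
func (s *serializer) trySchedule(cb func()) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return false
	}
	s.cbs <- cb
	return true
}

// scheduleOr runs onFailure when cb can no longer be scheduled.
func (s *serializer) scheduleOr(cb func(), onFailure func()) {
	if !s.trySchedule(cb) {
		onFailure()
	}
}

// close stops accepting new callbacks; already-queued callbacks still run.
func (s *serializer) close() {
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.closed {
		s.closed = true
		close(s.cbs)
	}
}

// fakeBalancer mirrors the structure in the diff: its state is only touched
// from serializer callbacks, so no mutex around that state is needed.
type fakeBalancer struct {
	ser    *serializer
	config string // stands in for drops, request counters, child state, ...
}

// UpdateClientConnState blocks until the update is processed, mirroring the
// ScheduleOr + error-channel shape of the new code in the diff.
func (b *fakeBalancer) UpdateClientConnState(cfg string) error {
	errCh := make(chan error, 1)
	b.ser.scheduleOr(func() {
		b.config = cfg // runs on the serializer goroutine
		errCh <- nil
	}, func() {
		errCh <- errBalancerClosed // fail fast instead of blocking forever
	})
	return <-errCh
}

// Close schedules cleanup, then stops the serializer and waits for it to drain.
func (b *fakeBalancer) Close() {
	b.ser.trySchedule(func() {
		b.config = "" // child.Close(), cancelLoadReport(), etc. would go here
	})
	b.ser.close()
	<-b.ser.done
}

func main() {
	b := &fakeBalancer{ser: newSerializer()}
	fmt.Println(b.UpdateClientConnState("cfg-1")) // <nil>
	b.Close()
	fmt.Println(b.UpdateClientConnState("cfg-2")) // cluster_impl LB policy is closed
}
```

The writes to the buffered errCh matter: because UpdateClientConnState blocks on the receive, either the scheduled callback or the onFailure path must send exactly one value, otherwise a config update arriving after Close would block forever.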
206 changes: 87 additions & 119 deletions xds/internal/balancer/clusterimpl/clusterimpl.go
@@ -24,6 +24,7 @@
package clusterimpl

import (
"context"
"encoding/json"
"fmt"
"sync"
@@ -33,7 +34,6 @@
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/gracefulswitch"
"google.golang.org/grpc/internal/buffer"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/pretty"
@@ -53,7 +53,10 @@
defaultRequestCountMax = 1024
)

var connectedAddress = internal.ConnectedAddress.(func(balancer.SubConnState) resolver.Address)
var (
connectedAddress = internal.ConnectedAddress.(func(balancer.SubConnState) resolver.Address)
errBalancerClosed = fmt.Errorf("%s LB policy is closed", Name)
)

func init() {
balancer.Register(bb{})
@@ -62,18 +65,17 @@
type bb struct{}

func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
ctx, cancel := context.WithCancel(context.Background())
b := &clusterImplBalancer{
ClientConn: cc,
bOpts: bOpts,
closed: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
loadWrapper: loadstore.NewWrapper(),
pickerUpdateCh: buffer.NewUnbounded(),
requestCountMax: defaultRequestCountMax,
ClientConn: cc,
bOpts: bOpts,
loadWrapper: loadstore.NewWrapper(),
requestCountMax: defaultRequestCountMax,
serializer: grpcsync.NewCallbackSerializer(ctx),
serializerCancel: cancel,
}
b.logger = prefixLogger(b)
b.child = gracefulswitch.NewBalancer(b, bOpts)
go b.run()
b.logger.Infof("Created")
return b
}
@@ -89,18 +91,6 @@
type clusterImplBalancer struct {
balancer.ClientConn

// mu guarantees mutual exclusion between Close() and handling of picker
// update to the parent ClientConn in run(). It's to make sure that the
// run() goroutine doesn't send picker update to parent after the balancer
// is closed.
//
// It's only used by the run() goroutine, but not the other exported
// functions. Because the exported functions are guaranteed to be
// synchronized with Close().
mu sync.Mutex
closed *grpcsync.Event
done *grpcsync.Event

bOpts balancer.BuildOptions
logger *grpclog.PrefixLogger
xdsClient xdsclient.XDSClient
@@ -115,10 +105,11 @@
clusterNameMu sync.Mutex
clusterName string

serializer *grpcsync.CallbackSerializer
serializerCancel context.CancelFunc

// childState/drops/requestCounter keeps the state used by the most recently
// generated picker. All fields can only be accessed in run(). And run() is
// the only goroutine that sends picker to the parent ClientConn. All
// requests to update picker need to be sent to pickerUpdateCh.
// generated picker.
childState balancer.State
dropCategories []DropConfig // The categories for drops.
drops []*dropper
@@ -127,7 +118,6 @@
requestCounter *xdsclient.ClusterRequestsCounter
requestCountMax uint32
telemetryLabels map[string]string
pickerUpdateCh *buffer.Unbounded
}

// updateLoadStore checks the config for load store, and decides whether it
@@ -208,14 +198,9 @@
return nil
}

func (b *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
if b.closed.HasFired() {
b.logger.Warningf("xds: received ClientConnState {%+v} after clusterImplBalancer was closed", s)
return nil
}

func (b *clusterImplBalancer) updateClientConnState(s balancer.ClientConnState) error {
if b.logger.V(2) {
b.logger.Infof("Received update from resolver, balancer config: %s", pretty.ToJSON(s.BalancerConfig))
b.logger.Infof("Received configuration: %s", pretty.ToJSON(s.BalancerConfig))

}
newConfig, ok := s.BalancerConfig.(*LBConfig)
if !ok {
@@ -227,7 +212,7 @@
// it.
bb := balancer.Get(newConfig.ChildPolicy.Name)
if bb == nil {
return fmt.Errorf("balancer %q not registered", newConfig.ChildPolicy.Name)
return fmt.Errorf("child policy %q not registered", newConfig.ChildPolicy.Name)

}

if b.xdsClient == nil {
@@ -253,9 +238,14 @@
}
b.config = newConfig

// Notify run() of this new config, in case drop and request counter need
// update (which means a new picker needs to be generated).
b.pickerUpdateCh.Put(newConfig)
b.telemetryLabels = newConfig.TelemetryLabels
dc := b.handleDropAndRequestCount(newConfig)
if dc != nil && b.childState.Picker != nil {
b.ClientConn.UpdateState(balancer.State{
ConnectivityState: b.childState.ConnectivityState,
Picker: b.newPicker(dc),
})
}

// Addresses and sub-balancer config are sent to sub-balancer.
return b.child.UpdateClientConnState(balancer.ClientConnState{
@@ -264,20 +254,28 @@
})
}

func (b *clusterImplBalancer) ResolverError(err error) {
if b.closed.HasFired() {
b.logger.Warningf("xds: received resolver error {%+v} after clusterImplBalancer was closed", err)
return
func (b *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
// Handle the update in a blocking fashion.
errCh := make(chan error, 1)
callback := func(context.Context) {
errCh <- b.updateClientConnState(s)
}
b.child.ResolverError(err)
onFailure := func() {
// The call to Schedule returns false *only* if the serializer has been
// closed, which happens only when we receive an update after close.
errCh <- errBalancerClosed

}
b.serializer.ScheduleOr(callback, onFailure)
return <-errCh
}

func (b *clusterImplBalancer) updateSubConnState(sc balancer.SubConn, s balancer.SubConnState, cb func(balancer.SubConnState)) {
if b.closed.HasFired() {
b.logger.Warningf("xds: received subconn state change {%+v, %+v} after clusterImplBalancer was closed", sc, s)
return
}
func (b *clusterImplBalancer) ResolverError(err error) {
b.serializer.TrySchedule(func(context.Context) {
b.child.ResolverError(err)
})
}

func (b *clusterImplBalancer) updateSubConnState(sc balancer.SubConn, s balancer.SubConnState, cb func(balancer.SubConnState)) {
// Trigger re-resolution when a SubConn turns transient failure. This is
// necessary for the LogicalDNS in cluster_resolver policy to re-resolve.
//
@@ -299,26 +297,40 @@
}

func (b *clusterImplBalancer) Close() {
b.mu.Lock()
b.closed.Fire()
b.mu.Unlock()

b.child.Close()
b.childState = balancer.State{}
b.pickerUpdateCh.Close()
<-b.done.Done()
b.logger.Infof("Shutdown")
b.serializer.TrySchedule(func(ctx context.Context) {
b.child.Close()
b.childState = balancer.State{}

if b.cancelLoadReport != nil {
b.cancelLoadReport()
b.cancelLoadReport = nil
}
b.logger.Infof("Shutdown")
})
b.serializerCancel()
<-b.serializer.Done()
}

func (b *clusterImplBalancer) ExitIdle() {
b.child.ExitIdle()
b.serializer.TrySchedule(func(context.Context) {
b.child.ExitIdle()
})
}

// Override methods to accept updates from the child LB.

func (b *clusterImplBalancer) UpdateState(state balancer.State) {
// Instead of updating parent ClientConn inline, send state to run().
b.pickerUpdateCh.Put(state)
b.serializer.TrySchedule(func(context.Context) {
b.childState = state
b.ClientConn.UpdateState(balancer.State{
ConnectivityState: b.childState.ConnectivityState,
Picker: b.newPicker(&dropConfigs{
drops: b.drops,
requestCounter: b.requestCounter,
requestCountMax: b.requestCountMax,
}),
})
})
}

func (b *clusterImplBalancer) setClusterName(n string) {
@@ -370,21 +382,23 @@
scw := &scWrapper{}
oldListener := opts.StateListener
opts.StateListener = func(state balancer.SubConnState) {
b.updateSubConnState(sc, state, oldListener)
if state.ConnectivityState != connectivity.Ready {
return
}
// Read connected address and call updateLocalityID() based on the connected
// address's locality. https://github.com/grpc/grpc-go/issues/7339
addr := connectedAddress(state)
lID := xdsinternal.GetLocalityID(addr)
if lID.Empty() {
if b.logger.V(2) {
b.logger.Infof("Locality ID for %s unexpectedly empty", addr)
b.serializer.TrySchedule(func(context.Context) {
b.updateSubConnState(sc, state, oldListener)
if state.ConnectivityState != connectivity.Ready {
return
}
return
}
scw.updateLocalityID(lID)
// Read connected address and call updateLocalityID() based on the connected
// address's locality. https://github.com/grpc/grpc-go/issues/7339
addr := connectedAddress(state)
lID := xdsinternal.GetLocalityID(addr)
if lID.Empty() {
if b.logger.V(2) {
b.logger.Infof("Locality ID for %s unexpectedly empty", addr)

}
return
}
scw.updateLocalityID(lID)
})
}
sc, err := b.ClientConn.NewSubConn(newAddrs, opts)
if err != nil {
@@ -464,49 +478,3 @@
requestCountMax: b.requestCountMax,
}
}

func (b *clusterImplBalancer) run() {
defer b.done.Fire()
for {
select {
case update, ok := <-b.pickerUpdateCh.Get():
if !ok {
return
}
b.pickerUpdateCh.Load()
b.mu.Lock()
if b.closed.HasFired() {
b.mu.Unlock()
return
}
switch u := update.(type) {
case balancer.State:
b.childState = u
b.ClientConn.UpdateState(balancer.State{
ConnectivityState: b.childState.ConnectivityState,
Picker: b.newPicker(&dropConfigs{
drops: b.drops,
requestCounter: b.requestCounter,
requestCountMax: b.requestCountMax,
}),
})
case *LBConfig:
b.telemetryLabels = u.TelemetryLabels
dc := b.handleDropAndRequestCount(u)
if dc != nil && b.childState.Picker != nil {
b.ClientConn.UpdateState(balancer.State{
ConnectivityState: b.childState.ConnectivityState,
Picker: b.newPicker(dc),
})
}
}
b.mu.Unlock()
case <-b.closed.Done():
if b.cancelLoadReport != nil {
b.cancelLoadReport()
b.cancelLoadReport = nil
}
return
}
}
}
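The same serialization is applied to SubConn state changes: in the NewSubConn hunk above, the wrapped StateListener pushes each update onto the serializer (via TrySchedule) instead of handling it inline, and only then runs updateSubConnState and the locality lookup for READY addresses. Below is a small, self-contained sketch of that listener-wrapping pattern, with hypothetical stand-in types (queue, subConnState) rather than grpc-go APIs; the trySchedule here is simplified and never fails.

```go
package main

import (
	"fmt"
	"sync"
)

// subConnState is a stand-in for balancer.SubConnState.
type subConnState struct{ connectivity string }

// queue is a stand-in for the serializer: one worker goroutine runs the
// queued callbacks in order, so listener bodies never race with each other.
type queue struct {
	cbs chan func()
	wg  sync.WaitGroup
}

func newQueue() *queue {
	q := &queue{cbs: make(chan func(), 16)}
	q.wg.Add(1)
	go func() {
		defer q.wg.Done()
		for cb := range q.cbs {
			cb()
		}
	}()
	return q
}

func (q *queue) trySchedule(cb func()) { q.cbs <- cb }

func (q *queue) stop() { close(q.cbs); q.wg.Wait() }

func main() {
	q := newQueue()
	var seen []string // only touched on the worker goroutine, so no locking

	// oldListener is whatever StateListener was installed before wrapping.
	oldListener := func(s subConnState) { seen = append(seen, s.connectivity) }

	// The wrapper mirrors the NewSubConn change: rather than handling the
	// state change inline on the goroutine that delivered it, the work is
	// pushed onto the queue; locality extraction for READY addresses would
	// follow there as well.
	listener := func(s subConnState) {
		q.trySchedule(func() { oldListener(s) })
	}

	listener(subConnState{"CONNECTING"})
	listener(subConnState{"READY"})
	q.stop()
	fmt.Println(seen) // [CONNECTING READY]
}
```

Running every listener body on the single worker goroutine preserves the order of state changes and keeps them serialized with config updates and Close, which is what lets the balancer drop the mu/closed/done bookkeeping removed in this diff.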