From b33409d7481d6edae4569201b5a4d3585ff82eaa Mon Sep 17 00:00:00 2001
From: Abhishek Ranjan
Date: Sun, 18 Aug 2024 20:41:05 +0530
Subject: [PATCH 1/3] Use a grpcsync.CallbackSerializer instead of a run
 goroutine to synchronize updates

---
 .../balancer/clusterimpl/clusterimpl.go | 146 ++++++------
 1 file changed, 51 insertions(+), 95 deletions(-)

diff --git a/xds/internal/balancer/clusterimpl/clusterimpl.go b/xds/internal/balancer/clusterimpl/clusterimpl.go
index 9058f0d01fc8..82e673c60644 100644
--- a/xds/internal/balancer/clusterimpl/clusterimpl.go
+++ b/xds/internal/balancer/clusterimpl/clusterimpl.go
@@ -24,6 +24,7 @@ package clusterimpl
 
 import (
+	"context"
 	"encoding/json"
 	"fmt"
 	"sync"
@@ -33,7 +34,6 @@ import (
 	"google.golang.org/grpc/connectivity"
 	"google.golang.org/grpc/internal"
 	"google.golang.org/grpc/internal/balancer/gracefulswitch"
-	"google.golang.org/grpc/internal/buffer"
 	"google.golang.org/grpc/internal/grpclog"
 	"google.golang.org/grpc/internal/grpcsync"
 	"google.golang.org/grpc/internal/pretty"
@@ -62,18 +62,18 @@ func init() {
 type bb struct{}
 
 func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
+	ctx, cancel := context.WithCancel(context.Background())
+
 	b := &clusterImplBalancer{
-		ClientConn:      cc,
-		bOpts:           bOpts,
-		closed:          grpcsync.NewEvent(),
-		done:            grpcsync.NewEvent(),
-		loadWrapper:     loadstore.NewWrapper(),
-		pickerUpdateCh:  buffer.NewUnbounded(),
-		requestCountMax: defaultRequestCountMax,
+		ClientConn:       cc,
+		bOpts:            bOpts,
+		loadWrapper:      loadstore.NewWrapper(),
+		requestCountMax:  defaultRequestCountMax,
+		serializer:       grpcsync.NewCallbackSerializer(ctx),
+		serializerCancel: cancel,
 	}
 	b.logger = prefixLogger(b)
 	b.child = gracefulswitch.NewBalancer(b, bOpts)
-	go b.run()
 	b.logger.Infof("Created")
 	return b
 }
@@ -89,18 +89,6 @@ func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, err
 type clusterImplBalancer struct {
 	balancer.ClientConn
 
-	// mu guarantees mutual exclusion between Close() and handling of picker
-	// update to the parent ClientConn in run(). It's to make sure that the
-	// run() goroutine doesn't send picker update to parent after the balancer
-	// is closed.
-	//
-	// It's only used by the run() goroutine, but not the other exported
-	// functions. Because the exported functions are guaranteed to be
-	// synchronized with Close().
-	mu     sync.Mutex
-	closed *grpcsync.Event
-	done   *grpcsync.Event
-
 	bOpts     balancer.BuildOptions
 	logger    *grpclog.PrefixLogger
 	xdsClient xdsclient.XDSClient
@@ -115,10 +103,11 @@ type clusterImplBalancer struct {
 	clusterNameMu sync.Mutex
 	clusterName   string
 
+	serializer       *grpcsync.CallbackSerializer
+	serializerCancel context.CancelFunc
+
 	// childState/drops/requestCounter keeps the state used by the most recently
-	// generated picker. All fields can only be accessed in run(). And run() is
-	// the only goroutine that sends picker to the parent ClientConn. All
-	// requests to update picker need to be sent to pickerUpdateCh.
+	// generated picker.
 	childState      balancer.State
 	dropCategories  []DropConfig // The categories for drops.
 	drops           []*dropper
@@ -127,7 +116,6 @@ type clusterImplBalancer struct {
 	requestCounter  *xdsclient.ClusterRequestsCounter
 	requestCountMax uint32
 	telemetryLabels map[string]string
-	pickerUpdateCh  *buffer.Unbounded
 }
 
 // updateLoadStore checks the config for load store, and decides whether it
@@ -209,11 +197,6 @@ func (b *clusterImplBalancer) updateLoadStore(newConfig *LBConfig) error {
 }
 
 func (b *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
-	if b.closed.HasFired() {
-		b.logger.Warningf("xds: received ClientConnState {%+v} after clusterImplBalancer was closed", s)
-		return nil
-	}
-
 	if b.logger.V(2) {
 		b.logger.Infof("Received update from resolver, balancer config: %s", pretty.ToJSON(s.BalancerConfig))
 	}
@@ -253,9 +236,23 @@ func (b *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState)
 	}
 	b.config = newConfig
 
-	// Notify run() of this new config, in case drop and request counter need
-	// update (which means a new picker needs to be generated).
-	b.pickerUpdateCh.Put(newConfig)
+	callback := func(context.Context) {
+		b.telemetryLabels = newConfig.TelemetryLabels
+		dc := b.handleDropAndRequestCount(newConfig)
+		if dc != nil && b.childState.Picker != nil {
+			b.ClientConn.UpdateState(balancer.State{
+				ConnectivityState: b.childState.ConnectivityState,
+				Picker:            b.newPicker(dc),
+			})
+		}
+	}
+
+	onFailure := func() {
+		// Handle the case where scheduling fails (e.g., if the serializer is closed)
+		b.logger.Warningf("xds: failed to schedule config update, balancer might be closed")
+	}
+	// Schedule the config update on the serializer
+	b.serializer.ScheduleOr(callback, onFailure)
 
 	// Addresses and sub-balancer config are sent to sub-balancer.
 	return b.child.UpdateClientConnState(balancer.ClientConnState{
@@ -265,19 +262,10 @@ func (b *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState)
 }
 
 func (b *clusterImplBalancer) ResolverError(err error) {
-	if b.closed.HasFired() {
-		b.logger.Warningf("xds: received resolver error {%+v} after clusterImplBalancer was closed", err)
-		return
-	}
 	b.child.ResolverError(err)
 }
 
 func (b *clusterImplBalancer) updateSubConnState(sc balancer.SubConn, s balancer.SubConnState, cb func(balancer.SubConnState)) {
-	if b.closed.HasFired() {
-		b.logger.Warningf("xds: received subconn state change {%+v, %+v} after clusterImplBalancer was closed", sc, s)
-		return
-	}
-
 	// Trigger re-resolution when a SubConn turns transient failure. This is
 	// necessary for the LogicalDNS in cluster_resolver policy to re-resolve.
 	//
@@ -299,14 +287,15 @@ func (b *clusterImplBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer
 }
 
 func (b *clusterImplBalancer) Close() {
-	b.mu.Lock()
-	b.closed.Fire()
-	b.mu.Unlock()
+	if b.cancelLoadReport != nil {
+		b.cancelLoadReport()
+		b.cancelLoadReport = nil
+	}
+	b.serializerCancel()
+	<-b.serializer.Done()
 
 	b.child.Close()
 	b.childState = balancer.State{}
-	b.pickerUpdateCh.Close()
-	<-b.done.Done()
 	b.logger.Infof("Shutdown")
 }
 
@@ -317,8 +306,21 @@ func (b *clusterImplBalancer) ExitIdle() {
 
 // Override methods to accept updates from the child LB.
 func (b *clusterImplBalancer) UpdateState(state balancer.State) {
-	// Instead of updating parent ClientConn inline, send state to run().
-	b.pickerUpdateCh.Put(state)
+	// Schedule picker update on the serializer using ScheduleOr
+	b.serializer.ScheduleOr(func(context.Context) {
+		b.childState = state
+		b.ClientConn.UpdateState(balancer.State{
+			ConnectivityState: b.childState.ConnectivityState,
+			Picker: b.newPicker(&dropConfigs{
+				drops:           b.drops,
+				requestCounter:  b.requestCounter,
+				requestCountMax: b.requestCountMax,
+			}),
+		})
+	}, func() {
+		// Handle the case where scheduling fails
+		b.logger.Warningf("xds: failed to schedule picker update, balancer might be closed")
+	})
 }
 
 func (b *clusterImplBalancer) setClusterName(n string) {
@@ -464,49 +466,3 @@ func (b *clusterImplBalancer) handleDropAndRequestCount(newConfig *LBConfig) *dr
 		requestCountMax: b.requestCountMax,
 	}
 }
-
-func (b *clusterImplBalancer) run() {
-	defer b.done.Fire()
-	for {
-		select {
-		case update, ok := <-b.pickerUpdateCh.Get():
-			if !ok {
-				return
-			}
-			b.pickerUpdateCh.Load()
-			b.mu.Lock()
-			if b.closed.HasFired() {
-				b.mu.Unlock()
-				return
-			}
-			switch u := update.(type) {
-			case balancer.State:
-				b.childState = u
-				b.ClientConn.UpdateState(balancer.State{
-					ConnectivityState: b.childState.ConnectivityState,
-					Picker: b.newPicker(&dropConfigs{
-						drops:           b.drops,
-						requestCounter:  b.requestCounter,
-						requestCountMax: b.requestCountMax,
-					}),
-				})
-			case *LBConfig:
-				b.telemetryLabels = u.TelemetryLabels
-				dc := b.handleDropAndRequestCount(u)
-				if dc != nil && b.childState.Picker != nil {
-					b.ClientConn.UpdateState(balancer.State{
-						ConnectivityState: b.childState.ConnectivityState,
-						Picker:            b.newPicker(dc),
-					})
-				}
-			}
-			b.mu.Unlock()
-		case <-b.closed.Done():
-			if b.cancelLoadReport != nil {
-				b.cancelLoadReport()
-				b.cancelLoadReport = nil
-			}
-			return
-		}
-	}
-}

From 5e90f8e39917bbbc6bc3875f668bbfdc3ad181a6 Mon Sep 17 00:00:00 2001
From: Abhishek Ranjan
Date: Wed, 21 Aug 2024 15:58:01 +0530
Subject: [PATCH 2/3] fix: put methods like Close, ResolverError, ExitIdle in
 CallbackSerializer

---
 .../balancer/clusterimpl/clusterimpl.go | 110 ++++++++++--------
 1 file changed, 61 insertions(+), 49 deletions(-)

diff --git a/xds/internal/balancer/clusterimpl/clusterimpl.go b/xds/internal/balancer/clusterimpl/clusterimpl.go
index 82e673c60644..b2ea2815e30b 100644
--- a/xds/internal/balancer/clusterimpl/clusterimpl.go
+++ b/xds/internal/balancer/clusterimpl/clusterimpl.go
@@ -53,7 +53,10 @@ const (
 	defaultRequestCountMax = 1024
 )
 
-var connectedAddress = internal.ConnectedAddress.(func(balancer.SubConnState) resolver.Address)
+var (
+	connectedAddress  = internal.ConnectedAddress.(func(balancer.SubConnState) resolver.Address)
+	errBalancerClosed = fmt.Errorf("%s LB policy is closed", Name)
+)
 
 func init() {
 	balancer.Register(bb{})
@@ -63,7 +66,6 @@ type bb struct{}
 
 func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
 	ctx, cancel := context.WithCancel(context.Background())
-
 	b := &clusterImplBalancer{
 		ClientConn:       cc,
 		bOpts:            bOpts,
@@ -196,9 +198,9 @@ func (b *clusterImplBalancer) updateLoadStore(newConfig *LBConfig) error {
 	return nil
 }
 
-func (b *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
+func (b *clusterImplBalancer) updateClientConnState(s balancer.ClientConnState) error {
 	if b.logger.V(2) {
-		b.logger.Infof("Received update from resolver, balancer config: %s", pretty.ToJSON(s.BalancerConfig))
+		b.logger.Infof("Received configuration: %s", pretty.ToJSON(s.BalancerConfig))
 	}
 	newConfig, ok := s.BalancerConfig.(*LBConfig)
 	if !ok {
@@ -210,7 +212,7 @@ func (b *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState)
 	// it.
 	bb := balancer.Get(newConfig.ChildPolicy.Name)
 	if bb == nil {
-		return fmt.Errorf("balancer %q not registered", newConfig.ChildPolicy.Name)
+		return fmt.Errorf("child policy %q not registered", newConfig.ChildPolicy.Name)
 	}
 
 	if b.xdsClient == nil {
@@ -236,23 +238,14 @@ func (b *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState)
 	}
 	b.config = newConfig
 
-	callback := func(context.Context) {
-		b.telemetryLabels = newConfig.TelemetryLabels
-		dc := b.handleDropAndRequestCount(newConfig)
-		if dc != nil && b.childState.Picker != nil {
-			b.ClientConn.UpdateState(balancer.State{
-				ConnectivityState: b.childState.ConnectivityState,
-				Picker:            b.newPicker(dc),
-			})
-		}
-	}
-
-	onFailure := func() {
-		// Handle the case where scheduling fails (e.g., if the serializer is closed)
-		b.logger.Warningf("xds: failed to schedule config update, balancer might be closed")
+	b.telemetryLabels = newConfig.TelemetryLabels
+	dc := b.handleDropAndRequestCount(newConfig)
+	if dc != nil && b.childState.Picker != nil {
+		b.ClientConn.UpdateState(balancer.State{
+			ConnectivityState: b.childState.ConnectivityState,
+			Picker:            b.newPicker(dc),
+		})
 	}
-	// Schedule the config update on the serializer
-	b.serializer.ScheduleOr(callback, onFailure)
 
 	// Addresses and sub-balancer config are sent to sub-balancer.
 	return b.child.UpdateClientConnState(balancer.ClientConnState{
@@ -261,8 +254,25 @@ func (b *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState)
 	})
 }
 
+func (b *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
+	// Handle the update in a blocking fashion.
+	errCh := make(chan error, 1)
+	callback := func(context.Context) {
+		errCh <- b.updateClientConnState(s)
+	}
+	onFailure := func() {
+		// The call to Schedule returns false *only* if the serializer has been
+		// closed, which happens only when we receive an update after close.
+		errCh <- errBalancerClosed
+	}
+	b.serializer.ScheduleOr(callback, onFailure)
+	return <-errCh
+}
+
 func (b *clusterImplBalancer) ResolverError(err error) {
-	b.child.ResolverError(err)
+	b.serializer.TrySchedule(func(context.Context) {
+		b.child.ResolverError(err)
+	})
 }
 
 func (b *clusterImplBalancer) updateSubConnState(sc balancer.SubConn, s balancer.SubConnState, cb func(balancer.SubConnState)) {
@@ -287,27 +297,30 @@ func (b *clusterImplBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer
 }
 
 func (b *clusterImplBalancer) Close() {
-	if b.cancelLoadReport != nil {
-		b.cancelLoadReport()
-		b.cancelLoadReport = nil
-	}
+	b.serializer.TrySchedule(func(ctx context.Context) {
+		b.child.Close()
+		b.childState = balancer.State{}
+		if b.cancelLoadReport != nil {
+			b.cancelLoadReport()
+			b.cancelLoadReport = nil
+		}
+		b.logger.Infof("Shutdown")
+	})
 	b.serializerCancel()
 	<-b.serializer.Done()
-
-	b.child.Close()
-	b.childState = balancer.State{}
-	b.logger.Infof("Shutdown")
 }
 
 func (b *clusterImplBalancer) ExitIdle() {
-	b.child.ExitIdle()
+	b.serializer.TrySchedule(func(context.Context) {
+		b.child.ExitIdle()
+	})
 }
 
 // Override methods to accept updates from the child LB.
 func (b *clusterImplBalancer) UpdateState(state balancer.State) {
-	// Schedule picker update on the serializer using ScheduleOr
-	b.serializer.ScheduleOr(func(context.Context) {
+	b.serializer.TrySchedule(func(context.Context) {
 		b.childState = state
 		b.ClientConn.UpdateState(balancer.State{
 			ConnectivityState: b.childState.ConnectivityState,
@@ -317,9 +330,6 @@ func (b *clusterImplBalancer) UpdateState(state balancer.State) {
 				requestCountMax: b.requestCountMax,
 			}),
 		})
-	}, func() {
-		// Handle the case where scheduling fails
-		b.logger.Warningf("xds: failed to schedule picker update, balancer might be closed")
 	})
 }
 
@@ -372,21 +382,23 @@ func (b *clusterImplBalancer) NewSubConn(addrs []resolver.Address, opts balancer
 	scw := &scWrapper{}
 	oldListener := opts.StateListener
 	opts.StateListener = func(state balancer.SubConnState) {
-		b.updateSubConnState(sc, state, oldListener)
-		if state.ConnectivityState != connectivity.Ready {
-			return
-		}
-		// Read connected address and call updateLocalityID() based on the connected
-		// address's locality. https://github.com/grpc/grpc-go/issues/7339
-		addr := connectedAddress(state)
-		lID := xdsinternal.GetLocalityID(addr)
-		if lID.Empty() {
-			if b.logger.V(2) {
-				b.logger.Infof("Locality ID for %s unexpectedly empty", addr)
+		b.serializer.TrySchedule(func(context.Context) {
+			b.updateSubConnState(sc, state, oldListener)
+			if state.ConnectivityState != connectivity.Ready {
+				return
 			}
-			return
-		}
-		scw.updateLocalityID(lID)
+			// Read connected address and call updateLocalityID() based on the connected
+			// address's locality. https://github.com/grpc/grpc-go/issues/7339
+			addr := connectedAddress(state)
+			lID := xdsinternal.GetLocalityID(addr)
+			if lID.Empty() {
+				if b.logger.V(2) {
+					b.logger.Infof("Locality ID for %s unexpectedly empty", addr)
+				}
+				return
+			}
+			scw.updateLocalityID(lID)
+		})
 	}
 	sc, err := b.ClientConn.NewSubConn(newAddrs, opts)
 	if err != nil {

From fc2054e838665974908f6800919fb047b776a3ad Mon Sep 17 00:00:00 2001
From: Abhishek Ranjan <159750762+aranjans@users.noreply.github.com>
Date: Fri, 30 Aug 2024 11:08:30 +0530
Subject: [PATCH 3/3] fix: updated comment

Co-authored-by: Arvind Bright
---
 xds/internal/balancer/clusterimpl/clusterimpl.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/xds/internal/balancer/clusterimpl/clusterimpl.go b/xds/internal/balancer/clusterimpl/clusterimpl.go
index b2ea2815e30b..c8017c7ed2d2 100644
--- a/xds/internal/balancer/clusterimpl/clusterimpl.go
+++ b/xds/internal/balancer/clusterimpl/clusterimpl.go
@@ -261,8 +261,8 @@ func (b *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState)
 		errCh <- b.updateClientConnState(s)
 	}
 	onFailure := func() {
-		// The call to Schedule returns false *only* if the serializer has been
-		// closed, which happens only when we receive an update after close.
+		// An attempt to schedule callback fails only when an update is received
+		// after Close().
 		errCh <- errBalancerClosed
 	}
 	b.serializer.ScheduleOr(callback, onFailure)
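A note for reviewers who have not used grpcsync.CallbackSerializer before: the sketch below is a minimal, self-contained approximation of the pattern these patches adopt. It is not the grpc-go implementation (grpcsync lives in an internal package and cannot be imported here), so the callbackSerializer type, its fields, and the state strings in main are illustrative stand-ins; only the method surface (ScheduleOr, TrySchedule, Done) and the way it is driven mirror the patches above.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// callbackSerializer is a simplified stand-in for grpc-go's internal
// grpcsync.CallbackSerializer: callbacks run one at a time, in scheduling
// order, on a single goroutine, so state touched only inside callbacks needs no mutex.
type callbackSerializer struct {
	mu        sync.Mutex
	callbacks []func(context.Context)
	closed    bool
	wakeCh    chan struct{}
	done      chan struct{}
}

func newCallbackSerializer(ctx context.Context) *callbackSerializer {
	s := &callbackSerializer{wakeCh: make(chan struct{}, 1), done: make(chan struct{})}
	go s.run(ctx)
	return s
}

func (s *callbackSerializer) run(ctx context.Context) {
	defer close(s.done)
	for {
		select {
		case <-ctx.Done():
			// Refuse new callbacks, then drain the ones already scheduled.
			s.mu.Lock()
			s.closed = true
			pending := s.callbacks
			s.callbacks = nil
			s.mu.Unlock()
			for _, cb := range pending {
				cb(ctx)
			}
			return
		case <-s.wakeCh:
			for {
				s.mu.Lock()
				if len(s.callbacks) == 0 {
					s.mu.Unlock()
					break
				}
				cb := s.callbacks[0]
				s.callbacks = s.callbacks[1:]
				s.mu.Unlock()
				cb(ctx)
			}
		}
	}
}

// ScheduleOr queues cb, or runs onFailure if the serializer is already closed.
func (s *callbackSerializer) ScheduleOr(cb func(context.Context), onFailure func()) {
	s.mu.Lock()
	if s.closed {
		s.mu.Unlock()
		onFailure()
		return
	}
	s.callbacks = append(s.callbacks, cb)
	s.mu.Unlock()
	select { // Non-blocking wake-up of the run goroutine.
	case s.wakeCh <- struct{}{}:
	default:
	}
}

// TrySchedule is ScheduleOr with a no-op failure handler.
func (s *callbackSerializer) TrySchedule(cb func(context.Context)) { s.ScheduleOr(cb, func() {}) }

// Done is closed once the run goroutine has drained and exited.
func (s *callbackSerializer) Done() <-chan struct{} { return s.done }

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	s := newCallbackSerializer(ctx)
	state := "IDLE" // Only ever touched inside serializer callbacks.

	// Fire-and-forget update, like UpdateState or ResolverError in the patches.
	s.TrySchedule(func(context.Context) { state = "READY" })

	// Blocking update with an error result, like UpdateClientConnState in PATCH 2.
	errCh := make(chan error, 1)
	s.ScheduleOr(
		func(context.Context) { state = "CONNECTING"; errCh <- nil },
		func() { errCh <- errors.New("balancer closed") },
	)
	fmt.Println("blocking update returned:", <-errCh)
	// Shutdown, like Close in the patches: cancel, then wait on Done.
	s.TrySchedule(func(context.Context) { fmt.Println("final state:", state) })
	cancel()
	<-s.Done()
}

The usage in main mirrors the three call-site shapes above: fire-and-forget scheduling for UpdateState, ResolverError, and ExitIdle; ScheduleOr plus an error channel to keep UpdateClientConnState's synchronous error return (PATCH 2); and cancel-then-wait-on-Done in Close. Because every touch of the shared state happens inside serialized callbacks, PATCH 1 can delete the mu/closed/done fields and the run() goroutine without introducing races.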