Skip to content

Commit

Permalink
outlierdetection: fix unconditional calls of child UpdateSubConnState
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed Aug 3, 2023
1 parent 8f496b2 commit c156b8e
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 11 deletions.
19 changes: 11 additions & 8 deletions xds/internal/balancer/outlierdetection/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ func (bb) Name() string {
type scUpdate struct {
scw *subConnWrapper
state balancer.SubConnState
cb func(balancer.SubConnState)
}

type ejectionUpdate struct {
Expand Down Expand Up @@ -346,7 +345,7 @@ func (b *outlierDetectionBalancer) ResolverError(err error) {
b.child.ResolverError(err)
}

func (b *outlierDetectionBalancer) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState, cb func(balancer.SubConnState)) {
func (b *outlierDetectionBalancer) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
b.mu.Lock()
defer b.mu.Unlock()
scw, ok := b.scWrappers[sc]
Expand All @@ -362,12 +361,11 @@ func (b *outlierDetectionBalancer) updateSubConnState(sc balancer.SubConn, state
b.scUpdateCh.Put(&scUpdate{
scw: scw,
state: state,
cb: cb,
})
}

func (b *outlierDetectionBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
b.updateSubConnState(sc, state, nil)
b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
}

func (b *outlierDetectionBalancer) Close() {
Expand Down Expand Up @@ -474,7 +472,7 @@ func (b *outlierDetectionBalancer) UpdateState(s balancer.State) {
func (b *outlierDetectionBalancer) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
var sc balancer.SubConn
oldListener := opts.StateListener
opts.StateListener = func(state balancer.SubConnState) { b.updateSubConnState(sc, state, oldListener) }
opts.StateListener = func(state balancer.SubConnState) { b.updateSubConnState(sc, state) }
sc, err := b.cc.NewSubConn(addrs, opts)
if err != nil {
return nil, err
Expand All @@ -483,6 +481,7 @@ func (b *outlierDetectionBalancer) NewSubConn(addrs []resolver.Address, opts bal
SubConn: sc,
addresses: addrs,
scUpdateCh: b.scUpdateCh,
listener: oldListener,
}
b.mu.Lock()
defer b.mu.Unlock()
Expand Down Expand Up @@ -624,8 +623,8 @@ func (b *outlierDetectionBalancer) handleSubConnUpdate(u *scUpdate) {
scw.latestState = u.state
if !scw.ejected {
b.childMu.Lock()
if u.cb != nil {
u.cb(u.state)
if scw.listener != nil {
scw.listener(u.state)
} else {
b.child.UpdateSubConnState(scw, u.state)
}
Expand All @@ -647,7 +646,11 @@ func (b *outlierDetectionBalancer) handleEjectedUpdate(u *ejectionUpdate) {
}
}
b.childMu.Lock()
b.child.UpdateSubConnState(scw, stateToUpdate)
if scw.listener != nil {
scw.listener(stateToUpdate)
} else {
b.child.UpdateSubConnState(scw, stateToUpdate)
}
b.childMu.Unlock()
}

Expand Down
6 changes: 3 additions & 3 deletions xds/internal/balancer/outlierdetection/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1085,7 +1085,7 @@ func (s) TestEjectUnejectSuccessRate(t *testing.T) {

// Since no addresses are ejected, a SubConn update should forward down
// to the child.
od.UpdateSubConnState(scw1.(*subConnWrapper).SubConn, balancer.SubConnState{
od.updateSubConnState(scw1.(*subConnWrapper).SubConn, balancer.SubConnState{
ConnectivityState: connectivity.Connecting,
})

Expand Down Expand Up @@ -1147,7 +1147,7 @@ func (s) TestEjectUnejectSuccessRate(t *testing.T) {
// that address should not be forwarded downward. These SubConn updates
// will be cached to update the child sometime in the future when the
// address gets unejected.
od.UpdateSubConnState(pi.SubConn, balancer.SubConnState{
od.updateSubConnState(pi.SubConn, balancer.SubConnState{
ConnectivityState: connectivity.Connecting,
})
sCtx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
Expand Down Expand Up @@ -1564,7 +1564,7 @@ func (s) TestConcurrentOperations(t *testing.T) {

// Call balancer.Balancers synchronously in this goroutine, upholding the
// balancer.Balancer API guarantee.
od.UpdateSubConnState(scw1.(*subConnWrapper).SubConn, balancer.SubConnState{
od.updateSubConnState(scw1.(*subConnWrapper).SubConn, balancer.SubConnState{
ConnectivityState: connectivity.Connecting,
})
od.ResolverError(errors.New("some error"))
Expand Down
1 change: 1 addition & 0 deletions xds/internal/balancer/outlierdetection/subconn_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
// whether or not this SubConn is ejected.
type subConnWrapper struct {
balancer.SubConn
listener func(balancer.SubConnState)

// addressInfo is a pointer to the subConnWrapper's corresponding address
// map entry, if the map entry exists.
Expand Down

0 comments on commit c156b8e

Please sign in to comment.