Skip to content

Commit

Permalink
more
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed Aug 7, 2023
1 parent f939e05 commit bd417fc
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 45 deletions.
2 changes: 1 addition & 1 deletion balancer/rls/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ func (b *rlsBalancer) ResolverError(err error) {
}

func (b *rlsBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
b.bg.UpdateSubConnState(sc, state)
b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
}

func (b *rlsBalancer) Close() {
Expand Down
11 changes: 10 additions & 1 deletion balancer/weightedroundrobin/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,12 @@ func (b *wrrBalancer) updateAddresses(addrs []resolver.Address) {
wsc = wsci.(*weightedSubConn)
} else {
// addr is a new address (not existing in b.subConns).
sc, err := b.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{})
var sc balancer.SubConn
sc, err := b.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{
StateListener: func(state balancer.SubConnState) {
b.updateSubConnState(sc, state)
},
})
if err != nil {
b.logger.Warningf("Failed to create new SubConn for address %v: %v", addr, err)
continue
Expand Down Expand Up @@ -205,6 +210,10 @@ func (b *wrrBalancer) ResolverError(err error) {
}

func (b *wrrBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
}

func (b *wrrBalancer) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
wsc := b.scMap[sc]
if wsc == nil {
b.logger.Errorf("UpdateSubConnState called with an unknown SubConn: %p, %v", sc, state)
Expand Down
2 changes: 1 addition & 1 deletion balancer/weightedtarget/weightedtarget.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (b *weightedTargetBalancer) ResolverError(err error) {
}

func (b *weightedTargetBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
b.bg.UpdateSubConnState(sc, state)
b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
}

func (b *weightedTargetBalancer) Close() {
Expand Down
23 changes: 12 additions & 11 deletions balancer/weightedtarget/weightedtarget_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1211,23 +1211,24 @@ var errTestInitIdle = fmt.Errorf("init Idle balancer error 0")
func init() {
stub.Register(initIdleBalancerName, stub.BalancerFuncs{
UpdateClientConnState: func(bd *stub.BalancerData, opts balancer.ClientConnState) error {
sc, err := bd.ClientConn.NewSubConn(opts.ResolverState.Addresses, balancer.NewSubConnOptions{})
sc, err := bd.ClientConn.NewSubConn(opts.ResolverState.Addresses, balancer.NewSubConnOptions{
StateListener: func(state balancer.SubConnState) {
err := fmt.Errorf("wrong picker error")
if state.ConnectivityState == connectivity.Idle {
err = errTestInitIdle
}
bd.ClientConn.UpdateState(balancer.State{
ConnectivityState: state.ConnectivityState,
Picker: &testutils.TestConstPicker{Err: err},
})
},
})
if err != nil {
return err
}
sc.Connect()
return nil
},
UpdateSubConnState: func(bd *stub.BalancerData, sc balancer.SubConn, state balancer.SubConnState) {
err := fmt.Errorf("wrong picker error")
if state.ConnectivityState == connectivity.Idle {
err = errTestInitIdle
}
bd.ClientConn.UpdateState(balancer.State{
ConnectivityState: state.ConnectivityState,
Picker: &testutils.TestConstPicker{Err: err},
})
},
})
}

Expand Down
4 changes: 3 additions & 1 deletion clientconn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1146,7 +1146,9 @@ type stateRecordingBalancer struct {
balancer.Balancer
}

func (b *stateRecordingBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {}
func (b *stateRecordingBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {
panic(fmt.Sprintf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, s))
}

func (b *stateRecordingBalancer) Close() {
b.Balancer.Close()
Expand Down
6 changes: 2 additions & 4 deletions internal/balancer/stub/stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package stub

import (
"encoding/json"
"fmt"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/serviceconfig"
Expand All @@ -38,7 +39,6 @@ type BalancerFuncs struct {

UpdateClientConnState func(*BalancerData, balancer.ClientConnState) error
ResolverError func(*BalancerData, error)
UpdateSubConnState func(*BalancerData, balancer.SubConn, balancer.SubConnState)
Close func(*BalancerData)
ExitIdle func(*BalancerData)
}
Expand Down Expand Up @@ -72,9 +72,7 @@ func (b *bal) ResolverError(e error) {
}

func (b *bal) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) {
if b.bf.UpdateSubConnState != nil {
b.bf.UpdateSubConnState(b.bd, sc, scs)
}
panic(fmt.Sprintf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, scs))
}

func (b *bal) Close() {
Expand Down
29 changes: 14 additions & 15 deletions interop/orcalb.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (o *orcab) UpdateClientConnState(s balancer.ClientConnState) error {
return fmt.Errorf("resolver produced no addresses")
}
var err error
o.sc, err = o.cc.NewSubConn(s.ResolverState.Addresses, balancer.NewSubConnOptions{})
o.sc, err = o.cc.NewSubConn(s.ResolverState.Addresses, balancer.NewSubConnOptions{StateListener: o.updateSubConnState})
if err != nil {
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(fmt.Errorf("error creating subconn: %v", err))})
return nil
Expand All @@ -82,20 +82,20 @@ func (o *orcab) ResolverError(err error) {
}
}

func (o *orcab) UpdateSubConnState(sc balancer.SubConn, scState balancer.SubConnState) {
if o.sc != sc {
logger.Errorf("received subconn update for unknown subconn: %v vs %v", o.sc, sc)
return
}
switch scState.ConnectivityState {
func (o *orcab) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
}

func (o *orcab) updateSubConnState(state balancer.SubConnState) {
switch state.ConnectivityState {
case connectivity.Ready:
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Ready, Picker: &scPicker{sc: sc, o: o}})
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Ready, Picker: &orcaPicker{o: o}})
case connectivity.TransientFailure:
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(fmt.Errorf("all subchannels in transient failure: %v", scState.ConnectionError))})
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(fmt.Errorf("all subchannels in transient failure: %v", state.ConnectionError))})
case connectivity.Connecting:
// Ignore; picker already set to "connecting".
case connectivity.Idle:
sc.Connect()
o.sc.Connect()
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable)})
case connectivity.Shutdown:
// Ignore; we are closing but handle that in Close instead.
Expand All @@ -113,12 +113,11 @@ func (o *orcab) OnLoadReport(r *v3orcapb.OrcaLoadReport) {
o.report = r
}

type scPicker struct {
sc balancer.SubConn
o *orcab
type orcaPicker struct {
o *orcab
}

func (p *scPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
func (p *orcaPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
doneCB := func(di balancer.DoneInfo) {
if lr, _ := di.ServerLoad.(*v3orcapb.OrcaLoadReport); lr != nil &&
(lr.CpuUtilization != 0 || lr.MemUtilization != 0 || len(lr.Utilization) > 0 || len(lr.RequestCost) > 0) {
Expand All @@ -134,7 +133,7 @@ func (p *scPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
}
}
}
return balancer.PickResult{SubConn: p.sc, Done: doneCB}, nil
return balancer.PickResult{SubConn: p.o.sc, Done: doneCB}, nil
}

func setContextCMR(ctx context.Context, lr *v3orcapb.OrcaLoadReport) {
Expand Down
23 changes: 12 additions & 11 deletions xds/internal/balancer/clustermanager/clustermanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,23 +582,24 @@ var errTestInitIdle = fmt.Errorf("init Idle balancer error 0")
func init() {
stub.Register(initIdleBalancerName, stub.BalancerFuncs{
UpdateClientConnState: func(bd *stub.BalancerData, opts balancer.ClientConnState) error {
sc, err := bd.ClientConn.NewSubConn(opts.ResolverState.Addresses, balancer.NewSubConnOptions{})
sc, err := bd.ClientConn.NewSubConn(opts.ResolverState.Addresses, balancer.NewSubConnOptions{
StateListener: func(state balancer.SubConnState) {
err := fmt.Errorf("wrong picker error")
if state.ConnectivityState == connectivity.Idle {
err = errTestInitIdle
}
bd.ClientConn.UpdateState(balancer.State{
ConnectivityState: state.ConnectivityState,
Picker: &testutils.TestConstPicker{Err: err},
})
},
})
if err != nil {
return err
}
sc.Connect()
return nil
},
UpdateSubConnState: func(bd *stub.BalancerData, sc balancer.SubConn, state balancer.SubConnState) {
err := fmt.Errorf("wrong picker error")
if state.ConnectivityState == connectivity.Idle {
err = errTestInitIdle
}
bd.ClientConn.UpdateState(balancer.State{
ConnectivityState: state.ConnectivityState,
Picker: &testutils.TestConstPicker{Err: err},
})
},
})
}

Expand Down

0 comments on commit bd417fc

Please sign in to comment.