Skip to content

Commit

Permalink
grpc: perform graceful switching of LB policies in the ClientConn by default (#5285)
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars authored Apr 1, 2022
1 parent 3cccf6a commit 0066bf6
Show file tree
Hide file tree
Showing 8 changed files with 438 additions and 423 deletions.
331 changes: 218 additions & 113 deletions balancer_conn_wrappers.go

Large diffs are not rendered by default.

93 changes: 14 additions & 79 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,15 +278,15 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
if creds := cc.dopts.copts.TransportCredentials; creds != nil {
credsClone = creds.Clone()
}
cc.balancerBuildOpts = balancer.BuildOptions{
cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{
DialCreds: credsClone,
CredsBundle: cc.dopts.copts.CredsBundle,
Dialer: cc.dopts.copts.Dialer,
Authority: cc.authority,
CustomUserAgent: cc.dopts.copts.UserAgent,
ChannelzParentID: cc.channelzID,
Target: cc.parsedTarget,
}
})

// Build the resolver.
rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
Expand Down Expand Up @@ -465,12 +465,12 @@ type ClientConn struct {
cancel context.CancelFunc // Cancelled on close.

// The following are initialized at dial time, and are read-only after that.
target string // User's dial target.
parsedTarget resolver.Target // See parseTargetAndFindResolver().
authority string // See determineAuthority().
dopts dialOptions // Default and user specified dial options.
balancerBuildOpts balancer.BuildOptions // TODO: delete once we move to the gracefulswitch balancer.
channelzID *channelz.Identifier // Channelz identifier for the channel.
target string // User's dial target.
parsedTarget resolver.Target // See parseTargetAndFindResolver().
authority string // See determineAuthority().
dopts dialOptions // Default and user specified dial options.
channelzID *channelz.Identifier // Channelz identifier for the channel.
balancerWrapper *ccBalancerWrapper // Uses gracefulswitch.balancer underneath.

// The following provide their own synchronization, and therefore don't
// require cc.mu to be held to access them.
Expand All @@ -491,8 +491,6 @@ type ClientConn struct {
sc *ServiceConfig // Latest service config received from the resolver.
conns map[*addrConn]struct{} // Set to nil on close.
mkp keepalive.ClientParameters // May be updated upon receipt of a GoAway.
curBalancerName string // TODO: delete as part of https://github.com/grpc/grpc-go/issues/5229.
balancerWrapper *ccBalancerWrapper // TODO: Use gracefulswitch balancer to be able to initialize this once and never rewrite.

lceMu sync.Mutex // protects lastConnectionError
lastConnectionError error
Expand Down Expand Up @@ -537,14 +535,7 @@ func (cc *ClientConn) GetState() connectivity.State {
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later
// release.
func (cc *ClientConn) Connect() {
cc.mu.Lock()
defer cc.mu.Unlock()
if cc.balancerWrapper.exitIdle() {
return
}
for ac := range cc.conns {
go ac.connect()
}
cc.balancerWrapper.exitIdle()
}

func (cc *ClientConn) scWatcher() {
Expand Down Expand Up @@ -666,21 +657,9 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
if cc.sc != nil && cc.sc.lbConfig != nil {
balCfg = cc.sc.lbConfig.cfg
}

cbn := cc.curBalancerName
bw := cc.balancerWrapper
cc.mu.Unlock()
if cbn != grpclbName {
// Filter any grpclb addresses since we don't have the grpclb balancer.
var addrs []resolver.Address
for _, addr := range s.Addresses {
if addr.Type == resolver.GRPCLB {
continue
}
addrs = append(addrs, addr)
}
s.Addresses = addrs
}

uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
if ret == nil {
ret = uccsErr // prefer ErrBadResolver state since any other error is
Expand Down Expand Up @@ -709,50 +688,8 @@ func (cc *ClientConn) applyFailingLB(sc *serviceconfig.ParseResult) {
cc.csMgr.updateState(connectivity.TransientFailure)
}

// switchBalancer starts the switching from current balancer to the balancer
// with the given name.
//
// The name is compared against cc.curBalancerName case-insensitively; when
// they match, this function returns without doing anything. Otherwise the
// current balancer wrapper is closed and a new one is created from the builder
// registered under name, falling back to the pick_first builder when no
// builder is registered under that name.
//
// It will NOT send the current address list to the new balancer. If needed,
// caller of this function should send address list to the new balancer after
// this function returns.
//
// Caller must hold cc.mu. Note that cc.mu is released while the old balancer
// wrapper is being closed and is re-acquired before this function returns.
func (cc *ClientConn) switchBalancer(name string) {
if strings.EqualFold(cc.curBalancerName, name) {
return
}

channelz.Infof(logger, cc.channelzID, "ClientConn switching balancer to %q", name)
// Don't hold cc.mu while closing the balancers. The balancers may call
// methods that require cc.mu (e.g. cc.NewSubConn()). Holding the mutex
// would cause a deadlock in that case.
cc.mu.Unlock()
cc.balancerWrapper.close()
cc.mu.Lock()
// NOTE(review): state guarded by cc.mu may have changed while the lock was
// released above, and nothing is re-checked here — confirm callers tolerate
// this.

builder := balancer.Get(name)
if builder == nil {
// No builder is registered under the requested name: use pick_first.
channelz.Warningf(logger, cc.channelzID, "Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName)
channelz.Infof(logger, cc.channelzID, "failed to get balancer builder for: %v, using pick_first instead", name)
builder = newPickfirstBuilder()
} else {
channelz.Infof(logger, cc.channelzID, "Channel switches to new LB policy %q", name)
}

// Record the name reported by the builder actually in use, which is the
// fallback builder's name when the requested one was not found.
cc.curBalancerName = builder.Name()
cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
}

func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
cc.mu.Lock()
if cc.conns == nil {
cc.mu.Unlock()
return
}
// TODO(bar switching) send updates to all balancer wrappers when balancer
// gracefully switching is supported.
cc.balancerWrapper.handleSubConnStateChange(sc, s, err)
cc.mu.Unlock()
cc.balancerWrapper.updateSubConnState(sc, s, err)
}

// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
Expand Down Expand Up @@ -1002,8 +939,6 @@ func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSel
cc.retryThrottler.Store((*retryThrottler)(nil))
}

// Only look at balancer types and switch balancer if balancer dial
// option is not set.
var newBalancerName string
if cc.sc != nil && cc.sc.lbConfig != nil {
newBalancerName = cc.sc.lbConfig.name
Expand All @@ -1023,7 +958,7 @@ func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSel
newBalancerName = PickFirstBalancerName
}
}
cc.switchBalancer(newBalancerName)
cc.balancerWrapper.switchTo(newBalancerName)
}

func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {
Expand Down Expand Up @@ -1074,11 +1009,11 @@ func (cc *ClientConn) Close() error {
rWrapper := cc.resolverWrapper
cc.resolverWrapper = nil
bWrapper := cc.balancerWrapper
cc.balancerWrapper = nil
cc.mu.Unlock()

// The order of closing matters here since the balancer wrapper assumes the
// picker has already been closed by the time the wrapper itself is closed.
cc.blockingpicker.close()

if bWrapper != nil {
bWrapper.close()
}
Expand Down
10 changes: 7 additions & 3 deletions clientconn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -845,9 +845,13 @@ func (s) TestBackoffCancel(t *testing.T) {
if err != nil {
t.Fatalf("Failed to create ClientConn: %v", err)
}
<-dialStrCh
cc.Close()
// Should not leak. May need -count 5000 to exercise.
defer cc.Close()

select {
case <-time.After(defaultTestTimeout):
t.Fatal("Timeout when waiting for custom dialer to be invoked during Dial")
case <-dialStrCh:
}
}

// UpdateAddresses should cause the next reconnect to begin from the top of the
Expand Down
7 changes: 7 additions & 0 deletions internal/balancer/gracefulswitch/gracefulswitch.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ func (gsb *Balancer) ResolverError(err error) {
}

// ExitIdle forwards the call to the latest balancer created.
//
// If the latest balancer does not support ExitIdle, Connect is called
// manually on each of its subConns instead.
func (gsb *Balancer) ExitIdle() {
balToUpdate := gsb.latestBalancer()
if balToUpdate == nil {
Expand All @@ -188,6 +191,10 @@ func (gsb *Balancer) ExitIdle() {
// called.
if ei, ok := balToUpdate.Balancer.(balancer.ExitIdler); ok {
ei.ExitIdle()
return
}
for sc := range balToUpdate.subconns {
sc.Connect()
}
}

Expand Down
16 changes: 15 additions & 1 deletion internal/balancer/stub/stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@
// Package stub implements a balancer for testing purposes.
package stub

import "google.golang.org/grpc/balancer"
import (
"encoding/json"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/serviceconfig"
)

// BalancerFuncs contains all balancer.Balancer functions with a preceding
// *BalancerData parameter for passing additional instance information. Any
Expand All @@ -28,6 +33,8 @@ type BalancerFuncs struct {
// Init is called after ClientConn and BuildOptions are set in
// BalancerData. It may be used to initialize BalancerData.Data.
Init func(*BalancerData)
// ParseConfig is used for parsing LB configs, if specified.
ParseConfig func(json.RawMessage) (serviceconfig.LoadBalancingConfig, error)

UpdateClientConnState func(*BalancerData, balancer.ClientConnState) error
ResolverError func(*BalancerData, error)
Expand Down Expand Up @@ -97,6 +104,13 @@ func (bb bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.

// Name returns the name stored in this stub balancer builder.
func (bb bb) Name() string {
	return bb.name
}

// ParseConfig parses the given LB config by delegating to the ParseConfig
// function supplied in the stub's BalancerFuncs. When no such function was
// supplied, it returns a nil config and a nil error.
func (bb bb) ParseConfig(lbCfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
	parse := bb.bf.ParseConfig
	if parse == nil {
		// No parser configured for this stub: nothing to parse, no error.
		return nil, nil
	}
	return parse(lbCfg)
}

// Register registers a stub balancer builder which will call the provided
// functions. The name used should be unique.
func Register(name string, bf BalancerFuncs) {
Expand Down
Loading

0 comments on commit 0066bf6

Please sign in to comment.