Skip to content

Commit

Permalink
grpc: do not use balancer attributes during address comparison (#6439)
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars authored Jul 12, 2023
1 parent db32c5b commit 8e9c8f8
Show file tree
Hide file tree
Showing 5 changed files with 290 additions and 40 deletions.
28 changes: 20 additions & 8 deletions attributes/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,19 +112,31 @@ func (a *Attributes) String() string {
sb.WriteString("{")
first := true
for k, v := range a.m {
var key, val string
if str, ok := k.(interface{ String() string }); ok {
key = str.String()
}
if str, ok := v.(interface{ String() string }); ok {
val = str.String()
}
if !first {
sb.WriteString(", ")
}
sb.WriteString(fmt.Sprintf("%q: %q, ", key, val))
sb.WriteString(fmt.Sprintf("%q: %q ", str(k), str(v)))
first = false
}
sb.WriteString("}")
return sb.String()
}

func str(x interface{}) string {
if v, ok := x.(fmt.Stringer); ok {
return v.String()
} else if v, ok := x.(string); ok {
return v
}
return fmt.Sprintf("<%p>", x)
}

// MarshalJSON helps implement the json.Marshaler interface, thereby rendering
// the Attributes correctly when printing (via pretty.JSON) structs containing
// Attributes as fields.
//
// Is it impossible to unmarshal attributes from a JSON representation and this
// method is meant only for debugging purposes.
func (a *Attributes) MarshalJSON() ([]byte, error) {
return []byte(a.String()), nil
}
20 changes: 18 additions & 2 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/pretty"
iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/keepalive"
Expand Down Expand Up @@ -867,14 +868,28 @@ func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivi
cc.balancerWrapper.updateSubConnState(sc, s, err)
}

// Makes a copy of the input addresses slice and clears out the balancer
// attributes field. Addresses are passed during subconn creation and address
// update operations. In both cases, we will clear the balancer attributes by
// calling this function, and therefore we will be able to use the Equal method
// provided by the resolver.Address type for comparison.
func copyAddressesWithoutBalancerAttributes(in []resolver.Address) []resolver.Address {
out := make([]resolver.Address, len(in))
for i := range in {
out[i] = in[i]
out[i].BalancerAttributes = nil
}
return out
}

// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
//
// Caller needs to make sure len(addrs) > 0.
func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
ac := &addrConn{
state: connectivity.Idle,
cc: cc,
addrs: addrs,
addrs: copyAddressesWithoutBalancerAttributes(addrs),
scopts: opts,
dopts: cc.dopts,
czData: new(channelzData),
Expand Down Expand Up @@ -995,8 +1010,9 @@ func equalAddresses(a, b []resolver.Address) bool {
// connections or connection attempts.
func (ac *addrConn) updateAddrs(addrs []resolver.Address) {
ac.mu.Lock()
channelz.Infof(logger, ac.channelzID, "addrConn: updateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
channelz.Infof(logger, ac.channelzID, "addrConn: updateAddrs curAddr: %v, addrs: %v", pretty.ToJSON(ac.curAddr), pretty.ToJSON(addrs))

addrs = copyAddressesWithoutBalancerAttributes(addrs)
if equalAddresses(ac.addrs, addrs) {
ac.mu.Unlock()
return
Expand Down
14 changes: 11 additions & 3 deletions internal/stubserver/stubserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ type StubServer struct {
Address string
Target string

// Custom listener to use for serving. If unspecified, a new listener is
// created on a local port.
Listener net.Listener

cleanups []func() // Lambdas executed in Stop(); populated by Start().

// Set automatically if Target == ""
Expand Down Expand Up @@ -118,9 +122,13 @@ func (ss *StubServer) StartServer(sopts ...grpc.ServerOption) error {
ss.R = manual.NewBuilderWithScheme("whatever")
}

lis, err := net.Listen(ss.Network, ss.Address)
if err != nil {
return fmt.Errorf("net.Listen(%q, %q) = %v", ss.Network, ss.Address, err)
lis := ss.Listener
if lis == nil {
var err error
lis, err = net.Listen(ss.Network, ss.Address)
if err != nil {
return fmt.Errorf("net.Listen(%q, %q) = %v", ss.Network, ss.Address, err)
}
}
ss.Address = lis.Addr().String()
ss.cleanups = append(ss.cleanups, func() { lis.Close() })
Expand Down
4 changes: 4 additions & 0 deletions resolver/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ type Address struct {

// Equal returns whether a and o are identical. Metadata is compared directly,
// not with any recursive introspection.
//
// This method compares all fields of the address. When used to tell apart
// addresses during subchannel creation or connection establishment, it might be
// more appropriate for the caller to implement custom equality logic.
func (a Address) Equal(o Address) bool {
return a.Addr == o.Addr && a.ServerName == o.ServerName &&
a.Attributes.Equal(o.Attributes) &&
Expand Down
Loading

0 comments on commit 8e9c8f8

Please sign in to comment.