Skip to content

Commit

Permalink
resolver: remove deprecated AddressType
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed Jul 14, 2023
1 parent 9489082 commit 5258fd2
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 273 deletions.
14 changes: 3 additions & 11 deletions balancer/grpclb/grpclb.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,17 +448,9 @@ func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
gc, _ := ccs.BalancerConfig.(*grpclbServiceConfig)
lb.handleServiceConfig(gc)

addrs := ccs.ResolverState.Addresses

var remoteBalancerAddrs, backendAddrs []resolver.Address
for _, a := range addrs {
if a.Type == resolver.GRPCLB {
a.Type = resolver.Backend
remoteBalancerAddrs = append(remoteBalancerAddrs, a)
} else {
backendAddrs = append(backendAddrs, a)
}
}
backendAddrs := ccs.ResolverState.Addresses

var remoteBalancerAddrs []resolver.Address
if sd := grpclbstate.Get(ccs.ResolverState); sd != nil {
// Override any balancer addresses provided via
// ccs.ResolverState.Addresses.
Expand Down
6 changes: 3 additions & 3 deletions balancer/grpclb/grpclb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1430,11 +1430,11 @@ func runAndCheckStats(t *testing.T, drop bool, statsChan chan *lbpb.ClientStats,
}
defer cc.Close()

r.UpdateState(resolver.State{Addresses: []resolver.Address{{
rstate := resolver.State{ServiceConfig: r.CC.ParseServiceConfig(grpclbConfig)}
r.UpdateState(grpclbstate.Set(rstate, &grpclbstate.State{BalancerAddresses: []resolver.Address{{
Addr: tss.lbAddr,
Type: resolver.GRPCLB,
ServerName: lbServerName,
}}})
}}}))

runRPCs(cc)
end := time.Now().Add(time.Second)
Expand Down
14 changes: 0 additions & 14 deletions balancer_conn_wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,20 +99,6 @@ func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnStat
// lock held. But the lock guards only the scheduling part. The actual
// callback is called asynchronously without the lock being held.
ok := ccb.serializer.Schedule(func(_ context.Context) {
// If the addresses specified in the update contain addresses of type
// "grpclb" and the selected LB policy is not "grpclb", these addresses
// will be filtered out and ccs will be modified with the updated
// address list.
if ccb.curBalancerName != grpclbName {
var addrs []resolver.Address
for _, addr := range ccs.ResolverState.Addresses {
if addr.Type == resolver.GRPCLB {
continue
}
addrs = append(addrs, addr)
}
ccs.ResolverState.Addresses = addrs
}
errCh <- ccb.balancer.UpdateClientConnState(*ccs)
})
if !ok {
Expand Down
22 changes: 6 additions & 16 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1153,23 +1153,13 @@ func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSel
}

var newBalancerName string
if cc.sc != nil && cc.sc.lbConfig != nil {
if cc.sc == nil || (cc.sc.lbConfig == nil && cc.sc.LB == nil) {
// No service config or no LB policy specified in config.
newBalancerName = PickFirstBalancerName
} else if cc.sc.lbConfig != nil {
newBalancerName = cc.sc.lbConfig.name
} else {
var isGRPCLB bool
for _, a := range addrs {
if a.Type == resolver.GRPCLB {
isGRPCLB = true
break
}
}
if isGRPCLB {
newBalancerName = grpclbName
} else if cc.sc != nil && cc.sc.LB != nil {
newBalancerName = *cc.sc.LB
} else {
newBalancerName = PickFirstBalancerName
}
} else { // cc.sc.LB != nil
newBalancerName = *cc.sc.LB
}
cc.balancerWrapper.switchTo(newBalancerName)
}
Expand Down
160 changes: 30 additions & 130 deletions test/balancer_switching_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
grpclbstate "google.golang.org/grpc/balancer/grpclb/state"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/stub"
Expand Down Expand Up @@ -168,10 +169,13 @@ func (s) TestBalancerSwitch_grpclbToPickFirst(t *testing.T) {
}
defer cc.Close()

// Push a resolver update with no service config and a single address pointing
// to the grpclb server we created above. This will cause the channel to
// switch to the "grpclb" balancer, which returns a single backend address.
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: lbServer.Address(), Type: resolver.GRPCLB}}})
// Push a resolver update with a GRPCLB service config and a single address
// pointing to the grpclb server we created above. This will cause the
// channel to switch to the "grpclb" balancer, which returns a single
// backend address.
grpclbConfig := parseServiceConfig(t, r, `{"loadBalancingPolicy": "grpclb"}`)
state := resolver.State{ServiceConfig: grpclbConfig}
r.UpdateState(grpclbstate.Set(state, &grpclbstate.State{BalancerAddresses: []resolver.Address{{Addr: lbServer.Address()}}}))
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
client := testgrpc.NewTestServiceClient(cc)
Expand All @@ -182,7 +186,7 @@ func (s) TestBalancerSwitch_grpclbToPickFirst(t *testing.T) {
// Push a resolver update containing a non-existent grpclb server address.
// This should not lead to a balancer switch.
const nonExistentServer = "non-existent-grpclb-server-address"
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: nonExistentServer, Type: resolver.GRPCLB}}})
r.UpdateState(grpclbstate.Set(state, &grpclbstate.State{BalancerAddresses: []resolver.Address{{Addr: nonExistentServer}}}))
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[:1]); err != nil {
t.Fatal(err)
}
Expand All @@ -192,7 +196,8 @@ func (s) TestBalancerSwitch_grpclbToPickFirst(t *testing.T) {
// list of addresses pushed as part of this update is different from the one
// returned by the "grpclb" balancer. So, we should see RPCs going to the
// newly configured backends, as part of the balancer switch.
r.UpdateState(resolver.State{Addresses: addrs[1:]})
emptyConfig := parseServiceConfig(t, r, `{}`)
r.UpdateState(resolver.State{Addresses: addrs[1:], ServiceConfig: emptyConfig})
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -225,21 +230,24 @@ func (s) TestBalancerSwitch_pickFirstToGRPCLB(t *testing.T) {
// Push a resolver update with no service config and a single address pointing
// to the grpclb server we created above. This will cause the channel to
// switch to the "grpclb" balancer, which returns a single backend address.
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: lbServer.Address(), Type: resolver.GRPCLB}}})
grpclbConfig := parseServiceConfig(t, r, `{"loadBalancingPolicy": "grpclb"}`)
state := resolver.State{ServiceConfig: grpclbConfig}
r.UpdateState(grpclbstate.Set(state, &grpclbstate.State{BalancerAddresses: []resolver.Address{{Addr: lbServer.Address()}}}))
client := testgrpc.NewTestServiceClient(cc)
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[:1]); err != nil {
t.Fatal(err)
}

// Push a resolver update containing a non-existent grpclb server address.
// This should not lead to a balancer switch.
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "nonExistentServer", Type: resolver.GRPCLB}}})
r.UpdateState(grpclbstate.Set(state, &grpclbstate.State{BalancerAddresses: []resolver.Address{{Addr: "nonExistentServer"}}}))
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[:1]); err != nil {
t.Fatal(err)
}

// Switch to "pick_first" again by sending no grpclb server addresses.
r.UpdateState(resolver.State{Addresses: addrs[1:]})
emptyConfig := parseServiceConfig(t, r, `{}`)
r.UpdateState(resolver.State{Addresses: addrs[1:], ServiceConfig: emptyConfig})
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -284,13 +292,13 @@ func (s) TestBalancerSwitch_RoundRobinToGRPCLB(t *testing.T) {
t.Fatal(err)
}

// Push a resolver update with no service config and a single address pointing
// to the grpclb server we created above. This will cause the channel to
// switch to the "grpclb" balancer, which returns a single backend address.
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lbServer.Address(), Type: resolver.GRPCLB}},
ServiceConfig: scpr,
})
// Push a resolver update with grpclb and a single balancer address
// pointing to the grpclb server we created above. This will cause the
// channel to switch to the "grpclb" balancer, which returns a single
// backend address.
grpclbConfig := parseServiceConfig(t, r, `{"loadBalancingPolicy": "grpclb"}`)
state := resolver.State{ServiceConfig: grpclbConfig}
r.UpdateState(grpclbstate.Set(state, &grpclbstate.State{BalancerAddresses: []resolver.Address{{Addr: lbServer.Address()}}}))
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[:1]); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -329,12 +337,13 @@ func (s) TestBalancerSwitch_grpclbNotRegistered(t *testing.T) {
// fallback to the default LB policy which is pick_first. The ClientConn is
// also expected to filter out the grpclb address when sending the addresses
// list fo pick_first.
grpclbAddr := []resolver.Address{{Addr: "non-existent-grpclb-server-address", Type: resolver.GRPCLB}}
addrs = append(grpclbAddr, addrs...)
r.UpdateState(resolver.State{Addresses: addrs})
grpclbAddr := []resolver.Address{{Addr: "non-existent-grpclb-server-address"}}
grpclbConfig := parseServiceConfig(t, r, `{"loadBalancingPolicy": "grpclb"}`)
state := resolver.State{ServiceConfig: grpclbConfig, Addresses: addrs}
r.UpdateState(grpclbstate.Set(state, &grpclbstate.State{BalancerAddresses: grpclbAddr}))
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
t.Fatal(err)
}

Expand All @@ -346,116 +355,7 @@ func (s) TestBalancerSwitch_grpclbNotRegistered(t *testing.T) {
ServiceConfig: parseServiceConfig(t, r, rrServiceConfig),
})
client := testgrpc.NewTestServiceClient(cc)
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[1:]); err != nil {
t.Fatal(err)
}
}

// TestBalancerSwitch_grpclbAddressOverridesLoadBalancingPolicy verifies that
// if the resolver update contains any addresses of type "grpclb", it overrides
// the LB policy specifies in the deprecated `loadBalancingPolicy` field of the
// service config.
func (s) TestBalancerSwitch_grpclbAddressOverridesLoadBalancingPolicy(t *testing.T) {
backends, lbServer, cleanup := setupBackendsAndFakeGRPCLB(t)
defer cleanup()

addrs := stubBackendsToResolverAddrs(backends)
r := manual.NewBuilderWithScheme("whatever")
target := fmt.Sprintf("%s:///%s", r.Scheme(), loadBalancedServiceName)
cc, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
defer cc.Close()

// Push a resolver update containing no grpclb server address. This should
// lead to the channel using the default LB policy which is pick_first.
r.UpdateState(resolver.State{Addresses: addrs[1:]})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
t.Fatal(err)
}

// Push a resolver update with no service config. The addresses list contains
// the stub backend addresses and a single address pointing to the grpclb
// server we created above. This will cause the channel to switch to the
// "grpclb" balancer, which returns a single backend address.
r.UpdateState(resolver.State{
Addresses: append(addrs[1:], resolver.Address{Addr: lbServer.Address(), Type: resolver.GRPCLB}),
})
client := testgrpc.NewTestServiceClient(cc)
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[:1]); err != nil {
t.Fatal(err)
}

// Push a resolver update with a service config using the deprecated
// `loadBalancingPolicy` field pointing to round_robin. The addresses list
// contains an address of type "grpclb". This should be preferred and hence
// there should be no balancer switch.
scpr := parseServiceConfig(t, r, `{"loadBalancingPolicy": "round_robin"}`)
r.UpdateState(resolver.State{
Addresses: append(addrs[1:], resolver.Address{Addr: lbServer.Address(), Type: resolver.GRPCLB}),
ServiceConfig: scpr,
})
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[:1]); err != nil {
t.Fatal(err)
}

// Switch to "round_robin" by removing the address of type "grpclb".
r.UpdateState(resolver.State{Addresses: addrs[1:]})
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[1:]); err != nil {
t.Fatal(err)
}
}

// TestBalancerSwitch_LoadBalancingConfigTrumps verifies that the
// `loadBalancingConfig` field in the service config trumps over addresses of
// type "grpclb" when it comes to deciding which LB policy is applied on the
// channel.
func (s) TestBalancerSwitch_LoadBalancingConfigTrumps(t *testing.T) {
backends, lbServer, cleanup := setupBackendsAndFakeGRPCLB(t)
defer cleanup()

addrs := stubBackendsToResolverAddrs(backends)
r := manual.NewBuilderWithScheme("whatever")
target := fmt.Sprintf("%s:///%s", r.Scheme(), loadBalancedServiceName)
cc, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
defer cc.Close()

// Push a resolver update with no service config and a single address pointing
// to the grpclb server we created above. This will cause the channel to
// switch to the "grpclb" balancer, which returns a single backend address.
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: lbServer.Address(), Type: resolver.GRPCLB}}})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
client := testgrpc.NewTestServiceClient(cc)
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[:1]); err != nil {
t.Fatal(err)
}

// Push a resolver update with the service config specifying "round_robin"
// through the recommended `loadBalancingConfig` field.
r.UpdateState(resolver.State{
Addresses: addrs[1:],
ServiceConfig: parseServiceConfig(t, r, rrServiceConfig),
})
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[1:]); err != nil {
t.Fatal(err)
}

// Push a resolver update with no service config and an address of type
// "grpclb". The ClientConn should continue to use the service config received
// earlier, which specified the use of "round_robin" through the
// `loadBalancingConfig` field, and therefore the balancer should not be
// switched. And because the `loadBalancingConfig` field trumps everything
// else, the address of type "grpclb" should be ignored.
grpclbAddr := resolver.Address{Addr: "non-existent-grpclb-server-address", Type: resolver.GRPCLB}
r.UpdateState(resolver.State{Addresses: append(addrs[1:], grpclbAddr)})
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[1:]); err != nil {
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs); err != nil {
t.Fatal(err)
}
}
Expand Down
Loading

0 comments on commit 5258fd2

Please sign in to comment.