Skip to content

Commit

Permalink
grpclb: recover after receiving an empty server list (#4879)
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl authored Oct 20, 2021
1 parent 0d50307 commit bd0f881
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 2 deletions.
12 changes: 10 additions & 2 deletions balancer/grpclb/grpclb_remote_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,19 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback
}

if lb.usePickFirst {
var sc balancer.SubConn
for _, sc = range lb.subConns {
var (
scKey resolver.Address
sc balancer.SubConn
)
for scKey, sc = range lb.subConns {
break
}
if sc != nil {
if len(backendAddrs) == 0 {
lb.cc.cc.RemoveSubConn(sc)
delete(lb.subConns, scKey)
return
}
lb.cc.cc.UpdateAddresses(sc, backendAddrs)
sc.Connect()
return
Expand Down
67 changes: 67 additions & 0 deletions balancer/grpclb/grpclb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1274,6 +1274,73 @@ func (s) TestGRPCLBBackendConnectionErrorPropagation(t *testing.T) {
wg.Wait()
}

func testGRPCLBEmptyServerList(t *testing.T, svcfg string) {
r := manual.NewBuilderWithScheme("whatever")

tss, cleanup, err := startBackendsAndRemoteLoadBalancer(1, "", nil)
if err != nil {
t.Fatalf("failed to create new load balancer: %v", err)
}
defer cleanup()

beServers := []*lbpb.Server{{
IpAddress: tss.beIPs[0],
Port: int32(tss.bePorts[0]),
LoadBalanceToken: lbToken,
}}

creds := serverNameCheckCreds{}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
grpc.WithResolvers(r),
grpc.WithTransportCredentials(&creds),
grpc.WithContextDialer(fakeNameDialer))
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
}
defer cc.Close()
testC := testpb.NewTestServiceClient(cc)

tss.ls.sls <- &lbpb.ServerList{Servers: beServers}

rs := grpclbstate.Set(resolver.State{ServiceConfig: r.CC.ParseServiceConfig(svcfg)},
&grpclbstate.State{BalancerAddresses: []resolver.Address{{
Addr: tss.lbAddr,
ServerName: lbServerName,
}}})
r.UpdateState(rs)
t.Log("Perform an initial RPC and expect it to succeed...")
if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Fatalf("Initial _.EmptyCall(_, _) = _, %v, want _, <nil>", err)
}
t.Log("Now send an empty server list. Wait until we see an RPC failure to make sure the client got it...")
tss.ls.sls <- &lbpb.ServerList{}
gotError := false
for i := 0; i < 100; i++ {
if _, err := testC.EmptyCall(ctx, &testpb.Empty{}); err != nil {
gotError = true
break
}
}
if !gotError {
t.Fatalf("Expected to eventually see an RPC fail after the grpclb sends an empty server list, but none did.")
}
t.Log("Now send a non-empty server list. A wait-for-ready RPC should now succeed...")
tss.ls.sls <- &lbpb.ServerList{Servers: beServers}
if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Fatalf("Final _.EmptyCall(_, _) = _, %v, want _, <nil>", err)
}
}

func (s) TestGRPCLBEmptyServerListRoundRobin(t *testing.T) {
testGRPCLBEmptyServerList(t, `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"round_robin":{}}]}}]}`)
}

func (s) TestGRPCLBEmptyServerListPickFirst(t *testing.T) {
testGRPCLBEmptyServerList(t, `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}}]}}]}`)
}

func (s) TestGRPCLBWithTargetNameFieldInConfig(t *testing.T) {
r := manual.NewBuilderWithScheme("whatever")

Expand Down

0 comments on commit bd0f881

Please sign in to comment.