-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
pickfirst: add tests for resolver error scenarios #6484
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,7 +20,9 @@ package test | |
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"strings" | ||
"testing" | ||
"time" | ||
|
||
|
@@ -788,3 +790,220 @@ func (s) TestPickFirst_AddressUpdateWithBalancerAttributes(t *testing.T) { | |
t.Fatal(err) | ||
} | ||
} | ||
|
||
// Tests the case where the pick_first LB policy receives an error from the name | ||
// resolver without previously receiving a good update. Verifies that the | ||
// channel moves to TRANSIENT_FAILURE and that error received from the name | ||
// resolver is propagated to the caller of an RPC. | ||
func (s) TestPickFirst_ResolverError_NoPreviousUpdate(t *testing.T) { | ||
cc, r, _ := setupPickFirst(t, 0) | ||
|
||
nrErr := errors.New("error from name resolver") | ||
r.ReportError(nrErr) | ||
|
||
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) | ||
defer cancel() | ||
awaitState(ctx, t, cc, connectivity.TransientFailure) | ||
|
||
client := testgrpc.NewTestServiceClient(cc) | ||
_, err := client.EmptyCall(ctx, &testpb.Empty{}) | ||
if err == nil { | ||
t.Fatalf("EmptyCall() succeeded when expected to fail with error: %v", nrErr) | ||
} | ||
if !strings.Contains(err.Error(), nrErr.Error()) { | ||
t.Fatalf("EmptyCall() failed with error: %v, want error: %v", err, nrErr) | ||
} | ||
} | ||
|
||
// Tests the case where the pick_first LB policy receives an error from the name | ||
// resolver after receiving a good update (and the channel is currently READY). | ||
// The test verifies that the channel continues to use the previously received | ||
// good update. | ||
func (s) TestPickFirst_ResolverError_WithPreviousUpdate_Ready(t *testing.T) { | ||
cc, r, backends := setupPickFirst(t, 1) | ||
|
||
addrs := stubBackendsToResolverAddrs(backends) | ||
r.UpdateState(resolver.State{Addresses: addrs}) | ||
|
||
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) | ||
defer cancel() | ||
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
nrErr := errors.New("error from name resolver") | ||
r.ReportError(nrErr) | ||
|
||
// Ensure that RPCs continue to succeed for the next second. | ||
client := testgrpc.NewTestServiceClient(cc) | ||
for end := time.Now().Add(time.Second); time.Now().Before(end); <-time.After(defaultTestShortTimeout) { | ||
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil { | ||
t.Fatalf("EmptyCall() failed: %v", err) | ||
} | ||
} | ||
} | ||
|
||
// Tests the case where the pick_first LB policy receives an error from the name | ||
// resolver after receiving a good update (and the channel is currently in | ||
// CONNECTING state). The test verifies that the channel continues to use the | ||
// previously received good update, and that RPCs don't fail with the error | ||
// received from the name resolver. | ||
func (s) TestPickFirst_ResolverError_WithPreviousUpdate_Connecting(t *testing.T) { | ||
lis, err := testutils.LocalTCPListener() | ||
if err != nil { | ||
t.Fatalf("net.Listen() failed: %v", err) | ||
} | ||
|
||
// Listen on a local port and act like a server that blocks until the | ||
// channel reaches CONNECTING and closes the connection without sending a | ||
// server preface. | ||
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) | ||
defer cancel() | ||
waitForConnecting := make(chan struct{}) | ||
go func() { | ||
conn, err := lis.Accept() | ||
if err != nil { | ||
t.Errorf("Unexpected error when accepting a connection: %v", err) | ||
} | ||
defer conn.Close() | ||
|
||
select { | ||
case <-waitForConnecting: | ||
case <-ctx.Done(): | ||
t.Error("Timeout when waiting for channel to move to CONNECTING state") | ||
} | ||
}() | ||
|
||
r := manual.NewBuilderWithScheme("whatever") | ||
dopts := []grpc.DialOption{ | ||
grpc.WithTransportCredentials(insecure.NewCredentials()), | ||
grpc.WithResolvers(r), | ||
grpc.WithDefaultServiceConfig(pickFirstServiceConfig), | ||
} | ||
cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...) | ||
if err != nil { | ||
t.Fatalf("grpc.Dial() failed: %v", err) | ||
} | ||
t.Cleanup(func() { cc.Close() }) | ||
|
||
addrs := []resolver.Address{{Addr: lis.Addr().String()}} | ||
r.UpdateState(resolver.State{Addresses: addrs}) | ||
awaitState(ctx, t, cc, connectivity.Connecting) | ||
|
||
nrErr := errors.New("error from name resolver") | ||
r.ReportError(nrErr) | ||
|
||
// RPCs should fail with deadline exceed error as long as they are in | ||
// CONNECTING and not the error returned by the name resolver. | ||
client := testgrpc.NewTestServiceClient(cc) | ||
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) | ||
defer sCancel() | ||
if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); !strings.Contains(err.Error(), context.DeadlineExceeded.Error()) { | ||
t.Fatalf("EmptyCall() failed with error: %v, want error: %v", err, context.DeadlineExceeded) | ||
} | ||
|
||
// Closing this channel leads to closing of the connection by our listener. | ||
// gRPC should see this as a connection error. | ||
close(waitForConnecting) | ||
awaitState(ctx, t, cc, connectivity.TransientFailure) | ||
checkForConnectionError(ctx, t, cc) | ||
} | ||
|
||
// Tests the case where the pick_first LB policy receives an error from the name | ||
// resolver after receiving a good update. The previous good update though has | ||
// seen the channel move to TRANSIENT_FAILURE. The test verifies that the | ||
// channel fails RPCs with the new error from the resolver. | ||
func (s) TestPickFirst_ResolverError_WithPreviousUpdate_TransientFailure(t *testing.T) { | ||
lis, err := testutils.LocalTCPListener() | ||
if err != nil { | ||
t.Fatalf("net.Listen() failed: %v", err) | ||
} | ||
|
||
// Listen on a local port and act like a server that closes the connection | ||
// without sending a server preface. | ||
go func() { | ||
conn, err := lis.Accept() | ||
if err != nil { | ||
t.Errorf("Unexpected error when accepting a connection: %v", err) | ||
} | ||
conn.Close() | ||
}() | ||
|
||
r := manual.NewBuilderWithScheme("whatever") | ||
dopts := []grpc.DialOption{ | ||
grpc.WithTransportCredentials(insecure.NewCredentials()), | ||
grpc.WithResolvers(r), | ||
grpc.WithDefaultServiceConfig(pickFirstServiceConfig), | ||
} | ||
cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...) | ||
if err != nil { | ||
t.Fatalf("grpc.Dial() failed: %v", err) | ||
} | ||
t.Cleanup(func() { cc.Close() }) | ||
|
||
addrs := []resolver.Address{{Addr: lis.Addr().String()}} | ||
r.UpdateState(resolver.State{Addresses: addrs}) | ||
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) | ||
defer cancel() | ||
awaitState(ctx, t, cc, connectivity.TransientFailure) | ||
checkForConnectionError(ctx, t, cc) | ||
|
||
// An error from the name resolver should result in RPCs failing with that | ||
// error instead of the old error that caused the channel to move to | ||
// TRANSIENT_FAILURE in the first place. | ||
nrErr := errors.New("error from name resolver") | ||
r.ReportError(nrErr) | ||
client := testgrpc.NewTestServiceClient(cc) | ||
for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) { | ||
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); strings.Contains(err.Error(), nrErr.Error()) { | ||
break | ||
} | ||
} | ||
if ctx.Err() != nil { | ||
t.Fatal("Timeout when waiting for RPCs to fail with error returned by the name resolver") | ||
} | ||
} | ||
|
||
func checkForConnectionError(ctx context.Context, t *testing.T, cc *grpc.ClientConn) { | ||
t.Helper() | ||
|
||
// RPCs may fail on the client side in two ways, once the fake server closes | ||
// the accepted connection: | ||
// - writing the client preface succeeds, but not reading the server preface | ||
// - writing the client preface fails | ||
const readErr = "error reading server preface" | ||
const writeErr = "write: broken pipe" | ||
client := testgrpc.NewTestServiceClient(cc) | ||
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); !strings.Contains(err.Error(), readErr) && !strings.Contains(err.Error(), writeErr) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. FWIW if There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Switched to using status code instead. So, this is not required. |
||
t.Fatalf("EmptyCall() failed with error: %v, want %q or %q", err, readErr, writeErr) | ||
} | ||
} | ||
|
||
// Tests the case where the pick_first LB policy receives an update from the | ||
// name resolver with no addresses after receiving a good update. The test | ||
// verifies that the channel fails RPCs with an error indicating the fact that | ||
// the name resolver returned no addresses. | ||
func (s) TestPickFirst_ResolverError_ZeroAddresses_WithPreviousUpdate(t *testing.T) { | ||
cc, r, backends := setupPickFirst(t, 1) | ||
|
||
addrs := stubBackendsToResolverAddrs(backends) | ||
r.UpdateState(resolver.State{Addresses: addrs}) | ||
|
||
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) | ||
defer cancel() | ||
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
r.UpdateState(resolver.State{}) | ||
wantErr := "produced zero addresses" | ||
client := testgrpc.NewTestServiceClient(cc) | ||
for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) { | ||
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); strings.Contains(err.Error(), wantErr) { | ||
break | ||
} | ||
} | ||
if ctx.Err() != nil { | ||
t.Fatal("Timeout when waiting for RPCs to fail with error returned by the name resolver") | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm a little worried that this will be too brittle, because the
broken pipe
comes from the standard libraries IIRC. Can we settle for checkingstatus.Code(err) == codes.Unavailable
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When I started writing this, it always was failing with
error reading server preface
locally. But then, when I pushed my code to GA, it was always failing withbroken pipe
error. I thought about switching tocodes.Unavailable
at that point, but then told myself that this should work. But I take your point, never know what it will fail with on google3. Switched it tocodes.Unavailable
. Thanks.