From 7fced229c19a090869b20e586759f4f698b8f7e7 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Thu, 27 Jul 2023 11:38:19 -0700 Subject: [PATCH 1/3] pickfirst: add tests for resolver error scenarios --- test/pickfirst_test.go | 216 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 216 insertions(+) diff --git a/test/pickfirst_test.go b/test/pickfirst_test.go index 0e92a543ba4d..38dd02602c13 100644 --- a/test/pickfirst_test.go +++ b/test/pickfirst_test.go @@ -20,7 +20,9 @@ package test import ( "context" + "errors" "fmt" + "strings" "testing" "time" @@ -788,3 +790,217 @@ func (s) TestPickFirst_AddressUpdateWithBalancerAttributes(t *testing.T) { t.Fatal(err) } } + +// Tests the case where the pick_first LB policy receives an error from the name +// resolver without previously receiving a good update. Verifies that the +// channel moves to TRANSIENT_FAILURE and that error received from the name +// resolver is propagated to the caller of an RPC. +func (s) TestPickFirst_ResolverError_NoPreviousUpdate(t *testing.T) { + cc, r, _ := setupPickFirst(t, 0) + + nrErr := errors.New("error from name resolver") + r.ReportError(nrErr) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + awaitState(ctx, t, cc, connectivity.TransientFailure) + + client := testgrpc.NewTestServiceClient(cc) + _, err := client.EmptyCall(ctx, &testpb.Empty{}) + if err == nil { + t.Fatalf("EmptyCall() succeeded when expected to fail with error: %v", nrErr) + } + if !strings.Contains(err.Error(), nrErr.Error()) { + t.Fatalf("EmptyCall() failed with error: %v, want error: %v", err, nrErr) + } +} + +// Tests the case where the pick_first LB policy receives an error from the name +// resolver after receiving a good update (and the channel is currently READY). +// The test verifies that the channel continues to use the previously received +// good update. 
+func (s) TestPickFirst_ResolverError_WithPreviousUpdate_Ready(t *testing.T) { + cc, r, backends := setupPickFirst(t, 1) + + addrs := stubBackendsToResolverAddrs(backends) + r.UpdateState(resolver.State{Addresses: addrs}) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil { + t.Fatal(err) + } + + nrErr := errors.New("error from name resolver") + r.ReportError(nrErr) + + // Ensure that RPCs continue to succeed for the next second. + client := testgrpc.NewTestServiceClient(cc) + for end := time.Now().Add(time.Second); time.Now().Before(end); <-time.After(defaultTestShortTimeout) { + if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil { + t.Fatalf("EmptyCall() failed: %v", err) + } + } +} + +// Tests the case where the pick_first LB policy receives an error from the name +// resolver after receiving a good update (and the channel is currently in +// CONNECTING state). The test verifies that the channel continues to use the +// previously received good update, and that RPCs don't fail with the error +// received from the name resolver. +func (s) TestPickFirst_ResolverError_WithPreviousUpdate_Connecting(t *testing.T) { + lis, err := testutils.LocalTCPListener() + if err != nil { + t.Fatalf("net.Listen() failed: %v", err) + } + + // Listen on a local port and act like a server that blocks until the + // channel reaches CONNECTING and closes the connection without sending a + // server preface. 
+ ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + waitForConnecting := make(chan struct{}) + go func() { + conn, err := lis.Accept() + if err != nil { + t.Errorf("Unexpected error when accepting a connection: %v", err) + } + defer conn.Close() + + select { + case <-waitForConnecting: + case <-ctx.Done(): + t.Error("Timeout when waiting for channel to move to CONNECTING state") + } + }() + + r := manual.NewBuilderWithScheme("whatever") + dopts := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithResolvers(r), + grpc.WithDefaultServiceConfig(pickFirstServiceConfig), + } + cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...) + if err != nil { + t.Fatalf("grpc.Dial() failed: %v", err) + } + t.Cleanup(func() { cc.Close() }) + + addrs := []resolver.Address{{Addr: lis.Addr().String()}} + r.UpdateState(resolver.State{Addresses: addrs}) + awaitState(ctx, t, cc, connectivity.Connecting) + + nrErr := errors.New("error from name resolver") + r.ReportError(nrErr) + + // RPCs should fail with a deadline exceeded error while the channel is in + // CONNECTING, and not with the error returned by the name resolver. + client := testgrpc.NewTestServiceClient(cc) + sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); !strings.Contains(err.Error(), context.DeadlineExceeded.Error()) { + t.Fatalf("EmptyCall() failed with error: %v, want error: %v", err, context.DeadlineExceeded) + } + + // Closing this channel leads to closing of the connection by our listener. + // gRPC should see this as an error reading server preface. + close(waitForConnecting) + awaitState(ctx, t, cc, connectivity.TransientFailure) + + // RPCs should fail with the error that caused the channel to move to + // TRANSIENT_FAILURE and not the error returned by the name resolver. 
+ wantErr := "error reading server preface" + if _, err := client.EmptyCall(ctx, &testpb.Empty{}); !strings.Contains(err.Error(), wantErr) { + t.Fatalf("EmptyCall() failed with error: %v, want error: %v", err, wantErr) + } +} + +// Tests the case where the pick_first LB policy receives an error from the name +// resolver after receiving a good update. The previous good update though has +// seen the channel move to TRANSIENT_FAILURE. The test verifies that the +// channel fails RPCs with the new error from the resolver. +func (s) TestPickFirst_ResolverError_WithPreviousUpdate_TransientFailure(t *testing.T) { + lis, err := testutils.LocalTCPListener() + if err != nil { + t.Fatalf("net.Listen() failed: %v", err) + } + + // Listen on a local port and act like a server that closes the connection + // without sending a server preface. + go func() { + conn, err := lis.Accept() + if err != nil { + t.Errorf("Unexpected error when accepting a connection: %v", err) + } + conn.Close() + }() + + r := manual.NewBuilderWithScheme("whatever") + dopts := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithResolvers(r), + grpc.WithDefaultServiceConfig(pickFirstServiceConfig), + } + cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...) + if err != nil { + t.Fatalf("grpc.Dial() failed: %v", err) + } + t.Cleanup(func() { cc.Close() }) + + addrs := []resolver.Address{{Addr: lis.Addr().String()}} + r.UpdateState(resolver.State{Addresses: addrs}) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + awaitState(ctx, t, cc, connectivity.TransientFailure) + + // RPCs should fail with the error that caused the channel to move to + // TRANSIENT_FAILURE. 
+ wantErr := "error reading server preface" + client := testgrpc.NewTestServiceClient(cc) + if _, err := client.EmptyCall(ctx, &testpb.Empty{}); !strings.Contains(err.Error(), wantErr) { + t.Fatalf("EmptyCall() failed with error: %v, want error: %v", err, wantErr) + } + + // An error from the name resolver should result in RPCs failing with that + // error instead of the old error that caused the channel to move to + // TRANSIENT_FAILURE in the first place. + nrErr := errors.New("error from name resolver") + r.ReportError(nrErr) + for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) { + if _, err := client.EmptyCall(ctx, &testpb.Empty{}); strings.Contains(err.Error(), nrErr.Error()) { + break + } + } + if ctx.Err() != nil { + t.Fatal("Timeout when waiting for RPCs to fail with error returned by the name resolver") + } +} + +// Tests the case where the pick_first LB policy receives an update from the +// name resolver with no addresses after receiving a good update. The test +// verifies that the channel fails RPCs with an error indicating the fact that +// the name resolver returned no addresses. 
+func (s) TestPickFirst_ResolverError_ZeroAddresses_WithPreviousUpdate(t *testing.T) { + cc, r, backends := setupPickFirst(t, 1) + + addrs := stubBackendsToResolverAddrs(backends) + r.UpdateState(resolver.State{Addresses: addrs}) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil { + t.Fatal(err) + } + + r.UpdateState(resolver.State{}) + wantErr := "produced zero addresses" + client := testgrpc.NewTestServiceClient(cc) + for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) { + if _, err := client.EmptyCall(ctx, &testpb.Empty{}); strings.Contains(err.Error(), wantErr) { + break + } + } + if ctx.Err() != nil { + t.Fatal("Timeout when waiting for RPCs to fail with error returned by the name resolver") + } +} From 57b82ff22d74a42d62f7817a560db45a3e31701d Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Thu, 27 Jul 2023 12:28:22 -0700 Subject: [PATCH 2/3] address connection errors properly --- test/pickfirst_test.go | 35 +++++++++++++++++++---------------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/test/pickfirst_test.go b/test/pickfirst_test.go index 38dd02602c13..994e4593ec1f 100644 --- a/test/pickfirst_test.go +++ b/test/pickfirst_test.go @@ -903,16 +903,10 @@ func (s) TestPickFirst_ResolverError_WithPreviousUpdate_Connecting(t *testing.T) } // Closing this channel leads to closing of the connection by our listener. - // gRPC should see this as an error reading server preface. + // gRPC should see this as a connection error. close(waitForConnecting) awaitState(ctx, t, cc, connectivity.TransientFailure) - - // RPCs should fail with the error that caused the channel to move to - // TRANSIENT_FAILURE and not the error returned by the name resolver. 
- wantErr := "error reading server preface" - if _, err := client.EmptyCall(ctx, &testpb.Empty{}); !strings.Contains(err.Error(), wantErr) { - t.Fatalf("EmptyCall() failed with error: %v, want error: %v", err, wantErr) - } + checkForConnectionError(ctx, t, cc) } // Tests the case where the pick_first LB policy receives an error from the name @@ -952,20 +946,14 @@ func (s) TestPickFirst_ResolverError_WithPreviousUpdate_TransientFailure(t *test ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() awaitState(ctx, t, cc, connectivity.TransientFailure) - - // RPCs should fail with the error that caused the channel to move to - // TRANSIENT_FAILURE. - wantErr := "error reading server preface" - client := testgrpc.NewTestServiceClient(cc) - if _, err := client.EmptyCall(ctx, &testpb.Empty{}); !strings.Contains(err.Error(), wantErr) { - t.Fatalf("EmptyCall() failed with error: %v, want error: %v", err, wantErr) - } + checkForConnectionError(ctx, t, cc) // An error from the name resolver should result in RPCs failing with that // error instead of the old error that caused the channel to move to // TRANSIENT_FAILURE in the first place. 
nrErr := errors.New("error from name resolver") r.ReportError(nrErr) + client := testgrpc.NewTestServiceClient(cc) for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) { if _, err := client.EmptyCall(ctx, &testpb.Empty{}); strings.Contains(err.Error(), nrErr.Error()) { break @@ -976,6 +964,21 @@ func (s) TestPickFirst_ResolverError_WithPreviousUpdate_TransientFailure(t *test } } +func checkForConnectionError(ctx context.Context, t *testing.T, cc *grpc.ClientConn) { + t.Helper() + + // RPCs may fail on the client side in two ways, once the fake server closes + // the accepted connection: + // - writing the client preface succeeds, but not reading the server preface + // - writing the client preface fails + const readErr = "error reading server preface" + const writeErr = "write: broken pipe" + client := testgrpc.NewTestServiceClient(cc) + if _, err := client.EmptyCall(ctx, &testpb.Empty{}); !strings.Contains(err.Error(), readErr) && !strings.Contains(err.Error(), writeErr) { + t.Fatalf("EmptyCall() failed with error: %v, want %q or %q", err, readErr, writeErr) + } +} + // Tests the case where the pick_first LB policy receives an update from the // name resolver with no addresses after receiving a good update. 
The test // verifies that the channel fails RPCs with an error indicating the fact that From 3e61338f71afbd9cbc230fd7c0adb8cfbc0ee199 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Fri, 28 Jul 2023 09:41:38 -0700 Subject: [PATCH 3/3] review comment --- test/pickfirst_test.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/test/pickfirst_test.go b/test/pickfirst_test.go index 994e4593ec1f..a7496e854125 100644 --- a/test/pickfirst_test.go +++ b/test/pickfirst_test.go @@ -971,11 +971,10 @@ func checkForConnectionError(ctx context.Context, t *testing.T, cc *grpc.ClientC // the accepted connection: // - writing the client preface succeeds, but not reading the server preface // - writing the client preface fails - const readErr = "error reading server preface" - const writeErr = "write: broken pipe" + // In either case, we should see it fail with UNAVAILABLE. client := testgrpc.NewTestServiceClient(cc) - if _, err := client.EmptyCall(ctx, &testpb.Empty{}); !strings.Contains(err.Error(), readErr) && !strings.Contains(err.Error(), writeErr) { - t.Fatalf("EmptyCall() failed with error: %v, want %q or %q", err, readErr, writeErr) + if _, err := client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Unavailable { + t.Fatalf("EmptyCall() failed with error: %v, want code %v", err, codes.Unavailable) } }