From f62b98b177594cdca92be0c9519e7d89d5687bac Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Fri, 14 Apr 2017 11:11:06 -0700 Subject: [PATCH] integration: test 'inflight' range requests Signed-off-by: Gyu-Ho Lee --- integration/cluster.go | 5 +++ ...nance_test.go => v3_grpc_inflight_test.go} | 42 ++++++++++++++++++- 2 files changed, 45 insertions(+), 2 deletions(-) rename integration/{v3_maintenance_test.go => v3_grpc_inflight_test.go} (65%) diff --git a/integration/cluster.go b/integration/cluster.go index 211758ce532b..80a2f713c5ce 100644 --- a/integration/cluster.go +++ b/integration/cluster.go @@ -532,6 +532,11 @@ func mustNewMember(t *testing.T, mcfg memberConfig) *member { m.TickMs = uint(tickDuration / time.Millisecond) m.QuotaBackendBytes = mcfg.quotaBackendBytes m.AuthToken = "simple" // for the purpose of integration testing, simple token is enough + m.ServerConfig.OnShutdown = func() { + if m.grpcServer != nil { + m.grpcServer.GracefulStop() + } + } return m } diff --git a/integration/v3_maintenance_test.go b/integration/v3_grpc_inflight_test.go similarity index 65% rename from integration/v3_maintenance_test.go rename to integration/v3_grpc_inflight_test.go index e82219230241..a6d240f8b596 100644 --- a/integration/v3_maintenance_test.go +++ b/integration/v3_grpc_inflight_test.go @@ -15,14 +15,15 @@ package integration import ( + "sync" "testing" "time" - "google.golang.org/grpc" - pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/pkg/testutil" + "golang.org/x/net/context" + "google.golang.org/grpc" ) // TestV3MaintenanceHashInflight ensures inflight Hash call @@ -75,3 +76,40 @@ func TestV3MaintenanceDefragmentInflightRange(t *testing.T) { <-donec } + +// TestV3KVInflightRangeRequests ensures that inflight requests +// (sent before server shutdown) are gracefully handled by server-side. +// They are either finished or canceled, but never crash the backend. +// See https://github.com/coreos/etcd/issues/7322 for more detail. +func TestV3KVInflightRangeRequests(t *testing.T) { + defer testutil.AfterTest(t) + clus := NewClusterV3(t, &ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + cli := clus.RandClient() + kvc := toGRPC(cli).KV + + if _, err := kvc.Put(context.Background(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}); err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + + reqN := 10 // use 500+ for fast machine + var wg sync.WaitGroup + wg.Add(reqN) + for i := 0; i < reqN; i++ { + go func() { + defer wg.Done() + _, err := kvc.Range(ctx, &pb.RangeRequest{Key: []byte("foo"), Serializable: true}, grpc.FailFast(false)) + if err != nil && grpc.ErrorDesc(err) != context.Canceled.Error() { + t.Fatalf("inflight request should be canceld with %v, got %v", context.Canceled, err) + } + }() + } + + clus.Members[0].s.HardStop() + cancel() + + wg.Wait() +}