diff --git a/integration/cluster.go b/integration/cluster.go index 211758ce532b..80a2f713c5ce 100644 --- a/integration/cluster.go +++ b/integration/cluster.go @@ -532,6 +532,11 @@ func mustNewMember(t *testing.T, mcfg memberConfig) *member { m.TickMs = uint(tickDuration / time.Millisecond) m.QuotaBackendBytes = mcfg.quotaBackendBytes m.AuthToken = "simple" // for the purpose of integration testing, simple token is enough + m.ServerConfig.OnShutdown = func() { + if m.grpcServer != nil { + m.grpcServer.GracefulStop() + } + } return m } diff --git a/integration/v3_maintenance_test.go b/integration/v3_grpc_inflight_test.go similarity index 69% rename from integration/v3_maintenance_test.go rename to integration/v3_grpc_inflight_test.go index e82219230241..e1ab5c2aa91a 100644 --- a/integration/v3_maintenance_test.go +++ b/integration/v3_grpc_inflight_test.go @@ -20,6 +20,8 @@ import ( "google.golang.org/grpc" + "sync" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/pkg/testutil" "golang.org/x/net/context" @@ -75,3 +77,38 @@ func TestV3MaintenanceDefragmentInflightRange(t *testing.T) { <-donec } + +// TestV3KVInflightRangeRequests ensures that inflight requests before shutdown can finish +// when gRPC server gracefully shuts down. +func TestV3KVInflightRangeRequests(t *testing.T) { + defer testutil.AfterTest(t) + clus := NewClusterV3(t, &ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + cli := clus.RandClient() + kvc := toGRPC(cli).KV + + if _, err := kvc.Put(context.Background(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}); err != nil { + panic(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + + reqN := 10 // use 500+ for fast machine + var wg sync.WaitGroup + wg.Add(reqN) + for i := 0; i < reqN; i++ { + go func() { + defer wg.Done() + _, err := kvc.Range(ctx, &pb.RangeRequest{Key: []byte("foo"), Serializable: true}, grpc.FailFast(false)) + if err != nil && grpc.ErrorDesc(err) != context.Canceled.Error() { + t.Fatal(err) + } + }() + } + + clus.Members[0].s.HardStop() + cancel() + + wg.Wait() +}