diff --git a/embed/etcd.go b/embed/etcd.go
index da1d8f610c3..4d23a04009c 100644
--- a/embed/etcd.go
+++ b/embed/etcd.go
@@ -148,6 +148,14 @@ func (e *Etcd) Config() Config {
 func (e *Etcd) Close() {
 	e.closeOnce.Do(func() { close(e.stopc) })
 
+	// (gRPC server) stops accepting new connections,
+	// RPCs, and blocks until all pending RPCs are finished
+	for _, sctx := range e.sctxs {
+		for gs := range sctx.grpcServerC {
+			gs.GracefulStop()
+		}
+	}
+
 	for _, sctx := range e.sctxs {
 		sctx.cancel()
 	}
diff --git a/embed/serve.go b/embed/serve.go
index 46634b7c5f1..02c093ff81c 100644
--- a/embed/serve.go
+++ b/embed/serve.go
@@ -52,11 +52,14 @@ type serveCtx struct {
 
 	userHandlers    map[string]http.Handler
 	serviceRegister func(*grpc.Server)
+	grpcServerC     chan *grpc.Server
 }
 
 func newServeCtx() *serveCtx {
 	ctx, cancel := context.WithCancel(context.Background())
-	return &serveCtx{ctx: ctx, cancel: cancel, userHandlers: make(map[string]http.Handler)}
+	return &serveCtx{ctx: ctx, cancel: cancel, userHandlers: make(map[string]http.Handler),
+		grpcServerC: make(chan *grpc.Server, 2), // in case sctx.insecure,sctx.secure true
+	}
 }
 
 // serve accepts incoming connections on the listener l,
@@ -72,8 +75,11 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handle
 	servElection := v3election.NewElectionServer(v3c)
 	servLock := v3lock.NewLockServer(v3c)
 
+	defer close(sctx.grpcServerC)
+
 	if sctx.insecure {
 		gs := v3rpc.Server(s, nil)
+		sctx.grpcServerC <- gs
 		v3electionpb.RegisterElectionServer(gs, servElection)
 		v3lockpb.RegisterLockServer(gs, servLock)
 		if sctx.serviceRegister != nil {
@@ -103,6 +109,7 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handle
 
 	if sctx.secure {
 		gs := v3rpc.Server(s, tlscfg)
+		sctx.grpcServerC <- gs
 		v3electionpb.RegisterElectionServer(gs, servElection)
 		v3lockpb.RegisterLockServer(gs, servLock)
 		if sctx.serviceRegister != nil {
diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go
index d4edb56b51c..2f7f00d61ad 100644
--- a/etcdmain/etcd.go
+++ b/etcdmain/etcd.go
@@ -187,7 +187,7 @@ func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) {
 	if err != nil {
 		return nil, nil, err
 	}
-	osutil.RegisterInterruptHandler(e.Server.Stop)
+	osutil.RegisterInterruptHandler(e.Close)
 	select {
 	case <-e.Server.ReadyNotify(): // wait for e.Server to join the cluster
 	case <-e.Server.StopNotify(): // publish aborted from 'ErrStopped'
diff --git a/integration/cluster.go b/integration/cluster.go
index 211758ce532..b2e0566acff 100644
--- a/integration/cluster.go
+++ b/integration/cluster.go
@@ -719,7 +719,7 @@ func (m *member) Close() {
 		m.serverClient = nil
 	}
 	if m.grpcServer != nil {
-		m.grpcServer.Stop()
+		m.grpcServer.GracefulStop()
 		m.grpcServer = nil
 	}
 	m.s.HardStop()
diff --git a/integration/v3_maintenance_test.go b/integration/v3_grpc_inflight_test.go
similarity index 63%
rename from integration/v3_maintenance_test.go
rename to integration/v3_grpc_inflight_test.go
index e8221923024..1994af06dd9 100644
--- a/integration/v3_maintenance_test.go
+++ b/integration/v3_grpc_inflight_test.go
@@ -15,63 +15,78 @@ package integration
 
 import (
+	"sync"
 	"testing"
 	"time"
 
-	"google.golang.org/grpc"
-
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/pkg/testutil"
+
+	"golang.org/x/net/context"
+	"google.golang.org/grpc"
 )
 
-// TestV3MaintenanceHashInflight ensures inflight Hash call
-// to embedded being-stopped EtcdServer does not trigger panic.
-func TestV3MaintenanceHashInflight(t *testing.T) {
+// TestV3MaintenanceDefragmentInflightRange ensures inflight range requests
+// do not panic the mvcc backend while defragment is running.
+func TestV3MaintenanceDefragmentInflightRange(t *testing.T) {
 	defer testutil.AfterTest(t)
 	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
 
 	cli := clus.RandClient()
-	mvc := toGRPC(cli).Maintenance
+	kvc := toGRPC(cli).KV
+	if _, err := kvc.Put(context.Background(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}); err != nil {
+		t.Fatal(err)
+	}
+
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
 
 	donec := make(chan struct{})
 	go func() {
 		defer close(donec)
-		mvc.Hash(ctx, &pb.HashRequest{}, grpc.FailFast(false))
+		kvc.Range(ctx, &pb.RangeRequest{Key: []byte("foo")})
 	}()
 
-	clus.Members[0].s.HardStop()
+	mvc := toGRPC(cli).Maintenance
+	mvc.Defragment(context.Background(), &pb.DefragmentRequest{})
 	cancel()
 
 	<-donec
 }
 
-// TestV3MaintenanceDefragmentInflightRange ensures inflight range requests
-// does not panic the mvcc backend while defragment is running.
-func TestV3MaintenanceDefragmentInflightRange(t *testing.T) {
+// TestV3KVInflightRangeRequests ensures that inflight requests
+// (sent before server shutdown) are gracefully handled by server-side.
+// They are either finished or canceled, but never crash the backend.
+// See https://github.com/coreos/etcd/issues/7322 for more detail.
+func TestV3KVInflightRangeRequests(t *testing.T) {
 	defer testutil.AfterTest(t)
 	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
 
 	cli := clus.RandClient()
 	kvc := toGRPC(cli).KV
+
 	if _, err := kvc.Put(context.Background(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}); err != nil {
 		t.Fatal(err)
 	}
 
-	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
 
-	donec := make(chan struct{})
-	go func() {
-		defer close(donec)
-		kvc.Range(ctx, &pb.RangeRequest{Key: []byte("foo")})
-	}()
+	reqN := 10 // use 500+ for fast machine
+	var wg sync.WaitGroup
+	wg.Add(reqN)
+	for i := 0; i < reqN; i++ {
+		go func() {
+			defer wg.Done()
+			_, err := kvc.Range(ctx, &pb.RangeRequest{Key: []byte("foo"), Serializable: true}, grpc.FailFast(false))
+			if err != nil && grpc.ErrorDesc(err) != context.Canceled.Error() {
+				t.Fatalf("inflight request should be canceled with %v, got %v", context.Canceled, err)
+			}
+		}()
+	}
 
-	mvc := toGRPC(cli).Maintenance
-	mvc.Defragment(context.Background(), &pb.DefragmentRequest{})
+	clus.Members[0].Stop(t)
 	cancel()
 
-	<-donec
+	wg.Wait()
 }
diff --git a/mvcc/backend/batch_tx.go b/mvcc/backend/batch_tx.go
index 2cafb9f7e5f..a47f67d49b8 100644
--- a/mvcc/backend/batch_tx.go
+++ b/mvcc/backend/batch_tx.go
@@ -166,19 +166,11 @@ func (t *batchTx) commit(stop bool) {
 		t.backend.mu.RLock()
 		defer t.backend.mu.RUnlock()
 
-		// batchTx.commit(true) calls *bolt.Tx.Commit, which
-		// initializes *bolt.Tx.db and *bolt.Tx.meta as nil,
-		// and subsequent *bolt.Tx.Size() call panics.
-		//
-		// This nil pointer reference panic happens when:
-		//   1. batchTx.commit(false) from newBatchTx
-		//   2. batchTx.commit(true) from stopping backend
-		//   3. batchTx.commit(false) from inflight mvcc Hash call
-		//
-		// Check if db is nil to prevent this panic
-		if t.tx.DB() != nil {
-			atomic.StoreInt64(&t.backend.size, t.tx.Size())
-		}
+		// t.tx.DB()==nil if 'CommitAndStop' calls 'batchTx.commit(true)',
+		// which initializes *bolt.Tx.db and *bolt.Tx.meta as nil; panics t.tx.Size().
+		// Server must make sure 'batchTx.commit(false)' does not follow
+		// 'batchTx.commit(true)' (e.g. stopping backend, and inflight Hash call).
+		atomic.StoreInt64(&t.backend.size, t.tx.Size())
 		return
 	}
 
diff --git a/mvcc/kvstore.go b/mvcc/kvstore.go
index 6a0e4167dd0..36b3d9a261d 100644
--- a/mvcc/kvstore.go
+++ b/mvcc/kvstore.go
@@ -146,13 +146,6 @@ func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) {
 }
 
 func (s *store) Hash() (hash uint32, revision int64, err error) {
-	// TODO: nothing should be able to call into backend when closed
-	select {
-	case <-s.stopc:
-		return 0, 0, ErrClosed
-	default:
-	}
-
 	s.b.ForceCommit()
 	h, err := s.b.Hash(DefaultIgnores)
 	return h, s.currentRev, err
diff --git a/mvcc/kvstore_test.go b/mvcc/kvstore_test.go
index 8f1972426cd..f1e8167c3b6 100644
--- a/mvcc/kvstore_test.go
+++ b/mvcc/kvstore_test.go
@@ -518,20 +518,6 @@ func newTestKeyBytes(rev revision, tombstone bool) []byte {
 	return bytes
 }
 
-// TestStoreHashAfterForceCommit ensures that later Hash call to
-// closed backend with ForceCommit does not panic.
-func TestStoreHashAfterForceCommit(t *testing.T) {
-	be, tmpPath := backend.NewDefaultTmpBackend()
-	kv := NewStore(be, &lease.FakeLessor{}, nil)
-	defer os.Remove(tmpPath)
-
-	// as in EtcdServer.HardStop
-	kv.Close()
-	be.Close()
-
-	kv.Hash()
-}
-
 func newFakeStore() *store {
 	b := &fakeBackend{&fakeBatchTx{
 		Recorder: &testutil.RecorderBuffered{},
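
The core of the patch is the shutdown path: each serving goroutine publishes its *grpc.Server on a small buffered channel, and Etcd.Close drains those channels and calls GracefulStop, which stops accepting new connections and RPCs but lets in-flight RPCs finish before the backend is torn down. Below is a minimal, self-contained Go sketch of that pattern, not etcd code; it uses only the public grpc-go API, and the names (serveCtx, newServeCtx, serve, closeAll) are illustrative only.

// graceful_stop_sketch.go: a minimal sketch (not etcd code) of the
// publish-then-GracefulStop pattern introduced in embed/serve.go and embed/etcd.go.
package main

import (
	"log"
	"net"

	"google.golang.org/grpc"
)

// serveCtx mirrors the idea of the patch: the goroutine that creates and runs
// a *grpc.Server hands it to the closer through a small buffered channel.
type serveCtx struct {
	grpcServerC chan *grpc.Server // buffered for 2, e.g. one insecure and one TLS server
}

func newServeCtx() *serveCtx {
	return &serveCtx{grpcServerC: make(chan *grpc.Server, 2)}
}

// serve publishes the server before blocking in Serve, and closes the channel
// once serving ends so that closeAll's range loop can terminate.
func (sc *serveCtx) serve(l net.Listener) error {
	defer close(sc.grpcServerC)
	gs := grpc.NewServer()
	sc.grpcServerC <- gs
	return gs.Serve(l) // returns once GracefulStop has drained in-flight RPCs
}

// closeAll drains every channel and stops each server gracefully: new
// connections and RPCs are rejected, but pending RPCs run to completion.
// grpc.Server.Stop, by contrast, tears down in-flight RPCs immediately,
// leaving them racing against backend shutdown (see issue 7322 cited above).
func closeAll(sctxs []*serveCtx) {
	for _, sctx := range sctxs {
		for gs := range sctx.grpcServerC {
			gs.GracefulStop()
		}
	}
}

func main() {
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		log.Fatal(err)
	}
	sctx := newServeCtx()
	go func() {
		if err := sctx.serve(l); err != nil {
			log.Println("serve returned:", err)
		}
	}()
	closeAll([]*serveCtx{sctx}) // blocks until the server above has fully stopped
}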