*: use gRPC server GracefulStop #7743

Merged: 6 commits, Apr 18, 2017
8 changes: 8 additions & 0 deletions embed/etcd.go
@@ -148,6 +148,14 @@ func (e *Etcd) Config() Config {
func (e *Etcd) Close() {
e.closeOnce.Do(func() { close(e.stopc) })

// (gRPC server) GracefulStop stops accepting new connections and RPCs,
// and blocks until all pending RPCs are finished
for _, sctx := range e.sctxs {
for gs := range sctx.grpcServerC {
gs.GracefulStop()
}
}

for _, sctx := range e.sctxs {
sctx.cancel()
}
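
Aside (not part of this PR's diff): a minimal standalone sketch of the grpc-go contract this hunk relies on. GracefulStop stops the server from accepting new connections and RPCs and then blocks until all in-flight RPCs have finished, whereas Stop tears connections down immediately. The listener address and the empty service registration are illustrative only.

```go
package main

import (
	"log"
	"net"

	"google.golang.org/grpc"
)

func main() {
	lis, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		log.Fatal(err)
	}

	gs := grpc.NewServer()
	// service registration (ElectionServer, LockServer, ...) would go here

	go func() {
		if err := gs.Serve(lis); err != nil {
			log.Printf("Serve returned: %v", err)
		}
	}()

	// gs.Stop() would close listeners and live connections right away,
	// so in-flight RPCs can fail with transport errors.
	// GracefulStop() stops accepting new work and waits for pending RPCs
	// to finish -- which is what embed.Etcd.Close now does per serve context.
	gs.GracefulStop()
}
```
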
9 changes: 8 additions & 1 deletion embed/serve.go
@@ -52,11 +52,14 @@ type serveCtx struct {

userHandlers map[string]http.Handler
serviceRegister func(*grpc.Server)
grpcServerC chan *grpc.Server
}

func newServeCtx() *serveCtx {
ctx, cancel := context.WithCancel(context.Background())
return &serveCtx{ctx: ctx, cancel: cancel, userHandlers: make(map[string]http.Handler)}
return &serveCtx{ctx: ctx, cancel: cancel, userHandlers: make(map[string]http.Handler),
grpcServerC: make(chan *grpc.Server, 2), // capacity 2 in case both sctx.insecure and sctx.secure are true
}
}

// serve accepts incoming connections on the listener l,
@@ -72,8 +75,11 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handle
servElection := v3election.NewElectionServer(v3c)
servLock := v3lock.NewLockServer(v3c)

defer close(sctx.grpcServerC)

if sctx.insecure {
gs := v3rpc.Server(s, nil)
sctx.grpcServerC <- gs
v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock)
if sctx.serviceRegister != nil {
@@ -103,6 +109,7 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handle

if sctx.secure {
gs := v3rpc.Server(s, tlscfg)
sctx.grpcServerC <- gs
v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock)
if sctx.serviceRegister != nil {
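
Aside (not part of this PR's diff): a hypothetical, self-contained sketch of the channel handoff introduced above. serve() sends each gRPC server it starts on a buffered channel and closes the channel when it returns; Close() ranges over the channel, draining however many servers were actually started (zero, one, or two) and then falling through once the channel is closed. The server type below is a stand-in for *grpc.Server.

```go
package main

import "fmt"

type server struct{ name string }

func (s *server) GracefulStop() { fmt.Println("GracefulStop:", s.name) }

func main() {
	// Capacity 2 mirrors grpcServerC: one slot for the insecure server and
	// one for the secure server, so the sends in serve() never block.
	grpcServerC := make(chan *server, 2)

	// serve() side: hand off whatever servers were started, then close.
	grpcServerC <- &server{"insecure"}
	grpcServerC <- &server{"secure"}
	close(grpcServerC) // mirrors `defer close(sctx.grpcServerC)`

	// Close() side: range drains the buffered servers and exits once the
	// channel is closed -- no deadlock even if fewer servers were started.
	for gs := range grpcServerC {
		gs.GracefulStop()
	}
}
```
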
2 changes: 1 addition & 1 deletion etcdmain/etcd.go
@@ -187,7 +187,7 @@ func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) {
if err != nil {
return nil, nil, err
}
osutil.RegisterInterruptHandler(e.Server.Stop)
osutil.RegisterInterruptHandler(e.Close)
select {
case <-e.Server.ReadyNotify(): // wait for e.Server to join the cluster
case <-e.Server.StopNotify(): // publish aborted from 'ErrStopped'
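
Aside (not part of this PR's diff): osutil.RegisterInterruptHandler is etcd's own helper; the signal plumbing below is only a rough stand-in to show what switching the handler from e.Server.Stop to e.Close means -- on SIGINT/SIGTERM the graceful Close (GracefulStop, then cancel the serve contexts) runs instead of a hard stop.

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

// registerInterruptHandler is a simplified stand-in for
// osutil.RegisterInterruptHandler: run the shutdown function once on
// SIGINT/SIGTERM, then exit.
func registerInterruptHandler(shutdown func()) {
	sigc := make(chan os.Signal, 1)
	signal.Notify(sigc, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigc
		shutdown()
		os.Exit(0)
	}()
}

func main() {
	registerInterruptHandler(func() {
		// placeholder for e.Close: GracefulStop the gRPC servers,
		// then cancel each serve context
		fmt.Println("graceful shutdown")
	})
	select {} // block; send SIGINT/SIGTERM to trigger the handler
}
```
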
2 changes: 1 addition & 1 deletion integration/cluster.go
@@ -719,7 +719,7 @@ func (m *member) Close() {
m.serverClient = nil
}
if m.grpcServer != nil {
m.grpcServer.Stop()
m.grpcServer.GracefulStop()
m.grpcServer = nil
}
m.s.HardStop()
@@ -15,63 +15,78 @@
package integration

import (
"sync"
"testing"
"time"

"google.golang.org/grpc"

pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/pkg/testutil"

"golang.org/x/net/context"
"google.golang.org/grpc"
)

// TestV3MaintenanceHashInflight ensures inflight Hash call
// to embedded being-stopped EtcdServer does not trigger panic.
func TestV3MaintenanceHashInflight(t *testing.T) {
// TestV3MaintenanceDefragmentInflightRange ensures that inflight range requests
// do not panic the mvcc backend while defragment is running.
func TestV3MaintenanceDefragmentInflightRange(t *testing.T) {
defer testutil.AfterTest(t)
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
defer clus.Terminate(t)

cli := clus.RandClient()
mvc := toGRPC(cli).Maintenance
kvc := toGRPC(cli).KV
if _, err := kvc.Put(context.Background(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}); err != nil {
t.Fatal(err)
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second)

donec := make(chan struct{})
go func() {
defer close(donec)
mvc.Hash(ctx, &pb.HashRequest{}, grpc.FailFast(false))
kvc.Range(ctx, &pb.RangeRequest{Key: []byte("foo")})
}()

clus.Members[0].s.HardStop()
mvc := toGRPC(cli).Maintenance
mvc.Defragment(context.Background(), &pb.DefragmentRequest{})
cancel()

<-donec
}

// TestV3MaintenanceDefragmentInflightRange ensures inflight range requests
// does not panic the mvcc backend while defragment is running.
func TestV3MaintenanceDefragmentInflightRange(t *testing.T) {
// TestV3KVInflightRangeRequests ensures that inflight requests
// (sent before server shutdown) are gracefully handled on the server side.
// They are either finished or canceled, but never crash the backend.
// See https://github.com/coreos/etcd/issues/7322 for more detail.
func TestV3KVInflightRangeRequests(t *testing.T) {
defer testutil.AfterTest(t)
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
defer clus.Terminate(t)

cli := clus.RandClient()
kvc := toGRPC(cli).KV

if _, err := kvc.Put(context.Background(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}); err != nil {
t.Fatal(err)
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)

donec := make(chan struct{})
go func() {
defer close(donec)
kvc.Range(ctx, &pb.RangeRequest{Key: []byte("foo")})
}()
reqN := 10 // use 500+ on a fast machine
var wg sync.WaitGroup
wg.Add(reqN)
for i := 0; i < reqN; i++ {
go func() {
defer wg.Done()
_, err := kvc.Range(ctx, &pb.RangeRequest{Key: []byte("foo"), Serializable: true}, grpc.FailFast(false))
if err != nil && grpc.ErrorDesc(err) != context.Canceled.Error() {
t.Fatalf("inflight request should be canceld with %v, got %v", context.Canceled, err)
}
}()
}

mvc := toGRPC(cli).Maintenance
mvc.Defragment(context.Background(), &pb.DefragmentRequest{})
clus.Members[0].Stop(t)
cancel()

<-donec
wg.Wait()
}
18 changes: 5 additions & 13 deletions mvcc/backend/batch_tx.go
@@ -166,19 +166,11 @@ func (t *batchTx) commit(stop bool) {
t.backend.mu.RLock()
defer t.backend.mu.RUnlock()

// batchTx.commit(true) calls *bolt.Tx.Commit, which
// initializes *bolt.Tx.db and *bolt.Tx.meta as nil,
// and subsequent *bolt.Tx.Size() call panics.
//
// This nil pointer reference panic happens when:
// 1. batchTx.commit(false) from newBatchTx
// 2. batchTx.commit(true) from stopping backend
// 3. batchTx.commit(false) from inflight mvcc Hash call
//
// Check if db is nil to prevent this panic
if t.tx.DB() != nil {
atomic.StoreInt64(&t.backend.size, t.tx.Size())
}
// t.tx.DB()==nil if 'CommitAndStop' calls 'batchTx.commit(true)',
// which sets *bolt.Tx.db and *bolt.Tx.meta to nil, so a later t.tx.Size() call panics.
// The server must make sure 'batchTx.commit(false)' does not follow
[Contributor review comment] This probably shouldn't mention the etcd server or gRPC. The contract is independent of all that -- don't have any operations inflight when closing the backend.

// 'batchTx.commit(true)' (e.g. stopping backend, and inflight Hash call).
atomic.StoreInt64(&t.backend.size, t.tx.Size())
return
}

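
Aside (not part of this PR's diff): a standalone sketch, assuming the boltdb/bolt API of that era, of the panic the replaced comment described and the new comment still guards against -- after Commit, bolt clears the transaction's db and meta pointers, so a later Tx.Size() dereferences nil. That is why commit(false) must never run after commit(true) (CommitAndStop).

```go
package main

import (
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"

	bolt "github.com/boltdb/bolt"
)

func main() {
	dir, err := ioutil.TempDir("", "bolt-size-demo")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	db, err := bolt.Open(filepath.Join(dir, "demo.db"), 0600, nil)
	if err != nil {
		panic(err)
	}
	defer db.Close()

	tx, err := db.Begin(true)
	if err != nil {
		panic(err)
	}
	fmt.Println("size before commit:", tx.Size())

	// Like batchTx.commit(true) via CommitAndStop: the bolt Tx is finished
	// and its internal db/meta references are cleared.
	if err := tx.Commit(); err != nil {
		panic(err)
	}

	defer func() {
		if r := recover(); r != nil {
			fmt.Println("Tx.Size() after Commit panics:", r)
		}
	}()
	fmt.Println(tx.Size()) // nil dereference -- the case the comment warns about
}
```
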
7 changes: 0 additions & 7 deletions mvcc/kvstore.go
@@ -146,13 +146,6 @@ func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) {
}

func (s *store) Hash() (hash uint32, revision int64, err error) {
// TODO: nothing should be able to call into backend when closed
select {
case <-s.stopc:
return 0, 0, ErrClosed
default:
}

s.b.ForceCommit()
h, err := s.b.Hash(DefaultIgnores)
return h, s.currentRev, err
14 changes: 0 additions & 14 deletions mvcc/kvstore_test.go
@@ -518,20 +518,6 @@ func newTestKeyBytes(rev revision, tombstone bool) []byte {
return bytes
}

// TestStoreHashAfterForceCommit ensures that later Hash call to
// closed backend with ForceCommit does not panic.
func TestStoreHashAfterForceCommit(t *testing.T) {
[Contributor review comment] also remove the select in mvcc.store.Hash, which was faking this

be, tmpPath := backend.NewDefaultTmpBackend()
kv := NewStore(be, &lease.FakeLessor{}, nil)
defer os.Remove(tmpPath)

// as in EtcdServer.HardStop
kv.Close()
be.Close()

kv.Hash()
}

func newFakeStore() *store {
b := &fakeBackend{&fakeBatchTx{
Recorder: &testutil.RecorderBuffered{},