diff --git a/embed/etcd.go b/embed/etcd.go index da1d8f610c3a..cafb71df9794 100644 --- a/embed/etcd.go +++ b/embed/etcd.go @@ -22,6 +22,8 @@ import ( "path/filepath" "sync" + "google.golang.org/grpc" + "github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver/api/v2http" "github.com/coreos/etcd/pkg/cors" @@ -137,6 +139,15 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { if err = e.serve(); err != nil { return } + e.Server.Cfg.OnShutdown = func() { + for _, sctx := range e.sctxs { + for gs := range sctx.grpcServerC { + plog.Warning("gracefully stopping gRPC server") + gs.GracefulStop() + plog.Warning("gracefully stopped gRPC server") + } + } + } return } @@ -343,6 +354,13 @@ func (e *Etcd) serve() (err error) { } func (e *Etcd) errHandler(err error) { + // logs if errored from closed listeners, cmux, stopped gRPC servers, + // and do not send to 'errc' to let etcdserver do rest of its cleaning + if transport.IsClosedConnError(err) || err == grpc.ErrServerStopped { + plog.Warningf("server stopped with %q", err.Error()) + return + } + select { case <-e.stopc: return diff --git a/embed/serve.go b/embed/serve.go index 46634b7c5f1a..af2eab37d892 100644 --- a/embed/serve.go +++ b/embed/serve.go @@ -52,11 +52,12 @@ type serveCtx struct { userHandlers map[string]http.Handler serviceRegister func(*grpc.Server) + grpcServerC chan *grpc.Server } func newServeCtx() *serveCtx { ctx, cancel := context.WithCancel(context.Background()) - return &serveCtx{ctx: ctx, cancel: cancel, userHandlers: make(map[string]http.Handler)} + return &serveCtx{ctx: ctx, cancel: cancel, userHandlers: make(map[string]http.Handler), grpcServerC: make(chan *grpc.Server, 1)} } // serve accepts incoming connections on the listener l, @@ -74,6 +75,8 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handle if sctx.insecure { gs := v3rpc.Server(s, nil) + sctx.grpcServerC <- gs + close(sctx.grpcServerC) v3electionpb.RegisterElectionServer(gs, servElection) v3lockpb.RegisterLockServer(gs, servLock) if sctx.serviceRegister != nil { @@ -103,6 +106,8 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handle if sctx.secure { gs := v3rpc.Server(s, tlscfg) + sctx.grpcServerC <- gs + close(sctx.grpcServerC) v3electionpb.RegisterElectionServer(gs, servElection) v3lockpb.RegisterLockServer(gs, servLock) if sctx.serviceRegister != nil {