From c86c7d5ed79b316f59284395e537c55e48b8c2dc Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Thu, 7 Dec 2017 11:02:17 -0800 Subject: [PATCH] embed: only gracefully shutdown insecure grpc.Server Signed-off-by: Gyuho Lee --- embed/etcd.go | 67 +++++++++++++++++++++++++++-------------- embed/serve.go | 82 ++++++++------------------------------------------ 2 files changed, 57 insertions(+), 92 deletions(-) diff --git a/embed/etcd.go b/embed/etcd.go index 314fd0dde5e9..9e896e955b2b 100644 --- a/embed/etcd.go +++ b/embed/etcd.go @@ -82,7 +82,7 @@ type Etcd struct { type peerListener struct { net.Listener serve func() error - close func(context.Context) error + close func(time.Duration) error } // StartEtcd launches the etcd server and HTTP handlers for client/server communication. @@ -100,10 +100,9 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { return } if !serving { - // errored before starting gRPC server for serveCtx + // errored before starting gRPC server for serveCtx.serversC for _, sctx := range e.sctxs { - close(sctx.secureGrpcServerC) - close(sctx.insecureGrpcServerC) + close(sctx.serversC) } } e.Close() @@ -220,15 +219,22 @@ func (e *Etcd) Config() Config { return e.cfg } +// Close gracefully shuts down all servers/listeners. func (e *Etcd) Close() { e.closeOnce.Do(func() { close(e.stopc) }) - reqTimeout := 2 * time.Second + timeout := 2 * time.Second if e.Server != nil { - reqTimeout = e.Server.Cfg.ReqTimeout() + timeout = e.Server.Cfg.ReqTimeout() + } + for _, sctx := range e.sctxs { + for ss := range sctx.serversC { + stopServers(ss, timeout) + } } + for _, sctx := range e.sctxs { - teardownServeCtx(sctx, reqTimeout) + sctx.cancel() } for i := range e.Clients { @@ -236,6 +242,7 @@ func (e *Etcd) Close() { e.Clients[i].Close() } } + for i := range e.metricsListeners { e.metricsListeners[i].Close() } @@ -248,32 +255,45 @@ func (e *Etcd) Close() { // close all idle connections in peer handler (wait up to 1-second) for i := range e.Peers { if e.Peers[i] != nil && e.Peers[i].close != nil { - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - e.Peers[i].close(ctx) - cancel() + e.Peers[i].close(time.Second) } } } -func (e *Etcd) stopGRPCServer(gs *grpc.Server) { - timeout := 2 * time.Second - if e.Server != nil { - timeout = e.Server.Cfg.ReqTimeout() +func stopServers(ss *servers, timeout time.Duration) { + shutdownNow := func() { + // first, close the http.Server + ctx, cancel := context.WithTimeout(context.Background(), timeout) + ss.http.Shutdown(ctx) + cancel() + + // and then close grpc.Server; cancels all active RPCs + ss.grpc.Stop() } + + // if secure, do not grpc.Server.GracefulStop + // see https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531 + if ss.secure { + shutdownNow() + return + } + + // wait until all pending RPCs are finished ch := make(chan struct{}) go func() { defer close(ch) // close listeners to stop accepting new connections, // will block on any existing transports - gs.GracefulStop() + ss.grpc.GracefulStop() }() - // wait until all pending RPCs are finished + select { case <-ch: case <-time.After(timeout): // took too long, manually close open transports // e.g. watch streams - gs.Stop() + shutdownNow() + // concurrent GracefulStop should be interrupted <-ch } @@ -297,7 +317,7 @@ func startPeerListeners(cfg *Config) (peers []*peerListener, err error) { for i := range peers { if peers[i] != nil && peers[i].close != nil { plog.Info("stopping listening for peers on ", cfg.LPUrls[i].String()) - peers[i].close(context.Background()) + peers[i].close(time.Second) } } }() @@ -311,13 +331,13 @@ func startPeerListeners(cfg *Config) (peers []*peerListener, err error) { plog.Warningf("The scheme of peer url %s is HTTP while client cert auth (--peer-client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String()) } } - peers[i] = &peerListener{close: func(context.Context) error { return nil }} + peers[i] = &peerListener{close: func(time.Duration) error { return nil }} peers[i].Listener, err = rafthttp.NewListener(u, &cfg.PeerTLSInfo) if err != nil { return nil, err } // once serve, overwrite with 'http.Server.Shutdown' - peers[i].close = func(context.Context) error { + peers[i].close = func(time.Duration) error { return peers[i].Listener.Close() } plog.Info("listening for peers on ", u.String()) @@ -334,6 +354,7 @@ func (e *Etcd) servePeers() (err error) { return err } } + for _, p := range e.Peers { gs := v3rpc.Server(e.Server, peerTLScfg) m := cmux.New(p.Listener) @@ -345,12 +366,12 @@ func (e *Etcd) servePeers() (err error) { } go srv.Serve(m.Match(cmux.Any())) p.serve = func() error { return m.Serve() } - p.close = func(ctx context.Context) error { + p.close = func(timeout time.Duration) error { // gracefully shutdown http.Server // close open listeners, idle connections // until context cancel or time-out - e.stopGRPCServer(gs) - return srv.Shutdown(ctx) + stopServers(&servers{secure: peerTLScfg != nil, grpc: gs, http: srv}, timeout) + return nil } } diff --git a/embed/serve.go b/embed/serve.go index 235eda6bd023..2811aaf06417 100644 --- a/embed/serve.go +++ b/embed/serve.go @@ -21,7 +21,6 @@ import ( "net" "net/http" "strings" - "time" "github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver/api/v3client" @@ -55,20 +54,19 @@ type serveCtx struct { userHandlers map[string]http.Handler serviceRegister func(*grpc.Server) + serversC chan *servers +} - secureHTTPServer *http.Server - secureGrpcServerC chan *grpc.Server - insecureGrpcServerC chan *grpc.Server +type servers struct { + secure bool + grpc *grpc.Server + http *http.Server } func newServeCtx() *serveCtx { ctx, cancel := context.WithCancel(context.Background()) - return &serveCtx{ - ctx: ctx, - cancel: cancel, - userHandlers: make(map[string]http.Handler), - secureGrpcServerC: make(chan *grpc.Server, 1), - insecureGrpcServerC: make(chan *grpc.Server, 1), + return &serveCtx{ctx: ctx, cancel: cancel, userHandlers: make(map[string]http.Handler), + serversC: make(chan *servers, 2), // in case sctx.insecure,sctx.secure true } } @@ -92,7 +90,6 @@ func (sctx *serveCtx) serve( if sctx.insecure { gs := v3rpc.Server(s, nil, gopts...) - sctx.insecureGrpcServerC <- gs v3electionpb.RegisterElectionServer(gs, servElection) v3lockpb.RegisterLockServer(gs, servLock) if sctx.serviceRegister != nil { @@ -101,9 +98,7 @@ func (sctx *serveCtx) serve( grpcl := m.Match(cmux.HTTP2()) go func() { errHandler(gs.Serve(grpcl)) }() - opts := []grpc.DialOption{ - grpc.WithInsecure(), - } + opts := []grpc.DialOption{grpc.WithInsecure()} gwmux, err := sctx.registerGateway(opts) if err != nil { return err @@ -117,6 +112,8 @@ func (sctx *serveCtx) serve( } httpl := m.Match(cmux.HTTP1()) go func() { errHandler(srvhttp.Serve(httpl)) }() + + sctx.serversC <- &servers{grpc: gs, http: srvhttp} plog.Noticef("serving insecure client requests on %s, this is strongly discouraged!", sctx.l.Addr().String()) } @@ -126,7 +123,6 @@ func (sctx *serveCtx) serve( return tlsErr } gs := v3rpc.Server(s, tlscfg, gopts...) - sctx.secureGrpcServerC <- gs v3electionpb.RegisterElectionServer(gs, servElection) v3lockpb.RegisterLockServer(gs, servLock) if sctx.serviceRegister != nil { @@ -157,13 +153,12 @@ func (sctx *serveCtx) serve( ErrorLog: logger, // do not log user error } go func() { errHandler(srv.Serve(tlsl)) }() - sctx.secureHTTPServer = srv + sctx.serversC <- &servers{secure: true, grpc: gs, http: srv} plog.Infof("serving client requests on %s", sctx.l.Addr().String()) } - close(sctx.secureGrpcServerC) - close(sctx.insecureGrpcServerC) + close(sctx.serversC) return m.Serve() } @@ -279,54 +274,3 @@ func (sctx *serveCtx) registerTrace() { evf := func(w http.ResponseWriter, r *http.Request) { trace.RenderEvents(w, r, true) } sctx.registerUserHandler("/debug/events", http.HandlerFunc(evf)) } - -// Attempt to gracefully tear down gRPC server(s) and any associated mechanisms -func teardownServeCtx(sctx *serveCtx, timeout time.Duration) { - if sctx.secure && len(sctx.secureGrpcServerC) > 0 { - gs := <-sctx.secureGrpcServerC - stopSecureServer(gs, sctx.secureHTTPServer, timeout) - } - - if sctx.insecure && len(sctx.insecureGrpcServerC) > 0 { - gs := <-sctx.insecureGrpcServerC - stopInsecureServer(gs, timeout) - } - - // Close any open gRPC connections - sctx.cancel() -} - -// When using grpc's ServerHandlerTransport we are responsible for gracefully -// stopping connections and shutting down. -// https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531 -func stopSecureServer(gs *grpc.Server, httpSrv *http.Server, timeout time.Duration) { - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - - // Stop accepting new connections await pending handlers - httpSrv.Shutdown(ctx) - - // Teardown gRPC server - gs.Stop() -} - -// Gracefully shutdown gRPC server when using HTTP2 transport. -func stopInsecureServer(gs *grpc.Server, timeout time.Duration) { - ch := make(chan struct{}) - go func() { - defer close(ch) - // close listeners to stop accepting new connections, - // will block on any existing transports - gs.GracefulStop() - }() - // wait until all pending RPCs are finished - select { - case <-ch: - case <-time.After(timeout): - // took too long, manually close open transports - // e.g. watch streams - gs.Stop() - // concurrent GracefulStop should be interrupted - <-ch - } -}