Skip to content

Commit

Permalink
embed: only gracefully shutdown insecure grpc.Server
Browse files Browse the repository at this point in the history
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
  • Loading branch information
gyuho committed Dec 7, 2017
1 parent aae48b6 commit c86c7d5
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 92 deletions.
67 changes: 44 additions & 23 deletions embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ type Etcd struct {
type peerListener struct {
net.Listener
serve func() error
close func(context.Context) error
close func(time.Duration) error
}

// StartEtcd launches the etcd server and HTTP handlers for client/server communication.
Expand All @@ -100,10 +100,9 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
return
}
if !serving {
// errored before starting gRPC server for serveCtx
// errored before starting gRPC server for serveCtx.serversC
for _, sctx := range e.sctxs {
close(sctx.secureGrpcServerC)
close(sctx.insecureGrpcServerC)
close(sctx.serversC)
}
}
e.Close()
Expand Down Expand Up @@ -220,22 +219,30 @@ func (e *Etcd) Config() Config {
return e.cfg
}

// Close gracefully shuts down all servers/listeners.
func (e *Etcd) Close() {
e.closeOnce.Do(func() { close(e.stopc) })

reqTimeout := 2 * time.Second
timeout := 2 * time.Second
if e.Server != nil {
reqTimeout = e.Server.Cfg.ReqTimeout()
timeout = e.Server.Cfg.ReqTimeout()
}
for _, sctx := range e.sctxs {
for ss := range sctx.serversC {
stopServers(ss, timeout)
}
}

for _, sctx := range e.sctxs {
teardownServeCtx(sctx, reqTimeout)
sctx.cancel()
}

for i := range e.Clients {
if e.Clients[i] != nil {
e.Clients[i].Close()
}
}

for i := range e.metricsListeners {
e.metricsListeners[i].Close()
}
Expand All @@ -248,32 +255,45 @@ func (e *Etcd) Close() {
// close all idle connections in peer handler (wait up to 1-second)
for i := range e.Peers {
if e.Peers[i] != nil && e.Peers[i].close != nil {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
e.Peers[i].close(ctx)
cancel()
e.Peers[i].close(time.Second)
}
}
}

func (e *Etcd) stopGRPCServer(gs *grpc.Server) {
timeout := 2 * time.Second
if e.Server != nil {
timeout = e.Server.Cfg.ReqTimeout()
func stopServers(ss *servers, timeout time.Duration) {
shutdownNow := func() {
// first, close the http.Server
ctx, cancel := context.WithTimeout(context.Background(), timeout)
ss.http.Shutdown(ctx)
cancel()

// and then close grpc.Server; cancels all active RPCs
ss.grpc.Stop()
}

// if secure, do not grpc.Server.GracefulStop
// see https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531
if ss.secure {
shutdownNow()
return
}

// wait until all pending RPCs are finished
ch := make(chan struct{})
go func() {
defer close(ch)
// close listeners to stop accepting new connections,
// will block on any existing transports
gs.GracefulStop()
ss.grpc.GracefulStop()
}()
// wait until all pending RPCs are finished

select {
case <-ch:
case <-time.After(timeout):
// took too long, manually close open transports
// e.g. watch streams
gs.Stop()
shutdownNow()

// concurrent GracefulStop should be interrupted
<-ch
}
Expand All @@ -297,7 +317,7 @@ func startPeerListeners(cfg *Config) (peers []*peerListener, err error) {
for i := range peers {
if peers[i] != nil && peers[i].close != nil {
plog.Info("stopping listening for peers on ", cfg.LPUrls[i].String())
peers[i].close(context.Background())
peers[i].close(time.Second)
}
}
}()
Expand All @@ -311,13 +331,13 @@ func startPeerListeners(cfg *Config) (peers []*peerListener, err error) {
plog.Warningf("The scheme of peer url %s is HTTP while client cert auth (--peer-client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String())
}
}
peers[i] = &peerListener{close: func(context.Context) error { return nil }}
peers[i] = &peerListener{close: func(time.Duration) error { return nil }}
peers[i].Listener, err = rafthttp.NewListener(u, &cfg.PeerTLSInfo)
if err != nil {
return nil, err
}
// once serve, overwrite with 'http.Server.Shutdown'
peers[i].close = func(context.Context) error {
peers[i].close = func(time.Duration) error {
return peers[i].Listener.Close()
}
plog.Info("listening for peers on ", u.String())
Expand All @@ -334,6 +354,7 @@ func (e *Etcd) servePeers() (err error) {
return err
}
}

for _, p := range e.Peers {
gs := v3rpc.Server(e.Server, peerTLScfg)
m := cmux.New(p.Listener)
Expand All @@ -345,12 +366,12 @@ func (e *Etcd) servePeers() (err error) {
}
go srv.Serve(m.Match(cmux.Any()))
p.serve = func() error { return m.Serve() }
p.close = func(ctx context.Context) error {
p.close = func(timeout time.Duration) error {
// gracefully shutdown http.Server
// close open listeners, idle connections
// until context cancel or time-out
e.stopGRPCServer(gs)
return srv.Shutdown(ctx)
stopServers(&servers{secure: peerTLScfg != nil, grpc: gs, http: srv}, timeout)
return nil
}
}

Expand Down
82 changes: 13 additions & 69 deletions embed/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"net"
"net/http"
"strings"
"time"

"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api/v3client"
Expand Down Expand Up @@ -55,20 +54,19 @@ type serveCtx struct {

userHandlers map[string]http.Handler
serviceRegister func(*grpc.Server)
serversC chan *servers
}

secureHTTPServer *http.Server
secureGrpcServerC chan *grpc.Server
insecureGrpcServerC chan *grpc.Server
type servers struct {
secure bool
grpc *grpc.Server
http *http.Server
}

func newServeCtx() *serveCtx {
ctx, cancel := context.WithCancel(context.Background())
return &serveCtx{
ctx: ctx,
cancel: cancel,
userHandlers: make(map[string]http.Handler),
secureGrpcServerC: make(chan *grpc.Server, 1),
insecureGrpcServerC: make(chan *grpc.Server, 1),
return &serveCtx{ctx: ctx, cancel: cancel, userHandlers: make(map[string]http.Handler),
serversC: make(chan *servers, 2), // in case sctx.insecure,sctx.secure true
}
}

Expand All @@ -92,7 +90,6 @@ func (sctx *serveCtx) serve(

if sctx.insecure {
gs := v3rpc.Server(s, nil, gopts...)
sctx.insecureGrpcServerC <- gs
v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock)
if sctx.serviceRegister != nil {
Expand All @@ -101,9 +98,7 @@ func (sctx *serveCtx) serve(
grpcl := m.Match(cmux.HTTP2())
go func() { errHandler(gs.Serve(grpcl)) }()

opts := []grpc.DialOption{
grpc.WithInsecure(),
}
opts := []grpc.DialOption{grpc.WithInsecure()}
gwmux, err := sctx.registerGateway(opts)
if err != nil {
return err
Expand All @@ -117,6 +112,8 @@ func (sctx *serveCtx) serve(
}
httpl := m.Match(cmux.HTTP1())
go func() { errHandler(srvhttp.Serve(httpl)) }()

sctx.serversC <- &servers{grpc: gs, http: srvhttp}
plog.Noticef("serving insecure client requests on %s, this is strongly discouraged!", sctx.l.Addr().String())
}

Expand All @@ -126,7 +123,6 @@ func (sctx *serveCtx) serve(
return tlsErr
}
gs := v3rpc.Server(s, tlscfg, gopts...)
sctx.secureGrpcServerC <- gs
v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock)
if sctx.serviceRegister != nil {
Expand Down Expand Up @@ -157,13 +153,12 @@ func (sctx *serveCtx) serve(
ErrorLog: logger, // do not log user error
}
go func() { errHandler(srv.Serve(tlsl)) }()
sctx.secureHTTPServer = srv

sctx.serversC <- &servers{secure: true, grpc: gs, http: srv}
plog.Infof("serving client requests on %s", sctx.l.Addr().String())
}

close(sctx.secureGrpcServerC)
close(sctx.insecureGrpcServerC)
close(sctx.serversC)
return m.Serve()
}

Expand Down Expand Up @@ -279,54 +274,3 @@ func (sctx *serveCtx) registerTrace() {
evf := func(w http.ResponseWriter, r *http.Request) { trace.RenderEvents(w, r, true) }
sctx.registerUserHandler("/debug/events", http.HandlerFunc(evf))
}

// Attempt to gracefully tear down gRPC server(s) and any associated mechanisms
func teardownServeCtx(sctx *serveCtx, timeout time.Duration) {
if sctx.secure && len(sctx.secureGrpcServerC) > 0 {
gs := <-sctx.secureGrpcServerC
stopSecureServer(gs, sctx.secureHTTPServer, timeout)
}

if sctx.insecure && len(sctx.insecureGrpcServerC) > 0 {
gs := <-sctx.insecureGrpcServerC
stopInsecureServer(gs, timeout)
}

// Close any open gRPC connections
sctx.cancel()
}

// When using grpc's ServerHandlerTransport we are responsible for gracefully
// stopping connections and shutting down.
// https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531
func stopSecureServer(gs *grpc.Server, httpSrv *http.Server, timeout time.Duration) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

// Stop accepting new connections await pending handlers
httpSrv.Shutdown(ctx)

// Teardown gRPC server
gs.Stop()
}

// Gracefully shutdown gRPC server when using HTTP2 transport.
func stopInsecureServer(gs *grpc.Server, timeout time.Duration) {
ch := make(chan struct{})
go func() {
defer close(ch)
// close listeners to stop accepting new connections,
// will block on any existing transports
gs.GracefulStop()
}()
// wait until all pending RPCs are finished
select {
case <-ch:
case <-time.After(timeout):
// took too long, manually close open transports
// e.g. watch streams
gs.Stop()
// concurrent GracefulStop should be interrupted
<-ch
}
}

0 comments on commit c86c7d5

Please sign in to comment.