Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid panic on shut down when TLS configuration is present #8986

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,10 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
return
}
if !serving {
// errored before starting gRPC server for serveCtx.grpcServerC
// errored before starting gRPC server for serveCtx
for _, sctx := range e.sctxs {
close(sctx.grpcServerC)
close(sctx.secureGrpcServerC)
close(sctx.insecureGrpcServerC)
}
}
e.Close()
Expand Down Expand Up @@ -222,15 +223,14 @@ func (e *Etcd) Config() Config {
func (e *Etcd) Close() {
e.closeOnce.Do(func() { close(e.stopc) })

for _, sctx := range e.sctxs {
for gs := range sctx.grpcServerC {
e.stopGRPCServer(gs)
}
reqTimeout := 2 * time.Second
if e.Server != nil {
reqTimeout = e.Server.Cfg.ReqTimeout()
}

for _, sctx := range e.sctxs {
sctx.cancel()
teardownServeCtx(sctx, reqTimeout)
}

for i := range e.Clients {
if e.Clients[i] != nil {
e.Clients[i].Close()
Expand Down
73 changes: 67 additions & 6 deletions embed/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net"
"net/http"
"strings"
"time"

"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api/v3client"
Expand Down Expand Up @@ -54,13 +55,20 @@ type serveCtx struct {

userHandlers map[string]http.Handler
serviceRegister func(*grpc.Server)
grpcServerC chan *grpc.Server

secureHTTPServer *http.Server
secureGrpcServerC chan *grpc.Server
insecureGrpcServerC chan *grpc.Server
}

func newServeCtx() *serveCtx {
ctx, cancel := context.WithCancel(context.Background())
return &serveCtx{ctx: ctx, cancel: cancel, userHandlers: make(map[string]http.Handler),
grpcServerC: make(chan *grpc.Server, 2), // in case sctx.insecure,sctx.secure true
return &serveCtx{
ctx: ctx,
cancel: cancel,
userHandlers: make(map[string]http.Handler),
secureGrpcServerC: make(chan *grpc.Server, 1),
insecureGrpcServerC: make(chan *grpc.Server, 1),
}
}

Expand All @@ -84,7 +92,7 @@ func (sctx *serveCtx) serve(

if sctx.insecure {
gs := v3rpc.Server(s, nil, gopts...)
sctx.grpcServerC <- gs
sctx.insecureGrpcServerC <- gs
v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock)
if sctx.serviceRegister != nil {
Expand Down Expand Up @@ -118,7 +126,7 @@ func (sctx *serveCtx) serve(
return tlsErr
}
gs := v3rpc.Server(s, tlscfg, gopts...)
sctx.grpcServerC <- gs
sctx.secureGrpcServerC <- gs
v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock)
if sctx.serviceRegister != nil {
Expand Down Expand Up @@ -149,11 +157,13 @@ func (sctx *serveCtx) serve(
ErrorLog: logger, // do not log user error
}
go func() { errHandler(srv.Serve(tlsl)) }()
sctx.secureHTTPServer = srv

plog.Infof("serving client requests on %s", sctx.l.Addr().String())
}

close(sctx.grpcServerC)
close(sctx.secureGrpcServerC)
close(sctx.insecureGrpcServerC)
return m.Serve()
}

Expand Down Expand Up @@ -269,3 +279,54 @@ func (sctx *serveCtx) registerTrace() {
evf := func(w http.ResponseWriter, r *http.Request) { trace.RenderEvents(w, r, true) }
sctx.registerUserHandler("/debug/events", http.HandlerFunc(evf))
}

// Attempt to gracefully tear down gRPC server(s) and any associated mechanisms
func teardownServeCtx(sctx *serveCtx, timeout time.Duration) {
if sctx.secure && len(sctx.secureGrpcServerC) > 0 {
gs := <-sctx.secureGrpcServerC
stopSecureServer(gs, sctx.secureHTTPServer, timeout)
}

if sctx.insecure && len(sctx.insecureGrpcServerC) > 0 {
gs := <-sctx.insecureGrpcServerC
stopInsecureServer(gs, timeout)
}

// Close any open gRPC connections
sctx.cancel()
}

// When using grpc's ServerHandlerTransport we are responsible for gracefully
// stopping connections and shutting down.
// https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531
func stopSecureServer(gs *grpc.Server, httpSrv *http.Server, timeout time.Duration) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

// Stop accepting new connections await pending handlers
httpSrv.Shutdown(ctx)

// Teardown gRPC server
gs.Stop()
}

// Gracefully shutdown gRPC server when using HTTP2 transport.
func stopInsecureServer(gs *grpc.Server, timeout time.Duration) {
Copy link
Contributor

@gyuho gyuho Dec 7, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ch := make(chan struct{})
go func() {
defer close(ch)
// close listeners to stop accepting new connections,
// will block on any existing transports
gs.GracefulStop()
}()
// wait until all pending RPCs are finished
select {
case <-ch:
case <-time.After(timeout):
// took too long, manually close open transports
// e.g. watch streams
gs.Stop()
// concurrent GracefulStop should be interrupted
<-ch
}
}