diff --git a/embed/etcd.go b/embed/etcd.go index 5e00ce13f1b..94f3aca00ba 100644 --- a/embed/etcd.go +++ b/embed/etcd.go @@ -39,8 +39,8 @@ import ( "github.com/coreos/etcd/pkg/transport" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/rafthttp" - "github.com/coreos/pkg/capnslog" + "github.com/coreos/pkg/capnslog" "github.com/soheilhy/cmux" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" @@ -64,15 +64,17 @@ const ( // Etcd contains a running etcd server and its listeners. type Etcd struct { - Peers []*peerListener - Clients []net.Listener + Peers []*peerListener + Clients []net.Listener + // a map of contexts for the servers that serves client requests. + sctxs map[string]*serveCtx metricsListeners []net.Listener - Server *etcdserver.EtcdServer + + Server *etcdserver.EtcdServer cfg Config stopc chan struct{} errc chan error - sctxs map[string]*serveCtx closeOnce sync.Once } @@ -185,37 +187,16 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { e.Server.Start() - // configure peer handlers after rafthttp.Transport started - ph := etcdhttp.NewPeerHandler(e.Server) - var peerTLScfg *tls.Config - if !cfg.PeerTLSInfo.Empty() { - if peerTLScfg, err = cfg.PeerTLSInfo.ServerConfig(); err != nil { - return e, err - } + if err = e.servePeers(); err != nil { + return e, err } - for _, p := range e.Peers { - gs := v3rpc.Server(e.Server, peerTLScfg) - m := cmux.New(p.Listener) - go gs.Serve(m.Match(cmux.HTTP2())) - srv := &http.Server{ - Handler: grpcHandlerFunc(gs, ph), - ReadTimeout: 5 * time.Minute, - ErrorLog: defaultLog.New(ioutil.Discard, "", 0), // do not log user error - } - go srv.Serve(m.Match(cmux.Any())) - p.serve = func() error { return m.Serve() } - p.close = func(ctx context.Context) error { - // gracefully shutdown http.Server - // close open listeners, idle connections - // until context cancel or time-out - e.stopGRPCServer(gs) - return srv.Shutdown(ctx) - } + if err = e.serveClients(); err != nil { + return e, err } - - if err = e.serve(); err != nil { + if err = e.serveMetrics(); err != nil { return e, err } + serving = true return e, nil } @@ -331,6 +312,44 @@ func startPeerListeners(cfg *Config) (peers []*peerListener, err error) { return peers, nil } +// configure peer handlers after rafthttp.Transport started +func (e *Etcd) servePeers() (err error) { + ph := etcdhttp.NewPeerHandler(e.Server) + var peerTLScfg *tls.Config + if !e.cfg.PeerTLSInfo.Empty() { + if peerTLScfg, err = e.cfg.PeerTLSInfo.ServerConfig(); err != nil { + return err + } + } + for _, p := range e.Peers { + gs := v3rpc.Server(e.Server, peerTLScfg) + m := cmux.New(p.Listener) + go gs.Serve(m.Match(cmux.HTTP2())) + srv := &http.Server{ + Handler: grpcHandlerFunc(gs, ph), + ReadTimeout: 5 * time.Minute, + ErrorLog: defaultLog.New(ioutil.Discard, "", 0), // do not log user error + } + go srv.Serve(m.Match(cmux.Any())) + p.serve = func() error { return m.Serve() } + p.close = func(ctx context.Context) error { + // gracefully shutdown http.Server + // close open listeners, idle connections + // until context cancel or time-out + e.stopGRPCServer(gs) + return srv.Shutdown(ctx) + } + } + + // start peer servers in a goroutine + for _, pl := range e.Peers { + go func(l *peerListener) { + e.errHandler(l.serve()) + }(pl) + } + return nil +} + func startClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) { if err = cfg.ClientSelfCert(); err != nil { plog.Fatalf("could not get certs (%v)", err) @@ -412,7 +431,7 @@ func startClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) { return sctxs, nil } -func (e *Etcd) serve() (err error) { +func (e *Etcd) serveClients() (err error) { if !e.cfg.ClientTLSInfo.Empty() { plog.Infof("ClientTLS: %s", e.cfg.ClientTLSInfo) } @@ -421,13 +440,6 @@ func (e *Etcd) serve() (err error) { plog.Infof("cors = %s", e.cfg.CorsInfo) } - // Start the peer server in a goroutine - for _, pl := range e.Peers { - go func(l *peerListener) { - e.errHandler(l.serve()) - }(pl) - } - // Start a client server goroutine for each listen address var h http.Handler if e.Config().EnableV2 { @@ -458,12 +470,17 @@ func (e *Etcd) serve() (err error) { Timeout: e.cfg.GRPCKeepAliveTimeout, })) } + + // start client servers in a goroutine for _, sctx := range e.sctxs { go func(s *serveCtx) { e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, h, e.errHandler, gopts...)) }(sctx) } + return nil +} +func (e *Etcd) serveMetrics() (err error) { if len(e.cfg.ListenMetricsUrls) > 0 { metricsMux := http.NewServeMux() etcdhttp.HandleMetricsHealth(metricsMux, e.Server) @@ -484,7 +501,6 @@ func (e *Etcd) serve() (err error) { }(murl, ml) } } - return nil }