From e93131afc0e861823f221a98023e7a4081708c47 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Mon, 20 Nov 2017 09:35:39 -0800 Subject: [PATCH] embed: split peer/client/metrics serve methods Priliminary commit to start client server later. Signed-off-by: Gyu-Ho Lee --- embed/etcd.go | 104 ++++++++++++++++++++++++++++---------------------- 1 file changed, 59 insertions(+), 45 deletions(-) diff --git a/embed/etcd.go b/embed/etcd.go index 5e00ce13f1b8..0f0d324f18ae 100644 --- a/embed/etcd.go +++ b/embed/etcd.go @@ -39,8 +39,8 @@ import ( "github.com/coreos/etcd/pkg/transport" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/rafthttp" - "github.com/coreos/pkg/capnslog" + "github.com/coreos/pkg/capnslog" "github.com/soheilhy/cmux" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" @@ -66,13 +66,13 @@ const ( type Etcd struct { Peers []*peerListener Clients []net.Listener + clientServers map[string]*serveCtx metricsListeners []net.Listener Server *etcdserver.EtcdServer cfg Config stopc chan struct{} errc chan error - sctxs map[string]*serveCtx closeOnce sync.Once } @@ -99,7 +99,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { } if !serving { // errored before starting gRPC server for serveCtx.grpcServerC - for _, sctx := range e.sctxs { + for _, sctx := range e.clientServers { close(sctx.grpcServerC) } } @@ -110,10 +110,10 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { if e.Peers, err = startPeerListeners(cfg); err != nil { return e, err } - if e.sctxs, err = startClientListeners(cfg); err != nil { + if e.clientServers, err = startClientListeners(cfg); err != nil { return e, err } - for _, sctx := range e.sctxs { + for _, sctx := range e.clientServers { e.Clients = append(e.Clients, sctx.l) } @@ -181,41 +181,20 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { } // buffer channel so goroutines on closed connections won't wait forever - e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs)) + e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.clientServers)) e.Server.Start() - // configure peer handlers after rafthttp.Transport started - ph := etcdhttp.NewPeerHandler(e.Server) - var peerTLScfg *tls.Config - if !cfg.PeerTLSInfo.Empty() { - if peerTLScfg, err = cfg.PeerTLSInfo.ServerConfig(); err != nil { - return e, err - } + if err = e.servePeers(); err != nil { + return e, err } - for _, p := range e.Peers { - gs := v3rpc.Server(e.Server, peerTLScfg) - m := cmux.New(p.Listener) - go gs.Serve(m.Match(cmux.HTTP2())) - srv := &http.Server{ - Handler: grpcHandlerFunc(gs, ph), - ReadTimeout: 5 * time.Minute, - ErrorLog: defaultLog.New(ioutil.Discard, "", 0), // do not log user error - } - go srv.Serve(m.Match(cmux.Any())) - p.serve = func() error { return m.Serve() } - p.close = func(ctx context.Context) error { - // gracefully shutdown http.Server - // close open listeners, idle connections - // until context cancel or time-out - e.stopGRPCServer(gs) - return srv.Shutdown(ctx) - } + if err = e.serveClients(); err != nil { + return e, err } - - if err = e.serve(); err != nil { + if err = e.serveMetrics(); err != nil { return e, err } + serving = true return e, nil } @@ -228,13 +207,13 @@ func (e *Etcd) Config() Config { func (e *Etcd) Close() { e.closeOnce.Do(func() { close(e.stopc) }) - for _, sctx := range e.sctxs { + for _, sctx := range e.clientServers { for gs := range sctx.grpcServerC { e.stopGRPCServer(gs) } } - for _, sctx := range e.sctxs { + for _, sctx := range e.clientServers { sctx.cancel() } for i := range e.Clients { @@ -331,6 +310,44 @@ func startPeerListeners(cfg *Config) (peers []*peerListener, err error) { return peers, nil } +// configure peer handlers after rafthttp.Transport started +func (e *Etcd) servePeers() (err error) { + ph := etcdhttp.NewPeerHandler(e.Server) + var peerTLScfg *tls.Config + if !e.cfg.PeerTLSInfo.Empty() { + if peerTLScfg, err = e.cfg.PeerTLSInfo.ServerConfig(); err != nil { + return err + } + } + for _, p := range e.Peers { + gs := v3rpc.Server(e.Server, peerTLScfg) + m := cmux.New(p.Listener) + go gs.Serve(m.Match(cmux.HTTP2())) + srv := &http.Server{ + Handler: grpcHandlerFunc(gs, ph), + ReadTimeout: 5 * time.Minute, + ErrorLog: defaultLog.New(ioutil.Discard, "", 0), // do not log user error + } + go srv.Serve(m.Match(cmux.Any())) + p.serve = func() error { return m.Serve() } + p.close = func(ctx context.Context) error { + // gracefully shutdown http.Server + // close open listeners, idle connections + // until context cancel or time-out + e.stopGRPCServer(gs) + return srv.Shutdown(ctx) + } + } + + // start peer servers in a goroutine + for _, pl := range e.Peers { + go func(l *peerListener) { + e.errHandler(l.serve()) + }(pl) + } + return nil +} + func startClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) { if err = cfg.ClientSelfCert(); err != nil { plog.Fatalf("could not get certs (%v)", err) @@ -412,7 +429,7 @@ func startClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) { return sctxs, nil } -func (e *Etcd) serve() (err error) { +func (e *Etcd) serveClients() (err error) { if !e.cfg.ClientTLSInfo.Empty() { plog.Infof("ClientTLS: %s", e.cfg.ClientTLSInfo) } @@ -421,13 +438,6 @@ func (e *Etcd) serve() (err error) { plog.Infof("cors = %s", e.cfg.CorsInfo) } - // Start the peer server in a goroutine - for _, pl := range e.Peers { - go func(l *peerListener) { - e.errHandler(l.serve()) - }(pl) - } - // Start a client server goroutine for each listen address var h http.Handler if e.Config().EnableV2 { @@ -458,12 +468,17 @@ func (e *Etcd) serve() (err error) { Timeout: e.cfg.GRPCKeepAliveTimeout, })) } - for _, sctx := range e.sctxs { + + // start client servers in a goroutine + for _, sctx := range e.clientServers { go func(s *serveCtx) { e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, h, e.errHandler, gopts...)) }(sctx) } + return nil +} +func (e *Etcd) serveMetrics() (err error) { if len(e.cfg.ListenMetricsUrls) > 0 { metricsMux := http.NewServeMux() etcdhttp.HandleMetricsHealth(metricsMux, e.Server) @@ -484,7 +499,6 @@ func (e *Etcd) serve() (err error) { }(murl, ml) } } - return nil }