diff --git a/embed/etcd.go b/embed/etcd.go index 5e00ce13f1b..94f3aca00ba 100644 --- a/embed/etcd.go +++ b/embed/etcd.go @@ -39,8 +39,8 @@ import ( "github.com/coreos/etcd/pkg/transport" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/rafthttp" - "github.com/coreos/pkg/capnslog" + "github.com/coreos/pkg/capnslog" "github.com/soheilhy/cmux" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" @@ -64,15 +64,17 @@ const ( // Etcd contains a running etcd server and its listeners. type Etcd struct { - Peers []*peerListener - Clients []net.Listener + Peers []*peerListener + Clients []net.Listener + // a map of contexts for the servers that serves client requests. + sctxs map[string]*serveCtx metricsListeners []net.Listener - Server *etcdserver.EtcdServer + + Server *etcdserver.EtcdServer cfg Config stopc chan struct{} errc chan error - sctxs map[string]*serveCtx closeOnce sync.Once } @@ -185,37 +187,16 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { e.Server.Start() - // configure peer handlers after rafthttp.Transport started - ph := etcdhttp.NewPeerHandler(e.Server) - var peerTLScfg *tls.Config - if !cfg.PeerTLSInfo.Empty() { - if peerTLScfg, err = cfg.PeerTLSInfo.ServerConfig(); err != nil { - return e, err - } + if err = e.servePeers(); err != nil { + return e, err } - for _, p := range e.Peers { - gs := v3rpc.Server(e.Server, peerTLScfg) - m := cmux.New(p.Listener) - go gs.Serve(m.Match(cmux.HTTP2())) - srv := &http.Server{ - Handler: grpcHandlerFunc(gs, ph), - ReadTimeout: 5 * time.Minute, - ErrorLog: defaultLog.New(ioutil.Discard, "", 0), // do not log user error - } - go srv.Serve(m.Match(cmux.Any())) - p.serve = func() error { return m.Serve() } - p.close = func(ctx context.Context) error { - // gracefully shutdown http.Server - // close open listeners, idle connections - // until context cancel or time-out - e.stopGRPCServer(gs) - return srv.Shutdown(ctx) - } + if err = e.serveClients(); err != nil { + return e, err } - - if err = 
e.serve(); err != nil { + if err = e.serveMetrics(); err != nil { return e, err } + serving = true return e, nil } @@ -331,6 +312,44 @@ func startPeerListeners(cfg *Config) (peers []*peerListener, err error) { return peers, nil } +// configure peer handlers after rafthttp.Transport started +func (e *Etcd) servePeers() (err error) { + ph := etcdhttp.NewPeerHandler(e.Server) + var peerTLScfg *tls.Config + if !e.cfg.PeerTLSInfo.Empty() { + if peerTLScfg, err = e.cfg.PeerTLSInfo.ServerConfig(); err != nil { + return err + } + } + for _, p := range e.Peers { + gs := v3rpc.Server(e.Server, peerTLScfg) + m := cmux.New(p.Listener) + go gs.Serve(m.Match(cmux.HTTP2())) + srv := &http.Server{ + Handler: grpcHandlerFunc(gs, ph), + ReadTimeout: 5 * time.Minute, + ErrorLog: defaultLog.New(ioutil.Discard, "", 0), // do not log user error + } + go srv.Serve(m.Match(cmux.Any())) + p.serve = func() error { return m.Serve() } + p.close = func(ctx context.Context) error { + // gracefully shutdown http.Server + // close open listeners, idle connections + // until context cancel or time-out + e.stopGRPCServer(gs) + return srv.Shutdown(ctx) + } + } + + // start peer servers in a goroutine + for _, pl := range e.Peers { + go func(l *peerListener) { + e.errHandler(l.serve()) + }(pl) + } + return nil +} + func startClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) { if err = cfg.ClientSelfCert(); err != nil { plog.Fatalf("could not get certs (%v)", err) @@ -412,7 +431,7 @@ func startClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) { return sctxs, nil } -func (e *Etcd) serve() (err error) { +func (e *Etcd) serveClients() (err error) { if !e.cfg.ClientTLSInfo.Empty() { plog.Infof("ClientTLS: %s", e.cfg.ClientTLSInfo) } @@ -421,13 +440,6 @@ func (e *Etcd) serve() (err error) { plog.Infof("cors = %s", e.cfg.CorsInfo) } - // Start the peer server in a goroutine - for _, pl := range e.Peers { - go func(l *peerListener) { - e.errHandler(l.serve()) - }(pl) - } 
- // Start a client server goroutine for each listen address var h http.Handler if e.Config().EnableV2 { @@ -458,12 +470,17 @@ func (e *Etcd) serve() (err error) { Timeout: e.cfg.GRPCKeepAliveTimeout, })) } + + // start client servers in a goroutine for _, sctx := range e.sctxs { go func(s *serveCtx) { e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, h, e.errHandler, gopts...)) }(sctx) } + return nil +} +func (e *Etcd) serveMetrics() (err error) { if len(e.cfg.ListenMetricsUrls) > 0 { metricsMux := http.NewServeMux() etcdhttp.HandleMetricsHealth(metricsMux, e.Server) @@ -484,7 +501,6 @@ func (e *Etcd) serve() (err error) { }(murl, ml) } } - return nil } diff --git a/etcdserver/corrupt.go b/etcdserver/corrupt.go index ea614954336..035c62f041c 100644 --- a/etcdserver/corrupt.go +++ b/etcdserver/corrupt.go @@ -50,34 +50,7 @@ func (s *EtcdServer) checkHashKV() error { if err != nil { plog.Fatalf("failed to hash kv store (%v)", err) } - resps := []*clientv3.HashKVResponse{} - for _, m := range s.cluster.Members() { - if m.ID == s.ID() { - continue - } - - cli, cerr := clientv3.New(clientv3.Config{Endpoints: m.PeerURLs}) - if cerr != nil { - continue - } - - respsLen := len(resps) - for _, c := range cli.Endpoints() { - ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout()) - resp, herr := cli.HashKV(ctx, c, rev) - cancel() - if herr == nil { - cerr = herr - resps = append(resps, resp) - break - } - } - cli.Close() - - if respsLen == len(resps) { - plog.Warningf("failed to hash kv for peer %s (%v)", types.ID(m.ID), cerr) - } - } + resps := s.getPeerHashKVs(rev) ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout()) err = s.linearizableReadNotify(ctx) @@ -115,6 +88,8 @@ func (s *EtcdServer) checkHashKV() error { for _, resp := range resps { id := resp.Header.MemberId + + // leader expects follower's latest revision less than or equal to leader's if resp.Header.Revision > rev2 { plog.Warningf( "revision %d from member %v, 
expected at most %d", @@ -123,6 +98,8 @@ func (s *EtcdServer) checkHashKV() error { rev2) mismatch(id) } + + // leader expects follower's latest compact revision less than or equal to leader's if resp.CompactRevision > crev2 { plog.Warningf( "compact revision %d from member %v, expected at most %d", @@ -132,6 +109,8 @@ func (s *EtcdServer) checkHashKV() error { ) mismatch(id) } + + // follower's compact revision is leader's old one, then hashes must match if resp.CompactRevision == crev && resp.Hash != h { plog.Warningf( "hash %d at revision %d from member %v, expected hash %d", @@ -146,6 +125,44 @@ func (s *EtcdServer) checkHashKV() error { return nil } +// getPeerHashKVs requests the HashKV at rev from every other cluster member, trying each of the member's peer URLs in turn, +// and returns the responses that succeeded; members whose client cannot be created or whose endpoints all fail are logged with a warning and contribute no response. +func (s *EtcdServer) getPeerHashKVs(rev int64) (resps []*clientv3.HashKVResponse) { + for _, m := range s.cluster.Members() { + if m.ID == s.ID() { + continue + } + + cli, cerr := clientv3.New(clientv3.Config{ + DialTimeout: s.Cfg.ReqTimeout(), + Endpoints: m.PeerURLs, + }) + if cerr != nil { + plog.Warningf("%s failed to create client to peer %s for hash checking (%q)", s.ID(), types.ID(m.ID), cerr.Error()) + continue + } + + respsLen := len(resps) + for _, c := range cli.Endpoints() { + ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout()) + resp, herr := cli.HashKV(ctx, c, rev) + cancel() + if herr == nil { + resps = append(resps, resp) + break + } + // remember the latest failure so the warning below reports why this peer was skipped + cerr = herr + } + cli.Close() + + if respsLen == len(resps) { + plog.Warningf("%s failed to hash kv for peer %s (%v)", s.ID(), types.ID(m.ID), cerr) + } + } + return resps +} + type applierV3Corrupt struct { applierV3 }