Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

etcdserver,embed: clean up/reorganize client/peer/corrupt handlers #8898

Merged
merged 4 commits into from
Nov 21, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 57 additions & 41 deletions embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ import (
"github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/rafthttp"
"github.com/coreos/pkg/capnslog"

"github.com/coreos/pkg/capnslog"
"github.com/soheilhy/cmux"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
Expand All @@ -64,15 +64,17 @@ const (

// Etcd contains a running etcd server and its listeners.
type Etcd struct {
Peers []*peerListener
Clients []net.Listener
Peers []*peerListener
Clients []net.Listener
// a map of contexts for the servers that serve client requests.
sctxs map[string]*serveCtx
metricsListeners []net.Listener
Server *etcdserver.EtcdServer

Server *etcdserver.EtcdServer

cfg Config
stopc chan struct{}
errc chan error
sctxs map[string]*serveCtx

closeOnce sync.Once
}
Expand Down Expand Up @@ -185,37 +187,16 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {

e.Server.Start()

// configure peer handlers after rafthttp.Transport started
ph := etcdhttp.NewPeerHandler(e.Server)
var peerTLScfg *tls.Config
if !cfg.PeerTLSInfo.Empty() {
if peerTLScfg, err = cfg.PeerTLSInfo.ServerConfig(); err != nil {
return e, err
}
if err = e.servePeers(); err != nil {
return e, err
}
for _, p := range e.Peers {
gs := v3rpc.Server(e.Server, peerTLScfg)
m := cmux.New(p.Listener)
go gs.Serve(m.Match(cmux.HTTP2()))
srv := &http.Server{
Handler: grpcHandlerFunc(gs, ph),
ReadTimeout: 5 * time.Minute,
ErrorLog: defaultLog.New(ioutil.Discard, "", 0), // do not log user error
}
go srv.Serve(m.Match(cmux.Any()))
p.serve = func() error { return m.Serve() }
p.close = func(ctx context.Context) error {
// gracefully shutdown http.Server
// close open listeners, idle connections
// until context cancel or time-out
e.stopGRPCServer(gs)
return srv.Shutdown(ctx)
}
if err = e.serveClients(); err != nil {
return e, err
}

if err = e.serve(); err != nil {
if err = e.serveMetrics(); err != nil {
return e, err
}

serving = true
return e, nil
}
Expand Down Expand Up @@ -331,6 +312,44 @@ func startPeerListeners(cfg *Config) (peers []*peerListener, err error) {
return peers, nil
}

// servePeers configures the peer handlers on every peer listener and then
// starts serving each listener in its own goroutine. Per the original note,
// it is meant to be called after rafthttp.Transport has started.
//
// For each listener a cmux multiplexer splits traffic between a gRPC server
// (HTTP/2 matcher) and a plain http.Server (catch-all matcher). The listener's
// serve and close hooks are filled in so that it can be started here and shut
// down later. An error is returned only if the peer TLS server config cannot
// be built; serve errors are reported through e.errHandler.
func (e *Etcd) servePeers() (err error) {
	peerHandler := etcdhttp.NewPeerHandler(e.Server)

	// tlsCfg stays nil when no peer TLS info is configured.
	var tlsCfg *tls.Config
	if !e.cfg.PeerTLSInfo.Empty() {
		tlsCfg, err = e.cfg.PeerTLSInfo.ServerConfig()
		if err != nil {
			return err
		}
	}

	for _, pl := range e.Peers {
		grpcSrv := v3rpc.Server(e.Server, tlsCfg)
		mux := cmux.New(pl.Listener)

		// Register the HTTP/2 matcher before the catch-all matcher; the
		// registration order is kept exactly as in the original code.
		go grpcSrv.Serve(mux.Match(cmux.HTTP2()))

		httpSrv := &http.Server{
			Handler:     grpcHandlerFunc(grpcSrv, peerHandler),
			ReadTimeout: 5 * time.Minute,
			ErrorLog:    defaultLog.New(ioutil.Discard, "", 0), // do not log user error
		}
		go httpSrv.Serve(mux.Match(cmux.Any()))

		pl.serve = mux.Serve
		pl.close = func(ctx context.Context) error {
			// Stop the gRPC server first, then gracefully shut down the
			// http.Server: close open listeners and idle connections
			// until the context is cancelled or times out.
			e.stopGRPCServer(grpcSrv)
			return httpSrv.Shutdown(ctx)
		}
	}

	// Start the peer servers, one goroutine per listener, only after every
	// listener has had its serve/close hooks installed.
	for i := range e.Peers {
		pl := e.Peers[i]
		go func() {
			e.errHandler(pl.serve())
		}()
	}
	return nil
}

func startClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) {
if err = cfg.ClientSelfCert(); err != nil {
plog.Fatalf("could not get certs (%v)", err)
Expand Down Expand Up @@ -412,7 +431,7 @@ func startClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) {
return sctxs, nil
}

func (e *Etcd) serve() (err error) {
func (e *Etcd) serveClients() (err error) {
if !e.cfg.ClientTLSInfo.Empty() {
plog.Infof("ClientTLS: %s", e.cfg.ClientTLSInfo)
}
Expand All @@ -421,13 +440,6 @@ func (e *Etcd) serve() (err error) {
plog.Infof("cors = %s", e.cfg.CorsInfo)
}

// Start the peer server in a goroutine
for _, pl := range e.Peers {
go func(l *peerListener) {
e.errHandler(l.serve())
}(pl)
}

// Start a client server goroutine for each listen address
var h http.Handler
if e.Config().EnableV2 {
Expand Down Expand Up @@ -458,12 +470,17 @@ func (e *Etcd) serve() (err error) {
Timeout: e.cfg.GRPCKeepAliveTimeout,
}))
}

// start client servers in a goroutine
for _, sctx := range e.sctxs {
go func(s *serveCtx) {
e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, h, e.errHandler, gopts...))
}(sctx)
}
return nil
}

func (e *Etcd) serveMetrics() (err error) {
if len(e.cfg.ListenMetricsUrls) > 0 {
metricsMux := http.NewServeMux()
etcdhttp.HandleMetricsHealth(metricsMux, e.Server)
Expand All @@ -484,7 +501,6 @@ func (e *Etcd) serve() (err error) {
}(murl, ml)
}
}

return nil
}

Expand Down
70 changes: 42 additions & 28 deletions etcdserver/corrupt.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,34 +50,7 @@ func (s *EtcdServer) checkHashKV() error {
if err != nil {
plog.Fatalf("failed to hash kv store (%v)", err)
}
resps := []*clientv3.HashKVResponse{}
for _, m := range s.cluster.Members() {
if m.ID == s.ID() {
continue
}

cli, cerr := clientv3.New(clientv3.Config{Endpoints: m.PeerURLs})
if cerr != nil {
continue
}

respsLen := len(resps)
for _, c := range cli.Endpoints() {
ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
resp, herr := cli.HashKV(ctx, c, rev)
cancel()
if herr == nil {
cerr = herr
resps = append(resps, resp)
break
}
}
cli.Close()

if respsLen == len(resps) {
plog.Warningf("failed to hash kv for peer %s (%v)", types.ID(m.ID), cerr)
}
}
resps := s.getPeerHashKVs(rev)

ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
err = s.linearizableReadNotify(ctx)
Expand Down Expand Up @@ -115,6 +88,8 @@ func (s *EtcdServer) checkHashKV() error {

for _, resp := range resps {
id := resp.Header.MemberId

// leader expects follower's latest revision less than or equal to leader's
if resp.Header.Revision > rev2 {
plog.Warningf(
"revision %d from member %v, expected at most %d",
Expand All @@ -123,6 +98,8 @@ func (s *EtcdServer) checkHashKV() error {
rev2)
mismatch(id)
}

// leader expects follower's latest compact revision less than or equal to leader's
if resp.CompactRevision > crev2 {
plog.Warningf(
"compact revision %d from member %v, expected at most %d",
Expand All @@ -132,6 +109,8 @@ func (s *EtcdServer) checkHashKV() error {
)
mismatch(id)
}

// if follower's compact revision equals leader's old one, then hashes must match
if resp.CompactRevision == crev && resp.Hash != h {
plog.Warningf(
"hash %d at revision %d from member %v, expected hash %d",
Expand All @@ -146,6 +125,41 @@ func (s *EtcdServer) checkHashKV() error {
return nil
}

// getPeerHashKVs asks every other cluster member for its KV hash at revision
// rev and returns the responses that could be obtained.
//
// For each member (the local server is skipped) a client is created against
// the member's peer URLs, and the endpoints are tried in order until one
// HashKV call succeeds, so at most one response per member is collected.
// Members for which no client could be created, or for which every endpoint
// failed, are logged with a warning and left out of the result.
func (s *EtcdServer) getPeerHashKVs(rev int64) (resps []*clientv3.HashKVResponse) {
	for _, m := range s.cluster.Members() {
		if m.ID == s.ID() {
			continue
		}

		cli, cerr := clientv3.New(clientv3.Config{
			DialTimeout: s.Cfg.ReqTimeout(),
			Endpoints:   m.PeerURLs,
		})
		if cerr != nil {
			plog.Warningf("%s failed to create client to peer %s for hash checking (%q)", s.ID(), types.ID(m.ID), cerr.Error())
			continue
		}

		respsLen := len(resps)
		for _, c := range cli.Endpoints() {
			ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
			resp, herr := cli.HashKV(ctx, c, rev)
			cancel()
			if herr == nil {
				resps = append(resps, resp)
				break
			}
			// Remember the most recent failure so the warning below reports
			// the actual error instead of <nil>.
			cerr = herr
		}
		cli.Close()

		// No response was appended for this member: every endpoint failed.
		if respsLen == len(resps) {
			plog.Warningf("%s failed to hash kv for peer %s (%v)", s.ID(), types.ID(m.ID), cerr)
		}
	}
	return resps
}

type applierV3Corrupt struct {
applierV3
}
Expand Down