Skip to content

Commit

Permalink
embed: split peer/client/metrics serve methods
Browse files Browse the repository at this point in the history
Priliminary commit to start client server later.

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
  • Loading branch information
gyuho committed Nov 20, 2017
1 parent 08434d0 commit ddc5e2e
Showing 1 changed file with 54 additions and 39 deletions.
93 changes: 54 additions & 39 deletions embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ import (
"github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/rafthttp"
"github.com/coreos/pkg/capnslog"

"github.com/coreos/pkg/capnslog"
"github.com/soheilhy/cmux"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
Expand All @@ -66,13 +66,14 @@ const (
type Etcd struct {
Peers []*peerListener
Clients []net.Listener
sctxs map[string]*serveCtx // client servers
metricsListeners []net.Listener
Server *etcdserver.EtcdServer

Server *etcdserver.EtcdServer

cfg Config
stopc chan struct{}
errc chan error
sctxs map[string]*serveCtx

closeOnce sync.Once
}
Expand Down Expand Up @@ -185,37 +186,16 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {

e.Server.Start()

// configure peer handlers after rafthttp.Transport started
ph := etcdhttp.NewPeerHandler(e.Server)
var peerTLScfg *tls.Config
if !cfg.PeerTLSInfo.Empty() {
if peerTLScfg, err = cfg.PeerTLSInfo.ServerConfig(); err != nil {
return e, err
}
if err = e.servePeers(); err != nil {
return e, err
}
for _, p := range e.Peers {
gs := v3rpc.Server(e.Server, peerTLScfg)
m := cmux.New(p.Listener)
go gs.Serve(m.Match(cmux.HTTP2()))
srv := &http.Server{
Handler: grpcHandlerFunc(gs, ph),
ReadTimeout: 5 * time.Minute,
ErrorLog: defaultLog.New(ioutil.Discard, "", 0), // do not log user error
}
go srv.Serve(m.Match(cmux.Any()))
p.serve = func() error { return m.Serve() }
p.close = func(ctx context.Context) error {
// gracefully shutdown http.Server
// close open listeners, idle connections
// until context cancel or time-out
e.stopGRPCServer(gs)
return srv.Shutdown(ctx)
}
if err = e.serveClients(); err != nil {
return e, err
}

if err = e.serve(); err != nil {
if err = e.serveMetrics(); err != nil {
return e, err
}

serving = true
return e, nil
}
Expand Down Expand Up @@ -331,6 +311,44 @@ func startPeerListeners(cfg *Config) (peers []*peerListener, err error) {
return peers, nil
}

// configure peer handlers after rafthttp.Transport started
func (e *Etcd) servePeers() (err error) {
ph := etcdhttp.NewPeerHandler(e.Server)
var peerTLScfg *tls.Config
if !e.cfg.PeerTLSInfo.Empty() {
if peerTLScfg, err = e.cfg.PeerTLSInfo.ServerConfig(); err != nil {
return err
}
}
for _, p := range e.Peers {
gs := v3rpc.Server(e.Server, peerTLScfg)
m := cmux.New(p.Listener)
go gs.Serve(m.Match(cmux.HTTP2()))
srv := &http.Server{
Handler: grpcHandlerFunc(gs, ph),
ReadTimeout: 5 * time.Minute,
ErrorLog: defaultLog.New(ioutil.Discard, "", 0), // do not log user error
}
go srv.Serve(m.Match(cmux.Any()))
p.serve = func() error { return m.Serve() }
p.close = func(ctx context.Context) error {
// gracefully shutdown http.Server
// close open listeners, idle connections
// until context cancel or time-out
e.stopGRPCServer(gs)
return srv.Shutdown(ctx)
}
}

// start peer servers in a goroutine
for _, pl := range e.Peers {
go func(l *peerListener) {
e.errHandler(l.serve())
}(pl)
}
return nil
}

func startClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) {
if err = cfg.ClientSelfCert(); err != nil {
plog.Fatalf("could not get certs (%v)", err)
Expand Down Expand Up @@ -412,7 +430,7 @@ func startClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) {
return sctxs, nil
}

func (e *Etcd) serve() (err error) {
func (e *Etcd) serveClients() (err error) {
if !e.cfg.ClientTLSInfo.Empty() {
plog.Infof("ClientTLS: %s", e.cfg.ClientTLSInfo)
}
Expand All @@ -421,13 +439,6 @@ func (e *Etcd) serve() (err error) {
plog.Infof("cors = %s", e.cfg.CorsInfo)
}

// Start the peer server in a goroutine
for _, pl := range e.Peers {
go func(l *peerListener) {
e.errHandler(l.serve())
}(pl)
}

// Start a client server goroutine for each listen address
var h http.Handler
if e.Config().EnableV2 {
Expand Down Expand Up @@ -458,12 +469,17 @@ func (e *Etcd) serve() (err error) {
Timeout: e.cfg.GRPCKeepAliveTimeout,
}))
}

// start client servers in a goroutine
for _, sctx := range e.sctxs {
go func(s *serveCtx) {
e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, h, e.errHandler, gopts...))
}(sctx)
}
return nil
}

func (e *Etcd) serveMetrics() (err error) {
if len(e.cfg.ListenMetricsUrls) > 0 {
metricsMux := http.NewServeMux()
etcdhttp.HandleMetricsHealth(metricsMux, e.Server)
Expand All @@ -484,7 +500,6 @@ func (e *Etcd) serve() (err error) {
}(murl, ml)
}
}

return nil
}

Expand Down

0 comments on commit ddc5e2e

Please sign in to comment.