Skip to content

Commit

Permalink
embed: split peer/client/metrics serve methods
Browse files Browse the repository at this point in the history
Priliminary commit to start client server later.

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
  • Loading branch information
gyuho committed Nov 20, 2017
1 parent 08434d0 commit 75ababa
Showing 1 changed file with 57 additions and 41 deletions.
98 changes: 57 additions & 41 deletions embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ import (
"github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/rafthttp"
"github.com/coreos/pkg/capnslog"

"github.com/coreos/pkg/capnslog"
"github.com/soheilhy/cmux"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
Expand All @@ -64,15 +64,17 @@ const (

// Etcd contains a running etcd server and its listeners.
type Etcd struct {
Peers []*peerListener
Clients []net.Listener
Peers []*peerListener
Clients []net.Listener
// a map of contexts for the servers that serves client requests.
sctxs map[string]*serveCtx
metricsListeners []net.Listener
Server *etcdserver.EtcdServer

Server *etcdserver.EtcdServer

cfg Config
stopc chan struct{}
errc chan error
sctxs map[string]*serveCtx

closeOnce sync.Once
}
Expand Down Expand Up @@ -185,37 +187,16 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {

e.Server.Start()

// configure peer handlers after rafthttp.Transport started
ph := etcdhttp.NewPeerHandler(e.Server)
var peerTLScfg *tls.Config
if !cfg.PeerTLSInfo.Empty() {
if peerTLScfg, err = cfg.PeerTLSInfo.ServerConfig(); err != nil {
return e, err
}
if err = e.servePeers(); err != nil {
return e, err
}
for _, p := range e.Peers {
gs := v3rpc.Server(e.Server, peerTLScfg)
m := cmux.New(p.Listener)
go gs.Serve(m.Match(cmux.HTTP2()))
srv := &http.Server{
Handler: grpcHandlerFunc(gs, ph),
ReadTimeout: 5 * time.Minute,
ErrorLog: defaultLog.New(ioutil.Discard, "", 0), // do not log user error
}
go srv.Serve(m.Match(cmux.Any()))
p.serve = func() error { return m.Serve() }
p.close = func(ctx context.Context) error {
// gracefully shutdown http.Server
// close open listeners, idle connections
// until context cancel or time-out
e.stopGRPCServer(gs)
return srv.Shutdown(ctx)
}
if err = e.serveClients(); err != nil {
return e, err
}

if err = e.serve(); err != nil {
if err = e.serveMetrics(); err != nil {
return e, err
}

serving = true
return e, nil
}
Expand Down Expand Up @@ -331,6 +312,44 @@ func startPeerListeners(cfg *Config) (peers []*peerListener, err error) {
return peers, nil
}

// configure peer handlers after rafthttp.Transport started
func (e *Etcd) servePeers() (err error) {
ph := etcdhttp.NewPeerHandler(e.Server)
var peerTLScfg *tls.Config
if !e.cfg.PeerTLSInfo.Empty() {
if peerTLScfg, err = e.cfg.PeerTLSInfo.ServerConfig(); err != nil {
return err
}
}
for _, p := range e.Peers {
gs := v3rpc.Server(e.Server, peerTLScfg)
m := cmux.New(p.Listener)
go gs.Serve(m.Match(cmux.HTTP2()))
srv := &http.Server{
Handler: grpcHandlerFunc(gs, ph),
ReadTimeout: 5 * time.Minute,
ErrorLog: defaultLog.New(ioutil.Discard, "", 0), // do not log user error
}
go srv.Serve(m.Match(cmux.Any()))
p.serve = func() error { return m.Serve() }
p.close = func(ctx context.Context) error {
// gracefully shutdown http.Server
// close open listeners, idle connections
// until context cancel or time-out
e.stopGRPCServer(gs)
return srv.Shutdown(ctx)
}
}

// start peer servers in a goroutine
for _, pl := range e.Peers {
go func(l *peerListener) {
e.errHandler(l.serve())
}(pl)
}
return nil
}

func startClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) {
if err = cfg.ClientSelfCert(); err != nil {
plog.Fatalf("could not get certs (%v)", err)
Expand Down Expand Up @@ -412,7 +431,7 @@ func startClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) {
return sctxs, nil
}

func (e *Etcd) serve() (err error) {
func (e *Etcd) serveClients() (err error) {
if !e.cfg.ClientTLSInfo.Empty() {
plog.Infof("ClientTLS: %s", e.cfg.ClientTLSInfo)
}
Expand All @@ -421,13 +440,6 @@ func (e *Etcd) serve() (err error) {
plog.Infof("cors = %s", e.cfg.CorsInfo)
}

// Start the peer server in a goroutine
for _, pl := range e.Peers {
go func(l *peerListener) {
e.errHandler(l.serve())
}(pl)
}

// Start a client server goroutine for each listen address
var h http.Handler
if e.Config().EnableV2 {
Expand Down Expand Up @@ -458,12 +470,17 @@ func (e *Etcd) serve() (err error) {
Timeout: e.cfg.GRPCKeepAliveTimeout,
}))
}

// start client servers in a goroutine
for _, sctx := range e.sctxs {
go func(s *serveCtx) {
e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, h, e.errHandler, gopts...))
}(sctx)
}
return nil
}

func (e *Etcd) serveMetrics() (err error) {
if len(e.cfg.ListenMetricsUrls) > 0 {
metricsMux := http.NewServeMux()
etcdhttp.HandleMetricsHealth(metricsMux, e.Server)
Expand All @@ -484,7 +501,6 @@ func (e *Etcd) serve() (err error) {
}(murl, ml)
}
}

return nil
}

Expand Down

0 comments on commit 75ababa

Please sign in to comment.