Skip to content

Commit

Permalink
embed: split peer/client/metrics serve methods
Browse files Browse the repository at this point in the history
Priliminary commit to start client server later.

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
  • Loading branch information
gyuho committed Nov 20, 2017
1 parent 08434d0 commit e93131a
Showing 1 changed file with 59 additions and 45 deletions.
104 changes: 59 additions & 45 deletions embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ import (
"github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/rafthttp"
"github.com/coreos/pkg/capnslog"

"github.com/coreos/pkg/capnslog"
"github.com/soheilhy/cmux"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
Expand All @@ -66,13 +66,13 @@ const (
type Etcd struct {
Peers []*peerListener
Clients []net.Listener
clientServers map[string]*serveCtx
metricsListeners []net.Listener
Server *etcdserver.EtcdServer

cfg Config
stopc chan struct{}
errc chan error
sctxs map[string]*serveCtx

closeOnce sync.Once
}
Expand All @@ -99,7 +99,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
}
if !serving {
// errored before starting gRPC server for serveCtx.grpcServerC
for _, sctx := range e.sctxs {
for _, sctx := range e.clientServers {
close(sctx.grpcServerC)
}
}
Expand All @@ -110,10 +110,10 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
if e.Peers, err = startPeerListeners(cfg); err != nil {
return e, err
}
if e.sctxs, err = startClientListeners(cfg); err != nil {
if e.clientServers, err = startClientListeners(cfg); err != nil {
return e, err
}
for _, sctx := range e.sctxs {
for _, sctx := range e.clientServers {
e.Clients = append(e.Clients, sctx.l)
}

Expand Down Expand Up @@ -181,41 +181,20 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
}

// buffer channel so goroutines on closed connections won't wait forever
e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs))
e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.clientServers))

e.Server.Start()

// configure peer handlers after rafthttp.Transport started
ph := etcdhttp.NewPeerHandler(e.Server)
var peerTLScfg *tls.Config
if !cfg.PeerTLSInfo.Empty() {
if peerTLScfg, err = cfg.PeerTLSInfo.ServerConfig(); err != nil {
return e, err
}
if err = e.servePeers(); err != nil {
return e, err
}
for _, p := range e.Peers {
gs := v3rpc.Server(e.Server, peerTLScfg)
m := cmux.New(p.Listener)
go gs.Serve(m.Match(cmux.HTTP2()))
srv := &http.Server{
Handler: grpcHandlerFunc(gs, ph),
ReadTimeout: 5 * time.Minute,
ErrorLog: defaultLog.New(ioutil.Discard, "", 0), // do not log user error
}
go srv.Serve(m.Match(cmux.Any()))
p.serve = func() error { return m.Serve() }
p.close = func(ctx context.Context) error {
// gracefully shutdown http.Server
// close open listeners, idle connections
// until context cancel or time-out
e.stopGRPCServer(gs)
return srv.Shutdown(ctx)
}
if err = e.serveClients(); err != nil {
return e, err
}

if err = e.serve(); err != nil {
if err = e.serveMetrics(); err != nil {
return e, err
}

serving = true
return e, nil
}
Expand All @@ -228,13 +207,13 @@ func (e *Etcd) Config() Config {
func (e *Etcd) Close() {
e.closeOnce.Do(func() { close(e.stopc) })

for _, sctx := range e.sctxs {
for _, sctx := range e.clientServers {
for gs := range sctx.grpcServerC {
e.stopGRPCServer(gs)
}
}

for _, sctx := range e.sctxs {
for _, sctx := range e.clientServers {
sctx.cancel()
}
for i := range e.Clients {
Expand Down Expand Up @@ -331,6 +310,44 @@ func startPeerListeners(cfg *Config) (peers []*peerListener, err error) {
return peers, nil
}

// configure peer handlers after rafthttp.Transport started
func (e *Etcd) servePeers() (err error) {
ph := etcdhttp.NewPeerHandler(e.Server)
var peerTLScfg *tls.Config
if !e.cfg.PeerTLSInfo.Empty() {
if peerTLScfg, err = e.cfg.PeerTLSInfo.ServerConfig(); err != nil {
return err
}
}
for _, p := range e.Peers {
gs := v3rpc.Server(e.Server, peerTLScfg)
m := cmux.New(p.Listener)
go gs.Serve(m.Match(cmux.HTTP2()))
srv := &http.Server{
Handler: grpcHandlerFunc(gs, ph),
ReadTimeout: 5 * time.Minute,
ErrorLog: defaultLog.New(ioutil.Discard, "", 0), // do not log user error
}
go srv.Serve(m.Match(cmux.Any()))
p.serve = func() error { return m.Serve() }
p.close = func(ctx context.Context) error {
// gracefully shutdown http.Server
// close open listeners, idle connections
// until context cancel or time-out
e.stopGRPCServer(gs)
return srv.Shutdown(ctx)
}
}

// start peer servers in a goroutine
for _, pl := range e.Peers {
go func(l *peerListener) {
e.errHandler(l.serve())
}(pl)
}
return nil
}

func startClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) {
if err = cfg.ClientSelfCert(); err != nil {
plog.Fatalf("could not get certs (%v)", err)
Expand Down Expand Up @@ -412,7 +429,7 @@ func startClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) {
return sctxs, nil
}

func (e *Etcd) serve() (err error) {
func (e *Etcd) serveClients() (err error) {
if !e.cfg.ClientTLSInfo.Empty() {
plog.Infof("ClientTLS: %s", e.cfg.ClientTLSInfo)
}
Expand All @@ -421,13 +438,6 @@ func (e *Etcd) serve() (err error) {
plog.Infof("cors = %s", e.cfg.CorsInfo)
}

// Start the peer server in a goroutine
for _, pl := range e.Peers {
go func(l *peerListener) {
e.errHandler(l.serve())
}(pl)
}

// Start a client server goroutine for each listen address
var h http.Handler
if e.Config().EnableV2 {
Expand Down Expand Up @@ -458,12 +468,17 @@ func (e *Etcd) serve() (err error) {
Timeout: e.cfg.GRPCKeepAliveTimeout,
}))
}
for _, sctx := range e.sctxs {

// start client servers in a goroutine
for _, sctx := range e.clientServers {
go func(s *serveCtx) {
e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, h, e.errHandler, gopts...))
}(sctx)
}
return nil
}

func (e *Etcd) serveMetrics() (err error) {
if len(e.cfg.ListenMetricsUrls) > 0 {
metricsMux := http.NewServeMux()
etcdhttp.HandleMetricsHealth(metricsMux, e.Server)
Expand All @@ -484,7 +499,6 @@ func (e *Etcd) serve() (err error) {
}(murl, ml)
}
}

return nil
}

Expand Down

0 comments on commit e93131a

Please sign in to comment.