Skip to content

Commit

Permalink
embed: serve basic v3 grpc over peer port
Browse files Browse the repository at this point in the history
  • Loading branch information
Anthony Romano committed Aug 18, 2017
1 parent 64e97e3 commit 136d387
Showing 1 changed file with 36 additions and 25 deletions.
61 changes: 36 additions & 25 deletions embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,17 @@ import (
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api/etcdhttp"
"github.com/coreos/etcd/etcdserver/api/v2http"
"github.com/coreos/etcd/etcdserver/api/v3rpc"
"github.com/coreos/etcd/pkg/cors"
"github.com/coreos/etcd/pkg/debugutil"
runtimeutil "github.com/coreos/etcd/pkg/runtime"
"github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/rafthttp"
"github.com/coreos/pkg/capnslog"

"github.com/cockroachdb/cmux"
"google.golang.org/grpc"
)

var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "embed")
Expand Down Expand Up @@ -155,18 +159,21 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
// configure peer handlers after rafthttp.Transport started
ph := etcdhttp.NewPeerHandler(e.Server)
for _, p := range e.Peers {
gs := v3rpc.Server(e.Server, nil)
m := cmux.New(p.Listener)
srv := &http.Server{
Handler: ph,
Handler: grpcHandlerFunc(gs, ph),
ReadTimeout: 5 * time.Minute,
ErrorLog: defaultLog.New(ioutil.Discard, "", 0), // do not log user error
}

l := p.Listener
p.serve = func() error { return srv.Serve(l) }
go srv.Serve(m.Match(cmux.HTTP1()))
go gs.Serve(m.Match(cmux.HTTP2()))
p.serve = func() error { return m.Serve() }
p.close = func(ctx context.Context) error {
// gracefully shutdown http.Server
// close open listeners, idle connections
// until context cancel or time-out
e.stopGRPCServer(gs)
return srv.Shutdown(ctx)
}
}
Expand All @@ -190,29 +197,9 @@ func (e *Etcd) Config() Config {
func (e *Etcd) Close() {
e.closeOnce.Do(func() { close(e.stopc) })

timeout := 2 * time.Second
if e.Server != nil {
timeout = e.Server.Cfg.ReqTimeout()
}
for _, sctx := range e.sctxs {
for gs := range sctx.grpcServerC {
ch := make(chan struct{})
go func() {
defer close(ch)
// close listeners to stop accepting new connections,
// will block on any existing transports
gs.GracefulStop()
}()
// wait until all pending RPCs are finished
select {
case <-ch:
case <-time.After(timeout):
// took too long, manually close open transports
// e.g. watch streams
gs.Stop()
// concurrent GracefulStop should be interrupted
<-ch
}
e.stopGRPCServer(gs)
}
}

Expand Down Expand Up @@ -243,6 +230,30 @@ func (e *Etcd) Close() {
}
}

func (e *Etcd) stopGRPCServer(gs *grpc.Server) {
timeout := 2 * time.Second
if e.Server != nil {
timeout = e.Server.Cfg.ReqTimeout()
}
ch := make(chan struct{})
go func() {
defer close(ch)
// close listeners to stop accepting new connections,
// will block on any existing transports
gs.GracefulStop()
}()
// wait until all pending RPCs are finished
select {
case <-ch:
case <-time.After(timeout):
// took too long, manually close open transports
// e.g. watch streams
gs.Stop()
// concurrent GracefulStop should be interrupted
<-ch
}
}

func (e *Etcd) Err() <-chan error { return e.errc }

func startPeerListeners(cfg *Config) (peers []*peerListener, err error) {
Expand Down

0 comments on commit 136d387

Please sign in to comment.