From 905865bd2b313c0da6f3381d19cf5f78ee7d17b8 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Fri, 14 Jul 2017 13:26:58 -0700 Subject: [PATCH] embed: wait up to election-timeout for pending RPCs when closing Both grpc.Server.Stop and grpc.Server.GracefulStop close the listeners first, to stop accepting new connections. GracefulStop blocks until all clients close their open transports (connections). Unary RPCs only take a few seconds to finish. Stream RPCs, like watch, might never close the connections from the client side, thus making the gRPC server wait forever. This patch still calls GracefulStop, but waits up to the election timeout plus 2s before manually closing the open transports. Addresses https://github.com/coreos/etcd/issues/8224. Signed-off-by: Gyu-Ho Lee --- embed/etcd.go | 22 +++++++++++++++++++--- etcdserver/config.go | 2 +- etcdserver/server.go | 2 +- 3 files changed, 21 insertions(+), 5 deletions(-) diff --git a/embed/etcd.go b/embed/etcd.go index 64fbfac702a5..5944b112c95d 100644 --- a/embed/etcd.go +++ b/embed/etcd.go @@ -189,11 +189,27 @@ func (e *Etcd) Config() Config { func (e *Etcd) Close() { e.closeOnce.Do(func() { close(e.stopc) }) - // (gRPC server) stops accepting new connections, - // RPCs, and blocks until all pending RPCs are finished + // 2s for RPC finish time + timeout := 2*time.Second + e.Server.Cfg.ElectionTimeout() for _, sctx := range e.sctxs { for gs := range sctx.grpcServerC { - gs.GracefulStop() + ch := make(chan struct{}) + go func() { + defer close(ch) + // close listeners to stop accepting new connections, + // will block on any existing transports + gs.GracefulStop() + }() + // wait until all pending RPCs are finished + select { + case <-ch: + case <-time.After(timeout): + // took too long, manually close open transports + // e.g.
watch streams + gs.Stop() + // concurrent GracefulStop should be interrupted + <-ch + } } } diff --git a/etcdserver/config.go b/etcdserver/config.go index f6ed1f1bae7a..9e4a74bae237 100644 --- a/etcdserver/config.go +++ b/etcdserver/config.go @@ -175,7 +175,7 @@ func (c *ServerConfig) ReqTimeout() time.Duration { return 5*time.Second + 2*time.Duration(c.ElectionTicks)*time.Duration(c.TickMs)*time.Millisecond } -func (c *ServerConfig) electionTimeout() time.Duration { +func (c *ServerConfig) ElectionTimeout() time.Duration { return time.Duration(c.ElectionTicks) * time.Duration(c.TickMs) * time.Millisecond } diff --git a/etcdserver/server.go b/etcdserver/server.go index 151138953d22..f24a26316824 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -1313,7 +1313,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { // promote lessor when the local member is leader and finished // applying all entries from the last term. if s.isLeader() { - s.lessor.Promote(s.Cfg.electionTimeout()) + s.lessor.Promote(s.Cfg.ElectionTimeout()) } return }