diff --git a/embed/etcd.go b/embed/etcd.go index 64fbfac702a5..5944b112c95d 100644 --- a/embed/etcd.go +++ b/embed/etcd.go @@ -189,11 +189,27 @@ func (e *Etcd) Config() Config { func (e *Etcd) Close() { e.closeOnce.Do(func() { close(e.stopc) }) - // (gRPC server) stops accepting new connections, - // RPCs, and blocks until all pending RPCs are finished + // 2s for RPC finish time + timeout := 2*time.Second + e.Server.Cfg.ElectionTimeout() for _, sctx := range e.sctxs { for gs := range sctx.grpcServerC { - gs.GracefulStop() + ch := make(chan struct{}) + go func() { + defer close(ch) + // close listeners to stop accepting new connections, + // will block on any existing transports + gs.GracefulStop() + }() + // wait until all pending RPCs are finished + select { + case <-ch: + case <-time.After(timeout): + // took too long, manually close open transports + // e.g. watch streams + gs.Stop() + // concurrent GracefulStop should be interrupted + <-ch + } } } diff --git a/etcdserver/config.go b/etcdserver/config.go index f6ed1f1bae7a..9e4a74bae237 100644 --- a/etcdserver/config.go +++ b/etcdserver/config.go @@ -175,7 +175,7 @@ func (c *ServerConfig) ReqTimeout() time.Duration { return 5*time.Second + 2*time.Duration(c.ElectionTicks)*time.Duration(c.TickMs)*time.Millisecond } -func (c *ServerConfig) electionTimeout() time.Duration { +func (c *ServerConfig) ElectionTimeout() time.Duration { return time.Duration(c.ElectionTicks) * time.Duration(c.TickMs) * time.Millisecond } diff --git a/etcdserver/server.go b/etcdserver/server.go index 151138953d22..f24a26316824 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -1313,7 +1313,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { // promote lessor when the local member is leader and finished // applying all entries from the last term. if s.isLeader() { - s.lessor.Promote(s.Cfg.electionTimeout()) + s.lessor.Promote(s.Cfg.ElectionTimeout()) } return }