Skip to content

Commit

Permalink
Merge pull request #8267 from gyuho/close-server
Browse files Browse the repository at this point in the history
embed: wait up to request-timeout for pending RPCs when closing
  • Loading branch information
gyuho committed Jul 15, 2017
2 parents d283348 + 34fd848 commit 9ce7bb6
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 3 deletions.
24 changes: 21 additions & 3 deletions embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,11 +192,29 @@ func (e *Etcd) Config() Config {
func (e *Etcd) Close() {
e.closeOnce.Do(func() { close(e.stopc) })

// (gRPC server) stops accepting new connections,
// RPCs, and blocks until all pending RPCs are finished
timeout := 2 * time.Second
if e.Server != nil {
timeout = e.Server.Cfg.ReqTimeout()
}
for _, sctx := range e.sctxs {
for gs := range sctx.grpcServerC {
gs.GracefulStop()
ch := make(chan struct{})
go func() {
defer close(ch)
// close listeners to stop accepting new connections,
// will block on any existing transports
gs.GracefulStop()
}()
// wait until all pending RPCs are finished
select {
case <-ch:
case <-time.After(timeout):
// took too long, manually close open transports
// e.g. watch streams
gs.Stop()
// concurrent GracefulStop should be interrupted
<-ch
}
}
}

Expand Down
44 changes: 44 additions & 0 deletions integration/embed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@
package integration

import (
"context"
"fmt"
"net/url"
"os"
"path/filepath"
"strings"
"testing"
"time"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/embed"
)

Expand Down Expand Up @@ -102,6 +105,47 @@ func TestEmbedEtcd(t *testing.T) {
}
}

// TestEmbedEtcdGracefulStop ensures embedded server stops
// cutting existing transports.
func TestEmbedEtcdGracefulStop(t *testing.T) {
cfg := embed.NewConfig()

urls := newEmbedURLs(2)
setupEmbedCfg(cfg, []url.URL{urls[0]}, []url.URL{urls[1]})

cfg.Dir = filepath.Join(os.TempDir(), fmt.Sprintf("embed-etcd"))
os.RemoveAll(cfg.Dir)
defer os.RemoveAll(cfg.Dir)

e, err := embed.StartEtcd(cfg)
if err != nil {
t.Fatal(err)
}
<-e.Server.ReadyNotify() // wait for e.Server to join the cluster

cli, err := clientv3.New(clientv3.Config{Endpoints: []string{urls[0].String()}})
if err != nil {
t.Fatal(err)
}
defer cli.Close()

// open watch connection
cli.Watch(context.Background(), "foo")

donec := make(chan struct{})
go func() {
e.Close()
close(donec)
}()
select {
case err := <-e.Err():
t.Fatal(err)
case <-donec:
case <-time.After(2*time.Second + e.Server.Cfg.ReqTimeout()):
t.Fatalf("took too long to close server")
}
}

func newEmbedURLs(n int) (urls []url.URL) {
for i := 0; i < n; i++ {
u, _ := url.Parse(fmt.Sprintf("unix://localhost:%d%06d", os.Getpid(), i))
Expand Down

0 comments on commit 9ce7bb6

Please sign in to comment.