diff --git a/embed/etcd.go b/embed/etcd.go index a44ba5e2805..223a8aaeaa8 100644 --- a/embed/etcd.go +++ b/embed/etcd.go @@ -424,18 +424,20 @@ func (e *Etcd) Close() { } func stopServers(ctx context.Context, ss *servers) { - shutdownNow := func() { - // first, close the http.Server + // first, close the http.Server + if ss.http != nil { ss.http.Shutdown(ctx) - // then close grpc.Server; cancels all active RPCs - ss.grpc.Stop() + } + + if ss.grpc == nil { + return } // do not grpc.Server.GracefulStop with TLS enabled etcd server // See https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531 // and https://github.com/etcd-io/etcd/issues/8916 if ss.secure && ss.http != nil { - shutdownNow() + ss.grpc.Stop() return } @@ -453,7 +455,7 @@ func stopServers(ctx context.Context, ss *servers) { case <-ctx.Done(): // took too long, manually close open transports // e.g. watch streams - shutdownNow() + ss.grpc.Stop() // concurrent GracefulStop should be interrupted <-ch diff --git a/tests/e2e/cluster_test.go b/tests/e2e/cluster_test.go index 9186accd181..f13fc88c590 100644 --- a/tests/e2e/cluster_test.go +++ b/tests/e2e/cluster_test.go @@ -133,6 +133,8 @@ type etcdProcessClusterConfig struct { WatchProcessNotifyInterval time.Duration debug bool + + stopSignal os.Signal } // newEtcdProcessCluster launches a new cluster from etcd processes, returning @@ -151,12 +153,20 @@ func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, epc.Close() return nil, err } + epc.procs[i] = proc } if err := epc.Start(); err != nil { return nil, err } + + // overwrite the default signal + if cfg.stopSignal != nil { + for _, proc := range epc.procs { + proc.WithStopSignal(cfg.stopSignal) + } + } return epc, nil } diff --git a/tests/e2e/cmux_test.go b/tests/e2e/cmux_test.go index df9ee3ac114..8d394b31c66 100644 --- a/tests/e2e/cmux_test.go +++ b/tests/e2e/cmux_test.go @@ -21,6 +21,7 @@ import ( "context" "encoding/json" "fmt" + "syscall" "testing" "github.com/stretchr/testify/assert" @@ -63,10 +64,19 @@ func TestConnectionMultiplexing(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { ctx := context.Background() - cfg := etcdProcessClusterConfig{clusterSize: 1, clientTLS: tc.serverTLS, enableV2: true, clientHttpSeparate: tc.separateHttpPort} + cfg := etcdProcessClusterConfig{ + clusterSize: 1, + clientTLS: tc.serverTLS, + enableV2: true, + clientHttpSeparate: tc.separateHttpPort, + stopSignal: syscall.SIGTERM, // check graceful stop + } clus, err := newEtcdProcessCluster(&cfg) require.NoError(t, err) - defer clus.Close() + + defer func() { + require.NoError(t, clus.Close()) + }() var clientScenarios []clientConnType switch tc.serverTLS {