From a716a9fa44c3a8d19f81f15a8d33e3efcb1ee6a4 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Fri, 3 Mar 2023 02:09:00 +0800 Subject: [PATCH 1/7] etcdutil: add dial keep alive params to switch connect as soon as possible Signed-off-by: lhy1024 --- pkg/utils/etcdutil/etcdutil.go | 21 +++- pkg/utils/etcdutil/etcdutil_test.go | 188 ++++++++++++++++++++-------- 2 files changed, 148 insertions(+), 61 deletions(-) diff --git a/pkg/utils/etcdutil/etcdutil.go b/pkg/utils/etcdutil/etcdutil.go index b055f79e75c..ef987609f33 100644 --- a/pkg/utils/etcdutil/etcdutil.go +++ b/pkg/utils/etcdutil/etcdutil.go @@ -41,6 +41,13 @@ const ( // defaultAutoSyncInterval is the interval to sync etcd cluster. defaultAutoSyncInterval = 60 * time.Second + // defaultDialKeepAliveTime is the time after which client pings the server to see if transport is alive. + defaultDialKeepAliveTime = 10 * time.Second + + // defaultDialKeepAliveTimeout is the time that the client waits for a response for the + // keep-alive probe. If the response is not received in this time, the connection is closed. + defaultDialKeepAliveTimeout = 3 * time.Second + // DefaultDialTimeout is the maximum amount of time a dial will wait for a // connection to setup. 30s is long enough for most of the network conditions. DefaultDialTimeout = 30 * time.Second @@ -216,13 +223,15 @@ func createEtcdClient(tlsConfig *tls.Config, acUrls []url.URL) (*clientv3.Client autoSyncInterval = 10 * time.Millisecond }) client, err := clientv3.New(clientv3.Config{ - Endpoints: endpoints, - DialTimeout: defaultEtcdClientTimeout, - AutoSyncInterval: autoSyncInterval, - TLS: tlsConfig, - LogConfig: &lgc, + Endpoints: endpoints, + DialTimeout: defaultEtcdClientTimeout, + AutoSyncInterval: autoSyncInterval, + TLS: tlsConfig, + LogConfig: &lgc, + DialKeepAliveTime: defaultDialKeepAliveTime, + DialKeepAliveTimeout: defaultDialKeepAliveTimeout, }) - if err != nil { + if err == nil { log.Info("create etcd v3 client", zap.Strings("endpoints", endpoints)) } return client, err diff --git a/pkg/utils/etcdutil/etcdutil_test.go b/pkg/utils/etcdutil/etcdutil_test.go index 79348e02eaf..6e4976698c2 100644 --- a/pkg/utils/etcdutil/etcdutil_test.go +++ b/pkg/utils/etcdutil/etcdutil_test.go @@ -18,13 +18,19 @@ import ( "context" "crypto/tls" "fmt" + "io" + "net" + "strings" + "sync/atomic" "testing" "time" "github.com/pingcap/failpoint" "github.com/stretchr/testify/require" + "github.com/tikv/pd/pkg/utils/tempurl" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/embed" + "go.etcd.io/etcd/etcdserver/etcdserverpb" "go.etcd.io/etcd/pkg/types" ) @@ -54,43 +60,15 @@ func TestMemberHelpers(t *testing.T) { re.Equal(uint64(etcd1.Server.ID()), listResp1.Members[0].ID) // Test AddEtcdMember - // Make a new etcd config. - cfg2 := NewTestSingleConfig(t) - cfg2.Name = "etcd2" - cfg2.InitialCluster = cfg1.InitialCluster + fmt.Sprintf(",%s=%s", cfg2.Name, &cfg2.LPUrls[0]) - cfg2.ClusterState = embed.ClusterStateFlagExisting - - // Add it to the cluster above. - peerURL := cfg2.LPUrls[0].String() - addResp, err := AddEtcdMember(client1, []string{peerURL}) - re.NoError(err) - - etcd2, err := embed.StartEtcd(cfg2) - defer func() { - etcd2.Close() - }() - re.NoError(err) - re.Equal(uint64(etcd2.Server.ID()), addResp.Member.ID) - + etcd2 := checkAddEtcdMember(t, cfg1, client1) + cfg2 := etcd2.Config() + defer etcd2.Close() ep2 := cfg2.LCUrls[0].String() client2, err := clientv3.New(clientv3.Config{ Endpoints: []string{ep2}, }) re.NoError(err) - - <-etcd2.Server.ReadyNotify() - - listResp2, err := ListEtcdMembers(client2) - re.NoError(err) - re.Len(listResp2.Members, 2) - for _, m := range listResp2.Members { - switch m.ID { - case uint64(etcd1.Server.ID()): - case uint64(etcd2.Server.ID()): - default: - t.Fatalf("unknown member: %v", m) - } - } + checkMembers(re, client2, []*embed.Etcd{etcd1, etcd2}) // Test CheckClusterID urlsMap, err := types.NewURLsMap(cfg2.InitialCluster) @@ -252,45 +230,145 @@ func TestEtcdClientSync(t *testing.T) { <-etcd1.Server.ReadyNotify() // Add a new member. + etcd2 := checkAddEtcdMember(t, cfg1, client1) + defer etcd2.Close() + checkMembers(re, client1, []*embed.Etcd{etcd1, etcd2}) + + // Remove the first member and close the etcd1. + _, err = RemoveEtcdMember(client1, uint64(etcd1.Server.ID())) + re.NoError(err) + time.Sleep(20 * time.Millisecond) // wait for etcd client sync endpoints and client will be connected to etcd2 + etcd1.Close() + + // Check the client can get the new member with the new endpoints. + listResp3, err := ListEtcdMembers(client1) + re.NoError(err) + re.Len(listResp3.Members, 1) + re.Equal(uint64(etcd2.Server.ID()), listResp3.Members[0].ID) + + require.NoError(t, failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval")) +} + +func TestEtcdWithHangLeader(t *testing.T) { + t.Parallel() + re := require.New(t) + // Start a etcd server. + cfg1 := NewTestSingleConfig(t) + etcd1, err := embed.StartEtcd(cfg1) + re.NoError(err) + ep1 := cfg1.LCUrls[0].String() + <-etcd1.Server.ReadyNotify() + + // Create a proxy to etcd1. + proxyAddr := tempurl.Alloc() + var enableDiscard atomic.Bool + go proxyWithDiscard(re, ep1, proxyAddr, &enableDiscard) + + // Create a etcd client with etcd1 as endpoint. + urls, err := types.NewURLs([]string{proxyAddr}) + re.NoError(err) + client1, err := createEtcdClient(nil, urls) + re.NoError(err) + + // Add a new member and set the client endpoints to etcd1 and etcd2. + etcd2 := checkAddEtcdMember(t, cfg1, client1) + defer etcd2.Close() + checkMembers(re, client1, []*embed.Etcd{etcd1, etcd2}) + etcd2Addr := etcd2.Config().LCUrls[0].String() + client1.SetEndpoints(proxyAddr, etcd2Addr) + + // Hang the etcd1 and wait for the client to connect to etcd2. + enableDiscard.Store(true) + time.Sleep(defaultDialKeepAliveTime + defaultDialKeepAliveTimeout*2) + _, err = EtcdKVGet(client1, "test/key1") + re.NoError(err) +} + +func checkAddEtcdMember(t *testing.T, cfg1 *embed.Config, client *clientv3.Client) *embed.Etcd { + re := require.New(t) cfg2 := NewTestSingleConfig(t) cfg2.Name = "etcd2" cfg2.InitialCluster = cfg1.InitialCluster + fmt.Sprintf(",%s=%s", cfg2.Name, &cfg2.LPUrls[0]) cfg2.ClusterState = embed.ClusterStateFlagExisting peerURL := cfg2.LPUrls[0].String() - addResp, err := AddEtcdMember(client1, []string{peerURL}) + addResp, err := AddEtcdMember(client, []string{peerURL}) re.NoError(err) etcd2, err := embed.StartEtcd(cfg2) - defer func() { - etcd2.Close() - }() re.NoError(err) re.Equal(uint64(etcd2.Server.ID()), addResp.Member.ID) <-etcd2.Server.ReadyNotify() + return etcd2 +} +func checkMembers(re *require.Assertions, client *clientv3.Client, etcds []*embed.Etcd) { // Check the client can get the new member. - listResp2, err := ListEtcdMembers(client1) - re.NoError(err) - re.Len(listResp2.Members, 2) - for _, m := range listResp2.Members { - switch m.ID { - case uint64(etcd1.Server.ID()): - case uint64(etcd2.Server.ID()): - default: - t.Fatalf("unknown member: %v", m) + listResp, err := ListEtcdMembers(client) + re.NoError(err) + re.Len(listResp.Members, len(etcds)) + inList := func(m *etcdserverpb.Member) bool { + for _, etcd := range etcds { + if m.ID == uint64(etcd.Server.ID()) { + return true + } } + return false } + for _, m := range listResp.Members { + re.True(inList(m)) + } +} - // Remove the first member and close the etcd1. - _, err = RemoveEtcdMember(client1, uint64(etcd1.Server.ID())) +func proxyWithDiscard(re *require.Assertions, server, proxy string, enableDiscard *atomic.Bool) { + server = strings.TrimPrefix(server, "http://") + proxy = strings.TrimPrefix(proxy, "http://") + l, err := net.Listen("tcp", proxy) re.NoError(err) - time.Sleep(20 * time.Millisecond) // wait for etcd client sync endpoints and client will be connected to etcd2 - etcd1.Close() + for { + connect, err := l.Accept() + re.NoError(err) + go func(connect net.Conn) { + serverConnect, err := net.Dial("tcp", server) + re.NoError(err) + pipe(connect, serverConnect, enableDiscard) + }(connect) + } +} - // Check the client can get the new member with the new endpoints. - listResp3, err := ListEtcdMembers(client1) - re.NoError(err) - re.Len(listResp3.Members, 1) - re.Equal(uint64(etcd2.Server.ID()), listResp3.Members[0].ID) +func pipe(src net.Conn, dst net.Conn, enableDiscard *atomic.Bool) { + errChan := make(chan error, 1) + go func() { + err := ioCopy(src, dst, enableDiscard) + errChan <- err + }() + go func() { + err := ioCopy(dst, src, enableDiscard) + errChan <- err + }() + <-errChan + dst.Close() + src.Close() +} - require.NoError(t, failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval")) +func ioCopy(dst io.Writer, src io.Reader, enableDiscard *atomic.Bool) (err error) { + buffer := make([]byte, 32*1024) + for { + if enableDiscard.Load() { + io.Copy(io.Discard, src) + } + readNum, errRead := src.Read(buffer) + if readNum > 0 { + writeNum, errWrite := dst.Write(buffer[0:readNum]) + if errWrite != nil { + return errWrite + } + if readNum != writeNum { + return io.ErrShortWrite + } + } + if errRead != nil { + err = errRead + break + } + } + return err } From 7924ed71a47c59bf4c3475dea59514a2946f6757 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Fri, 3 Mar 2023 10:20:53 +0800 Subject: [PATCH 2/7] address comments Signed-off-by: lhy1024 --- pkg/utils/etcdutil/etcdutil_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/utils/etcdutil/etcdutil_test.go b/pkg/utils/etcdutil/etcdutil_test.go index 6e4976698c2..e9ff2b13a06 100644 --- a/pkg/utils/etcdutil/etcdutil_test.go +++ b/pkg/utils/etcdutil/etcdutil_test.go @@ -357,7 +357,7 @@ func ioCopy(dst io.Writer, src io.Reader, enableDiscard *atomic.Bool) (err error } readNum, errRead := src.Read(buffer) if readNum > 0 { - writeNum, errWrite := dst.Write(buffer[0:readNum]) + writeNum, errWrite := dst.Write(buffer[:readNum]) if errWrite != nil { return errWrite } From f782325a4c241de507f1a8fe613e8be4ba574b64 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Fri, 3 Mar 2023 10:48:53 +0800 Subject: [PATCH 3/7] address comments Signed-off-by: lhy1024 --- pkg/utils/etcdutil/etcdutil_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/utils/etcdutil/etcdutil_test.go b/pkg/utils/etcdutil/etcdutil_test.go index e9ff2b13a06..28931918ab4 100644 --- a/pkg/utils/etcdutil/etcdutil_test.go +++ b/pkg/utils/etcdutil/etcdutil_test.go @@ -365,7 +365,7 @@ func ioCopy(dst io.Writer, src io.Reader, enableDiscard *atomic.Bool) (err error return io.ErrShortWrite } } - if errRead != nil { + if errRead != nil && errRead != io.EOF { err = errRead break } From 0e5e7a603c5263cd067534ad2c537e4ce4a90f16 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Fri, 3 Mar 2023 11:12:36 +0800 Subject: [PATCH 4/7] direct return Signed-off-by: lhy1024 --- pkg/utils/etcdutil/etcdutil_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/utils/etcdutil/etcdutil_test.go b/pkg/utils/etcdutil/etcdutil_test.go index 28931918ab4..e9ff2b13a06 100644 --- a/pkg/utils/etcdutil/etcdutil_test.go +++ b/pkg/utils/etcdutil/etcdutil_test.go @@ -365,7 +365,7 @@ func ioCopy(dst io.Writer, src io.Reader, enableDiscard *atomic.Bool) (err error return io.ErrShortWrite } } - if errRead != nil && errRead != io.EOF { + if errRead != nil { err = errRead break } From d2f41d14ecc1cf19453c475473e376b02fa9e056 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 6 Mar 2023 20:56:03 +0800 Subject: [PATCH 5/7] add rand Signed-off-by: lhy1024 --- pkg/storage/kv/kv_test.go | 27 ++------------------------- pkg/utils/etcdutil/etcdutil.go | 11 +++++++++-- pkg/utils/etcdutil/etcdutil_test.go | 25 ++++++++++++++++++------- pkg/utils/etcdutil/testutil.go | 4 +++- 4 files changed, 32 insertions(+), 35 deletions(-) diff --git a/pkg/storage/kv/kv_test.go b/pkg/storage/kv/kv_test.go index 9474eb23f4f..d2db558a748 100644 --- a/pkg/storage/kv/kv_test.go +++ b/pkg/storage/kv/kv_test.go @@ -16,22 +16,20 @@ package kv import ( "context" - "fmt" - "net/url" "path" "sort" "strconv" "testing" "github.com/stretchr/testify/require" - "github.com/tikv/pd/pkg/utils/tempurl" + "github.com/tikv/pd/pkg/utils/etcdutil" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/embed" ) func TestEtcd(t *testing.T) { re := require.New(t) - cfg := newTestSingleConfig(t) + cfg := etcdutil.NewTestSingleConfig(t) etcd, err := embed.StartEtcd(cfg) re.NoError(err) defer etcd.Close() @@ -122,27 +120,6 @@ func testRange(re *require.Assertions, kv Base) { } } -func newTestSingleConfig(t *testing.T) *embed.Config { - cfg := embed.NewConfig() - cfg.Name = "test_etcd" - cfg.Dir = t.TempDir() - cfg.WalDir = "" - cfg.Logger = "zap" - cfg.LogOutputs = []string{"stdout"} - - pu, _ := url.Parse(tempurl.Alloc()) - cfg.LPUrls = []url.URL{*pu} - cfg.APUrls = cfg.LPUrls - cu, _ := url.Parse(tempurl.Alloc()) - cfg.LCUrls = []url.URL{*cu} - cfg.ACUrls = cfg.LCUrls - - cfg.StrictReconfigCheck = false - cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, &cfg.LPUrls[0]) - cfg.ClusterState = embed.ClusterStateFlagNew - return cfg -} - func testSaveMultiple(re *require.Assertions, kv Base, count int) { err := kv.RunInTxn(context.Background(), func(txn Txn) error { var saveErr error diff --git a/pkg/utils/etcdutil/etcdutil.go b/pkg/utils/etcdutil/etcdutil.go index ef987609f33..d94205f3051 100644 --- a/pkg/utils/etcdutil/etcdutil.go +++ b/pkg/utils/etcdutil/etcdutil.go @@ -219,17 +219,24 @@ func createEtcdClient(tlsConfig *tls.Config, acUrls []url.URL) (*clientv3.Client lgc := zap.NewProductionConfig() lgc.Encoding = log.ZapEncodingName autoSyncInterval := defaultAutoSyncInterval + dialKeepAliveTime := defaultDialKeepAliveTime + dialKeepAliveTimeout := defaultDialKeepAliveTimeout failpoint.Inject("autoSyncInterval", func() { autoSyncInterval = 10 * time.Millisecond }) + failpoint.Inject("closeKeepAliveCheck", func() { + autoSyncInterval = 0 + dialKeepAliveTime = 0 + dialKeepAliveTimeout = 0 + }) client, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: defaultEtcdClientTimeout, AutoSyncInterval: autoSyncInterval, TLS: tlsConfig, LogConfig: &lgc, - DialKeepAliveTime: defaultDialKeepAliveTime, - DialKeepAliveTimeout: defaultDialKeepAliveTimeout, + DialKeepAliveTime: dialKeepAliveTime, + DialKeepAliveTimeout: dialKeepAliveTimeout, }) if err == nil { log.Info("create etcd v3 client", zap.Strings("endpoints", endpoints)) diff --git a/pkg/utils/etcdutil/etcdutil_test.go b/pkg/utils/etcdutil/etcdutil_test.go index e9ff2b13a06..276b1686498 100644 --- a/pkg/utils/etcdutil/etcdutil_test.go +++ b/pkg/utils/etcdutil/etcdutil_test.go @@ -20,6 +20,7 @@ import ( "fmt" "io" "net" + "strconv" "strings" "sync/atomic" "testing" @@ -212,7 +213,6 @@ func TestInitClusterID(t *testing.T) { } func TestEtcdClientSync(t *testing.T) { - t.Parallel() re := require.New(t) re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval", "return(true)")) @@ -249,8 +249,20 @@ func TestEtcdClientSync(t *testing.T) { require.NoError(t, failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval")) } -func TestEtcdWithHangLeader(t *testing.T) { - t.Parallel() +func TestEtcdWithHangLeaderEnableCheck(t *testing.T) { + re := require.New(t) + var err error + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval", "return(true)")) + err = checkEtcdWithHangLeader(t) + re.NoError(err) + require.NoError(t, failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval")) + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/closeKeepAliveCheck", "return(true)")) + err = checkEtcdWithHangLeader(t) + re.Error(err) + require.NoError(t, failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/closeKeepAliveCheck")) +} + +func checkEtcdWithHangLeader(t *testing.T) error { re := require.New(t) // Start a etcd server. cfg1 := NewTestSingleConfig(t) @@ -274,20 +286,19 @@ func TestEtcdWithHangLeader(t *testing.T) { etcd2 := checkAddEtcdMember(t, cfg1, client1) defer etcd2.Close() checkMembers(re, client1, []*embed.Etcd{etcd1, etcd2}) - etcd2Addr := etcd2.Config().LCUrls[0].String() - client1.SetEndpoints(proxyAddr, etcd2Addr) + time.Sleep(1 * time.Second) // wait for etcd client sync endpoints // Hang the etcd1 and wait for the client to connect to etcd2. enableDiscard.Store(true) time.Sleep(defaultDialKeepAliveTime + defaultDialKeepAliveTimeout*2) _, err = EtcdKVGet(client1, "test/key1") - re.NoError(err) + return err } func checkAddEtcdMember(t *testing.T, cfg1 *embed.Config, client *clientv3.Client) *embed.Etcd { re := require.New(t) cfg2 := NewTestSingleConfig(t) - cfg2.Name = "etcd2" + cfg2.Name = "test_etcd_" + strconv.FormatInt(time.Now().UnixNano()%1000, 10) cfg2.InitialCluster = cfg1.InitialCluster + fmt.Sprintf(",%s=%s", cfg2.Name, &cfg2.LPUrls[0]) cfg2.ClusterState = embed.ClusterStateFlagExisting peerURL := cfg2.LPUrls[0].String() diff --git a/pkg/utils/etcdutil/testutil.go b/pkg/utils/etcdutil/testutil.go index a29a64881c9..d584c6f38a4 100644 --- a/pkg/utils/etcdutil/testutil.go +++ b/pkg/utils/etcdutil/testutil.go @@ -17,7 +17,9 @@ package etcdutil import ( "fmt" "net/url" + "strconv" "testing" + "time" "github.com/tikv/pd/pkg/utils/tempurl" "go.etcd.io/etcd/embed" @@ -26,7 +28,7 @@ import ( // NewTestSingleConfig is used to create a etcd config for the unit test purpose. func NewTestSingleConfig(t *testing.T) *embed.Config { cfg := embed.NewConfig() - cfg.Name = "test_etcd" + cfg.Name = "test_etcd_" + strconv.FormatInt(time.Now().UnixNano()%1000, 10) cfg.Dir = t.TempDir() cfg.WalDir = "" cfg.Logger = "zap" From 8c89c58c4e64e66a61d0f65d54e99db0ba241aa9 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 6 Mar 2023 21:01:14 +0800 Subject: [PATCH 6/7] remove parallel Signed-off-by: lhy1024 --- pkg/utils/etcdutil/etcdutil_test.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pkg/utils/etcdutil/etcdutil_test.go b/pkg/utils/etcdutil/etcdutil_test.go index 276b1686498..4cc680bb6be 100644 --- a/pkg/utils/etcdutil/etcdutil_test.go +++ b/pkg/utils/etcdutil/etcdutil_test.go @@ -36,7 +36,6 @@ import ( ) func TestMemberHelpers(t *testing.T) { - t.Parallel() re := require.New(t) cfg1 := NewTestSingleConfig(t) etcd1, err := embed.StartEtcd(cfg1) @@ -88,7 +87,6 @@ func TestMemberHelpers(t *testing.T) { } func TestEtcdKVGet(t *testing.T) { - t.Parallel() re := require.New(t) cfg := NewTestSingleConfig(t) etcd, err := embed.StartEtcd(cfg) @@ -139,7 +137,6 @@ func TestEtcdKVGet(t *testing.T) { } func TestEtcdKVPutWithTTL(t *testing.T) { - t.Parallel() re := require.New(t) cfg := NewTestSingleConfig(t) etcd, err := embed.StartEtcd(cfg) @@ -180,7 +177,6 @@ func TestEtcdKVPutWithTTL(t *testing.T) { } func TestInitClusterID(t *testing.T) { - t.Parallel() re := require.New(t) cfg := NewTestSingleConfig(t) etcd, err := embed.StartEtcd(cfg) From 975a400d2a82d6f3d0096c1a212ba8c43a857130 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 6 Mar 2023 22:04:35 +0800 Subject: [PATCH 7/7] refactor Signed-off-by: lhy1024 --- pkg/utils/etcdutil/etcdutil_test.go | 6 ++++-- pkg/utils/etcdutil/testutil.go | 6 +++++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/pkg/utils/etcdutil/etcdutil_test.go b/pkg/utils/etcdutil/etcdutil_test.go index 4cc680bb6be..06ac61e264e 100644 --- a/pkg/utils/etcdutil/etcdutil_test.go +++ b/pkg/utils/etcdutil/etcdutil_test.go @@ -20,7 +20,6 @@ import ( "fmt" "io" "net" - "strconv" "strings" "sync/atomic" "testing" @@ -248,10 +247,13 @@ func TestEtcdClientSync(t *testing.T) { func TestEtcdWithHangLeaderEnableCheck(t *testing.T) { re := require.New(t) var err error + // Test with enable check. re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval", "return(true)")) err = checkEtcdWithHangLeader(t) re.NoError(err) require.NoError(t, failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval")) + + // Test with disable check. re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/closeKeepAliveCheck", "return(true)")) err = checkEtcdWithHangLeader(t) re.Error(err) @@ -294,7 +296,7 @@ func checkEtcdWithHangLeader(t *testing.T) error { func checkAddEtcdMember(t *testing.T, cfg1 *embed.Config, client *clientv3.Client) *embed.Etcd { re := require.New(t) cfg2 := NewTestSingleConfig(t) - cfg2.Name = "test_etcd_" + strconv.FormatInt(time.Now().UnixNano()%1000, 10) + cfg2.Name = genRandName() cfg2.InitialCluster = cfg1.InitialCluster + fmt.Sprintf(",%s=%s", cfg2.Name, &cfg2.LPUrls[0]) cfg2.ClusterState = embed.ClusterStateFlagExisting peerURL := cfg2.LPUrls[0].String() diff --git a/pkg/utils/etcdutil/testutil.go b/pkg/utils/etcdutil/testutil.go index d584c6f38a4..971e93e1ed6 100644 --- a/pkg/utils/etcdutil/testutil.go +++ b/pkg/utils/etcdutil/testutil.go @@ -28,7 +28,7 @@ import ( // NewTestSingleConfig is used to create a etcd config for the unit test purpose. func NewTestSingleConfig(t *testing.T) *embed.Config { cfg := embed.NewConfig() - cfg.Name = "test_etcd_" + strconv.FormatInt(time.Now().UnixNano()%1000, 10) + cfg.Name = genRandName() cfg.Dir = t.TempDir() cfg.WalDir = "" cfg.Logger = "zap" @@ -46,3 +46,7 @@ func NewTestSingleConfig(t *testing.T) *embed.Config { cfg.ClusterState = embed.ClusterStateFlagNew return cfg } + +func genRandName() string { + return "test_etcd_" + strconv.FormatInt(time.Now().UnixNano()%10000, 10) +}