Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

etcdutil: add dial keep alive params to switch connect as soon as possible #6059

Merged
merged 8 commits into from
Mar 7, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 2 additions & 25 deletions pkg/storage/kv/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,20 @@ package kv

import (
"context"
"fmt"
"net/url"
"path"
"sort"
"strconv"
"testing"

"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/utils/tempurl"
"github.com/tikv/pd/pkg/utils/etcdutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
)

func TestEtcd(t *testing.T) {
re := require.New(t)
cfg := newTestSingleConfig(t)
cfg := etcdutil.NewTestSingleConfig(t)
etcd, err := embed.StartEtcd(cfg)
re.NoError(err)
defer etcd.Close()
Expand Down Expand Up @@ -122,27 +120,6 @@ func testRange(re *require.Assertions, kv Base) {
}
}

// newTestSingleConfig builds an embed.Config for a single-member etcd cluster
// used by tests. The data directory is t.TempDir(), logs go to stdout, and the
// peer and client URLs are taken from tempurl.Alloc. The advertised URLs are
// the same slices as the listen URLs.
func newTestSingleConfig(t *testing.T) *embed.Config {
	t.Helper()

	// allocURL parses a freshly allocated temp URL. A parse failure aborts the
	// test with a clear message instead of dereferencing a nil *url.URL.
	allocURL := func() url.URL {
		addr := tempurl.Alloc()
		u, err := url.Parse(addr)
		if err != nil {
			t.Fatalf("parse temp url %q: %v", addr, err)
		}
		return *u
	}

	cfg := embed.NewConfig()
	cfg.Name = "test_etcd"
	cfg.Dir = t.TempDir()
	cfg.WalDir = ""
	cfg.Logger = "zap"
	cfg.LogOutputs = []string{"stdout"}

	// Peer URL is allocated first, then the client URL.
	cfg.LPUrls = []url.URL{allocURL()}
	cfg.APUrls = cfg.LPUrls
	cfg.LCUrls = []url.URL{allocURL()}
	cfg.ACUrls = cfg.LCUrls

	cfg.StrictReconfigCheck = false
	// The initial cluster consists of this single member only.
	cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, &cfg.LPUrls[0])
	cfg.ClusterState = embed.ClusterStateFlagNew
	return cfg
}

func testSaveMultiple(re *require.Assertions, kv Base, count int) {
err := kv.RunInTxn(context.Background(), func(txn Txn) error {
var saveErr error
Expand Down
28 changes: 22 additions & 6 deletions pkg/utils/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ const (
// defaultAutoSyncInterval is the interval to sync etcd cluster.
defaultAutoSyncInterval = 60 * time.Second

// defaultDialKeepAliveTime is the time after which client pings the server to see if transport is alive.
defaultDialKeepAliveTime = 10 * time.Second
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the default value if we don't set them?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

zero


// defaultDialKeepAliveTimeout is the time that the client waits for a response for the
// keep-alive probe. If the response is not received in this time, the connection is closed.
defaultDialKeepAliveTimeout = 3 * time.Second

// DefaultDialTimeout is the maximum amount of time a dial will wait for a
// connection to setup. 30s is long enough for most of the network conditions.
DefaultDialTimeout = 30 * time.Second
Expand Down Expand Up @@ -212,17 +219,26 @@ func createEtcdClient(tlsConfig *tls.Config, acUrls []url.URL) (*clientv3.Client
lgc := zap.NewProductionConfig()
lgc.Encoding = log.ZapEncodingName
autoSyncInterval := defaultAutoSyncInterval
dialKeepAliveTime := defaultDialKeepAliveTime
dialKeepAliveTimeout := defaultDialKeepAliveTimeout
failpoint.Inject("autoSyncInterval", func() {
autoSyncInterval = 10 * time.Millisecond
})
failpoint.Inject("closeKeepAliveCheck", func() {
autoSyncInterval = 0
dialKeepAliveTime = 0
dialKeepAliveTimeout = 0
})
client, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: defaultEtcdClientTimeout,
AutoSyncInterval: autoSyncInterval,
TLS: tlsConfig,
LogConfig: &lgc,
Endpoints: endpoints,
DialTimeout: defaultEtcdClientTimeout,
AutoSyncInterval: autoSyncInterval,
TLS: tlsConfig,
LogConfig: &lgc,
DialKeepAliveTime: dialKeepAliveTime,
DialKeepAliveTimeout: dialKeepAliveTimeout,
})
if err != nil {
if err == nil {
log.Info("create etcd v3 client", zap.Strings("endpoints", endpoints))
}
return client, err
Expand Down
209 changes: 148 additions & 61 deletions pkg/utils/etcdutil/etcdutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,23 @@ import (
"context"
"crypto/tls"
"fmt"
"io"
"net"
"strings"
"sync/atomic"
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/utils/tempurl"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
"go.etcd.io/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/pkg/types"
)

func TestMemberHelpers(t *testing.T) {
t.Parallel()
re := require.New(t)
cfg1 := NewTestSingleConfig(t)
etcd1, err := embed.StartEtcd(cfg1)
Expand All @@ -54,43 +59,15 @@ func TestMemberHelpers(t *testing.T) {
re.Equal(uint64(etcd1.Server.ID()), listResp1.Members[0].ID)

// Test AddEtcdMember
// Make a new etcd config.
cfg2 := NewTestSingleConfig(t)
cfg2.Name = "etcd2"
cfg2.InitialCluster = cfg1.InitialCluster + fmt.Sprintf(",%s=%s", cfg2.Name, &cfg2.LPUrls[0])
cfg2.ClusterState = embed.ClusterStateFlagExisting

// Add it to the cluster above.
peerURL := cfg2.LPUrls[0].String()
addResp, err := AddEtcdMember(client1, []string{peerURL})
re.NoError(err)

etcd2, err := embed.StartEtcd(cfg2)
defer func() {
etcd2.Close()
}()
re.NoError(err)
re.Equal(uint64(etcd2.Server.ID()), addResp.Member.ID)

etcd2 := checkAddEtcdMember(t, cfg1, client1)
cfg2 := etcd2.Config()
defer etcd2.Close()
ep2 := cfg2.LCUrls[0].String()
client2, err := clientv3.New(clientv3.Config{
Endpoints: []string{ep2},
})
re.NoError(err)

<-etcd2.Server.ReadyNotify()

listResp2, err := ListEtcdMembers(client2)
re.NoError(err)
re.Len(listResp2.Members, 2)
for _, m := range listResp2.Members {
switch m.ID {
case uint64(etcd1.Server.ID()):
case uint64(etcd2.Server.ID()):
default:
t.Fatalf("unknown member: %v", m)
}
}
checkMembers(re, client2, []*embed.Etcd{etcd1, etcd2})

// Test CheckClusterID
urlsMap, err := types.NewURLsMap(cfg2.InitialCluster)
Expand All @@ -109,7 +86,6 @@ func TestMemberHelpers(t *testing.T) {
}

func TestEtcdKVGet(t *testing.T) {
t.Parallel()
re := require.New(t)
cfg := NewTestSingleConfig(t)
etcd, err := embed.StartEtcd(cfg)
Expand Down Expand Up @@ -160,7 +136,6 @@ func TestEtcdKVGet(t *testing.T) {
}

func TestEtcdKVPutWithTTL(t *testing.T) {
t.Parallel()
re := require.New(t)
cfg := NewTestSingleConfig(t)
etcd, err := embed.StartEtcd(cfg)
Expand Down Expand Up @@ -201,7 +176,6 @@ func TestEtcdKVPutWithTTL(t *testing.T) {
}

func TestInitClusterID(t *testing.T) {
t.Parallel()
re := require.New(t)
cfg := NewTestSingleConfig(t)
etcd, err := embed.StartEtcd(cfg)
Expand Down Expand Up @@ -234,7 +208,6 @@ func TestInitClusterID(t *testing.T) {
}

func TestEtcdClientSync(t *testing.T) {
t.Parallel()
re := require.New(t)
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval", "return(true)"))

Expand All @@ -252,45 +225,159 @@ func TestEtcdClientSync(t *testing.T) {
<-etcd1.Server.ReadyNotify()

// Add a new member.
etcd2 := checkAddEtcdMember(t, cfg1, client1)
defer etcd2.Close()
checkMembers(re, client1, []*embed.Etcd{etcd1, etcd2})

// Remove the first member and close the etcd1.
_, err = RemoveEtcdMember(client1, uint64(etcd1.Server.ID()))
re.NoError(err)
time.Sleep(20 * time.Millisecond) // wait for etcd client sync endpoints and client will be connected to etcd2
etcd1.Close()

// Check the client can get the new member with the new endpoints.
listResp3, err := ListEtcdMembers(client1)
re.NoError(err)
re.Len(listResp3.Members, 1)
re.Equal(uint64(etcd2.Server.ID()), listResp3.Members[0].ID)

require.NoError(t, failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval"))
}

// TestEtcdWithHangLeaderEnableCheck runs checkEtcdWithHangLeader twice: once
// with only the autoSyncInterval failpoint enabled, where the final request is
// expected to succeed, and once with the closeKeepAliveCheck failpoint enabled,
// where the final request is expected to fail.
func TestEtcdWithHangLeaderEnableCheck(t *testing.T) {
	re := require.New(t)

	// runWith enables the given failpoint for exactly one run of the scenario.
	// Disabling is deferred so that it also happens when an assertion inside
	// the scenario aborts the test; otherwise the failpoint would stay enabled
	// for every test that runs afterwards in this package.
	runWith := func(failpointPath string) error {
		re.NoError(failpoint.Enable(failpointPath, "return(true)"))
		defer func() {
			re.NoError(failpoint.Disable(failpointPath))
		}()
		return checkEtcdWithHangLeader(t)
	}

	// Test with enable check.
	re.NoError(runWith("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval"))

	// Test with disable check.
	re.Error(runWith("github.com/tikv/pd/pkg/utils/etcdutil/closeKeepAliveCheck"))
}

// checkEtcdWithHangLeader starts an etcd member (etcd1) behind a local TCP
// proxy, creates a client that only knows the proxy address, adds a second
// member (etcd2), and then makes the proxy discard all traffic. After waiting
// defaultDialKeepAliveTime + 2*defaultDialKeepAliveTimeout it issues a Get
// through the client and returns that request's error, so the caller can
// decide whether success or failure is expected.
//
// Setup failures are reported through require assertions on t. Both etcd
// servers and the client are closed when the function returns.
func checkEtcdWithHangLeader(t *testing.T) error {
	re := require.New(t)
	// Start an etcd server.
	cfg1 := NewTestSingleConfig(t)
	etcd1, err := embed.StartEtcd(cfg1)
	re.NoError(err)
	// This function runs more than once per test; without this the first
	// server would stay alive for the remainder of the test binary.
	defer etcd1.Close()
	ep1 := cfg1.LCUrls[0].String()
	<-etcd1.Server.ReadyNotify()

	// Create a proxy to etcd1.
	// NOTE(review): the proxy goroutine has no stop signal and keeps its
	// listener open after this function returns — confirm this is acceptable.
	proxyAddr := tempurl.Alloc()
	var enableDiscard atomic.Bool
	go proxyWithDiscard(re, ep1, proxyAddr, &enableDiscard)

	// Create an etcd client with the proxy (and therefore etcd1) as its endpoint.
	urls, err := types.NewURLs([]string{proxyAddr})
	re.NoError(err)
	client1, err := createEtcdClient(nil, urls)
	re.NoError(err)
	// Release the client's connections and background goroutines on return.
	defer client1.Close()

	// Add a new member and set the client endpoints to etcd1 and etcd2.
	etcd2 := checkAddEtcdMember(t, cfg1, client1)
	defer etcd2.Close()
	checkMembers(re, client1, []*embed.Etcd{etcd1, etcd2})
	time.Sleep(1 * time.Second) // wait for etcd client sync endpoints

	// Hang etcd1 (from the client's point of view) and wait long enough for
	// the client to switch to etcd2.
	enableDiscard.Store(true)
	time.Sleep(defaultDialKeepAliveTime + defaultDialKeepAliveTimeout*2)
	_, err = EtcdKVGet(client1, "test/key1")
	return err
}

func checkAddEtcdMember(t *testing.T, cfg1 *embed.Config, client *clientv3.Client) *embed.Etcd {
re := require.New(t)
cfg2 := NewTestSingleConfig(t)
cfg2.Name = "etcd2"
cfg2.Name = genRandName()
cfg2.InitialCluster = cfg1.InitialCluster + fmt.Sprintf(",%s=%s", cfg2.Name, &cfg2.LPUrls[0])
cfg2.ClusterState = embed.ClusterStateFlagExisting
peerURL := cfg2.LPUrls[0].String()
addResp, err := AddEtcdMember(client1, []string{peerURL})
addResp, err := AddEtcdMember(client, []string{peerURL})
re.NoError(err)
etcd2, err := embed.StartEtcd(cfg2)
defer func() {
etcd2.Close()
}()
re.NoError(err)
re.Equal(uint64(etcd2.Server.ID()), addResp.Member.ID)
<-etcd2.Server.ReadyNotify()
return etcd2
}

func checkMembers(re *require.Assertions, client *clientv3.Client, etcds []*embed.Etcd) {
// Check the client can get the new member.
listResp2, err := ListEtcdMembers(client1)
re.NoError(err)
re.Len(listResp2.Members, 2)
for _, m := range listResp2.Members {
switch m.ID {
case uint64(etcd1.Server.ID()):
case uint64(etcd2.Server.ID()):
default:
t.Fatalf("unknown member: %v", m)
listResp, err := ListEtcdMembers(client)
re.NoError(err)
re.Len(listResp.Members, len(etcds))
inList := func(m *etcdserverpb.Member) bool {
for _, etcd := range etcds {
if m.ID == uint64(etcd.Server.ID()) {
return true
}
}
return false
}
for _, m := range listResp.Members {
re.True(inList(m))
}
}

// Remove the first member and close the etcd1.
_, err = RemoveEtcdMember(client1, uint64(etcd1.Server.ID()))
func proxyWithDiscard(re *require.Assertions, server, proxy string, enableDiscard *atomic.Bool) {
server = strings.TrimPrefix(server, "http://")
proxy = strings.TrimPrefix(proxy, "http://")
l, err := net.Listen("tcp", proxy)
re.NoError(err)
time.Sleep(20 * time.Millisecond) // wait for etcd client sync endpoints and client will be connected to etcd2
etcd1.Close()
for {
connect, err := l.Accept()
re.NoError(err)
go func(connect net.Conn) {
serverConnect, err := net.Dial("tcp", server)
re.NoError(err)
pipe(connect, serverConnect, enableDiscard)
}(connect)
}
}

// Check the client can get the new member with the new endpoints.
listResp3, err := ListEtcdMembers(client1)
re.NoError(err)
re.Len(listResp3.Members, 1)
re.Equal(uint64(etcd2.Server.ID()), listResp3.Members[0].ID)
// pipe relays data between src and dst in both directions using ioCopy, one
// goroutine per direction. It blocks until either direction finishes and then
// closes both connections.
func pipe(src net.Conn, dst net.Conn, enableDiscard *atomic.Bool) {
	// One buffered slot: the direction that finishes second can still deliver
	// its result after the single receive below has completed.
	finished := make(chan error, 1)
	relay := func(to, from net.Conn) {
		finished <- ioCopy(to, from, enableDiscard)
	}

	go relay(src, dst)
	go relay(dst, src)

	// Only the first result is awaited; its value is not inspected.
	<-finished
	dst.Close()
	src.Close()
}

require.NoError(t, failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval"))
func ioCopy(dst io.Writer, src io.Reader, enableDiscard *atomic.Bool) (err error) {
buffer := make([]byte, 32*1024)
for {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an infinite loop: if src.Read(buffer) returns a non-zero count together with EOF, the next Read returns (0, EOF).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed it to return directly.

if enableDiscard.Load() {
io.Copy(io.Discard, src)
}
readNum, errRead := src.Read(buffer)
if readNum > 0 {
writeNum, errWrite := dst.Write(buffer[:readNum])
if errWrite != nil {
return errWrite
}
if readNum != writeNum {
return io.ErrShortWrite
}
}
if errRead != nil {
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
err = errRead
break
}
}
return err
}
Loading