Skip to content

Commit

Permalink
etcdutil: add dial keep alive params to switch connect as soon as pos…
Browse files Browse the repository at this point in the history
…sible

Signed-off-by: lhy1024 <admin@liudos.us>
  • Loading branch information
lhy1024 committed Mar 2, 2023
1 parent a99ff9f commit a716a9f
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 61 deletions.
21 changes: 15 additions & 6 deletions pkg/utils/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ const (
// defaultAutoSyncInterval is the interval to sync etcd cluster.
defaultAutoSyncInterval = 60 * time.Second

// defaultDialKeepAliveTime is the time after which client pings the server to see if transport is alive.
defaultDialKeepAliveTime = 10 * time.Second

// defaultDialKeepAliveTimeout is the time that the client waits for a response for the
// keep-alive probe. If the response is not received in this time, the connection is closed.
defaultDialKeepAliveTimeout = 3 * time.Second

// DefaultDialTimeout is the maximum amount of time a dial will wait for a
// connection to be set up. 30s is long enough for most network conditions.
DefaultDialTimeout = 30 * time.Second
Expand Down Expand Up @@ -216,13 +223,15 @@ func createEtcdClient(tlsConfig *tls.Config, acUrls []url.URL) (*clientv3.Client
autoSyncInterval = 10 * time.Millisecond
})
client, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: defaultEtcdClientTimeout,
AutoSyncInterval: autoSyncInterval,
TLS: tlsConfig,
LogConfig: &lgc,
Endpoints: endpoints,
DialTimeout: defaultEtcdClientTimeout,
AutoSyncInterval: autoSyncInterval,
TLS: tlsConfig,
LogConfig: &lgc,
DialKeepAliveTime: defaultDialKeepAliveTime,
DialKeepAliveTimeout: defaultDialKeepAliveTimeout,
})
if err != nil {
if err == nil {
log.Info("create etcd v3 client", zap.Strings("endpoints", endpoints))
}
return client, err
Expand Down
188 changes: 133 additions & 55 deletions pkg/utils/etcdutil/etcdutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,19 @@ import (
"context"
"crypto/tls"
"fmt"
"io"
"net"
"strings"
"sync/atomic"
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/utils/tempurl"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
"go.etcd.io/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/pkg/types"
)

Expand Down Expand Up @@ -54,43 +60,15 @@ func TestMemberHelpers(t *testing.T) {
re.Equal(uint64(etcd1.Server.ID()), listResp1.Members[0].ID)

// Test AddEtcdMember
// Make a new etcd config.
cfg2 := NewTestSingleConfig(t)
cfg2.Name = "etcd2"
cfg2.InitialCluster = cfg1.InitialCluster + fmt.Sprintf(",%s=%s", cfg2.Name, &cfg2.LPUrls[0])
cfg2.ClusterState = embed.ClusterStateFlagExisting

// Add it to the cluster above.
peerURL := cfg2.LPUrls[0].String()
addResp, err := AddEtcdMember(client1, []string{peerURL})
re.NoError(err)

etcd2, err := embed.StartEtcd(cfg2)
defer func() {
etcd2.Close()
}()
re.NoError(err)
re.Equal(uint64(etcd2.Server.ID()), addResp.Member.ID)

etcd2 := checkAddEtcdMember(t, cfg1, client1)
cfg2 := etcd2.Config()
defer etcd2.Close()
ep2 := cfg2.LCUrls[0].String()
client2, err := clientv3.New(clientv3.Config{
Endpoints: []string{ep2},
})
re.NoError(err)

<-etcd2.Server.ReadyNotify()

listResp2, err := ListEtcdMembers(client2)
re.NoError(err)
re.Len(listResp2.Members, 2)
for _, m := range listResp2.Members {
switch m.ID {
case uint64(etcd1.Server.ID()):
case uint64(etcd2.Server.ID()):
default:
t.Fatalf("unknown member: %v", m)
}
}
checkMembers(re, client2, []*embed.Etcd{etcd1, etcd2})

// Test CheckClusterID
urlsMap, err := types.NewURLsMap(cfg2.InitialCluster)
Expand Down Expand Up @@ -252,45 +230,145 @@ func TestEtcdClientSync(t *testing.T) {
<-etcd1.Server.ReadyNotify()

// Add a new member.
etcd2 := checkAddEtcdMember(t, cfg1, client1)
defer etcd2.Close()
checkMembers(re, client1, []*embed.Etcd{etcd1, etcd2})

// Remove the first member and close the etcd1.
_, err = RemoveEtcdMember(client1, uint64(etcd1.Server.ID()))
re.NoError(err)
time.Sleep(20 * time.Millisecond) // wait for etcd client sync endpoints and client will be connected to etcd2
etcd1.Close()

// Check the client can get the new member with the new endpoints.
listResp3, err := ListEtcdMembers(client1)
re.NoError(err)
re.Len(listResp3.Members, 1)
re.Equal(uint64(etcd2.Server.ID()), listResp3.Members[0].ID)

require.NoError(t, failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval"))
}

// TestEtcdWithHangLeader checks that a client built by createEtcdClient can
// still serve a KV read after the proxied connection to etcd1 starts silently
// discarding traffic, as long as a second endpoint (etcd2) is configured.
func TestEtcdWithHangLeader(t *testing.T) {
	t.Parallel()
	re := require.New(t)
	// Start an etcd server.
	cfg1 := NewTestSingleConfig(t)
	etcd1, err := embed.StartEtcd(cfg1)
	re.NoError(err)
	// Shut the server down when the test ends, the same way etcd2 is handled below.
	defer etcd1.Close()
	ep1 := cfg1.LCUrls[0].String()
	<-etcd1.Server.ReadyNotify()

	// Create a proxy to etcd1.
	proxyAddr := tempurl.Alloc()
	var enableDiscard atomic.Bool
	go proxyWithDiscard(re, ep1, proxyAddr, &enableDiscard)

	// Create an etcd client whose only endpoint is the proxy in front of etcd1.
	urls, err := types.NewURLs([]string{proxyAddr})
	re.NoError(err)
	client1, err := createEtcdClient(nil, urls)
	re.NoError(err)
	// Release the client's connections when the test ends.
	defer client1.Close()

	// Add a new member and set the client endpoints to etcd1 (via the proxy) and etcd2.
	etcd2 := checkAddEtcdMember(t, cfg1, client1)
	defer etcd2.Close()
	checkMembers(re, client1, []*embed.Etcd{etcd1, etcd2})
	etcd2Addr := etcd2.Config().LCUrls[0].String()
	client1.SetEndpoints(proxyAddr, etcd2Addr)

	// Hang etcd1 and wait for the client to connect to etcd2.
	enableDiscard.Store(true)
	// Wait for one keep-alive interval plus twice the keep-alive timeout so the
	// client has had time to notice that the proxied connection is dead.
	time.Sleep(defaultDialKeepAliveTime + defaultDialKeepAliveTimeout*2)
	_, err = EtcdKVGet(client1, "test/key1")
	re.NoError(err)
}

// checkAddEtcdMember registers a second member named "etcd2" with the cluster
// described by cfg1 (using client), starts it, waits until it reports ready
// and returns it. The test fails immediately if any step errors or if the
// started server's ID differs from the ID returned by AddEtcdMember.
//
// The returned server is left running; the caller is responsible for closing it.
func checkAddEtcdMember(t *testing.T, cfg1 *embed.Config, client *clientv3.Client) *embed.Etcd {
	re := require.New(t)
	// Build a config for the new member that joins the existing cluster.
	cfg2 := NewTestSingleConfig(t)
	cfg2.Name = "etcd2"
	cfg2.InitialCluster = cfg1.InitialCluster + fmt.Sprintf(",%s=%s", cfg2.Name, &cfg2.LPUrls[0])
	cfg2.ClusterState = embed.ClusterStateFlagExisting
	peerURL := cfg2.LPUrls[0].String()
	// Register the member with the cluster before starting it.
	addResp, err := AddEtcdMember(client, []string{peerURL})
	re.NoError(err)
	etcd2, err := embed.StartEtcd(cfg2)
	re.NoError(err)
	re.Equal(uint64(etcd2.Server.ID()), addResp.Member.ID)
	<-etcd2.Server.ReadyNotify()
	return etcd2
}

func checkMembers(re *require.Assertions, client *clientv3.Client, etcds []*embed.Etcd) {
// Check the client can get the new member.
listResp2, err := ListEtcdMembers(client1)
re.NoError(err)
re.Len(listResp2.Members, 2)
for _, m := range listResp2.Members {
switch m.ID {
case uint64(etcd1.Server.ID()):
case uint64(etcd2.Server.ID()):
default:
t.Fatalf("unknown member: %v", m)
listResp, err := ListEtcdMembers(client)
re.NoError(err)
re.Len(listResp.Members, len(etcds))
inList := func(m *etcdserverpb.Member) bool {
for _, etcd := range etcds {
if m.ID == uint64(etcd.Server.ID()) {
return true
}
}
return false
}
for _, m := range listResp.Members {
re.True(inList(m))
}
}

// Remove the first member and close the etcd1.
_, err = RemoveEtcdMember(client1, uint64(etcd1.Server.ID()))
// proxyWithDiscard listens on proxy and relays every accepted TCP connection
// to server through pipe, handing enableDiscard to pipe so the relayed traffic
// can be dropped on demand. Both addresses may carry an "http://" prefix,
// which is stripped before use.
//
// The accept loop never returns on its own, so this is meant to be run in its
// own goroutine.
func proxyWithDiscard(re *require.Assertions, server, proxy string, enableDiscard *atomic.Bool) {
	server = strings.TrimPrefix(server, "http://")
	proxy = strings.TrimPrefix(proxy, "http://")
	l, err := net.Listen("tcp", proxy)
	re.NoError(err)
	// Release the port if this goroutine is ever unwound by a failed assertion.
	defer l.Close()
	for {
		connect, err := l.Accept()
		re.NoError(err)
		// Serve each client connection concurrently with its own upstream connection.
		go func(connect net.Conn) {
			serverConnect, err := net.Dial("tcp", server)
			re.NoError(err)
			pipe(connect, serverConnect, enableDiscard)
		}(connect)
	}
}

// Check the client can get the new member with the new endpoints.
listResp3, err := ListEtcdMembers(client1)
re.NoError(err)
re.Len(listResp3.Members, 1)
re.Equal(uint64(etcd2.Server.ID()), listResp3.Members[0].ID)
// pipe relays bytes between src and dst in both directions using ioCopy and
// blocks until the first direction finishes, then closes both connections.
// enableDiscard is forwarded unchanged to each ioCopy call.
func pipe(src net.Conn, dst net.Conn, enableDiscard *atomic.Bool) {
	// Capacity 1 lets the slower direction deliver its result after the
	// single receive below without blocking.
	done := make(chan error, 1)
	relay := func(to, from net.Conn) {
		done <- ioCopy(to, from, enableDiscard)
	}
	go relay(src, dst)
	go relay(dst, src)

	// Wait for whichever direction ends first, then tear down both ends.
	<-done
	dst.Close()
	src.Close()
}

require.NoError(t, failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval"))
// ioCopy forwards data from src to dst in chunks of up to 32 KiB until src
// reports an error, which is returned as is (io.EOF on a normal end of stream).
//
// Before every read it checks enableDiscard: while the flag is set, everything
// remaining in src is drained into io.Discard instead of being forwarded. A
// failure while draining is returned immediately so that no later data slips
// through to dst.
//
// It returns the write error if dst.Write fails, and io.ErrShortWrite if dst
// accepts fewer bytes than were read.
func ioCopy(dst io.Writer, src io.Reader, enableDiscard *atomic.Bool) error {
	buffer := make([]byte, 32*1024)
	for {
		if enableDiscard.Load() {
			// On a clean drain (src hit EOF) fall through; the Read below
			// then reports the end of the stream.
			if _, errDiscard := io.Copy(io.Discard, src); errDiscard != nil {
				return errDiscard
			}
		}
		readNum, errRead := src.Read(buffer)
		// Forward whatever was read before looking at the read error, since a
		// Read may return data together with an error.
		if readNum > 0 {
			writeNum, errWrite := dst.Write(buffer[:readNum])
			if errWrite != nil {
				return errWrite
			}
			if readNum != writeNum {
				return io.ErrShortWrite
			}
		}
		if errRead != nil {
			return errRead
		}
	}
}

0 comments on commit a716a9f

Please sign in to comment.