Skip to content

Commit

Permalink
Cherry-pick the etcd client health checker improvements
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato committed Feb 2, 2024
1 parent ae19047 commit 1484980
Show file tree
Hide file tree
Showing 14 changed files with 936 additions and 258 deletions.
4 changes: 2 additions & 2 deletions pkg/mcs/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,12 @@ func InitClient(s server) error {
if err != nil {
return err
}
etcdClient, httpClient, err := etcdutil.CreateClients(tlsConfig, backendUrls)
etcdClient, err := etcdutil.CreateEtcdClient(tlsConfig, backendUrls, "mcs-etcd-client")
if err != nil {
return err
}
s.SetETCDClient(etcdClient)
s.SetHTTPClient(httpClient)
s.SetHTTPClient(etcdutil.CreateHTTPClient(tlsConfig))
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/member/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ func (m *EmbeddedEtcdMember) ResignEtcdLeader(ctx context.Context, from string,
log.Info("try to resign etcd leader to next pd-server", zap.String("from", from), zap.String("to", nextEtcdLeader))
// Determine next etcd leader candidates.
var etcdLeaderIDs []uint64
res, err := etcdutil.ListEtcdMembers(m.client)
res, err := etcdutil.ListEtcdMembers(ctx, m.client)
if err != nil {
return err
}
Expand Down
190 changes: 10 additions & 180 deletions pkg/utils/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package etcdutil
import (
"context"
"crypto/tls"
"fmt"
"math/rand"
"net/http"
"net/url"
Expand Down Expand Up @@ -107,13 +106,13 @@ func AddEtcdMember(client *clientv3.Client, urls []string) (*clientv3.MemberAddR
}

// ListEtcdMembers returns a list of internal etcd members.
func ListEtcdMembers(client *clientv3.Client) (*clientv3.MemberListResponse, error) {
func ListEtcdMembers(ctx context.Context, client *clientv3.Client) (*clientv3.MemberListResponse, error) {
failpoint.Inject("SlowEtcdMemberList", func(val failpoint.Value) {
d := val.(int)
time.Sleep(time.Duration(d) * time.Second)
})
ctx, cancel := context.WithTimeout(client.Ctx(), DefaultRequestTimeout)
listResp, err := client.MemberList(ctx)
newCtx, cancel := context.WithTimeout(ctx, DefaultRequestTimeout)
listResp, err := client.MemberList(newCtx)
cancel()
if err != nil {
return listResp, errs.ErrEtcdMemberList.Wrap(err).GenWithStackByCause()
Expand Down Expand Up @@ -247,7 +246,7 @@ func newClient(tlsConfig *tls.Config, endpoints ...string) (*clientv3.Client, er
}

// CreateEtcdClient creates etcd v3 client with detecting endpoints.
func CreateEtcdClient(tlsConfig *tls.Config, acURLs []url.URL) (*clientv3.Client, error) {
func CreateEtcdClient(tlsConfig *tls.Config, acURLs []url.URL, sourceOpt ...string) (*clientv3.Client, error) {
urls := make([]string, 0, len(acURLs))
for _, u := range acURLs {
urls = append(urls, u.String())
Expand All @@ -264,185 +263,16 @@ func CreateEtcdClient(tlsConfig *tls.Config, acURLs []url.URL) (*clientv3.Client
failpoint.Inject("closeTick", func() {
failpoint.Return(client, err)
})

checker := &healthyChecker{
tlsConfig: tlsConfig,
source := "default-etcd-client"
if len(sourceOpt) > 0 {
source = sourceOpt[0]
}
eps := syncUrls(client)
checker.update(eps)

// Create a goroutine to check the health of etcd endpoints periodically.
go func(client *clientv3.Client) {
defer logutil.LogPanic()
ticker := time.NewTicker(tickerInterval)
defer ticker.Stop()
lastAvailable := time.Now()
for {
select {
case <-client.Ctx().Done():
log.Info("etcd client is closed, exit health check goroutine")
checker.Range(func(key, value interface{}) bool {
client := value.(*healthyClient)
client.Close()
return true
})
return
case <-ticker.C:
usedEps := client.Endpoints()
healthyEps := checker.patrol(client.Ctx())
if len(healthyEps) == 0 {
// when all endpoints are unhealthy, try to reset endpoints to update connect
// rather than delete them to avoid there is no any endpoint in client.
// Note: reset endpoints will trigger subconn closed, and then trigger reconnect.
// otherwise, the subconn will be retrying in grpc layer and use exponential backoff,
// and it cannot recover as soon as possible.
if time.Since(lastAvailable) > etcdServerDisconnectedTimeout {
log.Info("no available endpoint, try to reset endpoints", zap.Strings("last-endpoints", usedEps))
client.SetEndpoints([]string{}...)
client.SetEndpoints(usedEps...)
}
} else {
if !typeutil.AreStringSlicesEquivalent(healthyEps, usedEps) {
client.SetEndpoints(healthyEps...)
change := fmt.Sprintf("%d->%d", len(usedEps), len(healthyEps))
etcdStateGauge.WithLabelValues("endpoints").Set(float64(len(healthyEps)))
log.Info("update endpoints", zap.String("num-change", change),
zap.Strings("last-endpoints", usedEps), zap.Strings("endpoints", client.Endpoints()))
}
lastAvailable = time.Now()
}
}
}
}(client)

// Notes: use another goroutine to update endpoints to avoid blocking health check in the first goroutine.
go func(client *clientv3.Client) {
defer logutil.LogPanic()
ticker := time.NewTicker(tickerInterval)
defer ticker.Stop()
for {
select {
case <-client.Ctx().Done():
log.Info("etcd client is closed, exit update endpoint goroutine")
return
case <-ticker.C:
eps := syncUrls(client)
checker.update(eps)
}
}
}(client)

initHealthChecker(tickerInterval, tlsConfig, client, source)
return client, err
}

type healthyClient struct {
*clientv3.Client
lastHealth time.Time
}

type healthyChecker struct {
sync.Map // map[string]*healthyClient
tlsConfig *tls.Config
}

func (checker *healthyChecker) patrol(ctx context.Context) []string {
// See https://github.com/etcd-io/etcd/blob/85b640cee793e25f3837c47200089d14a8392dc7/etcdctl/ctlv3/command/ep_command.go#L105-L145
var wg sync.WaitGroup
count := 0
checker.Range(func(key, value interface{}) bool {
count++
return true
})
hch := make(chan string, count)
healthyList := make([]string, 0, count)
checker.Range(func(key, value interface{}) bool {
wg.Add(1)
go func(key, value interface{}) {
defer wg.Done()
defer logutil.LogPanic()
ep := key.(string)
client := value.(*healthyClient)
if IsHealthy(ctx, client.Client) {
hch <- ep
checker.Store(ep, &healthyClient{
Client: client.Client,
lastHealth: time.Now(),
})
return
}
}(key, value)
return true
})
wg.Wait()
close(hch)
for h := range hch {
healthyList = append(healthyList, h)
}
return healthyList
}

func (checker *healthyChecker) update(eps []string) {
for _, ep := range eps {
// check if client exists, if not, create one, if exists, check if it's offline or disconnected.
if client, ok := checker.Load(ep); ok {
lastHealthy := client.(*healthyClient).lastHealth
if time.Since(lastHealthy) > etcdServerOfflineTimeout {
log.Info("some etcd server maybe offline", zap.String("endpoint", ep))
checker.Delete(ep)
}
if time.Since(lastHealthy) > etcdServerDisconnectedTimeout {
// try to reset client endpoint to trigger reconnect
client.(*healthyClient).Client.SetEndpoints([]string{}...)
client.(*healthyClient).Client.SetEndpoints(ep)
}
continue
}
checker.addClient(ep, time.Now())
}
}

func (checker *healthyChecker) addClient(ep string, lastHealth time.Time) {
client, err := newClient(checker.tlsConfig, ep)
if err != nil {
log.Error("failed to create etcd healthy client", zap.Error(err))
return
}
checker.Store(ep, &healthyClient{
Client: client,
lastHealth: lastHealth,
})
}

func syncUrls(client *clientv3.Client) []string {
// See https://github.com/etcd-io/etcd/blob/85b640cee793e25f3837c47200089d14a8392dc7/clientv3/client.go#L170-L183
ctx, cancel := context.WithTimeout(clientv3.WithRequireLeader(client.Ctx()), DefaultRequestTimeout)
defer cancel()
mresp, err := client.MemberList(ctx)
if err != nil {
log.Error("failed to list members", errs.ZapError(err))
return []string{}
}
var eps []string
for _, m := range mresp.Members {
if len(m.Name) != 0 && !m.IsLearner {
eps = append(eps, m.ClientURLs...)
}
}
return eps
}

// CreateClients creates etcd v3 client and http client.
func CreateClients(tlsConfig *tls.Config, acUrls []url.URL) (*clientv3.Client, *http.Client, error) {
client, err := CreateEtcdClient(tlsConfig, acUrls)
if err != nil {
return nil, nil, errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause()
}
httpClient := createHTTPClient(tlsConfig)
return client, httpClient, nil
}

// createHTTPClient creates a http client with the given tls config.
func createHTTPClient(tlsConfig *tls.Config) *http.Client {
// CreateHTTPClient creates a http client with the given tls config.
func CreateHTTPClient(tlsConfig *tls.Config) *http.Client {
// FIXME: Currently, there is no timeout set for certain requests, such as GetRegions,
// which may take a significant amount of time. However, it might be necessary to
// define an appropriate timeout in the future.
Expand Down
39 changes: 21 additions & 18 deletions pkg/utils/etcdutil/etcdutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestMemberHelpers(t *testing.T) {
etcd1, cfg1 := servers[0], servers[0].Config()

// Test ListEtcdMembers
listResp1, err := ListEtcdMembers(client1)
listResp1, err := ListEtcdMembers(client1.Ctx(), client1)
re.NoError(err)
re.Len(listResp1.Members, 1)
// types.ID is an alias of uint64.
Expand All @@ -74,7 +74,7 @@ func TestMemberHelpers(t *testing.T) {
_, err = RemoveEtcdMember(client1, uint64(etcd2.Server.ID()))
re.NoError(err)

listResp3, err := ListEtcdMembers(client1)
listResp3, err := ListEtcdMembers(client1.Ctx(), client1)
re.NoError(err)
re.Len(listResp3.Members, 1)
re.Equal(uint64(etcd1.Server.ID()), listResp3.Members[0].ID)
Expand Down Expand Up @@ -177,23 +177,30 @@ func TestEtcdClientSync(t *testing.T) {
etcd2 := MustAddEtcdMember(t, &cfg1, client1)
defer etcd2.Close()
checkMembers(re, client1, []*embed.Etcd{etcd1, etcd2})
testutil.Eventually(re, func() bool {
// wait for etcd client sync endpoints
return len(client1.Endpoints()) == 2
})
// wait for etcd client sync endpoints
checkEtcdEndpointNum(re, client1, 2)

// Remove the first member and close the etcd1.
_, err := RemoveEtcdMember(client1, uint64(etcd1.Server.ID()))
re.NoError(err)
etcd1.Close()

// Check the client can get the new member with the new endpoints.
checkEtcdEndpointNum(re, client1, 1)

re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick"))
}

func checkEtcdEndpointNum(re *require.Assertions, client *clientv3.Client, num int) {
testutil.Eventually(re, func() bool {
// wait for etcd client sync endpoints
return len(client1.Endpoints()) == 1
return len(client.Endpoints()) == num
})
}

re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick"))
func checkEtcdClientHealth(re *require.Assertions, client *clientv3.Client) {
testutil.Eventually(re, func() bool {
return IsHealthy(context.Background(), client)
})
}

func TestEtcdScaleInAndOut(t *testing.T) {
Expand Down Expand Up @@ -228,25 +235,21 @@ func TestRandomKillEtcd(t *testing.T) {
// Start a etcd server.
etcds, client1, clean := NewTestEtcdCluster(t, 3)
defer clean()
testutil.Eventually(re, func() bool {
return len(client1.Endpoints()) == 3
})
checkEtcdEndpointNum(re, client1, 3)

// Randomly kill an etcd server and restart it
cfgs := []embed.Config{etcds[0].Config(), etcds[1].Config(), etcds[2].Config()}
for i := 0; i < 10; i++ {
killIndex := rand.Intn(len(etcds))
etcds[killIndex].Close()
testutil.Eventually(re, func() bool {
return IsHealthy(context.Background(), client1)
})
checkEtcdEndpointNum(re, client1, 2)
checkEtcdClientHealth(re, client1)
etcd, err := embed.StartEtcd(&cfgs[killIndex])
re.NoError(err)
<-etcd.Server.ReadyNotify()
etcds[killIndex] = etcd
testutil.Eventually(re, func() bool {
return IsHealthy(context.Background(), client1)
})
checkEtcdEndpointNum(re, client1, 3)
checkEtcdClientHealth(re, client1)
}
for _, etcd := range etcds {
if etcd != nil {
Expand Down
Loading

0 comments on commit 1484980

Please sign in to comment.