Refine the etcd client healthy checker code
Signed-off-by: JmPotato <ghzpotato@gmail.com>
JmPotato committed Jan 17, 2024
1 parent aa9c83c commit 341c3c7
Showing 1 changed file with 102 additions and 73 deletions.
175 changes: 102 additions & 73 deletions pkg/utils/etcdutil/etcdutil.go
@@ -269,85 +269,115 @@ func CreateEtcdClient(tlsConfig *tls.Config, acURLs []url.URL) (*clientv3.Client
 	failpoint.Inject("closeTick", func() {
 		failpoint.Return(client, err)
 	})
+	initHealthyChecker(tickerInterval, tlsConfig, client)

-	checker := &healthyChecker{
-		tlsConfig: tlsConfig,
-	}
+	return client, err
+}
+
+// healthyClient will wrap a etcd client and record its last health time.
+// The etcd client inside will only maintain one connection to the etcd server
+// to make sure each healthyClient could be used to check the health of a certain
+// etcd endpoint without involving the load balancer of etcd client.
+type healthyClient struct {
+	*clientv3.Client
+	lastHealth time.Time
+}
+
+// healthyChecker is used to check the health of etcd endpoints. Inside the checker,
+// we will maintain a map from each available etcd endpoint to its healthyClient.
+type healthyChecker struct {
+	tickerInterval time.Duration
+	tlsConfig      *tls.Config
+
+	sync.Map // map[string]*healthyClient
+	// client is the etcd client the healthy checker is guarding, it will be set with
+	// the checked healthy endpoints dynamically and periodically.
+	client *clientv3.Client
+}
+
+// initHealthyChecker initializes the healthy checker for etcd client.
+func initHealthyChecker(tickerInterval time.Duration, tlsConfig *tls.Config, client *clientv3.Client) {
+	healthyChecker := &healthyChecker{
+		tickerInterval: tickerInterval,
+		tlsConfig:      tlsConfig,
+		client:         client,
+	}
-	eps := syncUrls(client)
-	checker.update(eps)
+	// Healthy checker has the same lifetime with the given etcd client.
+	ctx := client.Ctx()
+	// Sync etcd endpoints and check the last health time of each endpoint periodically.
+	go healthyChecker.syncer(ctx)
+	// Inspect the health of each endpoint by reading the health key periodically.
+	go healthyChecker.inspector(ctx)
+}

-	// Create a goroutine to check the health of etcd endpoints periodically.
-	go func(client *clientv3.Client) {
-		defer logutil.LogPanic()
-		ticker := time.NewTicker(tickerInterval)
-		defer ticker.Stop()
-		lastAvailable := time.Now()
-		for {
-			select {
-			case <-client.Ctx().Done():
-				log.Info("etcd client is closed, exit health check goroutine")
-				checker.Range(func(key, value interface{}) bool {
-					client := value.(*healthyClient)
-					client.Close()
-					return true
-				})
-				return
-			case <-ticker.C:
-				usedEps := client.Endpoints()
-				healthyEps := checker.patrol(client.Ctx())
-				if len(healthyEps) == 0 {
-					// when all endpoints are unhealthy, try to reset endpoints to update connect
-					// rather than delete them to avoid there is no any endpoint in client.
-					// Note: reset endpoints will trigger subconn closed, and then trigger reconnect.
-					// otherwise, the subconn will be retrying in grpc layer and use exponential backoff,
-					// and it cannot recover as soon as possible.
-					if time.Since(lastAvailable) > etcdServerDisconnectedTimeout {
-						log.Info("no available endpoint, try to reset endpoints", zap.Strings("last-endpoints", usedEps))
-						client.SetEndpoints([]string{}...)
-						client.SetEndpoints(usedEps...)
-					}
-				} else {
-					if !typeutil.AreStringSlicesEquivalent(healthyEps, usedEps) {
-						client.SetEndpoints(healthyEps...)
-						change := fmt.Sprintf("%d->%d", len(usedEps), len(healthyEps))
-						etcdStateGauge.WithLabelValues("endpoints").Set(float64(len(healthyEps)))
-						log.Info("update endpoints", zap.String("num-change", change),
-							zap.Strings("last-endpoints", usedEps), zap.Strings("endpoints", client.Endpoints()))
-					}
-					lastAvailable = time.Now()
-				}
-			}
-		}
-	}(client)
+func (checker *healthyChecker) syncer(ctx context.Context) {
+	defer logutil.LogPanic()
+	checker.update()
+	ticker := time.NewTicker(checker.tickerInterval)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			log.Info("etcd client is closed, exit update endpoint goroutine")
+			return
+		case <-ticker.C:
+			checker.update()
+		}
+	}
+}

-	// Notes: use another goroutine to update endpoints to avoid blocking health check in the first goroutine.
-	go func(client *clientv3.Client) {
-		defer logutil.LogPanic()
-		ticker := time.NewTicker(tickerInterval)
-		defer ticker.Stop()
-		for {
-			select {
-			case <-client.Ctx().Done():
-				log.Info("etcd client is closed, exit update endpoint goroutine")
-				return
-			case <-ticker.C:
-				eps := syncUrls(client)
-				checker.update(eps)
+func (checker *healthyChecker) inspector(ctx context.Context) {
+	defer logutil.LogPanic()
+	ticker := time.NewTicker(checker.tickerInterval)
+	defer ticker.Stop()
+	lastAvailable := time.Now()
+	for {
+		select {
+		case <-ctx.Done():
+			log.Info("etcd client is closed, exit health check goroutine")
+			checker.close()
+			return
+		case <-ticker.C:
+			usedEps := checker.client.Endpoints()
+			healthyEps := checker.patrol(ctx)
+			if len(healthyEps) == 0 {
+				// when all endpoints are unhealthy, try to reset endpoints to update connect
+				// rather than delete them to avoid there is no any endpoint in client.
+				// Note: reset endpoints will trigger subconn closed, and then trigger reconnect.
+				// otherwise, the subconn will be retrying in grpc layer and use exponential backoff,
+				// and it cannot recover as soon as possible.
+				if time.Since(lastAvailable) > etcdServerDisconnectedTimeout {
+					log.Info("no available endpoint, try to reset endpoints",
+						zap.Strings("last-endpoints", usedEps))
+					resetClientEndpoints(checker.client, usedEps...)
+				}
+			} else {
+				if !typeutil.AreStringSlicesEquivalent(healthyEps, usedEps) {
+					checker.client.SetEndpoints(healthyEps...)
+					etcdStateGauge.WithLabelValues("endpoints").Set(float64(len(healthyEps)))
+					log.Info("update endpoints",
+						zap.String("num-change", fmt.Sprintf("%d->%d", len(usedEps), len(healthyEps))),
+						zap.Strings("last-endpoints", usedEps),
+						zap.Strings("endpoints", checker.client.Endpoints()))
+				}
+				lastAvailable = time.Now()
+			}
+		}
+	}
+}
-			}
-		}
-	}(client)
-
-	return client, err
-}

-type healthyClient struct {
-	*clientv3.Client
-	lastHealth time.Time
+func (checker *healthyChecker) close() {
+	checker.Range(func(key, value interface{}) bool {
+		client := value.(*healthyClient)
+		client.Close()
+		return true
+	})
 }

-type healthyChecker struct {
-	sync.Map // map[string]*healthyClient
-	tlsConfig *tls.Config
+// Reset the etcd client endpoints to trigger reconnect.
+func resetClientEndpoints(client *clientv3.Client, endpoints ...string) {
+	client.SetEndpoints([]string{}...)
+	client.SetEndpoints(endpoints...)
 }
 
 func (checker *healthyChecker) patrol(ctx context.Context) []string {
@@ -386,8 +416,9 @@ func (checker *healthyChecker) patrol(ctx context.Context) []string {
 	return healthyList
 }
 
-func (checker *healthyChecker) update(eps []string) {
-	epMap := make(map[string]struct{})
+func (checker *healthyChecker) update() {
+	eps := syncUrls(checker.client)
+	epMap := make(map[string]struct{}, len(eps))
 	for _, ep := range eps {
 		epMap[ep] = struct{}{}
 	}
@@ -401,9 +432,7 @@ func (checker *healthyChecker) update(eps []string) {
 				checker.removeClient(ep)
 			}
 			if time.Since(lastHealthy) > etcdServerDisconnectedTimeout {
-				// try to reset client endpoint to trigger reconnect
-				client.(*healthyClient).Client.SetEndpoints([]string{}...)
-				client.(*healthyClient).Client.SetEndpoints(ep)
+				resetClientEndpoints(client.(*healthyClient).Client, ep)
 			}
 			continue
 		}
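The diff above is the complete change. For orientation, the following is a minimal, self-contained sketch of the contract the new code establishes: the checker owns the client it guards, lives exactly as long as that client's context, and periodically replaces the client's endpoint list with the subset it currently considers healthy. The names endpointClient, fakeClient, healthChecker, and inspect below are illustrative stand-ins invented for this sketch, not the actual etcdutil types, and the real checker additionally runs a second goroutine (syncer) that keeps the per-endpoint client map in sync.

// Sketch only: simplified stand-ins for the checker introduced by this commit.
package main

import (
	"context"
	"fmt"
	"time"
)

// endpointClient is the minimal slice of *clientv3.Client behaviour the sketch
// needs: read and rewrite the endpoint list.
type endpointClient interface {
	Endpoints() []string
	SetEndpoints(eps ...string)
}

// fakeClient stands in for an etcd client so the sketch runs without a cluster.
type fakeClient struct{ eps []string }

func (c *fakeClient) Endpoints() []string        { return c.eps }
func (c *fakeClient) SetEndpoints(eps ...string) { c.eps = eps }

// healthChecker mirrors the shape of the new healthyChecker: a guarded client,
// a ticker interval, and a patrol step that reports the healthy endpoints.
type healthChecker struct {
	interval time.Duration
	client   endpointClient
	patrol   func(ctx context.Context) []string
}

// inspect is a condensed analogue of the inspector goroutine: on every tick it
// asks patrol for the healthy endpoints and applies them to the guarded client;
// it exits when the context (the client's lifetime) ends.
func (hc *healthChecker) inspect(ctx context.Context) {
	ticker := time.NewTicker(hc.interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			fmt.Println("client closed, health checker exits")
			return
		case <-ticker.C:
			if healthy := hc.patrol(ctx); len(healthy) > 0 {
				hc.client.SetEndpoints(healthy...)
				fmt.Println("endpoints now:", hc.client.Endpoints())
			}
		}
	}
}

func main() {
	client := &fakeClient{eps: []string{"http://etcd-0:2379", "http://etcd-1:2379"}}
	// In the real code this context is client.Ctx(), so closing the etcd client
	// is what stops the checker goroutines.
	ctx, cancel := context.WithCancel(context.Background())
	hc := &healthChecker{
		interval: 50 * time.Millisecond,
		client:   client,
		// Pretend only the first endpoint is healthy.
		patrol: func(context.Context) []string { return []string{"http://etcd-0:2379"} },
	}
	go hc.inspect(ctx)
	time.Sleep(200 * time.Millisecond)
	cancel() // stands in for client.Close()
	time.Sleep(50 * time.Millisecond)
}

Binding both goroutines to client.Ctx() in initHealthyChecker means there is no separate shutdown channel to manage: closing the etcd client is the only stop signal, at which point the inspector also closes every per-endpoint probe client via checker.close().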
