etcdutil: refine the etcd client healthy checker code #7727

Merged · 1 commit · Jan 17, 2024
183 changes: 107 additions & 76 deletions pkg/utils/etcdutil/etcdutil.go
@@ -269,97 +269,129 @@
     failpoint.Inject("closeTick", func() {
         failpoint.Return(client, err)
     })
+    initHealthyChecker(tickerInterval, tlsConfig, client)

-    checker := &healthyChecker{
-        tlsConfig: tlsConfig,
-    }
+    return client, err
+}

+// healthyClient wraps an etcd client and records its last health time.
+// The etcd client inside maintains only one connection to the etcd server,
+// so each healthyClient can be used to check the health of a certain etcd
+// endpoint without involving the etcd client's load balancer.
+type healthyClient struct {
+    *clientv3.Client
+    lastHealth time.Time
+}
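The single-endpoint client above is what makes each health verdict precise. As a rough illustration of how such a probe client could be built (not code from this PR; the constructor name and dial timeout are assumptions), handing clientv3 exactly one endpoint leaves its internal balancer with a single target:

```go
// newProbeClient is a hypothetical sketch of building one probe client per endpoint.
func newProbeClient(tlsConfig *tls.Config, endpoint string) (*healthyClient, error) {
	client, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{endpoint}, // a single endpoint: no load balancing
		DialTimeout: 3 * time.Second,    // assumed timeout, not from the PR
		TLS:         tlsConfig,
	})
	if err != nil {
		return nil, err
	}
	return &healthyClient{Client: client, lastHealth: time.Now()}, nil
}
```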

+// healthyChecker is used to check the health of etcd endpoints. Inside the checker,
+// we maintain a map from each available etcd endpoint to its healthyClient.
+type healthyChecker struct {
+    tickerInterval time.Duration
+    tlsConfig      *tls.Config
+
+    sync.Map // map[string]*healthyClient
+    // client is the etcd client the healthy checker is guarding; it will be set
+    // with the checked healthy endpoints dynamically and periodically.
+    client *clientv3.Client
+}

+// initHealthyChecker initializes the healthy checker for the etcd client.
+func initHealthyChecker(tickerInterval time.Duration, tlsConfig *tls.Config, client *clientv3.Client) {
+    healthyChecker := &healthyChecker{
+        tickerInterval: tickerInterval,
+        tlsConfig:      tlsConfig,
+        client:         client,
+    }
-    eps := syncUrls(client)
-    checker.update(eps)
+    // The healthy checker has the same lifetime as the given etcd client.
+    ctx := client.Ctx()
+    // Sync etcd endpoints and check the last health time of each endpoint periodically.
+    go healthyChecker.syncer(ctx)
+    // Inspect the health of each endpoint by reading the health key periodically.
+    go healthyChecker.inspector(ctx)
+}

-    // Create a goroutine to check the health of etcd endpoints periodically.
-    go func(client *clientv3.Client) {
-        defer logutil.LogPanic()
-        ticker := time.NewTicker(tickerInterval)
-        defer ticker.Stop()
-        lastAvailable := time.Now()
-        for {
-            select {
-            case <-client.Ctx().Done():
-                log.Info("etcd client is closed, exit health check goroutine")
-                checker.Range(func(key, value interface{}) bool {
-                    client := value.(*healthyClient)
-                    client.Close()
-                    return true
-                })
-                return
-            case <-ticker.C:
-                usedEps := client.Endpoints()
-                healthyEps := checker.patrol(client.Ctx())
-                if len(healthyEps) == 0 {
-                    // When all endpoints are unhealthy, reset them to refresh the connections
-                    // rather than deleting them, so the client is never left without any endpoint.
-                    // Note: resetting the endpoints closes the subconns and then triggers a
-                    // reconnect; otherwise the subconns keep retrying in the gRPC layer with
-                    // exponential backoff and cannot recover promptly.
-                    if time.Since(lastAvailable) > etcdServerDisconnectedTimeout {
-                        log.Info("no available endpoint, try to reset endpoints", zap.Strings("last-endpoints", usedEps))
-                        client.SetEndpoints([]string{}...)
-                        client.SetEndpoints(usedEps...)
-                    }
-                } else {
-                    if !typeutil.AreStringSlicesEquivalent(healthyEps, usedEps) {
-                        client.SetEndpoints(healthyEps...)
-                        change := fmt.Sprintf("%d->%d", len(usedEps), len(healthyEps))
-                        etcdStateGauge.WithLabelValues("endpoints").Set(float64(len(healthyEps)))
-                        log.Info("update endpoints", zap.String("num-change", change),
-                            zap.Strings("last-endpoints", usedEps), zap.Strings("endpoints", client.Endpoints()))
-                    }
-                    lastAvailable = time.Now()
-                }
-            }
-        }
-    }(client)
+func (checker *healthyChecker) syncer(ctx context.Context) {
+    defer logutil.LogPanic()
+    checker.update()
+    ticker := time.NewTicker(checker.tickerInterval)
+    defer ticker.Stop()
+    for {
+        select {
+        case <-ctx.Done():
+            log.Info("etcd client is closed, exit update endpoint goroutine")
+            return
+        case <-ticker.C:
+            checker.update()
+        }
+    }
+}
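Both new goroutines share one shape: a ticker-driven loop whose lifetime is bound to the client's context, since clientv3 cancels the context returned by `Ctx()` when the client is closed. A minimal sketch of that pattern, with a hypothetical helper name:

```go
// runPeriodically is a hypothetical distillation of the loop shared by
// syncer and inspector: run work on every tick until the etcd client is
// closed, which cancels the context obtained from client.Ctx().
func runPeriodically(ctx context.Context, interval time.Duration, work func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done(): // client.Close() cancels this context
			return
		case <-ticker.C:
			work()
		}
	}
}
```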

-    // Note: use another goroutine to update endpoints, to avoid blocking the health check in the first goroutine.
-    go func(client *clientv3.Client) {
-        defer logutil.LogPanic()
-        ticker := time.NewTicker(tickerInterval)
-        defer ticker.Stop()
-        for {
-            select {
-            case <-client.Ctx().Done():
-                log.Info("etcd client is closed, exit update endpoint goroutine")
-                return
-            case <-ticker.C:
-                eps := syncUrls(client)
-                checker.update(eps)
-            }
-        }
-    }(client)
+func (checker *healthyChecker) inspector(ctx context.Context) {
+    defer logutil.LogPanic()
+    ticker := time.NewTicker(checker.tickerInterval)
+    defer ticker.Stop()
+    lastAvailable := time.Now()
+    for {
+        select {
+        case <-ctx.Done():
+            log.Info("etcd client is closed, exit health check goroutine")
+            checker.close()
+            return
+        case <-ticker.C:
+            lastEps := checker.client.Endpoints()
+            healthyEps := checker.patrol(ctx)
+            if len(healthyEps) == 0 {
+                // When all endpoints are unhealthy, reset them to refresh the connections
+                // rather than deleting them, so the client is never left without any endpoint.
+                // Note: resetting the endpoints closes the subconns and then triggers a
+                // reconnect; otherwise the subconns keep retrying in the gRPC layer with
+                // exponential backoff and cannot recover promptly.
+                if time.Since(lastAvailable) > etcdServerDisconnectedTimeout {
+                    log.Info("no available endpoint, try to reset endpoints",
+                        zap.Strings("last-endpoints", lastEps))
+                    resetClientEndpoints(checker.client, lastEps...)

+                }
+            } else {
+                if !typeutil.AreStringSlicesEquivalent(healthyEps, lastEps) {
+                    checker.client.SetEndpoints(healthyEps...)
+                    etcdStateGauge.WithLabelValues("endpoints").Set(float64(len(healthyEps)))
+                    log.Info("update endpoints",
+                        zap.String("num-change", fmt.Sprintf("%d->%d", len(lastEps), len(healthyEps))),
+                        zap.Strings("last-endpoints", lastEps),
+                        zap.Strings("endpoints", checker.client.Endpoints()))
+                }
+                lastAvailable = time.Now()
+            }
+        }
+    }
+}

-    return client, err
-}
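The inspector only rewires the client when the healthy set actually changed. `typeutil.AreStringSlicesEquivalent` lives in pd's typeutil package and is not shown in this diff; a plausible stand-in for an order-insensitive comparison looks like:

```go
// slicesEquivalent is a stand-in, not pd's implementation: two endpoint
// lists count as equivalent when they hold the same elements in any order.
func slicesEquivalent(a, b []string) bool {
	if len(a) != len(b) {
		return false
	}
	seen := make(map[string]int, len(a))
	for _, s := range a {
		seen[s]++
	}
	for _, s := range b {
		if seen[s] == 0 {
			return false
		}
		seen[s]--
	}
	return true
}
```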

-type healthyClient struct {
-    *clientv3.Client
-    lastHealth time.Time
+func (checker *healthyChecker) close() {
+    checker.Range(func(key, value interface{}) bool {
+        client := value.(*healthyClient)
+        client.Close()
+        return true
+    })
 }

-type healthyChecker struct {
-    sync.Map // map[string]*healthyClient
-    tlsConfig *tls.Config
+// Reset the etcd client endpoints to trigger a reconnect.
+func resetClientEndpoints(client *clientv3.Client, endpoints ...string) {
+    client.SetEndpoints()
+    client.SetEndpoints(endpoints...)

 }
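The double `SetEndpoints` call is the load-bearing trick: per the comments in this diff, an empty target list closes the existing gRPC subconns, and restoring the endpoints dials them afresh instead of waiting out the exponential backoff. A usage sketch (the wrapper name is hypothetical):

```go
// forceReconnect shows how a caller could force an immediate redial on a
// client whose endpoint set is unchanged.
func forceReconnect(client *clientv3.Client) {
	resetClientEndpoints(client, client.Endpoints()...)
}
```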

 func (checker *healthyChecker) patrol(ctx context.Context) []string {
     // See https://github.com/etcd-io/etcd/blob/85b640cee793e25f3837c47200089d14a8392dc7/etcdctl/ctlv3/command/ep_command.go#L105-L145
-    var wg sync.WaitGroup
     count := 0
     checker.Range(func(key, value interface{}) bool {
         count++
         return true
     })
-    hch := make(chan string, count)
-    healthyList := make([]string, 0, count)
+    var (
+        wg          sync.WaitGroup
+        hch         = make(chan string, count)
+        healthyList = make([]string, 0, count)
+    )
     checker.Range(func(key, value interface{}) bool {
         wg.Add(1)
         go func(key, value interface{}) {
@@ -386,8 +418,9 @@
     return healthyList
 }
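patrol fans one probe out per cached healthyClient and collects the endpoints that answer. The per-endpoint check itself follows the etcdctl code linked above: read a key with a short timeout, and treat a permission-denied response as healthy, since even that error proves the endpoint is serving. A sketch under those assumptions (the helper name and timeout are not from this PR):

```go
// probeEndpoint is a hypothetical per-endpoint health check modeled on
// etcdctl's `endpoint health` command referenced in patrol.
// rpctypes is go.etcd.io/etcd/api/v3/v3rpc/rpctypes.
func probeEndpoint(ctx context.Context, client *clientv3.Client) bool {
	ctx, cancel := context.WithTimeout(ctx, 3*time.Second) // assumed timeout
	defer cancel()
	_, err := client.Get(ctx, "health")
	// Permission denied still means the endpoint is alive: the request
	// reached a serving member before being rejected.
	return err == nil || err == rpctypes.ErrPermissionDenied
}
```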

-func (checker *healthyChecker) update(eps []string) {
-    epMap := make(map[string]struct{})
+func (checker *healthyChecker) update() {
+    eps := syncUrls(checker.client)
+    epMap := make(map[string]struct{}, len(eps))
     for _, ep := range eps {
         epMap[ep] = struct{}{}
     }
@@ -401,9 +434,7 @@
             checker.removeClient(ep)
         }
         if time.Since(lastHealthy) > etcdServerDisconnectedTimeout {
-            // try to reset client endpoint to trigger reconnect
-            client.(*healthyClient).Client.SetEndpoints([]string{}...)
-            client.(*healthyClient).Client.SetEndpoints(ep)
+            resetClientEndpoints(client.(*healthyClient).Client, ep)

         }
         continue
     }
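update() now owns the whole sync cycle: it fetches the latest member URLs via syncUrls and reconciles the probe-client map against them. The reconciliation shape, sketched with the helpers the diff references (addClient's exact signature is assumed; the method name reconcile is hypothetical):

```go
// reconcile is a hypothetical distillation of what update() does with the
// freshly synced endpoint list: add probe clients for newly discovered
// endpoints and drop the ones that no longer appear in the cluster.
func (checker *healthyChecker) reconcile(eps []string) {
	epMap := make(map[string]struct{}, len(eps))
	for _, ep := range eps {
		epMap[ep] = struct{}{}
		if _, ok := checker.Load(ep); !ok {
			checker.addClient(ep, time.Now()) // addClient signature assumed
		}
	}
	checker.Range(func(key, value interface{}) bool {
		ep := key.(string)
		if _, ok := epMap[ep]; !ok {
			checker.removeClient(ep)
		}
		return true
	})
}
```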