Skip to content

Commit

Permalink
Randomly check the etcd leader health to proactively resign
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato committed Jan 4, 2024
1 parent 61b15c6 commit 223de1e
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 5 deletions.
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,11 @@ error = '''
etcd leader not found
'''

["PD:member:ErrEtcdLeaderNotHealthy"]
error = '''
etcd leader is not healthy, %s
'''

["PD:member:ErrMarshalLeader"]
error = '''
marshal leader failed
Expand Down
7 changes: 4 additions & 3 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,10 @@ var (

// member errors
var (
ErrEtcdLeaderNotFound = errors.Normalize("etcd leader not found", errors.RFCCodeText("PD:member:ErrEtcdLeaderNotFound"))
ErrMarshalLeader = errors.Normalize("marshal leader failed", errors.RFCCodeText("PD:member:ErrMarshalLeader"))
ErrCheckCampaign = errors.Normalize("check campaign failed", errors.RFCCodeText("PD:member:ErrCheckCampaign"))
ErrEtcdLeaderNotFound = errors.Normalize("etcd leader not found", errors.RFCCodeText("PD:member:ErrEtcdLeaderNotFound"))
// ErrEtcdLeaderNotHealthy carries a single %s placeholder in its message,
// which is filled with a short reason when the error is generated.
ErrEtcdLeaderNotHealthy = errors.Normalize("etcd leader is not healthy, %s", errors.RFCCodeText("PD:member:ErrEtcdLeaderNotHealthy"))
ErrMarshalLeader = errors.Normalize("marshal leader failed", errors.RFCCodeText("PD:member:ErrMarshalLeader"))
ErrCheckCampaign = errors.Normalize("check campaign failed", errors.RFCCodeText("PD:member:ErrCheckCampaign"))
)

// core errors
Expand Down
42 changes: 42 additions & 0 deletions pkg/member/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,11 @@ func (m *EmbeddedEtcdMember) GetLeaderPath() string {
return path.Join(m.rootPath, "leader")
}

// GetLeaderHealthPath returns the key used for the PD leader health check.
// It is the leader path with a trailing "health" element appended.
func (m *EmbeddedEtcdMember) GetLeaderHealthPath() string {
	leaderPath := m.GetLeaderPath()
	return path.Join(leaderPath, "health")
}

// GetLeadership returns the leadership of the PD member.
func (m *EmbeddedEtcdMember) GetLeadership() *election.Leadership {
return m.leadership
Expand Down Expand Up @@ -279,6 +284,43 @@ func (m *EmbeddedEtcdMember) ResetLeader() {
m.unsetLeader()
}

// CheckLeaderHealth probes the current etcd leader by writing a random value
// to the leader health key and reading it back.
//
// It returns nil immediately when this member is not the etcd leader. Each of
// the two requests is bounded by twice leaseTimeout (interpreted as seconds)
// and requires a leader to be present. An ErrEtcdLeaderNotHealthy error is
// returned if the write fails, the read fails, or the value read back does not
// match what was written.
func (m *EmbeddedEtcdMember) CheckLeaderHealth(ctx context.Context, leaseTimeout int64) error {
	// Only the member that is currently the etcd leader checks itself.
	if m.GetEtcdLeader() != m.ID() {
		return nil
	}

	// Use a relaxed deadline (2x the lease timeout) to avoid false positives.
	probeTimeout := 2 * time.Second * time.Duration(leaseTimeout)
	healthKey := m.GetLeaderHealthPath()
	probeValue := fmt.Sprint(rand.Int())
	leaderCtx := clientv3.WithRequireLeader(ctx)

	// Step 1: make sure the leader can accept a write.
	log.Debug("check etcd leader health via writing",
		zap.String("health-key", healthKey),
		zap.String("health-value", probeValue),
		zap.Duration("timeout", probeTimeout))
	writeCtx, cancelWrite := context.WithTimeout(leaderCtx, probeTimeout)
	_, writeErr := m.client.Put(writeCtx, healthKey, probeValue)
	cancelWrite()
	if writeErr != nil {
		return errs.ErrEtcdLeaderNotHealthy.Wrap(writeErr).FastGenByArgs("unable to write the health key")
	}

	// Step 2: make sure the leader can serve a read of the value just written.
	log.Debug("check etcd leader health via reading",
		zap.String("health-key", healthKey),
		zap.String("health-value", probeValue),
		zap.Duration("timeout", probeTimeout))
	readCtx, cancelRead := context.WithTimeout(leaderCtx, probeTimeout)
	resp, readErr := m.client.Get(readCtx, healthKey)
	cancelRead()
	if readErr != nil {
		return errs.ErrEtcdLeaderNotHealthy.Wrap(readErr).FastGenByArgs("unable to read the health key")
	}

	// The probe succeeds only when the key exists and holds the expected value.
	if len(resp.Kvs) > 0 && string(resp.Kvs[0].Value) == probeValue {
		return nil
	}
	return errs.ErrEtcdLeaderNotHealthy.FastGenByArgs("the health key not found or not matched")
}

// CheckPriority checks whether the etcd leader should be moved according to the priority.
func (m *EmbeddedEtcdMember) CheckPriority(ctx context.Context) {
etcdLeader := m.GetEtcdLeader()
Expand Down
28 changes: 26 additions & 2 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1762,11 +1762,13 @@ func (s *Server) campaignLeader() {
for {
select {
case <-leaderTicker.C:
// Check if the leadership is still available.
if !s.member.IsLeader() {
log.Info("no longer a leader because lease has expired, pd leader will step down")
return
}
// add failpoint to test exit leader, failpoint judge the member is the give value, then break

// Add a failpoint to test exiting the leader: the failpoint checks whether the member is the given value, then breaks.
failpoint.Inject("exitCampaignLeader", func(val failpoint.Value) {
memberString := val.(string)
memberID, _ := strconv.ParseUint(memberString, 10, 64)
Expand All @@ -1776,13 +1778,35 @@ func (s *Server) campaignLeader() {
}
})

// Check if the etcd leader changed.
etcdLeader := s.member.GetEtcdLeader()
if etcdLeader != s.member.ID() {
log.Info("etcd leader changed, resigns pd leadership", zap.String("old-pd-leader-name", s.Name()))
return
}
// Randomly check whether the current etcd leader is healthy.
// A probability of 0.01 per tick means a check is performed roughly once every 5 seconds.
if randFloat := rand.Float32(); randFloat < 0.01 {
log.Debug("check etcd leader health in this tick", zap.Float32("rand-float", randFloat))
err := s.member.CheckLeaderHealth(ctx, s.cfg.LeaderLease)
if err != nil {
select {
case <-ctx.Done():
// No need to resign the etcd leader if the server is closed.
log.Info("server is closed during checking leader health")
return
default:
}
log.Error("etcd leader is unhealthy, resign the etcd leader", errs.ZapError(err))
err = s.member.ResignEtcdLeader(ctx, s.member.Name(), "")
if err != nil {
log.Warn("failed to resign the etcd leader after unhealth is detected", errs.ZapError(err))
}
return
}
}
case <-ctx.Done():
// Server is closed and it should return nil.
// Server is closed.
log.Info("server is closed")
return
}
Expand Down

0 comments on commit 223de1e

Please sign in to comment.