Randomly check the etcd leader health to proactively resign
Signed-off-by: JmPotato <ghzpotato@gmail.com>
JmPotato committed Jan 4, 2024
1 parent 754095a commit a4ef45f
Showing 6 changed files with 106 additions and 5 deletions.
5 changes: 5 additions & 0 deletions errors.toml
@@ -521,6 +521,11 @@ error = '''
etcd leader not found
'''

["PD:member:ErrEtcdLeaderNotHealthy"]
error = '''
etcd leader is not healthy, %s
'''

["PD:member:ErrMarshalLeader"]
error = '''
marshal leader failed
7 changes: 4 additions & 3 deletions pkg/errs/errno.go
@@ -62,9 +62,10 @@ var (

// member errors
var (
ErrEtcdLeaderNotFound = errors.Normalize("etcd leader not found", errors.RFCCodeText("PD:member:ErrEtcdLeaderNotFound"))
ErrMarshalLeader = errors.Normalize("marshal leader failed", errors.RFCCodeText("PD:member:ErrMarshalLeader"))
ErrCheckCampaign = errors.Normalize("check campaign failed", errors.RFCCodeText("PD:member:ErrCheckCampaign"))
ErrEtcdLeaderNotFound = errors.Normalize("etcd leader not found", errors.RFCCodeText("PD:member:ErrEtcdLeaderNotFound"))
ErrEtcdLeaderNotHealthy = errors.Normalize("etcd leader is not healthy, %s", errors.RFCCodeText("PD:member:ErrEtcdLeaderNotHealthy"))
ErrMarshalLeader = errors.Normalize("marshal leader failed", errors.RFCCodeText("PD:member:ErrMarshalLeader"))
ErrCheckCampaign = errors.Normalize("check campaign failed", errors.RFCCodeText("PD:member:ErrCheckCampaign"))
)

// core errors
3 changes: 3 additions & 0 deletions pkg/mcs/utils/constant.go
@@ -40,6 +40,9 @@ const (
DefaultLeaderLease = int64(3)
// LeaderTickInterval is the interval to check leader
LeaderTickInterval = 50 * time.Millisecond
// LeaderHealthCheckProbability is the probability of checking the leader health in each tick.
// For example, with the default value 0.01 and a tick interval of 50ms, the leader health is expected to be checked about once every 5 seconds.
LeaderHealthCheckProbability = 0.01

// DefaultKeyspaceName is the name reserved for default keyspace.
DefaultKeyspaceName = "DEFAULT"
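As an aside, not part of the diff: a minimal, self-contained Go sketch of how this probability translates into an expected check interval. The constants mirror the defaults above; everything else is illustrative.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	const tickInterval = 50 * time.Millisecond // LeaderTickInterval
	const checkProbability = 0.01              // LeaderHealthCheckProbability

	// With probability p per tick, the expected number of ticks between checks
	// is 1/p, so the expected interval is tickInterval / p = 50ms / 0.01 = 5s.
	fmt.Println("expected interval:", time.Duration(float64(tickInterval)/checkProbability))

	// The gate itself is a single comparison per tick, mirroring campaignLeader.
	if rand.Float32() < checkProbability {
		fmt.Println("this tick would check the etcd leader health")
	}
}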
43 changes: 43 additions & 0 deletions pkg/member/member.go
@@ -164,6 +164,11 @@ func (m *EmbeddedEtcdMember) GetLeaderPath() string {
return path.Join(m.rootPath, "leader")
}

// GetLeaderHealthPath returns the path of the PD leader health.
func (m *EmbeddedEtcdMember) GetLeaderHealthPath() string {
return path.Join(m.GetLeaderPath(), "health")
}

// GetLeadership returns the leadership of the PD member.
func (m *EmbeddedEtcdMember) GetLeadership() *election.Leadership {
return m.leadership
@@ -279,6 +284,44 @@ func (m *EmbeddedEtcdMember) ResetLeader() {
m.unsetLeader()
}

// CheckLeaderHealth checks the health of the current etcd leader.
func (m *EmbeddedEtcdMember) CheckLeaderHealth(ctx context.Context, leaseTimeout int64) error {
etcdLeader := m.GetEtcdLeader()
if etcdLeader != m.ID() {
return nil
}

failpoint.Inject("failCheckLeaderHealth", func(val failpoint.Value) {
if memberName, ok := val.(string); ok && m.Name() == memberName {
failpoint.Return(errs.ErrEtcdLeaderNotHealthy.FastGenByArgs("failed by failpoint"))
}
})

// Double the lease timeout to relax the check and avoid misjudging a slow but healthy leader.
timeout := time.Duration(leaseTimeout) * time.Second * 2
// Check the leader writing.
log.Debug("check etcd leader health via writing",
zap.String("health-key", m.GetLeaderHealthPath()),
zap.Duration("timeout", timeout))
timeoutCtx, cancel := context.WithTimeout(clientv3.WithRequireLeader(ctx), timeout)
_, err := m.client.Put(timeoutCtx, m.GetLeaderHealthPath(), "")
cancel()
if err != nil {
return errs.ErrEtcdLeaderNotHealthy.Wrap(err).FastGenByArgs("unable to write the health key")
}
// Check the leader reading.
log.Debug("check etcd leader health via reading",
zap.String("health-key", m.GetLeaderHealthPath()),
zap.Duration("timeout", timeout))
timeoutCtx, cancel = context.WithTimeout(clientv3.WithRequireLeader(ctx), timeout)
_, err = m.client.Get(timeoutCtx, m.GetLeaderHealthPath())
cancel()
if err != nil {
return errs.ErrEtcdLeaderNotHealthy.Wrap(err).FastGenByArgs("unable to read the health key")
}
return nil
}

// CheckPriority checks whether the etcd leader should be moved according to the priority.
func (m *EmbeddedEtcdMember) CheckPriority(ctx context.Context) {
etcdLeader := m.GetEtcdLeader()
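For readers outside the PD codebase, a self-contained sketch of the same write-then-read probe against a plain etcd client. The endpoint, key, and timeout below are illustrative assumptions, not values taken from this commit.

package main

import (
	"context"
	"log"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// checkEtcdHealth mirrors the probe pattern in CheckLeaderHealth: write a small
// key and read it back, both with WithRequireLeader and a bounded timeout, so a
// stalled or partitioned etcd leader surfaces as an error.
func checkEtcdHealth(ctx context.Context, cli *clientv3.Client, key string, timeout time.Duration) error {
	writeCtx, cancel := context.WithTimeout(clientv3.WithRequireLeader(ctx), timeout)
	_, err := cli.Put(writeCtx, key, "")
	cancel()
	if err != nil {
		return err
	}
	readCtx, cancel := context.WithTimeout(clientv3.WithRequireLeader(ctx), timeout)
	_, err = cli.Get(readCtx, key)
	cancel()
	return err
}

func main() {
	// Endpoint and lease timeout here are assumptions for illustration only.
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}, DialTimeout: 3 * time.Second})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()
	// Double the lease timeout, as the commit does, to keep the check relaxed.
	if err := checkEtcdHealth(context.Background(), cli, "/health-probe", 2*3*time.Second); err != nil {
		log.Println("etcd leader looks unhealthy:", err)
	}
}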
28 changes: 26 additions & 2 deletions server/server.go
@@ -1762,11 +1762,35 @@ func (s *Server) campaignLeader() {
for {
select {
case <-leaderTicker.C:
// Randomly check whether the current etcd leader is healthy. This should be performed before
// checking the leadership to prevent the situation where the etcd leader is unhealthy but the
// same PD member keeps campaigning to be the PD leader.
if randFloat := rand.Float32(); randFloat < mcs.LeaderHealthCheckProbability {
log.Debug("check etcd leader health in this tick", zap.Float32("rand-float", randFloat))
err := s.member.CheckLeaderHealth(ctx, s.cfg.LeaderLease)
if err != nil {
select {
case <-ctx.Done():
// No need to resign the etcd leader if the server is closed.
log.Info("server is closed during checking leader health")
return
default:
}
log.Error("etcd leader is unhealthy, resign the etcd leader", errs.ZapError(err))
err = s.member.ResignEtcdLeader(ctx, s.member.Name(), "")
if err != nil {
log.Warn("failed to resign the etcd leader after unhealth is detected", errs.ZapError(err))
}
return
}
}
// Check if the leadership is still available.
if !s.member.IsLeader() {
log.Info("no longer a leader because lease has expired, pd leader will step down")
return
}
// add failpoint to test exit leader, failpoint judge the member is the give value, then break

// Add a failpoint to test exiting the leader loop: if the member matches the given value, break.
failpoint.Inject("exitCampaignLeader", func(val failpoint.Value) {
memberString := val.(string)
memberID, _ := strconv.ParseUint(memberString, 10, 64)
@@ -1776,13 +1800,13 @@ func (s *Server) campaignLeader() {
}
})

// Check if the etcd leader changed.
etcdLeader := s.member.GetEtcdLeader()
if etcdLeader != s.member.ID() {
log.Info("etcd leader changed, resigns pd leadership", zap.String("old-pd-leader-name", s.Name()))
return
}
case <-ctx.Done():
// Server is closed and it should return nil.
log.Info("server is closed")
return
}
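Pulling the pieces together, a compilable sketch of the tick-loop shape this hunk adds to campaignLeader. The fakeMember type and its methods are stand-ins invented for illustration; only the control flow mirrors the commit.

package main

import (
	"context"
	"log"
	"math/rand"
	"time"
)

// fakeMember is a hypothetical stand-in for the member methods used in
// campaignLeader; names and signatures are illustrative, not the PD API.
type fakeMember struct{}

func (fakeMember) CheckLeaderHealth(ctx context.Context, leaseTimeout int64) error { return nil }
func (fakeMember) ResignEtcdLeader(ctx context.Context) error                      { return nil }
func (fakeMember) IsLeader() bool                                                  { return true }

// leaderLoop: on a small random fraction of ticks, probe the etcd leader; if the
// probe fails and the server is not shutting down, resign etcd leadership and
// step out of the loop so a healthier member can take over.
func leaderLoop(ctx context.Context, m fakeMember, leaseTimeout int64) {
	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			if rand.Float32() < 0.01 {
				if err := m.CheckLeaderHealth(ctx, leaseTimeout); err != nil {
					select {
					case <-ctx.Done():
						return // server closed; no need to resign
					default:
					}
					log.Println("etcd leader unhealthy, resigning:", err)
					_ = m.ResignEtcdLeader(ctx)
					return
				}
			}
			if !m.IsLeader() {
				return // lease expired; step down
			}
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()
	leaderLoop(ctx, fakeMember{}, 3)
}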
25 changes: 25 additions & 0 deletions tests/server/member/member_test.go
@@ -417,3 +417,28 @@ func sendRequest(re *require.Assertions, wg *sync.WaitGroup, done <-chan bool, a
time.Sleep(10 * time.Millisecond)
}
}

func TestCheckLeaderHealth(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cluster, err := tests.NewTestCluster(ctx, 2)
defer cluster.Destroy()
re.NoError(err)

err = cluster.RunInitialServers()
re.NoError(err)
leader := cluster.WaitLeader()
re.NotEmpty(leader)

re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/member/failCheckLeaderHealth", fmt.Sprintf(`return("%s")`, leader)))
var newLeader string
testutil.Eventually(re, func() bool {
newLeader = cluster.WaitLeader()
return newLeader != leader
})
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/member/failCheckLeaderHealth"))
newLeader = cluster.WaitLeader()
re.NotEmpty(newLeader)
re.NotEqual(leader, newLeader)
}
