Commit

election, member, tso: fix the leader check bug and refine the logic (#3287) (#3289)

* cherry pick #3287 to release-5.0-rc

Signed-off-by: ti-srebot <ti-srebot@pingcap.com>

* add test for release-5.0

Signed-off-by: nolouch <nolouch@gmail.com>

Co-authored-by: JmPotato <ghzpotato@gmail.com>
Co-authored-by: nolouch <nolouch@gmail.com>
Co-authored-by: Ti Prow Robot <71242396+ti-community-prow-bot@users.noreply.github.com>
4 people authored Dec 22, 2020
1 parent c268837 commit fa65de5
Showing 8 changed files with 52 additions and 52 deletions.
.github/workflows/pd-tests.yaml (2 additions, 0 deletions)
@@ -3,10 +3,12 @@ on:
     branches:
       - master
       - release-4.0
+      - release-5.0-rc
   pull_request:
     branches:
       - master
       - release-4.0
+      - release-5.0-rc
 name: PD Tests
 jobs:
   chunks:
server/election/leadership.go (2 additions, 0 deletions)
@@ -107,9 +107,11 @@ func (ls *Leadership) Campaign(leaseTimeout int64, leaderData string) error {
 		Commit()
 	log.Info("check campaign resp", zap.Any("resp", resp))
 	if err != nil {
+		ls.getLease().Close()
 		return errs.ErrEtcdTxn.Wrap(err).GenWithStackByCause()
 	}
 	if !resp.Succeeded {
+		ls.getLease().Close()
 		return errs.ErrEtcdTxn.FastGenByArgs()
 	}
 	log.Info("write leaderData to leaderPath ok", zap.String("leaderPath", ls.leaderKey), zap.String("purpose", ls.purpose))
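
The two added Close() calls are the heart of the fix: Leadership.Check() treats a live lease as proof of leadership, so a campaign transaction that errors out or loses the race must release the lease it granted, or a later lease check can report a leadership that was never won. Below is a minimal sketch of that invariant; Lease, commitLeaderKey, and errCampaignLost are hypothetical stand-ins, not PD's real types.

package sketch

import "errors"

// Lease is a hypothetical stand-in for PD's etcd lease wrapper.
type Lease struct{ alive bool }

func (l *Lease) Grant() error { l.alive = true; return nil }
func (l *Lease) Close()       { l.alive = false }

// Check models Leadership.Check(): it only sees whether the lease is alive.
func (l *Lease) Check() bool { return l.alive }

var errCampaignLost = errors.New("leader key was written by another member")

// campaign sketches the fixed flow: every failure path between granting
// the lease and committing the leader key closes the lease, so Check()
// can never return true for a campaign that did not succeed.
func campaign(l *Lease, commitLeaderKey func() (succeeded bool, err error)) error {
	if err := l.Grant(); err != nil {
		return err
	}
	succeeded, err := commitLeaderKey()
	if err != nil {
		l.Close() // txn error: drop the lease before reporting failure
		return err
	}
	if !succeeded {
		l.Close() // lost the race: we hold a lease but not the leader key
		return errCampaignLost
	}
	return nil
}
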
server/grpc_service.go (3 additions, 3 deletions)
@@ -980,7 +980,7 @@ func (s *Server) SyncMaxTS(ctx context.Context, request *pdpb.SyncMaxTSRequest)
 	for _, allocator := range allocatorLeaders {
 		// No longer leader, just skip here because
 		// the global allocator will check if all DCs are handled.
-		if !allocator.IsStillAllocatorLeader() {
+		if !allocator.IsAllocatorLeader() {
 			continue
 		}
 		currentLocalTSO, err := allocator.GetCurrentTSO()
@@ -1000,7 +1000,7 @@ func (s *Server) SyncMaxTS(ctx context.Context, request *pdpb.SyncMaxTSRequest)
 	}
 	// The second phase of synchronization: do the writing
 	for _, allocator := range allocatorLeaders {
-		if !allocator.IsStillAllocatorLeader() {
+		if !allocator.IsAllocatorLeader() {
 			continue
 		}
 		if err := allocator.WriteTSO(request.GetMaxTs()); err != nil {
@@ -1033,7 +1033,7 @@ func (s *Server) GetDCLocations(ctx context.Context, request *pdpb.GetDCLocation
 	if err := s.validateInternalRequest(request.GetHeader(), false); err != nil {
 		return nil, err
 	}
-	if !s.member.IsStillLeader() {
+	if !s.member.IsLeader() {
 		return nil, fmt.Errorf("receiving pd member[%v] is not pd leader", s.member.ID())
 	}
 	return &pdpb.GetDCLocationsResponse{
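
The renamed check is applied in both halves of SyncMaxTS: a read phase that collects the current TSO from every Local TSO Allocator leader still passing the stricter leadership test, and a write phase that persists the agreed maximum. A compressed sketch of that two-phase shape follows; the allocator interface and syncMaxTS signature are illustrative simplifications, not PD's actual API.

package sketch

// allocator is a hypothetical reduction of PD's Local TSO Allocator.
type allocator interface {
	IsAllocatorLeader() bool
	GetCurrentTSO() (uint64, error)
	WriteTSO(maxTS uint64) error
}

// syncMaxTS sketches the two phases shown in the diff. Allocators that
// fail the leadership check are skipped in both phases; the global
// allocator is responsible for noticing any dc-location left unhandled.
func syncMaxTS(allocators []allocator, maxTS uint64) (uint64, error) {
	// Phase 1: read every valid leader's current TSO and raise maxTS.
	for _, a := range allocators {
		if !a.IsAllocatorLeader() {
			continue
		}
		cur, err := a.GetCurrentTSO()
		if err != nil {
			return 0, err
		}
		if cur > maxTS {
			maxTS = cur
		}
	}
	// Phase 2: write the agreed maximum back to every valid leader.
	for _, a := range allocators {
		if !a.IsAllocatorLeader() {
			continue
		}
		if err := a.WriteTSO(maxTS); err != nil {
			return 0, err
		}
	}
	return maxTS, nil
}
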
server/member/member.go (2 additions, 9 deletions)
@@ -94,10 +94,9 @@ func (m *Member) Client() *clientv3.Client {
 	return m.client
 }
 
-// IsLeader returns whether the server is PD leader or not.
+// IsLeader returns whether the server is PD leader or not by checking its leadership's lease and leader info.
 func (m *Member) IsLeader() bool {
-	// If server is not started. Both leaderID and ID could be 0.
-	return m.GetLeaderID() == m.ID()
+	return m.leadership.Check() && m.GetLeader().GetMemberId() == m.member.GetMemberId()
 }
 
 // GetLeaderID returns current PD leader's member ID.
@@ -154,12 +153,6 @@ func (m *Member) KeepLeader(ctx context.Context) {
 	m.leadership.Keep(ctx)
 }
 
-// IsStillLeader returns whether the PD leader is still a PD leader
-// by checking its leadership's lease.
-func (m *Member) IsStillLeader() bool {
-	return m.leadership.Check()
-}
-
 // CheckLeader checks returns true if it is needed to check later.
 func (m *Member) CheckLeader() (*pdpb.Member, int64, bool) {
 	if m.GetEtcdLeader() == 0 {
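
The old IsStillLeader checked only the lease, while the old IsLeader checked only the leader ID, and each could be true while the other was false (a stale lease from a failed campaign, or an expired lease behind a not-yet-updated leader key). The merged IsLeader requires both at once. A minimal model of that conjunction, with simplified types in place of PD's member and leadership structures:

package sketch

// leadership models the lease half of the check.
type leadership struct{ leaseAlive bool }

func (ls *leadership) Check() bool { return ls.leaseAlive }

// member models the identity half: the ID this server has versus the ID
// the current leader info (read from etcd) reports.
type member struct {
	ls           *leadership
	selfID       uint64
	leaderInfoID func() uint64
}

// IsLeader mirrors the fixed logic: an alive lease alone is not enough,
// and a matching leader key alone is not enough; leadership holds only
// while both are true.
func (m *member) IsLeader() bool {
	return m.ls.Check() && m.leaderInfoID() == m.selfID
}
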
server/server.go (1 addition, 1 deletion)
@@ -1234,7 +1234,7 @@ func (s *Server) campaignLeader() {
 	for {
 		select {
 		case <-leaderTicker.C:
-			if !s.member.IsStillLeader() {
+			if !s.member.IsLeader() {
 				log.Info("no longer a leader because lease has expired, pd leader will step down")
 				return
 			}
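
For context, the loop above is the leader's own watchdog: it re-evaluates the strengthened check on every tick and steps down the moment it fails, instead of waiting for followers to notice. A generic sketch of that pattern, with the ticker interval and callback left as illustrative parameters:

package sketch

import (
	"context"
	"time"
)

// keepLeaderLoop sketches campaignLeader's watchdog: re-check leadership
// on every tick and return (step down) as soon as the check fails.
func keepLeaderLoop(ctx context.Context, isLeader func() bool, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			if !isLeader() {
				return // lease expired or leader key changed
			}
		case <-ctx.Done():
			return // server is shutting down
		}
	}
}
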
server/tso/allocator_manager.go (36 additions, 33 deletions)
@@ -301,6 +301,39 @@ func (am *AllocatorManager) allocatorLeaderLoop(ctx context.Context, allocator *
 		default:
 		}
 
+		// Check whether the Local TSO Allocator has the leader already
+		allocatorLeader, rev, checkAgain := allocator.CheckAllocatorLeader()
+		if checkAgain {
+			continue
+		}
+		if allocatorLeader != nil {
+			log.Info("start to watch allocator leader",
+				zap.Stringer(fmt.Sprintf("%s-allocator-leader", allocator.dcLocation), allocatorLeader),
+				zap.String("local-tso-allocator-name", am.member.Member().Name))
+			// WatchAllocatorLeader will keep looping and never return unless the Local TSO Allocator leader has changed.
+			allocator.WatchAllocatorLeader(ctx, allocatorLeader, rev)
+			log.Info("local tso allocator leader has changed, try to re-campaign a local tso allocator leader",
+				zap.String("dc-location", allocator.dcLocation))
+		}
+
+		// Check the next-leader key
+		nextLeader, err := am.getNextLeaderID(allocator.dcLocation)
+		if err != nil {
+			log.Error("get next leader from etcd failed",
+				zap.String("dc-location", allocator.dcLocation),
+				errs.ZapError(err))
+			time.Sleep(200 * time.Millisecond)
+			continue
+		}
+		if nextLeader != 0 && nextLeader != am.member.ID() {
+			log.Info("skip campaigning of the local tso allocator leader and check later",
+				zap.String("server-name", am.member.Member().Name),
+				zap.Uint64("server-id", am.member.ID()),
+				zap.Uint64("next-leader-id", nextLeader))
+			time.Sleep(200 * time.Millisecond)
+			continue
+		}
+
 		// Make sure the leader is aware of this new dc-location in order to make the
 		// Global TSO synchronization can cover up this dc-location.
 		ok, suffix, err := am.isLeaderAwareOfDCLocation(ctx, allocator.dcLocation)
@@ -332,36 +365,6 @@ func (am *AllocatorManager) allocatorLeaderLoop(ctx context.Context, allocator *
 			}
 		}
 
-		allocatorLeader, rev, checkAgain := allocator.CheckAllocatorLeader()
-		if checkAgain {
-			continue
-		}
-		if allocatorLeader != nil {
-			log.Info("start to watch allocator leader",
-				zap.Stringer(fmt.Sprintf("%s-allocator-leader", allocator.dcLocation), allocatorLeader),
-				zap.String("local-tso-allocator-name", am.member.Member().Name))
-			// WatchAllocatorLeader will keep looping and never return unless the Local TSO Allocator leader has changed.
-			allocator.WatchAllocatorLeader(ctx, allocatorLeader, rev)
-			log.Info("local tso allocator leader has changed, try to re-campaign a local tso allocator leader",
-				zap.String("dc-location", allocator.dcLocation))
-		}
-		// Check the next-leader key
-		nextLeader, err := am.getNextLeaderID(allocator.dcLocation)
-		if err != nil {
-			log.Error("get next leader from etcd failed",
-				zap.String("dc-location", allocator.dcLocation),
-				errs.ZapError(err))
-			time.Sleep(200 * time.Millisecond)
-			continue
-		}
-		if nextLeader != 0 && nextLeader != am.member.ID() {
-			log.Info("skip campaigning of the local tso allocator leader and check later",
-				zap.String("server-name", am.member.Member().Name),
-				zap.Uint64("server-id", am.member.ID()),
-				zap.Uint64("next-leader-id", nextLeader))
-			time.Sleep(200 * time.Millisecond)
-			continue
-		}
 		am.campaignAllocatorLeader(ctx, allocator, suffix)
 	}
 }
@@ -412,7 +415,7 @@ func (am *AllocatorManager) campaignAllocatorLeader(loopCtx context.Context, all
 	for {
 		select {
 		case <-leaderTicker.C:
-			if !allocator.IsStillAllocatorLeader() {
+			if !allocator.IsAllocatorLeader() {
 				log.Info("no longer a local tso allocator leader because lease has expired, local tso allocator leader will step down",
 					zap.String("dc-location", allocator.dcLocation),
 					zap.Int("suffix", suffix),
@@ -551,7 +554,7 @@ func (am *AllocatorManager) ClusterDCLocationChecker() {
 		}
 	}
 	// Only leader can write the TSO suffix to etcd in order to make it consistent in the cluster
-	if am.member.IsStillLeader() {
+	if am.member.IsLeader() {
 		for dcLocation, info := range am.mu.clusterDCLocations {
 			if info.suffix > 0 {
 				continue
@@ -883,7 +886,7 @@ func (am *AllocatorManager) isLeaderAwareOfDCLocation(ctx context.Context, dcLoc
 }
 
 func (am *AllocatorManager) getLeaderDCLocations(ctx context.Context) (map[string]int32, error) {
-	if am.member.IsStillLeader() {
+	if am.member.IsLeader() {
 		return am.GetSuffixDCLocations(), nil
 	}
 
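
The net effect of the moved block is an ordering change in allocatorLeaderLoop: each iteration now first defers to an existing allocator leader (blocking in the watch until it changes), then honors a designated next leader, and only then campaigns. Previously the campaign-side checks ran before the leader check, so a member could start campaigning against a still-valid leader. A condensed sketch of the fixed control flow, with the etcd helpers reduced to hypothetical function parameters:

package sketch

import (
	"context"
	"time"
)

// allocatorLeaderLoop sketches the reordered loop. checkLeader reports
// whether some leader already exists (and whether to retry immediately);
// watchLeader blocks until that leader changes.
func allocatorLeaderLoop(
	ctx context.Context,
	checkLeader func() (exists, checkAgain bool),
	watchLeader func(ctx context.Context),
	nextLeaderID func() (uint64, error),
	selfID uint64,
	campaign func(ctx context.Context),
) {
	for ctx.Err() == nil {
		// 1. Defer to an existing leader before anything campaign-related.
		exists, checkAgain := checkLeader()
		if checkAgain {
			continue
		}
		if exists {
			watchLeader(ctx) // returns only when the leader changes
		}
		// 2. Honor an explicitly designated next leader.
		next, err := nextLeaderID()
		if err != nil || (next != 0 && next != selfID) {
			time.Sleep(200 * time.Millisecond)
			continue
		}
		// 3. Only now try to become the allocator leader ourselves.
		campaign(ctx)
	}
}
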
server/tso/local_allocator.go (4 additions, 4 deletions)
@@ -165,10 +165,10 @@ func (lta *LocalTSOAllocator) KeepAllocatorLeader(ctx context.Context) {
 	lta.leadership.Keep(ctx)
 }
 
-// IsStillAllocatorLeader returns whether the allocator is still a
-// Local TSO Allocator leader by checking its leadership's lease.
-func (lta *LocalTSOAllocator) IsStillAllocatorLeader() bool {
-	return lta.leadership.Check()
+// IsAllocatorLeader returns whether the allocator is still a
+// Local TSO Allocator leader by checking its leadership's lease and leader info.
+func (lta *LocalTSOAllocator) IsAllocatorLeader() bool {
+	return lta.leadership.Check() && lta.GetAllocatorLeader().GetMemberId() == lta.GetMember().GetMemberId()
 }
 
 // isSameLeader checks whether a server is the leader itself.
tests/cluster.go (2 additions, 2 deletions)
@@ -245,7 +245,7 @@ func (s *TestServer) IsAllocatorLeader(dcLocation string) bool {
 	if err != nil {
 		return false
 	}
-	return !s.server.IsClosed() && allocator.(*tso.LocalTSOAllocator).IsStillAllocatorLeader()
+	return !s.server.IsClosed() && allocator.(*tso.LocalTSOAllocator).IsAllocatorLeader()
 }
 
 // GetEtcdLeader returns the builtin etcd leader.
@@ -363,7 +363,7 @@ func (s *TestServer) BootstrapCluster() error {
 // If it exceeds the maximum number of loops, it will return nil.
 func (s *TestServer) WaitLeader() bool {
 	for i := 0; i < 100; i++ {
-		if s.server.GetMember().IsStillLeader() {
+		if s.server.GetMember().IsLeader() {
 			return true
 		}
 		time.Sleep(WaitLeaderCheckInterval)
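
WaitLeader above is the usual bounded-polling helper for tests: retry a condition a fixed number of times rather than blocking forever on an election that never settles. A generic form of the pattern, with illustrative names (waitUntil, cond, maxRetries are not part of the PD codebase):

package sketch

import "time"

// waitUntil polls cond up to maxRetries times, sleeping interval between
// attempts, and reports whether the condition ever held; this mirrors how
// the test harness waits for a server to become leader.
func waitUntil(cond func() bool, maxRetries int, interval time.Duration) bool {
	for i := 0; i < maxRetries; i++ {
		if cond() {
			return true
		}
		time.Sleep(interval)
	}
	return false
}
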
