Applies leader loop fixes from Consul. #3402

Merged: 1 commit, Nov 3, 2017
59 changes: 48 additions & 11 deletions nomad/leader.go
@@ -7,6 +7,7 @@ import (
 	"fmt"
 	"math/rand"
 	"net"
+	"sync"
 	"time"

 	"golang.org/x/time/rate"
@@ -29,25 +30,50 @@ const (
 	// replicationRateLimit is used to rate limit how often data is replicated
 	// between the authoritative region and the local region
 	replicationRateLimit rate.Limit = 10.0
+
+	// barrierWriteTimeout is used to give Raft a chance to process a
+	// possible loss of leadership event if we are unable to get a barrier
+	// while leader.
+	barrierWriteTimeout = 2 * time.Minute
 )

 // monitorLeadership is used to monitor if we acquire or lose our role
 // as the leader in the Raft cluster. There is some work the leader is
 // expected to do, so we must react to changes
 func (s *Server) monitorLeadership() {
-	var stopCh chan struct{}
+	var weAreLeaderCh chan struct{}
+	var leaderLoop sync.WaitGroup
 	for {
 		select {
 		case isLeader := <-s.leaderCh:
-			if isLeader {
-				stopCh = make(chan struct{})
-				go s.leaderLoop(stopCh)
+			switch {
+			case isLeader:
+				if weAreLeaderCh != nil {
+					s.logger.Printf("[ERR] nomad: attempted to start the leader loop while running")
+					continue
+				}
+
+				weAreLeaderCh = make(chan struct{})
+				leaderLoop.Add(1)
+				go func(ch chan struct{}) {
+					defer leaderLoop.Done()
+					s.leaderLoop(ch)
+				}(weAreLeaderCh)
 				s.logger.Printf("[INFO] nomad: cluster leadership acquired")
-			} else if stopCh != nil {
-				close(stopCh)
-				stopCh = nil
+
+			default:
+				if weAreLeaderCh == nil {
+					s.logger.Printf("[ERR] nomad: attempted to stop the leader loop while not running")
+					continue
+				}
+
+				s.logger.Printf("[DEBUG] nomad: shutting down leader loop")
+				close(weAreLeaderCh)
+				leaderLoop.Wait()
+				weAreLeaderCh = nil
 				s.logger.Printf("[INFO] nomad: cluster leadership lost")
 			}
+
 		case <-s.shutdownCh:
 			return
 		}
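The switch above is the core of the fix: becoming leader refuses to start a second loop if one is still running, and losing leadership closes the channel and then blocks on the sync.WaitGroup until the old loop has fully exited, so two leader loops can never overlap. A minimal standalone sketch of that hand-off pattern, using a hypothetical leaderCtl type in place of the Server fields:

package main

import (
	"fmt"
	"sync"
	"time"
)

// leaderCtl owns at most one running leader loop. Start/Stop mirror the
// switch in monitorLeadership: Start refuses to launch a second loop,
// and Stop blocks on the WaitGroup until the old loop has fully exited.
type leaderCtl struct {
	stopCh chan struct{}
	wg     sync.WaitGroup
}

func (c *leaderCtl) Start(loop func(stopCh chan struct{})) bool {
	if c.stopCh != nil {
		return false // already running, mirrors the [ERR] log above
	}
	c.stopCh = make(chan struct{})
	c.wg.Add(1)
	go func(ch chan struct{}) {
		defer c.wg.Done()
		loop(ch)
	}(c.stopCh)
	return true
}

func (c *leaderCtl) Stop() bool {
	if c.stopCh == nil {
		return false // not running
	}
	close(c.stopCh)
	c.wg.Wait() // block until the loop goroutine returns
	c.stopCh = nil
	return true
}

func main() {
	var ctl leaderCtl
	ctl.Start(func(stopCh chan struct{}) {
		<-stopCh                          // stand-in for leaderLoop's work
		time.Sleep(10 * time.Millisecond) // simulate cleanup on the way out
	})
	ctl.Stop()
	fmt.Println("loop fully stopped before Stop returned")
}

Passing the channel to the goroutine as an argument, as the diff does with weAreLeaderCh, pins each loop to the channel it was started with, so reassigning the variable for the next term cannot race with a loop from the previous term.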
@@ -57,9 +83,6 @@ func (s *Server) monitorLeadership() {
 // leaderLoop runs as long as we are the leader to run various
 // maintence activities
 func (s *Server) leaderLoop(stopCh chan struct{}) {
-	// Ensure we revoke leadership on stepdown
-	defer s.revokeLeadership()
-
 	var reconcileCh chan serf.Member
 	establishedLeader := false

@@ -70,7 +93,7 @@ RECONCILE:

 	// Apply a raft barrier to ensure our FSM is caught up
 	start := time.Now()
-	barrier := s.raft.Barrier(0)
+	barrier := s.raft.Barrier(barrierWriteTimeout)
 	if err := barrier.Error(); err != nil {
 		s.logger.Printf("[ERR] nomad: failed to wait for barrier: %v", err)
 		goto WAIT
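Switching from Barrier(0) to Barrier(barrierWriteTimeout) bounds how long a deposed leader can block here: with a zero timeout the call can hang indefinitely when leadership is lost mid-barrier, while a bounded timeout lets Error() return so the loop falls through to WAIT. A sketch of the call, assuming the hashicorp/raft API in which Barrier returns a Future:

package leaderutil

import (
	"log"
	"time"

	"github.com/hashicorp/raft"
)

// waitForBarrier issues a Raft barrier with a bounded timeout. If the
// barrier cannot commit (for example because leadership was just lost),
// Error() returns instead of blocking forever, and the caller can go
// back to waiting for the next leadership event.
func waitForBarrier(r *raft.Raft, timeout time.Duration) error {
	barrier := r.Barrier(timeout)
	if err := barrier.Error(); err != nil {
		log.Printf("[ERR] failed to wait for barrier: %v", err)
		return err
	}
	return nil
}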
@@ -84,6 +107,11 @@
 			goto WAIT
 		}
 		establishedLeader = true
+		defer func() {
+			if err := s.revokeLeadership(); err != nil {
+				s.logger.Printf("[ERR] nomad: failed to revoke leadership: %v", err)
+			}
+		}()
 	}

 	// Reconcile any missing data
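Together with the deletion in the earlier hunk, this moves revokeLeadership from an unconditional defer at the top of leaderLoop to a defer registered only after establishLeadership succeeds, and its error is now logged instead of silently dropped. A small sketch of the revised ordering, with hypothetical establish/revoke callbacks standing in for the Server methods:

package leaderutil

import "log"

// leaderLoopSketch registers the revoke cleanup only once establish has
// succeeded, so a server that never finished establishing leadership
// does not try to revoke it.
func leaderLoopSketch(establish, revoke func() error, stopCh chan struct{}) {
	if err := establish(); err != nil {
		return // never became leader; nothing to revoke
	}
	defer func() {
		if err := revoke(); err != nil {
			log.Printf("[ERR] failed to revoke leadership: %v", err)
		}
	}()
	<-stopCh // stand-in for the leader's work loop
}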
@@ -96,6 +124,15 @@
 	// updates
 	reconcileCh = s.reconcileCh

+	// Poll the stop channel to give it priority so we don't waste time
+	// trying to perform the other operations if we have been asked to shut
+	// down.
+	select {
+	case <-stopCh:
+		return
+	default:
+	}
+
 WAIT:
 	// Wait until leadership is lost
 	for {
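The new non-blocking select gives stopCh priority: a loop that has been asked to stand down exits before starting another reconcile round, instead of only noticing the closed channel once it reaches WAIT. A runnable sketch of the same pattern, with an illustrative doWork function:

package main

import "fmt"

// doWork runs rounds of work but checks stopCh with a non-blocking
// select before each round, giving shutdown priority over new work.
func doWork(stopCh chan struct{}, rounds int) {
	for i := 0; i < rounds; i++ {
		select {
		case <-stopCh:
			return // asked to stop; skip the remaining rounds
		default:
			// not stopped; fall through and do one round
		}
		fmt.Println("round", i)
	}
}

func main() {
	stopCh := make(chan struct{})
	close(stopCh) // already stopped: doWork should do no rounds
	doWork(stopCh, 3)
}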