Merge pull request #3402 from hashicorp/leader-loop
Applies leader loop fixes from Consul.
dadgar committed Nov 3, 2017
2 parents (8a3fc18 + afdc537), commit eca9e09
Showing 1 changed file with 48 additions and 11 deletions.
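
The fix ports Consul's leader-loop lifecycle handling: a sync.WaitGroup serializes the loop's start and stop so a new leader loop can never begin while the old one is still winding down, the Raft barrier gets a write timeout instead of blocking indefinitely, and leadership is only revoked if it was actually established. Here is the start/stop pattern reduced to a self-contained Go sketch; the server type, channel names, and timings are illustrative stand-ins, not Nomad's real API.

package main

import (
	"log"
	"sync"
	"time"
)

type server struct {
	leaderCh   chan bool     // receives true on gaining leadership, false on losing it
	shutdownCh chan struct{} // closed when the server shuts down
}

// leaderLoop stands in for the real maintenance work a leader performs;
// it runs until stopCh is closed.
func (s *server) leaderLoop(stopCh chan struct{}) {
	for {
		select {
		case <-stopCh:
			log.Println("leader loop exiting")
			return
		case <-time.After(100 * time.Millisecond):
			log.Println("doing leader work")
		}
	}
}

func (s *server) monitorLeadership() {
	var weAreLeaderCh chan struct{}
	var leaderLoop sync.WaitGroup
	for {
		select {
		case isLeader := <-s.leaderCh:
			if isLeader {
				if weAreLeaderCh != nil {
					continue // already running; don't start a second loop
				}
				weAreLeaderCh = make(chan struct{})
				leaderLoop.Add(1)
				go func(ch chan struct{}) {
					defer leaderLoop.Done()
					s.leaderLoop(ch)
				}(weAreLeaderCh)
			} else {
				if weAreLeaderCh == nil {
					continue // not running; nothing to stop
				}
				close(weAreLeaderCh)
				leaderLoop.Wait() // the fix: block until the old loop has exited
				weAreLeaderCh = nil
			}
		case <-s.shutdownCh:
			return
		}
	}
}

func main() {
	s := &server{leaderCh: make(chan bool), shutdownCh: make(chan struct{})}
	go s.monitorLeadership()

	s.leaderCh <- true // gain leadership
	time.Sleep(250 * time.Millisecond)
	s.leaderCh <- false // lose it; Wait() guarantees the loop is fully gone
	s.leaderCh <- true  // ...so restarting immediately is safe
	time.Sleep(250 * time.Millisecond)
	close(s.shutdownCh)
}

The leaderLoop.Wait() call is the heart of it: previously the monitor closed the stop channel and immediately cleared its state, so a rapid lose-then-regain of leadership could leave two leader loops running at once.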
nomad/leader.go: 59 changes (48 additions & 11 deletions)
@@ -7,6 +7,7 @@ import (
 	"fmt"
 	"math/rand"
 	"net"
+	"sync"
 	"time"
 
 	"golang.org/x/time/rate"
@@ -29,25 +30,50 @@ const (
 	// replicationRateLimit is used to rate limit how often data is replicated
 	// between the authoritative region and the local region
 	replicationRateLimit rate.Limit = 10.0
+
+	// barrierWriteTimeout is used to give Raft a chance to process a
+	// possible loss of leadership event if we are unable to get a barrier
+	// while leader.
+	barrierWriteTimeout = 2 * time.Minute
 )
 
 // monitorLeadership is used to monitor if we acquire or lose our role
 // as the leader in the Raft cluster. There is some work the leader is
 // expected to do, so we must react to changes
 func (s *Server) monitorLeadership() {
-	var stopCh chan struct{}
+	var weAreLeaderCh chan struct{}
+	var leaderLoop sync.WaitGroup
 	for {
 		select {
 		case isLeader := <-s.leaderCh:
-			if isLeader {
-				stopCh = make(chan struct{})
-				go s.leaderLoop(stopCh)
+			switch {
+			case isLeader:
+				if weAreLeaderCh != nil {
+					s.logger.Printf("[ERR] nomad: attempted to start the leader loop while running")
+					continue
+				}
+
+				weAreLeaderCh = make(chan struct{})
+				leaderLoop.Add(1)
+				go func(ch chan struct{}) {
+					defer leaderLoop.Done()
+					s.leaderLoop(ch)
+				}(weAreLeaderCh)
 				s.logger.Printf("[INFO] nomad: cluster leadership acquired")
-			} else if stopCh != nil {
-				close(stopCh)
-				stopCh = nil
+
+			default:
+				if weAreLeaderCh == nil {
+					s.logger.Printf("[ERR] nomad: attempted to stop the leader loop while not running")
+					continue
+				}
+
+				s.logger.Printf("[DEBUG] nomad: shutting down leader loop")
+				close(weAreLeaderCh)
+				leaderLoop.Wait()
+				weAreLeaderCh = nil
 				s.logger.Printf("[INFO] nomad: cluster leadership lost")
 			}
+
 		case <-s.shutdownCh:
 			return
 		}
@@ -57,9 +83,6 @@ func (s *Server) monitorLeadership() {
 // leaderLoop runs as long as we are the leader to run various
 // maintenance activities
 func (s *Server) leaderLoop(stopCh chan struct{}) {
-	// Ensure we revoke leadership on stepdown
-	defer s.revokeLeadership()
-
 	var reconcileCh chan serf.Member
 	establishedLeader := false
@@ -70,7 +93,7 @@ RECONCILE:
 
 	// Apply a raft barrier to ensure our FSM is caught up
 	start := time.Now()
-	barrier := s.raft.Barrier(0)
+	barrier := s.raft.Barrier(barrierWriteTimeout)
 	if err := barrier.Error(); err != nil {
 		s.logger.Printf("[ERR] nomad: failed to wait for barrier: %v", err)
 		goto WAIT
@@ -84,6 +107,11 @@ RECONCILE:
 			goto WAIT
 		}
 		establishedLeader = true
+		defer func() {
+			if err := s.revokeLeadership(); err != nil {
+				s.logger.Printf("[ERR] nomad: failed to revoke leadership: %v", err)
+			}
+		}()
 	}
 
 	// Reconcile any missing data
@@ -96,6 +124,15 @@ RECONCILE:
 	// updates
 	reconcileCh = s.reconcileCh
 
+	// Poll the stop channel to give it priority so we don't waste time
+	// trying to perform the other operations if we have been asked to shut
+	// down.
+	select {
+	case <-stopCh:
+		return
+	default:
+	}
+
 WAIT:
 	// Wait until leadership is lost
 	for {
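
One subtlety in the RECONCILE hunks above: revokeLeadership is now registered as a deferred call only after establishLeadership succeeds, so a loop that never became a functioning leader no longer attempts a revocation on the way out, and the establishedLeader flag keeps the defer from being registered twice when the loop re-enters RECONCILE. A minimal sketch of that guard, with hypothetical establish/revoke placeholders standing in for Nomad's methods:

package main

import "fmt"

// establish and revoke are hypothetical stand-ins for Nomad's
// s.establishLeadership and s.revokeLeadership, both of which can fail.
func establish() error { fmt.Println("established"); return nil }
func revoke() error    { fmt.Println("revoked"); return nil }

// leaderLoop mirrors the shape of the fixed code: the revoke defer is
// registered only after establish succeeds, and the establishedLeader
// flag keeps it from being registered again on re-entry.
func leaderLoop() {
	establishedLeader := false
	passes := 0

RECONCILE:
	passes++
	if !establishedLeader {
		if err := establish(); err != nil {
			fmt.Println("failed to establish leadership:", err)
			return // nothing established, so nothing to revoke on exit
		}
		establishedLeader = true
		defer func() {
			if err := revoke(); err != nil {
				fmt.Println("failed to revoke leadership:", err)
			}
		}()
	}

	// The real loop jumps back to RECONCILE on transient errors; simulate
	// one retry to show the defer is still registered exactly once.
	if passes < 2 {
		goto RECONCILE
	}
	// "revoked" prints once here, as leaderLoop returns.
}

func main() {
	leaderLoop()
}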
