Skip to content

Commit

Permalink
Merge pull request #6977 from hashicorp/b-leadership-flapping-2
Browse files Browse the repository at this point in the history
Handle Nomad leadership flapping (attempt 2)
  • Loading branch information
Mahmood Ali committed Jan 28, 2020
2 parents c68947b + 8ae03c3 commit 771c8ff
Show file tree
Hide file tree
Showing 4 changed files with 318 additions and 29 deletions.
76 changes: 49 additions & 27 deletions nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,37 +59,59 @@ var defaultSchedulerConfig = &structs.SchedulerConfiguration{
func (s *Server) monitorLeadership() {
var weAreLeaderCh chan struct{}
var leaderLoop sync.WaitGroup

leaderStep := func(isLeader bool) {
if isLeader {
if weAreLeaderCh != nil {
s.logger.Error("attempted to start the leader loop while running")
return
}

weAreLeaderCh = make(chan struct{})
leaderLoop.Add(1)
go func(ch chan struct{}) {
defer leaderLoop.Done()
s.leaderLoop(ch)
}(weAreLeaderCh)
s.logger.Info("cluster leadership acquired")
return
}

if weAreLeaderCh == nil {
s.logger.Error("attempted to stop the leader loop while not running")
return
}

s.logger.Debug("shutting down leader loop")
close(weAreLeaderCh)
leaderLoop.Wait()
weAreLeaderCh = nil
s.logger.Info("cluster leadership lost")
}

wasLeader := false
for {
select {
case isLeader := <-s.leaderCh:
switch {
case isLeader:
if weAreLeaderCh != nil {
s.logger.Error("attempted to start the leader loop while running")
continue
}

weAreLeaderCh = make(chan struct{})
leaderLoop.Add(1)
go func(ch chan struct{}) {
defer leaderLoop.Done()
s.leaderLoop(ch)
}(weAreLeaderCh)
s.logger.Info("cluster leadership acquired")

default:
if weAreLeaderCh == nil {
s.logger.Error("attempted to stop the leader loop while not running")
continue
}

s.logger.Debug("shutting down leader loop")
close(weAreLeaderCh)
leaderLoop.Wait()
weAreLeaderCh = nil
s.logger.Info("cluster leadership lost")
if wasLeader != isLeader {
wasLeader = isLeader
// normal case where we went through a transition
leaderStep(isLeader)
} else if wasLeader && isLeader {
// Server lost but then gained leadership immediately.
// During this time, this server may have received
// Raft transitions that haven't been applied to the FSM
// yet.
// Ensure that that FSM caught up and eval queues are refreshed
s.logger.Warn("cluster leadership lost and gained leadership immediately. Could indicate network issues, memory paging, or high CPU load.")

leaderStep(false)
leaderStep(true)
} else {
// Server gained but lost leadership immediately
// before it reacted; nothing to do, move on
s.logger.Warn("cluster leadership gained and lost leadership immediately. Could indicate network issues, memory paging, or high CPU load.")
}

case <-s.shutdownCh:
return
}
Expand Down
4 changes: 2 additions & 2 deletions nomad/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1234,10 +1234,10 @@ func (s *Server) setupRaft() error {
}
}

// Setup the leader channel
// Setup the leader channel; that keeps the latest leadership alone
leaderCh := make(chan bool, 1)
s.config.RaftConfig.NotifyCh = leaderCh
s.leaderCh = leaderCh
s.leaderCh = dropButLastChannel(leaderCh, s.shutdownCh)

// Setup the Raft store
s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, log, stable, snap, trans)
Expand Down
93 changes: 93 additions & 0 deletions nomad/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,3 +301,96 @@ func getAlloc(state AllocGetter, allocID string) (*structs.Allocation, error) {

return alloc, nil
}

// dropButLastChannel returns a channel that drops all but last value from sourceCh.
//
// Useful for aggressively consuming sourceCh when intermediate values aren't relevant.
//
// This function propagates values to result quickly and drops intermediate messages
// in best effort basis. Golang scheduler may delay delivery or result in extra
// deliveries.
//
// Consider this function for example:
//
// ```
// src := make(chan bool)
// dst := dropButLastChannel(src, nil)
//
// go func() {
// src <- true
// src <- false
// }()
//
// // v can be `true` here but is very unlikely
// v := <-dst
// ```
//
func dropButLastChannel(sourceCh <-chan bool, shutdownCh <-chan struct{}) chan bool {
// buffer the most recent result
dst := make(chan bool)

go func() {
// last value received
lv := false
// ok source was closed
ok := false
// received message since last delivery to destination
messageReceived := false

DEQUE_SOURCE:
// wait for first message
select {
case lv, ok = <-sourceCh:
if !ok {
goto SOURCE_CLOSED
}
messageReceived = true
goto ENQUEUE_DST
case <-shutdownCh:
return
}

ENQUEUE_DST:
// prioritize draining source first dequeue without blocking
for {
select {
case lv, ok = <-sourceCh:
if !ok {
goto SOURCE_CLOSED
}
messageReceived = true
default:
break ENQUEUE_DST
}
}

// attempt to enqueue but keep monitoring source channel
select {
case lv, ok = <-sourceCh:
if !ok {
goto SOURCE_CLOSED
}
messageReceived = true
goto ENQUEUE_DST
case dst <- lv:
messageReceived = false
// enqueued value; back to dequeing from source
goto DEQUE_SOURCE
case <-shutdownCh:
return
}

SOURCE_CLOSED:
if messageReceived {
select {
case dst <- lv:
case <-shutdownCh:
return
}
}
close(dst)
}()

return dst

}
174 changes: 174 additions & 0 deletions nomad/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"net"
"reflect"
"testing"
"time"

version "github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/helper/uuid"
Expand Down Expand Up @@ -258,3 +259,176 @@ func TestMaxUint64(t *testing.T) {
t.Fatalf("bad")
}
}

func TestDropButLastChannelDropsValues(t *testing.T) {
sourceCh := make(chan bool)
shutdownCh := make(chan struct{})
defer close(shutdownCh)

dstCh := dropButLastChannel(sourceCh, shutdownCh)

// timeout duration for any channel propagation delay
timeoutDuration := 5 * time.Millisecond

// test that dstCh doesn't emit anything initially
select {
case <-dstCh:
require.Fail(t, "received a message unexpectedly")
case <-time.After(timeoutDuration):
// yay no message - it could have been a default: but
// checking for goroutine effect
}

sourceCh <- false
select {
case v := <-dstCh:
require.False(t, v, "unexpected value from dstCh Ch")
case <-time.After(timeoutDuration):
require.Fail(t, "timed out waiting for source->dstCh propagation")
}

// channel is drained now
select {
case v := <-dstCh:
require.Failf(t, "received a message unexpectedly", "value: %v", v)
case <-time.After(timeoutDuration):
// yay no message - it could have been a default: but
// checking for goroutine effect
}

// now enqueue many messages and ensure only last one is received
// enqueueing should be fast!
sourceCh <- false
sourceCh <- false
sourceCh <- false
sourceCh <- false
sourceCh <- true

// I suspect that dstCh may contain a stale (i.e. `false`) value if golang executes
// this select before the implementation goroutine dequeues last value.
//
// However, never got it to fail in test - so leaving it now to see if it ever fails;
// and if/when test fails, we can learn of how much of an issue it is and adjust
select {
case v := <-dstCh:
require.True(t, v, "unexpected value from dstCh Ch")
case <-time.After(timeoutDuration):
require.Fail(t, "timed out waiting for source->dstCh propagation")
}

sourceCh <- true
sourceCh <- true
sourceCh <- true
sourceCh <- true
sourceCh <- true
sourceCh <- false
select {
case v := <-dstCh:
require.False(t, v, "unexpected value from dstCh Ch")
case <-time.After(timeoutDuration):
require.Fail(t, "timed out waiting for source->dstCh propagation")
}
}

// TestDropButLastChannel_DeliversMessages asserts that last
// message is always delivered, some messages are dropped but never
// introduce new messages.
// On tight loop, receivers may get some intermediary messages.
func TestDropButLastChannel_DeliversMessages(t *testing.T) {
sourceCh := make(chan bool)
shutdownCh := make(chan struct{})
defer close(shutdownCh)

dstCh := dropButLastChannel(sourceCh, shutdownCh)

// timeout duration for any channel propagation delay
timeoutDuration := 5 * time.Millisecond

sentMessages := 100
go func() {
for i := 0; i < sentMessages-1; i++ {
sourceCh <- true
}
sourceCh <- false
}()

receivedTrue, receivedFalse := 0, 0
var lastReceived *bool

RECEIVE_LOOP:
for {
select {
case v := <-dstCh:
lastReceived = &v
if v {
receivedTrue++
} else {
receivedFalse++
}

case <-time.After(timeoutDuration):
break RECEIVE_LOOP
}
}

t.Logf("receiver got %v out %v true messages, and %v out of %v false messages",
receivedTrue, sentMessages-1, receivedFalse, 1)

require.NotNil(t, lastReceived)
require.False(t, *lastReceived)
require.Equal(t, 1, receivedFalse)
require.LessOrEqual(t, receivedTrue, sentMessages-1)
}

// TestDropButLastChannel_DeliversMessages_Close asserts that last
// message is always delivered, some messages are dropped but never
// introduce new messages, even with a closed signal.
func TestDropButLastChannel_DeliversMessages_Close(t *testing.T) {
sourceCh := make(chan bool)
shutdownCh := make(chan struct{})
defer close(shutdownCh)

dstCh := dropButLastChannel(sourceCh, shutdownCh)

// timeout duration for any channel propagation delay
timeoutDuration := 5 * time.Millisecond

sentMessages := 100
go func() {
for i := 0; i < sentMessages-1; i++ {
sourceCh <- true
}
sourceCh <- false
close(sourceCh)
}()

receivedTrue, receivedFalse := 0, 0
var lastReceived *bool

RECEIVE_LOOP:
for {
select {
case v, ok := <-dstCh:
if !ok {
break RECEIVE_LOOP
}
lastReceived = &v
if v {
receivedTrue++
} else {
receivedFalse++
}

case <-time.After(timeoutDuration):
require.Fail(t, "timed out while waiting for messages")
}
}

t.Logf("receiver got %v out %v true messages, and %v out of %v false messages",
receivedTrue, sentMessages-1, receivedFalse, 1)

require.NotNil(t, lastReceived)
require.False(t, *lastReceived)
require.Equal(t, 1, receivedFalse)
require.LessOrEqual(t, receivedTrue, sentMessages-1)
}

0 comments on commit 771c8ff

Please sign in to comment.