Handle Nomad leadership flapping (attempt 2) #6977

Merged 6 commits on Jan 28, 2020
76 changes: 49 additions & 27 deletions nomad/leader.go
@@ -59,37 +59,59 @@ var defaultSchedulerConfig = &structs.SchedulerConfiguration{
func (s *Server) monitorLeadership() {
var weAreLeaderCh chan struct{}
var leaderLoop sync.WaitGroup

leaderStep := func(isLeader bool) {
if isLeader {
if weAreLeaderCh != nil {
s.logger.Error("attempted to start the leader loop while running")
return
}

weAreLeaderCh = make(chan struct{})
leaderLoop.Add(1)
go func(ch chan struct{}) {
defer leaderLoop.Done()
s.leaderLoop(ch)
}(weAreLeaderCh)
s.logger.Info("cluster leadership acquired")
return
}

if weAreLeaderCh == nil {
s.logger.Error("attempted to stop the leader loop while not running")
return
}

s.logger.Debug("shutting down leader loop")
close(weAreLeaderCh)
leaderLoop.Wait()
weAreLeaderCh = nil
s.logger.Info("cluster leadership lost")
}

wasLeader := false
for {
select {
case isLeader := <-s.leaderCh:
switch {
case isLeader:
if weAreLeaderCh != nil {
s.logger.Error("attempted to start the leader loop while running")
continue
}

weAreLeaderCh = make(chan struct{})
leaderLoop.Add(1)
go func(ch chan struct{}) {
defer leaderLoop.Done()
s.leaderLoop(ch)
}(weAreLeaderCh)
s.logger.Info("cluster leadership acquired")

default:
if weAreLeaderCh == nil {
s.logger.Error("attempted to stop the leader loop while not running")
continue
}

s.logger.Debug("shutting down leader loop")
close(weAreLeaderCh)
leaderLoop.Wait()
weAreLeaderCh = nil
s.logger.Info("cluster leadership lost")
if wasLeader != isLeader {
wasLeader = isLeader
// normal case where we went through a transition
leaderStep(isLeader)
} else if wasLeader && isLeader {
// Server lost but then gained leadership immediately.
// During this time, this server may have received
// Raft transitions that haven't been applied to the FSM
// yet.
// Ensure that the FSM has caught up and eval queues are refreshed
s.logger.Warn("cluster leadership lost and gained leadership immediately. Could indicate network issues, memory paging, or high CPU load.")

leaderStep(false)
leaderStep(true)
} else {
// Server gained but lost leadership immediately
// before it reacted; nothing to do, move on
s.logger.Warn("cluster leadership gained and lost leadership immediately. Could indicate network issues, memory paging, or high CPU load.")
}

case <-s.shutdownCh:
return
}
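To make the transition handling above easier to scan, here is a small standalone sketch of the decision table that the new monitorLeadership loop applies to each notification (hypothetical helper and names, not code from the PR): a genuine transition triggers a single leaderStep call, a lost-and-regained flap restarts the leader loop so the FSM catches up, and a gained-and-lost flap is a no-op.

```
package main

import "fmt"

// transitions is a hypothetical helper mirroring the decision table in
// monitorLeadership: it returns the sequence of leaderStep arguments for a
// given (previous, newly observed) leadership pair.
func transitions(wasLeader, isLeader bool) []bool {
	switch {
	case wasLeader != isLeader:
		// normal case: a single start (true) or stop (false) of the leader loop
		return []bool{isLeader}
	case wasLeader && isLeader:
		// leadership lost and regained before we reacted:
		// stop and restart the loop so the FSM catches up
		return []bool{false, true}
	default:
		// leadership gained and lost before we reacted: nothing to do
		return nil
	}
}

func main() {
	fmt.Println(transitions(false, true))  // [true]       - became leader
	fmt.Println(transitions(true, false))  // [false]      - lost leadership
	fmt.Println(transitions(true, true))   // [false true] - flapped; restart
	fmt.Println(transitions(false, false)) // []           - flapped; ignore
}
```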
4 changes: 2 additions & 2 deletions nomad/server.go
@@ -1234,10 +1234,10 @@ func (s *Server) setupRaft() error {
}
}

// Setup the leader channel
// Setup the leader channel; only the latest leadership state is kept
leaderCh := make(chan bool, 1)
s.config.RaftConfig.NotifyCh = leaderCh
s.leaderCh = leaderCh
s.leaderCh = dropButLastChannel(leaderCh, s.shutdownCh)

// Setup the Raft store
s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, log, stable, snap, trans)
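As a rough illustration of the wiring change above (a sketch under assumptions, not code from the PR): with the raw NotifyCh a slow consumer would have to see every flap, whereas the wrapped channel lets rapid flip-flops collapse so the consumer tends to observe only the latest state. The snippet assumes it lives in the nomad package so that dropButLastChannel, defined in nomad/util.go later in this diff, is in scope.

```
package nomad

import (
	"fmt"
	"time"
)

// exampleLeaderChannelWrapping is a hypothetical illustration, not part of
// the PR, of the effect of wrapping the raft notify channel.
func exampleLeaderChannelWrapping() {
	notifyCh := make(chan bool, 1) // stands in for s.config.RaftConfig.NotifyCh
	shutdownCh := make(chan struct{})
	defer close(shutdownCh)

	leaderCh := dropButLastChannel(notifyCh, shutdownCh)

	// Leadership flaps rapidly while nothing is reading leaderCh yet.
	for _, v := range []bool{true, false, true, false, true} {
		notifyCh <- v
	}

	// Give the forwarding goroutine a moment to drain the source, then read.
	// Intermediate values are dropped on a best-effort basis, so this will
	// most likely print only the latest state (true).
	time.Sleep(10 * time.Millisecond)
	fmt.Println(<-leaderCh)
}
```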
93 changes: 93 additions & 0 deletions nomad/util.go
@@ -301,3 +301,96 @@ func getAlloc(state AllocGetter, allocID string) (*structs.Allocation, error) {

return alloc, nil
}

// dropButLastChannel returns a channel that drops all but the last value received from sourceCh.
//
// Useful for aggressively consuming sourceCh when intermediate values aren't relevant.
//
// This function propagates values to the result channel quickly and drops
// intermediate messages on a best-effort basis. The Go scheduler may delay
// delivery or result in extra deliveries.
//
// Consider this example:
//
// ```
// src := make(chan bool)
// dst := dropButLastChannel(src, nil)
//
// go func() {
// src <- true
// src <- false
// }()
//
// v can be `true` here, but that is very unlikely
// v := <-dst
// ```
//
func dropButLastChannel(sourceCh <-chan bool, shutdownCh <-chan struct{}) chan bool {
// buffer the most recent result
dst := make(chan bool)

go func() {
// last value received
lv := false
// ok is false once sourceCh has been closed
ok := false
// messageReceived tracks whether a value arrived since the last delivery to dst
messageReceived := false

DEQUE_SOURCE:
// wait for first message
select {
case lv, ok = <-sourceCh:
if !ok {
goto SOURCE_CLOSED
}
messageReceived = true
goto ENQUEUE_DST
Member:
Unnecessary.

Suggested change: drop the `goto ENQUEUE_DST`.

Contributor Author:
It's unnecessary indeed. I'd like to keep it, though, just because I find it easier to see all the state machine transitions as goto statements.

case <-shutdownCh:
return
}

ENQUEUE_DST:
// prioritize draining the source first: dequeue without blocking
for {
select {
case lv, ok = <-sourceCh:
if !ok {
goto SOURCE_CLOSED
}
messageReceived = true
default:
break ENQUEUE_DST
}
}

// attempt to enqueue but keep monitoring source channel
select {
case lv, ok = <-sourceCh:
if !ok {
goto SOURCE_CLOSED
}
messageReceived = true
goto ENQUEUE_DST
case dst <- lv:
messageReceived = false
// enqueued value; back to dequeuing from source
goto DEQUE_SOURCE
case <-shutdownCh:
return
}

SOURCE_CLOSED:
if messageReceived {
select {
case dst <- lv:
case <-shutdownCh:
return
}
}
close(dst)
}()

return dst

}
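The review thread above debates the goto-based layout of this state machine. For comparison only, here is a hypothetical goto-free sketch of the same drop-all-but-last idea (not the PR's implementation); it relies on the nil-channel select idiom and, unlike the code above, does not prioritize draining the source before offering a value downstream, so it may deliver more intermediate values.

```
// dropButLastChannelLoop is a hypothetical goto-free variant, shown only for
// comparison with the state-machine version above; it is not part of the PR.
func dropButLastChannelLoop(sourceCh <-chan bool, shutdownCh <-chan struct{}) chan bool {
	dst := make(chan bool)

	go func() {
		defer close(dst)

		var lv bool      // last value received from sourceCh
		pending := false // do we hold a value not yet delivered to dst?

		for {
			// A nil channel blocks forever in a select, so the send case is
			// effectively disabled until we actually hold a value.
			var out chan bool
			if pending {
				out = dst
			}

			select {
			case v, ok := <-sourceCh:
				if !ok {
					// source closed: flush the last value, then exit
					if pending {
						select {
						case dst <- lv:
						case <-shutdownCh:
						}
					}
					return
				}
				lv, pending = v, true
			case out <- lv:
				pending = false
			case <-shutdownCh:
				return
			}
		}
	}()

	return dst
}
```

One minor behavioral difference in this sketch: the deferred close also closes dst on shutdown, whereas the PR's version returns on shutdown without closing dst.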
174 changes: 174 additions & 0 deletions nomad/util_test.go
@@ -4,6 +4,7 @@ import (
"net"
"reflect"
"testing"
"time"

version "github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/helper/uuid"
@@ -258,3 +258,176 @@ func TestMaxUint64(t *testing.T) {
t.Fatalf("bad")
}
}

func TestDropButLastChannelDropsValues(t *testing.T) {
sourceCh := make(chan bool)
shutdownCh := make(chan struct{})
defer close(shutdownCh)

dstCh := dropButLastChannel(sourceCh, shutdownCh)

// timeout duration for any channel propagation delay
timeoutDuration := 5 * time.Millisecond

// test that dstCh doesn't emit anything initially
select {
case <-dstCh:
require.Fail(t, "received a message unexpectedly")
case <-time.After(timeoutDuration):
// yay, no message - this could have been a default: case, but we
// wait to account for goroutine scheduling effects
}

sourceCh <- false
select {
case v := <-dstCh:
require.False(t, v, "unexpected value from dstCh")
case <-time.After(timeoutDuration):
require.Fail(t, "timed out waiting for source->dstCh propagation")
}

// channel is drained now
select {
case v := <-dstCh:
require.Failf(t, "received a message unexpectedly", "value: %v", v)
case <-time.After(timeoutDuration):
// yay, no message - this could have been a default: case, but we
// wait to account for goroutine scheduling effects
}

// now enqueue many messages and ensure only the last one is received
// enqueueing should be fast!
sourceCh <- false
sourceCh <- false
sourceCh <- false
sourceCh <- false
sourceCh <- true

// I suspect that dstCh may contain a stale (i.e. `false`) value if Go runs
// this select before the implementation goroutine dequeues the last value.
//
// However, I never got it to fail in tests - so leaving it as-is to see if it ever fails;
// if/when the test fails, we can learn how much of an issue it is and adjust.
Member:
We can fail it pretty trivially if we push the values into the channel concurrently with the test thread, but I'm not sure that tells us anything other than that we didn't get a chance to consume everything on the channel. If we pull all the values off, we're fine:

package main

import (
	"fmt"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
)

func TestDropButLastChannel(t *testing.T) {

	testFunc := func(t *testing.T) {
		t.Parallel()
		shutdownCh := make(chan struct{})

		src := make(chan bool)
		dst := dropButLastChannel(src, shutdownCh)

		timeoutDuration := 1 * time.Millisecond

		go func() {
			src <- false
			src <- false
			src <- false
			src <- false
			src <- false
			src <- false
			src <- true
			src <- false
			src <- true
		}()

		var v bool
	BREAK:
		for {
			select {
			case v = <-dst:
				fmt.Println("ok")
			case <-time.After(timeoutDuration):
				break BREAK
			}
		}

		assert.True(t, v)
		close(shutdownCh)
	}

	for i := 0; i < 1000; i++ {
		t.Run(fmt.Sprintf("test-%d", i), testFunc)
	}
}

Contributor Author:
Correct - if it's running on a different goroutine, we have no guarantees of delivery. This feels like another test to add.

Here, I wanted to test that intermediate messages get sent but are dropped when no receive is happening on the channel - so I made the sends happen on the same goroutine. Though, in its current form, we still cannot 100% guarantee that the first message we receive is the last sent message; it just hasn't happened in practice yet, hence my comment.

Your test is good to have, in that we should check that, ultimately, we always deliver the last message last.

Contributor Author:
BTW - seeing your test, I realized I didn't support closing the source channel (Raft doesn't attempt to close the notify channel); I updated the function to handle the close signal by always attempting to deliver the last known value, in case someone adopts this function for something else in the future.

select {
case v := <-dstCh:
require.True(t, v, "unexpected value from dstCh")
case <-time.After(timeoutDuration):
require.Fail(t, "timed out waiting for source->dstCh propagation")
}

sourceCh <- true
sourceCh <- true
sourceCh <- true
sourceCh <- true
sourceCh <- true
sourceCh <- false
select {
case v := <-dstCh:
require.False(t, v, "unexpected value from dstCh")
case <-time.After(timeoutDuration):
require.Fail(t, "timed out waiting for source->dstCh propagation")
}
}

// TestDropButLastChannel_DeliversMessages asserts that the last
// message is always delivered and that some messages may be dropped,
// but new messages are never introduced.
// In a tight loop, receivers may get some intermediate messages.
func TestDropButLastChannel_DeliversMessages(t *testing.T) {
sourceCh := make(chan bool)
shutdownCh := make(chan struct{})
defer close(shutdownCh)

dstCh := dropButLastChannel(sourceCh, shutdownCh)

// timeout duration for any channel propagation delay
timeoutDuration := 5 * time.Millisecond

sentMessages := 100
go func() {
for i := 0; i < sentMessages-1; i++ {
sourceCh <- true
}
sourceCh <- false
}()

receivedTrue, receivedFalse := 0, 0
var lastReceived *bool

RECEIVE_LOOP:
for {
select {
case v := <-dstCh:
lastReceived = &v
if v {
receivedTrue++
} else {
receivedFalse++
}

case <-time.After(timeoutDuration):
break RECEIVE_LOOP
}
}

t.Logf("receiver got %v out %v true messages, and %v out of %v false messages",
receivedTrue, sentMessages-1, receivedFalse, 1)

require.NotNil(t, lastReceived)
require.False(t, *lastReceived)
require.Equal(t, 1, receivedFalse)
require.LessOrEqual(t, receivedTrue, sentMessages-1)
}

// TestDropButLastChannel_DeliversMessages_Close asserts that the last
// message is always delivered and that some messages may be dropped but
// new messages are never introduced, even when the source channel is closed.
func TestDropButLastChannel_DeliversMessages_Close(t *testing.T) {
sourceCh := make(chan bool)
shutdownCh := make(chan struct{})
defer close(shutdownCh)

dstCh := dropButLastChannel(sourceCh, shutdownCh)

// timeout duration for any channel propagation delay
timeoutDuration := 5 * time.Millisecond

sentMessages := 100
go func() {
for i := 0; i < sentMessages-1; i++ {
sourceCh <- true
}
sourceCh <- false
close(sourceCh)
}()

receivedTrue, receivedFalse := 0, 0
var lastReceived *bool

RECEIVE_LOOP:
for {
select {
case v, ok := <-dstCh:
if !ok {
break RECEIVE_LOOP
}
lastReceived = &v
if v {
receivedTrue++
} else {
receivedFalse++
}

case <-time.After(timeoutDuration):
require.Fail(t, "timed out while waiting for messages")
}
}

t.Logf("receiver got %v out %v true messages, and %v out of %v false messages",
receivedTrue, sentMessages-1, receivedFalse, 1)

require.NotNil(t, lastReceived)
require.False(t, *lastReceived)
require.Equal(t, 1, receivedFalse)
require.LessOrEqual(t, receivedTrue, sentMessages-1)
}