Skip to content

Commit

Permalink
Merge pull request #5699 from hashicorp/dani/b-eval-broker-lifetime
Browse files Browse the repository at this point in the history
Eval Broker: Prevent redundant enqueue's when a node is not a leader
  • Loading branch information
endocrimes committed May 15, 2019
2 parents caa410a + 68c1454 commit 781c94b
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 7 deletions.
21 changes: 14 additions & 7 deletions nomad/eval_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ func (b *EvalBroker) Enabled() bool {
// should only be enabled on the active leader.
func (b *EvalBroker) SetEnabled(enabled bool) {
b.l.Lock()
defer b.l.Unlock()

prevEnabled := b.enabled
b.enabled = enabled
if !prevEnabled && enabled {
Expand All @@ -169,7 +171,7 @@ func (b *EvalBroker) SetEnabled(enabled bool) {
b.delayedEvalCancelFunc = cancel
go b.runDelayedEvalsWatcher(ctx, b.delayedEvalsUpdateCh)
}
b.l.Unlock()

if !enabled {
b.flush()
}
Expand Down Expand Up @@ -208,6 +210,11 @@ func (b *EvalBroker) EnqueueAll(evals map[*structs.Evaluation]string) {
// outstanding, the evaluation is blocked until an Ack/Nack is received.
// processEnqueue must be called with the lock held.
func (b *EvalBroker) processEnqueue(eval *structs.Evaluation, token string) {
// If we're not enabled, don't enable more queuing.
if !b.enabled {
return
}

// Check if already enqueued
if _, ok := b.evals[eval.ID]; ok {
if token == "" {
Expand Down Expand Up @@ -259,8 +266,10 @@ func (b *EvalBroker) processWaitingEnqueue(eval *structs.Evaluation) {
func (b *EvalBroker) enqueueWaiting(eval *structs.Evaluation) {
b.l.Lock()
defer b.l.Unlock()

delete(b.timeWait, eval.ID)
b.stats.TotalWaiting -= 1

b.enqueueLocked(eval, eval.Type)
}

Expand Down Expand Up @@ -678,11 +687,9 @@ func (b *EvalBroker) ResumeNackTimeout(evalID, token string) error {
return nil
}

// Flush is used to clear the state of the broker
// Flush is used to clear the state of the broker. It must be called from within
// the lock.
func (b *EvalBroker) flush() {
b.l.Lock()
defer b.l.Unlock()

// Unblock any waiters
for _, waitCh := range b.waiting {
close(waitCh)
Expand Down Expand Up @@ -778,13 +785,13 @@ func (b *EvalBroker) runDelayedEvalsWatcher(ctx context.Context, updateCh <-chan
// This peeks at the heap to return the top. If the heap is empty, this returns nil and zero time.
func (b *EvalBroker) nextDelayedEval() (*structs.Evaluation, time.Time) {
b.l.RLock()
defer b.l.RUnlock()

// If there is nothing wait for an update.
if b.delayHeap.Length() == 0 {
b.l.RUnlock()
return nil, time.Time{}
}
nextEval := b.delayHeap.Peek()
b.l.RUnlock()
if nextEval == nil {
return nil, time.Time{}
}
Expand Down
58 changes: 58 additions & 0 deletions nomad/eval_broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,64 @@ func TestEvalBroker_Enqueue_Disable(t *testing.T) {
}
}

func TestEvalBroker_Enqueue_Disable_Delay(t *testing.T) {
t.Parallel()
b := testBroker(t, 0)
baseEval := mock.Eval()
b.SetEnabled(true)

{
// Enqueue
b.Enqueue(baseEval.Copy())

delayedEval := baseEval.Copy()
delayedEval.Wait = 30
b.Enqueue(delayedEval)

waitEval := baseEval.Copy()
waitEval.WaitUntil = time.Now().Add(30 * time.Second)
b.Enqueue(waitEval)
}

// Flush via SetEnabled
b.SetEnabled(false)

{
// Check the stats
stats := b.Stats()
require.Equal(t, 0, stats.TotalReady, "Expected ready to be flushed")
require.Equal(t, 0, stats.TotalWaiting, "Expected waiting to be flushed")
require.Equal(t, 0, stats.TotalBlocked, "Expected blocked to be flushed")
require.Equal(t, 0, stats.TotalUnacked, "Expected unacked to be flushed")
_, ok := stats.ByScheduler[baseEval.Type]
require.False(t, ok, "Expected scheduler to have no stats")
}

{
// Enqueue again now we're disabled
b.Enqueue(baseEval.Copy())

delayedEval := baseEval.Copy()
delayedEval.Wait = 30 * time.Second
b.Enqueue(delayedEval)

waitEval := baseEval.Copy()
waitEval.WaitUntil = time.Now().Add(30 * time.Second)
b.Enqueue(waitEval)
}

{
// Check the stats again
stats := b.Stats()
require.Equal(t, 0, stats.TotalReady, "Expected ready to be flushed")
require.Equal(t, 0, stats.TotalWaiting, "Expected waiting to be flushed")
require.Equal(t, 0, stats.TotalBlocked, "Expected blocked to be flushed")
require.Equal(t, 0, stats.TotalUnacked, "Expected unacked to be flushed")
_, ok := stats.ByScheduler[baseEval.Type]
require.False(t, ok, "Expected scheduler to have no stats")
}
}

func TestEvalBroker_Dequeue_Timeout(t *testing.T) {
t.Parallel()
b := testBroker(t, 0)
Expand Down

0 comments on commit 781c94b

Please sign in to comment.