diff --git a/nomad/eval_broker.go b/nomad/eval_broker.go index d3dfdc61e438..f93a0ef274dc 100644 --- a/nomad/eval_broker.go +++ b/nomad/eval_broker.go @@ -161,6 +161,8 @@ func (b *EvalBroker) Enabled() bool { // should only be enabled on the active leader. func (b *EvalBroker) SetEnabled(enabled bool) { b.l.Lock() + defer b.l.Unlock() + prevEnabled := b.enabled b.enabled = enabled if !prevEnabled && enabled { @@ -169,7 +171,7 @@ func (b *EvalBroker) SetEnabled(enabled bool) { b.delayedEvalCancelFunc = cancel go b.runDelayedEvalsWatcher(ctx, b.delayedEvalsUpdateCh) } - b.l.Unlock() + if !enabled { b.flush() } @@ -208,6 +210,11 @@ func (b *EvalBroker) EnqueueAll(evals map[*structs.Evaluation]string) { // outstanding, the evaluation is blocked until an Ack/Nack is received. // processEnqueue must be called with the lock held. func (b *EvalBroker) processEnqueue(eval *structs.Evaluation, token string) { + // If we're not enabled, don't enable more queuing. + if !b.enabled { + return + } + // Check if already enqueued if _, ok := b.evals[eval.ID]; ok { if token == "" { @@ -259,8 +266,10 @@ func (b *EvalBroker) processWaitingEnqueue(eval *structs.Evaluation) { func (b *EvalBroker) enqueueWaiting(eval *structs.Evaluation) { b.l.Lock() defer b.l.Unlock() + delete(b.timeWait, eval.ID) b.stats.TotalWaiting -= 1 + b.enqueueLocked(eval, eval.Type) } @@ -678,11 +687,9 @@ func (b *EvalBroker) ResumeNackTimeout(evalID, token string) error { return nil } -// Flush is used to clear the state of the broker +// Flush is used to clear the state of the broker. It must be called from within +// the lock. func (b *EvalBroker) flush() { - b.l.Lock() - defer b.l.Unlock() - // Unblock any waiters for _, waitCh := range b.waiting { close(waitCh) @@ -778,13 +785,13 @@ func (b *EvalBroker) runDelayedEvalsWatcher(ctx context.Context, updateCh <-chan // This peeks at the heap to return the top. If the heap is empty, this returns nil and zero time. func (b *EvalBroker) nextDelayedEval() (*structs.Evaluation, time.Time) { b.l.RLock() + defer b.l.RUnlock() + // If there is nothing wait for an update. if b.delayHeap.Length() == 0 { - b.l.RUnlock() return nil, time.Time{} } nextEval := b.delayHeap.Peek() - b.l.RUnlock() if nextEval == nil { return nil, time.Time{} } diff --git a/nomad/eval_broker_test.go b/nomad/eval_broker_test.go index 8f7f71510a9f..9777e5abd22b 100644 --- a/nomad/eval_broker_test.go +++ b/nomad/eval_broker_test.go @@ -647,6 +647,64 @@ func TestEvalBroker_Enqueue_Disable(t *testing.T) { } } +func TestEvalBroker_Enqueue_Disable_Delay(t *testing.T) { + t.Parallel() + b := testBroker(t, 0) + baseEval := mock.Eval() + b.SetEnabled(true) + + { + // Enqueue + b.Enqueue(baseEval.Copy()) + + delayedEval := baseEval.Copy() + delayedEval.Wait = 30 + b.Enqueue(delayedEval) + + waitEval := baseEval.Copy() + waitEval.WaitUntil = time.Now().Add(30 * time.Second) + b.Enqueue(waitEval) + } + + // Flush via SetEnabled + b.SetEnabled(false) + + { + // Check the stats + stats := b.Stats() + require.Equal(t, 0, stats.TotalReady, "Expected ready to be flushed") + require.Equal(t, 0, stats.TotalWaiting, "Expected waiting to be flushed") + require.Equal(t, 0, stats.TotalBlocked, "Expected blocked to be flushed") + require.Equal(t, 0, stats.TotalUnacked, "Expected unacked to be flushed") + _, ok := stats.ByScheduler[baseEval.Type] + require.False(t, ok, "Expected scheduler to have no stats") + } + + { + // Enqueue again now we're disabled + b.Enqueue(baseEval.Copy()) + + delayedEval := baseEval.Copy() + delayedEval.Wait = 30 * time.Second + b.Enqueue(delayedEval) + + waitEval := baseEval.Copy() + waitEval.WaitUntil = time.Now().Add(30 * time.Second) + b.Enqueue(waitEval) + } + + { + // Check the stats again + stats := b.Stats() + require.Equal(t, 0, stats.TotalReady, "Expected ready to be flushed") + require.Equal(t, 0, stats.TotalWaiting, "Expected waiting to be flushed") + require.Equal(t, 0, stats.TotalBlocked, "Expected blocked to be flushed") + require.Equal(t, 0, stats.TotalUnacked, "Expected unacked to be flushed") + _, ok := stats.ByScheduler[baseEval.Type] + require.False(t, ok, "Expected scheduler to have no stats") + } +} + func TestEvalBroker_Dequeue_Timeout(t *testing.T) { t.Parallel() b := testBroker(t, 0)