From 3ef6d15b23517e963ccb88591dbb4e2c0c354410 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Tue, 20 Sep 2022 17:04:51 -0400 Subject: [PATCH] eval broker: shed blocked evals older than acknowledged eval When an evaluation is acknowledged by a scheduler, the resulting plan is guaranteed to cover up to the `ModifyIndex` set by the worker based on the most recent evaluation for that job in the state store. At that point, we no longer need to retain blocked evaluations in the broker that are older than that index. Move these stale evals into a canceled set. When the `Eval.Ack` RPC returns from the eval broker it will retrieve a batch of canceable evals to write to raft. This paces the cancelations limited by how frequently the schedulers are acknowledging evals; this should reduce the risk of cancelations from overwhelming raft relative to scheduler progress. --- nomad/eval_broker.go | 97 +++++++++-- nomad/eval_broker_test.go | 338 ++++++++++++++------------------------ nomad/eval_endpoint.go | 21 +++ nomad/worker_test.go | 54 +++--- 4 files changed, 253 insertions(+), 257 deletions(-) diff --git a/nomad/eval_broker.go b/nomad/eval_broker.go index e13394b17258..0db3b7707edd 100644 --- a/nomad/eval_broker.go +++ b/nomad/eval_broker.go @@ -66,6 +66,10 @@ type EvalBroker struct { // blocked tracks the blocked evaluations by JobID in a priority queue blocked map[structs.NamespacedID]PendingEvaluations + // cancelable tracks previously blocked evaluations (for any job) that are + // now safe for the Eval.Ack RPC to cancel in batches + cancelable PendingEvaluations + // ready tracks the ready jobs by scheduler in a priority queue ready map[string]PendingEvaluations @@ -140,6 +144,7 @@ func NewEvalBroker(timeout, initialNackDelay, subsequentNackDelay time.Duration, evals: make(map[string]int), jobEvals: make(map[structs.NamespacedID]string), blocked: make(map[structs.NamespacedID]PendingEvaluations), + cancelable: PendingEvaluations{}, ready: make(map[string]PendingEvaluations), unack: make(map[string]*unackEval), waiting: make(map[string]chan struct{}), @@ -559,6 +564,7 @@ func (b *EvalBroker) Ack(evalID, token string) error { return fmt.Errorf("Token does not match for Evaluation ID") } jobID := unack.Eval.JobID + oldestUnackedIndex := unack.Eval.ModifyIndex // Ensure we were able to stop the timer if !unack.NackTimer.Stop() { @@ -586,15 +592,28 @@ func (b *EvalBroker) Ack(evalID, token string) error { // Check if there are any blocked evaluations if blocked := b.blocked[namespacedID]; len(blocked) != 0 { - raw := heap.Pop(&blocked) + + // Any blocked evaluations with ModifyIndexes older than the just-ack'd + // evaluation are no longer useful, so it's safe to drop them. + cancelable := blocked.MarkForCancel(oldestUnackedIndex) + b.cancelable = append(b.cancelable, cancelable...) + b.stats.TotalCancelable = b.cancelable.Len() + b.stats.TotalBlocked -= cancelable.Len() + + // If any remain, enqueue the eval with the *highest* index + if len(blocked) > 0 { + raw := heap.Pop(&blocked) + eval := raw.(*structs.Evaluation) + b.stats.TotalBlocked -= 1 + b.enqueueLocked(eval, eval.Type) + } + + // Clean up if there are no more after that if len(blocked) > 0 { b.blocked[namespacedID] = blocked } else { delete(b.blocked, namespacedID) } - eval := raw.(*structs.Evaluation) - b.stats.TotalBlocked -= 1 - b.enqueueLocked(eval, eval.Type) } // Re-enqueue the evaluation. @@ -733,11 +752,13 @@ func (b *EvalBroker) flush() { b.stats.TotalUnacked = 0 b.stats.TotalBlocked = 0 b.stats.TotalWaiting = 0 + b.stats.TotalCancelable = 0 b.stats.DelayedEvals = make(map[string]*structs.Evaluation) b.stats.ByScheduler = make(map[string]*SchedulerStats) b.evals = make(map[string]int) b.jobEvals = make(map[structs.NamespacedID]string) b.blocked = make(map[structs.NamespacedID]PendingEvaluations) + b.cancelable = PendingEvaluations{} b.ready = make(map[string]PendingEvaluations) b.unack = make(map[string]*unackEval) b.timeWait = make(map[string]*time.Timer) @@ -830,6 +851,7 @@ func (b *EvalBroker) Stats() *BrokerStats { stats.TotalUnacked = b.stats.TotalUnacked stats.TotalBlocked = b.stats.TotalBlocked stats.TotalWaiting = b.stats.TotalWaiting + stats.TotalCancelable = b.stats.TotalCancelable for id, eval := range b.stats.DelayedEvals { evalCopy := *eval stats.DelayedEvals[id] = &evalCopy @@ -841,6 +863,17 @@ func (b *EvalBroker) Stats() *BrokerStats { return stats } +// Cancelable retrieves a batch of previously-blocked evaluations that are now +// stale and ready to mark for canceling. The eval RPC will call this with a +// batch size set to avoid sending overly large raft messages. +func (b *EvalBroker) Cancelable(batchSize int) []*structs.Evaluation { + b.l.RLock() + defer b.l.RUnlock() + cancelable := b.cancelable.PopN(batchSize) + b.stats.TotalCancelable -= len(cancelable) + return cancelable +} + // EmitStats is used to export metrics about the broker while enabled func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) { timer, stop := helper.NewSafeTimer(period) @@ -856,6 +889,7 @@ func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) { metrics.SetGauge([]string{"nomad", "broker", "total_unacked"}, float32(stats.TotalUnacked)) metrics.SetGauge([]string{"nomad", "broker", "total_blocked"}, float32(stats.TotalBlocked)) metrics.SetGauge([]string{"nomad", "broker", "total_waiting"}, float32(stats.TotalWaiting)) + metrics.SetGauge([]string{"nomad", "broker", "total_cancelable"}, float32(stats.TotalCancelable)) for _, eval := range stats.DelayedEvals { metrics.SetGaugeWithLabels([]string{"nomad", "broker", "eval_waiting"}, float32(time.Until(eval.WaitUntil).Seconds()), @@ -878,12 +912,13 @@ func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) { // BrokerStats returns all the stats about the broker type BrokerStats struct { - TotalReady int - TotalUnacked int - TotalBlocked int - TotalWaiting int - DelayedEvals map[string]*structs.Evaluation - ByScheduler map[string]*SchedulerStats + TotalReady int + TotalUnacked int + TotalBlocked int + TotalWaiting int + TotalCancelable int + DelayedEvals map[string]*structs.Evaluation + ByScheduler map[string]*SchedulerStats } // SchedulerStats returns the stats per scheduler @@ -904,7 +939,7 @@ func (p PendingEvaluations) Less(i, j int) bool { if p[i].JobID != p[j].JobID && p[i].Priority != p[j].Priority { return !(p[i].Priority < p[j].Priority) } - return p[i].CreateIndex < p[j].CreateIndex + return (p[i].CreateIndex < p[j].CreateIndex) } // Swap is for the sorting interface @@ -934,3 +969,43 @@ func (p PendingEvaluations) Peek() *structs.Evaluation { } return p[n-1] } + +// PopN removes and returns the minimum N evaluations from the slice. +func (p *PendingEvaluations) PopN(n int) []*structs.Evaluation { + if n > len(*p) { + n = len(*p) + } + + popped := []*structs.Evaluation{} + for i := 0; i < n; i++ { + raw := heap.Pop(p) + eval := raw.(*structs.Evaluation) + popped = append(popped, eval) + } + return popped +} + +// MarkForCancel is used to remove any evaluations older than the index from the +// blocked list and returns a list of cancelable evals so that Eval.Ack RPCs can +// write batched raft entries to cancel them. This must be called inside the +// broker's lock. +func (p *PendingEvaluations) MarkForCancel(index uint64) PendingEvaluations { + + // In pathological cases, we can have a large number of blocked evals but + // will want to cancel most of them. Using heap.Remove requires we re-sort + // for each eval we remove. Because we expect to have very few if any + // blocked remaining, we'll just create a new heap + retain := PendingEvaluations{} + cancelable := PendingEvaluations{} + + for _, eval := range *p { + if eval.ModifyIndex >= index { + heap.Push(&retain, eval) + } else { + heap.Push(&cancelable, eval) + } + } + + *p = retain + return cancelable +} \ No newline at end of file diff --git a/nomad/eval_broker_test.go b/nomad/eval_broker_test.go index 3b0988eae724..9b33f117084e 100644 --- a/nomad/eval_broker_test.go +++ b/nomad/eval_broker_test.go @@ -1,6 +1,7 @@ package nomad import ( + "container/heap" "encoding/json" "errors" "fmt" @@ -11,6 +12,7 @@ import ( "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" + "github.com/shoenig/test/must" "github.com/stretchr/testify/require" ) @@ -393,236 +395,111 @@ func TestEvalBroker_Serialize_DuplicateJobID(t *testing.T) { ns1 := "namespace-one" ns2 := "namespace-two" - eval := mock.Eval() - eval.Namespace = ns1 - b.Enqueue(eval) - - eval2 := mock.Eval() - eval2.JobID = eval.JobID - eval2.Namespace = ns1 - eval2.CreateIndex = eval.CreateIndex + 1 - b.Enqueue(eval2) - - eval3 := mock.Eval() - eval3.JobID = eval.JobID - eval3.Namespace = ns1 - eval3.CreateIndex = eval.CreateIndex + 2 - b.Enqueue(eval3) - - eval4 := mock.Eval() - eval4.JobID = eval.JobID - eval4.Namespace = ns2 - eval4.CreateIndex = eval.CreateIndex + 3 - b.Enqueue(eval4) - - eval5 := mock.Eval() - eval5.JobID = eval.JobID - eval5.Namespace = ns2 - eval5.CreateIndex = eval.CreateIndex + 4 - b.Enqueue(eval5) - - stats := b.Stats() - if stats.TotalReady != 2 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 3 { - t.Fatalf("bad: %#v", stats) - } - - // Dequeue should work - out, token, err := b.Dequeue(defaultSched, time.Second) - if err != nil { - t.Fatalf("err: %v", err) - } - if out != eval { - t.Fatalf("bad : %#v", out) - } + jobID := "example" - // Check the stats - stats = b.Stats() - if stats.TotalReady != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 3 { - t.Fatalf("bad: %#v", stats) + newEval := func(idx uint64, ns string) *structs.Evaluation { + eval := mock.Eval() + eval.JobID = jobID + eval.Namespace = ns + eval.CreateIndex = idx + eval.ModifyIndex = idx + b.Enqueue(eval) + return eval + } + + // first job + eval1 := newEval(1, ns1) + newEval(2, ns1) + eval3 := newEval(3, ns1) + newEval(4, ns1) + + // second job + eval5 := newEval(5, ns2) + newEval(6, ns2) + eval7 := newEval(7, ns2) + + // retreive the stats from the broker, less some stats that aren't + // interesting for this test and make the test much more verbose + // to include + getStats := func() BrokerStats { + t.Helper() + stats := b.Stats() + stats.DelayedEvals = nil + stats.ByScheduler = nil + return *stats } - // Ack out - err = b.Ack(eval.ID, token) - if err != nil { - t.Fatalf("err: %v", err) - } + must.Eq(t, BrokerStats{TotalReady: 2, TotalUnacked: 0, + TotalBlocked: 5, TotalCancelable: 0}, getStats()) - // Check the stats - stats = b.Stats() - if stats.TotalReady != 2 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 0 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 2 { - t.Fatalf("bad: %#v", stats) - } + // Dequeue should get 1st eval + out, token, err := b.Dequeue(defaultSched, time.Second) + must.NoError(t, err) + must.Eq(t, out, eval1, must.Sprint("expected 1st eval")) - // Dequeue should work - out, token, err = b.Dequeue(defaultSched, time.Second) - if err != nil { - t.Fatalf("err: %v", err) - } - if out != eval2 { - t.Fatalf("bad : %#v", out) - } + must.Eq(t, BrokerStats{TotalReady: 1, TotalUnacked: 1, + TotalBlocked: 5, TotalCancelable: 0}, getStats()) - // Check the stats - stats = b.Stats() - if stats.TotalReady != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 2 { - t.Fatalf("bad: %#v", stats) - } + // Current wait index should be 4 but Ack to exercise behavior + // when worker's Eval.getWaitIndex gets a stale index + eval1.ModifyIndex = 3 + err = b.Ack(eval1.ID, token) + must.NoError(t, err) - // Ack out - err = b.Ack(eval2.ID, token) - if err != nil { - t.Fatalf("err: %v", err) - } + must.Eq(t, BrokerStats{TotalReady: 2, TotalUnacked: 0, + TotalBlocked: 3, TotalCancelable: 1}, getStats()) - // Check the stats - stats = b.Stats() - if stats.TotalReady != 2 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 0 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 1 { - t.Fatalf("bad: %#v", stats) - } - - // Dequeue should work + // Dequeue should get 3rd eval out, token, err = b.Dequeue(defaultSched, time.Second) - if err != nil { - t.Fatalf("err: %v", err) - } - if out != eval3 { - t.Fatalf("bad : %#v", out) - } + must.NoError(t, err) + must.Eq(t, out, eval3, must.Sprint("expected 3rd eval")) - // Check the stats - stats = b.Stats() - if stats.TotalReady != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 1 { - t.Fatalf("bad: %#v", stats) - } + must.Eq(t, BrokerStats{TotalReady: 1, TotalUnacked: 1, + TotalBlocked: 3, TotalCancelable: 1}, getStats()) - // Ack out + // Current wait index should be 4 but Ack to exercise behavior + // when newer eval has been written to state store and replicated + // but not yet written to broker. Ack should clear the rest of + // namespace-one blocked but leave namespace-two untouched + eval3.ModifyIndex = 8 err = b.Ack(eval3.ID, token) - if err != nil { - t.Fatalf("err: %v", err) - } + must.NoError(t, err) - // Check the stats - stats = b.Stats() - if stats.TotalReady != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 0 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 1 { - t.Fatalf("bad: %#v", stats) - } + must.Eq(t, BrokerStats{TotalReady: 1, TotalUnacked: 0, + TotalBlocked: 2, TotalCancelable: 2}, getStats()) - // Dequeue should work + // Dequeue should get 5th eval out, token, err = b.Dequeue(defaultSched, time.Second) - if err != nil { - t.Fatalf("err: %v", err) - } - if out != eval4 { - t.Fatalf("bad : %#v", out) - } + must.NoError(t, err) + must.Eq(t, out, eval5, must.Sprint("expected 5th eval")) - // Check the stats - stats = b.Stats() - if stats.TotalReady != 0 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 1 { - t.Fatalf("bad: %#v", stats) - } + must.Eq(t, BrokerStats{TotalReady: 0, TotalUnacked: 1, + TotalBlocked: 2, TotalCancelable: 2}, getStats()) - // Ack out - err = b.Ack(eval4.ID, token) - if err != nil { - t.Fatalf("err: %v", err) - } + // Ack with expected wait index, which should clear remaining + // namespace-two blocked evals + eval5.ModifyIndex = 7 + err = b.Ack(eval5.ID, token) + must.NoError(t, err) - // Check the stats - stats = b.Stats() - if stats.TotalReady != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 0 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 0 { - t.Fatalf("bad: %#v", stats) - } + must.Eq(t, BrokerStats{TotalReady: 1, TotalUnacked: 0, + TotalBlocked: 0, TotalCancelable: 3}, getStats()) - // Dequeue should work + // Dequeue should get 7th eval because that's all that's left out, token, err = b.Dequeue(defaultSched, time.Second) - if err != nil { - t.Fatalf("err: %v", err) - } - if out != eval5 { - t.Fatalf("bad : %#v", out) - } + must.NoError(t, err) + must.Eq(t, out, eval7, must.Sprint("expected 7th eval")) - // Check the stats - stats = b.Stats() - if stats.TotalReady != 0 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 0 { - t.Fatalf("bad: %#v", stats) - } + must.Eq(t, BrokerStats{TotalReady: 0, TotalUnacked: 1, + TotalBlocked: 0, TotalCancelable: 3}, getStats()) - // Ack out - err = b.Ack(eval5.ID, token) - if err != nil { - t.Fatalf("err: %v", err) - } + // Last ack should leave the broker empty except for cancels + eval7.ModifyIndex = 7 + err = b.Ack(eval7.ID, token) + must.NoError(t, err) - // Check the stats - stats = b.Stats() - if stats.TotalReady != 0 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 0 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 0 { - t.Fatalf("bad: %#v", stats) - } + must.Eq(t, BrokerStats{TotalReady: 0, TotalUnacked: 0, + TotalBlocked: 0, TotalCancelable: 3}, getStats()) } func TestEvalBroker_Enqueue_Disable(t *testing.T) { @@ -813,18 +690,18 @@ func TestEvalBroker_Dequeue_FIFO(t *testing.T) { b.SetEnabled(true) NUM := 100 - for i := 0; i < NUM; i++ { + for i := NUM; i > 0; i-- { eval1 := mock.Eval() eval1.CreateIndex = uint64(i) eval1.ModifyIndex = uint64(i) b.Enqueue(eval1) } - for i := 0; i < NUM; i++ { + for i := 1; i < NUM; i++ { out1, _, _ := b.Dequeue(defaultSched, time.Second) - if out1.CreateIndex != uint64(i) { - t.Fatalf("bad: %d %#v", i, out1) - } + must.Eq(t, out1.CreateIndex, uint64(i), + must.Sprintf("eval was not FIFO by CreateIndex"), + ) } } @@ -1506,3 +1383,40 @@ func TestEvalBroker_NamespacedJobs(t *testing.T) { require.Equal(1, len(b.blocked)) } + +func TestEvalBroker_BlockedEvals_MarkForCancel(t *testing.T) { + ci.Parallel(t) + + blocked := PendingEvaluations{} + + // evals are pushed on the blocked queue FIFO by CreateIndex order, so Push + // them in reverse order here to assert we're getting them back out in the + // right order and not just as inserted + for i := 100; i > 0; i -= 10 { + eval := mock.Eval() + eval.JobID = "example" + eval.CreateIndex = uint64(i) + eval.ModifyIndex = uint64(i) + heap.Push(&blocked, eval) + } + + canceled := blocked.MarkForCancel(40) + must.Eq(t, canceled.Len(), 3) + must.Eq(t, blocked.Len(), 7) + + got := []uint64{} + for i := 0; i < 7; i++ { + raw := heap.Pop(&blocked) + must.NotNil(t, raw) + eval := raw.(*structs.Evaluation) + got = append(got, eval.CreateIndex) + } + must.Eq(t, []uint64{40, 50, 60, 70, 80, 90, 100}, got) + + popped := canceled.PopN(2) + must.Len(t, 1, canceled) + must.Len(t, 2, popped) + + must.Eq(t, popped[0].CreateIndex, 10) + must.Eq(t, popped[1].CreateIndex, 20) +} \ No newline at end of file diff --git a/nomad/eval_endpoint.go b/nomad/eval_endpoint.go index 575f7edd9393..2b67fd1194eb 100644 --- a/nomad/eval_endpoint.go +++ b/nomad/eval_endpoint.go @@ -229,6 +229,27 @@ func (e *Eval) Ack(args *structs.EvalAckRequest, if err := e.srv.evalBroker.Ack(args.EvalID, args.Token); err != nil { return err } + + const cancelDesc = "cancelled after more recent eval was processed" + + cancelable := e.srv.evalBroker.Cancelable(1000) + if len(cancelable) > 0 { + for _, eval := range cancelable { + eval.Status = structs.EvalStatusCancelled + eval.StatusDescription = cancelDesc + } + + update := &structs.EvalUpdateRequest{ + Evals: cancelable, + WriteRequest: structs.WriteRequest{Region: args.Region}, + } + _, _, err = e.srv.raftApply(structs.EvalUpdateRequestType, update) + if err != nil { + e.logger.Error("eval cancel failed", "error", err, "method", "ack") + return err + } + } + return nil } diff --git a/nomad/worker_test.go b/nomad/worker_test.go index d8d5f4481095..293358e7b8d9 100644 --- a/nomad/worker_test.go +++ b/nomad/worker_test.go @@ -11,6 +11,7 @@ import ( log "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/ci" + "github.com/shoenig/test/must" "github.com/stretchr/testify/require" "github.com/hashicorp/nomad/helper/testlog" @@ -118,9 +119,10 @@ func TestWorker_dequeueEvaluation_SerialJobs(t *testing.T) { eval2.JobID = eval1.JobID // Insert the evals into the state store - if err := s1.fsm.State().UpsertEvals(structs.MsgTypeTestSetup, 1000, []*structs.Evaluation{eval1, eval2}); err != nil { - t.Fatal(err) - } + must.NoError(t, s1.fsm.State().UpsertEvals( + structs.MsgTypeTestSetup, 1000, []*structs.Evaluation{eval1})) + must.NoError(t, s1.fsm.State().UpsertEvals( + structs.MsgTypeTestSetup, 2000, []*structs.Evaluation{eval2})) s1.evalBroker.Enqueue(eval1) s1.evalBroker.Enqueue(eval2) @@ -131,45 +133,29 @@ func TestWorker_dequeueEvaluation_SerialJobs(t *testing.T) { // Attempt dequeue eval, token, waitIndex, shutdown := w.dequeueEvaluation(10 * time.Millisecond) - if shutdown { - t.Fatalf("should not shutdown") - } - if token == "" { - t.Fatalf("should get token") - } - if waitIndex != eval1.ModifyIndex { - t.Fatalf("bad wait index; got %d; want %d", waitIndex, eval1.ModifyIndex) - } - - // Ensure we get a sane eval - if !reflect.DeepEqual(eval, eval1) { - t.Fatalf("bad: %#v %#v", eval, eval1) - } + must.False(t, shutdown, must.Sprint("should not be shutdown")) + must.NotEq(t, token, "", must.Sprint("should get a token")) + must.NotEq(t, eval1.ModifyIndex, waitIndex, must.Sprintf("bad wait index")) + must.Eq(t, eval, eval1) // Update the modify index of the first eval - if err := s1.fsm.State().UpsertEvals(structs.MsgTypeTestSetup, 2000, []*structs.Evaluation{eval1}); err != nil { - t.Fatal(err) - } + must.NoError(t, s1.fsm.State().UpsertEvals( + structs.MsgTypeTestSetup, 1500, []*structs.Evaluation{eval1})) // Send the Ack w.sendAck(eval1, token) - // Attempt second dequeue + // Attempt second dequeue; it should succeed because the 2nd eval has a + // lower modify index than the snapshot used to schedule the 1st + // eval. Normally this can only happen if the worker is on a follower that's + // trailing behind in raft logs eval, token, waitIndex, shutdown = w.dequeueEvaluation(10 * time.Millisecond) - if shutdown { - t.Fatalf("should not shutdown") - } - if token == "" { - t.Fatalf("should get token") - } - if waitIndex != 2000 { - t.Fatalf("bad wait index; got %d; want 2000", eval2.ModifyIndex) - } - // Ensure we get a sane eval - if !reflect.DeepEqual(eval, eval2) { - t.Fatalf("bad: %#v %#v", eval, eval2) - } + must.False(t, shutdown, must.Sprint("should not be shutdown")) + must.NotEq(t, token, "", must.Sprint("should get a token")) + must.Eq(t, waitIndex, 2000, must.Sprintf("bad wait index")) + must.Eq(t, eval, eval2) + } func TestWorker_dequeueEvaluation_paused(t *testing.T) {