diff --git a/nomad/eval_broker.go b/nomad/eval_broker.go index e13394b17258..5a60c1ce5f6b 100644 --- a/nomad/eval_broker.go +++ b/nomad/eval_broker.go @@ -66,6 +66,10 @@ type EvalBroker struct { // blocked tracks the blocked evaluations by JobID in a priority queue blocked map[structs.NamespacedID]PendingEvaluations + // cancelable tracks previously blocked evaluations (for any job) that are + // now safe for the Eval.Ack RPC to cancel in batches + cancelable PendingEvaluations + // ready tracks the ready jobs by scheduler in a priority queue ready map[string]PendingEvaluations @@ -140,6 +144,7 @@ func NewEvalBroker(timeout, initialNackDelay, subsequentNackDelay time.Duration, evals: make(map[string]int), jobEvals: make(map[structs.NamespacedID]string), blocked: make(map[structs.NamespacedID]PendingEvaluations), + cancelable: PendingEvaluations{}, ready: make(map[string]PendingEvaluations), unack: make(map[string]*unackEval), waiting: make(map[string]chan struct{}), @@ -559,6 +564,7 @@ func (b *EvalBroker) Ack(evalID, token string) error { return fmt.Errorf("Token does not match for Evaluation ID") } jobID := unack.Eval.JobID + oldestUnackedIndex := unack.Eval.ModifyIndex // Ensure we were able to stop the timer if !unack.NackTimer.Stop() { @@ -586,15 +592,27 @@ func (b *EvalBroker) Ack(evalID, token string) error { // Check if there are any blocked evaluations if blocked := b.blocked[namespacedID]; len(blocked) != 0 { - raw := heap.Pop(&blocked) + + // Any blocked evaluations with ModifyIndexes older than the just-ack'd + // evaluation are no longer useful, so it's safe to drop them. + cancelable := blocked.MarkForCancel(oldestUnackedIndex) + b.cancelable = append(b.cancelable, cancelable...) + b.stats.TotalBlocked -= cancelable.Len() + + // If any remain, enqueue the eval with the *highest* index + if len(blocked) > 0 { + raw := heap.Pop(&blocked) + eval := raw.(*structs.Evaluation) + b.stats.TotalBlocked -= 1 + b.enqueueLocked(eval, eval.Type) + } + + // Clean up if there are no more after that if len(blocked) > 0 { b.blocked[namespacedID] = blocked } else { delete(b.blocked, namespacedID) } - eval := raw.(*structs.Evaluation) - b.stats.TotalBlocked -= 1 - b.enqueueLocked(eval, eval.Type) } // Re-enqueue the evaluation. @@ -738,6 +756,7 @@ func (b *EvalBroker) flush() { b.evals = make(map[string]int) b.jobEvals = make(map[structs.NamespacedID]string) b.blocked = make(map[structs.NamespacedID]PendingEvaluations) + b.cancelable = PendingEvaluations{} b.ready = make(map[string]PendingEvaluations) b.unack = make(map[string]*unackEval) b.timeWait = make(map[string]*time.Timer) @@ -841,6 +860,15 @@ func (b *EvalBroker) Stats() *BrokerStats { return stats } +// Cancelable retrieves a batch of previously-blocked evaluations that are now +// stale and ready to mark for canceling. The eval RPC will call this with a +// batch size set to avoid sending overly large raft messages. +func (b *EvalBroker) Cancelable(batchSize int) []*structs.Evaluation { + b.l.RLock() + defer b.l.RUnlock() + return b.cancelable.PopN(batchSize) +} + // EmitStats is used to export metrics about the broker while enabled func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) { timer, stop := helper.NewSafeTimer(period) @@ -904,7 +932,7 @@ func (p PendingEvaluations) Less(i, j int) bool { if p[i].JobID != p[j].JobID && p[i].Priority != p[j].Priority { return !(p[i].Priority < p[j].Priority) } - return p[i].CreateIndex < p[j].CreateIndex + return (p[i].CreateIndex < p[j].CreateIndex) } // Swap is for the sorting interface @@ -934,3 +962,43 @@ func (p PendingEvaluations) Peek() *structs.Evaluation { } return p[n-1] } + +// PopN removes and returns the minimum N evaluations from the slice. +func (p *PendingEvaluations) PopN(n int) []*structs.Evaluation { + if n > len(*p) { + n = len(*p) + } + + popped := []*structs.Evaluation{} + for i := 0; i < n; i++ { + raw := heap.Pop(p) + eval := raw.(*structs.Evaluation) + popped = append(popped, eval) + } + return popped +} + +// MarkForCancel is used to remove any evaluations older than the index from the +// blocked list and returns a list of cancelable evals so that Eval.Ack RPCs can +// write batched raft entries to cancel them. This must be called inside the +// broker's lock. +func (p *PendingEvaluations) MarkForCancel(index uint64) PendingEvaluations { + + // In pathological cases, we can have a large number of blocked evals but + // will want to cancel most of them. Using heap.Remove requires we re-sort + // for each eval we remove. Because we expect to have very few if any + // blocked remaining, we'll just create a new heap + retain := PendingEvaluations{} + cancelable := PendingEvaluations{} + + for _, eval := range *p { + if eval.ModifyIndex >= index { + heap.Push(&retain, eval) + } else { + heap.Push(&cancelable, eval) + } + } + + *p = retain + return cancelable +} diff --git a/nomad/eval_broker_test.go b/nomad/eval_broker_test.go index 3b0988eae724..36255bb0fa6a 100644 --- a/nomad/eval_broker_test.go +++ b/nomad/eval_broker_test.go @@ -1,6 +1,7 @@ package nomad import ( + "container/heap" "encoding/json" "errors" "fmt" @@ -11,6 +12,7 @@ import ( "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" + "github.com/shoenig/test/must" "github.com/stretchr/testify/require" ) @@ -393,236 +395,88 @@ func TestEvalBroker_Serialize_DuplicateJobID(t *testing.T) { ns1 := "namespace-one" ns2 := "namespace-two" - eval := mock.Eval() - eval.Namespace = ns1 - b.Enqueue(eval) + eval1 := mock.Eval() + eval1.Namespace = ns1 + eval1.CreateIndex = 1 + b.Enqueue(eval1) eval2 := mock.Eval() - eval2.JobID = eval.JobID + eval2.JobID = eval1.JobID eval2.Namespace = ns1 - eval2.CreateIndex = eval.CreateIndex + 1 + eval2.CreateIndex = 2 b.Enqueue(eval2) eval3 := mock.Eval() - eval3.JobID = eval.JobID + eval3.JobID = eval1.JobID eval3.Namespace = ns1 - eval3.CreateIndex = eval.CreateIndex + 2 + eval3.CreateIndex = 3 b.Enqueue(eval3) eval4 := mock.Eval() - eval4.JobID = eval.JobID + eval4.JobID = eval1.JobID eval4.Namespace = ns2 - eval4.CreateIndex = eval.CreateIndex + 3 + eval4.CreateIndex = 4 b.Enqueue(eval4) eval5 := mock.Eval() - eval5.JobID = eval.JobID + eval5.JobID = eval1.JobID eval5.Namespace = ns2 - eval5.CreateIndex = eval.CreateIndex + 4 + eval5.CreateIndex = 5 b.Enqueue(eval5) stats := b.Stats() - if stats.TotalReady != 2 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 3 { - t.Fatalf("bad: %#v", stats) - } + must.Eq(t, stats.TotalReady, 2, must.Sprintf("expected 2 ready: %#v", stats)) + must.Eq(t, stats.TotalUnacked, 0, must.Sprintf("expected 0 unacked: %#v", stats)) + must.Eq(t, stats.TotalBlocked, 3, must.Sprintf("expected 3 blocked: %#v", stats)) - // Dequeue should work + // Dequeue should get 1st eval out, token, err := b.Dequeue(defaultSched, time.Second) - if err != nil { - t.Fatalf("err: %v", err) - } - if out != eval { - t.Fatalf("bad : %#v", out) - } + must.NoError(t, err) + must.Eq(t, out, eval1, must.Sprint("expected 1st eval")) - // Check the stats + // Check the stats and state stats = b.Stats() - if stats.TotalReady != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 3 { - t.Fatalf("bad: %#v", stats) - } - - // Ack out - err = b.Ack(eval.ID, token) - if err != nil { - t.Fatalf("err: %v", err) - } - - // Check the stats - stats = b.Stats() - if stats.TotalReady != 2 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 0 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 2 { - t.Fatalf("bad: %#v", stats) - } - - // Dequeue should work - out, token, err = b.Dequeue(defaultSched, time.Second) - if err != nil { - t.Fatalf("err: %v", err) - } - if out != eval2 { - t.Fatalf("bad : %#v", out) - } - - // Check the stats - stats = b.Stats() - if stats.TotalReady != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 2 { - t.Fatalf("bad: %#v", stats) - } - - // Ack out - err = b.Ack(eval2.ID, token) - if err != nil { - t.Fatalf("err: %v", err) - } - - // Check the stats - stats = b.Stats() - if stats.TotalReady != 2 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 0 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 1 { - t.Fatalf("bad: %#v", stats) - } - - // Dequeue should work - out, token, err = b.Dequeue(defaultSched, time.Second) - if err != nil { - t.Fatalf("err: %v", err) - } - if out != eval3 { - t.Fatalf("bad : %#v", out) - } - - // Check the stats - stats = b.Stats() - if stats.TotalReady != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 1 { - t.Fatalf("bad: %#v", stats) - } + must.Eq(t, stats.TotalReady, 1, must.Sprintf("expected 1 ready: %#v", stats)) + must.Eq(t, stats.TotalUnacked, 1, must.Sprintf("expected 1 unacked: %#v", stats)) + must.Eq(t, stats.TotalBlocked, 3, must.Sprintf("expected 3 blocked: %#v", stats)) + must.Eq(t, b.cancelable.Len(), 0, must.Sprintf("expected 0 cancelable")) - // Ack out - err = b.Ack(eval3.ID, token) - if err != nil { - t.Fatalf("err: %v", err) - } + // Ack should clear the rest of namespace-one blocked + eval1.ModifyIndex = 3 // update as Eval.getWaitIndex would + err = b.Ack(eval1.ID, token) + must.NoError(t, err) - // Check the stats + // Check the stats and state stats = b.Stats() - if stats.TotalReady != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 0 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 1 { - t.Fatalf("bad: %#v", stats) - } + must.Eq(t, stats.TotalReady, 1, must.Sprintf("expected 1 ready: %#v", stats)) + must.Eq(t, stats.TotalUnacked, 0, must.Sprintf("expected 0 unacked: %#v", stats)) + must.Eq(t, stats.TotalBlocked, 1, must.Sprintf("expected 1 blocked: %#v", stats)) + must.Eq(t, b.cancelable.Len(), 2, must.Sprintf("expected 2 cancelable")) // Dequeue should work out, token, err = b.Dequeue(defaultSched, time.Second) - if err != nil { - t.Fatalf("err: %v", err) - } - if out != eval4 { - t.Fatalf("bad : %#v", out) - } + must.NoError(t, err) + must.Eq(t, out, eval4, must.Sprint("expected 4th eval")) - // Check the stats + // Check the stats and state stats = b.Stats() - if stats.TotalReady != 0 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 1 { - t.Fatalf("bad: %#v", stats) - } + must.Eq(t, stats.TotalReady, 0, must.Sprintf("expected 0 ready: %#v", stats)) + must.Eq(t, stats.TotalUnacked, 1, must.Sprintf("expected 1 unacked: %#v", stats)) + must.Eq(t, stats.TotalBlocked, 1, must.Sprintf("expected 1 blocked: %#v", stats)) + must.Eq(t, b.cancelable.Len(), 2, must.Sprintf("expected 2 cancelable")) - // Ack out + // Ack should clear the rest of namespace-two blocked + eval4.ModifyIndex = 5 // update as Eval.getWaitIndex would err = b.Ack(eval4.ID, token) - if err != nil { - t.Fatalf("err: %v", err) - } + must.NoError(t, err) - // Check the stats + // Check the stats and state stats = b.Stats() - if stats.TotalReady != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 0 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 0 { - t.Fatalf("bad: %#v", stats) - } + must.Eq(t, stats.TotalReady, 0, must.Sprintf("expected 0 ready: %#v", stats)) + must.Eq(t, stats.TotalUnacked, 0, must.Sprintf("expected 0 unacked: %#v", stats)) + must.Eq(t, stats.TotalBlocked, 0, must.Sprintf("expected 0 blocked: %#v", stats)) + must.Eq(t, b.cancelable.Len(), 3, must.Sprintf("expected 3 cancelable")) - // Dequeue should work - out, token, err = b.Dequeue(defaultSched, time.Second) - if err != nil { - t.Fatalf("err: %v", err) - } - if out != eval5 { - t.Fatalf("bad : %#v", out) - } - - // Check the stats - stats = b.Stats() - if stats.TotalReady != 0 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 0 { - t.Fatalf("bad: %#v", stats) - } - - // Ack out - err = b.Ack(eval5.ID, token) - if err != nil { - t.Fatalf("err: %v", err) - } - - // Check the stats - stats = b.Stats() - if stats.TotalReady != 0 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 0 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 0 { - t.Fatalf("bad: %#v", stats) - } } func TestEvalBroker_Enqueue_Disable(t *testing.T) { @@ -813,18 +667,18 @@ func TestEvalBroker_Dequeue_FIFO(t *testing.T) { b.SetEnabled(true) NUM := 100 - for i := 0; i < NUM; i++ { + for i := NUM; i > 0; i-- { eval1 := mock.Eval() eval1.CreateIndex = uint64(i) eval1.ModifyIndex = uint64(i) b.Enqueue(eval1) } - for i := 0; i < NUM; i++ { + for i := 1; i < NUM; i++ { out1, _, _ := b.Dequeue(defaultSched, time.Second) - if out1.CreateIndex != uint64(i) { - t.Fatalf("bad: %d %#v", i, out1) - } + must.Eq(t, out1.CreateIndex, uint64(i), + must.Sprintf("eval was not FIFO by CreateIndex"), + ) } } @@ -1506,3 +1360,40 @@ func TestEvalBroker_NamespacedJobs(t *testing.T) { require.Equal(1, len(b.blocked)) } + +func TestEvalBroker_BlockedEvals_MarkForCancel(t *testing.T) { + ci.Parallel(t) + + blocked := PendingEvaluations{} + + // evals are pushed on the blocked queue FIFO by CreateIndex order, so Push + // them in reverse order here to assert we're getting them back out in the + // right order and not just as inserted + for i := 100; i > 0; i -= 10 { + eval := mock.Eval() + eval.JobID = "example" + eval.CreateIndex = uint64(i) + eval.ModifyIndex = uint64(i) + heap.Push(&blocked, eval) + } + + canceled := blocked.MarkForCancel(40) + must.Eq(t, canceled.Len(), 3) + must.Eq(t, blocked.Len(), 7) + + got := []uint64{} + for i := 0; i < 7; i++ { + raw := heap.Pop(&blocked) + must.NotNil(t, raw) + eval := raw.(*structs.Evaluation) + got = append(got, eval.CreateIndex) + } + must.Eq(t, []uint64{40, 50, 60, 70, 80, 90, 100}, got) + + popped := canceled.PopN(2) + must.Len(t, 1, canceled) + must.Len(t, 2, popped) + + must.Eq(t, popped[0].CreateIndex, 10) + must.Eq(t, popped[1].CreateIndex, 20) +} diff --git a/nomad/eval_endpoint.go b/nomad/eval_endpoint.go index 575f7edd9393..2b67fd1194eb 100644 --- a/nomad/eval_endpoint.go +++ b/nomad/eval_endpoint.go @@ -229,6 +229,27 @@ func (e *Eval) Ack(args *structs.EvalAckRequest, if err := e.srv.evalBroker.Ack(args.EvalID, args.Token); err != nil { return err } + + const cancelDesc = "cancelled after more recent eval was processed" + + cancelable := e.srv.evalBroker.Cancelable(1000) + if len(cancelable) > 0 { + for _, eval := range cancelable { + eval.Status = structs.EvalStatusCancelled + eval.StatusDescription = cancelDesc + } + + update := &structs.EvalUpdateRequest{ + Evals: cancelable, + WriteRequest: structs.WriteRequest{Region: args.Region}, + } + _, _, err = e.srv.raftApply(structs.EvalUpdateRequestType, update) + if err != nil { + e.logger.Error("eval cancel failed", "error", err, "method", "ack") + return err + } + } + return nil }