From 4ccb636f73bfa35c312dbcb39a41f2a53a877e53 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Tue, 20 Sep 2022 17:04:51 -0400 Subject: [PATCH] eval broker: shed blocked evals older than acknowledged eval When an evaluation is acknowledged by a scheduler, the resulting plan is guaranteed to cover up to the `ModifyIndex` set by the worker based on the most recent evaluation for that job in the state store. At that point, we no longer need to retain blocked evaluations in the broker that are older than that index. Move these stale evals into a canceled set. When the `Eval.Ack` RPC returns from the eval broker it will retrieve a batch of cancelable evals to write to raft. This paces the cancelations by how frequently the schedulers are acknowledging evals, which should reduce the risk of cancelations overwhelming raft relative to scheduler progress. --- nomad/eval_broker.go | 78 +++++++++- nomad/eval_broker_test.go | 291 ++++++++++++-------------------------- nomad/eval_endpoint.go | 21 +++ 3 files changed, 185 insertions(+), 205 deletions(-) diff --git a/nomad/eval_broker.go b/nomad/eval_broker.go index e13394b17258..5a60c1ce5f6b 100644 --- a/nomad/eval_broker.go +++ b/nomad/eval_broker.go @@ -66,6 +66,10 @@ type EvalBroker struct { // blocked tracks the blocked evaluations by JobID in a priority queue blocked map[structs.NamespacedID]PendingEvaluations + // cancelable tracks previously blocked evaluations (for any job) that are + // now safe for the Eval.Ack RPC to cancel in batches + cancelable PendingEvaluations + // ready tracks the ready jobs by scheduler in a priority queue ready map[string]PendingEvaluations @@ -140,6 +144,7 @@ func NewEvalBroker(timeout, initialNackDelay, subsequentNackDelay time.Duration, evals: make(map[string]int), jobEvals: make(map[structs.NamespacedID]string), blocked: make(map[structs.NamespacedID]PendingEvaluations), + cancelable: PendingEvaluations{}, ready: make(map[string]PendingEvaluations), unack: 
make(map[string]*unackEval), waiting: make(map[string]chan struct{}), @@ -559,6 +564,7 @@ func (b *EvalBroker) Ack(evalID, token string) error { return fmt.Errorf("Token does not match for Evaluation ID") } jobID := unack.Eval.JobID + oldestUnackedIndex := unack.Eval.ModifyIndex // Ensure we were able to stop the timer if !unack.NackTimer.Stop() { @@ -586,15 +592,27 @@ func (b *EvalBroker) Ack(evalID, token string) error { // Check if there are any blocked evaluations if blocked := b.blocked[namespacedID]; len(blocked) != 0 { - raw := heap.Pop(&blocked) + + // Any blocked evaluations with ModifyIndexes older than the just-ack'd + // evaluation are no longer useful, so it's safe to drop them. + cancelable := blocked.MarkForCancel(oldestUnackedIndex) + b.cancelable = append(b.cancelable, cancelable...) + b.stats.TotalBlocked -= cancelable.Len() + + // If any remain, enqueue the eval with the *highest* index + if len(blocked) > 0 { + raw := heap.Pop(&blocked) + eval := raw.(*structs.Evaluation) + b.stats.TotalBlocked -= 1 + b.enqueueLocked(eval, eval.Type) + } + + // Clean up if there are no more after that if len(blocked) > 0 { b.blocked[namespacedID] = blocked } else { delete(b.blocked, namespacedID) } - eval := raw.(*structs.Evaluation) - b.stats.TotalBlocked -= 1 - b.enqueueLocked(eval, eval.Type) } // Re-enqueue the evaluation. @@ -738,6 +756,7 @@ func (b *EvalBroker) flush() { b.evals = make(map[string]int) b.jobEvals = make(map[structs.NamespacedID]string) b.blocked = make(map[structs.NamespacedID]PendingEvaluations) + b.cancelable = PendingEvaluations{} b.ready = make(map[string]PendingEvaluations) b.unack = make(map[string]*unackEval) b.timeWait = make(map[string]*time.Timer) @@ -841,6 +860,15 @@ func (b *EvalBroker) Stats() *BrokerStats { return stats } +// Cancelable retrieves a batch of previously-blocked evaluations that are now +// stale and ready to mark for canceling. 
The eval RPC will call this with a +// batch size set to avoid sending overly large raft messages. +func (b *EvalBroker) Cancelable(batchSize int) []*structs.Evaluation { + b.l.RLock() + defer b.l.RUnlock() + return b.cancelable.PopN(batchSize) +} + // EmitStats is used to export metrics about the broker while enabled func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) { timer, stop := helper.NewSafeTimer(period) @@ -904,7 +932,7 @@ func (p PendingEvaluations) Less(i, j int) bool { if p[i].JobID != p[j].JobID && p[i].Priority != p[j].Priority { return !(p[i].Priority < p[j].Priority) } - return p[i].CreateIndex < p[j].CreateIndex + return (p[i].CreateIndex < p[j].CreateIndex) } // Swap is for the sorting interface @@ -934,3 +962,43 @@ func (p PendingEvaluations) Peek() *structs.Evaluation { } return p[n-1] } + +// PopN removes and returns the minimum N evaluations from the slice. +func (p *PendingEvaluations) PopN(n int) []*structs.Evaluation { + if n > len(*p) { + n = len(*p) + } + + popped := []*structs.Evaluation{} + for i := 0; i < n; i++ { + raw := heap.Pop(p) + eval := raw.(*structs.Evaluation) + popped = append(popped, eval) + } + return popped +} + +// MarkForCancel is used to remove any evaluations older than the index from the +// blocked list and returns a list of cancelable evals so that Eval.Ack RPCs can +// write batched raft entries to cancel them. This must be called inside the +// broker's lock. +func (p *PendingEvaluations) MarkForCancel(index uint64) PendingEvaluations { + + // In pathological cases, we can have a large number of blocked evals but + // will want to cancel most of them. Using heap.Remove requires we re-sort + // for each eval we remove. 
Because we expect to have very few if any + // blocked remaining, we'll just create a new heap + retain := PendingEvaluations{} + cancelable := PendingEvaluations{} + + for _, eval := range *p { + if eval.ModifyIndex >= index { + heap.Push(&retain, eval) + } else { + heap.Push(&cancelable, eval) + } + } + + *p = retain + return cancelable +} diff --git a/nomad/eval_broker_test.go b/nomad/eval_broker_test.go index 3b0988eae724..36255bb0fa6a 100644 --- a/nomad/eval_broker_test.go +++ b/nomad/eval_broker_test.go @@ -1,6 +1,7 @@ package nomad import ( + "container/heap" "encoding/json" "errors" "fmt" @@ -11,6 +12,7 @@ import ( "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" + "github.com/shoenig/test/must" "github.com/stretchr/testify/require" ) @@ -393,236 +395,88 @@ func TestEvalBroker_Serialize_DuplicateJobID(t *testing.T) { ns1 := "namespace-one" ns2 := "namespace-two" - eval := mock.Eval() - eval.Namespace = ns1 - b.Enqueue(eval) + eval1 := mock.Eval() + eval1.Namespace = ns1 + eval1.CreateIndex = 1 + b.Enqueue(eval1) eval2 := mock.Eval() - eval2.JobID = eval.JobID + eval2.JobID = eval1.JobID eval2.Namespace = ns1 - eval2.CreateIndex = eval.CreateIndex + 1 + eval2.CreateIndex = 2 b.Enqueue(eval2) eval3 := mock.Eval() - eval3.JobID = eval.JobID + eval3.JobID = eval1.JobID eval3.Namespace = ns1 - eval3.CreateIndex = eval.CreateIndex + 2 + eval3.CreateIndex = 3 b.Enqueue(eval3) eval4 := mock.Eval() - eval4.JobID = eval.JobID + eval4.JobID = eval1.JobID eval4.Namespace = ns2 - eval4.CreateIndex = eval.CreateIndex + 3 + eval4.CreateIndex = 4 b.Enqueue(eval4) eval5 := mock.Eval() - eval5.JobID = eval.JobID + eval5.JobID = eval1.JobID eval5.Namespace = ns2 - eval5.CreateIndex = eval.CreateIndex + 4 + eval5.CreateIndex = 5 b.Enqueue(eval5) stats := b.Stats() - if stats.TotalReady != 2 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 3 { - t.Fatalf("bad: %#v", stats) - } + must.Eq(t, 
stats.TotalReady, 2, must.Sprintf("expected 2 ready: %#v", stats)) + must.Eq(t, stats.TotalUnacked, 0, must.Sprintf("expected 0 unacked: %#v", stats)) + must.Eq(t, stats.TotalBlocked, 3, must.Sprintf("expected 3 blocked: %#v", stats)) - // Dequeue should work + // Dequeue should get 1st eval out, token, err := b.Dequeue(defaultSched, time.Second) - if err != nil { - t.Fatalf("err: %v", err) - } - if out != eval { - t.Fatalf("bad : %#v", out) - } + must.NoError(t, err) + must.Eq(t, out, eval1, must.Sprint("expected 1st eval")) - // Check the stats + // Check the stats and state stats = b.Stats() - if stats.TotalReady != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 3 { - t.Fatalf("bad: %#v", stats) - } - - // Ack out - err = b.Ack(eval.ID, token) - if err != nil { - t.Fatalf("err: %v", err) - } - - // Check the stats - stats = b.Stats() - if stats.TotalReady != 2 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 0 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 2 { - t.Fatalf("bad: %#v", stats) - } - - // Dequeue should work - out, token, err = b.Dequeue(defaultSched, time.Second) - if err != nil { - t.Fatalf("err: %v", err) - } - if out != eval2 { - t.Fatalf("bad : %#v", out) - } - - // Check the stats - stats = b.Stats() - if stats.TotalReady != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 2 { - t.Fatalf("bad: %#v", stats) - } - - // Ack out - err = b.Ack(eval2.ID, token) - if err != nil { - t.Fatalf("err: %v", err) - } - - // Check the stats - stats = b.Stats() - if stats.TotalReady != 2 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 0 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 1 { - t.Fatalf("bad: %#v", stats) - } - - // Dequeue should work - out, token, err = b.Dequeue(defaultSched, time.Second) - if err != nil { - t.Fatalf("err: 
%v", err) - } - if out != eval3 { - t.Fatalf("bad : %#v", out) - } - - // Check the stats - stats = b.Stats() - if stats.TotalReady != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 1 { - t.Fatalf("bad: %#v", stats) - } + must.Eq(t, stats.TotalReady, 1, must.Sprintf("expected 1 ready: %#v", stats)) + must.Eq(t, stats.TotalUnacked, 1, must.Sprintf("expected 1 unacked: %#v", stats)) + must.Eq(t, stats.TotalBlocked, 3, must.Sprintf("expected 3 blocked: %#v", stats)) + must.Eq(t, b.cancelable.Len(), 0, must.Sprintf("expected 0 cancelable")) - // Ack out - err = b.Ack(eval3.ID, token) - if err != nil { - t.Fatalf("err: %v", err) - } + // Ack should clear the rest of namespace-one blocked + eval1.ModifyIndex = 3 // update as Eval.getWaitIndex would + err = b.Ack(eval1.ID, token) + must.NoError(t, err) - // Check the stats + // Check the stats and state stats = b.Stats() - if stats.TotalReady != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 0 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 1 { - t.Fatalf("bad: %#v", stats) - } + must.Eq(t, stats.TotalReady, 1, must.Sprintf("expected 1 ready: %#v", stats)) + must.Eq(t, stats.TotalUnacked, 0, must.Sprintf("expected 0 unacked: %#v", stats)) + must.Eq(t, stats.TotalBlocked, 1, must.Sprintf("expected 1 blocked: %#v", stats)) + must.Eq(t, b.cancelable.Len(), 2, must.Sprintf("expected 2 cancelable")) // Dequeue should work out, token, err = b.Dequeue(defaultSched, time.Second) - if err != nil { - t.Fatalf("err: %v", err) - } - if out != eval4 { - t.Fatalf("bad : %#v", out) - } + must.NoError(t, err) + must.Eq(t, out, eval4, must.Sprint("expected 4th eval")) - // Check the stats + // Check the stats and state stats = b.Stats() - if stats.TotalReady != 0 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 1 { - t.Fatalf("bad: %#v", stats) - 
} + must.Eq(t, stats.TotalReady, 0, must.Sprintf("expected 0 ready: %#v", stats)) + must.Eq(t, stats.TotalUnacked, 1, must.Sprintf("expected 1 unacked: %#v", stats)) + must.Eq(t, stats.TotalBlocked, 1, must.Sprintf("expected 1 blocked: %#v", stats)) + must.Eq(t, b.cancelable.Len(), 2, must.Sprintf("expected 2 cancelable")) - // Ack out + // Ack should clear the rest of namespace-two blocked + eval4.ModifyIndex = 5 // update as Eval.getWaitIndex would err = b.Ack(eval4.ID, token) - if err != nil { - t.Fatalf("err: %v", err) - } + must.NoError(t, err) - // Check the stats + // Check the stats and state stats = b.Stats() - if stats.TotalReady != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 0 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 0 { - t.Fatalf("bad: %#v", stats) - } + must.Eq(t, stats.TotalReady, 0, must.Sprintf("expected 0 ready: %#v", stats)) + must.Eq(t, stats.TotalUnacked, 0, must.Sprintf("expected 0 unacked: %#v", stats)) + must.Eq(t, stats.TotalBlocked, 0, must.Sprintf("expected 0 blocked: %#v", stats)) + must.Eq(t, b.cancelable.Len(), 3, must.Sprintf("expected 3 cancelable")) - // Dequeue should work - out, token, err = b.Dequeue(defaultSched, time.Second) - if err != nil { - t.Fatalf("err: %v", err) - } - if out != eval5 { - t.Fatalf("bad : %#v", out) - } - - // Check the stats - stats = b.Stats() - if stats.TotalReady != 0 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 0 { - t.Fatalf("bad: %#v", stats) - } - - // Ack out - err = b.Ack(eval5.ID, token) - if err != nil { - t.Fatalf("err: %v", err) - } - - // Check the stats - stats = b.Stats() - if stats.TotalReady != 0 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 0 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 0 { - t.Fatalf("bad: %#v", stats) - } } func TestEvalBroker_Enqueue_Disable(t *testing.T) { @@ -813,18 +667,18 @@ func 
TestEvalBroker_Dequeue_FIFO(t *testing.T) { b.SetEnabled(true) NUM := 100 - for i := 0; i < NUM; i++ { + for i := NUM; i > 0; i-- { eval1 := mock.Eval() eval1.CreateIndex = uint64(i) eval1.ModifyIndex = uint64(i) b.Enqueue(eval1) } - for i := 0; i < NUM; i++ { + for i := 1; i < NUM; i++ { out1, _, _ := b.Dequeue(defaultSched, time.Second) - if out1.CreateIndex != uint64(i) { - t.Fatalf("bad: %d %#v", i, out1) - } + must.Eq(t, out1.CreateIndex, uint64(i), + must.Sprintf("eval was not FIFO by CreateIndex"), + ) } } @@ -1506,3 +1360,40 @@ func TestEvalBroker_NamespacedJobs(t *testing.T) { require.Equal(1, len(b.blocked)) } + +func TestEvalBroker_BlockedEvals_MarkForCancel(t *testing.T) { + ci.Parallel(t) + + blocked := PendingEvaluations{} + + // evals are pushed on the blocked queue FIFO by CreateIndex order, so Push + // them in reverse order here to assert we're getting them back out in the + // right order and not just as inserted + for i := 100; i > 0; i -= 10 { + eval := mock.Eval() + eval.JobID = "example" + eval.CreateIndex = uint64(i) + eval.ModifyIndex = uint64(i) + heap.Push(&blocked, eval) + } + + canceled := blocked.MarkForCancel(40) + must.Eq(t, canceled.Len(), 3) + must.Eq(t, blocked.Len(), 7) + + got := []uint64{} + for i := 0; i < 7; i++ { + raw := heap.Pop(&blocked) + must.NotNil(t, raw) + eval := raw.(*structs.Evaluation) + got = append(got, eval.CreateIndex) + } + must.Eq(t, []uint64{40, 50, 60, 70, 80, 90, 100}, got) + + popped := canceled.PopN(2) + must.Len(t, 1, canceled) + must.Len(t, 2, popped) + + must.Eq(t, popped[0].CreateIndex, 10) + must.Eq(t, popped[1].CreateIndex, 20) +} diff --git a/nomad/eval_endpoint.go b/nomad/eval_endpoint.go index 575f7edd9393..2b67fd1194eb 100644 --- a/nomad/eval_endpoint.go +++ b/nomad/eval_endpoint.go @@ -229,6 +229,27 @@ func (e *Eval) Ack(args *structs.EvalAckRequest, if err := e.srv.evalBroker.Ack(args.EvalID, args.Token); err != nil { return err } + + const cancelDesc = "cancelled after more recent eval was 
processed" + + cancelable := e.srv.evalBroker.Cancelable(1000) + if len(cancelable) > 0 { + for _, eval := range cancelable { + eval.Status = structs.EvalStatusCancelled + eval.StatusDescription = cancelDesc + } + + update := &structs.EvalUpdateRequest{ + Evals: cancelable, + WriteRequest: structs.WriteRequest{Region: args.Region}, + } + _, _, err = e.srv.raftApply(structs.EvalUpdateRequestType, update) + if err != nil { + e.logger.Error("eval cancel failed", "error", err, "method", "ack") + return err + } + } + return nil }