From e37203736975250ae8ec37f4b0a277160f7f255b Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Tue, 20 Sep 2022 17:04:51 -0400 Subject: [PATCH] eval broker: shed all but one blocked eval per job after ack When an evaluation is acknowledged by a scheduler, the resulting plan is guaranteed to cover up to the `waitIndex` set by the worker based on the most recent evaluation for that job in the state store. At that point, we no longer need to retain blocked evaluations in the broker that are older than that index. Move all but the highest priority / highest `ModifyIndex` blocked eval into a canceled set. When the `Eval.Ack` RPC returns from the eval broker it will retrieve a batch of canceable evals to write to raft. This paces the cancelations limited by how frequently the schedulers are acknowledging evals; this should reduce the risk of cancelations from overwhelming raft relative to scheduler progress. In order to avoid straggling batches when the cluster is quiet, we also include a periodic sweep through the cancelable list. --- .changelog/14621.txt | 3 + nomad/config.go | 7 + nomad/eval_broker.go | 132 ++++++++-- nomad/eval_broker_test.go | 501 ++++++++++++++++++++++---------------- nomad/eval_endpoint.go | 35 ++- nomad/leader.go | 21 ++ nomad/worker_test.go | 54 ++-- 7 files changed, 489 insertions(+), 264 deletions(-) create mode 100644 .changelog/14621.txt diff --git a/.changelog/14621.txt b/.changelog/14621.txt new file mode 100644 index 000000000000..7ca9a7ea863f --- /dev/null +++ b/.changelog/14621.txt @@ -0,0 +1,3 @@ +```release-note:improvement +eval broker: shed all but one blocked eval per job after successful ack +``` diff --git a/nomad/config.go b/nomad/config.go index 7014ef987fe5..2bcb8714bb8a 100644 --- a/nomad/config.go +++ b/nomad/config.go @@ -252,6 +252,12 @@ type Config struct { // retrying a failed evaluation. EvalFailedFollowupBaselineDelay time.Duration + // EvalReapCancelableInterval is the interval for the periodic reaping of + // cancelable evaluations. Cancelable evaluations are canceled whenever any + // eval is ack'd but this sweeps up on quiescent clusters. This config value + // exists only for testing. + EvalReapCancelableInterval time.Duration + // EvalFailedFollowupDelayRange defines the range of additional time from // the baseline in which to wait before retrying a failed evaluation. The // additional delay is selected from this range randomly. @@ -471,6 +477,7 @@ func DefaultConfig() *Config { EvalNackSubsequentReenqueueDelay: 20 * time.Second, EvalFailedFollowupBaselineDelay: 1 * time.Minute, EvalFailedFollowupDelayRange: 5 * time.Minute, + EvalReapCancelableInterval: 5 * time.Second, MinHeartbeatTTL: 10 * time.Second, MaxHeartbeatsPerSecond: 50.0, HeartbeatGrace: 10 * time.Second, diff --git a/nomad/eval_broker.go b/nomad/eval_broker.go index e13394b17258..e4022ef88ec1 100644 --- a/nomad/eval_broker.go +++ b/nomad/eval_broker.go @@ -64,7 +64,11 @@ type EvalBroker struct { jobEvals map[structs.NamespacedID]string // blocked tracks the blocked evaluations by JobID in a priority queue - blocked map[structs.NamespacedID]PendingEvaluations + blocked map[structs.NamespacedID]BlockedEvaluations + + // cancelable tracks previously blocked evaluations (for any job) that are + // now safe for the Eval.Ack RPC to cancel in batches + cancelable []*structs.Evaluation // ready tracks the ready jobs by scheduler in a priority queue ready map[string]PendingEvaluations @@ -115,11 +119,14 @@ type unackEval struct { NackTimer *time.Timer } -// PendingEvaluations is a list of waiting evaluations. -// We implement the container/heap interface so that this is a -// priority queue +// PendingEvaluations is a list of ready evaluations across multiple jobs. We +// implement the container/heap interface so that this is a priority queue. type PendingEvaluations []*structs.Evaluation +// BlockedEvaluations is a list of blocked evaluations for a given job. We +// implement the container/heap interface so that this is a priority queue. +type BlockedEvaluations []*structs.Evaluation + // NewEvalBroker creates a new evaluation broker. This is parameterized // with the timeout used for messages that are not acknowledged before we // assume a Nack and attempt to redeliver as well as the deliveryLimit @@ -139,7 +146,8 @@ func NewEvalBroker(timeout, initialNackDelay, subsequentNackDelay time.Duration, stats: new(BrokerStats), evals: make(map[string]int), jobEvals: make(map[structs.NamespacedID]string), - blocked: make(map[structs.NamespacedID]PendingEvaluations), + blocked: make(map[structs.NamespacedID]BlockedEvaluations), + cancelable: []*structs.Evaluation{}, ready: make(map[string]PendingEvaluations), unack: make(map[string]*unackEval), waiting: make(map[string]chan struct{}), @@ -586,15 +594,28 @@ func (b *EvalBroker) Ack(evalID, token string) error { // Check if there are any blocked evaluations if blocked := b.blocked[namespacedID]; len(blocked) != 0 { - raw := heap.Pop(&blocked) + + // Any blocked evaluations with ModifyIndexes older than the just-ack'd + // evaluation are no longer useful, so it's safe to drop them. + cancelable := blocked.MarkForCancel() + b.cancelable = append(b.cancelable, cancelable...) + b.stats.TotalCancelable = len(b.cancelable) + b.stats.TotalBlocked -= len(cancelable) + + // If any remain, enqueue an eval + if len(blocked) > 0 { + raw := heap.Pop(&blocked) + eval := raw.(*structs.Evaluation) + b.stats.TotalBlocked -= 1 + b.enqueueLocked(eval, eval.Type) + } + + // Clean up if there are no more after that if len(blocked) > 0 { b.blocked[namespacedID] = blocked } else { delete(b.blocked, namespacedID) } - eval := raw.(*structs.Evaluation) - b.stats.TotalBlocked -= 1 - b.enqueueLocked(eval, eval.Type) } // Re-enqueue the evaluation. @@ -733,11 +754,13 @@ func (b *EvalBroker) flush() { b.stats.TotalUnacked = 0 b.stats.TotalBlocked = 0 b.stats.TotalWaiting = 0 + b.stats.TotalCancelable = 0 b.stats.DelayedEvals = make(map[string]*structs.Evaluation) b.stats.ByScheduler = make(map[string]*SchedulerStats) b.evals = make(map[string]int) b.jobEvals = make(map[structs.NamespacedID]string) - b.blocked = make(map[structs.NamespacedID]PendingEvaluations) + b.blocked = make(map[structs.NamespacedID]BlockedEvaluations) + b.cancelable = []*structs.Evaluation{} b.ready = make(map[string]PendingEvaluations) b.unack = make(map[string]*unackEval) b.timeWait = make(map[string]*time.Timer) @@ -830,6 +853,7 @@ func (b *EvalBroker) Stats() *BrokerStats { stats.TotalUnacked = b.stats.TotalUnacked stats.TotalBlocked = b.stats.TotalBlocked stats.TotalWaiting = b.stats.TotalWaiting + stats.TotalCancelable = b.stats.TotalCancelable for id, eval := range b.stats.DelayedEvals { evalCopy := *eval stats.DelayedEvals[id] = &evalCopy @@ -841,6 +865,24 @@ func (b *EvalBroker) Stats() *BrokerStats { return stats } +// Cancelable retrieves a batch of previously-blocked evaluations that are now +// stale and ready to mark for canceling. The eval RPC will call this with a +// batch size set to avoid sending overly large raft messages. +func (b *EvalBroker) Cancelable(batchSize int) []*structs.Evaluation { + b.l.RLock() + defer b.l.RUnlock() + + if batchSize > len(b.cancelable) { + batchSize = len(b.cancelable) + } + + cancelable := b.cancelable[:batchSize] + b.cancelable = b.cancelable[batchSize:] + + b.stats.TotalCancelable = len(b.cancelable) + return cancelable +} + // EmitStats is used to export metrics about the broker while enabled func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) { timer, stop := helper.NewSafeTimer(period) @@ -856,6 +898,7 @@ func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) { metrics.SetGauge([]string{"nomad", "broker", "total_unacked"}, float32(stats.TotalUnacked)) metrics.SetGauge([]string{"nomad", "broker", "total_blocked"}, float32(stats.TotalBlocked)) metrics.SetGauge([]string{"nomad", "broker", "total_waiting"}, float32(stats.TotalWaiting)) + metrics.SetGauge([]string{"nomad", "broker", "total_cancelable"}, float32(stats.TotalCancelable)) for _, eval := range stats.DelayedEvals { metrics.SetGaugeWithLabels([]string{"nomad", "broker", "eval_waiting"}, float32(time.Until(eval.WaitUntil).Seconds()), @@ -878,12 +921,13 @@ func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) { // BrokerStats returns all the stats about the broker type BrokerStats struct { - TotalReady int - TotalUnacked int - TotalBlocked int - TotalWaiting int - DelayedEvals map[string]*structs.Evaluation - ByScheduler map[string]*SchedulerStats + TotalReady int + TotalUnacked int + TotalBlocked int + TotalWaiting int + TotalCancelable int + DelayedEvals map[string]*structs.Evaluation + ByScheduler map[string]*SchedulerStats } // SchedulerStats returns the stats per scheduler @@ -934,3 +978,59 @@ func (p PendingEvaluations) Peek() *structs.Evaluation { } return p[n-1] } + +// Len is for the sorting interface +func (p BlockedEvaluations) Len() int { + return len(p) +} + +// Less is for the sorting interface. We flip the check +// so that the "min" in the min-heap is the element with the +// highest priority or highest modify index +func (p BlockedEvaluations) Less(i, j int) bool { + if p[i].Priority != p[j].Priority { + return !(p[i].Priority < p[j].Priority) + } + return !(p[i].ModifyIndex < p[j].ModifyIndex) +} + +// Swap is for the sorting interface +func (p BlockedEvaluations) Swap(i, j int) { + p[i], p[j] = p[j], p[i] +} + +// Push implements the heap interface and is used to add a new evaluation to the slice +func (p *BlockedEvaluations) Push(e interface{}) { + *p = append(*p, e.(*structs.Evaluation)) +} + +// Pop implements the heap interface and is used to remove an evaluation from the slice +func (p *BlockedEvaluations) Pop() interface{} { + n := len(*p) + e := (*p)[n-1] + (*p)[n-1] = nil + *p = (*p)[:n-1] + return e +} + +// MarkForCancel is used to clear the blocked list of all but the one with the +// highest modify index and highest priority. It returns a slice of cancelable +// evals so that Eval.Ack RPCs can write batched raft entries to cancel +// them. This must be called inside the broker's lock. +func (p *BlockedEvaluations) MarkForCancel() []*structs.Evaluation { + + // In pathological cases, we can have a large number of blocked evals but + // will want to cancel most of them. Using heap.Remove requires we re-sort + // for each eval we remove. Because we expect to have at most one remaining, + // we'll just create a new heap. + retain := BlockedEvaluations{} + + raw := heap.Pop(p) + heap.Push(&retain, raw) + + cancelable := make([]*structs.Evaluation, len(*p)) + copy(cancelable, *p) + + *p = retain + return cancelable +} diff --git a/nomad/eval_broker_test.go b/nomad/eval_broker_test.go index 3b0988eae724..c46f8bf4ac46 100644 --- a/nomad/eval_broker_test.go +++ b/nomad/eval_broker_test.go @@ -1,16 +1,21 @@ package nomad import ( + "container/heap" "encoding/json" "errors" "fmt" "testing" "time" + msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" + "github.com/shoenig/test" + "github.com/shoenig/test/must" "github.com/stretchr/testify/require" ) @@ -393,236 +398,107 @@ func TestEvalBroker_Serialize_DuplicateJobID(t *testing.T) { ns1 := "namespace-one" ns2 := "namespace-two" - eval := mock.Eval() - eval.Namespace = ns1 - b.Enqueue(eval) - - eval2 := mock.Eval() - eval2.JobID = eval.JobID - eval2.Namespace = ns1 - eval2.CreateIndex = eval.CreateIndex + 1 - b.Enqueue(eval2) - - eval3 := mock.Eval() - eval3.JobID = eval.JobID - eval3.Namespace = ns1 - eval3.CreateIndex = eval.CreateIndex + 2 - b.Enqueue(eval3) + jobID := "example" - eval4 := mock.Eval() - eval4.JobID = eval.JobID - eval4.Namespace = ns2 - eval4.CreateIndex = eval.CreateIndex + 3 - b.Enqueue(eval4) - - eval5 := mock.Eval() - eval5.JobID = eval.JobID - eval5.Namespace = ns2 - eval5.CreateIndex = eval.CreateIndex + 4 - b.Enqueue(eval5) - - stats := b.Stats() - if stats.TotalReady != 2 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 3 { - t.Fatalf("bad: %#v", stats) + newEval := func(idx uint64, ns string) *structs.Evaluation { + eval := mock.Eval() + eval.ID = fmt.Sprintf("eval:%d", idx) + eval.JobID = jobID + eval.Namespace = ns + eval.CreateIndex = idx + eval.ModifyIndex = idx + b.Enqueue(eval) + return eval + } + + // first job + eval1 := newEval(1, ns1) + newEval(2, ns1) + newEval(3, ns1) + eval4 := newEval(4, ns1) + + // second job + eval5 := newEval(5, ns2) + newEval(6, ns2) + eval7 := newEval(7, ns2) + + // retreive the stats from the broker, less some stats that aren't + // interesting for this test and make the test much more verbose + // to include + getStats := func() BrokerStats { + t.Helper() + stats := b.Stats() + stats.DelayedEvals = nil + stats.ByScheduler = nil + return *stats } - // Dequeue should work + must.Eq(t, BrokerStats{TotalReady: 2, TotalUnacked: 0, + TotalBlocked: 5, TotalCancelable: 0}, getStats()) + + // Dequeue should get 1st eval out, token, err := b.Dequeue(defaultSched, time.Second) - if err != nil { - t.Fatalf("err: %v", err) - } - if out != eval { - t.Fatalf("bad : %#v", out) - } + must.NoError(t, err) + must.Eq(t, out, eval1, must.Sprint("expected 1st eval")) - // Check the stats - stats = b.Stats() - if stats.TotalReady != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 3 { - t.Fatalf("bad: %#v", stats) - } + must.Eq(t, BrokerStats{TotalReady: 1, TotalUnacked: 1, + TotalBlocked: 5, TotalCancelable: 0}, getStats()) - // Ack out - err = b.Ack(eval.ID, token) - if err != nil { - t.Fatalf("err: %v", err) - } + // Current wait index should be 4 but Ack to exercise behavior + // when worker's Eval.getWaitIndex gets a stale index + err = b.Ack(eval1.ID, token) + must.NoError(t, err) - // Check the stats - stats = b.Stats() - if stats.TotalReady != 2 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 0 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 2 { - t.Fatalf("bad: %#v", stats) - } + must.Eq(t, BrokerStats{TotalReady: 2, TotalUnacked: 0, + TotalBlocked: 2, TotalCancelable: 2}, getStats()) - // Dequeue should work + // eval4 and eval5 are ready + // eval6 and eval7 are blocked + // Dequeue should get 4th eval out, token, err = b.Dequeue(defaultSched, time.Second) - if err != nil { - t.Fatalf("err: %v", err) - } - if out != eval2 { - t.Fatalf("bad : %#v", out) - } + must.NoError(t, err) + must.Eq(t, out, eval4, must.Sprint("expected 4th eval")) - // Check the stats - stats = b.Stats() - if stats.TotalReady != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 2 { - t.Fatalf("bad: %#v", stats) - } - - // Ack out - err = b.Ack(eval2.ID, token) - if err != nil { - t.Fatalf("err: %v", err) - } + must.Eq(t, BrokerStats{TotalReady: 1, TotalUnacked: 1, + TotalBlocked: 2, TotalCancelable: 2}, getStats()) - // Check the stats - stats = b.Stats() - if stats.TotalReady != 2 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 0 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 1 { - t.Fatalf("bad: %#v", stats) - } - - // Dequeue should work - out, token, err = b.Dequeue(defaultSched, time.Second) - if err != nil { - t.Fatalf("err: %v", err) - } - if out != eval3 { - t.Fatalf("bad : %#v", out) - } - - // Check the stats - stats = b.Stats() - if stats.TotalReady != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 1 { - t.Fatalf("bad: %#v", stats) - } - - // Ack out - err = b.Ack(eval3.ID, token) - if err != nil { - t.Fatalf("err: %v", err) - } + // Ack should clear the rest of namespace-one blocked but leave + // namespace-two untouched + err = b.Ack(eval4.ID, token) + must.NoError(t, err) - // Check the stats - stats = b.Stats() - if stats.TotalReady != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 0 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 1 { - t.Fatalf("bad: %#v", stats) - } + must.Eq(t, BrokerStats{TotalReady: 1, TotalUnacked: 0, + TotalBlocked: 2, TotalCancelable: 2}, getStats()) - // Dequeue should work + // Dequeue should get 5th eval out, token, err = b.Dequeue(defaultSched, time.Second) - if err != nil { - t.Fatalf("err: %v", err) - } - if out != eval4 { - t.Fatalf("bad : %#v", out) - } + must.NoError(t, err) + must.Eq(t, out, eval5, must.Sprint("expected 5th eval")) - // Check the stats - stats = b.Stats() - if stats.TotalReady != 0 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 1 { - t.Fatalf("bad: %#v", stats) - } + must.Eq(t, BrokerStats{TotalReady: 0, TotalUnacked: 1, + TotalBlocked: 2, TotalCancelable: 2}, getStats()) - // Ack out - err = b.Ack(eval4.ID, token) - if err != nil { - t.Fatalf("err: %v", err) - } + // Ack should clear remaining namespace-two blocked evals + err = b.Ack(eval5.ID, token) + must.NoError(t, err) - // Check the stats - stats = b.Stats() - if stats.TotalReady != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 0 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 0 { - t.Fatalf("bad: %#v", stats) - } + must.Eq(t, BrokerStats{TotalReady: 1, TotalUnacked: 0, + TotalBlocked: 0, TotalCancelable: 3}, getStats()) - // Dequeue should work + // Dequeue should get 7th eval because that's all that's left out, token, err = b.Dequeue(defaultSched, time.Second) - if err != nil { - t.Fatalf("err: %v", err) - } - if out != eval5 { - t.Fatalf("bad : %#v", out) - } + must.NoError(t, err) + must.Eq(t, out, eval7, must.Sprint("expected 7th eval")) - // Check the stats - stats = b.Stats() - if stats.TotalReady != 0 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 0 { - t.Fatalf("bad: %#v", stats) - } + must.Eq(t, BrokerStats{TotalReady: 0, TotalUnacked: 1, + TotalBlocked: 0, TotalCancelable: 3}, getStats()) - // Ack out - err = b.Ack(eval5.ID, token) - if err != nil { - t.Fatalf("err: %v", err) - } + // Last ack should leave the broker empty except for cancels + err = b.Ack(eval7.ID, token) + must.NoError(t, err) - // Check the stats - stats = b.Stats() - if stats.TotalReady != 0 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 0 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 0 { - t.Fatalf("bad: %#v", stats) - } + must.Eq(t, BrokerStats{TotalReady: 0, TotalUnacked: 0, + TotalBlocked: 0, TotalCancelable: 3}, getStats()) } func TestEvalBroker_Enqueue_Disable(t *testing.T) { @@ -813,18 +689,18 @@ func TestEvalBroker_Dequeue_FIFO(t *testing.T) { b.SetEnabled(true) NUM := 100 - for i := 0; i < NUM; i++ { + for i := NUM; i > 0; i-- { eval1 := mock.Eval() eval1.CreateIndex = uint64(i) eval1.ModifyIndex = uint64(i) b.Enqueue(eval1) } - for i := 0; i < NUM; i++ { + for i := 1; i < NUM; i++ { out1, _, _ := b.Dequeue(defaultSched, time.Second) - if out1.CreateIndex != uint64(i) { - t.Fatalf("bad: %d %#v", i, out1) - } + must.Eq(t, uint64(i), out1.CreateIndex, + must.Sprintf("eval was not FIFO by CreateIndex"), + ) } } @@ -1506,3 +1382,202 @@ func TestEvalBroker_NamespacedJobs(t *testing.T) { require.Equal(1, len(b.blocked)) } + +func TestEvalBroker_PendingEvals_Ordering(t *testing.T) { + + ready := PendingEvaluations{} + + newEval := func(jobID, evalID string, priority int, index uint64) *structs.Evaluation { + eval := mock.Eval() + eval.JobID = jobID + eval.ID = evalID + eval.Priority = priority + eval.CreateIndex = uint64(index) + return eval + } + + // note: we're intentionally pushing these out-of-order to assert we're + // getting them back out in the intended order and not just as inserted + heap.Push(&ready, newEval("example1", "eval01", 50, 1)) + heap.Push(&ready, newEval("example3", "eval03", 70, 3)) + heap.Push(&ready, newEval("example2", "eval02", 50, 2)) + + next := heap.Pop(&ready).(*structs.Evaluation) + test.Eq(t, "eval03", next.ID, + test.Sprint("expected highest Priority to be next ready")) + + next = heap.Pop(&ready).(*structs.Evaluation) + test.Eq(t, "eval01", next.ID, + test.Sprint("expected oldest CreateIndex to be next ready")) + + heap.Push(&ready, newEval("example4", "eval04", 50, 4)) + + next = heap.Pop(&ready).(*structs.Evaluation) + test.Eq(t, "eval02", next.ID, + test.Sprint("expected oldest CreateIndex to be next ready")) + +} + +func TestEvalBroker_BlockedEval_Ordering(t *testing.T) { + blocked := BlockedEvaluations{} + + newEval := func(evalID string, priority int, index uint64) *structs.Evaluation { + eval := mock.Eval() + eval.ID = evalID + eval.Priority = priority + eval.ModifyIndex = uint64(index) + return eval + } + + // note: we're intentionally pushing these out-of-order to assert we're + // getting them back out in the intended order and not just as inserted + heap.Push(&blocked, newEval("eval03", 50, 3)) + heap.Push(&blocked, newEval("eval02", 100, 2)) + heap.Push(&blocked, newEval("eval01", 50, 1)) + + unblocked := heap.Pop(&blocked).(*structs.Evaluation) + test.Eq(t, "eval02", unblocked.ID, + test.Sprint("expected eval with highest priority to get unblocked")) + + unblocked = heap.Pop(&blocked).(*structs.Evaluation) + test.Eq(t, "eval03", unblocked.ID, + test.Sprint("expected eval with highest modify index to get unblocked")) + + heap.Push(&blocked, newEval("eval04", 30, 4)) + unblocked = heap.Pop(&blocked).(*structs.Evaluation) + test.Eq(t, "eval01", unblocked.ID, + test.Sprint("expected eval with highest priority to get unblocked")) + +} + +func TestEvalBroker_BlockedEvals_MarkForCancel(t *testing.T) { + ci.Parallel(t) + + blocked := BlockedEvaluations{} + + // note: we're intentionally pushing these out-of-order to assert we're + // getting them back out in the intended order and not just as inserted + for i := 100; i > 0; i -= 10 { + eval := mock.Eval() + eval.JobID = "example" + eval.CreateIndex = uint64(i) + eval.ModifyIndex = uint64(i) + heap.Push(&blocked, eval) + } + + canceled := blocked.MarkForCancel() + must.Eq(t, 9, len(canceled)) + must.Eq(t, 1, blocked.Len()) + + raw := heap.Pop(&blocked) + must.NotNil(t, raw) + eval := raw.(*structs.Evaluation) + must.Eq(t, 100, eval.ModifyIndex) +} + +// TestEvalBroker_IntegrationTest exercises the eval broker with realistic +// workflows +func TestEvalBroker_IntegrationTest(t *testing.T) { + ci.Parallel(t) + + srv, cleanupS1 := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent dequeue + c.EvalReapCancelableInterval = time.Minute * 10 // Prevent sweep-up + }) + + defer cleanupS1() + testutil.WaitForLeader(t, srv.RPC) + + codec := rpcClient(t, srv) + store := srv.fsm.State() + + // create a system job, a node for it to run on, and a set of node up/down + // events that will result in evaluations queued. + + job := mock.SystemJob() + jobReq := &structs.JobRegisterRequest{ + Job: job, + EvalPriority: 50, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var jobResp structs.JobRegisterResponse + err := msgpackrpc.CallWithCodec(codec, "Job.Register", jobReq, &jobResp) + must.NoError(t, err) + + node := mock.Node() + nodeReq := &structs.NodeRegisterRequest{ + Node: node, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var nodeResp structs.NodeUpdateResponse + err = msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReq, &nodeResp) + must.NoError(t, err) + + for i := 0; i < 10; i++ { + status := structs.NodeStatusDown + if i%2 == 0 { + status = structs.NodeStatusReady + } + statusReq := &structs.NodeUpdateStatusRequest{ + NodeID: node.ID, + Status: status, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var statusResp structs.NodeUpdateResponse + err = msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", statusReq, &statusResp) + must.NoError(t, err) + } + + // ensure we have the expected number of evaluations and eval broker state + + // retreive the stats from the broker, less some uninteresting ones + getStats := func() BrokerStats { + t.Helper() + stats := srv.evalBroker.Stats() + stats.DelayedEvals = nil + stats.ByScheduler = nil + return *stats + } + + getEvalStatuses := func() map[string]int { + t.Helper() + statuses := map[string]int{} + iter, err := store.Evals(nil, state.SortDefault) + must.NoError(t, err) + for { + raw := iter.Next() + if raw == nil { + break + } + eval := raw.(*structs.Evaluation) + statuses[eval.Status] += 1 + if eval.Status == structs.EvalStatusCancelled { + must.Eq(t, "canceled after more recent eval was processed", eval.StatusDescription) + } + } + return statuses + } + + must.Eq(t, map[string]int{structs.EvalStatusPending: 11}, getEvalStatuses()) + must.Eq(t, BrokerStats{TotalReady: 1, TotalUnacked: 0, + TotalBlocked: 10, TotalCancelable: 0}, getStats()) + + // start schedulers: all the evals are for a single job so there should only + // be one eval processesed at a time no matter how many schedulers we run + + config := DefaultConfig() + config.NumSchedulers = 4 + config.EvalReapCancelableInterval = time.Minute * 10 + require.NoError(t, srv.Reload(config)) + + // assert that all but 2 evals were canceled and that the eval broker state + // has been cleared + + require.Eventually(t, func() bool { + got := getEvalStatuses() + return got[structs.EvalStatusComplete] == 2 && got[structs.EvalStatusCancelled] == 9 + }, 2*time.Second, time.Millisecond*100) + + must.Eq(t, BrokerStats{TotalReady: 0, TotalUnacked: 0, + TotalBlocked: 0, TotalCancelable: 0}, getStats()) +} diff --git a/nomad/eval_endpoint.go b/nomad/eval_endpoint.go index bedaf45a8084..e6ac4197308b 100644 --- a/nomad/eval_endpoint.go +++ b/nomad/eval_endpoint.go @@ -230,7 +230,8 @@ func (e *Eval) Ack(args *structs.EvalAckRequest, if err := e.srv.evalBroker.Ack(args.EvalID, args.Token); err != nil { return err } - return nil + + return cancelCancelableEvals(e.srv) } // Nack is used to negative acknowledge completion of a dequeued evaluation. @@ -766,3 +767,35 @@ func (e *Eval) Allocations(args *structs.EvalSpecificRequest, }} return e.srv.blockingRPC(&opts) } + +// cancelCancelableEvals pulls a batch of cancelable evaluations from the eval +// broker and updates their status to canceled. +func cancelCancelableEvals(srv *Server) error { + + const cancelDesc = "canceled after more recent eval was processed" + + // We *can* send larger raft logs but rough benchmarks show that a smaller + // page size strikes a balance between throughput and time we block the FSM + // apply for other operations + cancelable := srv.evalBroker.Cancelable(structs.MaxUUIDsPerWriteRequest / 10) + if len(cancelable) > 0 { + for i, eval := range cancelable { + eval = eval.Copy() + eval.Status = structs.EvalStatusCancelled + eval.StatusDescription = cancelDesc + eval.UpdateModifyTime() + cancelable[i] = eval + } + + update := &structs.EvalUpdateRequest{ + Evals: cancelable, + WriteRequest: structs.WriteRequest{Region: srv.Region()}, + } + _, _, err := srv.raftApply(structs.EvalUpdateRequestType, update) + if err != nil { + srv.logger.Warn("eval cancel failed", "error", err, "method", "ack") + return err + } + } + return nil +} diff --git a/nomad/leader.go b/nomad/leader.go index b6b2895fae67..0d8b6ed84477 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -365,6 +365,9 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { // Reap any duplicate blocked evaluations go s.reapDupBlockedEvaluations(stopCh) + // Reap any cancelable evaluations + go s.reapCancelableEvaluations(stopCh) + // Periodically unblock failed allocations go s.periodicUnblockFailedEvals(stopCh) @@ -992,6 +995,24 @@ func (s *Server) reapDupBlockedEvaluations(stopCh chan struct{}) { } } +// reapCancelableEvaluations is used to reap evaluations that were marked +// cancelable by the eval broker and should be cancelled. These get swept up +// whenever an eval Acks, but this ensures that we don't have a straggling batch +// when the cluster doesn't have any more work to do +func (s *Server) reapCancelableEvaluations(stopCh chan struct{}) { + timer, cancel := helper.NewSafeTimer(s.config.EvalReapCancelableInterval) + defer cancel() + for { + select { + case <-stopCh: + return + case <-timer.C: + cancelCancelableEvals(s) + timer.Reset(s.config.EvalReapCancelableInterval) + } + } +} + // periodicUnblockFailedEvals periodically unblocks failed, blocked evaluations. func (s *Server) periodicUnblockFailedEvals(stopCh chan struct{}) { ticker := time.NewTicker(failedEvalUnblockInterval) diff --git a/nomad/worker_test.go b/nomad/worker_test.go index a72304db92e9..9d4131e2d3e0 100644 --- a/nomad/worker_test.go +++ b/nomad/worker_test.go @@ -11,6 +11,7 @@ import ( log "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/ci" + "github.com/shoenig/test/must" "github.com/stretchr/testify/require" "github.com/hashicorp/nomad/helper/testlog" @@ -118,9 +119,10 @@ func TestWorker_dequeueEvaluation_SerialJobs(t *testing.T) { eval2.JobID = eval1.JobID // Insert the evals into the state store - if err := s1.fsm.State().UpsertEvals(structs.MsgTypeTestSetup, 1000, []*structs.Evaluation{eval1, eval2}); err != nil { - t.Fatal(err) - } + must.NoError(t, s1.fsm.State().UpsertEvals( + structs.MsgTypeTestSetup, 1000, []*structs.Evaluation{eval1})) + must.NoError(t, s1.fsm.State().UpsertEvals( + structs.MsgTypeTestSetup, 2000, []*structs.Evaluation{eval2})) s1.evalBroker.Enqueue(eval1) s1.evalBroker.Enqueue(eval2) @@ -131,45 +133,29 @@ func TestWorker_dequeueEvaluation_SerialJobs(t *testing.T) { // Attempt dequeue eval, token, waitIndex, shutdown := w.dequeueEvaluation(10 * time.Millisecond) - if shutdown { - t.Fatalf("should not shutdown") - } - if token == "" { - t.Fatalf("should get token") - } - if waitIndex != eval1.ModifyIndex { - t.Fatalf("bad wait index; got %d; want %d", waitIndex, eval1.ModifyIndex) - } - - // Ensure we get a sane eval - if !reflect.DeepEqual(eval, eval1) { - t.Fatalf("bad: %#v %#v", eval, eval1) - } + must.False(t, shutdown, must.Sprint("should not be shutdown")) + must.NotEq(t, token, "", must.Sprint("should get a token")) + must.NotEq(t, eval1.ModifyIndex, waitIndex, must.Sprintf("bad wait index")) + must.Eq(t, eval, eval1) // Update the modify index of the first eval - if err := s1.fsm.State().UpsertEvals(structs.MsgTypeTestSetup, 2000, []*structs.Evaluation{eval1}); err != nil { - t.Fatal(err) - } + must.NoError(t, s1.fsm.State().UpsertEvals( + structs.MsgTypeTestSetup, 1500, []*structs.Evaluation{eval1})) // Send the Ack w.sendAck(eval1, token) - // Attempt second dequeue + // Attempt second dequeue; it should succeed because the 2nd eval has a + // lower modify index than the snapshot used to schedule the 1st + // eval. Normally this can only happen if the worker is on a follower that's + // trailing behind in raft logs eval, token, waitIndex, shutdown = w.dequeueEvaluation(10 * time.Millisecond) - if shutdown { - t.Fatalf("should not shutdown") - } - if token == "" { - t.Fatalf("should get token") - } - if waitIndex != 2000 { - t.Fatalf("bad wait index; got %d; want 2000", eval2.ModifyIndex) - } - // Ensure we get a sane eval - if !reflect.DeepEqual(eval, eval2) { - t.Fatalf("bad: %#v %#v", eval, eval2) - } + must.False(t, shutdown, must.Sprint("should not be shutdown")) + must.NotEq(t, token, "", must.Sprint("should get a token")) + must.Eq(t, waitIndex, 2000, must.Sprintf("bad wait index")) + must.Eq(t, eval, eval2) + } func TestWorker_dequeueEvaluation_paused(t *testing.T) {