diff --git a/.changelog/14621.txt b/.changelog/14621.txt
new file mode 100644
index 000000000000..7ca9a7ea863f
--- /dev/null
+++ b/.changelog/14621.txt
@@ -0,0 +1,3 @@
+```release-note:improvement
+eval broker: shed all but one blocked eval per job after successful ack
+```
diff --git a/nomad/config.go b/nomad/config.go
index 7014ef987fe5..d01e6b9d7916 100644
--- a/nomad/config.go
+++ b/nomad/config.go
@@ -252,6 +252,12 @@ type Config struct {
 	// retrying a failed evaluation.
 	EvalFailedFollowupBaselineDelay time.Duration
 
+	// EvalReapCancelableInterval is the interval for the periodic reaping of
+	// cancelable evaluations. Cancelable evaluations are canceled whenever any
+	// eval is ack'd, but this sweep catches any that remain on quiescent
+	// clusters. This config value exists only for testing.
+	EvalReapCancelableInterval time.Duration
+
 	// EvalFailedFollowupDelayRange defines the range of additional time from
 	// the baseline in which to wait before retrying a failed evaluation. The
 	// additional delay is selected from this range randomly.
@@ -471,6 +477,7 @@ func DefaultConfig() *Config {
 		EvalNackSubsequentReenqueueDelay: 20 * time.Second,
 		EvalFailedFollowupBaselineDelay:  1 * time.Minute,
 		EvalFailedFollowupDelayRange:     5 * time.Minute,
+		EvalReapCancelableInterval:       10 * time.Second,
 		MinHeartbeatTTL:                  10 * time.Second,
 		MaxHeartbeatsPerSecond:           50.0,
 		HeartbeatGrace:                   10 * time.Second,
diff --git a/nomad/eval_broker.go b/nomad/eval_broker.go
index e13394b17258..885027bab63d 100644
--- a/nomad/eval_broker.go
+++ b/nomad/eval_broker.go
@@ -64,7 +64,11 @@ type EvalBroker struct {
 	jobEvals map[structs.NamespacedID]string
 
 	// blocked tracks the blocked evaluations by JobID in a priority queue
-	blocked map[structs.NamespacedID]PendingEvaluations
+	blocked map[structs.NamespacedID]BlockedEvaluations
+
+	// cancelable tracks previously blocked evaluations (for any job) that are
+	// now safe for the Eval.Ack RPC to cancel in batches
+	cancelable []*structs.Evaluation
 
 	// ready tracks the ready jobs by scheduler in a priority queue
 	ready map[string]PendingEvaluations
@@ -115,11 +119,14 @@ type unackEval struct {
 	NackTimer *time.Timer
 }
 
-// PendingEvaluations is a list of waiting evaluations.
-// We implement the container/heap interface so that this is a
-// priority queue
+// PendingEvaluations is a list of ready evaluations across multiple jobs. We
+// implement the container/heap interface so that this is a priority queue.
 type PendingEvaluations []*structs.Evaluation
 
+// BlockedEvaluations is a list of blocked evaluations for a given job. We
+// implement the container/heap interface so that this is a priority queue.
+type BlockedEvaluations []*structs.Evaluation
+
 // NewEvalBroker creates a new evaluation broker. This is parameterized
 // with the timeout used for messages that are not acknowledged before we
 // assume a Nack and attempt to redeliver as well as the deliveryLimit
@@ -139,7 +146,8 @@ func NewEvalBroker(timeout, initialNackDelay, subsequentNackDelay time.Duration,
 		stats:      new(BrokerStats),
 		evals:      make(map[string]int),
 		jobEvals:   make(map[structs.NamespacedID]string),
-		blocked:    make(map[structs.NamespacedID]PendingEvaluations),
+		blocked:    make(map[structs.NamespacedID]BlockedEvaluations),
+		cancelable: []*structs.Evaluation{}, // TODO: pre-allocate this?
 		ready:      make(map[string]PendingEvaluations),
 		unack:      make(map[string]*unackEval),
 		waiting:    make(map[string]chan struct{}),
@@ -586,15 +594,28 @@ func (b *EvalBroker) Ack(evalID, token string) error {
 
 	// Check if there are any blocked evaluations
 	if blocked := b.blocked[namespacedID]; len(blocked) != 0 {
-		raw := heap.Pop(&blocked)
+
+		// Any blocked evaluations with ModifyIndexes older than the just-ack'd
+		// evaluation are no longer useful, so it's safe to drop them.
+		cancelable := blocked.MarkForCancel()
+		b.cancelable = append(b.cancelable, cancelable...)
+		b.stats.TotalCancelable = len(b.cancelable)
+		b.stats.TotalBlocked -= len(cancelable)
+
+		// If any remain, unblock the next eval by enqueueing it
+		if len(blocked) > 0 {
+			raw := heap.Pop(&blocked)
+			eval := raw.(*structs.Evaluation)
+			b.stats.TotalBlocked -= 1
+			b.enqueueLocked(eval, eval.Type)
+		}
+
+		// Keep whatever is still blocked, or clean up the map entry if nothing is
 		if len(blocked) > 0 {
 			b.blocked[namespacedID] = blocked
 		} else {
 			delete(b.blocked, namespacedID)
 		}
-		eval := raw.(*structs.Evaluation)
-		b.stats.TotalBlocked -= 1
-		b.enqueueLocked(eval, eval.Type)
 	}
 
 	// Re-enqueue the evaluation.
@@ -733,11 +754,13 @@ func (b *EvalBroker) flush() {
 	b.stats.TotalUnacked = 0
 	b.stats.TotalBlocked = 0
 	b.stats.TotalWaiting = 0
+	b.stats.TotalCancelable = 0
 	b.stats.DelayedEvals = make(map[string]*structs.Evaluation)
 	b.stats.ByScheduler = make(map[string]*SchedulerStats)
 	b.evals = make(map[string]int)
 	b.jobEvals = make(map[structs.NamespacedID]string)
-	b.blocked = make(map[structs.NamespacedID]PendingEvaluations)
+	b.blocked = make(map[structs.NamespacedID]BlockedEvaluations)
+	b.cancelable = []*structs.Evaluation{}
 	b.ready = make(map[string]PendingEvaluations)
 	b.unack = make(map[string]*unackEval)
 	b.timeWait = make(map[string]*time.Timer)
@@ -830,6 +853,7 @@ func (b *EvalBroker) Stats() *BrokerStats {
 	stats.TotalUnacked = b.stats.TotalUnacked
 	stats.TotalBlocked = b.stats.TotalBlocked
 	stats.TotalWaiting = b.stats.TotalWaiting
+	stats.TotalCancelable = b.stats.TotalCancelable
 	for id, eval := range b.stats.DelayedEvals {
 		evalCopy := *eval
 		stats.DelayedEvals[id] = &evalCopy
@@ -841,6 +865,24 @@ func (b *EvalBroker) Stats() *BrokerStats {
 	return stats
 }
 
+// Cancelable retrieves a batch of previously-blocked evaluations that are now
+// stale and ready to mark for canceling. The eval RPC will call this with a
+// batch size set to avoid sending overly large raft messages.
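+// The returned evaluations are removed from the broker's cancelable set, so
+// the caller is responsible for writing the raft updates that cancel them.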
+func (b *EvalBroker) Cancelable(batchSize int) []*structs.Evaluation {
+	b.l.Lock()
+	defer b.l.Unlock()
+
+	if batchSize > len(b.cancelable) {
+		batchSize = len(b.cancelable)
+	}
+
+	cancelable := b.cancelable[:batchSize]
+	b.cancelable = b.cancelable[batchSize:]
+
+	b.stats.TotalCancelable = len(b.cancelable)
+	return cancelable
+}
+
 // EmitStats is used to export metrics about the broker while enabled
 func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) {
 	timer, stop := helper.NewSafeTimer(period)
@@ -856,6 +898,7 @@ func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) {
 			metrics.SetGauge([]string{"nomad", "broker", "total_unacked"}, float32(stats.TotalUnacked))
 			metrics.SetGauge([]string{"nomad", "broker", "total_blocked"}, float32(stats.TotalBlocked))
 			metrics.SetGauge([]string{"nomad", "broker", "total_waiting"}, float32(stats.TotalWaiting))
+			metrics.SetGauge([]string{"nomad", "broker", "total_cancelable"}, float32(stats.TotalCancelable))
 			for _, eval := range stats.DelayedEvals {
 				metrics.SetGaugeWithLabels([]string{"nomad", "broker", "eval_waiting"},
 					float32(time.Until(eval.WaitUntil).Seconds()),
@@ -878,12 +921,13 @@ func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) {
 
 // BrokerStats returns all the stats about the broker
 type BrokerStats struct {
-	TotalReady   int
-	TotalUnacked int
-	TotalBlocked int
-	TotalWaiting int
-	DelayedEvals map[string]*structs.Evaluation
-	ByScheduler  map[string]*SchedulerStats
+	TotalReady      int
+	TotalUnacked    int
+	TotalBlocked    int
+	TotalWaiting    int
+	TotalCancelable int
+	DelayedEvals    map[string]*structs.Evaluation
+	ByScheduler     map[string]*SchedulerStats
 }
 
 // SchedulerStats returns the stats per scheduler
@@ -934,3 +978,59 @@ func (p PendingEvaluations) Peek() *structs.Evaluation {
 	}
 	return p[n-1]
 }
+
+// Len is for the sorting interface
+func (p BlockedEvaluations) Len() int {
+	return len(p)
+}
+
+// Less is for the sorting interface. We flip the check
+// so that the "min" in the min-heap is the element with the
+// highest priority or highest modify index
+func (p BlockedEvaluations) Less(i, j int) bool {
+	if p[i].Priority != p[j].Priority {
+		return !(p[i].Priority < p[j].Priority)
+	}
+	return !(p[i].ModifyIndex < p[j].ModifyIndex)
+}
+
+// Swap is for the sorting interface
+func (p BlockedEvaluations) Swap(i, j int) {
+	p[i], p[j] = p[j], p[i]
+}
+
+// Push implements the heap interface and is used to add a new evaluation to the slice
+func (p *BlockedEvaluations) Push(e interface{}) {
+	*p = append(*p, e.(*structs.Evaluation))
+}
+
+// Pop implements the heap interface and is used to remove an evaluation from the slice
+func (p *BlockedEvaluations) Pop() interface{} {
+	n := len(*p)
+	e := (*p)[n-1]
+	(*p)[n-1] = nil
+	*p = (*p)[:n-1]
+	return e
+}
+
+// MarkForCancel is used to clear the blocked list of all but the one with the
+// highest modify index and highest priority. It returns a slice of cancelable
+// evals so that Eval.Ack RPCs can write batched raft entries to cancel
+// them. This must be called inside the broker's lock.
+func (p *BlockedEvaluations) MarkForCancel() []*structs.Evaluation {
+
+	// In pathological cases, we can have a large number of blocked evals but
+	// will want to cancel most of them. Using heap.Remove requires we re-sort
+	// for each eval we remove. Because we expect to have at most one remaining,
+	// we'll just create a new heap.
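+	// Keep only the top of the heap (highest priority, then highest modify
+	// index) and hand everything else back to be marked for cancellation.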
+	retain := BlockedEvaluations{}
+
+	raw := heap.Pop(p)
+	heap.Push(&retain, raw)
+
+	cancelable := make([]*structs.Evaluation, len(*p))
+	copy(cancelable, *p)
+
+	*p = retain
+	return cancelable
+}
diff --git a/nomad/eval_broker_test.go b/nomad/eval_broker_test.go
index 3b0988eae724..c46f8bf4ac46 100644
--- a/nomad/eval_broker_test.go
+++ b/nomad/eval_broker_test.go
@@ -1,16 +1,21 @@
 package nomad
 
 import (
+	"container/heap"
 	"encoding/json"
 	"errors"
 	"fmt"
 	"testing"
 	"time"
 
+	msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
 	"github.com/hashicorp/nomad/ci"
 	"github.com/hashicorp/nomad/nomad/mock"
+	"github.com/hashicorp/nomad/nomad/state"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/hashicorp/nomad/testutil"
+	"github.com/shoenig/test"
+	"github.com/shoenig/test/must"
 	"github.com/stretchr/testify/require"
 )
 
@@ -393,236 +398,107 @@ func TestEvalBroker_Serialize_DuplicateJobID(t *testing.T) {
 	ns1 := "namespace-one"
 	ns2 := "namespace-two"
 
-	eval := mock.Eval()
-	eval.Namespace = ns1
-	b.Enqueue(eval)
-
-	eval2 := mock.Eval()
-	eval2.JobID = eval.JobID
-	eval2.Namespace = ns1
-	eval2.CreateIndex = eval.CreateIndex + 1
-	b.Enqueue(eval2)
-
-	eval3 := mock.Eval()
-	eval3.JobID = eval.JobID
-	eval3.Namespace = ns1
-	eval3.CreateIndex = eval.CreateIndex + 2
-	b.Enqueue(eval3)
+	jobID := "example"
 
-	eval4 := mock.Eval()
-	eval4.JobID = eval.JobID
-	eval4.Namespace = ns2
-	eval4.CreateIndex = eval.CreateIndex + 3
-	b.Enqueue(eval4)
-
-	eval5 := mock.Eval()
-	eval5.JobID = eval.JobID
-	eval5.Namespace = ns2
-	eval5.CreateIndex = eval.CreateIndex + 4
-	b.Enqueue(eval5)
-
-	stats := b.Stats()
-	if stats.TotalReady != 2 {
-		t.Fatalf("bad: %#v", stats)
-	}
-	if stats.TotalBlocked != 3 {
-		t.Fatalf("bad: %#v", stats)
+	newEval := func(idx uint64, ns string) *structs.Evaluation {
+		eval := mock.Eval()
+		eval.ID = fmt.Sprintf("eval:%d", idx)
+		eval.JobID = jobID
+		eval.Namespace = ns
+		eval.CreateIndex = idx
+		eval.ModifyIndex = idx
+		b.Enqueue(eval)
+		return eval
+	}
+
+	// first job
+	eval1 := newEval(1, ns1)
+	newEval(2, ns1)
+	newEval(3, ns1)
+	eval4 := newEval(4, ns1)
+
+	// second job
+	eval5 := newEval(5, ns2)
+	newEval(6, ns2)
+	eval7 := newEval(7, ns2)
+
+	// retrieve the stats from the broker, less some stats that aren't
+	// interesting for this test and would make it much more verbose
+	// to include
+	getStats := func() BrokerStats {
+		t.Helper()
+		stats := b.Stats()
+		stats.DelayedEvals = nil
+		stats.ByScheduler = nil
+		return *stats
 	}
 
-	// Dequeue should work
+	must.Eq(t, BrokerStats{TotalReady: 2, TotalUnacked: 0,
+		TotalBlocked: 5, TotalCancelable: 0}, getStats())
+
+	// Dequeue should get 1st eval
 	out, token, err := b.Dequeue(defaultSched, time.Second)
-	if err != nil {
-		t.Fatalf("err: %v", err)
-	}
-	if out != eval {
-		t.Fatalf("bad : %#v", out)
-	}
+	must.NoError(t, err)
+	must.Eq(t, out, eval1, must.Sprint("expected 1st eval"))
 
-	// Check the stats
-	stats = b.Stats()
-	if stats.TotalReady != 1 {
-		t.Fatalf("bad: %#v", stats)
-	}
-	if stats.TotalUnacked != 1 {
-		t.Fatalf("bad: %#v", stats)
-	}
-	if stats.TotalBlocked != 3 {
-		t.Fatalf("bad: %#v", stats)
-	}
+	must.Eq(t, BrokerStats{TotalReady: 1, TotalUnacked: 1,
+		TotalBlocked: 5, TotalCancelable: 0}, getStats())
 
-	// Ack out
-	err = b.Ack(eval.ID, token)
-	if err != nil {
-		t.Fatalf("err: %v", err)
-	}
+	// Current wait index should be 4, but we Ack anyway to exercise the
+	// behavior when the worker's Eval.getWaitIndex gets a stale index
+	err = b.Ack(eval1.ID, token)
+	must.NoError(t, err)
 
-	// Check the stats
-	stats = b.Stats()
-	if stats.TotalReady != 2 {
-		t.Fatalf("bad: %#v", stats)
-	}
-	if stats.TotalUnacked != 0 {
-		t.Fatalf("bad: %#v", stats)
-	}
-	if stats.TotalBlocked != 2 {
-		t.Fatalf("bad: %#v", stats)
-	}
+	must.Eq(t, BrokerStats{TotalReady: 2, TotalUnacked: 0,
+		TotalBlocked: 2, TotalCancelable: 2}, getStats())
 
-	// Dequeue should work
+	// eval4 and eval5 are ready
+	// eval6 and eval7 are blocked
+	// Dequeue should get 4th eval
 	out, token, err = b.Dequeue(defaultSched, time.Second)
-	if err != nil {
-		t.Fatalf("err: %v", err)
-	}
-	if out != eval2 {
-		t.Fatalf("bad : %#v", out)
-	}
+	must.NoError(t, err)
+	must.Eq(t, out, eval4, must.Sprint("expected 4th eval"))
 
-	// Check the stats
-	stats = b.Stats()
-	if stats.TotalReady != 1 {
-		t.Fatalf("bad: %#v", stats)
-	}
-	if stats.TotalUnacked != 1 {
-		t.Fatalf("bad: %#v", stats)
-	}
-	if stats.TotalBlocked != 2 {
-		t.Fatalf("bad: %#v", stats)
-	}
-
-	// Ack out
-	err = b.Ack(eval2.ID, token)
-	if err != nil {
-		t.Fatalf("err: %v", err)
-	}
+	must.Eq(t, BrokerStats{TotalReady: 1, TotalUnacked: 1,
+		TotalBlocked: 2, TotalCancelable: 2}, getStats())
 
-	// Check the stats
-	stats = b.Stats()
-	if stats.TotalReady != 2 {
-		t.Fatalf("bad: %#v", stats)
-	}
-	if stats.TotalUnacked != 0 {
-		t.Fatalf("bad: %#v", stats)
-	}
-	if stats.TotalBlocked != 1 {
-		t.Fatalf("bad: %#v", stats)
-	}
-
-	// Dequeue should work
-	out, token, err = b.Dequeue(defaultSched, time.Second)
-	if err != nil {
-		t.Fatalf("err: %v", err)
-	}
-	if out != eval3 {
-		t.Fatalf("bad : %#v", out)
-	}
-
-	// Check the stats
-	stats = b.Stats()
-	if stats.TotalReady != 1 {
-		t.Fatalf("bad: %#v", stats)
-	}
-	if stats.TotalUnacked != 1 {
-		t.Fatalf("bad: %#v", stats)
-	}
-	if stats.TotalBlocked != 1 {
-		t.Fatalf("bad: %#v", stats)
-	}
-
-	// Ack out
-	err = b.Ack(eval3.ID, token)
-	if err != nil {
-		t.Fatalf("err: %v", err)
-	}
+	// Ack should clear the rest of namespace-one blocked but leave
+	// namespace-two untouched
+	err = b.Ack(eval4.ID, token)
+	must.NoError(t, err)
 
-	// Check the stats
-	stats = b.Stats()
-	if stats.TotalReady != 1 {
-		t.Fatalf("bad: %#v", stats)
-	}
-	if stats.TotalUnacked != 0 {
-		t.Fatalf("bad: %#v", stats)
-	}
-	if stats.TotalBlocked != 1 {
-		t.Fatalf("bad: %#v", stats)
-	}
+	must.Eq(t, BrokerStats{TotalReady: 1, TotalUnacked: 0,
+		TotalBlocked: 2, TotalCancelable: 2}, getStats())
 
-	// Dequeue should work
+	// Dequeue should get 5th eval
 	out, token, err = b.Dequeue(defaultSched, time.Second)
-	if err != nil {
-		t.Fatalf("err: %v", err)
-	}
-	if out != eval4 {
-		t.Fatalf("bad : %#v", out)
-	}
+	must.NoError(t, err)
+	must.Eq(t, out, eval5, must.Sprint("expected 5th eval"))
 
-	// Check the stats
-	stats = b.Stats()
-	if stats.TotalReady != 0 {
-		t.Fatalf("bad: %#v", stats)
-	}
-	if stats.TotalUnacked != 1 {
-		t.Fatalf("bad: %#v", stats)
-	}
-	if stats.TotalBlocked != 1 {
-		t.Fatalf("bad: %#v", stats)
-	}
+	must.Eq(t, BrokerStats{TotalReady: 0, TotalUnacked: 1,
+		TotalBlocked: 2, TotalCancelable: 2}, getStats())
 
-	// Ack out
-	err = b.Ack(eval4.ID, token)
-	if err != nil {
-		t.Fatalf("err: %v", err)
-	}
+	// Ack should clear remaining namespace-two blocked evals
+	err = b.Ack(eval5.ID, token)
+	must.NoError(t, err)
 
-	// Check the stats
-	stats = b.Stats()
-	if stats.TotalReady != 1 {
-		t.Fatalf("bad: %#v", stats)
-	}
-	if stats.TotalUnacked != 0 {
-		t.Fatalf("bad: %#v", stats)
-	}
-	if stats.TotalBlocked != 0 {
-		t.Fatalf("bad: %#v", stats)
-	}
+	must.Eq(t, BrokerStats{TotalReady: 1, TotalUnacked: 0,
+		TotalBlocked: 0, TotalCancelable: 3}, getStats())
 
-	// Dequeue should work
+	// Dequeue should get 7th eval because that's all that's left
 	out, token, err = b.Dequeue(defaultSched, time.Second)
-	if err != nil {
-		t.Fatalf("err: %v", err)
-	}
-	if out != eval5 {
-		t.Fatalf("bad : %#v", out)
-	}
+	must.NoError(t, err)
+	must.Eq(t, out, eval7, must.Sprint("expected 7th eval"))
 
-	// Check the stats
-	stats = b.Stats()
-	if stats.TotalReady != 0 {
-		t.Fatalf("bad: %#v", stats)
-	}
-	if stats.TotalUnacked != 1 {
-		t.Fatalf("bad: %#v", stats)
-	}
-	if stats.TotalBlocked != 0 {
-		t.Fatalf("bad: %#v", stats)
-	}
+	must.Eq(t, BrokerStats{TotalReady: 0, TotalUnacked: 1,
+		TotalBlocked: 0, TotalCancelable: 3}, getStats())
 
-	// Ack out
-	err = b.Ack(eval5.ID, token)
-	if err != nil {
-		t.Fatalf("err: %v", err)
-	}
+	// Last ack should leave the broker empty except for cancels
+	err = b.Ack(eval7.ID, token)
+	must.NoError(t, err)
 
-	// Check the stats
-	stats = b.Stats()
-	if stats.TotalReady != 0 {
-		t.Fatalf("bad: %#v", stats)
-	}
-	if stats.TotalUnacked != 0 {
-		t.Fatalf("bad: %#v", stats)
-	}
-	if stats.TotalBlocked != 0 {
-		t.Fatalf("bad: %#v", stats)
-	}
+	must.Eq(t, BrokerStats{TotalReady: 0, TotalUnacked: 0,
+		TotalBlocked: 0, TotalCancelable: 3}, getStats())
 }
 
 func TestEvalBroker_Enqueue_Disable(t *testing.T) {
@@ -813,18 +689,18 @@ func TestEvalBroker_Dequeue_FIFO(t *testing.T) {
 	b.SetEnabled(true)
 
 	NUM := 100
-	for i := 0; i < NUM; i++ {
+	for i := NUM; i > 0; i-- {
 		eval1 := mock.Eval()
 		eval1.CreateIndex = uint64(i)
 		eval1.ModifyIndex = uint64(i)
 		b.Enqueue(eval1)
 	}
 
-	for i := 0; i < NUM; i++ {
+	for i := 1; i < NUM; i++ {
 		out1, _, _ := b.Dequeue(defaultSched, time.Second)
-		if out1.CreateIndex != uint64(i) {
-			t.Fatalf("bad: %d %#v", i, out1)
-		}
+		must.Eq(t, uint64(i), out1.CreateIndex,
+			must.Sprintf("eval was not FIFO by CreateIndex"),
+		)
 	}
 }
 
@@ -1506,3 +1382,202 @@ func TestEvalBroker_NamespacedJobs(t *testing.T) {
 
 	require.Equal(1, len(b.blocked))
 }
+
+func TestEvalBroker_PendingEvals_Ordering(t *testing.T) {
+
+	ready := PendingEvaluations{}
+
+	newEval := func(jobID, evalID string, priority int, index uint64) *structs.Evaluation {
+		eval := mock.Eval()
+		eval.JobID = jobID
+		eval.ID = evalID
+		eval.Priority = priority
+		eval.CreateIndex = uint64(index)
+		return eval
+	}
+
+	// note: we're intentionally pushing these out-of-order to assert we're
+	// getting them back out in the intended order and not just as inserted
+	heap.Push(&ready, newEval("example1", "eval01", 50, 1))
+	heap.Push(&ready, newEval("example3", "eval03", 70, 3))
+	heap.Push(&ready, newEval("example2", "eval02", 50, 2))
+
+	next := heap.Pop(&ready).(*structs.Evaluation)
+	test.Eq(t, "eval03", next.ID,
+		test.Sprint("expected highest Priority to be next ready"))
+
+	next = heap.Pop(&ready).(*structs.Evaluation)
+	test.Eq(t, "eval01", next.ID,
+		test.Sprint("expected oldest CreateIndex to be next ready"))
+
+	heap.Push(&ready, newEval("example4", "eval04", 50, 4))
+
+	next = heap.Pop(&ready).(*structs.Evaluation)
+	test.Eq(t, "eval02", next.ID,
+		test.Sprint("expected oldest CreateIndex to be next ready"))
+
+}
+
+func TestEvalBroker_BlockedEval_Ordering(t *testing.T) {
+	blocked := BlockedEvaluations{}
+
+	newEval := func(evalID string, priority int, index uint64) *structs.Evaluation {
+		eval := mock.Eval()
+		eval.ID = evalID
+		eval.Priority = priority
+		eval.ModifyIndex = uint64(index)
+		return eval
+	}
+
+	// note: we're intentionally pushing these out-of-order to assert we're
+	// getting them back out in the intended order and not just as inserted
+	heap.Push(&blocked, newEval("eval03", 50, 3))
+	heap.Push(&blocked, newEval("eval02", 100, 2))
+	heap.Push(&blocked, newEval("eval01", 50, 1))
+
+	unblocked := heap.Pop(&blocked).(*structs.Evaluation)
+	test.Eq(t, "eval02", unblocked.ID,
+		test.Sprint("expected eval with highest priority to get unblocked"))
+
+	unblocked = heap.Pop(&blocked).(*structs.Evaluation)
+	test.Eq(t, "eval03", unblocked.ID,
+		test.Sprint("expected eval with highest modify index to get unblocked"))
+
+	heap.Push(&blocked, newEval("eval04", 30, 4))
+	unblocked = heap.Pop(&blocked).(*structs.Evaluation)
+	test.Eq(t, "eval01", unblocked.ID,
+		test.Sprint("expected eval with highest priority to get unblocked"))
+
+}
+
+func TestEvalBroker_BlockedEvals_MarkForCancel(t *testing.T) {
+	ci.Parallel(t)
+
+	blocked := BlockedEvaluations{}
+
+	// note: we're intentionally pushing these out-of-order to assert we're
+	// getting them back out in the intended order and not just as inserted
+	for i := 100; i > 0; i -= 10 {
+		eval := mock.Eval()
+		eval.JobID = "example"
+		eval.CreateIndex = uint64(i)
+		eval.ModifyIndex = uint64(i)
+		heap.Push(&blocked, eval)
+	}
+
+	canceled := blocked.MarkForCancel()
+	must.Eq(t, 9, len(canceled))
+	must.Eq(t, 1, blocked.Len())
+
+	raw := heap.Pop(&blocked)
+	must.NotNil(t, raw)
+	eval := raw.(*structs.Evaluation)
+	must.Eq(t, 100, eval.ModifyIndex)
+}
+
+// TestEvalBroker_IntegrationTest exercises the eval broker with realistic
+// workflows
+func TestEvalBroker_IntegrationTest(t *testing.T) {
+	ci.Parallel(t)
+
+	srv, cleanupS1 := TestServer(t, func(c *Config) {
+		c.NumSchedulers = 0                             // Prevent dequeue
+		c.EvalReapCancelableInterval = time.Minute * 10 // Prevent sweep-up
+	})
+
+	defer cleanupS1()
+	testutil.WaitForLeader(t, srv.RPC)
+
+	codec := rpcClient(t, srv)
+	store := srv.fsm.State()
+
+	// create a system job, a node for it to run on, and a set of node up/down
+	// events that will result in evaluations queued.
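+	// (NumSchedulers is zero above, so the evals stay queued in the broker
+	// rather than being dequeued and processed immediately)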
+
+	job := mock.SystemJob()
+	jobReq := &structs.JobRegisterRequest{
+		Job:          job,
+		EvalPriority: 50,
+		WriteRequest: structs.WriteRequest{Region: "global"},
+	}
+	var jobResp structs.JobRegisterResponse
+	err := msgpackrpc.CallWithCodec(codec, "Job.Register", jobReq, &jobResp)
+	must.NoError(t, err)
+
+	node := mock.Node()
+	nodeReq := &structs.NodeRegisterRequest{
+		Node:         node,
+		WriteRequest: structs.WriteRequest{Region: "global"},
+	}
+	var nodeResp structs.NodeUpdateResponse
+	err = msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReq, &nodeResp)
+	must.NoError(t, err)
+
+	for i := 0; i < 10; i++ {
+		status := structs.NodeStatusDown
+		if i%2 == 0 {
+			status = structs.NodeStatusReady
+		}
+		statusReq := &structs.NodeUpdateStatusRequest{
+			NodeID:       node.ID,
+			Status:       status,
+			WriteRequest: structs.WriteRequest{Region: "global"},
+		}
+		var statusResp structs.NodeUpdateResponse
+		err = msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", statusReq, &statusResp)
+		must.NoError(t, err)
+	}
+
+	// ensure we have the expected number of evaluations and eval broker state
+
+	// retrieve the stats from the broker, less some uninteresting ones
+	getStats := func() BrokerStats {
+		t.Helper()
+		stats := srv.evalBroker.Stats()
+		stats.DelayedEvals = nil
+		stats.ByScheduler = nil
+		return *stats
+	}
+
+	getEvalStatuses := func() map[string]int {
+		t.Helper()
+		statuses := map[string]int{}
+		iter, err := store.Evals(nil, state.SortDefault)
+		must.NoError(t, err)
+		for {
+			raw := iter.Next()
+			if raw == nil {
+				break
+			}
+			eval := raw.(*structs.Evaluation)
+			statuses[eval.Status] += 1
+			if eval.Status == structs.EvalStatusCancelled {
+				must.Eq(t, "canceled after more recent eval was processed", eval.StatusDescription)
+			}
+		}
+		return statuses
+	}
+
+	must.Eq(t, map[string]int{structs.EvalStatusPending: 11}, getEvalStatuses())
+	must.Eq(t, BrokerStats{TotalReady: 1, TotalUnacked: 0,
+		TotalBlocked: 10, TotalCancelable: 0}, getStats())
+
+	// start schedulers: all the evals are for a single job so there should only
+	// be one eval processed at a time no matter how many schedulers we run
+
+	config := DefaultConfig()
+	config.NumSchedulers = 4
+	config.EvalReapCancelableInterval = time.Minute * 10
+	require.NoError(t, srv.Reload(config))
+
+	// assert that all but 2 evals were canceled and that the eval broker state
+	// has been cleared
+
+	require.Eventually(t, func() bool {
+		got := getEvalStatuses()
+		return got[structs.EvalStatusComplete] == 2 && got[structs.EvalStatusCancelled] == 9
+	}, 2*time.Second, time.Millisecond*100)
+
+	must.Eq(t, BrokerStats{TotalReady: 0, TotalUnacked: 0,
+		TotalBlocked: 0, TotalCancelable: 0}, getStats())
+}
diff --git a/nomad/eval_endpoint.go b/nomad/eval_endpoint.go
index bedaf45a8084..e6ac4197308b 100644
--- a/nomad/eval_endpoint.go
+++ b/nomad/eval_endpoint.go
@@ -230,7 +230,8 @@ func (e *Eval) Ack(args *structs.EvalAckRequest,
 	if err := e.srv.evalBroker.Ack(args.EvalID, args.Token); err != nil {
 		return err
 	}
-	return nil
+
+	return cancelCancelableEvals(e.srv)
 }
 
 // Nack is used to negative acknowledge completion of a dequeued evaluation.
@@ -766,3 +767,35 @@ func (e *Eval) Allocations(args *structs.EvalSpecificRequest,
 	}}
 	return e.srv.blockingRPC(&opts)
 }
+
+// cancelCancelableEvals pulls a batch of cancelable evaluations from the eval
+// broker and updates their status to canceled.
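+// It is called after every successful Eval.Ack and from the leader's
+// reapCancelableEvaluations loop.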
+func cancelCancelableEvals(srv *Server) error {
+
+	const cancelDesc = "canceled after more recent eval was processed"
+
+	// We *can* send larger raft logs, but rough benchmarks show that a smaller
+	// page size strikes a balance between throughput and the time we block the
+	// FSM apply for other operations
+	cancelable := srv.evalBroker.Cancelable(structs.MaxUUIDsPerWriteRequest / 10)
+	if len(cancelable) > 0 {
+		for i, eval := range cancelable {
+			eval = eval.Copy()
+			eval.Status = structs.EvalStatusCancelled
+			eval.StatusDescription = cancelDesc
+			eval.UpdateModifyTime()
+			cancelable[i] = eval
+		}
+
+		update := &structs.EvalUpdateRequest{
+			Evals:        cancelable,
+			WriteRequest: structs.WriteRequest{Region: srv.Region()},
+		}
+		_, _, err := srv.raftApply(structs.EvalUpdateRequestType, update)
+		if err != nil {
+			srv.logger.Warn("eval cancel failed", "error", err, "method", "ack")
+			return err
+		}
+	}
+	return nil
+}
diff --git a/nomad/leader.go b/nomad/leader.go
index b6b2895fae67..0d8b6ed84477 100644
--- a/nomad/leader.go
+++ b/nomad/leader.go
@@ -365,6 +365,9 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
 	// Reap any duplicate blocked evaluations
 	go s.reapDupBlockedEvaluations(stopCh)
 
+	// Reap any cancelable evaluations
+	go s.reapCancelableEvaluations(stopCh)
+
 	// Periodically unblock failed allocations
 	go s.periodicUnblockFailedEvals(stopCh)
 
@@ -992,6 +995,24 @@ func (s *Server) reapDupBlockedEvaluations(stopCh chan struct{}) {
 	}
 }
 
+// reapCancelableEvaluations is used to reap evaluations that were marked
+// cancelable by the eval broker and should be canceled. These get swept up
+// whenever an eval is ack'd, but this loop ensures we don't leave a straggling
+// batch behind when the cluster has no more work to do.
+func (s *Server) reapCancelableEvaluations(stopCh chan struct{}) {
+	timer, cancel := helper.NewSafeTimer(s.config.EvalReapCancelableInterval)
+	defer cancel()
+	for {
+		select {
+		case <-stopCh:
+			return
+		case <-timer.C:
+			cancelCancelableEvals(s)
+			timer.Reset(s.config.EvalReapCancelableInterval)
+		}
+	}
+}
+
 // periodicUnblockFailedEvals periodically unblocks failed, blocked evaluations.
 func (s *Server) periodicUnblockFailedEvals(stopCh chan struct{}) {
 	ticker := time.NewTicker(failedEvalUnblockInterval)
diff --git a/nomad/worker_test.go b/nomad/worker_test.go
index a72304db92e9..9d4131e2d3e0 100644
--- a/nomad/worker_test.go
+++ b/nomad/worker_test.go
@@ -11,6 +11,7 @@ import (
 	log "github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/go-memdb"
 	"github.com/hashicorp/nomad/ci"
+	"github.com/shoenig/test/must"
 	"github.com/stretchr/testify/require"
 
 	"github.com/hashicorp/nomad/helper/testlog"
@@ -118,9 +119,10 @@ func TestWorker_dequeueEvaluation_SerialJobs(t *testing.T) {
 	eval2.JobID = eval1.JobID
 
 	// Insert the evals into the state store
-	if err := s1.fsm.State().UpsertEvals(structs.MsgTypeTestSetup, 1000, []*structs.Evaluation{eval1, eval2}); err != nil {
-		t.Fatal(err)
-	}
+	must.NoError(t, s1.fsm.State().UpsertEvals(
+		structs.MsgTypeTestSetup, 1000, []*structs.Evaluation{eval1}))
+	must.NoError(t, s1.fsm.State().UpsertEvals(
+		structs.MsgTypeTestSetup, 2000, []*structs.Evaluation{eval2}))
 
 	s1.evalBroker.Enqueue(eval1)
 	s1.evalBroker.Enqueue(eval2)
@@ -131,45 +133,29 @@
 
 	// Attempt dequeue
 	eval, token, waitIndex, shutdown := w.dequeueEvaluation(10 * time.Millisecond)
-	if shutdown {
-		t.Fatalf("should not shutdown")
-	}
-	if token == "" {
-		t.Fatalf("should get token")
-	}
-	if waitIndex != eval1.ModifyIndex {
-		t.Fatalf("bad wait index; got %d; want %d", waitIndex, eval1.ModifyIndex)
-	}
-
-	// Ensure we get a sane eval
-	if !reflect.DeepEqual(eval, eval1) {
-		t.Fatalf("bad: %#v %#v", eval, eval1)
-	}
+	must.False(t, shutdown, must.Sprint("should not be shutdown"))
+	must.NotEq(t, token, "", must.Sprint("should get a token"))
+	must.NotEq(t, eval1.ModifyIndex, waitIndex, must.Sprintf("bad wait index"))
+	must.Eq(t, eval, eval1)
 
 	// Update the modify index of the first eval
-	if err := s1.fsm.State().UpsertEvals(structs.MsgTypeTestSetup, 2000, []*structs.Evaluation{eval1}); err != nil {
-		t.Fatal(err)
-	}
+	must.NoError(t, s1.fsm.State().UpsertEvals(
+		structs.MsgTypeTestSetup, 1500, []*structs.Evaluation{eval1}))
 
 	// Send the Ack
 	w.sendAck(eval1, token)
 
-	// Attempt second dequeue
+	// Attempt second dequeue; it should succeed because the 2nd eval has a
+	// lower modify index than the snapshot used to schedule the 1st
+	// eval. Normally this can only happen if the worker is on a follower that's
+	// trailing behind in raft logs
 	eval, token, waitIndex, shutdown = w.dequeueEvaluation(10 * time.Millisecond)
-	if shutdown {
-		t.Fatalf("should not shutdown")
-	}
-	if token == "" {
-		t.Fatalf("should get token")
-	}
-	if waitIndex != 2000 {
-		t.Fatalf("bad wait index; got %d; want 2000", eval2.ModifyIndex)
-	}
-	// Ensure we get a sane eval
-	if !reflect.DeepEqual(eval, eval2) {
-		t.Fatalf("bad: %#v %#v", eval, eval2)
-	}
+	must.False(t, shutdown, must.Sprint("should not be shutdown"))
+	must.NotEq(t, token, "", must.Sprint("should get a token"))
+	must.Eq(t, waitIndex, 2000, must.Sprintf("bad wait index"))
+	must.Eq(t, eval, eval2)
+
 }
 
 func TestWorker_dequeueEvaluation_paused(t *testing.T) {