Skip to content

Commit

Permalink
eval broker: shed blocked evals older than acknowledged eval
Browse files Browse the repository at this point in the history
When an evaluation is acknowledged by a scheduler, the resulting plan is
guaranteed to cover up to the `ModifyIndex` set by the worker based on the
most recent evaluation for that job in the state store. At that point, we no
longer need to retain blocked evaluations in the broker that are older than that
index.

Move these stale evals into a cancelable set. When the `Eval.Ack` RPC returns from
the eval broker, it will retrieve a batch of cancelable evals to write to
raft. This paces the cancellations by how frequently the schedulers are
acknowledging evals, which should reduce the risk of cancellations
overwhelming raft relative to scheduler progress.
  • Loading branch information
tgross committed Sep 22, 2022
1 parent 54474a9 commit e5cd347
Show file tree
Hide file tree
Showing 4 changed files with 205 additions and 239 deletions.
78 changes: 73 additions & 5 deletions nomad/eval_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ type EvalBroker struct {
// blocked tracks the blocked evaluations by JobID in a priority queue
blocked map[structs.NamespacedID]PendingEvaluations

// cancelable tracks previously blocked evaluations (for any job) that are
// now safe for the Eval.Ack RPC to cancel in batches
cancelable PendingEvaluations

// ready tracks the ready jobs by scheduler in a priority queue
ready map[string]PendingEvaluations

Expand Down Expand Up @@ -140,6 +144,7 @@ func NewEvalBroker(timeout, initialNackDelay, subsequentNackDelay time.Duration,
evals: make(map[string]int),
jobEvals: make(map[structs.NamespacedID]string),
blocked: make(map[structs.NamespacedID]PendingEvaluations),
cancelable: PendingEvaluations{},
ready: make(map[string]PendingEvaluations),
unack: make(map[string]*unackEval),
waiting: make(map[string]chan struct{}),
Expand Down Expand Up @@ -559,6 +564,7 @@ func (b *EvalBroker) Ack(evalID, token string) error {
return fmt.Errorf("Token does not match for Evaluation ID")
}
jobID := unack.Eval.JobID
oldestUnackedIndex := unack.Eval.ModifyIndex

// Ensure we were able to stop the timer
if !unack.NackTimer.Stop() {
Expand Down Expand Up @@ -586,15 +592,27 @@ func (b *EvalBroker) Ack(evalID, token string) error {

// Check if there are any blocked evaluations
if blocked := b.blocked[namespacedID]; len(blocked) != 0 {
raw := heap.Pop(&blocked)

// Any blocked evaluations with ModifyIndexes older than the just-ack'd
// evaluation are no longer useful, so it's safe to drop them.
cancelable := blocked.MarkForCancel(oldestUnackedIndex)
b.cancelable = append(b.cancelable, cancelable...)
b.stats.TotalBlocked -= cancelable.Len()

// If any remain, enqueue the eval with the *highest* index
if len(blocked) > 0 {
raw := heap.Pop(&blocked)
eval := raw.(*structs.Evaluation)
b.stats.TotalBlocked -= 1
b.enqueueLocked(eval, eval.Type)
}

// Clean up if there are no more after that
if len(blocked) > 0 {
b.blocked[namespacedID] = blocked
} else {
delete(b.blocked, namespacedID)
}
eval := raw.(*structs.Evaluation)
b.stats.TotalBlocked -= 1
b.enqueueLocked(eval, eval.Type)
}

// Re-enqueue the evaluation.
Expand Down Expand Up @@ -738,6 +756,7 @@ func (b *EvalBroker) flush() {
b.evals = make(map[string]int)
b.jobEvals = make(map[structs.NamespacedID]string)
b.blocked = make(map[structs.NamespacedID]PendingEvaluations)
b.cancelable = PendingEvaluations{}
b.ready = make(map[string]PendingEvaluations)
b.unack = make(map[string]*unackEval)
b.timeWait = make(map[string]*time.Timer)
Expand Down Expand Up @@ -841,6 +860,15 @@ func (b *EvalBroker) Stats() *BrokerStats {
return stats
}

// Cancelable retrieves a batch of previously-blocked evaluations that are now
// stale and ready to mark for canceling. The eval RPC will call this with a
// batch size set to avoid sending overly large raft messages.
//
// At most batchSize evaluations are removed from the broker's cancelable set
// and returned; fewer are returned if the set holds fewer.
func (b *EvalBroker) Cancelable(batchSize int) []*structs.Evaluation {
	// PopN removes entries from b.cancelable, so this is a write to shared
	// broker state and needs the exclusive lock rather than the read lock.
	b.l.Lock()
	defer b.l.Unlock()
	return b.cancelable.PopN(batchSize)
}

// EmitStats is used to export metrics about the broker while enabled
func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) {
timer, stop := helper.NewSafeTimer(period)
Expand Down Expand Up @@ -904,7 +932,7 @@ func (p PendingEvaluations) Less(i, j int) bool {
if p[i].JobID != p[j].JobID && p[i].Priority != p[j].Priority {
return !(p[i].Priority < p[j].Priority)
}
return p[i].CreateIndex < p[j].CreateIndex
return (p[i].CreateIndex < p[j].CreateIndex)
}

// Swap is for the sorting interface
Expand Down Expand Up @@ -934,3 +962,43 @@ func (p PendingEvaluations) Peek() *structs.Evaluation {
}
return p[n-1]
}

// PopN pops up to n evaluations off the heap and returns them in the order
// heap.Pop yields them. If fewer than n evaluations are present, all of them
// are popped. The returned slice is never nil.
func (p *PendingEvaluations) PopN(n int) []*structs.Evaluation {
	// Clamp the request to what is actually available.
	remaining := n
	if available := len(*p); remaining > available {
		remaining = available
	}

	batch := []*structs.Evaluation{}
	for ; remaining > 0; remaining-- {
		batch = append(batch, heap.Pop(p).(*structs.Evaluation))
	}
	return batch
}

// MarkForCancel splits the pending evaluations around the given index:
// evaluations whose ModifyIndex is below index are removed and returned as a
// cancelable heap, while the rest are kept in p. The returned evals are meant
// for Eval.Ack RPCs to cancel via batched raft entries. This must be called
// inside the broker's lock.
func (p *PendingEvaluations) MarkForCancel(index uint64) PendingEvaluations {

	// In pathological cases there can be a large number of blocked evals,
	// most of which should be canceled. heap.Remove would re-sort on every
	// removal, and few if any evals are expected to survive, so both heaps
	// are simply rebuilt from scratch.
	var (
		keep  = PendingEvaluations{}
		stale = PendingEvaluations{}
	)

	for _, blocked := range *p {
		// Default to keeping the eval; only strictly older ones are shed.
		dest := &keep
		if blocked.ModifyIndex < index {
			dest = &stale
		}
		heap.Push(dest, blocked)
	}

	*p = keep
	return stale
}
Loading

0 comments on commit e5cd347

Please sign in to comment.