Skip to content

Commit

Permalink
eval broker: shed all but one blocked eval per job after ack
Browse files Browse the repository at this point in the history
When an evaluation is acknowledged by a scheduler, the resulting plan is
guaranteed to cover up to the `waitIndex` set by the worker based on the most
recent evaluation for that job in the state store. At that point, we no longer
need to retain blocked evaluations in the broker that are older than that index.

Move all but the highest priority / highest `ModifyIndex` blocked eval into a
cancelable set. When the `Eval.Ack` RPC returns from the eval broker it will
retrieve a batch of cancelable evals to write to raft. This paces the
cancelations by how frequently the schedulers are acknowledging evals, which
should reduce the risk of cancelations overwhelming raft relative to
scheduler progress. In order to avoid straggling batches when the cluster is
quiet, we also include a periodic sweep through the cancelable list.
  • Loading branch information
tgross committed Nov 9, 2022
1 parent 72d58fc commit e372037
Show file tree
Hide file tree
Showing 7 changed files with 489 additions and 264 deletions.
3 changes: 3 additions & 0 deletions .changelog/14621.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
eval broker: shed all but one blocked eval per job after successful ack
```
7 changes: 7 additions & 0 deletions nomad/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,12 @@ type Config struct {
// retrying a failed evaluation.
EvalFailedFollowupBaselineDelay time.Duration

// EvalReapCancelableInterval is the interval for the periodic reaping of
// cancelable evaluations. Cancelable evaluations are canceled whenever any
// eval is ack'd but this sweeps up on quiescent clusters. This config value
// exists only for testing.
EvalReapCancelableInterval time.Duration

// EvalFailedFollowupDelayRange defines the range of additional time from
// the baseline in which to wait before retrying a failed evaluation. The
// additional delay is selected from this range randomly.
Expand Down Expand Up @@ -471,6 +477,7 @@ func DefaultConfig() *Config {
EvalNackSubsequentReenqueueDelay: 20 * time.Second,
EvalFailedFollowupBaselineDelay: 1 * time.Minute,
EvalFailedFollowupDelayRange: 5 * time.Minute,
EvalReapCancelableInterval: 5 * time.Second,
MinHeartbeatTTL: 10 * time.Second,
MaxHeartbeatsPerSecond: 50.0,
HeartbeatGrace: 10 * time.Second,
Expand Down
132 changes: 116 additions & 16 deletions nomad/eval_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ type EvalBroker struct {
jobEvals map[structs.NamespacedID]string

// blocked tracks the blocked evaluations by JobID in a priority queue
blocked map[structs.NamespacedID]PendingEvaluations
blocked map[structs.NamespacedID]BlockedEvaluations

// cancelable tracks previously blocked evaluations (for any job) that are
// now safe for the Eval.Ack RPC to cancel in batches
cancelable []*structs.Evaluation

// ready tracks the ready jobs by scheduler in a priority queue
ready map[string]PendingEvaluations
Expand Down Expand Up @@ -115,11 +119,14 @@ type unackEval struct {
NackTimer *time.Timer
}

// PendingEvaluations is a list of waiting evaluations.
// We implement the container/heap interface so that this is a
// priority queue
// PendingEvaluations is a list of ready evaluations across multiple jobs. We
// implement the container/heap interface so that this is a priority queue.
type PendingEvaluations []*structs.Evaluation

// BlockedEvaluations is a list of blocked evaluations for a given job. We
// implement the container/heap interface so that this is a priority queue.
// The ordering defined by Less puts the evaluation with the highest Priority
// at the top of the heap, with ties going to the highest ModifyIndex.
type BlockedEvaluations []*structs.Evaluation

// NewEvalBroker creates a new evaluation broker. This is parameterized
// with the timeout used for messages that are not acknowledged before we
// assume a Nack and attempt to redeliver as well as the deliveryLimit
Expand All @@ -139,7 +146,8 @@ func NewEvalBroker(timeout, initialNackDelay, subsequentNackDelay time.Duration,
stats: new(BrokerStats),
evals: make(map[string]int),
jobEvals: make(map[structs.NamespacedID]string),
blocked: make(map[structs.NamespacedID]PendingEvaluations),
blocked: make(map[structs.NamespacedID]BlockedEvaluations),
cancelable: []*structs.Evaluation{},
ready: make(map[string]PendingEvaluations),
unack: make(map[string]*unackEval),
waiting: make(map[string]chan struct{}),
Expand Down Expand Up @@ -586,15 +594,28 @@ func (b *EvalBroker) Ack(evalID, token string) error {

// Check if there are any blocked evaluations
if blocked := b.blocked[namespacedID]; len(blocked) != 0 {
raw := heap.Pop(&blocked)

// Any blocked evaluations with ModifyIndexes older than the just-ack'd
// evaluation are no longer useful, so it's safe to drop them.
cancelable := blocked.MarkForCancel()
b.cancelable = append(b.cancelable, cancelable...)
b.stats.TotalCancelable = len(b.cancelable)
b.stats.TotalBlocked -= len(cancelable)

// If any remain, enqueue an eval
if len(blocked) > 0 {
raw := heap.Pop(&blocked)
eval := raw.(*structs.Evaluation)
b.stats.TotalBlocked -= 1
b.enqueueLocked(eval, eval.Type)
}

// Clean up if there are no more after that
if len(blocked) > 0 {
b.blocked[namespacedID] = blocked
} else {
delete(b.blocked, namespacedID)
}
eval := raw.(*structs.Evaluation)
b.stats.TotalBlocked -= 1
b.enqueueLocked(eval, eval.Type)
}

// Re-enqueue the evaluation.
Expand Down Expand Up @@ -733,11 +754,13 @@ func (b *EvalBroker) flush() {
b.stats.TotalUnacked = 0
b.stats.TotalBlocked = 0
b.stats.TotalWaiting = 0
b.stats.TotalCancelable = 0
b.stats.DelayedEvals = make(map[string]*structs.Evaluation)
b.stats.ByScheduler = make(map[string]*SchedulerStats)
b.evals = make(map[string]int)
b.jobEvals = make(map[structs.NamespacedID]string)
b.blocked = make(map[structs.NamespacedID]PendingEvaluations)
b.blocked = make(map[structs.NamespacedID]BlockedEvaluations)
b.cancelable = []*structs.Evaluation{}
b.ready = make(map[string]PendingEvaluations)
b.unack = make(map[string]*unackEval)
b.timeWait = make(map[string]*time.Timer)
Expand Down Expand Up @@ -830,6 +853,7 @@ func (b *EvalBroker) Stats() *BrokerStats {
stats.TotalUnacked = b.stats.TotalUnacked
stats.TotalBlocked = b.stats.TotalBlocked
stats.TotalWaiting = b.stats.TotalWaiting
stats.TotalCancelable = b.stats.TotalCancelable
for id, eval := range b.stats.DelayedEvals {
evalCopy := *eval
stats.DelayedEvals[id] = &evalCopy
Expand All @@ -841,6 +865,24 @@ func (b *EvalBroker) Stats() *BrokerStats {
return stats
}

// Cancelable retrieves a batch of previously-blocked evaluations that are now
// stale and ready to mark for canceling, removing them from the broker's
// cancelable list. The eval RPC will call this with a batch size set to avoid
// sending overly large raft messages.
//
// At most batchSize evaluations are returned; fewer (possibly none) are
// returned when the cancelable list is shorter than batchSize.
func (b *EvalBroker) Cancelable(batchSize int) []*structs.Evaluation {
	// This method mutates b.cancelable and b.stats, so it must hold the
	// exclusive lock. A read lock would allow concurrent callers to race and
	// hand out the same batch more than once.
	b.l.Lock()
	defer b.l.Unlock()

	if batchSize > len(b.cancelable) {
		batchSize = len(b.cancelable)
	}

	// Cap the capacity of the returned batch: it shares a backing array with
	// the evals left in b.cancelable, and an append by the caller must not be
	// able to overwrite them.
	cancelable := b.cancelable[:batchSize:batchSize]
	b.cancelable = b.cancelable[batchSize:]

	b.stats.TotalCancelable = len(b.cancelable)
	return cancelable
}

// EmitStats is used to export metrics about the broker while enabled
func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) {
timer, stop := helper.NewSafeTimer(period)
Expand All @@ -856,6 +898,7 @@ func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) {
metrics.SetGauge([]string{"nomad", "broker", "total_unacked"}, float32(stats.TotalUnacked))
metrics.SetGauge([]string{"nomad", "broker", "total_blocked"}, float32(stats.TotalBlocked))
metrics.SetGauge([]string{"nomad", "broker", "total_waiting"}, float32(stats.TotalWaiting))
metrics.SetGauge([]string{"nomad", "broker", "total_cancelable"}, float32(stats.TotalCancelable))
for _, eval := range stats.DelayedEvals {
metrics.SetGaugeWithLabels([]string{"nomad", "broker", "eval_waiting"},
float32(time.Until(eval.WaitUntil).Seconds()),
Expand All @@ -878,12 +921,13 @@ func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) {

// BrokerStats returns all the stats about the broker
type BrokerStats struct {
TotalReady int
TotalUnacked int
TotalBlocked int
TotalWaiting int
DelayedEvals map[string]*structs.Evaluation
ByScheduler map[string]*SchedulerStats
TotalReady int
TotalUnacked int
TotalBlocked int
TotalWaiting int
TotalCancelable int
DelayedEvals map[string]*structs.Evaluation
ByScheduler map[string]*SchedulerStats
}

// SchedulerStats returns the stats per scheduler
Expand Down Expand Up @@ -934,3 +978,59 @@ func (p PendingEvaluations) Peek() *structs.Evaluation {
}
return p[n-1]
}

// Len is for the sorting interface. It returns the number of blocked
// evaluations currently held in the list.
func (p BlockedEvaluations) Len() int {
return len(p)
}

// Less is for the sorting interface. The comparison is deliberately inverted
// relative to a plain ascending sort: the element reported as "least" is the
// one with the highest priority, and among equal priorities the one with the
// highest modify index, so that it ends up at the top of the min-heap.
func (p BlockedEvaluations) Less(i, j int) bool {
	left, right := p[i], p[j]
	if left.Priority == right.Priority {
		// Same priority: the eval with the higher modify index sorts first.
		return left.ModifyIndex >= right.ModifyIndex
	}
	return left.Priority > right.Priority
}

// Swap is for the sorting interface. It exchanges the evaluations at
// indexes i and j in place.
func (p BlockedEvaluations) Swap(i, j int) {
p[i], p[j] = p[j], p[i]
}

// Push implements the heap interface and appends a new evaluation to the end
// of the slice. The argument must be a *structs.Evaluation; any other type
// makes the type assertion panic.
func (p *BlockedEvaluations) Push(e interface{}) {
	eval := e.(*structs.Evaluation)
	*p = append(*p, eval)
}

// Pop implements the heap interface and removes the last evaluation from the
// slice, returning it.
func (p *BlockedEvaluations) Pop() interface{} {
	evals := *p
	last := len(evals) - 1
	eval := evals[last]
	// Clear the vacated slot so the backing array no longer references the
	// evaluation being handed back.
	evals[last] = nil
	*p = evals[:last]
	return eval
}

// MarkForCancel is used to clear the blocked list of all but the one with the
// highest priority (ties broken by highest modify index). It returns a slice
// of the cancelable evals that were removed so that Eval.Ack RPCs can write
// batched raft entries to cancel them. If the list is empty, it is left
// untouched and nil is returned. This must be called inside the broker's lock.
func (p *BlockedEvaluations) MarkForCancel() []*structs.Evaluation {
	// heap.Pop panics on an empty heap; with nothing blocked there is nothing
	// to retain and nothing to cancel.
	if len(*p) == 0 {
		return nil
	}

	// In pathological cases, we can have a large number of blocked evals but
	// will want to cancel most of them. Using heap.Remove requires we re-sort
	// for each eval we remove. Because we expect to have at most one remaining,
	// we'll just create a new heap.
	retain := BlockedEvaluations{}

	// The top of the heap is the single eval we keep.
	raw := heap.Pop(p)
	heap.Push(&retain, raw)

	// Everything still left in the old heap is stale; copy it out so the
	// result does not alias the old backing array.
	cancelable := make([]*structs.Evaluation, len(*p))
	copy(cancelable, *p)

	*p = retain
	return cancelable
}
Loading

0 comments on commit e372037

Please sign in to comment.