Skip to content

Commit

Permalink
Backport of eval broker: shed all but one blocked eval per job after …
Browse files Browse the repository at this point in the history
…ack into release/1.4.x (#15272)

This pull request was automerged via backport-assistant
  • Loading branch information
hc-github-team-nomad-core committed Nov 16, 2022
1 parent b640a11 commit 1e7a918
Show file tree
Hide file tree
Showing 8 changed files with 503 additions and 264 deletions.
3 changes: 3 additions & 0 deletions .changelog/14621.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
scheduler: when multiple evaluations are pending for the same job, evaluate the latest and cancel the intermediaries on success
```
7 changes: 7 additions & 0 deletions nomad/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,12 @@ type Config struct {
// retrying a failed evaluation.
EvalFailedFollowupBaselineDelay time.Duration

// EvalReapCancelableInterval is the interval for the periodic reaping of
// cancelable evaluations. Cancelable evaluations are canceled whenever any
// eval is ack'd but this sweeps up on quiescent clusters. This config value
// exists only for testing.
EvalReapCancelableInterval time.Duration

// EvalFailedFollowupDelayRange defines the range of additional time from
// the baseline in which to wait before retrying a failed evaluation. The
// additional delay is selected from this range randomly.
Expand Down Expand Up @@ -471,6 +477,7 @@ func DefaultConfig() *Config {
EvalNackSubsequentReenqueueDelay: 20 * time.Second,
EvalFailedFollowupBaselineDelay: 1 * time.Minute,
EvalFailedFollowupDelayRange: 5 * time.Minute,
EvalReapCancelableInterval: 5 * time.Second,
MinHeartbeatTTL: 10 * time.Second,
MaxHeartbeatsPerSecond: 50.0,
HeartbeatGrace: 10 * time.Second,
Expand Down
129 changes: 113 additions & 16 deletions nomad/eval_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ type EvalBroker struct {
jobEvals map[structs.NamespacedID]string

// blocked tracks the blocked evaluations by JobID in a priority queue
blocked map[structs.NamespacedID]PendingEvaluations
blocked map[structs.NamespacedID]BlockedEvaluations

// cancelable tracks previously blocked evaluations (for any job) that are
// now safe for the Eval.Ack RPC to cancel in batches
cancelable []*structs.Evaluation

// ready tracks the ready jobs by scheduler in a priority queue
ready map[string]PendingEvaluations
Expand Down Expand Up @@ -115,11 +119,14 @@ type unackEval struct {
NackTimer *time.Timer
}

// PendingEvaluations is a list of waiting evaluations.
// We implement the container/heap interface so that this is a
// priority queue
// PendingEvaluations is a list of ready evaluations across multiple jobs. We
// implement the container/heap interface so that this is a priority queue.
type PendingEvaluations []*structs.Evaluation

// BlockedEvaluations is a list of blocked evaluations for a given job. We
// implement the container/heap interface so that this is a priority queue.
type BlockedEvaluations []*structs.Evaluation

// NewEvalBroker creates a new evaluation broker. This is parameterized
// with the timeout used for messages that are not acknowledged before we
// assume a Nack and attempt to redeliver as well as the deliveryLimit
Expand All @@ -139,7 +146,8 @@ func NewEvalBroker(timeout, initialNackDelay, subsequentNackDelay time.Duration,
stats: new(BrokerStats),
evals: make(map[string]int),
jobEvals: make(map[structs.NamespacedID]string),
blocked: make(map[structs.NamespacedID]PendingEvaluations),
blocked: make(map[structs.NamespacedID]BlockedEvaluations),
cancelable: make([]*structs.Evaluation, 0, structs.MaxUUIDsPerWriteRequest),
ready: make(map[string]PendingEvaluations),
unack: make(map[string]*unackEval),
waiting: make(map[string]chan struct{}),
Expand Down Expand Up @@ -586,15 +594,28 @@ func (b *EvalBroker) Ack(evalID, token string) error {

// Check if there are any blocked evaluations
if blocked := b.blocked[namespacedID]; len(blocked) != 0 {
raw := heap.Pop(&blocked)

// Any blocked evaluations with ModifyIndexes older than the just-ack'd
// evaluation are no longer useful, so it's safe to drop them.
cancelable := blocked.MarkForCancel()
b.cancelable = append(b.cancelable, cancelable...)
b.stats.TotalCancelable = len(b.cancelable)
b.stats.TotalBlocked -= len(cancelable)

// If any remain, enqueue an eval
if len(blocked) > 0 {
raw := heap.Pop(&blocked)
eval := raw.(*structs.Evaluation)
b.stats.TotalBlocked -= 1
b.enqueueLocked(eval, eval.Type)
}

// Clean up if there are no more after that
if len(blocked) > 0 {
b.blocked[namespacedID] = blocked
} else {
delete(b.blocked, namespacedID)
}
eval := raw.(*structs.Evaluation)
b.stats.TotalBlocked -= 1
b.enqueueLocked(eval, eval.Type)
}

// Re-enqueue the evaluation.
Expand Down Expand Up @@ -733,11 +754,13 @@ func (b *EvalBroker) flush() {
b.stats.TotalUnacked = 0
b.stats.TotalBlocked = 0
b.stats.TotalWaiting = 0
b.stats.TotalCancelable = 0
b.stats.DelayedEvals = make(map[string]*structs.Evaluation)
b.stats.ByScheduler = make(map[string]*SchedulerStats)
b.evals = make(map[string]int)
b.jobEvals = make(map[structs.NamespacedID]string)
b.blocked = make(map[structs.NamespacedID]PendingEvaluations)
b.blocked = make(map[structs.NamespacedID]BlockedEvaluations)
b.cancelable = make([]*structs.Evaluation, 0, structs.MaxUUIDsPerWriteRequest)
b.ready = make(map[string]PendingEvaluations)
b.unack = make(map[string]*unackEval)
b.timeWait = make(map[string]*time.Timer)
Expand Down Expand Up @@ -830,6 +853,7 @@ func (b *EvalBroker) Stats() *BrokerStats {
stats.TotalUnacked = b.stats.TotalUnacked
stats.TotalBlocked = b.stats.TotalBlocked
stats.TotalWaiting = b.stats.TotalWaiting
stats.TotalCancelable = b.stats.TotalCancelable
for id, eval := range b.stats.DelayedEvals {
evalCopy := *eval
stats.DelayedEvals[id] = &evalCopy
Expand All @@ -841,6 +865,24 @@ func (b *EvalBroker) Stats() *BrokerStats {
return stats
}

// Cancelable retrieves a batch of previously-blocked evaluations that are now
// stale and ready to mark for canceling. The eval RPC will call this with a
// batch size set to avoid sending overly large raft messages.
func (b *EvalBroker) Cancelable(batchSize int) []*structs.Evaluation {
b.l.RLock()
defer b.l.RUnlock()

if batchSize > len(b.cancelable) {
batchSize = len(b.cancelable)
}

cancelable := b.cancelable[:batchSize]
b.cancelable = b.cancelable[batchSize:]

b.stats.TotalCancelable = len(b.cancelable)
return cancelable
}

// EmitStats is used to export metrics about the broker while enabled
func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) {
timer, stop := helper.NewSafeTimer(period)
Expand All @@ -856,6 +898,7 @@ func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) {
metrics.SetGauge([]string{"nomad", "broker", "total_unacked"}, float32(stats.TotalUnacked))
metrics.SetGauge([]string{"nomad", "broker", "total_blocked"}, float32(stats.TotalBlocked))
metrics.SetGauge([]string{"nomad", "broker", "total_waiting"}, float32(stats.TotalWaiting))
metrics.SetGauge([]string{"nomad", "broker", "total_cancelable"}, float32(stats.TotalCancelable))
for _, eval := range stats.DelayedEvals {
metrics.SetGaugeWithLabels([]string{"nomad", "broker", "eval_waiting"},
float32(time.Until(eval.WaitUntil).Seconds()),
Expand All @@ -878,12 +921,13 @@ func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) {

// BrokerStats returns all the stats about the broker
type BrokerStats struct {
TotalReady int
TotalUnacked int
TotalBlocked int
TotalWaiting int
DelayedEvals map[string]*structs.Evaluation
ByScheduler map[string]*SchedulerStats
TotalReady int
TotalUnacked int
TotalBlocked int
TotalWaiting int
TotalCancelable int
DelayedEvals map[string]*structs.Evaluation
ByScheduler map[string]*SchedulerStats
}

// SchedulerStats returns the stats per scheduler
Expand Down Expand Up @@ -934,3 +978,56 @@ func (p PendingEvaluations) Peek() *structs.Evaluation {
}
return p[n-1]
}

// Len is for the sorting interface
func (p BlockedEvaluations) Len() int {
return len(p)
}

// Less is for the sorting interface. We flip the check
// so that the "min" in the min-heap is the element with the
// highest priority or highest modify index
func (p BlockedEvaluations) Less(i, j int) bool {
if p[i].Priority != p[j].Priority {
return !(p[i].Priority < p[j].Priority)
}
return !(p[i].ModifyIndex < p[j].ModifyIndex)
}

// Swap is for the sorting interface
func (p BlockedEvaluations) Swap(i, j int) {
p[i], p[j] = p[j], p[i]
}

// Push implements the heap interface and is used to add a new evaluation to the slice
func (p *BlockedEvaluations) Push(e interface{}) {
*p = append(*p, e.(*structs.Evaluation))
}

// Pop implements the heap interface and is used to remove an evaluation from the slice
func (p *BlockedEvaluations) Pop() interface{} {
n := len(*p)
e := (*p)[n-1]
(*p)[n-1] = nil
*p = (*p)[:n-1]
return e
}

// MarkForCancel is used to clear the blocked list of all but the one with the
// highest modify index and highest priority. It returns a slice of cancelable
// evals so that Eval.Ack RPCs can write batched raft entries to cancel
// them. This must be called inside the broker's lock.
func (p *BlockedEvaluations) MarkForCancel() []*structs.Evaluation {

// In pathological cases, we can have a large number of blocked evals but
// will want to cancel most of them. Using heap.Remove requires we re-sort
// for each eval we remove. Because we expect to have at most one remaining,
// we'll just create a new heap.
retain := BlockedEvaluations{(heap.Pop(p)).(*structs.Evaluation)}

cancelable := make([]*structs.Evaluation, len(*p))
copy(cancelable, *p)

*p = retain
return cancelable
}
Loading

0 comments on commit 1e7a918

Please sign in to comment.