Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eval broker: shed all but one blocked eval per job after ack #14621

Merged
merged 3 commits into from
Nov 16, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/14621.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
eval broker: shed all but one blocked eval per job after successful ack
tgross marked this conversation as resolved.
Show resolved Hide resolved
```
7 changes: 7 additions & 0 deletions nomad/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,12 @@ type Config struct {
// retrying a failed evaluation.
EvalFailedFollowupBaselineDelay time.Duration

// EvalReapCancelableInterval is the interval for the periodic reaping of
// cancelable evaluations. Cancelable evaluations are normally canceled
// whenever any eval is ack'd; the periodic reaper sweeps up any that remain
// on quiescent clusters where no acks arrive. This config value exists only
// for testing.
EvalReapCancelableInterval time.Duration

// EvalFailedFollowupDelayRange defines the range of additional time from
// the baseline in which to wait before retrying a failed evaluation. The
// additional delay is selected from this range randomly.
Expand Down Expand Up @@ -471,6 +477,7 @@ func DefaultConfig() *Config {
EvalNackSubsequentReenqueueDelay: 20 * time.Second,
EvalFailedFollowupBaselineDelay: 1 * time.Minute,
EvalFailedFollowupDelayRange: 5 * time.Minute,
EvalReapCancelableInterval: 5 * time.Second,
MinHeartbeatTTL: 10 * time.Second,
MaxHeartbeatsPerSecond: 50.0,
HeartbeatGrace: 10 * time.Second,
Expand Down
132 changes: 116 additions & 16 deletions nomad/eval_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ type EvalBroker struct {
jobEvals map[structs.NamespacedID]string

// blocked tracks the blocked evaluations by JobID in a priority queue
blocked map[structs.NamespacedID]PendingEvaluations
blocked map[structs.NamespacedID]BlockedEvaluations

// cancelable tracks previously blocked evaluations (for any job) that are
// now safe for the Eval.Ack RPC to cancel in batches
cancelable []*structs.Evaluation

// ready tracks the ready jobs by scheduler in a priority queue
ready map[string]PendingEvaluations
Expand Down Expand Up @@ -115,11 +119,14 @@ type unackEval struct {
NackTimer *time.Timer
}

// PendingEvaluations is a list of waiting evaluations.
// We implement the container/heap interface so that this is a
// priority queue
// PendingEvaluations is a list of ready evaluations across multiple jobs. We
// implement the container/heap interface so that this is a priority queue.
type PendingEvaluations []*structs.Evaluation

// BlockedEvaluations is a list of blocked evaluations for a given job. It
// implements the container/heap interface so that it can be used as a priority
// queue; its Less method orders evaluations by Priority and then by
// ModifyIndex, highest first.
type BlockedEvaluations []*structs.Evaluation
Comment on lines +122 to +128
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a slight preference for renaming [Bb]locked -> [Pp]ending everywhere and using ReadyEvaluations for the ready queue, but your naming scheme makes sense and changes less code which is nice for such a critical PR.

If we do want to rename for future readers, let's do it in a followup.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Plus I still need to fix the metric name for #6480 so we can do that at the same time.


// NewEvalBroker creates a new evaluation broker. This is parameterized
// with the timeout used for messages that are not acknowledged before we
// assume a Nack and attempt to redeliver as well as the deliveryLimit
Expand All @@ -139,7 +146,8 @@ func NewEvalBroker(timeout, initialNackDelay, subsequentNackDelay time.Duration,
stats: new(BrokerStats),
evals: make(map[string]int),
jobEvals: make(map[structs.NamespacedID]string),
blocked: make(map[structs.NamespacedID]PendingEvaluations),
blocked: make(map[structs.NamespacedID]BlockedEvaluations),
cancelable: []*structs.Evaluation{},
tgross marked this conversation as resolved.
Show resolved Hide resolved
ready: make(map[string]PendingEvaluations),
unack: make(map[string]*unackEval),
waiting: make(map[string]chan struct{}),
Expand Down Expand Up @@ -586,15 +594,28 @@ func (b *EvalBroker) Ack(evalID, token string) error {

// Check if there are any blocked evaluations
if blocked := b.blocked[namespacedID]; len(blocked) != 0 {
raw := heap.Pop(&blocked)

// Any blocked evaluations with ModifyIndexes older than the just-ack'd
// evaluation are no longer useful, so it's safe to drop them.
cancelable := blocked.MarkForCancel()
b.cancelable = append(b.cancelable, cancelable...)
b.stats.TotalCancelable = len(b.cancelable)
b.stats.TotalBlocked -= len(cancelable)

// If any remain, enqueue an eval
if len(blocked) > 0 {
raw := heap.Pop(&blocked)
eval := raw.(*structs.Evaluation)
schmichael marked this conversation as resolved.
Show resolved Hide resolved
b.stats.TotalBlocked -= 1
b.enqueueLocked(eval, eval.Type)
}

// Clean up if there are no more after that
if len(blocked) > 0 {
b.blocked[namespacedID] = blocked
} else {
delete(b.blocked, namespacedID)
}
eval := raw.(*structs.Evaluation)
b.stats.TotalBlocked -= 1
b.enqueueLocked(eval, eval.Type)
}

// Re-enqueue the evaluation.
Expand Down Expand Up @@ -733,11 +754,13 @@ func (b *EvalBroker) flush() {
b.stats.TotalUnacked = 0
b.stats.TotalBlocked = 0
b.stats.TotalWaiting = 0
b.stats.TotalCancelable = 0
b.stats.DelayedEvals = make(map[string]*structs.Evaluation)
b.stats.ByScheduler = make(map[string]*SchedulerStats)
b.evals = make(map[string]int)
b.jobEvals = make(map[structs.NamespacedID]string)
b.blocked = make(map[structs.NamespacedID]PendingEvaluations)
b.blocked = make(map[structs.NamespacedID]BlockedEvaluations)
b.cancelable = []*structs.Evaluation{}
b.ready = make(map[string]PendingEvaluations)
b.unack = make(map[string]*unackEval)
b.timeWait = make(map[string]*time.Timer)
Expand Down Expand Up @@ -830,6 +853,7 @@ func (b *EvalBroker) Stats() *BrokerStats {
stats.TotalUnacked = b.stats.TotalUnacked
stats.TotalBlocked = b.stats.TotalBlocked
stats.TotalWaiting = b.stats.TotalWaiting
stats.TotalCancelable = b.stats.TotalCancelable
for id, eval := range b.stats.DelayedEvals {
evalCopy := *eval
stats.DelayedEvals[id] = &evalCopy
Expand All @@ -841,6 +865,24 @@ func (b *EvalBroker) Stats() *BrokerStats {
return stats
}

// Cancelable removes and returns a batch of at most batchSize
// previously-blocked evaluations that are now stale and ready to be marked
// for canceling. Callers pass a batch size so that the resulting raft
// messages are not overly large. A batchSize that is zero or negative yields
// an empty batch.
func (b *EvalBroker) Cancelable(batchSize int) []*structs.Evaluation {
	// This method mutates b.cancelable and b.stats, so it needs the exclusive
	// lock; a read lock would let concurrent callers race on the slice header
	// and hand out the same evaluations twice.
	b.l.Lock()
	defer b.l.Unlock()

	if batchSize > len(b.cancelable) {
		batchSize = len(b.cancelable)
	}
	if batchSize < 0 {
		// Slicing with a negative bound would panic.
		batchSize = 0
	}

	// Cap the batch's capacity at its length so that an append by the caller
	// reallocates instead of overwriting evals still queued in b.cancelable.
	cancelable := b.cancelable[:batchSize:batchSize]
	b.cancelable = b.cancelable[batchSize:]

	b.stats.TotalCancelable = len(b.cancelable)
	return cancelable
}

// EmitStats is used to export metrics about the broker while enabled
func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) {
timer, stop := helper.NewSafeTimer(period)
Expand All @@ -856,6 +898,7 @@ func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) {
metrics.SetGauge([]string{"nomad", "broker", "total_unacked"}, float32(stats.TotalUnacked))
metrics.SetGauge([]string{"nomad", "broker", "total_blocked"}, float32(stats.TotalBlocked))
metrics.SetGauge([]string{"nomad", "broker", "total_waiting"}, float32(stats.TotalWaiting))
metrics.SetGauge([]string{"nomad", "broker", "total_cancelable"}, float32(stats.TotalCancelable))
for _, eval := range stats.DelayedEvals {
metrics.SetGaugeWithLabels([]string{"nomad", "broker", "eval_waiting"},
float32(time.Until(eval.WaitUntil).Seconds()),
Expand All @@ -878,12 +921,13 @@ func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) {

// BrokerStats holds the counters the broker keeps about its queues. Stats()
// returns a copy of it and EmitStats publishes the totals as gauges.
type BrokerStats struct {
TotalReady int
TotalUnacked int
TotalBlocked int
TotalWaiting int
DelayedEvals map[string]*structs.Evaluation
ByScheduler map[string]*SchedulerStats
TotalReady int
TotalUnacked int
TotalBlocked int
TotalWaiting int
// TotalCancelable is the number of evals currently held in the broker's
// cancelable list, waiting to be picked up via Cancelable.
TotalCancelable int
DelayedEvals map[string]*structs.Evaluation
ByScheduler map[string]*SchedulerStats
}

// SchedulerStats returns the stats per scheduler
Expand Down Expand Up @@ -934,3 +978,59 @@ func (p PendingEvaluations) Peek() *structs.Evaluation {
}
return p[n-1]
}

// Len reports how many blocked evaluations are in the queue. It is part of
// sort.Interface, which container/heap builds on.
func (p BlockedEvaluations) Len() int { return len(p) }

// Less is part of sort.Interface. The comparison is deliberately inverted so
// that the root of the min-heap is the evaluation with the highest priority,
// with ties on priority going to the highest modify index.
func (p BlockedEvaluations) Less(i, j int) bool {
	left, right := p[i], p[j]
	if left.Priority == right.Priority {
		// Not a strict ordering: evals with an equal ModifyIndex report
		// "less" in both directions.
		return left.ModifyIndex >= right.ModifyIndex
	}
	return left.Priority > right.Priority
}

// Swap exchanges the evaluations at positions i and j. It is part of
// sort.Interface.
func (p BlockedEvaluations) Swap(i, j int) {
	first := p[i]
	p[i] = p[j]
	p[j] = first
}

// Push appends an evaluation to the end of the slice. It is the heap.Interface
// hook invoked by heap.Push; e must be a *structs.Evaluation or the type
// assertion panics.
func (p *BlockedEvaluations) Push(e interface{}) {
	eval := e.(*structs.Evaluation)
	*p = append(*p, eval)
}

// Pop removes the last evaluation from the slice and returns it. It is the
// heap.Interface hook invoked by heap.Pop and heap.Remove.
func (p *BlockedEvaluations) Pop() interface{} {
	queue := *p
	last := len(queue) - 1
	eval := queue[last]

	// Clear the vacated slot so the backing array no longer references the
	// evaluation.
	queue[last] = nil
	*p = queue[:last]
	return eval
}

// MarkForCancel is used to clear the blocked list of all but the one with the
// highest modify index and highest priority. It returns a slice of cancelable
// evals so that Eval.Ack RPCs can write batched raft entries to cancel
// them. This must be called inside the broker's lock.
func (p *BlockedEvaluations) MarkForCancel() []*structs.Evaluation {

// In pathological cases, we can have a large number of blocked evals but
// will want to cancel most of them. Using heap.Remove requires we re-sort
// for each eval we remove. Because we expect to have at most one remaining,
// we'll just create a new heap.
retain := BlockedEvaluations{}

raw := heap.Pop(p)
heap.Push(&retain, raw)
tgross marked this conversation as resolved.
Show resolved Hide resolved

cancelable := make([]*structs.Evaluation, len(*p))
copy(cancelable, *p)

*p = retain
return cancelable
}
Loading