Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Back-pressure on Nacks and ensure scheduling progress on failures #2555

Merged
merged 5 commits into from
Apr 14, 2017
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 65 additions & 19 deletions nomad/eval_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ var (

// ErrNackTimeoutReached is returned if an expired evaluation is reset
ErrNackTimeoutReached = errors.New("evaluation nack timeout reached")

// initialNackReenqueueDelay is the delay applied before re-enqueuing a
// Nacked evaluation for the first time
initialNackReenqueueDelay = time.Second
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's thread these through a config like EvalNackTimeout, that way there isn't a weird hack for testing either.


// subsequentNackReenqueueDelay is a compounding delay applied on each
// subsequent Nack for an evaluation.
subsequentNackReenqueueDelay = 20 * time.Second
)

// EvalBroker is used to manage brokering of evaluations. When an evaluation is
Expand Down Expand Up @@ -76,6 +84,10 @@ type EvalBroker struct {
// timeWait has evaluations that are waiting for time to elapse
timeWait map[string]*time.Timer

// Nack delays are defaulted and are only made available for testing
initialNackDelay time.Duration
subsequentNackDelay time.Duration

l sync.RWMutex
}

Expand All @@ -100,18 +112,20 @@ func NewEvalBroker(timeout time.Duration, deliveryLimit int) (*EvalBroker, error
return nil, fmt.Errorf("timeout cannot be negative")
}
b := &EvalBroker{
nackTimeout: timeout,
deliveryLimit: deliveryLimit,
enabled: false,
stats: new(BrokerStats),
evals: make(map[string]int),
jobEvals: make(map[string]string),
blocked: make(map[string]PendingEvaluations),
ready: make(map[string]PendingEvaluations),
unack: make(map[string]*unackEval),
waiting: make(map[string]chan struct{}),
requeue: make(map[string]*structs.Evaluation),
timeWait: make(map[string]*time.Timer),
nackTimeout: timeout,
deliveryLimit: deliveryLimit,
enabled: false,
stats: new(BrokerStats),
evals: make(map[string]int),
jobEvals: make(map[string]string),
blocked: make(map[string]PendingEvaluations),
ready: make(map[string]PendingEvaluations),
unack: make(map[string]*unackEval),
waiting: make(map[string]chan struct{}),
requeue: make(map[string]*structs.Evaluation),
timeWait: make(map[string]*time.Timer),
initialNackDelay: initialNackReenqueueDelay,
subsequentNackDelay: subsequentNackReenqueueDelay,
}
b.stats.ByScheduler = make(map[string]*SchedulerStats)
return b, nil
Expand Down Expand Up @@ -187,17 +201,23 @@ func (b *EvalBroker) processEnqueue(eval *structs.Evaluation, token string) {

// Check if we need to enforce a wait
if eval.Wait > 0 {
timer := time.AfterFunc(eval.Wait, func() {
b.enqueueWaiting(eval)
})
b.timeWait[eval.ID] = timer
b.stats.TotalWaiting += 1
b.processWaitingEnqueue(eval)
return
}

b.enqueueLocked(eval, eval.Type)
}

// processWaitingEnqueue waits the given duration on the evaluation before
// enqueueing.
func (b *EvalBroker) processWaitingEnqueue(eval *structs.Evaluation) {
timer := time.AfterFunc(eval.Wait, func() {
b.enqueueWaiting(eval)
})
b.timeWait[eval.ID] = timer
b.stats.TotalWaiting += 1
}

// enqueueWaiting is used to enqueue a waiting evaluation
func (b *EvalBroker) enqueueWaiting(eval *structs.Evaluation) {
b.l.Lock()
Expand Down Expand Up @@ -547,14 +567,40 @@ func (b *EvalBroker) Nack(evalID, token string) error {

// Check if we've hit the delivery limit, and re-enqueue
// in the failedQueue
if b.evals[evalID] >= b.deliveryLimit {
if dequeues := b.evals[evalID]; dequeues >= b.deliveryLimit {
b.enqueueLocked(unack.Eval, failedQueue)
} else {
b.enqueueLocked(unack.Eval, unack.Eval.Type)
e := unack.Eval
e.Wait = b.nackReenqueueDelay(e, dequeues)

// See if there should be a delay before re-enqueuing
if e.Wait > 0 {
b.processWaitingEnqueue(e)
} else {
b.enqueueLocked(e, e.Type)
}
}

return nil
}

// nackReenqueueDelay is used to determine the delay that should be applied on
// the evaluation given the number of previous attempts
func (b *EvalBroker) nackReenqueueDelay(eval *structs.Evaluation, prevDequeues int) time.Duration {
var delay time.Duration

switch {
case prevDequeues <= 0:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make each case just return an explicit value? Makes it easier, especially since we don't post-process the value

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what you mean here. The number of retries is a config option so not sure how you enumerate all possibilities.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean instead of setting delay in the outer block and an empty clause, do an explicit return in each and remove the outer variable.

case prevDequeues == 1:
delay = b.initialNackDelay
default:
// For each subsequent nack compound a delay
delay = time.Duration(prevDequeues-1) * b.subsequentNackDelay
}

return delay
}

// PauseNackTimeout is used to pause the Nack timeout for an eval that is making
// progress but is in a potentially unbounded operation such as the plan queue.
func (b *EvalBroker) PauseNackTimeout(evalID, token string) error {
Expand Down
Loading