Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: Pause/Resume Nack timeout while eval is in plan queue #884

Merged
merged 1 commit into from
Mar 4, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions nomad/eval_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,40 @@ func (b *EvalBroker) Nack(evalID, token string) error {
return nil
}

// PauseNackTimeout is used to pause the Nack timeout for an eval that is making
// progress but is in a potentially unbounded operation such as the plan queue.
func (b *EvalBroker) PauseNackTimeout(evalID, token string) error {
b.l.RLock()
defer b.l.RUnlock()
unack, ok := b.unack[evalID]
if !ok {
return ErrNotOutstanding
}
if unack.Token != token {
return ErrTokenMismatch
}
if !unack.NackTimer.Stop() {
return ErrNackTimeoutReached
}
return nil
}

// ResumeNackTimeout is used to resume the Nack timeout for an eval that was
// paused. It should be resumed after leaving an unbounded operation.
func (b *EvalBroker) ResumeNackTimeout(evalID, token string) error {
b.l.Lock()
defer b.l.Unlock()
unack, ok := b.unack[evalID]
if !ok {
return ErrNotOutstanding
}
if unack.Token != token {
return ErrTokenMismatch
}
unack.NackTimer.Reset(b.nackTimeout)
return nil
}

// Flush is used to clear the state of the broker
func (b *EvalBroker) Flush() {
b.l.Lock()
Expand Down
50 changes: 50 additions & 0 deletions nomad/eval_broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,56 @@ func TestEvalBroker_Nack_TimeoutReset(t *testing.T) {
}
}

func TestEvalBroker_PauseResumeNackTimeout(t *testing.T) {
b := testBroker(t, 5*time.Millisecond)
b.SetEnabled(true)

// Enqueue
eval := mock.Eval()
err := b.Enqueue(eval)
if err != nil {
t.Fatalf("err: %v", err)
}

// Dequeue
out, token, err := b.Dequeue(defaultSched, time.Second)
start := time.Now()
if err != nil {
t.Fatalf("err: %v", err)
}
if out != eval {
t.Fatalf("bad: %v", out)
}

// Pause in 2 milliseconds
time.Sleep(2 * time.Millisecond)
if err := b.PauseNackTimeout(out.ID, token); err != nil {
t.Fatalf("err: %v", err)
}

go func() {
time.Sleep(2 * time.Millisecond)
if err := b.ResumeNackTimeout(out.ID, token); err != nil {
t.Fatalf("err: %v", err)
}
}()

// Dequeue, should block until the timer is resumed
out, _, err = b.Dequeue(defaultSched, time.Second)
end := time.Now()
if err != nil {
t.Fatalf("err: %v", err)
}
if out != eval {
t.Fatalf("bad: %v", out)
}

// Check the nack timer
if diff := end.Sub(start); diff < 9*time.Millisecond {
t.Fatalf("bad: %#v", diff)
}
}

func TestEvalBroker_DeliveryLimit(t *testing.T) {
b := testBroker(t, 0)
b.SetEnabled(true)
Expand Down
13 changes: 12 additions & 1 deletion nomad/plan_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,19 @@ func (p *Plan) Submit(args *structs.PlanRequest, reply *structs.PlanResponse) er
}
defer metrics.MeasureSince([]string{"nomad", "plan", "submit"}, time.Now())

// Pause the Nack timer for the eval as it is making progress as long as it
// is in the plan queue. We resume immediately after we get a result to
// handle the case that the receiving worker dies.
plan := args.Plan
id := plan.EvalID
token := plan.EvalToken
if err := p.srv.evalBroker.PauseNackTimeout(id, token); err != nil {
return err
}
defer p.srv.evalBroker.ResumeNackTimeout(id, token)

// Submit the plan to the queue
future, err := p.srv.planQueue.Enqueue(args.Plan)
future, err := p.srv.planQueue.Enqueue(plan)
if err != nil {
return err
}
Expand Down