Skip to content

Commit

Permalink
core: Pause NackTimeout while in the plan_queue as progress is being …
Browse files Browse the repository at this point in the history
…made
  • Loading branch information
dadgar committed Mar 4, 2016
1 parent 44f5d75 commit 72901a9
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 1 deletion.
34 changes: 34 additions & 0 deletions nomad/eval_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,40 @@ func (b *EvalBroker) Nack(evalID, token string) error {
return nil
}

// PauseNackTimeout stops the Nack timer of an outstanding evaluation. It is
// meant for evals that are still making progress but are waiting in an
// operation of unbounded duration, such as the plan queue.
//
// It returns ErrNotOutstanding when evalID has no unacked entry,
// ErrTokenMismatch when token does not match the entry's token, and
// ErrNackTimeoutReached when the timer could not be stopped.
func (b *EvalBroker) PauseNackTimeout(evalID, token string) error {
	b.l.RLock()
	defer b.l.RUnlock()

	pending, found := b.unack[evalID]
	switch {
	case !found:
		return ErrNotOutstanding
	case pending.Token != token:
		return ErrTokenMismatch
	}

	// A false result from Stop is reported as the timeout already having
	// been reached (presumably the timer has fired — confirm against the
	// NackTimer type).
	if stopped := pending.NackTimer.Stop(); !stopped {
		return ErrNackTimeoutReached
	}
	return nil
}

// ResumeNackTimeout restarts the Nack timer of an outstanding evaluation whose
// timer was previously paused. Callers should invoke it once the unbounded
// operation has finished.
//
// The timer is reset to the broker's full nackTimeout. It returns
// ErrNotOutstanding when evalID has no unacked entry and ErrTokenMismatch when
// token does not match the entry's token.
func (b *EvalBroker) ResumeNackTimeout(evalID, token string) error {
	b.l.Lock()
	defer b.l.Unlock()

	pending, found := b.unack[evalID]
	switch {
	case !found:
		return ErrNotOutstanding
	case pending.Token != token:
		return ErrTokenMismatch
	}

	pending.NackTimer.Reset(b.nackTimeout)
	return nil
}

// Flush is used to clear the state of the broker
func (b *EvalBroker) Flush() {
b.l.Lock()
Expand Down
50 changes: 50 additions & 0 deletions nomad/eval_broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,56 @@ func TestEvalBroker_Nack_TimeoutReset(t *testing.T) {
}
}

// TestEvalBroker_PauseResumeNackTimeout checks that pausing the Nack timer
// keeps a dequeued eval from being redelivered until the timer is resumed.
//
// Timeline (assuming the second argument to testBroker is the Nack timeout —
// confirm against testBroker): dequeue, sleep 2ms, pause, sleep 2ms, resume,
// then a full 5ms timeout must elapse before the eval can be dequeued again,
// so at least 9ms must pass between the two dequeues.
func TestEvalBroker_PauseResumeNackTimeout(t *testing.T) {
	b := testBroker(t, 5*time.Millisecond)
	b.SetEnabled(true)

	// Enqueue
	eval := mock.Eval()
	err := b.Enqueue(eval)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Dequeue
	out, token, err := b.Dequeue(defaultSched, time.Second)
	start := time.Now()
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if out != eval {
		t.Fatalf("bad: %v", out)
	}

	// Pause in 2 milliseconds
	time.Sleep(2 * time.Millisecond)
	if err := b.PauseNackTimeout(out.ID, token); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Capture the ID up front so the goroutine does not read `out`, which is
	// reassigned by the second Dequeue below.
	evalID := out.ID
	go func() {
		time.Sleep(2 * time.Millisecond)
		// Fatalf/FailNow must only be called from the goroutine running the
		// test; use Errorf to record the failure from this helper goroutine.
		if err := b.ResumeNackTimeout(evalID, token); err != nil {
			t.Errorf("err: %v", err)
		}
	}()

	// Dequeue, should block until the timer is resumed
	out, _, err = b.Dequeue(defaultSched, time.Second)
	end := time.Now()
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if out != eval {
		t.Fatalf("bad: %v", out)
	}

	// Check the nack timer
	if diff := end.Sub(start); diff < 9*time.Millisecond {
		t.Fatalf("bad: %#v", diff)
	}
}

func TestEvalBroker_DeliveryLimit(t *testing.T) {
b := testBroker(t, 0)
b.SetEnabled(true)
Expand Down
13 changes: 12 additions & 1 deletion nomad/plan_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,19 @@ func (p *Plan) Submit(args *structs.PlanRequest, reply *structs.PlanResponse) er
}
defer metrics.MeasureSince([]string{"nomad", "plan", "submit"}, time.Now())

// Pause the Nack timer for the eval as it is making progress as long as it
// is in the plan queue. We resume immediately after we get a result to
// handle the case that the receiving worker dies.
plan := args.Plan
id := plan.EvalID
token := plan.EvalToken
if err := p.srv.evalBroker.PauseNackTimeout(id, token); err != nil {
return err
}
defer p.srv.evalBroker.ResumeNackTimeout(id, token)

// Submit the plan to the queue
future, err := p.srv.planQueue.Enqueue(args.Plan)
future, err := p.srv.planQueue.Enqueue(plan)
if err != nil {
return err
}
Expand Down

0 comments on commit 72901a9

Please sign in to comment.