Skip to content

Commit

Permalink
Merge pull request #325 from hashicorp/f-reset-nack
Browse files Browse the repository at this point in the history
Reset evaluation Nack timer in response to scheduler operations
  • Loading branch information
dadgar committed Oct 24, 2015
2 parents 4718a05 + 2beae1a commit de3b0dd
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 23 deletions.
30 changes: 30 additions & 0 deletions nomad/eval_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package nomad

import (
"container/heap"
"errors"
"fmt"
"math/rand"
"sync"
Expand All @@ -18,6 +19,17 @@ const (
failedQueue = "_failed"
)

var (
// ErrNotOutstanding is returned if an evaluation is not outstanding,
// i.e. the broker has no unacknowledged entry for the given eval ID.
ErrNotOutstanding = errors.New("evaluation is not outstanding")

// ErrTokenMismatch is returned if the outstanding evaluation has a
// different token than the one supplied by the caller.
ErrTokenMismatch = errors.New("evaluation token does not match")

// ErrNackTimeoutReached is returned if an attempt is made to reset the
// Nack timer of an evaluation whose timer could not be reset.
ErrNackTimeoutReached = errors.New("evaluation nack timeout reached")
)

// EvalBroker is used to manage brokering of evaluations. When an evaluation is
// created, due to a change in a job specification or a node, we put it into the
// broker. The broker sorts evaluations by priority and scheduler type. This
Expand Down Expand Up @@ -381,6 +393,24 @@ func (b *EvalBroker) Outstanding(evalID string) (string, bool) {
return unack.Token, true
}

// OutstandingReset restarts the Nack timer of the evaluation identified by
// evalID using the broker's configured nack timeout. The broker's read lock
// is held for the duration of the call.
//
// It returns:
//   - ErrNotOutstanding if the broker has no unacked entry for evalID,
//   - ErrTokenMismatch if the entry's token differs from token,
//   - ErrNackTimeoutReached if the timer's Reset reports false,
//   - nil otherwise.
func (b *EvalBroker) OutstandingReset(evalID, token string) error {
	b.l.RLock()
	defer b.l.RUnlock()

	pending, found := b.unack[evalID]

	// Cases are evaluated top to bottom, so the timer is only touched once
	// the evaluation is known to be outstanding and the token has matched.
	switch {
	case !found:
		return ErrNotOutstanding
	case pending.Token != token:
		return ErrTokenMismatch
	case !pending.NackTimer.Reset(b.nackTimeout):
		return ErrNackTimeoutReached
	default:
		return nil
	}
}

// Ack is used to positively acknowledge handling an evaluation
func (b *EvalBroker) Ack(evalID, token string) error {
b.l.Lock()
Expand Down
58 changes: 58 additions & 0 deletions nomad/eval_broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,20 @@ func TestEvalBroker_Enqueue_Dequeue_Nack_Ack(t *testing.T) {
t.Fatalf("Bad: %#v %#v", token, tokenOut)
}

// OutstandingReset should verify the token
err = b.OutstandingReset("nope", "foo")
if err != ErrNotOutstanding {
t.Fatalf("err: %v", err)
}
err = b.OutstandingReset(out.ID, "foo")
if err != ErrTokenMismatch {
t.Fatalf("err: %v", err)
}
err = b.OutstandingReset(out.ID, tokenOut)
if err != nil {
t.Fatalf("err: %v", err)
}

// Check the stats
stats = b.Stats()
if stats.TotalReady != 0 {
Expand Down Expand Up @@ -560,6 +574,50 @@ func TestEvalBroker_Nack_Timeout(t *testing.T) {
}
}

// Ensure that resetting the Nack timer of an outstanding evaluation pushes
// back the automatic Nack: with a 5ms nack timeout and a reset issued after
// 2ms, the evaluation must not become available again before 7ms have passed.
func TestEvalBroker_Nack_TimeoutReset(t *testing.T) {
	b := testBroker(t, 5*time.Millisecond)
	b.SetEnabled(true)

	// Put a single evaluation into the broker.
	eval := mock.Eval()
	if err := b.Enqueue(eval); err != nil {
		t.Fatalf("err: %v", err)
	}

	// First dequeue hands out the evaluation and starts its Nack timer;
	// the clock for the final assertion starts right after it returns.
	first, token, err := b.Dequeue(defaultSched, time.Second)
	start := time.Now()
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if first != eval {
		t.Fatalf("bad: %v", first)
	}

	// Wait 2ms, then reset the Nack timer while the eval is outstanding.
	time.Sleep(2 * time.Millisecond)
	if err := b.OutstandingReset(first.ID, token); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Second dequeue can only succeed once the (reset) Nack timer fires
	// and the evaluation is handed out again.
	second, _, err := b.Dequeue(defaultSched, time.Second)
	end := time.Now()
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if second != eval {
		t.Fatalf("bad: %v", second)
	}

	// 2ms of sleep plus a fresh 5ms timeout: anything shorter means the
	// reset did not take effect.
	if elapsed := end.Sub(start); elapsed < 7*time.Millisecond {
		t.Fatalf("bad: %#v", elapsed)
	}
}

func TestEvalBroker_DeliveryLimit(t *testing.T) {
b := testBroker(t, 0)
b.SetEnabled(true)
Expand Down
16 changes: 4 additions & 12 deletions nomad/eval_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,8 @@ func (e *Eval) Update(args *structs.EvalUpdateRequest,
eval := args.Evals[0]

// Verify the evaluation is outstanding, and that the tokens match.
token, ok := e.srv.evalBroker.Outstanding(eval.ID)
if !ok {
return fmt.Errorf("evaluation is not outstanding")
}
if args.EvalToken != token {
return fmt.Errorf("evaluation token does not match")
if err := e.srv.evalBroker.OutstandingReset(eval.ID, args.EvalToken); err != nil {
return err
}

// Update via Raft
Expand Down Expand Up @@ -168,12 +164,8 @@ func (e *Eval) Create(args *structs.EvalUpdateRequest,
eval := args.Evals[0]

// Verify the parent evaluation is outstanding, and that the tokens match.
token, ok := e.srv.evalBroker.Outstanding(eval.PreviousEval)
if !ok {
return fmt.Errorf("previous evaluation is not outstanding")
}
if args.EvalToken != token {
return fmt.Errorf("previous evaluation token does not match")
if err := e.srv.evalBroker.OutstandingReset(eval.PreviousEval, args.EvalToken); err != nil {
return err
}

// Look for the eval
Expand Down
15 changes: 4 additions & 11 deletions nomad/plan_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,10 @@ func (s *Server) planApply() {
}

// Verify the evaluation is outstanding, and that the tokens match.
token, ok := s.evalBroker.Outstanding(pending.plan.EvalID)
if !ok {
s.logger.Printf("[ERR] nomad: plan received for non-outstanding evaluation %s",
pending.plan.EvalID)
pending.respond(nil, fmt.Errorf("evaluation is not outstanding"))
continue
}
if pending.plan.EvalToken != token {
s.logger.Printf("[ERR] nomad: plan received for evaluation %s with wrong token",
pending.plan.EvalID)
pending.respond(nil, fmt.Errorf("evaluation token does not match"))
if err := s.evalBroker.OutstandingReset(pending.plan.EvalID, pending.plan.EvalToken); err != nil {
s.logger.Printf("[ERR] nomad: plan rejected for evaluation %s: %v",
pending.plan.EvalID, err)
pending.respond(nil, err)
continue
}

Expand Down

0 comments on commit de3b0dd

Please sign in to comment.