From 099ac4c1d5b66759e2a0006ac936a31704fa6e32 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Fri, 23 Oct 2015 10:14:16 -0700 Subject: [PATCH 1/3] nomad: Adding OutstandingReset to EvalBroker --- nomad/eval_broker.go | 15 +++++++++++ nomad/eval_broker_test.go | 54 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+) diff --git a/nomad/eval_broker.go b/nomad/eval_broker.go index 5ba29d3ddd23..63eb01794e64 100644 --- a/nomad/eval_broker.go +++ b/nomad/eval_broker.go @@ -381,6 +381,21 @@ func (b *EvalBroker) Outstanding(evalID string) (string, bool) { return unack.Token, true } +// OutstandingReset resets the Nack timer for the EvalID if the +// token matches and the eval is outstanding +func (b *EvalBroker) OutstandingReset(evalID, token string) bool { + b.l.RLock() + defer b.l.RUnlock() + unack, ok := b.unack[evalID] + if !ok { + return false + } + if unack.Token != token { + return false + } + return unack.NackTimer.Reset(b.nackTimeout) +} + // Ack is used to positively acknowledge handling an evaluation func (b *EvalBroker) Ack(evalID, token string) error { b.l.Lock() diff --git a/nomad/eval_broker_test.go b/nomad/eval_broker_test.go index 1fb99a5bcf9f..6143125dfb94 100644 --- a/nomad/eval_broker_test.go +++ b/nomad/eval_broker_test.go @@ -89,6 +89,16 @@ func TestEvalBroker_Enqueue_Dequeue_Nack_Ack(t *testing.T) { t.Fatalf("Bad: %#v %#v", token, tokenOut) } + // OutstandingReset should verify the token + reset := b.OutstandingReset(out.ID, "foo") + if reset { + t.Fatalf("bad") + } + reset = b.OutstandingReset(out.ID, tokenOut) + if !reset { + t.Fatalf("bad") + } + // Check the stats stats = b.Stats() if stats.TotalReady != 0 { @@ -560,6 +570,50 @@ func TestEvalBroker_Nack_Timeout(t *testing.T) { } } +// Ensure we nack in a timely manner +func TestEvalBroker_Nack_TimeoutReset(t *testing.T) { + b := testBroker(t, 5*time.Millisecond) + b.SetEnabled(true) + + // Enqueue + eval := mock.Eval() + err := b.Enqueue(eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Dequeue + out, token, err := b.Dequeue(defaultSched, time.Second) + start := time.Now() + if err != nil { + t.Fatalf("err: %v", err) + } + if out != eval { + t.Fatalf("bad: %v", out) + } + + // Reset in 2 milliseconds + time.Sleep(2 * time.Millisecond) + if reset := b.OutstandingReset(out.ID, token); !reset { + t.Fatalf("bad") + } + + // Dequeue, should block on Nack timer + out, _, err = b.Dequeue(defaultSched, time.Second) + end := time.Now() + if err != nil { + t.Fatalf("err: %v", err) + } + if out != eval { + t.Fatalf("bad: %v", out) + } + + // Check the nack timer + if diff := end.Sub(start); diff < 7*time.Millisecond { + t.Fatalf("bad: %#v", diff) + } +} + func TestEvalBroker_DeliveryLimit(t *testing.T) { b := testBroker(t, 0) b.SetEnabled(true) From a050185bf1f0e901530ef9b138d41fe266c00bfa Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Fri, 23 Oct 2015 10:22:17 -0700 Subject: [PATCH 2/3] nomad: OutstandingReset returns specific errors --- nomad/eval_broker.go | 23 +++++++++++++++++++---- nomad/eval_broker_test.go | 20 ++++++++++++-------- 2 files changed, 31 insertions(+), 12 deletions(-) diff --git a/nomad/eval_broker.go b/nomad/eval_broker.go index 63eb01794e64..e736d1c34270 100644 --- a/nomad/eval_broker.go +++ b/nomad/eval_broker.go @@ -2,6 +2,7 @@ package nomad import ( "container/heap" + "errors" "fmt" "math/rand" "sync" @@ -18,6 +19,17 @@ const ( failedQueue = "_failed" ) +var ( + // ErrNotOutstanding is returned if an evaluation is not outstanding + ErrNotOutstanding = errors.New("evaluation is not outstanding") + + // ErrTokenMismatch is the outstanding eval has a different token + ErrTokenMismatch = errors.New("evaluation token does not match") + + // ErrNackTimeoutReached is returned if an expired evaluation is reset + ErrNackTimeoutReached = errors.New("evaluation nack timeout reached") +) + // EvalBroker is used to manage brokering of evaluations. When an evaluation is // created, due to a change in a job specification or a node, we put it into the // broker. The broker sorts by evaluations by priority and scheduler type. This @@ -383,17 +395,20 @@ func (b *EvalBroker) Outstanding(evalID string) (string, bool) { // OutstandingReset resets the Nack timer for the EvalID if the // token matches and the eval is outstanding -func (b *EvalBroker) OutstandingReset(evalID, token string) bool { +func (b *EvalBroker) OutstandingReset(evalID, token string) error { b.l.RLock() defer b.l.RUnlock() unack, ok := b.unack[evalID] if !ok { - return false + return ErrNotOutstanding } if unack.Token != token { - return false + return ErrTokenMismatch } - return unack.NackTimer.Reset(b.nackTimeout) + if !unack.NackTimer.Reset(b.nackTimeout) { + return ErrNackTimeoutReached + } + return nil } // Ack is used to positively acknowledge handling an evaluation diff --git a/nomad/eval_broker_test.go b/nomad/eval_broker_test.go index 6143125dfb94..b3b3ea2213ad 100644 --- a/nomad/eval_broker_test.go +++ b/nomad/eval_broker_test.go @@ -90,13 +90,17 @@ func TestEvalBroker_Enqueue_Dequeue_Nack_Ack(t *testing.T) { } // OutstandingReset should verify the token - reset := b.OutstandingReset(out.ID, "foo") - if reset { - t.Fatalf("bad") + err = b.OutstandingReset("nope", "foo") + if err != ErrNotOutstanding { + t.Fatalf("err: %v", err) + } + err = b.OutstandingReset(out.ID, "foo") + if err != ErrTokenMismatch { + t.Fatalf("err: %v", err) } - reset = b.OutstandingReset(out.ID, tokenOut) - if !reset { - t.Fatalf("bad") + err = b.OutstandingReset(out.ID, tokenOut) + if err != nil { + t.Fatalf("err: %v", err) } // Check the stats @@ -594,8 +598,8 @@ func TestEvalBroker_Nack_TimeoutReset(t *testing.T) { // Reset in 2 milliseconds time.Sleep(2 * time.Millisecond) - if reset := b.OutstandingReset(out.ID, token); !reset { - t.Fatalf("bad") + if err := b.OutstandingReset(out.ID, token); err != nil { + t.Fatalf("err: %v", err) } // Dequeue, should block on Nack timer From 2beae1a8444af35e5605cafb6b5a075f47234d7a Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Fri, 23 Oct 2015 10:22:44 -0700 Subject: [PATCH 3/3] nomad: use OutstandingReset in Eval endpoints and Plan apply --- nomad/eval_endpoint.go | 16 ++++------------ nomad/plan_apply.go | 15 ++++----------- 2 files changed, 8 insertions(+), 23 deletions(-) diff --git a/nomad/eval_endpoint.go b/nomad/eval_endpoint.go index 97fb160eacc2..0dce98a52d87 100644 --- a/nomad/eval_endpoint.go +++ b/nomad/eval_endpoint.go @@ -134,12 +134,8 @@ func (e *Eval) Update(args *structs.EvalUpdateRequest, eval := args.Evals[0] // Verify the evaluation is outstanding, and that the tokens match. - token, ok := e.srv.evalBroker.Outstanding(eval.ID) - if !ok { - return fmt.Errorf("evaluation is not outstanding") - } - if args.EvalToken != token { - return fmt.Errorf("evaluation token does not match") + if err := e.srv.evalBroker.OutstandingReset(eval.ID, args.EvalToken); err != nil { + return err } // Update via Raft @@ -168,12 +164,8 @@ func (e *Eval) Create(args *structs.EvalUpdateRequest, eval := args.Evals[0] // Verify the parent evaluation is outstanding, and that the tokens match. - token, ok := e.srv.evalBroker.Outstanding(eval.PreviousEval) - if !ok { - return fmt.Errorf("previous evaluation is not outstanding") - } - if args.EvalToken != token { - return fmt.Errorf("previous evaluation token does not match") + if err := e.srv.evalBroker.OutstandingReset(eval.PreviousEval, args.EvalToken); err != nil { + return err } // Look for the eval diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go index fe7bb84a7a1e..acf32fbd3c26 100644 --- a/nomad/plan_apply.go +++ b/nomad/plan_apply.go @@ -50,17 +50,10 @@ func (s *Server) planApply() { } // Verify the evaluation is outstanding, and that the tokens match. - token, ok := s.evalBroker.Outstanding(pending.plan.EvalID) - if !ok { - s.logger.Printf("[ERR] nomad: plan received for non-outstanding evaluation %s", - pending.plan.EvalID) - pending.respond(nil, fmt.Errorf("evaluation is not outstanding")) - continue - } - if pending.plan.EvalToken != token { - s.logger.Printf("[ERR] nomad: plan received for evaluation %s with wrong token", - pending.plan.EvalID) - pending.respond(nil, fmt.Errorf("evaluation token does not match")) + if err := s.evalBroker.OutstandingReset(pending.plan.EvalID, pending.plan.EvalToken); err != nil { + s.logger.Printf("[ERR] nomad: plan rejected for evaluation %s: %v", + pending.plan.EvalID, err) + pending.respond(nil, err) continue }