From 86989706bc1404068359ff15bf2606f08f0e8162 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 18 May 2016 11:35:15 -0700 Subject: [PATCH 1/2] eval_broker.Enqueue no longer returns an error --- nomad/eval_broker.go | 8 ++-- nomad/eval_broker_test.go | 75 ++++++++----------------------------- nomad/eval_endpoint_test.go | 7 +--- nomad/fsm.go | 5 +-- nomad/leader.go | 4 +- nomad/leader_test.go | 7 +--- nomad/plan_endpoint_test.go | 8 +--- nomad/worker_test.go | 52 +++++-------------------- 8 files changed, 34 insertions(+), 132 deletions(-) diff --git a/nomad/eval_broker.go b/nomad/eval_broker.go index 6e5a3ed59913..5d1ed19f9f79 100644 --- a/nomad/eval_broker.go +++ b/nomad/eval_broker.go @@ -140,14 +140,13 @@ func (b *EvalBroker) EnqueueAll(evals []*structs.Evaluation) { } // Enqueue is used to enqueue an evaluation -// TODO: remove the error return value -func (b *EvalBroker) Enqueue(eval *structs.Evaluation) error { +func (b *EvalBroker) Enqueue(eval *structs.Evaluation) { b.l.Lock() defer b.l.Unlock() // Check if already enqueued if _, ok := b.evals[eval.ID]; ok { - return nil + return } else if b.enabled { b.evals[eval.ID] = 0 } @@ -159,11 +158,10 @@ func (b *EvalBroker) Enqueue(eval *structs.Evaluation) error { }) b.timeWait[eval.ID] = timer b.stats.TotalWaiting += 1 - return nil + return } b.enqueueLocked(eval, eval.Type) - return nil } // enqueueWaiting is used to enqueue a waiting evaluation diff --git a/nomad/eval_broker_test.go b/nomad/eval_broker_test.go index b81fbd944265..d2f083233e00 100644 --- a/nomad/eval_broker_test.go +++ b/nomad/eval_broker_test.go @@ -31,10 +31,7 @@ func TestEvalBroker_Enqueue_Dequeue_Nack_Ack(t *testing.T) { // Enqueue, but broker is disabled! eval := mock.Eval() - err := b.Enqueue(eval) - if err != nil { - t.Fatalf("err: %v", err) - } + b.Enqueue(eval) // Verify nothing was done stats := b.Stats() @@ -48,16 +45,10 @@ func TestEvalBroker_Enqueue_Dequeue_Nack_Ack(t *testing.T) { // Enable the broker, and enqueue b.SetEnabled(true) - err = b.Enqueue(eval) - if err != nil { - t.Fatalf("err: %v", err) - } + b.Enqueue(eval) // Double enqueue is a no-op - err = b.Enqueue(eval) - if err != nil { - t.Fatalf("err: %v", err) - } + b.Enqueue(eval) if !b.Enabled() { t.Fatalf("should be enabled") @@ -206,26 +197,17 @@ func TestEvalBroker_Serialize_DuplicateJobID(t *testing.T) { b.SetEnabled(true) eval := mock.Eval() - err := b.Enqueue(eval) - if err != nil { - t.Fatalf("err: %v", err) - } + b.Enqueue(eval) eval2 := mock.Eval() eval2.JobID = eval.JobID eval2.CreateIndex = eval.CreateIndex + 1 - err = b.Enqueue(eval2) - if err != nil { - t.Fatalf("err: %v", err) - } + b.Enqueue(eval2) eval3 := mock.Eval() eval3.JobID = eval.JobID eval3.CreateIndex = eval.CreateIndex + 2 - err = b.Enqueue(eval3) - if err != nil { - t.Fatalf("err: %v", err) - } + b.Enqueue(eval3) stats := b.Stats() if stats.TotalReady != 1 { @@ -359,10 +341,7 @@ func TestEvalBroker_Enqueue_Disable(t *testing.T) { // Enqueue eval := mock.Eval() b.SetEnabled(true) - err := b.Enqueue(eval) - if err != nil { - t.Fatalf("err: %v", err) - } + b.Enqueue(eval) // Flush via SetEnabled b.SetEnabled(false) @@ -425,10 +404,7 @@ func TestEvalBroker_Dequeue_Empty_Timeout(t *testing.T) { // Enqueue to unblock the dequeue. eval := mock.Eval() - err := b.Enqueue(eval) - if err != nil { - t.Fatalf("err: %v", err) - } + b.Enqueue(eval) select { case <-doneCh: @@ -558,10 +534,7 @@ func TestEvalBroker_Dequeue_Blocked(t *testing.T) { // Enqueue eval := mock.Eval() - err := b.Enqueue(eval) - if err != nil { - t.Fatalf("err: %v", err) - } + b.Enqueue(eval) // Ensure dequeue select { @@ -581,10 +554,7 @@ func TestEvalBroker_Nack_Timeout(t *testing.T) { // Enqueue eval := mock.Eval() - err := b.Enqueue(eval) - if err != nil { - t.Fatalf("err: %v", err) - } + b.Enqueue(eval) // Dequeue out, _, err := b.Dequeue(defaultSched, time.Second) @@ -619,10 +589,7 @@ func TestEvalBroker_Nack_TimeoutReset(t *testing.T) { // Enqueue eval := mock.Eval() - err := b.Enqueue(eval) - if err != nil { - t.Fatalf("err: %v", err) - } + b.Enqueue(eval) // Dequeue out, token, err := b.Dequeue(defaultSched, time.Second) @@ -662,10 +629,7 @@ func TestEvalBroker_PauseResumeNackTimeout(t *testing.T) { // Enqueue eval := mock.Eval() - err := b.Enqueue(eval) - if err != nil { - t.Fatalf("err: %v", err) - } + b.Enqueue(eval) // Dequeue out, token, err := b.Dequeue(defaultSched, time.Second) @@ -711,10 +675,7 @@ func TestEvalBroker_DeliveryLimit(t *testing.T) { b.SetEnabled(true) eval := mock.Eval() - err := b.Enqueue(eval) - if err != nil { - t.Fatalf("err: %v", err) - } + b.Enqueue(eval) for i := 0; i < 3; i++ { // Dequeue should work @@ -803,10 +764,7 @@ func TestEvalBroker_AckAtDeliveryLimit(t *testing.T) { b.SetEnabled(true) eval := mock.Eval() - err := b.Enqueue(eval) - if err != nil { - t.Fatalf("err: %v", err) - } + b.Enqueue(eval) for i := 0; i < 3; i++ { // Dequeue should work @@ -850,10 +808,7 @@ func TestEvalBroker_Wait(t *testing.T) { // Create an eval that should wait eval := mock.Eval() eval.Wait = 10 * time.Millisecond - err := b.Enqueue(eval) - if err != nil { - t.Fatalf("err: %v", err) - } + b.Enqueue(eval) // Verify waiting stats := b.Stats() diff --git a/nomad/eval_endpoint_test.go b/nomad/eval_endpoint_test.go index 3d59f33ed10a..f037c8bd3a0b 100644 --- a/nomad/eval_endpoint_test.go +++ b/nomad/eval_endpoint_test.go @@ -138,12 +138,7 @@ func TestEvalEndpoint_Dequeue(t *testing.T) { // Create the register request eval1 := mock.Eval() - testutil.WaitForResult(func() (bool, error) { - err := s1.evalBroker.Enqueue(eval1) - return err == nil, err - }, func(err error) { - t.Fatalf("err: %v", err) - }) + s1.evalBroker.Enqueue(eval1) // Dequeue the eval get := &structs.EvalDequeueRequest{ diff --git a/nomad/fsm.go b/nomad/fsm.go index a22fc08c167b..6237e976004a 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -331,10 +331,7 @@ func (n *nomadFSM) applyUpdateEval(buf []byte, index uint64) interface{} { for _, eval := range req.Evals { if eval.ShouldEnqueue() { - if err := n.evalBroker.Enqueue(eval); err != nil { - n.logger.Printf("[ERR] nomad.fsm: failed to enqueue evaluation %s: %v", eval.ID, err) - return err - } + n.evalBroker.Enqueue(eval) } else if eval.ShouldBlock() { n.blockedEvals.Block(eval) } diff --git a/nomad/leader.go b/nomad/leader.go index 3bf6ea4a6825..04f5f1432d56 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -178,9 +178,7 @@ func (s *Server) restoreEvals() error { eval := raw.(*structs.Evaluation) if eval.ShouldEnqueue() { - if err := s.evalBroker.Enqueue(eval); err != nil { - return fmt.Errorf("failed to enqueue evaluation %s: %v", eval.ID, err) - } + s.evalBroker.Enqueue(eval) } else if eval.ShouldBlock() { s.blockedEvals.Block(eval) } diff --git a/nomad/leader_test.go b/nomad/leader_test.go index 57b63aba71c6..b16f714a5b55 100644 --- a/nomad/leader_test.go +++ b/nomad/leader_test.go @@ -496,12 +496,7 @@ func TestLeader_ReapFailedEval(t *testing.T) { // Wait for a periodic dispatch eval := mock.Eval() - testutil.WaitForResult(func() (bool, error) { - err := s1.evalBroker.Enqueue(eval) - return err == nil, err - }, func(err error) { - t.Fatalf("err: %v", err) - }) + s1.evalBroker.Enqueue(eval) // Dequeue and Nack out, token, err := s1.evalBroker.Dequeue(defaultSched, time.Second) diff --git a/nomad/plan_endpoint_test.go b/nomad/plan_endpoint_test.go index d18ca24c5871..9e0054d09194 100644 --- a/nomad/plan_endpoint_test.go +++ b/nomad/plan_endpoint_test.go @@ -20,12 +20,8 @@ func TestPlanEndpoint_Submit(t *testing.T) { // Create the register request eval1 := mock.Eval() - testutil.WaitForResult(func() (bool, error) { - err := s1.evalBroker.Enqueue(eval1) - return err == nil, err - }, func(err error) { - t.Fatalf("err: %v", err) - }) + s1.evalBroker.Enqueue(eval1) + evalOut, token, err := s1.evalBroker.Dequeue([]string{eval1.Type}, time.Second) if err != nil { t.Fatalf("err: %v", err) diff --git a/nomad/worker_test.go b/nomad/worker_test.go index a56dfefe181f..c1663592ae82 100644 --- a/nomad/worker_test.go +++ b/nomad/worker_test.go @@ -52,12 +52,7 @@ func TestWorker_dequeueEvaluation(t *testing.T) { // Create the evaluation eval1 := mock.Eval() - testutil.WaitForResult(func() (bool, error) { - err := s1.evalBroker.Enqueue(eval1) - return err == nil, err - }, func(err error) { - t.Fatalf("err: %v", err) - }) + s1.evalBroker.Enqueue(eval1) // Create a worker w := &Worker{srv: s1, logger: s1.logger} @@ -87,12 +82,7 @@ func TestWorker_dequeueEvaluation_paused(t *testing.T) { // Create the evaluation eval1 := mock.Eval() - testutil.WaitForResult(func() (bool, error) { - err := s1.evalBroker.Enqueue(eval1) - return err == nil, err - }, func(err error) { - t.Fatalf("err: %v", err) - }) + s1.evalBroker.Enqueue(eval1) // Create a worker w := &Worker{srv: s1, logger: s1.logger} @@ -163,12 +153,7 @@ func TestWorker_sendAck(t *testing.T) { // Create the evaluation eval1 := mock.Eval() - testutil.WaitForResult(func() (bool, error) { - err := s1.evalBroker.Enqueue(eval1) - return err == nil, err - }, func(err error) { - t.Fatalf("err: %v", err) - }) + s1.evalBroker.Enqueue(eval1) // Create a worker w := &Worker{srv: s1, logger: s1.logger} @@ -266,12 +251,8 @@ func TestWorker_SubmitPlan(t *testing.T) { // Create the register request eval1 := mock.Eval() - testutil.WaitForResult(func() (bool, error) { - err := s1.evalBroker.Enqueue(eval1) - return err == nil, err - }, func(err error) { - t.Fatalf("err: %v", err) - }) + s1.evalBroker.Enqueue(eval1) + evalOut, token, err := s1.evalBroker.Dequeue([]string{eval1.Type}, time.Second) if err != nil { t.Fatalf("err: %v", err) @@ -328,12 +309,8 @@ func TestWorker_SubmitPlan_MissingNodeRefresh(t *testing.T) { // Create the register request eval1 := mock.Eval() - testutil.WaitForResult(func() (bool, error) { - err := s1.evalBroker.Enqueue(eval1) - return err == nil, err - }, func(err error) { - t.Fatalf("err: %v", err) - }) + s1.evalBroker.Enqueue(eval1) + evalOut, token, err := s1.evalBroker.Dequeue([]string{eval1.Type}, time.Second) if err != nil { t.Fatalf("err: %v", err) @@ -395,12 +372,7 @@ func TestWorker_UpdateEval(t *testing.T) { // Create the register request eval1 := mock.Eval() - testutil.WaitForResult(func() (bool, error) { - err := s1.evalBroker.Enqueue(eval1) - return err == nil, err - }, func(err error) { - t.Fatalf("err: %v", err) - }) + s1.evalBroker.Enqueue(eval1) evalOut, token, err := s1.evalBroker.Dequeue([]string{eval1.Type}, time.Second) if err != nil { t.Fatalf("err: %v", err) @@ -442,12 +414,8 @@ func TestWorker_CreateEval(t *testing.T) { // Create the register request eval1 := mock.Eval() - testutil.WaitForResult(func() (bool, error) { - err := s1.evalBroker.Enqueue(eval1) - return err == nil, err - }, func(err error) { - t.Fatalf("err: %v", err) - }) + s1.evalBroker.Enqueue(eval1) + evalOut, token, err := s1.evalBroker.Dequeue([]string{eval1.Type}, time.Second) if err != nil { t.Fatalf("err: %v", err) From ab21a76f8b6c387d595c1d3d809b1067407fbe5b Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 18 May 2016 12:13:59 -0700 Subject: [PATCH 2/2] EnqueueAll inserts all evaluations before unblocking dequeue calls --- nomad/eval_broker.go | 29 +++++++++++++++---------- nomad/eval_broker_test.go | 45 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 11 deletions(-) diff --git a/nomad/eval_broker.go b/nomad/eval_broker.go index 5d1ed19f9f79..feeb093eeced 100644 --- a/nomad/eval_broker.go +++ b/nomad/eval_broker.go @@ -129,21 +129,21 @@ func (b *EvalBroker) SetEnabled(enabled bool) { } // EnqueueAll is used to enqueue many evaluations. -// TODO: Update enqueueLocked to take a list and use heap.Fix instead of -// heap.Push in order to make the running time O(log(n+m)) instead of -// O(m*log(n)) where m is the size of the evals and n is the size of the -// existing heap. func (b *EvalBroker) EnqueueAll(evals []*structs.Evaluation) { - for _, e := range evals { - b.Enqueue(e) - } -} - -// Enqueue is used to enqueue an evaluation -func (b *EvalBroker) Enqueue(eval *structs.Evaluation) { + // The lock needs to be held until all evaluations are enqueued. This is so + // that when Dequeue operations are unblocked they will pick the highest + // priority evaluations. b.l.Lock() defer b.l.Unlock() + for _, eval := range evals { + b.processEnqueue(eval) + } +} +// processEnqueue deduplicates evals and either enqueue immediately +// or enforce the evals wait time. processEnqueue must be called with the lock +// held. +func (b *EvalBroker) processEnqueue(eval *structs.Evaluation) { // Check if already enqueued if _, ok := b.evals[eval.ID]; ok { return @@ -164,6 +164,13 @@ func (b *EvalBroker) Enqueue(eval *structs.Evaluation) { b.enqueueLocked(eval, eval.Type) } +// Enqueue is used to enqueue an evaluation +func (b *EvalBroker) Enqueue(eval *structs.Evaluation) { + b.l.Lock() + defer b.l.Unlock() + b.processEnqueue(eval) +} + // enqueueWaiting is used to enqueue a waiting evaluation func (b *EvalBroker) enqueueWaiting(eval *structs.Evaluation) { b.l.Lock() diff --git a/nomad/eval_broker_test.go b/nomad/eval_broker_test.go index d2f083233e00..aa9c2f18b701 100644 --- a/nomad/eval_broker_test.go +++ b/nomad/eval_broker_test.go @@ -840,3 +840,48 @@ func TestEvalBroker_Wait(t *testing.T) { t.Fatalf("bad : %#v", out) } } + +// Ensure that priority is taken into account when enqueueing many evaluations. +func TestEvalBroker_EnqueueAll_Dequeue_Fair(t *testing.T) { + b := testBroker(t, 0) + b.SetEnabled(true) + + // Start with a blocked dequeue + outCh := make(chan *structs.Evaluation, 1) + go func() { + start := time.Now() + out, _, err := b.Dequeue(defaultSched, time.Second) + end := time.Now() + outCh <- out + if err != nil { + t.Fatalf("err: %v", err) + } + if d := end.Sub(start); d < 5*time.Millisecond { + t.Fatalf("bad: %v", d) + } + }() + + // Wait for a bit + time.Sleep(5 * time.Millisecond) + + // Enqueue + evals := make([]*structs.Evaluation, 0, 8) + expectedPriority := 90 + for i := 10; i <= expectedPriority; i += 10 { + eval := mock.Eval() + eval.Priority = i + evals = append(evals, eval) + + } + b.EnqueueAll(evals) + + // Ensure dequeue + select { + case out := <-outCh: + if out.Priority != expectedPriority { + t.Fatalf("bad: %v", out) + } + case <-time.After(time.Second): + t.Fatalf("timeout") + } +}