Skip to content

Commit

Permalink
Merge pull request #1183 from hashicorp/f-atomic-eval-enqueue
Browse files Browse the repository at this point in the history
EnqueueAll ensures all evaluations are enqueued before unblocking Dequeue calls
  • Loading branch information
dadgar committed May 19, 2016
2 parents 6b0757c + ab21a76 commit b667912
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 142 deletions.
35 changes: 20 additions & 15 deletions nomad/eval_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,25 +129,24 @@ func (b *EvalBroker) SetEnabled(enabled bool) {
}

// EnqueueAll is used to enqueue many evaluations.
// TODO: Update enqueueLocked to take a list and use heap.Fix instead of
// heap.Push in order to make the running time O(log(n+m)) instead of
// O(m*log(n)) where m is the size of the evals and n is the size of the
// existing heap.
func (b *EvalBroker) EnqueueAll(evals []*structs.Evaluation) {
for _, e := range evals {
b.Enqueue(e)
}
}

// Enqueue is used to enqueue an evaluation
// TODO: remove the error return value
func (b *EvalBroker) Enqueue(eval *structs.Evaluation) error {
// The lock needs to be held until all evaluations are enqueued. This is so
// that when Dequeue operations are unblocked they will pick the highest
// priority evaluations.
b.l.Lock()
defer b.l.Unlock()
for _, eval := range evals {
b.processEnqueue(eval)
}
}

// processEnqueue deduplicates evals and either enqueue immediately
// or enforce the evals wait time. processEnqueue must be called with the lock
// held.
func (b *EvalBroker) processEnqueue(eval *structs.Evaluation) {
// Check if already enqueued
if _, ok := b.evals[eval.ID]; ok {
return nil
return
} else if b.enabled {
b.evals[eval.ID] = 0
}
Expand All @@ -159,11 +158,17 @@ func (b *EvalBroker) Enqueue(eval *structs.Evaluation) error {
})
b.timeWait[eval.ID] = timer
b.stats.TotalWaiting += 1
return nil
return
}

b.enqueueLocked(eval, eval.Type)
return nil
}

// Enqueue is used to enqueue an evaluation
func (b *EvalBroker) Enqueue(eval *structs.Evaluation) {
b.l.Lock()
defer b.l.Unlock()
b.processEnqueue(eval)
}

// enqueueWaiting is used to enqueue a waiting evaluation
Expand Down
120 changes: 60 additions & 60 deletions nomad/eval_broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,7 @@ func TestEvalBroker_Enqueue_Dequeue_Nack_Ack(t *testing.T) {

// Enqueue, but broker is disabled!
eval := mock.Eval()
err := b.Enqueue(eval)
if err != nil {
t.Fatalf("err: %v", err)
}
b.Enqueue(eval)

// Verify nothing was done
stats := b.Stats()
Expand All @@ -48,16 +45,10 @@ func TestEvalBroker_Enqueue_Dequeue_Nack_Ack(t *testing.T) {

// Enable the broker, and enqueue
b.SetEnabled(true)
err = b.Enqueue(eval)
if err != nil {
t.Fatalf("err: %v", err)
}
b.Enqueue(eval)

// Double enqueue is a no-op
err = b.Enqueue(eval)
if err != nil {
t.Fatalf("err: %v", err)
}
b.Enqueue(eval)

if !b.Enabled() {
t.Fatalf("should be enabled")
Expand Down Expand Up @@ -206,26 +197,17 @@ func TestEvalBroker_Serialize_DuplicateJobID(t *testing.T) {
b.SetEnabled(true)

eval := mock.Eval()
err := b.Enqueue(eval)
if err != nil {
t.Fatalf("err: %v", err)
}
b.Enqueue(eval)

eval2 := mock.Eval()
eval2.JobID = eval.JobID
eval2.CreateIndex = eval.CreateIndex + 1
err = b.Enqueue(eval2)
if err != nil {
t.Fatalf("err: %v", err)
}
b.Enqueue(eval2)

eval3 := mock.Eval()
eval3.JobID = eval.JobID
eval3.CreateIndex = eval.CreateIndex + 2
err = b.Enqueue(eval3)
if err != nil {
t.Fatalf("err: %v", err)
}
b.Enqueue(eval3)

stats := b.Stats()
if stats.TotalReady != 1 {
Expand Down Expand Up @@ -359,10 +341,7 @@ func TestEvalBroker_Enqueue_Disable(t *testing.T) {
// Enqueue
eval := mock.Eval()
b.SetEnabled(true)
err := b.Enqueue(eval)
if err != nil {
t.Fatalf("err: %v", err)
}
b.Enqueue(eval)

// Flush via SetEnabled
b.SetEnabled(false)
Expand Down Expand Up @@ -425,10 +404,7 @@ func TestEvalBroker_Dequeue_Empty_Timeout(t *testing.T) {

// Enqueue to unblock the dequeue.
eval := mock.Eval()
err := b.Enqueue(eval)
if err != nil {
t.Fatalf("err: %v", err)
}
b.Enqueue(eval)

select {
case <-doneCh:
Expand Down Expand Up @@ -558,10 +534,7 @@ func TestEvalBroker_Dequeue_Blocked(t *testing.T) {

// Enqueue
eval := mock.Eval()
err := b.Enqueue(eval)
if err != nil {
t.Fatalf("err: %v", err)
}
b.Enqueue(eval)

// Ensure dequeue
select {
Expand All @@ -581,10 +554,7 @@ func TestEvalBroker_Nack_Timeout(t *testing.T) {

// Enqueue
eval := mock.Eval()
err := b.Enqueue(eval)
if err != nil {
t.Fatalf("err: %v", err)
}
b.Enqueue(eval)

// Dequeue
out, _, err := b.Dequeue(defaultSched, time.Second)
Expand Down Expand Up @@ -619,10 +589,7 @@ func TestEvalBroker_Nack_TimeoutReset(t *testing.T) {

// Enqueue
eval := mock.Eval()
err := b.Enqueue(eval)
if err != nil {
t.Fatalf("err: %v", err)
}
b.Enqueue(eval)

// Dequeue
out, token, err := b.Dequeue(defaultSched, time.Second)
Expand Down Expand Up @@ -662,10 +629,7 @@ func TestEvalBroker_PauseResumeNackTimeout(t *testing.T) {

// Enqueue
eval := mock.Eval()
err := b.Enqueue(eval)
if err != nil {
t.Fatalf("err: %v", err)
}
b.Enqueue(eval)

// Dequeue
out, token, err := b.Dequeue(defaultSched, time.Second)
Expand Down Expand Up @@ -711,10 +675,7 @@ func TestEvalBroker_DeliveryLimit(t *testing.T) {
b.SetEnabled(true)

eval := mock.Eval()
err := b.Enqueue(eval)
if err != nil {
t.Fatalf("err: %v", err)
}
b.Enqueue(eval)

for i := 0; i < 3; i++ {
// Dequeue should work
Expand Down Expand Up @@ -803,10 +764,7 @@ func TestEvalBroker_AckAtDeliveryLimit(t *testing.T) {
b.SetEnabled(true)

eval := mock.Eval()
err := b.Enqueue(eval)
if err != nil {
t.Fatalf("err: %v", err)
}
b.Enqueue(eval)

for i := 0; i < 3; i++ {
// Dequeue should work
Expand Down Expand Up @@ -850,10 +808,7 @@ func TestEvalBroker_Wait(t *testing.T) {
// Create an eval that should wait
eval := mock.Eval()
eval.Wait = 10 * time.Millisecond
err := b.Enqueue(eval)
if err != nil {
t.Fatalf("err: %v", err)
}
b.Enqueue(eval)

// Verify waiting
stats := b.Stats()
Expand Down Expand Up @@ -885,3 +840,48 @@ func TestEvalBroker_Wait(t *testing.T) {
t.Fatalf("bad : %#v", out)
}
}

// Ensure that priority is taken into account when enqueueing many evaluations.
func TestEvalBroker_EnqueueAll_Dequeue_Fair(t *testing.T) {
b := testBroker(t, 0)
b.SetEnabled(true)

// Start with a blocked dequeue
outCh := make(chan *structs.Evaluation, 1)
go func() {
start := time.Now()
out, _, err := b.Dequeue(defaultSched, time.Second)
end := time.Now()
outCh <- out
if err != nil {
t.Fatalf("err: %v", err)
}
if d := end.Sub(start); d < 5*time.Millisecond {
t.Fatalf("bad: %v", d)
}
}()

// Wait for a bit
time.Sleep(5 * time.Millisecond)

// Enqueue
evals := make([]*structs.Evaluation, 0, 8)
expectedPriority := 90
for i := 10; i <= expectedPriority; i += 10 {
eval := mock.Eval()
eval.Priority = i
evals = append(evals, eval)

}
b.EnqueueAll(evals)

// Ensure dequeue
select {
case out := <-outCh:
if out.Priority != expectedPriority {
t.Fatalf("bad: %v", out)
}
case <-time.After(time.Second):
t.Fatalf("timeout")
}
}
7 changes: 1 addition & 6 deletions nomad/eval_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,7 @@ func TestEvalEndpoint_Dequeue(t *testing.T) {

// Create the register request
eval1 := mock.Eval()
testutil.WaitForResult(func() (bool, error) {
err := s1.evalBroker.Enqueue(eval1)
return err == nil, err
}, func(err error) {
t.Fatalf("err: %v", err)
})
s1.evalBroker.Enqueue(eval1)

// Dequeue the eval
get := &structs.EvalDequeueRequest{
Expand Down
5 changes: 1 addition & 4 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,10 +331,7 @@ func (n *nomadFSM) applyUpdateEval(buf []byte, index uint64) interface{} {

for _, eval := range req.Evals {
if eval.ShouldEnqueue() {
if err := n.evalBroker.Enqueue(eval); err != nil {
n.logger.Printf("[ERR] nomad.fsm: failed to enqueue evaluation %s: %v", eval.ID, err)
return err
}
n.evalBroker.Enqueue(eval)
} else if eval.ShouldBlock() {
n.blockedEvals.Block(eval)
}
Expand Down
4 changes: 1 addition & 3 deletions nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,7 @@ func (s *Server) restoreEvals() error {
eval := raw.(*structs.Evaluation)

if eval.ShouldEnqueue() {
if err := s.evalBroker.Enqueue(eval); err != nil {
return fmt.Errorf("failed to enqueue evaluation %s: %v", eval.ID, err)
}
s.evalBroker.Enqueue(eval)
} else if eval.ShouldBlock() {
s.blockedEvals.Block(eval)
}
Expand Down
7 changes: 1 addition & 6 deletions nomad/leader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,12 +496,7 @@ func TestLeader_ReapFailedEval(t *testing.T) {

// Wait for a periodic dispatch
eval := mock.Eval()
testutil.WaitForResult(func() (bool, error) {
err := s1.evalBroker.Enqueue(eval)
return err == nil, err
}, func(err error) {
t.Fatalf("err: %v", err)
})
s1.evalBroker.Enqueue(eval)

// Dequeue and Nack
out, token, err := s1.evalBroker.Dequeue(defaultSched, time.Second)
Expand Down
8 changes: 2 additions & 6 deletions nomad/plan_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,8 @@ func TestPlanEndpoint_Submit(t *testing.T) {

// Create the register request
eval1 := mock.Eval()
testutil.WaitForResult(func() (bool, error) {
err := s1.evalBroker.Enqueue(eval1)
return err == nil, err
}, func(err error) {
t.Fatalf("err: %v", err)
})
s1.evalBroker.Enqueue(eval1)

evalOut, token, err := s1.evalBroker.Dequeue([]string{eval1.Type}, time.Second)
if err != nil {
t.Fatalf("err: %v", err)
Expand Down
Loading

0 comments on commit b667912

Please sign in to comment.