Skip to content

Commit

Permalink
Merge pull request #3051 from hashicorp/b-batcher
Browse files Browse the repository at this point in the history
Fix race creating EvalFuture
  • Loading branch information
dadgar committed Aug 17, 2017
2 parents 15e25f1 + 6f1506c commit 7750c6b
Showing 1 changed file with 24 additions and 26 deletions.
50 changes: 24 additions & 26 deletions nomad/deploymentwatcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package deploymentwatcher

import (
"context"
"sync"
"time"

"github.com/hashicorp/nomad/nomad/structs"
Expand All @@ -16,27 +15,22 @@ type EvalBatcher struct {
// raft is used to actually commit the evaluations
raft DeploymentRaftEndpoints

// future to be returned to callers
f *EvalFuture

// inCh is used to pass evaluations to the daemon process
inCh chan *structs.Evaluation
// workCh is used to pass evaluations to the daemon process
workCh chan *evalWrapper

// ctx is used to exit the daemon batcher
ctx context.Context

l sync.Mutex
}

// NewEvalBatcher returns an EvalBatcher that uses the passed raft endpoints to
// create the evaluations and exits the batcher when the passed exit channel is
// closed.
func NewEvalBatcher(batchDuration time.Duration, raft DeploymentRaftEndpoints, ctx context.Context) *EvalBatcher {
b := &EvalBatcher{
batch: batchDuration,
raft: raft,
ctx: ctx,
inCh: make(chan *structs.Evaluation, 10),
batch: batchDuration,
raft: raft,
ctx: ctx,
workCh: make(chan *evalWrapper, 10),
}

go b.batcher()
Expand All @@ -46,36 +40,41 @@ func NewEvalBatcher(batchDuration time.Duration, raft DeploymentRaftEndpoints, c
// CreateEval batches the creation of the evaluation and returns a future that
// tracks the evaluations creation.
func (b *EvalBatcher) CreateEval(e *structs.Evaluation) *EvalFuture {
b.l.Lock()
if b.f == nil {
b.f = NewEvalFuture()
wrapper := &evalWrapper{
e: e,
f: make(chan *EvalFuture, 1),
}
b.l.Unlock()

b.inCh <- e
return b.f
b.workCh <- wrapper
return <-wrapper.f
}

type evalWrapper struct {
e *structs.Evaluation
f chan *EvalFuture
}

// batcher is the long lived batcher goroutine
func (b *EvalBatcher) batcher() {
var timerCh <-chan time.Time
evals := make(map[string]*structs.Evaluation)
future := NewEvalFuture()
for {
select {
case <-b.ctx.Done():
return
case e := <-b.inCh:
case w := <-b.workCh:
if timerCh == nil {
timerCh = time.After(b.batch)
}

evals[e.DeploymentID] = e
// Store the eval and attach the future
evals[w.e.DeploymentID] = w.e
w.f <- future
case <-timerCh:
// Capture the future
b.l.Lock()
f := b.f
b.f = nil
b.l.Unlock()
// Capture the future and create a new one
f := future
future = NewEvalFuture()

// Shouldn't be possible
if f == nil {
Expand All @@ -94,7 +93,6 @@ func (b *EvalBatcher) batcher() {
// Reset the evals list and timer
evals = make(map[string]*structs.Evaluation)
timerCh = nil

}
}
}
Expand Down

0 comments on commit 7750c6b

Please sign in to comment.