Skip to content

Commit

Permalink
Merge pull request #1199 from hashicorp/f-lighter-blocked-evals
Browse files Browse the repository at this point in the history
Reuse blocked evaluations and handle unblock events that occurred during scheduling
  • Loading branch information
dadgar committed May 25, 2016
2 parents 20f4042 + 230b663 commit 094a090
Show file tree
Hide file tree
Showing 15 changed files with 815 additions and 26 deletions.
119 changes: 110 additions & 9 deletions nomad/blocked_evals.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,21 @@ type BlockedEvals struct {
escaped map[string]*structs.Evaluation

// unblockCh is used to buffer unblocking of evaluations.
capacityChangeCh chan string
capacityChangeCh chan *capacityUpdate

// jobs is the map of blocked job and is used to ensure that only one
// blocked eval exists for each job.
jobs map[string]struct{}

// unblockIndexes maps computed node classes to the index in which they were
// unblocked. This is used to check if an evaluation could have been
// unblocked between the time they were in the scheduler and the time they
// are being blocked.
unblockIndexes map[string]uint64

// duplicates is the set of evaluations for jobs that had pre-existing
// blocked evaluations. These should be marked as cancelled since only one
// blocked eval is neeeded bper job.
// blocked eval is neeeded per job.
duplicates []*structs.Evaluation

// duplicateCh is used to signal that a duplicate eval was added to the
Expand All @@ -55,6 +61,12 @@ type BlockedEvals struct {
stopCh chan struct{}
}

// capacityUpdate stores unblock data.
type capacityUpdate struct {
computedClass string
index uint64
}

// BlockedStats returns all the stats about the blocked eval tracker.
type BlockedStats struct {
// TotalEscaped is the total number of blocked evaluations that have escaped
Expand All @@ -73,7 +85,8 @@ func NewBlockedEvals(evalBroker *EvalBroker) *BlockedEvals {
captured: make(map[string]*structs.Evaluation),
escaped: make(map[string]*structs.Evaluation),
jobs: make(map[string]struct{}),
capacityChangeCh: make(chan string, unblockBuffer),
unblockIndexes: make(map[string]uint64),
capacityChangeCh: make(chan *capacityUpdate, unblockBuffer),
duplicateCh: make(chan struct{}, 1),
stopCh: make(chan struct{}),
stats: new(BlockedStats),
Expand Down Expand Up @@ -133,6 +146,16 @@ func (b *BlockedEvals) Block(eval *structs.Evaluation) {
return
}

// Check if the eval missed an unblock while it was in the scheduler at an
// older index. The scheduler could have been invoked with a snapshot of
// state that was prior to additional capacity being added or allocations
// becoming terminal.
if b.missedUnblock(eval) {
// Just re-enqueue the eval immediately
b.evalBroker.Enqueue(eval)
return
}

// Mark the job as tracked.
b.stats.TotalBlocked++
b.jobs[eval.JobID] = struct{}{}
Expand All @@ -152,16 +175,65 @@ func (b *BlockedEvals) Block(eval *structs.Evaluation) {
b.captured[eval.ID] = eval
}

// missedUnblock returns whether an evaluation missed an unblock while it was in
// the scheduler. Since the scheduler can operate at an index in the past, the
// evaluation may have been processed missing data that would allow it to
// complete. This method returns if that is the case and should be called with
// the lock held.
func (b *BlockedEvals) missedUnblock(eval *structs.Evaluation) bool {
var max uint64 = 0
for class, index := range b.unblockIndexes {
// Calculate the max unblock index
if max < index {
max = index
}

elig, ok := eval.ClassEligibility[class]
if !ok {
// The evaluation was processed and did not encounter this class.
// Thus for correctness we need to unblock it.
return true
}

// The evaluation could use the computed node class and the eval was
// processed before the last unblock.
if elig && eval.SnapshotIndex < index {
return true
}
}

// If the evaluation has escaped, and the map contains an index older than
// the evaluations, it should be unblocked.
if eval.EscapedComputedClass && eval.SnapshotIndex < max {
return true
}

// The evaluation is ahead of all recent unblocks.
return false
}

// Unblock causes any evaluation that could potentially make progress on a
// capacity change on the passed computed node class to be enqueued into the
// eval broker.
func (b *BlockedEvals) Unblock(computedClass string) {
func (b *BlockedEvals) Unblock(computedClass string, index uint64) {
b.l.Lock()

// Do nothing if not enabled
if !b.enabled {
b.l.Unlock()
return
}

b.capacityChangeCh <- computedClass
// Store the index in which the unblock happened. We use this on subsequent
// block calls in case the evaluation was in the scheduler when a trigger
// occured.
b.unblockIndexes[computedClass] = index
b.l.Unlock()

b.capacityChangeCh <- &capacityUpdate{
computedClass: computedClass,
index: index,
}
}

// watchCapacity is a long lived function that watches for capacity changes in
Expand All @@ -171,15 +243,15 @@ func (b *BlockedEvals) watchCapacity() {
select {
case <-b.stopCh:
return
case computedClass := <-b.capacityChangeCh:
b.unblock(computedClass)
case update := <-b.capacityChangeCh:
b.unblock(update.computedClass, update.index)
}
}
}

// unblock unblocks all blocked evals that could run on the passed computed node
// class.
func (b *BlockedEvals) unblock(computedClass string) {
func (b *BlockedEvals) unblock(computedClass string, index uint64) {
b.l.Lock()
defer b.l.Unlock()

Expand Down Expand Up @@ -229,6 +301,35 @@ func (b *BlockedEvals) unblock(computedClass string) {
}
}

// UnblockFailed unblocks all blocked evaluation that were due to scheduler
// failure.
func (b *BlockedEvals) UnblockFailed() {
b.l.Lock()
defer b.l.Unlock()

// Do nothing if not enabled
if !b.enabled {
return
}

var unblock []*structs.Evaluation
for id, eval := range b.captured {
if eval.TriggeredBy == structs.EvalTriggerMaxPlans {
unblock = append(unblock, eval)
delete(b.captured, id)
}
}

for id, eval := range b.escaped {
if eval.TriggeredBy == structs.EvalTriggerMaxPlans {
unblock = append(unblock, eval)
delete(b.escaped, id)
}
}

b.evalBroker.EnqueueAll(unblock)
}

// GetDuplicates returns all the duplicate evaluations and blocks until the
// passed timeout.
func (b *BlockedEvals) GetDuplicates(timeout time.Duration) []*structs.Evaluation {
Expand Down Expand Up @@ -273,7 +374,7 @@ func (b *BlockedEvals) Flush() {
b.escaped = make(map[string]*structs.Evaluation)
b.jobs = make(map[string]struct{})
b.duplicates = nil
b.capacityChangeCh = make(chan string, unblockBuffer)
b.capacityChangeCh = make(chan *capacityUpdate, unblockBuffer)
b.stopCh = make(chan struct{})
b.duplicateCh = make(chan struct{}, 1)
}
Expand Down
165 changes: 161 additions & 4 deletions nomad/blocked_evals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,27 @@ func TestBlockedEvals_Block_SameJob(t *testing.T) {
}
}

func TestBlockedEvals_Block_PriorUnblocks(t *testing.T) {
blocked, _ := testBlockedEvals(t)

// Do unblocks prior to blocking
blocked.Unblock("v1:123", 1000)
blocked.Unblock("v1:123", 1001)

// Create two blocked evals and add them to the blocked tracker.
e := mock.Eval()
e.Status = structs.EvalStatusBlocked
e.ClassEligibility = map[string]bool{"v1:123": false, "v1:456": false}
e.SnapshotIndex = 999
blocked.Block(e)

// Verify block did track both
bStats := blocked.Stats()
if bStats.TotalBlocked != 1 || bStats.TotalEscaped != 0 {
t.Fatalf("bad: %#v", bStats)
}
}

func TestBlockedEvals_GetDuplicates(t *testing.T) {
blocked, _ := testBlockedEvals(t)

Expand Down Expand Up @@ -105,7 +126,7 @@ func TestBlockedEvals_UnblockEscaped(t *testing.T) {
t.Fatalf("bad: %#v", bStats)
}

blocked.Unblock("v1:123")
blocked.Unblock("v1:123", 1000)

testutil.WaitForResult(func() (bool, error) {
// Verify Unblock caused an enqueue
Expand Down Expand Up @@ -141,7 +162,7 @@ func TestBlockedEvals_UnblockEligible(t *testing.T) {
t.Fatalf("bad: %#v", blockedStats)
}

blocked.Unblock("v1:123")
blocked.Unblock("v1:123", 1000)

testutil.WaitForResult(func() (bool, error) {
// Verify Unblock caused an enqueue
Expand Down Expand Up @@ -178,7 +199,7 @@ func TestBlockedEvals_UnblockIneligible(t *testing.T) {
}

// Should do nothing
blocked.Unblock("v1:123")
blocked.Unblock("v1:123", 1000)

testutil.WaitForResult(func() (bool, error) {
// Verify Unblock didn't cause an enqueue
Expand Down Expand Up @@ -214,7 +235,7 @@ func TestBlockedEvals_UnblockUnknown(t *testing.T) {
}

// Should unblock because the eval hasn't seen this node class.
blocked.Unblock("v1:789")
blocked.Unblock("v1:789", 1000)

testutil.WaitForResult(func() (bool, error) {
// Verify Unblock causes an enqueue
Expand All @@ -233,3 +254,139 @@ func TestBlockedEvals_UnblockUnknown(t *testing.T) {
t.Fatalf("err: %s", err)
})
}

// Test the block case in which the eval should be immediately unblocked since
// it is escaped and old
func TestBlockedEvals_Block_ImmediateUnblock_Escaped(t *testing.T) {
blocked, broker := testBlockedEvals(t)

// Do an unblock prior to blocking
blocked.Unblock("v1:123", 1000)

// Create a blocked eval that is eligible on a specific node class and add
// it to the blocked tracker.
e := mock.Eval()
e.Status = structs.EvalStatusBlocked
e.EscapedComputedClass = true
e.SnapshotIndex = 900
blocked.Block(e)

// Verify block caused the eval to be immediately unblocked
blockedStats := blocked.Stats()
if blockedStats.TotalBlocked != 0 && blockedStats.TotalEscaped != 0 {
t.Fatalf("bad: %#v", blockedStats)
}

testutil.WaitForResult(func() (bool, error) {
// Verify Unblock caused an enqueue
brokerStats := broker.Stats()
if brokerStats.TotalReady != 1 {
return false, fmt.Errorf("bad: %#v", brokerStats)
}

return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
}

// Test the block case in which the eval should be immediately unblocked since
// it there is an unblock on an unseen class
func TestBlockedEvals_Block_ImmediateUnblock_UnseenClass(t *testing.T) {
blocked, broker := testBlockedEvals(t)

// Do an unblock prior to blocking
blocked.Unblock("v1:123", 1000)

// Create a blocked eval that is eligible on a specific node class and add
// it to the blocked tracker.
e := mock.Eval()
e.Status = structs.EvalStatusBlocked
e.EscapedComputedClass = false
e.SnapshotIndex = 900
blocked.Block(e)

// Verify block caused the eval to be immediately unblocked
blockedStats := blocked.Stats()
if blockedStats.TotalBlocked != 0 && blockedStats.TotalEscaped != 0 {
t.Fatalf("bad: %#v", blockedStats)
}

testutil.WaitForResult(func() (bool, error) {
// Verify Unblock caused an enqueue
brokerStats := broker.Stats()
if brokerStats.TotalReady != 1 {
return false, fmt.Errorf("bad: %#v", brokerStats)
}

return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
}

// Test the block case in which the eval should be immediately unblocked since
// it a class it is eligible for has been unblocked
func TestBlockedEvals_Block_ImmediateUnblock_SeenClass(t *testing.T) {
blocked, broker := testBlockedEvals(t)

// Do an unblock prior to blocking
blocked.Unblock("v1:123", 1000)

// Create a blocked eval that is eligible on a specific node class and add
// it to the blocked tracker.
e := mock.Eval()
e.Status = structs.EvalStatusBlocked
e.ClassEligibility = map[string]bool{"v1:123": true, "v1:456": false}
e.SnapshotIndex = 900
blocked.Block(e)

// Verify block caused the eval to be immediately unblocked
blockedStats := blocked.Stats()
if blockedStats.TotalBlocked != 0 && blockedStats.TotalEscaped != 0 {
t.Fatalf("bad: %#v", blockedStats)
}

testutil.WaitForResult(func() (bool, error) {
// Verify Unblock caused an enqueue
brokerStats := broker.Stats()
if brokerStats.TotalReady != 1 {
return false, fmt.Errorf("bad: %#v", brokerStats)
}

return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
}

func TestBlockedEvals_UnblockFailed(t *testing.T) {
blocked, broker := testBlockedEvals(t)

// Create blocked evals that are due to failures
e := mock.Eval()
e.Status = structs.EvalStatusBlocked
e.TriggeredBy = structs.EvalTriggerMaxPlans
e.EscapedComputedClass = true
blocked.Block(e)

e2 := mock.Eval()
e2.Status = structs.EvalStatusBlocked
e2.TriggeredBy = structs.EvalTriggerMaxPlans
e2.ClassEligibility = map[string]bool{"v1:123": true, "v1:456": false}
blocked.Block(e2)

// Trigger an unblock fail
blocked.UnblockFailed()

testutil.WaitForResult(func() (bool, error) {
// Verify Unblock caused an enqueue
brokerStats := broker.Stats()
if brokerStats.TotalReady != 2 {
return false, fmt.Errorf("bad: %#v", brokerStats)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
}
Loading

0 comments on commit 094a090

Please sign in to comment.