Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reuse blocked evaluations and handle unblock events that occurred during scheduling #1199

Merged
merged 6 commits into from
May 25, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 110 additions & 9 deletions nomad/blocked_evals.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,21 @@ type BlockedEvals struct {
escaped map[string]*structs.Evaluation

// unblockCh is used to buffer unblocking of evaluations.
capacityChangeCh chan string
capacityChangeCh chan *capacityUpdate

// jobs is the map of blocked job and is used to ensure that only one
// blocked eval exists for each job.
jobs map[string]struct{}

// unblockIndexes maps computed node classes to the index in which they were
// unblocked. This is used to check if an evaluation could have been
// unblocked between the time they were in the scheduler and the time they
// are being blocked.
unblockIndexes map[string]uint64

// duplicates is the set of evaluations for jobs that had pre-existing
// blocked evaluations. These should be marked as cancelled since only one
// blocked eval is neeeded bper job.
// blocked eval is neeeded per job.
duplicates []*structs.Evaluation

// duplicateCh is used to signal that a duplicate eval was added to the
Expand All @@ -55,6 +61,12 @@ type BlockedEvals struct {
stopCh chan struct{}
}

// capacityUpdate stores unblock data.
type capacityUpdate struct {
computedClass string
index uint64
}

// BlockedStats returns all the stats about the blocked eval tracker.
type BlockedStats struct {
// TotalEscaped is the total number of blocked evaluations that have escaped
Expand All @@ -73,7 +85,8 @@ func NewBlockedEvals(evalBroker *EvalBroker) *BlockedEvals {
captured: make(map[string]*structs.Evaluation),
escaped: make(map[string]*structs.Evaluation),
jobs: make(map[string]struct{}),
capacityChangeCh: make(chan string, unblockBuffer),
unblockIndexes: make(map[string]uint64),
capacityChangeCh: make(chan *capacityUpdate, unblockBuffer),
duplicateCh: make(chan struct{}, 1),
stopCh: make(chan struct{}),
stats: new(BlockedStats),
Expand Down Expand Up @@ -133,6 +146,16 @@ func (b *BlockedEvals) Block(eval *structs.Evaluation) {
return
}

// Check if the eval missed an unblock while it was in the scheduler at an
// older index. The scheduler could have been invoked with a snapshot of
// state that was prior to additional capacity being added or allocations
// becoming terminal.
if b.missedUnblock(eval) {
// Just re-enqueue the eval immediately
b.evalBroker.Enqueue(eval)
return
}

// Mark the job as tracked.
b.stats.TotalBlocked++
b.jobs[eval.JobID] = struct{}{}
Expand All @@ -152,16 +175,65 @@ func (b *BlockedEvals) Block(eval *structs.Evaluation) {
b.captured[eval.ID] = eval
}

// missedUnblock returns whether an evaluation missed an unblock while it was in
// the scheduler. Since the scheduler can operate at an index in the past, the
// evaluation may have been processed missing data that would allow it to
// complete. This method returns if that is the case and should be called with
// the lock held.
func (b *BlockedEvals) missedUnblock(eval *structs.Evaluation) bool {
var max uint64 = 0
for class, index := range b.unblockIndexes {
// Calculate the max unblock index
if max < index {
max = index
}

elig, ok := eval.ClassEligibility[class]
if !ok {
// The evaluation was processed and did not encounter this class.
// Thus for correctness we need to unblock it.
return true
}

// The evaluation could use the computed node class and the eval was
// processed before the last unblock.
if elig && eval.SnapshotIndex < index {
return true
}
}

// If the evaluation has escaped, and the map contains an index older than
// the evaluations, it should be unblocked.
if eval.EscapedComputedClass && eval.SnapshotIndex < max {
return true
}

// The evaluation is ahead of all recent unblocks.
return false
}

// Unblock causes any evaluation that could potentially make progress on a
// capacity change on the passed computed node class to be enqueued into the
// eval broker.
func (b *BlockedEvals) Unblock(computedClass string) {
func (b *BlockedEvals) Unblock(computedClass string, index uint64) {
b.l.Lock()

// Do nothing if not enabled
if !b.enabled {
b.l.Unlock()
return
}

b.capacityChangeCh <- computedClass
// Store the index in which the unblock happened. We use this on subsequent
// block calls in case the evaluation was in the scheduler when a trigger
// occured.
b.unblockIndexes[computedClass] = index
b.l.Unlock()

b.capacityChangeCh <- &capacityUpdate{
computedClass: computedClass,
index: index,
}
}

// watchCapacity is a long lived function that watches for capacity changes in
Expand All @@ -171,15 +243,15 @@ func (b *BlockedEvals) watchCapacity() {
select {
case <-b.stopCh:
return
case computedClass := <-b.capacityChangeCh:
b.unblock(computedClass)
case update := <-b.capacityChangeCh:
b.unblock(update.computedClass, update.index)
}
}
}

// unblock unblocks all blocked evals that could run on the passed computed node
// class.
func (b *BlockedEvals) unblock(computedClass string) {
func (b *BlockedEvals) unblock(computedClass string, index uint64) {
b.l.Lock()
defer b.l.Unlock()

Expand Down Expand Up @@ -229,6 +301,35 @@ func (b *BlockedEvals) unblock(computedClass string) {
}
}

// UnblockFailed unblocks all blocked evaluation that were due to scheduler
// failure.
func (b *BlockedEvals) UnblockFailed() {
b.l.Lock()
defer b.l.Unlock()

// Do nothing if not enabled
if !b.enabled {
return
}

var unblock []*structs.Evaluation
for id, eval := range b.captured {
if eval.TriggeredBy == structs.EvalTriggerMaxPlans {
unblock = append(unblock, eval)
delete(b.captured, id)
}
}

for id, eval := range b.escaped {
if eval.TriggeredBy == structs.EvalTriggerMaxPlans {
unblock = append(unblock, eval)
delete(b.escaped, id)
}
}

b.evalBroker.EnqueueAll(unblock)
}

// GetDuplicates returns all the duplicate evaluations and blocks until the
// passed timeout.
func (b *BlockedEvals) GetDuplicates(timeout time.Duration) []*structs.Evaluation {
Expand Down Expand Up @@ -273,7 +374,7 @@ func (b *BlockedEvals) Flush() {
b.escaped = make(map[string]*structs.Evaluation)
b.jobs = make(map[string]struct{})
b.duplicates = nil
b.capacityChangeCh = make(chan string, unblockBuffer)
b.capacityChangeCh = make(chan *capacityUpdate, unblockBuffer)
b.stopCh = make(chan struct{})
b.duplicateCh = make(chan struct{}, 1)
}
Expand Down
165 changes: 161 additions & 4 deletions nomad/blocked_evals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,27 @@ func TestBlockedEvals_Block_SameJob(t *testing.T) {
}
}

func TestBlockedEvals_Block_PriorUnblocks(t *testing.T) {
blocked, _ := testBlockedEvals(t)

// Do unblocks prior to blocking
blocked.Unblock("v1:123", 1000)
blocked.Unblock("v1:123", 1001)

// Create two blocked evals and add them to the blocked tracker.
e := mock.Eval()
e.Status = structs.EvalStatusBlocked
e.ClassEligibility = map[string]bool{"v1:123": false, "v1:456": false}
e.SnapshotIndex = 999
blocked.Block(e)

// Verify block did track both
bStats := blocked.Stats()
if bStats.TotalBlocked != 1 || bStats.TotalEscaped != 0 {
t.Fatalf("bad: %#v", bStats)
}
}

func TestBlockedEvals_GetDuplicates(t *testing.T) {
blocked, _ := testBlockedEvals(t)

Expand Down Expand Up @@ -105,7 +126,7 @@ func TestBlockedEvals_UnblockEscaped(t *testing.T) {
t.Fatalf("bad: %#v", bStats)
}

blocked.Unblock("v1:123")
blocked.Unblock("v1:123", 1000)

testutil.WaitForResult(func() (bool, error) {
// Verify Unblock caused an enqueue
Expand Down Expand Up @@ -141,7 +162,7 @@ func TestBlockedEvals_UnblockEligible(t *testing.T) {
t.Fatalf("bad: %#v", blockedStats)
}

blocked.Unblock("v1:123")
blocked.Unblock("v1:123", 1000)

testutil.WaitForResult(func() (bool, error) {
// Verify Unblock caused an enqueue
Expand Down Expand Up @@ -178,7 +199,7 @@ func TestBlockedEvals_UnblockIneligible(t *testing.T) {
}

// Should do nothing
blocked.Unblock("v1:123")
blocked.Unblock("v1:123", 1000)

testutil.WaitForResult(func() (bool, error) {
// Verify Unblock didn't cause an enqueue
Expand Down Expand Up @@ -214,7 +235,7 @@ func TestBlockedEvals_UnblockUnknown(t *testing.T) {
}

// Should unblock because the eval hasn't seen this node class.
blocked.Unblock("v1:789")
blocked.Unblock("v1:789", 1000)

testutil.WaitForResult(func() (bool, error) {
// Verify Unblock causes an enqueue
Expand All @@ -233,3 +254,139 @@ func TestBlockedEvals_UnblockUnknown(t *testing.T) {
t.Fatalf("err: %s", err)
})
}

// Test the block case in which the eval should be immediately unblocked since
// it is escaped and old
func TestBlockedEvals_Block_ImmediateUnblock_Escaped(t *testing.T) {
blocked, broker := testBlockedEvals(t)

// Do an unblock prior to blocking
blocked.Unblock("v1:123", 1000)

// Create a blocked eval that is eligible on a specific node class and add
// it to the blocked tracker.
e := mock.Eval()
e.Status = structs.EvalStatusBlocked
e.EscapedComputedClass = true
e.SnapshotIndex = 900
blocked.Block(e)

// Verify block caused the eval to be immediately unblocked
blockedStats := blocked.Stats()
if blockedStats.TotalBlocked != 0 && blockedStats.TotalEscaped != 0 {
t.Fatalf("bad: %#v", blockedStats)
}

testutil.WaitForResult(func() (bool, error) {
// Verify Unblock caused an enqueue
brokerStats := broker.Stats()
if brokerStats.TotalReady != 1 {
return false, fmt.Errorf("bad: %#v", brokerStats)
}

return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
}

// Test the block case in which the eval should be immediately unblocked since
// it there is an unblock on an unseen class
func TestBlockedEvals_Block_ImmediateUnblock_UnseenClass(t *testing.T) {
blocked, broker := testBlockedEvals(t)

// Do an unblock prior to blocking
blocked.Unblock("v1:123", 1000)

// Create a blocked eval that is eligible on a specific node class and add
// it to the blocked tracker.
e := mock.Eval()
e.Status = structs.EvalStatusBlocked
e.EscapedComputedClass = false
e.SnapshotIndex = 900
blocked.Block(e)

// Verify block caused the eval to be immediately unblocked
blockedStats := blocked.Stats()
if blockedStats.TotalBlocked != 0 && blockedStats.TotalEscaped != 0 {
t.Fatalf("bad: %#v", blockedStats)
}

testutil.WaitForResult(func() (bool, error) {
// Verify Unblock caused an enqueue
brokerStats := broker.Stats()
if brokerStats.TotalReady != 1 {
return false, fmt.Errorf("bad: %#v", brokerStats)
}

return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
}

// Test the block case in which the eval should be immediately unblocked since
// it a class it is eligible for has been unblocked
func TestBlockedEvals_Block_ImmediateUnblock_SeenClass(t *testing.T) {
blocked, broker := testBlockedEvals(t)

// Do an unblock prior to blocking
blocked.Unblock("v1:123", 1000)

// Create a blocked eval that is eligible on a specific node class and add
// it to the blocked tracker.
e := mock.Eval()
e.Status = structs.EvalStatusBlocked
e.ClassEligibility = map[string]bool{"v1:123": true, "v1:456": false}
e.SnapshotIndex = 900
blocked.Block(e)

// Verify block caused the eval to be immediately unblocked
blockedStats := blocked.Stats()
if blockedStats.TotalBlocked != 0 && blockedStats.TotalEscaped != 0 {
t.Fatalf("bad: %#v", blockedStats)
}

testutil.WaitForResult(func() (bool, error) {
// Verify Unblock caused an enqueue
brokerStats := broker.Stats()
if brokerStats.TotalReady != 1 {
return false, fmt.Errorf("bad: %#v", brokerStats)
}

return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
}

func TestBlockedEvals_UnblockFailed(t *testing.T) {
blocked, broker := testBlockedEvals(t)

// Create blocked evals that are due to failures
e := mock.Eval()
e.Status = structs.EvalStatusBlocked
e.TriggeredBy = structs.EvalTriggerMaxPlans
e.EscapedComputedClass = true
blocked.Block(e)

e2 := mock.Eval()
e2.Status = structs.EvalStatusBlocked
e2.TriggeredBy = structs.EvalTriggerMaxPlans
e2.ClassEligibility = map[string]bool{"v1:123": true, "v1:456": false}
blocked.Block(e2)

// Trigger an unblock fail
blocked.UnblockFailed()

testutil.WaitForResult(func() (bool, error) {
// Verify Unblock caused an enqueue
brokerStats := broker.Stats()
if brokerStats.TotalReady != 2 {
return false, fmt.Errorf("bad: %#v", brokerStats)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
}
Loading