Skip to content

Commit

Permalink
Periodically unblock failed evaluations
Browse files Browse the repository at this point in the history
  • Loading branch information
dadgar committed May 25, 2016
1 parent 1fc3fec commit 8456f77
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 9 deletions.
27 changes: 27 additions & 0 deletions nomad/blocked_evals.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,33 @@ func (b *BlockedEvals) unblock(computedClass string, index uint64) {
}
}

// UnblockFailed hands every tracked blocked evaluation whose TriggeredBy is
// structs.EvalTriggerMaxPlans (i.e. blocked because of a scheduler failure)
// to the eval broker. Both the captured and the escaped sets are scanned
// while holding the tracker lock. It is a no-op when the tracker is disabled.
//
// NOTE(review): the matching evaluations are not removed from b.captured or
// b.escaped here, so a later call would enqueue the same evaluations again.
// Confirm whether that is intended.
func (b *BlockedEvals) UnblockFailed() {
	b.l.Lock()
	defer b.l.Unlock()

	// Nothing to do while the tracker is disabled
	if !b.enabled {
		return
	}

	var failed []*structs.Evaluation

	// keepIfFailed collects the evaluation when it was blocked by plan failure.
	keepIfFailed := func(e *structs.Evaluation) {
		if e.TriggeredBy == structs.EvalTriggerMaxPlans {
			failed = append(failed, e)
		}
	}

	for _, e := range b.captured {
		keepIfFailed(e)
	}
	for _, e := range b.escaped {
		keepIfFailed(e)
	}

	b.evalBroker.EnqueueAll(failed)
}

// GetDuplicates returns all the duplicate evaluations and blocks until the
// passed timeout.
func (b *BlockedEvals) GetDuplicates(timeout time.Duration) []*structs.Evaluation {
Expand Down
31 changes: 31 additions & 0 deletions nomad/blocked_evals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,3 +359,34 @@ func TestBlockedEvals_Block_ImmediateUnblock_SeenClass(t *testing.T) {
t.Fatalf("err: %s", err)
})
}

// TestBlockedEvals_UnblockFailed blocks two evaluations triggered by
// structs.EvalTriggerMaxPlans, one flagged as having escaped computed classes
// and one carrying class eligibility, then checks that UnblockFailed results
// in both being ready in the broker.
func TestBlockedEvals_UnblockFailed(t *testing.T) {
	blocked, broker := testBlockedEvals(t)

	// Failed eval that escaped computed node classes
	escapedEval := mock.Eval()
	escapedEval.TriggeredBy = structs.EvalTriggerMaxPlans
	escapedEval.Status = structs.EvalStatusBlocked
	escapedEval.EscapedComputedClass = true
	blocked.Block(escapedEval)

	// Failed eval that carries per-class eligibility
	classEval := mock.Eval()
	classEval.TriggeredBy = structs.EvalTriggerMaxPlans
	classEval.Status = structs.EvalStatusBlocked
	classEval.ClassEligibility = map[string]bool{"v1:123": true, "v1:456": false}
	blocked.Block(classEval)

	// Trigger an unblock of the failed evals
	blocked.UnblockFailed()

	// Both evals should end up ready in the broker
	bothReady := func() (bool, error) {
		stats := broker.Stats()
		if stats.TotalReady == 2 {
			return true, nil
		}
		return false, fmt.Errorf("bad: %#v", stats)
	}
	onFailure := func(err error) {
		t.Fatalf("err: %s", err)
	}
	testutil.WaitForResult(bothReady, onFailure)
}
18 changes: 18 additions & 0 deletions nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
// Reap any duplicate blocked evaluations
go s.reapDupBlockedEvaluations(stopCh)

// Periodically unblock failed evaluations
go s.periodicUnblockFailedEvals(stopCh)

// Setup the heartbeat timers. This is done both when starting up or when
// a leader fail over happens. Since the timers are maintained by the leader
// node, effectively this means all the timers are renewed at the time of failover.
Expand Down Expand Up @@ -341,6 +344,21 @@ func (s *Server) reapDupBlockedEvaluations(stopCh chan struct{}) {
}
}

// periodicUnblockFailedEvals periodically unblocks failed, blocked evaluations.
// It calls blockedEvals.UnblockFailed once per minute and blocks until stopCh
// is closed or delivers a value.
func (s *Server) periodicUnblockFailedEvals(stopCh chan struct{}) {
	// A Ticker is needed here: a Timer fires only once, which would leave
	// the loop waiting solely on stopCh after the first unblock.
	ticker := time.NewTicker(1 * time.Minute)
	defer ticker.Stop()

	for {
		select {
		case <-stopCh:
			return
		case <-ticker.C:
			// Unblock the failed evaluations
			s.blockedEvals.UnblockFailed()
		}
	}
}

// revokeLeadership is invoked once we step down as leader.
// This is used to cleanup any state that may be specific to a leader.
func (s *Server) revokeLeadership() error {
Expand Down
1 change: 1 addition & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2570,6 +2570,7 @@ const (
EvalTriggerNodeUpdate = "node-update"
EvalTriggerScheduled = "scheduled"
EvalTriggerRollingUpdate = "rolling-update"
EvalTriggerMaxPlans = "max-plan-attempts"
)

const (
Expand Down
18 changes: 9 additions & 9 deletions scheduler/generic_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,8 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error {
if statusErr, ok := err.(*SetStatusError); ok {
// Scheduling was tried but made no forward progress so create a
// blocked eval to retry once resources become available.

// TODO: Set the trigger by reason of the blocked eval here to
// something like "max-attempts"
// We can then periodically dequeue these from the blocked_eval
// tracker.
var mErr multierror.Error
if err := s.createBlockedEval(); err != nil {
if err := s.createBlockedEval(true); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
if err := setStatus(s.logger, s.planner, s.eval, s.nextEval, s.blocked, statusErr.EvalStatus, err.Error()); err != nil {
Expand All @@ -140,8 +135,9 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error {
return setStatus(s.logger, s.planner, s.eval, s.nextEval, s.blocked, structs.EvalStatusComplete, "")
}

// createBlockedEval creates a blocked eval and stores it.
func (s *GenericScheduler) createBlockedEval() error {
// createBlockedEval creates a blocked eval and submits it to the planner. If
// planFailure is true, the eval's TriggeredBy is set to
// structs.EvalTriggerMaxPlans.
func (s *GenericScheduler) createBlockedEval(planFailure bool) error {
e := s.ctx.Eligibility()
escaped := e.HasEscaped()

Expand All @@ -152,6 +148,10 @@ func (s *GenericScheduler) createBlockedEval() error {
}

s.blocked = s.eval.CreateBlockedEval(classEligibility, escaped)
if planFailure {
s.blocked.TriggeredBy = structs.EvalTriggerMaxPlans
}

return s.planner.CreateEval(s.blocked)
}

Expand Down Expand Up @@ -191,7 +191,7 @@ func (s *GenericScheduler) process() (bool, error) {
// to place the failed allocations when resources become available. If the
// current evaluation is already a blocked eval, we reuse it.
if s.eval.Status != structs.EvalStatusBlocked && len(s.eval.FailedTGAllocs) != 0 && s.blocked == nil {
if err := s.createBlockedEval(); err != nil {
if err := s.createBlockedEval(false); err != nil {
s.logger.Printf("[ERR] sched: %#v failed to make blocked eval: %v", s.eval, err)
return false, err
}
Expand Down

0 comments on commit 8456f77

Please sign in to comment.