diff --git a/nomad/blocked_evals.go b/nomad/blocked_evals.go
index ecb845c12f6a..06e7410ac6f7 100644
--- a/nomad/blocked_evals.go
+++ b/nomad/blocked_evals.go
@@ -35,15 +35,21 @@ type BlockedEvals struct {
 	escaped map[string]*structs.Evaluation
 
 	// unblockCh is used to buffer unblocking of evaluations.
-	capacityChangeCh chan string
+	capacityChangeCh chan *capacityUpdate
 
 	// jobs is the map of blocked job and is used to ensure that only one
 	// blocked eval exists for each job.
 	jobs map[string]struct{}
 
+	// unblockIndexes maps computed node classes to the index at which they
+	// were unblocked. This is used to check if an evaluation could have been
+	// unblocked between the time it was in the scheduler and the time it is
+	// being blocked.
+	unblockIndexes map[string]uint64
+
 	// duplicates is the set of evaluations for jobs that had pre-existing
 	// blocked evaluations. These should be marked as cancelled since only one
-	// blocked eval is neeeded bper job.
+	// blocked eval is needed per job.
 	duplicates []*structs.Evaluation
 
 	// duplicateCh is used to signal that a duplicate eval was added to the
@@ -55,6 +61,12 @@ type BlockedEvals struct {
 	stopCh chan struct{}
 }
 
+// capacityUpdate stores unblock data.
+type capacityUpdate struct {
+	computedClass string
+	index         uint64
+}
+
 // BlockedStats returns all the stats about the blocked eval tracker.
 type BlockedStats struct {
 	// TotalEscaped is the total number of blocked evaluations that have escaped
@@ -73,7 +85,8 @@ func NewBlockedEvals(evalBroker *EvalBroker) *BlockedEvals {
 		captured:         make(map[string]*structs.Evaluation),
 		escaped:          make(map[string]*structs.Evaluation),
 		jobs:             make(map[string]struct{}),
-		capacityChangeCh: make(chan string, unblockBuffer),
+		unblockIndexes:   make(map[string]uint64),
+		capacityChangeCh: make(chan *capacityUpdate, unblockBuffer),
 		duplicateCh:      make(chan struct{}, 1),
 		stopCh:           make(chan struct{}),
 		stats:            new(BlockedStats),
@@ -133,6 +146,16 @@ func (b *BlockedEvals) Block(eval *structs.Evaluation) {
 		return
 	}
 
+	// Check if the eval missed an unblock while it was in the scheduler at an
+	// older index. The scheduler could have been invoked with a snapshot of
+	// state taken before additional capacity was added or allocations became
+	// terminal.
+	if b.missedUnblock(eval) {
+		// Just re-enqueue the eval immediately
+		b.evalBroker.Enqueue(eval)
+		return
+	}
+
 	// Mark the job as tracked.
 	b.stats.TotalBlocked++
 	b.jobs[eval.JobID] = struct{}{}
@@ -152,16 +175,65 @@ func (b *BlockedEvals) Block(eval *structs.Evaluation) {
 	b.captured[eval.ID] = eval
 }
 
+// missedUnblock returns whether an evaluation missed an unblock while it was in
+// the scheduler. Since the scheduler can operate at an index in the past, the
+// evaluation may have been processed without data that would have allowed it
+// to complete. It returns true if that is the case and must be called with
+// the lock held.
+func (b *BlockedEvals) missedUnblock(eval *structs.Evaluation) bool {
+	var max uint64 = 0
+	for class, index := range b.unblockIndexes {
+		// Calculate the max unblock index
+		if max < index {
+			max = index
+		}
+
+		elig, ok := eval.ClassEligibility[class]
+		if !ok {
+			// The evaluation was processed and did not encounter this class.
+			// Thus for correctness we need to unblock it.
+			return true
+		}
+
+		// The evaluation could use the computed node class and the eval was
+		// processed before the last unblock.
+		if elig && eval.SnapshotIndex < index {
+			return true
+		}
+	}
+
+	// If the evaluation has escaped, and the map contains an index newer than
+	// the evaluation's snapshot, it should be unblocked.
+	if eval.EscapedComputedClass && eval.SnapshotIndex < max {
+		return true
+	}
+
+	// The evaluation is ahead of all recent unblocks.
+	return false
+}
+
 // Unblock causes any evaluation that could potentially make progress on a
 // capacity change on the passed computed node class to be enqueued into the
 // eval broker.
-func (b *BlockedEvals) Unblock(computedClass string) {
+func (b *BlockedEvals) Unblock(computedClass string, index uint64) {
+	b.l.Lock()
+
 	// Do nothing if not enabled
 	if !b.enabled {
+		b.l.Unlock()
 		return
 	}
 
-	b.capacityChangeCh <- computedClass
+	// Store the index at which the unblock happened. We use this on subsequent
+	// block calls in case the evaluation was in the scheduler when a trigger
+	// occurred.
+	b.unblockIndexes[computedClass] = index
+	b.l.Unlock()
+
+	b.capacityChangeCh <- &capacityUpdate{
+		computedClass: computedClass,
+		index:         index,
+	}
 }
 
 // watchCapacity is a long lived function that watches for capacity changes in
@@ -171,15 +243,15 @@ func (b *BlockedEvals) watchCapacity() {
 		select {
 		case <-b.stopCh:
 			return
-		case computedClass := <-b.capacityChangeCh:
-			b.unblock(computedClass)
+		case update := <-b.capacityChangeCh:
+			b.unblock(update.computedClass, update.index)
 		}
 	}
 }
 
 // unblock unblocks all blocked evals that could run on the passed computed node
 // class.
-func (b *BlockedEvals) unblock(computedClass string) {
+func (b *BlockedEvals) unblock(computedClass string, index uint64) {
 	b.l.Lock()
 	defer b.l.Unlock()
 
@@ -229,6 +301,35 @@ func (b *BlockedEvals) unblock(computedClass string) {
 	}
 }
 
+// UnblockFailed unblocks all blocked evaluations that were due to scheduler
+// failure.
+func (b *BlockedEvals) UnblockFailed() {
+	b.l.Lock()
+	defer b.l.Unlock()
+
+	// Do nothing if not enabled
+	if !b.enabled {
+		return
+	}
+
+	var unblock []*structs.Evaluation
+	for id, eval := range b.captured {
+		if eval.TriggeredBy == structs.EvalTriggerMaxPlans {
+			unblock = append(unblock, eval)
+			delete(b.captured, id)
+		}
+	}
+
+	for id, eval := range b.escaped {
+		if eval.TriggeredBy == structs.EvalTriggerMaxPlans {
+			unblock = append(unblock, eval)
+			delete(b.escaped, id)
+		}
+	}
+
+	b.evalBroker.EnqueueAll(unblock)
+}
+
 // GetDuplicates returns all the duplicate evaluations and blocks until the
 // passed timeout.
 func (b *BlockedEvals) GetDuplicates(timeout time.Duration) []*structs.Evaluation {
@@ -273,7 +374,7 @@ func (b *BlockedEvals) Flush() {
 	b.escaped = make(map[string]*structs.Evaluation)
 	b.jobs = make(map[string]struct{})
 	b.duplicates = nil
-	b.capacityChangeCh = make(chan string, unblockBuffer)
+	b.capacityChangeCh = make(chan *capacityUpdate, unblockBuffer)
 	b.stopCh = make(chan struct{})
 	b.duplicateCh = make(chan struct{}, 1)
 }
diff --git a/nomad/blocked_evals_test.go b/nomad/blocked_evals_test.go
index cf725a7a110a..9b963feffbae 100644
--- a/nomad/blocked_evals_test.go
+++ b/nomad/blocked_evals_test.go
@@ -53,6 +53,27 @@ func TestBlockedEvals_Block_SameJob(t *testing.T) {
 	}
 }
 
+func TestBlockedEvals_Block_PriorUnblocks(t *testing.T) {
+	blocked, _ := testBlockedEvals(t)
+
+	// Do unblocks prior to blocking
+	blocked.Unblock("v1:123", 1000)
+	blocked.Unblock("v1:123", 1001)
+
+	// Create a blocked eval and add it to the blocked tracker.
+	e := mock.Eval()
+	e.Status = structs.EvalStatusBlocked
+	e.ClassEligibility = map[string]bool{"v1:123": false, "v1:456": false}
+	e.SnapshotIndex = 999
+	blocked.Block(e)
+
+	// Verify block did track the eval
+	bStats := blocked.Stats()
+	if bStats.TotalBlocked != 1 || bStats.TotalEscaped != 0 {
+		t.Fatalf("bad: %#v", bStats)
+	}
+}
+
 func TestBlockedEvals_GetDuplicates(t *testing.T) {
 	blocked, _ := testBlockedEvals(t)
 
@@ -105,7 +126,7 @@ func TestBlockedEvals_UnblockEscaped(t *testing.T) {
 		t.Fatalf("bad: %#v", bStats)
 	}
 
-	blocked.Unblock("v1:123")
+	blocked.Unblock("v1:123", 1000)
 
 	testutil.WaitForResult(func() (bool, error) {
 		// Verify Unblock caused an enqueue
@@ -141,7 +162,7 @@ func TestBlockedEvals_UnblockEligible(t *testing.T) {
 		t.Fatalf("bad: %#v", blockedStats)
 	}
 
-	blocked.Unblock("v1:123")
+	blocked.Unblock("v1:123", 1000)
 
 	testutil.WaitForResult(func() (bool, error) {
 		// Verify Unblock caused an enqueue
@@ -178,7 +199,7 @@ func TestBlockedEvals_UnblockIneligible(t *testing.T) {
 	}
 
 	// Should do nothing
-	blocked.Unblock("v1:123")
+	blocked.Unblock("v1:123", 1000)
 
 	testutil.WaitForResult(func() (bool, error) {
 		// Verify Unblock didn't cause an enqueue
@@ -214,7 +235,7 @@ func TestBlockedEvals_UnblockUnknown(t *testing.T) {
 	}
 
 	// Should unblock because the eval hasn't seen this node class.
-	blocked.Unblock("v1:789")
+	blocked.Unblock("v1:789", 1000)
 
 	testutil.WaitForResult(func() (bool, error) {
 		// Verify Unblock causes an enqueue
@@ -233,3 +254,139 @@ func TestBlockedEvals_UnblockUnknown(t *testing.T) {
 		t.Fatalf("err: %s", err)
 	})
 }
+
+// Test the block case in which the eval should be immediately unblocked since
+// it is escaped and old
+func TestBlockedEvals_Block_ImmediateUnblock_Escaped(t *testing.T) {
+	blocked, broker := testBlockedEvals(t)
+
+	// Do an unblock prior to blocking
+	blocked.Unblock("v1:123", 1000)
+
+	// Create an escaped blocked eval with an old snapshot index and add it to
+	// the blocked tracker.
+	e := mock.Eval()
+	e.Status = structs.EvalStatusBlocked
+	e.EscapedComputedClass = true
+	e.SnapshotIndex = 900
+	blocked.Block(e)
+
+	// Verify block caused the eval to be immediately unblocked
+	blockedStats := blocked.Stats()
+	if blockedStats.TotalBlocked != 0 || blockedStats.TotalEscaped != 0 {
+		t.Fatalf("bad: %#v", blockedStats)
+	}
+
+	testutil.WaitForResult(func() (bool, error) {
+		// Verify Unblock caused an enqueue
+		brokerStats := broker.Stats()
+		if brokerStats.TotalReady != 1 {
+			return false, fmt.Errorf("bad: %#v", brokerStats)
+		}
+
+		return true, nil
+	}, func(err error) {
+		t.Fatalf("err: %s", err)
+	})
+}
+
+// Test the block case in which the eval should be immediately unblocked since
+// there is an unblock on an unseen class
+func TestBlockedEvals_Block_ImmediateUnblock_UnseenClass(t *testing.T) {
+	blocked, broker := testBlockedEvals(t)
+
+	// Do an unblock prior to blocking
+	blocked.Unblock("v1:123", 1000)
+
+	// Create a blocked eval that has not seen the unblocked node class and add
+	// it to the blocked tracker.
+	e := mock.Eval()
+	e.Status = structs.EvalStatusBlocked
+	e.EscapedComputedClass = false
+	e.SnapshotIndex = 900
+	blocked.Block(e)
+
+	// Verify block caused the eval to be immediately unblocked
+	blockedStats := blocked.Stats()
+	if blockedStats.TotalBlocked != 0 || blockedStats.TotalEscaped != 0 {
+		t.Fatalf("bad: %#v", blockedStats)
+	}
+
+	testutil.WaitForResult(func() (bool, error) {
+		// Verify Unblock caused an enqueue
+		brokerStats := broker.Stats()
+		if brokerStats.TotalReady != 1 {
+			return false, fmt.Errorf("bad: %#v", brokerStats)
+		}
+
+		return true, nil
+	}, func(err error) {
+		t.Fatalf("err: %s", err)
+	})
+}
+
+// Test the block case in which the eval should be immediately unblocked since
+// a class it is eligible for has been unblocked
+func TestBlockedEvals_Block_ImmediateUnblock_SeenClass(t *testing.T) {
+	blocked, broker := testBlockedEvals(t)
+
+	// Do an unblock prior to blocking
+	blocked.Unblock("v1:123", 1000)
+
+	// Create a blocked eval that is eligible on a specific node class and add
+	// it to the blocked tracker.
+	e := mock.Eval()
+	e.Status = structs.EvalStatusBlocked
+	e.ClassEligibility = map[string]bool{"v1:123": true, "v1:456": false}
+	e.SnapshotIndex = 900
+	blocked.Block(e)
+
+	// Verify block caused the eval to be immediately unblocked
+	blockedStats := blocked.Stats()
+	if blockedStats.TotalBlocked != 0 || blockedStats.TotalEscaped != 0 {
+		t.Fatalf("bad: %#v", blockedStats)
+	}
+
+	testutil.WaitForResult(func() (bool, error) {
+		// Verify Unblock caused an enqueue
+		brokerStats := broker.Stats()
+		if brokerStats.TotalReady != 1 {
+			return false, fmt.Errorf("bad: %#v", brokerStats)
+		}
+
+		return true, nil
+	}, func(err error) {
+		t.Fatalf("err: %s", err)
+	})
+}
+
+func TestBlockedEvals_UnblockFailed(t *testing.T) {
+	blocked, broker := testBlockedEvals(t)
+
+	// Create blocked evals that are due to failures
+	e := mock.Eval()
+	e.Status = structs.EvalStatusBlocked
+	e.TriggeredBy = structs.EvalTriggerMaxPlans
+	e.EscapedComputedClass = true
+	blocked.Block(e)
+
+	e2 := mock.Eval()
+	e2.Status = structs.EvalStatusBlocked
+	e2.TriggeredBy = structs.EvalTriggerMaxPlans
+	e2.ClassEligibility = map[string]bool{"v1:123": true, "v1:456": false}
+	blocked.Block(e2)
+
+	// Trigger an unblock of the failed evals
+	blocked.UnblockFailed()
+
+	testutil.WaitForResult(func() (bool, error) {
+		// Verify Unblock caused an enqueue
+		brokerStats := broker.Stats()
+		if brokerStats.TotalReady != 2 {
+			return false, fmt.Errorf("bad: %#v", brokerStats)
+		}
+		return true, nil
+	}, func(err error) {
+		t.Fatalf("err: %s", err)
+	})
+}
diff --git a/nomad/eval_endpoint.go b/nomad/eval_endpoint.go
index 71211169779e..25895096e0ff 100644
--- a/nomad/eval_endpoint.go
+++ b/nomad/eval_endpoint.go
@@ -202,6 +202,46 @@ func (e *Eval) Create(args *structs.EvalUpdateRequest,
 	return nil
 }
 
+// Reblock is used to reinsert an existing blocked evaluation into the blocked
+// evaluation tracker.
+func (e *Eval) Reblock(args *structs.EvalUpdateRequest, reply *structs.GenericResponse) error {
+	if done, err := e.srv.forward("Eval.Reblock", args, args, reply); done {
+		return err
+	}
+	defer metrics.MeasureSince([]string{"nomad", "eval", "reblock"}, time.Now())
+
+	// Ensure there is only a single update with token
+	if len(args.Evals) != 1 {
+		return fmt.Errorf("only a single eval can be reblocked")
+	}
+	eval := args.Evals[0]
+
+	// Verify the evaluation is outstanding, and that the tokens match.
+	if err := e.srv.evalBroker.OutstandingReset(eval.ID, args.EvalToken); err != nil {
+		return err
+	}
+
+	// Look for the eval
+	snap, err := e.srv.fsm.State().Snapshot()
+	if err != nil {
+		return err
+	}
+	out, err := snap.EvalByID(eval.ID)
+	if err != nil {
+		return err
+	}
+	if out == nil {
+		return fmt.Errorf("evaluation does not exist")
+	}
+	if out.Status != structs.EvalStatusBlocked {
+		return fmt.Errorf("evaluation not blocked")
+	}
+
+	// Reblock the eval
+	e.srv.blockedEvals.Block(eval)
+	return nil
+}
+
 // Reap is used to cleanup dead evaluations and allocations
 func (e *Eval) Reap(args *structs.EvalDeleteRequest,
 	reply *structs.GenericResponse) error {
diff --git a/nomad/eval_endpoint_test.go b/nomad/eval_endpoint_test.go
index f037c8bd3a0b..3fa270ce886c 100644
--- a/nomad/eval_endpoint_test.go
+++ b/nomad/eval_endpoint_test.go
@@ -575,3 +575,121 @@ func TestEvalEndpoint_Allocations_Blocking(t *testing.T) {
 		t.Fatalf("bad: %#v", resp.Allocations)
 	}
 }
+
+func TestEvalEndpoint_Reblock_NonExistent(t *testing.T) {
+	s1 := testServer(t, nil)
+	defer s1.Shutdown()
+	codec := rpcClient(t, s1)
+
+	testutil.WaitForResult(func() (bool, error) {
+		return s1.evalBroker.Enabled(), nil
+	}, func(err error) {
+		t.Fatalf("should enable eval broker")
+	})
+
+	// Create the register request
+	eval1 := mock.Eval()
+	s1.evalBroker.Enqueue(eval1)
+	out, token, err := s1.evalBroker.Dequeue(defaultSched, time.Second)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if out == nil {
+		t.Fatalf("missing eval")
+	}
+
+	get := &structs.EvalUpdateRequest{
+		Evals:        []*structs.Evaluation{eval1},
+		EvalToken:    token,
+		WriteRequest: structs.WriteRequest{Region: "global"},
+	}
+	var resp structs.GenericResponse
+	if err := msgpackrpc.CallWithCodec(codec, "Eval.Reblock", get, &resp); err == nil {
+		t.Fatalf("expect error since eval does not exist")
+	}
+}
+
+func TestEvalEndpoint_Reblock_NonBlocked(t *testing.T) {
+	s1 := testServer(t, nil)
+	defer s1.Shutdown()
+	codec := rpcClient(t, s1)
+
+	testutil.WaitForResult(func() (bool, error) {
+		return s1.evalBroker.Enabled(), nil
+	}, func(err error) {
+		t.Fatalf("should enable eval broker")
+	})
+
+	// Create the eval
+	eval1 := mock.Eval()
+	s1.evalBroker.Enqueue(eval1)
+
+	// Insert it into the state store
+	if err := s1.fsm.State().UpsertEvals(1000, []*structs.Evaluation{eval1}); err != nil {
+		t.Fatal(err)
+	}
+
+	out, token, err := s1.evalBroker.Dequeue(defaultSched, time.Second)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if out == nil {
+		t.Fatalf("missing eval")
+	}
+
+	get := &structs.EvalUpdateRequest{
+		Evals:        []*structs.Evaluation{eval1},
+		EvalToken:    token,
+		WriteRequest: structs.WriteRequest{Region: "global"},
+	}
+	var resp structs.GenericResponse
+	if err := msgpackrpc.CallWithCodec(codec, "Eval.Reblock", get, &resp); err == nil {
+		t.Fatalf("should error since eval was not in blocked state")
+	}
+}
+
+func TestEvalEndpoint_Reblock(t *testing.T) {
+	s1 := testServer(t, nil)
+	defer s1.Shutdown()
+	codec := rpcClient(t, s1)
+
+	testutil.WaitForResult(func() (bool, error) {
+		return s1.evalBroker.Enabled(), nil
+	}, func(err error) {
+		t.Fatalf("should enable eval broker")
+	})
+
+	// Create the eval
+	eval1 := mock.Eval()
+	eval1.Status = structs.EvalStatusBlocked
+	s1.evalBroker.Enqueue(eval1)
+
+	// Insert it into the state store
+	if err := s1.fsm.State().UpsertEvals(1000, []*structs.Evaluation{eval1}); err != nil {
+		t.Fatal(err)
+	}
+
+	out, token, err := s1.evalBroker.Dequeue(defaultSched, time.Second)
+	if err != nil {
t.Fatalf("err: %v", err) + } + if out == nil { + t.Fatalf("missing eval") + } + + get := &structs.EvalUpdateRequest{ + Evals: []*structs.Evaluation{eval1}, + EvalToken: token, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.GenericResponse + if err := msgpackrpc.CallWithCodec(codec, "Eval.Reblock", get, &resp); err != nil { + t.Fatalf("err: %v", err) + } + + // Check that it is blocked + bStats := s1.blockedEvals.Stats() + if bStats.TotalBlocked+bStats.TotalEscaped == 0 { + t.Fatalf("ReblockEval didn't insert eval into the blocked eval tracker") + } +} diff --git a/nomad/fsm.go b/nomad/fsm.go index 6237e976004a..a220138f8279 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -158,7 +158,7 @@ func (n *nomadFSM) applyUpsertNode(buf []byte, index uint64) interface{} { // Unblock evals for the nodes computed node class if it is in a ready // state. if req.Node.Status == structs.NodeStatusReady { - n.blockedEvals.Unblock(req.Node.ComputedClass) + n.blockedEvals.Unblock(req.Node.ComputedClass, index) } return nil @@ -199,7 +199,7 @@ func (n *nomadFSM) applyStatusUpdate(buf []byte, index uint64) interface{} { return err } - n.blockedEvals.Unblock(node.ComputedClass) + n.blockedEvals.Unblock(node.ComputedClass, index) } return nil @@ -420,7 +420,7 @@ func (n *nomadFSM) applyAllocClientUpdate(buf []byte, index uint64) interface{} return err } - n.blockedEvals.Unblock(node.ComputedClass) + n.blockedEvals.Unblock(node.ComputedClass, index) } } diff --git a/nomad/leader.go b/nomad/leader.go index 04f5f1432d56..f1e10b71d400 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -11,6 +11,13 @@ import ( "github.com/hashicorp/serf/serf" ) +const ( + // failedEvalUnblockInterval is the interval at which failed evaluations are + // unblocked to re-enter the scheduler. A failed evaluation occurs under + // high contention when the schedulers plan does not make progress. + failedEvalUnblockInterval = 1 * time.Minute +) + // monitorLeadership is used to monitor if we acquire or lose our role // as the leader in the Raft cluster. There is some work the leader is // expected to do, so we must react to changes @@ -143,6 +150,9 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { // Reap any duplicate blocked evaluations go s.reapDupBlockedEvaluations(stopCh) + // Periodically unblock failed allocations + go s.periodicUnblockFailedEvals(stopCh) + // Setup the heartbeat timers. This is done both when starting up or when // a leader fail over happens. Since the timers are maintained by the leader // node, effectively this means all the timers are renewed at the time of failover. @@ -341,6 +351,21 @@ func (s *Server) reapDupBlockedEvaluations(stopCh chan struct{}) { } } +// periodicUnblockFailedEvals periodically unblocks failed, blocked evaluations. +func (s *Server) periodicUnblockFailedEvals(stopCh chan struct{}) { + ticker := time.NewTimer(failedEvalUnblockInterval) + defer ticker.Stop() + for { + select { + case <-stopCh: + return + case <-ticker.C: + // Unblock the failed allocations + s.blockedEvals.UnblockFailed() + } + } +} + // revokeLeadership is invoked once we step down as leader. // This is used to cleanup any state that may be specific to a leader. 
 func (s *Server) revokeLeadership() error {
diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go
index 42365225fc1c..63fae8e5a100 100644
--- a/nomad/state/state_store.go
+++ b/nomad/state/state_store.go
@@ -972,6 +972,32 @@ func (s *StateStore) Allocs() (memdb.ResultIterator, error) {
 	return iter, nil
 }
 
+// LatestIndex returns the greatest index value for all indexes
+func (s *StateStore) LatestIndex() (uint64, error) {
+	indexes, err := s.Indexes()
+	if err != nil {
+		return 0, err
+	}
+
+	var max uint64 = 0
+	for {
+		raw := indexes.Next()
+		if raw == nil {
+			break
+		}
+
+		// Cast to an index entry
+		idx := raw.(*IndexEntry)
+
+		// Determine the max
+		if idx.Value > max {
+			max = idx.Value
+		}
+	}
+
+	return max, nil
+}
+
 // Index finds the matching index value
 func (s *StateStore) Index(name string) (uint64, error) {
 	txn := s.db.Txn(false)
diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go
index 22166ef1cecc..dcdd135719cc 100644
--- a/nomad/state/state_store_test.go
+++ b/nomad/state/state_store_test.go
@@ -1019,6 +1019,28 @@ func TestStateStore_Indexes(t *testing.T) {
 	}
 }
 
+func TestStateStore_LatestIndex(t *testing.T) {
+	state := testStateStore(t)
+
+	if err := state.UpsertNode(1000, mock.Node()); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	exp := uint64(2000)
+	if err := state.UpsertJob(exp, mock.Job()); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	latest, err := state.LatestIndex()
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	if latest != exp {
+		t.Fatalf("LatestIndex() returned %d; want %d", latest, exp)
+	}
+}
+
 func TestStateStore_RestoreIndex(t *testing.T) {
 	state := testStateStore(t)
 
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index bf885229bab1..64039cf511fe 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -2570,6 +2570,7 @@ const (
 	EvalTriggerNodeUpdate    = "node-update"
 	EvalTriggerScheduled     = "scheduled"
 	EvalTriggerRollingUpdate = "rolling-update"
+	EvalTriggerMaxPlans      = "max-plan-attempts"
 )
 
 const (
@@ -2671,6 +2672,11 @@ type Evaluation struct {
 	// during the evaluation. This should not be set during normal operations.
 	AnnotatePlan bool
 
+	// SnapshotIndex is the Raft index of the snapshot used to process the
+	// evaluation. As such it will only be set once it has gone through the
+	// scheduler.
+	SnapshotIndex uint64
+
 	// Raft Indexes
 	CreateIndex uint64
 	ModifyIndex uint64
diff --git a/nomad/worker.go b/nomad/worker.go
index 1bae409d59c0..9326f112e442 100644
--- a/nomad/worker.go
+++ b/nomad/worker.go
@@ -59,6 +59,11 @@ type Worker struct {
 	failures uint
 
 	evalToken string
+
+	// snapshotIndex is the index of the snapshot in which the scheduler was
+	// first invoked. It is used to mark the SnapshotIndex of evaluations
+	// Created, Updated or Reblocked.
+	snapshotIndex uint64
 }
 
 // NewWorker starts a new worker associated with the given server
@@ -241,6 +246,12 @@ func (w *Worker) invokeScheduler(eval *structs.Evaluation, token string) error {
 		return fmt.Errorf("failed to snapshot state: %v", err)
 	}
 
+	// Store the snapshot's index
+	w.snapshotIndex, err = snap.LatestIndex()
+	if err != nil {
+		return fmt.Errorf("failed to determine snapshot's index: %v", err)
+	}
+
 	// Create the scheduler, or use the special system scheduler
 	var sched scheduler.Scheduler
 	if eval.Type == structs.JobTypeCore {
@@ -334,6 +345,9 @@ func (w *Worker) UpdateEval(eval *structs.Evaluation) error {
 	}
 	defer metrics.MeasureSince([]string{"nomad", "worker", "update_eval"}, time.Now())
 
+	// Store the snapshot index in the eval
+	eval.SnapshotIndex = w.snapshotIndex
+
 	// Setup the request
 	req := structs.EvalUpdateRequest{
 		Evals: []*structs.Evaluation{eval},
@@ -369,6 +383,9 @@ func (w *Worker) CreateEval(eval *structs.Evaluation) error {
 	}
 	defer metrics.MeasureSince([]string{"nomad", "worker", "create_eval"}, time.Now())
 
+	// Store the snapshot index in the eval
+	eval.SnapshotIndex = w.snapshotIndex
+
 	// Setup the request
 	req := structs.EvalUpdateRequest{
 		Evals: []*structs.Evaluation{eval},
@@ -395,6 +412,44 @@ SUBMIT:
 	return nil
 }
 
+// ReblockEval is used to reinsert a blocked evaluation into the blocked eval
+// tracker. This allows the worker to act as the planner for the scheduler.
+func (w *Worker) ReblockEval(eval *structs.Evaluation) error {
+	// Check for a shutdown before plan submission
+	if w.srv.IsShutdown() {
+		return fmt.Errorf("shutdown while planning")
+	}
+	defer metrics.MeasureSince([]string{"nomad", "worker", "reblock_eval"}, time.Now())
+
+	// Store the snapshot index in the eval
+	eval.SnapshotIndex = w.snapshotIndex
+
+	// Setup the request
+	req := structs.EvalUpdateRequest{
+		Evals:     []*structs.Evaluation{eval},
+		EvalToken: w.evalToken,
+		WriteRequest: structs.WriteRequest{
+			Region: w.srv.config.Region,
+		},
+	}
+	var resp structs.GenericResponse
+
+SUBMIT:
+	// Make the RPC call
+	if err := w.srv.RPC("Eval.Reblock", &req, &resp); err != nil {
+		w.logger.Printf("[ERR] worker: failed to reblock evaluation %#v: %v",
+			eval, err)
+		if w.shouldResubmit(err) && !w.backoffErr(backoffBaselineSlow, backoffLimitSlow) {
+			goto SUBMIT
+		}
+		return err
+	} else {
+		w.logger.Printf("[DEBUG] worker: reblocked evaluation %#v", eval)
+		w.backoffReset()
+	}
+	return nil
+}
+
 // shouldResubmit checks if a given error should be swallowed and the plan
 // resubmitted after a backoff. Usually these are transient errors that
 // the cluster should heal from quickly.
diff --git a/nomad/worker_test.go b/nomad/worker_test.go
index c1663592ae82..52b557bba6e2 100644
--- a/nomad/worker_test.go
+++ b/nomad/worker_test.go
@@ -398,6 +398,9 @@ func TestWorker_UpdateEval(t *testing.T) {
 	if out.Status != structs.EvalStatusComplete {
 		t.Fatalf("bad: %v", out)
 	}
+	if out.SnapshotIndex != w.snapshotIndex {
+		t.Fatalf("bad: %v", out)
+	}
 }
 
 func TestWorker_CreateEval(t *testing.T) {
@@ -441,4 +444,74 @@ func TestWorker_CreateEval(t *testing.T) {
 	if out.PreviousEval != eval1.ID {
 		t.Fatalf("bad: %v", out)
 	}
+	if out.SnapshotIndex != w.snapshotIndex {
+		t.Fatalf("bad: %v", out)
+	}
+}
+
+func TestWorker_ReblockEval(t *testing.T) {
+	s1 := testServer(t, func(c *Config) {
+		c.NumSchedulers = 0
+		c.EnabledSchedulers = []string{structs.JobTypeService}
+	})
+	defer s1.Shutdown()
+	testutil.WaitForLeader(t, s1.RPC)
+
+	// Create the blocked eval
+	eval1 := mock.Eval()
+	eval1.Status = structs.EvalStatusBlocked
+
+	// Insert it into the state store
+	if err := s1.fsm.State().UpsertEvals(1000, []*structs.Evaluation{eval1}); err != nil {
+		t.Fatal(err)
+	}
+
+	// Enqueue the eval and then dequeue
+	s1.evalBroker.Enqueue(eval1)
+	evalOut, token, err := s1.evalBroker.Dequeue([]string{eval1.Type}, time.Second)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if evalOut != eval1 {
+		t.Fatalf("Bad eval")
+	}
+
+	eval2 := evalOut.Copy()
+
+	// Attempt to reblock eval
+	w := &Worker{srv: s1, logger: s1.logger, evalToken: token}
+	err = w.ReblockEval(eval2)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Ack the eval
+	w.sendAck(evalOut.ID, token, true)
+
+	// Check that it is blocked
+	bStats := s1.blockedEvals.Stats()
+	if bStats.TotalBlocked+bStats.TotalEscaped != 1 {
+		t.Fatalf("ReblockEval didn't insert eval into the blocked eval tracker: %#v", bStats)
+	}
+
+	// Check that the snapshot index was set properly by unblocking the eval and
+	// then dequeuing.
+	s1.blockedEvals.Unblock("foobar", 1000)
+
+	reblockedEval, _, err := s1.evalBroker.Dequeue([]string{eval1.Type}, 1*time.Second)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if reblockedEval == nil {
+		t.Fatalf("Nil eval")
+	}
+	if reblockedEval.ID != eval1.ID {
+		t.Fatalf("Bad eval")
+	}
+
+	// Check that the SnapshotIndex is set
+	if reblockedEval.SnapshotIndex != w.snapshotIndex {
+		t.Fatalf("incorrect snapshot index; got %d; want %d",
+			reblockedEval.SnapshotIndex, w.snapshotIndex)
+	}
+}
diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go
index 4f3bce9b6d46..9db02a8137be 100644
--- a/scheduler/generic_sched.go
+++ b/scheduler/generic_sched.go
@@ -114,7 +114,7 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error {
 			// Scheduling was tried but made no forward progress so create a
 			// blocked eval to retry once resources become available.
 			var mErr multierror.Error
-			if err := s.createBlockedEval(); err != nil {
+			if err := s.createBlockedEval(true); err != nil {
 				mErr.Errors = append(mErr.Errors, err)
 			}
 			if err := setStatus(s.logger, s.planner, s.eval, s.nextEval, s.blocked, statusErr.EvalStatus, err.Error()); err != nil {
@@ -125,12 +125,19 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error {
 		return err
 	}
 
+	// If the current evaluation is a blocked evaluation and we didn't place
+	// everything, do not update the status to complete.
+	if s.eval.Status == structs.EvalStatusBlocked && len(s.eval.FailedTGAllocs) != 0 {
+		return s.planner.ReblockEval(s.eval)
+	}
+
 	// Update the status to complete
 	return setStatus(s.logger, s.planner, s.eval, s.nextEval, s.blocked, structs.EvalStatusComplete, "")
 }
 
-// createBlockedEval creates a blocked eval and stores it.
-func (s *GenericScheduler) createBlockedEval() error {
+// createBlockedEval creates a blocked eval and submits it to the planner. If
+// failure is set to true, the eval's trigger reason reflects that.
+func (s *GenericScheduler) createBlockedEval(planFailure bool) error {
 	e := s.ctx.Eligibility()
 	escaped := e.HasEscaped()
 
@@ -141,6 +148,10 @@ func (s *GenericScheduler) createBlockedEval() error {
 	}
 
 	s.blocked = s.eval.CreateBlockedEval(classEligibility, escaped)
+	if planFailure {
+		s.blocked.TriggeredBy = structs.EvalTriggerMaxPlans
+	}
+
 	return s.planner.CreateEval(s.blocked)
 }
 
@@ -177,9 +188,10 @@ func (s *GenericScheduler) process() (bool, error) {
 	}
 
 	// If there are failed allocations, we need to create a blocked evaluation
-	// to place the failed allocations when resources become available.
-	if len(s.eval.FailedTGAllocs) != 0 && s.blocked == nil {
-		if err := s.createBlockedEval(); err != nil {
+	// to place the failed allocations when resources become available. If the
+	// current evaluation is already a blocked eval, we reuse it.
+	if s.eval.Status != structs.EvalStatusBlocked && len(s.eval.FailedTGAllocs) != 0 && s.blocked == nil {
+		if err := s.createBlockedEval(false); err != nil {
 			s.logger.Printf("[ERR] sched: %#v failed to make blocked eval: %v", s.eval, err)
 			return false, err
 		}
diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go
index 52a46ff905cf..4b9f9f4024c2 100644
--- a/scheduler/generic_sched_test.go
+++ b/scheduler/generic_sched_test.go
@@ -275,7 +275,7 @@ func TestServiceSched_JobRegister_AllocFail(t *testing.T) {
 	h.AssertEvalStatus(t, structs.EvalStatusComplete)
 }
 
-func TestServiceSched_JobRegister_BlockedEval(t *testing.T) {
+func TestServiceSched_JobRegister_CreateBlockedEval(t *testing.T) {
 	h := NewHarness(t)
 
 	// Create a full node
@@ -454,6 +454,126 @@ func TestServiceSched_JobRegister_FeasibleAndInfeasibleTG(t *testing.T) {
 	h.AssertEvalStatus(t, structs.EvalStatusComplete)
 }
 
+func TestServiceSched_EvaluateBlockedEval(t *testing.T) {
+	h := NewHarness(t)
+
+	// Create a job
+	job := mock.Job()
+	noErr(t, h.State.UpsertJob(h.NextIndex(), job))
+
+	// Create a mock blocked evaluation
+	eval := &structs.Evaluation{
+		ID:          structs.GenerateUUID(),
+		Status:      structs.EvalStatusBlocked,
+		Priority:    job.Priority,
+		TriggeredBy: structs.EvalTriggerJobRegister,
+		JobID:       job.ID,
+	}
+
+	// Insert it into the state store
+	noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval}))
+
+	// Process the evaluation
+	err := h.Process(NewServiceScheduler, eval)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Ensure there was no plan
+	if len(h.Plans) != 0 {
+		t.Fatalf("bad: %#v", h.Plans)
+	}
+
+	// Ensure that the eval was reblocked
+	if len(h.ReblockEvals) != 1 {
+		t.Fatalf("bad: %#v", h.ReblockEvals)
+	}
+	if h.ReblockEvals[0].ID != eval.ID {
+		t.Fatalf("expect same eval to be reblocked; got %q; want %q", h.ReblockEvals[0].ID, eval.ID)
+	}
+
+	// Ensure the eval status was not updated
+	if len(h.Evals) != 0 {
+		t.Fatalf("Existing eval should not have status set")
+	}
+}
+
+func TestServiceSched_EvaluateBlockedEval_Finished(t *testing.T) {
+	h := NewHarness(t)
+
+	// Create some nodes
+	for i := 0; i < 10; i++ {
+		node := mock.Node()
+		noErr(t, h.State.UpsertNode(h.NextIndex(), node))
+	}
+
+	// Create a job
+	job := mock.Job()
+	noErr(t, h.State.UpsertJob(h.NextIndex(), job))
+
+	// Create a mock blocked evaluation
+	eval := &structs.Evaluation{
+		ID:          structs.GenerateUUID(),
+		Status:      structs.EvalStatusBlocked,
+		Priority:    job.Priority,
+		TriggeredBy: structs.EvalTriggerJobRegister,
+		JobID:       job.ID,
+	}
+
+	// Insert it into the state store
+	noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval}))
+
+	// Process the evaluation
+	err := h.Process(NewServiceScheduler, eval)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Ensure a single plan
+	if len(h.Plans) != 1 {
+		t.Fatalf("bad: %#v", h.Plans)
+	}
+	plan := h.Plans[0]
+
+	// Ensure the plan doesn't have annotations.
+	if plan.Annotations != nil {
+		t.Fatalf("expected no annotations")
+	}
+
+	// Ensure the eval has no spawned blocked eval
+	if len(h.Evals) != 1 {
+		t.Fatalf("bad: %#v", h.Evals)
+	}
+	if h.Evals[0].BlockedEval != "" {
+		t.Fatalf("bad: %#v", h.Evals[0])
+	}
+
+	// Ensure the plan allocated
+	var planned []*structs.Allocation
+	for _, allocList := range plan.NodeAllocation {
+		planned = append(planned, allocList...)
+	}
+	if len(planned) != 10 {
+		t.Fatalf("bad: %#v", plan)
+	}
+
+	// Lookup the allocations by JobID
+	out, err := h.State.AllocsByJob(job.ID)
+	noErr(t, err)
+
+	// Ensure all allocations placed
+	if len(out) != 10 {
+		t.Fatalf("bad: %#v", out)
+	}
+
+	// Ensure the eval was not reblocked
+	if len(h.ReblockEvals) != 0 {
+		t.Fatalf("Existing eval should not have been reblocked as it placed all allocations")
+	}
+
+	h.AssertEvalStatus(t, structs.EvalStatusComplete)
+}
+
 func TestServiceSched_JobModify(t *testing.T) {
 	h := NewHarness(t)
 
diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go
index 518280d07112..d96c6aed7250 100644
--- a/scheduler/scheduler.go
+++ b/scheduler/scheduler.go
@@ -87,4 +87,10 @@ type Planner interface {
 	// CreateEval is used to create an evaluation. This should set the
 	// PreviousEval to that of the current evaluation.
 	CreateEval(*structs.Evaluation) error
+
+	// ReblockEval takes a blocked evaluation and re-inserts it into the blocked
+	// evaluation tracker. This update occurs only in-memory on the leader. The
+	// evaluation must exist in a blocked state prior to this being called such
+	// that on leader changes, the evaluation will be reblocked properly.
+	ReblockEval(*structs.Evaluation) error
 }
diff --git a/scheduler/testing.go b/scheduler/testing.go
index b7a4d7da1d1d..19cc0221fe1c 100644
--- a/scheduler/testing.go
+++ b/scheduler/testing.go
@@ -1,6 +1,7 @@
 package scheduler
 
 import (
+	"fmt"
 	"log"
 	"os"
 	"sync"
@@ -29,6 +30,10 @@ func (r *RejectPlan) CreateEval(*structs.Evaluation) error {
 	return nil
 }
 
+func (r *RejectPlan) ReblockEval(*structs.Evaluation) error {
+	return nil
+}
+
 // Harness is a lightweight testing harness for schedulers. It manages a state
 // store copy and provides the planner interface. It can be extended for various
 // testing uses or for invoking the scheduler without side effects.
@@ -38,9 +43,10 @@ type Harness struct {
 	Planner  Planner
 	planLock sync.Mutex
 
-	Plans       []*structs.Plan
-	Evals       []*structs.Evaluation
-	CreateEvals []*structs.Evaluation
+	Plans        []*structs.Plan
+	Evals        []*structs.Evaluation
+	CreateEvals  []*structs.Evaluation
+	ReblockEvals []*structs.Evaluation
 
 	nextIndex     uint64
 	nextIndexLock sync.Mutex
@@ -138,6 +144,28 @@ func (h *Harness) CreateEval(eval *structs.Evaluation) error {
 	return nil
 }
 
+func (h *Harness) ReblockEval(eval *structs.Evaluation) error {
+	// Ensure sequential plan application
+	h.planLock.Lock()
+	defer h.planLock.Unlock()
+
+	// Check that the evaluation was already blocked.
+	old, err := h.State.EvalByID(eval.ID)
+	if err != nil {
+		return err
+	}
+
+	if old == nil {
+		return fmt.Errorf("evaluation does not exist to be reblocked")
+	}
+	if old.Status != structs.EvalStatusBlocked {
+		return fmt.Errorf("evaluation %q is not already in a blocked state", old.ID)
+	}
+
+	h.ReblockEvals = append(h.ReblockEvals, eval)
+	return nil
+}
+
 // NextIndex returns the next index
 func (h *Harness) NextIndex() uint64 {
 	h.nextIndexLock.Lock()
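
Illustrative note (not part of the patch): the core idea above is that each Unblock records, per computed node class, the index at which capacity changed, and Block compares an evaluation's SnapshotIndex against those records to decide whether the eval already missed an unblock while it was sitting in the scheduler. The standalone Go sketch below mirrors that check with simplified, hypothetical types (evaluation, tracker); it is only an illustration of the logic under those assumptions, not code from the Nomad tree.

package main

import "fmt"

// evaluation is a simplified stand-in for structs.Evaluation; only the fields
// used by the missed-unblock check are modeled here.
type evaluation struct {
	ID                   string
	SnapshotIndex        uint64
	EscapedComputedClass bool
	ClassEligibility     map[string]bool
}

// tracker records, per computed node class, the index at which the last
// unblock for that class happened (the patch's unblockIndexes map).
type tracker struct {
	unblockIndexes map[string]uint64
}

// missedUnblock mirrors the check the patch adds to Block: an eval built from
// a snapshot older than a relevant unblock is re-enqueued immediately instead
// of being parked in the blocked tracker.
func (t *tracker) missedUnblock(eval *evaluation) bool {
	var max uint64
	for class, index := range t.unblockIndexes {
		if index > max {
			max = index
		}
		elig, ok := eval.ClassEligibility[class]
		if !ok {
			// The eval never saw this class; unblock for correctness.
			return true
		}
		if elig && eval.SnapshotIndex < index {
			// Capacity changed on an eligible class after the snapshot.
			return true
		}
	}
	// Escaped evals are unblocked by any capacity change newer than their snapshot.
	return eval.EscapedComputedClass && eval.SnapshotIndex < max
}

func main() {
	t := &tracker{unblockIndexes: map[string]uint64{"v1:123": 1000}}
	stale := &evaluation{
		ID:               "stale",
		SnapshotIndex:    900,
		ClassEligibility: map[string]bool{"v1:123": true},
	}
	fresh := &evaluation{
		ID:               "fresh",
		SnapshotIndex:    1100,
		ClassEligibility: map[string]bool{"v1:123": true},
	}
	fmt.Println(t.missedUnblock(stale)) // true: snapshot 900 predates the unblock at 1000
	fmt.Println(t.missedUnblock(fresh)) // false: the eval already saw that capacity
}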