From e3455a3f7d52b2bb02a88e5a42c8399f3226caee Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 4 Jan 2017 15:25:03 -0800 Subject: [PATCH 1/2] Cancel blocked evals upon successful one for job This PR causes blocked evaluations to be cancelled if there is a subsequent successful evaluation for the job. This fixes UX problems showing failed placements when there are not any in reality and makes GC possible for these jobs in certain cases. Fixes https://github.com/hashicorp/nomad/issues/2124 --- nomad/blocked_evals.go | 44 +++++++++++-- nomad/blocked_evals_test.go | 24 +++++++ nomad/fsm.go | 5 ++ nomad/fsm_test.go | 108 ++++++++++++++++++++++++++++++++ nomad/node_endpoint_test.go | 4 +- nomad/state/schema.go | 14 ++++- nomad/state/state_store.go | 41 ++++++++++-- nomad/state/state_store_test.go | 68 ++++++++++++++++++++ 8 files changed, 293 insertions(+), 15 deletions(-) diff --git a/nomad/blocked_evals.go b/nomad/blocked_evals.go index 623d81d57213..548968249fd2 100644 --- a/nomad/blocked_evals.go +++ b/nomad/blocked_evals.go @@ -39,8 +39,8 @@ type BlockedEvals struct { capacityChangeCh chan *capacityUpdate // jobs is the map of blocked job and is used to ensure that only one - // blocked eval exists for each job. - jobs map[string]struct{} + // blocked eval exists for each job. The value is the blocked evaluation ID. + jobs map[string]string // unblockIndexes maps computed node classes to the index in which they were // unblocked. This is used to check if an evaluation could have been @@ -91,7 +91,7 @@ func NewBlockedEvals(evalBroker *EvalBroker) *BlockedEvals { evalBroker: evalBroker, captured: make(map[string]wrappedEval), escaped: make(map[string]wrappedEval), - jobs: make(map[string]struct{}), + jobs: make(map[string]string), unblockIndexes: make(map[string]uint64), capacityChangeCh: make(chan *capacityUpdate, unblockBuffer), duplicateCh: make(chan struct{}, 1), @@ -183,7 +183,7 @@ func (b *BlockedEvals) processBlock(eval *structs.Evaluation, token string) { // Mark the job as tracked. b.stats.TotalBlocked++ - b.jobs[eval.JobID] = struct{}{} + b.jobs[eval.JobID] = eval.ID // Wrap the evaluation, capturing its token. wrapped := wrappedEval{ @@ -244,6 +244,40 @@ func (b *BlockedEvals) missedUnblock(eval *structs.Evaluation) bool { return false } +// Untrack causes any blocked evaluation for the passed job to be no longer +// tracked. Untrack is called when there is a successful evaluation for the job +// and a blocked evaluation is no longer needed. +func (b *BlockedEvals) Untrack(jobID string) { + b.l.Lock() + defer b.l.Unlock() + + // Do nothing if not enabled + if !b.enabled { + return + } + + // Get the evaluation ID to cancel + evalID, ok := b.jobs[jobID] + if !ok { + // No blocked evaluation so exit + return + } + + // Attempt to delete the evaluation + if w, ok := b.captured[evalID]; ok { + delete(b.jobs, w.eval.JobID) + delete(b.captured, evalID) + b.stats.TotalBlocked-- + } + + if w, ok := b.escaped[evalID]; ok { + delete(b.jobs, w.eval.JobID) + delete(b.escaped, evalID) + b.stats.TotalEscaped-- + b.stats.TotalBlocked-- + } +} + // Unblock causes any evaluation that could potentially make progress on a // capacity change on the passed computed node class to be enqueued into the // eval broker. @@ -410,7 +444,7 @@ func (b *BlockedEvals) Flush() { b.stats.TotalBlocked = 0 b.captured = make(map[string]wrappedEval) b.escaped = make(map[string]wrappedEval) - b.jobs = make(map[string]struct{}) + b.jobs = make(map[string]string) b.duplicates = nil b.capacityChangeCh = make(chan *capacityUpdate, unblockBuffer) b.stopCh = make(chan struct{}) diff --git a/nomad/blocked_evals_test.go b/nomad/blocked_evals_test.go index 646b24d809d6..2a3192e9a019 100644 --- a/nomad/blocked_evals_test.go +++ b/nomad/blocked_evals_test.go @@ -484,3 +484,27 @@ func TestBlockedEvals_UnblockFailed(t *testing.T) { t.Fatalf("bad: %#v", blockedStats) } } + +func TestBlockedEvals_Untrack(t *testing.T) { + blocked, _ := testBlockedEvals(t) + + // Create two blocked evals and add them to the blocked tracker. + e := mock.Eval() + e.Status = structs.EvalStatusBlocked + e.ClassEligibility = map[string]bool{"v1:123": false, "v1:456": false} + e.SnapshotIndex = 1000 + blocked.Block(e) + + // Verify block did track + bStats := blocked.Stats() + if bStats.TotalBlocked != 1 || bStats.TotalEscaped != 0 { + t.Fatalf("bad: %#v", bStats) + } + + // Untrack and verify + blocked.Untrack(e.JobID) + bStats = blocked.Stats() + if bStats.TotalBlocked != 0 || bStats.TotalEscaped != 0 { + t.Fatalf("bad: %#v", bStats) + } +} diff --git a/nomad/fsm.go b/nomad/fsm.go index 695a95fa591b..01deb5f33cd0 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -350,6 +350,11 @@ func (n *nomadFSM) applyUpdateEval(buf []byte, index uint64) interface{} { n.evalBroker.Enqueue(eval) } else if eval.ShouldBlock() { n.blockedEvals.Block(eval) + } else if eval.Status == structs.EvalStatusComplete && + len(eval.FailedTGAllocs) == 0 { + // If we have a successful evaluation for a node, untrack any + // blocked evaluation + n.blockedEvals.Untrack(eval.JobID) } } return nil diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 90cebdc48b16..1883a6c97436 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -445,6 +445,114 @@ func TestFSM_UpdateEval_Blocked(t *testing.T) { } } +func TestFSM_UpdateEval_Untrack(t *testing.T) { + fsm := testFSM(t) + fsm.evalBroker.SetEnabled(true) + fsm.blockedEvals.SetEnabled(true) + + // Mark an eval as blocked. + bEval := mock.Eval() + bEval.ClassEligibility = map[string]bool{"v1:123": true} + fsm.blockedEvals.Block(bEval) + + // Create a successful eval for the same job + eval := mock.Eval() + eval.JobID = bEval.JobID + eval.Status = structs.EvalStatusComplete + + req := structs.EvalUpdateRequest{ + Evals: []*structs.Evaluation{eval}, + } + buf, err := structs.Encode(structs.EvalUpdateRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Verify we are registered + out, err := fsm.State().EvalByID(eval.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if out == nil { + t.Fatalf("not found!") + } + if out.CreateIndex != 1 { + t.Fatalf("bad index: %d", out.CreateIndex) + } + + // Verify the eval wasn't enqueued + stats := fsm.evalBroker.Stats() + if stats.TotalReady != 0 { + t.Fatalf("bad: %#v %#v", stats, out) + } + + // Verify the eval was untracked in the blocked tracker. + bStats := fsm.blockedEvals.Stats() + if bStats.TotalBlocked != 0 { + t.Fatalf("bad: %#v %#v", bStats, out) + } +} + +func TestFSM_UpdateEval_NoUntrack(t *testing.T) { + fsm := testFSM(t) + fsm.evalBroker.SetEnabled(true) + fsm.blockedEvals.SetEnabled(true) + + // Mark an eval as blocked. + bEval := mock.Eval() + bEval.ClassEligibility = map[string]bool{"v1:123": true} + fsm.blockedEvals.Block(bEval) + + // Create a successful eval for the same job but with placement failures + eval := mock.Eval() + eval.JobID = bEval.JobID + eval.Status = structs.EvalStatusComplete + eval.FailedTGAllocs = make(map[string]*structs.AllocMetric) + eval.FailedTGAllocs["test"] = new(structs.AllocMetric) + + req := structs.EvalUpdateRequest{ + Evals: []*structs.Evaluation{eval}, + } + buf, err := structs.Encode(structs.EvalUpdateRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Verify we are registered + out, err := fsm.State().EvalByID(eval.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if out == nil { + t.Fatalf("not found!") + } + if out.CreateIndex != 1 { + t.Fatalf("bad index: %d", out.CreateIndex) + } + + // Verify the eval wasn't enqueued + stats := fsm.evalBroker.Stats() + if stats.TotalReady != 0 { + t.Fatalf("bad: %#v %#v", stats, out) + } + + // Verify the eval was not untracked in the blocked tracker. + bStats := fsm.blockedEvals.Stats() + if bStats.TotalBlocked != 1 { + t.Fatalf("bad: %#v %#v", bStats, out) + } +} + func TestFSM_DeleteEval(t *testing.T) { fsm := testFSM(t) diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index d1882824b171..3f59ceb6dd7f 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -683,11 +683,11 @@ func TestClientEndpoint_Drain_Down(t *testing.T) { // Wait for the scheduler to create an allocation testutil.WaitForResult(func() (bool, error) { - allocs, err := s1.fsm.state.AllocsByJob(job.ID) + allocs, err := s1.fsm.state.AllocsByJob(job.ID, true) if err != nil { return false, err } - allocs1, err := s1.fsm.state.AllocsByJob(job1.ID) + allocs1, err := s1.fsm.state.AllocsByJob(job1.ID, true) if err != nil { return false, err } diff --git a/nomad/state/schema.go b/nomad/state/schema.go index 05ffd78cfe2d..aed0e7ab39f0 100644 --- a/nomad/state/schema.go +++ b/nomad/state/schema.go @@ -214,9 +214,17 @@ func evalTableSchema() *memdb.TableSchema { Name: "job", AllowMissing: false, Unique: false, - Indexer: &memdb.StringFieldIndex{ - Field: "JobID", - Lowercase: true, + Indexer: &memdb.CompoundIndex{ + Indexes: []memdb.Indexer{ + &memdb.StringFieldIndex{ + Field: "JobID", + Lowercase: true, + }, + &memdb.StringFieldIndex{ + Field: "Status", + Lowercase: true, + }, + }, }, }, }, diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 671222ef8f1a..d0800a871049 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -626,7 +626,7 @@ func (s *StateStore) PeriodicLaunches() (memdb.ResultIterator, error) { return iter, nil } -// UpsertEvaluation is used to upsert an evaluation +// UpsertEvals is used to upsert a set of evaluations func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) error { txn := s.db.Txn(true) defer txn.Abort() @@ -639,7 +639,7 @@ func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) erro for _, eval := range evals { watcher.Add(watch.Item{Eval: eval.ID}) watcher.Add(watch.Item{EvalJob: eval.JobID}) - if err := s.nestedUpsertEval(txn, index, eval); err != nil { + if err := s.nestedUpsertEval(txn, watcher, index, eval); err != nil { return err } @@ -657,7 +657,7 @@ func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) erro } // nestedUpsertEvaluation is used to nest an evaluation upsert within a transaction -func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *structs.Evaluation) error { +func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, watcher watch.Items, index uint64, eval *structs.Evaluation) error { // Lookup the evaluation existing, err := txn.First("evals", "id", eval.ID) if err != nil { @@ -705,6 +705,37 @@ func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *struct } } + // Check if the job has any blocked evaluations and cancel them + if eval.Status == structs.EvalStatusComplete && len(eval.FailedTGAllocs) == 0 { + // Get the blocked evaluation for a job if it exists + iter, err := txn.Get("evals", "job", eval.JobID, structs.EvalStatusBlocked) + if err != nil { + return fmt.Errorf("failed to get blocked evals for job %q", eval.JobID, err) + } + + var blocked []*structs.Evaluation + for { + raw := iter.Next() + if raw == nil { + break + } + blocked = append(blocked, raw.(*structs.Evaluation)) + } + + // Go through and update the evals + for _, eval := range blocked { + newEval := eval.Copy() + newEval.Status = structs.EvalStatusCancelled + newEval.StatusDescription = fmt.Sprintf("evaluation %q successful", newEval.ID) + newEval.ModifyIndex = index + if err := txn.Insert("evals", newEval); err != nil { + return fmt.Errorf("eval insert failed: %v", err) + } + + watcher.Add(watch.Item{Eval: newEval.ID}) + } + } + // Insert the eval if err := txn.Insert("evals", eval); err != nil { return fmt.Errorf("eval insert failed: %v", err) @@ -809,7 +840,7 @@ func (s *StateStore) EvalsByJob(jobID string) ([]*structs.Evaluation, error) { txn := s.db.Txn(false) // Get an iterator over the node allocations - iter, err := txn.Get("evals", "job", jobID) + iter, err := txn.Get("evals", "job_prefix", jobID) if err != nil { return nil, err } @@ -1490,7 +1521,7 @@ func (s *StateStore) getJobStatus(txn *memdb.Txn, job *structs.Job, evalDelete b } } - evals, err := txn.Get("evals", "job", job.ID) + evals, err := txn.Get("evals", "job_prefix", job.ID) if err != nil { return "", err } diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index e8241b15bea2..071f1519547c 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -1255,6 +1255,74 @@ func TestStateStore_UpsertEvals_Eval(t *testing.T) { notify.verify(t) } +func TestStateStore_UpsertEvals_CancelBlocked(t *testing.T) { + state := testStateStore(t) + + // Create two blocked evals for the same job + j := "test-job" + b1, b2 := mock.Eval(), mock.Eval() + b1.JobID = j + b1.Status = structs.EvalStatusBlocked + b2.JobID = j + b2.Status = structs.EvalStatusBlocked + + err := state.UpsertEvals(999, []*structs.Evaluation{b1, b2}) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Create one complete and successful eval for the job + eval := mock.Eval() + eval.JobID = j + eval.Status = structs.EvalStatusComplete + + notify := setupNotifyTest( + state, + watch.Item{Table: "evals"}, + watch.Item{Eval: b1.ID}, + watch.Item{Eval: b2.ID}, + watch.Item{Eval: eval.ID}, + watch.Item{EvalJob: eval.JobID}) + + if err := state.UpsertEvals(1000, []*structs.Evaluation{eval}); err != nil { + t.Fatalf("err: %v", err) + } + + out, err := state.EvalByID(eval.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + + if !reflect.DeepEqual(eval, out) { + t.Fatalf("bad: %#v %#v", eval, out) + } + + index, err := state.Index("evals") + if err != nil { + t.Fatalf("err: %v", err) + } + if index != 1000 { + t.Fatalf("bad: %d", index) + } + + // Get b1/b2 and check they are cancelled + out1, err := state.EvalByID(b1.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + + out2, err := state.EvalByID(b2.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + + if out1.Status != structs.EvalStatusCancelled || out2.Status != structs.EvalStatusCancelled { + t.Fatalf("bad: %#v %#v", out1, out2) + } + + notify.verify(t) +} + func TestStateStore_Update_UpsertEvals_Eval(t *testing.T) { state := testStateStore(t) eval := mock.Eval() From 7ef228aeaf561e18dfb0759a7798cd566864ee44 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 4 Jan 2017 15:25:03 -0800 Subject: [PATCH 2/2] Cancel blocked evals upon successful one for job This PR causes blocked evaluations to be cancelled if there is a subsequent successful evaluation for the job. This fixes UX problems showing failed placements when there are not any in reality and makes GC possible for these jobs in certain cases. Fixes https://github.com/hashicorp/nomad/issues/2124 --- nomad/blocked_evals.go | 44 +++++++++++-- nomad/blocked_evals_test.go | 24 +++++++ nomad/fsm.go | 5 ++ nomad/fsm_test.go | 108 ++++++++++++++++++++++++++++++++ nomad/state/schema.go | 14 ++++- nomad/state/state_store.go | 41 ++++++++++-- nomad/state/state_store_test.go | 68 ++++++++++++++++++++ 7 files changed, 291 insertions(+), 13 deletions(-) diff --git a/nomad/blocked_evals.go b/nomad/blocked_evals.go index 623d81d57213..548968249fd2 100644 --- a/nomad/blocked_evals.go +++ b/nomad/blocked_evals.go @@ -39,8 +39,8 @@ type BlockedEvals struct { capacityChangeCh chan *capacityUpdate // jobs is the map of blocked job and is used to ensure that only one - // blocked eval exists for each job. - jobs map[string]struct{} + // blocked eval exists for each job. The value is the blocked evaluation ID. + jobs map[string]string // unblockIndexes maps computed node classes to the index in which they were // unblocked. This is used to check if an evaluation could have been @@ -91,7 +91,7 @@ func NewBlockedEvals(evalBroker *EvalBroker) *BlockedEvals { evalBroker: evalBroker, captured: make(map[string]wrappedEval), escaped: make(map[string]wrappedEval), - jobs: make(map[string]struct{}), + jobs: make(map[string]string), unblockIndexes: make(map[string]uint64), capacityChangeCh: make(chan *capacityUpdate, unblockBuffer), duplicateCh: make(chan struct{}, 1), @@ -183,7 +183,7 @@ func (b *BlockedEvals) processBlock(eval *structs.Evaluation, token string) { // Mark the job as tracked. b.stats.TotalBlocked++ - b.jobs[eval.JobID] = struct{}{} + b.jobs[eval.JobID] = eval.ID // Wrap the evaluation, capturing its token. wrapped := wrappedEval{ @@ -244,6 +244,40 @@ func (b *BlockedEvals) missedUnblock(eval *structs.Evaluation) bool { return false } +// Untrack causes any blocked evaluation for the passed job to be no longer +// tracked. Untrack is called when there is a successful evaluation for the job +// and a blocked evaluation is no longer needed. +func (b *BlockedEvals) Untrack(jobID string) { + b.l.Lock() + defer b.l.Unlock() + + // Do nothing if not enabled + if !b.enabled { + return + } + + // Get the evaluation ID to cancel + evalID, ok := b.jobs[jobID] + if !ok { + // No blocked evaluation so exit + return + } + + // Attempt to delete the evaluation + if w, ok := b.captured[evalID]; ok { + delete(b.jobs, w.eval.JobID) + delete(b.captured, evalID) + b.stats.TotalBlocked-- + } + + if w, ok := b.escaped[evalID]; ok { + delete(b.jobs, w.eval.JobID) + delete(b.escaped, evalID) + b.stats.TotalEscaped-- + b.stats.TotalBlocked-- + } +} + // Unblock causes any evaluation that could potentially make progress on a // capacity change on the passed computed node class to be enqueued into the // eval broker. @@ -410,7 +444,7 @@ func (b *BlockedEvals) Flush() { b.stats.TotalBlocked = 0 b.captured = make(map[string]wrappedEval) b.escaped = make(map[string]wrappedEval) - b.jobs = make(map[string]struct{}) + b.jobs = make(map[string]string) b.duplicates = nil b.capacityChangeCh = make(chan *capacityUpdate, unblockBuffer) b.stopCh = make(chan struct{}) diff --git a/nomad/blocked_evals_test.go b/nomad/blocked_evals_test.go index 646b24d809d6..2a3192e9a019 100644 --- a/nomad/blocked_evals_test.go +++ b/nomad/blocked_evals_test.go @@ -484,3 +484,27 @@ func TestBlockedEvals_UnblockFailed(t *testing.T) { t.Fatalf("bad: %#v", blockedStats) } } + +func TestBlockedEvals_Untrack(t *testing.T) { + blocked, _ := testBlockedEvals(t) + + // Create two blocked evals and add them to the blocked tracker. + e := mock.Eval() + e.Status = structs.EvalStatusBlocked + e.ClassEligibility = map[string]bool{"v1:123": false, "v1:456": false} + e.SnapshotIndex = 1000 + blocked.Block(e) + + // Verify block did track + bStats := blocked.Stats() + if bStats.TotalBlocked != 1 || bStats.TotalEscaped != 0 { + t.Fatalf("bad: %#v", bStats) + } + + // Untrack and verify + blocked.Untrack(e.JobID) + bStats = blocked.Stats() + if bStats.TotalBlocked != 0 || bStats.TotalEscaped != 0 { + t.Fatalf("bad: %#v", bStats) + } +} diff --git a/nomad/fsm.go b/nomad/fsm.go index 695a95fa591b..01deb5f33cd0 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -350,6 +350,11 @@ func (n *nomadFSM) applyUpdateEval(buf []byte, index uint64) interface{} { n.evalBroker.Enqueue(eval) } else if eval.ShouldBlock() { n.blockedEvals.Block(eval) + } else if eval.Status == structs.EvalStatusComplete && + len(eval.FailedTGAllocs) == 0 { + // If we have a successful evaluation for a node, untrack any + // blocked evaluation + n.blockedEvals.Untrack(eval.JobID) } } return nil diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 90cebdc48b16..1883a6c97436 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -445,6 +445,114 @@ func TestFSM_UpdateEval_Blocked(t *testing.T) { } } +func TestFSM_UpdateEval_Untrack(t *testing.T) { + fsm := testFSM(t) + fsm.evalBroker.SetEnabled(true) + fsm.blockedEvals.SetEnabled(true) + + // Mark an eval as blocked. + bEval := mock.Eval() + bEval.ClassEligibility = map[string]bool{"v1:123": true} + fsm.blockedEvals.Block(bEval) + + // Create a successful eval for the same job + eval := mock.Eval() + eval.JobID = bEval.JobID + eval.Status = structs.EvalStatusComplete + + req := structs.EvalUpdateRequest{ + Evals: []*structs.Evaluation{eval}, + } + buf, err := structs.Encode(structs.EvalUpdateRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Verify we are registered + out, err := fsm.State().EvalByID(eval.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if out == nil { + t.Fatalf("not found!") + } + if out.CreateIndex != 1 { + t.Fatalf("bad index: %d", out.CreateIndex) + } + + // Verify the eval wasn't enqueued + stats := fsm.evalBroker.Stats() + if stats.TotalReady != 0 { + t.Fatalf("bad: %#v %#v", stats, out) + } + + // Verify the eval was untracked in the blocked tracker. + bStats := fsm.blockedEvals.Stats() + if bStats.TotalBlocked != 0 { + t.Fatalf("bad: %#v %#v", bStats, out) + } +} + +func TestFSM_UpdateEval_NoUntrack(t *testing.T) { + fsm := testFSM(t) + fsm.evalBroker.SetEnabled(true) + fsm.blockedEvals.SetEnabled(true) + + // Mark an eval as blocked. + bEval := mock.Eval() + bEval.ClassEligibility = map[string]bool{"v1:123": true} + fsm.blockedEvals.Block(bEval) + + // Create a successful eval for the same job but with placement failures + eval := mock.Eval() + eval.JobID = bEval.JobID + eval.Status = structs.EvalStatusComplete + eval.FailedTGAllocs = make(map[string]*structs.AllocMetric) + eval.FailedTGAllocs["test"] = new(structs.AllocMetric) + + req := structs.EvalUpdateRequest{ + Evals: []*structs.Evaluation{eval}, + } + buf, err := structs.Encode(structs.EvalUpdateRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Verify we are registered + out, err := fsm.State().EvalByID(eval.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if out == nil { + t.Fatalf("not found!") + } + if out.CreateIndex != 1 { + t.Fatalf("bad index: %d", out.CreateIndex) + } + + // Verify the eval wasn't enqueued + stats := fsm.evalBroker.Stats() + if stats.TotalReady != 0 { + t.Fatalf("bad: %#v %#v", stats, out) + } + + // Verify the eval was not untracked in the blocked tracker. + bStats := fsm.blockedEvals.Stats() + if bStats.TotalBlocked != 1 { + t.Fatalf("bad: %#v %#v", bStats, out) + } +} + func TestFSM_DeleteEval(t *testing.T) { fsm := testFSM(t) diff --git a/nomad/state/schema.go b/nomad/state/schema.go index 05ffd78cfe2d..aed0e7ab39f0 100644 --- a/nomad/state/schema.go +++ b/nomad/state/schema.go @@ -214,9 +214,17 @@ func evalTableSchema() *memdb.TableSchema { Name: "job", AllowMissing: false, Unique: false, - Indexer: &memdb.StringFieldIndex{ - Field: "JobID", - Lowercase: true, + Indexer: &memdb.CompoundIndex{ + Indexes: []memdb.Indexer{ + &memdb.StringFieldIndex{ + Field: "JobID", + Lowercase: true, + }, + &memdb.StringFieldIndex{ + Field: "Status", + Lowercase: true, + }, + }, }, }, }, diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 671222ef8f1a..d0800a871049 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -626,7 +626,7 @@ func (s *StateStore) PeriodicLaunches() (memdb.ResultIterator, error) { return iter, nil } -// UpsertEvaluation is used to upsert an evaluation +// UpsertEvals is used to upsert a set of evaluations func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) error { txn := s.db.Txn(true) defer txn.Abort() @@ -639,7 +639,7 @@ func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) erro for _, eval := range evals { watcher.Add(watch.Item{Eval: eval.ID}) watcher.Add(watch.Item{EvalJob: eval.JobID}) - if err := s.nestedUpsertEval(txn, index, eval); err != nil { + if err := s.nestedUpsertEval(txn, watcher, index, eval); err != nil { return err } @@ -657,7 +657,7 @@ func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) erro } // nestedUpsertEvaluation is used to nest an evaluation upsert within a transaction -func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *structs.Evaluation) error { +func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, watcher watch.Items, index uint64, eval *structs.Evaluation) error { // Lookup the evaluation existing, err := txn.First("evals", "id", eval.ID) if err != nil { @@ -705,6 +705,37 @@ func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *struct } } + // Check if the job has any blocked evaluations and cancel them + if eval.Status == structs.EvalStatusComplete && len(eval.FailedTGAllocs) == 0 { + // Get the blocked evaluation for a job if it exists + iter, err := txn.Get("evals", "job", eval.JobID, structs.EvalStatusBlocked) + if err != nil { + return fmt.Errorf("failed to get blocked evals for job %q", eval.JobID, err) + } + + var blocked []*structs.Evaluation + for { + raw := iter.Next() + if raw == nil { + break + } + blocked = append(blocked, raw.(*structs.Evaluation)) + } + + // Go through and update the evals + for _, eval := range blocked { + newEval := eval.Copy() + newEval.Status = structs.EvalStatusCancelled + newEval.StatusDescription = fmt.Sprintf("evaluation %q successful", newEval.ID) + newEval.ModifyIndex = index + if err := txn.Insert("evals", newEval); err != nil { + return fmt.Errorf("eval insert failed: %v", err) + } + + watcher.Add(watch.Item{Eval: newEval.ID}) + } + } + // Insert the eval if err := txn.Insert("evals", eval); err != nil { return fmt.Errorf("eval insert failed: %v", err) @@ -809,7 +840,7 @@ func (s *StateStore) EvalsByJob(jobID string) ([]*structs.Evaluation, error) { txn := s.db.Txn(false) // Get an iterator over the node allocations - iter, err := txn.Get("evals", "job", jobID) + iter, err := txn.Get("evals", "job_prefix", jobID) if err != nil { return nil, err } @@ -1490,7 +1521,7 @@ func (s *StateStore) getJobStatus(txn *memdb.Txn, job *structs.Job, evalDelete b } } - evals, err := txn.Get("evals", "job", job.ID) + evals, err := txn.Get("evals", "job_prefix", job.ID) if err != nil { return "", err } diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index e8241b15bea2..071f1519547c 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -1255,6 +1255,74 @@ func TestStateStore_UpsertEvals_Eval(t *testing.T) { notify.verify(t) } +func TestStateStore_UpsertEvals_CancelBlocked(t *testing.T) { + state := testStateStore(t) + + // Create two blocked evals for the same job + j := "test-job" + b1, b2 := mock.Eval(), mock.Eval() + b1.JobID = j + b1.Status = structs.EvalStatusBlocked + b2.JobID = j + b2.Status = structs.EvalStatusBlocked + + err := state.UpsertEvals(999, []*structs.Evaluation{b1, b2}) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Create one complete and successful eval for the job + eval := mock.Eval() + eval.JobID = j + eval.Status = structs.EvalStatusComplete + + notify := setupNotifyTest( + state, + watch.Item{Table: "evals"}, + watch.Item{Eval: b1.ID}, + watch.Item{Eval: b2.ID}, + watch.Item{Eval: eval.ID}, + watch.Item{EvalJob: eval.JobID}) + + if err := state.UpsertEvals(1000, []*structs.Evaluation{eval}); err != nil { + t.Fatalf("err: %v", err) + } + + out, err := state.EvalByID(eval.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + + if !reflect.DeepEqual(eval, out) { + t.Fatalf("bad: %#v %#v", eval, out) + } + + index, err := state.Index("evals") + if err != nil { + t.Fatalf("err: %v", err) + } + if index != 1000 { + t.Fatalf("bad: %d", index) + } + + // Get b1/b2 and check they are cancelled + out1, err := state.EvalByID(b1.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + + out2, err := state.EvalByID(b2.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + + if out1.Status != structs.EvalStatusCancelled || out2.Status != structs.EvalStatusCancelled { + t.Fatalf("bad: %#v %#v", out1, out2) + } + + notify.verify(t) +} + func TestStateStore_Update_UpsertEvals_Eval(t *testing.T) { state := testStateStore(t) eval := mock.Eval()