Skip to content

Commit

Permalink
Cancel blocked evals upon successful one for job
Browse files Browse the repository at this point in the history
This PR causes blocked evaluations to be cancelled if there is a
subsequent successful evaluation for the job. This fixes UX problems
showing failed placements when there are not any in reality and makes GC
possible for these jobs in certain cases.

Fixes #2124
  • Loading branch information
dadgar committed Jan 5, 2017
1 parent cd04bc5 commit 7ef228a
Show file tree
Hide file tree
Showing 7 changed files with 291 additions and 13 deletions.
44 changes: 39 additions & 5 deletions nomad/blocked_evals.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ type BlockedEvals struct {
capacityChangeCh chan *capacityUpdate

// jobs is the map of blocked job and is used to ensure that only one
// blocked eval exists for each job.
jobs map[string]struct{}
// blocked eval exists for each job. The value is the blocked evaluation ID.
jobs map[string]string

// unblockIndexes maps computed node classes to the index in which they were
// unblocked. This is used to check if an evaluation could have been
Expand Down Expand Up @@ -91,7 +91,7 @@ func NewBlockedEvals(evalBroker *EvalBroker) *BlockedEvals {
evalBroker: evalBroker,
captured: make(map[string]wrappedEval),
escaped: make(map[string]wrappedEval),
jobs: make(map[string]struct{}),
jobs: make(map[string]string),
unblockIndexes: make(map[string]uint64),
capacityChangeCh: make(chan *capacityUpdate, unblockBuffer),
duplicateCh: make(chan struct{}, 1),
Expand Down Expand Up @@ -183,7 +183,7 @@ func (b *BlockedEvals) processBlock(eval *structs.Evaluation, token string) {

// Mark the job as tracked.
b.stats.TotalBlocked++
b.jobs[eval.JobID] = struct{}{}
b.jobs[eval.JobID] = eval.ID

// Wrap the evaluation, capturing its token.
wrapped := wrappedEval{
Expand Down Expand Up @@ -244,6 +244,40 @@ func (b *BlockedEvals) missedUnblock(eval *structs.Evaluation) bool {
return false
}

// Untrack causes any blocked evaluation for the passed job to be no longer
// tracked. Untrack is called when there is a successful evaluation for the job
// and a blocked evaluation is no longer needed.
func (b *BlockedEvals) Untrack(jobID string) {
b.l.Lock()
defer b.l.Unlock()

// Do nothing if not enabled
if !b.enabled {
return
}

// Get the evaluation ID to cancel
evalID, ok := b.jobs[jobID]
if !ok {
// No blocked evaluation so exit
return
}

// Attempt to delete the evaluation
if w, ok := b.captured[evalID]; ok {
delete(b.jobs, w.eval.JobID)
delete(b.captured, evalID)
b.stats.TotalBlocked--
}

if w, ok := b.escaped[evalID]; ok {
delete(b.jobs, w.eval.JobID)
delete(b.escaped, evalID)
b.stats.TotalEscaped--
b.stats.TotalBlocked--
}
}

// Unblock causes any evaluation that could potentially make progress on a
// capacity change on the passed computed node class to be enqueued into the
// eval broker.
Expand Down Expand Up @@ -410,7 +444,7 @@ func (b *BlockedEvals) Flush() {
b.stats.TotalBlocked = 0
b.captured = make(map[string]wrappedEval)
b.escaped = make(map[string]wrappedEval)
b.jobs = make(map[string]struct{})
b.jobs = make(map[string]string)
b.duplicates = nil
b.capacityChangeCh = make(chan *capacityUpdate, unblockBuffer)
b.stopCh = make(chan struct{})
Expand Down
24 changes: 24 additions & 0 deletions nomad/blocked_evals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,3 +484,27 @@ func TestBlockedEvals_UnblockFailed(t *testing.T) {
t.Fatalf("bad: %#v", blockedStats)
}
}

func TestBlockedEvals_Untrack(t *testing.T) {
blocked, _ := testBlockedEvals(t)

// Create two blocked evals and add them to the blocked tracker.
e := mock.Eval()
e.Status = structs.EvalStatusBlocked
e.ClassEligibility = map[string]bool{"v1:123": false, "v1:456": false}
e.SnapshotIndex = 1000
blocked.Block(e)

// Verify block did track
bStats := blocked.Stats()
if bStats.TotalBlocked != 1 || bStats.TotalEscaped != 0 {
t.Fatalf("bad: %#v", bStats)
}

// Untrack and verify
blocked.Untrack(e.JobID)
bStats = blocked.Stats()
if bStats.TotalBlocked != 0 || bStats.TotalEscaped != 0 {
t.Fatalf("bad: %#v", bStats)
}
}
5 changes: 5 additions & 0 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,11 @@ func (n *nomadFSM) applyUpdateEval(buf []byte, index uint64) interface{} {
n.evalBroker.Enqueue(eval)
} else if eval.ShouldBlock() {
n.blockedEvals.Block(eval)
} else if eval.Status == structs.EvalStatusComplete &&
len(eval.FailedTGAllocs) == 0 {
// If we have a successful evaluation for a node, untrack any
// blocked evaluation
n.blockedEvals.Untrack(eval.JobID)
}
}
return nil
Expand Down
108 changes: 108 additions & 0 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,114 @@ func TestFSM_UpdateEval_Blocked(t *testing.T) {
}
}

func TestFSM_UpdateEval_Untrack(t *testing.T) {
fsm := testFSM(t)
fsm.evalBroker.SetEnabled(true)
fsm.blockedEvals.SetEnabled(true)

// Mark an eval as blocked.
bEval := mock.Eval()
bEval.ClassEligibility = map[string]bool{"v1:123": true}
fsm.blockedEvals.Block(bEval)

// Create a successful eval for the same job
eval := mock.Eval()
eval.JobID = bEval.JobID
eval.Status = structs.EvalStatusComplete

req := structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
}
buf, err := structs.Encode(structs.EvalUpdateRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}

resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}

// Verify we are registered
out, err := fsm.State().EvalByID(eval.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("not found!")
}
if out.CreateIndex != 1 {
t.Fatalf("bad index: %d", out.CreateIndex)
}

// Verify the eval wasn't enqueued
stats := fsm.evalBroker.Stats()
if stats.TotalReady != 0 {
t.Fatalf("bad: %#v %#v", stats, out)
}

// Verify the eval was untracked in the blocked tracker.
bStats := fsm.blockedEvals.Stats()
if bStats.TotalBlocked != 0 {
t.Fatalf("bad: %#v %#v", bStats, out)
}
}

func TestFSM_UpdateEval_NoUntrack(t *testing.T) {
fsm := testFSM(t)
fsm.evalBroker.SetEnabled(true)
fsm.blockedEvals.SetEnabled(true)

// Mark an eval as blocked.
bEval := mock.Eval()
bEval.ClassEligibility = map[string]bool{"v1:123": true}
fsm.blockedEvals.Block(bEval)

// Create a successful eval for the same job but with placement failures
eval := mock.Eval()
eval.JobID = bEval.JobID
eval.Status = structs.EvalStatusComplete
eval.FailedTGAllocs = make(map[string]*structs.AllocMetric)
eval.FailedTGAllocs["test"] = new(structs.AllocMetric)

req := structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
}
buf, err := structs.Encode(structs.EvalUpdateRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}

resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}

// Verify we are registered
out, err := fsm.State().EvalByID(eval.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("not found!")
}
if out.CreateIndex != 1 {
t.Fatalf("bad index: %d", out.CreateIndex)
}

// Verify the eval wasn't enqueued
stats := fsm.evalBroker.Stats()
if stats.TotalReady != 0 {
t.Fatalf("bad: %#v %#v", stats, out)
}

// Verify the eval was not untracked in the blocked tracker.
bStats := fsm.blockedEvals.Stats()
if bStats.TotalBlocked != 1 {
t.Fatalf("bad: %#v %#v", bStats, out)
}
}

func TestFSM_DeleteEval(t *testing.T) {
fsm := testFSM(t)

Expand Down
14 changes: 11 additions & 3 deletions nomad/state/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,17 @@ func evalTableSchema() *memdb.TableSchema {
Name: "job",
AllowMissing: false,
Unique: false,
Indexer: &memdb.StringFieldIndex{
Field: "JobID",
Lowercase: true,
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "JobID",
Lowercase: true,
},
&memdb.StringFieldIndex{
Field: "Status",
Lowercase: true,
},
},
},
},
},
Expand Down
41 changes: 36 additions & 5 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ func (s *StateStore) PeriodicLaunches() (memdb.ResultIterator, error) {
return iter, nil
}

// UpsertEvaluation is used to upsert an evaluation
// UpsertEvals is used to upsert a set of evaluations
func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) error {
txn := s.db.Txn(true)
defer txn.Abort()
Expand All @@ -639,7 +639,7 @@ func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) erro
for _, eval := range evals {
watcher.Add(watch.Item{Eval: eval.ID})
watcher.Add(watch.Item{EvalJob: eval.JobID})
if err := s.nestedUpsertEval(txn, index, eval); err != nil {
if err := s.nestedUpsertEval(txn, watcher, index, eval); err != nil {
return err
}

Expand All @@ -657,7 +657,7 @@ func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) erro
}

// nestedUpsertEvaluation is used to nest an evaluation upsert within a transaction
func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *structs.Evaluation) error {
func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, watcher watch.Items, index uint64, eval *structs.Evaluation) error {
// Lookup the evaluation
existing, err := txn.First("evals", "id", eval.ID)
if err != nil {
Expand Down Expand Up @@ -705,6 +705,37 @@ func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *struct
}
}

// Check if the job has any blocked evaluations and cancel them
if eval.Status == structs.EvalStatusComplete && len(eval.FailedTGAllocs) == 0 {
// Get the blocked evaluation for a job if it exists
iter, err := txn.Get("evals", "job", eval.JobID, structs.EvalStatusBlocked)
if err != nil {
return fmt.Errorf("failed to get blocked evals for job %q", eval.JobID, err)
}

var blocked []*structs.Evaluation
for {
raw := iter.Next()
if raw == nil {
break
}
blocked = append(blocked, raw.(*structs.Evaluation))
}

// Go through and update the evals
for _, eval := range blocked {
newEval := eval.Copy()
newEval.Status = structs.EvalStatusCancelled
newEval.StatusDescription = fmt.Sprintf("evaluation %q successful", newEval.ID)
newEval.ModifyIndex = index
if err := txn.Insert("evals", newEval); err != nil {
return fmt.Errorf("eval insert failed: %v", err)
}

watcher.Add(watch.Item{Eval: newEval.ID})
}
}

// Insert the eval
if err := txn.Insert("evals", eval); err != nil {
return fmt.Errorf("eval insert failed: %v", err)
Expand Down Expand Up @@ -809,7 +840,7 @@ func (s *StateStore) EvalsByJob(jobID string) ([]*structs.Evaluation, error) {
txn := s.db.Txn(false)

// Get an iterator over the node allocations
iter, err := txn.Get("evals", "job", jobID)
iter, err := txn.Get("evals", "job_prefix", jobID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1490,7 +1521,7 @@ func (s *StateStore) getJobStatus(txn *memdb.Txn, job *structs.Job, evalDelete b
}
}

evals, err := txn.Get("evals", "job", job.ID)
evals, err := txn.Get("evals", "job_prefix", job.ID)
if err != nil {
return "", err
}
Expand Down
Loading

0 comments on commit 7ef228a

Please sign in to comment.