Skip to content

Commit

Permalink
Merge pull request #2155 from hashicorp/f-cancel
Browse files Browse the repository at this point in the history
Cancel blocked evals upon successful one for job
  • Loading branch information
dadgar committed Jan 11, 2017
2 parents aa5c03f + 30d6654 commit 9323ce7
Show file tree
Hide file tree
Showing 7 changed files with 291 additions and 13 deletions.
44 changes: 39 additions & 5 deletions nomad/blocked_evals.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ type BlockedEvals struct {
capacityChangeCh chan *capacityUpdate

// jobs is the map of blocked job and is used to ensure that only one
// blocked eval exists for each job.
jobs map[string]struct{}
// blocked eval exists for each job. The value is the blocked evaluation ID.
jobs map[string]string

// unblockIndexes maps computed node classes to the index in which they were
// unblocked. This is used to check if an evaluation could have been
Expand Down Expand Up @@ -91,7 +91,7 @@ func NewBlockedEvals(evalBroker *EvalBroker) *BlockedEvals {
evalBroker: evalBroker,
captured: make(map[string]wrappedEval),
escaped: make(map[string]wrappedEval),
jobs: make(map[string]struct{}),
jobs: make(map[string]string),
unblockIndexes: make(map[string]uint64),
capacityChangeCh: make(chan *capacityUpdate, unblockBuffer),
duplicateCh: make(chan struct{}, 1),
Expand Down Expand Up @@ -183,7 +183,7 @@ func (b *BlockedEvals) processBlock(eval *structs.Evaluation, token string) {

// Mark the job as tracked.
b.stats.TotalBlocked++
b.jobs[eval.JobID] = struct{}{}
b.jobs[eval.JobID] = eval.ID

// Wrap the evaluation, capturing its token.
wrapped := wrappedEval{
Expand Down Expand Up @@ -244,6 +244,40 @@ func (b *BlockedEvals) missedUnblock(eval *structs.Evaluation) bool {
return false
}

// Untrack stops tracking the blocked evaluation, if any, that is registered
// for the given job. It is intended to be called once the job has had a
// successful evaluation, at which point the blocked evaluation is no longer
// needed.
func (b *BlockedEvals) Untrack(jobID string) {
	b.l.Lock()
	defer b.l.Unlock()

	// A disabled tracker has nothing to untrack.
	if !b.enabled {
		return
	}

	// Find the blocked evaluation recorded for this job; bail out when the
	// job has none.
	blockedID, tracked := b.jobs[jobID]
	if !tracked {
		return
	}

	// Drop the evaluation from the captured set when it lives there.
	if wrapped, found := b.captured[blockedID]; found {
		delete(b.captured, blockedID)
		delete(b.jobs, wrapped.eval.JobID)
		b.stats.TotalBlocked--
	}

	// Drop the evaluation from the escaped set when it lives there. Both the
	// escaped and the blocked counters are decremented in this case.
	if wrapped, found := b.escaped[blockedID]; found {
		delete(b.escaped, blockedID)
		delete(b.jobs, wrapped.eval.JobID)
		b.stats.TotalEscaped--
		b.stats.TotalBlocked--
	}
}

// Unblock causes any evaluation that could potentially make progress on a
// capacity change on the passed computed node class to be enqueued into the
// eval broker.
Expand Down Expand Up @@ -410,7 +444,7 @@ func (b *BlockedEvals) Flush() {
b.stats.TotalBlocked = 0
b.captured = make(map[string]wrappedEval)
b.escaped = make(map[string]wrappedEval)
b.jobs = make(map[string]struct{})
b.jobs = make(map[string]string)
b.duplicates = nil
b.capacityChangeCh = make(chan *capacityUpdate, unblockBuffer)
b.stopCh = make(chan struct{})
Expand Down
24 changes: 24 additions & 0 deletions nomad/blocked_evals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,3 +484,27 @@ func TestBlockedEvals_UnblockFailed(t *testing.T) {
t.Fatalf("bad: %#v", blockedStats)
}
}

// TestBlockedEvals_Untrack verifies that untracking a job removes its blocked
// evaluation from the tracker's statistics.
func TestBlockedEvals_Untrack(t *testing.T) {
	blocked, _ := testBlockedEvals(t)

	// Block a single evaluation that is ineligible on every listed class.
	ev := mock.Eval()
	ev.SnapshotIndex = 1000
	ev.Status = structs.EvalStatusBlocked
	ev.ClassEligibility = map[string]bool{"v1:123": false, "v1:456": false}
	blocked.Block(ev)

	// The evaluation must be counted as blocked and not as escaped.
	if stats := blocked.Stats(); stats.TotalBlocked != 1 || stats.TotalEscaped != 0 {
		t.Fatalf("bad: %#v", stats)
	}

	// Once the job is untracked, both counters must be back to zero.
	blocked.Untrack(ev.JobID)
	if stats := blocked.Stats(); stats.TotalBlocked != 0 || stats.TotalEscaped != 0 {
		t.Fatalf("bad: %#v", stats)
	}
}
5 changes: 5 additions & 0 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,11 @@ func (n *nomadFSM) applyUpdateEval(buf []byte, index uint64) interface{} {
n.evalBroker.Enqueue(eval)
} else if eval.ShouldBlock() {
n.blockedEvals.Block(eval)
} else if eval.Status == structs.EvalStatusComplete &&
len(eval.FailedTGAllocs) == 0 {
// If we have a successful evaluation for a job, untrack any
// blocked evaluation
n.blockedEvals.Untrack(eval.JobID)
}
}
return nil
Expand Down
108 changes: 108 additions & 0 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,114 @@ func TestFSM_UpdateEval_Blocked(t *testing.T) {
}
}

// TestFSM_UpdateEval_Untrack checks that applying a completed evaluation with
// no failed placements removes the job's blocked evaluation from the blocked
// tracker.
func TestFSM_UpdateEval_Untrack(t *testing.T) {
	fsm := testFSM(t)
	fsm.evalBroker.SetEnabled(true)
	fsm.blockedEvals.SetEnabled(true)

	// Register a blocked evaluation with the tracker.
	blockedEval := mock.Eval()
	blockedEval.ClassEligibility = map[string]bool{"v1:123": true}
	fsm.blockedEvals.Block(blockedEval)

	// Build a completed evaluation that belongs to the same job.
	completed := mock.Eval()
	completed.JobID = blockedEval.JobID
	completed.Status = structs.EvalStatusComplete

	// Encode the update request and apply it through the FSM.
	buf, err := structs.Encode(structs.EvalUpdateRequestType, structs.EvalUpdateRequest{
		Evals: []*structs.Evaluation{completed},
	})
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if resp := fsm.Apply(makeLog(buf)); resp != nil {
		t.Fatalf("resp: %v", resp)
	}

	// The completed evaluation must be present in the state store.
	out, err := fsm.State().EvalByID(completed.ID)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if out == nil {
		t.Fatalf("not found!")
	}
	if out.CreateIndex != 1 {
		t.Fatalf("bad index: %d", out.CreateIndex)
	}

	// The completed evaluation must not have been enqueued in the broker.
	if brokerStats := fsm.evalBroker.Stats(); brokerStats.TotalReady != 0 {
		t.Fatalf("bad: %#v %#v", brokerStats, out)
	}

	// The blocked evaluation must no longer be tracked.
	if blockedStats := fsm.blockedEvals.Stats(); blockedStats.TotalBlocked != 0 {
		t.Fatalf("bad: %#v %#v", blockedStats, out)
	}
}

// TestFSM_UpdateEval_NoUntrack checks that applying a completed evaluation
// that still reports failed task group allocations leaves the job's blocked
// evaluation in the blocked tracker.
func TestFSM_UpdateEval_NoUntrack(t *testing.T) {
	fsm := testFSM(t)
	fsm.evalBroker.SetEnabled(true)
	fsm.blockedEvals.SetEnabled(true)

	// Register a blocked evaluation with the tracker.
	blockedEval := mock.Eval()
	blockedEval.ClassEligibility = map[string]bool{"v1:123": true}
	fsm.blockedEvals.Block(blockedEval)

	// Build a completed evaluation for the same job that carries a placement
	// failure.
	completed := mock.Eval()
	completed.JobID = blockedEval.JobID
	completed.Status = structs.EvalStatusComplete
	completed.FailedTGAllocs = map[string]*structs.AllocMetric{
		"test": new(structs.AllocMetric),
	}

	// Encode the update request and apply it through the FSM.
	buf, err := structs.Encode(structs.EvalUpdateRequestType, structs.EvalUpdateRequest{
		Evals: []*structs.Evaluation{completed},
	})
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if resp := fsm.Apply(makeLog(buf)); resp != nil {
		t.Fatalf("resp: %v", resp)
	}

	// The completed evaluation must be present in the state store.
	out, err := fsm.State().EvalByID(completed.ID)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if out == nil {
		t.Fatalf("not found!")
	}
	if out.CreateIndex != 1 {
		t.Fatalf("bad index: %d", out.CreateIndex)
	}

	// The completed evaluation must not have been enqueued in the broker.
	if brokerStats := fsm.evalBroker.Stats(); brokerStats.TotalReady != 0 {
		t.Fatalf("bad: %#v %#v", brokerStats, out)
	}

	// The blocked evaluation must still be tracked.
	if blockedStats := fsm.blockedEvals.Stats(); blockedStats.TotalBlocked != 1 {
		t.Fatalf("bad: %#v %#v", blockedStats, out)
	}
}

func TestFSM_DeleteEval(t *testing.T) {
fsm := testFSM(t)

Expand Down
14 changes: 11 additions & 3 deletions nomad/state/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,17 @@ func evalTableSchema() *memdb.TableSchema {
Name: "job",
AllowMissing: false,
Unique: false,
Indexer: &memdb.StringFieldIndex{
Field: "JobID",
Lowercase: true,
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "JobID",
Lowercase: true,
},
&memdb.StringFieldIndex{
Field: "Status",
Lowercase: true,
},
},
},
},
},
Expand Down
41 changes: 36 additions & 5 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,7 @@ func (s *StateStore) PeriodicLaunches() (memdb.ResultIterator, error) {
return iter, nil
}

// UpsertEvaluation is used to upsert an evaluation
// UpsertEvals is used to upsert a set of evaluations
func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) error {
txn := s.db.Txn(true)
defer txn.Abort()
Expand All @@ -685,7 +685,7 @@ func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) erro
for _, eval := range evals {
watcher.Add(watch.Item{Eval: eval.ID})
watcher.Add(watch.Item{EvalJob: eval.JobID})
if err := s.nestedUpsertEval(txn, index, eval); err != nil {
if err := s.nestedUpsertEval(txn, watcher, index, eval); err != nil {
return err
}

Expand All @@ -703,7 +703,7 @@ func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) erro
}

// nestedUpsertEval is used to nest an evaluation upsert within a transaction
func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *structs.Evaluation) error {
func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, watcher watch.Items, index uint64, eval *structs.Evaluation) error {
// Lookup the evaluation
existing, err := txn.First("evals", "id", eval.ID)
if err != nil {
Expand Down Expand Up @@ -751,6 +751,37 @@ func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *struct
}
}

// When this evaluation completed with no failed placements, cancel every
// evaluation of the same job that is still in the blocked state.
if eval.Status == structs.EvalStatusComplete && len(eval.FailedTGAllocs) == 0 {
	// Look up the job's evaluations whose status is blocked.
	iter, err := txn.Get("evals", "job", eval.JobID, structs.EvalStatusBlocked)
	if err != nil {
		// Include the underlying error; the format string previously had no
		// verb for it.
		return fmt.Errorf("failed to get blocked evals for job %q: %v", eval.JobID, err)
	}

	// Collect the matches first; the updates below insert into the same
	// "evals" table the iterator reads from.
	var blocked []*structs.Evaluation
	for raw := iter.Next(); raw != nil; raw = iter.Next() {
		blocked = append(blocked, raw.(*structs.Evaluation))
	}

	// Insert a cancelled copy of each blocked evaluation. The loop variable
	// is named distinctly so it does not shadow the successful evaluation,
	// whose ID is what the status description reports.
	for _, blockedEval := range blocked {
		newEval := blockedEval.Copy()
		newEval.Status = structs.EvalStatusCancelled
		newEval.StatusDescription = fmt.Sprintf("evaluation %q successful", eval.ID)
		newEval.ModifyIndex = index
		if err := txn.Insert("evals", newEval); err != nil {
			return fmt.Errorf("eval insert failed: %v", err)
		}

		// Register the cancelled evaluation with the watcher as well.
		watcher.Add(watch.Item{Eval: newEval.ID})
	}
}

// Insert the eval
if err := txn.Insert("evals", eval); err != nil {
return fmt.Errorf("eval insert failed: %v", err)
Expand Down Expand Up @@ -855,7 +886,7 @@ func (s *StateStore) EvalsByJob(jobID string) ([]*structs.Evaluation, error) {
txn := s.db.Txn(false)

// Get an iterator over the evaluations for the job
iter, err := txn.Get("evals", "job", jobID)
iter, err := txn.Get("evals", "job_prefix", jobID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1603,7 +1634,7 @@ func (s *StateStore) getJobStatus(txn *memdb.Txn, job *structs.Job, evalDelete b
}
}

evals, err := txn.Get("evals", "job", job.ID)
evals, err := txn.Get("evals", "job_prefix", job.ID)
if err != nil {
return "", err
}
Expand Down
Loading

0 comments on commit 9323ce7

Please sign in to comment.