Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cancel blocked evals upon successful one for job #2155

Merged
merged 3 commits into from
Jan 11, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 39 additions & 5 deletions nomad/blocked_evals.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ type BlockedEvals struct {
capacityChangeCh chan *capacityUpdate

// jobs is the map of blocked job and is used to ensure that only one
// blocked eval exists for each job.
jobs map[string]struct{}
// blocked eval exists for each job. The value is the blocked evaluation ID.
jobs map[string]string

// unblockIndexes maps computed node classes to the index in which they were
// unblocked. This is used to check if an evaluation could have been
Expand Down Expand Up @@ -91,7 +91,7 @@ func NewBlockedEvals(evalBroker *EvalBroker) *BlockedEvals {
evalBroker: evalBroker,
captured: make(map[string]wrappedEval),
escaped: make(map[string]wrappedEval),
jobs: make(map[string]struct{}),
jobs: make(map[string]string),
unblockIndexes: make(map[string]uint64),
capacityChangeCh: make(chan *capacityUpdate, unblockBuffer),
duplicateCh: make(chan struct{}, 1),
Expand Down Expand Up @@ -183,7 +183,7 @@ func (b *BlockedEvals) processBlock(eval *structs.Evaluation, token string) {

// Mark the job as tracked.
b.stats.TotalBlocked++
b.jobs[eval.JobID] = struct{}{}
b.jobs[eval.JobID] = eval.ID

// Wrap the evaluation, capturing its token.
wrapped := wrappedEval{
Expand Down Expand Up @@ -244,6 +244,40 @@ func (b *BlockedEvals) missedUnblock(eval *structs.Evaluation) bool {
return false
}

// Untrack stops tracking the blocked evaluation, if any, that is recorded for
// the passed job. It is meant to be called after the job has had a successful
// evaluation, at which point the blocked evaluation is no longer needed.
func (b *BlockedEvals) Untrack(jobID string) {
	b.l.Lock()
	defer b.l.Unlock()

	// A disabled tracker ignores the request.
	if !b.enabled {
		return
	}

	// Find the ID of the evaluation blocked for this job; without one there
	// is nothing to cancel.
	blockedID, tracked := b.jobs[jobID]
	if !tracked {
		return
	}

	// Remove the evaluation from the captured set when it is held there.
	if wrapped, found := b.captured[blockedID]; found {
		delete(b.captured, blockedID)
		delete(b.jobs, wrapped.eval.JobID)
		b.stats.TotalBlocked--
	}

	// Likewise for the escaped set, which also has its own counter.
	if wrapped, found := b.escaped[blockedID]; found {
		delete(b.escaped, blockedID)
		delete(b.jobs, wrapped.eval.JobID)
		b.stats.TotalBlocked--
		b.stats.TotalEscaped--
	}
}

// Unblock causes any evaluation that could potentially make progress on a
// capacity change on the passed computed node class to be enqueued into the
// eval broker.
Expand Down Expand Up @@ -410,7 +444,7 @@ func (b *BlockedEvals) Flush() {
b.stats.TotalBlocked = 0
b.captured = make(map[string]wrappedEval)
b.escaped = make(map[string]wrappedEval)
b.jobs = make(map[string]struct{})
b.jobs = make(map[string]string)
b.duplicates = nil
b.capacityChangeCh = make(chan *capacityUpdate, unblockBuffer)
b.stopCh = make(chan struct{})
Expand Down
24 changes: 24 additions & 0 deletions nomad/blocked_evals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,3 +484,27 @@ func TestBlockedEvals_UnblockFailed(t *testing.T) {
t.Fatalf("bad: %#v", blockedStats)
}
}

// TestBlockedEvals_Untrack blocks a single evaluation and checks that
// untracking its job clears it from the tracker's statistics.
func TestBlockedEvals_Untrack(t *testing.T) {
	blocked, _ := testBlockedEvals(t)

	// Build one blocked eval and hand it to the blocked tracker.
	ev := mock.Eval()
	ev.SnapshotIndex = 1000
	ev.ClassEligibility = map[string]bool{"v1:123": false, "v1:456": false}
	ev.Status = structs.EvalStatusBlocked
	blocked.Block(ev)

	// The eval should now be counted as blocked and not as escaped.
	if stats := blocked.Stats(); stats.TotalBlocked != 1 || stats.TotalEscaped != 0 {
		t.Fatalf("bad: %#v", stats)
	}

	// After untracking the job, both counters should be back to zero.
	blocked.Untrack(ev.JobID)
	if stats := blocked.Stats(); stats.TotalBlocked != 0 || stats.TotalEscaped != 0 {
		t.Fatalf("bad: %#v", stats)
	}
}
5 changes: 5 additions & 0 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,11 @@ func (n *nomadFSM) applyUpdateEval(buf []byte, index uint64) interface{} {
n.evalBroker.Enqueue(eval)
} else if eval.ShouldBlock() {
n.blockedEvals.Block(eval)
} else if eval.Status == structs.EvalStatusComplete &&
len(eval.FailedTGAllocs) == 0 {
// If we have a successful evaluation for a job, untrack any
// blocked evaluation for that job
n.blockedEvals.Untrack(eval.JobID)
}
}
return nil
Expand Down
108 changes: 108 additions & 0 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,114 @@ func TestFSM_UpdateEval_Blocked(t *testing.T) {
}
}

// TestFSM_UpdateEval_Untrack applies a completed evaluation with no placement
// failures for a job that has a blocked evaluation, and checks that the
// blocked tracker no longer counts anything as blocked afterwards.
func TestFSM_UpdateEval_Untrack(t *testing.T) {
	fsm := testFSM(t)
	fsm.evalBroker.SetEnabled(true)
	fsm.blockedEvals.SetEnabled(true)

	// Register a blocked eval with the tracker.
	blockedEval := mock.Eval()
	blockedEval.ClassEligibility = map[string]bool{"v1:123": true}
	fsm.blockedEvals.Block(blockedEval)

	// Build a completed eval that belongs to the same job.
	completed := mock.Eval()
	completed.Status = structs.EvalStatusComplete
	completed.JobID = blockedEval.JobID

	// Encode the update request and run it through the FSM.
	buf, err := structs.Encode(structs.EvalUpdateRequestType, structs.EvalUpdateRequest{
		Evals: []*structs.Evaluation{completed},
	})
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	if resp := fsm.Apply(makeLog(buf)); resp != nil {
		t.Fatalf("resp: %v", resp)
	}

	// The completed eval must be present in the state store.
	out, err := fsm.State().EvalByID(completed.ID)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if out == nil {
		t.Fatalf("not found!")
	}
	if out.CreateIndex != 1 {
		t.Fatalf("bad index: %d", out.CreateIndex)
	}

	// Nothing should have been enqueued in the broker.
	if brokerStats := fsm.evalBroker.Stats(); brokerStats.TotalReady != 0 {
		t.Fatalf("bad: %#v %#v", brokerStats, out)
	}

	// The blocked tracker should no longer hold the blocked eval.
	if blockedStats := fsm.blockedEvals.Stats(); blockedStats.TotalBlocked != 0 {
		t.Fatalf("bad: %#v %#v", blockedStats, out)
	}
}

// TestFSM_UpdateEval_NoUntrack applies a completed evaluation that still has
// placement failures and checks that the blocked evaluation for the same job
// remains counted by the blocked tracker.
func TestFSM_UpdateEval_NoUntrack(t *testing.T) {
	fsm := testFSM(t)
	fsm.evalBroker.SetEnabled(true)
	fsm.blockedEvals.SetEnabled(true)

	// Register a blocked eval with the tracker.
	blockedEval := mock.Eval()
	blockedEval.ClassEligibility = map[string]bool{"v1:123": true}
	fsm.blockedEvals.Block(blockedEval)

	// Build a completed eval for the same job that records a failed
	// task-group placement.
	completed := mock.Eval()
	completed.Status = structs.EvalStatusComplete
	completed.JobID = blockedEval.JobID
	completed.FailedTGAllocs = map[string]*structs.AllocMetric{
		"test": new(structs.AllocMetric),
	}

	// Encode the update request and run it through the FSM.
	buf, err := structs.Encode(structs.EvalUpdateRequestType, structs.EvalUpdateRequest{
		Evals: []*structs.Evaluation{completed},
	})
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	if resp := fsm.Apply(makeLog(buf)); resp != nil {
		t.Fatalf("resp: %v", resp)
	}

	// The completed eval must be present in the state store.
	out, err := fsm.State().EvalByID(completed.ID)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if out == nil {
		t.Fatalf("not found!")
	}
	if out.CreateIndex != 1 {
		t.Fatalf("bad index: %d", out.CreateIndex)
	}

	// Nothing should have been enqueued in the broker.
	if brokerStats := fsm.evalBroker.Stats(); brokerStats.TotalReady != 0 {
		t.Fatalf("bad: %#v %#v", brokerStats, out)
	}

	// The blocked eval must still be tracked.
	if blockedStats := fsm.blockedEvals.Stats(); blockedStats.TotalBlocked != 1 {
		t.Fatalf("bad: %#v %#v", blockedStats, out)
	}
}

func TestFSM_DeleteEval(t *testing.T) {
fsm := testFSM(t)

Expand Down
14 changes: 11 additions & 3 deletions nomad/state/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,17 @@ func evalTableSchema() *memdb.TableSchema {
Name: "job",
AllowMissing: false,
Unique: false,
Indexer: &memdb.StringFieldIndex{
Field: "JobID",
Lowercase: true,
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "JobID",
Lowercase: true,
},
&memdb.StringFieldIndex{
Field: "Status",
Lowercase: true,
},
},
},
},
},
Expand Down
41 changes: 36 additions & 5 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ func (s *StateStore) PeriodicLaunches() (memdb.ResultIterator, error) {
return iter, nil
}

// UpsertEvaluation is used to upsert an evaluation
// UpsertEvals is used to upsert a set of evaluations
func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) error {
txn := s.db.Txn(true)
defer txn.Abort()
Expand All @@ -639,7 +639,7 @@ func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) erro
for _, eval := range evals {
watcher.Add(watch.Item{Eval: eval.ID})
watcher.Add(watch.Item{EvalJob: eval.JobID})
if err := s.nestedUpsertEval(txn, index, eval); err != nil {
if err := s.nestedUpsertEval(txn, watcher, index, eval); err != nil {
return err
}

Expand All @@ -657,7 +657,7 @@ func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) erro
}

// nestedUpsertEval is used to nest an evaluation upsert within a transaction
func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *structs.Evaluation) error {
func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, watcher watch.Items, index uint64, eval *structs.Evaluation) error {
// Lookup the evaluation
existing, err := txn.First("evals", "id", eval.ID)
if err != nil {
Expand Down Expand Up @@ -705,6 +705,37 @@ func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *struct
}
}

// Check if the job has any blocked evaluations and cancel them. This only
// applies when the evaluation being upserted is complete and recorded no
// failed task-group allocations.
if eval.Status == structs.EvalStatusComplete && len(eval.FailedTGAllocs) == 0 {
	// Get the blocked evaluations for the job, if any exist
	iter, err := txn.Get("evals", "job", eval.JobID, structs.EvalStatusBlocked)
	if err != nil {
		// Include the underlying error; it was previously passed without a
		// matching format verb and so rendered as %!(EXTRA ...) noise.
		return fmt.Errorf("failed to get blocked evals for job %q: %v", eval.JobID, err)
	}

	// Collect every evaluation the iterator yields
	var blocked []*structs.Evaluation
	for {
		raw := iter.Next()
		if raw == nil {
			break
		}
		blocked = append(blocked, raw.(*structs.Evaluation))
	}

	// Go through and mark each blocked eval as cancelled. A copy is
	// inserted rather than mutating the object held by the table.
	for _, blockedEval := range blocked {
		newEval := blockedEval.Copy()
		newEval.Status = structs.EvalStatusCancelled
		// NOTE(review): the description carries the cancelled eval's own ID;
		// confirm whether the successful eval's ID was intended instead.
		newEval.StatusDescription = fmt.Sprintf("evaluation %q successful", newEval.ID)
		newEval.ModifyIndex = index
		if err := txn.Insert("evals", newEval); err != nil {
			return fmt.Errorf("eval insert failed: %v", err)
		}

		watcher.Add(watch.Item{Eval: newEval.ID})
	}
}

// Insert the eval
if err := txn.Insert("evals", eval); err != nil {
return fmt.Errorf("eval insert failed: %v", err)
Expand Down Expand Up @@ -809,7 +840,7 @@ func (s *StateStore) EvalsByJob(jobID string) ([]*structs.Evaluation, error) {
txn := s.db.Txn(false)

// Get an iterator over the evaluations for the job
iter, err := txn.Get("evals", "job", jobID)
iter, err := txn.Get("evals", "job_prefix", jobID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1490,7 +1521,7 @@ func (s *StateStore) getJobStatus(txn *memdb.Txn, job *structs.Job, evalDelete b
}
}

evals, err := txn.Get("evals", "job", job.ID)
evals, err := txn.Get("evals", "job_prefix", job.ID)
if err != nil {
return "", err
}
Expand Down
Loading