Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove terminal allocations associated with older job modify index #4839

Merged
merged 2 commits into from
Nov 9, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 23 additions & 9 deletions nomad/core_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,14 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64,
return false, nil, err
}

// Get the allocations by eval
allocs, err := c.snap.AllocsByEval(ws, eval.ID)
if err != nil {
c.logger.Error("failed to get allocs for eval",
"eval_id", eval.ID, "error", err)
return false, nil, err
}

// If the eval is from a running "batch" job we don't want to garbage
// collect its allocations. If there is a long running batch job and its
// terminal allocations get GC'd the scheduler would re-run the
Expand All @@ -311,18 +319,12 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64,
// We don't want to gc anything related to a job which is not dead
// If the batch job doesn't exist we can GC it regardless of allowBatch
if !collect {
return false, nil, nil
// Find allocs associated with older (based on modifyindex) and GC them if terminal
oldAllocs := olderVersionTerminalAllocs(allocs, job)
return false, oldAllocs, nil
}
}

// Get the allocations by eval
allocs, err := c.snap.AllocsByEval(ws, eval.ID)
if err != nil {
c.logger.Error("failed to get allocs for eval",
"eval_id", eval.ID, "error", err)
return false, nil, err
}

// Scan the allocations to ensure they are terminal and old
gcEval := true
var gcAllocIDs []string
Expand All @@ -340,6 +342,18 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64,
return gcEval, gcAllocIDs, nil
}

// olderVersionTerminalAllocs returns terminal allocations whose job modify index
// is older than the job's modify index
func olderVersionTerminalAllocs(allocs []*structs.Allocation, job *structs.Job) []string {
var ret []string
for _, alloc := range allocs {
if alloc.Job != nil && alloc.Job.JobModifyIndex < job.JobModifyIndex && alloc.TerminalStatus() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think what you really want to test is the JobCreateIndex. I could change the priority of a job and that would change the job modify index. That doesn't mean we should GC its allocations. They are still potentially useful for scheduling decisions.

ret = append(ret, alloc.ID)
}
}
return ret
}

// evalReap contacts the leader and issues a reap on the passed evals and
// allocs.
func (c *CoreScheduler) evalReap(evals, allocs []string) error {
Expand Down
152 changes: 140 additions & 12 deletions nomad/core_sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ func TestCoreScheduler_EvalGC_ReschedulingAllocs(t *testing.T) {

// Insert failed alloc with an old reschedule attempt, can be GCed
alloc := mock.Alloc()
alloc.Job = job
alloc.EvalID = eval.ID
alloc.DesiredStatus = structs.AllocDesiredStatusRun
alloc.ClientStatus = structs.AllocClientStatusFailed
Expand All @@ -158,6 +159,7 @@ func TestCoreScheduler_EvalGC_ReschedulingAllocs(t *testing.T) {
}

alloc2 := mock.Alloc()
alloc2.Job = job
alloc2.EvalID = eval.ID
alloc2.DesiredStatus = structs.AllocDesiredStatusRun
alloc2.ClientStatus = structs.AllocClientStatusFailed
Expand Down Expand Up @@ -315,12 +317,14 @@ func TestCoreScheduler_EvalGC_Batch(t *testing.T) {

// Insert "failed" alloc
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.EvalID = eval.ID
alloc.DesiredStatus = structs.AllocDesiredStatusStop

// Insert "lost" alloc
alloc2 := mock.Alloc()
alloc2.Job = job
alloc2.JobID = job.ID
alloc2.EvalID = eval.ID
alloc2.DesiredStatus = structs.AllocDesiredStatusRun
Expand Down Expand Up @@ -384,6 +388,128 @@ func TestCoreScheduler_EvalGC_Batch(t *testing.T) {
}
}

// An EvalGC should reap allocations from jobs with an older modify index
func TestCoreScheduler_EvalGC_Batch_OldVersion(t *testing.T) {
t.Parallel()
s1 := TestServer(t, nil)
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)

// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)

// Insert a "dead" job
state := s1.fsm.State()
job := mock.Job()
job.Type = structs.JobTypeBatch
job.Status = structs.JobStatusDead
err := state.UpsertJob(1000, job)
if err != nil {
t.Fatalf("err: %v", err)
}

// Insert "complete" eval
eval := mock.Eval()
eval.Status = structs.EvalStatusComplete
eval.Type = structs.JobTypeBatch
eval.JobID = job.ID
err = state.UpsertEvals(1001, []*structs.Evaluation{eval})
if err != nil {
t.Fatalf("err: %v", err)
}

// Insert "failed" alloc
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.EvalID = eval.ID
alloc.DesiredStatus = structs.AllocDesiredStatusStop

// Insert "lost" alloc
alloc2 := mock.Alloc()
alloc2.Job = job
alloc2.JobID = job.ID
alloc2.EvalID = eval.ID
alloc2.DesiredStatus = structs.AllocDesiredStatusRun
alloc2.ClientStatus = structs.AllocClientStatusLost

// Insert alloc with older job modifyindex
alloc3 := mock.Alloc()
job2 := job.Copy()

alloc3.Job = job2
alloc3.JobID = job2.ID
alloc3.EvalID = eval.ID
job2.JobModifyIndex = 500
alloc3.DesiredStatus = structs.AllocDesiredStatusRun
alloc3.ClientStatus = structs.AllocClientStatusLost

err = state.UpsertAllocs(1002, []*structs.Allocation{alloc, alloc2, alloc3})
if err != nil {
t.Fatalf("err: %v", err)
}

// Update the time tables to make this work
tt := s1.fsm.TimeTable()
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold))

// Create a core scheduler
snap, err := state.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core := NewCoreScheduler(s1, snap)

// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
}

// Alloc1 and 2 should be there, and alloc3 should be gone
ws := memdb.NewWatchSet()
out, err := state.EvalByID(ws, eval.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("bad: %v", out)
}

outA, err := state.AllocByID(ws, alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA == nil {
t.Fatalf("bad: %v", outA)
}

outA2, err := state.AllocByID(ws, alloc2.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA2 == nil {
t.Fatalf("bad: %v", outA2)
}

outA3, err := state.AllocByID(ws, alloc3.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA3 != nil {
t.Fatalf("expected alloc to be nil:%v", outA2)
}

outB, err := state.JobByID(ws, job.Namespace, job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outB == nil {
t.Fatalf("bad: %v", outB)
}
}

// An EvalGC should reap a batch job that has been stopped
func TestCoreScheduler_EvalGC_BatchStopped(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -1798,18 +1924,20 @@ func TestCoreScheduler_PartitionJobReap(t *testing.T) {
// Tests various scenarios when allocations are eligible to be GCed
func TestAllocation_GCEligible(t *testing.T) {
type testCase struct {
Desc string
GCTime time.Time
ClientStatus string
DesiredStatus string
JobStatus string
JobStop bool
ModifyIndex uint64
NextAllocID string
ReschedulePolicy *structs.ReschedulePolicy
RescheduleTrackers []*structs.RescheduleEvent
ThresholdIndex uint64
ShouldGC bool
Desc string
GCTime time.Time
ClientStatus string
DesiredStatus string
JobStatus string
JobStop bool
AllocJobModifyIndex uint64
JobModifyIndex uint64
ModifyIndex uint64
NextAllocID string
ReschedulePolicy *structs.ReschedulePolicy
RescheduleTrackers []*structs.RescheduleEvent
ThresholdIndex uint64
ShouldGC bool
}

fail := time.Now()
Expand Down