
[15090] Ensure no leakage of evaluations for batch jobs. #15097

Merged 12 commits on Jan 31, 2023
37 changes: 20 additions & 17 deletions nomad/core_sched.go
@@ -297,16 +297,18 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64,
}

// If the eval is from a running "batch" job we don't want to garbage
// collect its allocations. If there is a long running batch job and its
// terminal allocations get GC'd the scheduler would re-run the
// allocations.
// collect its most current allocations. If there is a long running batch job and its
// terminal allocations get GC'd the scheduler would re-run the allocations. However,
// we do want to GC old Evals and Allocs if there are newer ones due to an update.
if eval.Type == structs.JobTypeBatch {
// Check if the job is running

// Can collect if:
// Job doesn't exist
// Job is Stopped and dead
// allowBatch and the job is dead
// Can collect if either holds:
// - Job doesn't exist
// - Job is Stopped and dead
// - allowBatch and the job is dead
//
// If we cannot collect outright, check if a partial GC may occur
collect := false
if job == nil {
collect = true
@@ -318,12 +320,9 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64,
collect = true
}

// We don't want to gc anything related to a job which is not dead
// If the batch job doesn't exist we can GC it regardless of allowBatch
if !collect {
// Find allocs associated with older (based on createindex) and GC them if terminal
oldAllocs := olderVersionTerminalAllocs(allocs, job)
return false, oldAllocs, nil
oldAllocs, gcEval := olderVersionTerminalAllocs(allocs, job, thresholdIndex)
return gcEval, oldAllocs, nil
}
}

@@ -344,16 +343,20 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64,
return gcEval, gcAllocIDs, nil
}

// olderVersionTerminalAllocs returns terminal allocations whose job create index
// is older than the job's create index
func olderVersionTerminalAllocs(allocs []*structs.Allocation, job *structs.Job) []string {
// olderVersionTerminalAllocs returns a tuple ([]string, bool). The first element is the list of
// terminal allocations which may be garbage collected for batch jobs. The second element indicates
// whether the evaluation itself may be garbage collected.
func olderVersionTerminalAllocs(allocs []*structs.Allocation, job *structs.Job, thresholdIndex uint64) ([]string, bool) {
var ret []string
var mayGCEval = true
for _, alloc := range allocs {
if alloc.Job != nil && alloc.Job.CreateIndex < job.CreateIndex && alloc.TerminalStatus() {
if alloc.CreateIndex < job.JobModifyIndex && alloc.ModifyIndex < thresholdIndex && alloc.TerminalStatus() {
ret = append(ret, alloc.ID)
} else {
mayGCEval = false
}
}
return ret
return ret, mayGCEval
}
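
To make the new contract concrete, here is a minimal usage sketch of how olderVersionTerminalAllocs is expected to behave. It is not part of the diff; it assumes in-package access to the nomad structs, and all field values are illustrative only.

// Illustrative only — not part of this PR. Assumes the nomad package context
// where structs and olderVersionTerminalAllocs are visible.
func exampleOlderVersionTerminalAllocs() ([]string, bool) {
	job := &structs.Job{JobModifyIndex: 1000}

	// Terminal alloc left over from an older version of the job and last
	// modified before the GC threshold: eligible for partial GC.
	oldAlloc := &structs.Allocation{
		ID:            "old-terminal",
		CreateIndex:   900,
		ModifyIndex:   950,
		DesiredStatus: structs.AllocDesiredStatusStop,
		ClientStatus:  structs.AllocClientStatusComplete,
	}

	// Alloc created by the current job version and still running: it is
	// skipped, and its presence means the eval itself must be kept.
	currentAlloc := &structs.Allocation{
		ID:            "current-running",
		CreateIndex:   1001,
		ModifyIndex:   1001,
		DesiredStatus: structs.AllocDesiredStatusRun,
		ClientStatus:  structs.AllocClientStatusRunning,
	}

	gcAllocIDs, gcEval := olderVersionTerminalAllocs(
		[]*structs.Allocation{oldAlloc, currentAlloc}, job, 990)

	// gcAllocIDs == []string{"old-terminal"}, gcEval == false.
	return gcAllocIDs, gcEval
}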

// evalReap contacts the leader and issues a reap on the passed evals and
224 changes: 199 additions & 25 deletions nomad/core_sched_test.go
@@ -393,7 +393,7 @@ func TestCoreScheduler_EvalGC_Batch(t *testing.T) {
}
}

// An EvalGC should reap allocations from jobs with an older modify index
// An EvalGC should reap allocations from jobs with a newer modify index
func TestCoreScheduler_EvalGC_Batch_OldVersion(t *testing.T) {
ci.Parallel(t)

@@ -404,12 +404,14 @@ func TestCoreScheduler_EvalGC_Batch_OldVersion(t *testing.T) {
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)

var jobModifyIdx uint64 = 1000

// Insert a "dead" job
store := s1.fsm.State()
job := mock.Job()
job.Type = structs.JobTypeBatch
job.Status = structs.JobStatusDead
err := store.UpsertJob(structs.MsgTypeTestSetup, 1000, job)
err := store.UpsertJob(structs.MsgTypeTestSetup, jobModifyIdx, job)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -419,7 +421,7 @@ func TestCoreScheduler_EvalGC_Batch_OldVersion(t *testing.T) {
eval.Status = structs.EvalStatusComplete
eval.Type = structs.JobTypeBatch
eval.JobID = job.ID
err = store.UpsertEvals(structs.MsgTypeTestSetup, 1001, []*structs.Evaluation{eval})
err = store.UpsertEvals(structs.MsgTypeTestSetup, jobModifyIdx+1, []*structs.Evaluation{eval})
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -439,25 +441,194 @@ func TestCoreScheduler_EvalGC_Batch_OldVersion(t *testing.T) {
alloc2.DesiredStatus = structs.AllocDesiredStatusRun
alloc2.ClientStatus = structs.AllocClientStatusLost

// Insert alloc with older job modifyindex
alloc3 := mock.Alloc()
job2 := job.Copy()
err = store.UpsertAllocs(structs.MsgTypeTestSetup, jobModifyIdx+2, []*structs.Allocation{alloc, alloc2})
if err != nil {
t.Fatalf("err: %v", err)
}

alloc3.Job = job2
alloc3.JobID = job2.ID
// Insert allocs with indexes older than job.ModifyIndex. Two cases:
// 1. Terminal state
// 2. Non-terminal state
alloc3 := mock.Alloc()
alloc3.Job = job
alloc3.JobID = job.ID
alloc3.EvalID = eval.ID
job2.CreateIndex = 500
alloc3.DesiredStatus = structs.AllocDesiredStatusRun
alloc3.ClientStatus = structs.AllocClientStatusLost

err = store.UpsertAllocs(structs.MsgTypeTestSetup, 1002, []*structs.Allocation{alloc, alloc2, alloc3})
alloc4 := mock.Alloc()
alloc4.Job = job
alloc4.JobID = job.ID
alloc4.EvalID = eval.ID
alloc4.DesiredStatus = structs.AllocDesiredStatusRun
alloc4.ClientStatus = structs.AllocClientStatusRunning

err = store.UpsertAllocs(structs.MsgTypeTestSetup, jobModifyIdx-1, []*structs.Allocation{alloc3, alloc4})
if err != nil {
t.Fatalf("err: %v", err)
}

// Update the time tables to make this work
// A little helper for assertions
assertCorrectEvalAlloc := func(
ws memdb.WatchSet,
eval *structs.Evaluation,
allocsShouldExist []*structs.Allocation,
allocsShouldNotExist []*structs.Allocation,
) {
out, err := store.EvalByID(ws, eval.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("bad: %v", out)
}

for _, alloc := range allocsShouldExist {
outA, err := store.AllocByID(ws, alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA == nil {
t.Fatalf("bad: %v", outA)
}
}

for _, alloc := range allocsShouldNotExist {
outA, err := store.AllocByID(ws, alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA != nil {
t.Fatalf("expected alloc to be nil:%v", outA)
}
}
}

// Create a core scheduler
snap, err := store.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core := NewCoreScheduler(s1, snap)

// Attempt the GC without moving the time significantly.
gc := s1.coreJobEval(structs.CoreJobEvalGC, jobModifyIdx)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
}

// All allocations should remain in place.
assertCorrectEvalAlloc(
memdb.NewWatchSet(),
eval,
[]*structs.Allocation{alloc, alloc2, alloc3, alloc4},
[]*structs.Allocation{},
)

// Attempt the GC while moving the time forward significantly.
tt := s1.fsm.TimeTable()
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold))
tt.Witness(2*jobModifyIdx, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold))

gc = s1.coreJobEval(structs.CoreJobEvalGC, jobModifyIdx*2)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
}

// The only remaining allocations are those which are "current", i.e. those the
// batch job is still using. To spell it out:
//
// Alloc 1 and Alloc 2 should remain due to the update threshold.
// Alloc 3 should be GC'd due to age.
// Alloc 4 should remain due to its non-terminal state.
assertCorrectEvalAlloc(
memdb.NewWatchSet(),
eval,
[]*structs.Allocation{alloc, alloc2, alloc4},
[]*structs.Allocation{alloc3},
)

// The job should still exist.
ws := memdb.NewWatchSet()
outB, err := store.JobByID(ws, job.Namespace, job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outB == nil {
t.Fatalf("bad: %v", outB)
}
}

// An EvalGC should reap allocations from jobs with a newer modify index and reap the eval itself
// if all allocs are reaped.
func TestCoreScheduler_EvalGC_Batch_OldVersionReapsEval(t *testing.T) {
Review comment (Member): These tests have a lot of setup code (as I'm sure you noticed!) and the assertion is only subtly different from the previous one. It might make the whole test more understandable if we collapsed these into a single test and had this one be the final assertion that the eval is GC'd once the conditions are right.

Reply (Contributor, author): Well, I have actually spent quite a bit of time unravelling these test structures. The reason making this into a single test is non-trivial is the way in which the time table is used, and the fact that there is no clock whose reading we can set to a particular time.

In essence, time.Now() is used throughout, which always gives the current time without any ability to set the clock. When we insert time events into the time table, we must do so from oldest to newest for the time table to allow us to select older events (the sorting logic is: "give me the last event that has occurred past time X").

As such, I resorted to a forceful reset of the time table for each logical test sharing the setup.

Please see the changes.
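
A rough sketch of the reset-and-witness pattern described above, for clarity only. It is not code from the PR; it assumes the in-package access to the server's FSM and time table that the existing tests already use, and the helper name is hypothetical.

// Illustrative only — mirrors the pattern the author describes: reset the
// FSM time table between logical phases, then witness a single entry far
// enough in the past that indexes at or below `index` look older than the
// eval GC threshold.
func resetAndAgeTimeTable(s1 *Server, index uint64) {
	// Drop any previously witnessed entries so phases don't interfere.
	s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)

	// Witness an entry older than the configured EvalGCThreshold.
	s1.fsm.TimeTable().Witness(index,
		time.Now().UTC().Add(-1*s1.config.EvalGCThreshold))
}

Each logical phase of a combined test would call such a helper with its own index before invoking the core scheduler, which is effectively what the two witness calls in these tests do.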

ci.Parallel(t)

s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)

// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)

var jobModifyIdx uint64 = 1000

// Insert a "dead" job
store := s1.fsm.State()
job := mock.Job()
job.Type = structs.JobTypeBatch
job.Status = structs.JobStatusDead
err := store.UpsertJob(structs.MsgTypeTestSetup, jobModifyIdx, job)
if err != nil {
t.Fatalf("err: %v", err)
}

// Insert "complete" eval
eval := mock.Eval()
eval.Status = structs.EvalStatusComplete
eval.Type = structs.JobTypeBatch
eval.JobID = job.ID
err = store.UpsertEvals(structs.MsgTypeTestSetup, jobModifyIdx+1, []*structs.Evaluation{eval})
if err != nil {
t.Fatalf("err: %v", err)
}

// Insert an alloc with index older than job.ModifyIndex.
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.EvalID = eval.ID
alloc.DesiredStatus = structs.AllocDesiredStatusRun
alloc.ClientStatus = structs.AllocClientStatusLost

err = store.UpsertAllocs(structs.MsgTypeTestSetup, jobModifyIdx-1, []*structs.Allocation{alloc})
if err != nil {
t.Fatalf("err: %v", err)
}

// Insert a new eval
eval2 := mock.Eval()
eval2.Status = structs.EvalStatusComplete
eval2.Type = structs.JobTypeBatch
eval2.JobID = job.ID
err = store.UpsertEvals(structs.MsgTypeTestSetup, jobModifyIdx+1, []*structs.Evaluation{eval2})
if err != nil {
t.Fatalf("err: %v", err)
}

// Insert a running alloc belonging to the above eval
alloc2 := mock.Alloc()
alloc2.Job = job
alloc2.JobID = job.ID
alloc2.EvalID = eval2.ID
alloc2.DesiredStatus = structs.AllocDesiredStatusRun
alloc2.ClientStatus = structs.AllocClientStatusLost

err = store.UpsertAllocs(structs.MsgTypeTestSetup, jobModifyIdx+1, []*structs.Allocation{alloc2})
if err != nil {
t.Fatalf("err: %v", err)
}

// Create a core scheduler
snap, err := store.Snapshot()
@@ -466,45 +637,48 @@ func TestCoreScheduler_EvalGC_Batch_OldVersion(t *testing.T) {
}
core := NewCoreScheduler(s1, snap)

// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
// Attempt the GC while moving the time forward significantly.
tt := s1.fsm.TimeTable()
tt.Witness(2*jobModifyIdx, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold))

gc := s1.coreJobEval(structs.CoreJobEvalGC, jobModifyIdx*2)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
}

// Alloc1 and 2 should be there, and alloc3 should be gone
// The old alloc and the old eval are gone. The new eval and the new alloc are not.
ws := memdb.NewWatchSet()
out, err := store.EvalByID(ws, eval.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
if out != nil {
t.Fatalf("bad: %v", out)
}

outA, err := store.AllocByID(ws, alloc.ID)
out, err = store.EvalByID(ws, eval2.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA == nil {
t.Fatalf("bad: %v", outA)
if out == nil {
t.Fatalf("bad: %v", out)
}

outA2, err := store.AllocByID(ws, alloc2.ID)
outA, err := store.AllocByID(ws, alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA2 == nil {
t.Fatalf("bad: %v", outA2)
if outA != nil {
t.Fatalf("bad: %v", outA)
}

outA3, err := store.AllocByID(ws, alloc3.ID)
outA, err = store.AllocByID(ws, alloc2.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA3 != nil {
t.Fatalf("expected alloc to be nil:%v", outA2)
if outA == nil {
t.Fatalf("bad: %v", outA)
}

outB, err := store.JobByID(ws, job.Namespace, job.ID)