Improve partial garbage collection of allocations #1256

Merged — 2 commits merged on Jun 11, 2016
83 changes: 48 additions & 35 deletions nomad/core_sched.go
@@ -105,20 +105,26 @@ OUTER:
continue
}

allEvalsGC := true
for _, eval := range evals {
gc, allocs, err := c.gcEval(eval, oldThreshold)
if err != nil || !gc {
// We skip the job because it is not finished if it has
// non-terminal allocations.
if err != nil {
continue OUTER
}

gcEval = append(gcEval, eval.ID)
// Update whether all evals GC'd so we know whether to GC the job.
allEvalsGC = allEvalsGC && gc

if gc {
gcEval = append(gcEval, eval.ID)
}
gcAlloc = append(gcAlloc, allocs...)
}

// Job is eligible for garbage collection
gcJob = append(gcJob, job.ID)
if allEvalsGC {
gcJob = append(gcJob, job.ID)
}
}

// Fast-path the nothing case
@@ -186,28 +192,10 @@ func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {
return err
}

// If the eval is from a running "batch" job we don't want to garbage
// collect its allocations. If there is a long running batch job and its
// terminal allocations get GC'd the scheduler would re-run the
// allocations.
if eval.Type == structs.JobTypeBatch {
// Check if the job is running
job, err := c.snap.JobByID(eval.JobID)
if err != nil {
return err
}

// If the job has been deregistered, we want to garbage collect the
// allocations and evaluations.
if job != nil && len(allocs) != 0 {
continue
}
}

if gc {
gcEval = append(gcEval, eval.ID)
gcAlloc = append(gcAlloc, allocs...)
}
gcAlloc = append(gcAlloc, allocs...)
}

// Fast-path the nothing case
@@ -232,6 +220,24 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64)
return false, nil, nil
}

// If the eval is from a running "batch" job we don't want to garbage
// collect its allocations. If there is a long running batch job and its
// terminal allocations get GC'd the scheduler would re-run the
// allocations.
if eval.Type == structs.JobTypeBatch {
// Check if the job is running
job, err := c.snap.JobByID(eval.JobID)
if err != nil {
return false, nil, err
}

// If the job has been deregistered, we want to garbage collect the
// allocations and evaluations.
if job != nil {
return false, nil, nil
}
}

// Get the allocations by eval
allocs, err := c.snap.AllocsByEval(eval.ID)
if err != nil {
@@ -241,19 +247,20 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64)
}

// Scan the allocations to ensure they are terminal and old
gcEval := true
var gcAllocIDs []string
for _, alloc := range allocs {
if !alloc.TerminalStatus() || alloc.ModifyIndex > thresholdIndex {
return false, nil, nil
// Can't GC the evaluation since not all of the allocations are
// terminal
gcEval = false
} else {
// The allocation is eligible to be GC'd
gcAllocIDs = append(gcAllocIDs, alloc.ID)
}
}

allocIds := make([]string, len(allocs))
for i, alloc := range allocs {
allocIds[i] = alloc.ID
}

// Evaluation is eligible for garbage collection
return true, allocIds, nil
return gcEval, gcAllocIDs, nil
}

// evalReap contacts the leader and issues a reap on the passed evals and
@@ -343,6 +350,7 @@ func (c *CoreScheduler) nodeGC(eval *structs.Evaluation) error {

// Collect the nodes to GC
var gcNode []string
OUTER:
for {
raw := iter.Next()
if raw == nil {
@@ -363,9 +371,14 @@ func (c *CoreScheduler) nodeGC(eval *structs.Evaluation) error {
continue
}

// If there are any allocations, skip the node
if len(allocs) > 0 {
continue
// If there are any non-terminal allocations, skip the node. If the node
// is terminal and the allocations are not, the scheduler may not have
// run yet to transition the allocs on the node to terminal. We delay
// GC'ing until this happens.
for _, alloc := range allocs {
if !alloc.TerminalStatus() {
continue OUTER
}
}

// Node is eligible for garbage collection
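Taken together, the core_sched.go changes above make garbage collection partial: instead of refusing to touch an evaluation while any of its allocations is still live, gcEval now reports both whether the evaluation itself can be collected and the subset of allocation IDs that can, and jobGC only removes the job once every evaluation reported it was collectable (the allEvalsGC bookkeeping in the first hunk). A minimal, hypothetical sketch of that decision, using a simplified allocState type rather than Nomad's structs.Allocation, looks like this:

package main

import "fmt"

// allocState is a simplified, hypothetical stand-in for structs.Allocation;
// only the two properties the GC decision looks at are modeled here.
type allocState struct {
	ID          string
	Terminal    bool   // stand-in for Allocation.TerminalStatus()
	ModifyIndex uint64 // raft index of the last modification
}

// partialGCEval mirrors the new gcEval behavior: it reports whether the
// evaluation itself may be collected (every allocation terminal and older
// than the threshold) and, independently, which allocations may be
// collected right now.
func partialGCEval(allocs []allocState, thresholdIndex uint64) (bool, []string) {
	gcEval := true
	var gcAllocIDs []string
	for _, a := range allocs {
		if !a.Terminal || a.ModifyIndex > thresholdIndex {
			// A live or recent allocation blocks GC of the eval,
			// but the terminal allocations are still collected.
			gcEval = false
			continue
		}
		gcAllocIDs = append(gcAllocIDs, a.ID)
	}
	return gcEval, gcAllocIDs
}

func main() {
	allocs := []allocState{
		{ID: "dead-alloc", Terminal: true, ModifyIndex: 900},
		{ID: "running-alloc", Terminal: false, ModifyIndex: 1500},
	}
	evalGC, allocIDs := partialGCEval(allocs, 1000)
	fmt.Println(evalGC, allocIDs) // false [dead-alloc]
}

With one terminal and one running allocation, the sketch returns false plus the terminal allocation's ID, which is the shape the real code feeds into gcAlloc while leaving gcEval and gcJob alone; the TestCoreScheduler_EvalGC_Partial test below asserts exactly that split.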
179 changes: 179 additions & 0 deletions nomad/core_sched_test.go
@@ -69,6 +69,83 @@ func TestCoreScheduler_EvalGC(t *testing.T) {
}
}

func TestCoreScheduler_EvalGC_Partial(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)

// Insert "dead" eval
state := s1.fsm.State()
eval := mock.Eval()
eval.Status = structs.EvalStatusComplete
err := state.UpsertEvals(1000, []*structs.Evaluation{eval})
if err != nil {
t.Fatalf("err: %v", err)
}

// Insert "dead" alloc
alloc := mock.Alloc()
alloc.EvalID = eval.ID
alloc.DesiredStatus = structs.AllocDesiredStatusFailed
err = state.UpsertAllocs(1001, []*structs.Allocation{alloc})
if err != nil {
t.Fatalf("err: %v", err)
}

// Insert "running" alloc
alloc2 := mock.Alloc()
alloc2.EvalID = eval.ID
err = state.UpsertAllocs(1002, []*structs.Allocation{alloc2})
if err != nil {
t.Fatalf("err: %v", err)
}

// Update the time tables to make this work
tt := s1.fsm.TimeTable()
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold))

// Create a core scheduler
snap, err := state.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core := NewCoreScheduler(s1, snap)

// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobEvalGC)
gc.ModifyIndex = 2000
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
}

// Should not be gone
out, err := state.EvalByID(eval.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("bad: %v", out)
}

outA, err := state.AllocByID(alloc2.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA == nil {
t.Fatalf("bad: %v", outA)
}

// Should be gone
outB, err := state.AllocByID(alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outB != nil {
t.Fatalf("bad: %v", outB)
}
}

func TestCoreScheduler_EvalGC_Batch_NoAllocs(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
@@ -334,6 +411,108 @@ func TestCoreScheduler_NodeGC(t *testing.T) {
}
}

func TestCoreScheduler_NodeGC_TerminalAllocs(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)

// Insert "dead" node
state := s1.fsm.State()
node := mock.Node()
node.Status = structs.NodeStatusDown
err := state.UpsertNode(1000, node)
if err != nil {
t.Fatalf("err: %v", err)
}

// Insert a terminal alloc on that node
alloc := mock.Alloc()
alloc.DesiredStatus = structs.AllocDesiredStatusStop
if err := state.UpsertAllocs(1001, []*structs.Allocation{alloc}); err != nil {
t.Fatalf("err: %v", err)
}

// Update the time tables to make this work
tt := s1.fsm.TimeTable()
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.NodeGCThreshold))

// Create a core scheduler
snap, err := state.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core := NewCoreScheduler(s1, snap)

// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobNodeGC)
gc.ModifyIndex = 2000
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
}

// Should be gone
out, err := state.NodeByID(node.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != nil {
t.Fatalf("bad: %v", out)
}
}

func TestCoreScheduler_NodeGC_RunningAllocs(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)

// Insert "dead" node
state := s1.fsm.State()
node := mock.Node()
node.Status = structs.NodeStatusDown
err := state.UpsertNode(1000, node)
if err != nil {
t.Fatalf("err: %v", err)
}

// Insert a running alloc on that node
alloc := mock.Alloc()
alloc.NodeID = node.ID
alloc.DesiredStatus = structs.AllocDesiredStatusRun
alloc.ClientStatus = structs.AllocClientStatusRunning
if err := state.UpsertAllocs(1001, []*structs.Allocation{alloc}); err != nil {
t.Fatalf("err: %v", err)
}

// Update the time tables to make this work
tt := s1.fsm.TimeTable()
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.NodeGCThreshold))

// Create a core scheduler
snap, err := state.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core := NewCoreScheduler(s1, snap)

// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobNodeGC)
gc.ModifyIndex = 2000
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
}

// Should still be here
out, err := state.NodeByID(node.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("bad: %v", out)
}
}

func TestCoreScheduler_NodeGC_Force(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
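The two node GC tests above exercise the relaxed check added to nodeGC in core_sched.go: a down node used to be skipped whenever it still had allocations, and it is now only skipped while a non-terminal allocation remains. A small, hypothetical sketch of that check, with a plain bool slice standing in for calling TerminalStatus() on each allocation on the node:

package main

import "fmt"

// nodeEligibleForGC mirrors the relaxed node GC check: a down node with
// only terminal allocations may be collected, while a single non-terminal
// allocation keeps it around until the scheduler catches up.
func nodeEligibleForGC(allocsTerminal []bool) bool {
	for _, terminal := range allocsTerminal {
		if !terminal {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(nodeEligibleForGC([]bool{true, true}))  // true: all allocs terminal, node may be GC'd
	fmt.Println(nodeEligibleForGC([]bool{true, false})) // false: a live alloc blocks node GC
}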