diff --git a/nomad/core_sched.go b/nomad/core_sched.go
index d433a7d15e0d..00cae976a71e 100644
--- a/nomad/core_sched.go
+++ b/nomad/core_sched.go
@@ -105,20 +105,26 @@ OUTER:
 			continue
 		}
 
+		allEvalsGC := true
 		for _, eval := range evals {
 			gc, allocs, err := c.gcEval(eval, oldThreshold)
-			if err != nil || !gc {
-				// We skip the job because it is not finished if it has
-				// non-terminal allocations.
+			if err != nil {
 				continue OUTER
 			}
-			gcEval = append(gcEval, eval.ID)
+
+			// Update whether all evals GC'd so we know whether to GC the job.
+			allEvalsGC = allEvalsGC && gc
+
+			if gc {
+				gcEval = append(gcEval, eval.ID)
+			}
 			gcAlloc = append(gcAlloc, allocs...)
 		}
 
 		// Job is eligible for garbage collection
-		gcJob = append(gcJob, job.ID)
+		if allEvalsGC {
+			gcJob = append(gcJob, job.ID)
+		}
 	}
 
 	// Fast-path the nothing case
@@ -186,28 +192,10 @@ func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {
 			return err
 		}
 
-		// If the eval is from a running "batch" job we don't want to garbage
-		// collect its allocations. If there is a long running batch job and its
-		// terminal allocations get GC'd the scheduler would re-run the
-		// allocations.
-		if eval.Type == structs.JobTypeBatch {
-			// Check if the job is running
-			job, err := c.snap.JobByID(eval.JobID)
-			if err != nil {
-				return err
-			}
-
-			// If the job has been deregistered, we want to garbage collect the
-			// allocations and evaluations.
-			if job != nil && len(allocs) != 0 {
-				continue
-			}
-		}
-
 		if gc {
 			gcEval = append(gcEval, eval.ID)
-			gcAlloc = append(gcAlloc, allocs...)
 		}
+		gcAlloc = append(gcAlloc, allocs...)
 	}
 
 	// Fast-path the nothing case
@@ -232,6 +220,24 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64)
 		return false, nil, nil
 	}
 
+	// If the eval is from a running "batch" job we don't want to garbage
+	// collect its allocations. If there is a long running batch job and its
+	// terminal allocations get GC'd the scheduler would re-run the
+	// allocations.
+	if eval.Type == structs.JobTypeBatch {
+		// Check if the job is running
+		job, err := c.snap.JobByID(eval.JobID)
+		if err != nil {
+			return false, nil, err
+		}
+
+		// Only garbage collect the allocations and evaluations once the job
+		// has been deregistered; until then, leave both alone.
+		if job != nil {
+			return false, nil, nil
+		}
+	}
+
 	// Get the allocations by eval
 	allocs, err := c.snap.AllocsByEval(eval.ID)
 	if err != nil {
@@ -241,19 +247,20 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64)
 	}
 
 	// Scan the allocations to ensure they are terminal and old
+	gcEval := true
+	var gcAllocIDs []string
 	for _, alloc := range allocs {
 		if !alloc.TerminalStatus() || alloc.ModifyIndex > thresholdIndex {
-			return false, nil, nil
+			// Can't GC the evaluation since not all of the allocations are
+			// terminal
+			gcEval = false
+		} else {
+			// The allocation is eligible to be GC'd
+			gcAllocIDs = append(gcAllocIDs, alloc.ID)
 		}
 	}
 
-	allocIds := make([]string, len(allocs))
-	for i, alloc := range allocs {
-		allocIds[i] = alloc.ID
-	}
-
-	// Evaluation is eligible for garbage collection
-	return true, allocIds, nil
+	return gcEval, gcAllocIDs, nil
 }
 
 // evalReap contacts the leader and issues a reap on the passed evals and
@@ -343,6 +350,7 @@ func (c *CoreScheduler) nodeGC(eval *structs.Evaluation) error {
 
 	// Collect the nodes to GC
 	var gcNode []string
+OUTER:
 	for {
 		raw := iter.Next()
 		if raw == nil {
@@ -363,9 +371,14 @@ func (c *CoreScheduler) nodeGC(eval *structs.Evaluation) error {
 			continue
 		}
 
-		// If there are any allocations, skip the node
-		if len(allocs) > 0 {
-			continue
+		// If there are any non-terminal allocations, skip the node. If the node
+		// is terminal and the allocations are not, the scheduler may not have
+		// run yet to transition the allocs on the node to terminal. We delay
+		// GC'ing until this happens.
+		for _, alloc := range allocs {
+			if !alloc.TerminalStatus() {
+				continue OUTER
+			}
 		}
 
 		// Node is eligible for garbage collection
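The key behavior change above is that `gcEval` now performs partial collection: it reports which allocations are individually collectible and whether the evaluation as a whole is. Below is a minimal standalone sketch of that rule, using a hypothetical `evalAlloc` type in place of `structs.Allocation`; the real code also applies the batch-job guard and reads allocations from the state snapshot.

```go
package main

import "fmt"

// evalAlloc is a hypothetical stand-in for structs.Allocation.
type evalAlloc struct {
	ID          string
	Terminal    bool // stand-in for alloc.TerminalStatus()
	ModifyIndex uint64
}

// partialGC mirrors the rewritten scan in gcEval: an allocation is
// collectible only if it is terminal and at or below the threshold
// index, and the evaluation is collectible only if every allocation is.
func partialGC(allocs []evalAlloc, thresholdIndex uint64) (bool, []string) {
	gcEval := true
	var gcAllocIDs []string
	for _, a := range allocs {
		if !a.Terminal || a.ModifyIndex > thresholdIndex {
			// A live (or too-recent) alloc blocks the eval, but not
			// the other allocs.
			gcEval = false
		} else {
			gcAllocIDs = append(gcAllocIDs, a.ID)
		}
	}
	return gcEval, gcAllocIDs
}

func main() {
	allocs := []evalAlloc{
		{ID: "dead", Terminal: true, ModifyIndex: 900},     // old and terminal: collectible
		{ID: "running", Terminal: false, ModifyIndex: 950}, // blocks the eval, not "dead"
	}
	gc, ids := partialGC(allocs, 1000)
	fmt.Println(gc, ids) // false [dead]
}
```

This is the shape `TestCoreScheduler_EvalGC_Partial` below exercises: the running alloc keeps the eval and itself alive while the dead alloc is reaped.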
diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go
index 5854f1649961..f0d6c4165ec1 100644
--- a/nomad/core_sched_test.go
+++ b/nomad/core_sched_test.go
@@ -69,6 +69,83 @@ func TestCoreScheduler_EvalGC(t *testing.T) {
 	}
 }
 
+func TestCoreScheduler_EvalGC_Partial(t *testing.T) {
+	s1 := testServer(t, nil)
+	defer s1.Shutdown()
+	testutil.WaitForLeader(t, s1.RPC)
+
+	// Insert "dead" eval
+	state := s1.fsm.State()
+	eval := mock.Eval()
+	eval.Status = structs.EvalStatusComplete
+	err := state.UpsertEvals(1000, []*structs.Evaluation{eval})
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Insert "dead" alloc
+	alloc := mock.Alloc()
+	alloc.EvalID = eval.ID
+	alloc.DesiredStatus = structs.AllocDesiredStatusFailed
+	err = state.UpsertAllocs(1001, []*structs.Allocation{alloc})
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Insert "running" alloc
+	alloc2 := mock.Alloc()
+	alloc2.EvalID = eval.ID
+	err = state.UpsertAllocs(1002, []*structs.Allocation{alloc2})
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Update the time tables to make this work
+	tt := s1.fsm.TimeTable()
+	tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold))
+
+	// Create a core scheduler
+	snap, err := state.Snapshot()
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	core := NewCoreScheduler(s1, snap)
+
+	// Attempt the GC
+	gc := s1.coreJobEval(structs.CoreJobEvalGC)
+	gc.ModifyIndex = 2000
+	err = core.Process(gc)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Should not be gone
+	out, err := state.EvalByID(eval.ID)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if out == nil {
+		t.Fatalf("bad: %v", out)
+	}
+
+	outA, err := state.AllocByID(alloc2.ID)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if outA == nil {
+		t.Fatalf("bad: %v", outA)
+	}
+
+	// Should be gone
+	outB, err := state.AllocByID(alloc.ID)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if outB != nil {
+		t.Fatalf("bad: %v", outB)
+	}
+}
+
 func TestCoreScheduler_EvalGC_Batch_NoAllocs(t *testing.T) {
 	s1 := testServer(t, nil)
 	defer s1.Shutdown()
@@ -334,6 +411,109 @@ func TestCoreScheduler_NodeGC(t *testing.T) {
 	}
 }
 
+func TestCoreScheduler_NodeGC_TerminalAllocs(t *testing.T) {
+	s1 := testServer(t, nil)
+	defer s1.Shutdown()
+	testutil.WaitForLeader(t, s1.RPC)
+
+	// Insert "dead" node
+	state := s1.fsm.State()
+	node := mock.Node()
+	node.Status = structs.NodeStatusDown
+	err := state.UpsertNode(1000, node)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Insert a terminal alloc on that node
+	alloc := mock.Alloc()
+	alloc.NodeID = node.ID
+	alloc.DesiredStatus = structs.AllocDesiredStatusStop
+	if err := state.UpsertAllocs(1001, []*structs.Allocation{alloc}); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Update the time tables to make this work
+	tt := s1.fsm.TimeTable()
+	tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.NodeGCThreshold))
+
+	// Create a core scheduler
+	snap, err := state.Snapshot()
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	core := NewCoreScheduler(s1, snap)
+
+	// Attempt the GC
+	gc := s1.coreJobEval(structs.CoreJobNodeGC)
+	gc.ModifyIndex = 2000
+	err = core.Process(gc)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Should be gone
+	out, err := state.NodeByID(node.ID)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if out != nil {
+		t.Fatalf("bad: %v", out)
+	}
+}
+
+func TestCoreScheduler_NodeGC_RunningAllocs(t *testing.T) {
+	s1 := testServer(t, nil)
+	defer s1.Shutdown()
+	testutil.WaitForLeader(t, s1.RPC)
+
+	// Insert "dead" node
+	state := s1.fsm.State()
+	node := mock.Node()
+	node.Status = structs.NodeStatusDown
+	err := state.UpsertNode(1000, node)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Insert a running alloc on that node
+	alloc := mock.Alloc()
+	alloc.NodeID = node.ID
+	alloc.DesiredStatus = structs.AllocDesiredStatusRun
+	alloc.ClientStatus = structs.AllocClientStatusRunning
+	if err := state.UpsertAllocs(1001, []*structs.Allocation{alloc}); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Update the time tables to make this work
+	tt := s1.fsm.TimeTable()
+	tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.NodeGCThreshold))
+
+	// Create a core scheduler
+	snap, err := state.Snapshot()
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	core := NewCoreScheduler(s1, snap)
+
+	// Attempt the GC
+	gc := s1.coreJobEval(structs.CoreJobNodeGC)
+	gc.ModifyIndex = 2000
+	err = core.Process(gc)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Should still be here
+	out, err := state.NodeByID(node.ID)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if out == nil {
+		t.Fatalf("bad: %v", out)
+	}
+}
+
 func TestCoreScheduler_NodeGC_Force(t *testing.T) {
 	s1 := testServer(t, nil)
 	defer s1.Shutdown()
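The node GC change follows the same principle: a downed node with only terminal allocations is collectible, while any live allocation defers collection until the scheduler has transitioned it. Here is a minimal sketch of that eligibility rule, with a hypothetical `nodeAlloc` type standing in for `structs.Allocation`; the real loop implements this with a labeled `continue OUTER` over the allocations looked up for the node.

```go
package main

import "fmt"

// nodeAlloc is a hypothetical stand-in for structs.Allocation.
type nodeAlloc struct {
	ID       string
	Terminal bool // stand-in for alloc.TerminalStatus()
}

// nodeEligibleForGC mirrors the rewritten check in nodeGC: any
// non-terminal allocation on the node defers GC until the scheduler
// has transitioned it to a terminal state.
func nodeEligibleForGC(allocs []nodeAlloc) bool {
	for _, a := range allocs {
		if !a.Terminal {
			return false
		}
	}
	return true
}

func main() {
	// Terminal-only allocations: node is collectible
	// (the case TestCoreScheduler_NodeGC_TerminalAllocs covers).
	fmt.Println(nodeEligibleForGC([]nodeAlloc{{ID: "stopped", Terminal: true}})) // true

	// A running allocation defers GC
	// (the case TestCoreScheduler_NodeGC_RunningAllocs covers).
	fmt.Println(nodeEligibleForGC([]nodeAlloc{{ID: "running", Terminal: false}})) // false
}
```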