
Commit

Merge pull request #586 from hashicorp/f-job-gc
Add garbage collection to jobs
dadgar committed Dec 18, 2015
2 parents 2977583 + bc13dca commit 9daeae7
Showing 14 changed files with 470 additions and 48 deletions.
10 changes: 10 additions & 0 deletions nomad/config.go
@@ -131,6 +131,14 @@ type Config struct {
// for GC. This gives users some time to debug a failed evaluation.
EvalGCThreshold time.Duration

// JobGCInterval is how often we dispatch a job to GC jobs that are
// available for garbage collection.
JobGCInterval time.Duration

// JobGCThreshold is how old a job must be before it is eligible for GC.
// This gives the user time to inspect the job.
JobGCThreshold time.Duration

// NodeGCInterval is how often we dispatch a job to GC failed nodes.
NodeGCInterval time.Duration

@@ -202,6 +210,8 @@ func DefaultConfig() *Config {
ReconcileInterval: 60 * time.Second,
EvalGCInterval: 5 * time.Minute,
EvalGCThreshold: 1 * time.Hour,
JobGCInterval: 5 * time.Minute,
JobGCThreshold: 4 * time.Hour,
NodeGCInterval: 5 * time.Minute,
NodeGCThreshold: 24 * time.Hour,
EvalNackTimeout: 60 * time.Second,
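For orientation, a minimal sketch of how these defaults could be overridden in an embedded server. Only DefaultConfig and the two fields added above are taken from the diff; the import path is assumed to be this repository's:

package main

import (
    "fmt"
    "time"

    "github.com/hashicorp/nomad/nomad"
)

func main() {
    // Start from the defaults added in this commit and tighten the job GC
    // window; both values are plain time.Durations.
    cfg := nomad.DefaultConfig()
    cfg.JobGCInterval = 1 * time.Minute
    cfg.JobGCThreshold = 1 * time.Hour
    fmt.Println(cfg.JobGCInterval, cfg.JobGCThreshold)
}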
161 changes: 130 additions & 31 deletions nomad/core_sched.go
@@ -33,11 +33,90 @@ func (s *CoreScheduler) Process(eval *structs.Evaluation) error {
return s.evalGC(eval)
case structs.CoreJobNodeGC:
return s.nodeGC(eval)
case structs.CoreJobJobGC:
return s.jobGC(eval)
default:
return fmt.Errorf("core scheduler cannot handle job '%s'", eval.JobID)
}
}
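For context, a standalone sketch of the dispatch side that feeds this switch: on the leader, each GC type is periodically enqueued as a core-job evaluation. The enqueue hook and the short interval are illustrative stand-ins, not Nomad's real leader loop:

package main

import (
    "fmt"
    "time"
)

func main() {
    const coreJobJobGC = "job-gc" // stand-in for structs.CoreJobJobGC
    // JobGCInterval defaults to 5 minutes in this commit; shortened here so
    // the sketch terminates quickly.
    interval := 50 * time.Millisecond

    ticker := time.NewTicker(interval)
    defer ticker.Stop()

    enqueue := func(jobID string) { fmt.Println("enqueue core eval:", jobID) }
    for i := 0; i < 3; i++ {
        <-ticker.C
        enqueue(coreJobJobGC) // Process routes this to jobGC via the switch above
    }
}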

// jobGC is used to garbage collect eligible jobs.
func (c *CoreScheduler) jobGC(eval *structs.Evaluation) error {
// Get all the jobs eligible for garbage collection.
iter, err := c.snap.JobsByGC(true)
if err != nil {
return err
}

// Get the time table to calculate GC cutoffs.
tt := c.srv.fsm.TimeTable()
cutoff := time.Now().UTC().Add(-1 * c.srv.config.JobGCThreshold)
oldThreshold := tt.NearestIndex(cutoff)
c.srv.logger.Printf("[DEBUG] sched.core: job GC: scanning before index %d (%v)",
oldThreshold, c.srv.config.JobGCThreshold)

// Collect the allocations, evaluations and jobs to GC
var gcAlloc, gcEval, gcJob []string

OUTER:
for i := iter.Next(); i != nil; i = iter.Next() {
job := i.(*structs.Job)

// Ignore new jobs.
if job.CreateIndex > oldThreshold {
continue
}

evals, err := c.snap.EvalsByJob(job.ID)
if err != nil {
c.srv.logger.Printf("[ERR] sched.core: failed to get evals for job %s: %v", job.ID, err)
continue
}

for _, eval := range evals {
gc, allocs, err := c.gcEval(eval, oldThreshold)
if err != nil || !gc {
continue OUTER
}

gcEval = append(gcEval, eval.ID)
gcAlloc = append(gcAlloc, allocs...)
}

// Job is eligible for garbage collection
gcJob = append(gcJob, job.ID)
}

// Fast-path the nothing case
if len(gcEval) == 0 && len(gcAlloc) == 0 && len(gcJob) == 0 {
return nil
}
c.srv.logger.Printf("[DEBUG] sched.core: job GC: %d jobs, %d evaluations, %d allocs eligible",
len(gcJob), len(gcEval), len(gcAlloc))

// Reap the evals and allocs
if err := c.evalReap(gcEval, gcAlloc); err != nil {
return err
}

// Call to the leader to deregister the jobs.
for _, job := range gcJob {
req := structs.JobDeregisterRequest{
JobID: job,
WriteRequest: structs.WriteRequest{
Region: c.srv.config.Region,
},
}
var resp structs.JobDeregisterResponse
if err := c.srv.RPC("Job.Deregister", &req, &resp); err != nil {
c.srv.logger.Printf("[ERR] sched.core: job deregister failed: %v", err)
return err
}
}

return nil
}
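The cutoff computation above leans on the FSM's time table, which records (raft index, timestamp) pairs. Below is a self-contained stand-in, not the real nomad/fsm implementation, showing how NearestIndex turns a wall-clock threshold into an index threshold:

package main

import (
    "fmt"
    "time"
)

// entry pairs a raft index with the wall-clock time it was witnessed.
type entry struct {
    index uint64
    when  time.Time
}

// timeTable is kept sorted oldest-first, mirroring how indexes are witnessed.
type timeTable []entry

// NearestIndex returns the newest index witnessed at or before the cutoff.
func (t timeTable) NearestIndex(cutoff time.Time) uint64 {
    var nearest uint64
    for _, e := range t {
        if e.when.After(cutoff) {
            break
        }
        nearest = e.index
    }
    return nearest
}

func main() {
    now := time.Now().UTC()
    tt := timeTable{
        {index: 100, when: now.Add(-6 * time.Hour)},
        {index: 500, when: now.Add(-2 * time.Hour)},
    }
    // With the default 4h JobGCThreshold, only index 100 is old enough, so
    // jobs with CreateIndex <= 100 become candidates for collection.
    cutoff := now.Add(-4 * time.Hour)
    fmt.Println(tt.NearestIndex(cutoff)) // 100
}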

// evalGC is used to garbage collect old evaluations
func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {
// Iterate over the evaluations
@@ -57,39 +136,16 @@ func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {

// Collect the allocations and evaluations to GC
var gcAlloc, gcEval []string

-OUTER:
-for {
-raw := iter.Next()
-if raw == nil {
-break
-}
+for raw := iter.Next(); raw != nil; raw = iter.Next() {
eval := raw.(*structs.Evaluation)
-
-// Ignore non-terminal and new evaluations
-if !eval.TerminalStatus() || eval.ModifyIndex > oldThreshold {
-continue
-}
-
-// Get the allocations by eval
-allocs, err := c.snap.AllocsByEval(eval.ID)
+gc, allocs, err := c.gcEval(eval, oldThreshold)
if err != nil {
-c.srv.logger.Printf("[ERR] sched.core: failed to get allocs for eval %s: %v",
-eval.ID, err)
-continue
-}
-
-// Scan the allocations to ensure they are terminal and old
-for _, alloc := range allocs {
-if !alloc.TerminalStatus() || alloc.ModifyIndex > oldThreshold {
-continue OUTER
-}
+return err
}

-// Evaluation is eligible for garbage collection
-gcEval = append(gcEval, eval.ID)
-for _, alloc := range allocs {
-gcAlloc = append(gcAlloc, alloc.ID)
+if gc {
+gcEval = append(gcEval, eval.ID)
+gcAlloc = append(gcAlloc, allocs...)
}
}

@@ -100,10 +156,52 @@ OUTER:
c.srv.logger.Printf("[DEBUG] sched.core: eval GC: %d evaluations, %d allocs eligible",
len(gcEval), len(gcAlloc))

return c.evalReap(gcEval, gcAlloc)
}

// gcEval returns whether the eval should be garbage collected given a raft
// threshold index. An eval is disqualified from garbage collection if it or
// any of its allocations is newer than the threshold. If the eval should be
// garbage collected, the IDs of the allocations that should be removed along
// with it are also returned.
func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64) (
bool, []string, error) {
// Ignore non-terminal and new evaluations
if !eval.TerminalStatus() || eval.ModifyIndex > thresholdIndex {
return false, nil, nil
}

// Get the allocations by eval
allocs, err := c.snap.AllocsByEval(eval.ID)
if err != nil {
c.srv.logger.Printf("[ERR] sched.core: failed to get allocs for eval %s: %v",
eval.ID, err)
return false, nil, err
}

// Scan the allocations to ensure they are terminal and old
for _, alloc := range allocs {
if !alloc.TerminalStatus() || alloc.ModifyIndex > thresholdIndex {
return false, nil, nil
}
}

allocIds := make([]string, len(allocs))
for i, alloc := range allocs {
allocIds[i] = alloc.ID
}

// Evaluation is eligible for garbage collection
return true, allocIds, nil
}
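To make the rule concrete, here is a toy stand-in for the predicate (simplified types, not the structs package): a single non-terminal or too-new allocation disqualifies the whole eval, which in jobGC in turn keeps the whole job alive:

package main

import "fmt"

// obj stands in for either an evaluation or an allocation: all that the
// rule inspects is terminal status and the last-modified raft index.
type obj struct {
    terminal    bool
    modifyIndex uint64
}

// eligible mirrors gcEval's decision: the eval and every alloc must be
// terminal and at or below the threshold index.
func eligible(eval obj, allocs []obj, threshold uint64) bool {
    if !eval.terminal || eval.modifyIndex > threshold {
        return false
    }
    for _, a := range allocs {
        if !a.terminal || a.modifyIndex > threshold {
            return false
        }
    }
    return true
}

func main() {
    oldEval := obj{terminal: true, modifyIndex: 900}
    oldAlloc := obj{terminal: true, modifyIndex: 950}
    youngAlloc := obj{terminal: true, modifyIndex: 1200}

    fmt.Println(eligible(oldEval, []obj{oldAlloc}, 1000))   // true
    fmt.Println(eligible(oldEval, []obj{youngAlloc}, 1000)) // false: one young alloc blocks GC
}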

// evalReap contacts the leader and issues a reap on the passed evals and
// allocs.
func (c *CoreScheduler) evalReap(evals, allocs []string) error {
// Call to the leader to issue the reap
req := structs.EvalDeleteRequest{
-Evals: gcEval,
-Allocs: gcAlloc,
+Evals: evals,
+Allocs: allocs,
WriteRequest: structs.WriteRequest{
Region: c.srv.config.Region,
},
@@ -113,6 +211,7 @@ OUTER:
c.srv.logger.Printf("[ERR] sched.core: eval reap failed: %v", err)
return err
}

return nil
}

104 changes: 104 additions & 0 deletions nomad/core_sched_test.go
@@ -111,3 +111,107 @@ func TestCoreScheduler_NodeGC(t *testing.T) {
t.Fatalf("bad: %v", out)
}
}

func TestCoreScheduler_JobGC(t *testing.T) {
tests := []struct {
test, evalStatus, allocStatus string
shouldExist bool
}{
{
test: "Terminal",
evalStatus: structs.EvalStatusFailed,
allocStatus: structs.AllocDesiredStatusFailed,
shouldExist: false,
},
{
test: "Has Alloc",
evalStatus: structs.EvalStatusFailed,
allocStatus: structs.AllocDesiredStatusRun,
shouldExist: true,
},
{
test: "Has Eval",
evalStatus: structs.EvalStatusPending,
allocStatus: structs.AllocDesiredStatusFailed,
shouldExist: true,
},
}

for _, test := range tests {
s1 := testServer(t, nil)
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)

// Insert job.
state := s1.fsm.State()
job := mock.Job()
job.GC = true
err := state.UpsertJob(1000, job)
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
}

// Insert eval
eval := mock.Eval()
eval.JobID = job.ID
eval.Status = test.evalStatus
err = state.UpsertEvals(1001, []*structs.Evaluation{eval})
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
}

// Insert alloc
alloc := mock.Alloc()
alloc.JobID = job.ID
alloc.EvalID = eval.ID
alloc.DesiredStatus = test.allocStatus
err = state.UpsertAllocs(1002, []*structs.Allocation{alloc})
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
}

// Update the time tables to make this work
tt := s1.fsm.TimeTable()
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.JobGCThreshold))

// Create a core scheduler
snap, err := state.Snapshot()
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
}
core := NewCoreScheduler(s1, snap)

// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobJobGC)
gc.ModifyIndex = 2000
err = core.Process(gc)
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
}

// Should still exist
out, err := state.JobByID(job.ID)
if err != nil {
t.Fatalf("test(%s) err: %v", err)
}
if (test.shouldExist && out == nil) || (!test.shouldExist && out != nil) {
t.Fatalf("test(%s) bad: %v", test.test, out)
}

outE, err := state.EvalByID(eval.ID)
if err != nil {
t.Fatalf("test(%s) err: %v", err)
}
if (test.shouldExist && outE == nil) || (!test.shouldExist && outE != nil) {
t.Fatalf("test(%s) bad: %v", test.test, out)
}

outA, err := state.AllocByID(alloc.ID)
if err != nil {
t.Fatalf("test(%s) err: %v", err)
}
if (test.shouldExist && outA == nil) || (!test.shouldExist && outA != nil) {
t.Fatalf("test(%s) bad: %v", test.test, outA)
}
}
}
22 changes: 19 additions & 3 deletions nomad/job_endpoint.go
@@ -1,6 +1,7 @@
package nomad

import (
"errors"
"fmt"
"time"

@@ -25,12 +26,17 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
if args.Job == nil {
return fmt.Errorf("missing job for registration")
}
-if err := args.Job.Validate(); err != nil {
+
+if err := j.checkBlacklist(args.Job); err != nil {
return err
}

-// Initialize all the fields of services
-args.Job.InitAllServiceFields()
+// Initialize the job fields (sets defaults and any necessary init work).
+args.Job.InitFields()
+
+if err := args.Job.Validate(); err != nil {
+return err
+}

if args.Job.Type == structs.JobTypeCore {
return fmt.Errorf("job type cannot be core")
@@ -75,6 +81,16 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
return nil
}

// checkBlacklist returns an error if the user has set any blacklisted field in
// the job.
func (j *Job) checkBlacklist(job *structs.Job) error {
if job.GC {
return errors.New("GC field of a job is used only internally and should not be set by user")
}

return nil
}
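As a usage illustration, a self-contained stub (not the real RPC path) of what registration does with a blacklisted field; the stub Job type and register function are stand-ins for structs.Job and Job.Register:

package main

import (
    "errors"
    "fmt"
)

// Job is a stub carrying only the blacklisted field.
type Job struct{ GC bool }

// register models the order of checks added in this commit: blacklist
// first, then field initialization, then validation.
func register(job *Job) error {
    if job.GC {
        return errors.New("GC field of a job is used only internally and should not be set by the user")
    }
    // (InitFields and Validate would run here in the real server.)
    return nil
}

func main() {
    fmt.Println(register(&Job{GC: true})) // rejected before any init/validate work
    fmt.Println(register(&Job{}))         // <nil>: passes the blacklist check
}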

// Evaluate is used to force a job for re-evaluation
func (j *Job) Evaluate(args *structs.JobEvaluateRequest, reply *structs.JobRegisterResponse) error {
if done, err := j.srv.forward("Job.Evaluate", args, args, reply); done {
