Add garbage collection to jobs #586

Merged 8 commits on Dec 18, 2015
10 changes: 10 additions & 0 deletions nomad/config.go
@@ -131,6 +131,14 @@ type Config struct {
// for GC. This gives users some time to debug a failed evaluation.
EvalGCThreshold time.Duration

// JobGCInterval is how often we dispatch a job to GC jobs that are
// available for garbage collection.
JobGCInterval time.Duration

// JobGCThreshold is how old a job must be before it is eligible for GC. This
// gives the user time to inspect the job.
JobGCThreshold time.Duration

// NodeGCInterval is how often we dispatch a job to GC failed nodes.
NodeGCInterval time.Duration

@@ -202,6 +210,8 @@ func DefaultConfig() *Config {
ReconcileInterval: 60 * time.Second,
EvalGCInterval: 5 * time.Minute,
EvalGCThreshold: 1 * time.Hour,
JobGCInterval: 5 * time.Minute,
JobGCThreshold: 4 * time.Hour,
NodeGCInterval: 5 * time.Minute,
NodeGCThreshold: 24 * time.Hour,
EvalNackTimeout: 60 * time.Second,
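
Aside (not part of the diff): a minimal sketch of overriding the new defaults when embedding the server package, assuming the server is constructed from this Config; the values below are illustrative only.

package main

import (
	"time"

	"github.com/hashicorp/nomad/nomad"
)

func main() {
	// Start from the defaults added above and tighten the job GC window.
	cfg := nomad.DefaultConfig()
	cfg.JobGCInterval = 1 * time.Minute // scan for GC-eligible jobs every minute
	cfg.JobGCThreshold = 1 * time.Hour  // jobs must be at least an hour old before being reaped
	_ = cfg                             // pass cfg to the server constructor in real use
}
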
161 changes: 130 additions & 31 deletions nomad/core_sched.go
@@ -33,11 +33,90 @@ func (s *CoreScheduler) Process(eval *structs.Evaluation) error {
return s.evalGC(eval)
case structs.CoreJobNodeGC:
return s.nodeGC(eval)
case structs.CoreJobJobGC:
return s.jobGC(eval)
default:
return fmt.Errorf("core scheduler cannot handle job '%s'", eval.JobID)
}
}

// jobGC is used to garbage collect eligible jobs.
func (c *CoreScheduler) jobGC(eval *structs.Evaluation) error {
// Get all the jobs eligible for garbage collection.
iter, err := c.snap.JobsByGC(true)
if err != nil {
return err
}

// Get the time table to calculate GC cutoffs.
tt := c.srv.fsm.TimeTable()
cutoff := time.Now().UTC().Add(-1 * c.srv.config.JobGCThreshold)
oldThreshold := tt.NearestIndex(cutoff)
c.srv.logger.Printf("[DEBUG] sched.core: job GC: scanning before index %d (%v)",
oldThreshold, c.srv.config.JobGCThreshold)

// Collect the allocations, evaluations and jobs to GC
var gcAlloc, gcEval, gcJob []string

OUTER:
for i := iter.Next(); i != nil; i = iter.Next() {
job := i.(*structs.Job)

// Ignore new jobs.
if job.CreateIndex > oldThreshold {
continue
}

evals, err := c.snap.EvalsByJob(job.ID)
if err != nil {
c.srv.logger.Printf("[ERR] sched.core: failed to get evals for job %s: %v", job.ID, err)
continue
}

for _, eval := range evals {
gc, allocs, err := c.gcEval(eval, oldThreshold)
if err != nil || !gc {
continue OUTER
}

gcEval = append(gcEval, eval.ID)
gcAlloc = append(gcAlloc, allocs...)
}

// Job is eligible for garbage collection
gcJob = append(gcJob, job.ID)
}

// Fast-path the nothing case
if len(gcEval) == 0 && len(gcAlloc) == 0 && len(gcJob) == 0 {
return nil
}
c.srv.logger.Printf("[DEBUG] sched.core: job GC: %d jobs, %d evaluations, %d allocs eligible",
len(gcJob), len(gcEval), len(gcAlloc))

// Reap the evals and allocs
if err := c.evalReap(gcEval, gcAlloc); err != nil {
return err
}

// Call to the leader to deregister the jobs.
for _, job := range gcJob {
req := structs.JobDeregisterRequest{
JobID: job,
WriteRequest: structs.WriteRequest{
Region: c.srv.config.Region,
},
}
var resp structs.JobDeregisterResponse
if err := c.srv.RPC("Job.Deregister", &req, &resp); err != nil {
c.srv.logger.Printf("[ERR] sched.core: job deregister failed: %v", err)
return err
}
}

return nil
}

// evalGC is used to garbage collect old evaluations
func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {
// Iterate over the evaluations
@@ -57,39 +136,16 @@ func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {

// Collect the allocations and evaluations to GC
var gcAlloc, gcEval []string

OUTER:
for {
raw := iter.Next()
if raw == nil {
break
}
for raw := iter.Next(); raw != nil; raw = iter.Next() {
eval := raw.(*structs.Evaluation)

// Ignore non-terminal and new evaluations
if !eval.TerminalStatus() || eval.ModifyIndex > oldThreshold {
continue
}

// Get the allocations by eval
allocs, err := c.snap.AllocsByEval(eval.ID)
gc, allocs, err := c.gcEval(eval, oldThreshold)
if err != nil {
c.srv.logger.Printf("[ERR] sched.core: failed to get allocs for eval %s: %v",
eval.ID, err)
continue
}

// Scan the allocations to ensure they are terminal and old
for _, alloc := range allocs {
if !alloc.TerminalStatus() || alloc.ModifyIndex > oldThreshold {
continue OUTER
}
return err
}

// Evaluation is eligible for garbage collection
gcEval = append(gcEval, eval.ID)
for _, alloc := range allocs {
gcAlloc = append(gcAlloc, alloc.ID)
if gc {
gcEval = append(gcEval, eval.ID)
gcAlloc = append(gcAlloc, allocs...)
}
}

@@ -100,10 +156,52 @@ OUTER:
c.srv.logger.Printf("[DEBUG] sched.core: eval GC: %d evaluations, %d allocs eligible",
len(gcEval), len(gcAlloc))

return c.evalReap(gcEval, gcAlloc)
}

// gcEval returns whether the eval should be garbage collected given a raft
// threshold index. The eval is not eligible for garbage collection if it or
// any of its allocs is newer than the threshold. If the eval should be
// garbage collected, the IDs of the allocs that should be removed with it are
// also returned.
func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64) (
bool, []string, error) {
// Ignore non-terminal and new evaluations
if !eval.TerminalStatus() || eval.ModifyIndex > thresholdIndex {
Contributor:

Put this logic in eval?

func (e *Evaluation) IsGCable(thresholdIdx int) {
}

Contributor Author:

It's pretty specific to what we are doing here and wouldn't really be useful anywhere else.
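
For reference, a hypothetical sketch of the helper being suggested (not adopted in this PR), assuming the threshold is the raft index used elsewhere in this change, so uint64 rather than the int above:

func (e *Evaluation) IsGCable(thresholdIndex uint64) bool {
	// GC-able once the eval is terminal and at or below the threshold index.
	return e.TerminalStatus() && e.ModifyIndex <= thresholdIndex
}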

return false, nil, nil
}

// Get the allocations by eval
allocs, err := c.snap.AllocsByEval(eval.ID)
if err != nil {
c.srv.logger.Printf("[ERR] sched.core: failed to get allocs for eval %s: %v",
eval.ID, err)
return false, nil, err
}

// Scan the allocations to ensure they are terminal and old
for _, alloc := range allocs {
if !alloc.TerminalStatus() || alloc.ModifyIndex > thresholdIndex {
return false, nil, nil
}
}

allocIds := make([]string, len(allocs))
for i, alloc := range allocs {
allocIds[i] = alloc.ID
}

// Evaluation is eligible for garbage collection
return true, allocIds, nil
}

// evalReap contacts the leader and issues a reap on the passed evals and
// allocs.
func (c *CoreScheduler) evalReap(evals, allocs []string) error {
// Call to the leader to issue the reap
req := structs.EvalDeleteRequest{
Evals: gcEval,
Allocs: gcAlloc,
Evals: evals,
Allocs: allocs,
WriteRequest: structs.WriteRequest{
Region: c.srv.config.Region,
},
@@ -113,6 +211,7 @@ OUTER:
c.srv.logger.Printf("[ERR] sched.core: eval reap failed: %v", err)
return err
}

return nil
}

104 changes: 104 additions & 0 deletions nomad/core_sched_test.go
@@ -111,3 +111,107 @@ func TestCoreScheduler_NodeGC(t *testing.T) {
t.Fatalf("bad: %v", out)
}
}

func TestCoreScheduler_JobGC(t *testing.T) {
tests := []struct {
test, evalStatus, allocStatus string
shouldExist bool
}{
{
test: "Terminal",
evalStatus: structs.EvalStatusFailed,
allocStatus: structs.AllocDesiredStatusFailed,
shouldExist: false,
},
{
test: "Has Alloc",
evalStatus: structs.EvalStatusFailed,
allocStatus: structs.AllocDesiredStatusRun,
shouldExist: true,
},
{
test: "Has Eval",
evalStatus: structs.EvalStatusPending,
allocStatus: structs.AllocDesiredStatusFailed,
shouldExist: true,
},
}

for _, test := range tests {
s1 := testServer(t, nil)
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)

// Insert job.
state := s1.fsm.State()
job := mock.Job()
job.GC = true
err := state.UpsertJob(1000, job)
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
}

// Insert eval
eval := mock.Eval()
eval.JobID = job.ID
eval.Status = test.evalStatus
err = state.UpsertEvals(1001, []*structs.Evaluation{eval})
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
}

// Insert alloc
alloc := mock.Alloc()
alloc.JobID = job.ID
alloc.EvalID = eval.ID
alloc.DesiredStatus = test.allocStatus
err = state.UpsertAllocs(1002, []*structs.Allocation{alloc})
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
}

// Update the time tables to make this work
tt := s1.fsm.TimeTable()
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.JobGCThreshold))

// Create a core scheduler
snap, err := state.Snapshot()
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
}
core := NewCoreScheduler(s1, snap)

// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobJobGC)
gc.ModifyIndex = 2000
err = core.Process(gc)
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
}

// Verify whether the job, eval and alloc still exist, per the test case
out, err := state.JobByID(job.ID)
if err != nil {
t.Fatalf("test(%s) err: %v", err)
}
if (test.shouldExist && out == nil) || (!test.shouldExist && out != nil) {
t.Fatalf("test(%s) bad: %v", test.test, out)
}

outE, err := state.EvalByID(eval.ID)
if err != nil {
t.Fatalf("test(%s) err: %v", err)
}
if (test.shouldExist && outE == nil) || (!test.shouldExist && outE != nil) {
t.Fatalf("test(%s) bad: %v", test.test, out)
}

outA, err := state.AllocByID(alloc.ID)
if err != nil {
t.Fatalf("test(%s) err: %v", err)
}
if (test.shouldExist && outA == nil) || (!test.shouldExist && outA != nil) {
t.Fatalf("test(%s) bad: %v", test.test, outA)
}
}
}
22 changes: 19 additions & 3 deletions nomad/job_endpoint.go
@@ -1,6 +1,7 @@
package nomad

import (
"errors"
"fmt"
"time"

@@ -25,12 +26,17 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
if args.Job == nil {
return fmt.Errorf("missing job for registration")
}
if err := args.Job.Validate(); err != nil {

if err := j.checkBlacklist(args.Job); err != nil {
return err
}

// Initialize all the fields of services
args.Job.InitAllServiceFields()
// Initialize the job fields (sets defaults and any necessary init work).
args.Job.InitFields()

if err := args.Job.Validate(); err != nil {
return err
}

if args.Job.Type == structs.JobTypeCore {
return fmt.Errorf("job type cannot be core")
@@ -75,6 +81,16 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
return nil
}

// checkBlacklist returns an error if the user has set any blacklisted field in
// the job.
func (j *Job) checkBlacklist(job *structs.Job) error {
if job.GC {
return errors.New("GC field of a job is used only internally and should not be set by user")
}

return nil
}
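
A hypothetical test sketch (not part of this PR; it assumes the same test helpers and imports used in core_sched_test.go above) showing how the blacklist check could be exercised end to end:

func TestJobEndpoint_Register_GCSetByUser(t *testing.T) {
	s1 := testServer(t, nil)
	defer s1.Shutdown()
	testutil.WaitForLeader(t, s1.RPC)

	// A user-supplied job must not set the internal GC field.
	job := mock.Job()
	job.GC = true

	req := &structs.JobRegisterRequest{
		Job:          job,
		WriteRequest: structs.WriteRequest{Region: s1.config.Region},
	}
	var resp structs.JobRegisterResponse
	if err := s1.RPC("Job.Register", req, &resp); err == nil {
		t.Fatalf("expected registration to be rejected when GC is set by the user")
	}
}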

// Evaluate is used to force a job for re-evaluation
func (j *Job) Evaluate(args *structs.JobEvaluateRequest, reply *structs.JobRegisterResponse) error {
if done, err := j.srv.forward("Job.Evaluate", args, args, reply); done {