diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index 36d459efa183..1fc294cf6974 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -198,6 +198,32 @@ func (s *GenericScheduler) process() (bool, error) { return true, nil } +// filterCompleteAllocs filters allocations that are terminal and should be +// re-placed. +func (s *GenericScheduler) filterCompleteAllocs(allocs []*structs.Allocation) []*structs.Allocation { + filter := func(a *structs.Allocation) bool { + // Allocs from batch jobs should be filtered when their status is failed so that + // they will be replaced. If they are dead but not failed, they + // shouldn't be replaced. + if s.batch { + return a.ClientStatus == structs.AllocClientStatusFailed + } + + // Filter terminal, non batch allocations + return a.TerminalStatus() + } + + n := len(allocs) + for i := 0; i < n; i++ { + if filter(allocs[i]) { + allocs[i], allocs[n-1] = allocs[n-1], nil + i-- + n-- + } + } + return allocs[:n] +} + // computeJobAllocs is used to reconcile differences between the job, // existing allocations and node status to update the allocations. func (s *GenericScheduler) computeJobAllocs() error { @@ -215,7 +241,7 @@ func (s *GenericScheduler) computeJobAllocs() error { } // Filter out the allocations in a terminal state - allocs = structs.FilterTerminalAllocs(allocs) + allocs = s.filterCompleteAllocs(allocs) // Determine the tainted nodes containing job allocs tainted, err := taintedNodes(s.state, allocs) diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index 2c77eff29ce3..e3711ae24bd1 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -696,3 +696,107 @@ func TestServiceSched_RetryLimit(t *testing.T) { // Should hit the retry limit h.AssertEvalStatus(t, structs.EvalStatusFailed) } + +func TestBatchSched_Run_DeadAlloc(t *testing.T) { + h := NewHarness(t) + + // Create a node + node := mock.Node() + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + + // Create a job + job := mock.Job() + job.TaskGroups[0].Count = 1 + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + // Create a failed alloc + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = node.ID + alloc.Name = "my-job.web[0]" + alloc.ClientStatus = structs.AllocClientStatusDead + noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc})) + + // Create a mock evaluation to register the job + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewBatchScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure no plan as it should be a no-op + if len(h.Plans) != 0 { + t.Fatalf("bad: %#v", h.Plans) + } + + // Lookup the allocations by JobID + out, err := h.State.AllocsByJob(job.ID) + noErr(t, err) + + // Ensure no allocations placed + if len(out) != 1 { + t.Fatalf("bad: %#v", out) + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + +func TestBatchSched_Run_FailedAlloc(t *testing.T) { + h := NewHarness(t) + + // Create a node + node := mock.Node() + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + + // Create a job + job := mock.Job() + job.TaskGroups[0].Count = 1 + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + // Create a failed alloc + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = node.ID + alloc.Name = "my-job.web[0]" + alloc.ClientStatus = structs.AllocClientStatusFailed + noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc})) + + // Create a mock evaluation to register the job + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewBatchScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure no plan as it should be a no-op + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + + // Lookup the allocations by JobID + out, err := h.State.AllocsByJob(job.ID) + noErr(t, err) + + // Ensure a replacement alloc was placed. + if len(out) != 2 { + t.Fatalf("bad: %#v", out) + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +}