Skip to content

Commit

Permalink
Merge pull request #1471 from hashicorp/b-handle-old-batch-allocs
Browse files Browse the repository at this point in the history
filterCompleteAllocs filters replaced batch allocs
  • Loading branch information
dadgar committed Jul 28, 2016
2 parents 30dd65e + 23414a5 commit 6272c1c
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 23 deletions.
43 changes: 20 additions & 23 deletions scheduler/generic_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"log"

"github.com/davecgh/go-spew/spew"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/nomad/structs"
)
Expand Down Expand Up @@ -309,7 +308,26 @@ func (s *GenericScheduler) filterCompleteAllocs(allocs []*structs.Allocation) []
n--
}
}
return allocs[:n]

// If the job is batch, we want to filter allocations that have been
// replaced by a newer version for the same task group.
filtered := allocs[:n]
if s.batch {
byTG := make(map[string]*structs.Allocation)
for _, alloc := range filtered {
existing := byTG[alloc.TaskGroup]
if existing == nil || existing.CreateIndex < alloc.CreateIndex {
byTG[alloc.TaskGroup] = alloc
}
}

filtered = make([]*structs.Allocation, 0, len(byTG))
for _, alloc := range byTG {
filtered = append(filtered, alloc)
}
}

return filtered
}

// computeJobAllocs is used to reconcile differences between the job,
Expand Down Expand Up @@ -342,27 +360,6 @@ func (s *GenericScheduler) computeJobAllocs() error {
diff := diffAllocs(s.job, tainted, groups, allocs)
s.logger.Printf("[DEBUG] sched: %#v: %#v", s.eval, diff)

// XXX: For debugging purposes only. An issue was observed where a job had a
// task group with count > 0 that produced a diff where no action would be
// taken (every slice was empty). Below we dump debug information if this
// condition is hit.
diffSum := len(diff.stop) + len(diff.place) + len(diff.ignore) +
len(diff.update) + len(diff.migrate)
if diffSum == 0 && len(groups) != 0 {
s.logger.Printf("[ERR] sched: %d tasks to schedule but scheduler believes there is no work", len(groups))

// Get the original set of allocations for the job.
jobAllocs, err := s.state.AllocsByJob(s.eval.JobID)
if err != nil {
return fmt.Errorf("failed to get allocs for job '%s': %v", s.eval.JobID, err)
}
s.logger.Printf("[DEBUG] sched: job: %s", spew.Sdump(s.job))
s.logger.Printf("[DEBUG] sched: materializeTaskGroups() returned: %s", spew.Sdump(groups))
s.logger.Printf("[DEBUG] sched: AllocsByJob(%q) returned: %s", s.eval.JobID, spew.Sdump(jobAllocs))
s.logger.Printf("[DEBUG] sched: filterCompleteAllocs(): %s", spew.Sdump(allocs))
s.logger.Printf("[DEBUG] sched: taintedNodes(): %s", spew.Sdump(tainted))
}

// Add all the allocs to stop
for _, e := range diff.stop {
s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocNotNeeded)
Expand Down
74 changes: 74 additions & 0 deletions scheduler/generic_sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1866,3 +1866,77 @@ func TestBatchSched_ReRun_SuccessfullyFinishedAlloc(t *testing.T) {

h.AssertEvalStatus(t, structs.EvalStatusComplete)
}

func TestGenericSched_FilterCompleteAllocs(t *testing.T) {
running := mock.Alloc()
desiredStop := mock.Alloc()
desiredStop.DesiredStatus = structs.AllocDesiredStatusStop

new := mock.Alloc()
new.CreateIndex = 10000

oldSuccessful := mock.Alloc()
oldSuccessful.CreateIndex = 30
oldSuccessful.DesiredStatus = structs.AllocDesiredStatusStop
oldSuccessful.ClientStatus = structs.AllocClientStatusComplete
oldSuccessful.TaskStates = make(map[string]*structs.TaskState, 1)
oldSuccessful.TaskStates["foo"] = &structs.TaskState{
State: structs.TaskStateDead,
Events: []*structs.TaskEvent{{Type: structs.TaskTerminated, ExitCode: 0}},
}

unsuccessful := mock.Alloc()
unsuccessful.DesiredStatus = structs.AllocDesiredStatusRun
unsuccessful.ClientStatus = structs.AllocClientStatusFailed
unsuccessful.TaskStates = make(map[string]*structs.TaskState, 1)
unsuccessful.TaskStates["foo"] = &structs.TaskState{
State: structs.TaskStateDead,
Events: []*structs.TaskEvent{{Type: structs.TaskTerminated, ExitCode: 1}},
}

cases := []struct {
Batch bool
Input, Output []*structs.Allocation
}{
{
Input: []*structs.Allocation{running},
Output: []*structs.Allocation{running},
},
{
Input: []*structs.Allocation{running, desiredStop},
Output: []*structs.Allocation{running},
},
{
Batch: true,
Input: []*structs.Allocation{running},
Output: []*structs.Allocation{running},
},
{
Batch: true,
Input: []*structs.Allocation{new, oldSuccessful},
Output: []*structs.Allocation{new},
},
{
Batch: true,
Input: []*structs.Allocation{unsuccessful},
Output: []*structs.Allocation{},
},
}

for i, c := range cases {
g := &GenericScheduler{batch: c.Batch}
out := g.filterCompleteAllocs(c.Input)

if !reflect.DeepEqual(out, c.Output) {
t.Log("Got:")
for i, a := range out {
t.Logf("%d: %#v", i, a)
}
t.Log("Want:")
for i, a := range c.Output {
t.Logf("%d: %#v", i, a)
}
t.Fatalf("Case %d failed", i+1)
}
}
}

0 comments on commit 6272c1c

Please sign in to comment.