Skip to content

Commit

Permalink
Merge pull request #3217 from hashicorp/b-batch-filter
Browse files Browse the repository at this point in the history
Fix batch handling of complete allocs/node drains
  • Loading branch information
dadgar committed Sep 14, 2017
2 parents ca07d16 + 9b594b4 commit 15be156
Show file tree
Hide file tree
Showing 6 changed files with 351 additions and 189 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ IMPROVEMENTS:

BUG FIXES:
* core: Fix restoration of stopped periodic jobs [GH-3201]
* core: Fix issue where node-drain with complete batch allocation would create
replacement [GH-3217]
* core: Fix issue in which batch allocations from previous job versions may not
have been stopped properly. [GH-3217]
* core: Fix issue in which allocations with the same name during a scale
down/stop event wouldn't be properly stopped [GH-3217]
* core: Fix a race condition in which scheduling results from one invocation of
the scheduler wouldn't be considered by the next for the same job [GH-3206]
* api: Sort /v1/agent/servers output so that output of Consul checks does not
Expand Down
13 changes: 1 addition & 12 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4630,18 +4630,7 @@ func (a *Allocation) Terminated() bool {
// RanSuccessfully returns whether the client has ran the allocation and all
// tasks finished successfully
func (a *Allocation) RanSuccessfully() bool {
// Handle the case the client hasn't started the allocation.
if len(a.TaskStates) == 0 {
return false
}

// Check to see if all the tasks finised successfully in the allocation
allSuccess := true
for _, state := range a.TaskStates {
allSuccess = allSuccess && state.Successful()
}

return allSuccess
return a.ClientStatus == AllocClientStatusComplete
}

// ShouldMigrate returns if the allocation needs data migration
Expand Down
34 changes: 3 additions & 31 deletions scheduler/generic_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func (s *GenericScheduler) process() (bool, error) {

// filterCompleteAllocs filters allocations that are terminal and should be
// re-placed.
func (s *GenericScheduler) filterCompleteAllocs(allocs []*structs.Allocation) ([]*structs.Allocation, map[string]*structs.Allocation) {
func (s *GenericScheduler) filterCompleteAllocs(allocs []*structs.Allocation) []*structs.Allocation {
filter := func(a *structs.Allocation) bool {
if s.batch {
// Allocs from batch jobs should be filtered when the desired status
Expand All @@ -319,45 +319,17 @@ func (s *GenericScheduler) filterCompleteAllocs(allocs []*structs.Allocation) ([
return a.TerminalStatus()
}

terminalAllocsByName := make(map[string]*structs.Allocation)
n := len(allocs)
for i := 0; i < n; i++ {
if filter(allocs[i]) {

// Add the allocation to the terminal allocs map if it's not already
// added or has a higher create index than the one which is
// currently present.
alloc, ok := terminalAllocsByName[allocs[i].Name]
if !ok || alloc.CreateIndex < allocs[i].CreateIndex {
terminalAllocsByName[allocs[i].Name] = allocs[i]
}

// Remove the allocation
allocs[i], allocs[n-1] = allocs[n-1], nil
i--
n--
}
}

// If the job is batch, we want to filter allocations that have been
// replaced by a newer version for the same task group.
filtered := allocs[:n]
if s.batch {
byTG := make(map[string]*structs.Allocation)
for _, alloc := range filtered {
existing := byTG[alloc.Name]
if existing == nil || existing.CreateIndex < alloc.CreateIndex {
byTG[alloc.Name] = alloc
}
}

filtered = make([]*structs.Allocation, 0, len(byTG))
for _, alloc := range byTG {
filtered = append(filtered, alloc)
}
}

return filtered, terminalAllocsByName
return allocs[:n]
}

// computeJobAllocs is used to reconcile differences between the job,
Expand All @@ -383,7 +355,7 @@ func (s *GenericScheduler) computeJobAllocs() error {
updateNonTerminalAllocsToLost(s.plan, tainted, allocs)

// Filter out the allocations in a terminal state
allocs, _ = s.filterCompleteAllocs(allocs)
allocs = s.filterCompleteAllocs(allocs)

reconciler := NewAllocReconciler(s.ctx.Logger(),
genericAllocUpdateFn(s.ctx, s.stack, s.eval.ID),
Expand Down
Loading

0 comments on commit 15be156

Please sign in to comment.