Skip to content

Commit

Permalink
Fix batch handling of complete allocs/node drains
Browse files Browse the repository at this point in the history
This PR fixes:
* An issue in which a node-drain that contains a complete batch alloc
would cause a replacement
* An issue in which allocations with the same name during a scale
down/stop event wouldn't be properly stopped.
* An issue in which batch allocations from previous job versions may not
have been stopped properly.

Fixes #3210
  • Loading branch information
dadgar committed Sep 14, 2017
1 parent a88ce54 commit ccf077c
Show file tree
Hide file tree
Showing 5 changed files with 345 additions and 189 deletions.
13 changes: 1 addition & 12 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4610,18 +4610,7 @@ func (a *Allocation) Terminated() bool {
// RanSuccessfully returns whether the client has ran the allocation and all
// tasks finished successfully
func (a *Allocation) RanSuccessfully() bool {
// Handle the case the client hasn't started the allocation.
if len(a.TaskStates) == 0 {
return false
}

// Check to see if all the tasks finised successfully in the allocation
allSuccess := true
for _, state := range a.TaskStates {
allSuccess = allSuccess && state.Successful()
}

return allSuccess
return a.ClientStatus == AllocClientStatusComplete
}

// ShouldMigrate returns if the allocation needs data migration
Expand Down
34 changes: 3 additions & 31 deletions scheduler/generic_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func (s *GenericScheduler) process() (bool, error) {

// filterCompleteAllocs filters allocations that are terminal and should be
// re-placed.
func (s *GenericScheduler) filterCompleteAllocs(allocs []*structs.Allocation) ([]*structs.Allocation, map[string]*structs.Allocation) {
func (s *GenericScheduler) filterCompleteAllocs(allocs []*structs.Allocation) []*structs.Allocation {
filter := func(a *structs.Allocation) bool {
if s.batch {
// Allocs from batch jobs should be filtered when the desired status
Expand All @@ -319,45 +319,17 @@ func (s *GenericScheduler) filterCompleteAllocs(allocs []*structs.Allocation) ([
return a.TerminalStatus()
}

terminalAllocsByName := make(map[string]*structs.Allocation)
n := len(allocs)
for i := 0; i < n; i++ {
if filter(allocs[i]) {

// Add the allocation to the terminal allocs map if it's not already
// added or has a higher create index than the one which is
// currently present.
alloc, ok := terminalAllocsByName[allocs[i].Name]
if !ok || alloc.CreateIndex < allocs[i].CreateIndex {
terminalAllocsByName[allocs[i].Name] = allocs[i]
}

// Remove the allocation
allocs[i], allocs[n-1] = allocs[n-1], nil
i--
n--
}
}

// If the job is batch, we want to filter allocations that have been
// replaced by a newer version for the same task group.
filtered := allocs[:n]
if s.batch {
byTG := make(map[string]*structs.Allocation)
for _, alloc := range filtered {
existing := byTG[alloc.Name]
if existing == nil || existing.CreateIndex < alloc.CreateIndex {
byTG[alloc.Name] = alloc
}
}

filtered = make([]*structs.Allocation, 0, len(byTG))
for _, alloc := range byTG {
filtered = append(filtered, alloc)
}
}

return filtered, terminalAllocsByName
return allocs[:n]
}

// computeJobAllocs is used to reconcile differences between the job,
Expand All @@ -383,7 +355,7 @@ func (s *GenericScheduler) computeJobAllocs() error {
updateNonTerminalAllocsToLost(s.plan, tainted, allocs)

// Filter out the allocations in a terminal state
allocs, _ = s.filterCompleteAllocs(allocs)
allocs = s.filterCompleteAllocs(allocs)

reconciler := NewAllocReconciler(s.ctx.Logger(),
genericAllocUpdateFn(s.ctx, s.stack, s.eval.ID),
Expand Down
Loading

0 comments on commit ccf077c

Please sign in to comment.