Commit
scheduler/reconcile: set FollowupEvalID on lost stop_after_client_disconnect (#8105) (#8138)

* scheduler/reconcile: set FollowupEvalID on lost stop_after_client_disconnect

* scheduler/reconcile: thread followupEvalIDs through to results.stop

* scheduler/reconcile: comment typo

* nomad/*_test: correct arguments for plan.AppendStoppedAlloc

* scheduler/reconcile: avoid nil, cleanup handleDelayed(Lost|Reschedules)
langmartin committed Jun 9, 2020
1 parent 11d80ae commit 9ccec0a
Showing 10 changed files with 64 additions and 42 deletions.
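
Taken together, these diffs thread a follow-up evaluation ID from the reconciler, through the plan's stop results, and into the normalized allocation that survives plan apply and state restore. A minimal runnable sketch of the idea, using trimmed, hypothetical stand-ins for Nomad's Allocation and Evaluation structs rather than the real types:

package main

import (
	"fmt"
	"time"
)

// Hypothetical, trimmed versions of the Nomad structs touched by this commit.
type Allocation struct {
	ID             string
	ClientStatus   string
	FollowupEvalID string
}

type Evaluation struct {
	ID        string
	WaitUntil time.Time // when the scheduler should run the follow-up
}

func main() {
	lost := &Allocation{ID: "alloc-1", ClientStatus: "lost"}

	// The reconciler batches a delayed follow-up eval for the lost alloc...
	followup := &Evaluation{ID: "eval-9", WaitUntil: time.Now().Add(30 * time.Second)}

	// ...and, after this commit, stamps its ID on the stopped alloc so the
	// alloc record points at the eval that will reschedule it later.
	lost.FollowupEvalID = followup.ID

	fmt.Printf("%s will be rescheduled by %s\n", lost.ID, lost.FollowupEvalID)
}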
1 change: 1 addition & 0 deletions nomad/plan_apply.go
@@ -339,6 +339,7 @@ func normalizeStoppedAlloc(stoppedAlloc *structs.Allocation, now int64) *structs
 		DesiredDescription: stoppedAlloc.DesiredDescription,
 		ClientStatus:       stoppedAlloc.ClientStatus,
 		ModifyTime:         now,
+		FollowupEvalID:     stoppedAlloc.FollowupEvalID,
 	}
 }
3 changes: 3 additions & 0 deletions nomad/state/state_store.go
@@ -5379,6 +5379,9 @@ func (s *StateSnapshot) DenormalizeAllocationDiffSlice(allocDiffs []*structs.All
 			if allocDiff.ClientStatus != "" {
 				allocCopy.ClientStatus = allocDiff.ClientStatus
 			}
+			if allocDiff.FollowupEvalID != "" {
+				allocCopy.FollowupEvalID = allocDiff.FollowupEvalID
+			}
 		}
 		if allocDiff.ModifyTime != 0 {
 			allocCopy.ModifyTime = allocDiff.ModifyTime
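
DenormalizeAllocationDiffSlice merges a sparse allocation diff back into a copy of the stored allocation, taking only non-empty fields, and FollowupEvalID now joins that merge. A rough sketch of the pattern (not Nomad's actual code), reusing the trimmed Allocation type from the sketch above:

// mergeStoppedAllocDiff is a hypothetical reduction of the field-by-field
// merge in DenormalizeAllocationDiffSlice: only non-zero diff fields
// overwrite the stored copy.
func mergeStoppedAllocDiff(stored, diff Allocation) Allocation {
	out := stored // work on a copy, as the snapshot code does
	if diff.ClientStatus != "" {
		out.ClientStatus = diff.ClientStatus
	}
	if diff.FollowupEvalID != "" {
		out.FollowupEvalID = diff.FollowupEvalID
	}
	return out
}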
7 changes: 6 additions & 1 deletion nomad/structs/structs.go
@@ -9465,7 +9465,7 @@ type Plan struct {
 
 // AppendStoppedAlloc marks an allocation to be stopped. The clientStatus of the
 // allocation may be optionally set by passing in a non-empty value.
-func (p *Plan) AppendStoppedAlloc(alloc *Allocation, desiredDesc, clientStatus string) {
+func (p *Plan) AppendStoppedAlloc(alloc *Allocation, desiredDesc, clientStatus, followupEvalID string) {
 	newAlloc := new(Allocation)
 	*newAlloc = *alloc
 
@@ -9490,6 +9490,10 @@ func (p *Plan) AppendStoppedAlloc(alloc *Allocation, desiredDesc, clientStatus s
 
 	newAlloc.AppendState(AllocStateFieldClientStatus, clientStatus)
 
+	if followupEvalID != "" {
+		newAlloc.FollowupEvalID = followupEvalID
+	}
+
 	node := alloc.NodeID
 	existing := p.NodeUpdate[node]
 	p.NodeUpdate[node] = append(existing, newAlloc)
@@ -9564,6 +9568,7 @@ func (p *Plan) NormalizeAllocations() {
 			ID:                 alloc.ID,
 			DesiredDescription: alloc.DesiredDescription,
 			ClientStatus:       alloc.ClientStatus,
+			FollowupEvalID:     alloc.FollowupEvalID,
 		}
 	}
 }
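
Every caller of AppendStoppedAlloc gains a fourth argument. A hedged call-site sketch against the new signature (the plan construction mirrors the tests in this commit; lostAlloc, otherAlloc, and followupEvalID are placeholders):

plan := &structs.Plan{
	NodeUpdate:      make(map[string][]*structs.Allocation),
	NodePreemptions: make(map[string][]*structs.Allocation),
}

// Link the stopped alloc to the eval that will reschedule it later...
plan.AppendStoppedAlloc(lostAlloc, "lost on disconnected client", structs.AllocClientStatusLost, followupEvalID)

// ...or pass "" to keep the old behavior: no client status, no follow-up eval.
plan.AppendStoppedAlloc(otherAlloc, "alloc not needed", "", "")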
5 changes: 3 additions & 2 deletions nomad/structs/structs_test.go
@@ -3563,7 +3563,7 @@ func TestPlan_NormalizeAllocations(t *testing.T) {
 	}
 	stoppedAlloc := MockAlloc()
 	desiredDesc := "Desired desc"
-	plan.AppendStoppedAlloc(stoppedAlloc, desiredDesc, AllocClientStatusLost)
+	plan.AppendStoppedAlloc(stoppedAlloc, desiredDesc, AllocClientStatusLost, "followup-eval-id")
 	preemptedAlloc := MockAlloc()
 	preemptingAllocID := uuid.Generate()
 	plan.AppendPreemptedAlloc(preemptedAlloc, preemptingAllocID)
@@ -3575,6 +3575,7 @@ func TestPlan_NormalizeAllocations(t *testing.T) {
 		ID:                 stoppedAlloc.ID,
 		DesiredDescription: desiredDesc,
 		ClientStatus:       AllocClientStatusLost,
+		FollowupEvalID:     "followup-eval-id",
 	}
 	assert.Equal(t, expectedStoppedAlloc, actualStoppedAlloc)
 	actualPreemptedAlloc := plan.NodePreemptions[preemptedAlloc.NodeID][0]
@@ -3593,7 +3594,7 @@ func TestPlan_AppendStoppedAllocAppendsAllocWithUpdatedAttrs(t *testing.T) {
 	alloc := MockAlloc()
 	desiredDesc := "Desired desc"
 
-	plan.AppendStoppedAlloc(alloc, desiredDesc, AllocClientStatusLost)
+	plan.AppendStoppedAlloc(alloc, desiredDesc, AllocClientStatusLost, "")
 
 	expectedAlloc := new(Allocation)
 	*expectedAlloc = *alloc
2 changes: 1 addition & 1 deletion nomad/worker_test.go
@@ -437,7 +437,7 @@ func TestWorker_SubmitPlanNormalizedAllocations(t *testing.T) {
 		NodePreemptions: make(map[string][]*structs.Allocation),
 	}
 	desiredDescription := "desired desc"
-	plan.AppendStoppedAlloc(stoppedAlloc, desiredDescription, structs.AllocClientStatusLost)
+	plan.AppendStoppedAlloc(stoppedAlloc, desiredDescription, structs.AllocClientStatusLost, "")
 	preemptingAllocID := uuid.Generate()
 	plan.AppendPreemptedAlloc(preemptedAlloc, preemptingAllocID)
4 changes: 2 additions & 2 deletions scheduler/generic_sched.go
@@ -378,7 +378,7 @@ func (s *GenericScheduler) computeJobAllocs() error {
 
 	// Handle the stop
 	for _, stop := range results.stop {
-		s.plan.AppendStoppedAlloc(stop.alloc, stop.statusDescription, stop.clientStatus)
+		s.plan.AppendStoppedAlloc(stop.alloc, stop.statusDescription, stop.clientStatus, stop.followupEvalID)
 	}
 
 	// Handle the in-place updates
@@ -476,7 +476,7 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
 		stopPrevAlloc, stopPrevAllocDesc := missing.StopPreviousAlloc()
 		prevAllocation := missing.PreviousAllocation()
 		if stopPrevAlloc {
-			s.plan.AppendStoppedAlloc(prevAllocation, stopPrevAllocDesc, "")
+			s.plan.AppendStoppedAlloc(prevAllocation, stopPrevAllocDesc, "", "")
 		}
 
 		// Compute penalty nodes for rescheduled allocs
69 changes: 40 additions & 29 deletions scheduler/reconcile.go
@@ -301,6 +301,19 @@ func (a *allocReconciler) markStop(allocs allocSet, clientStatus, statusDescript
 	}
 }
 
+// markDelayed does markStop, but optionally includes a FollowupEvalID so that we can update
+// the stopped alloc with its delayed rescheduling evalID
+func (a *allocReconciler) markDelayed(allocs allocSet, clientStatus, statusDescription string, followupEvals map[string]string) {
+	for _, alloc := range allocs {
+		a.result.stop = append(a.result.stop, allocStopResult{
+			alloc:             alloc,
+			clientStatus:      clientStatus,
+			statusDescription: statusDescription,
+			followupEvalID:    followupEvals[alloc.ID],
+		})
+	}
+}
+
 // computeGroup reconciles state for a particular task group. It returns whether
 // the deployment it is for is complete with regards to the task group.
 func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
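
markDelayed leans on Go's zero-value map lookups: an alloc with no delayed reschedule is simply absent from followupEvals, so its stop result carries an empty FollowupEvalID, exactly as markStop produced before. A tiny runnable sketch of that behavior (IDs are illustrative):

package main

import "fmt"

func main() {
	// allocID -> follow-up evalID, as returned by handleDelayedLost.
	followupEvals := map[string]string{"alloc-1": "eval-9"}

	// A missing key yields "", so only allocs that actually have a delayed
	// reschedule end up linked to an eval.
	for _, id := range []string{"alloc-1", "alloc-2"} {
		fmt.Printf("%s -> %q\n", id, followupEvals[id])
	}
}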
@@ -355,7 +368,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
 
 	// Find delays for any lost allocs that have stop_after_client_disconnect
 	lostLater := lost.delayByStopAfterClientDisconnect()
-	a.handleDelayedLost(lostLater, all, tg.Name)
+	lostLaterEvals := a.handleDelayedLost(lostLater, all, tg.Name)
 
 	// Create batched follow up evaluations for allocations that are
 	// reschedulable later and mark the allocations for in place updating
@@ -368,7 +381,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
 	// Stop any unneeded allocations and update the untainted set to not
 	// included stopped allocations.
 	canaryState := dstate != nil && dstate.DesiredCanaries != 0 && !dstate.Promoted
-	stop := a.computeStop(tg, nameIndex, untainted, migrate, lost, canaries, canaryState)
+	stop := a.computeStop(tg, nameIndex, untainted, migrate, lost, canaries, canaryState, lostLaterEvals)
 	desiredChanges.Stop += uint64(len(stop))
 	untainted = untainted.difference(stop)
 
@@ -705,13 +718,13 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
 // the group definition, the set of allocations in various states and whether we
 // are canarying.
 func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *allocNameIndex,
-	untainted, migrate, lost, canaries allocSet, canaryState bool) allocSet {
+	untainted, migrate, lost, canaries allocSet, canaryState bool, followupEvals map[string]string) allocSet {
 
 	// Mark all lost allocations for stop. Previous allocation doesn't matter
 	// here since it is on a lost node
 	var stop allocSet
 	stop = stop.union(lost)
-	a.markStop(lost, structs.AllocClientStatusLost, allocLost)
+	a.markDelayed(lost, structs.AllocClientStatusLost, allocLost, followupEvals)
 
 	// If we are still deploying or creating canaries, don't stop them
 	if canaryState {
@@ -836,22 +849,33 @@ func (a *allocReconciler) computeUpdates(group *structs.TaskGroup, untainted all
 	return
 }
 
-// handleDelayedReschedules creates batched followup evaluations with the WaitUntil field set
-// for allocations that are eligible to be rescheduled later
+// handleDelayedReschedules creates batched followup evaluations with the WaitUntil field
+// set for allocations that are eligible to be rescheduled later, and marks the alloc with
+// the followupEvalID
 func (a *allocReconciler) handleDelayedReschedules(rescheduleLater []*delayedRescheduleInfo, all allocSet, tgName string) {
-	a.handleDelayedReschedulesImpl(rescheduleLater, all, tgName, true)
-}
+	// followupEvals are created in the same way as for delayed lost allocs
+	allocIDToFollowupEvalID := a.handleDelayedLost(rescheduleLater, all, tgName)
+
+	// Initialize the annotations
+	if len(allocIDToFollowupEvalID) != 0 && a.result.attributeUpdates == nil {
+		a.result.attributeUpdates = make(map[string]*structs.Allocation)
+	}
 
-// handleDelayedLost creates batched followup evaluations with the WaitUntil field set for lost allocations
-func (a *allocReconciler) handleDelayedLost(rescheduleLater []*delayedRescheduleInfo, all allocSet, tgName string) {
-	a.handleDelayedReschedulesImpl(rescheduleLater, all, tgName, false)
+	// Create updates that will be applied to the allocs to mark the FollowupEvalID
+	for allocID, evalID := range allocIDToFollowupEvalID {
+		existingAlloc := all[allocID]
+		updatedAlloc := existingAlloc.Copy()
+		updatedAlloc.FollowupEvalID = evalID
+		a.result.attributeUpdates[updatedAlloc.ID] = updatedAlloc
+	}
 }
 
-// handleDelayedReschedulesImpl creates batched followup evaluations with the WaitUntil field set
-func (a *allocReconciler) handleDelayedReschedulesImpl(rescheduleLater []*delayedRescheduleInfo, all allocSet, tgName string,
-	createUpdates bool) {
+// handleDelayedLost creates batched followup evaluations with the WaitUntil field set for
+// lost allocations. followupEvals are appended to a.result as a side effect, we return a
+// map of alloc IDs to their followupEval IDs
+func (a *allocReconciler) handleDelayedLost(rescheduleLater []*delayedRescheduleInfo, all allocSet, tgName string) map[string]string {
 	if len(rescheduleLater) == 0 {
-		return
+		return map[string]string{}
 	}
 
 	// Sort by time
@@ -904,18 +928,5 @@ func (a *allocReconciler) handleDelayedReschedulesImpl(rescheduleLater []*delaye
 
 	a.result.desiredFollowupEvals[tgName] = evals
 
-	// Initialize the annotations
-	if len(allocIDToFollowupEvalID) != 0 && a.result.attributeUpdates == nil {
-		a.result.attributeUpdates = make(map[string]*structs.Allocation)
-	}
-
-	// Create in-place updates for every alloc ID that needs to be updated with its follow up eval ID
-	if createUpdates {
-		for allocID, evalID := range allocIDToFollowupEvalID {
-			existingAlloc := all[allocID]
-			updatedAlloc := existingAlloc.Copy()
-			updatedAlloc.FollowupEvalID = evalID
-			a.result.attributeUpdates[updatedAlloc.ID] = updatedAlloc
-		}
-	}
+	return allocIDToFollowupEvalID
 }
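
The refactor here replaces handleDelayedReschedulesImpl and its createUpdates flag with composition: handleDelayedLost returns the allocID-to-evalID map it builds, and handleDelayedReschedules reuses that map to layer on the attribute updates. A simplified, self-contained sketch of that shape, with illustrative stand-in names throughout:

package main

import "fmt"

type alloc struct{ ID string }

// handleLost mirrors handleDelayedLost after this commit: it creates the
// batched follow-up evals (elided here) and returns allocID -> evalID.
func handleLost(lost []alloc) map[string]string {
	evals := map[string]string{}
	for _, a := range lost {
		evals[a.ID] = "eval-for-" + a.ID // stand-in for a real eval ID
	}
	return evals
}

// handleReschedules mirrors handleDelayedReschedules: the same eval creation
// as the lost path, plus updates that record each FollowupEvalID.
func handleReschedules(later []alloc) {
	for id, evalID := range handleLost(later) {
		fmt.Println("annotate", id, "with", evalID) // stand-in for attributeUpdates
	}
}

func main() {
	handleReschedules([]alloc{{ID: "alloc-1"}, {ID: "alloc-2"}})
}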
1 change: 1 addition & 0 deletions scheduler/reconcile_util.go
@@ -41,6 +41,7 @@ type allocStopResult struct {
 	alloc             *structs.Allocation
 	clientStatus      string
 	statusDescription string
+	followupEvalID    string
 }
 
 // allocPlaceResult contains the information required to place a single
6 changes: 3 additions & 3 deletions scheduler/system_sched.go
@@ -212,18 +212,18 @@ func (s *SystemScheduler) computeJobAllocs() error {
 
 	// Add all the allocs to stop
 	for _, e := range diff.stop {
-		s.plan.AppendStoppedAlloc(e.Alloc, allocNotNeeded, "")
+		s.plan.AppendStoppedAlloc(e.Alloc, allocNotNeeded, "", "")
 	}
 
 	// Add all the allocs to migrate
 	for _, e := range diff.migrate {
-		s.plan.AppendStoppedAlloc(e.Alloc, allocNodeTainted, "")
+		s.plan.AppendStoppedAlloc(e.Alloc, allocNodeTainted, "", "")
 	}
 
 	// Lost allocations should be transitioned to desired status stop and client
 	// status lost.
 	for _, e := range diff.lost {
-		s.plan.AppendStoppedAlloc(e.Alloc, allocLost, structs.AllocClientStatusLost)
+		s.plan.AppendStoppedAlloc(e.Alloc, allocLost, structs.AllocClientStatusLost, "")
 	}
 
 	// Attempt to do the upgrades in place
8 changes: 4 additions & 4 deletions scheduler/util.go
@@ -601,7 +601,7 @@ func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job,
 		// the current allocation is discounted when checking for feasibility.
 		// Otherwise we would be trying to fit the tasks current resources and
 		// updated resources. After select is called we can remove the evict.
-		ctx.Plan().AppendStoppedAlloc(update.Alloc, allocInPlace, "")
+		ctx.Plan().AppendStoppedAlloc(update.Alloc, allocInPlace, "", "")
 
 		// Attempt to match the task group
 		option := stack.Select(update.TaskGroup, nil) // This select only looks at one node so we don't pass selectOptions
@@ -670,7 +670,7 @@ func evictAndPlace(ctx Context, diff *diffResult, allocs []allocTuple, desc stri
 	n := len(allocs)
 	for i := 0; i < n && i < *limit; i++ {
 		a := allocs[i]
-		ctx.Plan().AppendStoppedAlloc(a.Alloc, desc, "")
+		ctx.Plan().AppendStoppedAlloc(a.Alloc, desc, "", "")
 		diff.place = append(diff.place, a)
 	}
 	if n <= *limit {
@@ -831,7 +831,7 @@ func updateNonTerminalAllocsToLost(plan *structs.Plan, tainted map[string]*struc
 			alloc.DesiredStatus == structs.AllocDesiredStatusEvict) &&
 			(alloc.ClientStatus == structs.AllocClientStatusRunning ||
 				alloc.ClientStatus == structs.AllocClientStatusPending) {
-			plan.AppendStoppedAlloc(alloc, allocLost, structs.AllocClientStatusLost)
+			plan.AppendStoppedAlloc(alloc, allocLost, structs.AllocClientStatusLost, "")
 		}
 	}
 }
@@ -881,7 +881,7 @@ func genericAllocUpdateFn(ctx Context, stack Stack, evalID string) allocUpdateTy
 		// the current allocation is discounted when checking for feasibility.
 		// Otherwise we would be trying to fit the tasks current resources and
 		// updated resources. After select is called we can remove the evict.
-		ctx.Plan().AppendStoppedAlloc(existing, allocInPlace, "")
+		ctx.Plan().AppendStoppedAlloc(existing, allocInPlace, "", "")
 
 		// Attempt to match the task group
 		option := stack.Select(newTG, nil) // This select only looks at one node so we don't pass selectOptions
