scheduler: reconnect alloc with a failed replacement
lgfa29 committed Mar 22, 2023
1 parent a633b79 commit 7e334ff
Showing 1 changed file with 134 additions and 84 deletions.
218 changes: 134 additions & 84 deletions scheduler/reconcile.go
@@ -428,6 +428,25 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
untainted, migrate, lost, disconnecting, reconnecting, ignore := all.filterByTainted(a.taintedNodes, a.supportsDisconnectedClients, a.now)
desiredChanges.Ignore += uint64(len(ignore))

// If there are allocations reconnecting, we need to reconcile their
// replacements first because there is specific business logic when
// deciding which one to keep.
if len(reconnecting) > 0 {
reconnect, stop := a.reconcileReconnecting(reconnecting, all)

// Stop the reconnecting allocations that we don't need anymore.
desiredChanges.Stop += uint64(len(stop))
untainted = untainted.difference(stop)

// Validate and add reconnecting allocations to the plan so they are
// logged.
a.computeReconnecting(reconnect)

// The rest of the reconnecting allocations are now untainted and will
// be further reconciled below.
untainted = untainted.union(reconnect)
}

// Determine what set of terminal allocations need to be rescheduled
untainted, rescheduleNow, rescheduleLater := untainted.filterByRescheduleable(a.batch, false, a.now, a.evalID, a.deployment)

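For context, the difference and union calls above are Nomad's allocSet set helpers. A minimal sketch of the semantics assumed here, using simplified stand-in types rather than the scheduler's real implementation:

package sketch

// alloc is an illustrative stand-in for *structs.Allocation.
type alloc struct{ ID string }

// allocSet mirrors the scheduler's set type: a map keyed by allocation ID.
type allocSet map[string]*alloc

// difference returns the members of s that are absent from other.
func (s allocSet) difference(other allocSet) allocSet {
	out := make(allocSet)
	for id, a := range s {
		if _, ok := other[id]; !ok {
			out[id] = a
		}
	}
	return out
}

// union returns the members of both sets.
func (s allocSet) union(other allocSet) allocSet {
	out := make(allocSet)
	for id, a := range s {
		out[id] = a
	}
	for id, a := range other {
		out[id] = a
	}
	return out
}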
@@ -459,14 +478,10 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
// Stop any unneeded allocations and update the untainted set to not
// include stopped allocations.
isCanarying := dstate != nil && dstate.DesiredCanaries != 0 && !dstate.Promoted
stop, reconnecting := a.computeStop(tg, nameIndex, untainted, migrate, lost, canaries, reconnecting, isCanarying, lostLaterEvals)
stop := a.computeStop(tg, nameIndex, untainted, migrate, lost, canaries, isCanarying, lostLaterEvals)
desiredChanges.Stop += uint64(len(stop))
untainted = untainted.difference(stop)

// Validate and add reconnecting allocs to the plan so that they will be logged.
a.computeReconnecting(reconnecting)
desiredChanges.Ignore += uint64(len(a.result.reconnectUpdates))

// Do inplace upgrades where possible and capture the set of upgrades that
// need to be done destructively.
ignore, inplace, destructive := a.computeUpdates(tg, untainted)
@@ -496,10 +511,9 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
// * If there are any canaries, they have been promoted
// * There is no delayed stop_after_client_disconnect alloc, which delays scheduling for the whole group
// * An alloc was lost
// * There is not a corresponding reconnecting alloc.
var place []allocPlaceResult
if len(lostLater) == 0 {
place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow, lost, reconnecting, isCanarying)
place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow, lost, isCanarying)
if !existingDeployment {
dstate.DesiredTotal += len(place)
}
@@ -705,7 +719,7 @@ func (a *allocReconciler) computeUnderProvisionedBy(group *structs.TaskGroup, un
//
// Placements will meet or exceed group count.
func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
nameIndex *allocNameIndex, untainted, migrate, reschedule, lost, reconnecting allocSet,
nameIndex *allocNameIndex, untainted, migrate, reschedule, lost allocSet,
isCanarying bool) []allocPlaceResult {

// Add rescheduled placement results
@@ -725,7 +739,7 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
}

// Add replacements for disconnected and lost allocs up to group.Count
existing := len(untainted) + len(migrate) + len(reschedule) + len(reconnecting) - len(reconnecting.filterByFailedReconnect())
existing := len(untainted) + len(migrate) + len(reschedule)
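// For example (illustrative): with group.Count = 3, one untainted
// allocation, one migrating, and none rescheduling now, existing is
// 1 + 1 + 0 = 2, so the loop below can place at most one replacement
// for the lost allocations.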

// Add replacements for lost
for _, alloc := range lost {
@@ -935,28 +949,22 @@ func (a *allocReconciler) isDeploymentComplete(groupName string, destructive, in
// the group definition, the set of allocations in various states and whether we
// are canarying.
func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *allocNameIndex,
untainted, migrate, lost, canaries, reconnecting allocSet, isCanarying bool, followupEvals map[string]string) (allocSet, allocSet) {
untainted, migrate, lost, canaries allocSet, isCanarying bool, followupEvals map[string]string) allocSet {

// Mark all lost allocations for stop.
var stop allocSet
stop = stop.union(lost)
a.markDelayed(lost, structs.AllocClientStatusLost, allocLost, followupEvals)

// Mark all failed reconnects for stop.
failedReconnects := reconnecting.filterByFailedReconnect()
stop = stop.union(failedReconnects)
a.markStop(failedReconnects, structs.AllocClientStatusFailed, allocRescheduled)
reconnecting = reconnecting.difference(failedReconnects)

// If we are still deploying or creating canaries, don't stop them
if isCanarying {
untainted = untainted.difference(canaries)
}

// Hot path the nothing to do case
remove := len(untainted) + len(migrate) + len(reconnecting) - group.Count
remove := len(untainted) + len(migrate) - group.Count
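// For example (illustrative): with 4 untainted allocations, 1 migrating,
// and group.Count = 3, remove = 4 + 1 - 3 = 2, so two allocations must be
// selected to stop by the steps below.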
if remove <= 0 {
return stop, reconnecting
return stop
}

// Filter out any terminal allocations from the untainted set
@@ -978,7 +986,7 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc

remove--
if remove == 0 {
return stop, reconnecting
return stop
}
}
}
@@ -1002,19 +1010,11 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc

remove--
if remove == 0 {
return stop, reconnecting
return stop
}
}
}

// Handle allocs that might be able to reconnect.
if len(reconnecting) != 0 {
remove = a.computeStopByReconnecting(untainted, reconnecting, stop, remove)
if remove == 0 {
return stop, reconnecting
}
}

// Select the allocs with the highest count to remove
removeNames := nameIndex.Highest(uint(remove))
for id, alloc := range untainted {
@@ -1028,7 +1028,7 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc

remove--
if remove == 0 {
return stop, reconnecting
return stop
}
}
}
@@ -1045,95 +1045,145 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc

remove--
if remove == 0 {
return stop, reconnecting
return stop
}
}

return stop, reconnecting
return stop
}

// computeStopByReconnecting moves allocations from either the untainted or reconnecting
// sets to the stop set and returns the number of allocations that still need to be removed.
func (a *allocReconciler) computeStopByReconnecting(untainted, reconnecting, stop allocSet, remove int) int {
if remove == 0 {
return remove
}
// reconcileReconnecting receives the set of allocations that are reconnecting
// and all other allocations for the same group, and determines which ones to
// reconnect and which ones to stop.
//
// - Every reconnecting allocation MUST be present in exactly one of the returned sets.
// - Every replacement allocation that is not preferred MUST be returned in
// the stop set.
// - Only reconnecting allocations are allowed to be returned in the
// reconnect set.
// - If the reconnecting allocation is to be stopped, its replacements may
// not be present in any of the returned sets. The rest of the reconciler
// logic will handle them.
func (a *allocReconciler) reconcileReconnecting(reconnecting allocSet, others allocSet) (allocSet, allocSet) {
stop := make(allocSet)
reconnect := make(allocSet)

// Mark all failed reconnects for stop.
failedReconnects := reconnecting.filterByFailedReconnect()
stop = stop.union(failedReconnects)
a.markStop(failedReconnects, structs.AllocClientStatusFailed, allocRescheduled)
successfulReconnects := reconnecting.difference(failedReconnects)

for _, reconnectingAlloc := range reconnecting {
// if the desired status is not run, or if the user-specified desired
for _, reconnectingAlloc := range successfulReconnects {
// If the desired status is not run, or if the user-specified desired
// transition is not run, stop the reconnecting allocation.
if reconnectingAlloc.DesiredStatus != structs.AllocDesiredStatusRun ||
stopReconnecting := reconnectingAlloc.DesiredStatus != structs.AllocDesiredStatusRun ||
reconnectingAlloc.DesiredTransition.ShouldMigrate() ||
reconnectingAlloc.DesiredTransition.ShouldReschedule() ||
reconnectingAlloc.DesiredTransition.ShouldForceReschedule() ||
reconnectingAlloc.Job.Version < a.job.Version ||
reconnectingAlloc.Job.CreateIndex < a.job.CreateIndex {
reconnectingAlloc.Job.CreateIndex < a.job.CreateIndex

if stopReconnecting {
stop[reconnectingAlloc.ID] = reconnectingAlloc
a.result.stop = append(a.result.stop, allocStopResult{
alloc: reconnectingAlloc,
statusDescription: allocNotNeeded,
})
delete(reconnecting, reconnectingAlloc.ID)

remove--
// if we've removed all we need to, stop iterating and return.
if remove == 0 {
return remove
}
continue
}

// Compare reconnecting to untainted and decide which to keep.
for _, untaintedAlloc := range untainted {
// If not a match by name and previous alloc continue
if reconnectingAlloc.Name != untaintedAlloc.Name {
continue
}
// By default, prefer stopping the replacement allocation, so assume
// all non-terminal reconnecting allocations will reconnect.
reconnect[reconnectingAlloc.ID] = reconnectingAlloc

// By default, we prefer stopping the replacement alloc unless
// the replacement has a higher metrics score.
stopAlloc := untaintedAlloc
deleteSet := untainted
untaintedMaxScoreMeta := untaintedAlloc.Metrics.MaxNormScore()
reconnectingMaxScoreMeta := reconnectingAlloc.Metrics.MaxNormScore()

if untaintedMaxScoreMeta == nil {
a.logger.Error("error computing stop: replacement allocation metrics not available", "alloc_name", untaintedAlloc.Name, "alloc_id", untaintedAlloc.ID)
// Find replacement allocations and decide which one to stop. A
// reconnecting allocation may have multiple replacements.
for _, replacementAlloc := range others {
// spiderman_meme.jpg
if reconnectingAlloc.ID == replacementAlloc.ID {
continue
}

if reconnectingMaxScoreMeta == nil {
a.logger.Error("error computing stop: reconnecting allocation metrics not available", "alloc_name", reconnectingAlloc.Name, "alloc_id", reconnectingAlloc.ID)
// Replacement allocations have the same name as the original so
// skip the ones that don't.
if reconnectingAlloc.Name != replacementAlloc.Name {
continue
}

statusDescription := allocNotNeeded
if untaintedAlloc.Job.Version > reconnectingAlloc.Job.Version ||
untaintedAlloc.Job.CreateIndex > reconnectingAlloc.Job.CreateIndex ||
untaintedMaxScoreMeta.NormScore > reconnectingMaxScoreMeta.NormScore {
stopAlloc = reconnectingAlloc
deleteSet = reconnecting
} else {
statusDescription = allocReconnected
// Skip allocations that are server terminal since they are
// already set to stop.
if replacementAlloc.ServerTerminalStatus() {
continue
}

stop[stopAlloc.ID] = stopAlloc
a.result.stop = append(a.result.stop, allocStopResult{
alloc: stopAlloc,
statusDescription: statusDescription,
})
delete(deleteSet, stopAlloc.ID)
keepAlloc := pickReconnectingAlloc(reconnectingAlloc, replacementAlloc)
if keepAlloc == replacementAlloc {
// The replacement allocation is preferred, so stop the one
// that is reconnecting if not stopped yet.
if _, ok := stop[reconnectingAlloc.ID]; !ok {
stop[reconnectingAlloc.ID] = reconnectingAlloc
a.result.stop = append(a.result.stop, allocStopResult{
alloc: reconnectingAlloc,
statusDescription: allocNotNeeded,
})
}

remove--
// if we've removed all we need to, stop iterating and return.
if remove == 0 {
return remove
// Our assumption that all non-terminal reconnecting
// allocations will reconnect was wrong, so remove it from the
// reconnect set.
delete(reconnect, reconnectingAlloc.ID)
} else {
// The reconnecting allocation is preferred, so stop this
// replacement.
stop[replacementAlloc.ID] = replacementAlloc
a.result.stop = append(a.result.stop, allocStopResult{
alloc: replacementAlloc,
statusDescription: allocReconnected,
})
}
}
}

return remove
return reconnect, stop
}

// pickReconnectingAlloc returns the allocation to keep between the original
// one that is reconnecting and one of its replacements.
//
// This function is not commutative, meaning that pickReconnectingAlloc(A, B)
// is not the same as pickReconnectingAlloc(B, A). Preference is given to keep
// the original allocation when possible.
func pickReconnectingAlloc(original *structs.Allocation, replacement *structs.Allocation) *structs.Allocation {
// Check if the replacement is newer.
// Always prefer the replacement if true.
replacementIsNewer := replacement.Job.Version > original.Job.Version ||
replacement.Job.CreateIndex > original.Job.CreateIndex
if replacementIsNewer {
return replacement
}

// Check if the replacement has a better placement score.
// If either score is unavailable, only pick the replacement if it is the
// one that does have a score.
originalMaxScoreMeta := original.Metrics.MaxNormScore()
replacementMaxScoreMeta := replacement.Metrics.MaxNormScore()

replacementHasBetterScore := originalMaxScoreMeta == nil && replacementMaxScoreMeta != nil ||
(originalMaxScoreMeta != nil && replacementMaxScoreMeta != nil &&
replacementMaxScoreMeta.NormScore > originalMaxScoreMeta.NormScore)

// Check if the replacement has better client status.
// Even with a better placement score, make sure we don't replace a running
// allocation with one that is not.
replacementIsRunning := replacement.ClientStatus == structs.AllocClientStatusRunning
originalNotRunning := original.ClientStatus != structs.AllocClientStatusRunning

if replacementHasBetterScore && (replacementIsRunning || originalNotRunning) {
return replacement
}

return original
}

// computeUpdates determines which allocations for the passed group require

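For illustration (not part of this commit), a standalone sketch of the precedence pickReconnectingAlloc applies, using simplified stand-in types rather than the scheduler's real structs:

package main

import "fmt"

// allocation is an illustrative stand-in for *structs.Allocation.
type allocation struct {
	ID         string
	JobVersion uint64
	CreateIdx  uint64
	NormScore  *float64 // nil when placement metrics are unavailable
	Running    bool
}

// pick mirrors pickReconnectingAlloc's ordered checks: a newer replacement
// always wins; otherwise the replacement needs a strictly better score and
// must not trade a running allocation for a non-running one.
func pick(original, replacement allocation) allocation {
	if replacement.JobVersion > original.JobVersion ||
		replacement.CreateIdx > original.CreateIdx {
		return replacement
	}

	betterScore := (original.NormScore == nil && replacement.NormScore != nil) ||
		(original.NormScore != nil && replacement.NormScore != nil &&
			*replacement.NormScore > *original.NormScore)

	if betterScore && (replacement.Running || !original.Running) {
		return replacement
	}
	return original
}

func main() {
	score := func(v float64) *float64 { return &v }

	original := allocation{ID: "original", JobVersion: 1, NormScore: score(0.5), Running: true}
	replacement := allocation{ID: "replacement", JobVersion: 1, NormScore: score(0.9), Running: false}

	// A better score alone is not enough: the replacement is not running
	// while the original is, so the original is kept and reconnects.
	fmt.Println(pick(original, replacement).ID) // original
}

Keeping the original on ties matches the function's doc comment: preference goes to the allocation that was already in place before the disconnect.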