
scheduler: fix reconciliation of reconnecting allocs #16609

Merged · 7 commits · Mar 24, 2023
Changes from 3 commits
3 changes: 3 additions & 0 deletions .changelog/16609.txt
@@ -0,0 +1,3 @@
```release-note:bug
scheduler: Fix reconciliation of reconnecting allocs when the replacement allocations are not running
```
234 changes: 150 additions & 84 deletions scheduler/reconcile.go
@@ -428,6 +428,34 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
untainted, migrate, lost, disconnecting, reconnecting, ignore := all.filterByTainted(a.taintedNodes, a.supportsDisconnectedClients, a.now)
desiredChanges.Ignore += uint64(len(ignore))

// If there are allocations reconnecting we need to reconcile them and
// their replacements first because there is specific logic when deciding
// which ones to keep that can only be applied when the client reconnects.
if len(reconnecting) > 0 {
// Pass all allocations because the replacements we need to find may be
// in any state, including themselves being reconnected.
reconnect, stop := a.reconcileReconnecting(reconnecting, all)

// Stop the reconciled allocations and remove them from the other sets
// since they have already been handled.
desiredChanges.Stop += uint64(len(stop))

untainted = untainted.difference(stop)
migrate = migrate.difference(stop)
lost = lost.difference(stop)
disconnecting = disconnecting.difference(stop)
reconnecting = reconnecting.difference(stop)
ignore = ignore.difference(stop)

// Validate and add reconnecting allocations to the plan so they are
// logged.
a.computeReconnecting(reconnect)

// The rest of the reconnecting allocations are now untainted and will
// be further reconciled below.
untainted = untainted.union(reconnect)
}

// Determine what set of terminal allocations need to be rescheduled
untainted, rescheduleNow, rescheduleLater := untainted.filterByRescheduleable(a.batch, false, a.now, a.evalID, a.deployment)

@@ -459,14 +487,10 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
// Stop any unneeded allocations and update the untainted set to not
// include stopped allocations.
isCanarying := dstate != nil && dstate.DesiredCanaries != 0 && !dstate.Promoted
stop, reconnecting := a.computeStop(tg, nameIndex, untainted, migrate, lost, canaries, reconnecting, isCanarying, lostLaterEvals)
stop := a.computeStop(tg, nameIndex, untainted, migrate, lost, canaries, isCanarying, lostLaterEvals)
desiredChanges.Stop += uint64(len(stop))
untainted = untainted.difference(stop)

// Validate and add reconnecting allocs to the plan so that they will be logged.
a.computeReconnecting(reconnecting)
desiredChanges.Ignore += uint64(len(a.result.reconnectUpdates))
Contributor Author:

I haven't tracked down exactly why, but pulling this line up to the reconnecting if block above causes a miscount of Ignore.

Member:

I've spent an hour looking at it and can't figure it out either. You mentioned there was a bug around that remove variable decrementing, maybe that was resulting in a change to the ignore set?

Contributor Author:

Oh, I see it now. computeUpdates finds allocs to ignore from the untainted set. Before, the reconnecting allocs were not included there, but now they are (if not stopped), so if we increment Ignore during the reconnect reconciliation we will double-count the reconnecting allocs.
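
A toy sketch of that double count (plain Go maps and counters standing in for the reconciler's alloc sets; none of these names are Nomad's real types): once the reconnect set is merged back into untainted, a later ignore count over untainted counts those allocs a second time.

```go
package main

import "fmt"

func main() {
	// Stand-ins for the reconciler's sets; not the scheduler's real types.
	untainted := map[string]bool{"alloc-a": true}
	reconnect := map[string]bool{"alloc-r1": true, "alloc-r2": true}

	ignore := 0

	// Hypothetical early increment during the reconnect block.
	ignore += len(reconnect) // counts alloc-r1, alloc-r2

	// The reconnect set is merged into untainted (union)...
	for id := range reconnect {
		untainted[id] = true
	}

	// ...and a later pass (standing in for computeUpdates) derives its own
	// ignore set from untainted, counting alloc-r1 and alloc-r2 again.
	ignore += len(untainted)

	fmt.Println("ignore =", ignore) // 5, instead of the expected 3
}
```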


// Do inplace upgrades where possible and capture the set of upgrades that
// need to be done destructively.
ignore, inplace, destructive := a.computeUpdates(tg, untainted)
@@ -496,10 +520,9 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
// * If there are any canaries that they have been promoted
// * There is no delayed stop_after_client_disconnect alloc, which delays scheduling for the whole group
// * An alloc was lost
// * There is not a corresponding reconnecting alloc.
var place []allocPlaceResult
if len(lostLater) == 0 {
place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow, lost, reconnecting, isCanarying)
place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow, lost, isCanarying)
if !existingDeployment {
dstate.DesiredTotal += len(place)
}
@@ -705,7 +728,7 @@ func (a *allocReconciler) computeUnderProvisionedBy(group *structs.TaskGroup, un
//
// Placements will meet or exceed group count.
func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
nameIndex *allocNameIndex, untainted, migrate, reschedule, lost, reconnecting allocSet,
nameIndex *allocNameIndex, untainted, migrate, reschedule, lost allocSet,
isCanarying bool) []allocPlaceResult {

// Add rescheduled placement results
@@ -725,7 +748,7 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
}

// Add replacements for disconnected and lost allocs up to group.Count
existing := len(untainted) + len(migrate) + len(reschedule) + len(reconnecting) - len(reconnecting.filterByFailedReconnect())
existing := len(untainted) + len(migrate) + len(reschedule)

// Add replacements for lost
for _, alloc := range lost {
@@ -935,28 +958,22 @@ func (a *allocReconciler) isDeploymentComplete(groupName string, destructive, in
// the group definition, the set of allocations in various states and whether we
// are canarying.
func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *allocNameIndex,
untainted, migrate, lost, canaries, reconnecting allocSet, isCanarying bool, followupEvals map[string]string) (allocSet, allocSet) {
untainted, migrate, lost, canaries allocSet, isCanarying bool, followupEvals map[string]string) allocSet {

// Mark all lost allocations for stop.
var stop allocSet
stop = stop.union(lost)
a.markDelayed(lost, structs.AllocClientStatusLost, allocLost, followupEvals)

// Mark all failed reconnects for stop.
failedReconnects := reconnecting.filterByFailedReconnect()
stop = stop.union(failedReconnects)
a.markStop(failedReconnects, structs.AllocClientStatusFailed, allocRescheduled)
reconnecting = reconnecting.difference(failedReconnects)

// If we are still deploying or creating canaries, don't stop them
if isCanarying {
untainted = untainted.difference(canaries)
}

// Hot path the nothing to do case
remove := len(untainted) + len(migrate) + len(reconnecting) - group.Count
remove := len(untainted) + len(migrate) - group.Count
if remove <= 0 {
return stop, reconnecting
return stop
}

// Filter out any terminal allocations from the untainted set
@@ -978,7 +995,7 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc

remove--
if remove == 0 {
return stop, reconnecting
return stop
}
}
}
@@ -1002,19 +1019,11 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc

remove--
if remove == 0 {
return stop, reconnecting
return stop
}
}
}

// Handle allocs that might be able to reconnect.
if len(reconnecting) != 0 {
remove = a.computeStopByReconnecting(untainted, reconnecting, stop, remove)
if remove == 0 {
return stop, reconnecting
}
}

// Select the allocs with the highest count to remove
removeNames := nameIndex.Highest(uint(remove))
for id, alloc := range untainted {
@@ -1028,7 +1037,7 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc

remove--
if remove == 0 {
return stop, reconnecting
return stop
}
}
}
@@ -1045,95 +1054,152 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc

remove--
if remove == 0 {
return stop, reconnecting
return stop
}
}

return stop, reconnecting
return stop
}

// computeStopByReconnecting moves allocations from either the untainted or reconnecting
// sets to the stop set and returns the number of allocations that still need to be removed.
func (a *allocReconciler) computeStopByReconnecting(untainted, reconnecting, stop allocSet, remove int) int {
if remove == 0 {
return remove
}
// reconcileReconnecting receives the set of allocations that are reconnecting
// and all other allocations for the same group and determines which ones to
// reconnect and which ones to stop.
//
// - Every reconnecting allocation MUST be present in one, and only one, of
// the returned sets.
// - Every replacement allocation that is not preferred MUST be returned in
// the stop set.
// - Only reconnecting allocations are allowed to be present in the returned
// reconnect set.
// - If the reconnecting allocation is to be stopped, its replacements may
// not be present in any of the returned sets. The rest of the reconciler
// logic will handle them.
func (a *allocReconciler) reconcileReconnecting(reconnecting allocSet, others allocSet) (allocSet, allocSet) {
stop := make(allocSet)
reconnect := make(allocSet)

// Mark all failed reconnects for stop.
failedReconnects := reconnecting.filterByFailedReconnect()
stop = stop.union(failedReconnects)
a.markStop(failedReconnects, structs.AllocClientStatusFailed, allocRescheduled)
successfulReconnects := reconnecting.difference(failedReconnects)

for _, reconnectingAlloc := range reconnecting {
// if the desired status is not run, or if the user-specified desired
for _, reconnectingAlloc := range successfulReconnects {
// If the desired status is not run, or if the user-specified desired
// transition is not run, stop the reconnecting allocation.
if reconnectingAlloc.DesiredStatus != structs.AllocDesiredStatusRun ||
stopReconnecting := reconnectingAlloc.DesiredStatus != structs.AllocDesiredStatusRun ||
reconnectingAlloc.DesiredTransition.ShouldMigrate() ||
reconnectingAlloc.DesiredTransition.ShouldReschedule() ||
reconnectingAlloc.DesiredTransition.ShouldForceReschedule() ||
reconnectingAlloc.Job.Version < a.job.Version ||
reconnectingAlloc.Job.CreateIndex < a.job.CreateIndex {
reconnectingAlloc.Job.CreateIndex < a.job.CreateIndex

if stopReconnecting {
stop[reconnectingAlloc.ID] = reconnectingAlloc
a.result.stop = append(a.result.stop, allocStopResult{
alloc: reconnectingAlloc,
statusDescription: allocNotNeeded,
})
delete(reconnecting, reconnectingAlloc.ID)

remove--
// if we've removed all we need to, stop iterating and return.
if remove == 0 {
return remove
}
Comment on lines -1079 to -1083 (Contributor Author):

I think this may have been a bug as well where remove was miscalculated so we were only stopping a fixed number of replacements, not all of them. But I didn't investigate further since we don't do this anymore.

Member:

Good call: removing this logic lets computeStop do its job to reduce the total count if we need to, without any spooky action at a distance.

continue
}

// Compare reconnecting to untainted and decide which to keep.
for _, untaintedAlloc := range untainted {
// If not a match by name and previous alloc continue
if reconnectingAlloc.Name != untaintedAlloc.Name {
// By default, prefer stopping the replacement allocation, so assume
// all non-terminal reconnecting allocations will reconnect and adjust
// later when this is not the case.
reconnect[reconnectingAlloc.ID] = reconnectingAlloc

// Find replacement allocations and decide which one to stop. A
// reconnecting allocation may have multiple replacements.
for _, replacementAlloc := range others {
Contributor Author:

I think I also need to check that replacementAlloc.CreateIndex > reconnectingAlloc.CreateIndex, otherwise in a scenario where both the original and one of its replacements are reconnecting, and they tie on all selection criteria, I think one alloc will stop the other because they will both think they are the original.

Contributor Author:

Yup! Fixed in 0b97e34.
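
For reference, a minimal sketch of the guard being discussed, assuming the structs package already imported by reconcile.go (github.com/hashicorp/nomad/nomad/structs); the helper name is hypothetical and the actual change is the one in the commit referenced above:

```go
// isReplacementOf is a hypothetical helper, not the PR's final code: it
// sketches the CreateIndex guard so that only allocations created after the
// reconnecting one are considered its replacements, preventing two
// reconnecting allocations that tie on every other criterion from each
// deciding they are the original and stopping the other.
func isReplacementOf(reconnecting, candidate *structs.Allocation) bool {
	return candidate.ID != reconnecting.ID &&
		candidate.Name == reconnecting.Name &&
		candidate.CreateIndex > reconnecting.CreateIndex
}
```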

// Skip the reconnecting allocation itself.
// Since a replacement may itself be reconnecting, the set of all
// other allocations contains the reconnecting allocations as well.
if reconnectingAlloc.ID == replacementAlloc.ID {
continue
}

// By default, we prefer stopping the replacement alloc unless
// the replacement has a higher metrics score.
stopAlloc := untaintedAlloc
deleteSet := untainted
untaintedMaxScoreMeta := untaintedAlloc.Metrics.MaxNormScore()
reconnectingMaxScoreMeta := reconnectingAlloc.Metrics.MaxNormScore()

if untaintedMaxScoreMeta == nil {
a.logger.Error("error computing stop: replacement allocation metrics not available", "alloc_name", untaintedAlloc.Name, "alloc_id", untaintedAlloc.ID)
// Skip allocations that don't have the same name.
// Replacement allocations have the same name as the original.
if reconnectingAlloc.Name != replacementAlloc.Name {
continue
}

if reconnectingMaxScoreMeta == nil {
a.logger.Error("error computing stop: reconnecting allocation metrics not available", "alloc_name", reconnectingAlloc.Name, "alloc_id", reconnectingAlloc.ID)
// Skip allocations that are server terminal.
// We don't want to replace a reconnecting allocation with one that
// is terminal or will terminate, and we don't need to stop them since
// they are already marked as terminal by the servers.
if replacementAlloc.ServerTerminalStatus() {
continue
}

statusDescription := allocNotNeeded
if untaintedAlloc.Job.Version > reconnectingAlloc.Job.Version ||
untaintedAlloc.Job.CreateIndex > reconnectingAlloc.Job.CreateIndex ||
untaintedMaxScoreMeta.NormScore > reconnectingMaxScoreMeta.NormScore {
stopAlloc = reconnectingAlloc
deleteSet = reconnecting
} else {
statusDescription = allocReconnected
}

stop[stopAlloc.ID] = stopAlloc
a.result.stop = append(a.result.stop, allocStopResult{
alloc: stopAlloc,
statusDescription: statusDescription,
})
delete(deleteSet, stopAlloc.ID)
// Pick which allocation we want to keep.
keepAlloc := pickReconnectingAlloc(reconnectingAlloc, replacementAlloc)
if keepAlloc == replacementAlloc {
// The replacement allocation is preferred, so stop the one
// reconnecting if not stopped yet.
if _, ok := stop[reconnectingAlloc.ID]; !ok {
stop[reconnectingAlloc.ID] = reconnectingAlloc
a.result.stop = append(a.result.stop, allocStopResult{
alloc: reconnectingAlloc,
statusDescription: allocNotNeeded,
})
}

remove--
// if we've removed all we need to, stop iterating and return.
if remove == 0 {
return remove
// Our assumption that all reconnecting allocations will
// reconnect was wrong, so remove this one from the reconnect
// set.
delete(reconnect, reconnectingAlloc.ID)
} else {
// The reconnecting allocation is preferred, so stop this
// replacement.
stop[replacementAlloc.ID] = replacementAlloc
a.result.stop = append(a.result.stop, allocStopResult{
alloc: replacementAlloc,
statusDescription: allocReconnected,
})
}
}
}

return remove
return reconnect, stop
}

// pickReconnectingAlloc returns the allocation to keep between the original
// one that is reconnecting and one of its replacements.
//
// This function is not commutative, meaning that pickReconnectingAlloc(A, B)
// is not the same as pickReconnectingAlloc(B, A). Preference is given to keep
// the original allocation when possible.
func pickReconnectingAlloc(original *structs.Allocation, replacement *structs.Allocation) *structs.Allocation {
// Check if the replacement is newer.
// Always prefer the replacement if true.
replacementIsNewer := replacement.Job.Version > original.Job.Version ||
replacement.Job.CreateIndex > original.Job.CreateIndex
if replacementIsNewer {
return replacement
}

// Check if the replacement has a better placement score.
// If either score is not available, only pick the replacement if it
// does have a score.
originalMaxScoreMeta := original.Metrics.MaxNormScore()
replacementMaxScoreMeta := replacement.Metrics.MaxNormScore()

replacementHasBetterScore := originalMaxScoreMeta == nil && replacementMaxScoreMeta != nil ||
(originalMaxScoreMeta != nil && replacementMaxScoreMeta != nil &&
replacementMaxScoreMeta.NormScore > originalMaxScoreMeta.NormScore)
Comment on lines +1188 to +1190 (Contributor Author):

The existing implementation skips allocations without metrics, which I think may also be a source of inconsistent state. I don't know when MaxNormScore could be nil, but I turned those errors into a "best effort" check by preferring the original alloc unless enough metrics are available.

Member:

Because these allocations are always existing allocations that have been thru the whole scheduler once already, I suspect NormScore is only ever nil in test scenarios and that was put in to catch panics because of bad test setup. I don't think it makes much of a difference and this code seems a little nicer. 👍


// Check if the replacement has a better client status.
// Even with a better placement score, make sure we don't replace a running
// allocation with one that is not.
replacementIsRunning := replacement.ClientStatus == structs.AllocClientStatusRunning
originalNotRunning := original.ClientStatus != structs.AllocClientStatusRunning
Comment on lines +1192 to +1196 (Contributor Author):

This is a new rule I added to prevent a replacement if the original alloc is still running but the replacement is not, even if the replacement has a better placement metric.

I first added it as an attempt to fix the original bug, but it turns out the problem was something else. So I don't think this is strictly necessary, but it kind of makes sense to not replace a running alloc with one that is not, no?

Member:

There are three client states to worry about here: pending (for the replacement only), running, and terminal. We also have three checks in the caller that influence whether we ever even call this function: the reconnecting.filterByFailedReconnect() check (line 1082R), the stopReconnecting check (line 1090R), and the replacementAlloc.ServerTerminalStatus() check (line 1131R). That gives us the following table:

| original client status | replacement client status | replacement server status | what to do? | done in this PR? |
|---|---|---|---|---|
| running | running | run | keep highest score, stop the other | ✔️ |
| running | running | stop | keep original, stop replacement | ✔️ |
| running | pending | run | keep highest score, stop the other | ✔️ |
| running | pending | stop | keep original, stop replacement | ✔️ |
| running | terminal | run | keep original, stop replacement | ✔️ |
| running | terminal | stop | keep original, stop replacement | ✔️ |
| terminal | running | run | stop original, reconcile replacement | ❌ (replacement not checked here?) |
| terminal | running | stop | stop original, reconcile replacement | ✔️ |
| terminal | pending | run | stop original, reconcile replacement | ❌ (replacement not checked here?) |
| terminal | pending | stop | stop original, reconcile replacement | ✔️ |
| terminal | terminal | run | stop original, reconcile replacement | ✔️ |
| terminal | terminal | stop | stop original, reconcile replacement | ✔️ |

So it looks like the logic is correct except for maybe the case where the original is filtered by filterByFailedReconnect and the replacement is non-terminal. I'm not sure what happens to the replacement in that case -- does it just fall thru to the rest of the reconciler? In any case, I had to build a big ol' truth table to verify that for myself because the logic is split between this method and its caller. It's kinda hard to follow. Maybe we should move all these client status checks into the caller, so that we only ever call this method if we have two legitimate options?
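
Reading the table as code may help. This is a hedged, standalone helper with assumed string-typed inputs, for reasoning only; the real decision is split across the caller's filters, reconcileReconnecting, and pickReconnectingAlloc.

```go
// whatToDo condenses the truth table above. The string-typed inputs are an
// assumption made for readability; they do not correspond to a real Nomad API.
func whatToDo(originalClient, replacementClient, replacementServer string) string {
	// Terminal originals are stopped and their replacements fall through to
	// the regular reconciler logic.
	if originalClient == "terminal" {
		return "stop original, reconcile replacement"
	}
	// A server-terminal or client-terminal replacement never wins against a
	// running original.
	if replacementServer == "stop" || replacementClient == "terminal" {
		return "keep original, stop replacement"
	}
	// Otherwise both are viable and the placement score breaks the tie.
	return "keep highest score, stop the other"
}
```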

Contributor Author:

> I had to build a big ol' truth table to verify that for myself

Yup, I had to do the same thing, which should've been a signal that this logic is confusing 😅

But I looked into it again, and I'm not sure if we can move this ClientStatus check to the caller because it's an awkward conditional tie-breaker.

The way I approached this was using a process of elimination. There are three states we can safely stop the reconnecting allocation without needing to compare it with its replacements:

  • desired status not run (so stop or evict): handled by stopReconnecting, but now I'm wondering if we even need to stop in this case as opposed to just skip these allocs 🤷
  • client status failed: handled by filterByFailedReconnect, but we probably don't need this anymore and just handle it in the main loop.

That leaves 5 possible client status for original alloc:

  • lost and unknown: I don't think these are possible for a reconnecting alloc? I think lost is only used for groups without max_client_disconnect (which is by definition not the case for allocations that are reconnecting) and the allocation would not be in the reconnecting set if it was unknown, so we can probably ignore these.
  • complete: probably a no-op? Or maybe we still need to check and stop replacements? But only if not batch? Not sure, this seems a bit undefined 😅
  • pending and running: we need to compare with its replacements. If the replacement is newer we keep it, done. If the replacement has better score we need to check the client status.

And this is where the awkwardness lives. Outside this function we can't make a decision before comparing them and we don't know if the replacement was picked because it had a higher version or better placement (well, we can check again, but that would be kind of redundant).

> I'm not sure what happens to the replacement in that case -- does it just fall thru to the rest of the reconciler?

That's right. My assumption here is the idea of merging timelines. When the client reconnects we assume the replacement allocations were being handled by the reconciler like any other, so if we don't need to stop them just let the reconciler keep managing it as it has been.

Member (@tgross, Mar 23, 2023):

> client status failed: handled by filterByFailedReconnect, but we probably don't need this anymore and just handle it in the main loop.

The filterByFailedReconnect should pick up the complete case as well because that's terminal.

> pending and running: we need to compare with its replacements. If the replacement is newer we keep it, done. If the replacement has better score we need to check the client status.

I probably could have explained it better, but I think we only need to compare if the replacement is running. So in other words, the check we're doing here could be before we compare scores and return original early. (And at that point, it could probably be hoisted up to the caller instead.)

Contributor Author:

> The filterByFailedReconnect should pick up the complete case as well because that's terminal.

It only looks for allocs that are not server terminal and are explicitly failed, so I think complete is not handled.

`if !alloc.ServerTerminalStatus() && alloc.ClientStatus == structs.AllocClientStatusFailed {`

> I think we only need to compare if the replacement is running

...and has the same job version.

A new job version always wins because we don't want to keep outdated allocs, even if they are the original ones. So the client status check can be done before the score, but it can't be done before the job version check.
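
A minimal sketch of that ordering: a hypothetical condensed form of pickReconnectingAlloc with the client status gate hoisted above the score comparison, assuming the same structs import as reconcile.go. The function name is an assumption, not code from this PR.

```go
// preferReplacement sketches the check ordering discussed above: a newer job
// always wins, a running original is never traded for a non-running
// replacement, and the placement score decides the remaining cases.
func preferReplacement(original, replacement *structs.Allocation) bool {
	// 1. A newer job version or create index always wins.
	if replacement.Job.Version > original.Job.Version ||
		replacement.Job.CreateIndex > original.Job.CreateIndex {
		return true
	}

	// 2. Never replace a running allocation with one that is not running.
	if original.ClientStatus == structs.AllocClientStatusRunning &&
		replacement.ClientStatus != structs.AllocClientStatusRunning {
		return false
	}

	// 3. Fall back to the placement score, preferring the original when the
	// scores are not available.
	originalScore := original.Metrics.MaxNormScore()
	replacementScore := replacement.Metrics.MaxNormScore()
	if replacementScore == nil {
		return false
	}
	return originalScore == nil || replacementScore.NormScore > originalScore.NormScore
}
```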

Member:

Ah, good catch. Yeah it seems like there's no way to hoist this up.


if replacementHasBetterScore && (replacementIsRunning || originalNotRunning) {
return replacement
}

return original
}

// computeUpdates determines which allocations for the passed group require