diff --git a/.changelog/16609.txt b/.changelog/16609.txt
new file mode 100644
index 000000000000..61de306525cb
--- /dev/null
+++ b/.changelog/16609.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+scheduler: Fix reconciliation of reconnecting allocs when the replacement allocations are not running
+```
diff --git a/scheduler/reconcile.go b/scheduler/reconcile.go
index b3da414bd66e..54b434eec59b 100644
--- a/scheduler/reconcile.go
+++ b/scheduler/reconcile.go
@@ -428,6 +428,34 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
 	untainted, migrate, lost, disconnecting, reconnecting, ignore := all.filterByTainted(a.taintedNodes, a.supportsDisconnectedClients, a.now)
 	desiredChanges.Ignore += uint64(len(ignore))
 
+	// If there are allocations reconnecting, we need to reconcile them and
+	// their replacements first because there is specific logic when deciding
+	// which ones to keep that can only be applied when the client reconnects.
+	if len(reconnecting) > 0 {
+		// Pass all allocations because the replacements we need to find may be
+		// in any state, including themselves being reconnected.
+		reconnect, stop := a.reconcileReconnecting(reconnecting, all)
+
+		// Stop the reconciled allocations and remove them from the other sets
+		// since they have already been handled.
+		desiredChanges.Stop += uint64(len(stop))
+
+		untainted = untainted.difference(stop)
+		migrate = migrate.difference(stop)
+		lost = lost.difference(stop)
+		disconnecting = disconnecting.difference(stop)
+		reconnecting = reconnecting.difference(stop)
+		ignore = ignore.difference(stop)
+
+		// Validate and add reconnecting allocations to the plan so they are
+		// logged.
+		a.computeReconnecting(reconnect)
+
+		// The rest of the reconnecting allocations are now untainted and will
+		// be further reconciled below.
+		untainted = untainted.union(reconnect)
+	}
+
 	// Determine what set of terminal allocations need to be rescheduled
 	untainted, rescheduleNow, rescheduleLater := untainted.filterByRescheduleable(a.batch, false, a.now, a.evalID, a.deployment)
 
@@ -459,14 +487,10 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
 	// Stop any unneeded allocations and update the untainted set to not
 	// include stopped allocations.
 	isCanarying := dstate != nil && dstate.DesiredCanaries != 0 && !dstate.Promoted
-	stop, reconnecting := a.computeStop(tg, nameIndex, untainted, migrate, lost, canaries, reconnecting, isCanarying, lostLaterEvals)
+	stop := a.computeStop(tg, nameIndex, untainted, migrate, lost, canaries, isCanarying, lostLaterEvals)
 	desiredChanges.Stop += uint64(len(stop))
 	untainted = untainted.difference(stop)
 
-	// Validate and add reconnecting allocs to the plan so that they will be logged.
-	a.computeReconnecting(reconnecting)
-	desiredChanges.Ignore += uint64(len(a.result.reconnectUpdates))
-
 	// Do inplace upgrades where possible and capture the set of upgrades that
 	// need to be done destructively.
 	ignore, inplace, destructive := a.computeUpdates(tg, untainted)
 
@@ -496,10 +520,9 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
 	// * If there are any canaries that they have been promoted
 	// * There is no delayed stop_after_client_disconnect alloc, which delays scheduling for the whole group
 	// * An alloc was lost
-	// * There is not a corresponding reconnecting alloc.
 	var place []allocPlaceResult
 	if len(lostLater) == 0 {
-		place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow, lost, reconnecting, isCanarying)
+		place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow, lost, isCanarying)
 		if !existingDeployment {
 			dstate.DesiredTotal += len(place)
 		}
 
@@ -705,7 +728,7 @@ func (a *allocReconciler) computeUnderProvisionedBy(group *structs.TaskGroup, un
 //
 // Placements will meet or exceed group count.
 func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
-	nameIndex *allocNameIndex, untainted, migrate, reschedule, lost, reconnecting allocSet,
+	nameIndex *allocNameIndex, untainted, migrate, reschedule, lost allocSet,
 	isCanarying bool) []allocPlaceResult {
 
 	// Add rescheduled placement results
@@ -725,7 +748,7 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
 	}
 
 	// Add replacements for disconnected and lost allocs up to group.Count
-	existing := len(untainted) + len(migrate) + len(reschedule) + len(reconnecting) - len(reconnecting.filterByFailedReconnect())
+	existing := len(untainted) + len(migrate) + len(reschedule)
 
 	// Add replacements for lost
 	for _, alloc := range lost {
@@ -935,28 +958,22 @@ func (a *allocReconciler) isDeploymentComplete(groupName string, destructive, in
 // the group definition, the set of allocations in various states and whether we
 // are canarying.
 func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *allocNameIndex,
-	untainted, migrate, lost, canaries, reconnecting allocSet, isCanarying bool, followupEvals map[string]string) (allocSet, allocSet) {
+	untainted, migrate, lost, canaries allocSet, isCanarying bool, followupEvals map[string]string) allocSet {
 
 	// Mark all lost allocations for stop.
 	var stop allocSet
 	stop = stop.union(lost)
 	a.markDelayed(lost, structs.AllocClientStatusLost, allocLost, followupEvals)
 
-	// Mark all failed reconnects for stop.
-	failedReconnects := reconnecting.filterByFailedReconnect()
-	stop = stop.union(failedReconnects)
-	a.markStop(failedReconnects, structs.AllocClientStatusFailed, allocRescheduled)
-	reconnecting = reconnecting.difference(failedReconnects)
-
 	// If we are still deploying or creating canaries, don't stop them
 	if isCanarying {
 		untainted = untainted.difference(canaries)
 	}
 
 	// Hot path the nothing to do case
-	remove := len(untainted) + len(migrate) + len(reconnecting) - group.Count
+	remove := len(untainted) + len(migrate) - group.Count
 	if remove <= 0 {
-		return stop, reconnecting
+		return stop
 	}
 
 	// Filter out any terminal allocations from the untainted set
@@ -978,7 +995,7 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
 			remove--
 
 			if remove == 0 {
-				return stop, reconnecting
+				return stop
 			}
 		}
 	}
@@ -1002,19 +1019,11 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
 			remove--
 
 			if remove == 0 {
-				return stop, reconnecting
+				return stop
 			}
 		}
 	}
 
-	// Handle allocs that might be able to reconnect.
-	if len(reconnecting) != 0 {
-		remove = a.computeStopByReconnecting(untainted, reconnecting, stop, remove)
-		if remove == 0 {
-			return stop, reconnecting
-		}
-	}
-
 	// Select the allocs with the highest count to remove
 	removeNames := nameIndex.Highest(uint(remove))
 	for id, alloc := range untainted {
@@ -1028,7 +1037,7 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
 			remove--
 
 			if remove == 0 {
-				return stop, reconnecting
+				return stop
 			}
 		}
 	}
@@ -1045,95 +1054,152 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
 		remove--
 
 		if remove == 0 {
-			return stop, reconnecting
+			return stop
 		}
 	}
 
-	return stop, reconnecting
+	return stop
 }
 
-// computeStopByReconnecting moves allocations from either the untainted or reconnecting
-// sets to the stop set and returns the number of allocations that still need to be removed.
-func (a *allocReconciler) computeStopByReconnecting(untainted, reconnecting, stop allocSet, remove int) int {
-	if remove == 0 {
-		return remove
-	}
+// reconcileReconnecting receives the set of allocations that are reconnecting
+// and all other allocations for the same group and determines which ones to
+// reconnect and which ones to stop.
+//
+// - Every reconnecting allocation MUST be present in one, and only one, of
+//   the returned sets.
+// - Every replacement allocation that is not preferred MUST be returned in
+//   the stop set.
+// - Only reconnecting allocations are allowed to be present in the returned
+//   reconnect set.
+// - If the reconnecting allocation is to be stopped, its replacements may
+//   not be present in any of the returned sets. The rest of the reconciler
+//   logic will handle them.
+func (a *allocReconciler) reconcileReconnecting(reconnecting allocSet, others allocSet) (allocSet, allocSet) {
+	stop := make(allocSet)
+	reconnect := make(allocSet)
 
 	for _, reconnectingAlloc := range reconnecting {
-		// if the desired status is not run, or if the user-specified desired
+		// Stop allocations that failed to reconnect.
+		reconnectFailed := !reconnectingAlloc.ServerTerminalStatus() &&
+			reconnectingAlloc.ClientStatus == structs.AllocClientStatusFailed
+
+		if reconnectFailed {
+			stop[reconnectingAlloc.ID] = reconnectingAlloc
+			a.result.stop = append(a.result.stop, allocStopResult{
+				alloc:             reconnectingAlloc,
+				clientStatus:      structs.AllocClientStatusFailed,
+				statusDescription: allocRescheduled,
+			})
+			continue
+		}
+
+		// If the desired status is not run, or if the user-specified desired
 		// transition is not run, stop the reconnecting allocation.
-		if reconnectingAlloc.DesiredStatus != structs.AllocDesiredStatusRun ||
+		stopReconnecting := reconnectingAlloc.DesiredStatus != structs.AllocDesiredStatusRun ||
 			reconnectingAlloc.DesiredTransition.ShouldMigrate() ||
 			reconnectingAlloc.DesiredTransition.ShouldReschedule() ||
 			reconnectingAlloc.DesiredTransition.ShouldForceReschedule() ||
 			reconnectingAlloc.Job.Version < a.job.Version ||
-			reconnectingAlloc.Job.CreateIndex < a.job.CreateIndex {
+			reconnectingAlloc.Job.CreateIndex < a.job.CreateIndex
 
+		if stopReconnecting {
 			stop[reconnectingAlloc.ID] = reconnectingAlloc
 			a.result.stop = append(a.result.stop, allocStopResult{
 				alloc:             reconnectingAlloc,
 				statusDescription: allocNotNeeded,
			})
-			delete(reconnecting, reconnectingAlloc.ID)
-
-			remove--
-			// if we've removed all we need to, stop iterating and return.
-			if remove == 0 {
-				return remove
-			}
 			continue
 		}
 
-		// Compare reconnecting to untainted and decide which to keep.
-		for _, untaintedAlloc := range untainted {
-			// If not a match by name and previous alloc continue
-			if reconnectingAlloc.Name != untaintedAlloc.Name {
+		// Find replacement allocations and decide which one to stop. A
+		// reconnecting allocation may have multiple replacements.
+		for _, replacementAlloc := range others {
+
+			// Skip allocations that are not a replacement of the one
+			// reconnecting. Replacement allocations have the same name but a
+			// higher CreateIndex and a different ID.
+			isReplacement := replacementAlloc.ID != reconnectingAlloc.ID &&
+				replacementAlloc.Name == reconnectingAlloc.Name &&
+				replacementAlloc.CreateIndex > reconnectingAlloc.CreateIndex
+
+			// Skip allocations that are server terminal.
+			// We don't want to replace a reconnecting allocation with one
+			// that is or will be terminal, and we don't need to stop them
+			// since they are already marked as terminal by the servers.
+			if !isReplacement || replacementAlloc.ServerTerminalStatus() {
 				continue
 			}
 
-			// By default, we prefer stopping the replacement alloc unless
-			// the replacement has a higher metrics score.
-			stopAlloc := untaintedAlloc
-			deleteSet := untainted
-			untaintedMaxScoreMeta := untaintedAlloc.Metrics.MaxNormScore()
-			reconnectingMaxScoreMeta := reconnectingAlloc.Metrics.MaxNormScore()
-
-			if untaintedMaxScoreMeta == nil {
-				a.logger.Error("error computing stop: replacement allocation metrics not available", "alloc_name", untaintedAlloc.Name, "alloc_id", untaintedAlloc.ID)
-				continue
-			}
-
-			if reconnectingMaxScoreMeta == nil {
-				a.logger.Error("error computing stop: reconnecting allocation metrics not available", "alloc_name", reconnectingAlloc.Name, "alloc_id", reconnectingAlloc.ID)
-				continue
-			}
-
-			statusDescription := allocNotNeeded
-			if untaintedAlloc.Job.Version > reconnectingAlloc.Job.Version ||
-				untaintedAlloc.Job.CreateIndex > reconnectingAlloc.Job.CreateIndex ||
-				untaintedMaxScoreMeta.NormScore > reconnectingMaxScoreMeta.NormScore {
-				stopAlloc = reconnectingAlloc
-				deleteSet = reconnecting
+			// Pick which allocation we want to keep.
+			keepAlloc := pickReconnectingAlloc(reconnectingAlloc, replacementAlloc)
+			if keepAlloc == replacementAlloc {
+				// The replacement allocation is preferred, so stop the one
+				// reconnecting if not stopped yet.
+				if _, ok := stop[reconnectingAlloc.ID]; !ok {
+					stop[reconnectingAlloc.ID] = reconnectingAlloc
+					a.result.stop = append(a.result.stop, allocStopResult{
+						alloc:             reconnectingAlloc,
+						statusDescription: allocNotNeeded,
+					})
+				}
 			} else {
-				statusDescription = allocReconnected
+				// The reconnecting allocation is preferred, so stop this
+				// replacement.
+				stop[replacementAlloc.ID] = replacementAlloc
+				a.result.stop = append(a.result.stop, allocStopResult{
+					alloc:             replacementAlloc,
+					statusDescription: allocReconnected,
+				})
 			}
+		}
+	}
 
-			stop[stopAlloc.ID] = stopAlloc
-			a.result.stop = append(a.result.stop, allocStopResult{
-				alloc:             stopAlloc,
-				statusDescription: statusDescription,
-			})
-			delete(deleteSet, stopAlloc.ID)
-
-			remove--
-			// if we've removed all we need to, stop iterating and return.
-			if remove == 0 {
-				return remove
-			}
+	// Any reconnecting allocation not set to stop must be reconnected.
+	for _, alloc := range reconnecting {
+		if _, ok := stop[alloc.ID]; !ok {
+			reconnect[alloc.ID] = alloc
 		}
 	}
 
-	return remove
+	return reconnect, stop
+}
+
+// pickReconnectingAlloc returns the allocation to keep between the original
+// one that is reconnecting and one of its replacements.
+//
+// This function is not commutative, meaning that pickReconnectingAlloc(A, B)
+// is not the same as pickReconnectingAlloc(B, A). Preference is given to keep
+// the original allocation when possible.
+func pickReconnectingAlloc(original *structs.Allocation, replacement *structs.Allocation) *structs.Allocation {
+	// Check if the replacement is newer.
+	// Always prefer the replacement if true.
+	replacementIsNewer := replacement.Job.Version > original.Job.Version ||
+		replacement.Job.CreateIndex > original.Job.CreateIndex
+	if replacementIsNewer {
+		return replacement
+	}
+
+	// Check if the replacement has a better placement score.
+	// If either score is unavailable, only pick the replacement if it does
+	// have scores.
+	originalMaxScoreMeta := original.Metrics.MaxNormScore()
+	replacementMaxScoreMeta := replacement.Metrics.MaxNormScore()
+
+	replacementHasBetterScore := originalMaxScoreMeta == nil && replacementMaxScoreMeta != nil ||
+		(originalMaxScoreMeta != nil && replacementMaxScoreMeta != nil &&
+			replacementMaxScoreMeta.NormScore > originalMaxScoreMeta.NormScore)
+
+	// Check if the replacement has a better client status.
+	// Even with a better placement score, make sure we don't replace a
+	// running allocation with one that is not.
+	replacementIsRunning := replacement.ClientStatus == structs.AllocClientStatusRunning
+	originalNotRunning := original.ClientStatus != structs.AllocClientStatusRunning
+
+	if replacementHasBetterScore && (replacementIsRunning || originalNotRunning) {
+		return replacement
+	}
+
+	return original
 }
 
 // computeUpdates determines which allocations for the passed group require
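Editor's note: the preference order in `pickReconnectingAlloc` is easiest to see in isolation, so here is a minimal, self-contained sketch of the same decision rules. It is not the reconciler itself: the `alloc` struct and the `pick` and `score` helpers are invented for illustration, standing in for `*structs.Allocation`, `pickReconnectingAlloc`, and `Metrics.MaxNormScore()` from the diff above (the real function also compares `Job.CreateIndex` as part of the "newer" check).

```go
package main

import "fmt"

// alloc is a pared-down stand-in for *structs.Allocation; only the fields
// consulted by the decision rules are kept.
type alloc struct {
	jobVersion uint64   // stands in for Job.Version
	normScore  *float64 // stands in for Metrics.MaxNormScore(); nil when absent
	running    bool     // stands in for ClientStatus == "running"
}

// pick mirrors pickReconnectingAlloc's ordering:
//  1. a replacement from a newer job always wins;
//  2. otherwise a better (or only available) placement score wins, but never
//     at the cost of swapping a running original for a non-running one.
func pick(original, replacement alloc) string {
	if replacement.jobVersion > original.jobVersion {
		return "replacement"
	}

	betterScore := (original.normScore == nil && replacement.normScore != nil) ||
		(original.normScore != nil && replacement.normScore != nil &&
			*replacement.normScore > *original.normScore)

	if betterScore && (replacement.running || !original.running) {
		return "replacement"
	}
	return "original"
}

func score(s float64) *float64 { return &s }

func main() {
	// A running original survives a higher-scoring but non-running replacement.
	fmt.Println("keep:", pick(
		alloc{jobVersion: 1, normScore: score(0.5), running: true},
		alloc{jobVersion: 1, normScore: score(0.9), running: false},
	))

	// Same job version and equal scores: the original is preferred.
	fmt.Println("keep:", pick(
		alloc{jobVersion: 1, normScore: score(0.5), running: true},
		alloc{jobVersion: 1, normScore: score(0.5), running: true},
	))

	// A replacement from a newer job version always wins.
	fmt.Println("keep:", pick(
		alloc{jobVersion: 1, normScore: score(0.9), running: true},
		alloc{jobVersion: 2, normScore: score(0.5), running: false},
	))
}
```

Run as written, this should print `keep: original` twice and then `keep: replacement`: a higher score alone is not enough to displace a running original, but a newer job version is, which matches the non-commutative bias toward the original described in the doc comment.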
diff --git a/scheduler/reconcile_test.go b/scheduler/reconcile_test.go
index e3a9f01d36b8..b8494558c22a 100644
--- a/scheduler/reconcile_test.go
+++ b/scheduler/reconcile_test.go
@@ -8,6 +8,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/hashicorp/go-set"
 	"github.com/hashicorp/nomad/ci"
 	"github.com/hashicorp/nomad/helper/pointer"
 	"github.com/hashicorp/nomad/helper/testlog"
@@ -5328,6 +5329,9 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
 		nodeStatusDisconnected       bool
 		replace                      bool
 		failReplacement              bool
+		taintReplacement             bool
+		disconnectReplacement        bool
+		replaceFailedReplacement     bool
 		shouldStopOnDisconnectedNode bool
 		maxDisconnect                *time.Duration
 		expected                     *resultExpectation
@@ -5449,6 +5453,66 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
 				},
 			},
 		},
+		{
+			name:                    "keep-original-alloc-and-stop-failed-replacement",
+			allocCount:              3,
+			replace:                 true,
+			failReplacement:         true,
+			disconnectedAllocCount:  2,
+			disconnectedAllocStatus: structs.AllocClientStatusRunning,
+			disconnectedAllocStates: disconnectAllocState,
+			serverDesiredStatus:     structs.AllocDesiredStatusRun,
+			expected: &resultExpectation{
+				reconnectUpdates: 2,
+				stop:             2,
+				desiredTGUpdates: map[string]*structs.DesiredUpdates{
+					"web": {
+						Ignore: 3,
+						Stop:   2,
+					},
+				},
+			},
+		},
+		{
+			name:                    "keep-original-and-stop-reconnecting-replacement",
+			allocCount:              2,
+			replace:                 true,
+			disconnectReplacement:   true,
+			disconnectedAllocCount:  1,
+			disconnectedAllocStatus: structs.AllocClientStatusRunning,
+			disconnectedAllocStates: disconnectAllocState,
+			serverDesiredStatus:     structs.AllocDesiredStatusRun,
+			expected: &resultExpectation{
+				reconnectUpdates: 1,
+				stop:             1,
+				desiredTGUpdates: map[string]*structs.DesiredUpdates{
+					"web": {
+						Ignore: 2,
+						Stop:   1,
+					},
+				},
+			},
+		},
+		{
+			name:                    "keep-original-and-stop-tainted-replacement",
+			allocCount:              3,
+			replace:                 true,
+			taintReplacement:        true,
+			disconnectedAllocCount:  2,
+			disconnectedAllocStatus: structs.AllocClientStatusRunning,
+			disconnectedAllocStates: disconnectAllocState,
+			serverDesiredStatus:     structs.AllocDesiredStatusRun,
+			expected: &resultExpectation{
+				reconnectUpdates: 2,
+				stop:             2,
+				desiredTGUpdates: map[string]*structs.DesiredUpdates{
+					"web": {
+						Ignore: 3,
+						Stop:   2,
+					},
+				},
+			},
+		},
 		{
 			name:                         "stop-original-alloc-with-old-job-version",
 			allocCount:                   5,
@@ -5490,14 +5554,15 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
 			},
 		},
 		{
-			name:                         "stop-original-alloc-with-old-job-version-and-failed-replacements",
+			name:                         "stop-original-alloc-with-old-job-version-and-failed-replacements-replaced",
 			allocCount:                   5,
 			replace:                      true,
+			failReplacement:              true,
+			replaceFailedReplacement:     true,
 			disconnectedAllocCount:       2,
 			disconnectedAllocStatus:      structs.AllocClientStatusRunning,
 			disconnectedAllocStates:      disconnectAllocState,
 			serverDesiredStatus:          structs.AllocDesiredStatusRun,
-			failReplacement:              true,
 			shouldStopOnDisconnectedNode: true,
 			jobVersionIncrement:          1,
 			expected: &resultExpectation{
@@ -5530,6 +5595,28 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
 			},
 		},
+		{
+			name:                         "stop-failed-original-and-failed-replacements-and-place-new",
+			allocCount:                   5,
+			replace:                      true,
+			failReplacement:              true,
+			disconnectedAllocCount:       2,
+			disconnectedAllocStatus:      structs.AllocClientStatusFailed,
+			disconnectedAllocStates:      disconnectAllocState,
+			serverDesiredStatus:          structs.AllocDesiredStatusRun,
+			shouldStopOnDisconnectedNode: true,
+			expected: &resultExpectation{
+				stop:  4,
+				place: 2,
+				desiredTGUpdates: map[string]*structs.DesiredUpdates{
+					"web": {
+						Stop:   4,
+						Place:  2,
+						Ignore: 3,
+					},
+				},
+			},
+		},
 		{
 			name:                    "stop-expired-allocs",
 			allocCount:              5,
@@ -5585,6 +5672,11 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
 			// Create resumable allocs
 			job, allocs := buildResumableAllocations(tc.allocCount, structs.AllocClientStatusRunning, structs.AllocDesiredStatusRun, 2)
 
+			origAllocs := set.New[string](len(allocs))
+			for _, alloc := range allocs {
+				origAllocs.Insert(alloc.ID)
+			}
+
 			if tc.isBatch {
 				job.Type = structs.JobTypeBatch
 			}
@@ -5623,6 +5715,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
 				replacement.PreviousAllocation = alloc.ID
 				replacement.AllocStates = nil
 				replacement.TaskStates = nil
+				replacement.CreateIndex += 1
 				alloc.NextAllocation = replacement.ID
 
 				if tc.jobVersionIncrement != 0 {
@@ -5631,19 +5724,33 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
 				if tc.nodeScoreIncrement != 0 {
 					replacement.Metrics.ScoreMetaData[0].NormScore = replacement.Metrics.ScoreMetaData[0].NormScore + tc.nodeScoreIncrement
 				}
-
-				replacements = append(replacements, replacement)
+				if tc.taintReplacement {
+					replacement.DesiredTransition.Migrate = pointer.Of(true)
+				}
+				if tc.disconnectReplacement {
+					replacement.AllocStates = tc.disconnectedAllocStates
+				}
 
 				// If we want to test intermediate replacement failures simulate that.
 				if tc.failReplacement {
 					replacement.ClientStatus = structs.AllocClientStatusFailed
-					nextReplacement := replacement.Copy()
-					nextReplacement.ID = uuid.Generate()
-					nextReplacement.ClientStatus = structs.AllocClientStatusRunning
-					nextReplacement.PreviousAllocation = replacement.ID
-					replacement.NextAllocation = nextReplacement.ID
-					replacements = append(replacements, nextReplacement)
+
+					if tc.replaceFailedReplacement {
+						nextReplacement := replacement.Copy()
+						nextReplacement.ID = uuid.Generate()
+						nextReplacement.ClientStatus = structs.AllocClientStatusRunning
+						nextReplacement.DesiredStatus = structs.AllocDesiredStatusRun
+						nextReplacement.PreviousAllocation = replacement.ID
+						nextReplacement.CreateIndex += 1
+
+						replacement.NextAllocation = nextReplacement.ID
+						replacement.DesiredStatus = structs.AllocDesiredStatusStop
+
+						replacements = append(replacements, nextReplacement)
+					}
 				}
+
+				replacements = append(replacements, replacement)
 			}
 
 			allocs = append(allocs, replacements...)
@@ -5661,6 +5768,11 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
 			assertResults(t, results, tc.expected)
 
 			for _, stopResult := range results.stop {
+				// Skip replacement allocs.
+				if !origAllocs.Contains(stopResult.alloc.ID) {
+					continue
+				}
+
 				if tc.shouldStopOnDisconnectedNode {
 					require.Equal(t, testNode.ID, stopResult.alloc.NodeID)
 				} else {
diff --git a/scheduler/reconcile_util.go b/scheduler/reconcile_util.go
index 42e8fb8cd6b5..c080481b0c87 100644
--- a/scheduler/reconcile_util.go
+++ b/scheduler/reconcile_util.go
@@ -520,19 +520,6 @@ func (a allocSet) filterByDeployment(id string) (match, nonmatch allocSet) {
 	return
 }
 
-// filterByFailedReconnect filters allocation into a set that have failed on the
-// client but do not have a terminal status at the server so that they can be
-// marked as stop at the server.
-func (a allocSet) filterByFailedReconnect() allocSet {
-	failed := make(allocSet)
-	for _, alloc := range a {
-		if !alloc.ServerTerminalStatus() && alloc.ClientStatus == structs.AllocClientStatusFailed {
-			failed[alloc.ID] = alloc
-		}
-	}
-	return failed
-}
-
 // delayByStopAfterClientDisconnect returns a delay for any lost allocation that's got a
 // stop_after_client_disconnect configured
 func (a allocSet) delayByStopAfterClientDisconnect() (later []*delayedRescheduleInfo) {
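Editor's note on the test change: `origAllocs` uses `hashicorp/go-set` to remember which allocation IDs existed before replacements were appended, so the node-ID assertions only run against originals. Below is a minimal sketch of that filtering pattern using only `set.New`, `Insert`, and `Contains` as they appear in the diff; the string IDs are made up for illustration (the real test inserts `alloc.ID` UUIDs from `buildResumableAllocations`).

```go
package main

import (
	"fmt"

	"github.com/hashicorp/go-set"
)

func main() {
	// Record the IDs that existed before any replacements were created.
	origAllocs := set.New[string](2)
	origAllocs.Insert("alloc-a") // hypothetical IDs; the test uses UUIDs
	origAllocs.Insert("alloc-b")

	// Later, skip anything that is not an original, the same way the
	// stop-result loop in the test does.
	for _, id := range []string{"alloc-a", "alloc-b", "replacement-c"} {
		if !origAllocs.Contains(id) {
			continue // skip replacement allocs
		}
		fmt.Println("assert on original alloc:", id)
	}
}
```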