From 7e334ff921b8041291757a3deb47ac381191c5e4 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Mon, 20 Mar 2023 23:27:35 -0400 Subject: [PATCH 1/7] scheduler: reconnect alloc with an failed replacement --- scheduler/reconcile.go | 218 +++++++++++++++++++++++++---------------- 1 file changed, 134 insertions(+), 84 deletions(-) diff --git a/scheduler/reconcile.go b/scheduler/reconcile.go index b3da414bd66e..8b149b36f9a1 100644 --- a/scheduler/reconcile.go +++ b/scheduler/reconcile.go @@ -428,6 +428,25 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool { untainted, migrate, lost, disconnecting, reconnecting, ignore := all.filterByTainted(a.taintedNodes, a.supportsDisconnectedClients, a.now) desiredChanges.Ignore += uint64(len(ignore)) + // If there are allocations reconnecting we need to reconcile its + // replacements first because there are specific business logic when + // deciding which one to keep. + if len(reconnecting) > 0 { + reconnect, stop := a.reconcileReconnecting(reconnecting, all) + + // Stop the reconnecting allocations that we don't need anymore. + desiredChanges.Stop += uint64(len(stop)) + untainted = untainted.difference(stop) + + // Validate and add reconnecting allocations to the plan so they are + // logged. + a.computeReconnecting(reconnect) + + // The rest of the reconnecting allocations is now untainted and will + // be further reconciled below. + untainted = untainted.union(reconnect) + } + // Determine what set of terminal allocations need to be rescheduled untainted, rescheduleNow, rescheduleLater := untainted.filterByRescheduleable(a.batch, false, a.now, a.evalID, a.deployment) @@ -459,14 +478,10 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool { // Stop any unneeded allocations and update the untainted set to not // include stopped allocations. isCanarying := dstate != nil && dstate.DesiredCanaries != 0 && !dstate.Promoted - stop, reconnecting := a.computeStop(tg, nameIndex, untainted, migrate, lost, canaries, reconnecting, isCanarying, lostLaterEvals) + stop := a.computeStop(tg, nameIndex, untainted, migrate, lost, canaries, isCanarying, lostLaterEvals) desiredChanges.Stop += uint64(len(stop)) untainted = untainted.difference(stop) - // Validate and add reconnecting allocs to the plan so that they will be logged. - a.computeReconnecting(reconnecting) - desiredChanges.Ignore += uint64(len(a.result.reconnectUpdates)) - // Do inplace upgrades where possible and capture the set of upgrades that // need to be done destructively. ignore, inplace, destructive := a.computeUpdates(tg, untainted) @@ -496,10 +511,9 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool { // * If there are any canaries that they have been promoted // * There is no delayed stop_after_client_disconnect alloc, which delays scheduling for the whole group // * An alloc was lost - // * There is not a corresponding reconnecting alloc. var place []allocPlaceResult if len(lostLater) == 0 { - place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow, lost, reconnecting, isCanarying) + place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow, lost, isCanarying) if !existingDeployment { dstate.DesiredTotal += len(place) } @@ -705,7 +719,7 @@ func (a *allocReconciler) computeUnderProvisionedBy(group *structs.TaskGroup, un // // Placements will meet or exceed group count. func (a *allocReconciler) computePlacements(group *structs.TaskGroup, - nameIndex *allocNameIndex, untainted, migrate, reschedule, lost, reconnecting allocSet, + nameIndex *allocNameIndex, untainted, migrate, reschedule, lost allocSet, isCanarying bool) []allocPlaceResult { // Add rescheduled placement results @@ -725,7 +739,7 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup, } // Add replacements for disconnected and lost allocs up to group.Count - existing := len(untainted) + len(migrate) + len(reschedule) + len(reconnecting) - len(reconnecting.filterByFailedReconnect()) + existing := len(untainted) + len(migrate) + len(reschedule) // Add replacements for lost for _, alloc := range lost { @@ -935,28 +949,22 @@ func (a *allocReconciler) isDeploymentComplete(groupName string, destructive, in // the group definition, the set of allocations in various states and whether we // are canarying. func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *allocNameIndex, - untainted, migrate, lost, canaries, reconnecting allocSet, isCanarying bool, followupEvals map[string]string) (allocSet, allocSet) { + untainted, migrate, lost, canaries allocSet, isCanarying bool, followupEvals map[string]string) allocSet { // Mark all lost allocations for stop. var stop allocSet stop = stop.union(lost) a.markDelayed(lost, structs.AllocClientStatusLost, allocLost, followupEvals) - // Mark all failed reconnects for stop. - failedReconnects := reconnecting.filterByFailedReconnect() - stop = stop.union(failedReconnects) - a.markStop(failedReconnects, structs.AllocClientStatusFailed, allocRescheduled) - reconnecting = reconnecting.difference(failedReconnects) - // If we are still deploying or creating canaries, don't stop them if isCanarying { untainted = untainted.difference(canaries) } // Hot path the nothing to do case - remove := len(untainted) + len(migrate) + len(reconnecting) - group.Count + remove := len(untainted) + len(migrate) - group.Count if remove <= 0 { - return stop, reconnecting + return stop } // Filter out any terminal allocations from the untainted set @@ -978,7 +986,7 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc remove-- if remove == 0 { - return stop, reconnecting + return stop } } } @@ -1002,19 +1010,11 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc remove-- if remove == 0 { - return stop, reconnecting + return stop } } } - // Handle allocs that might be able to reconnect. - if len(reconnecting) != 0 { - remove = a.computeStopByReconnecting(untainted, reconnecting, stop, remove) - if remove == 0 { - return stop, reconnecting - } - } - // Select the allocs with the highest count to remove removeNames := nameIndex.Highest(uint(remove)) for id, alloc := range untainted { @@ -1028,7 +1028,7 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc remove-- if remove == 0 { - return stop, reconnecting + return stop } } } @@ -1045,95 +1045,145 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc remove-- if remove == 0 { - return stop, reconnecting + return stop } } - return stop, reconnecting + return stop } -// computeStopByReconnecting moves allocations from either the untainted or reconnecting -// sets to the stop set and returns the number of allocations that still need to be removed. -func (a *allocReconciler) computeStopByReconnecting(untainted, reconnecting, stop allocSet, remove int) int { - if remove == 0 { - return remove - } +// reconcileReconnecting receives the set of allocations that are reconnecting +// and all other allocations for the same group and determines which ones to +// reconnect which ones or stop. +// +// - Every reconnecting allocation MUST be present in either returned set. +// - Every replacement allocation that is not preferred MUST be returned in +// the stop set. +// - Only reconnecting allocations are allowed to be returned in the +// reconnect set. +// - If the reconnecting allocation is to be stopped, its replacements may +// not be present in any of the returned sets. The rest of the reconciler +// logic will handle them. +func (a *allocReconciler) reconcileReconnecting(reconnecting allocSet, others allocSet) (allocSet, allocSet) { + stop := make(allocSet) + reconnect := make(allocSet) + + // Mark all failed reconnects for stop. + failedReconnects := reconnecting.filterByFailedReconnect() + stop = stop.union(failedReconnects) + a.markStop(failedReconnects, structs.AllocClientStatusFailed, allocRescheduled) + successfulReconnects := reconnecting.difference(failedReconnects) - for _, reconnectingAlloc := range reconnecting { - // if the desired status is not run, or if the user-specified desired + for _, reconnectingAlloc := range successfulReconnects { + // If the desired status is not run, or if the user-specified desired // transition is not run, stop the reconnecting allocation. - if reconnectingAlloc.DesiredStatus != structs.AllocDesiredStatusRun || + stopReconnecting := reconnectingAlloc.DesiredStatus != structs.AllocDesiredStatusRun || reconnectingAlloc.DesiredTransition.ShouldMigrate() || reconnectingAlloc.DesiredTransition.ShouldReschedule() || reconnectingAlloc.DesiredTransition.ShouldForceReschedule() || reconnectingAlloc.Job.Version < a.job.Version || - reconnectingAlloc.Job.CreateIndex < a.job.CreateIndex { + reconnectingAlloc.Job.CreateIndex < a.job.CreateIndex + if stopReconnecting { stop[reconnectingAlloc.ID] = reconnectingAlloc a.result.stop = append(a.result.stop, allocStopResult{ alloc: reconnectingAlloc, statusDescription: allocNotNeeded, }) - delete(reconnecting, reconnectingAlloc.ID) - - remove-- - // if we've removed all we need to, stop iterating and return. - if remove == 0 { - return remove - } continue } - // Compare reconnecting to untainted and decide which to keep. - for _, untaintedAlloc := range untainted { - // If not a match by name and previous alloc continue - if reconnectingAlloc.Name != untaintedAlloc.Name { - continue - } + // By default, prefer stopping the replacement allocation, so assume + // all non-terminal reconnecting allocations will reconnect. + reconnect[reconnectingAlloc.ID] = reconnectingAlloc - // By default, we prefer stopping the replacement alloc unless - // the replacement has a higher metrics score. - stopAlloc := untaintedAlloc - deleteSet := untainted - untaintedMaxScoreMeta := untaintedAlloc.Metrics.MaxNormScore() - reconnectingMaxScoreMeta := reconnectingAlloc.Metrics.MaxNormScore() - - if untaintedMaxScoreMeta == nil { - a.logger.Error("error computing stop: replacement allocation metrics not available", "alloc_name", untaintedAlloc.Name, "alloc_id", untaintedAlloc.ID) + // Find replacement allocations and decide which one to stop. A + // reconnecting allocation may have multiple replacements. + for _, replacementAlloc := range others { + // spiderman_meme.jpg + if reconnectingAlloc.ID == replacementAlloc.ID { continue } - if reconnectingMaxScoreMeta == nil { - a.logger.Error("error computing stop: reconnecting allocation metrics not available", "alloc_name", reconnectingAlloc.Name, "alloc_id", reconnectingAlloc.ID) + // Replacement allocations have the same name as the original so + // skip the ones that don't. + if reconnectingAlloc.Name != replacementAlloc.Name { continue } - statusDescription := allocNotNeeded - if untaintedAlloc.Job.Version > reconnectingAlloc.Job.Version || - untaintedAlloc.Job.CreateIndex > reconnectingAlloc.Job.CreateIndex || - untaintedMaxScoreMeta.NormScore > reconnectingMaxScoreMeta.NormScore { - stopAlloc = reconnectingAlloc - deleteSet = reconnecting - } else { - statusDescription = allocReconnected + // Skip allocations that are server terminal since they are + // already set to stop. + if replacementAlloc.ServerTerminalStatus() { + continue } - stop[stopAlloc.ID] = stopAlloc - a.result.stop = append(a.result.stop, allocStopResult{ - alloc: stopAlloc, - statusDescription: statusDescription, - }) - delete(deleteSet, stopAlloc.ID) + keepAlloc := pickReconnectingAlloc(reconnectingAlloc, replacementAlloc) + if keepAlloc == replacementAlloc { + // The replacement allocation is preferred, so stop the one + // that is reconnecting if not stopped yet. + if _, ok := stop[reconnectingAlloc.ID]; !ok { + stop[reconnectingAlloc.ID] = reconnectingAlloc + a.result.stop = append(a.result.stop, allocStopResult{ + alloc: reconnectingAlloc, + statusDescription: allocNotNeeded, + }) + } - remove-- - // if we've removed all we need to, stop iterating and return. - if remove == 0 { - return remove + // Our assumption that all non-terminal reconnecting + // allocations will reconnect was wrong, so remove it from the + // reconnect set. + delete(reconnect, reconnectingAlloc.ID) + } else { + // The reconnecting allocation is preferred, so stop this + // replacement. + stop[replacementAlloc.ID] = replacementAlloc + a.result.stop = append(a.result.stop, allocStopResult{ + alloc: replacementAlloc, + statusDescription: allocReconnected, + }) } } } - return remove + return reconnect, stop +} + +// pickReconnectingAlloc returns the allocation to keep between the original +// one that is reconnecting and one of its replacements. +// +// This function is not commutative, meaning that pickReconnectingAlloc(A, B) +// is not the same as pickReconnectingAlloc(B, A). Preference is given to keep +// the original allocation when possible. +func pickReconnectingAlloc(original *structs.Allocation, replacement *structs.Allocation) *structs.Allocation { + // Check if the replacement is newer. + // Always prefer the replacement if true. + replacementIsNewer := replacement.Job.Version > original.Job.Version || + replacement.Job.CreateIndex > original.Job.CreateIndex + if replacementIsNewer { + return replacement + } + + // Check if the replacement has better placement score. + // If any of the scores is not available, only pick the replacement if + // itself does have scores. + originalMaxScoreMeta := original.Metrics.MaxNormScore() + replacementMaxScoreMeta := replacement.Metrics.MaxNormScore() + + replacementHasBetterScore := originalMaxScoreMeta == nil && replacementMaxScoreMeta != nil || + (originalMaxScoreMeta != nil && replacementMaxScoreMeta != nil && + replacementMaxScoreMeta.NormScore > originalMaxScoreMeta.NormScore) + + // Check if the replacement has better client status. + // Even with a better placement score make sure we don't replace a running + // allocation with one that is not. + replacementIsRunning := replacement.ClientStatus == structs.AllocClientStatusRunning + originalNotRunning := original.ClientStatus != structs.AllocClientStatusRunning + + if replacementHasBetterScore && (replacementIsRunning || originalNotRunning) { + return replacement + } + + return original } // computeUpdates determines which allocations for the passed group require From 3ab49e9748729dcb2050336579b9c92a292c1b34 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Wed, 22 Mar 2023 15:16:26 -0400 Subject: [PATCH 2/7] scheduler: fix reconciliation of reconnecting allocs When a disconnect client reconnects the `allocReconciler` must find the allocations that were created to replace the original disconnected allocations. This process was being done in only a subset of non-terminal untainted allocations, meaning that, if the replacement allocations were not in this state the reconciler didn't stop them, leaving the job in an inconsistent state. This inconsistency is only solved in a future job evaluation, but at that point the allocation is considered reconnected and so the specific reconnection logic was not applied, leading to unexpected outcomes. This commit fixes the problem by running reconnecting allocation reconciliation logic earlier into the process, leaving the rest of the reconciler oblivious of reconnecting allocations. It also uses the full set of allocations to search for replacements, stopping them even if they are not in the `untainted` set. The system `SystemScheduler` is not affected by this bug because disconnected clients don't trigger replacements: every eligible client is already running an allocation. --- scheduler/reconcile.go | 48 ++++++++----- scheduler/reconcile_test.go | 133 +++++++++++++++++++++++++++++------- 2 files changed, 141 insertions(+), 40 deletions(-) diff --git a/scheduler/reconcile.go b/scheduler/reconcile.go index 8b149b36f9a1..cb601baa9638 100644 --- a/scheduler/reconcile.go +++ b/scheduler/reconcile.go @@ -428,15 +428,24 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool { untainted, migrate, lost, disconnecting, reconnecting, ignore := all.filterByTainted(a.taintedNodes, a.supportsDisconnectedClients, a.now) desiredChanges.Ignore += uint64(len(ignore)) - // If there are allocations reconnecting we need to reconcile its - // replacements first because there are specific business logic when - // deciding which one to keep. + // If there are allocations reconnecting we need to reconcile them and + // their replacements first because there is specific logic when deciding + // which ones to keep that can only be applied when the client reconnects. if len(reconnecting) > 0 { + // Pass all allocations becasue the replacements we need to find may be + // in any state, including themselves being reconnected. reconnect, stop := a.reconcileReconnecting(reconnecting, all) - // Stop the reconnecting allocations that we don't need anymore. + // Stop the reconciled allocations and remove them from the other sets + // since they have been already handled. desiredChanges.Stop += uint64(len(stop)) + untainted = untainted.difference(stop) + migrate = migrate.difference(stop) + lost = lost.difference(stop) + disconnecting = disconnecting.difference(stop) + reconnecting = reconnecting.difference(stop) + ignore = ignore.difference(stop) // Validate and add reconnecting allocations to the plan so they are // logged. @@ -1056,10 +1065,11 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc // and all other allocations for the same group and determines which ones to // reconnect which ones or stop. // -// - Every reconnecting allocation MUST be present in either returned set. +// - Every reconnecting allocation MUST be present in one, and only one, of +// the returned set. // - Every replacement allocation that is not preferred MUST be returned in // the stop set. -// - Only reconnecting allocations are allowed to be returned in the +// - Only reconnecting allocations are allowed to be present in the returned // reconnect set. // - If the reconnecting allocation is to be stopped, its replacements may // not be present in any of the returned sets. The rest of the reconciler @@ -1094,33 +1104,39 @@ func (a *allocReconciler) reconcileReconnecting(reconnecting allocSet, others al } // By default, prefer stopping the replacement allocation, so assume - // all non-terminal reconnecting allocations will reconnect. + // all non-terminal reconnecting allocations will reconnect and ajust + // later when this is not the case. reconnect[reconnectingAlloc.ID] = reconnectingAlloc // Find replacement allocations and decide which one to stop. A // reconnecting allocation may have multiple replacements. for _, replacementAlloc := range others { - // spiderman_meme.jpg + // Skip the reconnecting allocation itself. + // Since a replacement may itslef be reconnecting, the set of all + // other allocations contains the reconnecting allocations as well. if reconnectingAlloc.ID == replacementAlloc.ID { continue } - // Replacement allocations have the same name as the original so - // skip the ones that don't. + // Skip allocations that don't have the same name. + // Replacement allocations have the same name as the original. if reconnectingAlloc.Name != replacementAlloc.Name { continue } - // Skip allocations that are server terminal since they are - // already set to stop. + // Skip allocations that are server terminal. + // We don't want to replace a reconnecting allocation with one that + // is or will terminate and we don't need to stop them since they + // are marked as terminal by the servers. if replacementAlloc.ServerTerminalStatus() { continue } + // Pick which allocation we want to keep. keepAlloc := pickReconnectingAlloc(reconnectingAlloc, replacementAlloc) if keepAlloc == replacementAlloc { // The replacement allocation is preferred, so stop the one - // that is reconnecting if not stopped yet. + // reconnecting if not stopped yet. if _, ok := stop[reconnectingAlloc.ID]; !ok { stop[reconnectingAlloc.ID] = reconnectingAlloc a.result.stop = append(a.result.stop, allocStopResult{ @@ -1129,9 +1145,9 @@ func (a *allocReconciler) reconcileReconnecting(reconnecting allocSet, others al }) } - // Our assumption that all non-terminal reconnecting - // allocations will reconnect was wrong, so remove it from the - // reconnect set. + // Our assumption that all reconnecting allocations will + // reconnect was wrong, so remove this one from the reconnect + // set. delete(reconnect, reconnectingAlloc.ID) } else { // The reconnecting allocation is preferred, so stop this diff --git a/scheduler/reconcile_test.go b/scheduler/reconcile_test.go index e3a9f01d36b8..7a9061b40689 100644 --- a/scheduler/reconcile_test.go +++ b/scheduler/reconcile_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/hashicorp/go-set" "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/helper/pointer" "github.com/hashicorp/nomad/helper/testlog" @@ -5316,21 +5317,24 @@ func TestReconciler_Disconnected_Client(t *testing.T) { }} type testCase struct { - name string - allocCount int - disconnectedAllocCount int - jobVersionIncrement uint64 - nodeScoreIncrement float64 - disconnectedAllocStatus string - disconnectedAllocStates []*structs.AllocState - serverDesiredStatus string - isBatch bool - nodeStatusDisconnected bool - replace bool - failReplacement bool - shouldStopOnDisconnectedNode bool - maxDisconnect *time.Duration - expected *resultExpectation + name string + allocCount int + disconnectedAllocCount int + jobVersionIncrement uint64 + nodeScoreIncrement float64 + disconnectedAllocStatus string + disconnectedAllocStates []*structs.AllocState + serverDesiredStatus string + isBatch bool + nodeStatusDisconnected bool + replace bool + failReplacement bool + replaceFailedReplacement bool + replacementAllocClientStatus string + replacementAllocDesiredStatus string + shouldStopOnDisconnectedNode bool + maxDisconnect *time.Duration + expected *resultExpectation } testCases := []testCase{ @@ -5449,6 +5453,47 @@ func TestReconciler_Disconnected_Client(t *testing.T) { }, }, }, + { + name: "keep-original-alloc-and-stop-failed-replacement", + allocCount: 3, + replace: true, + disconnectedAllocCount: 2, + disconnectedAllocStatus: structs.AllocClientStatusRunning, + disconnectedAllocStates: disconnectAllocState, + serverDesiredStatus: structs.AllocDesiredStatusRun, + failReplacement: true, + expected: &resultExpectation{ + reconnectUpdates: 2, + stop: 2, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + "web": { + Ignore: 3, + Stop: 2, + }, + }, + }, + }, + { + name: "keep-original-and-stop-tainted-replacement", + allocCount: 3, + replace: true, + disconnectedAllocCount: 2, + disconnectedAllocStatus: structs.AllocClientStatusRunning, + disconnectedAllocStates: disconnectAllocState, + serverDesiredStatus: structs.AllocDesiredStatusRun, + failReplacement: true, + replacementAllocClientStatus: structs.AllocClientStatusUnknown, + expected: &resultExpectation{ + reconnectUpdates: 2, + stop: 2, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + "web": { + Ignore: 3, + Stop: 2, + }, + }, + }, + }, { name: "stop-original-alloc-with-old-job-version", allocCount: 5, @@ -5490,7 +5535,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) { }, }, { - name: "stop-original-alloc-with-old-job-version-and-failed-replacements", + name: "stop-original-alloc-with-old-job-version-and-failed-replacements-replaced", allocCount: 5, replace: true, disconnectedAllocCount: 2, @@ -5498,6 +5543,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) { disconnectedAllocStates: disconnectAllocState, serverDesiredStatus: structs.AllocDesiredStatusRun, failReplacement: true, + replaceFailedReplacement: true, shouldStopOnDisconnectedNode: true, jobVersionIncrement: 1, expected: &resultExpectation{ @@ -5530,6 +5576,28 @@ func TestReconciler_Disconnected_Client(t *testing.T) { }, }, }, + { + name: "stop-failed-original-and-failed-replacements-and-place-new", + allocCount: 5, + replace: true, + disconnectedAllocCount: 2, + disconnectedAllocStatus: structs.AllocClientStatusFailed, + disconnectedAllocStates: disconnectAllocState, + serverDesiredStatus: structs.AllocDesiredStatusRun, + failReplacement: true, + shouldStopOnDisconnectedNode: true, + expected: &resultExpectation{ + stop: 4, + place: 2, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + "web": { + Stop: 4, + Place: 2, + Ignore: 3, + }, + }, + }, + }, { name: "stop-expired-allocs", allocCount: 5, @@ -5585,6 +5653,11 @@ func TestReconciler_Disconnected_Client(t *testing.T) { // Create resumable allocs job, allocs := buildResumableAllocations(tc.allocCount, structs.AllocClientStatusRunning, structs.AllocDesiredStatusRun, 2) + origAllocs := set.New[string](len(allocs)) + for _, alloc := range allocs { + origAllocs.Insert(alloc.ID) + } + if tc.isBatch { job.Type = structs.JobTypeBatch } @@ -5632,18 +5705,25 @@ func TestReconciler_Disconnected_Client(t *testing.T) { replacement.Metrics.ScoreMetaData[0].NormScore = replacement.Metrics.ScoreMetaData[0].NormScore + tc.nodeScoreIncrement } - replacements = append(replacements, replacement) - // If we want to test intermediate replacement failures simulate that. if tc.failReplacement { replacement.ClientStatus = structs.AllocClientStatusFailed - nextReplacement := replacement.Copy() - nextReplacement.ID = uuid.Generate() - nextReplacement.ClientStatus = structs.AllocClientStatusRunning - nextReplacement.PreviousAllocation = replacement.ID - replacement.NextAllocation = nextReplacement.ID - replacements = append(replacements, nextReplacement) + + if tc.replaceFailedReplacement { + nextReplacement := replacement.Copy() + nextReplacement.ID = uuid.Generate() + nextReplacement.ClientStatus = structs.AllocClientStatusRunning + nextReplacement.DesiredStatus = structs.AllocClientStatusRunning + nextReplacement.PreviousAllocation = replacement.ID + + replacement.NextAllocation = nextReplacement.ID + replacement.DesiredStatus = structs.AllocDesiredStatusStop + + replacements = append(replacements, nextReplacement) + } } + + replacements = append(replacements, replacement) } allocs = append(allocs, replacements...) @@ -5661,6 +5741,11 @@ func TestReconciler_Disconnected_Client(t *testing.T) { assertResults(t, results, tc.expected) for _, stopResult := range results.stop { + // Skip replacement allocs. + if !origAllocs.Contains(stopResult.alloc.ID) { + continue + } + if tc.shouldStopOnDisconnectedNode { require.Equal(t, testNode.ID, stopResult.alloc.NodeID) } else { From a87af841d42ccfb76d7b14717fb389b8c169fda0 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Wed, 22 Mar 2023 15:30:04 -0400 Subject: [PATCH 3/7] changelog: add entry for #16609 --- .changelog/16609.txt | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .changelog/16609.txt diff --git a/.changelog/16609.txt b/.changelog/16609.txt new file mode 100644 index 000000000000..61de306525cb --- /dev/null +++ b/.changelog/16609.txt @@ -0,0 +1,3 @@ +```release-note:bug +scheduler: Fix reconciliation of reconnecting allocs when the replacement allocations are not running +``` From 66241b86882ee870bfece14ded23229c712fadea Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Wed, 22 Mar 2023 15:47:45 -0400 Subject: [PATCH 4/7] fix reconciler test --- scheduler/reconcile_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scheduler/reconcile_test.go b/scheduler/reconcile_test.go index 7a9061b40689..544f515b7051 100644 --- a/scheduler/reconcile_test.go +++ b/scheduler/reconcile_test.go @@ -5713,7 +5713,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) { nextReplacement := replacement.Copy() nextReplacement.ID = uuid.Generate() nextReplacement.ClientStatus = structs.AllocClientStatusRunning - nextReplacement.DesiredStatus = structs.AllocClientStatusRunning + nextReplacement.DesiredStatus = structs.AllocDesiredStatusRun nextReplacement.PreviousAllocation = replacement.ID replacement.NextAllocation = nextReplacement.ID From f88c15add0efbe2edaa45bc4610e3790131cd580 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Wed, 22 Mar 2023 20:17:06 -0400 Subject: [PATCH 5/7] fix typo --- scheduler/reconcile.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scheduler/reconcile.go b/scheduler/reconcile.go index cb601baa9638..e17d09174ae2 100644 --- a/scheduler/reconcile.go +++ b/scheduler/reconcile.go @@ -432,7 +432,7 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool { // their replacements first because there is specific logic when deciding // which ones to keep that can only be applied when the client reconnects. if len(reconnecting) > 0 { - // Pass all allocations becasue the replacements we need to find may be + // Pass all allocations because the replacements we need to find may be // in any state, including themselves being reconnected. reconnect, stop := a.reconcileReconnecting(reconnecting, all) From 0b97e34fcf3ef784709b25307ce1c4015015dc59 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Thu, 23 Mar 2023 11:26:26 -0400 Subject: [PATCH 6/7] scheduler: handle reconnecting replacements If the replacement for a reconnecting allocation is also reconnecting we need to make sure we only compare the original with the replacement, and not the other way around, otherwise the replacement may stop the original if they tie in the selection criteria. --- scheduler/reconcile.go | 37 ++++++--------- scheduler/reconcile_test.go | 89 ++++++++++++++++++++++++------------- 2 files changed, 72 insertions(+), 54 deletions(-) diff --git a/scheduler/reconcile.go b/scheduler/reconcile.go index e17d09174ae2..458a48b2545d 100644 --- a/scheduler/reconcile.go +++ b/scheduler/reconcile.go @@ -1103,32 +1103,21 @@ func (a *allocReconciler) reconcileReconnecting(reconnecting allocSet, others al continue } - // By default, prefer stopping the replacement allocation, so assume - // all non-terminal reconnecting allocations will reconnect and ajust - // later when this is not the case. - reconnect[reconnectingAlloc.ID] = reconnectingAlloc - // Find replacement allocations and decide which one to stop. A // reconnecting allocation may have multiple replacements. for _, replacementAlloc := range others { - // Skip the reconnecting allocation itself. - // Since a replacement may itslef be reconnecting, the set of all - // other allocations contains the reconnecting allocations as well. - if reconnectingAlloc.ID == replacementAlloc.ID { - continue - } - - // Skip allocations that don't have the same name. - // Replacement allocations have the same name as the original. - if reconnectingAlloc.Name != replacementAlloc.Name { - continue - } + // Skip allocations that are not a replacement of the one + // reconnecting. Replacement allocations have the same name but a + // higher CreateIndex and a different ID. + isReplacement := replacementAlloc.ID != reconnectingAlloc.ID && + replacementAlloc.Name == reconnectingAlloc.Name && + replacementAlloc.CreateIndex > reconnectingAlloc.CreateIndex // Skip allocations that are server terminal. // We don't want to replace a reconnecting allocation with one that // is or will terminate and we don't need to stop them since they // are marked as terminal by the servers. - if replacementAlloc.ServerTerminalStatus() { + if !isReplacement || replacementAlloc.ServerTerminalStatus() { continue } @@ -1144,11 +1133,6 @@ func (a *allocReconciler) reconcileReconnecting(reconnecting allocSet, others al statusDescription: allocNotNeeded, }) } - - // Our assumption that all reconnecting allocations will - // reconnect was wrong, so remove this one from the reconnect - // set. - delete(reconnect, reconnectingAlloc.ID) } else { // The reconnecting allocation is preferred, so stop this // replacement. @@ -1161,6 +1145,13 @@ func (a *allocReconciler) reconcileReconnecting(reconnecting allocSet, others al } } + // Any reconnecting allocation not set to stop must be reconnected. + for _, alloc := range successfulReconnects { + if _, ok := stop[alloc.ID]; !ok { + reconnect[alloc.ID] = alloc + } + } + return reconnect, stop } diff --git a/scheduler/reconcile_test.go b/scheduler/reconcile_test.go index 544f515b7051..b8494558c22a 100644 --- a/scheduler/reconcile_test.go +++ b/scheduler/reconcile_test.go @@ -5317,24 +5317,24 @@ func TestReconciler_Disconnected_Client(t *testing.T) { }} type testCase struct { - name string - allocCount int - disconnectedAllocCount int - jobVersionIncrement uint64 - nodeScoreIncrement float64 - disconnectedAllocStatus string - disconnectedAllocStates []*structs.AllocState - serverDesiredStatus string - isBatch bool - nodeStatusDisconnected bool - replace bool - failReplacement bool - replaceFailedReplacement bool - replacementAllocClientStatus string - replacementAllocDesiredStatus string - shouldStopOnDisconnectedNode bool - maxDisconnect *time.Duration - expected *resultExpectation + name string + allocCount int + disconnectedAllocCount int + jobVersionIncrement uint64 + nodeScoreIncrement float64 + disconnectedAllocStatus string + disconnectedAllocStates []*structs.AllocState + serverDesiredStatus string + isBatch bool + nodeStatusDisconnected bool + replace bool + failReplacement bool + taintReplacement bool + disconnectReplacement bool + replaceFailedReplacement bool + shouldStopOnDisconnectedNode bool + maxDisconnect *time.Duration + expected *resultExpectation } testCases := []testCase{ @@ -5457,11 +5457,11 @@ func TestReconciler_Disconnected_Client(t *testing.T) { name: "keep-original-alloc-and-stop-failed-replacement", allocCount: 3, replace: true, + failReplacement: true, disconnectedAllocCount: 2, disconnectedAllocStatus: structs.AllocClientStatusRunning, disconnectedAllocStates: disconnectAllocState, serverDesiredStatus: structs.AllocDesiredStatusRun, - failReplacement: true, expected: &resultExpectation{ reconnectUpdates: 2, stop: 2, @@ -5474,15 +5474,34 @@ func TestReconciler_Disconnected_Client(t *testing.T) { }, }, { - name: "keep-original-and-stop-tainted-replacement", - allocCount: 3, - replace: true, - disconnectedAllocCount: 2, - disconnectedAllocStatus: structs.AllocClientStatusRunning, - disconnectedAllocStates: disconnectAllocState, - serverDesiredStatus: structs.AllocDesiredStatusRun, - failReplacement: true, - replacementAllocClientStatus: structs.AllocClientStatusUnknown, + name: "keep-original-and-stop-reconnecting-replacement", + allocCount: 2, + replace: true, + disconnectReplacement: true, + disconnectedAllocCount: 1, + disconnectedAllocStatus: structs.AllocClientStatusRunning, + disconnectedAllocStates: disconnectAllocState, + serverDesiredStatus: structs.AllocDesiredStatusRun, + expected: &resultExpectation{ + reconnectUpdates: 1, + stop: 1, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + "web": { + Ignore: 2, + Stop: 1, + }, + }, + }, + }, + { + name: "keep-original-and-stop-tainted-replacement", + allocCount: 3, + replace: true, + taintReplacement: true, + disconnectedAllocCount: 2, + disconnectedAllocStatus: structs.AllocClientStatusRunning, + disconnectedAllocStates: disconnectAllocState, + serverDesiredStatus: structs.AllocDesiredStatusRun, expected: &resultExpectation{ reconnectUpdates: 2, stop: 2, @@ -5538,12 +5557,12 @@ func TestReconciler_Disconnected_Client(t *testing.T) { name: "stop-original-alloc-with-old-job-version-and-failed-replacements-replaced", allocCount: 5, replace: true, + failReplacement: true, + replaceFailedReplacement: true, disconnectedAllocCount: 2, disconnectedAllocStatus: structs.AllocClientStatusRunning, disconnectedAllocStates: disconnectAllocState, serverDesiredStatus: structs.AllocDesiredStatusRun, - failReplacement: true, - replaceFailedReplacement: true, shouldStopOnDisconnectedNode: true, jobVersionIncrement: 1, expected: &resultExpectation{ @@ -5580,11 +5599,11 @@ func TestReconciler_Disconnected_Client(t *testing.T) { name: "stop-failed-original-and-failed-replacements-and-place-new", allocCount: 5, replace: true, + failReplacement: true, disconnectedAllocCount: 2, disconnectedAllocStatus: structs.AllocClientStatusFailed, disconnectedAllocStates: disconnectAllocState, serverDesiredStatus: structs.AllocDesiredStatusRun, - failReplacement: true, shouldStopOnDisconnectedNode: true, expected: &resultExpectation{ stop: 4, @@ -5696,6 +5715,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) { replacement.PreviousAllocation = alloc.ID replacement.AllocStates = nil replacement.TaskStates = nil + replacement.CreateIndex += 1 alloc.NextAllocation = replacement.ID if tc.jobVersionIncrement != 0 { @@ -5704,6 +5724,12 @@ func TestReconciler_Disconnected_Client(t *testing.T) { if tc.nodeScoreIncrement != 0 { replacement.Metrics.ScoreMetaData[0].NormScore = replacement.Metrics.ScoreMetaData[0].NormScore + tc.nodeScoreIncrement } + if tc.taintReplacement { + replacement.DesiredTransition.Migrate = pointer.Of(true) + } + if tc.disconnectReplacement { + replacement.AllocStates = tc.disconnectedAllocStates + } // If we want to test intermediate replacement failures simulate that. if tc.failReplacement { @@ -5715,6 +5741,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) { nextReplacement.ClientStatus = structs.AllocClientStatusRunning nextReplacement.DesiredStatus = structs.AllocDesiredStatusRun nextReplacement.PreviousAllocation = replacement.ID + nextReplacement.CreateIndex += 1 replacement.NextAllocation = nextReplacement.ID replacement.DesiredStatus = structs.AllocDesiredStatusStop From 129dda0bab86449dcd142f4540e37f15469b64ad Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Fri, 24 Mar 2023 19:01:20 -0400 Subject: [PATCH 7/7] scheduler: remove `filterByFailedReconnect` method Since we are now already iterating over the reconnecting allocations in a specific method having a separate loop to find failed allocations is unnecessary. --- scheduler/reconcile.go | 25 +++++++++++++++++-------- scheduler/reconcile_util.go | 13 ------------- 2 files changed, 17 insertions(+), 21 deletions(-) diff --git a/scheduler/reconcile.go b/scheduler/reconcile.go index 458a48b2545d..54b434eec59b 100644 --- a/scheduler/reconcile.go +++ b/scheduler/reconcile.go @@ -1078,13 +1078,21 @@ func (a *allocReconciler) reconcileReconnecting(reconnecting allocSet, others al stop := make(allocSet) reconnect := make(allocSet) - // Mark all failed reconnects for stop. - failedReconnects := reconnecting.filterByFailedReconnect() - stop = stop.union(failedReconnects) - a.markStop(failedReconnects, structs.AllocClientStatusFailed, allocRescheduled) - successfulReconnects := reconnecting.difference(failedReconnects) + for _, reconnectingAlloc := range reconnecting { + // Stop allocations that failed to reconnect. + reconnectFailed := !reconnectingAlloc.ServerTerminalStatus() && + reconnectingAlloc.ClientStatus == structs.AllocClientStatusFailed + + if reconnectFailed { + stop[reconnectingAlloc.ID] = reconnectingAlloc + a.result.stop = append(a.result.stop, allocStopResult{ + alloc: reconnectingAlloc, + clientStatus: structs.AllocClientStatusFailed, + statusDescription: allocRescheduled, + }) + continue + } - for _, reconnectingAlloc := range successfulReconnects { // If the desired status is not run, or if the user-specified desired // transition is not run, stop the reconnecting allocation. stopReconnecting := reconnectingAlloc.DesiredStatus != structs.AllocDesiredStatusRun || @@ -1106,6 +1114,7 @@ func (a *allocReconciler) reconcileReconnecting(reconnecting allocSet, others al // Find replacement allocations and decide which one to stop. A // reconnecting allocation may have multiple replacements. for _, replacementAlloc := range others { + // Skip allocations that are not a replacement of the one // reconnecting. Replacement allocations have the same name but a // higher CreateIndex and a different ID. @@ -1116,7 +1125,7 @@ func (a *allocReconciler) reconcileReconnecting(reconnecting allocSet, others al // Skip allocations that are server terminal. // We don't want to replace a reconnecting allocation with one that // is or will terminate and we don't need to stop them since they - // are marked as terminal by the servers. + // are already marked as terminal by the servers. if !isReplacement || replacementAlloc.ServerTerminalStatus() { continue } @@ -1146,7 +1155,7 @@ func (a *allocReconciler) reconcileReconnecting(reconnecting allocSet, others al } // Any reconnecting allocation not set to stop must be reconnected. - for _, alloc := range successfulReconnects { + for _, alloc := range reconnecting { if _, ok := stop[alloc.ID]; !ok { reconnect[alloc.ID] = alloc } diff --git a/scheduler/reconcile_util.go b/scheduler/reconcile_util.go index 42e8fb8cd6b5..c080481b0c87 100644 --- a/scheduler/reconcile_util.go +++ b/scheduler/reconcile_util.go @@ -520,19 +520,6 @@ func (a allocSet) filterByDeployment(id string) (match, nonmatch allocSet) { return } -// filterByFailedReconnect filters allocation into a set that have failed on the -// client but do not have a terminal status at the server so that they can be -// marked as stop at the server. -func (a allocSet) filterByFailedReconnect() allocSet { - failed := make(allocSet) - for _, alloc := range a { - if !alloc.ServerTerminalStatus() && alloc.ClientStatus == structs.AllocClientStatusFailed { - failed[alloc.ID] = alloc - } - } - return failed -} - // delayByStopAfterClientDisconnect returns a delay for any lost allocation that's got a // stop_after_client_disconnect configured func (a allocSet) delayByStopAfterClientDisconnect() (later []*delayedRescheduleInfo) {