From f91bf84e12de0fa432dbb658d5723f959c83de16 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Thu, 13 Apr 2023 08:55:28 -0400 Subject: [PATCH] drain: use client status to determine drain is complete (#14348) If an allocation is slow to stop because of `kill_timeout` or `shutdown_delay`, the node drain is marked as complete prematurely, even though drain monitoring will continue to report allocation migrations. This impacts the UI or API clients that monitor node draining to shut down nodes. This changeset updates the behavior to wait until the client status of all drained allocs are terminal before marking the node as done draining. --- .changelog/14348.txt | 3 + e2e/nodedrain/input/drain_killtimeout.nomad | 43 ++ e2e/nodedrain/node_drain_test.go | 57 +++ nomad/drainer/draining_node.go | 2 +- nomad/drainer/watch_jobs.go | 10 +- nomad/drainer/watch_jobs_test.go | 432 +++++++++++--------- nomad/drainer_int_test.go | 31 +- 7 files changed, 362 insertions(+), 216 deletions(-) create mode 100644 .changelog/14348.txt create mode 100644 e2e/nodedrain/input/drain_killtimeout.nomad diff --git a/.changelog/14348.txt b/.changelog/14348.txt new file mode 100644 index 000000000000..01d5cf4d2a37 --- /dev/null +++ b/.changelog/14348.txt @@ -0,0 +1,3 @@ +```release-note:bug +drain: Fixed a bug where drains would complete based on the server status and not the client status of an allocation +``` diff --git a/e2e/nodedrain/input/drain_killtimeout.nomad b/e2e/nodedrain/input/drain_killtimeout.nomad new file mode 100644 index 000000000000..88f3b091f8f6 --- /dev/null +++ b/e2e/nodedrain/input/drain_killtimeout.nomad @@ -0,0 +1,43 @@ +# Copyright (c) HashiCorp, Inc. +# SPDX-License-Identifier: MPL-2.0 + +job "drain_killtimeout" { + + constraint { + attribute = "${attr.kernel.name}" + value = "linux" + } + + group "group" { + + task "task" { + driver = "docker" + + kill_timeout = "30s" # matches the agent's max_kill_timeout + + config { + image = "busybox:1" + command = "/bin/sh" + args = ["local/script.sh"] + } + + # this job traps SIGINT so that we can assert that we've forced the drain + # to wait until the client status has been updated + template { + data = < lastHandledIndex { result.migrated = append(result.migrated, alloc) @@ -385,8 +385,8 @@ func handleTaskGroup(snap *state.StateSnapshot, batch bool, tg *structs.TaskGrou // An alloc can't be considered for migration if: // - It isn't on a draining node - // - It is already terminal - if !onDrainingNode || alloc.TerminalStatus() { + // - It is already terminal on the client + if !onDrainingNode || alloc.ClientTerminalStatus() { continue } diff --git a/nomad/drainer/watch_jobs_test.go b/nomad/drainer/watch_jobs_test.go index 7029eb114cbb..562b2cb0af27 100644 --- a/nomad/drainer/watch_jobs_test.go +++ b/nomad/drainer/watch_jobs_test.go @@ -8,6 +8,11 @@ import ( "testing" "time" + "github.com/shoenig/test" + "github.com/shoenig/test/must" + "github.com/stretchr/testify/require" + "golang.org/x/time/rate" + "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/helper/pointer" "github.com/hashicorp/nomad/helper/testlog" @@ -15,9 +20,6 @@ import ( "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "golang.org/x/time/rate" ) func testNodes(t *testing.T, state *state.StateStore) (drainingNode, runningNode *structs.Node) { @@ -106,12 +108,11 @@ func assertJobWatcherOps(t *testing.T, jw DrainingJobWatcher, drained, migrated // allocation changes from multiple jobs. func TestDrainingJobWatcher_DrainJobs(t *testing.T) { ci.Parallel(t) - require := require.New(t) - state := state.TestStateStore(t) - jobWatcher, cancelWatcher := testDrainingJobWatcher(t, state) + store := state.TestStateStore(t) + jobWatcher, cancelWatcher := testDrainingJobWatcher(t, store) defer cancelWatcher() - drainingNode, runningNode := testNodes(t, state) + drainingNode, runningNode := testNodes(t, store) var index uint64 = 101 count := 8 @@ -134,7 +135,7 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) { jnss[i] = structs.NamespacedID{Namespace: job.Namespace, ID: job.ID} job.TaskGroups[0].Migrate.MaxParallel = 3 job.TaskGroups[0].Count = count - require.Nil(state.UpsertJob(structs.MsgTypeTestSetup, index, nil, job)) + must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, index, nil, job)) index++ var allocs []*structs.Allocation @@ -146,7 +147,7 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) { allocs = append(allocs, a) } - require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, index, allocs)) + must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index, allocs)) index++ } @@ -168,7 +169,7 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) { // create a copy so we can reuse this slice drainedAllocs[i] = a.Copy() } - require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, index, drainedAllocs)) + must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index, drainedAllocs)) drains.Resp.Respond(index, nil) index++ @@ -195,7 +196,21 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) { updates = append(updates, a, replacement) replacements[i] = replacement.Copy() } - require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, index, updates)) + must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index, updates)) + index++ + + // The drained allocs stopping cause migrations but no new drains + // because the replacements have not started + assertJobWatcherOps(t, jobWatcher, 0, 0) + + // Client sends stop on these allocs + completeAllocs := make([]*structs.Allocation, len(drainedAllocs)) + for i, a := range drainedAllocs { + a = a.Copy() + a.ClientStatus = structs.AllocClientStatusComplete + completeAllocs[i] = a + } + must.NoError(t, store.UpdateAllocsFromClient(structs.MsgTypeTestSetup, index, completeAllocs)) index++ // The drained allocs stopping cause migrations but no new drains @@ -209,10 +224,10 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) { Healthy: pointer.Of(true), } } - require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, index, replacements)) + must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index, replacements)) index++ - require.NotEmpty(jobWatcher.drainingJobs()) + must.MapNotEmpty(t, jobWatcher.drainingJobs()) // 6 new drains drains, _ = assertJobWatcherOps(t, jobWatcher, 6, 0) @@ -225,7 +240,7 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) { // create a copy so we can reuse this slice drainedAllocs[i] = a.Copy() } - require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, index, drainedAllocs)) + must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index, drainedAllocs)) drains.Resp.Respond(index, nil) index++ @@ -236,12 +251,13 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) { for i, a := range drainedAllocs { a.DesiredTransition.Migrate = nil a.DesiredStatus = structs.AllocDesiredStatusStop + a.ClientStatus = structs.AllocClientStatusComplete replacement := newAlloc(runningNode, a.Job) updates = append(updates, a, replacement) replacements[i] = replacement.Copy() } - require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, index, updates)) + must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index, updates)) index++ assertJobWatcherOps(t, jobWatcher, 0, 6) @@ -252,10 +268,10 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) { Healthy: pointer.Of(true), } } - require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, index, replacements)) + must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index, replacements)) index++ - require.NotEmpty(jobWatcher.drainingJobs()) + must.MapNotEmpty(t, jobWatcher.drainingJobs()) // Final 4 new drains drains, _ = assertJobWatcherOps(t, jobWatcher, 4, 0) @@ -268,7 +284,7 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) { // create a copy so we can reuse this slice drainedAllocs[i] = a.Copy() } - require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, index, drainedAllocs)) + must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index, drainedAllocs)) drains.Resp.Respond(index, nil) index++ @@ -279,12 +295,13 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) { for i, a := range drainedAllocs { a.DesiredTransition.Migrate = nil a.DesiredStatus = structs.AllocDesiredStatusStop + a.ClientStatus = structs.AllocClientStatusComplete replacement := newAlloc(runningNode, a.Job) updates = append(updates, a, replacement) replacements[i] = replacement.Copy() } - require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, index, updates)) + must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index, updates)) index++ assertJobWatcherOps(t, jobWatcher, 0, 4) @@ -295,70 +312,55 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) { Healthy: pointer.Of(true), } } - require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, index, replacements)) + must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index, replacements)) // No jobs should be left! - require.Empty(jobWatcher.drainingJobs()) + must.MapEmpty(t, jobWatcher.drainingJobs()) } -// DrainingJobWatcher tests: -// TODO Test that the watcher cancels its query when a new job is registered - -// handleTaskGroupTestCase is the test case struct for TestHandleTaskGroup -// -// Two nodes will be initialized: one draining and one running. -type handleTaskGroupTestCase struct { - // Name of test - Name string - - // Batch uses a batch job and alloc - Batch bool - - // Expectations - ExpectedDrained int - ExpectedMigrated int - ExpectedDone bool - - // Count overrides the default count of 10 if set - Count int - - // MaxParallel overrides the default max_parallel of 1 if set - MaxParallel int +// TestDrainingJobWatcher_HandleTaskGroup tests that the watcher handles +// allocation updates as expected. +func TestDrainingJobWatcher_HandleTaskGroup(t *testing.T) { + ci.Parallel(t) - // AddAlloc will be called 10 times to create test allocs - // - // Allocs default to be healthy on the draining node - AddAlloc func(i int, a *structs.Allocation, drainingID, runningID string) -} + testCases := []struct { + name string + batch bool // use a batch job + allocCount int // number of allocs in test (defaults to 10) + maxParallel int // max_parallel (defaults to 1) -func TestHandeTaskGroup_Table(t *testing.T) { - ci.Parallel(t) + // addAllocFn will be called allocCount times to create test allocs, + // and the allocs default to be healthy on the draining node + addAllocFn func(idx int, a *structs.Allocation, drainingID, runningID string) - cases := []handleTaskGroupTestCase{ + expectDrained int + expectMigrated int + expectDone bool + }{ { - // All allocs on draining node - Name: "AllDraining", - ExpectedDrained: 1, - ExpectedMigrated: 0, - ExpectedDone: false, + // all allocs on draining node, should respect max_parallel=1 + name: "drain-respects-max-parallel-1", + expectDrained: 1, + expectMigrated: 0, + expectDone: false, }, { - // All allocs on non-draining node - Name: "AllNonDraining", - ExpectedDrained: 0, - ExpectedMigrated: 0, - ExpectedDone: true, - AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) { + // allocs on a non-draining node, should not be drained + name: "allocs-on-non-draining-node-should-not-drain", + expectDrained: 0, + expectMigrated: 0, + expectDone: true, + addAllocFn: func(i int, a *structs.Allocation, drainingID, runningID string) { a.NodeID = runningID }, }, { - // Some allocs on non-draining node but not healthy - Name: "SomeNonDrainingUnhealthy", - ExpectedDrained: 0, - ExpectedMigrated: 0, - ExpectedDone: false, - AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) { + // even unhealthy allocs on a non-draining node should not be drained + name: "unhealthy-allocs-on-non-draining-node-should-not-drain", + expectDrained: 0, + expectMigrated: 0, + expectDone: false, + addAllocFn: func(i int, a *structs.Allocation, drainingID, runningID string) { if i%2 == 0 { a.NodeID = runningID a.DeploymentStatus = nil @@ -366,24 +368,24 @@ func TestHandeTaskGroup_Table(t *testing.T) { }, }, { - // One draining, other allocs on non-draining node and healthy - Name: "OneDraining", - ExpectedDrained: 1, - ExpectedMigrated: 0, - ExpectedDone: false, - AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) { + // only the alloc on draining node should be drained + name: "healthy-alloc-draining-node-should-drain", + expectDrained: 1, + expectMigrated: 0, + expectDone: false, + addAllocFn: func(i int, a *structs.Allocation, drainingID, runningID string) { if i != 0 { a.NodeID = runningID } }, }, { - // One already draining, other allocs on non-draining node and healthy - Name: "OneAlreadyDraining", - ExpectedDrained: 0, - ExpectedMigrated: 0, - ExpectedDone: false, - AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) { + // alloc that's still draining doesn't produce more result updates + name: "still-draining-alloc-no-new-updates", + expectDrained: 0, + expectMigrated: 0, + expectDone: false, + addAllocFn: func(i int, a *structs.Allocation, drainingID, runningID string) { if i == 0 { a.DesiredTransition.Migrate = pointer.Of(true) return @@ -392,77 +394,97 @@ func TestHandeTaskGroup_Table(t *testing.T) { }, }, { - // One already drained, other allocs on non-draining node and healthy - Name: "OneAlreadyDrained", - ExpectedDrained: 0, - ExpectedMigrated: 1, - ExpectedDone: true, - AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) { + // alloc that's finished draining gets marked as migrated + name: "client-terminal-alloc-drain-should-be-finished", + expectDrained: 0, + expectMigrated: 1, + expectDone: true, + addAllocFn: func(i int, a *structs.Allocation, drainingID, runningID string) { if i == 0 { a.DesiredStatus = structs.AllocDesiredStatusStop + a.ClientStatus = structs.AllocClientStatusComplete return } a.NodeID = runningID }, }, { - // One already drained, other allocs on non-draining node and healthy - Name: "OneAlreadyDrainedBatched", - Batch: true, - ExpectedDrained: 0, - ExpectedMigrated: 1, - ExpectedDone: true, - AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) { + // batch alloc that's finished draining gets marked as migrated + name: "client-terminal-batch-alloc-drain-should-be-finished", + batch: true, + expectDrained: 0, + expectMigrated: 1, + expectDone: true, + addAllocFn: func(i int, a *structs.Allocation, drainingID, runningID string) { if i == 0 { a.DesiredStatus = structs.AllocDesiredStatusStop + a.ClientStatus = structs.AllocClientStatusComplete return } a.NodeID = runningID }, }, { - // All allocs are terminl, nothing to be drained - Name: "AllMigrating", - ExpectedDrained: 0, - ExpectedMigrated: 10, - ExpectedDone: true, - AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) { + // all allocs are client-terminal, so nothing left to drain + name: "all-client-terminal-drain-should-be-finished", + expectDrained: 0, + expectMigrated: 10, + expectDone: true, + addAllocFn: func(i int, a *structs.Allocation, drainingID, runningID string) { + a.DesiredStatus = structs.AllocDesiredStatusStop + a.ClientStatus = structs.AllocClientStatusComplete + }, + }, + { + // all allocs are terminal, but only half are client-terminal + name: "half-client-terminal-drain-should-not-be-finished", + expectDrained: 0, + expectMigrated: 5, + expectDone: false, + addAllocFn: func(i int, a *structs.Allocation, drainingID, runningID string) { a.DesiredStatus = structs.AllocDesiredStatusStop + if i%2 == 0 { + a.ClientStatus = structs.AllocClientStatusComplete + } }, }, { - // All allocs are terminl, nothing to be drained - Name: "AllMigratingBatch", - Batch: true, - ExpectedDrained: 0, - ExpectedMigrated: 10, - ExpectedDone: true, - AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) { + // All allocs are terminal, nothing to be drained + name: "all-terminal-batch", + batch: true, + expectDrained: 0, + expectMigrated: 10, + expectDone: true, + addAllocFn: func(i int, a *structs.Allocation, drainingID, runningID string) { a.DesiredStatus = structs.AllocDesiredStatusStop + a.ClientStatus = structs.AllocClientStatusComplete }, }, { - // All allocs may be drained at once - Name: "AllAtOnce", - ExpectedDrained: 10, - ExpectedMigrated: 0, - ExpectedDone: false, - MaxParallel: 10, + // with max_parallel=10, all allocs can be drained at once + name: "drain-respects-max-parallel-all-at-once", + expectDrained: 10, + expectMigrated: 0, + expectDone: false, + maxParallel: 10, }, { - // Drain 2 - Name: "Drain2", - ExpectedDrained: 2, - ExpectedMigrated: 0, - ExpectedDone: false, - MaxParallel: 2, + // with max_parallel=2, up to 2 allocs can be drained at a time + name: "drain-respects-max-parallel-2", + expectDrained: 2, + expectMigrated: 0, + expectDone: false, + maxParallel: 2, }, { - // One on new node, one drained, and one draining - ExpectedDrained: 1, - ExpectedMigrated: 1, - MaxParallel: 2, - AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) { + // with max_parallel=2, up to 2 allocs can be drained at a time but + // we haven't yet informed the drainer that 1 has completed + // migrating + name: "notify-migrated-1-on-new-1-drained-1-draining", + expectDrained: 1, + expectMigrated: 1, + maxParallel: 2, + addAllocFn: func(i int, a *structs.Allocation, drainingID, runningID string) { switch i { case 0: // One alloc on running node @@ -470,44 +492,55 @@ func TestHandeTaskGroup_Table(t *testing.T) { case 1: // One alloc already migrated a.DesiredStatus = structs.AllocDesiredStatusStop + a.ClientStatus = structs.AllocClientStatusComplete } }, }, { - // 8 on new node, one drained, and one draining - ExpectedDrained: 1, - ExpectedMigrated: 1, - MaxParallel: 2, - AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) { + // with max_parallel=2, up to 2 allocs can be drained at a time but + // we haven't yet informed the drainer that 1 has completed + // migrating + name: "notify-migrated-8-on-new-1-drained-1-draining", + expectDrained: 1, + expectMigrated: 1, + maxParallel: 2, + addAllocFn: func(i int, a *structs.Allocation, drainingID, runningID string) { switch i { case 0, 1, 2, 3, 4, 5, 6, 7: a.NodeID = runningID case 8: a.DesiredStatus = structs.AllocDesiredStatusStop + a.ClientStatus = structs.AllocClientStatusComplete } }, }, { // 5 on new node, two drained, and three draining - ExpectedDrained: 3, - ExpectedMigrated: 2, - MaxParallel: 5, - AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) { + // with max_parallel=5, up to 5 allocs can be drained at a time but + // we haven't yet informed the drainer that 2 have completed + // migrating + name: "notify-migrated-5-on-new-2-drained-3-draining", + expectDrained: 3, + expectMigrated: 2, + maxParallel: 5, + addAllocFn: func(i int, a *structs.Allocation, drainingID, runningID string) { switch i { case 0, 1, 2, 3, 4: a.NodeID = runningID case 8, 9: a.DesiredStatus = structs.AllocDesiredStatusStop + a.ClientStatus = structs.AllocClientStatusComplete } }, }, { - // Not all on new node have health set - Name: "PendingHealth", - ExpectedDrained: 1, - ExpectedMigrated: 1, - MaxParallel: 3, - AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) { + // half the allocs have been moved to the new node but 1 doesn't + // have health set yet, so we should have MaxParallel - 1 in flight + name: "pending-health-blocks", + expectDrained: 1, + expectMigrated: 1, + maxParallel: 3, + addAllocFn: func(i int, a *structs.Allocation, drainingID, runningID string) { switch i { case 0: // Deployment status UNset for 1 on new node @@ -518,16 +551,18 @@ func TestHandeTaskGroup_Table(t *testing.T) { a.NodeID = runningID case 9: a.DesiredStatus = structs.AllocDesiredStatusStop + a.ClientStatus = structs.AllocClientStatusComplete } }, }, { - // 5 max parallel - 1 migrating - 2 with unset health = 2 drainable - Name: "PendingHealthHigherMax", - ExpectedDrained: 2, - ExpectedMigrated: 1, - MaxParallel: 5, - AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) { + // half the allocs have been moved to the new node but 2 don't have + // health set yet, so we should have MaxParallel - 2 in flight + name: "pending-health-blocks-higher-max", + expectDrained: 2, + expectMigrated: 1, + maxParallel: 5, + addAllocFn: func(i int, a *structs.Allocation, drainingID, runningID string) { switch i { case 0, 1: // Deployment status UNset for 2 on new node @@ -538,73 +573,66 @@ func TestHandeTaskGroup_Table(t *testing.T) { a.NodeID = runningID case 9: a.DesiredStatus = structs.AllocDesiredStatusStop + a.ClientStatus = structs.AllocClientStatusComplete } }, }, } - for _, testCase := range cases { - t.Run(testCase.Name, func(t *testing.T) { - testHandleTaskGroup(t, testCase) - }) - } -} + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + ci.Parallel(t) -func testHandleTaskGroup(t *testing.T, tc handleTaskGroupTestCase) { - ci.Parallel(t) + // Create nodes + store := state.TestStateStore(t) + drainingNode, runningNode := testNodes(t, store) - require := require.New(t) - assert := assert.New(t) - - // Create nodes - state := state.TestStateStore(t) - drainingNode, runningNode := testNodes(t, state) + job := mock.Job() + if tc.batch { + job = mock.BatchJob() + } + job.TaskGroups[0].Count = 10 + if tc.allocCount > 0 { + job.TaskGroups[0].Count = tc.allocCount + } + if tc.maxParallel > 0 { + job.TaskGroups[0].Migrate.MaxParallel = tc.maxParallel + } + must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, 102, nil, job)) - job := mock.Job() - if tc.Batch { - job = mock.BatchJob() - } - job.TaskGroups[0].Count = 10 - if tc.Count > 0 { - job.TaskGroups[0].Count = tc.Count - } - if tc.MaxParallel > 0 { - job.TaskGroups[0].Migrate.MaxParallel = tc.MaxParallel - } - require.Nil(state.UpsertJob(structs.MsgTypeTestSetup, 102, nil, job)) + var allocs []*structs.Allocation + for i := 0; i < 10; i++ { + a := mock.Alloc() + if tc.batch { + a = mock.BatchAlloc() + } + a.JobID = job.ID + a.Job = job + a.TaskGroup = job.TaskGroups[0].Name + + // Default to being healthy on the draining node + a.NodeID = drainingNode.ID + a.DeploymentStatus = &structs.AllocDeploymentStatus{ + Healthy: pointer.Of(true), + } + if tc.addAllocFn != nil { + tc.addAllocFn(i, a, drainingNode.ID, runningNode.ID) + } + allocs = append(allocs, a) + } - var allocs []*structs.Allocation - for i := 0; i < 10; i++ { - a := mock.Alloc() - if tc.Batch { - a = mock.BatchAlloc() - } - a.JobID = job.ID - a.Job = job - a.TaskGroup = job.TaskGroups[0].Name + must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, 103, allocs)) + snap, err := store.Snapshot() + must.NoError(t, err) - // Default to being healthy on the draining node - a.NodeID = drainingNode.ID - a.DeploymentStatus = &structs.AllocDeploymentStatus{ - Healthy: pointer.Of(true), - } - if tc.AddAlloc != nil { - tc.AddAlloc(i, a, drainingNode.ID, runningNode.ID) - } - allocs = append(allocs, a) + res := newJobResult() + must.NoError(t, handleTaskGroup(snap, tc.batch, job.TaskGroups[0], allocs, 102, res)) + test.Len(t, tc.expectDrained, res.drain, test.Sprint("expected drained allocs")) + test.Len(t, tc.expectMigrated, res.migrated, test.Sprint("expected migrated allocs")) + test.Eq(t, tc.expectDone, res.done) + }) } - - require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 103, allocs)) - snap, err := state.Snapshot() - require.Nil(err) - - res := newJobResult() - require.Nil(handleTaskGroup(snap, tc.Batch, job.TaskGroups[0], allocs, 102, res)) - assert.Lenf(res.drain, tc.ExpectedDrained, "Drain expected %d but found: %d", - tc.ExpectedDrained, len(res.drain)) - assert.Lenf(res.migrated, tc.ExpectedMigrated, "Migrate expected %d but found: %d", - tc.ExpectedMigrated, len(res.migrated)) - assert.Equal(tc.ExpectedDone, res.done) } func TestHandleTaskGroup_Migrations(t *testing.T) { @@ -638,6 +666,7 @@ func TestHandleTaskGroup_Migrations(t *testing.T) { if i%2 == 0 { a.DesiredStatus = structs.AllocDesiredStatusStop + a.ClientStatus = structs.AllocClientStatusComplete } else { a.ClientStatus = structs.AllocClientStatusFailed } @@ -707,6 +736,7 @@ func TestHandleTaskGroup_GarbageCollectedNode(t *testing.T) { if i%2 == 0 { a.DesiredStatus = structs.AllocDesiredStatusStop + a.ClientStatus = structs.AllocClientStatusComplete } else { a.ClientStatus = structs.AllocClientStatusFailed } diff --git a/nomad/drainer_int_test.go b/nomad/drainer_int_test.go index 76d70e7de1a8..4a9dc49d4f2a 100644 --- a/nomad/drainer_int_test.go +++ b/nomad/drainer_int_test.go @@ -56,16 +56,29 @@ func allocClientStateSimulator(t *testing.T, errCh chan<- error, ctx context.Con continue } - if alloc.DeploymentStatus.HasHealth() { - continue // only update to healthy once - } - newAlloc := alloc.Copy() - newAlloc.DeploymentStatus = &structs.AllocDeploymentStatus{ - Healthy: pointer.Of(true), - Timestamp: now, + switch alloc.DesiredStatus { + case structs.AllocDesiredStatusRun: + if alloc.DeploymentStatus.HasHealth() { + continue // only update to healthy once + } + newAlloc := alloc.Copy() + newAlloc.DeploymentStatus = &structs.AllocDeploymentStatus{ + Healthy: pointer.Of(true), + Timestamp: now, + } + updates = append(updates, newAlloc) + logger.Trace("marking deployment health for alloc", "alloc_id", alloc.ID) + + case structs.AllocDesiredStatusStop, structs.AllocDesiredStatusEvict: + if alloc.ClientStatus == structs.AllocClientStatusComplete { + continue // only update to complete once + } + newAlloc := alloc.Copy() + newAlloc.ClientStatus = structs.AllocClientStatusComplete + updates = append(updates, newAlloc) + logger.Trace("marking alloc complete", "alloc_id", alloc.ID) } - updates = append(updates, newAlloc) - logger.Trace("marking deployment health for alloc", "alloc_id", alloc.ID) + } if len(updates) == 0 {