diff --git a/e2e/nodedrain/input/drain_killtimeout.nomad b/e2e/nodedrain/input/drain_killtimeout.nomad
new file mode 100644
index 000000000000..88f3b091f8f6
--- /dev/null
+++ b/e2e/nodedrain/input/drain_killtimeout.nomad
@@ -0,0 +1,43 @@
+# Copyright (c) HashiCorp, Inc.
+# SPDX-License-Identifier: MPL-2.0
+
+job "drain_killtimeout" {
+
+  constraint {
+    attribute = "${attr.kernel.name}"
+    value     = "linux"
+  }
+
+  group "group" {
+
+    task "task" {
+      driver = "docker"
+
+      kill_timeout = "30s" # matches the agent's max_kill_timeout
+
+      config {
+        image   = "busybox:1"
+        command = "/bin/sh"
+        args    = ["local/script.sh"]
+      }
+
+      # this job traps SIGINT so that we can assert that we've forced the drain
+      # to wait until the client status has been updated
+      template {
+        data = <
 lastHandledIndex {
 			result.migrated = append(result.migrated, alloc)
 
@@ -385,8 +385,8 @@ func handleTaskGroup(snap *state.StateSnapshot, batch bool, tg *structs.TaskGrou
 
 		// An alloc can't be considered for migration if:
 		// - It isn't on a draining node
-		// - It is already terminal
-		if !onDrainingNode || alloc.TerminalStatus() {
+		// - It is already terminal on the client
+		if !onDrainingNode || alloc.ClientTerminalStatus() {
 			continue
 		}
 
diff --git a/nomad/drainer/watch_jobs_test.go b/nomad/drainer/watch_jobs_test.go
index 7029eb114cbb..562b2cb0af27 100644
--- a/nomad/drainer/watch_jobs_test.go
+++ b/nomad/drainer/watch_jobs_test.go
@@ -8,6 +8,11 @@ import (
 	"testing"
 	"time"
 
+	"github.com/shoenig/test"
+	"github.com/shoenig/test/must"
+	"github.com/stretchr/testify/require"
+	"golang.org/x/time/rate"
+
 	"github.com/hashicorp/nomad/ci"
 	"github.com/hashicorp/nomad/helper/pointer"
 	"github.com/hashicorp/nomad/helper/testlog"
@@ -15,9 +20,6 @@ import (
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/state"
 	"github.com/hashicorp/nomad/nomad/structs"
-	"github.com/stretchr/testify/assert"
-	"github.com/stretchr/testify/require"
-	"golang.org/x/time/rate"
 )
 
 func testNodes(t *testing.T, state *state.StateStore) (drainingNode, runningNode *structs.Node) {
@@ -106,12 +108,11 @@ func assertJobWatcherOps(t *testing.T, jw DrainingJobWatcher, drained, migrated
 // allocation changes from multiple jobs.
 func TestDrainingJobWatcher_DrainJobs(t *testing.T) {
 	ci.Parallel(t)
-	require := require.New(t)
 
-	state := state.TestStateStore(t)
-	jobWatcher, cancelWatcher := testDrainingJobWatcher(t, state)
+	store := state.TestStateStore(t)
+	jobWatcher, cancelWatcher := testDrainingJobWatcher(t, store)
 	defer cancelWatcher()
 
-	drainingNode, runningNode := testNodes(t, state)
+	drainingNode, runningNode := testNodes(t, store)
 
 	var index uint64 = 101
 	count := 8
 
@@ -134,7 +135,7 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) {
 		jnss[i] = structs.NamespacedID{Namespace: job.Namespace, ID: job.ID}
 		job.TaskGroups[0].Migrate.MaxParallel = 3
 		job.TaskGroups[0].Count = count
-		require.Nil(state.UpsertJob(structs.MsgTypeTestSetup, index, nil, job))
+		must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, index, nil, job))
 		index++
 
 		var allocs []*structs.Allocation
@@ -146,7 +147,7 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) {
 			allocs = append(allocs, a)
 		}
 
-		require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, index, allocs))
+		must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index, allocs))
 		index++
 	}
 
@@ -168,7 +169,7 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) {
 		// create a copy so we can reuse this slice
 		drainedAllocs[i] = a.Copy()
 	}
-	require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, index, drainedAllocs))
+	must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index, drainedAllocs))
 	drains.Resp.Respond(index, nil)
 	index++
 
@@ -195,7 +196,21 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) {
 		updates = append(updates, a, replacement)
 		replacements[i] = replacement.Copy()
 	}
-	require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, index, updates))
+	must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index, updates))
+	index++
+
+	// The drained allocs stopping cause migrations but no new drains
+	// because the replacements have not started
+	assertJobWatcherOps(t, jobWatcher, 0, 0)
+
+	// Client sends stop on these allocs
+	completeAllocs := make([]*structs.Allocation, len(drainedAllocs))
+	for i, a := range drainedAllocs {
+		a = a.Copy()
+		a.ClientStatus = structs.AllocClientStatusComplete
+		completeAllocs[i] = a
+	}
+	must.NoError(t, store.UpdateAllocsFromClient(structs.MsgTypeTestSetup, index, completeAllocs))
 	index++
 
 	// The drained allocs stopping cause migrations but no new drains
@@ -209,10 +224,10 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) {
 			Healthy: pointer.Of(true),
 		}
 	}
-	require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, index, replacements))
+	must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index, replacements))
 	index++
 
-	require.NotEmpty(jobWatcher.drainingJobs())
+	must.MapNotEmpty(t, jobWatcher.drainingJobs())
 
 	// 6 new drains
 	drains, _ = assertJobWatcherOps(t, jobWatcher, 6, 0)
@@ -225,7 +240,7 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) {
 		// create a copy so we can reuse this slice
 		drainedAllocs[i] = a.Copy()
 	}
-	require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, index, drainedAllocs))
+	must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index, drainedAllocs))
 	drains.Resp.Respond(index, nil)
 	index++
 
@@ -236,12 +251,13 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) {
 	for i, a := range drainedAllocs {
 		a.DesiredTransition.Migrate = nil
 		a.DesiredStatus = structs.AllocDesiredStatusStop
+		a.ClientStatus = structs.AllocClientStatusComplete
 
 		replacement := newAlloc(runningNode, a.Job)
 		updates = append(updates, a, replacement)
 		replacements[i] = replacement.Copy()
 	}
-	require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, index, updates))
+	must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index, updates))
 	index++
 
 	assertJobWatcherOps(t, jobWatcher, 0, 6)
@@ -252,10 +268,10 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) {
 			Healthy: pointer.Of(true),
 		}
 	}
-	require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, index, replacements))
+	must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index, replacements))
 	index++
 
-	require.NotEmpty(jobWatcher.drainingJobs())
+	must.MapNotEmpty(t, jobWatcher.drainingJobs())
 
 	// Final 4 new drains
 	drains, _ = assertJobWatcherOps(t, jobWatcher, 4, 0)
@@ -268,7 +284,7 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) {
 		// create a copy so we can reuse this slice
 		drainedAllocs[i] = a.Copy()
 	}
-	require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, index, drainedAllocs))
+	must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index, drainedAllocs))
 	drains.Resp.Respond(index, nil)
 	index++
 
@@ -279,12 +295,13 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) {
 	for i, a := range drainedAllocs {
 		a.DesiredTransition.Migrate = nil
 		a.DesiredStatus = structs.AllocDesiredStatusStop
+		a.ClientStatus = structs.AllocClientStatusComplete
 
 		replacement := newAlloc(runningNode, a.Job)
 		updates = append(updates, a, replacement)
 		replacements[i] = replacement.Copy()
 	}
-	require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, index, updates))
+	must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index, updates))
 	index++
 
 	assertJobWatcherOps(t, jobWatcher, 0, 4)
@@ -295,70 +312,55 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) {
 			Healthy: pointer.Of(true),
 		}
 	}
-	require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, index, replacements))
+	must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index, replacements))
 
 	// No jobs should be left!
-	require.Empty(jobWatcher.drainingJobs())
+	must.MapEmpty(t, jobWatcher.drainingJobs())
 }
 
-// DrainingJobWatcher tests:
-// TODO Test that the watcher cancels its query when a new job is registered
-
-// handleTaskGroupTestCase is the test case struct for TestHandleTaskGroup
-//
-// Two nodes will be initialized: one draining and one running.
-type handleTaskGroupTestCase struct {
-	// Name of test
-	Name string
-
-	// Batch uses a batch job and alloc
-	Batch bool
-
-	// Expectations
-	ExpectedDrained  int
-	ExpectedMigrated int
-	ExpectedDone     bool
-
-	// Count overrides the default count of 10 if set
-	Count int
-
-	// MaxParallel overrides the default max_parallel of 1 if set
-	MaxParallel int
+// TestDrainingJobWatcher_HandleTaskGroup tests that the watcher handles
+// allocation updates as expected.
+func TestDrainingJobWatcher_HandleTaskGroup(t *testing.T) {
+	ci.Parallel(t)
 
-	// AddAlloc will be called 10 times to create test allocs
-	//
-	// Allocs default to be healthy on the draining node
-	AddAlloc func(i int, a *structs.Allocation, drainingID, runningID string)
-}
+	testCases := []struct {
+		name        string
+		batch       bool // use a batch job
+		allocCount  int  // number of allocs in test (defaults to 10)
+		maxParallel int  // max_parallel (defaults to 1)
 
-func TestHandeTaskGroup_Table(t *testing.T) {
-	ci.Parallel(t)
+		// addAllocFn will be called allocCount times to create test allocs,
+		// and the allocs default to be healthy on the draining node
+		addAllocFn func(idx int, a *structs.Allocation, drainingID, runningID string)
 
-	cases := []handleTaskGroupTestCase{
+		expectDrained  int
+		expectMigrated int
+		expectDone     bool
+	}{
 		{
-			// All allocs on draining node
-			Name:             "AllDraining",
-			ExpectedDrained:  1,
-			ExpectedMigrated: 0,
-			ExpectedDone:     false,
+			// all allocs on draining node, should respect max_parallel=1
+			name:           "drain-respects-max-parallel-1",
+			expectDrained:  1,
+			expectMigrated: 0,
+			expectDone:     false,
 		},
 		{
-			// All allocs on non-draining node
-			Name:             "AllNonDraining",
-			ExpectedDrained:  0,
-			ExpectedMigrated: 0,
-			ExpectedDone:     true,
-			AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) {
+			// allocs on a non-draining node, should not be drained
+			name:           "allocs-on-non-draining-node-should-not-drain",
+			expectDrained:  0,
+			expectMigrated: 0,
+			expectDone:     true,
+			addAllocFn: func(i int, a *structs.Allocation, drainingID, runningID string) {
 				a.NodeID = runningID
 			},
 		},
 		{
-			// Some allocs on non-draining node but not healthy
-			Name:             "SomeNonDrainingUnhealthy",
-			ExpectedDrained:  0,
-			ExpectedMigrated: 0,
-			ExpectedDone:     false,
-			AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) {
+			// even unhealthy allocs on a non-draining node should not be drained
+			name:           "unhealthy-allocs-on-non-draining-node-should-not-drain",
+			expectDrained:  0,
+			expectMigrated: 0,
+			expectDone:     false,
+			addAllocFn: func(i int, a *structs.Allocation, drainingID, runningID string) {
 				if i%2 == 0 {
 					a.NodeID = runningID
 					a.DeploymentStatus = nil
@@ -366,24 +368,24 @@ func TestHandeTaskGroup_Table(t *testing.T) {
 			},
 		},
 		{
-			// One draining, other allocs on non-draining node and healthy
-			Name:             "OneDraining",
-			ExpectedDrained:  1,
-			ExpectedMigrated: 0,
-			ExpectedDone:     false,
-			AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) {
+			// only the alloc on draining node should be drained
+			name:           "healthy-alloc-draining-node-should-drain",
+			expectDrained:  1,
+			expectMigrated: 0,
+			expectDone:     false,
+			addAllocFn: func(i int, a *structs.Allocation, drainingID, runningID string) {
 				if i != 0 {
 					a.NodeID = runningID
 				}
 			},
 		},
 		{
-			// One already draining, other allocs on non-draining node and healthy
-			Name:             "OneAlreadyDraining",
-			ExpectedDrained:  0,
-			ExpectedMigrated: 0,
-			ExpectedDone:     false,
-			AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) {
+			// alloc that's still draining doesn't produce more result updates
+			name:           "still-draining-alloc-no-new-updates",
+			expectDrained:  0,
+			expectMigrated: 0,
+			expectDone:     false,
+			addAllocFn: func(i int, a *structs.Allocation, drainingID, runningID string) {
 				if i == 0 {
 					a.DesiredTransition.Migrate = pointer.Of(true)
 					return
@@ -392,77 +394,97 @@ func TestHandeTaskGroup_Table(t *testing.T) {
 			},
 		},
 		{
-			// One already drained, other allocs on non-draining node and healthy
"OneAlreadyDrained", - ExpectedDrained: 0, - ExpectedMigrated: 1, - ExpectedDone: true, - AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) { + // alloc that's finished draining gets marked as migrated + name: "client-terminal-alloc-drain-should-be-finished", + expectDrained: 0, + expectMigrated: 1, + expectDone: true, + addAllocFn: func(i int, a *structs.Allocation, drainingID, runningID string) { if i == 0 { a.DesiredStatus = structs.AllocDesiredStatusStop + a.ClientStatus = structs.AllocClientStatusComplete return } a.NodeID = runningID }, }, { - // One already drained, other allocs on non-draining node and healthy - Name: "OneAlreadyDrainedBatched", - Batch: true, - ExpectedDrained: 0, - ExpectedMigrated: 1, - ExpectedDone: true, - AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) { + // batch alloc that's finished draining gets marked as migrated + name: "client-terminal-batch-alloc-drain-should-be-finished", + batch: true, + expectDrained: 0, + expectMigrated: 1, + expectDone: true, + addAllocFn: func(i int, a *structs.Allocation, drainingID, runningID string) { if i == 0 { a.DesiredStatus = structs.AllocDesiredStatusStop + a.ClientStatus = structs.AllocClientStatusComplete return } a.NodeID = runningID }, }, { - // All allocs are terminl, nothing to be drained - Name: "AllMigrating", - ExpectedDrained: 0, - ExpectedMigrated: 10, - ExpectedDone: true, - AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) { + // all allocs are client-terminal, so nothing left to drain + name: "all-client-terminal-drain-should-be-finished", + expectDrained: 0, + expectMigrated: 10, + expectDone: true, + addAllocFn: func(i int, a *structs.Allocation, drainingID, runningID string) { + a.DesiredStatus = structs.AllocDesiredStatusStop + a.ClientStatus = structs.AllocClientStatusComplete + }, + }, + { + // all allocs are terminal, but only half are client-terminal + name: "half-client-terminal-drain-should-not-be-finished", + expectDrained: 0, + expectMigrated: 5, + expectDone: false, + addAllocFn: func(i int, a *structs.Allocation, drainingID, runningID string) { a.DesiredStatus = structs.AllocDesiredStatusStop + if i%2 == 0 { + a.ClientStatus = structs.AllocClientStatusComplete + } }, }, { - // All allocs are terminl, nothing to be drained - Name: "AllMigratingBatch", - Batch: true, - ExpectedDrained: 0, - ExpectedMigrated: 10, - ExpectedDone: true, - AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) { + // All allocs are terminal, nothing to be drained + name: "all-terminal-batch", + batch: true, + expectDrained: 0, + expectMigrated: 10, + expectDone: true, + addAllocFn: func(i int, a *structs.Allocation, drainingID, runningID string) { a.DesiredStatus = structs.AllocDesiredStatusStop + a.ClientStatus = structs.AllocClientStatusComplete }, }, { - // All allocs may be drained at once - Name: "AllAtOnce", - ExpectedDrained: 10, - ExpectedMigrated: 0, - ExpectedDone: false, - MaxParallel: 10, + // with max_parallel=10, all allocs can be drained at once + name: "drain-respects-max-parallel-all-at-once", + expectDrained: 10, + expectMigrated: 0, + expectDone: false, + maxParallel: 10, }, { - // Drain 2 - Name: "Drain2", - ExpectedDrained: 2, - ExpectedMigrated: 0, - ExpectedDone: false, - MaxParallel: 2, + // with max_parallel=2, up to 2 allocs can be drained at a time + name: "drain-respects-max-parallel-2", + expectDrained: 2, + expectMigrated: 0, + expectDone: false, + maxParallel: 2, }, { - // One on new 
-			// One on new node, one drained, and one draining
-			ExpectedDrained:  1,
-			ExpectedMigrated: 1,
-			MaxParallel:      2,
-			AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) {
+			// with max_parallel=2, up to 2 allocs can be drained at a time but
+			// we haven't yet informed the drainer that 1 has completed
+			// migrating
+			name:           "notify-migrated-1-on-new-1-drained-1-draining",
+			expectDrained:  1,
+			expectMigrated: 1,
+			maxParallel:    2,
+			addAllocFn: func(i int, a *structs.Allocation, drainingID, runningID string) {
 				switch i {
 				case 0:
 					// One alloc on running node
 					a.NodeID = runningID
 				case 1:
 					// One alloc already migrated
 					a.DesiredStatus = structs.AllocDesiredStatusStop
+					a.ClientStatus = structs.AllocClientStatusComplete
 				}
 			},
 		},
 		{
-			// 8 on new node, one drained, and one draining
-			ExpectedDrained:  1,
-			ExpectedMigrated: 1,
-			MaxParallel:      2,
-			AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) {
+			// with max_parallel=2, up to 2 allocs can be drained at a time but
+			// we haven't yet informed the drainer that 1 has completed
+			// migrating
+			name:           "notify-migrated-8-on-new-1-drained-1-draining",
+			expectDrained:  1,
+			expectMigrated: 1,
+			maxParallel:    2,
+			addAllocFn: func(i int, a *structs.Allocation, drainingID, runningID string) {
 				switch i {
 				case 0, 1, 2, 3, 4, 5, 6, 7:
 					a.NodeID = runningID
 				case 8:
 					a.DesiredStatus = structs.AllocDesiredStatusStop
+					a.ClientStatus = structs.AllocClientStatusComplete
 				}
 			},
 		},
 		{
 			// 5 on new node, two drained, and three draining
-			ExpectedDrained:  3,
-			ExpectedMigrated: 2,
-			MaxParallel:      5,
-			AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) {
+			// with max_parallel=5, up to 5 allocs can be drained at a time but
+			// we haven't yet informed the drainer that 2 have completed
+			// migrating
+			name:           "notify-migrated-5-on-new-2-drained-3-draining",
+			expectDrained:  3,
+			expectMigrated: 2,
+			maxParallel:    5,
+			addAllocFn: func(i int, a *structs.Allocation, drainingID, runningID string) {
 				switch i {
 				case 0, 1, 2, 3, 4:
 					a.NodeID = runningID
 				case 8, 9:
 					a.DesiredStatus = structs.AllocDesiredStatusStop
+					a.ClientStatus = structs.AllocClientStatusComplete
 				}
 			},
 		},
 		{
-			// Not all on new node have health set
-			Name:             "PendingHealth",
-			ExpectedDrained:  1,
-			ExpectedMigrated: 1,
-			MaxParallel:      3,
-			AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) {
+			// half the allocs have been moved to the new node but 1 doesn't
+			// have health set yet, so we should have MaxParallel - 1 in flight
+			name:           "pending-health-blocks",
+			expectDrained:  1,
+			expectMigrated: 1,
+			maxParallel:    3,
+			addAllocFn: func(i int, a *structs.Allocation, drainingID, runningID string) {
 				switch i {
 				case 0:
 					// Deployment status UNset for 1 on new node
@@ -518,16 +551,18 @@ func TestHandeTaskGroup_Table(t *testing.T) {
 					a.NodeID = runningID
 				case 9:
 					a.DesiredStatus = structs.AllocDesiredStatusStop
+					a.ClientStatus = structs.AllocClientStatusComplete
 				}
 			},
 		},
 		{
-			// 5 max parallel - 1 migrating - 2 with unset health = 2 drainable
-			Name:             "PendingHealthHigherMax",
-			ExpectedDrained:  2,
-			ExpectedMigrated: 1,
-			MaxParallel:      5,
-			AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) {
+			// half the allocs have been moved to the new node but 2 don't have
+			// health set yet, so we should have MaxParallel - 2 in flight
+			name:           "pending-health-blocks-higher-max",
+			expectDrained:  2,
+			expectMigrated: 1,
+			maxParallel:    5,
+			addAllocFn: func(i int, a *structs.Allocation, drainingID, runningID string) {
 				switch i {
 				case 0, 1:
 					// Deployment status UNset for 2 on new node
@@ -538,73 +573,66 @@ func TestHandeTaskGroup_Table(t *testing.T) {
 					a.NodeID = runningID
 				case 9:
 					a.DesiredStatus = structs.AllocDesiredStatusStop
+					a.ClientStatus = structs.AllocClientStatusComplete
 				}
 			},
 		},
 	}
 
-	for _, testCase := range cases {
-		t.Run(testCase.Name, func(t *testing.T) {
-			testHandleTaskGroup(t, testCase)
-		})
-	}
-}
+	for _, tc := range testCases {
+		tc := tc
+		t.Run(tc.name, func(t *testing.T) {
+			ci.Parallel(t)
 
-func testHandleTaskGroup(t *testing.T, tc handleTaskGroupTestCase) {
-	ci.Parallel(t)
+			// Create nodes
+			store := state.TestStateStore(t)
+			drainingNode, runningNode := testNodes(t, store)
 
-	require := require.New(t)
-	assert := assert.New(t)
-
-	// Create nodes
-	state := state.TestStateStore(t)
-	drainingNode, runningNode := testNodes(t, state)
+			job := mock.Job()
+			if tc.batch {
+				job = mock.BatchJob()
+			}
+			job.TaskGroups[0].Count = 10
+			if tc.allocCount > 0 {
+				job.TaskGroups[0].Count = tc.allocCount
+			}
+			if tc.maxParallel > 0 {
+				job.TaskGroups[0].Migrate.MaxParallel = tc.maxParallel
+			}
+			must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, 102, nil, job))
 
-	job := mock.Job()
-	if tc.Batch {
-		job = mock.BatchJob()
-	}
-	job.TaskGroups[0].Count = 10
-	if tc.Count > 0 {
-		job.TaskGroups[0].Count = tc.Count
-	}
-	if tc.MaxParallel > 0 {
-		job.TaskGroups[0].Migrate.MaxParallel = tc.MaxParallel
-	}
-	require.Nil(state.UpsertJob(structs.MsgTypeTestSetup, 102, nil, job))
+			var allocs []*structs.Allocation
+			for i := 0; i < 10; i++ {
+				a := mock.Alloc()
+				if tc.batch {
+					a = mock.BatchAlloc()
+				}
+				a.JobID = job.ID
+				a.Job = job
+				a.TaskGroup = job.TaskGroups[0].Name
+
+				// Default to being healthy on the draining node
+				a.NodeID = drainingNode.ID
+				a.DeploymentStatus = &structs.AllocDeploymentStatus{
+					Healthy: pointer.Of(true),
+				}
+				if tc.addAllocFn != nil {
+					tc.addAllocFn(i, a, drainingNode.ID, runningNode.ID)
+				}
+				allocs = append(allocs, a)
+			}
 
-	var allocs []*structs.Allocation
-	for i := 0; i < 10; i++ {
-		a := mock.Alloc()
-		if tc.Batch {
-			a = mock.BatchAlloc()
-		}
-		a.JobID = job.ID
-		a.Job = job
-		a.TaskGroup = job.TaskGroups[0].Name
+			must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, 103, allocs))
+			snap, err := store.Snapshot()
+			must.NoError(t, err)
 
-		// Default to being healthy on the draining node
-		a.NodeID = drainingNode.ID
-		a.DeploymentStatus = &structs.AllocDeploymentStatus{
-			Healthy: pointer.Of(true),
-		}
-		if tc.AddAlloc != nil {
-			tc.AddAlloc(i, a, drainingNode.ID, runningNode.ID)
-		}
-		allocs = append(allocs, a)
+			res := newJobResult()
+			must.NoError(t, handleTaskGroup(snap, tc.batch, job.TaskGroups[0], allocs, 102, res))
+			test.Len(t, tc.expectDrained, res.drain, test.Sprint("expected drained allocs"))
+			test.Len(t, tc.expectMigrated, res.migrated, test.Sprint("expected migrated allocs"))
+			test.Eq(t, tc.expectDone, res.done)
+		})
 	}
-
-	require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 103, allocs))
-	snap, err := state.Snapshot()
-	require.Nil(err)
-
-	res := newJobResult()
-	require.Nil(handleTaskGroup(snap, tc.Batch, job.TaskGroups[0], allocs, 102, res))
-	assert.Lenf(res.drain, tc.ExpectedDrained, "Drain expected %d but found: %d",
-		tc.ExpectedDrained, len(res.drain))
-	assert.Lenf(res.migrated, tc.ExpectedMigrated, "Migrate expected %d but found: %d",
-		tc.ExpectedMigrated, len(res.migrated))
-	assert.Equal(tc.ExpectedDone, res.done)
 }
 
 func TestHandleTaskGroup_Migrations(t *testing.T) {
@@ -638,6 +666,7 @@ func TestHandleTaskGroup_Migrations(t *testing.T) {
 		if i%2 == 0 {
 			a.DesiredStatus = structs.AllocDesiredStatusStop
+			a.ClientStatus = structs.AllocClientStatusComplete
 		} else {
 			a.ClientStatus = structs.AllocClientStatusFailed
 		}
@@ -707,6 +736,7 @@ func TestHandleTaskGroup_GarbageCollectedNode(t *testing.T) {
 		if i%2 == 0 {
 			a.DesiredStatus = structs.AllocDesiredStatusStop
+			a.ClientStatus = structs.AllocClientStatusComplete
 		} else {
 			a.ClientStatus = structs.AllocClientStatusFailed
 		}
diff --git a/nomad/drainer_int_test.go b/nomad/drainer_int_test.go
index 76d70e7de1a8..4a9dc49d4f2a 100644
--- a/nomad/drainer_int_test.go
+++ b/nomad/drainer_int_test.go
@@ -56,16 +56,29 @@ func allocClientStateSimulator(t *testing.T, errCh chan<- error, ctx context.Con
 			continue
 		}
 
-		if alloc.DeploymentStatus.HasHealth() {
-			continue // only update to healthy once
-		}
-		newAlloc := alloc.Copy()
-		newAlloc.DeploymentStatus = &structs.AllocDeploymentStatus{
-			Healthy:   pointer.Of(true),
-			Timestamp: now,
+		switch alloc.DesiredStatus {
+		case structs.AllocDesiredStatusRun:
+			if alloc.DeploymentStatus.HasHealth() {
+				continue // only update to healthy once
+			}
+			newAlloc := alloc.Copy()
+			newAlloc.DeploymentStatus = &structs.AllocDeploymentStatus{
+				Healthy:   pointer.Of(true),
+				Timestamp: now,
+			}
+			updates = append(updates, newAlloc)
+			logger.Trace("marking deployment health for alloc", "alloc_id", alloc.ID)
+
+		case structs.AllocDesiredStatusStop, structs.AllocDesiredStatusEvict:
+			if alloc.ClientStatus == structs.AllocClientStatusComplete {
+				continue // only update to complete once
+			}
+			newAlloc := alloc.Copy()
+			newAlloc.ClientStatus = structs.AllocClientStatusComplete
+			updates = append(updates, newAlloc)
+			logger.Trace("marking alloc complete", "alloc_id", alloc.ID)
 		}
-		updates = append(updates, newAlloc)
-		logger.Trace("marking deployment health for alloc", "alloc_id", alloc.ID)
+
 	}
 
 	if len(updates) == 0 {