From 94fb0c0015ee09a0242cc96ae4fec22024e99b13 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Mon, 19 Mar 2018 10:36:31 -0700 Subject: [PATCH] Refactor assertOps into a helper func --- nomad/drainer/watch_jobs_test.go | 101 ++++++++++++++++--------------- 1 file changed, 53 insertions(+), 48 deletions(-) diff --git a/nomad/drainer/watch_jobs_test.go b/nomad/drainer/watch_jobs_test.go index 399ee46a16ec..3a73938d713b 100644 --- a/nomad/drainer/watch_jobs_test.go +++ b/nomad/drainer/watch_jobs_test.go @@ -51,6 +51,50 @@ func TestDrainingJobWatcher_Interface(t *testing.T) { var _ DrainingJobWatcher = w } +// asertJobWatcherOps asserts a certain number of allocs are drained and/or +// migrated by the job watcher. +func assertJobWatcherOps(t *testing.T, jw DrainingJobWatcher, drained, migrated int) ( + *DrainRequest, []*structs.Allocation) { + t.Helper() + var ( + drains *DrainRequest + migrations []*structs.Allocation + drainsChecked, migrationsChecked bool + ) + for { + select { + case drains = <-jw.Drain(): + ids := make([]string, len(drains.Allocs)) + for i, a := range drains.Allocs { + ids[i] = a.JobID[:6] + ":" + a.ID[:6] + } + t.Logf("draining %d allocs: %v", len(ids), ids) + require.False(t, drainsChecked, "drains already received") + drainsChecked = true + require.Lenf(t, drains.Allocs, drained, + "expected %d drains but found %d", drained, len(drains.Allocs)) + case migrations = <-jw.Migrated(): + ids := make([]string, len(migrations)) + for i, a := range migrations { + ids[i] = a.JobID[:6] + ":" + a.ID[:6] + } + t.Logf("migrating %d allocs: %v", len(ids), ids) + require.False(t, migrationsChecked, "migrations already received") + migrationsChecked = true + require.Lenf(t, migrations, migrated, + "expected %d migrations but found %d", migrated, len(migrations)) + case <-time.After(10 * time.Millisecond): + if !drainsChecked && drained > 0 { + t.Fatalf("expected %d drains but none happened", drained) + } + if !migrationsChecked && migrated > 0 { + t.Fatalf("expected %d migrations but none happened", migrated) + } + return drains, migrations + } + } +} + // TestDrainingJobWatcher_DrainJobs asserts DrainingJobWatcher batches // allocation changes from multiple jobs. func TestDrainingJobWatcher_DrainJobs(t *testing.T) { @@ -105,47 +149,8 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) { // updating the batch future jobWatcher.RegisterJobs(jnss) - // assertOps asserts how many allocs should be drained and migrated. - // The drains and migrations - if any - are returned. - assertOps := func(drained, migrated int) (drains *DrainRequest, migrations []*structs.Allocation) { - t.Helper() - var drainsChecked, migrationsChecked bool - for { - select { - case drains = <-jobWatcher.Drain(): - ids := make([]string, len(drains.Allocs)) - for i, a := range drains.Allocs { - ids[i] = a.JobID[:6] + ":" + a.ID[:6] - } - t.Logf("draining %d allocs: %v", len(ids), ids) - require.False(drainsChecked, "drains already received") - drainsChecked = true - require.Lenf(drains.Allocs, drained, - "expected %d drains but found %d", drained, len(drains.Allocs)) - case migrations = <-jobWatcher.Migrated(): - ids := make([]string, len(migrations)) - for i, a := range migrations { - ids[i] = a.JobID[:6] + ":" + a.ID[:6] - } - t.Logf("migrating %d allocs: %v", len(ids), ids) - require.False(migrationsChecked, "migrations already received") - migrationsChecked = true - require.Lenf(migrations, migrated, - "expected %d migrations but found %d", migrated, len(migrations)) - case <-time.After(10 * time.Millisecond): - if !drainsChecked && drained > 0 { - t.Fatalf("expected %d drains but none happened", drained) - } - if !migrationsChecked && migrated > 0 { - t.Fatalf("expected %d migrations but none happened", migrated) - } - return drains, migrations - } - } - } - // Expect a first batch of MaxParallel allocs from each job - drains, _ := assertOps(6, 0) + drains, _ := assertJobWatcherOps(t, jobWatcher, 6, 0) // Fake migrating the drained allocs by starting new ones and stopping // the old ones @@ -161,7 +166,7 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) { index++ // Just setting ShouldMigrate should not cause any further drains - assertOps(0, 0) + assertJobWatcherOps(t, jobWatcher, 0, 0) // Proceed our fake migration along by creating new allocs and stopping // old ones @@ -188,7 +193,7 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) { // The drained allocs stopping cause migrations but no new drains // because the replacements have not started - assertOps(0, 6) + assertJobWatcherOps(t, jobWatcher, 0, 6) // Finally kickoff further drain activity by "starting" replacements for _, a := range replacements { @@ -203,7 +208,7 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) { require.NotEmpty(jobWatcher.drainingJobs()) // 6 new drains - drains, _ = assertOps(6, 0) + drains, _ = assertJobWatcherOps(t, jobWatcher, 6, 0) // Fake migrations once more to finish the drain drainedAllocs = make([]*structs.Allocation, len(drains.Allocs)) @@ -217,7 +222,7 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) { drains.Resp.Respond(index, nil) index++ - assertOps(0, 0) + assertJobWatcherOps(t, jobWatcher, 0, 0) replacements = make([]*structs.Allocation, len(drainedAllocs)) updates = make([]*structs.Allocation, 0, len(drainedAllocs)*2) @@ -232,7 +237,7 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) { require.Nil(state.UpsertAllocs(index, updates)) index++ - assertOps(0, 6) + assertJobWatcherOps(t, jobWatcher, 0, 6) for _, a := range replacements { a.ClientStatus = structs.AllocClientStatusRunning @@ -246,7 +251,7 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) { require.NotEmpty(jobWatcher.drainingJobs()) // Final 4 new drains - drains, _ = assertOps(4, 0) + drains, _ = assertJobWatcherOps(t, jobWatcher, 4, 0) // Fake migrations once more to finish the drain drainedAllocs = make([]*structs.Allocation, len(drains.Allocs)) @@ -260,7 +265,7 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) { drains.Resp.Respond(index, nil) index++ - assertOps(0, 0) + assertJobWatcherOps(t, jobWatcher, 0, 0) replacements = make([]*structs.Allocation, len(drainedAllocs)) updates = make([]*structs.Allocation, 0, len(drainedAllocs)*2) @@ -275,7 +280,7 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) { require.Nil(state.UpsertAllocs(index, updates)) index++ - assertOps(0, 4) + assertJobWatcherOps(t, jobWatcher, 0, 4) for _, a := range replacements { a.ClientStatus = structs.AllocClientStatusRunning