Skip to content

Commit

Permalink
Refactor assertOps into a helper func
Browse files Browse the repository at this point in the history
  • Loading branch information
schmichael committed Mar 20, 2018
1 parent 00ae04e commit 94fb0c0
Showing 1 changed file with 53 additions and 48 deletions.
101 changes: 53 additions & 48 deletions nomad/drainer/watch_jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,50 @@ func TestDrainingJobWatcher_Interface(t *testing.T) {
var _ DrainingJobWatcher = w
}

// asertJobWatcherOps asserts a certain number of allocs are drained and/or
// migrated by the job watcher.
func assertJobWatcherOps(t *testing.T, jw DrainingJobWatcher, drained, migrated int) (
*DrainRequest, []*structs.Allocation) {
t.Helper()
var (
drains *DrainRequest
migrations []*structs.Allocation
drainsChecked, migrationsChecked bool
)
for {
select {
case drains = <-jw.Drain():
ids := make([]string, len(drains.Allocs))
for i, a := range drains.Allocs {
ids[i] = a.JobID[:6] + ":" + a.ID[:6]
}
t.Logf("draining %d allocs: %v", len(ids), ids)
require.False(t, drainsChecked, "drains already received")
drainsChecked = true
require.Lenf(t, drains.Allocs, drained,
"expected %d drains but found %d", drained, len(drains.Allocs))
case migrations = <-jw.Migrated():
ids := make([]string, len(migrations))
for i, a := range migrations {
ids[i] = a.JobID[:6] + ":" + a.ID[:6]
}
t.Logf("migrating %d allocs: %v", len(ids), ids)
require.False(t, migrationsChecked, "migrations already received")
migrationsChecked = true
require.Lenf(t, migrations, migrated,
"expected %d migrations but found %d", migrated, len(migrations))
case <-time.After(10 * time.Millisecond):
if !drainsChecked && drained > 0 {
t.Fatalf("expected %d drains but none happened", drained)
}
if !migrationsChecked && migrated > 0 {
t.Fatalf("expected %d migrations but none happened", migrated)
}
return drains, migrations
}
}
}

// TestDrainingJobWatcher_DrainJobs asserts DrainingJobWatcher batches
// allocation changes from multiple jobs.
func TestDrainingJobWatcher_DrainJobs(t *testing.T) {
Expand Down Expand Up @@ -105,47 +149,8 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) {
// updating the batch future
jobWatcher.RegisterJobs(jnss)

// assertOps asserts how many allocs should be drained and migrated.
// The drains and migrations - if any - are returned.
assertOps := func(drained, migrated int) (drains *DrainRequest, migrations []*structs.Allocation) {
t.Helper()
var drainsChecked, migrationsChecked bool
for {
select {
case drains = <-jobWatcher.Drain():
ids := make([]string, len(drains.Allocs))
for i, a := range drains.Allocs {
ids[i] = a.JobID[:6] + ":" + a.ID[:6]
}
t.Logf("draining %d allocs: %v", len(ids), ids)
require.False(drainsChecked, "drains already received")
drainsChecked = true
require.Lenf(drains.Allocs, drained,
"expected %d drains but found %d", drained, len(drains.Allocs))
case migrations = <-jobWatcher.Migrated():
ids := make([]string, len(migrations))
for i, a := range migrations {
ids[i] = a.JobID[:6] + ":" + a.ID[:6]
}
t.Logf("migrating %d allocs: %v", len(ids), ids)
require.False(migrationsChecked, "migrations already received")
migrationsChecked = true
require.Lenf(migrations, migrated,
"expected %d migrations but found %d", migrated, len(migrations))
case <-time.After(10 * time.Millisecond):
if !drainsChecked && drained > 0 {
t.Fatalf("expected %d drains but none happened", drained)
}
if !migrationsChecked && migrated > 0 {
t.Fatalf("expected %d migrations but none happened", migrated)
}
return drains, migrations
}
}
}

// Expect a first batch of MaxParallel allocs from each job
drains, _ := assertOps(6, 0)
drains, _ := assertJobWatcherOps(t, jobWatcher, 6, 0)

// Fake migrating the drained allocs by starting new ones and stopping
// the old ones
Expand All @@ -161,7 +166,7 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) {
index++

// Just setting ShouldMigrate should not cause any further drains
assertOps(0, 0)
assertJobWatcherOps(t, jobWatcher, 0, 0)

// Proceed our fake migration along by creating new allocs and stopping
// old ones
Expand All @@ -188,7 +193,7 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) {

// The drained allocs stopping cause migrations but no new drains
// because the replacements have not started
assertOps(0, 6)
assertJobWatcherOps(t, jobWatcher, 0, 6)

// Finally kickoff further drain activity by "starting" replacements
for _, a := range replacements {
Expand All @@ -203,7 +208,7 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) {
require.NotEmpty(jobWatcher.drainingJobs())

// 6 new drains
drains, _ = assertOps(6, 0)
drains, _ = assertJobWatcherOps(t, jobWatcher, 6, 0)

// Fake migrations once more to finish the drain
drainedAllocs = make([]*structs.Allocation, len(drains.Allocs))
Expand All @@ -217,7 +222,7 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) {
drains.Resp.Respond(index, nil)
index++

assertOps(0, 0)
assertJobWatcherOps(t, jobWatcher, 0, 0)

replacements = make([]*structs.Allocation, len(drainedAllocs))
updates = make([]*structs.Allocation, 0, len(drainedAllocs)*2)
Expand All @@ -232,7 +237,7 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) {
require.Nil(state.UpsertAllocs(index, updates))
index++

assertOps(0, 6)
assertJobWatcherOps(t, jobWatcher, 0, 6)

for _, a := range replacements {
a.ClientStatus = structs.AllocClientStatusRunning
Expand All @@ -246,7 +251,7 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) {
require.NotEmpty(jobWatcher.drainingJobs())

// Final 4 new drains
drains, _ = assertOps(4, 0)
drains, _ = assertJobWatcherOps(t, jobWatcher, 4, 0)

// Fake migrations once more to finish the drain
drainedAllocs = make([]*structs.Allocation, len(drains.Allocs))
Expand All @@ -260,7 +265,7 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) {
drains.Resp.Respond(index, nil)
index++

assertOps(0, 0)
assertJobWatcherOps(t, jobWatcher, 0, 0)

replacements = make([]*structs.Allocation, len(drainedAllocs))
updates = make([]*structs.Allocation, 0, len(drainedAllocs)*2)
Expand All @@ -275,7 +280,7 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) {
require.Nil(state.UpsertAllocs(index, updates))
index++

assertOps(0, 4)
assertJobWatcherOps(t, jobWatcher, 0, 4)

for _, a := range replacements {
a.ClientStatus = structs.AllocClientStatusRunning
Expand Down

0 comments on commit 94fb0c0

Please sign in to comment.