Test job watcher
schmichael committed Mar 16, 2018
1 parent 5733c7c commit d360047
Showing 1 changed file with 183 additions and 95 deletions.
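In short, judging from the diff below: TestDrainingJobWatcher_Batching is reworked into TestDrainingJobWatcher_DrainJobs, testDrainingJobWatcher now also returns a context.CancelFunc so tests can stop the watcher, and the FIXME-laden select blocks are replaced by an assertOps helper that checks the Drain and Migrated channels together. The test walks two jobs of 8 allocations each (MaxParallel 3) through a complete drain, so the 16 allocations migrate in batches of 6, 6, and 4.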
278 changes: 183 additions & 95 deletions nomad/drainer/watch_jobs_test.go
@@ -33,32 +33,46 @@ func testNodes(t *testing.T, state *state.StateStore) (drainingNode, runningNode
return n1, n2
}

-func testDrainingJobWatcher(t *testing.T, state *state.StateStore) *drainingJobWatcher {
+func testDrainingJobWatcher(t *testing.T, state *state.StateStore) (*drainingJobWatcher, context.CancelFunc) {
t.Helper()

limiter := rate.NewLimiter(100.0, 100)
logger := testlog.Logger(t)
-w := NewDrainingJobWatcher(context.Background(), limiter, state, logger)
-return w
+ctx, cancel := context.WithCancel(context.Background())
+w := NewDrainingJobWatcher(ctx, limiter, state, logger)
+return w, cancel
}

// TestDrainingJobWatcher_Interface is a compile-time assertion that we
// implement the intended interface.
func TestDrainingJobWatcher_Interface(t *testing.T) {
-var _ DrainingJobWatcher = testDrainingJobWatcher(t, state.TestStateStore(t))
+w, cancel := testDrainingJobWatcher(t, state.TestStateStore(t))
+cancel()
+var _ DrainingJobWatcher = w
}

-func TestDrainingJobWatcher_Batching(t *testing.T) {
+// TestDrainingJobWatcher_DrainJobs asserts DrainingJobWatcher batches
+// allocation changes from multiple jobs.
+func TestDrainingJobWatcher_DrainJobs(t *testing.T) {
t.Parallel()
require := require.New(t)

state := state.TestStateStore(t)
-jobWatcher := testDrainingJobWatcher(t, state)
-drainingNode, _ := testNodes(t, state)
+jobWatcher, cancelWatcher := testDrainingJobWatcher(t, state)
+defer cancelWatcher()
+drainingNode, runningNode := testNodes(t, state)

var index uint64 = 101
+count := 8
+
+newAlloc := func(node *structs.Node, job *structs.Job) *structs.Allocation {
+a := mock.Alloc()
+a.JobID = job.ID
+a.Job = job
+a.TaskGroup = job.TaskGroups[0].Name
+a.NodeID = node.ID
+return a
+}

// 2 jobs with count 10, max parallel 3
jnss := make([]structs.JobNs, 2)
@@ -68,17 +82,13 @@ func TestDrainingJobWatcher_Batching(t *testing.T) {
jobs[i] = job
jnss[i] = structs.NewJobNs(job.Namespace, job.ID)
job.TaskGroups[0].Migrate.MaxParallel = 3
-job.TaskGroups[0].Count = 10
+job.TaskGroups[0].Count = count
require.Nil(state.UpsertJob(index, job))
index++

var allocs []*structs.Allocation
-for i := 0; i < 10; i++ {
-a := mock.Alloc()
-a.JobID = job.ID
-a.Job = job
-a.TaskGroup = job.TaskGroups[0].Name
-a.NodeID = drainingNode.ID
+for i := 0; i < count; i++ {
+a := newAlloc(drainingNode, job)
a.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: helper.BoolToPtr(true),
}
@@ -95,114 +105,192 @@ func TestDrainingJobWatcher_Batching(t *testing.T) {
// updating the batch future
jobWatcher.RegisterJobs(jnss)

-// Expect a first batch of MaxParallel allocs from each job
-drainedAllocs := make([]*structs.Allocation, 6)
-select {
-case drains := <-jobWatcher.Drain():
-require.Len(drains.Allocs, 6)
-allocsPerJob := make(map[string]int, 2)
-ids := make([]string, len(drains.Allocs))
-for i, a := range drains.Allocs {
-ids[i] = a.ID[:6]
-allocsPerJob[a.JobID]++
-drainedAllocs[i] = a.Copy()
-}
-t.Logf("drains: %v", ids)
-for _, j := range jobs {
-require.Contains(allocsPerJob, j.ID)
-require.Equal(j.TaskGroups[0].Migrate.MaxParallel, allocsPerJob[j.ID])
-}
-case <-time.After(time.Second):
-t.Fatal("timeout waiting for allocs to drain")
-}
-
-// No more should be drained or migrated until the first batch is handled
-assertNoops := func() {
+// assertOps asserts how many allocs should be drained and migrated.
+// The drains and migrations - if any - are returned.
+assertOps := func(drained, migrated int) (drains *DrainRequest, migrations []*structs.Allocation) {
t.Helper()
-select {
-case drains := <-jobWatcher.Drain():
-ids := []string{}
-for _, a := range drains.Allocs {
-ids = append(ids, a.ID[:6])
+var drainsChecked, migrationsChecked bool
+for {
+select {
+case drains = <-jobWatcher.Drain():
+ids := make([]string, len(drains.Allocs))
+for i, a := range drains.Allocs {
+ids[i] = a.JobID[:6] + ":" + a.ID[:6]
+}
+t.Logf("draining %d allocs: %v", len(ids), ids)
+require.False(drainsChecked, "drains already received")
+drainsChecked = true
+require.Lenf(drains.Allocs, drained,
+"expected %d drains but found %d", drained, len(drains.Allocs))
+case migrations = <-jobWatcher.Migrated():
+ids := make([]string, len(migrations))
+for i, a := range migrations {
+ids[i] = a.JobID[:6] + ":" + a.ID[:6]
+}
+t.Logf("migrating %d allocs: %v", len(ids), ids)
+require.False(migrationsChecked, "migrations already received")
+migrationsChecked = true
+require.Lenf(migrations, migrated,
+"expected %d migrations but found %d", migrated, len(migrations))
+case <-time.After(10 * time.Millisecond):
+if !drainsChecked && drained > 0 {
+t.Fatalf("expected %d drains but none happened", drained)
+}
+if !migrationsChecked && migrated > 0 {
+t.Fatalf("expected %d migrations but none happened", migrated)
+}
+return drains, migrations
+}
-t.Logf("drains: %v", ids)
-t.Fatalf("unexpected batch of %d drains", len(drains.Allocs))
-case migrations := <-jobWatcher.Migrated():
-t.Fatalf("unexpected batch of %d migrations", len(migrations))
-case <-time.After(10 * time.Millisecond):
-// Ok! No unexpected activity
-}
}
}
-assertNoops()

+// Expect a first batch of MaxParallel allocs from each job
+drains, _ := assertOps(6, 0)

// Fake migrating the drained allocs by starting new ones and stopping
// the old ones
-for _, a := range drainedAllocs {
+drainedAllocs := make([]*structs.Allocation, len(drains.Allocs))
+for i, a := range drains.Allocs {
a.DesiredTransition.Migrate = helper.BoolToPtr(true)
+
+// create a copy so we can reuse this slice
+drainedAllocs[i] = a.Copy()
}
require.Nil(state.UpsertAllocs(index, drainedAllocs))
+drains.Resp.Respond(index, nil)
index++

// Just setting ShouldMigrate should not cause any further drains
-//assertNoops()
-t.Logf("FIXME - 1 Looks like just setting ShouldMigrate causes more drains?!?! This seems wrong but maybe it can't happen if the scheduler transitions from ShouldMigrate->DesiredStatus=stop atomically.")
-drainedAllocs = make([]*structs.Allocation, 6)
-select {
-case drains := <-jobWatcher.Drain():
-require.Len(drains.Allocs, 6)
-allocsPerJob := make(map[string]int, 2)
-ids := make([]string, len(drains.Allocs))
-for i, a := range drains.Allocs {
-ids[i] = a.ID[:6]
-allocsPerJob[a.JobID]++
-drainedAllocs[i] = a.Copy()
-}
-t.Logf("drains: %v", ids)
-for _, j := range jobs {
-require.Contains(allocsPerJob, j.ID)
-require.Equal(j.TaskGroups[0].Migrate.MaxParallel, allocsPerJob[j.ID])
+assertOps(0, 0)

+// Proceed our fake migration along by creating new allocs and stopping
+// old ones
+replacements := make([]*structs.Allocation, len(drainedAllocs))
+updates := make([]*structs.Allocation, 0, len(drainedAllocs)*2)
+for i, a := range drainedAllocs {
+// Stop drained allocs
+a.DesiredTransition.Migrate = nil
+a.DesiredStatus = structs.AllocDesiredStatusStop
+
+// Create a replacement
+replacement := mock.Alloc()
+replacement.JobID = a.Job.ID
+replacement.Job = a.Job
+replacement.TaskGroup = a.TaskGroup
+replacement.NodeID = runningNode.ID
+// start in pending state with no health status
+
+updates = append(updates, a, replacement)
+replacements[i] = replacement.Copy()
+}
+require.Nil(state.UpsertAllocs(index, updates))
+index++
+
+// The drained allocs stopping cause migrations but no new drains
+// because the replacements have not started
+assertOps(0, 6)

+// Finally kickoff further drain activity by "starting" replacements
+for _, a := range replacements {
+a.ClientStatus = structs.AllocClientStatusRunning
a.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: helper.BoolToPtr(true),
}
-case <-time.After(time.Second):
-t.Fatal("timeout waiting for allocs to drain")
}
+require.Nil(state.UpsertAllocs(index, replacements))
index++

require.NotEmpty(jobWatcher.drainingJobs())

-for _, a := range drainedAllocs {
+// 6 new drains
+drains, _ = assertOps(6, 0)

+// Fake migrations once more to finish the drain
+drainedAllocs = make([]*structs.Allocation, len(drains.Allocs))
+for i, a := range drains.Allocs {
a.DesiredTransition.Migrate = helper.BoolToPtr(true)

+// create a copy so we can reuse this slice
+drainedAllocs[i] = a.Copy()
}
-require.Nil(state.UpsertAllocs(index, drainedAllocs[:1]))
+require.Nil(state.UpsertAllocs(index, drainedAllocs))
+drains.Resp.Respond(index, nil)
index++

// Just setting ShouldMigrate should not cause any further drains
-//assertNoops()
-t.Logf("FIXME - 2 Looks like just setting ShouldMigrate causes more drains?!?! This seems wrong but maybe it can't happen if the scheduler transitions from ShouldMigrate->DesiredStatus=stop atomically.")
-drainedAllocs = make([]*structs.Allocation, 6)
-select {
-case drains := <-jobWatcher.Drain():
-require.Len(drains.Allocs, 6)
-allocsPerJob := make(map[string]int, 2)
-ids := make([]string, len(drains.Allocs))
-for i, a := range drains.Allocs {
-ids[i] = a.ID[:6]
-allocsPerJob[a.JobID]++
-drainedAllocs[i] = a.Copy()
-}
-t.Logf("drains: %v", ids)
-for _, j := range jobs {
-require.Contains(allocsPerJob, j.ID)
-require.Equal(j.TaskGroups[0].Migrate.MaxParallel, allocsPerJob[j.ID])
+assertOps(0, 0)

+replacements = make([]*structs.Allocation, len(drainedAllocs))
+updates = make([]*structs.Allocation, 0, len(drainedAllocs)*2)
+for i, a := range drainedAllocs {
+a.DesiredTransition.Migrate = nil
+a.DesiredStatus = structs.AllocDesiredStatusStop
+
+replacement := newAlloc(runningNode, a.Job)
+updates = append(updates, a, replacement)
+replacements[i] = replacement.Copy()
+}
+require.Nil(state.UpsertAllocs(index, updates))
+index++
+
+assertOps(0, 6)

+for _, a := range replacements {
+a.ClientStatus = structs.AllocClientStatusRunning
a.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: helper.BoolToPtr(true),
}
-case <-time.After(time.Second):
-t.Fatal("timeout waiting for allocs to drain")
}
+require.Nil(state.UpsertAllocs(index, replacements))
index++

-// Proceed our fake migration along by creating new allocs and stopping
-// old ones
require.NotEmpty(jobWatcher.drainingJobs())

+// Final 4 new drains
+drains, _ = assertOps(4, 0)

+// Fake migrations once more to finish the drain
+drainedAllocs = make([]*structs.Allocation, len(drains.Allocs))
+for i, a := range drains.Allocs {
a.DesiredTransition.Migrate = helper.BoolToPtr(true)

+// create a copy so we can reuse this slice
+drainedAllocs[i] = a.Copy()
}
require.Nil(state.UpsertAllocs(index, drainedAllocs))
+drains.Resp.Respond(index, nil)
index++

+assertOps(0, 0)

+replacements = make([]*structs.Allocation, len(drainedAllocs))
+updates = make([]*structs.Allocation, 0, len(drainedAllocs)*2)
+for i, a := range drainedAllocs {
+a.DesiredTransition.Migrate = nil
+a.DesiredStatus = structs.AllocDesiredStatusStop
+
+replacement := newAlloc(runningNode, a.Job)
+updates = append(updates, a, replacement)
+replacements[i] = replacement.Copy()
+}
+require.Nil(state.UpsertAllocs(index, updates))
+index++
+
+assertOps(0, 4)

+for _, a := range replacements {
+a.ClientStatus = structs.AllocClientStatusRunning
+a.DeploymentStatus = &structs.AllocDeploymentStatus{
+Healthy: helper.BoolToPtr(true),
+}
+}
+require.Nil(state.UpsertAllocs(index, replacements))
+index++

+// No jobs should be left!
+require.Empty(jobWatcher.drainingJobs())
}

// DrainingJobWatcher tests:
// TODO Test that jobs are deregistered when they have no more to migrate
// TODO Test that the watcher gets triggered on alloc changes
// TODO Test that the watcher cancels its query when a new job is registered
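A rough sketch of how the first TODO might start, reusing this file's helpers. This is hypothetical and untested: mock.Job() is assumed to exist alongside mock.Alloc(), and the sleep-then-assert at the end is a simplification of a proper polling loop.

// Hypothetical sketch for the first TODO above (untested). A job with
// nothing left to migrate should be dropped from the draining set.
func TestDrainingJobWatcher_Deregister(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	state := state.TestStateStore(t)
	jobWatcher, cancelWatcher := testDrainingJobWatcher(t, state)
	defer cancelWatcher()
	testNodes(t, state)

	// No allocations are placed on the draining node, so the job has
	// nothing to migrate.
	job := mock.Job()
	require.Nil(state.UpsertJob(101, job))
	jobWatcher.RegisterJobs([]structs.JobNs{structs.NewJobNs(job.Namespace, job.ID)})

	// Give the watcher a moment to evaluate the job, then assert it
	// was deregistered.
	time.Sleep(100 * time.Millisecond)
	require.Empty(jobWatcher.drainingJobs())
}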

// handleTaskGroupTestCase is the test case struct for TestHandleTaskGroup
