Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Destroy task group leader first #2753

Merged
merged 1 commit into from
Jul 3, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -731,10 +731,22 @@ func (r *AllocRunner) SetPreviousAllocDir(allocDir *allocdir.AllocDir) {
// destroyTaskRunners destroys the task runners, waits for them to terminate and
// then saves state.
func (r *AllocRunner) destroyTaskRunners(destroyEvent *structs.TaskEvent) {
// Destroy each sub-task
runners := r.getTaskRunners()

// First destroy the leader
for _, tr := range runners {
if tr.task.Leader {
r.logger.Printf("[DEBUG] client: destroying leader task %q of task group %q first", tr.task.Name, tr.alloc.TaskGroup)
tr.Destroy(destroyEvent)
<-tr.WaitCh()
}
}

// Then destroy non-leader tasks concurrently
for _, tr := range runners {
tr.Destroy(destroyEvent)
if !tr.task.Leader {
tr.Destroy(destroyEvent)
}
}

// Wait for termination of the task runners
Expand Down
58 changes: 58 additions & 0 deletions client/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,64 @@ func TestAllocRunner_TaskLeader_KillTG(t *testing.T) {
})
}

// TestAllocRunner_TaskLeader_StopTG asserts that when stopping a task group
// with a leader the leader is stopped before other tasks.
//
// The task group is given three tasks (two followers and one leader), the
// alloc runner is destroyed before it is run, and the single resulting alloc
// update is inspected: the leader's FinishedAt must be strictly earlier than
// the FinishedAt of each follower.
func TestAllocRunner_TaskLeader_StopTG(t *testing.T) {
	upd, ar := testAllocRunner(false)

	// Create 3 tasks in the task group
	task := ar.alloc.Job.TaskGroups[0].Tasks[0]
	task.Name = "follower1"
	task.Driver = "mock_driver"
	task.KillTimeout = 10 * time.Millisecond
	task.Config = map[string]interface{}{
		"run_for": "10s",
	}

	task2 := ar.alloc.Job.TaskGroups[0].Tasks[0].Copy()
	task2.Name = "leader"
	task2.Driver = "mock_driver"
	task2.Leader = true
	task2.KillTimeout = 10 * time.Millisecond
	task2.Config = map[string]interface{}{
		"run_for": "10s",
	}

	task3 := ar.alloc.Job.TaskGroups[0].Tasks[0].Copy()
	task3.Name = "follower2"
	task3.Driver = "mock_driver"
	task3.KillTimeout = 10 * time.Millisecond
	task3.Config = map[string]interface{}{
		"run_for": "10s",
	}
	ar.alloc.Job.TaskGroups[0].Tasks = append(ar.alloc.Job.TaskGroups[0].Tasks, task2, task3)
	ar.alloc.TaskResources[task2.Name] = task2.Resources

	// Destroy before running so it shuts down the alloc runner right after
	// starting all tasks
	ar.Destroy()
	go ar.Run()
	select {
	case <-ar.WaitCh():
	case <-time.After(8 * time.Second):
		t.Fatalf("timed out waiting for alloc runner to exit")
	}

	if len(upd.Allocs) != 1 {
		t.Fatalf("expected 1 alloc update but found %d", len(upd.Allocs))
	}

	a := upd.Allocs[0]

	// Look the leader up explicitly so a missing state is reported as a test
	// failure instead of surfacing as a nil dereference or a comparison
	// against a zero value.
	leader, ok := a.TaskStates["leader"]
	if !ok {
		t.Fatalf("missing task state for leader")
	}
	// A zero FinishedAt means the finish time was never recorded. Comparing
	// UnixNano() of a zero time is undefined and would let the ordering check
	// below pass vacuously, so reject it up front.
	// NOTE(review): assumes FinishedAt is a time.Time - confirm against the
	// task state struct definition.
	if leader.FinishedAt.IsZero() {
		t.Fatalf("leader has no finish time recorded")
	}

	for _, name := range []string{"follower1", "follower2"} {
		follower, ok := a.TaskStates[name]
		if !ok {
			t.Fatalf("missing task state for %s", name)
		}
		if follower.FinishedAt.IsZero() {
			t.Fatalf("%s has no finish time recorded", name)
		}
		// The leader must finish strictly before every follower.
		if !leader.FinishedAt.Before(follower.FinishedAt) {
			t.Fatalf("expected leader to finish before %s: %s >= %s",
				name, leader.FinishedAt, follower.FinishedAt)
		}
	}
}

func TestAllocRunner_MoveAllocDir(t *testing.T) {
// Create an alloc runner
alloc := mock.Alloc()
Expand Down