Skip to content

Commit

Permalink
Destroy task group leader first
Browse files Browse the repository at this point in the history
Before this commit all tasks in a task group were destroyed
concurrently. This meant logging sidecars might be stopped before the
leader task whose logs still need to be shipped.

This commit blocks on the leader shutting down before signalling to
followers to shutdown.
  • Loading branch information
schmichael committed Jun 29, 2017
1 parent e9a55d9 commit fc80a2c
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 2 deletions.
16 changes: 14 additions & 2 deletions client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -731,10 +731,22 @@ func (r *AllocRunner) SetPreviousAllocDir(allocDir *allocdir.AllocDir) {
// destroyTaskRunners destroys the task runners, waits for them to terminate and
// then saves state.
func (r *AllocRunner) destroyTaskRunners(destroyEvent *structs.TaskEvent) {
// Destroy each sub-task
runners := r.getTaskRunners()

// First destroy the leader
for _, tr := range runners {
if tr.task.Leader {
r.logger.Printf("[DEBUG] client: destroying leader task %q of task group %q first", tr.task.Name, tr.alloc.TaskGroup)
tr.Destroy(destroyEvent)
<-tr.WaitCh()
}
}

// Then destroy non-leader tasks concurrently
for _, tr := range runners {
tr.Destroy(destroyEvent)
if !tr.task.Leader {
tr.Destroy(destroyEvent)
}
}

// Wait for termination of the task runners
Expand Down
58 changes: 58 additions & 0 deletions client/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,64 @@ func TestAllocRunner_TaskLeader_KillTG(t *testing.T) {
})
}

// TestAllocRunner_TaskLeader_StopTG asserts that when stopping a task group
// with a leader the leader is stopped before other tasks.
func TestAllocRunner_TaskLeader_StopTG(t *testing.T) {
upd, ar := testAllocRunner(false)

// Create 3 tasks in the task group
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
task.Name = "follower1"
task.Driver = "mock_driver"
task.KillTimeout = 10 * time.Millisecond
task.Config = map[string]interface{}{
"run_for": "10s",
}

task2 := ar.alloc.Job.TaskGroups[0].Tasks[0].Copy()
task2.Name = "leader"
task2.Driver = "mock_driver"
task2.Leader = true
task2.KillTimeout = 10 * time.Millisecond
task2.Config = map[string]interface{}{
"run_for": "10s",
}

task3 := ar.alloc.Job.TaskGroups[0].Tasks[0].Copy()
task3.Name = "follower2"
task3.Driver = "mock_driver"
task3.KillTimeout = 10 * time.Millisecond
task3.Config = map[string]interface{}{
"run_for": "10s",
}
ar.alloc.Job.TaskGroups[0].Tasks = append(ar.alloc.Job.TaskGroups[0].Tasks, task2, task3)
ar.alloc.TaskResources[task2.Name] = task2.Resources

// Destroy before running so it shuts down the alloc runner right after
// starting all tasks
ar.Destroy()
go ar.Run()
select {
case <-ar.WaitCh():
case <-time.After(8 * time.Second):
t.Fatalf("timed out waiting for alloc runner to exit")
}

if len(upd.Allocs) != 1 {
t.Fatalf("expected 1 alloc update but found %d", len(upd.Allocs))
}

a := upd.Allocs[0]
if a.TaskStates["leader"].FinishedAt.UnixNano() >= a.TaskStates["follower1"].FinishedAt.UnixNano() {
t.Fatalf("expected leader to finish before follower1: %s >= %s",
a.TaskStates["leader"].FinishedAt, a.TaskStates["follower1"].FinishedAt)
}
if a.TaskStates["leader"].FinishedAt.UnixNano() >= a.TaskStates["follower2"].FinishedAt.UnixNano() {
t.Fatalf("expected leader to finish before follower2: %s >= %s",
a.TaskStates["leader"].FinishedAt, a.TaskStates["follower2"].FinishedAt)
}
}

func TestAllocRunner_MoveAllocDir(t *testing.T) {
// Create an alloc runner
alloc := mock.Alloc()
Expand Down

0 comments on commit fc80a2c

Please sign in to comment.