From 38b27ea08abb262bf1889878c0b70fd12c5fa5a8 Mon Sep 17 00:00:00 2001
From: Michael Schurter
Date: Fri, 3 Nov 2017 15:42:15 -0700
Subject: [PATCH] Handle leader task being dead in RestoreState

Fixes the panic mentioned in
https://github.com/hashicorp/nomad/issues/3420#issuecomment-341666932

While a dying leader task serially stops all follower tasks, the
synchronization of that state is asynchronous. Nomad can shut down before
all follower tasks have updated their state to dead, thus saving exactly
the state necessary to hit this panic: *a non-terminal alloc with a dead
leader.*

The actual fix is a simple nil check so we no longer assume that a
non-terminal alloc's leader has a TaskRunner.
---
 client/alloc_runner.go      | 16 +++++--
 client/alloc_runner_test.go | 91 +++++++++++++++++++++++++++++++++++++
 client/task_runner.go       |  2 +-
 3 files changed, 104 insertions(+), 5 deletions(-)

diff --git a/client/alloc_runner.go b/client/alloc_runner.go
index dc76a0cd6860..b45a553a6174 100644
--- a/client/alloc_runner.go
+++ b/client/alloc_runner.go
@@ -932,6 +932,7 @@ OUTER:
 				r.logger.Printf("[WARN] client: failed to sync alloc %q status upon receiving alloc update: %v",
 					r.allocID, err)
 			}
+
 		case <-r.ctx.Done():
 			taskDestroyEvent = structs.NewTaskEvent(structs.TaskKilled)
 			break OUTER
@@ -967,10 +968,17 @@ func (r *AllocRunner) destroyTaskRunners(destroyEvent *structs.TaskEvent) {
 		tr := r.tasks[leader]
 		r.taskLock.RUnlock()
 
-		r.logger.Printf("[DEBUG] client: alloc %q destroying leader task %q of task group %q first",
-			r.allocID, leader, r.alloc.TaskGroup)
-		tr.Destroy(destroyEvent)
-		<-tr.WaitCh()
+		// Dead tasks don't have a task runner created so guard against
+		// the leader being dead when this AR was saved.
+		if tr == nil {
+			r.logger.Printf("[DEBUG] client: alloc %q leader task %q of task group %q already stopped",
+				r.allocID, leader, r.alloc.TaskGroup)
+		} else {
+			r.logger.Printf("[DEBUG] client: alloc %q destroying leader task %q of task group %q first",
+				r.allocID, leader, r.alloc.TaskGroup)
+			tr.Destroy(destroyEvent)
+			<-tr.WaitCh()
+		}
 	}
 
 	// Then destroy non-leader tasks concurrently
diff --git a/client/alloc_runner_test.go b/client/alloc_runner_test.go
index f53b01ef7ad4..47348e29bd00 100644
--- a/client/alloc_runner_test.go
+++ b/client/alloc_runner_test.go
@@ -1365,6 +1365,97 @@ func TestAllocRunner_TaskLeader_StopTG(t *testing.T) {
 	})
 }
 
+// TestAllocRunner_TaskLeader_StopRestoredTG asserts that when stopping a
+// restored task group whose leader failed before the restore, the leader is
+// not stopped again since it no longer exists.
+// See https://github.com/hashicorp/nomad/issues/3420#issuecomment-341666932
+func TestAllocRunner_TaskLeader_StopRestoredTG(t *testing.T) {
+	t.Parallel()
+	_, ar := testAllocRunner(false)
+	defer ar.Destroy()
+
+	// Create a leader and follower task in the task group
+	task := ar.alloc.Job.TaskGroups[0].Tasks[0]
+	task.Name = "follower1"
+	task.Driver = "mock_driver"
+	task.KillTimeout = 10 * time.Second
+	task.Config = map[string]interface{}{
+		"run_for": "10s",
+	}
+
+	task2 := ar.alloc.Job.TaskGroups[0].Tasks[0].Copy()
+	task2.Name = "leader"
+	task2.Driver = "mock_driver"
+	task2.Leader = true
+	task2.KillTimeout = 10 * time.Millisecond
+	task2.Config = map[string]interface{}{
+		"run_for": "0s",
+	}
+
+	ar.alloc.Job.TaskGroups[0].Tasks = append(ar.alloc.Job.TaskGroups[0].Tasks, task2)
+	ar.alloc.TaskResources[task2.Name] = task2.Resources
+
+	// Mimic Nomad exiting before the dead leader was able to stop the other tasks.
+	ar.tasks = map[string]*TaskRunner{
+		"leader": NewTaskRunner(ar.logger, ar.config, ar.stateDB, ar.setTaskState,
+			ar.allocDir.NewTaskDir(task2.Name), ar.Alloc(), task2.Copy(),
+			ar.vaultClient, ar.consulClient),
+		"follower1": NewTaskRunner(ar.logger, ar.config, ar.stateDB, ar.setTaskState,
+			ar.allocDir.NewTaskDir(task.Name), ar.Alloc(), task.Copy(),
+			ar.vaultClient, ar.consulClient),
+	}
+	ar.taskStates = map[string]*structs.TaskState{
+		"leader":    {State: structs.TaskStateDead},
+		"follower1": {State: structs.TaskStateRunning},
+	}
+	if err := ar.SaveState(); err != nil {
+		t.Fatalf("error saving state: %v", err)
+	}
+
+	// Create a new AllocRunner to test RestoreState and Run
+	upd2 := &MockAllocStateUpdater{}
+	ar2 := NewAllocRunner(ar.logger, ar.config, ar.stateDB, upd2.Update, ar.alloc,
+		ar.vaultClient, ar.consulClient, ar.prevAlloc)
+	defer ar2.Destroy()
+
+	if err := ar2.RestoreState(); err != nil {
+		t.Fatalf("error restoring state: %v", err)
+	}
+	go ar2.Run()
+
+	// Wait for tasks to be stopped because leader is dead
+	testutil.WaitForResult(func() (bool, error) {
+		_, last := upd2.Last()
+		if last == nil {
+			return false, fmt.Errorf("no updates yet")
+		}
+		if actual := last.TaskStates["leader"].State; actual != structs.TaskStateDead {
+			return false, fmt.Errorf("Task leader is not dead yet (it's %q)", actual)
+		}
+		if actual := last.TaskStates["follower1"].State; actual != structs.TaskStateDead {
+			return false, fmt.Errorf("Task follower1 is not dead yet (it's %q)", actual)
+		}
+		return true, nil
+	}, func(err error) {
+		count, last := upd2.Last()
+		t.Logf("Updates: %d", count)
+		for name, state := range last.TaskStates {
+			t.Logf("%s: %s", name, state.State)
+		}
+		t.Fatalf("err: %v", err)
+	})
+
+	// Make sure it GCs properly
+	ar2.Destroy()
+
+	select {
+	case <-ar2.WaitCh():
+		// exited as expected
+	case <-time.After(10 * time.Second):
+		t.Fatalf("timed out waiting for AR to GC")
+	}
+}
+
 // TestAllocRunner_MoveAllocDir asserts that a file written to an alloc's
 // local/ dir will be moved to a replacement alloc's local/ dir if sticky
 // volumes is on.
diff --git a/client/task_runner.go b/client/task_runner.go
index f7f2c043b070..f79bb87e774c 100644
--- a/client/task_runner.go
+++ b/client/task_runner.go
@@ -237,7 +237,7 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
 	// Build the restart tracker.
 	tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
 	if tg == nil {
-		logger.Printf("[ERR] client: alloc '%s' for missing task group '%s'", alloc.ID, alloc.TaskGroup)
+		logger.Printf("[ERR] client: alloc %q for missing task group %q", alloc.ID, alloc.TaskGroup)
 		return nil
 	}
 	restartTracker := newRestartTracker(tg.RestartPolicy, alloc.Job.Type)
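
For readers without the full Nomad tree handy, below is a minimal, hypothetical Go sketch of the guard added to destroyTaskRunners above. The allocRunner and taskRunner types are simplified stand-ins rather than Nomad's real API; the point is only to show why the leader lookup can return nil after a restore (the leader was already dead, so no runner was ever created for it) and how the nil check preserves leader-first shutdown for the runners that do exist.

// guard_sketch.go: a hypothetical, simplified illustration of the nil-check
// pattern in this patch. These types are NOT Nomad's real AllocRunner or
// TaskRunner; they only model the restored-state condition that panicked.
package main

import "fmt"

type taskRunner struct {
	name   string
	waitCh chan struct{}
}

// Destroy stops the task and closes its wait channel.
func (tr *taskRunner) Destroy() {
	fmt.Printf("destroying task %q\n", tr.name)
	close(tr.waitCh)
}

type allocRunner struct {
	// Tasks that were already dead when state was saved restore with a
	// nil runner, mirroring the condition that caused the panic.
	tasks  map[string]*taskRunner
	leader string
}

// destroyTaskRunners stops the leader first if it still has a runner,
// then stops the remaining tasks.
func (ar *allocRunner) destroyTaskRunners() {
	if tr := ar.tasks[ar.leader]; tr == nil {
		// Leader died before the state was saved; nothing to stop.
		fmt.Printf("leader %q already stopped\n", ar.leader)
	} else {
		tr.Destroy()
		<-tr.waitCh
	}

	for name, tr := range ar.tasks {
		if name == ar.leader || tr == nil {
			continue
		}
		tr.Destroy()
		<-tr.waitCh
	}
}

func main() {
	ar := &allocRunner{
		leader: "leader",
		tasks: map[string]*taskRunner{
			"leader":    nil, // dead before restore: no runner was created
			"follower1": {name: "follower1", waitCh: make(chan struct{})},
		},
	}
	ar.destroyTaskRunners()
}

Running the sketch takes the "already stopped" branch for the leader and then destroys follower1, which mirrors the behavior the patch asserts in TestAllocRunner_TaskLeader_StopRestoredTG.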