diff --git a/client/alloc_runner.go b/client/alloc_runner.go index d681c510ed23..60e7ae66eb08 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -48,12 +48,7 @@ type AllocRunner struct { restored map[string]struct{} taskLock sync.RWMutex - // taskReceivedTimer is used to mitigate updates sent to the server because - // we expect that shortly after receiving an alloc it will transistion - // state. We use a timer to send the update if this hasn't happened after a - // reasonable time. - taskReceivedTimer *time.Timer - taskStatusLock sync.RWMutex + taskStatusLock sync.RWMutex updateCh chan *structs.Allocation @@ -312,25 +307,18 @@ func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEv taskState.State = state r.appendTaskEvent(taskState, event) - // We don't immediately mark ourselves as dirty, since in most cases there - // will immediately be another state transistion. This reduces traffic to - // the server. - if event != nil && event.Type == structs.TaskReceived { - if r.taskReceivedTimer == nil { - r.taskReceivedTimer = time.AfterFunc(taskReceivedSyncLimit, func() { - // Send a dirty signal to sync our state. - select { - case r.dirtyCh <- struct{}{}: - default: - } - }) + // If the task failed, we should kill all the other tasks in the task group. + if state == structs.TaskStateDead && taskState.Failed() { + var destroyingTasks []string + for task, tr := range r.tasks { + if task != taskName { + destroyingTasks = append(destroyingTasks, task) + tr.Destroy() + } + } + if len(destroyingTasks) > 0 { + r.logger.Printf("[DEBUG] client: task %q failed, destroying other tasks in task group: %v", taskName, destroyingTasks) } - return - } - - // Cancel any existing received state timer. - if r.taskReceivedTimer != nil { - r.taskReceivedTimer.Stop() } select { @@ -405,6 +393,7 @@ func (r *AllocRunner) Run() { tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, r.Alloc(), task.Copy()) r.tasks[task.Name] = tr + tr.MarkReceived() go tr.Run() } r.taskLock.Unlock() diff --git a/client/alloc_runner_test.go b/client/alloc_runner_test.go index f9c90577d3b8..97b3dc348440 100644 --- a/client/alloc_runner_test.go +++ b/client/alloc_runner_test.go @@ -279,11 +279,12 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) { func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) { ctestutil.ExecCompatible(t) upd, ar := testAllocRunner(false) + ar.logger = prefixedTestLogger("ar1: ") // Ensure task takes some time task := ar.alloc.Job.TaskGroups[0].Tasks[0] task.Config["command"] = "/bin/sleep" - task.Config["args"] = []string{"10"} + task.Config["args"] = []string{"1000"} go ar.Run() testutil.WaitForResult(func() (bool, error) { @@ -291,7 +292,7 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) { return false, fmt.Errorf("No updates") } last := upd.Allocs[upd.Count-1] - if last.ClientStatus == structs.AllocClientStatusRunning { + if last.ClientStatus != structs.AllocClientStatusRunning { return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusRunning) } return true, nil @@ -322,11 +323,13 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) { // Create a new alloc runner ar2 := NewAllocRunner(ar.logger, ar.config, upd.Update, &structs.Allocation{ID: ar.alloc.ID}) + ar2.logger = prefixedTestLogger("ar2: ") err = ar2.RestoreState() if err != nil { t.Fatalf("err: %v", err) } go ar2.Run() + ar2.logger.Println("[TESTING] starting second alloc runner") testutil.WaitForResult(func() (bool, error) { // Check the state still exists @@ -345,6 +348,7 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) { }) // Send the destroy signal and ensure the AllocRunner cleans up. + ar2.logger.Println("[TESTING] destroying second alloc runner") ar2.Destroy() testutil.WaitForResult(func() (bool, error) { @@ -377,3 +381,53 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) { t.Fatalf("err: %v", err) }) } + +func TestAllocRunner_TaskFailed_KillTG(t *testing.T) { + ctestutil.ExecCompatible(t) + upd, ar := testAllocRunner(false) + + // Create two tasks in the task group + task := ar.alloc.Job.TaskGroups[0].Tasks[0] + task.Config["command"] = "/bin/sleep" + task.Config["args"] = []string{"1000"} + + task2 := ar.alloc.Job.TaskGroups[0].Tasks[0].Copy() + task2.Name = "task 2" + task2.Config = map[string]interface{}{"command": "invalidBinaryToFail"} + ar.alloc.Job.TaskGroups[0].Tasks = append(ar.alloc.Job.TaskGroups[0].Tasks, task2) + ar.alloc.TaskResources[task2.Name] = task2.Resources + //t.Logf("%#v", ar.alloc.Job.TaskGroups[0]) + go ar.Run() + + testutil.WaitForResult(func() (bool, error) { + if upd.Count == 0 { + return false, fmt.Errorf("No updates") + } + last := upd.Allocs[upd.Count-1] + if last.ClientStatus != structs.AllocClientStatusFailed { + return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusFailed) + } + + // Task One should be killed + state1 := last.TaskStates[task.Name] + if state1.State != structs.TaskStateDead { + return false, fmt.Errorf("got state %v; want %v", state1.State, structs.TaskStateDead) + } + if lastE := state1.Events[len(state1.Events)-1]; lastE.Type != structs.TaskKilled { + return false, fmt.Errorf("got last event %v; want %v", lastE.Type, structs.TaskKilled) + } + + // Task Two should be failed + state2 := last.TaskStates[task2.Name] + if state2.State != structs.TaskStateDead { + return false, fmt.Errorf("got state %v; want %v", state2.State, structs.TaskStateDead) + } + if !state2.Failed() { + return false, fmt.Errorf("task2 should have failed") + } + + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) +} diff --git a/client/task_runner.go b/client/task_runner.go index 7c50ba8e0f51..5955376e6082 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -96,11 +96,14 @@ func NewTaskRunner(logger *log.Logger, config *config.Config, waitCh: make(chan struct{}), } - // Set the state to pending. - tc.updater(task.Name, structs.TaskStatePending, structs.NewTaskEvent(structs.TaskReceived)) return tc } +// MarkReceived marks the task as received. +func (r *TaskRunner) MarkReceived() { + r.updater(r.task.Name, structs.TaskStatePending, structs.NewTaskEvent(structs.TaskReceived)) +} + // WaitCh returns a channel to wait for termination func (r *TaskRunner) WaitCh() <-chan struct{} { return r.waitCh @@ -270,9 +273,8 @@ func (r *TaskRunner) run() { err := fmt.Errorf("task directory couldn't be found") r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(err)) r.logger.Printf("[ERR] client: task directory for alloc %q task %q couldn't be found", r.alloc.ID, r.task.Name) - - // Non-restartable error - return + r.restartTracker.SetStartError(err) + goto RESTART } for _, artifact := range r.task.Artifacts { diff --git a/client/task_runner_test.go b/client/task_runner_test.go index 908834fcc9c2..af695d935912 100644 --- a/client/task_runner_test.go +++ b/client/task_runner_test.go @@ -20,7 +20,11 @@ import ( ) func testLogger() *log.Logger { - return log.New(os.Stderr, "", log.LstdFlags) + return prefixedTestLogger("") +} + +func prefixedTestLogger(prefix string) *log.Logger { + return log.New(os.Stderr, prefix, log.LstdFlags) } type MockTaskStateUpdater struct { @@ -64,6 +68,7 @@ func testTaskRunnerFromAlloc(restarts bool, alloc *structs.Allocation) (*MockTas func TestTaskRunner_SimpleRun(t *testing.T) { ctestutil.ExecCompatible(t) upd, tr := testTaskRunner(false) + tr.MarkReceived() go tr.Run() defer tr.Destroy() defer tr.ctx.AllocDir.Destroy() @@ -98,6 +103,7 @@ func TestTaskRunner_SimpleRun(t *testing.T) { func TestTaskRunner_Destroy(t *testing.T) { ctestutil.ExecCompatible(t) upd, tr := testTaskRunner(true) + tr.MarkReceived() defer tr.ctx.AllocDir.Destroy() // Change command to ensure we run for a bit @@ -253,6 +259,7 @@ func TestTaskRunner_Download_List(t *testing.T) { task.Artifacts = []*structs.TaskArtifact{&artifact1, &artifact2} upd, tr := testTaskRunnerFromAlloc(false, alloc) + tr.MarkReceived() go tr.Run() defer tr.Destroy() defer tr.ctx.AllocDir.Destroy() @@ -317,6 +324,7 @@ func TestTaskRunner_Download_Retries(t *testing.T) { } upd, tr := testTaskRunnerFromAlloc(true, alloc) + tr.MarkReceived() go tr.Run() defer tr.Destroy() defer tr.ctx.AllocDir.Destroy()