Skip to content

Commit

Permalink
Merge pull request #962 from hashicorp/b-failed-task-kills-tg
Browse files Browse the repository at this point in the history
client: When a task fails, kill all other tasks in the task group
  • Loading branch information
dadgar committed Mar 26, 2016
2 parents 425a25f + ff6028b commit 21abd52
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 32 deletions.
37 changes: 13 additions & 24 deletions client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,7 @@ type AllocRunner struct {
restored map[string]struct{}
taskLock sync.RWMutex

// taskReceivedTimer is used to mitigate updates sent to the server because
// we expect that shortly after receiving an alloc it will transition
// state. We use a timer to send the update if this hasn't happened after a
// reasonable time.
taskReceivedTimer *time.Timer
taskStatusLock sync.RWMutex
taskStatusLock sync.RWMutex

updateCh chan *structs.Allocation

Expand Down Expand Up @@ -312,25 +307,18 @@ func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEv
taskState.State = state
r.appendTaskEvent(taskState, event)

// We don't immediately mark ourselves as dirty, since in most cases there
// will immediately be another state transition. This reduces traffic to
// the server.
if event != nil && event.Type == structs.TaskReceived {
if r.taskReceivedTimer == nil {
r.taskReceivedTimer = time.AfterFunc(taskReceivedSyncLimit, func() {
// Send a dirty signal to sync our state.
select {
case r.dirtyCh <- struct{}{}:
default:
}
})
// If the task failed, we should kill all the other tasks in the task group.
if state == structs.TaskStateDead && taskState.Failed() {
var destroyingTasks []string
for task, tr := range r.tasks {
if task != taskName {
destroyingTasks = append(destroyingTasks, task)
tr.Destroy()
}
}
if len(destroyingTasks) > 0 {
r.logger.Printf("[DEBUG] client: task %q failed, destroying other tasks in task group: %v", taskName, destroyingTasks)
}
return
}

// Cancel any existing received state timer.
if r.taskReceivedTimer != nil {
r.taskReceivedTimer.Stop()
}

select {
Expand Down Expand Up @@ -405,6 +393,7 @@ func (r *AllocRunner) Run() {
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, r.Alloc(),
task.Copy())
r.tasks[task.Name] = tr
tr.MarkReceived()
go tr.Run()
}
r.taskLock.Unlock()
Expand Down
58 changes: 56 additions & 2 deletions client/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,19 +279,20 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) {
func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
ctestutil.ExecCompatible(t)
upd, ar := testAllocRunner(false)
ar.logger = prefixedTestLogger("ar1: ")

// Ensure task takes some time
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
task.Config["command"] = "/bin/sleep"
task.Config["args"] = []string{"10"}
task.Config["args"] = []string{"1000"}
go ar.Run()

testutil.WaitForResult(func() (bool, error) {
if upd.Count == 0 {
return false, fmt.Errorf("No updates")
}
last := upd.Allocs[upd.Count-1]
if last.ClientStatus == structs.AllocClientStatusRunning {
if last.ClientStatus != structs.AllocClientStatusRunning {
return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusRunning)
}
return true, nil
Expand Down Expand Up @@ -322,11 +323,13 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
// Create a new alloc runner
ar2 := NewAllocRunner(ar.logger, ar.config, upd.Update,
&structs.Allocation{ID: ar.alloc.ID})
ar2.logger = prefixedTestLogger("ar2: ")
err = ar2.RestoreState()
if err != nil {
t.Fatalf("err: %v", err)
}
go ar2.Run()
ar2.logger.Println("[TESTING] starting second alloc runner")

testutil.WaitForResult(func() (bool, error) {
// Check the state still exists
Expand All @@ -345,6 +348,7 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
})

// Send the destroy signal and ensure the AllocRunner cleans up.
ar2.logger.Println("[TESTING] destroying second alloc runner")
ar2.Destroy()

testutil.WaitForResult(func() (bool, error) {
Expand Down Expand Up @@ -377,3 +381,53 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
t.Fatalf("err: %v", err)
})
}

// TestAllocRunner_TaskFailed_KillTG verifies that when one task in a task
// group fails, the alloc runner destroys the remaining tasks in the group
// and reports the allocation's client status as failed.
func TestAllocRunner_TaskFailed_KillTG(t *testing.T) {
	ctestutil.ExecCompatible(t)
	upd, ar := testAllocRunner(false)

	// First task: long-running, so it is still alive when the second task fails.
	task := ar.alloc.Job.TaskGroups[0].Tasks[0]
	task.Config["command"] = "/bin/sleep"
	task.Config["args"] = []string{"1000"}

	// Second task: points at a non-existent binary so it fails to start.
	task2 := ar.alloc.Job.TaskGroups[0].Tasks[0].Copy()
	task2.Name = "task 2"
	task2.Config = map[string]interface{}{"command": "invalidBinaryToFail"}
	ar.alloc.Job.TaskGroups[0].Tasks = append(ar.alloc.Job.TaskGroups[0].Tasks, task2)
	ar.alloc.TaskResources[task2.Name] = task2.Resources
	go ar.Run()

	testutil.WaitForResult(func() (bool, error) {
		if upd.Count == 0 {
			return false, fmt.Errorf("No updates")
		}
		last := upd.Allocs[upd.Count-1]

		// The whole allocation should be marked failed.
		if last.ClientStatus != structs.AllocClientStatusFailed {
			return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusFailed)
		}

		// Task One should have been killed (dead, with a kill as its last event).
		state1 := last.TaskStates[task.Name]
		if state1.State != structs.TaskStateDead {
			return false, fmt.Errorf("got state %v; want %v", state1.State, structs.TaskStateDead)
		}
		if lastE := state1.Events[len(state1.Events)-1]; lastE.Type != structs.TaskKilled {
			return false, fmt.Errorf("got last event %v; want %v", lastE.Type, structs.TaskKilled)
		}

		// Task Two should be dead and marked as failed.
		state2 := last.TaskStates[task2.Name]
		if state2.State != structs.TaskStateDead {
			return false, fmt.Errorf("got state %v; want %v", state2.State, structs.TaskStateDead)
		}
		if !state2.Failed() {
			return false, fmt.Errorf("task2 should have failed")
		}

		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})
}
12 changes: 7 additions & 5 deletions client/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,14 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
waitCh: make(chan struct{}),
}

// Set the state to pending.
tc.updater(task.Name, structs.TaskStatePending, structs.NewTaskEvent(structs.TaskReceived))
return tc
}

// MarkReceived marks the task as received by pushing a pending-state
// TaskReceived event through the state updater. It is separate from
// NewTaskRunner so that callers decide when the received event fires
// (callers in this change invoke it immediately after construction).
func (r *TaskRunner) MarkReceived() {
r.updater(r.task.Name, structs.TaskStatePending, structs.NewTaskEvent(structs.TaskReceived))
}

// WaitCh returns a channel to wait for termination
func (r *TaskRunner) WaitCh() <-chan struct{} {
return r.waitCh
Expand Down Expand Up @@ -270,9 +273,8 @@ func (r *TaskRunner) run() {
err := fmt.Errorf("task directory couldn't be found")
r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(err))
r.logger.Printf("[ERR] client: task directory for alloc %q task %q couldn't be found", r.alloc.ID, r.task.Name)

// Non-restartable error
return
r.restartTracker.SetStartError(err)
goto RESTART
}

for _, artifact := range r.task.Artifacts {
Expand Down
10 changes: 9 additions & 1 deletion client/task_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ import (
)

// testLogger returns a logger for tests that writes to stderr with no
// prefix and standard timestamp flags.
//
// Note: the scraped diff left both the old and new return statements in
// this function; only the delegating call to prefixedTestLogger belongs
// in the committed version, so the unreachable duplicate is removed.
func testLogger() *log.Logger {
	return prefixedTestLogger("")
}

// prefixedTestLogger returns a stderr logger whose lines carry the given
// prefix, so output interleaved from multiple components (e.g. two alloc
// runners in one test) can be told apart.
func prefixedTestLogger(prefix string) *log.Logger {
	return log.New(os.Stderr, prefix, log.LstdFlags)
}

type MockTaskStateUpdater struct {
Expand Down Expand Up @@ -64,6 +68,7 @@ func testTaskRunnerFromAlloc(restarts bool, alloc *structs.Allocation) (*MockTas
func TestTaskRunner_SimpleRun(t *testing.T) {
ctestutil.ExecCompatible(t)
upd, tr := testTaskRunner(false)
tr.MarkReceived()
go tr.Run()
defer tr.Destroy()
defer tr.ctx.AllocDir.Destroy()
Expand Down Expand Up @@ -98,6 +103,7 @@ func TestTaskRunner_SimpleRun(t *testing.T) {
func TestTaskRunner_Destroy(t *testing.T) {
ctestutil.ExecCompatible(t)
upd, tr := testTaskRunner(true)
tr.MarkReceived()
defer tr.ctx.AllocDir.Destroy()

// Change command to ensure we run for a bit
Expand Down Expand Up @@ -253,6 +259,7 @@ func TestTaskRunner_Download_List(t *testing.T) {
task.Artifacts = []*structs.TaskArtifact{&artifact1, &artifact2}

upd, tr := testTaskRunnerFromAlloc(false, alloc)
tr.MarkReceived()
go tr.Run()
defer tr.Destroy()
defer tr.ctx.AllocDir.Destroy()
Expand Down Expand Up @@ -317,6 +324,7 @@ func TestTaskRunner_Download_Retries(t *testing.T) {
}

upd, tr := testTaskRunnerFromAlloc(true, alloc)
tr.MarkReceived()
go tr.Run()
defer tr.Destroy()
defer tr.ctx.AllocDir.Destroy()
Expand Down

0 comments on commit 21abd52

Please sign in to comment.