client: When a task fails, kill all other tasks in the task group #962

Merged · 2 commits · Mar 26, 2016
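The change in a nutshell: the client now treats the task group as a shared failure domain. When one task in an allocation reaches the dead state having failed, the AllocRunner destroys all sibling TaskRunners instead of leaving them running in a half-dead group. A minimal, self-contained sketch of that fan-out pattern follows; the runner type and task names are illustrative stand-ins, not Nomad's actual types.

package main

import "fmt"

// runner stands in for Nomad's TaskRunner; Destroy is assumed to
// signal the task to stop, as the real method does.
type runner struct{ name string }

func (r *runner) Destroy() { fmt.Printf("destroying %s\n", r.name) }

// killSiblings mirrors the new setTaskState logic: when one task dies
// in a failed state, every other task in the group is destroyed.
func killSiblings(tasks map[string]*runner, failed string) []string {
	var destroying []string
	for name, tr := range tasks {
		if name != failed {
			destroying = append(destroying, name)
			tr.Destroy()
		}
	}
	return destroying
}

func main() {
	group := map[string]*runner{
		"web": {"web"}, "logshipper": {"logshipper"}, "db": {"db"},
	}
	// Destroys logshipper and db (map iteration order varies).
	fmt.Println(killSiblings(group, "web"))
}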
37 changes: 13 additions & 24 deletions client/alloc_runner.go
@@ -48,12 +48,7 @@ type AllocRunner struct {
 	restored map[string]struct{}
 	taskLock sync.RWMutex
 
-	// taskReceivedTimer is used to mitigate updates sent to the server because
-	// we expect that shortly after receiving an alloc it will transition
-	// state. We use a timer to send the update if this hasn't happened after a
-	// reasonable time.
-	taskReceivedTimer *time.Timer
 	taskStatusLock sync.RWMutex
 
 	updateCh chan *structs.Allocation
@@ -312,25 +307,18 @@ func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEvent) {
 	taskState.State = state
 	r.appendTaskEvent(taskState, event)
 
-	// We don't immediately mark ourselves as dirty, since in most cases there
-	// will immediately be another state transition. This reduces traffic to
-	// the server.
-	if event != nil && event.Type == structs.TaskReceived {
-		if r.taskReceivedTimer == nil {
-			r.taskReceivedTimer = time.AfterFunc(taskReceivedSyncLimit, func() {
-				// Send a dirty signal to sync our state.
-				select {
-				case r.dirtyCh <- struct{}{}:
-				default:
-				}
-			})
-		}
-		return
-	}
-
-	// Cancel any existing received state timer.
-	if r.taskReceivedTimer != nil {
-		r.taskReceivedTimer.Stop()
-	}
+	// If the task failed, we should kill all the other tasks in the task group.
+	if state == structs.TaskStateDead && taskState.Failed() {
+		var destroyingTasks []string
+		for task, tr := range r.tasks {
+			if task != taskName {
+				destroyingTasks = append(destroyingTasks, task)
+				tr.Destroy()
+			}
+		}
+		if len(destroyingTasks) > 0 {
+			r.logger.Printf("[DEBUG] client: task %q failed, destroying other tasks in task group: %v", taskName, destroyingTasks)
+		}
+	}
 
 	select {
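With the received-state timer gone, every transition now falls through to a non-blocking send on r.dirtyCh (the select { that the hunk above ends on; its body matches the case r.dirtyCh <- struct{}{} / default pattern visible in the removed timer callback). That send coalesces sync requests instead of blocking the state update. A self-contained sketch of the pattern, assuming a channel buffered with capacity 1:

package main

import "fmt"

// markDirty coalesces state-sync requests: a non-blocking send means a
// state transition never waits on a sync that is already pending.
func markDirty(dirtyCh chan struct{}) bool {
	select {
	case dirtyCh <- struct{}{}:
		return true // a sync will run
	default:
		return false // one is already queued; this signal is dropped
	}
}

func main() {
	dirtyCh := make(chan struct{}, 1) // buffered so one signal can queue
	fmt.Println(markDirty(dirtyCh))   // true
	fmt.Println(markDirty(dirtyCh))   // false: coalesced
}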
@@ -405,6 +393,7 @@ func (r *AllocRunner) Run() {
 		tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, r.Alloc(),
 			task.Copy())
 		r.tasks[task.Name] = tr
+		tr.MarkReceived()
 		go tr.Run()
 	}
 	r.taskLock.Unlock()
58 changes: 56 additions & 2 deletions client/alloc_runner_test.go
@@ -279,19 +279,20 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) {
 func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
 	ctestutil.ExecCompatible(t)
 	upd, ar := testAllocRunner(false)
+	ar.logger = prefixedTestLogger("ar1: ")
 
 	// Ensure task takes some time
 	task := ar.alloc.Job.TaskGroups[0].Tasks[0]
 	task.Config["command"] = "/bin/sleep"
-	task.Config["args"] = []string{"10"}
+	task.Config["args"] = []string{"1000"}
 	go ar.Run()
 
 	testutil.WaitForResult(func() (bool, error) {
 		if upd.Count == 0 {
 			return false, fmt.Errorf("No updates")
 		}
 		last := upd.Allocs[upd.Count-1]
-		if last.ClientStatus == structs.AllocClientStatusRunning {
+		if last.ClientStatus != structs.AllocClientStatusRunning {
+			return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusRunning)
 		}
 		return true, nil
@@ -322,11 +323,13 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
 	// Create a new alloc runner
 	ar2 := NewAllocRunner(ar.logger, ar.config, upd.Update,
 		&structs.Allocation{ID: ar.alloc.ID})
+	ar2.logger = prefixedTestLogger("ar2: ")
 	err = ar2.RestoreState()
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
 	go ar2.Run()
+	ar2.logger.Println("[TESTING] starting second alloc runner")
 
 	testutil.WaitForResult(func() (bool, error) {
 		// Check the state still exists
@@ -345,6 +348,7 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
 	})
 
 	// Send the destroy signal and ensure the AllocRunner cleans up.
+	ar2.logger.Println("[TESTING] destroying second alloc runner")
 	ar2.Destroy()
 
 	testutil.WaitForResult(func() (bool, error) {
@@ -377,3 +381,53 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
 		t.Fatalf("err: %v", err)
 	})
 }
+
+func TestAllocRunner_TaskFailed_KillTG(t *testing.T) {
+	ctestutil.ExecCompatible(t)
+	upd, ar := testAllocRunner(false)
+
+	// Create two tasks in the task group
+	task := ar.alloc.Job.TaskGroups[0].Tasks[0]
+	task.Config["command"] = "/bin/sleep"
+	task.Config["args"] = []string{"1000"}
+
+	task2 := ar.alloc.Job.TaskGroups[0].Tasks[0].Copy()
+	task2.Name = "task 2"
+	task2.Config = map[string]interface{}{"command": "invalidBinaryToFail"}
+	ar.alloc.Job.TaskGroups[0].Tasks = append(ar.alloc.Job.TaskGroups[0].Tasks, task2)
+	ar.alloc.TaskResources[task2.Name] = task2.Resources
+	//t.Logf("%#v", ar.alloc.Job.TaskGroups[0])
+	go ar.Run()
+
+	testutil.WaitForResult(func() (bool, error) {
+		if upd.Count == 0 {
+			return false, fmt.Errorf("No updates")
+		}
+		last := upd.Allocs[upd.Count-1]
+		if last.ClientStatus != structs.AllocClientStatusFailed {
+			return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusFailed)
+		}
+
+		// Task one should be killed
+		state1 := last.TaskStates[task.Name]
+		if state1.State != structs.TaskStateDead {
+			return false, fmt.Errorf("got state %v; want %v", state1.State, structs.TaskStateDead)
+		}
+		if lastE := state1.Events[len(state1.Events)-1]; lastE.Type != structs.TaskKilled {
+			return false, fmt.Errorf("got last event %v; want %v", lastE.Type, structs.TaskKilled)
+		}
+
+		// Task two should have failed
+		state2 := last.TaskStates[task2.Name]
+		if state2.State != structs.TaskStateDead {
+			return false, fmt.Errorf("got state %v; want %v", state2.State, structs.TaskStateDead)
+		}
+		if !state2.Failed() {
+			return false, fmt.Errorf("task2 should have failed")
+		}
+
+		return true, nil
+	}, func(err error) {
+		t.Fatalf("err: %v", err)
+	})
+}
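Both tests poll through testutil.WaitForResult, whose shape can be read off its call sites here: a test func() (bool, error) retried until it succeeds, and a second callback invoked with the last error on timeout. A minimal sketch under those assumptions; the 5-second timeout and 25ms poll interval are invented for the example, not Nomad's actual values.

package main

import (
	"fmt"
	"time"
)

// waitForResult polls test until it reports success, invoking onErr
// with the last error if the deadline passes first.
func waitForResult(test func() (bool, error), onErr func(error)) {
	deadline := time.Now().Add(5 * time.Second)
	for {
		ok, err := test()
		if ok {
			return
		}
		if time.Now().After(deadline) {
			onErr(err)
			return
		}
		time.Sleep(25 * time.Millisecond)
	}
}

func main() {
	n := 0
	waitForResult(func() (bool, error) {
		n++
		if n < 3 {
			return false, fmt.Errorf("not ready (attempt %d)", n)
		}
		return true, nil
	}, func(err error) { panic(err) })
	fmt.Println("condition met after", n, "attempts")
}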
12 changes: 7 additions & 5 deletions client/task_runner.go
@@ -96,11 +96,14 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
 		waitCh: make(chan struct{}),
 	}
 
-	// Set the state to pending.
-	tc.updater(task.Name, structs.TaskStatePending, structs.NewTaskEvent(structs.TaskReceived))
 	return tc
 }
 
+// MarkReceived marks the task as received.
+func (r *TaskRunner) MarkReceived() {
+	r.updater(r.task.Name, structs.TaskStatePending, structs.NewTaskEvent(structs.TaskReceived))
+}
+
 // WaitCh returns a channel to wait for termination
 func (r *TaskRunner) WaitCh() <-chan struct{} {
 	return r.waitCh
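This pairs with the change in client/alloc_runner.go above, where Run() now calls tr.MarkReceived() right after constructing each TaskRunner. Emitting TaskReceived from an explicit method instead of from NewTaskRunner itself presumably lets the state-restore path (not shown in this diff) rebuild a runner without firing a fresh received event, and it is why the tests below call tr.MarkReceived() by hand. A self-contained sketch of the two construction paths, using stand-in types rather than Nomad's:

package main

import "fmt"

// taskRunner is a stand-in for Nomad's TaskRunner; MarkReceived plays
// the role of the real method, which reports pending/received through
// the state updater.
type taskRunner struct{ name string }

func (t *taskRunner) MarkReceived() { fmt.Println(t.name, "-> pending (task received)") }
func (t *taskRunner) Run()          { fmt.Println(t.name, "-> running") }

func main() {
	// Fresh alloc: mark received, then run (the real client runs in a goroutine).
	fresh := &taskRunner{name: "web"}
	fresh.MarkReceived()
	fresh.Run()

	// Restored alloc: rebuild the runner and run it without re-emitting
	// TaskReceived, so persisted task state is not reset to pending.
	restored := &taskRunner{name: "web (restored)"}
	restored.Run()
}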
@@ -270,9 +273,8 @@ func (r *TaskRunner) run() {
 		err := fmt.Errorf("task directory couldn't be found")
 		r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(err))
 		r.logger.Printf("[ERR] client: task directory for alloc %q task %q couldn't be found", r.alloc.ID, r.task.Name)
-
-		// Non-restartable error
-		return
+		r.restartTracker.SetStartError(err)
+		goto RESTART
 	}
 
 	for _, artifact := range r.task.Artifacts {
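This hunk turns a missing task directory from a terminal, non-restartable error into one handled by the restart tracker: the runner records the start error and jumps to its RESTART label, so the group's restart policy decides whether to retry. A self-contained sketch of that shape, written as a loop instead of a goto, with an invented shouldRestart policy standing in for the real restart tracker:

package main

import (
	"errors"
	"fmt"
)

// shouldRestart stands in for Nomad's restart tracker: it applies a
// simple max-attempts policy to the recorded start error.
func shouldRestart(attempt, maxAttempts int) bool { return attempt < maxAttempts }

// start stands in for building the task directory and starting the
// driver; here it fails on the first two attempts.
func start(attempt int) error {
	if attempt < 2 {
		return errors.New("task directory couldn't be found")
	}
	return nil
}

func run(maxAttempts int) error {
	attempt := 0
	for {
		err := start(attempt)
		if err == nil {
			return nil
		}
		// Record the error and fall through to restart handling
		// instead of returning a terminal failure.
		if !shouldRestart(attempt, maxAttempts) {
			return fmt.Errorf("giving up after %d attempts: %w", attempt+1, err)
		}
		attempt++
	}
}

func main() { fmt.Println(run(3)) } // succeeds on the third attempt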
10 changes: 9 additions & 1 deletion client/task_runner_test.go
@@ -20,7 +20,11 @@ import (
 )
 
 func testLogger() *log.Logger {
-	return log.New(os.Stderr, "", log.LstdFlags)
+	return prefixedTestLogger("")
+}
+
+func prefixedTestLogger(prefix string) *log.Logger {
+	return log.New(os.Stderr, prefix, log.LstdFlags)
 }
 
 type MockTaskStateUpdater struct {
@@ -64,6 +68,7 @@ func testTaskRunnerFromAlloc(restarts bool, alloc *structs.Allocation) (*MockTaskStateUpdater, *TaskRunner) {
 func TestTaskRunner_SimpleRun(t *testing.T) {
 	ctestutil.ExecCompatible(t)
 	upd, tr := testTaskRunner(false)
+	tr.MarkReceived()
 	go tr.Run()
 	defer tr.Destroy()
 	defer tr.ctx.AllocDir.Destroy()
@@ -98,6 +103,7 @@ func TestTaskRunner_SimpleRun(t *testing.T) {
 func TestTaskRunner_Destroy(t *testing.T) {
 	ctestutil.ExecCompatible(t)
 	upd, tr := testTaskRunner(true)
+	tr.MarkReceived()
 	defer tr.ctx.AllocDir.Destroy()
 
 	// Change command to ensure we run for a bit
@@ -253,6 +259,7 @@ func TestTaskRunner_Download_List(t *testing.T) {
 	task.Artifacts = []*structs.TaskArtifact{&artifact1, &artifact2}
 
 	upd, tr := testTaskRunnerFromAlloc(false, alloc)
+	tr.MarkReceived()
 	go tr.Run()
 	defer tr.Destroy()
 	defer tr.ctx.AllocDir.Destroy()
@@ -317,6 +324,7 @@ func TestTaskRunner_Download_Retries(t *testing.T) {
 	}
 
 	upd, tr := testTaskRunnerFromAlloc(true, alloc)
+	tr.MarkReceived()
 	go tr.Run()
 	defer tr.Destroy()
 	defer tr.ctx.AllocDir.Destroy()