Skip to content

Commit

Permalink
kill tasks in alloc when one fails
Browse files Browse the repository at this point in the history
  • Loading branch information
dadgar committed Mar 23, 2016
1 parent 186b761 commit ee4a381
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 32 deletions.
31 changes: 7 additions & 24 deletions client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,7 @@ type AllocRunner struct {
restored map[string]struct{}
taskLock sync.RWMutex

// taskReceivedTimer is used to mitigate updates sent to the server because
// we expect that shortly after receiving an alloc it will transition
// state. We use a timer to send the update if this hasn't happened after a
// reasonable time.
taskReceivedTimer *time.Timer
taskStatusLock sync.RWMutex
taskStatusLock sync.RWMutex

updateCh chan *structs.Allocation

Expand Down Expand Up @@ -314,25 +309,13 @@ func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEv
taskState.State = state
r.appendTaskEvent(taskState, event)

// We don't immediately mark ourselves as dirty, since in most cases there
// will immediately be another state transition. This reduces traffic to
// the server.
if event != nil && event.Type == structs.TaskReceived {
if r.taskReceivedTimer == nil {
r.taskReceivedTimer = time.AfterFunc(taskReceivedSyncLimit, func() {
// Send a dirty signal to sync our state.
select {
case r.dirtyCh <- struct{}{}:
default:
}
})
// If the task failed, we should kill all the other tasks in the task group.
if state == structs.TaskStateDead && taskState.Failed() {
for task, tr := range r.tasks {
if task != taskName {
tr.Destroy()
}
}
return
}

// Cancel any existing received state timer.
if r.taskReceivedTimer != nil {
r.taskReceivedTimer.Stop()
}

select {
Expand Down
2 changes: 1 addition & 1 deletion client/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
// Ensure task takes some time
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
task.Config["command"] = "/bin/sleep"
task.Config["args"] = []string{"10"}
task.Config["args"] = []string{"1000"}
go ar.Run()

testutil.WaitForResult(func() (bool, error) {
Expand Down
7 changes: 4 additions & 3 deletions client/driver/logging/rotator.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,8 @@ type FileRotator struct {
logger *log.Logger
purgeCh chan struct{}
doneCh chan struct{}

closed bool
closedLock sync.Mutex
closed bool
closedLock sync.Mutex
}

// NewFileRotator returns a new file rotator
Expand Down Expand Up @@ -208,6 +207,8 @@ func (f *FileRotator) Close() {
}

// Stop the purge goroutine
f.closedLock.Lock()
defer f.closedLock.Unlock()
if !f.closed {
f.doneCh <- struct{}{}
close(f.purgeCh)
Expand Down
8 changes: 4 additions & 4 deletions client/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,8 @@ func (r *TaskRunner) run() {
err := fmt.Errorf("task directory couldn't be found")
r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(err))
r.logger.Printf("[ERR] client: task directory for alloc %q task %q couldn't be found", r.alloc.ID, r.task.Name)

// Non-restartable error
return
r.restartTracker.SetStartError(err)
goto RESTART
}

for i, artifact := range r.task.Artifacts {
Expand All @@ -249,7 +248,8 @@ func (r *TaskRunner) run() {
structs.NewTaskEvent(structs.TaskArtifactDownloadFailed).SetDownloadError(err))
r.logger.Printf("[ERR] client: allocation %q, task %v, artifact %#v (%v) fails validation: %v",
r.alloc.ID, r.task.Name, artifact, i, err)
return
r.restartTracker.SetStartError(err)
goto RESTART
}

if err := getter.GetArtifact(artifact, taskDir, r.logger); err != nil {
Expand Down

0 comments on commit ee4a381

Please sign in to comment.