Skip to content

Commit

Permalink
kill taskrunners if all tasks are dead
Browse files Browse the repository at this point in the history
  • Loading branch information
lgfa29 committed Aug 16, 2022
1 parent 464279e commit 0bec51f
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 31 deletions.
80 changes: 51 additions & 29 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,40 +546,62 @@ func (ar *allocRunner) handleTaskStateUpdates() {
}
}

// if all live runners are sidecars - kill alloc
if killEvent == nil && hasSidecars && !hasNonSidecarTasks(liveRunners) {
killEvent = structs.NewTaskEvent(structs.TaskMainDead)
}
if len(liveRunners) == 0 {
// If there are no live runners left, kill all task runners to
// unblock them from the alloc restart block.
for _, tr := range ar.tasks {
if tr.TaskState().State != structs.TaskStateDead {
continue
}

// If there's a kill event set and live runners, kill them
if killEvent != nil && len(liveRunners) > 0 {

// Log kill reason
switch killEvent.Type {
case structs.TaskLeaderDead:
ar.logger.Debug("leader task dead, destroying all tasks", "leader_task", killTask)
case structs.TaskMainDead:
ar.logger.Debug("main tasks dead, destroying all sidecar tasks")
default:
ar.logger.Debug("task failure, destroying all tasks", "failed_task", killTask)
select {
case <-tr.WaitCh():
default:
// Kill task runner without setting an event because the
// task is already dead, it's just waiting in the alloc
// restart loop.
err := tr.Kill(context.TODO(), nil)
if err != nil {
ar.logger.Warn("failed to kill task", "task", tr.Task().Name, "error", err)
}
}
}

// Emit kill event for live runners
for _, tr := range liveRunners {
tr.EmitEvent(killEvent)
} else {
// if all live runners are sidecars - kill alloc
if killEvent == nil && hasSidecars && !hasNonSidecarTasks(liveRunners) {
killEvent = structs.NewTaskEvent(structs.TaskMainDead)
}

// Kill 'em all
states = ar.killTasks()
// If there's a kill event set and live runners, kill them
if killEvent != nil {

// Log kill reason
switch killEvent.Type {
case structs.TaskLeaderDead:
ar.logger.Debug("leader task dead, destroying all tasks", "leader_task", killTask)
case structs.TaskMainDead:
ar.logger.Debug("main tasks dead, destroying all sidecar tasks")
default:
ar.logger.Debug("task failure, destroying all tasks", "failed_task", killTask)
}

// Wait for TaskRunners to exit before continuing to
// prevent looping before TaskRunners have transitioned
// to Dead.
for _, tr := range liveRunners {
ar.logger.Info("killing task", "task", tr.Task().Name)
select {
case <-tr.WaitCh():
case <-ar.waitCh:
// Emit kill event for live runners
for _, tr := range liveRunners {
tr.EmitEvent(killEvent)
}

// Kill 'em all
states = ar.killTasks()

// Wait for TaskRunners to exit before continuing to
// prevent looping before TaskRunners have transitioned
// to Dead.
for _, tr := range liveRunners {
ar.logger.Info("killing task", "task", tr.Task().Name)
select {
case <-tr.WaitCh():
case <-ar.waitCh:
}
}
}
}
Expand Down
7 changes: 5 additions & 2 deletions client/allocrunner/taskrunner/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,17 @@ func (tr *TaskRunner) Signal(event *structs.TaskEvent, s string) error {
// Kill a task. Blocks until task exits or context is canceled. State is set to
// dead.
func (tr *TaskRunner) Kill(ctx context.Context, event *structs.TaskEvent) error {
tr.logger.Trace("Kill requested", "event_type", event.Type, "event_reason", event.KillReason)
tr.logger.Trace("Kill requested")

// Cancel the task runner to break out of restart delay or the main run
// loop.
tr.killCtxCancel()

// Emit kill event
tr.EmitEvent(event)
if event != nil {
tr.logger.Trace("Kill event", "event_type", event.Type, "event_reason", event.KillReason)
tr.EmitEvent(event)
}

select {
case <-tr.WaitCh():
Expand Down

0 comments on commit 0bec51f

Please sign in to comment.