Skip to content

Commit

Permalink
task lifecycle poststop for service & system jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
jazzyfresh committed Aug 31, 2020
1 parent 13ad551 commit c343cdb
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 6 deletions.
22 changes: 20 additions & 2 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,12 +347,26 @@ func (ar *allocRunner) shouldRun() bool {

// runTasks is used to run the task runners and block until they exit.
func (ar *allocRunner) runTasks() {
// Start all tasks
for _, task := range ar.tasks {
go task.Run()
}

// Block on all tasks except poststop tasks
for _, task := range ar.tasks {
<-task.WaitCh()
if !task.IsPoststopTask() {
<-task.WaitCh()
}
}

// Signal poststop tasks to proceed to main runtime
ar.taskHookCoordinator.StartPoststopTasks()

// Wait for poststop tasks to finish before proceeding
for _, task := range ar.tasks {
if task.IsPoststopTask() {
<-task.WaitCh()
}
}
}

Expand Down Expand Up @@ -515,6 +529,9 @@ func (ar *allocRunner) handleTaskStateUpdates() {

// Emit kill event for live runners
for _, tr := range liveRunners {
if tr.IsPoststopTask() {
continue
}
tr.EmitEvent(killEvent)
}

Expand Down Expand Up @@ -577,7 +594,8 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState {
// Kill the rest concurrently
wg := sync.WaitGroup{}
for name, tr := range ar.tasks {
if tr.IsLeader() {
// Filter out poststop tasks so they run after all the other tasks are killed
if tr.IsLeader() || tr.IsPoststopTask() {
continue
}

Expand Down
5 changes: 4 additions & 1 deletion client/allocrunner/task_hook_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ func (c *taskHookCoordinator) taskStateUpdated(states map[string]*structs.TaskSt
continue
}


delete(c.prestartSidecar, task)
}

Expand Down Expand Up @@ -173,6 +172,10 @@ func (c *taskHookCoordinator) taskStateUpdated(states map[string]*structs.TaskSt
}
}

func (c *taskHookCoordinator) StartPoststopTasks() {
c.poststopTaskCtxCancel()
}

// hasNonSidecarTasks returns false if all the passed tasks are sidecar tasks
func hasNonSidecarTasks(tasks []*taskrunner.TaskRunner) bool {
for _, tr := range tasks {
Expand Down
5 changes: 5 additions & 0 deletions client/allocrunner/taskrunner/restarts/restarts.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ func NewRestartTracker(policy *structs.RestartPolicy, jobType string, tlc *struc
onSuccess = tlc.Sidecar
}

// Poststop should never be restarted on success
if tlc != nil && tlc.Hook == structs.TaskLifecycleHookPoststop {
onSuccess = false
}

return &RestartTracker{
startTime: time.Now(),
onSuccess: onSuccess,
Expand Down
14 changes: 13 additions & 1 deletion client/allocrunner/taskrunner/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,14 +485,15 @@ func (tr *TaskRunner) Run() {

select {
case <-tr.startConditionMetCtx:
tr.logger.Debug("lifecycle start condition has been met, proceeding", "task", tr.task.Name)
// yay proceed
case <-tr.killCtx.Done():
case <-tr.shutdownCtx.Done():
return
}

MAIN:
for !tr.Alloc().TerminalStatus() {
for !tr.shouldShutdown() {
select {
case <-tr.killCtx.Done():
break MAIN
Expand Down Expand Up @@ -615,6 +616,17 @@ MAIN:
tr.logger.Debug("task run loop exiting")
}

func (tr *TaskRunner) shouldShutdown() bool {
if tr.alloc.ClientTerminalStatus() {
return true
}

if !tr.IsPoststopTask() && tr.alloc.ServerTerminalStatus() {
return true
}

return false
}
// handleTaskExitResult handles the results returned by the task exiting. If
// retryWait is true, the caller should attempt to wait on the task again since
// it has not actually finished running. This can happen if the driver plugin
Expand Down
3 changes: 1 addition & 2 deletions client/allocrunner/taskrunner/task_runner_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,7 @@ func (tr *TaskRunner) emitHookError(err error, hookName string) {
func (tr *TaskRunner) prestart() error {
// Determine if the allocation is terminal and we should avoid running
// prestart hooks.
alloc := tr.Alloc()
if alloc.TerminalStatus() {
if tr.shouldShutdown() {
tr.logger.Trace("skipping prestart hooks since allocation is terminal")
return nil
}
Expand Down

0 comments on commit c343cdb

Please sign in to comment.