From c343cdbcdfb8ad303de79c80514725a85e2ebf6e Mon Sep 17 00:00:00 2001
From: Jasmine Dahilig
Date: Mon, 31 Aug 2020 11:11:36 -0700
Subject: [PATCH] task lifecycle poststop for service & system jobs

---
 client/allocrunner/alloc_runner.go           | 22 +++++++++++++++++--
 client/allocrunner/task_hook_coordinator.go  |  5 ++++-
 .../taskrunner/restarts/restarts.go          |  5 +++++
 client/allocrunner/taskrunner/task_runner.go | 14 +++++++++++-
 .../taskrunner/task_runner_hooks.go          |  3 +--
 5 files changed, 43 insertions(+), 6 deletions(-)

diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go
index 36994cf83892..de9992e4504d 100644
--- a/client/allocrunner/alloc_runner.go
+++ b/client/allocrunner/alloc_runner.go
@@ -347,12 +347,26 @@ func (ar *allocRunner) shouldRun() bool {
 
 // runTasks is used to run the task runners and block until they exit.
 func (ar *allocRunner) runTasks() {
+	// Start all tasks
 	for _, task := range ar.tasks {
 		go task.Run()
 	}
 
+	// Block on all tasks except poststop tasks
 	for _, task := range ar.tasks {
-		<-task.WaitCh()
+		if !task.IsPoststopTask() {
+			<-task.WaitCh()
+		}
+	}
+
+	// Signal poststop tasks to proceed to main runtime
+	ar.taskHookCoordinator.StartPoststopTasks()
+
+	// Wait for poststop tasks to finish before proceeding
+	for _, task := range ar.tasks {
+		if task.IsPoststopTask() {
+			<-task.WaitCh()
+		}
 	}
 }
 
@@ -515,6 +529,9 @@ func (ar *allocRunner) handleTaskStateUpdates() {
 
 			// Emit kill event for live runners
 			for _, tr := range liveRunners {
+				if tr.IsPoststopTask() {
+					continue
+				}
 				tr.EmitEvent(killEvent)
 			}
 
@@ -577,7 +594,8 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState {
 	// Kill the rest concurrently
 	wg := sync.WaitGroup{}
 	for name, tr := range ar.tasks {
-		if tr.IsLeader() {
+		// Filter out poststop tasks so they run after all the other tasks are killed
+		if tr.IsLeader() || tr.IsPoststopTask() {
 			continue
 		}
 
diff --git a/client/allocrunner/task_hook_coordinator.go b/client/allocrunner/task_hook_coordinator.go
index 93483e356115..46b1bc30d80e 100644
--- a/client/allocrunner/task_hook_coordinator.go
+++ b/client/allocrunner/task_hook_coordinator.go
@@ -129,7 +129,6 @@ func (c *taskHookCoordinator) taskStateUpdated(states map[string]*structs.TaskSt
 			continue
 		}
 
-
 		delete(c.prestartSidecar, task)
 	}
 
@@ -173,6 +172,10 @@ func (c *taskHookCoordinator) taskStateUpdated(states map[string]*structs.TaskSt
 	}
 }
 
+func (c *taskHookCoordinator) StartPoststopTasks() {
+	c.poststopTaskCtxCancel()
+}
+
 // hasNonSidecarTasks returns false if all the passed tasks are sidecar tasks
 func hasNonSidecarTasks(tasks []*taskrunner.TaskRunner) bool {
 	for _, tr := range tasks {
diff --git a/client/allocrunner/taskrunner/restarts/restarts.go b/client/allocrunner/taskrunner/restarts/restarts.go
index 6ee0056ccd8b..100f19b69be6 100644
--- a/client/allocrunner/taskrunner/restarts/restarts.go
+++ b/client/allocrunner/taskrunner/restarts/restarts.go
@@ -34,6 +34,11 @@ func NewRestartTracker(policy *structs.RestartPolicy, jobType string, tlc *struc
 		onSuccess = tlc.Sidecar
 	}
 
+	// Poststop should never be restarted on success
+	if tlc != nil && tlc.Hook == structs.TaskLifecycleHookPoststop {
+		onSuccess = false
+	}
+
 	return &RestartTracker{
 		startTime: time.Now(),
 		onSuccess: onSuccess,
diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go
index 514d0e1ed31c..bb9368c4b88c 100644
--- a/client/allocrunner/taskrunner/task_runner.go
+++ b/client/allocrunner/taskrunner/task_runner.go
@@ -485,6 +485,7 @@ func (tr *TaskRunner) Run() {
 
 	select {
 	case <-tr.startConditionMetCtx:
+		tr.logger.Debug("lifecycle start condition has been met, proceeding", "task", tr.task.Name)
 		// yay proceed
 	case <-tr.killCtx.Done():
 	case <-tr.shutdownCtx.Done():
@@ -492,7 +493,7 @@ func (tr *TaskRunner) Run() {
 	}
 
 MAIN:
-	for !tr.Alloc().TerminalStatus() {
+	for !tr.shouldShutdown() {
 		select {
 		case <-tr.killCtx.Done():
 			break MAIN
@@ -615,6 +616,17 @@ MAIN:
 	tr.logger.Debug("task run loop exiting")
 }
 
+func (tr *TaskRunner) shouldShutdown() bool {
+	if tr.alloc.ClientTerminalStatus() {
+		return true
+	}
+
+	if !tr.IsPoststopTask() && tr.alloc.ServerTerminalStatus() {
+		return true
+	}
+
+	return false
+}
 // handleTaskExitResult handles the results returned by the task exiting. If
 // retryWait is true, the caller should attempt to wait on the task again since
 // it has not actually finished running. This can happen if the driver plugin
diff --git a/client/allocrunner/taskrunner/task_runner_hooks.go b/client/allocrunner/taskrunner/task_runner_hooks.go
index 07325c012038..dae9a4c11386 100644
--- a/client/allocrunner/taskrunner/task_runner_hooks.go
+++ b/client/allocrunner/taskrunner/task_runner_hooks.go
@@ -164,8 +164,7 @@ func (tr *TaskRunner) emitHookError(err error, hookName string) {
 func (tr *TaskRunner) prestart() error {
 	// Determine if the allocation is terminal and we should avoid running
 	// prestart hooks.
-	alloc := tr.Alloc()
-	if alloc.TerminalStatus() {
+	if tr.shouldShutdown() {
 		tr.logger.Trace("skipping prestart hooks since allocation is terminal")
 		return nil
 	}