lifecycle: add poststop hook #8194

Merged 2 commits on Nov 12, 2020

Changes from all commits
1 change: 1 addition & 0 deletions api/tasks.go
@@ -633,6 +633,7 @@ type DispatchPayloadConfig struct {
const (
TaskLifecycleHookPrestart = "prestart"
TaskLifecycleHookPoststart = "poststart"
TaskLifecycleHookPoststop = "poststop"
)

type TaskLifecycle struct {
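For context, a minimal sketch of how a client of the api package might mark a task as poststop using the new constant; the Task and TaskLifecycle field names, the task name, and the raw_exec driver are assumptions based on existing prestart/poststart usage rather than anything shown in this diff.

```go
package example

import "github.com/hashicorp/nomad/api"

// poststopCleanupTask sketches a task that should run only after the other
// tasks in its group have finished, using the new poststop hook constant.
// Field names and values here are illustrative assumptions.
func poststopCleanupTask() *api.Task {
	return &api.Task{
		Name:   "cleanup",  // hypothetical task name
		Driver: "raw_exec", // hypothetical driver
		Lifecycle: &api.TaskLifecycle{
			Hook: api.TaskLifecycleHookPoststop,
		},
	}
}
```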
24 changes: 22 additions & 2 deletions client/allocrunner/alloc_runner.go
@@ -353,12 +353,26 @@ func (ar *allocRunner) shouldRun() bool {

// runTasks is used to run the task runners and block until they exit.
func (ar *allocRunner) runTasks() {
// Start all tasks
for _, task := range ar.tasks {
go task.Run()
}

// Block on all tasks except poststop tasks
for _, task := range ar.tasks {
if !task.IsPoststopTask() {
<-task.WaitCh()
}
}

// Signal poststop tasks to proceed to main runtime
ar.taskHookCoordinator.StartPoststopTasks()

// Wait for poststop tasks to finish before proceeding
for _, task := range ar.tasks {
if task.IsPoststopTask() {
<-task.WaitCh()
}

Contributor Author:
This logic is responsible for starting poststop tasks after the other tasks have finished.

This covers the nomad job stop case:

  • wait until existing tasks have finished running
  • start poststop tasks
  • wait until poststop tasks finish

Contributor:
How does this compare to having the coordinator track main task completion? The coordinator logic already tracks task completions (so main tasks start after prestart) - can we reuse the same state machine rather than adding this logic here?

Contributor Author (@jazzyfresh, Nov 11, 2020):

This was discussed offline, and the goal is to do a refactor in another pull request that moves the poststop-specific code out of the alloc runner and back into the task hook coordinator. It was avoided this time around because I didn't know how to keep the alloc runner from exiting during a service job stop while still signaling the poststop tasks to start.

}
}

@@ -485,6 +499,10 @@ func (ar *allocRunner) handleTaskStateUpdates() {
state := tr.TaskState()
states[name] = state

if tr.IsPoststopTask() {
continue
}

Contributor Author:
Poststop tasks have special kill behavior: we don't want to kill them when we receive a kill signal from nomad job stop. We want to wait until everything else has been killed, and then run the poststop tasks.

Contributor Author (@jazzyfresh, Oct 8, 2020):
Skipping the rest of this loop essentially says: don't track poststop tasks along with the main group of tasks (in the liveRunners set), so that we won't later log them as having been killed when there is a kill event.

// Capture live task runners in case we need to kill them
if state.State != structs.TaskStateDead {
liveRunners = append(liveRunners, tr)
@@ -535,6 +553,7 @@ func (ar *allocRunner) handleTaskStateUpdates() {
// prevent looping before TaskRunners have transitioned
// to Dead.
for _, tr := range liveRunners {
ar.logger.Info("killing task: ", tr.Task().Name)

Comment:
Should this actually happen at the info level? My expectation is that those messages will actually hit the console in a default config, and adding a new sort of tracing/debug message here (w/o putting it at debug log level) could create log noise that most operators won't know what to do with.
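If the message is kept, a lower-noise variant along the lines this comment suggests might look like the following sketch, using the structured hclog style seen elsewhere in this PR:

```go
// Sketch: log at debug level with a structured field instead of
// concatenating the task name into an info-level message.
ar.logger.Debug("killing task", "task", tr.Task().Name)
```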

select {
case <-tr.WaitCh():
case <-ar.waitCh:
@@ -586,7 +605,8 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState {
// Kill the rest concurrently
wg := sync.WaitGroup{}
for name, tr := range ar.tasks {
// Filter out poststop tasks so they run after all the other tasks are killed
if tr.IsLeader() || tr.IsPoststopTask() {

Contributor Author:
This is the line that actually prevents poststop tasks from being killed in a kill event.

@rcoder (Oct 8, 2020):
The checks for IsPoststopTask() spread throughout the code, especially when they're part of a multi-clause boolean check, strike me as a bit low-level and specific to this task type, vs. capturing a general property of the tasks.

Would it be reasonable to explicitly define interface methods for e.g. tr.isReadyToStart() and tr.isReadyToKill() (or similar) that hid the property checks behind the task runner interface? (OTOH, this could easily be future-proofing something we don't care about yet, or run contrary to our standard practices around expanding interface footprint.)
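A rough sketch of what one of those helpers might look like; isReadyToKill is a hypothetical name taken from this comment, not a method that exists in the codebase:

```go
// isReadyToKill reports whether this task should be killed along with the
// rest of the group when the allocation receives a kill event. Leader and
// poststop tasks are handled later, after the other tasks have exited.
// (Hypothetical helper sketched from the reviewer's suggestion.)
func (tr *TaskRunner) isReadyToKill() bool {
	return !tr.IsLeader() && !tr.IsPoststopTask()
}
```

The killTasks loop would then read if !tr.isReadyToKill() { continue }, keeping the lifecycle-specific checks behind the task runner interface.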

continue
}

83 changes: 83 additions & 0 deletions client/allocrunner/alloc_runner_test.go
@@ -394,6 +394,89 @@ func TestAllocRunner_TaskMain_KillTG(t *testing.T) {
})
}

// TestAllocRunner_Lifecycle_Poststop asserts that in a service job with a
// poststop lifecycle hook, the main task runs while the poststop task stays
// pending, and the poststop task starts only after the alloc is stopped and
// the main task is dead.
func TestAllocRunner_Lifecycle_Poststop(t *testing.T) {

Contributor:
I would suggest changing the name of this test, since it tests stop behavior - something like TestAllocRunner_Lifecycle_Poststop_IfStopped?

Also, we should add a test for the case where the main tasks complete naturally, for both batch and service jobs (a rough sketch of such a test appears after this one). Also, should add a test for when

alloc := mock.LifecycleAlloc()
tr := alloc.AllocatedResources.Tasks[alloc.Job.TaskGroups[0].Tasks[0].Name]

alloc.Job.Type = structs.JobTypeService
mainTask := alloc.Job.TaskGroups[0].Tasks[0]
mainTask.Config["run_for"] = "100s"

ephemeralTask := alloc.Job.TaskGroups[0].Tasks[1]
ephemeralTask.Name = "quit"
ephemeralTask.Lifecycle.Hook = structs.TaskLifecycleHookPoststop
ephemeralTask.Config["run_for"] = "10s"

Comment:
Are these numbers wall-clock time? (I.e., will this test take a minimum of 10 seconds to run?) It may not be a big deal in isolation, but actual blocking sleep calls scattered throughout a test suite can pile up and make tests slooooowwww.


alloc.Job.TaskGroups[0].Tasks = []*structs.Task{mainTask, ephemeralTask}
alloc.AllocatedResources.Tasks = map[string]*structs.AllocatedTaskResources{
mainTask.Name: tr,
ephemeralTask.Name: tr,
}

conf, cleanup := testAllocRunnerConfig(t, alloc)
defer cleanup()
ar, err := NewAllocRunner(conf)
require.NoError(t, err)
defer destroy(ar)
go ar.Run()

upd := conf.StateUpdater.(*MockStateUpdater)

// Wait for main task to be running
testutil.WaitForResult(func() (bool, error) {
last := upd.Last()
if last == nil {
return false, fmt.Errorf("No updates")
}

if last.ClientStatus != structs.AllocClientStatusRunning {
return false, fmt.Errorf("expected alloc to be running not %s", last.ClientStatus)
}

if s := last.TaskStates[mainTask.Name].State; s != structs.TaskStateRunning {
return false, fmt.Errorf("expected main task to be running not %s", s)
}

if s := last.TaskStates[ephemeralTask.Name].State; s != structs.TaskStatePending {
return false, fmt.Errorf("expected ephemeral task to be pending not %s", s)
}

return true, nil
}, func(err error) {
t.Fatalf("error waiting for initial state:\n%v", err)
})

// Tell the alloc to stop
stopAlloc := alloc.Copy()
stopAlloc.DesiredStatus = structs.AllocDesiredStatusStop
ar.Update(stopAlloc)

// Wait for main task to die & poststop task to run.
testutil.WaitForResult(func() (bool, error) {
last := upd.Last()

if last.ClientStatus != structs.AllocClientStatusRunning {
return false, fmt.Errorf("expected alloc to be running not %s", last.ClientStatus)
}

if s := last.TaskStates[mainTask.Name].State; s != structs.TaskStateDead {
return false, fmt.Errorf("expected main task to be dead not %s", s)
}

if s := last.TaskStates[ephemeralTask.Name].State; s != structs.TaskStateRunning {
return false, fmt.Errorf("expected poststop task to be running not %s", s)
}

return true, nil
}, func(err error) {
t.Fatalf("error waiting for initial state:\n%v", err)
})

}

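Following up on the review comment above, here is a hedged sketch of what a natural-completion test for a batch job might look like; it reuses the helpers that appear in this file, but the job type, timings, and assertions are assumptions rather than part of this PR.

```go
// TestAllocRunner_Lifecycle_Poststop_Batch is a sketch (not part of this PR)
// of a test where the main task finishes on its own and the poststop task
// then runs to completion, without an explicit alloc stop.
func TestAllocRunner_Lifecycle_Poststop_Batch(t *testing.T) {
	alloc := mock.LifecycleAlloc()
	tr := alloc.AllocatedResources.Tasks[alloc.Job.TaskGroups[0].Tasks[0].Name]

	alloc.Job.Type = structs.JobTypeBatch
	mainTask := alloc.Job.TaskGroups[0].Tasks[0]
	mainTask.Config["run_for"] = "2s" // finishes on its own

	poststopTask := alloc.Job.TaskGroups[0].Tasks[1]
	poststopTask.Name = "cleanup"
	poststopTask.Lifecycle.Hook = structs.TaskLifecycleHookPoststop
	poststopTask.Config["run_for"] = "1s"

	alloc.Job.TaskGroups[0].Tasks = []*structs.Task{mainTask, poststopTask}
	alloc.AllocatedResources.Tasks = map[string]*structs.AllocatedTaskResources{
		mainTask.Name:     tr,
		poststopTask.Name: tr,
	}

	conf, cleanup := testAllocRunnerConfig(t, alloc)
	defer cleanup()
	ar, err := NewAllocRunner(conf)
	require.NoError(t, err)
	defer destroy(ar)
	go ar.Run()

	upd := conf.StateUpdater.(*MockStateUpdater)

	// With no explicit stop, the poststop task should start once the main
	// task finishes, and both should eventually report dead.
	testutil.WaitForResult(func() (bool, error) {
		last := upd.Last()
		if last == nil {
			return false, fmt.Errorf("no updates")
		}
		if s := last.TaskStates[mainTask.Name].State; s != structs.TaskStateDead {
			return false, fmt.Errorf("expected main task to be dead, got %s", s)
		}
		if s := last.TaskStates[poststopTask.Name].State; s != structs.TaskStateDead {
			return false, fmt.Errorf("expected poststop task to be dead, got %s", s)
		}
		return true, nil
	}, func(err error) {
		t.Fatalf("error waiting for final state:\n%v", err)
	})
}
```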
func TestAllocRunner_TaskGroup_ShutdownDelay(t *testing.T) {
t.Parallel()

44 changes: 41 additions & 3 deletions client/allocrunner/task_hook_coordinator.go
@@ -8,22 +8,29 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)

// TaskHookCoordinator helps coordinate when mainTasks start tasks can launch
// namely after all Prestart Tasks have run, and after all BlockUntilCompleted have completed
type taskHookCoordinator struct {
logger hclog.Logger

// constant for quickly starting all prestart tasks
closedCh chan struct{}

// Each context is used to gate task runners launching the tasks. A task
// runner waits until the context associated its lifecycle context is
// done/cancelled.
mainTaskCtx context.Context
mainTaskCtxCancel func()

poststartTaskCtx context.Context
poststartTaskCtxCancel func()
poststopTaskCtx context.Context
poststopTaskCtxCancel context.CancelFunc

prestartSidecar map[string]struct{}
prestartEphemeral map[string]struct{}
mainTasksRunning map[string]struct{} // poststop: main tasks running -> finished
mainTasksPending map[string]struct{} // poststart: main tasks pending -> running
}

func newTaskHookCoordinator(logger hclog.Logger, tasks []*structs.Task) *taskHookCoordinator {
@@ -32,6 +39,7 @@ func newTaskHookCoordinator(logger hclog.Logger, tasks []*structs.Task) *taskHoo

mainTaskCtx, mainCancelFn := context.WithCancel(context.Background())
poststartTaskCtx, poststartCancelFn := context.WithCancel(context.Background())
poststopTaskCtx, poststopTaskCancelFn := context.WithCancel(context.Background())

c := &taskHookCoordinator{
logger: logger,
@@ -40,9 +48,12 @@ func newTaskHookCoordinator(logger hclog.Logger, tasks []*structs.Task) *taskHoo
mainTaskCtxCancel: mainCancelFn,
prestartSidecar: map[string]struct{}{},
prestartEphemeral: map[string]struct{}{},
mainTasksRunning: map[string]struct{}{},
mainTasksPending: map[string]struct{}{},
poststartTaskCtx: poststartTaskCtx,
poststartTaskCtxCancel: poststartCancelFn,
poststopTaskCtx: poststopTaskCtx,
poststopTaskCtxCancel: poststopTaskCancelFn,
}
c.setTasks(tasks)
return c
@@ -53,6 +64,7 @@ func (c *taskHookCoordinator) setTasks(tasks []*structs.Task) {

if task.Lifecycle == nil {
c.mainTasksPending[task.Name] = struct{}{}
c.mainTasksRunning[task.Name] = struct{}{}
continue
}

@@ -65,6 +77,8 @@ func (c *taskHookCoordinator) setTasks(tasks []*structs.Task) {
}
case structs.TaskLifecycleHookPoststart:
// Poststart hooks don't need to be tracked.
case structs.TaskLifecycleHookPoststop:
// Poststop hooks don't need to be tracked.
default:
c.logger.Error("invalid lifecycle hook", "task", task.Name, "hook", task.Lifecycle.Hook)
}
@@ -79,6 +93,10 @@ func (c *taskHookCoordinator) hasPrestartTasks() bool {
return len(c.prestartSidecar)+len(c.prestartEphemeral) > 0
}

func (c *taskHookCoordinator) hasRunningMainTasks() bool {
return len(c.mainTasksRunning) > 0
}

func (c *taskHookCoordinator) hasPendingMainTasks() bool {
return len(c.mainTasksPending) > 0
}
@@ -94,7 +112,11 @@ func (c *taskHookCoordinator) startConditionForTask(task *structs.Task) <-chan s
return c.closedCh
case structs.TaskLifecycleHookPoststart:
return c.poststartTaskCtx.Done()
case structs.TaskLifecycleHookPoststop:
return c.poststopTaskCtx.Done()
default:
// it should never have a lifecycle stanza w/o a hook, so report an error but allow the task to start normally
c.logger.Error("invalid lifecycle hook", "task", task.Name, "hook", task.Lifecycle.Hook)
return c.mainTaskCtx.Done()
}
}
@@ -119,6 +141,16 @@ func (c *taskHookCoordinator) taskStateUpdated(states map[string]*structs.TaskSt
delete(c.prestartEphemeral, task)
}

for task := range c.mainTasksRunning {
st := states[task]

if st == nil || st.State != structs.TaskStateDead {
continue
}

delete(c.mainTasksRunning, task)
}

Contributor Author:
If a main task is dead, remove it from the set. When all main tasks have been removed from the set, poststop tasks may proceed with execution.

for task := range c.mainTasksPending {
st := states[task]
if st == nil || st.StartedAt.IsZero() {
@@ -128,14 +160,20 @@ func (c *taskHookCoordinator) taskStateUpdated(states map[string]*structs.TaskSt
delete(c.mainTasksPending, task)
}

// everything well
if !c.hasPrestartTasks() {
c.mainTaskCtxCancel()
}

if !c.hasPendingMainTasks() {
c.poststartTaskCtxCancel()
}
if !c.hasRunningMainTasks() {
c.poststopTaskCtxCancel()
}
}

func (c *taskHookCoordinator) StartPoststopTasks() {
c.poststopTaskCtxCancel()

Contributor Author:
Needs a comment: this is a helper function for starting poststop tasks outside of the handleTaskStateUpdates() infinite loop (one possible wording is sketched below).

}
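One possible wording for the requested comment, sketched against the function as it appears in this diff:

```go
// StartPoststopTasks signals poststop tasks that they may begin running. The
// alloc runner calls it after all other tasks have finished, outside of the
// handleTaskStateUpdates loop, so poststop tasks can still start while the
// allocation is being stopped.
func (c *taskHookCoordinator) StartPoststopTasks() {
	c.poststopTaskCtxCancel()
}
```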

// hasNonSidecarTasks returns false if all the passed tasks are sidecar tasks
5 changes: 5 additions & 0 deletions client/allocrunner/taskrunner/restarts/restarts.go
@@ -34,6 +34,11 @@ func NewRestartTracker(policy *structs.RestartPolicy, jobType string, tlc *struc
onSuccess = tlc.Sidecar
}

// Poststop should never be restarted on success
if tlc != nil && tlc.Hook == structs.TaskLifecycleHookPoststop {
onSuccess = false
}

return &RestartTracker{
startTime: time.Now(),
onSuccess: onSuccess,
15 changes: 14 additions & 1 deletion client/allocrunner/taskrunner/task_runner.go
@@ -495,14 +495,15 @@ func (tr *TaskRunner) Run() {

select {
case <-tr.startConditionMetCtx:
tr.logger.Debug("lifecycle start condition has been met, proceeding")
// yay proceed
case <-tr.killCtx.Done():
case <-tr.shutdownCtx.Done():
return
}

MAIN:
for !tr.shouldShutdown() {
select {
case <-tr.killCtx.Done():
break MAIN
@@ -625,6 +626,18 @@ MAIN:
tr.logger.Debug("task run loop exiting")
}

func (tr *TaskRunner) shouldShutdown() bool {
if tr.alloc.ClientTerminalStatus() {
return true
}

if !tr.IsPoststopTask() && tr.alloc.ServerTerminalStatus() {
return true
}

return false
}

// handleTaskExitResult handles the results returned by the task exiting. If
// retryWait is true, the caller should attempt to wait on the task again since
// it has not actually finished running. This can happen if the driver plugin
5 changes: 5 additions & 0 deletions client/allocrunner/taskrunner/task_runner_getters.go
@@ -28,6 +28,11 @@ func (tr *TaskRunner) IsLeader() bool {
return tr.taskLeader
}

// IsPoststopTask returns true if this task is a poststop task in its task group.
func (tr *TaskRunner) IsPoststopTask() bool {
return tr.Task().Lifecycle != nil && tr.Task().Lifecycle.Hook == structs.TaskLifecycleHookPoststop
}

func (tr *TaskRunner) Task() *structs.Task {
tr.taskLock.RLock()
defer tr.taskLock.RUnlock()
3 changes: 1 addition & 2 deletions client/allocrunner/taskrunner/task_runner_hooks.go
@@ -165,8 +165,7 @@ func (tr *TaskRunner) emitHookError(err error, hookName string) {
func (tr *TaskRunner) prestart() error {
// Determine if the allocation is terminal and we should avoid running
// prestart hooks.
if tr.shouldShutdown() {
tr.logger.Trace("skipping prestart hooks since allocation is terminal")
return nil
}