Skip to content

Commit

Permalink
Merge pull request #8390 from hashicorp/lifecycle-poststart-hook
Browse files Browse the repository at this point in the history
task lifecycle poststart hook
  • Loading branch information
jazzyfresh committed Aug 31, 2020
2 parents 2740b48 + 81cad55 commit 8faece3
Show file tree
Hide file tree
Showing 11 changed files with 683 additions and 33 deletions.
1 change: 1 addition & 0 deletions api/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,7 @@ type DispatchPayloadConfig struct {

const (
TaskLifecycleHookPrestart = "prestart"
TaskLifecycleHookPoststart = "poststart"
)

type TaskLifecycle struct {
Expand Down
157 changes: 144 additions & 13 deletions client/allocrunner/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,100 @@ func TestAllocRunner_TaskLeader_KillTG(t *testing.T) {
})
}

// TestAllocRunner_Lifecycle_Poststart asserts that a service job with 2
// poststart lifecycle hooks (1 sidecar, 1 ephemeral) starts all 3 tasks, only
// the ephemeral one finishes, and the other 2 exit when the alloc is stopped.
func TestAllocRunner_Lifecycle_Poststart(t *testing.T) {
alloc := mock.LifecycleAlloc()

alloc.Job.Type = structs.JobTypeService
mainTask := alloc.Job.TaskGroups[0].Tasks[0]
mainTask.Config["run_for"] = "100s"

sidecarTask := alloc.Job.TaskGroups[0].Tasks[1]
sidecarTask.Lifecycle.Hook = structs.TaskLifecycleHookPoststart
sidecarTask.Config["run_for"] = "100s"

ephemeralTask := alloc.Job.TaskGroups[0].Tasks[2]
ephemeralTask.Lifecycle.Hook = structs.TaskLifecycleHookPoststart

conf, cleanup := testAllocRunnerConfig(t, alloc)
defer cleanup()
ar, err := NewAllocRunner(conf)
require.NoError(t, err)
defer destroy(ar)
go ar.Run()

upd := conf.StateUpdater.(*MockStateUpdater)

// Wait for main and sidecar tasks to be running, and that the
// ephemeral task ran and exited.
testutil.WaitForResult(func() (bool, error) {
last := upd.Last()
if last == nil {
return false, fmt.Errorf("No updates")
}

if last.ClientStatus != structs.AllocClientStatusRunning {
return false, fmt.Errorf("expected alloc to be running not %s", last.ClientStatus)
}

if s := last.TaskStates[mainTask.Name].State; s != structs.TaskStateRunning {
return false, fmt.Errorf("expected main task to be running not %s", s)
}

if s := last.TaskStates[sidecarTask.Name].State; s != structs.TaskStateRunning {
return false, fmt.Errorf("expected sidecar task to be running not %s", s)
}

if s := last.TaskStates[ephemeralTask.Name].State; s != structs.TaskStateDead {
return false, fmt.Errorf("expected ephemeral task to be dead not %s", s)
}

if last.TaskStates[ephemeralTask.Name].Failed {
return false, fmt.Errorf("expected ephemeral task to be successful not failed")
}

return true, nil
}, func(err error) {
t.Fatalf("error waiting for initial state:\n%v", err)
})

// Tell the alloc to stop
stopAlloc := alloc.Copy()
stopAlloc.DesiredStatus = structs.AllocDesiredStatusStop
ar.Update(stopAlloc)

// Wait for main and sidecar tasks to stop.
testutil.WaitForResult(func() (bool, error) {
last := upd.Last()

if last.ClientStatus != structs.AllocClientStatusComplete {
return false, fmt.Errorf("expected alloc to be running not %s", last.ClientStatus)
}

if s := last.TaskStates[mainTask.Name].State; s != structs.TaskStateDead {
return false, fmt.Errorf("expected main task to be dead not %s", s)
}

if last.TaskStates[mainTask.Name].Failed {
return false, fmt.Errorf("expected main task to be successful not failed")
}

if s := last.TaskStates[sidecarTask.Name].State; s != structs.TaskStateDead {
return false, fmt.Errorf("expected sidecar task to be dead not %s", s)
}

if last.TaskStates[sidecarTask.Name].Failed {
return false, fmt.Errorf("expected sidecar task to be successful not failed")
}

return true, nil
}, func(err error) {
t.Fatalf("error waiting for initial state:\n%v", err)
})
}

// TestAllocRunner_TaskMain_KillTG asserts that when main tasks die the
// entire task group is killed.
func TestAllocRunner_TaskMain_KillTG(t *testing.T) {
Expand All @@ -152,20 +246,34 @@ func TestAllocRunner_TaskMain_KillTG(t *testing.T) {
alloc.Job.TaskGroups[0].RestartPolicy.Attempts = 0
alloc.Job.TaskGroups[0].Tasks[0].RestartPolicy.Attempts = 0

// Create three tasks in the task group
sidecar := alloc.Job.TaskGroups[0].Tasks[0].Copy()
sidecar.Name = "sidecar"
sidecar.Driver = "mock_driver"
sidecar.KillTimeout = 10 * time.Millisecond
sidecar.Lifecycle = &structs.TaskLifecycleConfig{
// Create four tasks in the task group
prestart := alloc.Job.TaskGroups[0].Tasks[0].Copy()
prestart.Name = "prestart-sidecar"
prestart.Driver = "mock_driver"
prestart.KillTimeout = 10 * time.Millisecond
prestart.Lifecycle = &structs.TaskLifecycleConfig{
Hook: structs.TaskLifecycleHookPrestart,
Sidecar: true,
}

sidecar.Config = map[string]interface{}{
prestart.Config = map[string]interface{}{
"run_for": "100s",
}

poststart := alloc.Job.TaskGroups[0].Tasks[0].Copy()
poststart.Name = "poststart-sidecar"
poststart.Driver = "mock_driver"
poststart.KillTimeout = 10 * time.Millisecond
poststart.Lifecycle = &structs.TaskLifecycleConfig{
Hook: structs.TaskLifecycleHookPoststart,
Sidecar: true,
}

poststart.Config = map[string]interface{}{
"run_for": "100s",
}

// these two main tasks have the same name, is that ok?
main1 := alloc.Job.TaskGroups[0].Tasks[0].Copy()
main1.Name = "task2"
main1.Driver = "mock_driver"
Expand All @@ -180,11 +288,12 @@ func TestAllocRunner_TaskMain_KillTG(t *testing.T) {
"run_for": "2s",
}

alloc.Job.TaskGroups[0].Tasks = []*structs.Task{sidecar, main1, main2}
alloc.Job.TaskGroups[0].Tasks = []*structs.Task{prestart, poststart, main1, main2}
alloc.AllocatedResources.Tasks = map[string]*structs.AllocatedTaskResources{
sidecar.Name: tr,
main1.Name: tr,
main2.Name: tr,
prestart.Name: tr,
poststart.Name: tr,
main1.Name: tr,
main2.Name: tr,
}

conf, cleanup := testAllocRunnerConfig(t, alloc)
Expand Down Expand Up @@ -217,8 +326,30 @@ func TestAllocRunner_TaskMain_KillTG(t *testing.T) {

var state *structs.TaskState

// Task1 should be killed because Task2 exited
state = last.TaskStates[sidecar.Name]
// both sidecars should be killed because Task2 exited
state = last.TaskStates[prestart.Name]
if state == nil {
return false, fmt.Errorf("could not find state for task %s", prestart.Name)
}
if state.State != structs.TaskStateDead {
return false, fmt.Errorf("got state %v; want %v", state.State, structs.TaskStateDead)
}
if state.FinishedAt.IsZero() || state.StartedAt.IsZero() {
return false, fmt.Errorf("expected to have a start and finish time")
}
if len(state.Events) < 2 {
// At least have a received and destroyed
return false, fmt.Errorf("Unexpected number of events")
}

if !hasTaskMainEvent(state) {
return false, fmt.Errorf("Did not find event %v: %#+v", structs.TaskMainDead, state.Events)
}

state = last.TaskStates[poststart.Name]
if state == nil {
return false, fmt.Errorf("could not find state for task %s", poststart.Name)
}
if state.State != structs.TaskStateDead {
return false, fmt.Errorf("got state %v; want %v", state.State, structs.TaskStateDead)
}
Expand Down
66 changes: 47 additions & 19 deletions client/allocrunner/task_hook_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,31 @@ type taskHookCoordinator struct {
mainTaskCtx context.Context
mainTaskCtxCancel func()

poststartTaskCtx context.Context
poststartTaskCtxCancel func()

prestartSidecar map[string]struct{}
prestartEphemeral map[string]struct{}
mainTasksPending map[string]struct{}
}

func newTaskHookCoordinator(logger hclog.Logger, tasks []*structs.Task) *taskHookCoordinator {
closedCh := make(chan struct{})
close(closedCh)

mainTaskCtx, cancelFn := context.WithCancel(context.Background())
mainTaskCtx, mainCancelFn := context.WithCancel(context.Background())
poststartTaskCtx, poststartCancelFn := context.WithCancel(context.Background())

c := &taskHookCoordinator{
logger: logger,
closedCh: closedCh,
mainTaskCtx: mainTaskCtx,
mainTaskCtxCancel: cancelFn,
prestartSidecar: map[string]struct{}{},
prestartEphemeral: map[string]struct{}{},
logger: logger,
closedCh: closedCh,
mainTaskCtx: mainTaskCtx,
mainTaskCtxCancel: mainCancelFn,
prestartSidecar: map[string]struct{}{},
prestartEphemeral: map[string]struct{}{},
mainTasksPending: map[string]struct{}{},
poststartTaskCtx: poststartTaskCtx,
poststartTaskCtxCancel: poststartCancelFn,
}
c.setTasks(tasks)
return c
Expand All @@ -44,7 +52,7 @@ func (c *taskHookCoordinator) setTasks(tasks []*structs.Task) {
for _, task := range tasks {

if task.Lifecycle == nil {
// move nothing
c.mainTasksPending[task.Name] = struct{}{}
continue
}

Expand All @@ -55,9 +63,10 @@ func (c *taskHookCoordinator) setTasks(tasks []*structs.Task) {
} else {
c.prestartEphemeral[task.Name] = struct{}{}
}

case structs.TaskLifecycleHookPoststart:
// Poststart hooks don't need to be tracked.
default:
c.logger.Error("invalid lifecycle hook", "hook", task.Lifecycle.Hook)
c.logger.Error("invalid lifecycle hook", "task", task.Name, "hook", task.Lifecycle.Hook)
}
}

Expand All @@ -70,22 +79,28 @@ func (c *taskHookCoordinator) hasPrestartTasks() bool {
return len(c.prestartSidecar)+len(c.prestartEphemeral) > 0
}

func (c *taskHookCoordinator) hasPendingMainTasks() bool {
return len(c.mainTasksPending) > 0
}

func (c *taskHookCoordinator) startConditionForTask(task *structs.Task) <-chan struct{} {
if task.Lifecycle != nil && task.Lifecycle.Hook == structs.TaskLifecycleHookPrestart {
return c.closedCh
if task.Lifecycle == nil {
return c.mainTaskCtx.Done()
}

return c.mainTaskCtx.Done()

switch task.Lifecycle.Hook {
case structs.TaskLifecycleHookPrestart:
// Prestart tasks start without checking status of other tasks
return c.closedCh
case structs.TaskLifecycleHookPoststart:
return c.poststartTaskCtx.Done()
default:
return c.mainTaskCtx.Done()
}
}

// This is not thread safe! This must only be called from one thread per alloc runner.
func (c *taskHookCoordinator) taskStateUpdated(states map[string]*structs.TaskState) {
if c.mainTaskCtx.Err() != nil {
// nothing to do here
return
}

for task := range c.prestartSidecar {
st := states[task]
if st == nil || st.StartedAt.IsZero() {
Expand All @@ -104,10 +119,23 @@ func (c *taskHookCoordinator) taskStateUpdated(states map[string]*structs.TaskSt
delete(c.prestartEphemeral, task)
}

for task := range c.mainTasksPending {
st := states[task]
if st == nil || st.StartedAt.IsZero() {
continue
}

delete(c.mainTasksPending, task)
}

// everything well
if !c.hasPrestartTasks() {
c.mainTaskCtxCancel()
}

if !c.hasPendingMainTasks() {
c.poststartTaskCtxCancel()
}
}

// hasNonSidecarTasks returns false if all the passed tasks are sidecar tasks
Expand Down
46 changes: 46 additions & 0 deletions client/allocrunner/task_hook_coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,52 @@ func TestTaskHookCoordinator_SidecarNeverStarts(t *testing.T) {
require.Falsef(t, isChannelClosed(mainCh), "%s channel was closed, should be open", mainTask.Name)
}

func TestTaskHookCoordinator_PoststartStartsAfterMain(t *testing.T) {
logger := testlog.HCLogger(t)

alloc := mock.LifecycleAlloc()
tasks := alloc.Job.TaskGroups[0].Tasks

mainTask := tasks[0]
sideTask := tasks[1]
postTask := tasks[2]

// Make the the third task a poststart hook
postTask.Lifecycle.Hook = structs.TaskLifecycleHookPoststart

coord := newTaskHookCoordinator(logger, tasks)
postCh := coord.startConditionForTask(postTask)
sideCh := coord.startConditionForTask(sideTask)
mainCh := coord.startConditionForTask(mainTask)

require.Truef(t, isChannelClosed(sideCh), "%s channel was open, should be closed", sideTask.Name)
require.Falsef(t, isChannelClosed(mainCh), "%s channel was closed, should be open", mainTask.Name)
require.Falsef(t, isChannelClosed(mainCh), "%s channel was closed, should be open", postTask.Name)

states := map[string]*structs.TaskState{
postTask.Name: {
State: structs.TaskStatePending,
Failed: false,
},
mainTask.Name: {
State: structs.TaskStateRunning,
Failed: false,
StartedAt: time.Now(),
},
sideTask.Name: {
State: structs.TaskStateRunning,
Failed: false,
StartedAt: time.Now(),
},
}

coord.taskStateUpdated(states)

require.Truef(t, isChannelClosed(postCh), "%s channel was open, should be closed", postTask.Name)
require.Truef(t, isChannelClosed(sideCh), "%s channel was open, should be closed", sideTask.Name)
require.Truef(t, isChannelClosed(mainCh), "%s channel was open, should be closed", mainTask.Name)
}

func isChannelClosed(ch <-chan struct{}) bool {
select {
case <-ch:
Expand Down
8 changes: 8 additions & 0 deletions client/allocrunner/taskrunner/restarts/restarts.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,19 @@ const (
)

func NewRestartTracker(policy *structs.RestartPolicy, jobType string, tlc *structs.TaskLifecycleConfig) *RestartTracker {
// Batch jobs should not restart if they exit successfully
onSuccess := jobType != structs.JobTypeBatch

// Prestart sidecars should get restarted on success
if tlc != nil && tlc.Hook == structs.TaskLifecycleHookPrestart {
onSuccess = tlc.Sidecar
}

// Poststart sidecars should get restarted on success
if tlc != nil && tlc.Hook == structs.TaskLifecycleHookPoststart {
onSuccess = tlc.Sidecar
}

return &RestartTracker{
startTime: time.Now(),
onSuccess: onSuccess,
Expand Down
Loading

0 comments on commit 8faece3

Please sign in to comment.