Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

task lifecycle poststart hook #8390

Merged
merged 10 commits into from
Aug 31, 2020
1 change: 1 addition & 0 deletions api/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,7 @@ type DispatchPayloadConfig struct {

const (
TaskLifecycleHookPrestart = "prestart"
TaskLifecycleHookPoststart = "poststart"
)

type TaskLifecycle struct {
Expand Down
157 changes: 144 additions & 13 deletions client/allocrunner/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,100 @@ func TestAllocRunner_TaskLeader_KillTG(t *testing.T) {
})
}

// TestAllocRunner_Lifecycle_Poststart asserts that a service job with 2
// poststart lifecycle hooks (1 sidecar, 1 ephemeral) starts all 3 tasks, only
// the ephemeral one finishes, and the other 2 exit when the alloc is stopped.
func TestAllocRunner_Lifecycle_Poststart(t *testing.T) {
	alloc := mock.LifecycleAlloc()

	alloc.Job.Type = structs.JobTypeService

	// Main and sidecar tasks are configured to run long enough that they
	// only exit once the alloc is told to stop.
	mainTask := alloc.Job.TaskGroups[0].Tasks[0]
	mainTask.Config["run_for"] = "100s"

	sidecarTask := alloc.Job.TaskGroups[0].Tasks[1]
	sidecarTask.Lifecycle.Hook = structs.TaskLifecycleHookPoststart
	sidecarTask.Config["run_for"] = "100s"

	// The ephemeral task keeps whatever run_for the mock alloc gives it.
	ephemeralTask := alloc.Job.TaskGroups[0].Tasks[2]
	ephemeralTask.Lifecycle.Hook = structs.TaskLifecycleHookPoststart

	conf, cleanup := testAllocRunnerConfig(t, alloc)
	defer cleanup()
	ar, err := NewAllocRunner(conf)
	require.NoError(t, err)
	defer destroy(ar)
	go ar.Run()

	upd := conf.StateUpdater.(*MockStateUpdater)

	// Wait for main and sidecar tasks to be running, and that the
	// ephemeral task ran and exited.
	testutil.WaitForResult(func() (bool, error) {
		last := upd.Last()
		if last == nil {
			return false, fmt.Errorf("No updates")
		}

		if last.ClientStatus != structs.AllocClientStatusRunning {
			return false, fmt.Errorf("expected alloc to be running not %s", last.ClientStatus)
		}

		if s := last.TaskStates[mainTask.Name].State; s != structs.TaskStateRunning {
			return false, fmt.Errorf("expected main task to be running not %s", s)
		}

		if s := last.TaskStates[sidecarTask.Name].State; s != structs.TaskStateRunning {
			return false, fmt.Errorf("expected sidecar task to be running not %s", s)
		}

		if s := last.TaskStates[ephemeralTask.Name].State; s != structs.TaskStateDead {
			return false, fmt.Errorf("expected ephemeral task to be dead not %s", s)
		}

		if last.TaskStates[ephemeralTask.Name].Failed {
			return false, fmt.Errorf("expected ephemeral task to be successful not failed")
		}

		return true, nil
	}, func(err error) {
		t.Fatalf("error waiting for initial state:\n%v", err)
	})

	// Tell the alloc to stop
	stopAlloc := alloc.Copy()
	stopAlloc.DesiredStatus = structs.AllocDesiredStatusStop
	ar.Update(stopAlloc)

	// Wait for main and sidecar tasks to stop.
	testutil.WaitForResult(func() (bool, error) {
		last := upd.Last()
		if last == nil {
			// Same guard as the first wait: report instead of panicking.
			return false, fmt.Errorf("No updates")
		}

		if last.ClientStatus != structs.AllocClientStatusComplete {
			return false, fmt.Errorf("expected alloc to be complete not %s", last.ClientStatus)
		}

		if s := last.TaskStates[mainTask.Name].State; s != structs.TaskStateDead {
			return false, fmt.Errorf("expected main task to be dead not %s", s)
		}

		if last.TaskStates[mainTask.Name].Failed {
			return false, fmt.Errorf("expected main task to be successful not failed")
		}

		if s := last.TaskStates[sidecarTask.Name].State; s != structs.TaskStateDead {
			return false, fmt.Errorf("expected sidecar task to be dead not %s", s)
		}

		if last.TaskStates[sidecarTask.Name].Failed {
			return false, fmt.Errorf("expected sidecar task to be successful not failed")
		}

		return true, nil
	}, func(err error) {
		t.Fatalf("error waiting for stopped state:\n%v", err)
	})
}

// TestAllocRunner_TaskMain_KillTG asserts that when main tasks die the
// entire task group is killed.
func TestAllocRunner_TaskMain_KillTG(t *testing.T) {
Expand All @@ -152,20 +246,34 @@ func TestAllocRunner_TaskMain_KillTG(t *testing.T) {
alloc.Job.TaskGroups[0].RestartPolicy.Attempts = 0
alloc.Job.TaskGroups[0].Tasks[0].RestartPolicy.Attempts = 0

// Create three tasks in the task group
sidecar := alloc.Job.TaskGroups[0].Tasks[0].Copy()
sidecar.Name = "sidecar"
sidecar.Driver = "mock_driver"
sidecar.KillTimeout = 10 * time.Millisecond
sidecar.Lifecycle = &structs.TaskLifecycleConfig{
// Create four tasks in the task group
prestart := alloc.Job.TaskGroups[0].Tasks[0].Copy()
prestart.Name = "prestart-sidecar"
prestart.Driver = "mock_driver"
prestart.KillTimeout = 10 * time.Millisecond
prestart.Lifecycle = &structs.TaskLifecycleConfig{
Hook: structs.TaskLifecycleHookPrestart,
Sidecar: true,
}

sidecar.Config = map[string]interface{}{
prestart.Config = map[string]interface{}{
"run_for": "100s",
}

poststart := alloc.Job.TaskGroups[0].Tasks[0].Copy()
poststart.Name = "poststart-sidecar"
poststart.Driver = "mock_driver"
poststart.KillTimeout = 10 * time.Millisecond
poststart.Lifecycle = &structs.TaskLifecycleConfig{
Hook: structs.TaskLifecycleHookPoststart,
Sidecar: true,
}

poststart.Config = map[string]interface{}{
"run_for": "100s",
}

// these two main tasks have the same name, is that ok?
main1 := alloc.Job.TaskGroups[0].Tasks[0].Copy()
main1.Name = "task2"
main1.Driver = "mock_driver"
Expand All @@ -180,11 +288,12 @@ func TestAllocRunner_TaskMain_KillTG(t *testing.T) {
"run_for": "2s",
}

alloc.Job.TaskGroups[0].Tasks = []*structs.Task{sidecar, main1, main2}
alloc.Job.TaskGroups[0].Tasks = []*structs.Task{prestart, poststart, main1, main2}
alloc.AllocatedResources.Tasks = map[string]*structs.AllocatedTaskResources{
sidecar.Name: tr,
main1.Name: tr,
main2.Name: tr,
prestart.Name: tr,
poststart.Name: tr,
main1.Name: tr,
main2.Name: tr,
}

conf, cleanup := testAllocRunnerConfig(t, alloc)
Expand Down Expand Up @@ -217,8 +326,30 @@ func TestAllocRunner_TaskMain_KillTG(t *testing.T) {

var state *structs.TaskState

// Task1 should be killed because Task2 exited
state = last.TaskStates[sidecar.Name]
// both sidecars should be killed because Task2 exited
state = last.TaskStates[prestart.Name]
if state == nil {
return false, fmt.Errorf("could not find state for task %s", prestart.Name)
}
if state.State != structs.TaskStateDead {
return false, fmt.Errorf("got state %v; want %v", state.State, structs.TaskStateDead)
}
if state.FinishedAt.IsZero() || state.StartedAt.IsZero() {
return false, fmt.Errorf("expected to have a start and finish time")
}
if len(state.Events) < 2 {
// At least have a received and destroyed
return false, fmt.Errorf("Unexpected number of events")
}

if !hasTaskMainEvent(state) {
return false, fmt.Errorf("Did not find event %v: %#+v", structs.TaskMainDead, state.Events)
}

state = last.TaskStates[poststart.Name]
if state == nil {
return false, fmt.Errorf("could not find state for task %s", poststart.Name)
}
if state.State != structs.TaskStateDead {
return false, fmt.Errorf("got state %v; want %v", state.State, structs.TaskStateDead)
}
Expand Down
66 changes: 47 additions & 19 deletions client/allocrunner/task_hook_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,31 @@ type taskHookCoordinator struct {
mainTaskCtx context.Context
mainTaskCtxCancel func()

poststartTaskCtx context.Context
poststartTaskCtxCancel func()

prestartSidecar map[string]struct{}
prestartEphemeral map[string]struct{}
mainTasksPending map[string]struct{}
}

func newTaskHookCoordinator(logger hclog.Logger, tasks []*structs.Task) *taskHookCoordinator {
closedCh := make(chan struct{})
close(closedCh)

mainTaskCtx, cancelFn := context.WithCancel(context.Background())
mainTaskCtx, mainCancelFn := context.WithCancel(context.Background())
poststartTaskCtx, poststartCancelFn := context.WithCancel(context.Background())

c := &taskHookCoordinator{
logger: logger,
closedCh: closedCh,
mainTaskCtx: mainTaskCtx,
mainTaskCtxCancel: cancelFn,
prestartSidecar: map[string]struct{}{},
prestartEphemeral: map[string]struct{}{},
logger: logger,
closedCh: closedCh,
mainTaskCtx: mainTaskCtx,
mainTaskCtxCancel: mainCancelFn,
prestartSidecar: map[string]struct{}{},
prestartEphemeral: map[string]struct{}{},
mainTasksPending: map[string]struct{}{},
poststartTaskCtx: poststartTaskCtx,
poststartTaskCtxCancel: poststartCancelFn,
}
c.setTasks(tasks)
return c
Expand All @@ -44,7 +52,7 @@ func (c *taskHookCoordinator) setTasks(tasks []*structs.Task) {
for _, task := range tasks {

if task.Lifecycle == nil {
// move nothing
c.mainTasksPending[task.Name] = struct{}{}
continue
}

Expand All @@ -55,9 +63,10 @@ func (c *taskHookCoordinator) setTasks(tasks []*structs.Task) {
} else {
c.prestartEphemeral[task.Name] = struct{}{}
}

case structs.TaskLifecycleHookPoststart:
// Poststart hooks don't need to be tracked.
default:
c.logger.Error("invalid lifecycle hook", "hook", task.Lifecycle.Hook)
c.logger.Error("invalid lifecycle hook", "task", task.Name, "hook", task.Lifecycle.Hook)
}
}

Expand All @@ -70,22 +79,28 @@ func (c *taskHookCoordinator) hasPrestartTasks() bool {
return len(c.prestartSidecar)+len(c.prestartEphemeral) > 0
}

// hasPendingMainTasks reports whether any task names remain in
// mainTasksPending.
func (c *taskHookCoordinator) hasPendingMainTasks() bool {
	pending := len(c.mainTasksPending)
	return pending != 0
}

func (c *taskHookCoordinator) startConditionForTask(task *structs.Task) <-chan struct{} {
if task.Lifecycle != nil && task.Lifecycle.Hook == structs.TaskLifecycleHookPrestart {
return c.closedCh
if task.Lifecycle == nil {
return c.mainTaskCtx.Done()
}

return c.mainTaskCtx.Done()

switch task.Lifecycle.Hook {
case structs.TaskLifecycleHookPrestart:
// Prestart tasks start without checking status of other tasks
return c.closedCh
case structs.TaskLifecycleHookPoststart:
return c.poststartTaskCtx.Done()
default:
return c.mainTaskCtx.Done()
}
}

// This is not thread safe! This must only be called from one thread per alloc runner.
func (c *taskHookCoordinator) taskStateUpdated(states map[string]*structs.TaskState) {
if c.mainTaskCtx.Err() != nil {
// nothing to do here
return
}

for task := range c.prestartSidecar {
st := states[task]
if st == nil || st.StartedAt.IsZero() {
Expand All @@ -104,10 +119,23 @@ func (c *taskHookCoordinator) taskStateUpdated(states map[string]*structs.TaskSt
delete(c.prestartEphemeral, task)
}

for task := range c.mainTasksPending {
st := states[task]
if st == nil || st.StartedAt.IsZero() {
continue
}

delete(c.mainTasksPending, task)
}

// everything well
if !c.hasPrestartTasks() {
c.mainTaskCtxCancel()
}

if !c.hasPendingMainTasks() {
c.poststartTaskCtxCancel()
}
}

// hasNonSidecarTasks returns false if all the passed tasks are sidecar tasks
Expand Down
46 changes: 46 additions & 0 deletions client/allocrunner/task_hook_coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,52 @@ func TestTaskHookCoordinator_SidecarNeverStarts(t *testing.T) {
require.Falsef(t, isChannelClosed(mainCh), "%s channel was closed, should be open", mainTask.Name)
}

// TestTaskHookCoordinator_PoststartStartsAfterMain asserts that the start
// condition of a poststart task stays open until the coordinator has seen the
// main task report a start time.
func TestTaskHookCoordinator_PoststartStartsAfterMain(t *testing.T) {
	logger := testlog.HCLogger(t)

	alloc := mock.LifecycleAlloc()
	tasks := alloc.Job.TaskGroups[0].Tasks

	mainTask := tasks[0]
	sideTask := tasks[1]
	postTask := tasks[2]

	// Make the third task a poststart hook
	postTask.Lifecycle.Hook = structs.TaskLifecycleHookPoststart

	coord := newTaskHookCoordinator(logger, tasks)
	postCh := coord.startConditionForTask(postTask)
	sideCh := coord.startConditionForTask(sideTask)
	mainCh := coord.startConditionForTask(mainTask)

	// Before any state update only the side task may start; both the main
	// task and the poststart task must still be blocked.
	require.Truef(t, isChannelClosed(sideCh), "%s channel was open, should be closed", sideTask.Name)
	require.Falsef(t, isChannelClosed(mainCh), "%s channel was closed, should be open", mainTask.Name)
	require.Falsef(t, isChannelClosed(postCh), "%s channel was closed, should be open", postTask.Name)

	// Report the main and side tasks as started while the poststart task
	// is still pending.
	states := map[string]*structs.TaskState{
		postTask.Name: {
			State:  structs.TaskStatePending,
			Failed: false,
		},
		mainTask.Name: {
			State:     structs.TaskStateRunning,
			Failed:    false,
			StartedAt: time.Now(),
		},
		sideTask.Name: {
			State:     structs.TaskStateRunning,
			Failed:    false,
			StartedAt: time.Now(),
		},
	}

	coord.taskStateUpdated(states)

	require.Truef(t, isChannelClosed(postCh), "%s channel was open, should be closed", postTask.Name)
	require.Truef(t, isChannelClosed(sideCh), "%s channel was open, should be closed", sideTask.Name)
	require.Truef(t, isChannelClosed(mainCh), "%s channel was open, should be closed", mainTask.Name)
}

func isChannelClosed(ch <-chan struct{}) bool {
select {
case <-ch:
Expand Down
8 changes: 8 additions & 0 deletions client/allocrunner/taskrunner/restarts/restarts.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,19 @@ const (
)

func NewRestartTracker(policy *structs.RestartPolicy, jobType string, tlc *structs.TaskLifecycleConfig) *RestartTracker {
// Batch jobs should not restart if they exit successfully
onSuccess := jobType != structs.JobTypeBatch

// Prestart sidecars should get restarted on success
if tlc != nil && tlc.Hook == structs.TaskLifecycleHookPrestart {
onSuccess = tlc.Sidecar
}

// Poststart sidecars should get restarted on success
if tlc != nil && tlc.Hook == structs.TaskLifecycleHookPoststart {
onSuccess = tlc.Sidecar
}

return &RestartTracker{
startTime: time.Now(),
onSuccess: onSuccess,
Expand Down
Loading