Task lifecycle restart (#14127)
* allocrunner: handle lifecycle when all tasks die

When all tasks die the Coordinator must transition to its terminal
state, coordinatorStatePoststop, to unblock poststop tasks. Since this
could happen at any time (for example, a prestart task dies), all states
must be able to transition to this terminal state.
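A minimal sketch of the state machine idea (not the actual Coordinator implementation; state names other than `coordinatorStatePoststop` are illustrative): every transition first checks whether all tasks are dead.

```go
// Package lifecycle is an illustrative sketch, not the real Coordinator.
package lifecycle

type coordinatorState int

const (
	coordinatorStatePrestart coordinatorState = iota
	coordinatorStateMain
	coordinatorStatePoststart
	coordinatorStatePoststop // terminal state that unblocks poststop tasks
)

// nextState shows the key property described above: once all tasks are
// dead, any state transitions straight to the terminal poststop state,
// since even a prestart task can die before the main tasks ever start.
func nextState(current coordinatorState, allTasksDead bool) coordinatorState {
	if allTasksDead {
		return coordinatorStatePoststop
	}
	switch current {
	case coordinatorStatePrestart:
		return coordinatorStateMain
	case coordinatorStateMain:
		return coordinatorStatePoststart
	default:
		return current
	}
}
```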

* allocrunner: implement different alloc restarts

Add a new alloc restart mode where all tasks are restarted, even if they
have already exited. Also unifies the alloc restart logic to use the
implementation that restarts tasks concurrently and ignores
ErrTaskNotRunning errors since those are expected when restarting the
allocation.
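The unified implementation lands in `restartTasks` in the alloc_runner.go hunk further down; as a reduced, self-contained sketch of that pattern (the `restarter` interface and `errNotRunning` sentinel are stand-ins, not Nomad types):

```go
package restart

import (
	"errors"
	"fmt"
	"sync"
)

// errNotRunning stands in for taskrunner.ErrTaskNotRunning, which is
// expected and ignored when restarting a whole allocation.
var errNotRunning = errors.New("task not running")

type restarter interface {
	Restart() error
}

// restartAll restarts every task concurrently and collects only the
// unexpected errors.
func restartAll(tasks map[string]restarter) error {
	var (
		mu   sync.Mutex
		wg   sync.WaitGroup
		errs []error
	)
	for name, t := range tasks {
		wg.Add(1)
		go func(name string, t restarter) {
			defer wg.Done()
			if err := t.Restart(); err != nil && !errors.Is(err, errNotRunning) {
				mu.Lock()
				errs = append(errs, fmt.Errorf("failed to restart task %s: %w", name, err))
				mu.Unlock()
			}
		}(name, t)
	}
	wg.Wait()
	return errors.Join(errs...)
}
```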

* allocrunner: allow tasks to run again

Prevent the task runner Run() method from exiting to allow a dead task
to run again. When the task runner is signaled to restart, the function
will jump back to the MAIN loop and run it again.

The task runner determines if a task needs to run again based on two new
task events that were added to differentiate between a request to
restart a specific task, the tasks that are currently running, or all
tasks that have already run.
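Schematically (this is not the real `taskrunner.Run`, just the control flow it describes): after the task exits, the runner blocks until it is either told to run again, looping back to MAIN, or killed.

```go
package run

// runLoop sketches the described control flow: runTask starts the task and
// returns when it exits; the runner then waits for a restart or kill signal
// instead of returning immediately.
func runLoop(restartCh, killCh <-chan struct{}, runTask func()) {
MAIN:
	for {
		runTask()

		// The task is now "dead", but it may be asked to run again.
		select {
		case <-restartCh:
			continue MAIN
		case <-killCh:
			break MAIN
		}
	}
	// final cleanup and state updates would happen here
}
```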

* api/cli: add support for all tasks alloc restart

Implement the new -all-tasks alloc restart CLI flag and its API
counterpart, AllTasks. The client endpoint calls the appropriate restart
method from the allocrunner depending on the restart parameters used.
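A usage sketch of the new surface, assuming a reachable Nomad agent and a placeholder allocation ID; the CLI equivalent is `nomad alloc restart -all-tasks <alloc-id>`.

```go
package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/nomad/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// Placeholder allocation ID.
	alloc, _, err := client.Allocations().Info("8a3f2c1d", nil)
	if err != nil {
		log.Fatal(err)
	}

	// Restart all tasks, regardless of lifecycle type or state.
	if err := client.Allocations().RestartAllTasks(alloc, nil); err != nil {
		log.Fatal(err)
	}
	fmt.Println("restart requested for all tasks in", alloc.ID)
}
```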

* test: fix tasklifecycle Coordinator test

* allocrunner: kill taskrunners if all tasks are dead

When all non-poststop tasks are dead we need to kill the taskrunners so
we don't leak their goroutines, which are blocked in the alloc restart
loop. This also ensures the allocrunner exits on its own.

* taskrunner: fix tests that waited on WaitCh

Now that "dead" tasks may run again, the taskrunner Run() method will
not return when the task finishes running, so tests must wait for the
task state to be "dead" instead of using the WaitCh, since it won't be
closed until the taskrunner is killed.
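A sketch of the adjusted test pattern, using the `testutil.WaitForResult` helper that already appears in this diff; the `state` accessor is a stand-in for however the test reads the latest task state.

```go
package taskrunner_test

import (
	"fmt"
	"testing"

	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/hashicorp/nomad/testutil"
)

// waitForTaskDead polls the task state instead of blocking on WaitCh(),
// which now stays open until the task runner is killed.
func waitForTaskDead(t *testing.T, state func() *structs.TaskState) {
	testutil.WaitForResult(func() (bool, error) {
		ts := state()
		if ts == nil {
			return false, fmt.Errorf("task state not found")
		}
		if ts.State != structs.TaskStateDead {
			return false, fmt.Errorf("task not dead yet, state: %s", ts.State)
		}
		return true, nil
	}, func(err error) {
		t.Fatalf("error waiting for task to die: %v", err)
	})
}
```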

* tests: add tests for all tasks alloc restart

* changelog: add entry for #14127

* taskrunner: fix restore logic.

The first implementation of the task runner restore process relied on
server data (`tr.Alloc().TerminalStatus()`) which may not be available
to the client at the time of restore.

It also had the incorrect code path. When restoring a dead task, the
driver handle always needs to be cleared cleanly using `clearDriverHandle`;
otherwise, after exiting the MAIN loop, the task may be killed by
`tr.handleKill`.

The fix is to store the state of the Run() loop in the task runner local
client state: if the task runner ever exits this loop cleanly (not with
a shutdown) it will never be able to run again. So if the Run() loop
starts with this local state flag set, it must exit early.

This local state flag is also checked on task restart requests. If the
task is "dead" and its Run() loop is not active, it will never be able to
run again.
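A hedged sketch of that flag, not the actual task runner state types (the `localTaskState` and `RunComplete` names are assumptions):

```go
package restore

// localTaskState stands in for the task runner's locally persisted client
// state; the real type and field names differ.
type localTaskState struct {
	// RunComplete is set when the Run() loop exits cleanly (not via an
	// agent shutdown). Once set, the task can never run again.
	RunComplete bool
}

type taskRunner struct {
	state   localTaskState
	persist func(localTaskState) error
}

// run exits early on restore if a previous Run() loop already completed,
// without consulting server data such as tr.Alloc().TerminalStatus().
func (tr *taskRunner) run(agentShutdown bool) {
	if tr.state.RunComplete {
		return
	}

	// ... MAIN loop: run the task, possibly multiple times ...

	if !agentShutdown {
		// Clean exit: record it so a later restore never re-runs the task.
		tr.state.RunComplete = true
		_ = tr.persist(tr.state)
	}
}
```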

* address code review requests

* apply more code review changes

* taskrunner: add different Restart modes

Using the task event to differentiate between the allocrunner restart
methods proved confusing for developers trying to understand how it all
worked.

So instead of relying on the event type, this commit separates the logic
of restarting a taskRunner into two methods:
- `Restart` retains the current behaviour and will only restart the task
  if it's currently running.
- `ForceRestart` is the new method where a `dead` task is allowed to
  restart if its `Run()` method is still active. Callers will need to
  restart the allocRunner taskCoordinator to make sure it will allow the
  task to run again.
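A condensed, illustrative view of the two entry points (the real methods live on `taskrunner.TaskRunner`, take a `*structs.TaskEvent`, and do considerably more):

```go
package taskrunner

import (
	"context"
	"errors"
)

var ErrTaskNotRunning = errors.New("task not running")

// TaskRunner here is a simplified stand-in for the real type.
type TaskRunner struct {
	running  bool // the task process is currently running
	runnable bool // the Run() loop is still active, so a dead task may run again
}

// Restart keeps the previous behaviour: only a running task is restarted.
func (tr *TaskRunner) Restart(ctx context.Context) error {
	if !tr.running {
		return ErrTaskNotRunning
	}
	// ... stop the task and let the MAIN loop start it again ...
	return nil
}

// ForceRestart also lets a dead task run again, provided its Run() loop is
// still active; callers must restart the allocRunner taskCoordinator first
// so the task is allowed to start.
func (tr *TaskRunner) ForceRestart(ctx context.Context) error {
	if !tr.running && !tr.runnable {
		return ErrTaskNotRunning
	}
	// ... signal the Run() loop to run the task again ...
	return nil
}
```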

* minor fixes
lgfa29 committed Aug 24, 2022
1 parent e886d5d commit f74f508
Showing 22 changed files with 1,098 additions and 212 deletions.
7 changes: 7 additions & 0 deletions .changelog/14127.txt
@@ -0,0 +1,7 @@
```release-note:improvement
client: add option to restart all tasks of an allocation, regardless of lifecycle type or state.
```

```release-note:improvement
client: only start poststop tasks after poststart tasks are done.
```
21 changes: 20 additions & 1 deletion api/allocations.go
@@ -141,7 +141,9 @@ func (a *Allocations) GC(alloc *Allocation, q *QueryOptions) error {
return err
}

// Restart restarts an allocation.
// Restart restarts the tasks that are currently running or a specific task if
// taskName is provided. An error is returned if the task to be restarted is
// not running.
//
// Note: for cluster topologies where API consumers don't have network access to
// Nomad clients, set api.ClientConnTimeout to a small value (ex 1ms) to avoid
@@ -156,6 +158,22 @@ func (a *Allocations) Restart(alloc *Allocation, taskName string, q *QueryOption
return err
}

// RestartAllTasks restarts all tasks in the allocation, regardless of
// lifecycle type or state. Tasks will restart following their lifecycle order.
//
// Note: for cluster topologies where API consumers don't have network access to
// Nomad clients, set api.ClientConnTimeout to a small value (ex 1ms) to avoid
// long pauses on this API call.
func (a *Allocations) RestartAllTasks(alloc *Allocation, q *QueryOptions) error {
req := AllocationRestartRequest{
AllTasks: true,
}

var resp struct{}
_, err := a.client.putQuery("/v1/client/allocation/"+alloc.ID+"/restart", &req, &resp, q)
return err
}

// Stop stops an allocation.
//
// Note: for cluster topologies where API consumers don't have network access to
@@ -447,6 +465,7 @@ func (a Allocation) RescheduleInfo(t time.Time) (int, int) {

type AllocationRestartRequest struct {
TaskName string
AllTasks bool
}

type AllocSignalRequest struct {
2 changes: 1 addition & 1 deletion client/alloc_endpoint.go
@@ -102,7 +102,7 @@ func (a *Allocations) Restart(args *nstructs.AllocRestartRequest, reply *nstruct
return nstructs.ErrPermissionDenied
}

return a.c.RestartAllocation(args.AllocID, args.TaskName)
return a.c.RestartAllocation(args.AllocID, args.TaskName, args.AllTasks)
}

// Stats is used to collect allocation statistics
39 changes: 39 additions & 0 deletions client/alloc_endpoint_test.go
@@ -68,6 +68,45 @@ func TestAllocations_Restart(t *testing.T) {
})
}

func TestAllocations_RestartAllTasks(t *testing.T) {
ci.Parallel(t)

require := require.New(t)
client, cleanup := TestClient(t, nil)
defer cleanup()

alloc := mock.LifecycleAlloc()
require.Nil(client.addAlloc(alloc, ""))

// Can't restart all tasks while specifying a task name.
req := &nstructs.AllocRestartRequest{
AllocID: alloc.ID,
AllTasks: true,
TaskName: "web",
}
var resp nstructs.GenericResponse
err := client.ClientRPC("Allocations.Restart", &req, &resp)
require.Error(err)

// Good request.
req = &nstructs.AllocRestartRequest{
AllocID: alloc.ID,
AllTasks: true,
}

testutil.WaitForResult(func() (bool, error) {
var resp2 nstructs.GenericResponse
err := client.ClientRPC("Allocations.Restart", &req, &resp2)
if err != nil && strings.Contains(err.Error(), "not running") {
return false, err
}

return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}

func TestAllocations_Restart_ACL(t *testing.T) {
ci.Parallel(t)
require := require.New(t)
145 changes: 88 additions & 57 deletions client/allocrunner/alloc_runner.go
@@ -28,7 +28,6 @@ import (
cstate "github.com/hashicorp/nomad/client/state"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/vaultclient"
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/device"
@@ -547,40 +546,64 @@ func (ar *allocRunner) handleTaskStateUpdates() {
}
}

// if all live runners are sidecars - kill alloc
if killEvent == nil && hasSidecars && !hasNonSidecarTasks(liveRunners) {
killEvent = structs.NewTaskEvent(structs.TaskMainDead)
}

// If there's a kill event set and live runners, kill them
if killEvent != nil && len(liveRunners) > 0 {

// Log kill reason
switch killEvent.Type {
case structs.TaskLeaderDead:
ar.logger.Debug("leader task dead, destroying all tasks", "leader_task", killTask)
case structs.TaskMainDead:
ar.logger.Debug("main tasks dead, destroying all sidecar tasks")
default:
ar.logger.Debug("task failure, destroying all tasks", "failed_task", killTask)
if len(liveRunners) > 0 {
// if all live runners are sidecars - kill alloc
onlySidecarsRemaining := hasSidecars && !hasNonSidecarTasks(liveRunners)
if killEvent == nil && onlySidecarsRemaining {
killEvent = structs.NewTaskEvent(structs.TaskMainDead)
}

// Emit kill event for live runners
for _, tr := range liveRunners {
tr.EmitEvent(killEvent)
}
// If there's a kill event set and live runners, kill them
if killEvent != nil {

// Log kill reason
switch killEvent.Type {
case structs.TaskLeaderDead:
ar.logger.Debug("leader task dead, destroying all tasks", "leader_task", killTask)
case structs.TaskMainDead:
ar.logger.Debug("main tasks dead, destroying all sidecar tasks")
default:
ar.logger.Debug("task failure, destroying all tasks", "failed_task", killTask)
}

// Kill 'em all
states = ar.killTasks()
// Emit kill event for live runners
for _, tr := range liveRunners {
tr.EmitEvent(killEvent)
}

// Kill 'em all
states = ar.killTasks()

// Wait for TaskRunners to exit before continuing. This will
// prevent looping before TaskRunners have transitioned to
// Dead.
for _, tr := range liveRunners {
ar.logger.Info("waiting for task to exit", "task", tr.Task().Name)
select {
case <-tr.WaitCh():
case <-ar.waitCh:
}
}
}
} else {
// If there are no live runners left kill all non-poststop task
// runners to unblock them from the alloc restart loop.
for _, tr := range ar.tasks {
if tr.IsPoststopTask() {
continue
}

// Wait for TaskRunners to exit before continuing to
// prevent looping before TaskRunners have transitioned
// to Dead.
for _, tr := range liveRunners {
ar.logger.Info("killing task", "task", tr.Task().Name)
select {
case <-tr.WaitCh():
case <-ar.waitCh:
default:
// Kill task runner without setting an event because the
// task is already dead, it's just waiting in the alloc
// restart loop.
err := tr.Kill(context.TODO(), nil)
if err != nil {
ar.logger.Warn("failed to kill task", "task", tr.Task().Name, "error", err)
}
}
}
}
@@ -648,7 +671,7 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState {
break
}

// Kill the rest non-sidecar or poststop tasks concurrently
// Kill the rest non-sidecar and non-poststop tasks concurrently
wg := sync.WaitGroup{}
for name, tr := range ar.tasks {
// Filter out poststop and sidecar tasks so that they stop after all the other tasks are killed
@@ -1205,19 +1228,37 @@ func (ar *allocRunner) GetTaskEventHandler(taskName string) drivermanager.EventH
return nil
}

// RestartTask signalls the task runner for the provided task to restart.
func (ar *allocRunner) RestartTask(taskName string, taskEvent *structs.TaskEvent) error {
// Restart satisfies the WorkloadRestarter interface and restarts all tasks
// that are currently running.
func (ar *allocRunner) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error {
return ar.restartTasks(ctx, event, failure, false)
}

// RestartTask restarts the provided task.
func (ar *allocRunner) RestartTask(taskName string, event *structs.TaskEvent) error {
tr, ok := ar.tasks[taskName]
if !ok {
return fmt.Errorf("Could not find task runner for task: %s", taskName)
}

return tr.Restart(context.TODO(), taskEvent, false)
return tr.Restart(context.TODO(), event, false)
}

// Restart satisfies the WorkloadRestarter interface restarts all task runners
// concurrently
func (ar *allocRunner) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error {
// RestartRunning restarts all tasks that are currently running.
func (ar *allocRunner) RestartRunning(event *structs.TaskEvent) error {
return ar.restartTasks(context.TODO(), event, false, false)
}

// RestartAll restarts all tasks in the allocation, including dead ones. They
// will restart following their lifecycle order.
func (ar *allocRunner) RestartAll(event *structs.TaskEvent) error {
// Restart the taskCoordinator to allow dead tasks to run again.
ar.taskCoordinator.Restart()
return ar.restartTasks(context.TODO(), event, false, true)
}

// restartTasks restarts all task runners concurrently.
func (ar *allocRunner) restartTasks(ctx context.Context, event *structs.TaskEvent, failure bool, force bool) error {
waitCh := make(chan struct{})
var err *multierror.Error
var errMutex sync.Mutex
@@ -1230,10 +1271,19 @@ func (ar *allocRunner) Restart(ctx context.Context, event *structs.TaskEvent, fa
defer close(waitCh)
for tn, tr := range ar.tasks {
wg.Add(1)
go func(taskName string, r agentconsul.WorkloadRestarter) {
go func(taskName string, taskRunner *taskrunner.TaskRunner) {
defer wg.Done()
e := r.Restart(ctx, event, failure)
if e != nil {

var e error
if force {
e = taskRunner.ForceRestart(ctx, event.Copy(), failure)
} else {
e = taskRunner.Restart(ctx, event.Copy(), failure)
}

// Ignore ErrTaskNotRunning errors since tasks that are not
// running are expected to not be restarted.
if e != nil && e != taskrunner.ErrTaskNotRunning {
errMutex.Lock()
defer errMutex.Unlock()
err = multierror.Append(err, fmt.Errorf("failed to restart task %s: %v", taskName, e))
Expand All @@ -1251,25 +1301,6 @@ func (ar *allocRunner) Restart(ctx context.Context, event *structs.TaskEvent, fa
return err.ErrorOrNil()
}

// RestartAll signalls all task runners in the allocation to restart and passes
// a copy of the task event to each restart event.
// Returns any errors in a concatenated form.
func (ar *allocRunner) RestartAll(taskEvent *structs.TaskEvent) error {
var err *multierror.Error

// run alloc task restart hooks
ar.taskRestartHooks()

for tn := range ar.tasks {
rerr := ar.RestartTask(tn, taskEvent.Copy())
if rerr != nil {
err = multierror.Append(err, rerr)
}
}

return err.ErrorOrNil()
}

// Signal sends a signal request to task runners inside an allocation. If the
// taskName is empty, then it is sent to all tasks.
func (ar *allocRunner) Signal(taskName, signal string) error {