Task lifecycle restart #14127

Merged · 14 commits · Aug 24, 2022
7 changes: 7 additions & 0 deletions .changelog/14127.txt
@@ -0,0 +1,7 @@
```release-note:improvement
client: add option to restart all tasks of an allocation, regardless of lifecycle type or state.
```

```release-note:improvement
client: only start poststop tasks after poststart tasks are done.
```
Comment on lines +5 to +7
Member:

bug or feature?

Contributor Author:

We never specified the expected behaviour here, so it's not like it was doing something wrong. The code change will now always follow this order, so I think it's an improvement.
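For context, a minimal sketch of a task group where this ordering matters, written against the Nomad `api` package; the task names, drivers, and `TaskLifecycleHook*` constant names are illustrative assumptions, not part of this PR:

```go
package main

import (
	"fmt"

	"github.com/hashicorp/nomad/api"
)

// buildGroup sketches a group with a poststart and a poststop task. With this
// change, the "cleanup" poststop task only starts after the "warmup"
// poststart task has finished.
func buildGroup() *api.TaskGroup {
	group := api.NewTaskGroup("example", 1)

	mainTask := api.NewTask("main", "docker")

	warmup := api.NewTask("warmup", "docker")
	warmup.Lifecycle = &api.TaskLifecycle{
		Hook: api.TaskLifecycleHookPoststart, // starts once "main" is running
	}

	cleanup := api.NewTask("cleanup", "docker")
	cleanup.Lifecycle = &api.TaskLifecycle{
		Hook: api.TaskLifecycleHookPoststop, // now waits for poststart tasks to finish
	}

	group.AddTask(mainTask)
	group.AddTask(warmup)
	group.AddTask(cleanup)
	return group
}

func main() {
	g := buildGroup()
	fmt.Printf("group %q has %d tasks\n", *g.Name, len(g.Tasks))
}
```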

21 changes: 20 additions & 1 deletion api/allocations.go
@@ -141,7 +141,9 @@ func (a *Allocations) GC(alloc *Allocation, q *QueryOptions) error {
return err
}

// Restart restarts an allocation.
// Restart restarts the tasks that are currently running or a specific task if
// taskName is provided. An error is returned if the task to be restarted is
// not running.
//
// Note: for cluster topologies where API consumers don't have network access to
// Nomad clients, set api.ClientConnTimeout to a small value (ex 1ms) to avoid
@@ -156,6 +158,22 @@ func (a *Allocations) Restart(alloc *Allocation, taskName string, q *QueryOption
return err
}

// RestartAllTasks restarts all tasks in the allocation, regardless of
// lifecycle type or state. Tasks will restart following their lifecycle order.
//
// Note: for cluster topologies where API consumers don't have network access to
// Nomad clients, set api.ClientConnTimeout to a small value (ex 1ms) to avoid
// long pauses on this API call.
func (a *Allocations) RestartAllTasks(alloc *Allocation, q *QueryOptions) error {
Contributor Author:

I created a new method here to avoid an api package breaking change in the Restart method above.
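As a usage sketch of how the two methods sit side by side (the agent address, allocation ID, and task name below are placeholders, not part of this PR):

```go
package main

import (
	"log"

	"github.com/hashicorp/nomad/api"
)

func main() {
	cfg := api.DefaultConfig()
	cfg.Address = "http://127.0.0.1:4646" // assumed local agent
	client, err := api.NewClient(cfg)
	if err != nil {
		log.Fatal(err)
	}

	// Allocation ID is a placeholder.
	alloc, _, err := client.Allocations().Info("alloc-id-placeholder", nil)
	if err != nil {
		log.Fatal(err)
	}

	// Existing signature is unchanged: restart one running task, or all
	// running tasks when taskName is empty.
	if err := client.Allocations().Restart(alloc, "web", nil); err != nil {
		log.Println("restart task:", err)
	}

	// New method added by this PR: restart every task in the allocation,
	// regardless of lifecycle type or state.
	if err := client.Allocations().RestartAllTasks(alloc, nil); err != nil {
		log.Println("restart all tasks:", err)
	}
}
```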

lgfa29 marked this conversation as resolved.
req := AllocationRestartRequest{
AllTasks: true,
}

var resp struct{}
_, err := a.client.putQuery("/v1/client/allocation/"+alloc.ID+"/restart", &req, &resp, q)
return err
}

// Stop stops an allocation.
//
// Note: for cluster topologies where API consumers don't have network access to
@@ -447,6 +465,7 @@ func (a Allocation) RescheduleInfo(t time.Time) (int, int) {

type AllocationRestartRequest struct {
TaskName string
AllTasks bool
}

type AllocSignalRequest struct {
2 changes: 1 addition & 1 deletion client/alloc_endpoint.go
@@ -102,7 +102,7 @@ func (a *Allocations) Restart(args *nstructs.AllocRestartRequest, reply *nstruct
return nstructs.ErrPermissionDenied
}

return a.c.RestartAllocation(args.AllocID, args.TaskName)
return a.c.RestartAllocation(args.AllocID, args.TaskName, args.AllTasks)
}

// Stats is used to collect allocation statistics
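For orientation, a rough sketch of how the new AllTasks argument maps onto the alloc runner methods further down in this diff; the interface and function here are illustrative only, not the PR's actual client wiring:

```go
package client

import (
	"fmt"

	"github.com/hashicorp/nomad/nomad/structs"
)

// allocRestarter is the subset of alloc-runner behaviour this sketch needs;
// the real change wires these methods through the client's alloc runner.
type allocRestarter interface {
	RestartTask(taskName string, event *structs.TaskEvent) error
	RestartRunning(event *structs.TaskEvent) error
	RestartAll(event *structs.TaskEvent) error
}

// restartAllocation shows, roughly, how the three request shapes map onto the
// alloc runner methods added in this PR. It is a sketch, not the PR's code.
func restartAllocation(ar allocRestarter, taskName string, allTasks bool) error {
	// Restarting all tasks and a single named task are mutually exclusive.
	if allTasks && taskName != "" {
		return fmt.Errorf("task name cannot be set when restarting all tasks")
	}

	switch {
	case taskName != "":
		// Restart one specific task; it must be running.
		return ar.RestartTask(taskName, structs.NewTaskEvent(structs.TaskRestartSignal))
	case allTasks:
		// Restart every task, dead or alive, in lifecycle order.
		return ar.RestartAll(structs.NewTaskEvent(structs.TaskRestartAllSignal))
	default:
		// Restart only the tasks that are currently running.
		return ar.RestartRunning(structs.NewTaskEvent(structs.TaskRestartRunningSignal))
	}
}
```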
39 changes: 39 additions & 0 deletions client/alloc_endpoint_test.go
@@ -68,6 +68,45 @@ func TestAllocations_Restart(t *testing.T) {
})
}

func TestAllocations_RestartAllTasks(t *testing.T) {
ci.Parallel(t)

require := require.New(t)
client, cleanup := TestClient(t, nil)
defer cleanup()

alloc := mock.LifecycleAlloc()
require.Nil(client.addAlloc(alloc, ""))

// Can't restart all tasks while specifying a task name.
req := &nstructs.AllocRestartRequest{
AllocID: alloc.ID,
AllTasks: true,
TaskName: "web",
}
var resp nstructs.GenericResponse
err := client.ClientRPC("Allocations.Restart", &req, &resp)
require.Error(err)

// Good request.
req = &nstructs.AllocRestartRequest{
AllocID: alloc.ID,
AllTasks: true,
}

testutil.WaitForResult(func() (bool, error) {
var resp2 nstructs.GenericResponse
err := client.ClientRPC("Allocations.Restart", &req, &resp2)
if err != nil && strings.Contains(err.Error(), "not running") {
return false, err
}

return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
Comment on lines +97 to +107
Member:

Looks like this is copied from elsewhere; can it be extracted into a helper function?

Contributor Author:

Ah yeah, it came from here. But I'm not sure it's generic enough to be extracted as a helper. The two tests happen to call the Allocations.Restart RPC, but the interpretation of the result may change if the code changes.
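For what it's worth, a sketch of what an extracted helper could look like; this is hypothetical, not part of the PR, and it assumes the client package's existing test dependencies:

```go
package client

import (
	"strings"
	"testing"

	nstructs "github.com/hashicorp/nomad/nomad/structs"
	"github.com/hashicorp/nomad/testutil"
)

// waitForAllocRestart retries the Allocations.Restart RPC until it stops
// failing with a "not running" error, failing the test on timeout.
func waitForAllocRestart(t *testing.T, c *Client, req *nstructs.AllocRestartRequest) {
	t.Helper()
	testutil.WaitForResult(func() (bool, error) {
		var resp nstructs.GenericResponse
		err := c.ClientRPC("Allocations.Restart", req, &resp)
		if err != nil && strings.Contains(err.Error(), "not running") {
			return false, err
		}
		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})
}
```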

}

func TestAllocations_Restart_ACL(t *testing.T) {
ci.Parallel(t)
require := require.New(t)
162 changes: 105 additions & 57 deletions client/allocrunner/alloc_runner.go
@@ -28,7 +28,6 @@ import (
cstate "github.com/hashicorp/nomad/client/state"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/vaultclient"
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/device"
@@ -547,40 +546,64 @@ func (ar *allocRunner) handleTaskStateUpdates() {
}
}

// if all live runners are sidecars - kill alloc
if killEvent == nil && hasSidecars && !hasNonSidecarTasks(liveRunners) {
killEvent = structs.NewTaskEvent(structs.TaskMainDead)
}

// If there's a kill event set and live runners, kill them
if killEvent != nil && len(liveRunners) > 0 {

// Log kill reason
switch killEvent.Type {
case structs.TaskLeaderDead:
ar.logger.Debug("leader task dead, destroying all tasks", "leader_task", killTask)
case structs.TaskMainDead:
ar.logger.Debug("main tasks dead, destroying all sidecar tasks")
default:
ar.logger.Debug("task failure, destroying all tasks", "failed_task", killTask)
if len(liveRunners) > 0 {
// if all live runners are sidecars - kill alloc
hasLiveSidecars := hasSidecars && !hasNonSidecarTasks(liveRunners)
lgfa29 marked this conversation as resolved.
if killEvent == nil && hasLiveSidecars {
killEvent = structs.NewTaskEvent(structs.TaskMainDead)
}

// Emit kill event for live runners
for _, tr := range liveRunners {
tr.EmitEvent(killEvent)
}
// If there's a kill event set and live runners, kill them
if killEvent != nil {

// Log kill reason
switch killEvent.Type {
case structs.TaskLeaderDead:
ar.logger.Debug("leader task dead, destroying all tasks", "leader_task", killTask)
case structs.TaskMainDead:
ar.logger.Debug("main tasks dead, destroying all sidecar tasks")
default:
ar.logger.Debug("task failure, destroying all tasks", "failed_task", killTask)
}

// Emit kill event for live runners
for _, tr := range liveRunners {
tr.EmitEvent(killEvent)
}

// Kill 'em all
states = ar.killTasks()
// Kill 'em all
Member:
😱

states = ar.killTasks()

// Wait for TaskRunners to exit before continuing. This will
// prevent looping before TaskRunners have transitioned to
// Dead.
for _, tr := range liveRunners {
ar.logger.Info("waiting for task to exit", "task", tr.Task().Name)
select {
case <-tr.WaitCh():
case <-ar.waitCh:
}
}
}
} else {
// If there are no live runners left kill all non-poststop task
lgfa29 marked this conversation as resolved.
// runners to unblock them from the alloc restart loop.
for _, tr := range ar.tasks {
if tr.IsPoststopTask() {
continue
}

// Wait for TaskRunners to exit before continuing to
// prevent looping before TaskRunners have transitioned
// to Dead.
for _, tr := range liveRunners {
ar.logger.Info("killing task", "task", tr.Task().Name)
select {
case <-tr.WaitCh():
case <-ar.waitCh:
default:
// Kill task runner without setting an event because the
// task is already dead, it's just waiting in the alloc
// restart loop.
err := tr.Kill(context.TODO(), nil)
if err != nil {
ar.logger.Warn("failed to kill task", "task", tr.Task().Name, "error", err)
}
}
}
}
@@ -648,7 +671,7 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState {
break
}

// Kill the rest non-sidecar or poststop tasks concurrently
// Kill the rest non-sidecar and non-poststop tasks concurrently
wg := sync.WaitGroup{}
for name, tr := range ar.tasks {
// Filter out poststop and sidecar tasks so that they stop after all the other tasks are killed
@@ -1205,19 +1228,61 @@ func (ar *allocRunner) GetTaskEventHandler(taskName string) drivermanager.EventH
return nil
}

// RestartTask signalls the task runner for the provided task to restart.
func (ar *allocRunner) RestartTask(taskName string, taskEvent *structs.TaskEvent) error {
// Restart satisfies the WorkloadRestarter interface and restarts all tasks
// that are currently running.
//
// The event type will be set to TaskRestartRunningSignal to comply with
// internal restart logic requirements.
func (ar *allocRunner) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error {
if event.Type != structs.TaskRestartRunningSignal {
Contributor:

If the event.Type isn't TaskRestartRunningSignal should we emit the incoming event to TaskState and create a new event, rather than overwriting it? It looks like only the check watcher calls this method, and it uses the the correct event.Type but maybe it would be a little more future proof to emit and create?

Contributor Author:

Hmm, not sure if I understood. Do you mean something like this?

func (ar *allocRunner) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error {
	ev := event.Copy()
	ev.Type = structs.TaskRestartRunningSignal
	return ar.restartTasks(ctx, ev, failure)
}

Member:

I thought the original reason we were overwriting was a form of assertion? If it's not the right task restart signal, it's a bug?

Contributor Author:

Yeah, my understanding is that, for the caller of the allocRunner.Restart* functions, the event type doesn't matter, so overwriting it was a way to "correct" the call if, for some reason, it was done using the wrong type.

There is no valid scenario where you would call, for example, RestartRunning with anything other than the TaskRestartRunningSignal event type, so instead of returning an error we correct the call.

The Restart method specifically is an interesting one. It is used to implement the WorkloadRestarter interface (defined here and here) so, in theory, it could be used to perform different types of restart, but I chose to keep it as-is (restart only tasks that are currently running).

Contributor:

That's almost what I meant. I was suggesting we make the copy and emit both events. I was thinking that with this implementation you lose the info on which code path triggered the restart.

Contributor Author:

A few things to note here:

  • task events are used to surface information to users; they are not something like code tracing.
  • event types have a fairly restricted set of possible values, so they are not the best way to trace code paths.
  • events have other, more meaningful fields, like DisplayMessage, Details, RestartReason etc., and those will be kept from the input.
  • task events are stored in state, and we only keep the last 10, so emitting (almost) duplicate events will take up a slot and provide little value to users.

That being said, my implementation does abuse the task event mechanism a bit. I tried to think of an alternative, but couldn't think of one yet. I will take another look to see if I can think of something else.

Contributor Author:

e620cd1 removes all of this task event nonsense 😅

I also got rid of the new task event types to avoid any confusion on the developer side as to which type to use. I think it's easy enough for users to distinguish between them given their event descriptions:
[screenshot of the task event descriptions omitted]

event.Type = structs.TaskRestartRunningSignal
}
return ar.restartTasks(ctx, event, failure)
}

// RestartTask restarts the provided task.
//
// The event type will be set to TaskRestartSignal to comply with internal
// restart logic requirements.
func (ar *allocRunner) RestartTask(taskName string, event *structs.TaskEvent) error {
if event.Type != structs.TaskRestartSignal {
event.Type = structs.TaskRestartSignal
}

tr, ok := ar.tasks[taskName]
if !ok {
return fmt.Errorf("Could not find task runner for task: %s", taskName)
}

return tr.Restart(context.TODO(), taskEvent, false)
return tr.Restart(context.TODO(), event, false)
}

// Restart satisfies the WorkloadRestarter interface restarts all task runners
// concurrently
func (ar *allocRunner) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error {
// RestartRunning restarts all tasks that are currently running.
//
// The event type will be set to TaskRestartRunningSignal to comply with
// internal restart logic requirements.
func (ar *allocRunner) RestartRunning(event *structs.TaskEvent) error {
if event.Type != structs.TaskRestartRunningSignal {
event.Type = structs.TaskRestartRunningSignal
}
return ar.restartTasks(context.TODO(), event, false)
}

// RestartAll restarts all tasks in the allocation, including dead ones. They
// will restart following their lifecycle order. Only the TaskRestartAllSignal
// event type may be used.
func (ar *allocRunner) RestartAll(event *structs.TaskEvent) error {
if event.Type != structs.TaskRestartAllSignal {
return fmt.Errorf("Invalid event %s for all tasks restart request", event.Type)
}

// Restart the taskCoordinator to allow dead tasks to run again.
ar.taskCoordinator.Restart()
return ar.restartTasks(context.TODO(), event, false)
}

// restartTasks restarts all task runners concurrently.
func (ar *allocRunner) restartTasks(ctx context.Context, event *structs.TaskEvent, failure bool) error {
waitCh := make(chan struct{})
var err *multierror.Error
var errMutex sync.Mutex
@@ -1230,10 +1295,12 @@ func (ar *allocRunner) Restart(ctx context.Context, event *structs.TaskEvent, fa
defer close(waitCh)
for tn, tr := range ar.tasks {
wg.Add(1)
go func(taskName string, r agentconsul.WorkloadRestarter) {
go func(taskName string, taskRunner *taskrunner.TaskRunner) {
defer wg.Done()
e := r.Restart(ctx, event, failure)
if e != nil {
e := taskRunner.Restart(ctx, event.Copy(), failure)
// Ignore ErrTaskNotRunning errors since tasks that are not
// running are expected to not be restarted.
if e != nil && e != taskrunner.ErrTaskNotRunning {
errMutex.Lock()
defer errMutex.Unlock()
err = multierror.Append(err, fmt.Errorf("failed to restart task %s: %v", taskName, e))
@@ -1251,25 +1318,6 @@ func (ar *allocRunner) Restart(ctx context.Context, event *structs.TaskEvent, fa
return err.ErrorOrNil()
}

// RestartAll signalls all task runners in the allocation to restart and passes
// a copy of the task event to each restart event.
// Returns any errors in a concatenated form.
func (ar *allocRunner) RestartAll(taskEvent *structs.TaskEvent) error {
var err *multierror.Error

// run alloc task restart hooks
ar.taskRestartHooks()

for tn := range ar.tasks {
rerr := ar.RestartTask(tn, taskEvent.Copy())
if rerr != nil {
err = multierror.Append(err, rerr)
}
}

return err.ErrorOrNil()
}

// Signal sends a signal request to task runners inside an allocation. If the
// taskName is empty, then it is sent to all tasks.
func (ar *allocRunner) Signal(taskName, signal string) error {