Skip to content

Commit

Permalink
taskrunner: add different Restart modes
Browse files Browse the repository at this point in the history
Using the task event type to differentiate between the allocrunner restart
methods proved confusing for developers trying to understand how it all
worked.

So instead of relying on the event type, this commit separated the logic
of restarting a taskRunner into two methods:
- `Restart` will retain the current behaviour and will only restart
  the task if it's currently running.
- `ForceRestart` is the new method where a `dead` task is allowed to
  restart if its `Run()` method is still active. Callers will need to
  restart the allocRunner taskCoordinator to make sure it will allow the
  task to run again.
  • Loading branch information
lgfa29 committed Aug 24, 2022
1 parent 647f071 commit e620cd1
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 76 deletions.
43 changes: 13 additions & 30 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -1230,25 +1230,12 @@ func (ar *allocRunner) GetTaskEventHandler(taskName string) drivermanager.EventH

// Restart satisfies the WorkloadRestarter interface and restarts all tasks
// that are currently running.
//
// The event type will be set to TaskRestartRunningSignal to comply with
// internal restart logic requirements.
func (ar *allocRunner) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error {
if event.Type != structs.TaskRestartRunningSignal {
event.Type = structs.TaskRestartRunningSignal
}
return ar.restartTasks(ctx, event, failure)
return ar.restartTasks(ctx, event, failure, false)
}

// RestartTask restarts the provided task.
//
// The event type will be set to TaskRestartSignal to comply with internal
// restart logic requirements.
func (ar *allocRunner) RestartTask(taskName string, event *structs.TaskEvent) error {
if event.Type != structs.TaskRestartSignal {
event.Type = structs.TaskRestartSignal
}

tr, ok := ar.tasks[taskName]
if !ok {
return fmt.Errorf("Could not find task runner for task: %s", taskName)
Expand All @@ -1258,31 +1245,20 @@ func (ar *allocRunner) RestartTask(taskName string, event *structs.TaskEvent) er
}

// RestartRunning restarts all tasks that are currently running.
//
// The event type will be set to TaskRestartRunningSignal to comply with
// internal restart logic requirements.
func (ar *allocRunner) RestartRunning(event *structs.TaskEvent) error {
if event.Type != structs.TaskRestartRunningSignal {
event.Type = structs.TaskRestartRunningSignal
}
return ar.restartTasks(context.TODO(), event, false)
return ar.restartTasks(context.TODO(), event, false, false)
}

// RestartAll restarts all tasks in the allocation, including dead ones. They
// will restart following their lifecycle order. Only the TaskRestartAllSignal
// event type may be used.
// will restart following their lifecycle order.
func (ar *allocRunner) RestartAll(event *structs.TaskEvent) error {
if event.Type != structs.TaskRestartAllSignal {
return fmt.Errorf("Invalid event %s for all tasks restart request", event.Type)
}

// Restart the taskCoordinator to allow dead tasks to run again.
ar.taskCoordinator.Restart()
return ar.restartTasks(context.TODO(), event, false)
return ar.restartTasks(context.TODO(), event, false, true)
}

// restartTasks restarts all task runners concurrently.
func (ar *allocRunner) restartTasks(ctx context.Context, event *structs.TaskEvent, failure bool) error {
func (ar *allocRunner) restartTasks(ctx context.Context, event *structs.TaskEvent, failure bool, force bool) error {
waitCh := make(chan struct{})
var err *multierror.Error
var errMutex sync.Mutex
Expand All @@ -1297,7 +1273,14 @@ func (ar *allocRunner) restartTasks(ctx context.Context, event *structs.TaskEven
wg.Add(1)
go func(taskName string, taskRunner *taskrunner.TaskRunner) {
defer wg.Done()
e := taskRunner.Restart(ctx, event.Copy(), failure)

var e error
if force {
e = taskRunner.ForceRestart(ctx, event.Copy(), failure)
} else {
e = taskRunner.Restart(ctx, event.Copy(), failure)
}

// Ignore ErrTaskNotRunning errors since tasks that are not
// running are expected to not be restarted.
if e != nil && e != taskrunner.ErrTaskNotRunning {
Expand Down
16 changes: 6 additions & 10 deletions client/allocrunner/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,8 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) {
Mode: structs.RestartPolicyModeFail,
}

ev := &structs.TaskEvent{Type: structs.TaskRestartSignal}

testCases := []struct {
name string
taskDefs []mock.LifecycleTaskDef
Expand All @@ -516,7 +518,6 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) {
{
name: "restart entire allocation",
action: func(ar *allocRunner, alloc *structs.Allocation) error {
ev := &structs.TaskEvent{Type: structs.TaskRestartAllSignal}
return ar.RestartAll(ev)
},
expectedAfter: map[string]structs.TaskState{
Expand All @@ -531,7 +532,6 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) {
{
name: "restart only running tasks",
action: func(ar *allocRunner, alloc *structs.Allocation) error {
ev := &structs.TaskEvent{Type: structs.TaskRestartRunningSignal}
return ar.RestartRunning(ev)
},
expectedAfter: map[string]structs.TaskState{
Expand All @@ -555,7 +555,6 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) {
},
isBatch: true,
action: func(ar *allocRunner, alloc *structs.Allocation) error {
ev := &structs.TaskEvent{Type: structs.TaskRestartAllSignal}
return ar.RestartAll(ev)
},
expectedAfter: map[string]structs.TaskState{
Expand All @@ -579,7 +578,6 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) {
},
isBatch: true,
action: func(ar *allocRunner, alloc *structs.Allocation) error {
ev := &structs.TaskEvent{Type: structs.TaskRestartRunningSignal}
return ar.RestartRunning(ev)
},
expectedAfter: map[string]structs.TaskState{
Expand All @@ -595,7 +593,6 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) {
name: "restart entire allocation with leader",
hasLeader: true,
action: func(ar *allocRunner, alloc *structs.Allocation) error {
ev := &structs.TaskEvent{Type: structs.TaskRestartAllSignal}
return ar.RestartAll(ev)
},
expectedAfter: map[string]structs.TaskState{
Expand Down Expand Up @@ -627,7 +624,6 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) {
{
name: "restart main task",
action: func(ar *allocRunner, alloc *structs.Allocation) error {
ev := &structs.TaskEvent{Type: structs.TaskRestartSignal}
return ar.RestartTask("main", ev)
},
expectedAfter: map[string]structs.TaskState{
Expand All @@ -643,7 +639,7 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) {
name: "restart leader main task",
hasLeader: true,
action: func(ar *allocRunner, alloc *structs.Allocation) error {
return ar.RestartTask("main", &structs.TaskEvent{Type: structs.TaskRestartSignal})
return ar.RestartTask("main", ev)
},
expectedAfter: map[string]structs.TaskState{
"main": structs.TaskState{State: "running", Restarts: 1},
Expand Down Expand Up @@ -761,7 +757,7 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) {
// make sure main task has had a chance to restart once on its
// own and fail again before we try to manually restart it
time.Sleep(5 * time.Second)
return ar.RestartTask("main", &structs.TaskEvent{Type: structs.TaskRestartSignal})
return ar.RestartTask("main", ev)
},
expectedErr: "Task not running",
expectedAfter: map[string]structs.TaskState{
Expand All @@ -776,7 +772,7 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) {
{
name: "restart prestart-sidecar task",
action: func(ar *allocRunner, alloc *structs.Allocation) error {
return ar.RestartTask("prestart-sidecar", &structs.TaskEvent{Type: structs.TaskRestartSignal})
return ar.RestartTask("prestart-sidecar", ev)
},
expectedAfter: map[string]structs.TaskState{
"main": structs.TaskState{State: "running", Restarts: 0},
Expand All @@ -790,7 +786,7 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) {
{
name: "restart poststart-sidecar task",
action: func(ar *allocRunner, alloc *structs.Allocation) error {
return ar.RestartTask("poststart-sidecar", &structs.TaskEvent{Type: structs.TaskRestartSignal})
return ar.RestartTask("poststart-sidecar", ev)
},
expectedAfter: map[string]structs.TaskState{
"main": structs.TaskState{State: "running", Restarts: 0},
Expand Down
55 changes: 46 additions & 9 deletions client/allocrunner/taskrunner/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,34 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)

// Restart a task. Returns immediately if no task is running. Blocks until
// existing task exits or passed-in context is canceled.
// Restart restarts a task that is already running. Returns an error if the
// task is not running. Blocks until existing task exits or passed-in context
// is canceled.
func (tr *TaskRunner) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error {
tr.logger.Trace("Restart requested", "failure", failure, "event", event.GoString())

// Check if the task is able to restart based on its state and the type of
// restart event that was triggered.
taskState := tr.TaskState()
if taskState == nil {
return ErrTaskNotRunning
}

switch taskState.State {
case structs.TaskStatePending, structs.TaskStateDead:
return ErrTaskNotRunning
}

return tr.restartImpl(ctx, event, failure)
}

// ForceRestart restarts a task that is already running or reruns it if dead.
// Returns an error if the task is not able to rerun. Blocks until existing
// task exits or passed-in context is canceled.
//
// Callers must restart the AllocRunner taskCoordinator beforehand to make sure
// the task will be able to run again.
func (tr *TaskRunner) ForceRestart(ctx context.Context, event *structs.TaskEvent, failure bool) error {
tr.logger.Trace("Force restart requested", "failure", failure, "event", event.GoString())

taskState := tr.TaskState()
if taskState == nil {
return ErrTaskNotRunning
Expand All @@ -21,23 +42,39 @@ func (tr *TaskRunner) Restart(ctx context.Context, event *structs.TaskEvent, fai
tr.stateLock.Lock()
localState := tr.localState.Copy()
tr.stateLock.Unlock()

if localState == nil {
return ErrTaskNotRunning
}

switch taskState.State {
case structs.TaskStatePending:
// Tasks that are "pending" are never allowed to restart.
return ErrTaskNotRunning

case structs.TaskStateDead:
// Tasks that are "dead" are only allowed to restart when restarting
// all tasks in the alloc, otherwise the taskCoordinator will prevent
// it from running again, and if their Run method is still running.
if event.Type != structs.TaskRestartAllSignal || localState.RunComplete {
// Tasks that are in the "dead" state are only allowed to restart if
// their Run() method is still active.
if localState.RunComplete {
return ErrTaskNotRunning
}
}

return tr.restartImpl(ctx, event, failure)
}

// restartImpl implements the task restart process.
//
// It should never be called directly as it doesn't verify if the task state
// allows for a restart.
func (tr *TaskRunner) restartImpl(ctx context.Context, event *structs.TaskEvent, failure bool) error {

// Check if the task is able to restart based on its state and the type of
// restart event that was triggered.
taskState := tr.TaskState()
if taskState == nil {
return ErrTaskNotRunning
}

// Emit the event since it may take a long time to kill
tr.EmitEvent(event)

Expand Down
8 changes: 4 additions & 4 deletions client/allocrunner/taskrunner/task_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,8 +397,8 @@ func TestTaskRunner_Restore_Dead(t *testing.T) {
// Verify that we can restart task.
// Retry a few times as the newTR.Run() may not have started yet.
testutil.WaitForResult(func() (bool, error) {
ev := &structs.TaskEvent{Type: structs.TaskRestartAllSignal}
err = newTR.Restart(context.Background(), ev, false)
ev := &structs.TaskEvent{Type: structs.TaskRestartSignal}
err = newTR.Rerun(context.Background(), ev, false)
return err == nil, err
}, func(err error) {
require.NoError(t, err)
Expand All @@ -425,8 +425,8 @@ func TestTaskRunner_Restore_Dead(t *testing.T) {
go newTR2.Run()
defer newTR2.Kill(context.Background(), structs.NewTaskEvent("cleanup"))

ev := &structs.TaskEvent{Type: structs.TaskRestartAllSignal}
err = newTR2.Restart(context.Background(), ev, false)
ev := &structs.TaskEvent{Type: structs.TaskRestartSignal}
err = newTR2.Rerun(context.Background(), ev, false)
require.Equal(t, err, ErrTaskNotRunning)
}

Expand Down
4 changes: 2 additions & 2 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -943,12 +943,12 @@ func (c *Client) RestartAllocation(allocID, taskName string, allTasks bool) erro
}

if allTasks {
event := structs.NewTaskEvent(structs.TaskRestartAllSignal).
event := structs.NewTaskEvent(structs.TaskRestartSignal).
SetRestartReason("User requested all tasks to restart")
return ar.RestartAll(event)
}

event := structs.NewTaskEvent(structs.TaskRestartRunningSignal).
event := structs.NewTaskEvent(structs.TaskRestartSignal).
SetRestartReason("User requested running tasks to restart")
return ar.RestartRunning(event)
}
Expand Down
2 changes: 1 addition & 1 deletion command/agent/consul/check_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (c *checkRestart) apply(ctx context.Context, now time.Time, status string)

// Tell TaskRunner to restart due to failure
reason := fmt.Sprintf("healthcheck: check %q unhealthy", c.checkName)
event := structs.NewTaskEvent(structs.TaskRestartRunningSignal).SetRestartReason(reason)
event := structs.NewTaskEvent(structs.TaskRestartSignal).SetRestartReason(reason)
go asyncRestart(ctx, c.logger, c.task, event)
return true
}
Expand Down
20 changes: 0 additions & 20 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -8022,14 +8022,6 @@ const (
// restarted
TaskRestartSignal = "Restart Signaled"

// TaskRestartRunningSignal indicates that all tasks in the allocation that
// are currently running have been signaled to be restarted.
TaskRestartRunningSignal = "Restart Running Signaled"

// TaskRestartAllSignal indicates that all tasks in the allocation have
// been signaled to be restarted, even the ones that have already run.
TaskRestartAllSignal = "Restart All Signaled"

// TaskSignaling indicates that the task is being signalled.
TaskSignaling = "Signaling"

Expand Down Expand Up @@ -8287,18 +8279,6 @@ func (e *TaskEvent) PopulateEventDisplayMessage() {
} else {
desc = "Task signaled to restart"
}
case TaskRestartRunningSignal:
if e.RestartReason != "" {
desc = e.RestartReason
} else {
desc = "Running tasks signaled to restart"
}
case TaskRestartAllSignal:
if e.RestartReason != "" {
desc = e.RestartReason
} else {
desc = "All tasks signaled to restart"
}
case TaskDriverMessage:
desc = e.DriverMessage
case TaskLeaderDead:
Expand Down

0 comments on commit e620cd1

Please sign in to comment.