Skip to content

Commit

Permalink
implement alloc runner task restart hook
Browse files Browse the repository at this point in the history
Most allocation hooks don't need to know when a single task within the
allocation is restarted. The check watcher for group services triggers the
alloc runner to restart all tasks, but the alloc runner's `Restart` method
doesn't trigger any of the alloc hooks, including the group service hook. The
result is that after the first time a check triggers a restart, we'll never
restart the tasks of an allocation again.

This commit adds a `RunnerTaskRestartHook` interface so that alloc runner
hooks can act if a task within the alloc is restarted. The only implementation
is in the group service hook, which will force a re-registration of the
alloc's services and fix check restarts.
  • Loading branch information
tgross committed Jan 21, 2021
1 parent 9069631 commit 211a078
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 13 deletions.
6 changes: 6 additions & 0 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -1138,6 +1138,9 @@ func (ar *allocRunner) Restart(ctx context.Context, event *structs.TaskEvent, fa
var err *multierror.Error
var errMutex sync.Mutex

// run alloc task restart hooks
ar.taskRestartHooks()

go func() {
var wg sync.WaitGroup
defer close(waitCh)
Expand Down Expand Up @@ -1170,6 +1173,9 @@ func (ar *allocRunner) Restart(ctx context.Context, event *structs.TaskEvent, fa
func (ar *allocRunner) RestartAll(taskEvent *structs.TaskEvent) error {
var err *multierror.Error

// run alloc task restart hooks
ar.taskRestartHooks()

for tn := range ar.tasks {
rerr := ar.RestartTask(tn, taskEvent.Copy())
if rerr != nil {
Expand Down
25 changes: 25 additions & 0 deletions client/allocrunner/alloc_runner_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,3 +360,28 @@ func (ar *allocRunner) shutdownHooks() {
}
}
}

func (ar *allocRunner) taskRestartHooks() {
for _, hook := range ar.runnerHooks {
re, ok := hook.(interfaces.RunnerTaskRestartHook)
if !ok {
continue
}

name := re.Name()
var start time.Time
if ar.logger.IsTrace() {
start = time.Now()
ar.logger.Trace("running alloc task restart hook",
"name", name, "start", start)
}

re.PreTaskRestart()

if ar.logger.IsTrace() {
end := time.Now()
ar.logger.Trace("finished alloc task restart hook",
"name", name, "end", end, "duration", end.Sub(start))
}
}
}
19 changes: 19 additions & 0 deletions client/allocrunner/groupservice_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ func (h *groupServiceHook) Prerun() error {
h.prerun = true
h.mu.Unlock()
}()
return h.prerunLocked()
}

func (h *groupServiceHook) prerunLocked() error {
if len(h.services) == 0 {
return nil
}
Expand Down Expand Up @@ -145,10 +148,26 @@ func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error {
return h.consulClient.UpdateWorkload(oldWorkloadServices, newWorkloadServices)
}

func (h *groupServiceHook) PreTaskRestart() error {
h.mu.Lock()
defer func() {
// Mark prerun as true to unblock Updates
h.prerun = true
h.mu.Unlock()
}()

h.preKillLocked()
return h.prerunLocked()
}

func (h *groupServiceHook) PreKill() {
h.mu.Lock()
defer h.mu.Unlock()
h.preKillLocked()
}

// implements the PreKill hook but requires the caller hold the lock
func (h *groupServiceHook) preKillLocked() {
// If we have a shutdown delay deregister
// group services and then wait
// before continuing to kill tasks
Expand Down
45 changes: 32 additions & 13 deletions client/allocrunner/groupservice_hook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ var _ interfaces.RunnerPrerunHook = (*groupServiceHook)(nil)
var _ interfaces.RunnerUpdateHook = (*groupServiceHook)(nil)
var _ interfaces.RunnerPostrunHook = (*groupServiceHook)(nil)
var _ interfaces.RunnerPreKillHook = (*groupServiceHook)(nil)
var _ interfaces.RunnerTaskRestartHook = (*groupServiceHook)(nil)

// TestGroupServiceHook_NoGroupServices asserts calling group service hooks
// without group services does not error.
Expand Down Expand Up @@ -50,11 +51,17 @@ func TestGroupServiceHook_NoGroupServices(t *testing.T) {

require.NoError(t, h.Postrun())

require.NoError(t, h.PreTaskRestart())

ops := consulClient.GetOps()
require.Len(t, ops, 4)
require.Equal(t, "add", ops[0].Op)
require.Equal(t, "update", ops[1].Op)
require.Equal(t, "remove", ops[2].Op)
require.Len(t, ops, 7)
require.Equal(t, "add", ops[0].Op) // Prerun
require.Equal(t, "update", ops[1].Op) // Update
require.Equal(t, "remove", ops[2].Op) // Postrun (1st)
require.Equal(t, "remove", ops[3].Op) // Postrun (2nd)
require.Equal(t, "remove", ops[4].Op) // Restart -> preKill (1st)
require.Equal(t, "remove", ops[5].Op) // Restart -> preKill (2nd)
require.Equal(t, "add", ops[6].Op) // Restart -> preRun
}

// TestGroupServiceHook_ShutdownDelayUpdate asserts calling group service hooks
Expand Down Expand Up @@ -117,15 +124,21 @@ func TestGroupServiceHook_GroupServices(t *testing.T) {

require.NoError(t, h.Postrun())

require.NoError(t, h.PreTaskRestart())

ops := consulClient.GetOps()
require.Len(t, ops, 4)
require.Equal(t, "add", ops[0].Op)
require.Equal(t, "update", ops[1].Op)
require.Equal(t, "remove", ops[2].Op)
require.Len(t, ops, 7)
require.Equal(t, "add", ops[0].Op) // Prerun
require.Equal(t, "update", ops[1].Op) // Update
require.Equal(t, "remove", ops[2].Op) // Postrun (1st)
require.Equal(t, "remove", ops[3].Op) // Postrun (2nd)
require.Equal(t, "remove", ops[4].Op) // Restart -> preKill (1st)
require.Equal(t, "remove", ops[5].Op) // Restart -> preKill (2nd)
require.Equal(t, "add", ops[6].Op) // Restart -> preRun
}

// TestGroupServiceHook_Error asserts group service hooks with group
// services but no group network returns an error.
// services but no group network is handled gracefully.
func TestGroupServiceHook_NoNetwork(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -159,11 +172,17 @@ func TestGroupServiceHook_NoNetwork(t *testing.T) {

require.NoError(t, h.Postrun())

require.NoError(t, h.PreTaskRestart())

ops := consulClient.GetOps()
require.Len(t, ops, 4)
require.Equal(t, "add", ops[0].Op)
require.Equal(t, "update", ops[1].Op)
require.Equal(t, "remove", ops[2].Op)
require.Len(t, ops, 7)
require.Equal(t, "add", ops[0].Op) // Prerun
require.Equal(t, "update", ops[1].Op) // Update
require.Equal(t, "remove", ops[2].Op) // Postrun (1st)
require.Equal(t, "remove", ops[3].Op) // Postrun (2nd)
require.Equal(t, "remove", ops[4].Op) // Restart -> preKill (1st)
require.Equal(t, "remove", ops[5].Op) // Restart -> preKill (2nd)
require.Equal(t, "add", ops[6].Op) // Restart -> preRun
}

func TestGroupServiceHook_getWorkloadServices(t *testing.T) {
Expand Down
8 changes: 8 additions & 0 deletions client/allocrunner/interfaces/runner_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ type RunnerUpdateRequest struct {
Alloc *structs.Allocation
}

// RunnerTaskRestartHooks are executed just before the allocation runner is
// going to restart all tasks.
type RunnerTaskRestartHook interface {
RunnerHook

PreTaskRestart() error
}

// ShutdownHook may be implemented by AllocRunner or TaskRunner hooks and will
// be called when the agent process is being shutdown gracefully.
type ShutdownHook interface {
Expand Down

0 comments on commit 211a078

Please sign in to comment.