drivers: Capture exit code when task is killed #10494

Merged 2 commits on May 4, 2021
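In outline, the fix below replaces unconditional sleeps in the kill path (during shutdown_delay and during the kill-retry backoff) with a select that races the wait against the task's exit-result channel, so a task that exits on its own still has its exit code captured. A minimal, self-contained sketch of that pattern; the ExitResult type here only mirrors the shape of drivers.ExitResult and is not the real Nomad type:

package main

import (
	"fmt"
	"time"
)

// ExitResult mirrors the shape of drivers.ExitResult for this sketch only.
type ExitResult struct {
	ExitCode int
	Signal   int
}

// waitOrDelay races a shutdown delay against the task's exit result, the
// pattern this PR introduces in handleKill and killTask.
func waitOrDelay(resultCh <-chan *ExitResult, delay time.Duration) *ExitResult {
	select {
	case result := <-resultCh:
		return result // task exited during the delay; its exit code is kept
	case <-time.After(delay):
		return nil // delay elapsed; the caller proceeds to kill the task
	}
}

func main() {
	resultCh := make(chan *ExitResult, 1)
	go func() {
		// Simulate a task that exits from SIGTERM before the delay elapses.
		time.Sleep(50 * time.Millisecond)
		resultCh <- &ExitResult{ExitCode: 143, Signal: 15}
	}()

	if result := waitOrDelay(resultCh, time.Second); result != nil {
		fmt.Printf("captured exit code %d (signal %d)\n", result.ExitCode, result.Signal)
	}
}

The same race appears in three places in the diff: the shutdown_delay wait, killTask's backoff loop, and the final wait at the end of handleKill.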
2 changes: 1 addition & 1 deletion client/allocrunner/taskrunner/lifecycle.go
@@ -36,7 +36,7 @@ func (tr *TaskRunner) Restart(ctx context.Context, event *structs.TaskEvent, fai
}

// Kill the task using an exponential backoff in case of failures.
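// waitCh is assumed to be the exit-result channel obtained from the handle
// in the elided lines above; passing it lets killTask notice the task
// exiting on its own during the kill backoff.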
if err := tr.killTask(handle); err != nil {
if _, err := tr.killTask(handle, waitCh); err != nil {
// We couldn't successfully destroy the resource created.
tr.logger.Error("failed to kill task. Resources may have been leaked", "error", err)
}
60 changes: 41 additions & 19 deletions client/allocrunner/taskrunner/task_runner.go
@@ -569,7 +569,7 @@ MAIN:
case <-tr.killCtx.Done():
// We can go through the normal should restart check since
// the restart tracker knows it is killed
result = tr.handleKill()
result = tr.handleKill(resultCh)
case <-tr.shutdownCtx.Done():
// TaskRunner was told to exit immediately
return
@@ -616,7 +616,7 @@ MAIN:
// that should be terminal, so if the handle still exists we should
// kill it here.
if tr.getDriverHandle() != nil {
if result = tr.handleKill(); result != nil {
if result = tr.handleKill(nil); result != nil {
tr.emitExitResultEvent(result)
}

@@ -883,7 +883,7 @@ func (tr *TaskRunner) initDriver() error {
// handleKill is used to handle a request to kill a task. It will return
// the handle exit result if one is available and store any error in the task
// runner killErr value.
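// The resultCh parameter, if non-nil, carries the task's own exit result;
// when it is nil, handleKill obtains a wait channel from the driver handle.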
func (tr *TaskRunner) handleKill() *drivers.ExitResult {
func (tr *TaskRunner) handleKill(resultCh <-chan *drivers.ExitResult) *drivers.ExitResult {
// Run the pre killing hooks
tr.preKill()

@@ -892,43 +892,61 @@ func (tr *TaskRunner) handleKill() *drivers.ExitResult {
// before waiting to kill task
if delay := tr.Task().ShutdownDelay; delay != 0 {
tr.logger.Debug("waiting before killing task", "shutdown_delay", delay)
time.Sleep(delay)

select {
case result := <-resultCh:
return result
case <-time.After(delay):
}
}

// Tell the restart tracker that the task has been killed so it doesn't
// attempt to restart it.
tr.restartTracker.SetKilled()

// Check whether the task has already exited
select {
case result := <-resultCh:
return result
default:
}

handle := tr.getDriverHandle()
if handle == nil {
return nil
}

// Kill the task using an exponential backoff in case of failures.
killErr := tr.killTask(handle)
result, killErr := tr.killTask(handle, resultCh)
if killErr != nil {
// We couldn't successfully destroy the resource created.
tr.logger.Error("failed to kill task. Resources may have been leaked", "error", killErr)
tr.setKillErr(killErr)
}

if result != nil {
return result
}

// Block until task has exited.
waitCh, err := handle.WaitCh(tr.shutdownCtx)
if resultCh == nil {
var err error
resultCh, err = handle.WaitCh(tr.shutdownCtx)
Member: I realize this is existing code that just has a new conditional, but why do we get the WaitCh after we send the killTask? Wouldn't we avoid the error handling code here if we got the WaitCh first and then blocked on it after we send the killTask?

Contributor (Author): Good point. Not sure; it wasn't the cause of the bug, but it might lead to other interesting cases. I'll merge this PR as-is, do some testing, and follow up in another PR.
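A self-contained sketch of the ordering the reviewer suggests, with fakeHandle standing in for *DriverHandle (its WaitCh and Kill methods are hypothetical simplifications, not the real driver API): acquire the wait channel first, issue the kill, then block on the channel obtained earlier, so the error-handling path after the kill becomes unnecessary.

package main

import (
	"context"
	"fmt"
)

// fakeHandle stands in for *DriverHandle in this sketch only.
type fakeHandle struct{ done chan int }

// WaitCh returns the channel on which the task's exit code is delivered.
func (h *fakeHandle) WaitCh(ctx context.Context) (<-chan int, error) {
	return h.done, nil
}

// Kill pretends to terminate the task, which then exits with code 143.
func (h *fakeHandle) Kill() error {
	h.done <- 143
	return nil
}

func main() {
	handle := &fakeHandle{done: make(chan int, 1)}

	// Suggested order: obtain the wait channel before killing...
	resultCh, err := handle.WaitCh(context.Background())
	if err != nil {
		fmt.Println("wait error:", err)
		return
	}

	// ...then kill, and block on the channel acquired earlier. No exit
	// result can slip between the Kill and a later WaitCh call this way.
	if err := handle.Kill(); err != nil {
		fmt.Println("kill error:", err)
		return
	}
	fmt.Println("exit code:", <-resultCh)
}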


// The error should be nil or TaskNotFound, if it's something else then a
// failure in the driver or transport layer occurred
if err != nil {
if err == drivers.ErrTaskNotFound {
return nil
}
tr.logger.Error("failed to wait on task. Resources may have been leaked", "error", err)
tr.setKillErr(killErr)
return nil
}
} // closes the resultCh == nil block above

select {
case result := <-waitCh:
case result := <-resultCh:
return result
case <-tr.shutdownCtx.Done():
return nil
@@ -938,14 +956,14 @@ func (tr *TaskRunner) handleKill() *drivers.ExitResult {
// killTask kills the task handle. In the case that killing fails,
// killTask will retry with an exponential backoff and will give up at a
// given limit. Returns an error if the task could not be killed.
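// If the task exits on its own during the backoff, killTask returns the
// exit result received on resultCh instead.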
func (tr *TaskRunner) killTask(handle *DriverHandle) error {
func (tr *TaskRunner) killTask(handle *DriverHandle, resultCh <-chan *drivers.ExitResult) (*drivers.ExitResult, error) {
// Cap the number of times we attempt to kill the task.
var err error
for i := 0; i < killFailureLimit; i++ {
if err = handle.Kill(); err != nil {
if err == drivers.ErrTaskNotFound {
tr.logger.Warn("couldn't find task to kill", "task_id", handle.ID())
return nil
return nil, nil
}
// Calculate the new backoff
backoff := (1 << (2 * uint64(i))) * killBackoffBaseline
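// 1 << (2*i) grows the backoff geometrically: baseline, 4x, 16x, ...;
// the cap (presumably killBackoffLimit) sits in the elided lines below.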
@@ -954,13 +972,17 @@ func (tr *TaskRunner) handleKill() *drivers.ExitResult {
}

tr.logger.Error("failed to kill task", "backoff", backoff, "error", err)
time.Sleep(backoff)
select {
case result := <-resultCh:
return result, nil
case <-time.After(backoff):
}
} else {
// Kill was successful
return nil
return nil, nil
}
}
return err
return nil, err
}

// persistLocalState persists local state to disk synchronously.
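To make the retry loop's timing concrete, here is a self-contained sketch of the backoff series. The constant values are illustrative stand-ins, since killBackoffBaseline, killBackoffLimit, and killFailureLimit are not shown in this excerpt:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Illustrative values only; the real constants live in task_runner.go.
	const (
		killBackoffBaseline = 5 * time.Second
		killBackoffLimit    = 2 * time.Minute
		killFailureLimit    = 5
	)

	for i := 0; i < killFailureLimit; i++ {
		// Same formula as killTask: 1 << (2*i) multiples of the baseline.
		backoff := (1 << (2 * uint64(i))) * killBackoffBaseline
		if backoff > killBackoffLimit {
			backoff = killBackoffLimit
		}
		fmt.Printf("kill attempt %d: wait %v before retrying\n", i+1, backoff)
	}
}

With these assumed values the series is 5s, 20s, 1m20s, then 2m for the remaining attempts once the cap is hit.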
49 changes: 49 additions & 0 deletions client/allocrunner/taskrunner/task_runner_test.go
@@ -202,6 +202,55 @@ func TestTaskRunner_BuildTaskConfig_CPU_Memory(t *testing.T) {
}
}

// TestTaskRunner_Stop_ExitCode asserts that the exit code is captured on a task, even if it's stopped
func TestTaskRunner_Stop_ExitCode(t *testing.T) {
ctestutil.ExecCompatible(t)
t.Parallel()

alloc := mock.BatchAlloc()
alloc.Job.TaskGroups[0].Count = 1
task := alloc.Job.TaskGroups[0].Tasks[0]
task.KillSignal = "SIGTERM"
task.Driver = "raw_exec"
task.Config = map[string]interface{}{
"command": "/bin/sleep",
"args": []string{"1000"},
}

conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name)
defer cleanup()

// Run the first TaskRunner
tr, err := NewTaskRunner(conf)
require.NoError(t, err)
go tr.Run()

defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup"))

// Wait for it to be running
testWaitForTaskToStart(t, tr)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

err = tr.Kill(ctx, structs.NewTaskEvent("shutdown"))
require.NoError(t, err)

var exitEvent *structs.TaskEvent
state := tr.TaskState()
for _, e := range state.Events {
if e.Type == structs.TaskTerminated {
exitEvent = e
break
}
}
require.NotNilf(t, exitEvent, "exit event not found: %v", state.Events)

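// 143 = 128 + 15: the conventional exit status for a process terminated by
// SIGTERM (signal 15).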
require.Equal(t, 143, exitEvent.ExitCode)
require.Equal(t, 15, exitEvent.Signal)
}

// TestTaskRunner_Restore_Running asserts restoring a running task does not
// rerun the task.
func TestTaskRunner_Restore_Running(t *testing.T) {