diff --git a/client/allocrunner/taskrunner/lifecycle.go b/client/allocrunner/taskrunner/lifecycle.go
index 2b8c7350ed8c..b812156a8461 100644
--- a/client/allocrunner/taskrunner/lifecycle.go
+++ b/client/allocrunner/taskrunner/lifecycle.go
@@ -36,7 +36,7 @@ func (tr *TaskRunner) Restart(ctx context.Context, event *structs.TaskEvent, fai
 	}
 
 	// Kill the task using an exponential backoff in-case of failures.
-	if err := tr.killTask(handle); err != nil {
+	if _, err := tr.killTask(handle, waitCh); err != nil {
 		// We couldn't successfully destroy the resource created.
 		tr.logger.Error("failed to kill task. Resources may have been leaked", "error", err)
 	}
diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go
index 0b03ec8406ae..b1f7643bda74 100644
--- a/client/allocrunner/taskrunner/task_runner.go
+++ b/client/allocrunner/taskrunner/task_runner.go
@@ -558,7 +558,7 @@ MAIN:
 		case <-tr.killCtx.Done():
 			// We can go through the normal should restart check since
 			// the restart tracker knowns it is killed
-			result = tr.handleKill()
+			result = tr.handleKill(resultCh)
 		case <-tr.shutdownCtx.Done():
 			// TaskRunner was told to exit immediately
 			return
@@ -605,7 +605,7 @@ MAIN:
 	// that should be terminal, so if the handle still exists we should
 	// kill it here.
 	if tr.getDriverHandle() != nil {
-		if result = tr.handleKill(); result != nil {
+		if result = tr.handleKill(nil); result != nil {
 			tr.emitExitResultEvent(result)
 		}
 
@@ -864,7 +864,7 @@ func (tr *TaskRunner) initDriver() error {
 // handleKill is used to handle the a request to kill a task. It will return
 // the handle exit result if one is available and store any error in the task
 // runner killErr value.
-func (tr *TaskRunner) handleKill() *drivers.ExitResult {
+func (tr *TaskRunner) handleKill(resultCh <-chan *drivers.ExitResult) *drivers.ExitResult {
 	// Run the pre killing hooks
 	tr.preKill()
 
@@ -873,7 +873,12 @@ func (tr *TaskRunner) handleKill() *drivers.ExitResult {
 	// before waiting to kill task
 	if delay := tr.Task().ShutdownDelay; delay != 0 {
 		tr.logger.Debug("waiting before killing task", "shutdown_delay", delay)
-		time.Sleep(delay)
+
+		select {
+		case result := <-resultCh:
+			return result
+		case <-time.After(delay):
+		}
 	}
 
 	// Tell the restart tracker that the task has been killed so it doesn't
@@ -881,35 +886,48 @@ func (tr *TaskRunner) handleKill() *drivers.ExitResult {
 	tr.restartTracker.SetKilled()
 
 	// Check it is running
+	select {
+	case result := <-resultCh:
+		return result
+	default:
+	}
+
 	handle := tr.getDriverHandle()
 	if handle == nil {
 		return nil
 	}
 
 	// Kill the task using an exponential backoff in-case of failures.
-	killErr := tr.killTask(handle)
+	result, killErr := tr.killTask(handle, resultCh)
 	if killErr != nil {
 		// We couldn't successfully destroy the resource created.
 		tr.logger.Error("failed to kill task. Resources may have been leaked", "error", killErr)
 		tr.setKillErr(killErr)
 	}
 
+	if result != nil {
+		return result
+	}
+
 	// Block until task has exited.
-	waitCh, err := handle.WaitCh(tr.shutdownCtx)
+	if resultCh == nil {
+		var err error
+		resultCh, err = handle.WaitCh(tr.shutdownCtx)
 
-	// The error should be nil or TaskNotFound, if it's something else then a
-	// failure in the driver or transport layer occurred
-	if err != nil {
-		if err == drivers.ErrTaskNotFound {
+		// The error should be nil or TaskNotFound, if it's something else then a
+		// failure in the driver or transport layer occurred
+		if err != nil {
+			if err == drivers.ErrTaskNotFound {
+				return nil
+			}
+			tr.logger.Error("failed to wait on task. Resources may have been leaked", "error", err)
+			tr.setKillErr(killErr)
 			return nil
 		}
-		tr.logger.Error("failed to wait on task. Resources may have been leaked", "error", err)
-		tr.setKillErr(killErr)
-		return nil
 	}
 
 	select {
-	case result := <-waitCh:
+	case result := <-resultCh:
 		return result
 	case <-tr.shutdownCtx.Done():
 		return nil
@@ -919,14 +937,14 @@ func (tr *TaskRunner) handleKill() *drivers.ExitResult {
 // killTask kills the task handle. In the case that killing fails,
 // killTask will retry with an exponential backoff and will give up at a
 // given limit. Returns an error if the task could not be killed.
-func (tr *TaskRunner) killTask(handle *DriverHandle) error {
+func (tr *TaskRunner) killTask(handle *DriverHandle, resultCh <-chan *drivers.ExitResult) (*drivers.ExitResult, error) {
 	// Cap the number of times we attempt to kill the task.
 	var err error
 	for i := 0; i < killFailureLimit; i++ {
 		if err = handle.Kill(); err != nil {
 			if err == drivers.ErrTaskNotFound {
 				tr.logger.Warn("couldn't find task to kill", "task_id", handle.ID())
-				return nil
+				return nil, nil
 			}
 			// Calculate the new backoff
 			backoff := (1 << (2 * uint64(i))) * killBackoffBaseline
@@ -935,13 +953,17 @@ func (tr *TaskRunner) killTask(handle *DriverHandle) error {
 			}
 
 			tr.logger.Error("failed to kill task", "backoff", backoff, "error", err)
-			time.Sleep(backoff)
+			select {
+			case result := <-resultCh:
+				return result, nil
+			case <-time.After(backoff):
+			}
 		} else {
 			// Kill was successful
-			return nil
+			return nil, nil
 		}
 	}
-	return err
+	return nil, err
 }
 
 // persistLocalState persists local state to disk synchronously.
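The change above is mechanical but worth spelling out: every fixed wait in the kill path (the ShutdownDelay sleep and the kill-retry backoff) becomes a select that races the timer against the task's exit-result channel, so a task that dies on its own ends the kill flow immediately and its real ExitResult is returned instead of being dropped. Below is a minimal standalone sketch of that pattern; exitResult and waitOrExit are hypothetical names for illustration, not Nomad APIs. Note that receiving from a nil channel blocks forever, which is why handleKill(nil) degrades gracefully to a plain timed wait.

package main

import (
	"fmt"
	"time"
)

type exitResult struct{ code int }

// waitOrExit waits for at most d, but returns early with the task's real
// result if the task exits first. Receiving from a nil channel blocks
// forever, so a caller with no wait channel (cf. handleKill(nil)) simply
// falls through to the timer, preserving the old time.Sleep behavior.
func waitOrExit(resultCh <-chan *exitResult, d time.Duration) *exitResult {
	select {
	case r := <-resultCh:
		return r
	case <-time.After(d):
		return nil
	}
}

func main() {
	ch := make(chan *exitResult, 1)
	go func() {
		// Simulate the task exiting on its own partway through a
		// shutdown delay or a kill-retry backoff.
		time.Sleep(50 * time.Millisecond)
		ch <- &exitResult{code: 143}
	}()

	if r := waitOrExit(ch, 5*time.Second); r != nil {
		fmt.Println("task exited during the wait, exit code:", r.code)
	}
}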
diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go
index fd9c4f1132d9..224ede3906aa 100644
--- a/client/allocrunner/taskrunner/task_runner_test.go
+++ b/client/allocrunner/taskrunner/task_runner_test.go
@@ -136,6 +136,121 @@ func runTestTaskRunner(t *testing.T, alloc *structs.Allocation, taskName string)
 	}
 }
 
+func TestTaskRunner_BuildTaskConfig_CPU_Memory(t *testing.T) {
+	t.Parallel()
+
+	cases := []struct {
+		name                  string
+		cpu                   int64
+		memoryMB              int64
+		memoryMaxMB           int64
+		expectedLinuxMemoryMB int64
+	}{
+		{
+			name:                  "plain no max",
+			cpu:                   100,
+			memoryMB:              100,
+			memoryMaxMB:           0,
+			expectedLinuxMemoryMB: 100,
+		},
+		{
+			name:                  "plain with max=reserve",
+			cpu:                   100,
+			memoryMB:              100,
+			memoryMaxMB:           100,
+			expectedLinuxMemoryMB: 100,
+		},
+		{
+			name:                  "plain with max>reserve",
+			cpu:                   100,
+			memoryMB:              100,
+			memoryMaxMB:           200,
+			expectedLinuxMemoryMB: 200,
+		},
+	}
+
+	for _, c := range cases {
+		t.Run(c.name, func(t *testing.T) {
+			alloc := mock.BatchAlloc()
+			alloc.Job.TaskGroups[0].Count = 1
+			task := alloc.Job.TaskGroups[0].Tasks[0]
+			task.Driver = "mock_driver"
+			task.Config = map[string]interface{}{
+				"run_for": "2s",
+			}
+			res := alloc.AllocatedResources.Tasks[task.Name]
+			res.Cpu.CpuShares = c.cpu
+			res.Memory.MemoryMB = c.memoryMB
+			res.Memory.MemoryMaxMB = c.memoryMaxMB
+
+			conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name)
+			conf.StateDB = cstate.NewMemDB(conf.Logger) // "persist" state between task runners
+			defer cleanup()
+
+			// Run the first TaskRunner
+			tr, err := NewTaskRunner(conf)
+			require.NoError(t, err)
+
+			tc := tr.buildTaskConfig()
+			require.Equal(t, c.cpu, tc.Resources.LinuxResources.CPUShares)
+			require.Equal(t, c.expectedLinuxMemoryMB*1024*1024, tc.Resources.LinuxResources.MemoryLimitBytes)
+
+			require.Equal(t, c.cpu, tc.Resources.NomadResources.Cpu.CpuShares)
+			require.Equal(t, c.memoryMB, tc.Resources.NomadResources.Memory.MemoryMB)
+			require.Equal(t, c.memoryMaxMB, tc.Resources.NomadResources.Memory.MemoryMaxMB)
+		})
+	}
+}
+
+// TestTaskRunner_Stop_ExitCode asserts that the exit code is captured on a task, even if it's stopped
+func TestTaskRunner_Stop_ExitCode(t *testing.T) {
+	ctestutil.ExecCompatible(t)
+	t.Parallel()
+
+	alloc := mock.BatchAlloc()
+	alloc.Job.TaskGroups[0].Count = 1
+	task := alloc.Job.TaskGroups[0].Tasks[0]
+	task.KillSignal = "SIGTERM"
+	task.Driver = "raw_exec"
+	task.Config = map[string]interface{}{
+		"command": "/bin/sleep",
+		"args":    []string{"1000"},
+	}
+
+	conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name)
+	defer cleanup()
+
+	// Run the first TaskRunner
+	tr, err := NewTaskRunner(conf)
+	require.NoError(t, err)
+	go tr.Run()
+
+	defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup"))
+
+	// Wait for it to be running
+	testWaitForTaskToStart(t, tr)
+
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+
+	err = tr.Kill(ctx, structs.NewTaskEvent("shutdown"))
+	require.NoError(t, err)
+
+	var exitEvent *structs.TaskEvent
+	state := tr.TaskState()
+	for _, e := range state.Events {
+		if e.Type == structs.TaskTerminated {
+			exitEvent = e
+			break
+		}
+	}
+	require.NotNilf(t, exitEvent, "exit event not found: %v", state.Events)
+
+	require.Equal(t, 143, exitEvent.ExitCode)
+	require.Equal(t, 15, exitEvent.Signal)
+
+}
+
 // TestTaskRunner_Restore_Running asserts restoring a running task does not
 // rerun the task.
 func TestTaskRunner_Restore_Running(t *testing.T) {
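On the values TestTaskRunner_Stop_ExitCode asserts: a process terminated by SIGTERM carries signal number 15, and the conventional shell-style exit code for a signal death is 128 + signal = 143. The sketch below, independent of Nomad's executor and shown only to motivate those two assertions, observes a SIGTERM death from Go on a Unix system.

package main

import (
	"fmt"
	"os/exec"
	"syscall"
	"time"
)

func main() {
	// Start a long-running child, then terminate it the way the test's
	// tr.Kill does (KillSignal = "SIGTERM").
	cmd := exec.Command("/bin/sleep", "1000")
	if err := cmd.Start(); err != nil {
		panic(err)
	}
	time.Sleep(100 * time.Millisecond)
	_ = cmd.Process.Signal(syscall.SIGTERM)

	err := cmd.Wait()
	if exitErr, ok := err.(*exec.ExitError); ok {
		ws := exitErr.Sys().(syscall.WaitStatus)
		if ws.Signaled() {
			// Signal() is 15 for SIGTERM; the shell-style exit code is
			// 128 + 15 = 143, matching the test's two assertions.
			fmt.Println("signal:", int(ws.Signal()), "exit code:", 128+int(ws.Signal()))
		}
	}
}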