diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go
index f1589a1d1a69..7d2bd5275e60 100644
--- a/client/allocrunner/taskrunner/task_runner_test.go
+++ b/client/allocrunner/taskrunner/task_runner_test.go
@@ -185,10 +185,9 @@ func TestTaskRunner_Restore_Running(t *testing.T) {
 // kills the task before restarting a new TaskRunner. The new TaskRunner is
 // returned once it is running and waiting in pending along with a cleanup
 // func.
-func setupRestoreFailureTest(t *testing.T) (*TaskRunner, *Config, func()) {
+func setupRestoreFailureTest(t *testing.T, alloc *structs.Allocation) (*TaskRunner, *Config, func()) {
 	t.Parallel()
 
-	alloc := mock.Alloc()
 	task := alloc.Job.TaskGroups[0].Tasks[0]
 	task.Driver = "raw_exec"
 	task.Config = map[string]interface{}{
@@ -217,7 +216,7 @@ func setupRestoreFailureTest(t *testing.T) (*TaskRunner, *Config, func()) {
 	// Cause TR to exit without shutting down task
 	origTR.Shutdown()
 
-	// Get the mock driver plugin
+	// Get the driver
 	driverPlugin, err := conf.DriverManager.Dispense(rawexec.PluginID.Name)
 	require.NoError(t, err)
 	rawexecDriver := driverPlugin.(*rawexec.Driver)
@@ -233,6 +232,7 @@ func setupRestoreFailureTest(t *testing.T) (*TaskRunner, *Config, func()) {
 	require.EqualError(t, err, drivers.ErrTaskNotFound.Error())
 
 	// Create a new TaskRunner and Restore the task
+	conf.ServersContactedCh = make(chan struct{})
 	newTR, err := NewTaskRunner(conf)
 	require.NoError(t, err)
 
@@ -260,7 +260,7 @@ func setupRestoreFailureTest(t *testing.T) (*TaskRunner, *Config, func()) {
 // TestTaskRunner_Restore_Restart asserts restoring a dead task blocks until
 // MarkAlive is called. #1795
 func TestTaskRunner_Restore_Restart(t *testing.T) {
-	newTR, conf, cleanup := setupRestoreFailureTest(t)
+	newTR, conf, cleanup := setupRestoreFailureTest(t, mock.Alloc())
 	defer cleanup()
 
 	// Fake contacting the server by closing the chan
@@ -277,7 +277,7 @@ func TestTaskRunner_Restore_Restart(t *testing.T) {
 // TestTaskRunner_Restore_Kill asserts restoring a dead task blocks until
 // the task is killed. #1795
 func TestTaskRunner_Restore_Kill(t *testing.T) {
-	newTR, _, cleanup := setupRestoreFailureTest(t)
+	newTR, _, cleanup := setupRestoreFailureTest(t, mock.Alloc())
 	defer cleanup()
 
 	// Sending the task a terminal update shouldn't kill it or unblock it
@@ -302,12 +302,17 @@ func TestTaskRunner_Restore_Kill(t *testing.T) {
 // TestTaskRunner_Restore_Update asserts restoring a dead task blocks until
 // Update is called. #1795
 func TestTaskRunner_Restore_Update(t *testing.T) {
-	newTR, conf, cleanup := setupRestoreFailureTest(t)
+	newTR, conf, cleanup := setupRestoreFailureTest(t, mock.Alloc())
 	defer cleanup()
 
 	// Fake Client.runAllocs behavior by calling Update then closing chan
 	alloc := newTR.Alloc().Copy()
 	newTR.Update(alloc)
+
+	// Update alone should not unblock the test
+	require.Equal(t, structs.TaskStatePending, newTR.TaskState().State)
+
+	// Fake Client.runAllocs behavior of closing chan after Update
 	close(conf.ServersContactedCh)
 
 	testutil.WaitForResult(func() (bool, error) {
@@ -318,6 +323,79 @@ func TestTaskRunner_Restore_Update(t *testing.T) {
 	})
 }
 
+// TestTaskRunner_Restore_System asserts restoring a dead system task does not
+// block.
+func TestTaskRunner_Restore_System(t *testing.T) {
+	t.Parallel()
+
+	alloc := mock.Alloc()
+	alloc.Job.Type = structs.JobTypeSystem
+	task := alloc.Job.TaskGroups[0].Tasks[0]
+	task.Driver = "raw_exec"
+	task.Config = map[string]interface{}{
+		"command": "sleep",
+		"args":    []string{"30"},
+	}
+	conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name)
+	defer cleanup()
+	conf.StateDB = cstate.NewMemDB(conf.Logger) // "persist" state between runs
+
+	// Run the first TaskRunner
+	origTR, err := NewTaskRunner(conf)
+	require.NoError(t, err)
+	go origTR.Run()
+	defer origTR.Kill(context.Background(), structs.NewTaskEvent("cleanup"))
+
+	// Wait for it to be running
+	testWaitForTaskToStart(t, origTR)
+
+	handle := origTR.getDriverHandle()
+	require.NotNil(t, handle)
+	taskID := handle.taskID
+
+	// Cause TR to exit without shutting down task
+	origTR.Shutdown()
+
+	// Get the driver
+	driverPlugin, err := conf.DriverManager.Dispense(rawexec.PluginID.Name)
+	require.NoError(t, err)
+	rawexecDriver := driverPlugin.(*rawexec.Driver)
+
+	// Assert the task is still running despite TR having exited
+	taskStatus, err := rawexecDriver.InspectTask(taskID)
+	require.NoError(t, err)
+	require.Equal(t, drivers.TaskStateRunning, taskStatus.State)
+
+	// Kill the task so it fails to recover when restore is called
+	require.NoError(t, rawexecDriver.DestroyTask(taskID, true))
+	_, err = rawexecDriver.InspectTask(taskID)
+	require.EqualError(t, err, drivers.ErrTaskNotFound.Error())
+
+	// Create a new TaskRunner and Restore the task
+	conf.ServersContactedCh = make(chan struct{})
+	newTR, err := NewTaskRunner(conf)
+	require.NoError(t, err)
+
+	// Assert the TR will not wait on servers even though reattachment
+	// failed because it is a system task.
+	require.NoError(t, newTR.Restore())
+	require.False(t, newTR.waitOnServers)
+
+	// Nothing should have closed the chan
+	select {
+	case <-conf.ServersContactedCh:
+		require.Fail(t, "serversContactedCh was closed but should not have been")
+	default:
+	}
+
+	testutil.WaitForResult(func() (bool, error) {
+		ts := newTR.TaskState().State
+		return ts == structs.TaskStateRunning, fmt.Errorf("expected task to be running but found %q", ts)
+	}, func(err error) {
+		require.NoError(t, err)
+	})
+}
+
 // TestTaskRunner_TaskEnv_Interpolated asserts driver configurations are
 // interpolated.
 func TestTaskRunner_TaskEnv_Interpolated(t *testing.T) {
diff --git a/client/client.go b/client/client.go
index e27db56986d8..b57e8d2860d6 100644
--- a/client/client.go
+++ b/client/client.go
@@ -442,6 +442,9 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
 		logger.Warn("batch fingerprint operation timed out; proceeding to register with fingerprinted plugins so far")
 	}
 
+	// Register and then start heartbeating to the servers.
+	c.shutdownGroup.Go(c.registerAndHeartbeat)
+
 	// Restore the state
 	if err := c.restoreState(); err != nil {
 		logger.Error("failed to restore state", "error", err)
@@ -456,9 +459,6 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
 		return nil, fmt.Errorf("failed to restore state")
 	}
 
-	// Register and then start heartbeating to the servers.
-	c.shutdownGroup.Go(c.registerAndHeartbeat)
-
 	// Begin periodic snapshotting of state.
 	c.shutdownGroup.Go(c.periodicSnapshot)
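
The new test pins down the behavior this PR is after: when driver reattachment fails during `Restore()`, a system task should restart immediately instead of sitting in pending until `ServersContactedCh` is closed. The diff shows only the tests and the client wiring, not the task runner change that sets `waitOnServers`, so the following is a minimal, self-contained sketch of the gating logic the tests exercise. The `jobType` and `handleRecovered` fields and the method bodies are illustrative assumptions, not Nomad's actual implementation; only the `waitOnServers` and `serversContactedCh` names come from the diff above.

```go
package main

// Sketch only: mirrors the waitOnServers/serversContactedCh names used by
// the tests above; everything else is assumed for illustration.

import "fmt"

type TaskRunner struct {
	jobType            string        // assumed field: "service", "batch", or "system"
	handleRecovered    bool          // assumed field: did the driver reattach to a live task?
	waitOnServers      bool          // asserted by TestTaskRunner_Restore_System
	serversContactedCh chan struct{} // closed by the client once a server responds
}

// Restore decides whether a dead task may be restarted right away. A
// non-system task whose handle was lost must wait for the servers, since
// the allocation may have been stopped or rescheduled while the client was
// down (#1795); a system task runs on every node regardless, so it never
// needs to wait.
func (tr *TaskRunner) Restore() error {
	if !tr.handleRecovered && tr.jobType != "system" {
		tr.waitOnServers = true
	}
	return nil
}

// Run blocks in pending until servers are contacted, if Restore required it.
func (tr *TaskRunner) Run() {
	if tr.waitOnServers {
		fmt.Println("pending: waiting for servers before restarting dead task")
		<-tr.serversContactedCh
	}
	fmt.Println("running")
}

func main() {
	ch := make(chan struct{})

	// A system task whose handle was lost restarts immediately.
	sys := &TaskRunner{jobType: "system", serversContactedCh: ch}
	_ = sys.Restore()
	sys.Run() // prints "running" without blocking

	// A service task in the same situation stays pending until the
	// client closes the chan, as TestTaskRunner_Restore_Update shows.
	svc := &TaskRunner{jobType: "service", serversContactedCh: ch}
	_ = svc.Restore()
	close(ch)
	svc.Run() // pending message, then "running"
}
```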
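The `client.go` hunks support the same flow: `registerAndHeartbeat` now starts before `restoreState`, so a restored task runner that does block on `ServersContactedCh` can be unblocked by the first successful server contact instead of waiting for registration to begin only after restore completes. Below is a compressed sketch of why that ordering matters, with `spawn` standing in for `shutdownGroup.Go` and the restore-side wait collapsed into `restoreState` for visibility; in Nomad it is the restored task runners, not `NewClient` itself, that block.

```go
package main

// Sketch only: demonstrates the startup ordering, not Nomad's client code.

import (
	"fmt"
	"sync"
	"time"
)

type client struct {
	wg                 sync.WaitGroup
	contactedOnce      sync.Once
	serversContactedCh chan struct{}
}

// spawn is a stand-in for shutdownGroup.Go.
func (c *client) spawn(f func()) {
	c.wg.Add(1)
	go func() { defer c.wg.Done(); f() }()
}

// registerAndHeartbeat pretends a registration RPC succeeded and signals
// every waiter exactly once.
func (c *client) registerAndHeartbeat() {
	time.Sleep(10 * time.Millisecond)
	c.contactedOnce.Do(func() { close(c.serversContactedCh) })
}

// restoreState waits for server contact. In this simplified model, the
// pre-PR ordering (registerAndHeartbeat started only after this returns)
// would deadlock here.
func (c *client) restoreState() {
	<-c.serversContactedCh
	fmt.Println("restore unblocked: servers contacted")
}

func main() {
	c := &client{serversContactedCh: make(chan struct{})}
	c.spawn(c.registerAndHeartbeat) // started first, as in the diff
	c.restoreState()
	c.wg.Wait()
}
```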