From e272d0bdf879ebb21e9c30be4eed38056cf5b40e Mon Sep 17 00:00:00 2001
From: Michael Schurter
Date: Fri, 10 May 2019 08:54:35 -0700
Subject: [PATCH] client: register before restoring

Registration and restoring allocs don't share state or depend on each
other in any way (syncing allocs with servers is done outside of
registration).

Since restoring is synchronous, start the registration goroutine first.
For nodes with lots of allocs to restore or close to their heartbeat
deadline, this could be the difference between becoming "lost" or not.
---
 .../taskrunner/task_runner_test.go            | 90 +++++++++++++++++--
 client/client.go                              |  6 +-
 2 files changed, 87 insertions(+), 9 deletions(-)

diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go
index f1589a1d1a69..7d2bd5275e60 100644
--- a/client/allocrunner/taskrunner/task_runner_test.go
+++ b/client/allocrunner/taskrunner/task_runner_test.go
@@ -185,10 +185,9 @@ func TestTaskRunner_Restore_Running(t *testing.T) {
 // kills the task before restarting a new TaskRunner. The new TaskRunner is
 // returned once it is running and waiting in pending along with a cleanup
 // func.
-func setupRestoreFailureTest(t *testing.T) (*TaskRunner, *Config, func()) {
+func setupRestoreFailureTest(t *testing.T, alloc *structs.Allocation) (*TaskRunner, *Config, func()) {
 	t.Parallel()

-	alloc := mock.Alloc()
 	task := alloc.Job.TaskGroups[0].Tasks[0]
 	task.Driver = "raw_exec"
 	task.Config = map[string]interface{}{
@@ -217,7 +216,7 @@ func setupRestoreFailureTest(t *testing.T) (*TaskRunner, *Config, func()) {
 	// Cause TR to exit without shutting down task
 	origTR.Shutdown()

-	// Get the mock driver plugin
+	// Get the driver
 	driverPlugin, err := conf.DriverManager.Dispense(rawexec.PluginID.Name)
 	require.NoError(t, err)
 	rawexecDriver := driverPlugin.(*rawexec.Driver)
@@ -233,6 +232,7 @@ func setupRestoreFailureTest(t *testing.T) (*TaskRunner, *Config, func()) {
 	require.EqualError(t, err, drivers.ErrTaskNotFound.Error())

 	// Create a new TaskRunner and Restore the task
+	conf.ServersContactedCh = make(chan struct{})
 	newTR, err := NewTaskRunner(conf)
 	require.NoError(t, err)

@@ -260,7 +260,7 @@ func setupRestoreFailureTest(t *testing.T) (*TaskRunner, *Config, func()) {
 // TestTaskRunner_Restore_Restart asserts restoring a dead task blocks until
 // MarkAlive is called. #1795
 func TestTaskRunner_Restore_Restart(t *testing.T) {
-	newTR, conf, cleanup := setupRestoreFailureTest(t)
+	newTR, conf, cleanup := setupRestoreFailureTest(t, mock.Alloc())
 	defer cleanup()

 	// Fake contacting the server by closing the chan
@@ -277,7 +277,7 @@ func TestTaskRunner_Restore_Restart(t *testing.T) {
 // TestTaskRunner_Restore_Kill asserts restoring a dead task blocks until
 // the task is killed. #1795
 func TestTaskRunner_Restore_Kill(t *testing.T) {
-	newTR, _, cleanup := setupRestoreFailureTest(t)
+	newTR, _, cleanup := setupRestoreFailureTest(t, mock.Alloc())
 	defer cleanup()

 	// Sending the task a terminal update shouldn't kill it or unblock it
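With the alloc now supplied by the caller, tests can shape the job before invoking the shared setup. A hypothetical caller of the reworked helper, assuming it lives alongside these tests in the same file, might look like the sketch below (illustrative only; this test is not part of the patch):

func TestTaskRunner_Restore_CustomAllocExample(t *testing.T) {
	// Build the alloc up front; setupRestoreFailureTest no longer calls
	// mock.Alloc() itself, so callers decide what kind of job to restore.
	alloc := mock.Alloc()
	alloc.Job.Type = structs.JobTypeSystem // e.g. exercise a system job

	newTR, _, cleanup := setupRestoreFailureTest(t, alloc)
	defer cleanup()

	_ = newTR // assertions about restore behavior would go here
}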
@@ -302,12 +302,17 @@ func TestTaskRunner_Restore_Kill(t *testing.T) {
 // TestTaskRunner_Restore_Update asserts restoring a dead task blocks until
 // Update is called. #1795
 func TestTaskRunner_Restore_Update(t *testing.T) {
-	newTR, conf, cleanup := setupRestoreFailureTest(t)
+	newTR, conf, cleanup := setupRestoreFailureTest(t, mock.Alloc())
 	defer cleanup()

 	// Fake Client.runAllocs behavior by calling Update then closing chan
 	alloc := newTR.Alloc().Copy()
 	newTR.Update(alloc)
+
+	// Update alone should not unblock the test
+	require.Equal(t, structs.TaskStatePending, newTR.TaskState().State)
+
+	// Fake Client.runAllocs behavior of closing chan after Update
 	close(conf.ServersContactedCh)

 	testutil.WaitForResult(func() (bool, error) {
@@ -318,6 +323,79 @@ func TestTaskRunner_Restore_Update(t *testing.T) {
 	})
 }

+// TestTaskRunner_Restore_System asserts restoring a dead system task does not
+// block.
+func TestTaskRunner_Restore_System(t *testing.T) {
+	t.Parallel()
+
+	alloc := mock.Alloc()
+	alloc.Job.Type = structs.JobTypeSystem
+	task := alloc.Job.TaskGroups[0].Tasks[0]
+	task.Driver = "raw_exec"
+	task.Config = map[string]interface{}{
+		"command": "sleep",
+		"args":    []string{"30"},
+	}
+	conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name)
+	defer cleanup()
+	conf.StateDB = cstate.NewMemDB(conf.Logger) // "persist" state between runs
+
+	// Run the first TaskRunner
+	origTR, err := NewTaskRunner(conf)
+	require.NoError(t, err)
+	go origTR.Run()
+	defer origTR.Kill(context.Background(), structs.NewTaskEvent("cleanup"))
+
+	// Wait for it to be running
+	testWaitForTaskToStart(t, origTR)
+
+	handle := origTR.getDriverHandle()
+	require.NotNil(t, handle)
+	taskID := handle.taskID
+
+	// Cause TR to exit without shutting down task
+	origTR.Shutdown()
+
+	// Get the driver
+	driverPlugin, err := conf.DriverManager.Dispense(rawexec.PluginID.Name)
+	require.NoError(t, err)
+	rawexecDriver := driverPlugin.(*rawexec.Driver)
+
+	// Assert the task is still running despite TR having exited
+	taskStatus, err := rawexecDriver.InspectTask(taskID)
+	require.NoError(t, err)
+	require.Equal(t, drivers.TaskStateRunning, taskStatus.State)
+
+	// Kill the task so it fails to recover when restore is called
+	require.NoError(t, rawexecDriver.DestroyTask(taskID, true))
+	_, err = rawexecDriver.InspectTask(taskID)
+	require.EqualError(t, err, drivers.ErrTaskNotFound.Error())
+
+	// Create a new TaskRunner and Restore the task
+	conf.ServersContactedCh = make(chan struct{})
+	newTR, err := NewTaskRunner(conf)
+	require.NoError(t, err)
+
+	// Assert the TR will not wait on servers even though reattachment
+	// failed because it is a system task.
+	require.NoError(t, newTR.Restore())
+	require.False(t, newTR.waitOnServers)
+
+	// Nothing should have closed the chan
+	select {
+	case <-conf.ServersContactedCh:
+		require.Fail(t, "serversContactedCh was closed but should not have been")
+	default:
+	}
+
+	testutil.WaitForResult(func() (bool, error) {
+		ts := newTR.TaskState().State
+		return ts == structs.TaskStateRunning, fmt.Errorf("expected task to be running but found %q", ts)
+	}, func(err error) {
+		require.NoError(t, err)
+	})
+}
+
 // TestTaskRunner_TaskEnv_Interpolated asserts driver configurations are
 // interpolated.
 func TestTaskRunner_TaskEnv_Interpolated(t *testing.T) {
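TestTaskRunner_Restore_System above asserts the behavior without the TaskRunner.Restore changes themselves appearing in this diff. A rough, self-contained sketch of the gate the tests exercise follows; the waitOnServers flag and ServersContactedCh names come from the tests, while the types and logic below are illustrative stand-ins, not Nomad's implementation:

package main

import (
	"context"
	"fmt"
	"time"
)

// restoreGate is a stand-in for the blocking behavior the tests above assert:
// after a failed reattach, a restored task waits for the first successful
// server contact before restarting, unless the job is a system job or the
// task is killed first.
type restoreGate struct {
	waitOnServers      bool            // set when reattach fails and the job is not a system job
	serversContactedCh <-chan struct{} // closed once the client has heard from servers
	killCtx            context.Context // canceled when the task is killed
}

func (g *restoreGate) wait() error {
	if !g.waitOnServers {
		// System jobs (and successful reattaches) don't block.
		return nil
	}
	select {
	case <-g.serversContactedCh:
		return nil
	case <-g.killCtx.Done():
		return g.killCtx.Err()
	}
}

func main() {
	contacted := make(chan struct{})
	g := &restoreGate{
		waitOnServers:      true,
		serversContactedCh: contacted,
		killCtx:            context.Background(),
	}

	// Stand-in for Client.runAllocs closing the channel after first contact.
	go func() {
		time.Sleep(100 * time.Millisecond)
		close(contacted)
	}()

	fmt.Println("unblocked:", g.wait() == nil)
}

Closing a channel works well as the signal here because every blocked restore observes the close exactly once, with no per-task bookkeeping.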
diff --git a/client/client.go b/client/client.go
index e27db56986d8..b57e8d2860d6 100644
--- a/client/client.go
+++ b/client/client.go
@@ -442,6 +442,9 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
 		logger.Warn("batch fingerprint operation timed out; proceeding to register with fingerprinted plugins so far")
 	}

+	// Register and then start heartbeating to the servers.
+	c.shutdownGroup.Go(c.registerAndHeartbeat)
+
 	// Restore the state
 	if err := c.restoreState(); err != nil {
 		logger.Error("failed to restore state", "error", err)
@@ -456,9 +459,6 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
 		return nil, fmt.Errorf("failed to restore state")
 	}

-	// Register and then start heartbeating to the servers.
-	c.shutdownGroup.Go(c.registerAndHeartbeat)
-
 	// Begin periodic snapshotting of state.
 	c.shutdownGroup.Go(c.periodicSnapshot)
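The client.go hunks are the core of the change: registration now starts before the synchronous restore instead of after it. A toy, self-contained sketch of why the ordering matters for the heartbeat deadline; the group type and function bodies are illustrative stand-ins, not Nomad's shutdownGroup or Client:

package main

import (
	"fmt"
	"sync"
	"time"
)

var start = time.Now()

// group is a minimal stand-in for a shutdown group: it just tracks goroutines.
type group struct{ wg sync.WaitGroup }

func (g *group) Go(f func()) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		f()
	}()
}

// registerAndHeartbeat stands in for the client registering with its servers.
func registerAndHeartbeat() {
	fmt.Println("registered after", time.Since(start))
	// ...the real method would keep heartbeating on an interval...
}

// restoreState stands in for synchronously restoring many allocs from disk.
func restoreState() {
	time.Sleep(2 * time.Second)
	fmt.Println("restore finished after", time.Since(start))
}

func main() {
	var g group

	// New ordering: registration runs concurrently, so a slow restore no
	// longer delays the first heartbeat past the server's deadline.
	g.Go(registerAndHeartbeat)
	restoreState()

	g.wg.Wait()
}

With the old ordering the first registration could not happen until restoreState returned, so a node restoring many allocs (or already close to its heartbeat deadline) risked being marked "lost" before it ever checked in; the reorder takes restore time out of that path, which is safe because, as the commit message notes, the two steps share no state.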