client: register before restoring

Registration and restoring allocs don't share state or depend on each
other in any way (syncing allocs with servers is done outside of
registration).

Since restoring is synchronous, start the registration goroutine first.

For nodes with lots of allocs to restore, or nodes close to their heartbeat
deadline, this could be the difference between being marked "lost" and not.
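
Condensed from the client/client.go hunk below, the whole change is a reordering inside NewClient: the registration/heartbeat goroutine now starts before the synchronous restore, so heartbeating is never held up waiting for allocs to be restored. Roughly:

// After this commit (condensed from the hunk below; unrelated setup omitted).

// Register and then start heartbeating to the servers. Starting this before
// restoreState keeps the node heartbeating even while a long restore is
// still running.
c.shutdownGroup.Go(c.registerAndHeartbeat)

// Restore the state. This is synchronous and can be slow on nodes with many
// allocations.
if err := c.restoreState(); err != nil {
	logger.Error("failed to restore state", "error", err)
	// ...
	return nil, fmt.Errorf("failed to restore state")
}

// Begin periodic snapshotting of state.
c.shutdownGroup.Go(c.periodicSnapshot)
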
schmichael committed May 10, 2019
1 parent aced24f commit e272d0b
Showing 2 changed files with 87 additions and 9 deletions.
90 changes: 84 additions & 6 deletions client/allocrunner/taskrunner/task_runner_test.go
@@ -185,10 +185,9 @@ func TestTaskRunner_Restore_Running(t *testing.T) {
// kills the task before restarting a new TaskRunner. The new TaskRunner is
// returned once it is running and waiting in pending along with a cleanup
// func.
func setupRestoreFailureTest(t *testing.T) (*TaskRunner, *Config, func()) {
func setupRestoreFailureTest(t *testing.T, alloc *structs.Allocation) (*TaskRunner, *Config, func()) {
t.Parallel()

alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "raw_exec"
task.Config = map[string]interface{}{
@@ -217,7 +216,7 @@ func setupRestoreFailureTest(t *testing.T) (*TaskRunner, *Config, func()) {
// Cause TR to exit without shutting down task
origTR.Shutdown()

// Get the mock driver plugin
// Get the driver
driverPlugin, err := conf.DriverManager.Dispense(rawexec.PluginID.Name)
require.NoError(t, err)
rawexecDriver := driverPlugin.(*rawexec.Driver)
@@ -233,6 +232,7 @@ func setupRestoreFailureTest(t *testing.T) (*TaskRunner, *Config, func()) {
require.EqualError(t, err, drivers.ErrTaskNotFound.Error())

// Create a new TaskRunner and Restore the task
conf.ServersContactedCh = make(chan struct{})
newTR, err := NewTaskRunner(conf)
require.NoError(t, err)

@@ -260,7 +260,7 @@ func setupRestoreFailureTest(t *testing.T) (*TaskRunner, *Config, func()) {
// TestTaskRunner_Restore_Restart asserts restoring a dead task blocks until
// MarkAlive is called. #1795
func TestTaskRunner_Restore_Restart(t *testing.T) {
newTR, conf, cleanup := setupRestoreFailureTest(t)
newTR, conf, cleanup := setupRestoreFailureTest(t, mock.Alloc())
defer cleanup()

// Fake contacting the server by closing the chan
@@ -277,7 +277,7 @@ func TestTaskRunner_Restore_Restart(t *testing.T) {
// TestTaskRunner_Restore_Kill asserts restoring a dead task blocks until
// the task is killed. #1795
func TestTaskRunner_Restore_Kill(t *testing.T) {
newTR, _, cleanup := setupRestoreFailureTest(t)
newTR, _, cleanup := setupRestoreFailureTest(t, mock.Alloc())
defer cleanup()

// Sending the task a terminal update shouldn't kill it or unblock it
@@ -302,12 +302,17 @@ func TestTaskRunner_Restore_Kill(t *testing.T) {
// TestTaskRunner_Restore_Update asserts restoring a dead task blocks until
// Update is called. #1795
func TestTaskRunner_Restore_Update(t *testing.T) {
newTR, conf, cleanup := setupRestoreFailureTest(t)
newTR, conf, cleanup := setupRestoreFailureTest(t, mock.Alloc())
defer cleanup()

// Fake Client.runAllocs behavior by calling Update then closing chan
alloc := newTR.Alloc().Copy()
newTR.Update(alloc)

// Update alone should not unblock the test
require.Equal(t, structs.TaskStatePending, newTR.TaskState().State)

// Fake Client.runAllocs behavior of closing chan after Update
close(conf.ServersContactedCh)

testutil.WaitForResult(func() (bool, error) {
@@ -318,6 +323,79 @@ func TestTaskRunner_Restore_Update(t *testing.T) {
})
}

// TestTaskRunner_Restore_System asserts restoring a dead system task does not
// block.
func TestTaskRunner_Restore_System(t *testing.T) {
t.Parallel()

alloc := mock.Alloc()
alloc.Job.Type = structs.JobTypeSystem
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "raw_exec"
task.Config = map[string]interface{}{
"command": "sleep",
"args": []string{"30"},
}
conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name)
defer cleanup()
conf.StateDB = cstate.NewMemDB(conf.Logger) // "persist" state between runs

// Run the first TaskRunner
origTR, err := NewTaskRunner(conf)
require.NoError(t, err)
go origTR.Run()
defer origTR.Kill(context.Background(), structs.NewTaskEvent("cleanup"))

// Wait for it to be running
testWaitForTaskToStart(t, origTR)

handle := origTR.getDriverHandle()
require.NotNil(t, handle)
taskID := handle.taskID

// Cause TR to exit without shutting down task
origTR.Shutdown()

// Get the driver
driverPlugin, err := conf.DriverManager.Dispense(rawexec.PluginID.Name)
require.NoError(t, err)
rawexecDriver := driverPlugin.(*rawexec.Driver)

// Assert the task is still running despite TR having exited
taskStatus, err := rawexecDriver.InspectTask(taskID)
require.NoError(t, err)
require.Equal(t, drivers.TaskStateRunning, taskStatus.State)

// Kill the task so it fails to recover when restore is called
require.NoError(t, rawexecDriver.DestroyTask(taskID, true))
_, err = rawexecDriver.InspectTask(taskID)
require.EqualError(t, err, drivers.ErrTaskNotFound.Error())

// Create a new TaskRunner and Restore the task
conf.ServersContactedCh = make(chan struct{})
newTR, err := NewTaskRunner(conf)
require.NoError(t, err)

// Assert the TR will not wait on servers even though reattachment
// failed because it is a system task.
require.NoError(t, newTR.Restore())
require.False(t, newTR.waitOnServers)

// Nothing should have closed the chan
select {
case <-conf.ServersContactedCh:
require.Fail(t, "serversContactedCh was closed but should not have been")
default:
}

testutil.WaitForResult(func() (bool, error) {
ts := newTR.TaskState().State
return ts == structs.TaskStateRunning, fmt.Errorf("expected task to be running but found %q", ts)
}, func(err error) {
require.NoError(t, err)
})
}

// TestTaskRunner_TaskEnv_Interpolated asserts driver configurations are
// interpolated.
func TestTaskRunner_TaskEnv_Interpolated(t *testing.T) {
6 changes: 3 additions & 3 deletions client/client.go
@@ -442,6 +442,9 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
logger.Warn("batch fingerprint operation timed out; proceeding to register with fingerprinted plugins so far")
}

// Register and then start heartbeating to the servers.
c.shutdownGroup.Go(c.registerAndHeartbeat)

// Restore the state
if err := c.restoreState(); err != nil {
logger.Error("failed to restore state", "error", err)
@@ -456,9 +459,6 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
return nil, fmt.Errorf("failed to restore state")
}

// Register and then start heartbeating to the servers.
c.shutdownGroup.Go(c.registerAndHeartbeat)

// Begin periodic snapshotting of state.
c.shutdownGroup.Go(c.periodicSnapshot)
