From 104067bc2b2002a4e45ae7b667a476b89addc162 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Tue, 7 May 2019 23:04:40 -0700 Subject: [PATCH] client: do not restart dead tasks until server is contacted Fixes #1795 Running restored allocations and pulling what allocations to run from the server happen concurrently. This means that if a client is rebooted, and has its allocations rescheduled, it may restart the dead allocations before it contacts the server and determines they should be dead. This commit makes tasks that fail to reattach on restore wait until the server is contacted before restarting. --- client/allocrunner/alloc_runner.go | 9 ++ client/allocrunner/taskrunner/task_runner.go | 48 +++++- .../taskrunner/task_runner_test.go | 142 +++++++++++++++++- client/client.go | 25 +++ helper/gate/gate.go | 87 +++++++++++ helper/gate/gate_test.go | 126 ++++++++++++++++ nomad/structs/structs.go | 4 + 7 files changed, 438 insertions(+), 3 deletions(-) create mode 100644 helper/gate/gate.go create mode 100644 helper/gate/gate_test.go diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index c450b1759893..4f56aae91bcf 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -721,6 +721,15 @@ func (ar *allocRunner) handleAllocUpdate(update *structs.Allocation) { } +// MarkLive unblocks restored tasks that failed to reattach and are waiting to +// contact a server before restarting the dead task. The Client will call this +// method when the task should run, otherwise the task will be killed. +func (ar *allocRunner) MarkLive() { + for _, tr := range ar.tasks { + tr.MarkLive() + } +} + func (ar *allocRunner) Listener() *cstructs.AllocListener { return ar.allocBroadcaster.Listen() } diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index 9888c1421953..b876cd7ea314 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -25,6 +25,7 @@ import ( cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/client/taskenv" "github.com/hashicorp/nomad/client/vaultclient" + "github.com/hashicorp/nomad/helper/gate" "github.com/hashicorp/nomad/helper/pluginutils/hclspecutils" "github.com/hashicorp/nomad/helper/pluginutils/hclutils" "github.com/hashicorp/nomad/helper/uuid" @@ -193,6 +194,10 @@ type TaskRunner struct { // maxEvents is the capacity of the TaskEvents on the TaskState. // Defaults to defaultMaxEvents but overrideable for testing. maxEvents int + + // restoreGate is used to block restored tasks that failed to reattach + // from restarting until servers are contacted. #1795 + restoreGate *gate.G } type Config struct { @@ -245,6 +250,12 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) { tstate = ts.Copy() } + // Initialize restoreGate as open. It will only be closed if Restore is + // called and fails to reconnect to the task handle. In that case the + // we must wait until contact with the server is made before restarting + // or killing the task. #1795 + restoreGate := gate.NewOpen() + tr := &TaskRunner{ alloc: config.Alloc, allocID: config.Alloc.ID, @@ -270,6 +281,7 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) { devicemanager: config.DeviceManager, driverManager: config.DriverManager, maxEvents: defaultMaxEvents, + restoreGate: restoreGate, } // Create the logger based on the allocation ID @@ -381,6 +393,18 @@ func (tr *TaskRunner) Run() { // - should be handled serially. go tr.handleUpdates() + // If restore failed, don't proceed until servers are contacted + if tr.restoreGate.IsClosed() { + tr.logger.Info("task failed to restore; waiting to contact server before restarting") + select { + case <-tr.killCtx.Done(): + case <-tr.shutdownCtx.Done(): + return + case <-tr.restoreGate.Wait(): + tr.logger.Trace("server contacted; unblocking waiting task") + } + } + MAIN: for !tr.Alloc().TerminalStatus() { select { @@ -858,7 +882,18 @@ func (tr *TaskRunner) Restore() error { if taskHandle := tr.localState.TaskHandle; taskHandle != nil { //TODO if RecoverTask returned the DriverNetwork we wouldn't // have to persist it at all! - tr.restoreHandle(taskHandle, tr.localState.DriverNetwork) + restored := tr.restoreHandle(taskHandle, tr.localState.DriverNetwork) + if !restored && !tr.Alloc().TerminalStatus() { + // Restore failed, close the restore gate to block + // until server is contacted to prevent restarting + // terminal allocs. #1795 + tr.logger.Trace("failed to reattach to task; will not run until server is contacted") + tr.restoreGate.Close() + + ev := structs.NewTaskEvent(structs.TaskRestoreFailed). + SetDisplayMessage("failed to restore task; will not run until server is contacted") + tr.UpdateState(structs.TaskStatePending, ev) + } } return nil } @@ -1073,6 +1108,10 @@ func (tr *TaskRunner) Update(update *structs.Allocation) { // Trigger update hooks if not terminal if !update.TerminalStatus() { tr.triggerUpdateHooks() + + // MarkLive in case task had failed to restore and were waiting + // to hear from the server. + tr.MarkLive() } } @@ -1089,6 +1128,13 @@ func (tr *TaskRunner) triggerUpdateHooks() { } } +// MarkLive unblocks restored tasks that failed to reattach and are waiting to +// contact a server before restarting the dead task. The Client will call this +// method when the task should run, otherwise the task will be killed. +func (tr *TaskRunner) MarkLive() { + tr.restoreGate.Open() +} + // Shutdown TaskRunner gracefully without affecting the state of the task. // Shutdown blocks until the main Run loop exits. func (tr *TaskRunner) Shutdown() { diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go index 99900b785093..b8a1ad69af12 100644 --- a/client/allocrunner/taskrunner/task_runner_test.go +++ b/client/allocrunner/taskrunner/task_runner_test.go @@ -25,10 +25,12 @@ import ( "github.com/hashicorp/nomad/client/vaultclient" agentconsul "github.com/hashicorp/nomad/command/agent/consul" mockdriver "github.com/hashicorp/nomad/drivers/mock" + "github.com/hashicorp/nomad/drivers/rawexec" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/device" + "github.com/hashicorp/nomad/plugins/drivers" "github.com/hashicorp/nomad/testutil" "github.com/kr/pretty" "github.com/stretchr/testify/assert" @@ -124,8 +126,8 @@ func runTestTaskRunner(t *testing.T, alloc *structs.Allocation, taskName string) } } -// TestTaskRunner_Restore asserts restoring a running task does not rerun the -// task. +// TestTaskRunner_Restore_Running asserts restoring a running task does not +// rerun the task. func TestTaskRunner_Restore_Running(t *testing.T) { t.Parallel() require := require.New(t) @@ -178,6 +180,142 @@ func TestTaskRunner_Restore_Running(t *testing.T) { assert.Equal(t, 1, started) } +// setupRestoreFailureTest starts a service, shuts down the task runner, and +// kills the task before restarting a new TaskRunner. The new TaskRunner is +// returned once it is running and waiting in pending along with a cleanup +// func. +func setupRestoreFailureTest(t *testing.T) (*TaskRunner, func()) { + t.Parallel() + + alloc := mock.Alloc() + task := alloc.Job.TaskGroups[0].Tasks[0] + task.Driver = "raw_exec" + task.Config = map[string]interface{}{ + "command": "sleep", + "args": []string{"30"}, + } + conf, cleanup1 := testTaskRunnerConfig(t, alloc, task.Name) + conf.StateDB = cstate.NewMemDB(conf.Logger) // "persist" state between runs + + // Run the first TaskRunner + origTR, err := NewTaskRunner(conf) + require.NoError(t, err) + go origTR.Run() + cleanup2 := func() { + origTR.Kill(context.Background(), structs.NewTaskEvent("cleanup")) + cleanup1() + } + + // Wait for it to be running + testWaitForTaskToStart(t, origTR) + + handle := origTR.getDriverHandle() + require.NotNil(t, handle) + taskID := handle.taskID + + // Cause TR to exit without shutting down task + origTR.Shutdown() + + // Get the mock driver plugin + driverPlugin, err := conf.DriverManager.Dispense(rawexec.PluginID.Name) + require.NoError(t, err) + rawexecDriver := driverPlugin.(*rawexec.Driver) + + // Assert the task is still running despite TR having exited + taskStatus, err := rawexecDriver.InspectTask(taskID) + require.NoError(t, err) + require.Equal(t, drivers.TaskStateRunning, taskStatus.State) + + // Kill the task so it fails to recover when restore is called + require.NoError(t, rawexecDriver.DestroyTask(taskID, true)) + _, err = rawexecDriver.InspectTask(taskID) + require.EqualError(t, err, drivers.ErrTaskNotFound.Error()) + + // Create a new TaskRunner and Restore the task + newTR, err := NewTaskRunner(conf) + require.NoError(t, err) + require.NoError(t, newTR.Restore()) + + // Assert the restore gate is *closed* because reattachment failed + require.True(t, newTR.restoreGate.IsClosed()) + + // Start new TR + go newTR.Run() + cleanup3 := func() { + newTR.Kill(context.Background(), structs.NewTaskEvent("cleanup")) + cleanup2() + cleanup1() + } + + // Assert task has not been restarted + _, err = rawexecDriver.InspectTask(taskID) + require.EqualError(t, err, drivers.ErrTaskNotFound.Error()) + ts := newTR.TaskState() + require.Equal(t, structs.TaskStatePending, ts.State) + + return newTR, cleanup3 +} + +// TestTaskRunner_Restore_Restart asserts restoring a dead task blocks until +// MarkAlive is called. #1795 +func TestTaskRunner_Restore_Restart(t *testing.T) { + newTR, cleanup := setupRestoreFailureTest(t) + defer cleanup() + + // Fake contacting the server by opening the restore gate + newTR.MarkLive() + + testutil.WaitForResult(func() (bool, error) { + ts := newTR.TaskState().State + return ts == structs.TaskStateRunning, fmt.Errorf("expected task to be running but found %q", ts) + }, func(err error) { + require.NoError(t, err) + }) +} + +// TestTaskRunner_Restore_Kill asserts restoring a dead task blocks until +// the task is killed. #1795 +func TestTaskRunner_Restore_Kill(t *testing.T) { + newTR, cleanup := setupRestoreFailureTest(t) + defer cleanup() + + // Sending the task a terminal update shouldn't kill it or mark it live + alloc := newTR.Alloc().Copy() + alloc.DesiredStatus = structs.AllocDesiredStatusStop + newTR.Update(alloc) + + require.Equal(t, structs.TaskStatePending, newTR.TaskState().State) + + // AllocRunner will immediately kill tasks after sending a terminal + // update. + newTR.Kill(context.Background(), structs.NewTaskEvent(structs.TaskKilling)) + + select { + case <-newTR.WaitCh(): + // It died as expected! + case <-time.After(10 * time.Second): + require.Fail(t, "timeout waiting for task to die") + } +} + +// TestTaskRunner_Restore_Update asserts restoring a dead task blocks until +// Update is called. #1795 +func TestTaskRunner_Restore_Update(t *testing.T) { + newTR, cleanup := setupRestoreFailureTest(t) + defer cleanup() + + // Fake contacting the server by opening the restore gate + alloc := newTR.Alloc().Copy() + newTR.Update(alloc) + + testutil.WaitForResult(func() (bool, error) { + ts := newTR.TaskState().State + return ts == structs.TaskStateRunning, fmt.Errorf("expected task to be running but found %q", ts) + }, func(err error) { + require.NoError(t, err) + }) +} + // TestTaskRunner_TaskEnv_Interpolated asserts driver configurations are // interpolated. func TestTaskRunner_TaskEnv_Interpolated(t *testing.T) { diff --git a/client/client.go b/client/client.go index dcf5c235aa4b..4828632db074 100644 --- a/client/client.go +++ b/client/client.go @@ -125,6 +125,7 @@ type AllocRunner interface { Run() StatsReporter() interfaces.AllocStatsReporter Update(*structs.Allocation) + MarkLive() WaitCh() <-chan struct{} DestroyCh() <-chan struct{} ShutdownCh() <-chan struct{} @@ -1989,6 +1990,12 @@ func (c *Client) runAllocs(update *allocUpdates) { errs := 0 + // Mark existing allocations as live in case they failed to reattach on + // restore and are waiting to hear from the server before restarting. + for _, live := range diff.ignore { + c.markAllocLive(live) + } + // Remove the old allocations for _, remove := range diff.removed { c.removeAlloc(remove) @@ -2072,6 +2079,24 @@ func makeFailedAlloc(add *structs.Allocation, err error) *structs.Allocation { return stripped } +// markAllocLive is invoked when an alloc should be running but has not been +// updated or just been added. This allows unblocking tasks that failed to +// reattach on restored and are waiting to hear from the server. +func (c *Client) markAllocLive(allocID string) { + c.allocLock.Lock() + defer c.allocLock.Unlock() + + ar, ok := c.allocs[allocID] + if !ok { + // This should never happen as alloc diffing should cause + // unknown allocs to be added, not marked live. + c.logger.Warn("unknown alloc should be running but is not", "alloc_id", allocID) + return + } + + ar.MarkLive() +} + // removeAlloc is invoked when we should remove an allocation because it has // been removed by the server. func (c *Client) removeAlloc(allocID string) { diff --git a/helper/gate/gate.go b/helper/gate/gate.go new file mode 100644 index 000000000000..092eec92a9ab --- /dev/null +++ b/helper/gate/gate.go @@ -0,0 +1,87 @@ +// Package gate implements a simple on/off latch or gate: it blocks waiters +// until opened. Waiters may receive on a chan which is closed when the gate is +// open. +package gate + +import "sync" + +// closedCh is a chan initialized as closed +var closedCh chan struct{} + +func init() { + closedCh = make(chan struct{}) + close(closedCh) +} + +// G is a gate which blocks waiters until opened and is safe for concurrent +// use. Must be created via New. +type G struct { + // open is true if the gate is open and ch is closed. + open bool + + // ch is closed if the gate is open. + ch chan struct{} + + mu sync.Mutex +} + +// NewClosed returns a closed gate. The chan returned by Wait will block until Open +// is called. +func NewClosed() *G { + return &G{ + ch: make(chan struct{}), + } +} + +// NewOpen returns an open gate. The chan returned by Wait is closed and +// therefore will never block. +func NewOpen() *G { + return &G{ + open: true, + ch: closedCh, + } +} + +// Open the gate. Unblocks any Waiters. Opening an opened gate is a noop. Safe +// for concurrent ues with Close and Wait. +func (g *G) Open() { + g.mu.Lock() + defer g.mu.Unlock() + + if g.open { + return + } + + g.open = true + close(g.ch) +} + +// Close the gate. Blocks subsequent Wait callers. Closing a closed gate is a +// noop. Safe for concurrent use with Open and Wait. +func (g *G) Close() { + g.mu.Lock() + defer g.mu.Unlock() + + if !g.open { + return + } + + g.open = false + g.ch = make(chan struct{}) +} + +// Wait returns a chan that blocks until the gate is open. Safe for concurrent +// use with Open and Close, but the chan should not be reused between calls to +// Open and Close. +func (g *G) Wait() <-chan struct{} { + g.mu.Lock() + defer g.mu.Unlock() + return g.ch +} + +// IsClosed returns true if the gate is closed. +func (g *G) IsClosed() bool { + g.mu.Lock() + defer g.mu.Unlock() + return !g.open +} diff --git a/helper/gate/gate_test.go b/helper/gate/gate_test.go new file mode 100644 index 000000000000..cceb9673c05b --- /dev/null +++ b/helper/gate/gate_test.go @@ -0,0 +1,126 @@ +package gate + +import ( + "math/rand" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestGate_NewClosed(t *testing.T) { + t.Parallel() + + g := NewClosed() + + assertClosed := func() { + require.True(t, g.IsClosed()) + select { + case <-g.Wait(): + require.Fail(t, "expected gate to be closed") + default: + // Ok! + } + } + + assertClosed() + g.Close() + assertClosed() + + // Close should be safe to call multiple times + g.Close() + assertClosed() + + g.Open() + require.False(t, g.IsClosed()) + select { + case <-g.Wait(): + // Ok! + default: + require.Fail(t, "expected gate to be open") + } +} + +func TestGate_NewOpen(t *testing.T) { + t.Parallel() + + g := NewOpen() + + assertOpen := func() { + require.False(t, g.IsClosed()) + select { + case <-g.Wait(): + // Ok! + default: + require.Fail(t, "expected gate to be open") + } + } + + assertOpen() + g.Open() + assertOpen() + + // Open should be safe to call multiple times + g.Open() + assertOpen() + + g.Close() + select { + case <-g.Wait(): + require.Fail(t, "expected gate to be closed") + default: + // Ok! + } +} + +// TestGate_Concurrency is meant to be run with the race detector enabled to +// find any races. +func TestGate_Concurrency(t *testing.T) { + t.Parallel() + + g := NewOpen() + wg := sync.WaitGroup{} + + // Start closer + wg.Add(1) + go func() { + defer wg.Done() + dice := rand.New(rand.NewSource(time.Now().UnixNano())) + for i := 0; i < 1000; i++ { + g.Close() + time.Sleep(time.Duration(dice.Int63n(100))) + } + }() + + // Start opener + wg.Add(1) + go func() { + defer wg.Done() + dice := rand.New(rand.NewSource(time.Now().UnixNano())) + for i := 0; i < 1000; i++ { + g.Open() + time.Sleep(time.Duration(dice.Int63n(100))) + } + }() + + // Perform reads concurrently with writes + wgCh := make(chan struct{}) + doneCh := make(chan struct{}) + go func() { + defer close(doneCh) + for { + select { + case <-time.After(time.Millisecond): + case <-wgCh: + return + } + g.IsClosed() + g.Wait() + } + }() + + wg.Wait() + close(wgCh) + <-doneCh +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 46a79e3e642b..a774f2c8d239 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -5966,6 +5966,10 @@ const ( // TaskHookFailed indicates that one of the hooks for a task failed. TaskHookFailed = "Task hook failed" + + // TaskRestoreFailed indicates Nomad was unable to reattach to a + // restored task. + TaskRestoreFailed = "Failed Restoring Task" ) // TaskEvent is an event that effects the state of a task and contains meta-data