diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go
index c450b1759893..4f56aae91bcf 100644
--- a/client/allocrunner/alloc_runner.go
+++ b/client/allocrunner/alloc_runner.go
@@ -721,6 +721,15 @@ func (ar *allocRunner) handleAllocUpdate(update *structs.Allocation) {
     }
 
+// MarkLive unblocks restored tasks that failed to reattach and are waiting to
+// contact a server before restarting the dead task. The Client will call this
+// method when the task should run, otherwise the task will be killed.
+func (ar *allocRunner) MarkLive() {
+    for _, tr := range ar.tasks {
+        tr.MarkLive()
+    }
+}
+
 func (ar *allocRunner) Listener() *cstructs.AllocListener {
     return ar.allocBroadcaster.Listen()
 }
diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go
index 9888c1421953..b876cd7ea314 100644
--- a/client/allocrunner/taskrunner/task_runner.go
+++ b/client/allocrunner/taskrunner/task_runner.go
@@ -25,6 +25,7 @@ import (
     cstructs "github.com/hashicorp/nomad/client/structs"
     "github.com/hashicorp/nomad/client/taskenv"
     "github.com/hashicorp/nomad/client/vaultclient"
+    "github.com/hashicorp/nomad/helper/gate"
     "github.com/hashicorp/nomad/helper/pluginutils/hclspecutils"
     "github.com/hashicorp/nomad/helper/pluginutils/hclutils"
     "github.com/hashicorp/nomad/helper/uuid"
@@ -193,6 +194,10 @@ type TaskRunner struct {
     // maxEvents is the capacity of the TaskEvents on the TaskState.
     // Defaults to defaultMaxEvents but overrideable for testing.
     maxEvents int
+
+    // restoreGate is used to block restored tasks that failed to reattach
+    // from restarting until servers are contacted. #1795
+    restoreGate *gate.G
 }
 
 type Config struct {
@@ -245,6 +250,12 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
         tstate = ts.Copy()
     }
 
+    // Initialize restoreGate as open. It will only be closed if Restore is
+    // called and fails to reconnect to the task handle. In that case we
+    // must wait until contact with the server is made before restarting
+    // or killing the task. #1795
+    restoreGate := gate.NewOpen()
+
     tr := &TaskRunner{
         alloc:   config.Alloc,
         allocID: config.Alloc.ID,
@@ -270,6 +281,7 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
         devicemanager: config.DeviceManager,
         driverManager: config.DriverManager,
         maxEvents:     defaultMaxEvents,
+        restoreGate:   restoreGate,
     }
 
     // Create the logger based on the allocation ID
@@ -381,6 +393,18 @@ func (tr *TaskRunner) Run() {
     //       - should be handled serially.
     go tr.handleUpdates()
 
+    // If restore failed, don't proceed until servers are contacted
+    if tr.restoreGate.IsClosed() {
+        tr.logger.Info("task failed to restore; waiting to contact server before restarting")
+        select {
+        case <-tr.killCtx.Done():
+        case <-tr.shutdownCtx.Done():
+            return
+        case <-tr.restoreGate.Wait():
+            tr.logger.Trace("server contacted; unblocking waiting task")
+        }
+    }
+
 MAIN:
     for !tr.Alloc().TerminalStatus() {
         select {
@@ -858,7 +882,18 @@ func (tr *TaskRunner) Restore() error {
     if taskHandle := tr.localState.TaskHandle; taskHandle != nil {
         //TODO if RecoverTask returned the DriverNetwork we wouldn't
         //     have to persist it at all!
-        tr.restoreHandle(taskHandle, tr.localState.DriverNetwork)
+        restored := tr.restoreHandle(taskHandle, tr.localState.DriverNetwork)
+        if !restored && !tr.Alloc().TerminalStatus() {
+            // Restore failed, close the restore gate to block
+            // until the server is contacted to prevent restarting
+            // terminal allocs. #1795
+            tr.logger.Trace("failed to reattach to task; will not run until server is contacted")
+            tr.restoreGate.Close()
+
+            ev := structs.NewTaskEvent(structs.TaskRestoreFailed).
+                SetDisplayMessage("failed to restore task; will not run until server is contacted")
+            tr.UpdateState(structs.TaskStatePending, ev)
+        }
     }
     return nil
 }
@@ -1073,6 +1108,10 @@ func (tr *TaskRunner) Update(update *structs.Allocation) {
     // Trigger update hooks if not terminal
     if !update.TerminalStatus() {
         tr.triggerUpdateHooks()
+
+        // MarkLive in case the task failed to restore and is waiting
+        // to hear from the server.
+        tr.MarkLive()
     }
 }
 
@@ -1089,6 +1128,13 @@ func (tr *TaskRunner) triggerUpdateHooks() {
     }
 }
 
+// MarkLive unblocks restored tasks that failed to reattach and are waiting to
+// contact a server before restarting the dead task. The Client will call this
+// method when the task should run, otherwise the task will be killed.
+func (tr *TaskRunner) MarkLive() {
+    tr.restoreGate.Open()
+}
+
 // Shutdown TaskRunner gracefully without affecting the state of the task.
 // Shutdown blocks until the main Run loop exits.
 func (tr *TaskRunner) Shutdown() {
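Worth calling out in the Run() hunk above: the `killCtx.Done()` case is deliberately empty so a killed task still falls through to the MAIN loop, which performs the actual kill handling, while `shutdownCtx` aborts Run entirely and `restoreGate.Wait()` unblocks once a server is contacted. Below is a minimal, hedged sketch of that wait pattern as standalone code; the `waitForServer` helper is hypothetical, and `gate` is the package introduced later in this diff:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/hashicorp/nomad/helper/gate"
)

// waitForServer mirrors the select at the top of TaskRunner.Run. It returns
// true when the caller should proceed into its main loop and false when it
// should abandon the task entirely (client shutdown).
func waitForServer(killCtx, shutdownCtx context.Context, g *gate.G) bool {
	if !g.IsClosed() {
		return true // restore succeeded; nothing to wait for
	}
	select {
	case <-killCtx.Done():
		// Intentionally fall through: the main loop handles the kill.
		return true
	case <-shutdownCtx.Done():
		return false
	case <-g.Wait():
		// Server contacted; safe to restart the dead task.
		return true
	}
}

func main() {
	g := gate.NewClosed()

	// Simulate MarkLive arriving after the first server contact.
	go func() {
		time.Sleep(50 * time.Millisecond)
		g.Open()
	}()

	proceed := waitForServer(context.Background(), context.Background(), g)
	fmt.Println("proceed:", proceed) // proceed: true
}
```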
diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go
index 99900b785093..b8a1ad69af12 100644
--- a/client/allocrunner/taskrunner/task_runner_test.go
+++ b/client/allocrunner/taskrunner/task_runner_test.go
@@ -25,10 +25,12 @@ import (
     "github.com/hashicorp/nomad/client/vaultclient"
     agentconsul "github.com/hashicorp/nomad/command/agent/consul"
     mockdriver "github.com/hashicorp/nomad/drivers/mock"
+    "github.com/hashicorp/nomad/drivers/rawexec"
     "github.com/hashicorp/nomad/helper/testlog"
     "github.com/hashicorp/nomad/nomad/mock"
     "github.com/hashicorp/nomad/nomad/structs"
     "github.com/hashicorp/nomad/plugins/device"
+    "github.com/hashicorp/nomad/plugins/drivers"
     "github.com/hashicorp/nomad/testutil"
     "github.com/kr/pretty"
     "github.com/stretchr/testify/assert"
@@ -124,8 +126,8 @@ func runTestTaskRunner(t *testing.T, alloc *structs.Allocation, taskName string)
     }
 }
 
-// TestTaskRunner_Restore asserts restoring a running task does not rerun the
-// task.
+// TestTaskRunner_Restore_Running asserts restoring a running task does not
+// rerun the task.
 func TestTaskRunner_Restore_Running(t *testing.T) {
     t.Parallel()
     require := require.New(t)
@@ -178,6 +180,141 @@ func TestTaskRunner_Restore_Running(t *testing.T) {
     assert.Equal(t, 1, started)
 }
 
+// setupRestoreFailureTest starts a raw_exec task, shuts down the task runner,
+// and kills the task before starting a new TaskRunner. The new TaskRunner is
+// returned once it is running and waiting in pending along with a cleanup
+// func.
+func setupRestoreFailureTest(t *testing.T) (*TaskRunner, func()) {
+    t.Parallel()
+
+    alloc := mock.Alloc()
+    task := alloc.Job.TaskGroups[0].Tasks[0]
+    task.Driver = "raw_exec"
+    task.Config = map[string]interface{}{
+        "command": "sleep",
+        "args":    []string{"30"},
+    }
+    conf, cleanup1 := testTaskRunnerConfig(t, alloc, task.Name)
+    conf.StateDB = cstate.NewMemDB(conf.Logger) // "persist" state between runs
+
+    // Run the first TaskRunner
+    origTR, err := NewTaskRunner(conf)
+    require.NoError(t, err)
+    go origTR.Run()
+    cleanup2 := func() {
+        origTR.Kill(context.Background(), structs.NewTaskEvent("cleanup"))
+        cleanup1()
+    }
+
+    // Wait for it to be running
+    testWaitForTaskToStart(t, origTR)
+
+    handle := origTR.getDriverHandle()
+    require.NotNil(t, handle)
+    taskID := handle.taskID
+
+    // Cause TR to exit without shutting down the task
+    origTR.Shutdown()
+
+    // Get the raw_exec driver plugin
+    driverPlugin, err := conf.DriverManager.Dispense(rawexec.PluginID.Name)
+    require.NoError(t, err)
+    rawexecDriver := driverPlugin.(*rawexec.Driver)
+
+    // Assert the task is still running despite TR having exited
+    taskStatus, err := rawexecDriver.InspectTask(taskID)
+    require.NoError(t, err)
+    require.Equal(t, drivers.TaskStateRunning, taskStatus.State)
+
+    // Kill the task so it fails to recover when restore is called
+    require.NoError(t, rawexecDriver.DestroyTask(taskID, true))
+    _, err = rawexecDriver.InspectTask(taskID)
+    require.EqualError(t, err, drivers.ErrTaskNotFound.Error())
+
+    // Create a new TaskRunner and Restore the task
+    newTR, err := NewTaskRunner(conf)
+    require.NoError(t, err)
+    require.NoError(t, newTR.Restore())
+
+    // Assert the restore gate is *closed* because reattachment failed
+    require.True(t, newTR.restoreGate.IsClosed())
+
+    // Start the new TR
+    go newTR.Run()
+    cleanup3 := func() {
+        newTR.Kill(context.Background(), structs.NewTaskEvent("cleanup"))
+        cleanup2()
+    }
+
+    // Assert the task has not been restarted
+    _, err = rawexecDriver.InspectTask(taskID)
+    require.EqualError(t, err, drivers.ErrTaskNotFound.Error())
+    ts := newTR.TaskState()
+    require.Equal(t, structs.TaskStatePending, ts.State)
+
+    return newTR, cleanup3
+}
+
+// TestTaskRunner_Restore_Restart asserts restoring a dead task blocks until
+// MarkLive is called. #1795
+func TestTaskRunner_Restore_Restart(t *testing.T) {
+    newTR, cleanup := setupRestoreFailureTest(t)
+    defer cleanup()
+
+    // Fake contacting the server by opening the restore gate
+    newTR.MarkLive()
+
+    testutil.WaitForResult(func() (bool, error) {
+        ts := newTR.TaskState().State
+        return ts == structs.TaskStateRunning, fmt.Errorf("expected task to be running but found %q", ts)
+    }, func(err error) {
+        require.NoError(t, err)
+    })
+}
+
+// TestTaskRunner_Restore_Kill asserts restoring a dead task blocks until
+// the task is killed. #1795
+func TestTaskRunner_Restore_Kill(t *testing.T) {
+    newTR, cleanup := setupRestoreFailureTest(t)
+    defer cleanup()
+
+    // Sending the task a terminal update shouldn't kill it or mark it live
+    alloc := newTR.Alloc().Copy()
+    alloc.DesiredStatus = structs.AllocDesiredStatusStop
+    newTR.Update(alloc)
+
+    require.Equal(t, structs.TaskStatePending, newTR.TaskState().State)
+
+    // AllocRunner will immediately kill tasks after sending a terminal
+    // update.
+    newTR.Kill(context.Background(), structs.NewTaskEvent(structs.TaskKilling))
+
+    select {
+    case <-newTR.WaitCh():
+        // It died as expected!
+    case <-time.After(10 * time.Second):
+        require.Fail(t, "timeout waiting for task to die")
+    }
+}
+
+// TestTaskRunner_Restore_Update asserts restoring a dead task blocks until
+// Update is called. #1795
+func TestTaskRunner_Restore_Update(t *testing.T) {
+    newTR, cleanup := setupRestoreFailureTest(t)
+    defer cleanup()
+
+    // Fake contacting the server by sending an update
+    alloc := newTR.Alloc().Copy()
+    newTR.Update(alloc)
+
+    testutil.WaitForResult(func() (bool, error) {
+        ts := newTR.TaskState().State
+        return ts == structs.TaskStateRunning, fmt.Errorf("expected task to be running but found %q", ts)
+    }, func(err error) {
+        require.NoError(t, err)
+    })
+}
+
 // TestTaskRunner_TaskEnv_Interpolated asserts driver configurations are
 // interpolated.
 func TestTaskRunner_TaskEnv_Interpolated(t *testing.T) {
diff --git a/client/client.go b/client/client.go
index dcf5c235aa4b..4828632db074 100644
--- a/client/client.go
+++ b/client/client.go
@@ -125,6 +125,7 @@ type AllocRunner interface {
     Run()
     StatsReporter() interfaces.AllocStatsReporter
     Update(*structs.Allocation)
+    MarkLive()
     WaitCh() <-chan struct{}
     DestroyCh() <-chan struct{}
     ShutdownCh() <-chan struct{}
@@ -1989,6 +1990,12 @@ func (c *Client) runAllocs(update *allocUpdates) {
     errs := 0
 
+    // Mark existing allocations as live in case they failed to reattach on
+    // restore and are waiting to hear from the server before restarting.
+    for _, live := range diff.ignore {
+        c.markAllocLive(live)
+    }
+
     // Remove the old allocations
     for _, remove := range diff.removed {
         c.removeAlloc(remove)
@@ -2072,6 +2079,24 @@ func makeFailedAlloc(add *structs.Allocation, err error) *structs.Allocation {
     return stripped
 }
 
+// markAllocLive is invoked when an alloc should be running but was neither
+// updated nor just added. This allows unblocking tasks that failed to
+// reattach on restore and are waiting to hear from the server.
+func (c *Client) markAllocLive(allocID string) {
+    c.allocLock.Lock()
+    defer c.allocLock.Unlock()
+
+    ar, ok := c.allocs[allocID]
+    if !ok {
+        // This should never happen as alloc diffing should cause
+        // unknown allocs to be added, not marked live.
+        c.logger.Warn("unknown alloc should be running but is not", "alloc_id", allocID)
+        return
+    }
+
+    ar.MarkLive()
+}
+
 // removeAlloc is invoked when we should remove an allocation because it has
 // been removed by the server.
 func (c *Client) removeAlloc(allocID string) {
diff --git a/helper/gate/gate.go b/helper/gate/gate.go
new file mode 100644
index 000000000000..092eec92a9ab
--- /dev/null
+++ b/helper/gate/gate.go
@@ -0,0 +1,87 @@
+// Package gate implements a simple on/off latch or gate: it blocks waiters
+// until opened. Waiters may receive on a chan which is closed when the gate is
+// open.
+package gate
+
+import "sync"
+
+// closedCh is a chan initialized as closed
+var closedCh chan struct{}
+
+func init() {
+    closedCh = make(chan struct{})
+    close(closedCh)
+}
+
+// G is a gate which blocks waiters until opened and is safe for concurrent
+// use. Must be created via NewClosed or NewOpen.
+type G struct {
+    // open is true if the gate is open and ch is closed.
+    open bool
+
+    // ch is closed if the gate is open.
+    ch chan struct{}
+
+    mu sync.Mutex
+}
+
+// NewClosed returns a closed gate. The chan returned by Wait will block until
+// Open is called.
+func NewClosed() *G {
+    return &G{
+        ch: make(chan struct{}),
+    }
+}
+
+// NewOpen returns an open gate. The chan returned by Wait is closed and
+// therefore will never block.
+func NewOpen() *G {
+    return &G{
+        open: true,
+        ch:   closedCh,
+    }
+}
+
+// Open the gate. Unblocks any waiters. Opening an opened gate is a noop. Safe
+// for concurrent use with Close and Wait.
+func (g *G) Open() {
+    g.mu.Lock()
+    defer g.mu.Unlock()
+
+    if g.open {
+        return
+    }
+
+    g.open = true
+    close(g.ch)
+}
+
+// Close the gate. Blocks subsequent Wait callers. Closing a closed gate is a
+// noop. Safe for concurrent use with Open and Wait.
+func (g *G) Close() {
+    g.mu.Lock()
+    defer g.mu.Unlock()
+
+    if !g.open {
+        return
+    }
+
+    g.open = false
+    g.ch = make(chan struct{})
+}
+
+// Wait returns a chan that blocks until the gate is open. Safe for concurrent
+// use with Open and Close, but the chan should not be reused between calls to
+// Open and Close.
+func (g *G) Wait() <-chan struct{} {
+    g.mu.Lock()
+    defer g.mu.Unlock()
+    return g.ch
+}
+
+// IsClosed returns true if the gate is closed.
+func (g *G) IsClosed() bool {
+    g.mu.Lock()
+    defer g.mu.Unlock()
+    return !g.open
+}
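Since helper/gate is a new package, a short usage sketch may help reviewers. This is illustrative only (a hypothetical `main`, not code from this PR), and it demonstrates the caveat on Wait above: fetch a fresh chan after any Open/Close rather than reusing an old one:

```go
package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/nomad/helper/gate"
)

func main() {
	g := gate.NewClosed()

	done := make(chan struct{})
	go func() {
		defer close(done)
		<-g.Wait() // blocks until Open is called
		fmt.Println("waiter unblocked")
	}()

	time.Sleep(10 * time.Millisecond)
	fmt.Println("closed:", g.IsClosed()) // closed: true

	g.Open() // idempotent; unblocks every waiter at once
	<-done

	// After Close, call Wait again for a fresh chan; the chan returned
	// before Open was closed and would never block again.
	g.Close()
	select {
	case <-g.Wait():
		fmt.Println("unexpected: gate should block")
	default:
		fmt.Println("closed again; Wait blocks")
	}
}
```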
diff --git a/helper/gate/gate_test.go b/helper/gate/gate_test.go
new file mode 100644
index 000000000000..cceb9673c05b
--- /dev/null
+++ b/helper/gate/gate_test.go
@@ -0,0 +1,126 @@
+package gate
+
+import (
+    "math/rand"
+    "sync"
+    "testing"
+    "time"
+
+    "github.com/stretchr/testify/require"
+)
+
+func TestGate_NewClosed(t *testing.T) {
+    t.Parallel()
+
+    g := NewClosed()
+
+    assertClosed := func() {
+        require.True(t, g.IsClosed())
+        select {
+        case <-g.Wait():
+            require.Fail(t, "expected gate to be closed")
+        default:
+            // Ok!
+        }
+    }
+
+    assertClosed()
+    g.Close()
+    assertClosed()
+
+    // Close should be safe to call multiple times
+    g.Close()
+    assertClosed()
+
+    g.Open()
+    require.False(t, g.IsClosed())
+    select {
+    case <-g.Wait():
+        // Ok!
+    default:
+        require.Fail(t, "expected gate to be open")
+    }
+}
+
+func TestGate_NewOpen(t *testing.T) {
+    t.Parallel()
+
+    g := NewOpen()
+
+    assertOpen := func() {
+        require.False(t, g.IsClosed())
+        select {
+        case <-g.Wait():
+            // Ok!
+        default:
+            require.Fail(t, "expected gate to be open")
+        }
+    }
+
+    assertOpen()
+    g.Open()
+    assertOpen()
+
+    // Open should be safe to call multiple times
+    g.Open()
+    assertOpen()
+
+    g.Close()
+    select {
+    case <-g.Wait():
+        require.Fail(t, "expected gate to be closed")
+    default:
+        // Ok!
+    }
+}
+
+// TestGate_Concurrency is meant to be run with the race detector enabled to
+// find any races.
+func TestGate_Concurrency(t *testing.T) {
+    t.Parallel()
+
+    g := NewOpen()
+    wg := sync.WaitGroup{}
+
+    // Start closer
+    wg.Add(1)
+    go func() {
+        defer wg.Done()
+        dice := rand.New(rand.NewSource(time.Now().UnixNano()))
+        for i := 0; i < 1000; i++ {
+            g.Close()
+            time.Sleep(time.Duration(dice.Int63n(100)))
+        }
+    }()
+
+    // Start opener
+    wg.Add(1)
+    go func() {
+        defer wg.Done()
+        dice := rand.New(rand.NewSource(time.Now().UnixNano()))
+        for i := 0; i < 1000; i++ {
+            g.Open()
+            time.Sleep(time.Duration(dice.Int63n(100)))
+        }
+    }()
+
+    // Perform reads concurrently with writes
+    wgCh := make(chan struct{})
+    doneCh := make(chan struct{})
+    go func() {
+        defer close(doneCh)
+        for {
+            select {
+            case <-time.After(time.Millisecond):
+            case <-wgCh:
+                return
+            }
+            g.IsClosed()
+            g.Wait()
+        }
+    }()
+
+    wg.Wait()
+    close(wgCh)
+    <-doneCh
+}
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index 46a79e3e642b..a774f2c8d239 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -5966,6 +5966,10 @@ const (
 
     // TaskHookFailed indicates that one of the hooks for a task failed.
     TaskHookFailed = "Task hook failed"
+
+    // TaskRestoreFailed indicates Nomad was unable to reattach to a
+    // restored task.
+    TaskRestoreFailed = "Failed Restoring Task"
 )
 
 // TaskEvent is an event that effects the state of a task and contains meta-data
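Reading the diff end to end, the unblock path is: server contact → Client.runAllocs (the diff.ignore set) → markAllocLive → allocRunner.MarkLive → TaskRunner.MarkLive → gate.Open → Run proceeds past the restore gate. A condensed, hypothetical sketch of that chain, using simplified stand-in types rather than the real runners:

```go
package main

import (
	"fmt"

	"github.com/hashicorp/nomad/helper/gate"
)

// taskRunner and allocRunner are simplified stand-ins for the real types.
type taskRunner struct{ restoreGate *gate.G }

// MarkLive opens the restore gate, as TaskRunner.MarkLive does in this diff.
func (tr *taskRunner) MarkLive() { tr.restoreGate.Open() }

type allocRunner struct{ tasks map[string]*taskRunner }

// MarkLive fans out to every task in the allocation, mirroring
// allocRunner.MarkLive above.
func (ar *allocRunner) MarkLive() {
	for _, tr := range ar.tasks {
		tr.MarkLive()
	}
}

func main() {
	// A task whose restore failed: its gate starts closed, so Run blocks.
	tr := &taskRunner{restoreGate: gate.NewClosed()}
	ar := &allocRunner{tasks: map[string]*taskRunner{"web": tr}}

	// The client hears from a server that the alloc should keep running
	// (it lands in diff.ignore) and marks it live.
	ar.MarkLive()

	<-tr.restoreGate.Wait() // returns immediately now that the gate is open
	fmt.Println("task unblocked; Run may restart it")
}
```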