From 846b48281426beb38d04f6a14f9b6cc6b57a119e Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Tue, 7 May 2019 20:26:04 -0700 Subject: [PATCH 1/6] e2e: fix nomad service for systemd<230 --- e2e/terraform/shared/config/nomad.service | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/e2e/terraform/shared/config/nomad.service b/e2e/terraform/shared/config/nomad.service index 995d055e725e..622b69a470f5 100644 --- a/e2e/terraform/shared/config/nomad.service +++ b/e2e/terraform/shared/config/nomad.service @@ -10,11 +10,15 @@ KillMode=process KillSignal=SIGINT LimitNOFILE=infinity LimitNPROC=infinity +TasksMax=infinity Restart=on-failure RestartSec=2 + +# systemd>=230 prefer StartLimitIntervalSec,StartLimitBurst in Unit, +# however Ubuntu 16.04 only has systemd==229. Use these old style settings +# as they will be supported by newer systemds. StartLimitBurst=3 -StartLimitIntervalSec=10 -TasksMax=infinity +StartLimitInterval=10 [Install] WantedBy=multi-user.target From e7042b674bd11d322e3f5406b5293de5ff166a9d Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Tue, 7 May 2019 23:04:40 -0700 Subject: [PATCH 2/6] client: do not restart dead tasks until server is contacted Fixes #1795 Running restored allocations and pulling what allocations to run from the server happen concurrently. This means that if a client is rebooted, and has its allocations rescheduled, it may restart the dead allocations before it contacts the server and determines they should be dead. This commit makes tasks that fail to reattach on restore wait until the server is contacted before restarting. --- client/allocrunner/alloc_runner.go | 9 ++ client/allocrunner/taskrunner/task_runner.go | 48 +++++- .../taskrunner/task_runner_test.go | 142 +++++++++++++++++- client/client.go | 25 +++ helper/gate/gate.go | 87 +++++++++++ helper/gate/gate_test.go | 126 ++++++++++++++++ nomad/structs/structs.go | 4 + 7 files changed, 438 insertions(+), 3 deletions(-) create mode 100644 helper/gate/gate.go create mode 100644 helper/gate/gate_test.go diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index 7168914b2358..a8612bb44a36 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -721,6 +721,15 @@ func (ar *allocRunner) handleAllocUpdate(update *structs.Allocation) { } +// MarkLive unblocks restored tasks that failed to reattach and are waiting to +// contact a server before restarting the dead task. The Client will call this +// method when the task should run, otherwise the task will be killed. +func (ar *allocRunner) MarkLive() { + for _, tr := range ar.tasks { + tr.MarkLive() + } +} + func (ar *allocRunner) Listener() *cstructs.AllocListener { return ar.allocBroadcaster.Listen() } diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index 1cf727e46922..c8e2ccf08c36 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -25,6 +25,7 @@ import ( cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/client/taskenv" "github.com/hashicorp/nomad/client/vaultclient" + "github.com/hashicorp/nomad/helper/gate" "github.com/hashicorp/nomad/helper/pluginutils/hclspecutils" "github.com/hashicorp/nomad/helper/pluginutils/hclutils" "github.com/hashicorp/nomad/helper/uuid" @@ -193,6 +194,10 @@ type TaskRunner struct { // maxEvents is the capacity of the TaskEvents on the TaskState. // Defaults to defaultMaxEvents but overrideable for testing. maxEvents int + + // restoreGate is used to block restored tasks that failed to reattach + // from restarting until servers are contacted. #1795 + restoreGate *gate.G } type Config struct { @@ -245,6 +250,12 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) { tstate = ts.Copy() } + // Initialize restoreGate as open. It will only be closed if Restore is + // called and fails to reconnect to the task handle. In that case the + // we must wait until contact with the server is made before restarting + // or killing the task. #1795 + restoreGate := gate.NewOpen() + tr := &TaskRunner{ alloc: config.Alloc, allocID: config.Alloc.ID, @@ -270,6 +281,7 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) { devicemanager: config.DeviceManager, driverManager: config.DriverManager, maxEvents: defaultMaxEvents, + restoreGate: restoreGate, } // Create the logger based on the allocation ID @@ -381,6 +393,18 @@ func (tr *TaskRunner) Run() { // - should be handled serially. go tr.handleUpdates() + // If restore failed, don't proceed until servers are contacted + if tr.restoreGate.IsClosed() { + tr.logger.Info("task failed to restore; waiting to contact server before restarting") + select { + case <-tr.killCtx.Done(): + case <-tr.shutdownCtx.Done(): + return + case <-tr.restoreGate.Wait(): + tr.logger.Trace("server contacted; unblocking waiting task") + } + } + MAIN: for !tr.Alloc().TerminalStatus() { select { @@ -858,7 +882,18 @@ func (tr *TaskRunner) Restore() error { if taskHandle := tr.localState.TaskHandle; taskHandle != nil { //TODO if RecoverTask returned the DriverNetwork we wouldn't // have to persist it at all! - tr.restoreHandle(taskHandle, tr.localState.DriverNetwork) + restored := tr.restoreHandle(taskHandle, tr.localState.DriverNetwork) + if !restored && !tr.Alloc().TerminalStatus() { + // Restore failed, close the restore gate to block + // until server is contacted to prevent restarting + // terminal allocs. #1795 + tr.logger.Trace("failed to reattach to task; will not run until server is contacted") + tr.restoreGate.Close() + + ev := structs.NewTaskEvent(structs.TaskRestoreFailed). + SetDisplayMessage("failed to restore task; will not run until server is contacted") + tr.UpdateState(structs.TaskStatePending, ev) + } } return nil } @@ -1073,6 +1108,10 @@ func (tr *TaskRunner) Update(update *structs.Allocation) { // Trigger update hooks if not terminal if !update.TerminalStatus() { tr.triggerUpdateHooks() + + // MarkLive in case task had failed to restore and were waiting + // to hear from the server. + tr.MarkLive() } } @@ -1089,6 +1128,13 @@ func (tr *TaskRunner) triggerUpdateHooks() { } } +// MarkLive unblocks restored tasks that failed to reattach and are waiting to +// contact a server before restarting the dead task. The Client will call this +// method when the task should run, otherwise the task will be killed. +func (tr *TaskRunner) MarkLive() { + tr.restoreGate.Open() +} + // Shutdown TaskRunner gracefully without affecting the state of the task. // Shutdown blocks until the main Run loop exits. func (tr *TaskRunner) Shutdown() { diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go index 99900b785093..b8a1ad69af12 100644 --- a/client/allocrunner/taskrunner/task_runner_test.go +++ b/client/allocrunner/taskrunner/task_runner_test.go @@ -25,10 +25,12 @@ import ( "github.com/hashicorp/nomad/client/vaultclient" agentconsul "github.com/hashicorp/nomad/command/agent/consul" mockdriver "github.com/hashicorp/nomad/drivers/mock" + "github.com/hashicorp/nomad/drivers/rawexec" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/device" + "github.com/hashicorp/nomad/plugins/drivers" "github.com/hashicorp/nomad/testutil" "github.com/kr/pretty" "github.com/stretchr/testify/assert" @@ -124,8 +126,8 @@ func runTestTaskRunner(t *testing.T, alloc *structs.Allocation, taskName string) } } -// TestTaskRunner_Restore asserts restoring a running task does not rerun the -// task. +// TestTaskRunner_Restore_Running asserts restoring a running task does not +// rerun the task. func TestTaskRunner_Restore_Running(t *testing.T) { t.Parallel() require := require.New(t) @@ -178,6 +180,142 @@ func TestTaskRunner_Restore_Running(t *testing.T) { assert.Equal(t, 1, started) } +// setupRestoreFailureTest starts a service, shuts down the task runner, and +// kills the task before restarting a new TaskRunner. The new TaskRunner is +// returned once it is running and waiting in pending along with a cleanup +// func. +func setupRestoreFailureTest(t *testing.T) (*TaskRunner, func()) { + t.Parallel() + + alloc := mock.Alloc() + task := alloc.Job.TaskGroups[0].Tasks[0] + task.Driver = "raw_exec" + task.Config = map[string]interface{}{ + "command": "sleep", + "args": []string{"30"}, + } + conf, cleanup1 := testTaskRunnerConfig(t, alloc, task.Name) + conf.StateDB = cstate.NewMemDB(conf.Logger) // "persist" state between runs + + // Run the first TaskRunner + origTR, err := NewTaskRunner(conf) + require.NoError(t, err) + go origTR.Run() + cleanup2 := func() { + origTR.Kill(context.Background(), structs.NewTaskEvent("cleanup")) + cleanup1() + } + + // Wait for it to be running + testWaitForTaskToStart(t, origTR) + + handle := origTR.getDriverHandle() + require.NotNil(t, handle) + taskID := handle.taskID + + // Cause TR to exit without shutting down task + origTR.Shutdown() + + // Get the mock driver plugin + driverPlugin, err := conf.DriverManager.Dispense(rawexec.PluginID.Name) + require.NoError(t, err) + rawexecDriver := driverPlugin.(*rawexec.Driver) + + // Assert the task is still running despite TR having exited + taskStatus, err := rawexecDriver.InspectTask(taskID) + require.NoError(t, err) + require.Equal(t, drivers.TaskStateRunning, taskStatus.State) + + // Kill the task so it fails to recover when restore is called + require.NoError(t, rawexecDriver.DestroyTask(taskID, true)) + _, err = rawexecDriver.InspectTask(taskID) + require.EqualError(t, err, drivers.ErrTaskNotFound.Error()) + + // Create a new TaskRunner and Restore the task + newTR, err := NewTaskRunner(conf) + require.NoError(t, err) + require.NoError(t, newTR.Restore()) + + // Assert the restore gate is *closed* because reattachment failed + require.True(t, newTR.restoreGate.IsClosed()) + + // Start new TR + go newTR.Run() + cleanup3 := func() { + newTR.Kill(context.Background(), structs.NewTaskEvent("cleanup")) + cleanup2() + cleanup1() + } + + // Assert task has not been restarted + _, err = rawexecDriver.InspectTask(taskID) + require.EqualError(t, err, drivers.ErrTaskNotFound.Error()) + ts := newTR.TaskState() + require.Equal(t, structs.TaskStatePending, ts.State) + + return newTR, cleanup3 +} + +// TestTaskRunner_Restore_Restart asserts restoring a dead task blocks until +// MarkAlive is called. #1795 +func TestTaskRunner_Restore_Restart(t *testing.T) { + newTR, cleanup := setupRestoreFailureTest(t) + defer cleanup() + + // Fake contacting the server by opening the restore gate + newTR.MarkLive() + + testutil.WaitForResult(func() (bool, error) { + ts := newTR.TaskState().State + return ts == structs.TaskStateRunning, fmt.Errorf("expected task to be running but found %q", ts) + }, func(err error) { + require.NoError(t, err) + }) +} + +// TestTaskRunner_Restore_Kill asserts restoring a dead task blocks until +// the task is killed. #1795 +func TestTaskRunner_Restore_Kill(t *testing.T) { + newTR, cleanup := setupRestoreFailureTest(t) + defer cleanup() + + // Sending the task a terminal update shouldn't kill it or mark it live + alloc := newTR.Alloc().Copy() + alloc.DesiredStatus = structs.AllocDesiredStatusStop + newTR.Update(alloc) + + require.Equal(t, structs.TaskStatePending, newTR.TaskState().State) + + // AllocRunner will immediately kill tasks after sending a terminal + // update. + newTR.Kill(context.Background(), structs.NewTaskEvent(structs.TaskKilling)) + + select { + case <-newTR.WaitCh(): + // It died as expected! + case <-time.After(10 * time.Second): + require.Fail(t, "timeout waiting for task to die") + } +} + +// TestTaskRunner_Restore_Update asserts restoring a dead task blocks until +// Update is called. #1795 +func TestTaskRunner_Restore_Update(t *testing.T) { + newTR, cleanup := setupRestoreFailureTest(t) + defer cleanup() + + // Fake contacting the server by opening the restore gate + alloc := newTR.Alloc().Copy() + newTR.Update(alloc) + + testutil.WaitForResult(func() (bool, error) { + ts := newTR.TaskState().State + return ts == structs.TaskStateRunning, fmt.Errorf("expected task to be running but found %q", ts) + }, func(err error) { + require.NoError(t, err) + }) +} + // TestTaskRunner_TaskEnv_Interpolated asserts driver configurations are // interpolated. func TestTaskRunner_TaskEnv_Interpolated(t *testing.T) { diff --git a/client/client.go b/client/client.go index e45288c28999..030906172fb9 100644 --- a/client/client.go +++ b/client/client.go @@ -126,6 +126,7 @@ type AllocRunner interface { Run() StatsReporter() interfaces.AllocStatsReporter Update(*structs.Allocation) + MarkLive() WaitCh() <-chan struct{} DestroyCh() <-chan struct{} ShutdownCh() <-chan struct{} @@ -1999,6 +2000,12 @@ func (c *Client) runAllocs(update *allocUpdates) { errs := 0 + // Mark existing allocations as live in case they failed to reattach on + // restore and are waiting to hear from the server before restarting. + for _, live := range diff.ignore { + c.markAllocLive(live) + } + // Remove the old allocations for _, remove := range diff.removed { c.removeAlloc(remove) @@ -2082,6 +2089,24 @@ func makeFailedAlloc(add *structs.Allocation, err error) *structs.Allocation { return stripped } +// markAllocLive is invoked when an alloc should be running but has not been +// updated or just been added. This allows unblocking tasks that failed to +// reattach on restored and are waiting to hear from the server. +func (c *Client) markAllocLive(allocID string) { + c.allocLock.Lock() + defer c.allocLock.Unlock() + + ar, ok := c.allocs[allocID] + if !ok { + // This should never happen as alloc diffing should cause + // unknown allocs to be added, not marked live. + c.logger.Warn("unknown alloc should be running but is not", "alloc_id", allocID) + return + } + + ar.MarkLive() +} + // removeAlloc is invoked when we should remove an allocation because it has // been removed by the server. func (c *Client) removeAlloc(allocID string) { diff --git a/helper/gate/gate.go b/helper/gate/gate.go new file mode 100644 index 000000000000..092eec92a9ab --- /dev/null +++ b/helper/gate/gate.go @@ -0,0 +1,87 @@ +// Package gate implements a simple on/off latch or gate: it blocks waiters +// until opened. Waiters may receive on a chan which is closed when the gate is +// open. +package gate + +import "sync" + +// closedCh is a chan initialized as closed +var closedCh chan struct{} + +func init() { + closedCh = make(chan struct{}) + close(closedCh) +} + +// G is a gate which blocks waiters until opened and is safe for concurrent +// use. Must be created via New. +type G struct { + // open is true if the gate is open and ch is closed. + open bool + + // ch is closed if the gate is open. + ch chan struct{} + + mu sync.Mutex +} + +// NewClosed returns a closed gate. The chan returned by Wait will block until Open +// is called. +func NewClosed() *G { + return &G{ + ch: make(chan struct{}), + } +} + +// NewOpen returns an open gate. The chan returned by Wait is closed and +// therefore will never block. +func NewOpen() *G { + return &G{ + open: true, + ch: closedCh, + } +} + +// Open the gate. Unblocks any Waiters. Opening an opened gate is a noop. Safe +// for concurrent ues with Close and Wait. +func (g *G) Open() { + g.mu.Lock() + defer g.mu.Unlock() + + if g.open { + return + } + + g.open = true + close(g.ch) +} + +// Close the gate. Blocks subsequent Wait callers. Closing a closed gate is a +// noop. Safe for concurrent use with Open and Wait. +func (g *G) Close() { + g.mu.Lock() + defer g.mu.Unlock() + + if !g.open { + return + } + + g.open = false + g.ch = make(chan struct{}) +} + +// Wait returns a chan that blocks until the gate is open. Safe for concurrent +// use with Open and Close, but the chan should not be reused between calls to +// Open and Close. +func (g *G) Wait() <-chan struct{} { + g.mu.Lock() + defer g.mu.Unlock() + return g.ch +} + +// IsClosed returns true if the gate is closed. +func (g *G) IsClosed() bool { + g.mu.Lock() + defer g.mu.Unlock() + return !g.open +} diff --git a/helper/gate/gate_test.go b/helper/gate/gate_test.go new file mode 100644 index 000000000000..cceb9673c05b --- /dev/null +++ b/helper/gate/gate_test.go @@ -0,0 +1,126 @@ +package gate + +import ( + "math/rand" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestGate_NewClosed(t *testing.T) { + t.Parallel() + + g := NewClosed() + + assertClosed := func() { + require.True(t, g.IsClosed()) + select { + case <-g.Wait(): + require.Fail(t, "expected gate to be closed") + default: + // Ok! + } + } + + assertClosed() + g.Close() + assertClosed() + + // Close should be safe to call multiple times + g.Close() + assertClosed() + + g.Open() + require.False(t, g.IsClosed()) + select { + case <-g.Wait(): + // Ok! + default: + require.Fail(t, "expected gate to be open") + } +} + +func TestGate_NewOpen(t *testing.T) { + t.Parallel() + + g := NewOpen() + + assertOpen := func() { + require.False(t, g.IsClosed()) + select { + case <-g.Wait(): + // Ok! + default: + require.Fail(t, "expected gate to be open") + } + } + + assertOpen() + g.Open() + assertOpen() + + // Open should be safe to call multiple times + g.Open() + assertOpen() + + g.Close() + select { + case <-g.Wait(): + require.Fail(t, "expected gate to be closed") + default: + // Ok! + } +} + +// TestGate_Concurrency is meant to be run with the race detector enabled to +// find any races. +func TestGate_Concurrency(t *testing.T) { + t.Parallel() + + g := NewOpen() + wg := sync.WaitGroup{} + + // Start closer + wg.Add(1) + go func() { + defer wg.Done() + dice := rand.New(rand.NewSource(time.Now().UnixNano())) + for i := 0; i < 1000; i++ { + g.Close() + time.Sleep(time.Duration(dice.Int63n(100))) + } + }() + + // Start opener + wg.Add(1) + go func() { + defer wg.Done() + dice := rand.New(rand.NewSource(time.Now().UnixNano())) + for i := 0; i < 1000; i++ { + g.Open() + time.Sleep(time.Duration(dice.Int63n(100))) + } + }() + + // Perform reads concurrently with writes + wgCh := make(chan struct{}) + doneCh := make(chan struct{}) + go func() { + defer close(doneCh) + for { + select { + case <-time.After(time.Millisecond): + case <-wgCh: + return + } + g.IsClosed() + g.Wait() + } + }() + + wg.Wait() + close(wgCh) + <-doneCh +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index c9271efee171..6f6d54005795 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -5971,6 +5971,10 @@ const ( // TaskHookFailed indicates that one of the hooks for a task failed. TaskHookFailed = "Task hook failed" + + // TaskRestoreFailed indicates Nomad was unable to reattach to a + // restored task. + TaskRestoreFailed = "Failed Restoring Task" ) // TaskEvent is an event that effects the state of a task and contains meta-data From 4b854cc55759f3c62e28b788175e4133c9985b41 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Wed, 8 May 2019 13:58:16 -0700 Subject: [PATCH 3/6] drivers/mock: implement InspectTask --- drivers/mock/driver.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/drivers/mock/driver.go b/drivers/mock/driver.go index 1c99a9a9ab9e..749d4c25c414 100644 --- a/drivers/mock/driver.go +++ b/drivers/mock/driver.go @@ -555,7 +555,13 @@ func (d *Driver) DestroyTask(taskID string, force bool) error { } func (d *Driver) InspectTask(taskID string) (*drivers.TaskStatus, error) { - panic("not implemented") + h, ok := d.tasks.Get(taskID) + if !ok { + return nil, drivers.ErrTaskNotFound + } + + return h.TaskStatus(), nil + } func (d *Driver) TaskStats(ctx context.Context, taskID string, interval time.Duration) (<-chan *drivers.TaskResourceUsage, error) { From 6a2792ad90aedbb6e7c826944312ebc1a0f21237 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Fri, 10 May 2019 08:51:06 -0700 Subject: [PATCH 4/6] client: do not restart dead tasks until server is contacted (try 2) Refactoring of 104067bc2b2002a4e45ae7b667a476b89addc162 Switch the MarkLive method for a chan that is closed by the client. Thanks to @notnoop for the idea! The old approach called a method on most existing ARs and TRs on every runAllocs call. The new approach does a once.Do call in runAllocs to accomplish the same thing with less work. Able to remove the gate abstraction that did much more than was needed. --- client/allocrunner/alloc_runner.go | 16 +-- client/allocrunner/config.go | 4 + client/allocrunner/taskrunner/task_runner.go | 72 +++++----- .../taskrunner/task_runner_test.go | 48 +++---- client/allocrunner/testing.go | 23 ++-- client/client.go | 38 ++---- helper/gate/gate.go | 87 ------------ helper/gate/gate_test.go | 126 ------------------ 8 files changed, 98 insertions(+), 316 deletions(-) delete mode 100644 helper/gate/gate.go delete mode 100644 helper/gate/gate_test.go diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index a8612bb44a36..e9efa120f15f 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -136,6 +136,11 @@ type allocRunner struct { // driverManager is responsible for dispensing driver plugins and registering // event handlers driverManager drivermanager.Manager + + // serversContactedCh is passed to TaskRunners so they can detect when + // servers have been contacted for the first time in case of a failed + // restore. + serversContactedCh chan struct{} } // NewAllocRunner returns a new allocation runner. @@ -167,6 +172,7 @@ func NewAllocRunner(config *Config) (*allocRunner, error) { prevAllocMigrator: config.PrevAllocMigrator, devicemanager: config.DeviceManager, driverManager: config.DriverManager, + serversContactedCh: config.ServersContactedCh, } // Create the logger based on the allocation ID @@ -205,6 +211,7 @@ func (ar *allocRunner) initTaskRunners(tasks []*structs.Task) error { DeviceStatsReporter: ar.deviceStatsReporter, DeviceManager: ar.devicemanager, DriverManager: ar.driverManager, + ServersContactedCh: ar.serversContactedCh, } // Create, but do not Run, the task runner @@ -721,15 +728,6 @@ func (ar *allocRunner) handleAllocUpdate(update *structs.Allocation) { } -// MarkLive unblocks restored tasks that failed to reattach and are waiting to -// contact a server before restarting the dead task. The Client will call this -// method when the task should run, otherwise the task will be killed. -func (ar *allocRunner) MarkLive() { - for _, tr := range ar.tasks { - tr.MarkLive() - } -} - func (ar *allocRunner) Listener() *cstructs.AllocListener { return ar.allocBroadcaster.Listen() } diff --git a/client/allocrunner/config.go b/client/allocrunner/config.go index 16f3e27db438..42cea978ea60 100644 --- a/client/allocrunner/config.go +++ b/client/allocrunner/config.go @@ -51,4 +51,8 @@ type Config struct { // DriverManager handles dispensing of driver plugins DriverManager drivermanager.Manager + + // ServersContactedCh is closed when the first GetClientAllocs call to + // servers succeeds and allocs are synced. + ServersContactedCh chan struct{} } diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index c8e2ccf08c36..e88487431f71 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -25,7 +25,6 @@ import ( cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/client/taskenv" "github.com/hashicorp/nomad/client/vaultclient" - "github.com/hashicorp/nomad/helper/gate" "github.com/hashicorp/nomad/helper/pluginutils/hclspecutils" "github.com/hashicorp/nomad/helper/pluginutils/hclutils" "github.com/hashicorp/nomad/helper/uuid" @@ -195,9 +194,15 @@ type TaskRunner struct { // Defaults to defaultMaxEvents but overrideable for testing. maxEvents int - // restoreGate is used to block restored tasks that failed to reattach - // from restarting until servers are contacted. #1795 - restoreGate *gate.G + // serversContactedCh is passed to TaskRunners so they can detect when + // servers have been contacted for the first time in case of a failed + // restore. + serversContactedCh <-chan struct{} + + // waitOnServers defaults to false but will be set true if a restore + // fails and the Run method should wait until serversContactedCh is + // closed. + waitOnServers bool } type Config struct { @@ -227,6 +232,10 @@ type Config struct { // DriverManager is used to dispense driver plugins and register event // handlers DriverManager drivermanager.Manager + + // ServersContactedCh is closed when the first GetClientAllocs call to + // servers succeeds and allocs are synced. + ServersContactedCh chan struct{} } func NewTaskRunner(config *Config) (*TaskRunner, error) { @@ -250,12 +259,6 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) { tstate = ts.Copy() } - // Initialize restoreGate as open. It will only be closed if Restore is - // called and fails to reconnect to the task handle. In that case the - // we must wait until contact with the server is made before restarting - // or killing the task. #1795 - restoreGate := gate.NewOpen() - tr := &TaskRunner{ alloc: config.Alloc, allocID: config.Alloc.ID, @@ -281,7 +284,7 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) { devicemanager: config.DeviceManager, driverManager: config.DriverManager, maxEvents: defaultMaxEvents, - restoreGate: restoreGate, + serversContactedCh: config.ServersContactedCh, } // Create the logger based on the allocation ID @@ -393,14 +396,15 @@ func (tr *TaskRunner) Run() { // - should be handled serially. go tr.handleUpdates() - // If restore failed, don't proceed until servers are contacted - if tr.restoreGate.IsClosed() { + // If restore failed wait until servers are contacted before running. + // #1795 + if tr.waitOnServers { tr.logger.Info("task failed to restore; waiting to contact server before restarting") select { case <-tr.killCtx.Done(): case <-tr.shutdownCtx.Done(): return - case <-tr.restoreGate.Wait(): + case <-tr.serversContactedCh: tr.logger.Trace("server contacted; unblocking waiting task") } } @@ -883,18 +887,27 @@ func (tr *TaskRunner) Restore() error { //TODO if RecoverTask returned the DriverNetwork we wouldn't // have to persist it at all! restored := tr.restoreHandle(taskHandle, tr.localState.DriverNetwork) - if !restored && !tr.Alloc().TerminalStatus() { - // Restore failed, close the restore gate to block - // until server is contacted to prevent restarting - // terminal allocs. #1795 - tr.logger.Trace("failed to reattach to task; will not run until server is contacted") - tr.restoreGate.Close() - - ev := structs.NewTaskEvent(structs.TaskRestoreFailed). - SetDisplayMessage("failed to restore task; will not run until server is contacted") - tr.UpdateState(structs.TaskStatePending, ev) + + // If the handle could not be restored, the alloc is + // non-terminal, and the task isn't a system job: wait until + // servers have been contacted before running. #1795 + if restored { + return nil + } + + alloc := tr.Alloc() + if alloc.TerminalStatus() || alloc.Job.Type == structs.JobTypeSystem { + return nil } + + tr.logger.Trace("failed to reattach to task; will not run until server is contacted") + tr.waitOnServers = true + + ev := structs.NewTaskEvent(structs.TaskRestoreFailed). + SetDisplayMessage("failed to restore task; will not run until server is contacted") + tr.UpdateState(structs.TaskStatePending, ev) } + return nil } @@ -1108,10 +1121,6 @@ func (tr *TaskRunner) Update(update *structs.Allocation) { // Trigger update hooks if not terminal if !update.TerminalStatus() { tr.triggerUpdateHooks() - - // MarkLive in case task had failed to restore and were waiting - // to hear from the server. - tr.MarkLive() } } @@ -1128,13 +1137,6 @@ func (tr *TaskRunner) triggerUpdateHooks() { } } -// MarkLive unblocks restored tasks that failed to reattach and are waiting to -// contact a server before restarting the dead task. The Client will call this -// method when the task should run, otherwise the task will be killed. -func (tr *TaskRunner) MarkLive() { - tr.restoreGate.Open() -} - // Shutdown TaskRunner gracefully without affecting the state of the task. // Shutdown blocks until the main Run loop exits. func (tr *TaskRunner) Shutdown() { diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go index b8a1ad69af12..f1589a1d1a69 100644 --- a/client/allocrunner/taskrunner/task_runner_test.go +++ b/client/allocrunner/taskrunner/task_runner_test.go @@ -95,17 +95,18 @@ func testTaskRunnerConfig(t *testing.T, alloc *structs.Allocation, taskName stri } conf := &Config{ - Alloc: alloc, - ClientConfig: clientConf, - Consul: consulapi.NewMockConsulServiceClient(t, logger), - Task: thisTask, - TaskDir: taskDir, - Logger: clientConf.Logger, - Vault: vaultclient.NewMockVaultClient(), - StateDB: cstate.NoopDB{}, - StateUpdater: NewMockTaskStateUpdater(), - DeviceManager: devicemanager.NoopMockManager(), - DriverManager: drivermanager.TestDriverManager(t), + Alloc: alloc, + ClientConfig: clientConf, + Consul: consulapi.NewMockConsulServiceClient(t, logger), + Task: thisTask, + TaskDir: taskDir, + Logger: clientConf.Logger, + Vault: vaultclient.NewMockVaultClient(), + StateDB: cstate.NoopDB{}, + StateUpdater: NewMockTaskStateUpdater(), + DeviceManager: devicemanager.NoopMockManager(), + DriverManager: drivermanager.TestDriverManager(t), + ServersContactedCh: make(chan struct{}), } return conf, trCleanup } @@ -184,7 +185,7 @@ func TestTaskRunner_Restore_Running(t *testing.T) { // kills the task before restarting a new TaskRunner. The new TaskRunner is // returned once it is running and waiting in pending along with a cleanup // func. -func setupRestoreFailureTest(t *testing.T) (*TaskRunner, func()) { +func setupRestoreFailureTest(t *testing.T) (*TaskRunner, *Config, func()) { t.Parallel() alloc := mock.Alloc() @@ -234,10 +235,10 @@ func setupRestoreFailureTest(t *testing.T) (*TaskRunner, func()) { // Create a new TaskRunner and Restore the task newTR, err := NewTaskRunner(conf) require.NoError(t, err) - require.NoError(t, newTR.Restore()) - // Assert the restore gate is *closed* because reattachment failed - require.True(t, newTR.restoreGate.IsClosed()) + // Assert the TR will wait on servers because reattachment failed + require.NoError(t, newTR.Restore()) + require.True(t, newTR.waitOnServers) // Start new TR go newTR.Run() @@ -253,17 +254,17 @@ func setupRestoreFailureTest(t *testing.T) (*TaskRunner, func()) { ts := newTR.TaskState() require.Equal(t, structs.TaskStatePending, ts.State) - return newTR, cleanup3 + return newTR, conf, cleanup3 } // TestTaskRunner_Restore_Restart asserts restoring a dead task blocks until // MarkAlive is called. #1795 func TestTaskRunner_Restore_Restart(t *testing.T) { - newTR, cleanup := setupRestoreFailureTest(t) + newTR, conf, cleanup := setupRestoreFailureTest(t) defer cleanup() - // Fake contacting the server by opening the restore gate - newTR.MarkLive() + // Fake contacting the server by closing the chan + close(conf.ServersContactedCh) testutil.WaitForResult(func() (bool, error) { ts := newTR.TaskState().State @@ -276,10 +277,10 @@ func TestTaskRunner_Restore_Restart(t *testing.T) { // TestTaskRunner_Restore_Kill asserts restoring a dead task blocks until // the task is killed. #1795 func TestTaskRunner_Restore_Kill(t *testing.T) { - newTR, cleanup := setupRestoreFailureTest(t) + newTR, _, cleanup := setupRestoreFailureTest(t) defer cleanup() - // Sending the task a terminal update shouldn't kill it or mark it live + // Sending the task a terminal update shouldn't kill it or unblock it alloc := newTR.Alloc().Copy() alloc.DesiredStatus = structs.AllocDesiredStatusStop newTR.Update(alloc) @@ -301,12 +302,13 @@ func TestTaskRunner_Restore_Kill(t *testing.T) { // TestTaskRunner_Restore_Update asserts restoring a dead task blocks until // Update is called. #1795 func TestTaskRunner_Restore_Update(t *testing.T) { - newTR, cleanup := setupRestoreFailureTest(t) + newTR, conf, cleanup := setupRestoreFailureTest(t) defer cleanup() - // Fake contacting the server by opening the restore gate + // Fake Client.runAllocs behavior by calling Update then closing chan alloc := newTR.Alloc().Copy() newTR.Update(alloc) + close(conf.ServersContactedCh) testutil.WaitForResult(func() (bool, error) { ts := newTR.TaskState().State diff --git a/client/allocrunner/testing.go b/client/allocrunner/testing.go index 5933019ea57a..75806644bb80 100644 --- a/client/allocrunner/testing.go +++ b/client/allocrunner/testing.go @@ -55,17 +55,18 @@ func testAllocRunnerConfig(t *testing.T, alloc *structs.Allocation) (*Config, fu clientConf, cleanup := clientconfig.TestClientConfig(t) conf := &Config{ // Copy the alloc in case the caller edits and reuses it - Alloc: alloc.Copy(), - Logger: clientConf.Logger, - ClientConfig: clientConf, - StateDB: state.NoopDB{}, - Consul: consul.NewMockConsulServiceClient(t, clientConf.Logger), - Vault: vaultclient.NewMockVaultClient(), - StateUpdater: &MockStateUpdater{}, - PrevAllocWatcher: allocwatcher.NoopPrevAlloc{}, - PrevAllocMigrator: allocwatcher.NoopPrevAlloc{}, - DeviceManager: devicemanager.NoopMockManager(), - DriverManager: drivermanager.TestDriverManager(t), + Alloc: alloc.Copy(), + Logger: clientConf.Logger, + ClientConfig: clientConf, + StateDB: state.NoopDB{}, + Consul: consul.NewMockConsulServiceClient(t, clientConf.Logger), + Vault: vaultclient.NewMockVaultClient(), + StateUpdater: &MockStateUpdater{}, + PrevAllocWatcher: allocwatcher.NoopPrevAlloc{}, + PrevAllocMigrator: allocwatcher.NoopPrevAlloc{}, + DeviceManager: devicemanager.NoopMockManager(), + DriverManager: drivermanager.TestDriverManager(t), + ServersContactedCh: make(chan struct{}), } return conf, cleanup } diff --git a/client/client.go b/client/client.go index 030906172fb9..51edb28ffa3e 100644 --- a/client/client.go +++ b/client/client.go @@ -126,7 +126,6 @@ type AllocRunner interface { Run() StatsReporter() interfaces.AllocStatsReporter Update(*structs.Allocation) - MarkLive() WaitCh() <-chan struct{} DestroyCh() <-chan struct{} ShutdownCh() <-chan struct{} @@ -260,6 +259,11 @@ type Client struct { // fpInitialized chan is closed when the first batch of fingerprints are // applied to the node and the server is updated fpInitialized chan struct{} + + // serversContactedCh is closed when GetClientAllocs and runAllocs have + // successfully run once. + serversContactedCh chan struct{} + serversContactedOnce sync.Once } var ( @@ -310,6 +314,8 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic triggerEmitNodeEvent: make(chan *structs.NodeEvent, 8), fpInitialized: make(chan struct{}), invalidAllocs: make(map[string]struct{}), + serversContactedCh: make(chan struct{}), + serversContactedOnce: sync.Once{}, } c.batchNodeUpdates = newBatchNodeUpdates( @@ -2000,12 +2006,6 @@ func (c *Client) runAllocs(update *allocUpdates) { errs := 0 - // Mark existing allocations as live in case they failed to reattach on - // restore and are waiting to hear from the server before restarting. - for _, live := range diff.ignore { - c.markAllocLive(live) - } - // Remove the old allocations for _, remove := range diff.removed { c.removeAlloc(remove) @@ -2037,6 +2037,12 @@ func (c *Client) runAllocs(update *allocUpdates) { } } + // Mark servers as having been contacted so blocked tasks that failed + // to restore can now restart. + c.serversContactedOnce.Do(func() { + close(c.serversContactedCh) + }) + // Trigger the GC once more now that new allocs are started that could // have caused thresholds to be exceeded c.garbageCollector.Trigger() @@ -2089,24 +2095,6 @@ func makeFailedAlloc(add *structs.Allocation, err error) *structs.Allocation { return stripped } -// markAllocLive is invoked when an alloc should be running but has not been -// updated or just been added. This allows unblocking tasks that failed to -// reattach on restored and are waiting to hear from the server. -func (c *Client) markAllocLive(allocID string) { - c.allocLock.Lock() - defer c.allocLock.Unlock() - - ar, ok := c.allocs[allocID] - if !ok { - // This should never happen as alloc diffing should cause - // unknown allocs to be added, not marked live. - c.logger.Warn("unknown alloc should be running but is not", "alloc_id", allocID) - return - } - - ar.MarkLive() -} - // removeAlloc is invoked when we should remove an allocation because it has // been removed by the server. func (c *Client) removeAlloc(allocID string) { diff --git a/helper/gate/gate.go b/helper/gate/gate.go deleted file mode 100644 index 092eec92a9ab..000000000000 --- a/helper/gate/gate.go +++ /dev/null @@ -1,87 +0,0 @@ -// Package gate implements a simple on/off latch or gate: it blocks waiters -// until opened. Waiters may receive on a chan which is closed when the gate is -// open. -package gate - -import "sync" - -// closedCh is a chan initialized as closed -var closedCh chan struct{} - -func init() { - closedCh = make(chan struct{}) - close(closedCh) -} - -// G is a gate which blocks waiters until opened and is safe for concurrent -// use. Must be created via New. -type G struct { - // open is true if the gate is open and ch is closed. - open bool - - // ch is closed if the gate is open. - ch chan struct{} - - mu sync.Mutex -} - -// NewClosed returns a closed gate. The chan returned by Wait will block until Open -// is called. -func NewClosed() *G { - return &G{ - ch: make(chan struct{}), - } -} - -// NewOpen returns an open gate. The chan returned by Wait is closed and -// therefore will never block. -func NewOpen() *G { - return &G{ - open: true, - ch: closedCh, - } -} - -// Open the gate. Unblocks any Waiters. Opening an opened gate is a noop. Safe -// for concurrent ues with Close and Wait. -func (g *G) Open() { - g.mu.Lock() - defer g.mu.Unlock() - - if g.open { - return - } - - g.open = true - close(g.ch) -} - -// Close the gate. Blocks subsequent Wait callers. Closing a closed gate is a -// noop. Safe for concurrent use with Open and Wait. -func (g *G) Close() { - g.mu.Lock() - defer g.mu.Unlock() - - if !g.open { - return - } - - g.open = false - g.ch = make(chan struct{}) -} - -// Wait returns a chan that blocks until the gate is open. Safe for concurrent -// use with Open and Close, but the chan should not be reused between calls to -// Open and Close. -func (g *G) Wait() <-chan struct{} { - g.mu.Lock() - defer g.mu.Unlock() - return g.ch -} - -// IsClosed returns true if the gate is closed. -func (g *G) IsClosed() bool { - g.mu.Lock() - defer g.mu.Unlock() - return !g.open -} diff --git a/helper/gate/gate_test.go b/helper/gate/gate_test.go deleted file mode 100644 index cceb9673c05b..000000000000 --- a/helper/gate/gate_test.go +++ /dev/null @@ -1,126 +0,0 @@ -package gate - -import ( - "math/rand" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/require" -) - -func TestGate_NewClosed(t *testing.T) { - t.Parallel() - - g := NewClosed() - - assertClosed := func() { - require.True(t, g.IsClosed()) - select { - case <-g.Wait(): - require.Fail(t, "expected gate to be closed") - default: - // Ok! - } - } - - assertClosed() - g.Close() - assertClosed() - - // Close should be safe to call multiple times - g.Close() - assertClosed() - - g.Open() - require.False(t, g.IsClosed()) - select { - case <-g.Wait(): - // Ok! - default: - require.Fail(t, "expected gate to be open") - } -} - -func TestGate_NewOpen(t *testing.T) { - t.Parallel() - - g := NewOpen() - - assertOpen := func() { - require.False(t, g.IsClosed()) - select { - case <-g.Wait(): - // Ok! - default: - require.Fail(t, "expected gate to be open") - } - } - - assertOpen() - g.Open() - assertOpen() - - // Open should be safe to call multiple times - g.Open() - assertOpen() - - g.Close() - select { - case <-g.Wait(): - require.Fail(t, "expected gate to be closed") - default: - // Ok! - } -} - -// TestGate_Concurrency is meant to be run with the race detector enabled to -// find any races. -func TestGate_Concurrency(t *testing.T) { - t.Parallel() - - g := NewOpen() - wg := sync.WaitGroup{} - - // Start closer - wg.Add(1) - go func() { - defer wg.Done() - dice := rand.New(rand.NewSource(time.Now().UnixNano())) - for i := 0; i < 1000; i++ { - g.Close() - time.Sleep(time.Duration(dice.Int63n(100))) - } - }() - - // Start opener - wg.Add(1) - go func() { - defer wg.Done() - dice := rand.New(rand.NewSource(time.Now().UnixNano())) - for i := 0; i < 1000; i++ { - g.Open() - time.Sleep(time.Duration(dice.Int63n(100))) - } - }() - - // Perform reads concurrently with writes - wgCh := make(chan struct{}) - doneCh := make(chan struct{}) - go func() { - defer close(doneCh) - for { - select { - case <-time.After(time.Millisecond): - case <-wgCh: - return - } - g.IsClosed() - g.Wait() - } - }() - - wg.Wait() - close(wgCh) - <-doneCh -} From 796c05b9b8d87a5ee9623298172fdec8b87461e1 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Fri, 10 May 2019 08:54:35 -0700 Subject: [PATCH 5/6] client: register before restoring Registration and restoring allocs don't share state or depend on each other in any way (syncing allocs with servers is done outside of registration). Since restoring is synchronous, start the registration goroutine first. For nodes with lots of allocs to restore or close to their heartbeat deadline, this could be the difference between becoming "lost" or not. --- .../taskrunner/task_runner_test.go | 90 +++++++++++++++++-- client/client.go | 6 +- 2 files changed, 87 insertions(+), 9 deletions(-) diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go index f1589a1d1a69..7d2bd5275e60 100644 --- a/client/allocrunner/taskrunner/task_runner_test.go +++ b/client/allocrunner/taskrunner/task_runner_test.go @@ -185,10 +185,9 @@ func TestTaskRunner_Restore_Running(t *testing.T) { // kills the task before restarting a new TaskRunner. The new TaskRunner is // returned once it is running and waiting in pending along with a cleanup // func. -func setupRestoreFailureTest(t *testing.T) (*TaskRunner, *Config, func()) { +func setupRestoreFailureTest(t *testing.T, alloc *structs.Allocation) (*TaskRunner, *Config, func()) { t.Parallel() - alloc := mock.Alloc() task := alloc.Job.TaskGroups[0].Tasks[0] task.Driver = "raw_exec" task.Config = map[string]interface{}{ @@ -217,7 +216,7 @@ func setupRestoreFailureTest(t *testing.T) (*TaskRunner, *Config, func()) { // Cause TR to exit without shutting down task origTR.Shutdown() - // Get the mock driver plugin + // Get the driver driverPlugin, err := conf.DriverManager.Dispense(rawexec.PluginID.Name) require.NoError(t, err) rawexecDriver := driverPlugin.(*rawexec.Driver) @@ -233,6 +232,7 @@ func setupRestoreFailureTest(t *testing.T) (*TaskRunner, *Config, func()) { require.EqualError(t, err, drivers.ErrTaskNotFound.Error()) // Create a new TaskRunner and Restore the task + conf.ServersContactedCh = make(chan struct{}) newTR, err := NewTaskRunner(conf) require.NoError(t, err) @@ -260,7 +260,7 @@ func setupRestoreFailureTest(t *testing.T) (*TaskRunner, *Config, func()) { // TestTaskRunner_Restore_Restart asserts restoring a dead task blocks until // MarkAlive is called. #1795 func TestTaskRunner_Restore_Restart(t *testing.T) { - newTR, conf, cleanup := setupRestoreFailureTest(t) + newTR, conf, cleanup := setupRestoreFailureTest(t, mock.Alloc()) defer cleanup() // Fake contacting the server by closing the chan @@ -277,7 +277,7 @@ func TestTaskRunner_Restore_Restart(t *testing.T) { // TestTaskRunner_Restore_Kill asserts restoring a dead task blocks until // the task is killed. #1795 func TestTaskRunner_Restore_Kill(t *testing.T) { - newTR, _, cleanup := setupRestoreFailureTest(t) + newTR, _, cleanup := setupRestoreFailureTest(t, mock.Alloc()) defer cleanup() // Sending the task a terminal update shouldn't kill it or unblock it @@ -302,12 +302,17 @@ func TestTaskRunner_Restore_Kill(t *testing.T) { // TestTaskRunner_Restore_Update asserts restoring a dead task blocks until // Update is called. #1795 func TestTaskRunner_Restore_Update(t *testing.T) { - newTR, conf, cleanup := setupRestoreFailureTest(t) + newTR, conf, cleanup := setupRestoreFailureTest(t, mock.Alloc()) defer cleanup() // Fake Client.runAllocs behavior by calling Update then closing chan alloc := newTR.Alloc().Copy() newTR.Update(alloc) + + // Update alone should not unblock the test + require.Equal(t, structs.TaskStatePending, newTR.TaskState().State) + + // Fake Client.runAllocs behavior of closing chan after Update close(conf.ServersContactedCh) testutil.WaitForResult(func() (bool, error) { @@ -318,6 +323,79 @@ func TestTaskRunner_Restore_Update(t *testing.T) { }) } +// TestTaskRunner_Restore_System asserts restoring a dead system task does not +// block. +func TestTaskRunner_Restore_System(t *testing.T) { + t.Parallel() + + alloc := mock.Alloc() + alloc.Job.Type = structs.JobTypeSystem + task := alloc.Job.TaskGroups[0].Tasks[0] + task.Driver = "raw_exec" + task.Config = map[string]interface{}{ + "command": "sleep", + "args": []string{"30"}, + } + conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name) + defer cleanup() + conf.StateDB = cstate.NewMemDB(conf.Logger) // "persist" state between runs + + // Run the first TaskRunner + origTR, err := NewTaskRunner(conf) + require.NoError(t, err) + go origTR.Run() + defer origTR.Kill(context.Background(), structs.NewTaskEvent("cleanup")) + + // Wait for it to be running + testWaitForTaskToStart(t, origTR) + + handle := origTR.getDriverHandle() + require.NotNil(t, handle) + taskID := handle.taskID + + // Cause TR to exit without shutting down task + origTR.Shutdown() + + // Get the driver + driverPlugin, err := conf.DriverManager.Dispense(rawexec.PluginID.Name) + require.NoError(t, err) + rawexecDriver := driverPlugin.(*rawexec.Driver) + + // Assert the task is still running despite TR having exited + taskStatus, err := rawexecDriver.InspectTask(taskID) + require.NoError(t, err) + require.Equal(t, drivers.TaskStateRunning, taskStatus.State) + + // Kill the task so it fails to recover when restore is called + require.NoError(t, rawexecDriver.DestroyTask(taskID, true)) + _, err = rawexecDriver.InspectTask(taskID) + require.EqualError(t, err, drivers.ErrTaskNotFound.Error()) + + // Create a new TaskRunner and Restore the task + conf.ServersContactedCh = make(chan struct{}) + newTR, err := NewTaskRunner(conf) + require.NoError(t, err) + + // Assert the TR will not wait on servers even though reattachment + // failed because it is a system task. + require.NoError(t, newTR.Restore()) + require.False(t, newTR.waitOnServers) + + // Nothing should have closed the chan + select { + case <-conf.ServersContactedCh: + require.Fail(t, "serversContactedCh was closed but should not have been") + default: + } + + testutil.WaitForResult(func() (bool, error) { + ts := newTR.TaskState().State + return ts == structs.TaskStateRunning, fmt.Errorf("expected task to be running but found %q", ts) + }, func(err error) { + require.NoError(t, err) + }) +} + // TestTaskRunner_TaskEnv_Interpolated asserts driver configurations are // interpolated. func TestTaskRunner_TaskEnv_Interpolated(t *testing.T) { diff --git a/client/client.go b/client/client.go index 51edb28ffa3e..a15618f77c46 100644 --- a/client/client.go +++ b/client/client.go @@ -446,6 +446,9 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic logger.Warn("batch fingerprint operation timed out; proceeding to register with fingerprinted plugins so far") } + // Register and then start heartbeating to the servers. + c.shutdownGroup.Go(c.registerAndHeartbeat) + // Restore the state if err := c.restoreState(); err != nil { logger.Error("failed to restore state", "error", err) @@ -460,9 +463,6 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic return nil, fmt.Errorf("failed to restore state") } - // Register and then start heartbeating to the servers. - c.shutdownGroup.Go(c.registerAndHeartbeat) - // Begin periodic snapshotting of state. c.shutdownGroup.Go(c.periodicSnapshot) From abd809d60afd436459cad4f03ff88e8130aa065c Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Tue, 14 May 2019 09:10:09 -0700 Subject: [PATCH 6/6] docs: changelog entry for #5669 and fix comment --- CHANGELOG.md | 4 ++++ client/allocrunner/taskrunner/task_runner.go | 3 +-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 03fbb622d58f..3aa3df85fd7e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ __BACKWARDS INCOMPATIBILITIES:__ to update your code. [[GH-5536](https://github.com/hashicorp/nomad/pull/5536)] * client: The format of check IDs in Consul has changed. If you rely upon Nomad's check IDs you will need to update your code. [[GH-5536](https://github.com/hashicorp/nomad/pull/5536)] + * client: On startup a client will reattach to running tasks as before but + will not restart exited tasks. Exited tasks will be restarted only after the + client has reestablished communication with servers. System jobs will always + be restarted. [[GH-5669](https://github.com/hashicorp/nomad/pull/5669)] FEATURES: diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index e88487431f71..8fe87cf36478 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -195,8 +195,7 @@ type TaskRunner struct { maxEvents int // serversContactedCh is passed to TaskRunners so they can detect when - // servers have been contacted for the first time in case of a failed - // restore. + // GetClientAllocs has been called in case of a failed restore. serversContactedCh <-chan struct{} // waitOnServers defaults to false but will be set true if a restore