diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index 39fbe01e694b..f628ef5017a0 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -765,14 +765,15 @@ func (ar *allocRunner) destroyImpl() { // state if Run() ran at all. <-ar.taskStateUpdateHandlerCh - // Cleanup state db + // Mark alloc as destroyed + ar.destroyedLock.Lock() + + // Cleanup state db; while holding the lock to avoid + // a race with periodic PersistState that may resurrect the alloc if err := ar.stateDB.DeleteAllocationBucket(ar.id); err != nil { ar.logger.Warn("failed to delete allocation state", "error", err) } - // Mark alloc as destroyed - ar.destroyedLock.Lock() - if !ar.shutdown { ar.shutdown = true close(ar.shutdownCh) @@ -784,6 +785,24 @@ func (ar *allocRunner) destroyImpl() { ar.destroyedLock.Unlock() } +func (ar *allocRunner) PersistState() error { + ar.destroyedLock.Lock() + defer ar.destroyedLock.Unlock() + + if ar.destroyed { + err := ar.stateDB.DeleteAllocationBucket(ar.id) + if err != nil { + ar.logger.Warn("failed to delete allocation bucket", "error", err) + } + return nil + } + + // TODO: consider persisting deployment state along with task status. + // While we study why only the alloc is persisted, I opted to maintain current + // behavior and not risk adding yet more IO calls unnecessarily. + return ar.stateDB.PutAllocation(ar.Alloc()) +} + // Destroy the alloc runner by stopping it if it is still running and cleaning // up all of its resources. 
// diff --git a/client/allocrunner/alloc_runner_test.go b/client/allocrunner/alloc_runner_test.go index 5a758ce0a504..548602f1ebb0 100644 --- a/client/allocrunner/alloc_runner_test.go +++ b/client/allocrunner/alloc_runner_test.go @@ -1001,3 +1001,61 @@ func TestAllocRunner_TerminalUpdate_Destroy(t *testing.T) { require.Fail(t, "err: %v", err) }) } + +// TestAllocRunner_PersistState_Destroyed asserts that destroyed allocs don't persist anymore +func TestAllocRunner_PersistState_Destroyed(t *testing.T) { + t.Parallel() + + alloc := mock.BatchAlloc() + taskName := alloc.Job.LookupTaskGroup(alloc.TaskGroup).Tasks[0].Name + + conf, cleanup := testAllocRunnerConfig(t, alloc) + conf.StateDB = state.NewMemDB(conf.Logger) + + defer cleanup() + ar, err := NewAllocRunner(conf) + require.NoError(t, err) + defer destroy(ar) + + go ar.Run() + + select { + case <-ar.WaitCh(): + case <-time.After(10 * time.Second): + require.Fail(t, "timed out waiting for alloc to complete") + } + + // test final persisted state upon completion + require.NoError(t, ar.PersistState()) + allocs, _, err := conf.StateDB.GetAllAllocations() + require.NoError(t, err) + require.Len(t, allocs, 1) + require.Equal(t, alloc.ID, allocs[0].ID) + _, ts, err := conf.StateDB.GetTaskRunnerState(alloc.ID, taskName) + require.NoError(t, err) + require.Equal(t, structs.TaskStateDead, ts.State) + + // check that DB alloc is empty after destroying AR + ar.Destroy() + select { + case <-ar.DestroyCh(): + case <-time.After(10 * time.Second): + require.Fail(t, "timedout waiting for destruction") + } + + allocs, _, err = conf.StateDB.GetAllAllocations() + require.NoError(t, err) + require.Empty(t, allocs) + _, ts, err = conf.StateDB.GetTaskRunnerState(alloc.ID, taskName) + require.NoError(t, err) + require.Nil(t, ts) + + // check that DB alloc is empty after persisting state of destroyed AR + require.NoError(t, ar.PersistState()) + allocs, _, err = conf.StateDB.GetAllAllocations() + require.NoError(t, err) + require.Empty(t, allocs) + _, ts, 
err = conf.StateDB.GetTaskRunnerState(alloc.ID, taskName) + require.NoError(t, err) + require.Nil(t, ts) +} diff --git a/client/client.go b/client/client.go index 622c1514d8c2..ce6c1f2afcd5 100644 --- a/client/client.go +++ b/client/client.go @@ -139,6 +139,7 @@ type AllocRunner interface { ShutdownCh() <-chan struct{} Signal(taskName, signal string) error GetTaskEventHandler(taskName string) drivermanager.EventHandler + PersistState() error RestartTask(taskName string, taskEvent *structs.TaskEvent) error RestartAll(taskEvent *structs.TaskEvent) error @@ -1084,7 +1085,7 @@ func (c *Client) saveState() error { for id, ar := range runners { go func(id string, ar AllocRunner) { - err := c.stateDB.PutAllocation(ar.Alloc()) + err := ar.PersistState() if err != nil { c.logger.Error("error saving alloc state", "error", err, "alloc_id", id) l.Lock()