diff --git a/client/alloc_runner.go b/client/alloc_runner.go index bbb8e0128402..4d58e1a1a9af 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -88,7 +88,8 @@ type AllocRunner struct { // State related fields // stateDB is used to store the alloc runners state - stateDB *bolt.DB + stateDB *bolt.DB + allocStateLock sync.Mutex // persistedEval is the last persisted evaluation ID. Since evaluation // IDs change on every allocation update we only need to persist the @@ -282,6 +283,7 @@ func (r *AllocRunner) RestoreState() error { } // Restore the task runners + taskDestroyEvent := structs.NewTaskEvent(structs.TaskKilled) var mErr multierror.Error for _, task := range tg.Tasks { name := task.Name @@ -299,14 +301,14 @@ func (r *AllocRunner) RestoreState() error { td = r.allocDir.NewTaskDir(name) } - tr := NewTaskRunner(r.logger, r.config, r.stateDB, r.setTaskState, td, r.Alloc(), task, r.vaultClient, r.consulClient) - r.tasks[name] = tr - // Skip tasks in terminal states. if state.State == structs.TaskStateDead { continue } + tr := NewTaskRunner(r.logger, r.config, r.stateDB, r.setTaskState, td, r.Alloc(), task, r.vaultClient, r.consulClient) + r.tasks[name] = tr + if restartReason, err := tr.RestoreState(); err != nil { r.logger.Printf("[ERR] client: failed to restore state for alloc %s task %q: %v", r.allocID, name, err) mErr.Errors = append(mErr.Errors, err) @@ -325,6 +327,8 @@ func (r *AllocRunner) RestoreState() error { r.logger.Printf("[INFO] client: restarting alloc %s task %s: %v", r.allocID, name, restartReason) tr.Restart("upgrade", restartReason) } + } else { + tr.Destroy(taskDestroyEvent) } } @@ -352,6 +356,13 @@ func (r *AllocRunner) SaveState() error { } func (r *AllocRunner) saveAllocRunnerState() error { + r.allocStateLock.Lock() + defer r.allocStateLock.Unlock() + + if r.ctx.Err() == context.Canceled { + return nil + } + // Grab all the relevant data alloc := r.Alloc() @@ -445,6 +456,9 @@ func (r *AllocRunner) saveTaskRunnerState(tr *TaskRunner) error { // DestroyState is used to cleanup after ourselves func (r *AllocRunner) DestroyState() error { + r.allocStateLock.Lock() + defer r.allocStateLock.Unlock() + return r.stateDB.Update(func(tx *bolt.Tx) error { if err := deleteAllocationBucket(tx, r.allocID); err != nil { return fmt.Errorf("failed to delete allocation bucket: %v", err) @@ -994,6 +1008,11 @@ func (r *AllocRunner) shouldUpdate(serverIndex uint64) bool { // Destroy is used to indicate that the allocation context should be destroyed func (r *AllocRunner) Destroy() { + // Lock when closing the context as that gives the save state code + // serialization. + r.allocStateLock.Lock() + defer r.allocStateLock.Unlock() + r.exitFn() r.allocBroadcast.Close() } diff --git a/client/task_runner.go b/client/task_runner.go index db494c94c90a..f248bf8ad839 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -416,6 +416,13 @@ func pre06ScriptCheck(ver, driver string, services []*structs.Service) bool { // SaveState is used to snapshot our state func (r *TaskRunner) SaveState() error { + r.destroyLock.Lock() + defer r.destroyLock.Unlock() + if r.destroy { + // Don't save state if already destroyed + return nil + } + r.persistLock.Lock() defer r.persistLock.Unlock() snap := taskRunnerState{