Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix state handling on restart #2878

Merged
merged 3 commits into from
Jul 21, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 23 additions & 4 deletions client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ type AllocRunner struct {

// State related fields
// stateDB is used to store the alloc runners state
stateDB *bolt.DB
stateDB *bolt.DB
allocStateLock sync.Mutex

// persistedEval is the last persisted evaluation ID. Since evaluation
// IDs change on every allocation update we only need to persist the
Expand Down Expand Up @@ -282,6 +283,7 @@ func (r *AllocRunner) RestoreState() error {
}

// Restore the task runners
taskDestroyEvent := structs.NewTaskEvent(structs.TaskKilled)
var mErr multierror.Error
for _, task := range tg.Tasks {
name := task.Name
Expand All @@ -299,14 +301,14 @@ func (r *AllocRunner) RestoreState() error {
td = r.allocDir.NewTaskDir(name)
}

tr := NewTaskRunner(r.logger, r.config, r.stateDB, r.setTaskState, td, r.Alloc(), task, r.vaultClient, r.consulClient)
r.tasks[name] = tr

// Skip tasks in terminal states.
if state.State == structs.TaskStateDead {
continue
}

tr := NewTaskRunner(r.logger, r.config, r.stateDB, r.setTaskState, td, r.Alloc(), task, r.vaultClient, r.consulClient)
r.tasks[name] = tr

if restartReason, err := tr.RestoreState(); err != nil {
r.logger.Printf("[ERR] client: failed to restore state for alloc %s task %q: %v", r.allocID, name, err)
mErr.Errors = append(mErr.Errors, err)
Expand All @@ -325,6 +327,8 @@ func (r *AllocRunner) RestoreState() error {
r.logger.Printf("[INFO] client: restarting alloc %s task %s: %v", r.allocID, name, restartReason)
tr.Restart("upgrade", restartReason)
}
} else {
tr.Destroy(taskDestroyEvent)
}
}

Expand Down Expand Up @@ -352,6 +356,13 @@ func (r *AllocRunner) SaveState() error {
}

func (r *AllocRunner) saveAllocRunnerState() error {
r.allocStateLock.Lock()
defer r.allocStateLock.Unlock()

if r.ctx.Err() == context.Canceled {
return nil
}

// Grab all the relevant data
alloc := r.Alloc()

Expand Down Expand Up @@ -445,6 +456,9 @@ func (r *AllocRunner) saveTaskRunnerState(tr *TaskRunner) error {

// DestroyState is used to cleanup after ourselves
func (r *AllocRunner) DestroyState() error {
r.allocStateLock.Lock()
defer r.allocStateLock.Unlock()

return r.stateDB.Update(func(tx *bolt.Tx) error {
if err := deleteAllocationBucket(tx, r.allocID); err != nil {
return fmt.Errorf("failed to delete allocation bucket: %v", err)
Expand Down Expand Up @@ -994,6 +1008,11 @@ func (r *AllocRunner) shouldUpdate(serverIndex uint64) bool {

// Destroy is used to indicate that the allocation context should be destroyed
func (r *AllocRunner) Destroy() {
// Lock when closing the context as that gives the save state code
// serialization.
r.allocStateLock.Lock()
defer r.allocStateLock.Unlock()

r.exitFn()
r.allocBroadcast.Close()
}
Expand Down
7 changes: 7 additions & 0 deletions client/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,13 @@ func pre06ScriptCheck(ver, driver string, services []*structs.Service) bool {

// SaveState is used to snapshot our state
func (r *TaskRunner) SaveState() error {
r.destroyLock.Lock()
defer r.destroyLock.Unlock()
if r.destroy {
// Don't save state if already destroyed
return nil
}

r.persistLock.Lock()
defer r.persistLock.Unlock()
snap := taskRunnerState{
Expand Down