Skip to content

Commit

Permalink
Restore state + upgrade path
Browse files Browse the repository at this point in the history
  • Loading branch information
dadgar committed May 3, 2017
1 parent 85a81f4 commit e22393a
Show file tree
Hide file tree
Showing 6 changed files with 316 additions and 91 deletions.
120 changes: 90 additions & 30 deletions client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type AllocRunner struct {
allocDirPersisted bool
}

// COMPAT: Remove in 0.7.0
// allocRunnerState is used to snapshot the state of the alloc runner
type allocRunnerState struct {
Version string
Expand Down Expand Up @@ -149,8 +150,10 @@ func NewAllocRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB,
return ar
}

// stateFilePath returns the path to our state file
func (r *AllocRunner) stateFilePath() string {
// pre060StateFilePath returns the path to our state file that would have been
// written pre v0.6.0
// COMPAT: Remove in 0.7.0
func (r *AllocRunner) pre060StateFilePath() string {
r.allocLock.Lock()
defer r.allocLock.Unlock()
path := filepath.Join(r.config.StateDir, "alloc", r.alloc.ID, "state.json")
Expand All @@ -159,29 +162,72 @@ func (r *AllocRunner) stateFilePath() string {

// RestoreState is used to restore the state of the alloc runner
func (r *AllocRunner) RestoreState() error {
// Load the snapshot

// Check if the old snapshot is there
oldPath := r.pre060StateFilePath()
var snap allocRunnerState
if err := restoreState(r.stateFilePath(), &snap); err != nil {
return err
}
if err := pre060RestoreState(oldPath, &snap); err == nil {
// Restore fields
r.alloc = snap.Alloc
r.allocDir = snap.AllocDir
r.allocClientStatus = snap.AllocClientStatus
r.allocClientDescription = snap.AllocClientDescription

if r.alloc != nil {
r.taskStates = snap.Alloc.TaskStates
}

// #2132 Upgrade path: if snap.AllocDir is nil, try to convert old
// Context struct to new AllocDir struct
if snap.AllocDir == nil && snap.Context != nil {
r.logger.Printf("[DEBUG] client: migrating state snapshot for alloc %q", r.alloc.ID)
snap.AllocDir = allocdir.NewAllocDir(r.logger, snap.Context.AllocDir.AllocDir)
for taskName := range snap.Context.AllocDir.TaskDirs {
snap.AllocDir.NewTaskDir(taskName)
// #2132 Upgrade path: if snap.AllocDir is nil, try to convert old
// Context struct to new AllocDir struct
if snap.AllocDir == nil && snap.Context != nil {
r.logger.Printf("[DEBUG] client: migrating state snapshot for alloc %q", r.alloc.ID)
snap.AllocDir = allocdir.NewAllocDir(r.logger, snap.Context.AllocDir.AllocDir)
for taskName := range snap.Context.AllocDir.TaskDirs {
snap.AllocDir.NewTaskDir(taskName)
}
}
}

// XXX needs to be updated and handle the upgrade path
// Delete the old state
os.RemoveAll(oldPath)
} else if !os.IsNotExist(err) {
// Something corrupt in the old state file
return err
} else {
// We are doing a normal restore
err := r.stateDB.View(func(tx *bolt.Tx) error {
bkt, err := getAllocationBucket(tx, r.alloc.ID)
if err != nil {
return fmt.Errorf("failed to get allocation bucket: %v", err)
}

// Restore fields
r.alloc = snap.Alloc
r.allocDir = snap.AllocDir
r.allocClientStatus = snap.AllocClientStatus
r.allocClientDescription = snap.AllocClientDescription
// Get the state objects
var mutable allocRunnerMutableState
var immutable allocRunnerImmutableState
var allocDir allocdir.AllocDir

if err := getObject(bkt, allocRunnerStateImmutableKey, &immutable); err != nil {
return fmt.Errorf("failed to read alloc runner immutable state: %v", err)
}
if err := getObject(bkt, allocRunnerStateMutableKey, &mutable); err != nil {
return fmt.Errorf("failed to read alloc runner mutable state: %v", err)
}
if err := getObject(bkt, allocRunnerStateAllocDirKey, &allocDir); err != nil {
return fmt.Errorf("failed to read alloc runner alloc_dir state: %v", err)
}

// Populate the fields
r.alloc = immutable.Alloc
r.allocDir = &allocDir
r.allocClientStatus = mutable.AllocClientStatus
r.allocClientDescription = mutable.AllocClientDescription
r.taskStates = mutable.TaskStates
return nil
})

if err != nil {
return fmt.Errorf("failed to read allocation state: %v", err)
}
}

var snapshotErrors multierror.Error
if r.alloc == nil {
Expand All @@ -194,11 +240,17 @@ func (r *AllocRunner) RestoreState() error {
return e
}

r.taskStates = snap.Alloc.TaskStates
tg := r.alloc.Job.LookupTaskGroup(r.alloc.TaskGroup)
if tg == nil {
return fmt.Errorf("restored allocation doesn't contain task group %q", r.alloc.TaskGroup)
}

// Restore the task runners
var mErr multierror.Error
for name, state := range r.taskStates {
for _, task := range tg.Tasks {
name := task.Name
state := r.taskStates[name]

// Mark the task as restored.
r.restored[name] = struct{}{}

Expand All @@ -210,7 +262,6 @@ func (r *AllocRunner) RestoreState() error {
return err
}

task := &structs.Task{Name: name}
tr := NewTaskRunner(r.logger, r.config, r.stateDB, r.setTaskState, td, r.Alloc(), task, r.vaultClient, r.consulClient)
r.tasks[name] = tr

Expand All @@ -231,11 +282,6 @@ func (r *AllocRunner) RestoreState() error {
return mErr.ErrorOrNil()
}

// GetAllocDir returns the alloc dir for the alloc runner
func (r *AllocRunner) GetAllocDir() *allocdir.AllocDir {
return r.allocDir
}

// SaveState is used to snapshot the state of the alloc runner
// if the fullSync is marked as false only the state of the Alloc Runner
// is snapshotted. If fullSync is marked as true, we snapshot
Expand Down Expand Up @@ -330,14 +376,24 @@ func (r *AllocRunner) saveTaskRunnerState(tr *TaskRunner) error {

// DestroyState is used to cleanup after ourselves
func (r *AllocRunner) DestroyState() error {
return os.RemoveAll(filepath.Dir(r.stateFilePath()))
return r.stateDB.Update(func(tx *bolt.Tx) error {
if err := deleteAllocationBucket(tx, r.alloc.ID); err != nil {
return fmt.Errorf("failed to delete allocation bucket: %v", err)
}
return nil
})
}

// DestroyContext tears down the runner's allocation directory by delegating
// to r.allocDir.Destroy and returns whatever error that call reports.
// NOTE(review): r.allocDir is dereferenced without a nil check or lock here;
// confirm callers only invoke this after the alloc dir has been set.
func (r *AllocRunner) DestroyContext() error {
return r.allocDir.Destroy()
}

// GetAllocDir returns the alloc runner's current *allocdir.AllocDir. The
// pointer is handed back as-is (no copy), so the caller shares the same
// object the runner uses.
// NOTE(review): the field is read without taking a lock, while RestoreState
// assigns r.allocDir; confirm callers cannot race with a restore.
func (r *AllocRunner) GetAllocDir() *allocdir.AllocDir {
return r.allocDir
}

// copyTaskStates returns a copy of the passed task states.
func copyTaskStates(states map[string]*structs.TaskState) map[string]*structs.TaskState {
copy := make(map[string]*structs.TaskState, len(states))
Expand Down Expand Up @@ -543,7 +599,7 @@ func (r *AllocRunner) Run() {
go r.dirtySyncState()

// Find the task group to run in the allocation
alloc := r.alloc
alloc := r.Alloc()
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if tg == nil {
r.logger.Printf("[ERR] client: alloc '%s' for missing task group '%s'", alloc.ID, alloc.TaskGroup)
Expand Down Expand Up @@ -629,6 +685,10 @@ OUTER:
for _, tr := range runners {
tr.Update(update)
}

if err := r.syncStatus(); err != nil {
r.logger.Printf("[WARN] client: failed to sync status upon receiving alloc update: %v", err)
}
case <-r.destroyCh:
taskDestroyEvent = structs.NewTaskEvent(structs.TaskKilled)
break OUTER
Expand Down
73 changes: 54 additions & 19 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,34 +608,71 @@ func (c *Client) restoreState() error {
return nil
}

// XXX Needs to be updated and handle the upgrade case
// COMPAT: Remove in 0.7.0
// 0.6.0 transitioned from individual state files to a single bolt-db.
// The upgrade path is to:
// Check if old state exists
// If so, restore from that and delete old state
// Restore using state database

// Allocs holds the IDs of the allocations being restored
var allocs []string

// Upgrading tracks whether this is a pre 0.6.0 upgrade path
var upgrading bool

// Scan the directory
list, err := ioutil.ReadDir(filepath.Join(c.config.StateDir, "alloc"))
if err != nil && os.IsNotExist(err) {
return nil
} else if err != nil {
allocDir := filepath.Join(c.config.StateDir, "alloc")
list, err := ioutil.ReadDir(allocDir)
if err != nil && !os.IsNotExist(err) {
return fmt.Errorf("failed to list alloc state: %v", err)
} else if err == nil && len(list) != 0 {
upgrading = true
for _, entry := range list {
allocs = append(allocs, entry.Name())
}
} else {
// Normal path
err := c.stateDB.View(func(tx *bolt.Tx) error {
allocs, err = getAllAllocationIDs(tx)
if err != nil {
return fmt.Errorf("failed to list allocations: %v", err)
}
return nil
})
if err != nil {
return err
}
}

// Load each alloc back
var mErr multierror.Error
for _, entry := range list {
id := entry.Name()
for _, id := range allocs {
alloc := &structs.Allocation{ID: id}

c.configLock.RLock()
ar := NewAllocRunner(c.logger, c.configCopy, c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService)
c.configLock.RUnlock()

c.allocLock.Lock()
c.allocs[id] = ar
c.allocLock.Unlock()

if err := ar.RestoreState(); err != nil {
c.logger.Printf("[ERR] client: failed to restore state for alloc %s: %v", id, err)
mErr.Errors = append(mErr.Errors, err)
} else {
go ar.Run()
}
}

// Delete all the entries
if upgrading {
if err := os.RemoveAll(allocDir); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
}

return mErr.ErrorOrNil()
}

Expand All @@ -653,18 +690,17 @@ func (c *Client) saveState(blocking bool) error {
runners := c.getAllocRunners()
wg.Add(len(runners))

for id, ar := range c.getAllocRunners() {
go func() {
local := ar
err := local.SaveState()
for id, ar := range runners {
go func(id string, ar *AllocRunner) {
err := ar.SaveState()
if err != nil {
c.logger.Printf("[ERR] client: failed to save state for alloc %s: %v", id, err)
l.Lock()
multierror.Append(&mErr, err)
l.Unlock()
}
wg.Done()
}()
}(id, ar)
}

if blocking {
Expand Down Expand Up @@ -1505,8 +1541,7 @@ func (c *Client) runAllocs(update *allocUpdates) {
// Remove the old allocations
for _, remove := range diff.removed {
if err := c.removeAlloc(remove); err != nil {
c.logger.Printf("[ERR] client: failed to remove alloc '%s': %v",
remove.ID, err)
c.logger.Printf("[ERR] client: failed to remove alloc '%s': %v", remove.ID, err)
}
}

Expand Down Expand Up @@ -1581,11 +1616,6 @@ func (c *Client) runAllocs(update *allocUpdates) {
add.ID, err)
}
}

// Persist our state
if err := c.saveState(false); err != nil {
c.logger.Printf("[ERR] client: failed to save state: %v", err)
}
}

// blockForRemoteAlloc blocks until the previous allocation of an allocation has
Expand Down Expand Up @@ -1934,6 +1964,11 @@ func (c *Client) addAlloc(alloc *structs.Allocation, prevAllocDir *allocdir.Allo
ar := NewAllocRunner(c.logger, c.configCopy, c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService)
ar.SetPreviousAllocDir(prevAllocDir)
c.configLock.RUnlock()

if err := ar.SaveState(); err != nil {
c.logger.Printf("[WARN] client: initial save state for alloc %q failed: %v", alloc.ID, err)
}

go ar.Run()

// Store the alloc runner.
Expand Down
Loading

2 comments on commit e22393a

@glarrain
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @dadgar. I wanted to let you know that the file demo/vagrant/README.md got corrupted in this commit.

@dadgar
Copy link
Contributor Author

@dadgar dadgar commented on e22393a May 17, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@glarrain Oh wow good catch! Thank you!

Please sign in to comment.