diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 21c3ce2ed821..4cc279a3954c 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -88,6 +88,7 @@ type AllocRunner struct { allocDirPersisted bool } +// COMPAT: Remove in 0.7.0 // allocRunnerState is used to snapshot the state of the alloc runner type allocRunnerState struct { Version string @@ -149,8 +150,10 @@ func NewAllocRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB, return ar } -// stateFilePath returns the path to our state file -func (r *AllocRunner) stateFilePath() string { +// pre060StateFilePath returns the path to our state file that would have been +// written pre v0.6.0 +// COMPAT: Remove in 0.7.0 +func (r *AllocRunner) pre060StateFilePath() string { r.allocLock.Lock() defer r.allocLock.Unlock() path := filepath.Join(r.config.StateDir, "alloc", r.alloc.ID, "state.json") @@ -159,29 +162,72 @@ func (r *AllocRunner) stateFilePath() string { // RestoreState is used to restore the state of the alloc runner func (r *AllocRunner) RestoreState() error { - // Load the snapshot + + // Check if the old snapshot is there + oldPath := r.pre060StateFilePath() var snap allocRunnerState - if err := restoreState(r.stateFilePath(), &snap); err != nil { - return err - } + if err := pre060RestoreState(oldPath, &snap); err == nil { + // Restore fields + r.alloc = snap.Alloc + r.allocDir = snap.AllocDir + r.allocClientStatus = snap.AllocClientStatus + r.allocClientDescription = snap.AllocClientDescription + + if r.alloc != nil { + r.taskStates = snap.Alloc.TaskStates + } - // #2132 Upgrade path: if snap.AllocDir is nil, try to convert old - // Context struct to new AllocDir struct - if snap.AllocDir == nil && snap.Context != nil { - r.logger.Printf("[DEBUG] client: migrating state snapshot for alloc %q", r.alloc.ID) - snap.AllocDir = allocdir.NewAllocDir(r.logger, snap.Context.AllocDir.AllocDir) - for taskName := range snap.Context.AllocDir.TaskDirs { - 
snap.AllocDir.NewTaskDir(taskName) + // #2132 Upgrade path: if snap.AllocDir is nil, convert the old Context + // struct and store it on r.allocDir; snap.AllocDir was already copied above + if snap.AllocDir == nil && snap.Context != nil { + r.logger.Printf("[DEBUG] client: migrating state snapshot for alloc %q", r.alloc.ID) + r.allocDir = allocdir.NewAllocDir(r.logger, snap.Context.AllocDir.AllocDir) + for taskName := range snap.Context.AllocDir.TaskDirs { + r.allocDir.NewTaskDir(taskName) + } } - } - // XXX needs to be updated and handle the upgrade path + // Delete the old state + os.RemoveAll(oldPath) + } else if !os.IsNotExist(err) { + // Something corrupt in the old state file + return err + } else { + // We are doing a normal restore + err := r.stateDB.View(func(tx *bolt.Tx) error { + bkt, err := getAllocationBucket(tx, r.alloc.ID) + if err != nil { + return fmt.Errorf("failed to get allocation bucket: %v", err) + } - // Restore fields - r.alloc = snap.Alloc - r.allocDir = snap.AllocDir - r.allocClientStatus = snap.AllocClientStatus - r.allocClientDescription = snap.AllocClientDescription + // Get the state objects + var mutable allocRunnerMutableState + var immutable allocRunnerImmutableState + var allocDir allocdir.AllocDir + + if err := getObject(bkt, allocRunnerStateImmutableKey, &immutable); err != nil { + return fmt.Errorf("failed to read alloc runner immutable state: %v", err) + } + if err := getObject(bkt, allocRunnerStateMutableKey, &mutable); err != nil { + return fmt.Errorf("failed to read alloc runner mutable state: %v", err) + } + if err := getObject(bkt, allocRunnerStateAllocDirKey, &allocDir); err != nil { + return fmt.Errorf("failed to read alloc runner alloc_dir state: %v", err) + } + + // Populate the fields + r.alloc = immutable.Alloc + r.allocDir = &allocDir + r.allocClientStatus = mutable.AllocClientStatus + r.allocClientDescription = mutable.AllocClientDescription + r.taskStates = mutable.TaskStates + return nil + }) + + if err != nil { + return fmt.Errorf("failed to read 
allocation state: %v", err) + } + } var snapshotErrors multierror.Error if r.alloc == nil { @@ -194,11 +240,17 @@ func (r *AllocRunner) RestoreState() error { return e } - r.taskStates = snap.Alloc.TaskStates + tg := r.alloc.Job.LookupTaskGroup(r.alloc.TaskGroup) + if tg == nil { + return fmt.Errorf("restored allocation doesn't contain task group %q", r.alloc.TaskGroup) + } // Restore the task runners var mErr multierror.Error - for name, state := range r.taskStates { + for _, task := range tg.Tasks { + name := task.Name + state := r.taskStates[name] + // Mark the task as restored. r.restored[name] = struct{}{} @@ -210,7 +262,6 @@ func (r *AllocRunner) RestoreState() error { return err } - task := &structs.Task{Name: name} tr := NewTaskRunner(r.logger, r.config, r.stateDB, r.setTaskState, td, r.Alloc(), task, r.vaultClient, r.consulClient) r.tasks[name] = tr @@ -231,11 +282,6 @@ func (r *AllocRunner) RestoreState() error { return mErr.ErrorOrNil() } -// GetAllocDir returns the alloc dir for the alloc runner -func (r *AllocRunner) GetAllocDir() *allocdir.AllocDir { - return r.allocDir -} - // SaveState is used to snapshot the state of the alloc runner // if the fullSync is marked as false only the state of the Alloc Runner // is snapshotted. 
If fullSync is marked as true, we snapshot @@ -330,7 +376,12 @@ func (r *AllocRunner) saveTaskRunnerState(tr *TaskRunner) error { // DestroyState is used to cleanup after ourselves func (r *AllocRunner) DestroyState() error { - return os.RemoveAll(filepath.Dir(r.stateFilePath())) + return r.stateDB.Update(func(tx *bolt.Tx) error { + if err := deleteAllocationBucket(tx, r.alloc.ID); err != nil { + return fmt.Errorf("failed to delete allocation bucket: %v", err) + } + return nil + }) } // DestroyContext is used to destroy the context @@ -338,6 +389,11 @@ func (r *AllocRunner) DestroyContext() error { return r.allocDir.Destroy() } +// GetAllocDir returns the alloc dir for the alloc runner +func (r *AllocRunner) GetAllocDir() *allocdir.AllocDir { + return r.allocDir +} + // copyTaskStates returns a copy of the passed task states. func copyTaskStates(states map[string]*structs.TaskState) map[string]*structs.TaskState { copy := make(map[string]*structs.TaskState, len(states)) @@ -543,7 +599,7 @@ func (r *AllocRunner) Run() { go r.dirtySyncState() // Find the task group to run in the allocation - alloc := r.alloc + alloc := r.Alloc() tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) if tg == nil { r.logger.Printf("[ERR] client: alloc '%s' for missing task group '%s'", alloc.ID, alloc.TaskGroup) @@ -629,6 +685,10 @@ OUTER: for _, tr := range runners { tr.Update(update) } + + if err := r.syncStatus(); err != nil { + r.logger.Printf("[WARN] client: failed to sync status upon receiving alloc update: %v", err) + } case <-r.destroyCh: taskDestroyEvent = structs.NewTaskEvent(structs.TaskKilled) break OUTER diff --git a/client/client.go b/client/client.go index faf0a34bba35..a6a6ba2a417a 100644 --- a/client/client.go +++ b/client/client.go @@ -608,27 +608,56 @@ func (c *Client) restoreState() error { return nil } - // XXX Needs to be updated and handle the upgrade case + // COMPAT: Remove in 0.7.0 + // 0.6.0 transitioned from individual state files to a single bolt-db. 
+ // The upgrade path is to: + // Check if old state exists + // If so, restore from that and delete old state + // Restore using state database + + // Allocs holds the IDs of the allocations being restored + var allocs []string + + // Upgrading tracks whether this is a pre 0.6.0 upgrade path + var upgrading bool // Scan the directory - list, err := ioutil.ReadDir(filepath.Join(c.config.StateDir, "alloc")) - if err != nil && os.IsNotExist(err) { - return nil - } else if err != nil { + allocDir := filepath.Join(c.config.StateDir, "alloc") + list, err := ioutil.ReadDir(allocDir) + if err != nil && !os.IsNotExist(err) { return fmt.Errorf("failed to list alloc state: %v", err) + } else if err == nil && len(list) != 0 { + upgrading = true + for _, entry := range list { + allocs = append(allocs, entry.Name()) + } + } else { + // Normal path + err := c.stateDB.View(func(tx *bolt.Tx) error { + allocs, err = getAllAllocationIDs(tx) + if err != nil { + return fmt.Errorf("failed to list allocations: %v", err) + } + return nil + }) + if err != nil { + return err + } } // Load each alloc back var mErr multierror.Error - for _, entry := range list { - id := entry.Name() + for _, id := range allocs { alloc := &structs.Allocation{ID: id} + c.configLock.RLock() ar := NewAllocRunner(c.logger, c.configCopy, c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService) c.configLock.RUnlock() + c.allocLock.Lock() c.allocs[id] = ar c.allocLock.Unlock() + if err := ar.RestoreState(); err != nil { c.logger.Printf("[ERR] client: failed to restore state for alloc %s: %v", id, err) mErr.Errors = append(mErr.Errors, err) @@ -636,6 +665,14 @@ func (c *Client) restoreState() error { go ar.Run() } } + + // Delete all the entries + if upgrading { + if err := os.RemoveAll(allocDir); err != nil { + mErr.Errors = append(mErr.Errors, err) + } + } + return mErr.ErrorOrNil() } @@ -653,10 +690,9 @@ func (c *Client) saveState(blocking bool) error { runners := c.getAllocRunners() 
wg.Add(len(runners)) - for id, ar := range c.getAllocRunners() { - go func() { - local := ar - err := local.SaveState() + for id, ar := range runners { + go func(id string, ar *AllocRunner) { + err := ar.SaveState() if err != nil { c.logger.Printf("[ERR] client: failed to save state for alloc %s: %v", id, err) l.Lock() @@ -664,7 +700,7 @@ func (c *Client) saveState(blocking bool) error { l.Unlock() } wg.Done() - }() + }(id, ar) } if blocking { @@ -1505,8 +1541,7 @@ func (c *Client) runAllocs(update *allocUpdates) { // Remove the old allocations for _, remove := range diff.removed { if err := c.removeAlloc(remove); err != nil { - c.logger.Printf("[ERR] client: failed to remove alloc '%s': %v", - remove.ID, err) + c.logger.Printf("[ERR] client: failed to remove alloc '%s': %v", remove.ID, err) } } @@ -1581,11 +1616,6 @@ func (c *Client) runAllocs(update *allocUpdates) { add.ID, err) } } - - // Persist our state - if err := c.saveState(false); err != nil { - c.logger.Printf("[ERR] client: failed to save state: %v", err) - } } // blockForRemoteAlloc blocks until the previous allocation of an allocation has @@ -1934,6 +1964,11 @@ func (c *Client) addAlloc(alloc *structs.Allocation, prevAllocDir *allocdir.Allo ar := NewAllocRunner(c.logger, c.configCopy, c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService) ar.SetPreviousAllocDir(prevAllocDir) c.configLock.RUnlock() + + if err := ar.SaveState(); err != nil { + c.logger.Printf("[WARN] client: initial save state for alloc %q failed: %v", alloc.ID, err) + } + go ar.Run() // Store the alloc runner. 
diff --git a/client/state_database.go b/client/state_database.go index ba6f71d71f74..a9a36a5f9d1c 100644 --- a/client/state_database.go +++ b/client/state_database.go @@ -55,24 +55,54 @@ func putData(bkt *bolt.Bucket, key, value []byte) error { return nil } +func getObject(bkt *bolt.Bucket, key []byte, obj interface{}) error { + // Get the data + data := bkt.Get(key) + if data == nil { + return fmt.Errorf("no data at key %v", string(key)) + } + + // Deserialize the object + if err := codec.NewDecoderBytes(data, structs.MsgpackHandle).Decode(obj); err != nil { + return fmt.Errorf("failed to decode data into passed object: %v", err) + } + + return nil +} + // getAllocationBucket returns the bucket used to persist state about a // particular allocation. If the root allocation bucket or the specific -// allocation bucket doesn't exist, it will be created. +// allocation bucket doesn't exist, it will be created as long as the +// transaction is writable. func getAllocationBucket(tx *bolt.Tx, allocID string) (*bolt.Bucket, error) { - if !tx.Writable() { - return nil, fmt.Errorf("transaction must be writable") - } + var err error + w := tx.Writable() // Retrieve the root allocations bucket - allocations, err := tx.CreateBucketIfNotExists(allocationsBucket) - if err != nil { - return nil, err + allocations := tx.Bucket(allocationsBucket) + if allocations == nil { + if !w { + return nil, fmt.Errorf("Allocations bucket doesn't exist and transaction is not writable") + } + + allocations, err = tx.CreateBucket(allocationsBucket) + if err != nil { + return nil, err + } } // Retrieve the specific allocations bucket - alloc, err := allocations.CreateBucketIfNotExists([]byte(allocID)) - if err != nil { - return nil, err + key := []byte(allocID) + alloc := allocations.Bucket(key) + if alloc == nil { + if !w { + return nil, fmt.Errorf("Allocation bucket doesn't exist and transaction is not writable") + } + + alloc, err = allocations.CreateBucket(key) + if err != nil { + return nil, 
err + } } return alloc, nil @@ -80,29 +110,94 @@ func getAllocationBucket(tx *bolt.Tx, allocID string) (*bolt.Bucket, error) { // getTaskBucket returns the bucket used to persist state about a // particular task. If the root allocation bucket, the specific -// allocation or task bucket doesn't exist, they will be created. +// allocation or task bucket doesn't exist, they will be created as long as the +// transaction is writable. func getTaskBucket(tx *bolt.Tx, allocID, taskName string) (*bolt.Bucket, error) { + alloc, err := getAllocationBucket(tx, allocID) + if err != nil { + return nil, err + } + + // Retrieve the specific task bucket + w := tx.Writable() + key := []byte(taskName) + task := alloc.Bucket(key) + if task == nil { + if !w { + return nil, fmt.Errorf("Task bucket doesn't exist and transaction is not writable") + } + + task, err = alloc.CreateBucket(key) + if err != nil { + return nil, err + } + } + + return task, nil +} + +// deleteAllocationBucket is used to delete an allocation bucket if it exists. +func deleteAllocationBucket(tx *bolt.Tx, allocID string) error { if !tx.Writable() { - return nil, fmt.Errorf("transaction must be writable") + return fmt.Errorf("transaction must be writable") } // Retrieve the root allocations bucket - allocations, err := tx.CreateBucketIfNotExists(allocationsBucket) - if err != nil { - return nil, err + allocations := tx.Bucket(allocationsBucket) + if allocations == nil { + return nil + } + + // Check if the bucket exists + key := []byte(allocID) + if allocBkt := allocations.Bucket(key); allocBkt == nil { + return nil + } + + return allocations.DeleteBucket(key) +} + +// deleteTaskBucket is used to delete a task bucket if it exists. 
+func deleteTaskBucket(tx *bolt.Tx, allocID, taskName string) error { + if !tx.Writable() { + return fmt.Errorf("transaction must be writable") + } + + // Retrieve the root allocations bucket + allocations := tx.Bucket(allocationsBucket) + if allocations == nil { + return nil } // Retrieve the specific allocations bucket - alloc, err := allocations.CreateBucketIfNotExists([]byte(allocID)) - if err != nil { - return nil, err + alloc := allocations.Bucket([]byte(allocID)) + if alloc == nil { + return nil } - // Retrieve the specific task bucket - task, err := alloc.CreateBucketIfNotExists([]byte(taskName)) - if err != nil { - return nil, err + // Check if the bucket exists + key := []byte(taskName) + if taskBkt := alloc.Bucket(key); taskBkt == nil { + return nil } - return task, nil + return alloc.DeleteBucket(key) +} + +func getAllAllocationIDs(tx *bolt.Tx) ([]string, error) { + allocationsBkt := tx.Bucket(allocationsBucket) + if allocationsBkt == nil { + return nil, nil + } + + // Create a cursor for iteration. 
+ var allocIDs []string + c := allocationsBkt.Cursor() + + // Iterate over all the buckets + for k, _ := c.First(); k != nil; k, _ = c.Next() { + allocIDs = append(allocIDs, string(k)) + } + + return allocIDs, nil } diff --git a/client/task_runner.go b/client/task_runner.go index ca74ae50ac6d..8f91d8f4cac8 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -249,34 +249,61 @@ func (r *TaskRunner) WaitCh() <-chan struct{} { return r.waitCh } -// stateFilePath returns the path to our state file -func (r *TaskRunner) stateFilePath() string { +// pre060StateFilePath returns the path to our state file that would have been +// written pre v0.6.0 +// COMPAT: Remove in 0.7.0 +func (r *TaskRunner) pre060StateFilePath() string { // Get the MD5 of the task name hashVal := md5.Sum([]byte(r.task.Name)) hashHex := hex.EncodeToString(hashVal[:]) dirName := fmt.Sprintf("task-%s", hashHex) // Generate the path - path := filepath.Join(r.config.StateDir, "alloc", r.alloc.ID, - dirName, "state.json") - return path + return filepath.Join(r.config.StateDir, "alloc", r.alloc.ID, dirName, "state.json") } // RestoreState is used to restore our state func (r *TaskRunner) RestoreState() error { - // XXX needs to be updated and handle the upgrade path + // COMPAT: Remove in 0.7.0 + // 0.6.0 transitioned from individual state files to a single bolt-db. 
+ // The upgrade path is to: + // Check if old state exists + // If so, restore from that and delete old state + // Restore using state database - // Load the snapshot var snap taskRunnerState - if err := restoreState(r.stateFilePath(), &snap); err != nil { + + // Check if the old snapshot is there + oldPath := r.pre060StateFilePath() + if err := pre060RestoreState(oldPath, &snap); err == nil { + // Delete the old state + os.RemoveAll(oldPath) + } else if !os.IsNotExist(err) { + // Something corrupt in the old state file return err + } else { + // We are doing a normal restore + err := r.stateDB.View(func(tx *bolt.Tx) error { + bkt, err := getTaskBucket(tx, r.alloc.ID, r.task.Name) + if err != nil { + return fmt.Errorf("failed to get task bucket: %v", err) + } + + if err := getObject(bkt, taskRunnerStateAllKey, &snap); err != nil { + return fmt.Errorf("failed to read task runner state: %v", err) + } + return nil + }) + if err != nil { + return err + } + } - // Restore fields + // Restore fields from the snapshot r.artifactsDownloaded = snap.ArtifactDownloaded r.taskDirBuilt = snap.TaskDirBuilt r.payloadRendered = snap.PayloadRendered - r.setCreatedResources(snap.CreatedResources) if err := r.setTaskEnv(); err != nil { @@ -333,14 +360,13 @@ func (r *TaskRunner) RestoreState() error { r.running = true r.runningLock.Unlock() } + return nil } // SaveState is used to snapshot our state func (r *TaskRunner) SaveState() error { - // XXX needs to be updated r.persistLock.Lock() - snap := taskRunnerState{ Version: r.config.Version, ArtifactDownloaded: r.artifactsDownloaded, @@ -355,6 +381,7 @@ func (r *TaskRunner) SaveState() error { } r.handleLock.Unlock() + // If nothing has changed avoid the write h := snap.Hash() if bytes.Equal(h, r.persistedHash) { r.persistLock.Unlock() @@ -394,11 +421,21 @@ func (r *TaskRunner) DestroyState() error { r.persistLock.Lock() defer r.persistLock.Unlock() - return os.RemoveAll(r.stateFilePath()) + return r.stateDB.Update(func(tx *bolt.Tx) 
error { + if err := deleteTaskBucket(tx, r.alloc.ID, r.task.Name); err != nil { + return fmt.Errorf("failed to delete task bucket: %v", err) + } + return nil + }) } // setState is used to update the state of the task runner func (r *TaskRunner) setState(state string, event *structs.TaskEvent) { + // Persist our state to disk. + if err := r.SaveState(); err != nil { + r.logger.Printf("[ERR] client: failed to save state of Task Runner for task %q: %v", r.task.Name, err) + } + // Indicate the task has been updated. r.updater(r.task.Name, state, event) } diff --git a/client/util.go b/client/util.go index ee78ebac20ee..233a0259514b 100644 --- a/client/util.go +++ b/client/util.go @@ -106,14 +106,12 @@ func persistState(path string, data interface{}) error { return nil } -// restoreState is used to read back in the persisted state -func restoreState(path string, data interface{}) error { +// pre060RestoreState is used to read back in the persisted state for pre v0.6.0 +// state +func pre060RestoreState(path string, data interface{}) error { buf, err := ioutil.ReadFile(path) if err != nil { - if os.IsNotExist(err) { - return nil - } - return fmt.Errorf("failed to read state: %v", err) + return err } if err := json.Unmarshal(buf, data); err != nil { return fmt.Errorf("failed to decode state: %v", err) diff --git a/demo/vagrant/README.md b/demo/vagrant/README.md index 2150799a4880..112daa8e4f68 100644 Binary files a/demo/vagrant/README.md and b/demo/vagrant/README.md differ