From 9def7e1a140150ae24d32ccb9548f674592924f2 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Fri, 28 Apr 2017 13:18:04 -0700 Subject: [PATCH 01/13] Don't deepcopy job when retrieving copy of Alloc This PR removes deepcopying of the job attached to the allocation in the alloc runner. This operation is called very often so removing reflect from the code path and the potentially large number of mallocs need to create a job reduced memory and cpu pressure. --- client/alloc_runner.go | 9 +++++++++ client/util.go | 12 ++++++++---- command/agent/fs_endpoint.go | 3 ++- command/agent/http.go | 16 ++-------------- nomad/structs/structs.go | 12 ++++++++++++ 5 files changed, 33 insertions(+), 19 deletions(-) diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 8ca6f671eae1..1bb6d1a6502f 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -279,8 +279,17 @@ func copyTaskStates(states map[string]*structs.TaskState) map[string]*structs.Ta // Alloc returns the associated allocation func (r *AllocRunner) Alloc() *structs.Allocation { r.allocLock.Lock() + + // Clear the job before copying + job := r.alloc.Job + r.alloc.Job = nil + alloc := r.alloc.Copy() + // Restore + r.alloc.Job = job + alloc.Job = job + // The status has explicitly been set. if r.allocClientStatus != "" || r.allocClientDescription != "" { alloc.ClientStatus = r.allocClientStatus diff --git a/client/util.go b/client/util.go index 3f7cef981e79..ee78ebac20ee 100644 --- a/client/util.go +++ b/client/util.go @@ -1,6 +1,7 @@ package client import ( + "bytes" "encoding/json" "fmt" "io/ioutil" @@ -9,6 +10,7 @@ import ( "path/filepath" "github.com/hashicorp/nomad/nomad/structs" + "github.com/ugorji/go/codec" ) type allocTuple struct { @@ -78,15 +80,17 @@ func shuffleStrings(list []string) { // persistState is used to help with saving state func persistState(path string, data interface{}) error { - buf, err := json.Marshal(data) - if err != nil { - return fmt.Errorf("failed to encode state: %v", err) + var buf bytes.Buffer + enc := codec.NewEncoder(&buf, structs.JsonHandlePretty) + if err := enc.Encode(data); err != nil { + return err } + if err := os.MkdirAll(filepath.Dir(path), 0700); err != nil { return fmt.Errorf("failed to make dirs for %s: %v", path, err) } tmpPath := path + ".tmp" - if err := ioutil.WriteFile(tmpPath, buf, 0600); err != nil { + if err := ioutil.WriteFile(tmpPath, buf.Bytes(), 0600); err != nil { return fmt.Errorf("failed to save state to tmp: %v", err) } if err := os.Rename(tmpPath, path); err != nil { diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index 7658e662874d..ad6847c4e41c 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -21,6 +21,7 @@ import ( "github.com/docker/docker/pkg/ioutils" "github.com/hashicorp/nomad/client/allocdir" + "github.com/hashicorp/nomad/nomad/structs" "github.com/hpcloud/tail/watch" "github.com/ugorji/go/codec" ) @@ -290,7 +291,7 @@ func NewStreamFramer(out io.WriteCloser, plainTxt bool, heartbeatRate, batchWindow time.Duration, frameSize int) *StreamFramer { // Create a JSON encoder - enc := codec.NewEncoder(out, jsonHandle) + enc := codec.NewEncoder(out, structs.JsonHandle) // Create the heartbeat and flush ticker heartbeat := time.NewTicker(heartbeatRate) diff --git a/command/agent/http.go b/command/agent/http.go index 8dbfca78eebd..147a9df08a7b 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -29,18 +29,6 @@ const ( scadaHTTPAddr = "SCADA" ) -var ( - // jsonHandle and jsonHandlePretty are the codec handles to JSON encode - // structs. The pretty handle will add indents for easier human consumption. - jsonHandle = &codec.JsonHandle{ - HTMLCharsAsIs: true, - } - jsonHandlePretty = &codec.JsonHandle{ - HTMLCharsAsIs: true, - Indent: 4, - } -) - // HTTPServer is used to wrap an Agent and expose it over an HTTP interface type HTTPServer struct { agent *Agent @@ -248,13 +236,13 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque if obj != nil { var buf bytes.Buffer if prettyPrint { - enc := codec.NewEncoder(&buf, jsonHandlePretty) + enc := codec.NewEncoder(&buf, structs.JsonHandlePretty) err = enc.Encode(obj) if err == nil { buf.Write([]byte("\n")) } } else { - enc := codec.NewEncoder(&buf, jsonHandle) + enc := codec.NewEncoder(&buf, structs.JsonHandle) err = enc.Encode(obj) } if err != nil { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 924c39009e0f..3826ed784fd6 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -4231,6 +4231,18 @@ var MsgpackHandle = func() *codec.MsgpackHandle { return h }() +var ( + // JsonHandle and JsonHandlePretty are the codec handles to JSON encode + // structs. The pretty handle will add indents for easier human consumption. + JsonHandle = &codec.JsonHandle{ + HTMLCharsAsIs: true, + } + JsonHandlePretty = &codec.JsonHandle{ + HTMLCharsAsIs: true, + Indent: 4, + } +) + var HashiMsgpackHandle = func() *hcodec.MsgpackHandle { h := &hcodec.MsgpackHandle{RawToString: true} From 7614feddbd66256dbc1a1d5a19aef06b8a5c9bda Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Sat, 29 Apr 2017 15:43:23 -0700 Subject: [PATCH 02/13] boltDB database for client state --- client/alloc_runner.go | 99 +++++++++++++++++++++++++++++++++++------ client/client.go | 24 +++++++++- client/driver/driver.go | 16 +++++++ client/task_runner.go | 71 ++++++++++++++++++++++++----- 4 files changed, 185 insertions(+), 25 deletions(-) diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 1bb6d1a6502f..4f29d08e3d4d 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/boltdb/bolt" "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" @@ -27,6 +28,13 @@ const ( taskReceivedSyncLimit = 30 * time.Second ) +var ( + // The following are the key paths written to the state database + allocRunnerStateImmutableKey = []byte("immutable") + allocRunnerStateMutableKey = []byte("mutable") + allocRunnerStateAllocDirKey = []byte("alloc-dir") +) + // AllocStateUpdater is used to update the status of an allocation type AllocStateUpdater func(alloc *structs.Allocation) @@ -69,8 +77,15 @@ type AllocRunner struct { destroyLock sync.Mutex waitCh chan struct{} - // serialize saveAllocRunnerState calls - persistLock sync.Mutex + // State related fields + // stateDB is used to store the alloc runners state + stateDB *bolt.DB + + // immutablePersisted and allocDirPersisted are used to track whether the + // immutable data and the alloc dir have been persisted. Once persisted we + // can lower write volume by not re-writing these values + immutablePersisted bool + allocDirPersisted bool } // allocRunnerState is used to snapshot the state of the alloc runner @@ -95,13 +110,29 @@ type allocRunnerState struct { } `json:"Context,omitempty"` } +// allocRunnerImmutableState is state that only has to be written once as it +// doesn't change over the life-cycle of the alloc_runner. +type allocRunnerImmutableState struct { + Version string + Alloc *structs.Allocation +} + +// allocRunnerMutableState is state that has to be written on each save as it +// changes over the life-cycle of the alloc_runner. +type allocRunnerMutableState struct { + AllocClientStatus string + AllocClientDescription string + TaskStates map[string]*structs.TaskState +} + // NewAllocRunner is used to create a new allocation context -func NewAllocRunner(logger *log.Logger, config *config.Config, updater AllocStateUpdater, +func NewAllocRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB, updater AllocStateUpdater, alloc *structs.Allocation, vaultClient vaultclient.VaultClient, consulClient ConsulServiceAPI) *AllocRunner { ar := &AllocRunner{ config: config, + stateDB: stateDB, updater: updater, logger: logger, alloc: alloc, @@ -144,6 +175,8 @@ func (r *AllocRunner) RestoreState() error { } } + // XXX needs to be updated and handle the upgrade path + // Restore fields r.alloc = snap.Alloc r.allocDir = snap.AllocDir @@ -178,7 +211,7 @@ func (r *AllocRunner) RestoreState() error { } task := &structs.Task{Name: name} - tr := NewTaskRunner(r.logger, r.config, r.setTaskState, td, r.Alloc(), task, r.vaultClient, r.consulClient) + tr := NewTaskRunner(r.logger, r.config, r.stateDB, r.setTaskState, td, r.Alloc(), task, r.vaultClient, r.consulClient) r.tasks[name] = tr // Skip tasks in terminal states. @@ -224,10 +257,19 @@ func (r *AllocRunner) SaveState() error { } func (r *AllocRunner) saveAllocRunnerState() error { - r.persistLock.Lock() - defer r.persistLock.Unlock() + // Start the transaction. + tx, err := r.stateDB.Begin(true) + if err != nil { + return err + } + + // Grab the allocation bucket + allocBkt, err := getAllocationBucket(tx, r.alloc.ID) + if err != nil { + return fmt.Errorf("failed to retrieve allocation bucket: %v", err) + } - // Create the snapshot. + // Grab all the relevant data alloc := r.Alloc() r.allocLock.Lock() @@ -239,14 +281,45 @@ func (r *AllocRunner) saveAllocRunnerState() error { allocDir := r.allocDir r.allocDirLock.Unlock() - snap := allocRunnerState{ - Version: r.config.Version, - Alloc: alloc, - AllocDir: allocDir, + // Write the immutable data + if !r.immutablePersisted { + immutable := &allocRunnerImmutableState{ + Alloc: alloc, + Version: r.config.Version, + } + + if err := putObject(allocBkt, allocRunnerStateImmutableKey, &immutable); err != nil { + return fmt.Errorf("failed to write alloc_runner immutable state: %v", err) + } + + tx.OnCommit(func() { + r.immutablePersisted = true + }) + } + + // Write the alloc dir data if it hasn't been written before and it exists. + if !r.allocDirPersisted && r.allocDir != nil { + if err := putObject(allocBkt, allocRunnerStateAllocDirKey, allocDir); err != nil { + return fmt.Errorf("failed to write alloc_runner allocDir state: %v", err) + } + + tx.OnCommit(func() { + r.allocDirPersisted = true + }) + } + + // Write the mutable state every time + mutable := &allocRunnerMutableState{ AllocClientStatus: allocClientStatus, AllocClientDescription: allocClientDescription, + TaskStates: alloc.TaskStates, } - return persistState(r.stateFilePath(), &snap) + + if err := putObject(allocBkt, allocRunnerStateMutableKey, &mutable); err != nil { + return fmt.Errorf("failed to write alloc_runner mutable state: %v", err) + } + + return tx.Commit() } func (r *AllocRunner) saveTaskRunnerState(tr *TaskRunner) error { @@ -525,7 +598,7 @@ func (r *AllocRunner) Run() { taskdir := r.allocDir.NewTaskDir(task.Name) r.allocDirLock.Unlock() - tr := NewTaskRunner(r.logger, r.config, r.setTaskState, taskdir, r.Alloc(), task.Copy(), r.vaultClient, r.consulClient) + tr := NewTaskRunner(r.logger, r.config, r.stateDB, r.setTaskState, taskdir, r.Alloc(), task.Copy(), r.vaultClient, r.consulClient) r.tasks[task.Name] = tr tr.MarkReceived() diff --git a/client/client.go b/client/client.go index 89e14823b85f..00d90be678b4 100644 --- a/client/client.go +++ b/client/client.go @@ -16,6 +16,7 @@ import ( "time" "github.com/armon/go-metrics" + "github.com/boltdb/bolt" consulapi "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/lib" "github.com/hashicorp/go-multierror" @@ -99,6 +100,9 @@ type Client struct { config *config.Config start time.Time + // stateDB is used to efficiently store client state. + stateDB *bolt.DB + // configCopy is a copy that should be passed to alloc-runners. configCopy *config.Config configLock sync.RWMutex @@ -340,6 +344,13 @@ func (c *Client) init() error { } c.logger.Printf("[INFO] client: using state directory %v", c.config.StateDir) + // Create or open the state database + db, err := bolt.Open(filepath.Join(c.config.StateDir, "state.db"), 0600, nil) + if err != nil { + return fmt.Errorf("failed to create state database", err) + } + c.stateDB = db + // Ensure the alloc dir exists if we have one if c.config.AllocDir != "" { if err := os.MkdirAll(c.config.AllocDir, 0755); err != nil { @@ -410,6 +421,13 @@ func (c *Client) Shutdown() error { return nil } + // Defer closing the database + defer func() { + if err := c.stateDB.Close(); err != nil { + c.logger.Printf("[ERR] client: failed to close state database on shutdown: %v", err) + } + }() + // Stop renewing tokens and secrets if c.vaultClient != nil { c.vaultClient.Stop() @@ -590,6 +608,8 @@ func (c *Client) restoreState() error { return nil } + // XXX Needs to be updated and handle the upgrade case + // Scan the directory list, err := ioutil.ReadDir(filepath.Join(c.config.StateDir, "alloc")) if err != nil && os.IsNotExist(err) { @@ -604,7 +624,7 @@ func (c *Client) restoreState() error { id := entry.Name() alloc := &structs.Allocation{ID: id} c.configLock.RLock() - ar := NewAllocRunner(c.logger, c.configCopy, c.updateAllocStatus, alloc, c.vaultClient, c.consulService) + ar := NewAllocRunner(c.logger, c.configCopy, c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService) c.configLock.RUnlock() c.allocLock.Lock() c.allocs[id] = ar @@ -1892,7 +1912,7 @@ func (c *Client) addAlloc(alloc *structs.Allocation, prevAllocDir *allocdir.Allo defer c.allocLock.Unlock() c.configLock.RLock() - ar := NewAllocRunner(c.logger, c.configCopy, c.updateAllocStatus, alloc, c.vaultClient, c.consulService) + ar := NewAllocRunner(c.logger, c.configCopy, c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService) ar.SetPreviousAllocDir(prevAllocDir) c.configLock.RUnlock() go ar.Run() diff --git a/client/driver/driver.go b/client/driver/driver.go index ee28888cfad0..7a27e3f77ada 100644 --- a/client/driver/driver.go +++ b/client/driver/driver.go @@ -2,8 +2,10 @@ package driver import ( "context" + "crypto/md5" "errors" "fmt" + "io" "log" "os" "strings" @@ -150,6 +152,20 @@ func (r *CreatedResources) Merge(o *CreatedResources) { } } +func (r *CreatedResources) Hash() []byte { + h := md5.New() + + for k, values := range r.Resources { + io.WriteString(h, k) + io.WriteString(h, "values") + for i, v := range values { + io.WriteString(h, fmt.Sprintf("%d-%v", i, v)) + } + } + + return h.Sum(nil) +} + // Driver is used for execution of tasks. This allows Nomad // to support many pluggable implementations of task drivers. // Examples could include LXC, Docker, Qemu, etc. diff --git a/client/task_runner.go b/client/task_runner.go index c578ca5ea0b7..3094addb369e 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -1,9 +1,11 @@ package client import ( + "bytes" "crypto/md5" "encoding/hex" "fmt" + "io" "io/ioutil" "log" "os" @@ -13,6 +15,7 @@ import ( "time" "github.com/armon/go-metrics" + "github.com/boltdb/bolt" "github.com/golang/snappy" "github.com/hashicorp/consul-template/signals" "github.com/hashicorp/go-multierror" @@ -54,8 +57,15 @@ const ( vaultTokenFile = "vault_token" ) +var ( + // taskRunnerStateAllKey holds all the task runners state. At the moment + // there is no need to split it + taskRunnerStateAllKey = []byte("simple-all") +) + // TaskRunner is used to wrap a task within an allocation and provide the execution context. type TaskRunner struct { + stateDB *bolt.DB config *config.Config updater TaskStateUpdater logger *log.Logger @@ -142,17 +152,33 @@ type TaskRunner struct { // AllocRunner, so all state fields must be synchronized using this // lock. persistLock sync.Mutex + + // persistedHash is the hash of the last persisted snapshot. It is used to + // detect if a new snapshot has to be writen to disk. + persistedHash []byte } // taskRunnerState is used to snapshot the state of the task runner type taskRunnerState struct { Version string - Task *structs.Task HandleID string ArtifactDownloaded bool TaskDirBuilt bool - CreatedResources *driver.CreatedResources PayloadRendered bool + CreatedResources *driver.CreatedResources +} + +func (s *taskRunnerState) Hash() []byte { + h := md5.New() + + io.WriteString(h, s.Version) + io.WriteString(h, s.HandleID) + io.WriteString(h, fmt.Sprintf("%v", s.ArtifactDownloaded)) + io.WriteString(h, fmt.Sprintf("%v", s.TaskDirBuilt)) + io.WriteString(h, fmt.Sprintf("%v", s.PayloadRendered)) + h.Write(s.CreatedResources.Hash()) + + return h.Sum(nil) } // TaskStateUpdater is used to signal that tasks state has changed. @@ -172,7 +198,7 @@ type SignalEvent struct { // NewTaskRunner is used to create a new task context func NewTaskRunner(logger *log.Logger, config *config.Config, - updater TaskStateUpdater, taskDir *allocdir.TaskDir, + stateDB *bolt.DB, updater TaskStateUpdater, taskDir *allocdir.TaskDir, alloc *structs.Allocation, task *structs.Task, vaultClient vaultclient.VaultClient, consulClient ConsulServiceAPI) *TaskRunner { @@ -189,6 +215,7 @@ func NewTaskRunner(logger *log.Logger, config *config.Config, tc := &TaskRunner{ config: config, + stateDB: stateDB, updater: updater, logger: logger, restartTracker: restartTracker, @@ -236,6 +263,8 @@ func (r *TaskRunner) stateFilePath() string { // RestoreState is used to restore our state func (r *TaskRunner) RestoreState() error { + // XXX needs to be updated and handle the upgrade path + // Load the snapshot var snap taskRunnerState if err := restoreState(r.stateFilePath(), &snap); err != nil { @@ -243,11 +272,6 @@ func (r *TaskRunner) RestoreState() error { } // Restore fields - if snap.Task == nil { - return fmt.Errorf("task runner snapshot includes nil Task") - } else { - r.task = snap.Task - } r.artifactsDownloaded = snap.ArtifactDownloaded r.taskDirBuilt = snap.TaskDirBuilt r.payloadRendered = snap.PayloadRendered @@ -313,11 +337,11 @@ func (r *TaskRunner) RestoreState() error { // SaveState is used to snapshot our state func (r *TaskRunner) SaveState() error { + // XXX needs to be updated r.persistLock.Lock() defer r.persistLock.Unlock() snap := taskRunnerState{ - Task: r.task, Version: r.config.Version, ArtifactDownloaded: r.artifactsDownloaded, TaskDirBuilt: r.taskDirBuilt, @@ -330,7 +354,34 @@ func (r *TaskRunner) SaveState() error { snap.HandleID = r.handle.ID() } r.handleLock.Unlock() - return persistState(r.stateFilePath(), &snap) + + h := snap.Hash() + if bytes.Equal(h, r.persistedHash) { + return nil + } + + // Start the transaction. + tx, err := r.stateDB.Begin(true) + if err != nil { + return err + } + + // Grab the task bucket + taskBkt, err := getTaskBucket(tx, r.alloc.ID, r.task.Name) + if err != nil { + return fmt.Errorf("failed to retrieve allocation bucket: %v", err) + } + + if err := putObject(taskBkt, taskRunnerStateAllKey, &snap); err != nil { + return fmt.Errorf("failed to write task_runner state: %v", err) + } + + // Store the hash that was persisted + tx.OnCommit(func() { + r.persistedHash = h + }) + + return tx.Commit() } // DestroyState is used to cleanup after ourselves From 5aa6e18807c2a92898f9534de146f883845cedc7 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 1 May 2017 11:09:08 -0700 Subject: [PATCH 03/13] Use batching --- client/alloc_runner.go | 82 +++++++++++++++++++++--------------------- client/task_runner.go | 31 ++++++++-------- command/agent/http.go | 1 + 3 files changed, 55 insertions(+), 59 deletions(-) diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 4f29d08e3d4d..21c3ce2ed821 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -257,18 +257,6 @@ func (r *AllocRunner) SaveState() error { } func (r *AllocRunner) saveAllocRunnerState() error { - // Start the transaction. - tx, err := r.stateDB.Begin(true) - if err != nil { - return err - } - - // Grab the allocation bucket - allocBkt, err := getAllocationBucket(tx, r.alloc.ID) - if err != nil { - return fmt.Errorf("failed to retrieve allocation bucket: %v", err) - } - // Grab all the relevant data alloc := r.Alloc() @@ -281,45 +269,55 @@ func (r *AllocRunner) saveAllocRunnerState() error { allocDir := r.allocDir r.allocDirLock.Unlock() - // Write the immutable data - if !r.immutablePersisted { - immutable := &allocRunnerImmutableState{ - Alloc: alloc, - Version: r.config.Version, - } + // Start the transaction. + return r.stateDB.Batch(func(tx *bolt.Tx) error { - if err := putObject(allocBkt, allocRunnerStateImmutableKey, &immutable); err != nil { - return fmt.Errorf("failed to write alloc_runner immutable state: %v", err) + // Grab the allocation bucket + allocBkt, err := getAllocationBucket(tx, r.alloc.ID) + if err != nil { + return fmt.Errorf("failed to retrieve allocation bucket: %v", err) } - tx.OnCommit(func() { - r.immutablePersisted = true - }) - } + // Write the immutable data + if !r.immutablePersisted { + immutable := &allocRunnerImmutableState{ + Alloc: alloc, + Version: r.config.Version, + } - // Write the alloc dir data if it hasn't been written before and it exists. - if !r.allocDirPersisted && r.allocDir != nil { - if err := putObject(allocBkt, allocRunnerStateAllocDirKey, allocDir); err != nil { - return fmt.Errorf("failed to write alloc_runner allocDir state: %v", err) + if err := putObject(allocBkt, allocRunnerStateImmutableKey, &immutable); err != nil { + return fmt.Errorf("failed to write alloc_runner immutable state: %v", err) + } + + tx.OnCommit(func() { + r.immutablePersisted = true + }) } - tx.OnCommit(func() { - r.allocDirPersisted = true - }) - } + // Write the alloc dir data if it hasn't been written before and it exists. + if !r.allocDirPersisted && r.allocDir != nil { + if err := putObject(allocBkt, allocRunnerStateAllocDirKey, allocDir); err != nil { + return fmt.Errorf("failed to write alloc_runner allocDir state: %v", err) + } - // Write the mutable state every time - mutable := &allocRunnerMutableState{ - AllocClientStatus: allocClientStatus, - AllocClientDescription: allocClientDescription, - TaskStates: alloc.TaskStates, - } + tx.OnCommit(func() { + r.allocDirPersisted = true + }) + } - if err := putObject(allocBkt, allocRunnerStateMutableKey, &mutable); err != nil { - return fmt.Errorf("failed to write alloc_runner mutable state: %v", err) - } + // Write the mutable state every time + mutable := &allocRunnerMutableState{ + AllocClientStatus: allocClientStatus, + AllocClientDescription: allocClientDescription, + TaskStates: alloc.TaskStates, + } + + if err := putObject(allocBkt, allocRunnerStateMutableKey, &mutable); err != nil { + return fmt.Errorf("failed to write alloc_runner mutable state: %v", err) + } - return tx.Commit() + return nil + }) } func (r *AllocRunner) saveTaskRunnerState(tr *TaskRunner) error { diff --git a/client/task_runner.go b/client/task_runner.go index 3094addb369e..09fdbf83981e 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -361,27 +361,24 @@ func (r *TaskRunner) SaveState() error { } // Start the transaction. - tx, err := r.stateDB.Begin(true) - if err != nil { - return err - } + return r.stateDB.Batch(func(tx *bolt.Tx) error { + // Grab the task bucket + taskBkt, err := getTaskBucket(tx, r.alloc.ID, r.task.Name) + if err != nil { + return fmt.Errorf("failed to retrieve allocation bucket: %v", err) + } - // Grab the task bucket - taskBkt, err := getTaskBucket(tx, r.alloc.ID, r.task.Name) - if err != nil { - return fmt.Errorf("failed to retrieve allocation bucket: %v", err) - } + if err := putObject(taskBkt, taskRunnerStateAllKey, &snap); err != nil { + return fmt.Errorf("failed to write task_runner state: %v", err) + } - if err := putObject(taskBkt, taskRunnerStateAllKey, &snap); err != nil { - return fmt.Errorf("failed to write task_runner state: %v", err) - } + // Store the hash that was persisted + tx.OnCommit(func() { + r.persistedHash = h + }) - // Store the hash that was persisted - tx.OnCommit(func() { - r.persistedHash = h + return nil }) - - return tx.Commit() } // DestroyState is used to cleanup after ourselves diff --git a/command/agent/http.go b/command/agent/http.go index 147a9df08a7b..c9efbe60d6af 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -174,6 +174,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { s.mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) s.mux.HandleFunc("/debug/pprof/profile", pprof.Profile) s.mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) + s.mux.HandleFunc("/debug/pprof/trace", pprof.Trace) } } From 4d6a012c6fb6f1fba6c62985d091b1a20c3198e7 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 1 May 2017 11:53:43 -0700 Subject: [PATCH 04/13] metrics --- client/alloc_runner.go | 4 ++++ client/client.go | 2 ++ client/task_runner.go | 5 +++++ 3 files changed, 11 insertions(+) diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 21c3ce2ed821..25c21d3e99d4 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -8,6 +8,7 @@ import ( "sync" "time" + metrics "github.com/armon/go-metrics" "github.com/boltdb/bolt" "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/client/allocdir" @@ -539,6 +540,7 @@ func (r *AllocRunner) appendTaskEvent(state *structs.TaskState, event *structs.T // Run is a long running goroutine used to manage an allocation func (r *AllocRunner) Run() { + start := time.Now() defer close(r.waitCh) go r.dirtySyncState() @@ -604,6 +606,8 @@ func (r *AllocRunner) Run() { } r.taskLock.Unlock() + metrics.MeasureSince([]string{"client", "alloc_runner", "run_delay"}, start) + // taskDestroyEvent contains an event that caused the destroyment of a task // in the allocation. var taskDestroyEvent *structs.TaskEvent diff --git a/client/client.go b/client/client.go index 00d90be678b4..ffb1bc2196b6 100644 --- a/client/client.go +++ b/client/client.go @@ -1471,6 +1471,8 @@ func (c *Client) watchNodeUpdates() { // runAllocs is invoked when we get an updated set of allocations func (c *Client) runAllocs(update *allocUpdates) { + defer metrics.MeasureSince([]string{"client", "client", "runAllocs"}, time.Now()) + // Get the existing allocs c.allocLock.RLock() exist := make([]*structs.Allocation, 0, len(c.allocs)) diff --git a/client/task_runner.go b/client/task_runner.go index 09fdbf83981e..5444a5f12b4a 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -797,6 +797,7 @@ func (r *TaskRunner) updatedTokenHandler() { // prestart handles life-cycle tasks that occur before the task has started. func (r *TaskRunner) prestart(resultCh chan bool) { + defer metrics.MeasureSince([]string{"client", "task_runner", "prestart"}, time.Now()) if r.task.Vault != nil { // Wait for the token r.logger.Printf("[DEBUG] client: waiting for Vault token for task %v in alloc %q", r.task.Name, r.alloc.ID) @@ -940,6 +941,7 @@ func (r *TaskRunner) postrun() { // run is the main run loop that handles starting the application, destroying // it, restarts and signals. func (r *TaskRunner) run() { + start := time.Now() // Predeclare things so we can jump to the RESTART var stopCollection chan struct{} var handleWaitCh chan *dstructs.WaitResult @@ -978,6 +980,7 @@ func (r *TaskRunner) run() { handleEmpty := r.handle == nil r.handleLock.Unlock() if handleEmpty { + metrics.MeasureSince([]string{"client", "task_runner", "run_delay"}, start) startErr := r.startTask() r.restartTracker.SetStartError(startErr) if startErr != nil { @@ -1255,6 +1258,7 @@ func (r *TaskRunner) killTask(killingEvent *structs.TaskEvent) { // startTask creates the driver, task dir, and starts the task. func (r *TaskRunner) startTask() error { + defer metrics.MeasureSince([]string{"client", "task_runner", "startTask"}, time.Now()) // Create a driver drv, err := r.createDriver() if err != nil { @@ -1342,6 +1346,7 @@ func interpolateServices(taskEnv *env.TaskEnvironment, task *structs.Task) { // buildTaskDir creates the task directory before driver.Prestart. It is safe // to call multiple times as its state is persisted. func (r *TaskRunner) buildTaskDir(fsi cstructs.FSIsolation) error { + defer metrics.MeasureSince([]string{"client", "task_runner", "buildTaskDir"}, time.Now()) r.persistLock.Lock() built := r.taskDirBuilt r.persistLock.Unlock() From 7dee8ae534fe13626ff147bc3fa8143edc80b06a Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 1 May 2017 15:06:18 -0700 Subject: [PATCH 05/13] perf --- client/client.go | 25 ++++++++++++++++++------- client/task_runner.go | 17 ++++++++++------- 2 files changed, 28 insertions(+), 14 deletions(-) diff --git a/client/client.go b/client/client.go index ffb1bc2196b6..5bb939380334 100644 --- a/client/client.go +++ b/client/client.go @@ -645,15 +645,26 @@ func (c *Client) saveState() error { return nil } - var mErr multierror.Error for id, ar := range c.getAllocRunners() { - if err := ar.SaveState(); err != nil { - c.logger.Printf("[ERR] client: failed to save state for alloc %s: %v", - id, err) - mErr.Errors = append(mErr.Errors, err) - } + go func() { + local := ar + if err := local.SaveState(); err != nil { + c.logger.Printf("[ERR] client: failed to save state for alloc %s: %v", + id, err) + } + }() } - return mErr.ErrorOrNil() + return nil + + //var mErr multierror.Error + //for id, ar := range c.getAllocRunners() { + //if err := ar.SaveState(); err != nil { + //c.logger.Printf("[ERR] client: failed to save state for alloc %s: %v", + //id, err) + //mErr.Errors = append(mErr.Errors, err) + //} + //} + //return mErr.ErrorOrNil() } // getAllocRunners returns a snapshot of the current set of alloc runners. diff --git a/client/task_runner.go b/client/task_runner.go index 5444a5f12b4a..00c760a86c2a 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -25,6 +25,7 @@ import ( "github.com/hashicorp/nomad/client/getter" "github.com/hashicorp/nomad/client/vaultclient" "github.com/hashicorp/nomad/nomad/structs" + "github.com/ugorji/go/codec" "github.com/hashicorp/nomad/client/driver/env" dstructs "github.com/hashicorp/nomad/client/driver/structs" @@ -339,7 +340,6 @@ func (r *TaskRunner) RestoreState() error { func (r *TaskRunner) SaveState() error { // XXX needs to be updated r.persistLock.Lock() - defer r.persistLock.Unlock() snap := taskRunnerState{ Version: r.config.Version, @@ -357,9 +357,17 @@ func (r *TaskRunner) SaveState() error { h := snap.Hash() if bytes.Equal(h, r.persistedHash) { + r.persistLock.Unlock() return nil } + // Serialize the object + var buf bytes.Buffer + if err := codec.NewEncoder(&buf, structs.MsgpackHandle).Encode(&snap); err != nil { + return fmt.Errorf("failed to serialize snapshot: %v", err) + } + r.persistLock.Unlock() + // Start the transaction. return r.stateDB.Batch(func(tx *bolt.Tx) error { // Grab the task bucket @@ -368,7 +376,7 @@ func (r *TaskRunner) SaveState() error { return fmt.Errorf("failed to retrieve allocation bucket: %v", err) } - if err := putObject(taskBkt, taskRunnerStateAllKey, &snap); err != nil { + if err := putData(taskBkt, taskRunnerStateAllKey, buf.Bytes()); err != nil { return fmt.Errorf("failed to write task_runner state: %v", err) } @@ -391,11 +399,6 @@ func (r *TaskRunner) DestroyState() error { // setState is used to update the state of the task runner func (r *TaskRunner) setState(state string, event *structs.TaskEvent) { - // Persist our state to disk. - if err := r.SaveState(); err != nil { - r.logger.Printf("[ERR] client: failed to save state of Task Runner for task %q: %v", r.task.Name, err) - } - // Indicate the task has been updated. r.updater(r.task.Name, state, event) } From 1d20b1129737befe7df013b1f1e70157bff1642d Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 1 May 2017 16:16:53 -0700 Subject: [PATCH 06/13] Async and sync saving of client state --- client/client.go | 44 ++++++++++++++++++++++++++------------------ 1 file changed, 26 insertions(+), 18 deletions(-) diff --git a/client/client.go b/client/client.go index 5bb939380334..84bf27afe743 100644 --- a/client/client.go +++ b/client/client.go @@ -447,7 +447,7 @@ func (c *Client) Shutdown() error { c.shutdown = true close(c.shutdownCh) c.connPool.Shutdown() - return c.saveState() + return c.saveState(true) } // RPC is used to forward an RPC call to a nomad server, or fail if no servers. @@ -639,32 +639,40 @@ func (c *Client) restoreState() error { return mErr.ErrorOrNil() } -// saveState is used to snapshot our state into the data dir -func (c *Client) saveState() error { +// saveState is used to snapshot our state into the data dir. If blocking is set +// to true, the function will only return once state has been saved. If false, +// the errors will be logged and state saving will be asyncronous +func (c *Client) saveState(blocking bool) error { if c.config.DevMode { return nil } + var wg sync.WaitGroup + var l sync.Mutex + var mErr multierror.Error + runners := c.getAllocRunners() + wg.Add(len(runners)) + for id, ar := range c.getAllocRunners() { go func() { local := ar - if err := local.SaveState(); err != nil { - c.logger.Printf("[ERR] client: failed to save state for alloc %s: %v", - id, err) + err := local.SaveState() + if err != nil { + c.logger.Printf("[ERR] client: failed to save state for alloc %s: %v", id, err) + l.Lock() + multierror.Append(&mErr, err) + l.Unlock() } + wg.Done() }() } - return nil - //var mErr multierror.Error - //for id, ar := range c.getAllocRunners() { - //if err := ar.SaveState(); err != nil { - //c.logger.Printf("[ERR] client: failed to save state for alloc %s: %v", - //id, err) - //mErr.Errors = append(mErr.Errors, err) - //} - //} - //return mErr.ErrorOrNil() + if blocking { + wg.Wait() + return mErr.ErrorOrNil() + } + + return nil } // getAllocRunners returns a snapshot of the current set of alloc runners. @@ -1018,7 +1026,7 @@ func (c *Client) periodicSnapshot() { select { case <-snapshot: snapshot = time.After(stateSnapshotIntv) - if err := c.saveState(); err != nil { + if err := c.saveState(false); err != nil { c.logger.Printf("[ERR] client: failed to save state: %v", err) } @@ -1577,7 +1585,7 @@ func (c *Client) runAllocs(update *allocUpdates) { } // Persist our state - if err := c.saveState(); err != nil { + if err := c.saveState(false); err != nil { c.logger.Printf("[ERR] client: failed to save state: %v", err) } } From 85a81f47de0dbac56715348a4d57e889da34c6e7 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 1 May 2017 16:18:03 -0700 Subject: [PATCH 07/13] Revert "metrics" This reverts commit 4d6a012c6fb6f1fba6c62985d091b1a20c3198e7. --- client/alloc_runner.go | 4 -- client/client.go | 2 - client/state_database.go | 108 +++++++++++++++++++++++++++++++++++++++ client/task_runner.go | 5 -- 4 files changed, 108 insertions(+), 11 deletions(-) create mode 100644 client/state_database.go diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 25c21d3e99d4..21c3ce2ed821 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -8,7 +8,6 @@ import ( "sync" "time" - metrics "github.com/armon/go-metrics" "github.com/boltdb/bolt" "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/client/allocdir" @@ -540,7 +539,6 @@ func (r *AllocRunner) appendTaskEvent(state *structs.TaskState, event *structs.T // Run is a long running goroutine used to manage an allocation func (r *AllocRunner) Run() { - start := time.Now() defer close(r.waitCh) go r.dirtySyncState() @@ -606,8 +604,6 @@ func (r *AllocRunner) Run() { } r.taskLock.Unlock() - metrics.MeasureSince([]string{"client", "alloc_runner", "run_delay"}, start) - // taskDestroyEvent contains an event that caused the destroyment of a task // in the allocation. var taskDestroyEvent *structs.TaskEvent diff --git a/client/client.go b/client/client.go index 84bf27afe743..faf0a34bba35 100644 --- a/client/client.go +++ b/client/client.go @@ -1490,8 +1490,6 @@ func (c *Client) watchNodeUpdates() { // runAllocs is invoked when we get an updated set of allocations func (c *Client) runAllocs(update *allocUpdates) { - defer metrics.MeasureSince([]string{"client", "client", "runAllocs"}, time.Now()) - // Get the existing allocs c.allocLock.RLock() exist := make([]*structs.Allocation, 0, len(c.allocs)) diff --git a/client/state_database.go b/client/state_database.go new file mode 100644 index 000000000000..ba6f71d71f74 --- /dev/null +++ b/client/state_database.go @@ -0,0 +1,108 @@ +package client + +import ( + "bytes" + "fmt" + + "github.com/boltdb/bolt" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/ugorji/go/codec" +) + +/* +The client has a boltDB backed state store. The schema as of 0.6 looks as follows: + +allocations/ (bucket) +|--> / (bucket) + |--> alloc_runner persisted objects (k/v) + |--> / (bucket) + |--> task_runner persisted objects (k/v) +*/ + +var ( + // allocationsBucket is the bucket name containing all allocation related + // data + allocationsBucket = []byte("allocations") +) + +func putObject(bkt *bolt.Bucket, key []byte, obj interface{}) error { + if !bkt.Writable() { + return fmt.Errorf("bucket must be writable") + } + + // Serialize the object + var buf bytes.Buffer + if err := codec.NewEncoder(&buf, structs.MsgpackHandle).Encode(obj); err != nil { + return fmt.Errorf("failed to encode passed object: %v", err) + } + + if err := bkt.Put(key, buf.Bytes()); err != nil { + return fmt.Errorf("failed to write data at key %v: %v", string(key), err) + } + + return nil +} + +func putData(bkt *bolt.Bucket, key, value []byte) error { + if !bkt.Writable() { + return fmt.Errorf("bucket must be writable") + } + + if err := bkt.Put(key, value); err != nil { + return fmt.Errorf("failed to write data at key %v: %v", string(key), err) + } + + return nil +} + +// getAllocationBucket returns the bucket used to persist state about a +// particular allocation. If the root allocation bucket or the specific +// allocation bucket doesn't exist, it will be created. +func getAllocationBucket(tx *bolt.Tx, allocID string) (*bolt.Bucket, error) { + if !tx.Writable() { + return nil, fmt.Errorf("transaction must be writable") + } + + // Retrieve the root allocations bucket + allocations, err := tx.CreateBucketIfNotExists(allocationsBucket) + if err != nil { + return nil, err + } + + // Retrieve the specific allocations bucket + alloc, err := allocations.CreateBucketIfNotExists([]byte(allocID)) + if err != nil { + return nil, err + } + + return alloc, nil +} + +// getTaskBucket returns the bucket used to persist state about a +// particular task. If the root allocation bucket, the specific +// allocation or task bucket doesn't exist, they will be created. +func getTaskBucket(tx *bolt.Tx, allocID, taskName string) (*bolt.Bucket, error) { + if !tx.Writable() { + return nil, fmt.Errorf("transaction must be writable") + } + + // Retrieve the root allocations bucket + allocations, err := tx.CreateBucketIfNotExists(allocationsBucket) + if err != nil { + return nil, err + } + + // Retrieve the specific allocations bucket + alloc, err := allocations.CreateBucketIfNotExists([]byte(allocID)) + if err != nil { + return nil, err + } + + // Retrieve the specific task bucket + task, err := alloc.CreateBucketIfNotExists([]byte(taskName)) + if err != nil { + return nil, err + } + + return task, nil +} diff --git a/client/task_runner.go b/client/task_runner.go index 00c760a86c2a..ca74ae50ac6d 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -800,7 +800,6 @@ func (r *TaskRunner) updatedTokenHandler() { // prestart handles life-cycle tasks that occur before the task has started. func (r *TaskRunner) prestart(resultCh chan bool) { - defer metrics.MeasureSince([]string{"client", "task_runner", "prestart"}, time.Now()) if r.task.Vault != nil { // Wait for the token r.logger.Printf("[DEBUG] client: waiting for Vault token for task %v in alloc %q", r.task.Name, r.alloc.ID) @@ -944,7 +943,6 @@ func (r *TaskRunner) postrun() { // run is the main run loop that handles starting the application, destroying // it, restarts and signals. func (r *TaskRunner) run() { - start := time.Now() // Predeclare things so we can jump to the RESTART var stopCollection chan struct{} var handleWaitCh chan *dstructs.WaitResult @@ -983,7 +981,6 @@ func (r *TaskRunner) run() { handleEmpty := r.handle == nil r.handleLock.Unlock() if handleEmpty { - metrics.MeasureSince([]string{"client", "task_runner", "run_delay"}, start) startErr := r.startTask() r.restartTracker.SetStartError(startErr) if startErr != nil { @@ -1261,7 +1258,6 @@ func (r *TaskRunner) killTask(killingEvent *structs.TaskEvent) { // startTask creates the driver, task dir, and starts the task. func (r *TaskRunner) startTask() error { - defer metrics.MeasureSince([]string{"client", "task_runner", "startTask"}, time.Now()) // Create a driver drv, err := r.createDriver() if err != nil { @@ -1349,7 +1345,6 @@ func interpolateServices(taskEnv *env.TaskEnvironment, task *structs.Task) { // buildTaskDir creates the task directory before driver.Prestart. It is safe // to call multiple times as its state is persisted. func (r *TaskRunner) buildTaskDir(fsi cstructs.FSIsolation) error { - defer metrics.MeasureSince([]string{"client", "task_runner", "buildTaskDir"}, time.Now()) r.persistLock.Lock() built := r.taskDirBuilt r.persistLock.Unlock() From e22393aeb88e82de17ca5f89bb4bc33aeba43cbc Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 2 May 2017 13:31:56 -0700 Subject: [PATCH 08/13] Restore state + upgrade path --- client/alloc_runner.go | 120 ++++++++++++++++++++++++--------- client/client.go | 73 ++++++++++++++------ client/state_database.go | 141 ++++++++++++++++++++++++++++++++------- client/task_runner.go | 63 +++++++++++++---- client/util.go | 10 ++- demo/vagrant/README.md | Bin 709 -> 1537 bytes 6 files changed, 316 insertions(+), 91 deletions(-) diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 21c3ce2ed821..4cc279a3954c 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -88,6 +88,7 @@ type AllocRunner struct { allocDirPersisted bool } +// COMPAT: Remove in 0.7.0 // allocRunnerState is used to snapshot the state of the alloc runner type allocRunnerState struct { Version string @@ -149,8 +150,10 @@ func NewAllocRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB, return ar } -// stateFilePath returns the path to our state file -func (r *AllocRunner) stateFilePath() string { +// pre060StateFilePath returns the path to our state file that would have been +// written pre v0.6.0 +// COMPAT: Remove in 0.7.0 +func (r *AllocRunner) pre060StateFilePath() string { r.allocLock.Lock() defer r.allocLock.Unlock() path := filepath.Join(r.config.StateDir, "alloc", r.alloc.ID, "state.json") @@ -159,29 +162,72 @@ func (r *AllocRunner) stateFilePath() string { // RestoreState is used to restore the state of the alloc runner func (r *AllocRunner) RestoreState() error { - // Load the snapshot + + // Check if the old snapshot is there + oldPath := r.pre060StateFilePath() var snap allocRunnerState - if err := restoreState(r.stateFilePath(), &snap); err != nil { - return err - } + if err := pre060RestoreState(oldPath, &snap); err == nil { + // Restore fields + r.alloc = snap.Alloc + r.allocDir = snap.AllocDir + r.allocClientStatus = snap.AllocClientStatus + r.allocClientDescription = snap.AllocClientDescription + + if r.alloc != nil { + r.taskStates = snap.Alloc.TaskStates + } - // #2132 Upgrade path: if snap.AllocDir is nil, try to convert old - // Context struct to new AllocDir struct - if snap.AllocDir == nil && snap.Context != nil { - r.logger.Printf("[DEBUG] client: migrating state snapshot for alloc %q", r.alloc.ID) - snap.AllocDir = allocdir.NewAllocDir(r.logger, snap.Context.AllocDir.AllocDir) - for taskName := range snap.Context.AllocDir.TaskDirs { - snap.AllocDir.NewTaskDir(taskName) + // #2132 Upgrade path: if snap.AllocDir is nil, try to convert old + // Context struct to new AllocDir struct + if snap.AllocDir == nil && snap.Context != nil { + r.logger.Printf("[DEBUG] client: migrating state snapshot for alloc %q", r.alloc.ID) + snap.AllocDir = allocdir.NewAllocDir(r.logger, snap.Context.AllocDir.AllocDir) + for taskName := range snap.Context.AllocDir.TaskDirs { + snap.AllocDir.NewTaskDir(taskName) + } } - } - // XXX needs to be updated and handle the upgrade path + // Delete the old state + os.RemoveAll(oldPath) + } else if !os.IsNotExist(err) { + // Something corrupt in the old state file + return err + } else { + // We are doing a normal restore + err := r.stateDB.View(func(tx *bolt.Tx) error { + bkt, err := getAllocationBucket(tx, r.alloc.ID) + if err != nil { + return fmt.Errorf("failed to get allocation bucket: %v", err) + } - // Restore fields - r.alloc = snap.Alloc - r.allocDir = snap.AllocDir - r.allocClientStatus = snap.AllocClientStatus - r.allocClientDescription = snap.AllocClientDescription + // Get the state objects + var mutable allocRunnerMutableState + var immutable allocRunnerImmutableState + var allocDir allocdir.AllocDir + + if err := getObject(bkt, allocRunnerStateImmutableKey, &immutable); err != nil { + return fmt.Errorf("failed to read alloc runner immutable state: %v", err) + } + if err := getObject(bkt, allocRunnerStateMutableKey, &mutable); err != nil { + return fmt.Errorf("failed to read alloc runner mutable state: %v", err) + } + if err := getObject(bkt, allocRunnerStateAllocDirKey, &allocDir); err != nil { + return fmt.Errorf("failed to read alloc runner alloc_dir state: %v", err) + } + + // Populate the fields + r.alloc = immutable.Alloc + r.allocDir = &allocDir + r.allocClientStatus = mutable.AllocClientStatus + r.allocClientDescription = mutable.AllocClientDescription + r.taskStates = mutable.TaskStates + return nil + }) + + if err != nil { + return fmt.Errorf("failed to read allocation state: %v", err) + } + } var snapshotErrors multierror.Error if r.alloc == nil { @@ -194,11 +240,17 @@ func (r *AllocRunner) RestoreState() error { return e } - r.taskStates = snap.Alloc.TaskStates + tg := r.alloc.Job.LookupTaskGroup(r.alloc.TaskGroup) + if tg == nil { + return fmt.Errorf("restored allocation doesn't contain task group %q", r.alloc.TaskGroup) + } // Restore the task runners var mErr multierror.Error - for name, state := range r.taskStates { + for _, task := range tg.Tasks { + name := task.Name + state := r.taskStates[name] + // Mark the task as restored. r.restored[name] = struct{}{} @@ -210,7 +262,6 @@ func (r *AllocRunner) RestoreState() error { return err } - task := &structs.Task{Name: name} tr := NewTaskRunner(r.logger, r.config, r.stateDB, r.setTaskState, td, r.Alloc(), task, r.vaultClient, r.consulClient) r.tasks[name] = tr @@ -231,11 +282,6 @@ func (r *AllocRunner) RestoreState() error { return mErr.ErrorOrNil() } -// GetAllocDir returns the alloc dir for the alloc runner -func (r *AllocRunner) GetAllocDir() *allocdir.AllocDir { - return r.allocDir -} - // SaveState is used to snapshot the state of the alloc runner // if the fullSync is marked as false only the state of the Alloc Runner // is snapshotted. If fullSync is marked as true, we snapshot @@ -330,7 +376,12 @@ func (r *AllocRunner) saveTaskRunnerState(tr *TaskRunner) error { // DestroyState is used to cleanup after ourselves func (r *AllocRunner) DestroyState() error { - return os.RemoveAll(filepath.Dir(r.stateFilePath())) + return r.stateDB.Update(func(tx *bolt.Tx) error { + if err := deleteAllocationBucket(tx, r.alloc.ID); err != nil { + return fmt.Errorf("failed to delete allocation bucket: %v", err) + } + return nil + }) } // DestroyContext is used to destroy the context @@ -338,6 +389,11 @@ func (r *AllocRunner) DestroyContext() error { return r.allocDir.Destroy() } +// GetAllocDir returns the alloc dir for the alloc runner +func (r *AllocRunner) GetAllocDir() *allocdir.AllocDir { + return r.allocDir +} + // copyTaskStates returns a copy of the passed task states. func copyTaskStates(states map[string]*structs.TaskState) map[string]*structs.TaskState { copy := make(map[string]*structs.TaskState, len(states)) @@ -543,7 +599,7 @@ func (r *AllocRunner) Run() { go r.dirtySyncState() // Find the task group to run in the allocation - alloc := r.alloc + alloc := r.Alloc() tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) if tg == nil { r.logger.Printf("[ERR] client: alloc '%s' for missing task group '%s'", alloc.ID, alloc.TaskGroup) @@ -629,6 +685,10 @@ OUTER: for _, tr := range runners { tr.Update(update) } + + if err := r.syncStatus(); err != nil { + r.logger.Printf("[WARN] client: failed to sync status upon receiving alloc update: %v", err) + } case <-r.destroyCh: taskDestroyEvent = structs.NewTaskEvent(structs.TaskKilled) break OUTER diff --git a/client/client.go b/client/client.go index faf0a34bba35..a6a6ba2a417a 100644 --- a/client/client.go +++ b/client/client.go @@ -608,27 +608,56 @@ func (c *Client) restoreState() error { return nil } - // XXX Needs to be updated and handle the upgrade case + // COMPAT: Remove in 0.7.0 + // 0.6.0 transistioned from individual state files to a single bolt-db. + // The upgrade path is to: + // Check if old state exists + // If so, restore from that and delete old state + // Restore using state database + + // Allocs holds the IDs of the allocations being restored + var allocs []string + + // Upgrading tracks whether this is a pre 0.6.0 upgrade path + var upgrading bool // Scan the directory - list, err := ioutil.ReadDir(filepath.Join(c.config.StateDir, "alloc")) - if err != nil && os.IsNotExist(err) { - return nil - } else if err != nil { + allocDir := filepath.Join(c.config.StateDir, "alloc") + list, err := ioutil.ReadDir(allocDir) + if err != nil && !os.IsNotExist(err) { return fmt.Errorf("failed to list alloc state: %v", err) + } else if err == nil && len(list) != 0 { + upgrading = true + for _, entry := range list { + allocs = append(allocs, entry.Name()) + } + } else { + // Normal path + err := c.stateDB.View(func(tx *bolt.Tx) error { + allocs, err = getAllAllocationIDs(tx) + if err != nil { + return fmt.Errorf("failed to list allocations: %v", err) + } + return nil + }) + if err != nil { + return err + } } // Load each alloc back var mErr multierror.Error - for _, entry := range list { - id := entry.Name() + for _, id := range allocs { alloc := &structs.Allocation{ID: id} + c.configLock.RLock() ar := NewAllocRunner(c.logger, c.configCopy, c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService) c.configLock.RUnlock() + c.allocLock.Lock() c.allocs[id] = ar c.allocLock.Unlock() + if err := ar.RestoreState(); err != nil { c.logger.Printf("[ERR] client: failed to restore state for alloc %s: %v", id, err) mErr.Errors = append(mErr.Errors, err) @@ -636,6 +665,14 @@ func (c *Client) restoreState() error { go ar.Run() } } + + // Delete all the entries + if upgrading { + if err := os.RemoveAll(allocDir); err != nil { + mErr.Errors = append(mErr.Errors, err) + } + } + return mErr.ErrorOrNil() } @@ -653,10 +690,9 @@ func (c *Client) saveState(blocking bool) error { runners := c.getAllocRunners() wg.Add(len(runners)) - for id, ar := range c.getAllocRunners() { - go func() { - local := ar - err := local.SaveState() + for id, ar := range runners { + go func(id string, ar *AllocRunner) { + err := ar.SaveState() if err != nil { c.logger.Printf("[ERR] client: failed to save state for alloc %s: %v", id, err) l.Lock() @@ -664,7 +700,7 @@ func (c *Client) saveState(blocking bool) error { l.Unlock() } wg.Done() - }() + }(id, ar) } if blocking { @@ -1505,8 +1541,7 @@ func (c *Client) runAllocs(update *allocUpdates) { // Remove the old allocations for _, remove := range diff.removed { if err := c.removeAlloc(remove); err != nil { - c.logger.Printf("[ERR] client: failed to remove alloc '%s': %v", - remove.ID, err) + c.logger.Printf("[ERR] client: failed to remove alloc '%s': %v", remove.ID, err) } } @@ -1581,11 +1616,6 @@ func (c *Client) runAllocs(update *allocUpdates) { add.ID, err) } } - - // Persist our state - if err := c.saveState(false); err != nil { - c.logger.Printf("[ERR] client: failed to save state: %v", err) - } } // blockForRemoteAlloc blocks until the previous allocation of an allocation has @@ -1934,6 +1964,11 @@ func (c *Client) addAlloc(alloc *structs.Allocation, prevAllocDir *allocdir.Allo ar := NewAllocRunner(c.logger, c.configCopy, c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService) ar.SetPreviousAllocDir(prevAllocDir) c.configLock.RUnlock() + + if err := ar.SaveState(); err != nil { + c.logger.Printf("[WARN] client: initial save state for alloc %q failed: %v", alloc.ID, err) + } + go ar.Run() // Store the alloc runner. diff --git a/client/state_database.go b/client/state_database.go index ba6f71d71f74..a9a36a5f9d1c 100644 --- a/client/state_database.go +++ b/client/state_database.go @@ -55,24 +55,54 @@ func putData(bkt *bolt.Bucket, key, value []byte) error { return nil } +func getObject(bkt *bolt.Bucket, key []byte, obj interface{}) error { + // Get the data + data := bkt.Get(key) + if data == nil { + return fmt.Errorf("no data at key %v", string(key)) + } + + // Deserialize the object + if err := codec.NewDecoderBytes(data, structs.MsgpackHandle).Decode(obj); err != nil { + return fmt.Errorf("failed to decode data into passed object: %v", err) + } + + return nil +} + // getAllocationBucket returns the bucket used to persist state about a // particular allocation. If the root allocation bucket or the specific -// allocation bucket doesn't exist, it will be created. +// allocation bucket doesn't exist, it will be created as long as the +// transaction is writable. func getAllocationBucket(tx *bolt.Tx, allocID string) (*bolt.Bucket, error) { - if !tx.Writable() { - return nil, fmt.Errorf("transaction must be writable") - } + var err error + w := tx.Writable() // Retrieve the root allocations bucket - allocations, err := tx.CreateBucketIfNotExists(allocationsBucket) - if err != nil { - return nil, err + allocations := tx.Bucket(allocationsBucket) + if allocations == nil { + if !w { + return nil, fmt.Errorf("Allocations bucket doesn't exist and transaction is not writable") + } + + allocations, err = tx.CreateBucket(allocationsBucket) + if err != nil { + return nil, err + } } // Retrieve the specific allocations bucket - alloc, err := allocations.CreateBucketIfNotExists([]byte(allocID)) - if err != nil { - return nil, err + key := []byte(allocID) + alloc := allocations.Bucket(key) + if alloc == nil { + if !w { + return nil, fmt.Errorf("Allocation bucket doesn't exist and transaction is not writable") + } + + alloc, err = allocations.CreateBucket(key) + if err != nil { + return nil, err + } } return alloc, nil @@ -80,29 +110,94 @@ func getAllocationBucket(tx *bolt.Tx, allocID string) (*bolt.Bucket, error) { // getTaskBucket returns the bucket used to persist state about a // particular task. If the root allocation bucket, the specific -// allocation or task bucket doesn't exist, they will be created. +// allocation or task bucket doesn't exist, they will be created as long as the +// transaction is writable. func getTaskBucket(tx *bolt.Tx, allocID, taskName string) (*bolt.Bucket, error) { + alloc, err := getAllocationBucket(tx, allocID) + if err != nil { + return nil, err + } + + // Retrieve the specific task bucket + w := tx.Writable() + key := []byte(taskName) + task := alloc.Bucket(key) + if task == nil { + if !w { + return nil, fmt.Errorf("Task bucket doesn't exist and transaction is not writable") + } + + task, err = alloc.CreateBucket(key) + if err != nil { + return nil, err + } + } + + return task, nil +} + +// deleteAllocationBucket is used to delete an allocation bucket if it exists. +func deleteAllocationBucket(tx *bolt.Tx, allocID string) error { if !tx.Writable() { - return nil, fmt.Errorf("transaction must be writable") + return fmt.Errorf("transaction must be writable") } // Retrieve the root allocations bucket - allocations, err := tx.CreateBucketIfNotExists(allocationsBucket) - if err != nil { - return nil, err + allocations := tx.Bucket(allocationsBucket) + if allocations == nil { + return nil + } + + // Check if the bucket exists + key := []byte(allocID) + if allocBkt := allocations.Bucket(key); allocBkt == nil { + return nil + } + + return allocations.DeleteBucket(key) +} + +// deleteTaskBucket is used to delete a task bucket if it exists. +func deleteTaskBucket(tx *bolt.Tx, allocID, taskName string) error { + if !tx.Writable() { + return fmt.Errorf("transaction must be writable") + } + + // Retrieve the root allocations bucket + allocations := tx.Bucket(allocationsBucket) + if allocations == nil { + return nil } // Retrieve the specific allocations bucket - alloc, err := allocations.CreateBucketIfNotExists([]byte(allocID)) - if err != nil { - return nil, err + alloc := allocations.Bucket([]byte(allocID)) + if alloc == nil { + return nil } - // Retrieve the specific task bucket - task, err := alloc.CreateBucketIfNotExists([]byte(taskName)) - if err != nil { - return nil, err + // Check if the bucket exists + key := []byte(taskName) + if taskBkt := alloc.Bucket(key); taskBkt == nil { + return nil } - return task, nil + return alloc.DeleteBucket(key) +} + +func getAllAllocationIDs(tx *bolt.Tx) ([]string, error) { + allocationsBkt := tx.Bucket(allocationsBucket) + if allocationsBkt == nil { + return nil, nil + } + + // Create a cursor for iteration. + var allocIDs []string + c := allocationsBkt.Cursor() + + // Iterate over all the buckets + for k, _ := c.First(); k != nil; k, _ = c.Next() { + allocIDs = append(allocIDs, string(k)) + } + + return allocIDs, nil } diff --git a/client/task_runner.go b/client/task_runner.go index ca74ae50ac6d..8f91d8f4cac8 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -249,34 +249,61 @@ func (r *TaskRunner) WaitCh() <-chan struct{} { return r.waitCh } -// stateFilePath returns the path to our state file -func (r *TaskRunner) stateFilePath() string { +// pre060StateFilePath returns the path to our state file that would have been +// written pre v0.6.0 +// COMPAT: Remove in 0.7.0 +func (r *TaskRunner) pre060StateFilePath() string { // Get the MD5 of the task name hashVal := md5.Sum([]byte(r.task.Name)) hashHex := hex.EncodeToString(hashVal[:]) dirName := fmt.Sprintf("task-%s", hashHex) // Generate the path - path := filepath.Join(r.config.StateDir, "alloc", r.alloc.ID, - dirName, "state.json") - return path + return filepath.Join(r.config.StateDir, "alloc", r.alloc.ID, dirName, "state.json") } // RestoreState is used to restore our state func (r *TaskRunner) RestoreState() error { - // XXX needs to be updated and handle the upgrade path + // COMPAT: Remove in 0.7.0 + // 0.6.0 transistioned from individual state files to a single bolt-db. + // The upgrade path is to: + // Check if old state exists + // If so, restore from that and delete old state + // Restore using state database - // Load the snapshot var snap taskRunnerState - if err := restoreState(r.stateFilePath(), &snap); err != nil { + + // Check if the old snapshot is there + oldPath := r.pre060StateFilePath() + if err := pre060RestoreState(oldPath, &snap); err == nil { + // Delete the old state + os.RemoveAll(oldPath) + } else if !os.IsNotExist(err) { + // Something corrupt in the old state file return err + } else { + // We are doing a normal restore + err := r.stateDB.View(func(tx *bolt.Tx) error { + bkt, err := getTaskBucket(tx, r.alloc.ID, r.task.Name) + if err != nil { + return fmt.Errorf("failed to get task bucket: %v", err) + } + + if err := getObject(bkt, taskRunnerStateAllKey, &snap); err != nil { + return fmt.Errorf("failed to read task runner state: %v", err) + } + return nil + }) + if err != nil { + return err + } + } - // Restore fields + // Restore fields from the snapshot r.artifactsDownloaded = snap.ArtifactDownloaded r.taskDirBuilt = snap.TaskDirBuilt r.payloadRendered = snap.PayloadRendered - r.setCreatedResources(snap.CreatedResources) if err := r.setTaskEnv(); err != nil { @@ -333,14 +360,13 @@ func (r *TaskRunner) RestoreState() error { r.running = true r.runningLock.Unlock() } + return nil } // SaveState is used to snapshot our state func (r *TaskRunner) SaveState() error { - // XXX needs to be updated r.persistLock.Lock() - snap := taskRunnerState{ Version: r.config.Version, ArtifactDownloaded: r.artifactsDownloaded, @@ -355,6 +381,7 @@ func (r *TaskRunner) SaveState() error { } r.handleLock.Unlock() + // If nothing has changed avoid the write h := snap.Hash() if bytes.Equal(h, r.persistedHash) { r.persistLock.Unlock() @@ -394,11 +421,21 @@ func (r *TaskRunner) DestroyState() error { r.persistLock.Lock() defer r.persistLock.Unlock() - return os.RemoveAll(r.stateFilePath()) + return r.stateDB.Update(func(tx *bolt.Tx) error { + if err := deleteTaskBucket(tx, r.alloc.ID, r.task.Name); err != nil { + return fmt.Errorf("failed to delete task bucket: %v", err) + } + return nil + }) } // setState is used to update the state of the task runner func (r *TaskRunner) setState(state string, event *structs.TaskEvent) { + // Persist our state to disk. + if err := r.SaveState(); err != nil { + r.logger.Printf("[ERR] client: failed to save state of Task Runner for task %q: %v", r.task.Name, err) + } + // Indicate the task has been updated. r.updater(r.task.Name, state, event) } diff --git a/client/util.go b/client/util.go index ee78ebac20ee..233a0259514b 100644 --- a/client/util.go +++ b/client/util.go @@ -106,14 +106,12 @@ func persistState(path string, data interface{}) error { return nil } -// restoreState is used to read back in the persisted state -func restoreState(path string, data interface{}) error { +// pre060RestoreState is used to read back in the persisted state for pre v0.6.0 +// state +func pre060RestoreState(path string, data interface{}) error { buf, err := ioutil.ReadFile(path) if err != nil { - if os.IsNotExist(err) { - return nil - } - return fmt.Errorf("failed to read state: %v", err) + return err } if err := json.Unmarshal(buf, data); err != nil { return fmt.Errorf("failed to decode state: %v", err) diff --git a/demo/vagrant/README.md b/demo/vagrant/README.md index 2150799a4880aadab947c6db4c9fe1223601ba9b..112daa8e4f683b81c16da6837f9d0b0649738e50 100644 GIT binary patch literal 1537 zcmV+c2LAaUiwFSs!v|Ra1MOICZ`(Ey?q~gqgA$-`u`D@$X|v`-*Ca!RHZ7XA`%n}H zEzuTXNfby*PTJ2`sALu$74(v7Txo3A9jUd{M!YCSr!$A+iXt+BV-GlHpgSd$*ZJ5GPSi+JS-rYo~ zHl6;3AD0WPuKy9*g3hB3sX1>z&<>CcKYJ& z@tb$3_r6S+;lPm3iUek8lJJrj2`^&7wWq5@lC(pT$|^hF!QrpR&riljZ(pChJ3Kkq zFJvJK!xf8-sQJo76j=8xgU&fr#b;4MT&WBeB&tR3VFU+qQDl-Oa3XV-cqgx4JwF(K zIDC70{Q6`+3xj?nD$W5U-KW6QQ6}LKhQvk|0@k#vU&=WmwEptZOQ@DOk?$ zYdC%b20gI^CnTx4GD7p57Y2UET?2neHFpj*PoN{9!@<|x;E{9qsJs0D6H&6l1ws=R z!B=35Io$tJDx7QBJ~{e@p9ynu|Dj8y>$`v`0_}OtBJ&zl!Wk&0Vpb{sp!X1P)US^5 z)dO&ML7fK^d4?<0uO?Mts_q#t1j~kfAGRPG24NpwzB}E}UuH{2HVA)1Tq;=$Eo6b2 zr98|09bA$)U$|SE{jdX=O^t{-Ug@!_WHyA$hYS!2KrnZo?8dj;)PX#idrz?X=93$qhpDK zp9iSx;-C_K^rNS}AR0aiM*Se#ed>IFqJ(TY;|!Cb5>2J_i7QKrSlc21wlM5$;<7^dAPlV>HCAT{t3YQiYrhs)KO|?WjNy6-`k!7W zx%mG9_)pR`_}{y0{Er6V)%U*^|L=(Y9{lh5-$m4KFqK$^9Mge(TvcKnYkwMTZvk(A z3~{eu?yBEh!Y%ou*DYBo&9j-E-bYPE6mhj#{@nAR@-Hg-Jm@}HCB}8B4Ex~z> z+2ZoQl{WC-t&iJ=|L@^H3WrzE|9IQt|6R~+<9`D&H=X}Cnu62y+~xYFumZ|#+QAov z*zBx9_%0y)4lD>?!o9VaNNKcDti)4liB{+?a$xya zokwrckI>ac9Xq2 n+Fr6-^lYi6mRf45rIuQ1sil@$YN@4`eun-8nK7+f04M+eORfT! literal 709 zcmZ`%!D<{a488j+1nnV#?CgCBfkFs9wa|oImOW@^JYxlqHCXbd`TNM83A8Eo$=FX% zdhh8AJmZuxDg59W2e|h$JNFbwK0Zc@9xx4nBHRFdmsgK zW28h9{BQ$rTtEVsI8=dnHhWC#e14T= zxGmHM`MY38Y2-hP?n#!2Z`DmLVzNih^ST9i=AzBu$UVljW237|-4jFfm=nxwi?V`S bpmiOZQy|{_w1AF0j%525K}i3<9X0xQ{U_)( From 5952dae3e0686c3484d56e5566efaf40d15f1cab Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 3 May 2017 11:15:30 -0700 Subject: [PATCH 09/13] Fix tests --- client/alloc_runner.go | 5 ++- client/alloc_runner_test.go | 86 ++++++++++++++++++++++++++----------- client/state_database.go | 11 +++++ client/task_runner_test.go | 17 ++++++-- client/util_test.go | 42 ------------------ 5 files changed, 88 insertions(+), 73 deletions(-) diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 4cc279a3954c..db13ba4328ec 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -168,6 +168,7 @@ func (r *AllocRunner) RestoreState() error { var snap allocRunnerState if err := pre060RestoreState(oldPath, &snap); err == nil { // Restore fields + r.logger.Printf("[DEBUG] client: restoring pre v0.6.0 alloc runner state for alloc %q", r.alloc.ID) r.alloc = snap.Alloc r.allocDir = snap.AllocDir r.allocClientStatus = snap.AllocClientStatus @@ -181,9 +182,9 @@ func (r *AllocRunner) RestoreState() error { // Context struct to new AllocDir struct if snap.AllocDir == nil && snap.Context != nil { r.logger.Printf("[DEBUG] client: migrating state snapshot for alloc %q", r.alloc.ID) - snap.AllocDir = allocdir.NewAllocDir(r.logger, snap.Context.AllocDir.AllocDir) + r.allocDir = allocdir.NewAllocDir(r.logger, snap.Context.AllocDir.AllocDir) for taskName := range snap.Context.AllocDir.TaskDirs { - snap.AllocDir.NewTaskDir(taskName) + r.allocDir.NewTaskDir(taskName) } } diff --git a/client/alloc_runner_test.go b/client/alloc_runner_test.go index f1bc828b8072..6206daa0327e 100644 --- a/client/alloc_runner_test.go +++ b/client/alloc_runner_test.go @@ -5,10 +5,12 @@ import ( "io/ioutil" "os" "path/filepath" + "strings" "testing" "text/template" "time" + "github.com/boltdb/bolt" "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" @@ -34,13 +36,15 @@ func testAllocRunnerFromAlloc(alloc *structs.Allocation, restarts bool) (*MockAl conf := config.DefaultConfig() conf.StateDir = os.TempDir() conf.AllocDir = os.TempDir() + tmp, _ := ioutil.TempFile("", "state-db") + db, _ := bolt.Open(tmp.Name(), 0600, nil) upd := &MockAllocStateUpdater{} if !restarts { *alloc.Job.LookupTaskGroup(alloc.TaskGroup).RestartPolicy = structs.RestartPolicy{Attempts: 0} alloc.Job.Type = structs.JobTypeBatch } vclient := vaultclient.NewMockVaultClient() - ar := NewAllocRunner(logger, conf, upd.Update, alloc, vclient, newMockConsulServiceClient()) + ar := NewAllocRunner(logger, conf, db, upd.Update, alloc, vclient, newMockConsulServiceClient()) return upd, ar } @@ -169,9 +173,15 @@ func TestAllocRunner_TerminalUpdate_Destroy(t *testing.T) { return false, fmt.Errorf("got client status %v; want %v", last.ClientStatus, structs.AllocClientStatusComplete) } - // Check the state still exists - if _, err := os.Stat(ar.stateFilePath()); err != nil { - return false, fmt.Errorf("state file destroyed: %v", err) + // Check the allocation state still exists + if err := ar.stateDB.View(func(tx *bolt.Tx) error { + if !allocationBucketExists(tx, ar.Alloc().ID) { + return fmt.Errorf("no bucket for alloc") + } + + return nil + }); err != nil { + return false, fmt.Errorf("state destroyed") } // Check the alloc directory still exists @@ -199,10 +209,14 @@ func TestAllocRunner_TerminalUpdate_Destroy(t *testing.T) { } // Check the state was cleaned - if _, err := os.Stat(ar.stateFilePath()); err == nil { - return false, fmt.Errorf("state file still exists: %v", ar.stateFilePath()) - } else if !os.IsNotExist(err) { - return false, fmt.Errorf("stat err: %v", err) + if err := ar.stateDB.View(func(tx *bolt.Tx) error { + if allocationBucketExists(tx, ar.Alloc().ID) { + return fmt.Errorf("bucket for alloc exists") + } + + return nil + }); err != nil { + return false, fmt.Errorf("state not destroyed") } // Check the alloc directory was cleaned @@ -247,10 +261,14 @@ func TestAllocRunner_Destroy(t *testing.T) { } // Check the state was cleaned - if _, err := os.Stat(ar.stateFilePath()); err == nil { - return false, fmt.Errorf("state file still exists: %v", ar.stateFilePath()) - } else if !os.IsNotExist(err) { - return false, fmt.Errorf("stat err: %v", err) + if err := ar.stateDB.View(func(tx *bolt.Tx) error { + if allocationBucketExists(tx, ar.Alloc().ID) { + return fmt.Errorf("bucket for alloc exists") + } + + return nil + }); err != nil { + return false, fmt.Errorf("state not destroyed") } // Check the alloc directory was cleaned @@ -322,7 +340,7 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) { // Create a new alloc runner l2 := prefixedTestLogger("----- ar2: ") - ar2 := NewAllocRunner(l2, ar.config, upd.Update, + ar2 := NewAllocRunner(l2, ar.config, ar.stateDB, upd.Update, &structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient, ar.consulClient) err = ar2.RestoreState() @@ -366,12 +384,10 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) { } func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) { - ctestutil.ExecCompatible(t) upd, ar := testAllocRunner(false) ar.logger = prefixedTestLogger("ar1: ") // Ensure task takes some time - ar.alloc.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver" task := ar.alloc.Job.TaskGroups[0].Tasks[0] task.Config["run_for"] = "10s" @@ -408,14 +424,14 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) { } // Ensure ar1 doesn't recreate the state file - ar.persistLock.Lock() - defer ar.persistLock.Unlock() + ar.allocLock.Lock() + defer ar.allocLock.Unlock() // Ensure both alloc runners don't destroy ar.destroy = true // Create a new alloc runner - ar2 := NewAllocRunner(ar.logger, ar.config, upd.Update, + ar2 := NewAllocRunner(ar.logger, ar.config, ar.stateDB, upd.Update, &structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient, ar.consulClient) ar2.logger = prefixedTestLogger("ar2: ") err = ar2.RestoreState() @@ -427,8 +443,14 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) { testutil.WaitForResult(func() (bool, error) { // Check the state still exists - if _, err := os.Stat(ar.stateFilePath()); err != nil { - return false, fmt.Errorf("state file destroyed: %v", err) + if err := ar.stateDB.View(func(tx *bolt.Tx) error { + if !allocationBucketExists(tx, ar2.Alloc().ID) { + return fmt.Errorf("no bucket for alloc") + } + + return nil + }); err != nil { + return false, fmt.Errorf("state destroyed") } // Check the alloc directory still exists @@ -457,10 +479,14 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) { } // Check the state was cleaned - if _, err := os.Stat(ar.stateFilePath()); err == nil { - return false, fmt.Errorf("state file still exists: %v", ar.stateFilePath()) - } else if !os.IsNotExist(err) { - return false, fmt.Errorf("stat err: %v", err) + if err := ar.stateDB.View(func(tx *bolt.Tx) error { + if allocationBucketExists(tx, ar2.Alloc().ID) { + return fmt.Errorf("bucket for alloc exists") + } + + return nil + }); err != nil { + return false, fmt.Errorf("state not destroyed") } // Check the alloc directory was cleaned @@ -504,6 +530,14 @@ func TestAllocRunner_RestoreOldState(t *testing.T) { conf := config.DefaultConfig() conf.StateDir = os.TempDir() conf.AllocDir = os.TempDir() + tmp, err := ioutil.TempFile("", "state-db") + if err != nil { + t.Fatalf("error creating state db file: %v", err) + } + db, err := bolt.Open(tmp.Name(), 0600, nil) + if err != nil { + t.Fatalf("error creating state db: %v", err) + } if err := os.MkdirAll(filepath.Join(conf.StateDir, "alloc", alloc.ID), 0777); err != nil { t.Fatalf("error creating state dir: %v", err) @@ -575,7 +609,7 @@ func TestAllocRunner_RestoreOldState(t *testing.T) { alloc.Job.Type = structs.JobTypeBatch vclient := vaultclient.NewMockVaultClient() cclient := newMockConsulServiceClient() - ar := NewAllocRunner(logger, conf, upd.Update, alloc, vclient, cclient) + ar := NewAllocRunner(logger, conf, db, upd.Update, alloc, vclient, cclient) defer ar.Destroy() // RestoreState should fail on the task state since we only test the @@ -591,7 +625,7 @@ func TestAllocRunner_RestoreOldState(t *testing.T) { if len(merr.Errors) != 1 { t.Fatalf("expected exactly 1 error from RestoreState but found: %d: %v", len(merr.Errors), err) } - if expected := "task runner snapshot includes nil Task"; merr.Errors[0].Error() != expected { + if expected := "failed to get task bucket"; !strings.Contains(merr.Errors[0].Error(), expected) { t.Fatalf("expected %q but got: %q", expected, merr.Errors[0].Error()) } diff --git a/client/state_database.go b/client/state_database.go index a9a36a5f9d1c..4a5784a6a7ef 100644 --- a/client/state_database.go +++ b/client/state_database.go @@ -108,6 +108,17 @@ func getAllocationBucket(tx *bolt.Tx, allocID string) (*bolt.Bucket, error) { return alloc, nil } +func allocationBucketExists(tx *bolt.Tx, allocID string) bool { + allocations := tx.Bucket(allocationsBucket) + if allocations == nil { + return false + } + + // Retrieve the specific allocations bucket + alloc := allocations.Bucket([]byte(allocID)) + return alloc != nil +} + // getTaskBucket returns the bucket used to persist state about a // particular task. If the root allocation bucket, the specific // allocation or task bucket doesn't exist, they will be created as long as the diff --git a/client/task_runner_test.go b/client/task_runner_test.go index ede8cb1647c9..f5ac61a0c71e 100644 --- a/client/task_runner_test.go +++ b/client/task_runner_test.go @@ -13,6 +13,7 @@ import ( "testing" "time" + "github.com/boltdb/bolt" "github.com/golang/snappy" "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" @@ -76,6 +77,16 @@ func testTaskRunnerFromAlloc(t *testing.T, restarts bool, alloc *structs.Allocat conf := config.DefaultConfig() conf.StateDir = os.TempDir() conf.AllocDir = os.TempDir() + + tmp, err := ioutil.TempFile("", "state-db") + if err != nil { + t.Fatalf("error creating state db file: %v", err) + } + db, err := bolt.Open(tmp.Name(), 0600, nil) + if err != nil { + t.Fatalf("error creating state db: %v", err) + } + upd := &MockTaskStateUpdater{} task := alloc.Job.TaskGroups[0].Tasks[0] // Initialize the port listing. This should be done by the offer process but @@ -105,7 +116,7 @@ func testTaskRunnerFromAlloc(t *testing.T, restarts bool, alloc *structs.Allocat vclient := vaultclient.NewMockVaultClient() cclient := newMockConsulServiceClient() - tr := NewTaskRunner(logger, conf, upd.Update, taskDir, alloc, task, vclient, cclient) + tr := NewTaskRunner(logger, conf, db, upd.Update, taskDir, alloc, task, vclient, cclient) if !restarts { tr.restartTracker = noRestartsTracker() } @@ -365,8 +376,8 @@ func TestTaskRunner_SaveRestoreState(t *testing.T) { } // Create a new task runner - task2 := &structs.Task{Name: ctx.tr.task.Name, Driver: ctx.tr.task.Driver} - tr2 := NewTaskRunner(ctx.tr.logger, ctx.tr.config, ctx.upd.Update, + task2 := &structs.Task{Name: ctx.tr.task.Name, Driver: ctx.tr.task.Driver, Vault: ctx.tr.task.Vault} + tr2 := NewTaskRunner(ctx.tr.logger, ctx.tr.config, ctx.tr.stateDB, ctx.upd.Update, ctx.tr.taskDir, ctx.tr.alloc, task2, ctx.tr.vaultClient, ctx.tr.consul) tr2.restartTracker = noRestartsTracker() if err := tr2.RestoreState(); err != nil { diff --git a/client/util_test.go b/client/util_test.go index c0a8633c3296..7e17220277ce 100644 --- a/client/util_test.go +++ b/client/util_test.go @@ -1,9 +1,6 @@ package client import ( - "io/ioutil" - "os" - "path/filepath" "reflect" "testing" @@ -75,42 +72,3 @@ func TestShuffleStrings(t *testing.T) { t.Fatalf("shuffle failed") } } - -func TestPersistRestoreState(t *testing.T) { - t.Parallel() - dir, err := ioutil.TempDir("", "nomad") - if err != nil { - t.Fatalf("err: %s", err) - } - defer os.RemoveAll(dir) - - // Use a state path inside a non-existent directory. This - // verifies that the directory is created properly. - statePath := filepath.Join(dir, "subdir", "test-persist") - - type stateTest struct { - Foo int - Bar string - Baz bool - } - state := stateTest{ - Foo: 42, - Bar: "the quick brown fox", - Baz: true, - } - - err = persistState(statePath, &state) - if err != nil { - t.Fatalf("err: %v", err) - } - - var out stateTest - err = restoreState(statePath, &out) - if err != nil { - t.Fatalf("err: %v", err) - } - - if !reflect.DeepEqual(state, out) { - t.Fatalf("bad: %#v %#v", state, out) - } -} From 098b02d5bf36b342196b8abdaf7c59ca1f7aecd3 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 3 May 2017 11:27:33 -0700 Subject: [PATCH 10/13] Helpful comment --- client/alloc_runner.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/client/alloc_runner.go b/client/alloc_runner.go index db13ba4328ec..125cfc3c8869 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -410,6 +410,9 @@ func (r *AllocRunner) Alloc() *structs.Allocation { // Clear the job before copying job := r.alloc.Job + + // Since we are clearing the job, anything that access the alloc.Job field + // must acquire the lock or access it via this method. r.alloc.Job = nil alloc := r.alloc.Copy() From 646fd8a2f99b40e9cccf4f9c4d2532ff63ca9ff8 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 3 May 2017 12:38:49 -0700 Subject: [PATCH 11/13] Fix tests --- command/agent/consul/int_test.go | 12 +++++++++++- command/agent/fs_endpoint_test.go | 21 +++++++++++---------- 2 files changed, 22 insertions(+), 11 deletions(-) diff --git a/command/agent/consul/int_test.go b/command/agent/consul/int_test.go index 9243b5822e1f..319f46825129 100644 --- a/command/agent/consul/int_test.go +++ b/command/agent/consul/int_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/boltdb/bolt" consulapi "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/testutil" "github.com/hashicorp/nomad/client" @@ -71,6 +72,15 @@ func TestConsul_Integration(t *testing.T) { } defer os.RemoveAll(conf.AllocDir) + tmp, err := ioutil.TempFile("", "state-db") + if err != nil { + t.Fatalf("error creating state db file: %v", err) + } + db, err := bolt.Open(tmp.Name(), 0600, nil) + if err != nil { + t.Fatalf("error creating state db: %v", err) + } + alloc := mock.Alloc() task := alloc.Job.TaskGroups[0].Tasks[0] task.Driver = "mock_driver" @@ -131,7 +141,7 @@ func TestConsul_Integration(t *testing.T) { serviceClient.Run() close(consulRan) }() - tr := client.NewTaskRunner(logger, conf, logUpdate, taskDir, alloc, task, vclient, serviceClient) + tr := client.NewTaskRunner(logger, conf, db, logUpdate, taskDir, alloc, task, vclient, serviceClient) tr.MarkReceived() go tr.Run() defer func() { diff --git a/command/agent/fs_endpoint_test.go b/command/agent/fs_endpoint_test.go index 0e9b732547c4..2feee179cdd9 100644 --- a/command/agent/fs_endpoint_test.go +++ b/command/agent/fs_endpoint_test.go @@ -19,6 +19,7 @@ import ( "time" "github.com/hashicorp/nomad/client/allocdir" + "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" "github.com/ugorji/go/codec" ) @@ -123,7 +124,7 @@ func TestStreamFramer_Flush(t *testing.T) { sf.Run() // Create a decoder - dec := codec.NewDecoder(r, jsonHandle) + dec := codec.NewDecoder(r, structs.JsonHandle) f := "foo" fe := "bar" @@ -191,7 +192,7 @@ func TestStreamFramer_Batch(t *testing.T) { sf.Run() // Create a decoder - dec := codec.NewDecoder(r, jsonHandle) + dec := codec.NewDecoder(r, structs.JsonHandle) f := "foo" fe := "bar" @@ -268,7 +269,7 @@ func TestStreamFramer_Heartbeat(t *testing.T) { sf.Run() // Create a decoder - dec := codec.NewDecoder(r, jsonHandle) + dec := codec.NewDecoder(r, structs.JsonHandle) // Start the reader resultCh := make(chan struct{}) @@ -320,7 +321,7 @@ func TestStreamFramer_Order(t *testing.T) { sf.Run() // Create a decoder - dec := codec.NewDecoder(r, jsonHandle) + dec := codec.NewDecoder(r, structs.JsonHandle) files := []string{"1", "2", "3", "4", "5"} input := bytes.NewBuffer(make([]byte, 0, 100000)) @@ -592,7 +593,7 @@ func TestHTTP_Stream_Modify(t *testing.T) { r, w := io.Pipe() defer r.Close() defer w.Close() - dec := codec.NewDecoder(r, jsonHandle) + dec := codec.NewDecoder(r, structs.JsonHandle) data := []byte("helloworld") @@ -668,7 +669,7 @@ func TestHTTP_Stream_Truncate(t *testing.T) { r, w := io.Pipe() defer r.Close() defer w.Close() - dec := codec.NewDecoder(r, jsonHandle) + dec := codec.NewDecoder(r, structs.JsonHandle) data := []byte("helloworld") @@ -778,7 +779,7 @@ func TestHTTP_Stream_Delete(t *testing.T) { wrappedW := &WriteCloseChecker{WriteCloser: w} defer r.Close() defer w.Close() - dec := codec.NewDecoder(r, jsonHandle) + dec := codec.NewDecoder(r, structs.JsonHandle) data := []byte("helloworld") @@ -869,7 +870,7 @@ func TestHTTP_Logs_NoFollow(t *testing.T) { wrappedW := &WriteCloseChecker{WriteCloser: w} defer r.Close() defer w.Close() - dec := codec.NewDecoder(r, jsonHandle) + dec := codec.NewDecoder(r, structs.JsonHandle) var received []byte @@ -955,7 +956,7 @@ func TestHTTP_Logs_Follow(t *testing.T) { wrappedW := &WriteCloseChecker{WriteCloser: w} defer r.Close() defer w.Close() - dec := codec.NewDecoder(r, jsonHandle) + dec := codec.NewDecoder(r, structs.JsonHandle) var received []byte @@ -1071,7 +1072,7 @@ func BenchmarkHTTP_Logs_Follow(t *testing.B) { wrappedW := &WriteCloseChecker{WriteCloser: w} defer r.Close() defer w.Close() - dec := codec.NewDecoder(r, jsonHandle) + dec := codec.NewDecoder(r, structs.JsonHandle) var received []byte From 3f1ccf72786d81d5cd0ce82a61b6b14f3d5d8e2c Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 9 May 2017 10:50:24 -0700 Subject: [PATCH 12/13] Respond to comments --- client/alloc_runner.go | 37 ++++++++++++++++++++++++++++--------- client/client.go | 24 ++++++++++++------------ client/task_runner.go | 3 +-- client/util.go | 32 -------------------------------- 4 files changed, 41 insertions(+), 55 deletions(-) diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 125cfc3c8869..81d2185f2222 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -163,12 +163,14 @@ func (r *AllocRunner) pre060StateFilePath() string { // RestoreState is used to restore the state of the alloc runner func (r *AllocRunner) RestoreState() error { + // COMPAT: Remove in 0.7.0 // Check if the old snapshot is there oldPath := r.pre060StateFilePath() var snap allocRunnerState + var upgrading bool if err := pre060RestoreState(oldPath, &snap); err == nil { // Restore fields - r.logger.Printf("[DEBUG] client: restoring pre v0.6.0 alloc runner state for alloc %q", r.alloc.ID) + r.logger.Printf("[INFO] client: restoring pre v0.6.0 alloc runner state for alloc %q", r.alloc.ID) r.alloc = snap.Alloc r.allocDir = snap.AllocDir r.allocClientStatus = snap.AllocClientStatus @@ -178,6 +180,7 @@ func (r *AllocRunner) RestoreState() error { r.taskStates = snap.Alloc.TaskStates } + // COMPAT: Remove in 0.7.0 // #2132 Upgrade path: if snap.AllocDir is nil, try to convert old // Context struct to new AllocDir struct if snap.AllocDir == nil && snap.Context != nil { @@ -190,6 +193,7 @@ func (r *AllocRunner) RestoreState() error { // Delete the old state os.RemoveAll(oldPath) + upgrading = true } else if !os.IsNotExist(err) { // Something corrupt in the old state file return err @@ -222,6 +226,7 @@ func (r *AllocRunner) RestoreState() error { r.allocClientStatus = mutable.AllocClientStatus r.allocClientDescription = mutable.AllocClientDescription r.taskStates = mutable.TaskStates + r.alloc.ClientStatus = getClientStatus(r.taskStates) return nil }) @@ -277,6 +282,12 @@ func (r *AllocRunner) RestoreState() error { } else if !r.alloc.TerminalStatus() { // Only start if the alloc isn't in a terminal status. go tr.Run() + + if upgrading { + if err := tr.SaveState(); err != nil { + r.logger.Printf("[WARN] client: initial save state for alloc %s task %s failed: %v", r.alloc.ID, name, err) + } + } } } @@ -437,10 +448,19 @@ func (r *AllocRunner) Alloc() *structs.Allocation { r.allocLock.Unlock() // Scan the task states to determine the status of the alloc - var pending, running, dead, failed bool r.taskStatusLock.RLock() alloc.TaskStates = copyTaskStates(r.taskStates) - for _, state := range r.taskStates { + alloc.ClientStatus = getClientStatus(r.taskStates) + r.taskStatusLock.RUnlock() + + return alloc +} + +// getClientStatus takes in the task states for a given allocation and computes +// the client status +func getClientStatus(taskStates map[string]*structs.TaskState) string { + var pending, running, dead, failed bool + for _, state := range taskStates { switch state.State { case structs.TaskStateRunning: running = true @@ -454,20 +474,19 @@ func (r *AllocRunner) Alloc() *structs.Allocation { } } } - r.taskStatusLock.RUnlock() // Determine the alloc status if failed { - alloc.ClientStatus = structs.AllocClientStatusFailed + return structs.AllocClientStatusFailed } else if running { - alloc.ClientStatus = structs.AllocClientStatusRunning + return structs.AllocClientStatusRunning } else if pending { - alloc.ClientStatus = structs.AllocClientStatusPending + return structs.AllocClientStatusPending } else if dead { - alloc.ClientStatus = structs.AllocClientStatusComplete + return structs.AllocClientStatusComplete } - return alloc + return "" } // dirtySyncState is used to watch for state being marked dirty to sync diff --git a/client/client.go b/client/client.go index a6a6ba2a417a..59460bf1199a 100644 --- a/client/client.go +++ b/client/client.go @@ -447,7 +447,7 @@ func (c *Client) Shutdown() error { c.shutdown = true close(c.shutdownCh) c.connPool.Shutdown() - return c.saveState(true) + return c.saveState() } // RPC is used to forward an RPC call to a nomad server, or fail if no servers. @@ -663,6 +663,12 @@ func (c *Client) restoreState() error { mErr.Errors = append(mErr.Errors, err) } else { go ar.Run() + + if upgrading { + if err := ar.SaveState(); err != nil { + c.logger.Printf("[WARN] client: initial save state for alloc %s failed: %v", id, err) + } + } } } @@ -676,10 +682,8 @@ func (c *Client) restoreState() error { return mErr.ErrorOrNil() } -// saveState is used to snapshot our state into the data dir. If blocking is set -// to true, the function will only return once state has been saved. If false, -// the errors will be logged and state saving will be asyncronous -func (c *Client) saveState(blocking bool) error { +// saveState is used to snapshot our state into the data dir. +func (c *Client) saveState() error { if c.config.DevMode { return nil } @@ -703,12 +707,8 @@ func (c *Client) saveState(blocking bool) error { }(id, ar) } - if blocking { - wg.Wait() - return mErr.ErrorOrNil() - } - - return nil + wg.Wait() + return mErr.ErrorOrNil() } // getAllocRunners returns a snapshot of the current set of alloc runners. @@ -1062,7 +1062,7 @@ func (c *Client) periodicSnapshot() { select { case <-snapshot: snapshot = time.After(stateSnapshotIntv) - if err := c.saveState(false); err != nil { + if err := c.saveState(); err != nil { c.logger.Printf("[ERR] client: failed to save state: %v", err) } diff --git a/client/task_runner.go b/client/task_runner.go index 8f91d8f4cac8..5d1a9d23e11f 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -367,6 +367,7 @@ func (r *TaskRunner) RestoreState() error { // SaveState is used to snapshot our state func (r *TaskRunner) SaveState() error { r.persistLock.Lock() + defer r.persistLock.Unlock() snap := taskRunnerState{ Version: r.config.Version, ArtifactDownloaded: r.artifactsDownloaded, @@ -384,7 +385,6 @@ func (r *TaskRunner) SaveState() error { // If nothing has changed avoid the write h := snap.Hash() if bytes.Equal(h, r.persistedHash) { - r.persistLock.Unlock() return nil } @@ -393,7 +393,6 @@ func (r *TaskRunner) SaveState() error { if err := codec.NewEncoder(&buf, structs.MsgpackHandle).Encode(&snap); err != nil { return fmt.Errorf("failed to serialize snapshot: %v", err) } - r.persistLock.Unlock() // Start the transaction. return r.stateDB.Batch(func(tx *bolt.Tx) error { diff --git a/client/util.go b/client/util.go index 233a0259514b..32f765550c51 100644 --- a/client/util.go +++ b/client/util.go @@ -1,16 +1,12 @@ package client import ( - "bytes" "encoding/json" "fmt" "io/ioutil" "math/rand" - "os" - "path/filepath" "github.com/hashicorp/nomad/nomad/structs" - "github.com/ugorji/go/codec" ) type allocTuple struct { @@ -78,34 +74,6 @@ func shuffleStrings(list []string) { } } -// persistState is used to help with saving state -func persistState(path string, data interface{}) error { - var buf bytes.Buffer - enc := codec.NewEncoder(&buf, structs.JsonHandlePretty) - if err := enc.Encode(data); err != nil { - return err - } - - if err := os.MkdirAll(filepath.Dir(path), 0700); err != nil { - return fmt.Errorf("failed to make dirs for %s: %v", path, err) - } - tmpPath := path + ".tmp" - if err := ioutil.WriteFile(tmpPath, buf.Bytes(), 0600); err != nil { - return fmt.Errorf("failed to save state to tmp: %v", err) - } - if err := os.Rename(tmpPath, path); err != nil { - return fmt.Errorf("failed to rename tmp to path: %v", err) - } - - // Sanity check since users have reported empty state files on disk - if stat, err := os.Stat(path); err != nil { - return fmt.Errorf("unable to stat state file %s: %v", path, err) - } else if stat.Size() == 0 { - return fmt.Errorf("persisted invalid state file %s; see https://github.com/hashicorp/nomad/issues/1367", path) - } - return nil -} - // pre060RestoreState is used to read back in the persisted state for pre v0.6.0 // state func pre060RestoreState(path string, data interface{}) error { From 997390b04c6809ec7a4b2f4323182643f73e293a Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 9 May 2017 11:20:35 -0700 Subject: [PATCH 13/13] Fix test --- client/alloc_runner.go | 1 + client/alloc_runner_test.go | 17 +++++++++++------ 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 3dea31a89815..6275bd1dd5a1 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -291,6 +291,7 @@ func (r *AllocRunner) RestoreState() error { // Restart task runner if RestoreState gave a reason if restartReason != "" { + r.logger.Printf("[INFO] client: restarting alloc %s task %s: %v", r.alloc.ID, name, restartReason) tr.Restart("upgrade", restartReason) } } diff --git a/client/alloc_runner_test.go b/client/alloc_runner_test.go index 7d3ea253e684..ee3b2e0c0d49 100644 --- a/client/alloc_runner_test.go +++ b/client/alloc_runner_test.go @@ -522,7 +522,14 @@ func TestAllocRunner_SaveRestoreState_Upgrade(t *testing.T) { // Snapshot state testutil.WaitForResult(func() (bool, error) { - return len(ar.tasks) == 1, nil + if upd.Count == 0 { + return false, fmt.Errorf("No updates") + } + last := upd.Allocs[upd.Count-1] + if last.ClientStatus != structs.AllocClientStatusRunning { + return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusRunning) + } + return true, nil }, func(err error) { t.Fatalf("task never started: %v", err) }) @@ -534,9 +541,7 @@ func TestAllocRunner_SaveRestoreState_Upgrade(t *testing.T) { // Create a new alloc runner l2 := prefixedTestLogger("----- ar2: ") - ar2 := NewAllocRunner(l2, origConfig, upd.Update, - &structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient, - ar.consulClient) + ar2 := NewAllocRunner(l2, origConfig, ar.stateDB, upd.Update, &structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient, ar.consulClient) err = ar2.RestoreState() if err != nil { t.Fatalf("err: %v", err) @@ -552,14 +557,14 @@ func TestAllocRunner_SaveRestoreState_Upgrade(t *testing.T) { return false, nil } - for _, ev := range ar2.alloc.TaskStates["web"].Events { + for _, ev := range ar2.taskStates["web"].Events { if strings.HasSuffix(ev.RestartReason, pre06ScriptCheckReason) { return true, nil } } return false, fmt.Errorf("no restart with proper reason found") }, func(err error) { - t.Fatalf("err: %v\nAllocs: %#v\nWeb State: %#v", err, upd.Allocs, ar2.alloc.TaskStates["web"]) + t.Fatalf("err: %v\nAllocs: %#v\nWeb State: %#v", err, upd.Allocs, ar2.taskStates["web"]) }) // Destroy and wait