diff --git a/client/allocdir/alloc_dir.go b/client/allocdir/alloc_dir.go index 1c31a68b4170..65831af6630e 100644 --- a/client/allocdir/alloc_dir.go +++ b/client/allocdir/alloc_dir.go @@ -9,6 +9,7 @@ import ( "log" "os" "path/filepath" + "sync" "time" "github.com/hashicorp/go-multierror" @@ -58,6 +59,8 @@ var ( TaskDirs = map[string]os.FileMode{TmpDirName: os.ModeSticky | 0777} ) +// AllocDir allows creating, destroying, and accessing an allocation's +// directory. All methods are safe for concurrent use. type AllocDir struct { // AllocDir is the directory used for storing any state // of this allocation. It will be purged on alloc destroy. @@ -73,6 +76,8 @@ type AllocDir struct { // built is true if Build has successfully run built bool + mu sync.RWMutex + logger *log.Logger } @@ -100,6 +105,9 @@ func NewAllocDir(logger *log.Logger, allocDir string) *AllocDir { // Copy an AllocDir and all of its TaskDirs. Returns nil if AllocDir is // nil. func (d *AllocDir) Copy() *AllocDir { + d.mu.RLock() + defer d.mu.RUnlock() + if d == nil { return nil } @@ -117,6 +125,9 @@ func (d *AllocDir) Copy() *AllocDir { // NewTaskDir creates a new TaskDir and adds it to the AllocDirs TaskDirs map. func (d *AllocDir) NewTaskDir(name string) *TaskDir { + d.mu.Lock() + defer d.mu.Unlock() + td := newTaskDir(d.logger, d.AllocDir, name) d.TaskDirs[name] = td return td @@ -129,6 +140,9 @@ func (d *AllocDir) NewTaskDir(name string) *TaskDir { // file "NOMAD-${ALLOC_ID}-ERROR.log" will be appended to the tar with the // error message as the contents. func (d *AllocDir) Snapshot(w io.Writer) error { + d.mu.RLock() + defer d.mu.RUnlock() + allocDataDir := filepath.Join(d.SharedDir, SharedDataDir) rootPaths := []string{allocDataDir} for _, taskdir := range d.TaskDirs { @@ -206,11 +220,16 @@ func (d *AllocDir) Snapshot(w io.Writer) error { // Move other alloc directory's shared path and local dir to this alloc dir. func (d *AllocDir) Move(other *AllocDir, tasks []*structs.Task) error { + d.mu.RLock() if !d.built { // Enforce the invariant that Build is called before Move + d.mu.RUnlock() return fmt.Errorf("unable to move to %q - alloc dir is not built", d.AllocDir) } + // Moving is slow and only reads immutable fields, so unlock during heavy IO + d.mu.RUnlock() + // Move the data directory otherDataDir := filepath.Join(other.SharedDir, SharedDataDir) dataDir := filepath.Join(d.SharedDir, SharedDataDir) @@ -246,7 +265,6 @@ func (d *AllocDir) Move(other *AllocDir, tasks []*structs.Task) error { // Tears down previously build directory structure. func (d *AllocDir) Destroy() error { - // Unmount all mounted shared alloc dirs. var mErr multierror.Error if err := d.UnmountAll(); err != nil { @@ -258,12 +276,17 @@ func (d *AllocDir) Destroy() error { } // Unset built since the alloc dir has been destroyed. + d.mu.Lock() d.built = false + d.mu.Unlock() return mErr.ErrorOrNil() } // UnmountAll linked/mounted directories in task dirs. func (d *AllocDir) UnmountAll() error { + d.mu.RLock() + defer d.mu.RUnlock() + var mErr multierror.Error for _, dir := range d.TaskDirs { // Check if the directory has the shared alloc mounted. 
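The locking discipline introduced above - an RWMutex guarding TaskDirs and the built flag, released again before the slow filesystem work in Move - can be shown with a small self-contained sketch. The dirGuard type and its field names below are illustrative stand-ins, not part of Nomad:

package main

import (
	"fmt"
	"sync"
)

// dirGuard is a stripped-down stand-in for AllocDir: an RWMutex protects the
// built flag and the task-dir map, and long-running IO happens only after the
// invariant has been checked and the lock released.
type dirGuard struct {
	mu       sync.RWMutex
	built    bool
	taskDirs map[string]string
}

func (d *dirGuard) Build() {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.built = true
}

func (d *dirGuard) Move(src string) error {
	d.mu.RLock()
	if !d.built {
		// Enforce the invariant that Build is called before Move.
		d.mu.RUnlock()
		return fmt.Errorf("unable to move from %q: dir is not built", src)
	}
	// Only immutable fields are read from here on, so unlock before slow IO.
	d.mu.RUnlock()

	// ... rename/copy data directories here ...
	return nil
}

func main() {
	d := &dirGuard{taskDirs: make(map[string]string)}
	fmt.Println(d.Move("/old/alloc")) // error: not built yet
	d.Build()
	fmt.Println(d.Move("/old/alloc")) // <nil>
}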
@@ -322,7 +345,9 @@ func (d *AllocDir) Build() error { } // Mark as built + d.mu.Lock() d.built = true + d.mu.Unlock() return nil } @@ -386,11 +411,14 @@ func (d *AllocDir) ReadAt(path string, offset int64) (io.ReadCloser, error) { p := filepath.Join(d.AllocDir, path) // Check if it is trying to read into a secret directory + d.mu.RLock() for _, dir := range d.TaskDirs { if filepath.HasPrefix(p, dir.SecretsDir) { + d.mu.RUnlock() return nil, fmt.Errorf("Reading secret file prohibited: %s", path) } } + d.mu.RUnlock() f, err := os.Open(p) if err != nil { diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index b60432332e57..cee65299d7f2 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -13,6 +13,7 @@ import ( "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/allocrunner/taskrunner" + "github.com/hashicorp/nomad/client/allocwatcher" "github.com/hashicorp/nomad/client/config" consulApi "github.com/hashicorp/nomad/client/consul" "github.com/hashicorp/nomad/client/vaultclient" @@ -77,7 +78,7 @@ type AllocRunner struct { // the migrates it data. If sticky volumes aren't used and there's no // previous allocation a noop implementation is used so it always safe // to call. - prevAlloc prevAllocWatcher + prevAlloc allocwatcher.PrevAllocWatcher // ctx is cancelled with exitFn to cause the alloc to be destroyed // (stopped and GC'd). @@ -133,26 +134,26 @@ type allocRunnerMutableState struct { // NewAllocRunner is used to create a new allocation context func NewAllocRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB, updater AllocStateUpdater, alloc *structs.Allocation, vaultClient vaultclient.VaultClient, consulClient consulApi.ConsulServiceAPI, - prevAlloc prevAllocWatcher) *AllocRunner { + prevAlloc allocwatcher.PrevAllocWatcher) *AllocRunner { ar := &AllocRunner{ - config: config, - stateDB: stateDB, - updater: updater, - logger: logger, - alloc: alloc, - allocID: alloc.ID, - allocBroadcast: cstructs.NewAllocBroadcaster(8), - prevAlloc: prevAlloc, - dirtyCh: make(chan struct{}, 1), - allocDir: allocdir.NewAllocDir(logger, filepath.Join(config.AllocDir, alloc.ID)), - tasks: make(map[string]*taskrunner.TaskRunner), - taskStates: copyTaskStates(alloc.TaskStates), - restored: make(map[string]struct{}), - updateCh: make(chan *structs.Allocation, 64), - waitCh: make(chan struct{}), - vaultClient: vaultClient, - consulClient: consulClient, + config: config, + stateDB: stateDB, + updater: updater, + logger: logger, + alloc: alloc, + allocID: alloc.ID, + //allocBroadcast: cstructs.NewAllocBroadcaster(8), + prevAlloc: prevAlloc, + dirtyCh: make(chan struct{}, 1), + allocDir: allocdir.NewAllocDir(logger, filepath.Join(config.AllocDir, alloc.ID)), + tasks: make(map[string]*taskrunner.TaskRunner), + taskStates: copyTaskStates(alloc.TaskStates), + restored: make(map[string]struct{}), + updateCh: make(chan *structs.Allocation, 64), + waitCh: make(chan struct{}), + vaultClient: vaultClient, + consulClient: consulClient, } // TODO Should be passed a context @@ -612,9 +613,9 @@ func (r *AllocRunner) sendBroadcast(alloc *structs.Allocation) { // Try to send the alloc up to three times with a delay to allow recovery. 
sent := false for i := 0; i < 3; i++ { - if sent = r.allocBroadcast.Send(alloc); sent { - break - } + //if sent = r.allocBroadcast.Send(alloc); sent { + // break + //} time.Sleep(500 * time.Millisecond) } if !sent { diff --git a/client/allocrunner/testing.go b/client/allocrunner/testing.go index f9e8152bcf04..4c4c1a3ff743 100644 --- a/client/allocrunner/testing.go +++ b/client/allocrunner/testing.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/boltdb/bolt" + "github.com/hashicorp/nomad/client/allocwatcher" "github.com/hashicorp/nomad/client/config" consulApi "github.com/hashicorp/nomad/client/consul" "github.com/hashicorp/nomad/client/vaultclient" @@ -51,7 +52,7 @@ func TestAllocRunnerFromAlloc(t *testing.T, alloc *structs.Allocation, restarts alloc.Job.Type = structs.JobTypeBatch } vclient := vaultclient.NewMockVaultClient() - ar := NewAllocRunner(testlog.Logger(t), conf, db, upd.Update, alloc, vclient, consulApi.NewMockConsulServiceClient(t, testlog.HCLogger(t)), NoopPrevAlloc{}) + ar := NewAllocRunner(testlog.Logger(t), conf, db, upd.Update, alloc, vclient, consulApi.NewMockConsulServiceClient(t), allocwatcher.NoopPrevAlloc{}) return upd, ar } diff --git a/client/allocrunnerv2/alloc_runner.go b/client/allocrunnerv2/alloc_runner.go index b9cce6f126a8..4172fcd52327 100644 --- a/client/allocrunnerv2/alloc_runner.go +++ b/client/allocrunnerv2/alloc_runner.go @@ -13,6 +13,7 @@ import ( "github.com/hashicorp/nomad/client/allocrunnerv2/interfaces" "github.com/hashicorp/nomad/client/allocrunnerv2/state" "github.com/hashicorp/nomad/client/allocrunnerv2/taskrunner" + "github.com/hashicorp/nomad/client/allocwatcher" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/consul" cinterfaces "github.com/hashicorp/nomad/client/interfaces" @@ -51,10 +52,14 @@ type allocRunner struct { // vaultClient is the used to manage Vault tokens vaultClient vaultclient.VaultClient - // waitCh is closed when the alloc runner has transitioned to a terminal - // state + // waitCh is closed when the Run() loop has exited waitCh chan struct{} + // destroyed is true when the Run() loop has exited, postrun hooks have + // run, and alloc runner has been destroyed + destroyed bool + destroyedLock sync.Mutex + // Alloc captures the allocation being run. alloc *structs.Allocation allocLock sync.RWMutex @@ -81,6 +86,13 @@ type allocRunner struct { // have buffer size 1 in order to support dropping pending updates when // a newer allocation is received. updateCh chan *structs.Allocation + + // allocBroadcaster sends client allocation updates to all listeners + allocBroadcaster *cstructs.AllocBroadcaster + + // prevAllocWatcher allows waiting for a previous allocation to exit + // and if necessary migrate its alloc dir. + prevAllocWatcher allocwatcher.PrevAllocWatcher } // NewAllocRunner returns a new allocation runner. 
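The new destroyed/destroyedLock fields and the repurposed waitCh in the v2 allocRunner follow a common shape: Run closes waitCh when it exits, Destroy blocks on that channel and then flips the flag under a mutex so IsDestroyed is safe to call concurrently with Run. A minimal sketch of that pattern, assuming an illustrative runner type (not Nomad's):

package main

import (
	"fmt"
	"sync"
	"time"
)

// runner mirrors the fields added above: a waitCh closed when Run exits and a
// destroyed flag guarded by a mutex.
type runner struct {
	waitCh        chan struct{}
	destroyed     bool
	destroyedLock sync.Mutex
}

func newRunner() *runner { return &runner{waitCh: make(chan struct{})} }

func (r *runner) Run() {
	defer close(r.waitCh)
	time.Sleep(10 * time.Millisecond) // stand-in for running tasks and postrun hooks
}

func (r *runner) Destroy() {
	// ... signal tasks to stop here ...
	<-r.waitCh // wait for Run to finish

	r.destroyedLock.Lock()
	r.destroyed = true
	r.destroyedLock.Unlock()
}

func (r *runner) IsDestroyed() bool {
	r.destroyedLock.Lock()
	defer r.destroyedLock.Unlock()
	return r.destroyed
}

func main() {
	r := newRunner()
	go r.Run()
	r.Destroy()
	fmt.Println(r.IsDestroyed()) // true
}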
@@ -92,17 +104,19 @@ func NewAllocRunner(config *Config) (*allocRunner, error) { } ar := &allocRunner{ - id: alloc.ID, - alloc: alloc, - clientConfig: config.ClientConfig, - consulClient: config.Consul, - vaultClient: config.Vault, - tasks: make(map[string]*taskrunner.TaskRunner, len(tg.Tasks)), - waitCh: make(chan struct{}), - updateCh: make(chan *structs.Allocation, updateChCap), - state: &state.State{}, - stateDB: config.StateDB, - stateUpdater: config.StateUpdater, + id: alloc.ID, + alloc: alloc, + clientConfig: config.ClientConfig, + consulClient: config.Consul, + vaultClient: config.Vault, + tasks: make(map[string]*taskrunner.TaskRunner, len(tg.Tasks)), + waitCh: make(chan struct{}), + updateCh: make(chan *structs.Allocation, updateChCap), + state: &state.State{}, + stateDB: config.StateDB, + stateUpdater: config.StateUpdater, + allocBroadcaster: cstructs.NewAllocBroadcaster(alloc), + prevAllocWatcher: config.PrevAllocWatcher, } // Create alloc dir @@ -154,7 +168,7 @@ func (ar *allocRunner) WaitCh() <-chan struct{} { } // XXX How does alloc Restart work -// Run is the main go-routine that executes all the tasks. +// Run is the main goroutine that executes all the tasks. func (ar *allocRunner) Run() { // Close the wait channel defer close(ar.waitCh) @@ -220,6 +234,11 @@ func (ar *allocRunner) setAlloc(updated *structs.Allocation) { ar.allocLock.Unlock() } +// GetAllocDir returns the alloc dir which is safe for concurrent use. +func (ar *allocRunner) GetAllocDir() *allocdir.AllocDir { + return ar.allocDir +} + // Restore state from database. Must be called after NewAllocRunner but before // Run. func (ar *allocRunner) Restore() error { @@ -287,6 +306,9 @@ func (ar *allocRunner) TaskStateUpdated(taskName string, state *structs.TaskStat // Update the server ar.stateUpdater.AllocStateUpdated(calloc) + + // Broadcast client alloc to listeners + ar.allocBroadcaster.Send(calloc) } // clientAlloc takes in the task states and returns an Allocation populated @@ -395,19 +417,45 @@ func (ar *allocRunner) Update(update *structs.Allocation) { } } -// Destroy the alloc runner by stopping it if it is still running and cleaning -// up all of its resources. +func (ar *allocRunner) Listener() *cstructs.AllocListener { + return ar.allocBroadcaster.Listen() +} + +// Destroy the alloc runner by synchronously stopping it if it is still running +// and cleaning up all of its resources. // -// This method is safe for calling concurrently with Run(). Callers must -// receive on WaitCh() to block until alloc runner has stopped and been -// destroyed. -//XXX TODO +// This method is safe for calling concurrently with Run() and will cause it to +// exit (thus closing WaitCh). 
func (ar *allocRunner) Destroy() { - //TODO + // Stop tasks + for name, tr := range ar.tasks { + err := tr.Kill(context.TODO(), structs.NewTaskEvent(structs.TaskKilled)) + if err != nil { + if err == taskrunner.ErrTaskNotRunning { + ar.logger.Trace("task not running", "task_name", name) + } else { + ar.logger.Warn("failed to kill task", "error", err, "task_name", name) + } + } + } - for _, tr := range ar.tasks { - tr.Kill(context.Background(), structs.NewTaskEvent(structs.TaskKilled)) + // Wait for tasks to exit and postrun hooks to finish + <-ar.waitCh + + // Run destroy hooks + if err := ar.destroy(); err != nil { + ar.logger.Warn("error running destroy hooks", "error", err) } + + // Cleanup state db + if err := ar.stateDB.DeleteAllocationBucket(ar.id); err != nil { + ar.logger.Warn("failed to delete allocation state", "error", err) + } + + // Mark alloc as destroyed + ar.destroyedLock.Lock() + ar.destroyed = true + ar.destroyedLock.Unlock() } // IsDestroyed returns true if the alloc runner has been destroyed (stopped and @@ -416,27 +464,26 @@ func (ar *allocRunner) Destroy() { // This method is safe for calling concurrently with Run(). Callers must // receive on WaitCh() to block until alloc runner has stopped and been // destroyed. -//XXX TODO func (ar *allocRunner) IsDestroyed() bool { - return false + ar.destroyedLock.Lock() + defer ar.destroyedLock.Unlock() + return ar.destroyed } // IsWaiting returns true if the alloc runner is waiting for its previous // allocation to terminate. // // This method is safe for calling concurrently with Run(). -//XXX TODO func (ar *allocRunner) IsWaiting() bool { - return false + return ar.prevAllocWatcher.IsWaiting() } // IsMigrating returns true if the alloc runner is migrating data from its // previous allocation. // // This method is safe for calling concurrently with Run(). -//XXX TODO func (ar *allocRunner) IsMigrating() bool { - return false + return ar.prevAllocWatcher.IsMigrating() } // StatsReporter needs implementing diff --git a/client/allocrunnerv2/alloc_runner_hooks.go b/client/allocrunnerv2/alloc_runner_hooks.go index a3cac16f07d7..4ac7b9ab8b7b 100644 --- a/client/allocrunnerv2/alloc_runner_hooks.go +++ b/client/allocrunnerv2/alloc_runner_hooks.go @@ -6,17 +6,22 @@ import ( "time" log "github.com/hashicorp/go-hclog" + multierror "github.com/hashicorp/go-multierror" + "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/allocrunnerv2/interfaces" + "github.com/hashicorp/nomad/client/allocwatcher" ) // initRunnerHooks intializes the runners hooks. func (ar *allocRunner) initRunnerHooks() { hookLogger := ar.logger.Named("runner_hook") - ar.runnerHooks = make([]interfaces.RunnerHook, 0, 3) // Create the alloc directory hook. This is run first to ensure the // directoy path exists for other hooks. - ar.runnerHooks = append(ar.runnerHooks, newAllocDirHook(ar, hookLogger)) + ar.runnerHooks = []interfaces.RunnerHook{ + newAllocDirHook(hookLogger, ar), + newDiskMigrationHook(hookLogger, ar.prevAllocWatcher, ar.allocDir), + } } // prerun is used to run the runners prerun hooks. 
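The prerun/destroy dispatch in the hooks below iterates a single hook slice, type-asserts each entry for the phase-specific interface, and (for destroy) collects failures into a multierror instead of aborting on the first error. A compact sketch of that shape; the hook names mirror the real alloc_dir and migrate_disk hooks, but exampleHook and destroyAll are illustrative only:

package main

import (
	"context"
	"fmt"

	multierror "github.com/hashicorp/go-multierror"
)

// runnerHook is the common interface; phase-specific behavior is discovered by
// type assertion, as in the alloc runner's prerun and destroy loops.
type runnerHook interface{ Name() string }

type prerunHook interface {
	runnerHook
	Prerun(context.Context) error
}

type destroyHook interface {
	runnerHook
	Destroy() error
}

type exampleHook struct{ name string }

func (h exampleHook) Name() string                 { return h.name }
func (h exampleHook) Prerun(context.Context) error { return nil }
func (h exampleHook) Destroy() error               { return fmt.Errorf("%s: cleanup failed", h.name) }

// destroyAll runs every destroy-capable hook and aggregates the errors.
func destroyAll(hooks []runnerHook) error {
	var merr multierror.Error
	for _, h := range hooks {
		dh, ok := h.(destroyHook)
		if !ok {
			continue
		}
		if err := dh.Destroy(); err != nil {
			merr.Errors = append(merr.Errors, err)
		}
	}
	return merr.ErrorOrNil()
}

func main() {
	hooks := []runnerHook{exampleHook{"alloc_dir"}, exampleHook{"migrate_disk"}}
	fmt.Println(destroyAll(hooks)) // both failures reported
}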
@@ -36,17 +41,21 @@ func (ar *allocRunner) prerun() error { continue } + //TODO Check hook state + name := pre.Name() var start time.Time if ar.logger.IsTrace() { start = time.Now() - ar.logger.Trace("running prestart hook", "name", name, "start", start) + ar.logger.Trace("running pre-run hook", "name", name, "start", start) } - if err := pre.Prerun(); err != nil { + if err := pre.Prerun(context.TODO()); err != nil { return fmt.Errorf("hook %q failed: %v", name, err) } + //TODO Persist hook state locally + if ar.logger.IsTrace() { end := time.Now() ar.logger.Trace("finished pre-run hooks", "name", name, "end", end, "duration", end.Sub(start)) @@ -93,28 +102,53 @@ func (ar *allocRunner) postrun() error { return nil } -/* -What state is needed to transfer: -*/ +// destroy is used to run the runners destroy hooks. All hooks are run and +// errors are returned as a multierror. +func (ar *allocRunner) destroy() error { + if ar.logger.IsTrace() { + start := time.Now() + ar.logger.Trace("running destroy hooks", "start", start) + defer func() { + end := time.Now() + ar.logger.Trace("finished destroy hooks", "end", end, "duration", end.Sub(start)) + }() + } + + var merr multierror.Error + for _, hook := range ar.runnerHooks { + h, ok := hook.(interfaces.RunnerDestroyHook) + if !ok { + continue + } + + name := h.Name() + var start time.Time + if ar.logger.IsTrace() { + start = time.Now() + ar.logger.Trace("running destroy hook", "name", name, "start", start) + } -/* -AR Hooks: -Alloc Dir Build: -Needs to know the folder to create + if err := h.Destroy(); err != nil { + merr.Errors = append(merr.Errors, fmt.Errorf("destroy hook %q failed: %v", name, err)) + } -Alloc Migrate -Needs access to RPC + if ar.logger.IsTrace() { + end := time.Now() + ar.logger.Trace("finished destroy hooks", "name", name, "end", end, "duration", end.Sub(start)) + } + } -Alloc Health Watcher: -Requires: Access to consul to watch health, access to every task event, task status change -*/ + return nil +} +// allocDirHook creates and destroys the root directory and shared directories +// for an allocation. type allocDirHook struct { runner *allocRunner logger log.Logger } -func newAllocDirHook(runner *allocRunner, logger log.Logger) *allocDirHook { +func newAllocDirHook(logger log.Logger, runner *allocRunner) *allocDirHook { ad := &allocDirHook{ runner: runner, } @@ -126,7 +160,7 @@ func (h *allocDirHook) Name() string { return "alloc_dir" } -func (h *allocDirHook) Prerun() error { +func (h *allocDirHook) Prerun(context.Context) error { return h.runner.allocDir.Build() } @@ -134,6 +168,52 @@ func (h *allocDirHook) Destroy() error { return h.runner.allocDir.Destroy() } +// diskMigrationHook migrates ephemeral disk volumes. Depends on alloc dir +// being built but must be run before anything else manipulates the alloc dir. 
+type diskMigrationHook struct { + allocDir *allocdir.AllocDir + allocWatcher allocwatcher.PrevAllocWatcher + logger log.Logger +} + +func newDiskMigrationHook(logger log.Logger, allocWatcher allocwatcher.PrevAllocWatcher, allocDir *allocdir.AllocDir) *diskMigrationHook { + h := &diskMigrationHook{ + allocDir: allocDir, + allocWatcher: allocWatcher, + } + h.logger = logger.Named(h.Name()) + return h +} + +func (h *diskMigrationHook) Name() string { + return "migrate_disk" +} + +func (h *diskMigrationHook) Prerun(ctx context.Context) error { + // Wait for a previous alloc - if any - to terminate + if err := h.allocWatcher.Wait(ctx); err != nil { + return err + } + + // Wait for data to be migrated from a previous alloc if applicable + if err := h.allocWatcher.Migrate(ctx, h.allocDir); err != nil { + if err == context.Canceled { + return err + } + + // Soft-fail on migration errors + h.logger.Warn("error migrating data from previous alloc", "error", err) + + // Recreate alloc dir to ensure a clean slate + h.allocDir.Destroy() + if err := h.allocDir.Build(); err != nil { + return fmt.Errorf("failed to clean task directories after failed migration: %v", err) + } + } + + return nil +} + // TODO type allocHealthWatcherHook struct { runner *allocRunner diff --git a/client/allocrunnerv2/config.go b/client/allocrunnerv2/config.go index 6d58652995f5..50b8a672a132 100644 --- a/client/allocrunnerv2/config.go +++ b/client/allocrunnerv2/config.go @@ -2,6 +2,7 @@ package allocrunnerv2 import ( log "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/client/allocwatcher" clientconfig "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/consul" "github.com/hashicorp/nomad/client/interfaces" @@ -32,4 +33,8 @@ type Config struct { // StateUpdater is used to emit updated task state StateUpdater interfaces.AllocStateHandler + + // PrevAllocWatcher handles waiting on previous allocations and + // migrating their ephemeral disk when necessary. + PrevAllocWatcher allocwatcher.PrevAllocWatcher } diff --git a/client/allocrunnerv2/interfaces/runner_lifecycle.go b/client/allocrunnerv2/interfaces/runner_lifecycle.go index 5ccece2e8692..63164dce9aa4 100644 --- a/client/allocrunnerv2/interfaces/runner_lifecycle.go +++ b/client/allocrunnerv2/interfaces/runner_lifecycle.go @@ -1,6 +1,8 @@ package interfaces import ( + "context" + "github.com/hashicorp/nomad/client/allocrunnerv2/state" ) @@ -11,7 +13,7 @@ type RunnerHook interface { type RunnerPrerunHook interface { RunnerHook - Prerun() error + Prerun(context.Context) error } type RunnerPostrunHook interface { diff --git a/client/allocrunnerv2/taskrunner/lifecycle.go b/client/allocrunnerv2/taskrunner/lifecycle.go index 9de87f528425..521c0157d9d8 100644 --- a/client/allocrunnerv2/taskrunner/lifecycle.go +++ b/client/allocrunnerv2/taskrunner/lifecycle.go @@ -55,6 +55,8 @@ func (tr *TaskRunner) Signal(event *structs.TaskEvent, s os.Signal) error { return handle.Signal(s) } +// Kill a task. Blocks until task exits or context is canceled. State is set to +// dead. 
func (tr *TaskRunner) Kill(ctx context.Context, event *structs.TaskEvent) error { // Grab the handle handle := tr.getDriverHandle() diff --git a/client/allocrunnerv2/taskrunner/task_runner.go b/client/allocrunnerv2/taskrunner/task_runner.go index 9844bb28e8f0..634a4f804abb 100644 --- a/client/allocrunnerv2/taskrunner/task_runner.go +++ b/client/allocrunnerv2/taskrunner/task_runner.go @@ -333,11 +333,21 @@ func (tr *TaskRunner) handleUpdates() { for { select { case <-tr.triggerUpdateCh: - // Update triggered; run hooks - tr.updateHooks() case <-tr.waitCh: return } + + if tr.Alloc().TerminalStatus() { + // Terminal update: kill TaskRunner and let Run execute postrun hooks + err := tr.Kill(context.TODO(), structs.NewTaskEvent(structs.TaskKilled)) + if err != nil { + tr.logger.Warn("error stopping task", "error", err) + } + continue + } + + // Non-terminal update; run hooks + tr.updateHooks() } } diff --git a/client/allocrunner/alloc_watcher.go b/client/allocwatcher/alloc_watcher.go similarity index 95% rename from client/allocrunner/alloc_watcher.go rename to client/allocwatcher/alloc_watcher.go index 1cf47bd67d77..34774e85bbfe 100644 --- a/client/allocrunner/alloc_watcher.go +++ b/client/allocwatcher/alloc_watcher.go @@ -1,4 +1,4 @@ -package allocrunner +package allocwatcher import ( "archive/tar" @@ -38,10 +38,18 @@ type terminated interface { Terminated() bool } -// prevAllocWatcher allows AllocRunners to wait for a previous allocation to +// AllocRunnerMeta provides metadata about an AllocRunner such as its alloc and +// alloc dir. +type AllocRunnerMeta interface { + GetAllocDir() *allocdir.AllocDir + Listener() *cstructs.AllocListener + Alloc() *structs.Allocation +} + +// PrevAllocWatcher allows AllocRunners to wait for a previous allocation to // terminate and migrate its data whether or not the previous allocation is // local or remote. -type prevAllocWatcher interface { +type PrevAllocWatcher interface { // Wait for previous alloc to terminate Wait(context.Context) error @@ -55,10 +63,10 @@ type prevAllocWatcher interface { IsMigrating() bool } -// NewAllocWatcher creates a prevAllocWatcher appropriate for whether this +// NewAllocWatcher creates a PrevAllocWatcher appropriate for whether this // alloc's previous allocation was local or remote. If this alloc has no // previous alloc then a noop implementation is returned. -func NewAllocWatcher(alloc *structs.Allocation, prevAR *AllocRunner, rpc rpcer, config *config.Config, l *log.Logger, migrateToken string) prevAllocWatcher { +func NewAllocWatcher(alloc *structs.Allocation, prevAR AllocRunnerMeta, rpc rpcer, config *config.Config, l *log.Logger, migrateToken string) PrevAllocWatcher { if alloc.PreviousAllocation == "" { // No previous allocation, use noop transitioner return NoopPrevAlloc{} @@ -74,8 +82,7 @@ func NewAllocWatcher(alloc *structs.Allocation, prevAR *AllocRunner, rpc rpcer, tasks: tg.Tasks, sticky: tg.EphemeralDisk != nil && tg.EphemeralDisk.Sticky, prevAllocDir: prevAR.GetAllocDir(), - prevListener: prevAR.GetListener(), - prevWaitCh: prevAR.WaitCh(), + prevListener: prevAR.Listener(), prevStatus: prevAR.Alloc(), logger: l, } @@ -118,10 +125,6 @@ type localPrevAlloc struct { // terminated (and therefore won't send updates to the listener) prevStatus terminated - // prevWaitCh is closed when the previous alloc is garbage collected - // which is a failsafe against blocking the new alloc forever - prevWaitCh <-chan struct{} - // waiting and migrating are true when alloc runner is waiting on the // prevAllocWatcher. 
Writers must acquire the waitingLock and readers // should use the helper methods IsWaiting and IsMigrating. @@ -161,11 +164,6 @@ func (p *localPrevAlloc) Wait(ctx context.Context) error { defer p.prevListener.Close() - if p.prevStatus.Terminated() { - // Fast path - previous alloc already terminated! - return nil - } - // Block until previous alloc exits p.logger.Printf("[DEBUG] client: alloc %q waiting for previous alloc %q to terminate", p.allocID, p.prevAllocID) for { @@ -174,8 +172,6 @@ func (p *localPrevAlloc) Wait(ctx context.Context) error { if !ok || prevAlloc.Terminated() { return nil } - case <-p.prevWaitCh: - return nil case <-ctx.Done(): return ctx.Err() } diff --git a/client/allocrunner/alloc_watcher_test.go b/client/allocwatcher/alloc_watcher_test.go similarity index 58% rename from client/allocrunner/alloc_watcher_test.go rename to client/allocwatcher/alloc_watcher_test.go index 907f4a521e08..64509b03f82d 100644 --- a/client/allocrunner/alloc_watcher_test.go +++ b/client/allocwatcher/alloc_watcher_test.go @@ -1,4 +1,4 @@ -package allocrunner +package allocwatcher import ( "archive/tar" @@ -14,26 +14,118 @@ import ( "testing" "time" + hclog "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/client/allocdir" - "github.com/hashicorp/nomad/client/testutil" + cstructs "github.com/hashicorp/nomad/client/structs" + ctestutil "github.com/hashicorp/nomad/client/testutil" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" + "github.com/stretchr/testify/require" ) +// fakeAllocRunner implements AllocRunnerMeta +type fakeAllocRunner struct { + alloc *structs.Allocation + AllocDir *allocdir.AllocDir + Broadcaster *cstructs.AllocBroadcaster +} + +// newFakeAllocRunner creates a new AllocRunnerMeta. Callers must call +// AllocDir.Destroy() when finished. +func newFakeAllocRunner(t *testing.T, logger hclog.Logger) *fakeAllocRunner { + alloc := mock.Alloc() + alloc.Job.TaskGroups[0].EphemeralDisk.Sticky = true + alloc.Job.TaskGroups[0].EphemeralDisk.Migrate = true + + path, err := ioutil.TempDir("", "nomad_test_wathcer") + require.NoError(t, err) + + return &fakeAllocRunner{ + alloc: alloc, + AllocDir: allocdir.NewAllocDir(logger, path), + Broadcaster: cstructs.NewAllocBroadcaster(alloc), + } +} + +func (f *fakeAllocRunner) GetAllocDir() *allocdir.AllocDir { + return f.AllocDir +} + +func (f *fakeAllocRunner) Listener() *cstructs.AllocListener { + return f.Broadcaster.Listen() +} + +func (f *fakeAllocRunner) Alloc() *structs.Allocation { + return f.alloc +} + +// newConfig returns a new Config and cleanup func +func newConfig(t *testing.T) (Config, func()) { + logger := testlog.HCLogger(t) + + prevAR := newFakeAllocRunner(t, logger) + + alloc := mock.Alloc() + alloc.PreviousAllocation = prevAR.Alloc().ID + alloc.Job.TaskGroups[0].EphemeralDisk.Sticky = true + alloc.Job.TaskGroups[0].EphemeralDisk.Migrate = true + + config := Config{ + Alloc: alloc, + PreviousRunner: prevAR, + RPC: nil, + Config: nil, + MigrateToken: "fake_token", + Logger: logger, + } + + cleanup := func() { + prevAR.AllocDir.Destroy() + } + + return config, cleanup +} + +// TestPrevAlloc_Noop asserts that when no previous allocation is set the noop +// implementation is returned that does not block or perform migrations. 
+func TestPrevAlloc_Noop(t *testing.T) { + conf, cleanup := newConfig(t) + defer cleanup() + + conf.Alloc.PreviousAllocation = "" + + watcher := NewAllocWatcher(conf) + require.NotNil(t, watcher) + _, ok := watcher.(NoopPrevAlloc) + require.True(t, ok, "expected watcher to be NoopPrevAlloc") + + done := make(chan int, 2) + go func() { + watcher.Wait(context.Background()) + done <- 1 + watcher.Migrate(context.Background(), nil) + done <- 1 + }() + require.False(t, watcher.IsWaiting()) + require.False(t, watcher.IsMigrating()) + <-done + <-done +} + // TestPrevAlloc_LocalPrevAlloc asserts that when a previous alloc runner is // set a localPrevAlloc will block on it. func TestPrevAlloc_LocalPrevAlloc(t *testing.T) { - _, prevAR := TestAllocRunner(t, false) - prevAR.alloc.Job.TaskGroups[0].Tasks[0].Config["run_for"] = "10s" + t.Parallel() + conf, cleanup := newConfig(t) + + defer cleanup() - newAlloc := mock.Alloc() - newAlloc.PreviousAllocation = prevAR.Alloc().ID - newAlloc.Job.TaskGroups[0].EphemeralDisk.Sticky = false - task := newAlloc.Job.TaskGroups[0].Tasks[0] - task.Driver = "mock_driver" - task.Config["run_for"] = "500ms" + conf.Alloc.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver" + conf.Alloc.Job.TaskGroups[0].Tasks[0].Config["run_for"] = "500ms" - waiter := NewAllocWatcher(newAlloc, prevAR, nil, nil, testlog.Logger(t), "") + waiter := NewAllocWatcher(conf) // Wait in a goroutine with a context to make sure it exits at the right time ctx, cancel := context.WithCancel(context.Background()) @@ -43,39 +135,55 @@ func TestPrevAlloc_LocalPrevAlloc(t *testing.T) { waiter.Wait(ctx) }() - select { - case <-ctx.Done(): - t.Fatalf("Wait exited too early") - case <-time.After(33 * time.Millisecond): - // Good! It's blocking - } + // Assert watcher is waiting + testutil.WaitForResult(func() (bool, error) { + return waiter.IsWaiting(), fmt.Errorf("expected watcher to be waiting") + }, func(err error) { + t.Fatalf("error: %v", err) + }) - // Start the previous allocs to cause it to update but not terminate - go prevAR.Run() - defer prevAR.Destroy() + // Broadcast a non-terminal alloc update to assert only terminal + // updates break out of waiting. + update := conf.PreviousRunner.Alloc().Copy() + update.DesiredStatus = structs.AllocDesiredStatusStop + update.ModifyIndex++ + update.AllocModifyIndex++ + + broadcaster := conf.PreviousRunner.(*fakeAllocRunner).Broadcaster + err := broadcaster.Send(update) + require.NoError(t, err) + + // Assert watcher is still waiting because alloc isn't terminal + testutil.WaitForResult(func() (bool, error) { + return waiter.IsWaiting(), fmt.Errorf("expected watcher to be waiting") + }, func(err error) { + t.Fatalf("error: %v", err) + }) - select { - case <-ctx.Done(): - t.Fatalf("Wait exited too early") - case <-time.After(33 * time.Millisecond): - // Good! It's still blocking - } + // Stop the previous alloc and assert watcher stops blocking + update = update.Copy() + update.DesiredStatus = structs.AllocDesiredStatusStop + update.ClientStatus = structs.AllocClientStatusComplete + update.ModifyIndex++ + update.AllocModifyIndex++ - // Stop the previous alloc - prevAR.Destroy() + err = broadcaster.Send(update) + require.NoError(t, err) - select { - case <-ctx.Done(): - // Good! 
We unblocked when the previous alloc stopped - case <-time.After(time.Second): - t.Fatalf("Wait exited too early") - } + testutil.WaitForResult(func() (bool, error) { + if waiter.IsWaiting() { + return false, fmt.Errorf("did not expect watcher to be waiting") + } + return !waiter.IsMigrating(), fmt.Errorf("did not expect watcher to be migrating") + }, func(err error) { + t.Fatalf("error: %v", err) + }) } // TestPrevAlloc_StreamAllocDir_Ok asserts that streaming a tar to an alloc dir // works. func TestPrevAlloc_StreamAllocDir_Ok(t *testing.T) { - testutil.RequireRoot(t) + ctestutil.RequireRoot(t) t.Parallel() dir, err := ioutil.TempDir("", "") if err != nil { @@ -178,7 +286,7 @@ func TestPrevAlloc_StreamAllocDir_Ok(t *testing.T) { defer os.RemoveAll(dir1) rc := ioutil.NopCloser(buf) - prevAlloc := &remotePrevAlloc{logger: testlog.Logger(t)} + prevAlloc := &remotePrevAlloc{logger: testlog.HCLogger(t)} if err := prevAlloc.streamAllocDir(context.Background(), rc, dir1); err != nil { t.Fatalf("err: %v", err) } @@ -228,7 +336,7 @@ func TestPrevAlloc_StreamAllocDir_Error(t *testing.T) { // This test only unit tests streamAllocDir so we only need a partially // complete remotePrevAlloc prevAlloc := &remotePrevAlloc{ - logger: testlog.Logger(t), + logger: testlog.HCLogger(t), allocID: "123", prevAllocID: "abc", migrate: true, diff --git a/client/allocwatcher/doc.go b/client/allocwatcher/doc.go new file mode 100644 index 000000000000..0580784c9b3a --- /dev/null +++ b/client/allocwatcher/doc.go @@ -0,0 +1,4 @@ +// Package allocwatcher allows blocking until another allocation - whether +// running locally or remotely - completes and migrates the allocation +// directory if necessary. +package allocwatcher diff --git a/client/client.go b/client/client.go index 9bbb8fb6596a..0b31c9e2e4d8 100644 --- a/client/client.go +++ b/client/client.go @@ -30,6 +30,7 @@ import ( "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/allocrunner" "github.com/hashicorp/nomad/client/allocrunnerv2" + "github.com/hashicorp/nomad/client/allocwatcher" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/servers" "github.com/hashicorp/nomad/client/state" @@ -103,9 +104,11 @@ type ClientStatsReporter interface { type AllocRunner interface { StatsReporter() allocrunner.AllocStatsReporter Destroy() + GetAllocDir() *allocdir.AllocDir IsDestroyed() bool IsWaiting() bool IsMigrating() bool + Listener() *cstructs.AllocListener WaitCh() <-chan struct{} Update(*structs.Allocation) Alloc() *structs.Allocation @@ -1913,18 +1916,10 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error return err } - //FIXME disabled previous alloc waiting/migrating - // get the previous alloc runner - if one exists - for the - // blocking/migrating watcher - /* - var prevAR *allocrunner.AllocRunner - if alloc.PreviousAllocation != "" { - prevAR = c.allocs[alloc.PreviousAllocation] - } - - c.configLock.RLock() - prevAlloc := allocrunner.NewAllocWatcher(alloc, prevAR, c, c.configCopy, c.logger, migrateToken) - */ + // Since only the Client has access to other AllocRunners and the RPC + // client, create the previous allocation watcher here. + prevAlloc := c.allocs[alloc.PreviousAllocation] + prevAllocWatcher := allocwatcher.NewAllocWatcher(alloc, prevAlloc, c, c.configCopy, c.logger, migrateToken) // Copy the config since the node can be swapped out as it is being updated. 
// The long term fix is to pass in the config and node separately and then @@ -1938,13 +1933,14 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error c.configLock.RLock() arConf := &allocrunnerv2.Config{ - Alloc: alloc, - Logger: logger, - ClientConfig: c.config, - StateDB: c.stateDB, - Consul: c.consulService, - Vault: c.vaultClient, - StateUpdater: c, + Alloc: alloc, + Logger: logger, + ClientConfig: c.config, + StateDB: c.stateDB, + Consul: c.consulService, + Vault: c.vaultClient, + StateUpdater: c, + PrevAllocWatcher: prevAllocWatcher, } c.configLock.RUnlock() diff --git a/client/state/interface.go b/client/state/interface.go index 59d6bd08bf48..952620422956 100644 --- a/client/state/interface.go +++ b/client/state/interface.go @@ -12,5 +12,7 @@ type StateDB interface { GetTaskRunnerState(allocID, taskName string) (*state.LocalState, *structs.TaskState, error) PutTaskRunnerLocalState(allocID, taskName string, val interface{}) error PutTaskState(allocID, taskName string, state *structs.TaskState) error + DeleteTaskBucket(allocID, taskName string) error + DeleteAllocationBucket(allocID string) error Close() error } diff --git a/client/state/noopdb.go b/client/state/noopdb.go index 7a31c461ffd6..65d105fd3d22 100644 --- a/client/state/noopdb.go +++ b/client/state/noopdb.go @@ -27,6 +27,14 @@ func (n noopDB) PutTaskState(allocID string, taskName string, state *structs.Tas return nil } +func (n noopDB) DeleteTaskBucket(allocID, taskName string) error { + return nil +} + +func (n noopDB) DeleteAllocationBucket(allocID string) error { + return nil +} + func (n noopDB) Close() error { return nil } diff --git a/client/structs/funcs.go b/client/structs/funcs.go index ec6d5421c2c5..021e95d2dcdd 100644 --- a/client/structs/funcs.go +++ b/client/structs/funcs.go @@ -1,27 +1,44 @@ package structs import ( + "errors" "sync" "github.com/hashicorp/nomad/nomad/structs" ) -// AllocBroadcaster implements an allocation broadcast channel. -// The zero value is a usable unbuffered channel. +const ( + // listenerCap is the capacity of the listener chans. Must be exactly 1 + // to prevent Sends from blocking and allows them to pop old pending + // updates from the chan before enqueueing the latest update. + listenerCap = 1 +) + +var ErrAllocBroadcasterClosed = errors.New("alloc broadcaster closed") + +// AllocBroadcaster implements an allocation broadcast channel where each +// listener receives allocation updates. Pending updates are dropped and +// replaced by newer allocation updates, so listeners may not receive every +// allocation update. However this ensures Sends never block and listeners only +// receive the latest allocation update -- never a stale version. type AllocBroadcaster struct { m sync.Mutex - listeners map[int]chan<- *structs.Allocation // lazy init + alloc *structs.Allocation + listeners map[int]chan *structs.Allocation // lazy init nextId int - capacity int closed bool } -// NewAllocBroadcaster returns a new AllocBroadcaster with the given capacity (0 means unbuffered). -func NewAllocBroadcaster(n int) *AllocBroadcaster { - return &AllocBroadcaster{capacity: n} +// NewAllocBroadcaster returns a new AllocBroadcaster with the given initial +// allocation. +func NewAllocBroadcaster(initial *structs.Allocation) *AllocBroadcaster { + return &AllocBroadcaster{ + alloc: initial, + } } -// AllocListener implements a listening endpoint for an allocation broadcast channel. +// AllocListener implements a listening endpoint for an allocation broadcast +// channel. 
type AllocListener struct { // Ch receives the broadcast messages. Ch <-chan *structs.Allocation @@ -29,27 +46,33 @@ type AllocListener struct { id int } -// Send broadcasts a message to the channel. Send returns whether the message -// was sent to all channels. -func (b *AllocBroadcaster) Send(v *structs.Allocation) bool { +// Send broadcasts an allocation update. Any pending updates are replaced with +// this version of the allocation to prevent blocking on slow receivers. +func (b *AllocBroadcaster) Send(v *structs.Allocation) error { b.m.Lock() defer b.m.Unlock() if b.closed { - return false + return ErrAllocBroadcasterClosed } - sent := true + + // Update alloc on broadcaster to send to newly created listeners + b.alloc = v + + // Send alloc to already created listeners for _, l := range b.listeners { select { case l <- v: - default: - sent = false + case <-l: + // Pop pending update and replace with new update + l <- v } } - return sent + return nil } -// Close closes the channel, disabling the sending of further messages. +// Close closes the channel, disabling the sending of further allocation +// updates. func (b *AllocBroadcaster) Close() { b.m.Lock() defer b.m.Unlock() @@ -57,6 +80,7 @@ func (b *AllocBroadcaster) Close() { return } + b.alloc = nil b.closed = true for _, l := range b.listeners { close(l) @@ -68,16 +92,25 @@ func (b *AllocBroadcaster) Listen() *AllocListener { b.m.Lock() defer b.m.Unlock() if b.listeners == nil { - b.listeners = make(map[int]chan<- *structs.Allocation) + b.listeners = make(map[int]chan *structs.Allocation) } + for b.listeners[b.nextId] != nil { b.nextId++ } - ch := make(chan *structs.Allocation, b.capacity) + + ch := make(chan *structs.Allocation, listenerCap) + if b.closed { + // Broadcaster is already closed, close this listener close(ch) + } else { + // Send the current allocation to the listener + ch <- b.alloc } + b.listeners[b.nextId] = ch + return &AllocListener{ch, b, b.nextId} }
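The heart of the rewritten broadcaster is the capacity-1 listener channel plus the select that swaps a stale pending update for the new one, which keeps Send non-blocking while guaranteeing listeners only ever observe the latest allocation. A self-contained sketch of just that idiom; sendLatest and the int payload are illustrative, and the real Send additionally serializes callers with the broadcaster's mutex:

package main

import "fmt"

// sendLatest implements the "latest value wins" send: with a capacity-1
// channel, a send either enqueues directly or pops the stale pending value
// and enqueues the new one, so the sender never blocks.
func sendLatest(ch chan int, v int) {
	select {
	case ch <- v: // channel had room
	case <-ch: // channel was full: drop the stale value...
		ch <- v // ...and enqueue the latest one
	}
}

func main() {
	ch := make(chan int, 1) // must be capacity 1 for this idiom
	for i := 1; i <= 5; i++ {
		sendLatest(ch, i) // never blocks, even with no receiver
	}
	fmt.Println(<-ch) // 5: only the most recent update is delivered
}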