Skip to content

Commit

Permalink
implement stopping, destroying, and disk migration
Browse files Browse the repository at this point in the history
* Stopping an alloc is implemented via Updates but update hooks are
  *not* run.
* Destroying an alloc is a best effort cleanup.
* AllocRunner destroy hooks implemented.
* Disk migration and blocking on a previous allocation exiting were moved
  to their own package to avoid import cycles. That package now depends
  only on the alloc broadcaster instead of also using a waitCh.
* AllocBroadcaster now only drops stale allocations and always keeps the
  latest version.
* Made AllocDir safe for concurrent use

There are lots of internal contexts that are currently unused. It is
unclear whether they should be used or removed.
  • Loading branch information
schmichael committed Oct 16, 2018
1 parent de54261 commit c95155d
Show file tree
Hide file tree
Showing 16 changed files with 490 additions and 167 deletions.
30 changes: 29 additions & 1 deletion client/allocdir/alloc_dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"log"
"os"
"path/filepath"
"sync"
"time"

"github.com/hashicorp/go-multierror"
Expand Down Expand Up @@ -58,6 +59,8 @@ var (
TaskDirs = map[string]os.FileMode{TmpDirName: os.ModeSticky | 0777}
)

// AllocDir allows creating, destroying, and accessing an allocation's
// directory. All methods are safe for concurrent use.
type AllocDir struct {
// AllocDir is the directory used for storing any state
// of this allocation. It will be purged on alloc destroy.
Expand All @@ -73,6 +76,8 @@ type AllocDir struct {
// built is true if Build has successfully run
built bool

mu sync.RWMutex

logger *log.Logger
}

Expand Down Expand Up @@ -100,6 +105,9 @@ func NewAllocDir(logger *log.Logger, allocDir string) *AllocDir {
// Copy an AllocDir and all of its TaskDirs. Returns nil if AllocDir is
// nil.
func (d *AllocDir) Copy() *AllocDir {
d.mu.RLock()
defer d.mu.RUnlock()

if d == nil {
return nil
}
Expand All @@ -117,6 +125,9 @@ func (d *AllocDir) Copy() *AllocDir {

// NewTaskDir creates a new TaskDir and adds it to the AllocDirs TaskDirs map.
func (d *AllocDir) NewTaskDir(name string) *TaskDir {
d.mu.Lock()
defer d.mu.Unlock()

td := newTaskDir(d.logger, d.AllocDir, name)
d.TaskDirs[name] = td
return td
Expand All @@ -129,6 +140,9 @@ func (d *AllocDir) NewTaskDir(name string) *TaskDir {
// file "NOMAD-${ALLOC_ID}-ERROR.log" will be appended to the tar with the
// error message as the contents.
func (d *AllocDir) Snapshot(w io.Writer) error {
d.mu.RLock()
defer d.mu.RUnlock()

allocDataDir := filepath.Join(d.SharedDir, SharedDataDir)
rootPaths := []string{allocDataDir}
for _, taskdir := range d.TaskDirs {
Expand Down Expand Up @@ -206,11 +220,16 @@ func (d *AllocDir) Snapshot(w io.Writer) error {

// Move other alloc directory's shared path and local dir to this alloc dir.
func (d *AllocDir) Move(other *AllocDir, tasks []*structs.Task) error {
d.mu.RLock()
if !d.built {
// Enforce the invariant that Build is called before Move
d.mu.RUnlock()
return fmt.Errorf("unable to move to %q - alloc dir is not built", d.AllocDir)
}

// Moving is slow and only reads immutable fields, so unlock during heavy IO
d.mu.RUnlock()

// Move the data directory
otherDataDir := filepath.Join(other.SharedDir, SharedDataDir)
dataDir := filepath.Join(d.SharedDir, SharedDataDir)
Expand Down Expand Up @@ -246,7 +265,6 @@ func (d *AllocDir) Move(other *AllocDir, tasks []*structs.Task) error {

// Destroy tears down the previously built directory structure.
func (d *AllocDir) Destroy() error {

// Unmount all mounted shared alloc dirs.
var mErr multierror.Error
if err := d.UnmountAll(); err != nil {
Expand All @@ -258,12 +276,17 @@ func (d *AllocDir) Destroy() error {
}

// Unset built since the alloc dir has been destroyed.
d.mu.Lock()
d.built = false
d.mu.Unlock()
return mErr.ErrorOrNil()
}

// UnmountAll linked/mounted directories in task dirs.
func (d *AllocDir) UnmountAll() error {
d.mu.RLock()
defer d.mu.RUnlock()

var mErr multierror.Error
for _, dir := range d.TaskDirs {
// Check if the directory has the shared alloc mounted.
Expand Down Expand Up @@ -322,7 +345,9 @@ func (d *AllocDir) Build() error {
}

// Mark as built
d.mu.Lock()
d.built = true
d.mu.Unlock()
return nil
}

Expand Down Expand Up @@ -386,11 +411,14 @@ func (d *AllocDir) ReadAt(path string, offset int64) (io.ReadCloser, error) {
p := filepath.Join(d.AllocDir, path)

// Check if it is trying to read into a secret directory
d.mu.RLock()
for _, dir := range d.TaskDirs {
if filepath.HasPrefix(p, dir.SecretsDir) {
d.mu.RUnlock()
return nil, fmt.Errorf("Reading secret file prohibited: %s", path)
}
}
d.mu.RUnlock()

f, err := os.Open(p)
if err != nil {
Expand Down
45 changes: 23 additions & 22 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunner/taskrunner"
"github.com/hashicorp/nomad/client/allocwatcher"
"github.com/hashicorp/nomad/client/config"
consulApi "github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/vaultclient"
Expand Down Expand Up @@ -77,7 +78,7 @@ type AllocRunner struct {
// then migrates its data. If sticky volumes aren't used and there's no
// previous allocation, a noop implementation is used so it is always safe
// to call.
prevAlloc prevAllocWatcher
prevAlloc allocwatcher.PrevAllocWatcher

// ctx is cancelled with exitFn to cause the alloc to be destroyed
// (stopped and GC'd).
Expand Down Expand Up @@ -133,26 +134,26 @@ type allocRunnerMutableState struct {
// NewAllocRunner is used to create a new allocation context
func NewAllocRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB, updater AllocStateUpdater,
alloc *structs.Allocation, vaultClient vaultclient.VaultClient, consulClient consulApi.ConsulServiceAPI,
prevAlloc prevAllocWatcher) *AllocRunner {
prevAlloc allocwatcher.PrevAllocWatcher) *AllocRunner {

ar := &AllocRunner{
config: config,
stateDB: stateDB,
updater: updater,
logger: logger,
alloc: alloc,
allocID: alloc.ID,
allocBroadcast: cstructs.NewAllocBroadcaster(8),
prevAlloc: prevAlloc,
dirtyCh: make(chan struct{}, 1),
allocDir: allocdir.NewAllocDir(logger, filepath.Join(config.AllocDir, alloc.ID)),
tasks: make(map[string]*taskrunner.TaskRunner),
taskStates: copyTaskStates(alloc.TaskStates),
restored: make(map[string]struct{}),
updateCh: make(chan *structs.Allocation, 64),
waitCh: make(chan struct{}),
vaultClient: vaultClient,
consulClient: consulClient,
config: config,
stateDB: stateDB,
updater: updater,
logger: logger,
alloc: alloc,
allocID: alloc.ID,
//allocBroadcast: cstructs.NewAllocBroadcaster(8),
prevAlloc: prevAlloc,
dirtyCh: make(chan struct{}, 1),
allocDir: allocdir.NewAllocDir(logger, filepath.Join(config.AllocDir, alloc.ID)),
tasks: make(map[string]*taskrunner.TaskRunner),
taskStates: copyTaskStates(alloc.TaskStates),
restored: make(map[string]struct{}),
updateCh: make(chan *structs.Allocation, 64),
waitCh: make(chan struct{}),
vaultClient: vaultClient,
consulClient: consulClient,
}

// TODO Should be passed a context
Expand Down Expand Up @@ -612,9 +613,9 @@ func (r *AllocRunner) sendBroadcast(alloc *structs.Allocation) {
// Try to send the alloc up to three times with a delay to allow recovery.
sent := false
for i := 0; i < 3; i++ {
if sent = r.allocBroadcast.Send(alloc); sent {
break
}
//if sent = r.allocBroadcast.Send(alloc); sent {
// break
//}
time.Sleep(500 * time.Millisecond)
}
if !sent {
Expand Down
3 changes: 2 additions & 1 deletion client/allocrunner/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"

"github.com/boltdb/bolt"
"github.com/hashicorp/nomad/client/allocwatcher"
"github.com/hashicorp/nomad/client/config"
consulApi "github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/vaultclient"
Expand Down Expand Up @@ -51,7 +52,7 @@ func TestAllocRunnerFromAlloc(t *testing.T, alloc *structs.Allocation, restarts
alloc.Job.Type = structs.JobTypeBatch
}
vclient := vaultclient.NewMockVaultClient()
ar := NewAllocRunner(testlog.Logger(t), conf, db, upd.Update, alloc, vclient, consulApi.NewMockConsulServiceClient(t, testlog.HCLogger(t)), NoopPrevAlloc{})
ar := NewAllocRunner(testlog.Logger(t), conf, db, upd.Update, alloc, vclient, consulApi.NewMockConsulServiceClient(t), allocwatcher.NoopPrevAlloc{})
return upd, ar
}

Expand Down
Loading

0 comments on commit c95155d

Please sign in to comment.