Skip to content

Commit

Permalink
Merge pull request #4963 from hashicorp/dani/f-preempt-alloc-wait
Browse files Browse the repository at this point in the history
client: Wait for preemptions to terminate
  • Loading branch information
endocrimes committed Dec 11, 2018
2 parents 5306b1e + 971586d commit 0a2fba0
Show file tree
Hide file tree
Showing 11 changed files with 390 additions and 50 deletions.
10 changes: 7 additions & 3 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,13 @@ type allocRunner struct {
// allocBroadcaster sends client allocation updates to all listeners
allocBroadcaster *cstructs.AllocBroadcaster

// prevAllocWatcher allows waiting for a previous allocation to exit
// and if necessary migrate its alloc dir.
// prevAllocWatcher allows waiting for any previous or preempted allocations
// to exit
prevAllocWatcher allocwatcher.PrevAllocWatcher

// prevAllocMigrator allows the migration of a previous allocation's alloc dir.
prevAllocMigrator allocwatcher.PrevAllocMigrator

// pluginSingletonLoader is a plugin loader that returns singleton
// instances of the plugins.
pluginSingletonLoader loader.PluginCatalog
Expand Down Expand Up @@ -134,6 +137,7 @@ func NewAllocRunner(config *Config) (*allocRunner, error) {
taskStateUpdateHandlerCh: make(chan struct{}),
deviceStatsReporter: config.DeviceStatsReporter,
prevAllocWatcher: config.PrevAllocWatcher,
prevAllocMigrator: config.PrevAllocMigrator,
pluginSingletonLoader: config.PluginSingletonLoader,
devicemanager: config.DeviceManager,
}
Expand Down Expand Up @@ -713,7 +717,7 @@ func (ar *allocRunner) Shutdown() {
//
// This method is safe for calling concurrently with Run().
func (ar *allocRunner) IsMigrating() bool {
return ar.prevAllocWatcher.IsMigrating()
return ar.prevAllocMigrator.IsMigrating()
}

func (ar *allocRunner) StatsReporter() interfaces.AllocStatsReporter {
Expand Down
3 changes: 2 additions & 1 deletion client/allocrunner/alloc_runner_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ func (ar *allocRunner) initRunnerHooks() {
// directory path exists for other hooks.
ar.runnerHooks = []interfaces.RunnerHook{
newAllocDirHook(hookLogger, ar.allocDir),
newDiskMigrationHook(hookLogger, ar.prevAllocWatcher, ar.allocDir),
newUpstreamAllocsHook(hookLogger, ar.prevAllocWatcher),
newDiskMigrationHook(hookLogger, ar.prevAllocMigrator, ar.allocDir),
newAllocHealthWatcherHook(hookLogger, ar.Alloc(), hs, ar.Listener(), ar.consulClient),
}
}
Expand Down
8 changes: 5 additions & 3 deletions client/allocrunner/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@ type Config struct {
// StateUpdater is used to emit updated task state
StateUpdater interfaces.AllocStateHandler

// deviceStatsReporter is used to lookup resource usage for alloc devices
// DeviceStatsReporter is used to lookup resource usage for alloc devices
DeviceStatsReporter interfaces.DeviceStatsReporter

// PrevAllocWatcher handles waiting on previous allocations and
// migrating their ephemeral disk when necessary.
// PrevAllocWatcher handles waiting on previous or preempted allocations
PrevAllocWatcher allocwatcher.PrevAllocWatcher

// PrevAllocMigrator allows the migration of a previous allocation's alloc dir
PrevAllocMigrator allocwatcher.PrevAllocMigrator

// PluginLoader is used to load plugins.
PluginLoader loader.PluginCatalog

Expand Down
4 changes: 2 additions & 2 deletions client/allocrunner/migrate_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ import (
// being built but must be run before anything else manipulates the alloc dir.
type diskMigrationHook struct {
allocDir *allocdir.AllocDir
allocWatcher allocwatcher.PrevAllocWatcher
allocWatcher allocwatcher.PrevAllocMigrator
logger log.Logger
}

func newDiskMigrationHook(logger log.Logger, allocWatcher allocwatcher.PrevAllocWatcher, allocDir *allocdir.AllocDir) *diskMigrationHook {
func newDiskMigrationHook(logger log.Logger, allocWatcher allocwatcher.PrevAllocMigrator, allocDir *allocdir.AllocDir) *diskMigrationHook {
h := &diskMigrationHook{
allocDir: allocDir,
allocWatcher: allocWatcher,
Expand Down
1 change: 1 addition & 0 deletions client/allocrunner/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func testAllocRunnerConfig(t *testing.T, alloc *structs.Allocation) (*Config, fu
Vault: vaultclient.NewMockVaultClient(),
StateUpdater: &MockStateUpdater{},
PrevAllocWatcher: allocwatcher.NoopPrevAlloc{},
PrevAllocMigrator: allocwatcher.NoopPrevAlloc{},
PluginSingletonLoader: singleton.NewSingletonLoader(clientConf.Logger, pluginLoader),
DeviceManager: devicemanager.NoopMockManager(),
}
Expand Down
32 changes: 32 additions & 0 deletions client/allocrunner/upstream_allocs_hook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package allocrunner

import (
"context"

log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocwatcher"
)

// upstreamAllocsHook waits for a PrevAllocWatcher to exit before allowing
// an allocation to be executed
type upstreamAllocsHook struct {
allocWatcher allocwatcher.PrevAllocWatcher
logger log.Logger
}

// newUpstreamAllocsHook builds an upstreamAllocsHook around allocWatcher and
// gives it a sub-logger of logger named after the hook.
func newUpstreamAllocsHook(logger log.Logger, allocWatcher allocwatcher.PrevAllocWatcher) *upstreamAllocsHook {
	hook := new(upstreamAllocsHook)
	hook.allocWatcher = allocWatcher
	// The logger is derived after construction because its name comes from
	// the hook's own Name method.
	hook.logger = logger.Named(hook.Name())
	return hook
}

// Name returns the identifier of this hook.
func (*upstreamAllocsHook) Name() string {
	const hookName = "await_previous_allocations"
	return hookName
}

// Prerun blocks on the hook's alloc watcher and returns whatever error the
// watcher's Wait reports, unchanged. ctx is handed straight to Wait.
func (h *upstreamAllocsHook) Prerun(ctx context.Context) error {
	// Wait for a previous alloc - if any - to terminate.
	if err := h.allocWatcher.Wait(ctx); err != nil {
		return err
	}
	return nil
}
115 changes: 88 additions & 27 deletions client/allocwatcher/alloc_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,31 +47,40 @@ type AllocRunnerMeta interface {
}

// PrevAllocWatcher allows AllocRunners to wait for a previous allocation to
// terminate and migrate its data whether or not the previous allocation is
// local or remote.
// terminate whether or not the previous allocation is local or remote.
// See `PrevAllocMigrator` for migrating workloads.
type PrevAllocWatcher interface {
// Wait for previous alloc to terminate
Wait(context.Context) error

// Migrate data from previous alloc
Migrate(ctx context.Context, dest *allocdir.AllocDir) error

// IsWaiting returns true if a concurrent caller is blocked in Wait
IsWaiting() bool
}

// PrevAllocMigrator is a PrevAllocWatcher that can additionally migrate a
// previous allocation, whether that allocation is local or remote.
type PrevAllocMigrator interface {
	PrevAllocWatcher

	// Migrate moves data from the previous alloc into dest.
	Migrate(ctx context.Context, dest *allocdir.AllocDir) error

	// IsMigrating reports whether a concurrent caller is currently in Migrate.
	IsMigrating() bool
}

type Config struct {
// Alloc is the current allocation which may need to block on its
// previous allocation stopping.
Alloc *structs.Allocation

// PreviousRunner is non-nil iff All has a PreviousAllocation and it is
// PreviousRunner is non-nil if Alloc has a PreviousAllocation and it is
// running locally.
PreviousRunner AllocRunnerMeta

// PreemptedRunners is non-nil if Alloc has one or more PreemptedAllocations.
PreemptedRunners map[string]AllocRunnerMeta

// RPC allows the alloc watcher to monitor remote allocations.
RPC RPCer

Expand All @@ -85,47 +94,99 @@ type Config struct {
Logger hclog.Logger
}

// NewAllocWatcher creates a PrevAllocWatcher appropriate for whether this
// alloc's previous allocation was local or remote. If this alloc has no
// previous alloc then a noop implementation is returned.
func NewAllocWatcher(c Config) PrevAllocWatcher {
if c.Alloc.PreviousAllocation == "" {
// No previous allocation, use noop transitioner
return NoopPrevAlloc{}
func newMigratorForAlloc(c Config, tg *structs.TaskGroup, watchedAllocID string, m AllocRunnerMeta) PrevAllocMigrator {
logger := c.Logger.Named("alloc_migrator").With("alloc_id", c.Alloc.ID).With("previous_alloc", watchedAllocID)

tasks := tg.Tasks
sticky := tg.EphemeralDisk != nil && tg.EphemeralDisk.Sticky
migrate := tg.EphemeralDisk != nil && tg.EphemeralDisk.Migrate

if m != nil {
// Local allocation: runner metadata for the watched alloc is available
return &localPrevAlloc{
allocID: c.Alloc.ID,
prevAllocID: watchedAllocID,
tasks: tasks,
sticky: sticky,
prevAllocDir: m.GetAllocDir(),
prevListener: m.Listener(),
prevStatus: m.Alloc(),
logger: logger,
}
}

logger := c.Logger.Named("alloc_watcher")
logger = logger.With("alloc_id", c.Alloc.ID)
logger = logger.With("previous_alloc", c.Alloc.PreviousAllocation)
return &remotePrevAlloc{
allocID: c.Alloc.ID,
prevAllocID: c.Alloc.PreviousAllocation,
tasks: tasks,
config: c.Config,
migrate: migrate,
rpc: c.RPC,
migrateToken: c.MigrateToken,
logger: logger,
}
}

tg := c.Alloc.Job.LookupTaskGroup(c.Alloc.TaskGroup)
func newWatcherForAlloc(c Config, watchedAllocID string, m AllocRunnerMeta) PrevAllocWatcher {
logger := c.Logger.Named("alloc_watcher").With("alloc_id", c.Alloc.ID).With("previous_alloc", watchedAllocID)

if c.PreviousRunner != nil {
// Previous allocation is local, use local transitioner
if m != nil {
// Local allocation: runner metadata for the watched alloc is available
return &localPrevAlloc{
allocID: c.Alloc.ID,
prevAllocID: c.Alloc.PreviousAllocation,
tasks: tg.Tasks,
sticky: tg.EphemeralDisk != nil && tg.EphemeralDisk.Sticky,
prevAllocDir: c.PreviousRunner.GetAllocDir(),
prevListener: c.PreviousRunner.Listener(),
prevStatus: c.PreviousRunner.Alloc(),
prevAllocID: watchedAllocID,
prevAllocDir: m.GetAllocDir(),
prevListener: m.Listener(),
prevStatus: m.Alloc(),
logger: logger,
}
}

return &remotePrevAlloc{
allocID: c.Alloc.ID,
prevAllocID: c.Alloc.PreviousAllocation,
tasks: tg.Tasks,
config: c.Config,
migrate: tg.EphemeralDisk != nil && tg.EphemeralDisk.Migrate,
rpc: c.RPC,
migrateToken: c.MigrateToken,
logger: logger,
}
}

// NewAllocWatcher creates the PrevAllocWatcher and PrevAllocMigrator for the
// allocation described by c.
//
// The returned watcher groups one watcher per upstream allocation: the
// previous allocation (when c.Alloc.PreviousAllocation is set) and every entry
// of c.PreemptedRunners. The returned migrator is built only for the previous
// allocation; when there is none it is a NoopPrevAlloc.
//
// If the alloc has neither a previous allocation nor any preempted runners,
// noop implementations are returned for both values.
func NewAllocWatcher(c Config) (PrevAllocWatcher, PrevAllocMigrator) {
	hasPrevious := c.Alloc.PreviousAllocation != ""

	// len() covers both a nil and an empty-but-allocated map, so an empty
	// PreemptedRunners still takes the noop fast path.
	if !hasPrevious && len(c.PreemptedRunners) == 0 {
		return NoopPrevAlloc{}, NoopPrevAlloc{}
	}

	// One slot per preempted alloc plus one for the previous alloc.
	prevAllocWatchers := make([]PrevAllocWatcher, 0, len(c.PreemptedRunners)+1)
	var prevAllocMigrator PrevAllocMigrator = NoopPrevAlloc{}

	// We have a previous allocation: its migrator is both the migrator we
	// return and one of the watchers in the group.
	if hasPrevious {
		tg := c.Alloc.Job.LookupTaskGroup(c.Alloc.TaskGroup)
		m := newMigratorForAlloc(c, tg, c.Alloc.PreviousAllocation, c.PreviousRunner)
		prevAllocWatchers = append(prevAllocWatchers, m)
		prevAllocMigrator = m
	}

	// We are preempting allocations: add a watcher for each of them.
	// Ranging over a nil map performs zero iterations, so no nil guard is
	// needed.
	for aid, r := range c.PreemptedRunners {
		prevAllocWatchers = append(prevAllocWatchers, newWatcherForAlloc(c, aid, r))
	}

	groupWatcher := &groupPrevAllocWatcher{
		prevAllocs: prevAllocWatchers,
	}

	return groupWatcher, prevAllocMigrator
}

// localPrevAlloc is a prevAllocWatcher for previous allocations on the same
// node as an updated allocation.
type localPrevAlloc struct {
Expand Down
14 changes: 7 additions & 7 deletions client/allocwatcher/alloc_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,20 +97,20 @@ func TestPrevAlloc_Noop(t *testing.T) {

conf.Alloc.PreviousAllocation = ""

watcher := NewAllocWatcher(conf)
watcher, migrator := NewAllocWatcher(conf)
require.NotNil(t, watcher)
_, ok := watcher.(NoopPrevAlloc)
require.True(t, ok, "expected watcher to be NoopPrevAlloc")
_, ok := migrator.(NoopPrevAlloc)
require.True(t, ok, "expected migrator to be NoopPrevAlloc")

done := make(chan int, 2)
go func() {
watcher.Wait(context.Background())
done <- 1
watcher.Migrate(context.Background(), nil)
migrator.Migrate(context.Background(), nil)
done <- 1
}()
require.False(t, watcher.IsWaiting())
require.False(t, watcher.IsMigrating())
require.False(t, migrator.IsMigrating())
<-done
<-done
}
Expand All @@ -127,7 +127,7 @@ func TestPrevAlloc_LocalPrevAlloc_Block(t *testing.T) {
"run_for": "500ms",
}

waiter := NewAllocWatcher(conf)
_, waiter := NewAllocWatcher(conf)

// Wait in a goroutine with a context to make sure it exits at the right time
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -191,7 +191,7 @@ func TestPrevAlloc_LocalPrevAlloc_Terminated(t *testing.T) {

conf.PreviousRunner.Alloc().ClientStatus = structs.AllocClientStatusComplete

waiter := NewAllocWatcher(conf)
waiter, _ := NewAllocWatcher(conf)

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
Expand Down
Loading

0 comments on commit 0a2fba0

Please sign in to comment.