Skip to content

Commit

Permalink
Trigger GCs after alloc changes
Browse files Browse the repository at this point in the history
GC much more aggressively by triggering GCs when allocations become
terminal as well as after new allocations are added.
  • Loading branch information
schmichael committed Oct 26, 2017
1 parent 0a6344a commit 307a1e5
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 19 deletions.
6 changes: 5 additions & 1 deletion client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -921,7 +921,6 @@ func (r *AllocRunner) handleDestroy() {
// Final state sync. We do this to ensure that the server has the correct
// state as we wait for a destroy.
alloc := r.Alloc()
r.updater(alloc)

// Broadcast and persist state synchronously
r.sendBroadcast(alloc)
Expand All @@ -936,6 +935,11 @@ func (r *AllocRunner) handleDestroy() {
r.logger.Printf("[ERR] client: alloc %q unable unmount task directories: %v", r.allocID, err)
}

// Update the server with the alloc's status -- also marks the alloc as
// being eligible for GC, so from this point on the alloc can be gc'd
// at any time.
r.updater(alloc)

for {
select {
case <-r.ctx.Done():
Expand Down
8 changes: 8 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1250,6 +1250,10 @@ func (c *Client) updateAllocStatus(alloc *structs.Allocation) {

if ok {
c.garbageCollector.MarkForCollection(ar)

// Trigger a GC in case we're over thresholds and just
// waiting for eligible allocs.
c.garbageCollector.Trigger()
}
}

Expand Down Expand Up @@ -1568,6 +1572,10 @@ func (c *Client) runAllocs(update *allocUpdates) {
add.ID, err)
}
}

// Trigger the GC once more now that new allocs are started that could
// have caused thesholds to be exceeded
c.garbageCollector.Trigger()
}

// removeAlloc is invoked when we should remove an allocation because it has
Expand Down
61 changes: 47 additions & 14 deletions client/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,28 @@ type AllocCounter interface {

// AllocGarbageCollector garbage collects terminated allocations on a node
type AllocGarbageCollector struct {
allocRunners *IndexedGCAllocPQ
config *GCConfig

// allocRunners marked for GC
allocRunners *IndexedGCAllocPQ

// statsCollector for node based thresholds (eg disk)
statsCollector stats.NodeStatsCollector
allocCounter AllocCounter
config *GCConfig
logger *log.Logger
destroyCh chan struct{}
shutdownCh chan struct{}

// allocCounter return the number of un-GC'd allocs on this node
allocCounter AllocCounter

// destroyCh is a semaphore for rate limiting concurrent garbage
// collections
destroyCh chan struct{}

// shutdownCh is closed when the GC's run method should exit
shutdownCh chan struct{}

// triggerCh is ticked by the Trigger method to cause a GC
triggerCh chan struct{}

logger *log.Logger
}

// NewAllocGarbageCollector returns a garbage collector for terminated
Expand All @@ -51,7 +66,7 @@ type AllocGarbageCollector struct {
func NewAllocGarbageCollector(logger *log.Logger, statsCollector stats.NodeStatsCollector, ac AllocCounter, config *GCConfig) *AllocGarbageCollector {
// Require at least 1 to make progress
if config.ParallelDestroys <= 0 {
logger.Printf("[WARN] client: garbage collector defaulting parallism to 1 due to invalid input value of %d", config.ParallelDestroys)
logger.Printf("[WARN] client.gc: garbage collector defaulting parallism to 1 due to invalid input value of %d", config.ParallelDestroys)
config.ParallelDestroys = 1
}

Expand All @@ -63,6 +78,7 @@ func NewAllocGarbageCollector(logger *log.Logger, statsCollector stats.NodeStats
logger: logger,
destroyCh: make(chan struct{}, config.ParallelDestroys),
shutdownCh: make(chan struct{}),
triggerCh: make(chan struct{}, 1),
}

return gc
Expand All @@ -71,16 +87,28 @@ func NewAllocGarbageCollector(logger *log.Logger, statsCollector stats.NodeStats
// Run the periodic garbage collector.
func (a *AllocGarbageCollector) Run() {
ticker := time.NewTicker(a.config.Interval)
a.logger.Printf("[DEBUG] client.gc: GC'ing ever %v", a.config.Interval)
for {
select {
case <-a.triggerCh:
case <-ticker.C:
if err := a.keepUsageBelowThreshold(); err != nil {
a.logger.Printf("[ERR] client: error garbage collecting allocation: %v", err)
}
case <-a.shutdownCh:
ticker.Stop()
return
}

if err := a.keepUsageBelowThreshold(); err != nil {
a.logger.Printf("[ERR] client.gc: error garbage collecting allocation: %v", err)
}
}
}

// Force the garbage collector to run.
func (a *AllocGarbageCollector) Trigger() {
select {
case a.triggerCh <- struct{}{}:
default:
// already triggered
}
}

Expand Down Expand Up @@ -116,15 +144,15 @@ func (a *AllocGarbageCollector) keepUsageBelowThreshold() error {
reason = fmt.Sprintf("number of allocations (%d) is over the limit (%d)", liveAllocs, a.config.MaxAllocs)
}

// No reason to gc, exit
if reason == "" {
// No reason to gc, exit
break
}

// Collect an allocation
gcAlloc := a.allocRunners.Pop()
if gcAlloc == nil {
a.logger.Printf("[WARN] client: garbage collection due to %s skipped because no terminal allocations", reason)
a.logger.Printf("[WARN] client.gc: garbage collection due to %s skipped because no terminal allocations", reason)
break
}

Expand All @@ -142,7 +170,7 @@ func (a *AllocGarbageCollector) destroyAllocRunner(ar *AllocRunner, reason strin
if alloc := ar.Alloc(); alloc != nil {
id = alloc.ID
}
a.logger.Printf("[INFO] client: garbage collecting allocation %s due to %s", id, reason)
a.logger.Printf("[INFO] client.gc: garbage collecting allocation %s due to %s", id, reason)

// Acquire the destroy lock
select {
Expand All @@ -158,7 +186,7 @@ func (a *AllocGarbageCollector) destroyAllocRunner(ar *AllocRunner, reason strin
case <-a.shutdownCh:
}

a.logger.Printf("[DEBUG] client: garbage collected %q", ar.Alloc().ID)
a.logger.Printf("[DEBUG] client.gc: garbage collected %q", ar.Alloc().ID)

// Release the lock
<-a.destroyCh
Expand Down Expand Up @@ -199,6 +227,11 @@ func (a *AllocGarbageCollector) CollectAll() {
// MakeRoomFor garbage collects enough number of allocations in the terminal
// state to make room for new allocations
func (a *AllocGarbageCollector) MakeRoomFor(allocations []*structs.Allocation) error {
if len(allocations) == 0 {
// Nothing to make room for!
return nil
}

// GC allocs until below the max limit + the new allocations
max := a.config.MaxAllocs - len(allocations)
for a.allocCounter.NumAllocs() > max {
Expand Down
19 changes: 15 additions & 4 deletions client/gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,23 +310,31 @@ func TestAllocGarbageCollector_MakeRoomForAllocations_MaxAllocs(t *testing.T) {
defer client.Shutdown()
waitTilNodeReady(client, t)

callN := 0
assertAllocs := func(expectedAll, expectedDestroyed int) {
// Wait for allocs to be started
callN++
client.logger.Printf("[TEST] %d -- Waiting for %d total allocs, %d GC'd", callN, expectedAll, expectedDestroyed)
testutil.WaitForResult(func() (bool, error) {
all, destroyed := 0, 0
for _, ar := range client.getAllocRunners() {
all++
if ar.IsDestroyed() {
destroyed++
}

// assert is waiting
// 2017/10/26 21:38:01.649166
}
return all == expectedAll && destroyed == expectedDestroyed, fmt.Errorf(
"expected %d allocs (found %d); expected %d destroy (found %d)",
expectedAll, all, expectedDestroyed, destroyed,
)
}, func(err error) {
t.Fatalf("alloc state: %v", err)
client.logger.Printf("[TEST] %d -- FAILED to find %d total allocs, %d GC'd!", callN, expectedAll, expectedDestroyed)
t.Fatalf("%d alloc state: %v", callN, err)
})
client.logger.Printf("[TEST] %d -- Found %d total allocs, %d GC'd!", callN, expectedAll, expectedDestroyed)
}

// Create a job
Expand Down Expand Up @@ -374,15 +382,18 @@ func TestAllocGarbageCollector_MakeRoomForAllocations_MaxAllocs(t *testing.T) {
t.Fatalf("error upserting stopped allocs: %v", err)
}

// 7 total, 0 GC'd still, but 4 should be marked for GC
assertAllocs(7, 0)
// 7 total, 1 GC'd to get down to limit of 6
assertAllocs(7, 1)

// Add one more alloc
if err := state.UpsertAllocs(102, []*structs.Allocation{newAlloc()}); err != nil {
t.Fatalf("error upserting new alloc: %v", err)
}

// 8 total, 2 GC'd to get down to limit of 6
// 8 total, 1 GC'd to get down to limit of 6
// If this fails it may be due to the gc's Run and MarkRoomFor methods
// gc'ing concurrently. May have to disable gc's run loop if this test
// is flaky.
assertAllocs(8, 2)

// Add new allocs to cause the gc of old terminal ones
Expand Down

0 comments on commit 307a1e5

Please sign in to comment.