Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
schmichael committed Oct 26, 2017
1 parent f21e19c commit 45080ab
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 7 deletions.
6 changes: 5 additions & 1 deletion client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -921,7 +921,6 @@ func (r *AllocRunner) handleDestroy() {
// Final state sync. We do this to ensure that the server has the correct
// state as we wait for a destroy.
alloc := r.Alloc()
r.updater(alloc)

// Broadcast and persist state synchronously
r.sendBroadcast(alloc)
Expand All @@ -936,6 +935,11 @@ func (r *AllocRunner) handleDestroy() {
r.logger.Printf("[ERR] client: alloc %q unable unmount task directories: %v", r.allocID, err)
}

// Update the server with the alloc's status -- also marks the alloc as
// being eligible for GC, so from this point on the alloc can be gc'd
// at any time.
r.updater(alloc)

for {
select {
case <-r.ctx.Done():
Expand Down
15 changes: 10 additions & 5 deletions client/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type AllocGarbageCollector struct {
func NewAllocGarbageCollector(logger *log.Logger, statsCollector stats.NodeStatsCollector, ac AllocCounter, config *GCConfig) *AllocGarbageCollector {
// Require at least 1 to make progress
if config.ParallelDestroys <= 0 {
logger.Printf("[WARN] client: garbage collector defaulting parallism to 1 due to invalid input value of %d", config.ParallelDestroys)
logger.Printf("[WARN] client.gc: garbage collector defaulting parallism to 1 due to invalid input value of %d", config.ParallelDestroys)
config.ParallelDestroys = 1
}

Expand All @@ -75,7 +75,7 @@ func (a *AllocGarbageCollector) Run() {
select {
case <-ticker.C:
if err := a.keepUsageBelowThreshold(); err != nil {
a.logger.Printf("[ERR] client: error garbage collecting allocation: %v", err)
a.logger.Printf("[ERR] client.gc: error garbage collecting allocation: %v", err)
}
case <-a.shutdownCh:
ticker.Stop()
Expand Down Expand Up @@ -124,7 +124,7 @@ func (a *AllocGarbageCollector) keepUsageBelowThreshold() error {
// Collect an allocation
gcAlloc := a.allocRunners.Pop()
if gcAlloc == nil {
a.logger.Printf("[WARN] client: garbage collection due to %s skipped because no terminal allocations", reason)
a.logger.Printf("[WARN] client.gc: garbage collection due to %s skipped because no terminal allocations", reason)
break
}

Expand All @@ -142,7 +142,7 @@ func (a *AllocGarbageCollector) destroyAllocRunner(ar *AllocRunner, reason strin
if alloc := ar.Alloc(); alloc != nil {
id = alloc.ID
}
a.logger.Printf("[INFO] client: garbage collecting allocation %s due to %s", id, reason)
a.logger.Printf("[INFO] client.gc: garbage collecting allocation %s due to %s", id, reason)

// Acquire the destroy lock
select {
Expand All @@ -158,7 +158,7 @@ func (a *AllocGarbageCollector) destroyAllocRunner(ar *AllocRunner, reason strin
case <-a.shutdownCh:
}

a.logger.Printf("[DEBUG] client: garbage collected %q", ar.Alloc().ID)
a.logger.Printf("[DEBUG] client.gc: garbage collected %q", ar.Alloc().ID)

// Release the lock
<-a.destroyCh
Expand Down Expand Up @@ -199,6 +199,11 @@ func (a *AllocGarbageCollector) CollectAll() {
// MakeRoomFor garbage collects enough number of allocations in the terminal
// state to make room for new allocations
func (a *AllocGarbageCollector) MakeRoomFor(allocations []*structs.Allocation) error {
if len(allocations) == 0 {
// Nothing to make room for!
return nil
}

// GC allocs until below the max limit + the new allocations
max := a.config.MaxAllocs - len(allocations)
for a.numAllocs() > max {
Expand Down
10 changes: 9 additions & 1 deletion client/gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,23 +310,31 @@ func TestAllocGarbageCollector_MakeRoomForAllocations_MaxAllocs(t *testing.T) {
defer client.Shutdown()
waitTilNodeReady(client, t)

callN := 0
assertAllocs := func(expectedAll, expectedDestroyed int) {
// Wait for allocs to be started
callN++
client.logger.Printf("[TEST] %d -- Waiting for %d total allocs, %d GC'd", callN, expectedAll, expectedDestroyed)
testutil.WaitForResult(func() (bool, error) {
all, destroyed := 0, 0
for _, ar := range client.getAllocRunners() {
all++
if ar.IsDestroyed() {
destroyed++
}

// assert is waiting
// 2017/10/26 21:38:01.649166
}
return all == expectedAll && destroyed == expectedDestroyed, fmt.Errorf(
"expected %d allocs (found %d); expected %d destroy (found %d)",
expectedAll, all, expectedDestroyed, destroyed,
)
}, func(err error) {
t.Fatalf("alloc state: %v", err)
client.logger.Printf("[TEST] %d -- FAILED to find %d total allocs, %d GC'd!", callN, expectedAll, expectedDestroyed)
t.Fatalf("%d alloc state: %v", callN, err)
})
client.logger.Printf("[TEST] %d -- Found %d total allocs, %d GC'd!", callN, expectedAll, expectedDestroyed)
}

// Create a job
Expand Down

0 comments on commit 45080ab

Please sign in to comment.