Fix GC'd alloc tracking #3445

Merged · 4 commits · Oct 30, 2017
Changes from all commits
26 changes: 21 additions & 5 deletions client/alloc_runner.go
@@ -76,8 +76,13 @@ type AllocRunner struct {
// to call.
prevAlloc prevAllocWatcher

+// ctx is cancelled with exitFn to cause the alloc to be destroyed
+// (stopped and GC'd).
ctx context.Context
exitFn context.CancelFunc

+// waitCh is closed when the Run method exits. At that point the alloc
+// has stopped and been GC'd.
waitCh chan struct{}

// State related fields
@@ -917,11 +922,6 @@ func (r *AllocRunner) handleDestroy() {
// state as we wait for a destroy.
alloc := r.Alloc()

-//TODO(schmichael) updater can cause a GC which can block on this alloc
-// runner shutting down. Since handleDestroy can be called by Run() we
-// can't block shutdown here as it would cause a deadlock.
-go r.updater(alloc)

// Broadcast and persist state synchronously
r.sendBroadcast(alloc)
if err := r.saveAllocRunnerState(); err != nil {
@@ -935,6 +935,11 @@
r.logger.Printf("[ERR] client: alloc %q unable unmount task directories: %v", r.allocID, err)
}

+// Update the server with the alloc's status -- also marks the alloc as
+// being eligible for GC, so from this point on the alloc can be gc'd
+// at any time.
+r.updater(alloc)

for {
select {
case <-r.ctx.Done():
@@ -1065,6 +1070,17 @@ func (r *AllocRunner) Destroy() {
r.allocBroadcast.Close()
}

+// IsDestroyed returns true if the AllocRunner is not running and has been
+// destroyed (GC'd).
+func (r *AllocRunner) IsDestroyed() bool {
+select {
+case <-r.waitCh:
+return true
+default:
+return false
+}
+}

// WaitCh returns a channel to wait for termination
func (r *AllocRunner) WaitCh() <-chan struct{} {
return r.waitCh
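The IsDestroyed method added above relies on the Go idiom that a receive from a closed channel never blocks, while a receive from an open channel with no sender would. A minimal standalone sketch of that idiom; isDone and the surrounding program are illustrative, not part of this PR:

package main

import "fmt"

// isDone performs a non-blocking check of a completion channel,
// mirroring the select/default shape used by IsDestroyed.
func isDone(done <-chan struct{}) bool {
	select {
	case <-done:
		// done is closed: receives succeed immediately.
		return true
	default:
		// done is still open: don't block, report not finished.
		return false
	}
}

func main() {
	done := make(chan struct{})
	fmt.Println(isDone(done)) // false
	close(done)               // Run exiting corresponds to this close
	fmt.Println(isDone(done)) // true
}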
72 changes: 46 additions & 26 deletions client/client.go
@@ -124,7 +124,8 @@ type Client struct {
// successfully
serversDiscoveredCh chan struct{}

-// allocs is the current set of allocations
+// allocs maps alloc IDs to their AllocRunner. This map includes all
+// AllocRunners - running and GC'd - until the server GCs them.
allocs map[string]*AllocRunner
allocLock sync.RWMutex

@@ -486,15 +487,16 @@ func (c *Client) Stats() map[string]map[string]string {
return stats
}

-// CollectAllocation garbage collects a single allocation
-func (c *Client) CollectAllocation(allocID string) error {
+// CollectAllocation garbage collects a single allocation on a node. Returns
+// true if alloc was found and garbage collected; otherwise false.
+func (c *Client) CollectAllocation(allocID string) bool {
return c.garbageCollector.Collect(allocID)
}

// CollectAllAllocs garbage collects all allocations on a node in the terminal
// state
-func (c *Client) CollectAllAllocs() error {
-return c.garbageCollector.CollectAll()
+func (c *Client) CollectAllAllocs() {
+c.garbageCollector.CollectAll()
}

// Node returns the locally registered node
@@ -721,11 +723,16 @@ func (c *Client) getAllocRunners() map[string]*AllocRunner {
return runners
}

-// NumAllocs returns the number of allocs this client has. Used to
+// NumAllocs returns the number of un-GC'd allocs this client has. Used to
// fulfill the AllocCounter interface for the GC.
func (c *Client) NumAllocs() int {
+n := 0
c.allocLock.RLock()
-n := len(c.allocs)
+for _, a := range c.allocs {
+if !a.IsDestroyed() {
+n++
+}
+}
c.allocLock.RUnlock()
return n
}
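NumAllocs is the client's implementation of the AllocCounter interface named in the comment above. A sketch of how a GC policy might consume it; the interface name comes from the comment, while overThreshold and its max parameter are illustrative assumptions:

// AllocCounter is satisfied by *Client via NumAllocs.
type AllocCounter interface {
	NumAllocs() int
}

// overThreshold reports whether the number of live (un-GC'd) allocs
// exceeds a configured maximum, the condition under which a GC pass
// would free capacity for new allocations.
func overThreshold(c AllocCounter, max int) bool {
	return c.NumAllocs() > max
}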
@@ -1205,6 +1212,7 @@ func (c *Client) updateNodeStatus() error {
for _, s := range resp.Servers {
addr, err := resolveServer(s.RPCAdvertiseAddr)
if err != nil {
c.logger.Printf("[WARN] client: ignoring invalid server %q: %v", s.RPCAdvertiseAddr, err)
continue
}
e := endpoint{name: s.RPCAdvertiseAddr, addr: addr}
@@ -1234,9 +1242,19 @@
// updateAllocStatus is used to update the status of an allocation
func (c *Client) updateAllocStatus(alloc *structs.Allocation) {
if alloc.Terminated() {
-// Terminated, mark for GC
-if ar, ok := c.getAllocRunners()[alloc.ID]; ok {
+// Terminated, mark for GC if we're still tracking this alloc
+// runner. If it's not being tracked that means the server has
+// already GC'd it (see removeAlloc).
+c.allocLock.RLock()
+ar, ok := c.allocs[alloc.ID]
+c.allocLock.RUnlock()
+
+if ok {
+c.garbageCollector.MarkForCollection(ar)
+
+// Trigger a GC in case we're over thresholds and just
+// waiting for eligible allocs.
+c.garbageCollector.Trigger()
}
}

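Trigger's implementation is not part of this diff. A common shape for such a wake-up is a buffered channel of size one, so triggering never blocks and redundant triggers coalesce into a single pending pass; this sketch is an assumption about the pattern, not the PR's code:

// gcTrigger coalesces wake-up requests for a GC loop.
type gcTrigger struct {
	ch chan struct{}
}

func newGCTrigger() *gcTrigger {
	return &gcTrigger{ch: make(chan struct{}, 1)}
}

// Trigger never blocks; if a wake-up is already pending, this one is
// dropped, since a single GC pass will observe all eligible allocs.
func (t *gcTrigger) Trigger() {
	select {
	case t.ch <- struct{}{}:
	default:
	}
}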
@@ -1531,9 +1549,7 @@ func (c *Client) runAllocs(update *allocUpdates) {

// Remove the old allocations
for _, remove := range diff.removed {
-if err := c.removeAlloc(remove); err != nil {
-c.logger.Printf("[ERR] client: failed to remove alloc '%s': %v", remove.ID, err)
-}
+c.removeAlloc(remove)
}

// Update the existing allocations
@@ -1544,6 +1560,11 @@
}
}

+// Make room for new allocations before running
+if err := c.garbageCollector.MakeRoomFor(diff.added); err != nil {
+c.logger.Printf("[ERR] client: error making room for new allocations: %v", err)
+}

// Start the new allocations
for _, add := range diff.added {
migrateToken := update.migrateTokens[add.ID]
@@ -1552,26 +1573,33 @@
add.ID, err)
}
}

+// Trigger the GC once more now that new allocs are started that could
+// have caused thresholds to be exceeded
+c.garbageCollector.Trigger()
}

-// removeAlloc is invoked when we should remove an allocation
-func (c *Client) removeAlloc(alloc *structs.Allocation) error {
+// removeAlloc is invoked when we should remove an allocation because it has
+// been removed by the server.
+func (c *Client) removeAlloc(alloc *structs.Allocation) {
c.allocLock.Lock()
ar, ok := c.allocs[alloc.ID]
if !ok {
c.allocLock.Unlock()
c.logger.Printf("[WARN] client: missing context for alloc '%s'", alloc.ID)
-return nil
+return
}

+// Stop tracking alloc runner as it's been GC'd by the server
delete(c.allocs, alloc.ID)
c.allocLock.Unlock()

// Ensure the GC has a reference and then collect. Collecting through the GC
// applies rate limiting
c.garbageCollector.MarkForCollection(ar)
-go c.garbageCollector.Collect(alloc.ID)
-
-return nil
+
+// GC immediately since the server has GC'd it
+go c.garbageCollector.Collect(alloc.ID)
}
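The comment above notes that collecting through the GC applies rate limiting, so destroys never monopolize the client. A minimal sketch of what a rate-limited destroy loop could look like, assuming access to this package's AllocRunner; destroyLoop, the marked channel, and the interval parameter are illustrative, not the PR's code:

// destroyLoop drains marked alloc runners, destroying at most one per
// interval and waiting for each to fully stop before moving on.
func destroyLoop(marked <-chan *AllocRunner, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for ar := range marked {
		<-ticker.C    // rate limit: at most one destroy per interval
		ar.Destroy()  // cancel the runner's ctx (stop and GC)
		<-ar.WaitCh() // Run has exited; the alloc is fully GC'd
	}
}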

// updateAlloc is invoked when we should update an allocation
@@ -1592,9 +1620,9 @@ func (c *Client) updateAlloc(exist, update *structs.Allocation) error {
func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error {
// Check if we already have an alloc runner
c.allocLock.Lock()
+defer c.allocLock.Unlock()
if _, ok := c.allocs[alloc.ID]; ok {
c.logger.Printf("[DEBUG]: client: dropping duplicate add allocation request: %q", alloc.ID)
-c.allocLock.Unlock()
return nil
}

@@ -1618,14 +1646,6 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error
c.logger.Printf("[WARN] client: initial save state for alloc %q failed: %v", alloc.ID, err)
}

-// Must release allocLock as GC acquires it to count allocs
-c.allocLock.Unlock()
-
-// Make room for the allocation before running it
-if err := c.garbageCollector.MakeRoomFor([]*structs.Allocation{alloc}); err != nil {
-c.logger.Printf("[ERR] client: error making room for allocation: %v", err)
-}

go ar.Run()
return nil
}
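The deleted comment above explains why addAlloc previously could not use defer for the unlock: MakeRoomFor ends up calling NumAllocs, which takes allocLock for reading, and sync.RWMutex is not reentrant. A self-contained sketch of that deadlock shape (names are illustrative):

package main

import "sync"

var mu sync.RWMutex

// count takes the read lock, as NumAllocs does with allocLock.
func count() int {
	mu.RLock()
	defer mu.RUnlock()
	return 0
}

func main() {
	mu.Lock() // like addAlloc holding allocLock for its whole body
	defer mu.Unlock()
	_ = count() // deadlock: RLock blocks while this goroutine holds Lock
}

Moving MakeRoomFor up into runAllocs, before addAlloc is called, is what lets addAlloc hold the lock for its whole body with a simple defer.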
17 changes: 8 additions & 9 deletions client/client_test.go
@@ -74,6 +74,9 @@ func testServer(t *testing.T, cb func(*nomad.Config)) (*nomad.Server, string) {
cb(config)
}

+// Enable raft as leader if we have bootstrap on
+config.RaftConfig.StartAsLeader = !config.DevDisableBootstrap

for i := 10; i >= 0; i-- {
ports := freeport.GetT(t, 2)
config.RPCAddr = &net.TCPAddr{
@@ -657,7 +660,6 @@ func TestClient_WatchAllocs(t *testing.T) {
alloc2.JobID = job.ID
alloc2.Job = job

-// Insert at zero so they are pulled
state := s1.State()
if err := state.UpsertJob(100, job); err != nil {
t.Fatal(err)
@@ -681,23 +683,20 @@
})

// Delete one allocation
-err = state.DeleteEval(103, nil, []string{alloc1.ID})
-if err != nil {
+if err := state.DeleteEval(103, nil, []string{alloc1.ID}); err != nil {
t.Fatalf("err: %v", err)
}

// Update the other allocation. Have to make a copy because the allocs are
// shared in memory in the test and the modify index would be updated in the
// alloc runner.
-alloc2_2 := new(structs.Allocation)
-*alloc2_2 = *alloc2
+alloc2_2 := alloc2.Copy()
alloc2_2.DesiredStatus = structs.AllocDesiredStatusStop
-err = state.UpsertAllocs(104, []*structs.Allocation{alloc2_2})
-if err != nil {
-t.Fatalf("err: %v", err)
+if err := state.UpsertAllocs(104, []*structs.Allocation{alloc2_2}); err != nil {
+t.Fatalf("err upserting stopped alloc: %v", err)
}

-// One allocations should get de-registered
+// One allocation should get GC'd and removed
testutil.WaitForResult(func() (bool, error) {
c1.allocLock.RLock()
num := len(c1.allocs)