Skip to content

Commit

Permalink
Fix GC'd alloc tracking
Browse files Browse the repository at this point in the history
The Client.allocs map now contains all AllocRunners again, not just
un-GC'd AllocRunners. Client.allocs is only pruned when the server GCs
allocs.

Also stops logging "marked for GC" twice.
  • Loading branch information
schmichael committed Oct 25, 2017
1 parent b0304d1 commit 5211ce8
Show file tree
Hide file tree
Showing 7 changed files with 209 additions and 170 deletions.
22 changes: 17 additions & 5 deletions client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,13 @@ type AllocRunner struct {
// to call.
prevAlloc prevAllocWatcher

// ctx is cancelled with exitFn to cause the alloc to be destroyed
// (stopped and GC'd).
ctx context.Context
exitFn context.CancelFunc

// waitCh is closed when the Run method exits. At that point the alloc
// has stopped and been GC'd.
waitCh chan struct{}

// State related fields
Expand Down Expand Up @@ -916,11 +921,7 @@ func (r *AllocRunner) handleDestroy() {
// Final state sync. We do this to ensure that the server has the correct
// state as we wait for a destroy.
alloc := r.Alloc()

//TODO(schmichael) updater can cause a GC which can block on this alloc
// runner shutting down. Since handleDestroy can be called by Run() we
// can't block shutdown here as it would cause a deadlock.
go r.updater(alloc)
r.updater(alloc)

// Broadcast and persist state synchronously
r.sendBroadcast(alloc)
Expand Down Expand Up @@ -1065,6 +1066,17 @@ func (r *AllocRunner) Destroy() {
r.allocBroadcast.Close()
}

// IsDestroyed returns true if the AllocRunner is not running and has been
// destroyed (GC'd).
func (r *AllocRunner) IsDestroyed() bool {
select {
case <-r.waitCh:
return true
default:
return false
}
}

// WaitCh returns a channel to wait for termination
func (r *AllocRunner) WaitCh() <-chan struct{} {
return r.waitCh
Expand Down
63 changes: 38 additions & 25 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ type Client struct {
// successfully
serversDiscoveredCh chan struct{}

// allocs is the current set of allocations
// allocs maps alloc IDs to their AllocRunner. This map includes all
// AllocRunners - running and GC'd - until the server GCs them.
allocs map[string]*AllocRunner
allocLock sync.RWMutex

Expand Down Expand Up @@ -487,14 +488,14 @@ func (c *Client) Stats() map[string]map[string]string {
}

// CollectAllocation garbage collects a single allocation
func (c *Client) CollectAllocation(allocID string) error {
return c.garbageCollector.Collect(allocID)
func (c *Client) CollectAllocation(allocID string) {
c.garbageCollector.Collect(allocID)
}

// CollectAllAllocs garbage collects all allocations on a node in the terminal
// state
func (c *Client) CollectAllAllocs() error {
return c.garbageCollector.CollectAll()
func (c *Client) CollectAllAllocs() {
c.garbageCollector.CollectAll()
}

// Node returns the locally registered node
Expand Down Expand Up @@ -721,11 +722,16 @@ func (c *Client) getAllocRunners() map[string]*AllocRunner {
return runners
}

// NumAllocs returns the number of allocs this client has. Used to
// NumAllocs returns the number of un-GC'd allocs this client has. Used to
// fulfill the AllocCounter interface for the GC.
func (c *Client) NumAllocs() int {
n := 0
c.allocLock.RLock()
n := len(c.allocs)
for _, a := range c.allocs {
if !a.IsDestroyed() {
n++
}
}
c.allocLock.RUnlock()
return n
}
Expand Down Expand Up @@ -1205,6 +1211,7 @@ func (c *Client) updateNodeStatus() error {
for _, s := range resp.Servers {
addr, err := resolveServer(s.RPCAdvertiseAddr)
if err != nil {
c.logger.Printf("[DEBUG] client: ignoring invalid server %q: %v", s.RPCAdvertiseAddr, err)
continue
}
e := endpoint{name: s.RPCAdvertiseAddr, addr: addr}
Expand Down Expand Up @@ -1234,8 +1241,14 @@ func (c *Client) updateNodeStatus() error {
// updateAllocStatus is used to update the status of an allocation
func (c *Client) updateAllocStatus(alloc *structs.Allocation) {
if alloc.Terminated() {
// Terminated, mark for GC
if ar, ok := c.getAllocRunners()[alloc.ID]; ok {
// Terminated, mark for GC iff we're still tracking this alloc
// runner. If it's not being tracked that means the server has
// already GC'd it (see removeAlloc).
c.allocLock.RLock()
ar, ok := c.allocs[alloc.ID]
c.allocLock.RUnlock()

if ok {
c.garbageCollector.MarkForCollection(ar)
}
}
Expand Down Expand Up @@ -1531,9 +1544,7 @@ func (c *Client) runAllocs(update *allocUpdates) {

// Remove the old allocations
for _, remove := range diff.removed {
if err := c.removeAlloc(remove); err != nil {
c.logger.Printf("[ERR] client: failed to remove alloc '%s': %v", remove.ID, err)
}
c.removeAlloc(remove)
}

// Update the existing allocations
Expand All @@ -1544,6 +1555,11 @@ func (c *Client) runAllocs(update *allocUpdates) {
}
}

// Make room for new allocations before running
if err := c.garbageCollector.MakeRoomFor(diff.added); err != nil {
c.logger.Printf("[ERR] client: error making room for new allocations: %v", err)
}

// Start the new allocations
for _, add := range diff.added {
migrateToken := update.migrateTokens[add.ID]
Expand All @@ -1554,24 +1570,29 @@ func (c *Client) runAllocs(update *allocUpdates) {
}
}

// removeAlloc is invoked when we should remove an allocation
func (c *Client) removeAlloc(alloc *structs.Allocation) error {
// removeAlloc is invoked when we should remove an allocation because it has
// been removed by the server.
func (c *Client) removeAlloc(alloc *structs.Allocation) {
c.allocLock.Lock()
ar, ok := c.allocs[alloc.ID]
if !ok {
c.allocLock.Unlock()
c.logger.Printf("[WARN] client: missing context for alloc '%s'", alloc.ID)
return nil
return
}

// Stop tracking alloc runner as it's been GC'd by the server
delete(c.allocs, alloc.ID)
c.allocLock.Unlock()

// Ensure the GC has a reference and then collect. Collecting through the GC
// applies rate limiting
c.garbageCollector.MarkForCollection(ar)

// GC immediately since the server has GC'd it
go c.garbageCollector.Collect(alloc.ID)

return nil
return
}

// updateAlloc is invoked when we should update an allocation
Expand All @@ -1592,9 +1613,9 @@ func (c *Client) updateAlloc(exist, update *structs.Allocation) error {
func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error {
// Check if we already have an alloc runner
c.allocLock.Lock()
defer c.allocLock.Unlock()
if _, ok := c.allocs[alloc.ID]; ok {
c.logger.Printf("[DEBUG]: client: dropping duplicate add allocation request: %q", alloc.ID)
c.allocLock.Unlock()
return nil
}

Expand All @@ -1618,14 +1639,6 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error
c.logger.Printf("[WARN] client: initial save state for alloc %q failed: %v", alloc.ID, err)
}

// Must release allocLock as GC acquires it to count allocs
c.allocLock.Unlock()

// Make room for the allocation before running it
if err := c.garbageCollector.MakeRoomFor([]*structs.Allocation{alloc}); err != nil {
c.logger.Printf("[ERR] client: error making room for allocation: %v", err)
}

go ar.Run()
return nil
}
Expand Down
17 changes: 8 additions & 9 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ func testServer(t *testing.T, cb func(*nomad.Config)) (*nomad.Server, string) {
cb(config)
}

// Enable raft as leader if we have bootstrap on
config.RaftConfig.StartAsLeader = !config.DevDisableBootstrap

for i := 10; i >= 0; i-- {
ports := freeport.GetT(t, 2)
config.RPCAddr = &net.TCPAddr{
Expand Down Expand Up @@ -657,7 +660,6 @@ func TestClient_WatchAllocs(t *testing.T) {
alloc2.JobID = job.ID
alloc2.Job = job

// Insert at zero so they are pulled
state := s1.State()
if err := state.UpsertJob(100, job); err != nil {
t.Fatal(err)
Expand All @@ -681,23 +683,20 @@ func TestClient_WatchAllocs(t *testing.T) {
})

// Delete one allocation
err = state.DeleteEval(103, nil, []string{alloc1.ID})
if err != nil {
if err := state.DeleteEval(103, nil, []string{alloc1.ID}); err != nil {
t.Fatalf("err: %v", err)
}

// Update the other allocation. Have to make a copy because the allocs are
// shared in memory in the test and the modify index would be updated in the
// alloc runner.
alloc2_2 := new(structs.Allocation)
*alloc2_2 = *alloc2
alloc2_2 := alloc2.Copy()
alloc2_2.DesiredStatus = structs.AllocDesiredStatusStop
err = state.UpsertAllocs(104, []*structs.Allocation{alloc2_2})
if err != nil {
t.Fatalf("err: %v", err)
if err := state.UpsertAllocs(104, []*structs.Allocation{alloc2_2}); err != nil {
t.Fatalf("err upserting stopped alloc: %v", err)
}

// One allocations should get de-registered
// One allocation should get GC'd and removed
testutil.WaitForResult(func() (bool, error) {
c1.allocLock.RLock()
num := len(c1.allocs)
Expand Down
Loading

0 comments on commit 5211ce8

Please sign in to comment.