Add new gc_max_allocs tuneable #2636

Merged: 6 commits, May 30, 2017
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -9,6 +9,7 @@ IMPROVEMENTS:
* api/job: Ability to revert job to older versions [GH-2575]
* client: Environment variables for client DC and Region [GH-2507]
* client: Hash host ID so it's stable and well distributed [GH-2541]
* client: GC dead allocs if total allocs > `gc_max_allocs` tunable [GH-2636]
* client: Persist state using bolt-db and more efficient write patterns
[GH-2610]
* client: Fingerprint all routable addresses on an interface including IPv6
25 changes: 19 additions & 6 deletions client/client.go
@@ -240,13 +240,15 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic

// Add the garbage collector
gcConfig := &GCConfig{
MaxAllocs: cfg.GCMaxAllocs,
DiskUsageThreshold: cfg.GCDiskUsageThreshold,
InodeUsageThreshold: cfg.GCInodeUsageThreshold,
Interval: cfg.GCInterval,
ParallelDestroys: cfg.GCParallelDestroys,
ReservedDiskMB: cfg.Node.Reserved.DiskMB,
}
c.garbageCollector = NewAllocGarbageCollector(logger, statsCollector, gcConfig)
c.garbageCollector = NewAllocGarbageCollector(logger, statsCollector, c, gcConfig)
go c.garbageCollector.Run()

// Setup the node
if err := c.setupNode(); err != nil {
@@ -482,17 +484,13 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
// Stats is used to return statistics for debugging and insight
// for various sub-systems
func (c *Client) Stats() map[string]map[string]string {
c.allocLock.RLock()
numAllocs := len(c.allocs)
c.allocLock.RUnlock()

c.heartbeatLock.Lock()
defer c.heartbeatLock.Unlock()
stats := map[string]map[string]string{
"client": map[string]string{
"node_id": c.Node().ID,
"known_servers": c.servers.all().String(),
"num_allocations": strconv.Itoa(numAllocs),
"num_allocations": strconv.Itoa(c.NumAllocs()),
"last_heartbeat": fmt.Sprintf("%v", time.Since(c.lastHeartbeat)),
"heartbeat_ttl": fmt.Sprintf("%v", c.heartbeatTTL),
},
@@ -722,6 +720,21 @@ func (c *Client) getAllocRunners() map[string]*AllocRunner {
return runners
}

// NumAllocs returns the number of allocs this client has. Used to
// fulfill the AllocCounter interface for the GC.
func (c *Client) NumAllocs() int {
c.allocLock.RLock()
c.blockedAllocsLock.Lock()
c.migratingAllocsLock.Lock()
n := len(c.allocs)
n += len(c.blockedAllocations)
n += len(c.migratingAllocs)
c.migratingAllocsLock.Unlock()
c.blockedAllocsLock.Unlock()
c.allocLock.RUnlock()
return n
}

// nodeID restores, or generates if necessary, a unique node ID and SecretID.
// The node ID is, if available, a persistent unique ID. The secret ID is a
// high-entropy random UUID.
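
The new NumAllocs method counts running, blocked, and migrating allocations under their respective locks so the garbage collector sees the client's full allocation load. It is also what lets the Client satisfy the AllocCounter interface introduced in client/gc.go below. A minimal sketch of a compile-time check for that relationship (the assertion itself is not part of this diff, only an illustration; both types live in the client package):

// Hypothetical compile-time assertion, not in this PR: verify that *Client
// implements the AllocCounter interface consumed by the garbage collector.
var _ AllocCounter = (*Client)(nil)
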
5 changes: 5 additions & 0 deletions client/config/config.go
@@ -171,6 +171,10 @@ type Config struct {
// beyond which the Nomad client triggers GC of the terminal allocations
GCInodeUsageThreshold float64

// GCMaxAllocs is the maximum number of allocations a node can have
// before garbage collection is triggered.
GCMaxAllocs int

// LogLevel is the level of the logs to put out
LogLevel string

@@ -205,6 +209,7 @@ func DefaultConfig() *Config {
GCParallelDestroys: 2,
GCDiskUsageThreshold: 80,
GCInodeUsageThreshold: 70,
GCMaxAllocs: 50,
}
}

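
As a rough sketch of how the new knob could be tuned programmatically, a caller building a client configuration might override the default of 50 before handing it to the client; the value 100 below is arbitrary and not taken from this diff:

// Sketch: raise the allocation count that triggers GC above the default of 50.
cfg := config.DefaultConfig()
cfg.GCMaxAllocs = 100 // GC terminal allocs once more than 100 are tracked
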
94 changes: 70 additions & 24 deletions client/gc.go
@@ -18,26 +18,37 @@ const (

// GCConfig allows changing the behaviour of the garbage collector
type GCConfig struct {
// MaxAllocs is the maximum number of allocations to track before a GC
// is triggered.
MaxAllocs int
DiskUsageThreshold float64
InodeUsageThreshold float64
Interval time.Duration
ReservedDiskMB int
ParallelDestroys int
}

// AllocCounter is used by AllocGarbageCollector to discover how many
// allocations a node has and is generally fulfilled by the Client.
type AllocCounter interface {
NumAllocs() int
}

// AllocGarbageCollector garbage collects terminated allocations on a node
type AllocGarbageCollector struct {
allocRunners *IndexedGCAllocPQ
statsCollector stats.NodeStatsCollector
allocCounter AllocCounter
config *GCConfig
logger *log.Logger
destroyCh chan struct{}
shutdownCh chan struct{}
}

// NewAllocGarbageCollector returns a garbage collector for terminated
// allocations on a node.
func NewAllocGarbageCollector(logger *log.Logger, statsCollector stats.NodeStatsCollector, config *GCConfig) *AllocGarbageCollector {
// allocations on a node. Must call Run() in a goroutine to enable periodic
// garbage collection.
func NewAllocGarbageCollector(logger *log.Logger, statsCollector stats.NodeStatsCollector, ac AllocCounter, config *GCConfig) *AllocGarbageCollector {
// Require at least 1 to make progress
if config.ParallelDestroys <= 0 {
logger.Printf("[WARN] client: garbage collector defaulting parallism to 1 due to invalid input value of %d", config.ParallelDestroys)
@@ -47,17 +58,18 @@ func NewAllocGarbageCollector(logger *log.Logger, statsCollector stats.NodeStats
gc := &AllocGarbageCollector{
allocRunners: NewIndexedGCAllocPQ(),
statsCollector: statsCollector,
allocCounter: ac,
config: config,
logger: logger,
destroyCh: make(chan struct{}, config.ParallelDestroys),
shutdownCh: make(chan struct{}),
}

go gc.run()
return gc
}

func (a *AllocGarbageCollector) run() {
// Run the periodic garbage collector.
func (a *AllocGarbageCollector) Run() {
ticker := time.NewTicker(a.config.Interval)
for {
select {
@@ -100,31 +112,47 @@ func (a *AllocGarbageCollector) keepUsageBelowThreshold() error {
break
}

if diskStats.UsedPercent <= a.config.DiskUsageThreshold &&
diskStats.InodesUsedPercent <= a.config.InodeUsageThreshold {
reason := ""

switch {
case diskStats.UsedPercent > a.config.DiskUsageThreshold:
reason = fmt.Sprintf("disk usage of %.0f is over gc threshold of %.0f",
diskStats.UsedPercent, a.config.DiskUsageThreshold)
case diskStats.InodesUsedPercent > a.config.InodeUsageThreshold:
reason = fmt.Sprintf("inode usage of %.0f is over gc threshold of %.0f",
diskStats.InodesUsedPercent, a.config.InodeUsageThreshold)
case a.numAllocs() > a.config.MaxAllocs:
reason = fmt.Sprintf("number of allocations is over the limit (%d)", a.config.MaxAllocs)
}

// No reason to gc, exit
if reason == "" {
break
}

// Collect an allocation
gcAlloc := a.allocRunners.Pop()
if gcAlloc == nil {
a.logger.Printf("[WARN] client: garbage collection due to %s skipped because no terminal allocations", reason)
break
}

ar := gcAlloc.allocRunner
alloc := ar.Alloc()
a.logger.Printf("[INFO] client: garbage collecting allocation %v", alloc.ID)

// Destroy the alloc runner and wait until it exits
a.destroyAllocRunner(ar)
a.destroyAllocRunner(gcAlloc.allocRunner, reason)
}
return nil
}

// destroyAllocRunner is used to destroy an allocation runner. It will acquire a
// lock to restrict parallelism and then destroy the alloc runner, returning
// once the allocation has been destroyed.
func (a *AllocGarbageCollector) destroyAllocRunner(ar *AllocRunner) {
func (a *AllocGarbageCollector) destroyAllocRunner(ar *AllocRunner, reason string) {
id := "<nil>"
if alloc := ar.Alloc(); alloc != nil {
id = alloc.ID
}
a.logger.Printf("[INFO] client: garbage collecting allocation %s due to %s", id, reason)

// Acquire the destroy lock
select {
case <-a.shutdownCh:
@@ -155,11 +183,7 @@ func (a *AllocGarbageCollector) Collect(allocID string) error {
if err != nil {
return fmt.Errorf("unable to collect allocation %q: %v", allocID, err)
}

ar := gcAlloc.allocRunner
a.logger.Printf("[INFO] client: garbage collecting allocation %q", ar.Alloc().ID)

a.destroyAllocRunner(ar)
a.destroyAllocRunner(gcAlloc.allocRunner, "forced collection")
return nil
}

@@ -177,16 +201,34 @@ func (a *AllocGarbageCollector) CollectAll() error {
break
}

ar := gcAlloc.allocRunner
a.logger.Printf("[INFO] client: garbage collecting alloc runner for alloc %q", ar.Alloc().ID)
go a.destroyAllocRunner(ar)
go a.destroyAllocRunner(gcAlloc.allocRunner, "forced full collection")
}
return nil
}

// MakeRoomFor garbage collects enough number of allocations in the terminal
// state to make room for new allocations
func (a *AllocGarbageCollector) MakeRoomFor(allocations []*structs.Allocation) error {
// GC allocs until below the max limit + the new allocations
max := a.config.MaxAllocs - len(allocations)
for a.numAllocs() > max {
select {
case <-a.shutdownCh:
return nil
default:
}

gcAlloc := a.allocRunners.Pop()
if gcAlloc == nil {
// It's fine if we can't lower below the limit here as
// we'll keep trying to drop below the limit with each
// periodic gc
break
}

// Destroy the alloc runner and wait until it exits
a.destroyAllocRunner(gcAlloc.allocRunner, "new allocations")
}
totalResource := &structs.Resources{}
for _, alloc := range allocations {
if err := totalResource.Add(alloc.Resources); err != nil {
@@ -244,10 +286,9 @@ func (a *AllocGarbageCollector) MakeRoomFor(allocations []*structs.Allocation) e

ar := gcAlloc.allocRunner
alloc := ar.Alloc()
a.logger.Printf("[INFO] client: garbage collecting allocation %v", alloc.ID)

// Destroy the alloc runner and wait until it exits
a.destroyAllocRunner(ar)
a.destroyAllocRunner(ar, fmt.Sprintf("freeing %d MB for new allocations", alloc.Resources.DiskMB))

// Call stats collect again
diskCleared += alloc.Resources.DiskMB
@@ -261,8 +302,7 @@ func (a *AllocGarbageCollector) MarkForCollection(ar *AllocRunner) error {
return fmt.Errorf("nil allocation runner inserted for garbage collection")
}
if ar.Alloc() == nil {
a.logger.Printf("[INFO] client: alloc is nil, so garbage collecting")
a.destroyAllocRunner(ar)
a.destroyAllocRunner(ar, "alloc is nil")
}

a.logger.Printf("[INFO] client: marking allocation %v for GC", ar.Alloc().ID)
@@ -281,6 +321,12 @@ func (a *AllocGarbageCollector) Remove(ar *AllocRunner) {
}
}

// numAllocs returns the total number of allocs tracked by the client as well
// as those marked for GC.
func (a *AllocGarbageCollector) numAllocs() int {
return a.allocRunners.Length() + a.allocCounter.NumAllocs()
}

// GCAlloc wraps an allocation runner and an index enabling it to be used within
// a PQ
type GCAlloc struct {
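
Taken together, the constructor now accepts an AllocCounter and no longer spawns its own goroutine, so callers must start Run themselves. A minimal sketch of the new wiring under those assumptions; counterStub and exampleWiring are illustrative names that do not appear in this diff, and the stats collector is simply passed through:

// counterStub is a hypothetical AllocCounter used only to illustrate the new
// constructor signature; in the real client the Client itself plays this role.
type counterStub struct{ n int }

func (c *counterStub) NumAllocs() int { return c.n }

func exampleWiring(logger *log.Logger, statsCollector stats.NodeStatsCollector) {
	gcConfig := &GCConfig{
		MaxAllocs:        50,
		Interval:         time.Minute,
		ParallelDestroys: 2,
	}
	gc := NewAllocGarbageCollector(logger, statsCollector, &counterStub{n: 10}, gcConfig)
	go gc.Run() // periodic GC must now be started explicitly
}
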