Skip to content

Commit

Permalink
pillar/watcher: Make goroutine leak detector configurable.
Browse files Browse the repository at this point in the history
Added new configuration options for the goroutine leak detector,
enabling adjustment of parameters such as threshold, check interval,
analysis window, stats retention, and cooldown period through global
settings. This enhancement allows for dynamic tuning of leak detection
based on deployment needs without code modification.

Introduced `GoroutineLeakDetectionParams` struct to manage these
settings, along with validation to ensure parameters meet minimum
requirements. Updated the `watcher` component to use this configurable
setup, and integrated new global config keys to hold these values.

Signed-off-by: Nikolay Martyanov <nikolay@zededa.com>
  • Loading branch information
OhmSpectator committed Nov 7, 2024
1 parent d91b50e commit 1721635
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 25 deletions.
8 changes: 6 additions & 2 deletions docs/CONFIG-PROPERTIES.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# EVE Runtime Configuration Properties

| Name | Type | Default | Description |
| ---- | ---- | ------- | ----------- |
| ---- | ---- | ------ | ----------- |
| app.allow.vnc | boolean | false (only local access) | allow access to EVE's VNC ports from external IPs |
| app.fml.resolution | string | notset | Set system-wide value of forced resolution for applications running in FML mode, it can be one of [predefined](/pkg/pillar/types/global.go) FmlResolution* values. |
| timer.config.interval | integer in seconds | 60 | how frequently device gets config |
Expand Down Expand Up @@ -64,7 +64,11 @@
| network.switch.enable.arpsnoop | boolean | true | enable ARP Snooping on switch Network Instances |
| wwan.query.visible.providers | bool | false | enable to periodically (once per hour) query the set of visible cellular service providers and publish them under WirelessStatus (for every modem) |
| network.local.legacy.mac.address | bool | false | enables legacy MAC address generation for local network instances for those EVE nodes where changing MAC addresses in applications will lead to incorrect network configuration |

| goroutine.leak.detection.threshold | integer | 5000 | Number of goroutines that, once reached, triggers leak detection regardless of the growth rate. |
| goroutine.leak.detection.check.interval.minutes | integer (minutes) | 1 | Interval in minutes between the measurements of the goroutine count. |
| goroutine.leak.detection.check.window.minutes | integer (minutes) | 10 | Interval in minutes for which the leak analysis is performed. It should contain at least 10 measurements, so no less than 10 × goroutine.leak.detection.check.interval.minutes. |
| goroutine.leak.detection.keep.stats.hours | integer (hours) | 24 | Number of hours to keep the stats for leak detection. We keep more stats than the check window requires so that a larger check window set later via configuration already has data to work with. |
| goroutine.leak.detection.cooldown.minutes | integer (minutes) | 5 | Cooldown period in minutes after the leak detection is triggered. During this period, no stack traces are collected; only warning messages are logged. |
In addition, there can be per-agent settings.
The Per-agent settings begin with "agent.*agentname*.*setting*"
The following per-agent settings override the corresponding default ones:
Expand Down
141 changes: 124 additions & 17 deletions pkg/pillar/cmd/watcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,72 @@ const (
usageThreshold = 2
)

// GoroutineLeakDetectionParams holds the tunable settings of the goroutine
// leak detector. Values are written through Set and read through Get, both of
// which take the mutex, so the struct must be shared by pointer and not copied.
type GoroutineLeakDetectionParams struct {
	// mutex guards all the fields below.
	mutex sync.Mutex

	// threshold is the number of goroutines that, once reached, triggers
	// leak detection regardless of the growth rate.
	threshold int

	// checkInterval is the time between two measurements of the goroutine count.
	// checkStatsFor is the window over which the leak analysis is performed.
	// keepStatsFor is how long measurements are retained.
	// cooldownPeriod is the pause after a detection during which no stack
	// traces are collected.
	checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod time.Duration
}

// validateGoroutineLeakDetectionParams reports whether the given set of
// goroutine leak detection parameters is internally consistent. On the first
// violated rule it logs a warning and returns false.
//
// The rules are:
//   - threshold must be at least 1;
//   - checkInterval must be strictly positive;
//   - checkStatsFor must cover at least 10 check intervals;
//   - keepStatsFor must not be shorter than checkStatsFor;
//   - cooldownPeriod must not be shorter than checkInterval.
func validateGoroutineLeakDetectionParams(threshold int, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod time.Duration) bool {
	if threshold < 1 {
		log.Warnf("Invalid threshold: %d", threshold)
		return false
	}
	// Zero must be rejected as well as negative values: the monitor divides
	// the retention and window durations by checkInterval to size its stats,
	// so a zero interval would cause an integer divide-by-zero panic.
	if checkInterval <= 0 {
		log.Warnf("Invalid check interval: %v", checkInterval)
		return false
	}
	if checkStatsFor < checkInterval*10 {
		log.Warnf("Invalid check window: %v", checkStatsFor)
		log.Warnf("Check window must be at least 10 times the check interval (%v)", checkInterval)
		return false
	}
	if keepStatsFor < checkStatsFor {
		log.Warnf("Invalid keep stats duration: %v", keepStatsFor)
		log.Warnf("Keep stats duration must be greater than a check window (%v)", checkStatsFor)
		return false
	}
	if cooldownPeriod < checkInterval {
		log.Warnf("Invalid cooldown period: %v", cooldownPeriod)
		log.Warnf("Cooldown period must be greater than a check interval (%v)", checkInterval)
		return false
	}
	return true
}

// Set validates the given goroutine leak detection parameters and, only if
// they pass validation, stores all of them under the mutex. When validation
// fails the previously stored values are left untouched.
func (gldp *GoroutineLeakDetectionParams) Set(threshold int, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod time.Duration) {
	valid := validateGoroutineLeakDetectionParams(threshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod)
	if !valid {
		return
	}

	gldp.mutex.Lock()
	defer gldp.mutex.Unlock()

	gldp.threshold = threshold
	gldp.checkInterval, gldp.checkStatsFor = checkInterval, checkStatsFor
	gldp.keepStatsFor, gldp.cooldownPeriod = keepStatsFor, cooldownPeriod
}

// Get returns a consistent snapshot of the goroutine leak detection
// parameters, read under the mutex. The values are returned in this order:
// threshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod.
func (gldp *GoroutineLeakDetectionParams) Get() (int, time.Duration, time.Duration, time.Duration, time.Duration) {
	gldp.mutex.Lock()
	defer gldp.mutex.Unlock()

	// The result values are evaluated before the deferred unlock runs.
	return gldp.threshold,
		gldp.checkInterval,
		gldp.checkStatsFor,
		gldp.keepStatsFor,
		gldp.cooldownPeriod
}

type watcherContext struct {
agentbase.AgentBase
ps *pubsub.PubSub
Expand All @@ -40,6 +106,9 @@ type watcherContext struct {

GCInitialized bool
// cli options

// Global goroutine leak detection parameters
GRLDParams GoroutineLeakDetectionParams
}

// AddAgentSpecificCLIFlags adds CLI options
Expand Down Expand Up @@ -84,6 +153,22 @@ func setForcedGOGCParams(ctx *watcherContext) {
gogcForcedLock.Unlock()
}

// readGlobalGoroutineLeakDetectionParams reads the goroutine leak detection
// settings from the global config and passes them to ctx.GRLDParams.Set.
// It does nothing when the global config is not available.
func readGlobalGoroutineLeakDetectionParams(ctx *watcherContext) {
	gcp := agentlog.GetGlobalConfig(log, ctx.subGlobalConfig)
	if gcp == nil {
		return
	}

	// inMinutes reads an integer setting and interprets it as a number of minutes.
	inMinutes := func(key types.GlobalSettingKey) time.Duration {
		return time.Duration(gcp.GlobalValueInt(key)) * time.Minute
	}

	ctx.GRLDParams.Set(
		int(gcp.GlobalValueInt(types.GoroutineLeakDetectionThreshold)),
		inMinutes(types.GoroutineLeakDetectionCheckIntervalMinutes),
		inMinutes(types.GoroutineLeakDetectionCheckWindowMinutes),
		// The retention period is the only setting expressed in hours.
		time.Duration(gcp.GlobalValueInt(types.GoroutineLeakDetectionKeepStatsHours))*time.Hour,
		inMinutes(types.GoroutineLeakDetectionCooldownMinutes),
	)
}

// Listens to root cgroup in hierarchy mode (events always propagate
// up to the root) and call Go garbage collector with reasonable
// interval when certain amount of memory has been allocated (presumably
Expand Down Expand Up @@ -249,12 +334,44 @@ func handlePotentialGoroutineLeak() {
agentlog.DumpAllStacks(log, agentName)
}

func goroutinesMonitor(goroutinesThreshold int, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod time.Duration) {
// goroutinesMonitor monitors the number of goroutines and detects potential goroutine leaks.
func goroutinesMonitor(ctx *watcherContext) {
// Get the initial goroutine leak detection parameters to create the stats slice
goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod := ctx.GRLDParams.Get()
entriesToKeep := int(keepStatsFor / checkInterval)
entriesToCheck := int(checkStatsFor / checkInterval)
stats := make([]int, 0, entriesToKeep+1)
var lastLeakHandled time.Time
for {
goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod = ctx.GRLDParams.Get()
// Check if we have to resize the stats slice
newEntriesToKeep := int(keepStatsFor / checkInterval)
if newEntriesToKeep != entriesToKeep {
if newEntriesToKeep > entriesToKeep {
// **Increasing Size**
if cap(stats) >= newEntriesToKeep+1 {
// Capacity is sufficient; update entriesToKeep
log.Functionf("Capacity sufficient; update entriesToKeep: %d", newEntriesToKeep)
entriesToKeep = newEntriesToKeep
} else {
// Capacity insufficient; create a new slice
log.Functionf("Capacity insufficient; create a new slice: %d", newEntriesToKeep+1)
newStats := make([]int, len(stats), newEntriesToKeep+1)
copy(newStats, stats)
stats = newStats
entriesToKeep = newEntriesToKeep
}
} else {
// **Decreasing Size**
if len(stats) > newEntriesToKeep {
// Remove oldest entries
log.Functionf("Remove oldest entries: %d", len(stats)-newEntriesToKeep)
stats = stats[len(stats)-newEntriesToKeep:]
}
log.Functionf("Update entriesToKeep: %d", newEntriesToKeep)
entriesToKeep = newEntriesToKeep
}
}
entriesToCheck := int(checkStatsFor / checkInterval)
time.Sleep(checkInterval)
numGoroutines := runtime.NumGoroutine()
// First check for the threshold
Expand Down Expand Up @@ -411,27 +528,17 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar
// Handle memory pressure events by calling GC explicitly
go handleMemoryPressureEvents()

// Detect goroutine leaks
const (
// Threshold for the number of goroutines
// When reached, we assume there's a potential goroutine leak
goroutinesThreshold = 1000
// Check interval for the number of goroutines
checkInterval = 1 * time.Minute
// Check window. On each check, we analyze the stats for that period
checkWindow = 1 * time.Hour
// Keep statistic for that long
keepStatsFor = 24 * time.Hour
// Cooldown period for the leak detection
cooldownPeriod = 10 * time.Minute
)
go goroutinesMonitor(goroutinesThreshold, checkInterval, checkWindow, keepStatsFor, cooldownPeriod)
// Set the goroutine leak detection parameters to default values
ctx.GRLDParams.Set(5000, 1*time.Minute, 10*time.Minute, 24*time.Hour, 5*time.Minute)

go goroutinesMonitor(&ctx)

for {
select {
case change := <-subGlobalConfig.MsgChan():
subGlobalConfig.ProcessChange(change)
setForcedGOGCParams(&ctx)
readGlobalGoroutineLeakDetectionParams(&ctx)
case change := <-subHostMemory.MsgChan():
subHostMemory.ProcessChange(change)
case change := <-subDiskMetric.MsgChan():
Expand Down
16 changes: 10 additions & 6 deletions pkg/pillar/cmd/watcher/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,11 @@ func TestGoroutinesMonitorNoLeak(t *testing.T) {
r, w, _ := os.Pipe()
logger.Out = w

go func() {
goroutinesMonitor(goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod)
}()
// Create context with default parameters
ctx := &watcherContext{}
ctx.GRLDParams.Set(goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod)

go goroutinesMonitor(ctx)

var wg sync.WaitGroup

Expand Down Expand Up @@ -298,9 +300,11 @@ func TestGoroutinesMonitorLeak(t *testing.T) {
r, w, _ := os.Pipe()
logger.Out = w

go func() {
goroutinesMonitor(goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod)
}()
// Create context with default parameters
ctx := &watcherContext{}
ctx.GRLDParams.Set(goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod)

go goroutinesMonitor(ctx)

var wg sync.WaitGroup

Expand Down
23 changes: 23 additions & 0 deletions pkg/pillar/types/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,22 @@ const (
// WwanQueryVisibleProviders : periodically query visible cellular service providers
WwanQueryVisibleProviders GlobalSettingKey = "wwan.query.visible.providers"

// GoroutineLeakDetectionThreshold amount of goroutines, reaching which will trigger leak detection
// regardless of growth rate.
GoroutineLeakDetectionThreshold GlobalSettingKey = "goroutine.leak.detection.threshold"
// GoroutineLeakDetectionCheckIntervalMinutes interval in minutes between the measurements of the
// goroutine count.
GoroutineLeakDetectionCheckIntervalMinutes GlobalSettingKey = "goroutine.leak.detection.check.interval.minutes"
// GoroutineLeakDetectionCheckWindowMinutes interval in minutes for which the leak analysis is performed.
// It should contain at least 10 measurements, so no less than 10 * GoroutineLeakDetectionCheckIntervalMinutes.
GoroutineLeakDetectionCheckWindowMinutes GlobalSettingKey = "goroutine.leak.detection.check.window.minutes"
// GoroutineLeakDetectionKeepStatsHours number of hours to keep the stats for the leak detection. We keep more
// stats than the check window to be able to react to a bigger check window being set via configuration.
GoroutineLeakDetectionKeepStatsHours GlobalSettingKey = "goroutine.leak.detection.keep.stats.hours"
// GoroutineLeakDetectionCooldownMinutes cooldown period in minutes after the leak detection is triggered. During
// this period no stack traces are collected, only warning messages are logged.
GoroutineLeakDetectionCooldownMinutes GlobalSettingKey = "goroutine.leak.detection.cooldown.minutes"

// TriState Items
// NetworkFallbackAnyEth global setting key
NetworkFallbackAnyEth GlobalSettingKey = "network.fallback.any.eth"
Expand Down Expand Up @@ -933,6 +949,13 @@ func NewConfigItemSpecMap() ConfigItemSpecMap {
configItemSpecMap.AddIntItem(LogRemainToSendMBytes, 2048, 10, 0xFFFFFFFF)
configItemSpecMap.AddIntItem(DownloadMaxPortCost, 0, 0, 255)

// Goroutine Leak Detection section
configItemSpecMap.AddIntItem(GoroutineLeakDetectionThreshold, 5000, 1, 0xFFFFFFFF)
configItemSpecMap.AddIntItem(GoroutineLeakDetectionCheckIntervalMinutes, 1, 1, 0xFFFFFFFF)
configItemSpecMap.AddIntItem(GoroutineLeakDetectionCheckWindowMinutes, 10, 10, 0xFFFFFFFF)
configItemSpecMap.AddIntItem(GoroutineLeakDetectionKeepStatsHours, 24, 1, 0xFFFFFFFF)
configItemSpecMap.AddIntItem(GoroutineLeakDetectionCooldownMinutes, 5, 1, 0xFFFFFFFF)

// Add Bool Items
configItemSpecMap.AddBoolItem(UsbAccess, true) // Controller likely default to false
configItemSpecMap.AddBoolItem(VgaAccess, true) // Controller likely default to false
Expand Down
5 changes: 5 additions & 0 deletions pkg/pillar/types/global_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,11 @@ func TestNewConfigItemSpecMap(t *testing.T) {
NetDumpTopicPostOnboardInterval,
NetDumpDownloaderPCAP,
NetDumpDownloaderHTTPWithFieldValue,
GoroutineLeakDetectionThreshold,
GoroutineLeakDetectionCheckIntervalMinutes,
GoroutineLeakDetectionCheckWindowMinutes,
GoroutineLeakDetectionKeepStatsHours,
GoroutineLeakDetectionCooldownMinutes,
}
if len(specMap.GlobalSettings) != len(gsKeys) {
t.Errorf("GlobalSettings has more (%d) than expected keys (%d)",
Expand Down

0 comments on commit 1721635

Please sign in to comment.