Add Goroutine leak detection with unit tests #4416

Merged
5 changes: 5 additions & 0 deletions docs/CONFIG-PROPERTIES.md
@@ -64,6 +64,11 @@
| network.switch.enable.arpsnoop | boolean | true | enable ARP Snooping on switch Network Instances |
| wwan.query.visible.providers | bool | false | enable to periodically (once per hour) query the set of visible cellular service providers and publish them under WirelessStatus (for every modem) |
| network.local.legacy.mac.address | bool | false | enables legacy MAC address generation for local network instances for those EVE nodes where changing MAC addresses in applications will lead to incorrect network configuration |
| goroutine.leak.detection.threshold | integer | 5000 | Number of goroutines that, once reached, triggers leak detection regardless of the growth rate. |
| goroutine.leak.detection.check.interval.minutes | integer (minutes) | 1 | Interval in minutes between measurements of the goroutine count. |
| goroutine.leak.detection.check.window.minutes | integer (minutes) | 10 | Window in minutes over which the leak analysis is performed. It must contain at least 10 measurements, so it must be no less than 10 × goroutine.leak.detection.check.interval.minutes. |
| goroutine.leak.detection.keep.stats.hours | integer (hours) | 24 | Number of hours to keep the stats for leak detection. More stats are kept than the check window requires, so a larger check window configured later can take effect immediately. |
| goroutine.leak.detection.cooldown.minutes | integer (minutes) | 5 | Cooldown period in minutes after leak detection is triggered. During this period, no stack traces are collected; only warning messages are logged. |
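As a worked example (illustrative values, not the defaults): with goroutine.leak.detection.check.interval.minutes set to 2, the check window must be at least 10 × 2 = 20 minutes, and goroutine.leak.detection.keep.stats.hours must cover at least that window (the default 24 hours comfortably does).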

In addition, there can be per-agent settings.
Per-agent settings begin with "agent.*agentname*.*setting*"
3 changes: 3 additions & 0 deletions pkg/pillar/agentlog/agentlog.go
@@ -196,6 +196,9 @@ func dumpStacks(log *base.LogObject, fileName string) {
// DumpAllStacks writes to file but does not log
func DumpAllStacks(log *base.LogObject, agentName string) {
agentDebugDir := fmt.Sprintf("%s/%s/", types.PersistDebugDir, agentName)
// Create the directory if it does not exist
_ = os.MkdirAll(agentDebugDir, 0755)

sigUsr1FileName := agentDebugDir + "/sigusr1"

stacks := getStacks(true)
286 changes: 286 additions & 0 deletions pkg/pillar/cmd/watcher/watcher.go
@@ -4,7 +4,9 @@
package watcher

import (
"context"
"flag"
"math"
"os"
"runtime"
"sync"
@@ -27,6 +29,104 @@ const (
usageThreshold = 2
)

// GoroutineLeakDetectionParams holds the global goroutine leak detection parameters
type GoroutineLeakDetectionParams struct {
mutex sync.Mutex
threshold int
checkInterval time.Duration
checkStatsFor time.Duration
keepStatsFor time.Duration
cooldownPeriod time.Duration
// Context to make the monitoring goroutine cancellable
context context.Context
stop context.CancelFunc
}

func validateGoroutineLeakDetectionParams(threshold int, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod time.Duration) bool {
if threshold < 1 {
log.Warnf("Invalid threshold: %d", threshold)
return false
}
if checkInterval <= 0 {
log.Warnf("Invalid check interval: %v", checkInterval)
return false
}
if checkStatsFor < checkInterval*10 {
log.Warnf("Invalid check window: %v", checkStatsFor)
log.Warnf("Check window must be at least 10 times the check interval (%v)", checkInterval)
return false
}
if keepStatsFor < checkStatsFor {
log.Warnf("Invalid keep stats duration: %v", keepStatsFor)
log.Warnf("Keep stats duration must be greater than a check window (%v)", checkStatsFor)
return false
}
if cooldownPeriod < checkInterval {
log.Warnf("Invalid cooldown period: %v", cooldownPeriod)
log.Warnf("Cooldown period must be greater than a check interval (%v)", checkInterval)
return false
}
return true
}
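For illustration, a combination the validator rejects (a sketch, not part of the diff; fmt is assumed to be imported):

    // Rejected: a 5-minute check window is less than 10 × the 1-minute check interval.
    ok := validateGoroutineLeakDetectionParams(5000, time.Minute, 5*time.Minute, 24*time.Hour, 5*time.Minute)
    fmt.Println(ok) // false, and warnings are logged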

// Set validates and sets the global goroutine leak detection parameters; invalid combinations are rejected with a warning
func (gldp *GoroutineLeakDetectionParams) Set(threshold int, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod time.Duration) {
if !validateGoroutineLeakDetectionParams(threshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod) {
return
}
gldp.mutex.Lock()
gldp.threshold = threshold
gldp.checkInterval = checkInterval
gldp.checkStatsFor = checkStatsFor
gldp.keepStatsFor = keepStatsFor
gldp.cooldownPeriod = cooldownPeriod
gldp.mutex.Unlock()
}

// MakeStoppable creates a cancellable context and a stop function
func (gldp *GoroutineLeakDetectionParams) MakeStoppable() {
gldp.context, gldp.stop = context.WithCancel(context.Background())
}

func (gldp *GoroutineLeakDetectionParams) isStoppable() bool {
return gldp.context != nil
}

func (gldp *GoroutineLeakDetectionParams) checkStopCondition() bool {
if gldp.context != nil {
select {
case <-gldp.context.Done():
return true
default:
return false
}
}
return false
}

// Stop cancels the context to stop the monitoring goroutine
func (gldp *GoroutineLeakDetectionParams) Stop() {
if gldp.stop != nil {
gldp.stop()
}
}

// Get atomically gets the global goroutine leak detection parameters
func (gldp *GoroutineLeakDetectionParams) Get() (int, time.Duration, time.Duration, time.Duration, time.Duration) {
var threshold int
var checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod time.Duration

gldp.mutex.Lock()
threshold = gldp.threshold
checkInterval = gldp.checkInterval
checkStatsFor = gldp.checkStatsFor
keepStatsFor = gldp.keepStatsFor
cooldownPeriod = gldp.cooldownPeriod
gldp.mutex.Unlock()

return threshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod
}
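A minimal usage sketch of the parameters type (illustrative only, not part of the diff; fmt is assumed to be imported, and the values mirror the documented defaults):

    var params GoroutineLeakDetectionParams
    params.Set(5000, time.Minute, 10*time.Minute, 24*time.Hour, 5*time.Minute)
    params.MakeStoppable() // optional: allows the monitor to be cancelled, e.g. in tests
    threshold, interval, window, keep, cooldown := params.Get()
    fmt.Println(threshold, interval, window, keep, cooldown)
    params.Stop() // makes checkStopCondition return true on the monitor's next iteration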

type watcherContext struct {
agentbase.AgentBase
ps *pubsub.PubSub
@@ -39,6 +139,9 @@ type watcherContext struct {

GCInitialized bool
// cli options

// Global goroutine leak detection parameters
GRLDParams GoroutineLeakDetectionParams
}

// AddAgentSpecificCLIFlags adds CLI options
@@ -83,6 +186,22 @@ func setForcedGOGCParams(ctx *watcherContext) {
gogcForcedLock.Unlock()
}

// Read the global goroutine leak detection parameters into the context
func readGlobalGoroutineLeakDetectionParams(ctx *watcherContext) {
gcp := agentlog.GetGlobalConfig(log, ctx.subGlobalConfig)
if gcp == nil {
return
}

threshold := int(gcp.GlobalValueInt(types.GoroutineLeakDetectionThreshold))
checkInterval := time.Duration(gcp.GlobalValueInt(types.GoroutineLeakDetectionCheckIntervalMinutes)) * time.Minute
checkStatsFor := time.Duration(gcp.GlobalValueInt(types.GoroutineLeakDetectionCheckWindowMinutes)) * time.Minute
keepStatsFor := time.Duration(gcp.GlobalValueInt(types.GoroutineLeakDetectionKeepStatsHours)) * time.Hour
cooldownPeriod := time.Duration(gcp.GlobalValueInt(types.GoroutineLeakDetectionCooldownMinutes)) * time.Minute

ctx.GRLDParams.Set(threshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod)
}

// Listens to the root cgroup in hierarchy mode (events always propagate
// up to the root) and calls the Go garbage collector at a reasonable
// interval when a certain amount of memory has been allocated (presumably
@@ -156,6 +275,169 @@ func handleMemoryPressureEvents() {
}
}

func movingAverage(data []int, windowSize int) []float64 {
// Validates the window size
if windowSize <= 0 {
windowSize = 1 // Do not smooth the data
}

if windowSize > len(data) {
windowSize = len(data)
}

if len(data) == 0 {
return nil
}

smoothed := make([]float64, len(data)-windowSize+1)
var windowSum int

// Calculates the sum of the first window
for i := 0; i < windowSize; i++ {
windowSum += data[i]
}
smoothed[0] = float64(windowSum) / float64(windowSize)

// Slides the window through the data
for i := 1; i < len(smoothed); i++ {
windowSum = windowSum - data[i-1] + data[i+windowSize-1]
smoothed[i] = float64(windowSum) / float64(windowSize)
}

return smoothed
}
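A worked example (illustrative): with a window of 2, each output element averages two adjacent inputs, and the output is windowSize−1 elements shorter than the input:

    smoothed := movingAverage([]int{1, 2, 3, 4, 5}, 2)
    fmt.Println(smoothed) // [1.5 2.5 3.5 4.5]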

// calculateMeanStdDev calculates the mean and standard deviation of a slice of float64 numbers.
func calculateMeanStdDev(data []float64) (mean, stdDev float64) {
n := float64(len(data))
var sum, sumSq float64
for _, value := range data {
sum += value
sumSq += value * value
}
mean = sum / n
variance := (sumSq / n) - (mean * mean)
if variance < 0 {
// Guard against tiny negative values caused by floating-point rounding
variance = 0
}
stdDev = math.Sqrt(variance)
return
}
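Note that this is the population (not sample) standard deviation. A worked example (illustrative):

    mean, stdDev := calculateMeanStdDev([]float64{1, 2, 3})
    // mean = 2.0; variance = 14/3 − 2² = 2/3; stdDev ≈ 0.816
    fmt.Println(mean, stdDev)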

// detectGoroutineLeaks detects if there's a potential goroutine leak over time.
// Returns true if a leak is detected, false otherwise.
func detectGoroutineLeaks(stats []int) (bool, []float64) {

if len(stats) < 10 {
// Not enough data to determine trend
return false, nil
}

// The window size for the moving average
windowSize := len(stats) / 10

// Step 1: Smooth the data
smoothedData := movingAverage(stats, windowSize)

if len(smoothedData) < 2 {
// Not enough data to determine trend
return false, smoothedData
}

// Step 2: Calculate the rate of change
rateOfChange := make([]float64, len(smoothedData)-1)
for i := 1; i < len(smoothedData); i++ {
rateOfChange[i-1] = smoothedData[i] - smoothedData[i-1]
}

// Step 3: Calculate mean and standard deviation of the rate of change
mean, stdDev := calculateMeanStdDev(rateOfChange)

// Step 4: Determine the dynamic threshold (zero baseline plus one standard deviation)
threshold := stdDev

// Step 5: Check if the latest rate of change exceeds the threshold
latestChange := rateOfChange[len(rateOfChange)-1]
if mean > threshold && latestChange > threshold {
log.Warnf("Potential goroutine leak detected: latest increase of %.2f exceeds dynamic threshold of %.2f.", latestChange, threshold)
return true, smoothedData
}
return false, smoothedData
}
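To illustrate the detection rule (a sketch in the spirit of the PR's unit tests, not the actual test code): a steadily growing goroutine count has a constant positive rate of change, so the mean rate exceeds the near-zero dynamic threshold, while a flat count does not:

    growing := make([]int, 100)
    flat := make([]int, 100)
    for i := range growing {
        growing[i] = 100 + i // +1 goroutine per sample
        flat[i] = 100
    }
    leak, _ := detectGoroutineLeaks(growing)
    fmt.Println(leak) // true: constant positive rate of change
    leak, _ = detectGoroutineLeaks(flat)
    fmt.Println(leak) // false: mean rate of change is zero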

func handlePotentialGoroutineLeak() {
// Dump the stack traces of all goroutines
agentlog.DumpAllStacks(log, agentName)
}

// goroutinesMonitor monitors the number of goroutines and detects potential goroutine leaks.
func goroutinesMonitor(ctx *watcherContext) {
log.Functionf("Starting goroutines monitor (stoppable: %v)", ctx.GRLDParams.isStoppable())
// Get the initial goroutine leak detection parameters to create the stats slice
goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod := ctx.GRLDParams.Get()
entriesToKeep := int(keepStatsFor / checkInterval)
stats := make([]int, 0, entriesToKeep+1)
var lastLeakHandled time.Time
for {
// Check if we have to stop
if ctx.GRLDParams.checkStopCondition() {
log.Functionf("Stopping goroutines monitor")
return
}
// Check if we have to resize the stats slice
goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod = ctx.GRLDParams.Get()
newEntriesToKeep := int(keepStatsFor / checkInterval)
if newEntriesToKeep != entriesToKeep {
entriesToKeep = newEntriesToKeep
log.Functionf("Resizing stats slice to %d", entriesToKeep)
if len(stats) > entriesToKeep {
log.Functionf("Removing %d oldest entries", len(stats)-entriesToKeep)
stats = stats[len(stats)-entriesToKeep:]
}
}
entriesToCheck := int(checkStatsFor / checkInterval)
// Wait for the next check interval
time.Sleep(checkInterval)
numGoroutines := runtime.NumGoroutine()
// First check for the threshold
if numGoroutines > goroutinesThreshold {
log.Warnf("Number of goroutines exceeds threshold: %d", numGoroutines)
if time.Since(lastLeakHandled) < cooldownPeriod {
// Skip if we've handled a leak recently
log.Warnf("Skipping stacks dumping due to cooldown period")
continue
}
handlePotentialGoroutineLeak()
lastLeakHandled = time.Now()
continue
}
stats = append(stats, numGoroutines)
// Keep the stats for the last keepStatsFor duration
if len(stats) > entriesToKeep {
stats = stats[1:]
}

// If we have enough data, detect goroutine leaks
if len(stats) > entriesToCheck {
// Analyze the data for the last check window
entriesInLastCheckWindow := stats[len(stats)-entriesToCheck:]
leakDetected, _ := detectGoroutineLeaks(entriesInLastCheckWindow)
if leakDetected {
// Count the number of goroutines that were created in the last check window
numGoroutinesCheckWindowAgo := stats[len(stats)-entriesToCheck]
leakCount := numGoroutines - numGoroutinesCheckWindowAgo
minutesInCheckWindow := int(checkStatsFor.Minutes())
log.Warnf("Potential goroutine leak! Created in the last %d minutes: %d, total: %d",
minutesInCheckWindow, leakCount, numGoroutines)
if time.Since(lastLeakHandled) < cooldownPeriod {
// Skip detailed handling if we've handled a leak recently
log.Warnf("Skipping stacks dumping due to cooldown period")
continue
}
handlePotentialGoroutineLeak()
lastLeakHandled = time.Now()
}
}
}
}

// Run :
func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, arguments []string, baseDir string) int {
logger = loggerArg
@@ -268,6 +550,8 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar
// Handle memory pressure events by calling GC explicitly
go handleMemoryPressureEvents()

go goroutinesMonitor(&ctx)

for {
select {
case change := <-subGlobalConfig.MsgChan():
@@ -487,6 +771,8 @@ func handleGlobalConfigImpl(ctxArg interface{}, key string,
if gcp != nil {
ctx.GCInitialized = true
}
// Update the global goroutine leak detection parameters
readGlobalGoroutineLeakDetectionParams(ctx)
log.Functionf("handleGlobalConfigImpl done for %s", key)
}
