From f9cd1d6b2b447dcbb3d40239df80a82c9dab0383 Mon Sep 17 00:00:00 2001 From: Nikolay Martyanov Date: Fri, 1 Nov 2024 18:15:47 +0100 Subject: [PATCH] [WIP] pillar/watcher: Add goroutine leak detection. Introduce goroutine leak detection functionality that monitors the number of goroutines over a 24-hour period to identify potential leaks. The detection algorithm employs a moving average smoothing technique and linear regression to analyze trends. If an upward trend is statistically significant, it flags the anomaly. This approach helps identify subtle issues in goroutine management, leveraging a t-statistic for confidence testing. Signed-off-by: Nikolay Martyanov --- pkg/pillar/cmd/watcher/watcher.go | 99 +++++++++++++++++++++++++++++++ 1 file changed, 99 insertions(+) diff --git a/pkg/pillar/cmd/watcher/watcher.go b/pkg/pillar/cmd/watcher/watcher.go index c1b5a64906..7f9d3578ca 100644 --- a/pkg/pillar/cmd/watcher/watcher.go +++ b/pkg/pillar/cmd/watcher/watcher.go @@ -5,6 +5,7 @@ package watcher import ( "flag" + "math" "os" "runtime" "sync" @@ -156,6 +157,101 @@ func handleMemoryPressureEvents() { } } +// Function to calculate moving average +func movingAverage(data []int, windowSize int) []float64 { + var ma []float64 + for i := 0; i <= len(data)-windowSize; i++ { + sum := 0.0 + for j := i; j < i+windowSize; j++ { + sum += float64(data[j]) + } + ma = append(ma, sum/float64(windowSize)) + } + return ma +} + +// Function to perform linear regression +func linearRegression(x, y []float64) (slope, intercept, rValue float64) { + n := float64(len(x)) + var sumX, sumY, sumXY, sumX2, sumY2 float64 + for i := 0; i < len(x); i++ { + sumX += x[i] + sumY += y[i] + sumXY += x[i] * y[i] + sumX2 += x[i] * x[i] + sumY2 += y[i] * y[i] + } + slope = (n*sumXY - sumX*sumY) / (n*sumX2 - sumX*sumX) + intercept = (sumY - slope*sumX) / n + rNumerator := n*sumXY - sumX*sumY + rDenominator := math.Sqrt((n*sumX2 - sumX*sumX) * (n*sumY2 - sumY*sumY)) + rValue = rNumerator / rDenominator + return +} + +// Function to calculate t-statistic for the slope +func tStatistic(slope, stdErr float64, df int) float64 { + return slope / stdErr +} + +// Detects goroutine leaks by analyzing the number of goroutines created +// in the last 24 hours. It uses linear regression to detect any significant +// upward trend in the number of goroutines. +func detectGoroutineLeaks(stats []int) bool { + // Step 1: Smooth the data + windowSize := 20 // Choose appropriate window size (let it be 20 minutes) + smoothedData := movingAverage(stats, windowSize) + + // Step 2: Prepare data for regression + x := make([]float64, len(smoothedData)) + for i := range x { + x[i] = float64(i) + } + y := smoothedData + + // Step 3: Perform linear regression + slope, _, rValue := linearRegression(x, y) + + // Step 4: Statistical significance testing + n := float64(len(x)) + degreesOfFreedom := int(n - 2) + stdErr := math.Sqrt((1 - rValue*rValue) / (n - 2)) + tStat := tStatistic(slope, stdErr, degreesOfFreedom) + + // Critical t-value for 95% confidence + // For large n, t-critical ~1.96 + tCritical := 1.96 + if math.Abs(tStat) > tCritical { + log.Warnf("Significant upward trend detected in goroutine count.") + return true + } + return false +} + +func handleGoroutineLeaks() { + // Create a map to keep track of the number of goroutines at a given time + // Limit it to 100 entries + const maxEntries = 24 * 60 // 24 hours + // Create a slice to keep track of the last 100 entries + stats := make([]int, 0, maxEntries) + for { + time.Sleep(1 * time.Minute) + numGoroutines := runtime.NumGoroutine() + stats = append(stats, numGoroutines) + if len(stats) > maxEntries { + stats = stats[1:] + } + // Check if there are any leaks + if len(stats) > 60 && detectGoroutineLeaks(stats) { + // Count the number of goroutines that were created in the last 1 hour + numGoroutinesHourAgo := stats[len(stats)-60] + leakCount := numGoroutines - numGoroutinesHourAgo + log.Warnf("Goroutine leak detected. Number of goroutines created in the last 1 hour: %d", leakCount) + // TODO, call function similar to dumpStacks() in agentlog.go + } + } +} + // Run : func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, arguments []string, baseDir string) int { logger = loggerArg @@ -268,6 +364,9 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar // Handle memory pressure events by calling GC explicitly go handleMemoryPressureEvents() + // Detect goroutine leaks + go handleGoroutineLeaks() + for { select { case change := <-subGlobalConfig.MsgChan():