diff --git a/pkg/pillar/cmd/watcher/watcher.go b/pkg/pillar/cmd/watcher/watcher.go index c1b5a64906..7f9d3578ca 100644 --- a/pkg/pillar/cmd/watcher/watcher.go +++ b/pkg/pillar/cmd/watcher/watcher.go @@ -5,6 +5,7 @@ package watcher import ( "flag" + "math" "os" "runtime" "sync" @@ -156,6 +157,101 @@ func handleMemoryPressureEvents() { } } +// Function to calculate moving average +func movingAverage(data []int, windowSize int) []float64 { + var ma []float64 + for i := 0; i <= len(data)-windowSize; i++ { + sum := 0.0 + for j := i; j < i+windowSize; j++ { + sum += float64(data[j]) + } + ma = append(ma, sum/float64(windowSize)) + } + return ma +} + +// Function to perform linear regression +func linearRegression(x, y []float64) (slope, intercept, rValue float64) { + n := float64(len(x)) + var sumX, sumY, sumXY, sumX2, sumY2 float64 + for i := 0; i < len(x); i++ { + sumX += x[i] + sumY += y[i] + sumXY += x[i] * y[i] + sumX2 += x[i] * x[i] + sumY2 += y[i] * y[i] + } + slope = (n*sumXY - sumX*sumY) / (n*sumX2 - sumX*sumX) + intercept = (sumY - slope*sumX) / n + rNumerator := n*sumXY - sumX*sumY + rDenominator := math.Sqrt((n*sumX2 - sumX*sumX) * (n*sumY2 - sumY*sumY)) + rValue = rNumerator / rDenominator + return +} + +// Function to calculate t-statistic for the slope +func tStatistic(slope, stdErr float64, df int) float64 { + return slope / stdErr +} + +// Detects goroutine leaks by analyzing the number of goroutines created +// in the last 24 hours. It uses linear regression to detect any significant +// upward trend in the number of goroutines. +func detectGoroutineLeaks(stats []int) bool { + // Step 1: Smooth the data + windowSize := 20 // Choose appropriate window size (let it be 20 minutes) + smoothedData := movingAverage(stats, windowSize) + + // Step 2: Prepare data for regression + x := make([]float64, len(smoothedData)) + for i := range x { + x[i] = float64(i) + } + y := smoothedData + + // Step 3: Perform linear regression + slope, _, rValue := linearRegression(x, y) + + // Step 4: Statistical significance testing + n := float64(len(x)) + degreesOfFreedom := int(n - 2) + stdErr := math.Sqrt((1 - rValue*rValue) / (n - 2)) + tStat := tStatistic(slope, stdErr, degreesOfFreedom) + + // Critical t-value for 95% confidence + // For large n, t-critical ~1.96 + tCritical := 1.96 + if math.Abs(tStat) > tCritical { + log.Warnf("Significant upward trend detected in goroutine count.") + return true + } + return false +} + +func handleGoroutineLeaks() { + // Create a map to keep track of the number of goroutines at a given time + // Limit it to 100 entries + const maxEntries = 24 * 60 // 24 hours + // Create a slice to keep track of the last 100 entries + stats := make([]int, 0, maxEntries) + for { + time.Sleep(1 * time.Minute) + numGoroutines := runtime.NumGoroutine() + stats = append(stats, numGoroutines) + if len(stats) > maxEntries { + stats = stats[1:] + } + // Check if there are any leaks + if len(stats) > 60 && detectGoroutineLeaks(stats) { + // Count the number of goroutines that were created in the last 1 hour + numGoroutinesHourAgo := stats[len(stats)-60] + leakCount := numGoroutines - numGoroutinesHourAgo + log.Warnf("Goroutine leak detected. Number of goroutines created in the last 1 hour: %d", leakCount) + // TODO, call function similar to dumpStacks() in agentlog.go + } + } +} + // Run : func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, arguments []string, baseDir string) int { logger = loggerArg @@ -268,6 +364,9 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar // Handle memory pressure events by calling GC explicitly go handleMemoryPressureEvents() + // Detect goroutine leaks + go handleGoroutineLeaks() + for { select { case change := <-subGlobalConfig.MsgChan():