diff --git a/pkg/pillar/cmd/watcher/watcher.go b/pkg/pillar/cmd/watcher/watcher.go
index e2d47c41d1..cd1a946b48 100644
--- a/pkg/pillar/cmd/watcher/watcher.go
+++ b/pkg/pillar/cmd/watcher/watcher.go
@@ -4,6 +4,7 @@
 package watcher
 
 import (
+	"context"
 	"flag"
 	"math"
 	"os"
@@ -36,6 +37,10 @@ type GoroutineLeakDetectionParams struct {
 	checkStatsFor  time.Duration
 	keepStatsFor   time.Duration
 	cooldownPeriod time.Duration
+	// context and stop make the monitoring goroutine cancellable.
+	// Both are set by MakeStoppable; while they are nil the monitor cannot be stopped.
+	context context.Context
+	stop    context.CancelFunc
 }
 
 func validateGoroutineLeakDetectionParams(threshold int, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod time.Duration) bool {
@@ -79,6 +84,33 @@ func (gldp *GoroutineLeakDetectionParams) Set(threshold int, checkInterval, chec
 	gldp.mutex.Unlock()
 }
 
+// MakeStoppable creates a cancellable context and the matching stop function,
+// which allows Stop to terminate the monitoring goroutine. The fields it sets
+// are not protected by the mutex, so call it before starting the monitor.
+func (gldp *GoroutineLeakDetectionParams) MakeStoppable() {
+	gldp.context, gldp.stop = context.WithCancel(context.Background())
+}
+
+// isStoppable reports whether MakeStoppable has been called.
+func (gldp *GoroutineLeakDetectionParams) isStoppable() bool {
+	return gldp.context != nil
+}
+
+// checkStopCondition reports, without blocking, whether the monitoring
+// goroutine has to stop. It is always false if the monitor is not stoppable.
+func (gldp *GoroutineLeakDetectionParams) checkStopCondition() bool {
+	// Err is non-nil only once the context has been cancelled by Stop
+	return gldp.isStoppable() && gldp.context.Err() != nil
+}
+
+// Stop cancels the context to stop the monitoring goroutine.
+// It does nothing if MakeStoppable has not been called.
+func (gldp *GoroutineLeakDetectionParams) Stop() {
+	if gldp.stop != nil {
+		gldp.stop()
+	}
+}
+
 // Get atomically gets the global goroutine leak detection parameters
 func (gldp *GoroutineLeakDetectionParams) Get() (int, time.Duration, time.Duration, time.Duration, time.Duration) {
 	var threshold int
@@ -337,12 +369,18 @@ func handlePotentialGoroutineLeak() {
 
 // goroutinesMonitor monitors the number of goroutines and detects potential goroutine leaks.
 func goroutinesMonitor(ctx *watcherContext) {
+	log.Functionf("Starting goroutines monitor (stoppable: %v)", ctx.GRLDParams.isStoppable())
 	// Get the initial goroutine leak detection parameters to create the stats slice
 	goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod := ctx.GRLDParams.Get()
 	entriesToKeep := int(keepStatsFor / checkInterval)
 	stats := make([]int, 0, entriesToKeep+1)
 	var lastLeakHandled time.Time
 	for {
+		// Check if we have to stop
+		if ctx.GRLDParams.checkStopCondition() {
+			log.Functionf("Stopping goroutines monitor")
+			return
+		}
 		// Check if we have to resize the stats slice
 		goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod = ctx.GRLDParams.Get()
 		newEntriesToKeep := int(keepStatsFor / checkInterval)
diff --git a/pkg/pillar/cmd/watcher/watcher_test.go b/pkg/pillar/cmd/watcher/watcher_test.go
index 38fb3be272..28c6970112 100644
--- a/pkg/pillar/cmd/watcher/watcher_test.go
+++ b/pkg/pillar/cmd/watcher/watcher_test.go
@@ -249,8 +249,10 @@ func TestGoroutinesMonitorNoLeak(t *testing.T) {
 	// Create context with default parameters
 	ctx := &watcherContext{}
 	ctx.GRLDParams.Set(goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod)
+	ctx.GRLDParams.MakeStoppable()
 
 	go goroutinesMonitor(ctx)
+	defer ctx.GRLDParams.Stop()
 
 	timeStart := time.Now()
 	for {
@@ -294,8 +296,10 @@ func TestGoroutinesMonitorLeak(t *testing.T) {
 	// Create context with default parameters
 	ctx := &watcherContext{}
 	ctx.GRLDParams.Set(goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod)
+	ctx.GRLDParams.MakeStoppable()
 
 	go goroutinesMonitor(ctx)
+	defer ctx.GRLDParams.Stop()
 
 	timeStart := time.Now()
 	for {
@@ -346,8 +350,10 @@ func TestGoroutinesMonitorUpdateParamsKeepStatsDecrease(t *testing.T) {
 
 	// Set the parameters
 	ctx.GRLDParams.Set(goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod)
+	ctx.GRLDParams.MakeStoppable()
 
 	go goroutinesMonitor(ctx)
+	defer ctx.GRLDParams.Stop()
 
 	// Wait until we fill the stats slice
 	time.Sleep(2 * keepStatsFor)
@@ -409,8 +415,10 @@ func TestGoroutinesMonitorUpdateParamsKeepStatsIncrease(t *testing.T) {
 
 	// Set the parameters
 	ctx.GRLDParams.Set(goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod)
+	ctx.GRLDParams.MakeStoppable()
 
 	go goroutinesMonitor(ctx)
+	defer ctx.GRLDParams.Stop()
 
 	// Wait until we fill the stats slice
 	time.Sleep(2 * keepStatsFor)
@@ -441,3 +449,110 @@ func TestGoroutinesMonitorUpdateParamsKeepStatsIncrease(t *testing.T) {
 		}
 	}
 }
+
+// TestGoroutineMonitorStops checks that a stoppable monitor logs its start
+// message and, after Stop is called, its stop message.
+func TestGoroutineMonitorStops(t *testing.T) {
+	keepStatsFor := 24 * 60 * time.Millisecond
+	goroutinesThreshold := 100
+	checkInterval := 1 * time.Millisecond
+	checkStatsFor := 10 * time.Millisecond
+	cooldownPeriod := 5 * time.Millisecond
+
+	backupOut := logger.Out
+	backupLevel := logger.Level
+	// Create a pipe to capture log output
+	r, w, err := os.Pipe()
+	if err != nil {
+		t.Fatalf("Failed to create pipe: %v", err)
+	}
+	logger.SetOutput(w)
+	logger.SetLevel(logrus.TraceLevel)
+	defer func() {
+		logger.SetOutput(backupOut)
+		logger.SetLevel(backupLevel)
+		_ = r.Close()
+	}()
+
+	// Create the watcher context with the parameters above
+	ctx := &watcherContext{}
+	ctx.GRLDParams.Set(goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod)
+	ctx.GRLDParams.MakeStoppable()
+
+	go goroutinesMonitor(ctx)
+
+	// Let the monitor run for a while
+	time.Sleep(keepStatsFor * 2)
+
+	ctx.GRLDParams.Stop()
+
+	// Wait for several check intervals to allow the monitor to stop
+	time.Sleep(checkInterval * 100)
+
+	// Close the write end of the pipe so that ReadAll below sees EOF
+	_ = w.Close()
+
+	// Read the log output
+	output, err := io.ReadAll(r)
+	if err != nil {
+		t.Fatalf("Failed to read log output: %v", err)
+	}
+
+	msgStart := "Starting goroutines monitor (stoppable: true)"
+	msgStop := "Stopping goroutines monitor"
+	expectedMsgs := []string{msgStart, msgStop}
+	for _, expectedMsg := range expectedMsgs {
+		if !strings.Contains(string(output), expectedMsg) {
+			t.Errorf("Expected log output to contain '%s'", expectedMsg)
+		}
+	}
+}
+
+// TestGoroutineMonitorRunsFineUnstoppable checks that the monitor still starts,
+// and reports itself as not stoppable, when MakeStoppable is not called.
+func TestGoroutineMonitorRunsFineUnstoppable(t *testing.T) {
+	keepStatsFor := 24 * 60 * time.Millisecond
+	goroutinesThreshold := 100
+	checkInterval := 1 * time.Millisecond
+	checkStatsFor := 10 * time.Millisecond
+	cooldownPeriod := 5 * time.Millisecond
+
+	backupOut := logger.Out
+	backupLevel := logger.Level
+	// Create a pipe to capture log output
+	r, w, err := os.Pipe()
+	if err != nil {
+		t.Fatalf("Failed to create pipe: %v", err)
+	}
+	logger.SetOutput(w)
+	logger.SetLevel(logrus.TraceLevel)
+	defer func() {
+		logger.SetOutput(backupOut)
+		logger.SetLevel(backupLevel)
+		_ = r.Close()
+	}()
+
+	// Create the watcher context with the parameters above; MakeStoppable is
+	// deliberately not called
+	ctx := &watcherContext{}
+	ctx.GRLDParams.Set(goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod)
+
+	go goroutinesMonitor(ctx)
+
+	// Let the monitor run for a while
+	time.Sleep(keepStatsFor * 2)
+
+	// Close the write end of the pipe so that ReadAll below sees EOF
+	_ = w.Close()
+
+	// Read the log output
+	output, err := io.ReadAll(r)
+	if err != nil {
+		t.Fatalf("Failed to read log output: %v", err)
+	}
+
+	msgStart := "Starting goroutines monitor (stoppable: false)"
+	if !strings.Contains(string(output), msgStart) {
+		t.Errorf("Expected log output to contain '%s'", msgStart)
+	}
+}