From 9bf79133c62d32f78dc72934bf13dd7fdf188aeb Mon Sep 17 00:00:00 2001 From: Nikolay Martyanov Date: Fri, 8 Nov 2024 14:09:42 +0100 Subject: [PATCH] pillar/watcher: Make goroutinesMonitor stoppable. Enable the `goroutinesMonitor` to be stopped via a cancellable context within `GoroutineLeakDetectionParams`, allowing controlled termination of the monitoring goroutine. This change introduces a `MakeStoppable` method to set up a cancellable context and a `Stop` method to trigger the cancellation, allowing tests to end monitoring cleanly. Additionally, `checkStopCondition` was added to periodically check if the goroutine should stop. Updated tests to utilize this functionality, adding verification for proper start and stop messages in the log output, ensuring that the monitor operates correctly in both stoppable and unstoppable modes. Signed-off-by: Nikolay Martyanov --- pkg/pillar/cmd/watcher/watcher.go | 38 ++++++++++ pkg/pillar/cmd/watcher/watcher_test.go | 99 ++++++++++++++++++++++++++ 2 files changed, 137 insertions(+) diff --git a/pkg/pillar/cmd/watcher/watcher.go b/pkg/pillar/cmd/watcher/watcher.go index e2d47c41d1..cd1a946b48 100644 --- a/pkg/pillar/cmd/watcher/watcher.go +++ b/pkg/pillar/cmd/watcher/watcher.go @@ -4,6 +4,7 @@ package watcher import ( + "context" "flag" "math" "os" @@ -36,6 +37,9 @@ type GoroutineLeakDetectionParams struct { checkStatsFor time.Duration keepStatsFor time.Duration cooldownPeriod time.Duration + // Context to make the monitoring goroutine cancellable + context context.Context + stop context.CancelFunc } func validateGoroutineLeakDetectionParams(threshold int, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod time.Duration) bool { @@ -79,6 +83,34 @@ func (gldp *GoroutineLeakDetectionParams) Set(threshold int, checkInterval, chec gldp.mutex.Unlock() } +// MakeStoppable creates a cancellable context and a stop function +func (gldp *GoroutineLeakDetectionParams) MakeStoppable() { + gldp.context, gldp.stop = context.WithCancel(context.Background()) +} + +func (gldp *GoroutineLeakDetectionParams) isStoppable() bool { + return gldp.context != nil +} + +func (gldp *GoroutineLeakDetectionParams) checkStopCondition() bool { + if gldp.context != nil { + select { + case <-gldp.context.Done(): + return true + default: + return false + } + } + return false +} + +// Stop cancels the context to stop the monitoring goroutine +func (gldp *GoroutineLeakDetectionParams) Stop() { + if gldp.stop != nil { + gldp.stop() + } +} + // Get atomically gets the global goroutine leak detection parameters func (gldp *GoroutineLeakDetectionParams) Get() (int, time.Duration, time.Duration, time.Duration, time.Duration) { var threshold int @@ -337,12 +369,18 @@ func handlePotentialGoroutineLeak() { // goroutinesMonitor monitors the number of goroutines and detects potential goroutine leaks. func goroutinesMonitor(ctx *watcherContext) { + log.Functionf("Starting goroutines monitor (stoppable: %v)", ctx.GRLDParams.isStoppable()) // Get the initial goroutine leak detection parameters to create the stats slice goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod := ctx.GRLDParams.Get() entriesToKeep := int(keepStatsFor / checkInterval) stats := make([]int, 0, entriesToKeep+1) var lastLeakHandled time.Time for { + // Check if we have to stop + if ctx.GRLDParams.checkStopCondition() { + log.Functionf("Stopping goroutines monitor") + return + } // Check if we have to resize the stats slice goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod = ctx.GRLDParams.Get() newEntriesToKeep := int(keepStatsFor / checkInterval) diff --git a/pkg/pillar/cmd/watcher/watcher_test.go b/pkg/pillar/cmd/watcher/watcher_test.go index 38fb3be272..28c6970112 100644 --- a/pkg/pillar/cmd/watcher/watcher_test.go +++ b/pkg/pillar/cmd/watcher/watcher_test.go @@ -249,8 +249,10 @@ func TestGoroutinesMonitorNoLeak(t *testing.T) { // Create context with default parameters ctx := &watcherContext{} ctx.GRLDParams.Set(goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod) + ctx.GRLDParams.MakeStoppable() go goroutinesMonitor(ctx) + defer ctx.GRLDParams.Stop() timeStart := time.Now() for { @@ -294,8 +296,10 @@ func TestGoroutinesMonitorLeak(t *testing.T) { // Create context with default parameters ctx := &watcherContext{} ctx.GRLDParams.Set(goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod) + ctx.GRLDParams.MakeStoppable() go goroutinesMonitor(ctx) + defer ctx.GRLDParams.Stop() timeStart := time.Now() for { @@ -346,8 +350,10 @@ func TestGoroutinesMonitorUpdateParamsKeepStatsDecrease(t *testing.T) { // Set the parameters ctx.GRLDParams.Set(goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod) + ctx.GRLDParams.MakeStoppable() go goroutinesMonitor(ctx) + defer ctx.GRLDParams.Stop() // Wait until we fill the stats slice time.Sleep(2 * keepStatsFor) @@ -409,8 +415,10 @@ func TestGoroutinesMonitorUpdateParamsKeepStatsIncrease(t *testing.T) { // Set the parameters ctx.GRLDParams.Set(goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod) + ctx.GRLDParams.MakeStoppable() go goroutinesMonitor(ctx) + defer ctx.GRLDParams.Stop() // Wait until we fill the stats slice time.Sleep(2 * keepStatsFor) @@ -441,3 +449,94 @@ func TestGoroutinesMonitorUpdateParamsKeepStatsIncrease(t *testing.T) { } } } + +func TestGoroutineMonitorStops(t *testing.T) { + keepStatsFor := 24 * 60 * time.Millisecond + goroutinesThreshold := 100 + checkInterval := 1 * time.Millisecond + checkStatsFor := 10 * time.Millisecond + cooldownPeriod := 5 * time.Millisecond + + backupOut := logger.Out + backupLevel := logger.Level + // Create a pipe to capture log output + r, w, _ := os.Pipe() + logger.SetOutput(w) + logger.SetLevel(logrus.TraceLevel) + defer func() { + logger.SetOutput(backupOut) + logger.SetLevel(backupLevel) + }() + + // Create context with default parameters + ctx := &watcherContext{} + ctx.GRLDParams.Set(goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod) + ctx.GRLDParams.MakeStoppable() + + go goroutinesMonitor(ctx) + + // Let the monitor run for a while + time.Sleep(keepStatsFor * 2) + + ctx.GRLDParams.Stop() + + // Wait for several check intervals to allow the monitor to stop + time.Sleep(checkInterval * 100) + + // Close the pipe + _ = w.Close() + + // Read the log output + output, _ := io.ReadAll(r) + + msgStart := "Starting goroutines monitor (stoppable: true)" + msgStop := "Stopping goroutines monitor" + expectedMsgs := []string{msgStart, msgStop} + for _, expectedMsg := range expectedMsgs { + if !strings.Contains(string(output), expectedMsg) { + t.Errorf("Expected log output to contain '%s'", expectedMsg) + } + } +} + +func TestGoroutineMonitorRunsFineUnstoppable(t *testing.T) { + keepStatsFor := 24 * 60 * time.Millisecond + goroutinesThreshold := 100 + checkInterval := 1 * time.Millisecond + checkStatsFor := 10 * time.Millisecond + cooldownPeriod := 5 * time.Millisecond + + backupOut := logger.Out + backupLevel := logger.Level + // Create a pipe to capture log output + r, w, _ := os.Pipe() + logger.SetOutput(w) + logger.SetLevel(logrus.TraceLevel) + defer func() { + logger.SetOutput(backupOut) + logger.SetLevel(backupLevel) + }() + + // Create context with default parameters + ctx := &watcherContext{} + ctx.GRLDParams.Set(goroutinesThreshold, checkInterval, checkStatsFor, keepStatsFor, cooldownPeriod) + + go goroutinesMonitor(ctx) + + time.Sleep(keepStatsFor * 2) + + // Close the pipe + _ = w.Close() + + // Read the log output + output, _ := io.ReadAll(r) + + msgStart := "Starting goroutines monitor (stoppable: false)" + expectedMsgs := []string{msgStart} + for _, expectedMsg := range expectedMsgs { + if !strings.Contains(string(output), expectedMsg) { + t.Errorf("Expected log output to contain '%s'", expectedMsg) + } + } + +}