From c8ff2ab6d75b2d9d668aa64b0c31c0e24204ff77 Mon Sep 17 00:00:00 2001 From: Rebecca Mahany-Horton Date: Fri, 20 Dec 2024 10:50:38 -0500 Subject: [PATCH] Use atomic.Bool for rungroup actors that should not run interrupt routines more than once (#2012) --- cmd/launcher/signal_listener.go | 8 +++++--- ee/agent/storage/bbolt/backup.go | 7 ++++--- .../remoterestartconsumer/remoterestartconsumer.go | 7 ++++--- ee/debug/checkups/checkpoint.go | 7 ++++--- ee/desktop/runner/runner.go | 7 ++++--- ee/desktop/user/notify/notify_darwin.go | 10 +++++++++- ee/desktop/user/notify/notify_linux.go | 7 +++++++ ee/desktop/user/notify/notify_windows.go | 7 +++++++ ee/desktop/user/universallink/handler_darwin.go | 7 ++++--- ee/desktop/user/universallink/handler_other.go | 7 ++++--- ee/powereventwatcher/power_event_watcher_other.go | 7 ++++--- ee/powereventwatcher/power_event_watcher_windows.go | 7 ++++--- ee/tuf/autoupdate.go | 7 ++++--- ee/watchdog/controller_windows.go | 7 ++++--- pkg/osquery/extension.go | 7 ++++--- pkg/osquery/runtime/runner.go | 7 ++++--- pkg/traces/exporter/exporter.go | 7 ++++--- 17 files changed, 80 insertions(+), 43 deletions(-) diff --git a/cmd/launcher/signal_listener.go b/cmd/launcher/signal_listener.go index 99de8549f..b173d5add 100644 --- a/cmd/launcher/signal_listener.go +++ b/cmd/launcher/signal_listener.go @@ -5,6 +5,7 @@ import ( "log/slog" "os" "os/signal" + "sync/atomic" "syscall" ) @@ -13,7 +14,7 @@ type signalListener struct { sigChannel chan os.Signal cancel context.CancelFunc slogger *slog.Logger - interrupted bool + interrupted atomic.Bool } func newSignalListener(sigChannel chan os.Signal, cancel context.CancelFunc, slogger *slog.Logger) *signalListener { @@ -36,10 +37,11 @@ func (s *signalListener) Execute() error { func (s *signalListener) Interrupt(_ error) { // Only perform shutdown tasks on first call to interrupt -- no need to repeat on potential extra calls. - if s.interrupted { + if s.interrupted.Load() { return } - s.interrupted = true + + s.interrupted.Store(true) s.cancel() close(s.sigChannel) } diff --git a/ee/agent/storage/bbolt/backup.go b/ee/agent/storage/bbolt/backup.go index a1402f7ba..42dfd36fb 100644 --- a/ee/agent/storage/bbolt/backup.go +++ b/ee/agent/storage/bbolt/backup.go @@ -6,6 +6,7 @@ import ( "log/slog" "os" "path/filepath" + "sync/atomic" "time" "github.com/kolide/launcher/ee/agent/types" @@ -23,7 +24,7 @@ type databaseBackupSaver struct { knapsack types.Knapsack slogger *slog.Logger interrupt chan struct{} - interrupted bool + interrupted atomic.Bool } func NewDatabaseBackupSaver(k types.Knapsack) *databaseBackupSaver { @@ -71,10 +72,10 @@ func (d *databaseBackupSaver) Execute() error { func (d *databaseBackupSaver) Interrupt(_ error) { // Only perform shutdown tasks on first call to interrupt -- no need to repeat on potential extra calls. - if d.interrupted { + if d.interrupted.Load() { return } - d.interrupted = true + d.interrupted.Store(true) d.interrupt <- struct{}{} } diff --git a/ee/control/consumers/remoterestartconsumer/remoterestartconsumer.go b/ee/control/consumers/remoterestartconsumer/remoterestartconsumer.go index 8b2f2c843..e691b4238 100644 --- a/ee/control/consumers/remoterestartconsumer/remoterestartconsumer.go +++ b/ee/control/consumers/remoterestartconsumer/remoterestartconsumer.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "log/slog" + "sync/atomic" "time" "github.com/kolide/launcher/ee/agent/types" @@ -33,7 +34,7 @@ type RemoteRestartConsumer struct { slogger *slog.Logger signalRestart chan error interrupt chan struct{} - interrupted bool + interrupted atomic.Bool } type remoteRestartAction struct { @@ -122,10 +123,10 @@ func (r *RemoteRestartConsumer) Execute() (err error) { // and be shut down when the rungroup shuts down. func (r *RemoteRestartConsumer) Interrupt(_ error) { // Only perform shutdown tasks on first call to interrupt -- no need to repeat on potential extra calls. - if r.interrupted { + if r.interrupted.Load() { return } - r.interrupted = true + r.interrupted.Store(true) r.interrupt <- struct{}{} } diff --git a/ee/debug/checkups/checkpoint.go b/ee/debug/checkups/checkpoint.go index 8119a570e..087b195a1 100644 --- a/ee/debug/checkups/checkpoint.go +++ b/ee/debug/checkups/checkpoint.go @@ -4,6 +4,7 @@ import ( "context" "io" "log/slog" + "sync/atomic" "time" "github.com/kolide/launcher/ee/agent/types" @@ -14,7 +15,7 @@ type ( slogger *slog.Logger knapsack types.Knapsack interrupt chan struct{} - interrupted bool + interrupted atomic.Bool } ) @@ -49,11 +50,11 @@ func (c *logCheckPointer) Run() error { func (c *logCheckPointer) Interrupt(_ error) { // Only perform shutdown tasks on first call to interrupt -- no need to repeat on potential extra calls. - if c.interrupted { + if c.interrupted.Load() { return } - c.interrupted = true + c.interrupted.Store(true) c.interrupt <- struct{}{} } diff --git a/ee/desktop/runner/runner.go b/ee/desktop/runner/runner.go index ffce886b5..40e1fdf7e 100644 --- a/ee/desktop/runner/runner.go +++ b/ee/desktop/runner/runner.go @@ -18,6 +18,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/kolide/kit/ulid" @@ -103,7 +104,7 @@ type DesktopUsersProcessesRunner struct { // menuRefreshInterval is the interval on which the desktop menu will be refreshed menuRefreshInterval time.Duration interrupt chan struct{} - interrupted bool + interrupted atomic.Bool // uidProcs is a map of uid to desktop process uidProcs map[string]processRecord // procsWg is a WaitGroup to wait for all desktop processes to finish during an interrupt @@ -251,11 +252,11 @@ func (r *DesktopUsersProcessesRunner) Execute() error { // It also signals the execute loop to exit, so new desktop processes cease to spawn. func (r *DesktopUsersProcessesRunner) Interrupt(_ error) { // Only perform shutdown tasks on first call to interrupt -- no need to repeat on potential extra calls. - if r.interrupted { + if r.interrupted.Load() { return } - r.interrupted = true + r.interrupted.Store(true) // Tell the execute loop to stop checking, and exit r.interrupt <- struct{}{} diff --git a/ee/desktop/user/notify/notify_darwin.go b/ee/desktop/user/notify/notify_darwin.go index 4673bce16..7946a1195 100644 --- a/ee/desktop/user/notify/notify_darwin.go +++ b/ee/desktop/user/notify/notify_darwin.go @@ -20,11 +20,13 @@ import ( "log/slog" "os" "strings" + "sync/atomic" "unsafe" ) type macNotifier struct { - interrupt chan struct{} + interrupt chan struct{} + interrupted atomic.Bool } func NewDesktopNotifier(_ *slog.Logger, _ string) *macNotifier { @@ -39,6 +41,12 @@ func (m *macNotifier) Execute() error { } func (m *macNotifier) Interrupt(err error) { + if m.interrupted.Load() { + return + } + + m.interrupted.Store(true) + m.interrupt <- struct{}{} } diff --git a/ee/desktop/user/notify/notify_linux.go b/ee/desktop/user/notify/notify_linux.go index c4cd8deaf..4a06d26e6 100644 --- a/ee/desktop/user/notify/notify_linux.go +++ b/ee/desktop/user/notify/notify_linux.go @@ -9,6 +9,7 @@ import ( "fmt" "log/slog" "sync" + "sync/atomic" "time" "github.com/godbus/dbus/v5" @@ -21,6 +22,7 @@ type dbusNotifier struct { conn *dbus.Conn signal chan *dbus.Signal interrupt chan struct{} + interrupted atomic.Bool sentNotificationIds map[uint32]bool lock sync.RWMutex } @@ -129,6 +131,11 @@ func (d *dbusNotifier) Execute() error { } func (d *dbusNotifier) Interrupt(err error) { + if d.interrupted.Load() { + return + } + d.interrupted.Store(true) + d.interrupt <- struct{}{} d.conn.RemoveSignal(d.signal) diff --git a/ee/desktop/user/notify/notify_windows.go b/ee/desktop/user/notify/notify_windows.go index 021affefe..5e7abe5a4 100644 --- a/ee/desktop/user/notify/notify_windows.go +++ b/ee/desktop/user/notify/notify_windows.go @@ -5,6 +5,7 @@ package notify import ( "log/slog" + "sync/atomic" "github.com/kolide/toast" ) @@ -12,6 +13,7 @@ import ( type windowsNotifier struct { iconFilepath string interrupt chan struct{} + interrupted atomic.Bool } func NewDesktopNotifier(_ *slog.Logger, iconFilepath string) *windowsNotifier { @@ -32,6 +34,11 @@ func (w *windowsNotifier) Execute() error { func (w *windowsNotifier) Listen() {} func (w *windowsNotifier) Interrupt(err error) { + if w.interrupted.Load() { + return + } + w.interrupted.Store(true) + w.interrupt <- struct{}{} } diff --git a/ee/desktop/user/universallink/handler_darwin.go b/ee/desktop/user/universallink/handler_darwin.go index f453e8d69..c26978709 100644 --- a/ee/desktop/user/universallink/handler_darwin.go +++ b/ee/desktop/user/universallink/handler_darwin.go @@ -20,6 +20,7 @@ import ( "net/url" "os" "strings" + "sync/atomic" "unsafe" ) @@ -28,7 +29,7 @@ import ( type universalLinkHandler struct { urlInput chan string slogger *slog.Logger - interrupted bool + interrupted atomic.Bool interrupt chan struct{} } @@ -73,10 +74,10 @@ func (u *universalLinkHandler) Interrupt(_ error) { ) // Only perform shutdown tasks on first call to interrupt -- no need to repeat on potential extra calls. - if u.interrupted { + if u.interrupted.Load() { return } - u.interrupted = true + u.interrupted.Store(true) u.interrupt <- struct{}{} close(u.urlInput) diff --git a/ee/desktop/user/universallink/handler_other.go b/ee/desktop/user/universallink/handler_other.go index 903c516c8..ea13e0521 100644 --- a/ee/desktop/user/universallink/handler_other.go +++ b/ee/desktop/user/universallink/handler_other.go @@ -5,12 +5,13 @@ package universallink import ( "log/slog" + "sync/atomic" ) // On other OSes, universal link handling is a no-op. type noopUniversalLinkHandler struct { unusedInput chan string - interrupted bool + interrupted atomic.Bool interrupt chan struct{} } @@ -29,10 +30,10 @@ func (n *noopUniversalLinkHandler) Execute() error { func (n *noopUniversalLinkHandler) Interrupt(_ error) { // Only perform shutdown tasks on first call to interrupt -- no need to repeat on potential extra calls. - if n.interrupted { + if n.interrupted.Load() { return } - n.interrupted = true + n.interrupted.Store(true) n.interrupt <- struct{}{} close(n.unusedInput) diff --git a/ee/powereventwatcher/power_event_watcher_other.go b/ee/powereventwatcher/power_event_watcher_other.go index e824290ad..fe78b402f 100644 --- a/ee/powereventwatcher/power_event_watcher_other.go +++ b/ee/powereventwatcher/power_event_watcher_other.go @@ -6,6 +6,7 @@ package powereventwatcher import ( "context" "log/slog" + "sync/atomic" "github.com/kolide/launcher/ee/agent/types" "github.com/kolide/launcher/pkg/traces" @@ -13,7 +14,7 @@ import ( type noOpPowerEventWatcher struct { interrupt chan struct{} - interrupted bool + interrupted atomic.Bool } type noOpKnapsackSleepStateUpdater struct{} @@ -38,11 +39,11 @@ func (n *noOpPowerEventWatcher) Execute() error { func (n *noOpPowerEventWatcher) Interrupt(_ error) { // Only perform shutdown tasks on first call to interrupt -- no need to repeat on potential extra calls. - if n.interrupted { + if n.interrupted.Load() { return } - n.interrupted = true + n.interrupted.Store(true) n.interrupt <- struct{}{} } diff --git a/ee/powereventwatcher/power_event_watcher_windows.go b/ee/powereventwatcher/power_event_watcher_windows.go index a977e9a65..3dd7cfd4f 100644 --- a/ee/powereventwatcher/power_event_watcher_windows.go +++ b/ee/powereventwatcher/power_event_watcher_windows.go @@ -9,6 +9,7 @@ import ( "fmt" "log/slog" "sync" + "sync/atomic" "syscall" "unsafe" @@ -35,7 +36,7 @@ type ( unsubscribeProcedure *syscall.LazyProc renderEventLogProcedure *syscall.LazyProc interrupt chan struct{} - interrupted bool + interrupted atomic.Bool } // powerEventSubscriber is an interface to be implemented by anything utilizing the power event updates. @@ -234,11 +235,11 @@ func (p *powerEventWatcher) Execute() error { func (p *powerEventWatcher) Interrupt(_ error) { // Only perform shutdown tasks on first call to interrupt -- no need to repeat on potential extra calls. - if p.interrupted { + if p.interrupted.Load() { return } - p.interrupted = true + p.interrupted.Store(true) // EvtClose: https://learn.microsoft.com/en-us/windows/win32/api/winevt/nf-winevt-evtclose ret, _, err := p.unsubscribeProcedure.Call(p.subscriptionHandle) diff --git a/ee/tuf/autoupdate.go b/ee/tuf/autoupdate.go index dccea909a..c10569427 100644 --- a/ee/tuf/autoupdate.go +++ b/ee/tuf/autoupdate.go @@ -19,6 +19,7 @@ import ( "slices" "strconv" "sync" + "sync/atomic" "time" "github.com/kolide/kit/version" @@ -95,7 +96,7 @@ type TufAutoupdater struct { initialDelayEnd time.Time updateLock *sync.Mutex interrupt chan struct{} - interrupted bool + interrupted atomic.Bool signalRestart chan error slogger *slog.Logger restartFuncs map[autoupdatableBinary]func() error @@ -273,10 +274,10 @@ func (ta *TufAutoupdater) Execute() (err error) { func (ta *TufAutoupdater) Interrupt(_ error) { // Only perform shutdown tasks on first call to interrupt -- no need to repeat on potential extra calls. - if ta.interrupted { + if ta.interrupted.Load() { return } - ta.interrupted = true + ta.interrupted.Store(true) ta.interrupt <- struct{}{} } diff --git a/ee/watchdog/controller_windows.go b/ee/watchdog/controller_windows.go index e155ccc55..4e8740c15 100644 --- a/ee/watchdog/controller_windows.go +++ b/ee/watchdog/controller_windows.go @@ -10,6 +10,7 @@ import ( "log/slog" "slices" "strings" + "sync/atomic" "time" "github.com/go-ole/go-ole" @@ -36,7 +37,7 @@ type WatchdogController struct { slogger *slog.Logger knapsack types.Knapsack interrupt chan struct{} - interrupted bool + interrupted atomic.Bool logPublisher types.LogStore configFilePath string } @@ -156,12 +157,12 @@ func (wc *WatchdogController) publishLogs(ctx context.Context) { func (wc *WatchdogController) Interrupt(_ error) { // Only perform shutdown tasks on first call to interrupt -- no need to repeat on potential extra calls. - if wc.interrupted { + if wc.interrupted.Load() { return } wc.logPublisher.Close() - wc.interrupted = true + wc.interrupted.Store(true) wc.interrupt <- struct{}{} } diff --git a/pkg/osquery/extension.go b/pkg/osquery/extension.go index 2898550a8..96193af9b 100644 --- a/pkg/osquery/extension.go +++ b/pkg/osquery/extension.go @@ -11,6 +11,7 @@ import ( "log/slog" "os" "sync" + "sync/atomic" "time" "github.com/google/uuid" @@ -42,7 +43,7 @@ type Extension struct { serviceClient service.KolideService enrollMutex sync.Mutex done chan struct{} - interrupted bool + interrupted atomic.Bool slogger *slog.Logger logPublicationState *logPublicationState } @@ -172,10 +173,10 @@ func (e *Extension) Execute() error { // with this extension. func (e *Extension) Shutdown(_ error) { // Only perform shutdown tasks on first call to interrupt -- no need to repeat on potential extra calls. - if e.interrupted { + if e.interrupted.Load() { return } - e.interrupted = true + e.interrupted.Store(true) close(e.done) } diff --git a/pkg/osquery/runtime/runner.go b/pkg/osquery/runtime/runner.go index 3f926c46d..dfde2563b 100644 --- a/pkg/osquery/runtime/runner.go +++ b/pkg/osquery/runtime/runner.go @@ -6,6 +6,7 @@ import ( "fmt" "log/slog" "sync" + "sync/atomic" "time" "github.com/kolide/launcher/ee/agent/flags/keys" @@ -27,7 +28,7 @@ type Runner struct { serviceClient service.KolideService // shared service client for communication between osquery instance and Kolide SaaS opts []OsqueryInstanceOption // global options applying to all osquery instances shutdown chan struct{} - interrupted bool + interrupted atomic.Bool } func New(k types.Knapsack, serviceClient service.KolideService, opts ...OsqueryInstanceOption) *Runner { @@ -197,12 +198,12 @@ func (r *Runner) Interrupt(_ error) { // Shutdown instructs the runner to permanently stop the running instance (no // restart will be attempted). func (r *Runner) Shutdown() error { - if r.interrupted { + if r.interrupted.Load() { // Already shut down, nothing else to do return nil } - r.interrupted = true + r.interrupted.Store(true) close(r.shutdown) if err := r.triggerShutdownForInstances(); err != nil { diff --git a/pkg/traces/exporter/exporter.go b/pkg/traces/exporter/exporter.go index 1dd72a3e8..b84d4a1c9 100644 --- a/pkg/traces/exporter/exporter.go +++ b/pkg/traces/exporter/exporter.go @@ -5,6 +5,7 @@ import ( "errors" "log/slog" "sync" + "sync/atomic" "time" "github.com/kolide/launcher/ee/agent/flags/keys" @@ -57,7 +58,7 @@ type TraceExporter struct { batchTimeout time.Duration ctx context.Context // nolint:containedctx cancel context.CancelFunc - interrupted bool + interrupted atomic.Bool } // NewTraceExporter sets up our traces to be exported via OTLP over HTTP. @@ -318,11 +319,11 @@ func (t *TraceExporter) Execute() error { func (t *TraceExporter) Interrupt(_ error) { // Only perform shutdown tasks on first call to interrupt -- no need to repeat on potential extra calls. - if t.interrupted { + if t.interrupted.Load() { return } - t.interrupted = true + t.interrupted.Store(true) if t.provider != nil { t.provider.Shutdown(t.ctx)