Skip to content

Commit

Permalink
Use atomic.Bool for rungroup actors that should not run interrupt rou…
Browse files Browse the repository at this point in the history
…tines more than once (#2012)
  • Loading branch information
RebeccaMahany authored Dec 20, 2024
1 parent 503ca05 commit c8ff2ab
Show file tree
Hide file tree
Showing 17 changed files with 80 additions and 43 deletions.
8 changes: 5 additions & 3 deletions cmd/launcher/signal_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"log/slog"
"os"
"os/signal"
"sync/atomic"
"syscall"
)

Expand All @@ -13,7 +14,7 @@ type signalListener struct {
sigChannel chan os.Signal
cancel context.CancelFunc
slogger *slog.Logger
interrupted bool
interrupted atomic.Bool
}

func newSignalListener(sigChannel chan os.Signal, cancel context.CancelFunc, slogger *slog.Logger) *signalListener {
Expand All @@ -36,10 +37,11 @@ func (s *signalListener) Execute() error {

func (s *signalListener) Interrupt(_ error) {
// Only perform shutdown tasks on first call to interrupt -- no need to repeat on potential extra calls.
if s.interrupted {
if s.interrupted.Load() {
return
}
s.interrupted = true

s.interrupted.Store(true)
s.cancel()
close(s.sigChannel)
}
7 changes: 4 additions & 3 deletions ee/agent/storage/bbolt/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log/slog"
"os"
"path/filepath"
"sync/atomic"
"time"

"github.com/kolide/launcher/ee/agent/types"
Expand All @@ -23,7 +24,7 @@ type databaseBackupSaver struct {
knapsack types.Knapsack
slogger *slog.Logger
interrupt chan struct{}
interrupted bool
interrupted atomic.Bool
}

func NewDatabaseBackupSaver(k types.Knapsack) *databaseBackupSaver {
Expand Down Expand Up @@ -71,10 +72,10 @@ func (d *databaseBackupSaver) Execute() error {

func (d *databaseBackupSaver) Interrupt(_ error) {
// Only perform shutdown tasks on first call to interrupt -- no need to repeat on potential extra calls.
if d.interrupted {
if d.interrupted.Load() {
return
}
d.interrupted = true
d.interrupted.Store(true)

d.interrupt <- struct{}{}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io"
"log/slog"
"sync/atomic"
"time"

"github.com/kolide/launcher/ee/agent/types"
Expand All @@ -33,7 +34,7 @@ type RemoteRestartConsumer struct {
slogger *slog.Logger
signalRestart chan error
interrupt chan struct{}
interrupted bool
interrupted atomic.Bool
}

type remoteRestartAction struct {
Expand Down Expand Up @@ -122,10 +123,10 @@ func (r *RemoteRestartConsumer) Execute() (err error) {
// and be shut down when the rungroup shuts down.
func (r *RemoteRestartConsumer) Interrupt(_ error) {
// Only perform shutdown tasks on first call to interrupt -- no need to repeat on potential extra calls.
if r.interrupted {
if r.interrupted.Load() {
return
}
r.interrupted = true
r.interrupted.Store(true)

r.interrupt <- struct{}{}
}
7 changes: 4 additions & 3 deletions ee/debug/checkups/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"io"
"log/slog"
"sync/atomic"
"time"

"github.com/kolide/launcher/ee/agent/types"
Expand All @@ -14,7 +15,7 @@ type (
slogger *slog.Logger
knapsack types.Knapsack
interrupt chan struct{}
interrupted bool
interrupted atomic.Bool
}
)

Expand Down Expand Up @@ -49,11 +50,11 @@ func (c *logCheckPointer) Run() error {

func (c *logCheckPointer) Interrupt(_ error) {
// Only perform shutdown tasks on first call to interrupt -- no need to repeat on potential extra calls.
if c.interrupted {
if c.interrupted.Load() {
return
}

c.interrupted = true
c.interrupted.Store(true)

c.interrupt <- struct{}{}
}
Expand Down
7 changes: 4 additions & 3 deletions ee/desktop/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/kolide/kit/ulid"
Expand Down Expand Up @@ -103,7 +104,7 @@ type DesktopUsersProcessesRunner struct {
// menuRefreshInterval is the interval on which the desktop menu will be refreshed
menuRefreshInterval time.Duration
interrupt chan struct{}
interrupted bool
interrupted atomic.Bool
// uidProcs is a map of uid to desktop process
uidProcs map[string]processRecord
// procsWg is a WaitGroup to wait for all desktop processes to finish during an interrupt
Expand Down Expand Up @@ -251,11 +252,11 @@ func (r *DesktopUsersProcessesRunner) Execute() error {
// It also signals the execute loop to exit, so new desktop processes cease to spawn.
func (r *DesktopUsersProcessesRunner) Interrupt(_ error) {
// Only perform shutdown tasks on first call to interrupt -- no need to repeat on potential extra calls.
if r.interrupted {
if r.interrupted.Load() {
return
}

r.interrupted = true
r.interrupted.Store(true)

// Tell the execute loop to stop checking, and exit
r.interrupt <- struct{}{}
Expand Down
10 changes: 9 additions & 1 deletion ee/desktop/user/notify/notify_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ import (
"log/slog"
"os"
"strings"
"sync/atomic"
"unsafe"
)

type macNotifier struct {
interrupt chan struct{}
interrupt chan struct{}
interrupted atomic.Bool
}

func NewDesktopNotifier(_ *slog.Logger, _ string) *macNotifier {
Expand All @@ -39,6 +41,12 @@ func (m *macNotifier) Execute() error {
}

func (m *macNotifier) Interrupt(err error) {
if m.interrupted.Load() {
return
}

m.interrupted.Store(true)

m.interrupt <- struct{}{}
}

Expand Down
7 changes: 7 additions & 0 deletions ee/desktop/user/notify/notify_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"log/slog"
"sync"
"sync/atomic"
"time"

"github.com/godbus/dbus/v5"
Expand All @@ -21,6 +22,7 @@ type dbusNotifier struct {
conn *dbus.Conn
signal chan *dbus.Signal
interrupt chan struct{}
interrupted atomic.Bool
sentNotificationIds map[uint32]bool
lock sync.RWMutex
}
Expand Down Expand Up @@ -129,6 +131,11 @@ func (d *dbusNotifier) Execute() error {
}

func (d *dbusNotifier) Interrupt(err error) {
if d.interrupted.Load() {
return
}
d.interrupted.Store(true)

d.interrupt <- struct{}{}

d.conn.RemoveSignal(d.signal)
Expand Down
7 changes: 7 additions & 0 deletions ee/desktop/user/notify/notify_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ package notify

import (
"log/slog"
"sync/atomic"

"github.com/kolide/toast"
)

type windowsNotifier struct {
iconFilepath string
interrupt chan struct{}
interrupted atomic.Bool
}

func NewDesktopNotifier(_ *slog.Logger, iconFilepath string) *windowsNotifier {
Expand All @@ -32,6 +34,11 @@ func (w *windowsNotifier) Execute() error {
func (w *windowsNotifier) Listen() {}

func (w *windowsNotifier) Interrupt(err error) {
if w.interrupted.Load() {
return
}
w.interrupted.Store(true)

w.interrupt <- struct{}{}
}

Expand Down
7 changes: 4 additions & 3 deletions ee/desktop/user/universallink/handler_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"net/url"
"os"
"strings"
"sync/atomic"
"unsafe"
)

Expand All @@ -28,7 +29,7 @@ import (
type universalLinkHandler struct {
urlInput chan string
slogger *slog.Logger
interrupted bool
interrupted atomic.Bool
interrupt chan struct{}
}

Expand Down Expand Up @@ -73,10 +74,10 @@ func (u *universalLinkHandler) Interrupt(_ error) {
)

// Only perform shutdown tasks on first call to interrupt -- no need to repeat on potential extra calls.
if u.interrupted {
if u.interrupted.Load() {
return
}
u.interrupted = true
u.interrupted.Store(true)

u.interrupt <- struct{}{}
close(u.urlInput)
Expand Down
7 changes: 4 additions & 3 deletions ee/desktop/user/universallink/handler_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ package universallink

import (
"log/slog"
"sync/atomic"
)

// On other OSes, universal link handling is a no-op.
type noopUniversalLinkHandler struct {
unusedInput chan string
interrupted bool
interrupted atomic.Bool
interrupt chan struct{}
}

Expand All @@ -29,10 +30,10 @@ func (n *noopUniversalLinkHandler) Execute() error {

func (n *noopUniversalLinkHandler) Interrupt(_ error) {
// Only perform shutdown tasks on first call to interrupt -- no need to repeat on potential extra calls.
if n.interrupted {
if n.interrupted.Load() {
return
}
n.interrupted = true
n.interrupted.Store(true)

n.interrupt <- struct{}{}
close(n.unusedInput)
Expand Down
7 changes: 4 additions & 3 deletions ee/powereventwatcher/power_event_watcher_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ package powereventwatcher
import (
"context"
"log/slog"
"sync/atomic"

"github.com/kolide/launcher/ee/agent/types"
"github.com/kolide/launcher/pkg/traces"
)

type noOpPowerEventWatcher struct {
interrupt chan struct{}
interrupted bool
interrupted atomic.Bool
}

type noOpKnapsackSleepStateUpdater struct{}
Expand All @@ -38,11 +39,11 @@ func (n *noOpPowerEventWatcher) Execute() error {

func (n *noOpPowerEventWatcher) Interrupt(_ error) {
// Only perform shutdown tasks on first call to interrupt -- no need to repeat on potential extra calls.
if n.interrupted {
if n.interrupted.Load() {
return
}

n.interrupted = true
n.interrupted.Store(true)

n.interrupt <- struct{}{}
}
7 changes: 4 additions & 3 deletions ee/powereventwatcher/power_event_watcher_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"log/slog"
"sync"
"sync/atomic"
"syscall"
"unsafe"

Expand All @@ -35,7 +36,7 @@ type (
unsubscribeProcedure *syscall.LazyProc
renderEventLogProcedure *syscall.LazyProc
interrupt chan struct{}
interrupted bool
interrupted atomic.Bool
}

// powerEventSubscriber is an interface to be implemented by anything utilizing the power event updates.
Expand Down Expand Up @@ -234,11 +235,11 @@ func (p *powerEventWatcher) Execute() error {

func (p *powerEventWatcher) Interrupt(_ error) {
// Only perform shutdown tasks on first call to interrupt -- no need to repeat on potential extra calls.
if p.interrupted {
if p.interrupted.Load() {
return
}

p.interrupted = true
p.interrupted.Store(true)

// EvtClose: https://learn.microsoft.com/en-us/windows/win32/api/winevt/nf-winevt-evtclose
ret, _, err := p.unsubscribeProcedure.Call(p.subscriptionHandle)
Expand Down
7 changes: 4 additions & 3 deletions ee/tuf/autoupdate.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"slices"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/kolide/kit/version"
Expand Down Expand Up @@ -95,7 +96,7 @@ type TufAutoupdater struct {
initialDelayEnd time.Time
updateLock *sync.Mutex
interrupt chan struct{}
interrupted bool
interrupted atomic.Bool
signalRestart chan error
slogger *slog.Logger
restartFuncs map[autoupdatableBinary]func() error
Expand Down Expand Up @@ -273,10 +274,10 @@ func (ta *TufAutoupdater) Execute() (err error) {

func (ta *TufAutoupdater) Interrupt(_ error) {
// Only perform shutdown tasks on first call to interrupt -- no need to repeat on potential extra calls.
if ta.interrupted {
if ta.interrupted.Load() {
return
}
ta.interrupted = true
ta.interrupted.Store(true)

ta.interrupt <- struct{}{}
}
Expand Down
Loading

0 comments on commit c8ff2ab

Please sign in to comment.