From 0b31865a66d6307376b21ce9b34020a0cf39f09c Mon Sep 17 00:00:00 2001 From: RebeccaMahany Date: Tue, 5 Nov 2024 16:51:46 -0500 Subject: [PATCH 1/7] Support running multiple osquery instances --- pkg/osquery/runtime/runner.go | 166 ++++++++++++++++------ pkg/osquery/runtime/runtime_posix_test.go | 12 +- pkg/osquery/runtime/runtime_test.go | 28 ++-- 3 files changed, 141 insertions(+), 65 deletions(-) diff --git a/pkg/osquery/runtime/runner.go b/pkg/osquery/runtime/runner.go index 7200d5c03..b594b11c6 100644 --- a/pkg/osquery/runtime/runner.go +++ b/pkg/osquery/runtime/runner.go @@ -2,6 +2,7 @@ package runtime import ( "context" + "errors" "fmt" "log/slog" "sync" @@ -9,10 +10,15 @@ import ( "github.com/kolide/launcher/ee/agent/flags/keys" "github.com/kolide/launcher/ee/agent/types" "github.com/kolide/launcher/pkg/service" + "golang.org/x/sync/errgroup" +) + +const ( + defaultRegistrationId = "default" ) type Runner struct { - instance *OsqueryInstance + instances map[string]*OsqueryInstance // maps registration ID to instance instanceLock sync.Mutex slogger *slog.Logger knapsack types.Knapsack @@ -24,7 +30,10 @@ type Runner struct { func New(k types.Knapsack, serviceClient service.KolideService, opts ...OsqueryInstanceOption) *Runner { runner := &Runner{ - instance: newInstance(k, serviceClient, opts...), + instances: map[string]*OsqueryInstance{ + // For now, we only have one (default) instance and we use it for all queries + defaultRegistrationId: newInstance(k, serviceClient, opts...), + }, slogger: k.Slogger().With("component", "osquery_runner"), knapsack: k, serviceClient: serviceClient, @@ -40,60 +49,89 @@ func New(k types.Knapsack, serviceClient service.KolideService, opts ...OsqueryI } func (r *Runner) Run() error { - // Ensure we don't try to restart the instance before it's launched + // Create a group to track the workers running each instance + wg, ctx := errgroup.WithContext(context.Background()) + + // Start each worker for each instance + for registrationId := range r.instances { + wg.Go(func() error { + if err := r.runInstance(registrationId); err != nil { + r.slogger.Log(ctx, slog.LevelWarn, + "runner terminated running osquery instance unexpectedly, shutting down runner", + "err", err, + ) + + if err := r.Shutdown(); err != nil { + r.slogger.Log(ctx, slog.LevelError, + "could not shut down runner after failure to run osquery instance", + "err", err, + ) + } + return err + } + + return nil + }) + } + + // Wait for all workers to exit + if err := wg.Wait(); err != nil { + return fmt.Errorf("running instances: %w", err) + } + + return nil +} + +// runInstance starts a worker that launches the instance for the given registration ID, and +// then ensures that instance stays up. It exits if `Shutdown` is called, or if the instance +// exits and cannot be restarted. +func (r *Runner) runInstance(registrationId string) error { + slogger := r.slogger.With("registration_id", registrationId) + + // First, launch the instance. Ensure we don't try to restart before launch is complete. 
r.instanceLock.Lock() - if err := r.instance.Launch(); err != nil { - r.slogger.Log(context.TODO(), slog.LevelWarn, - "failed to launch osquery instance", - "err", err, - ) + instance, ok := r.instances[registrationId] + if !ok { r.instanceLock.Unlock() - return fmt.Errorf("starting instance: %w", err) + return fmt.Errorf("no instance exists for %s", registrationId) + } + if err := instance.Launch(); err != nil { + r.instanceLock.Unlock() + return fmt.Errorf("starting instance for %s: %w", registrationId, err) } r.instanceLock.Unlock() - // This loop waits for the completion of the async routines, - // and either restarts the instance (if Shutdown was not - // called), or stops (if Shutdown was called). + // This loop restarts the instance as necessary. It exits when `Shutdown` is called, + // or if the instance exits and cannot be restarted. for { - // Wait for async processes to exit - <-r.instance.Exited() - r.slogger.Log(context.TODO(), slog.LevelInfo, + <-instance.Exited() + slogger.Log(context.TODO(), slog.LevelInfo, "osquery instance exited", + "registration_id", registrationId, ) select { case <-r.shutdown: - // Intentional shutdown, this loop can exit + // Intentional shutdown of runner -- exit worker return nil default: - // Don't block + // Continue on to restart the instance } - // Error case -- osquery instance shut down and needs to be restarted - err := r.instance.WaitShutdown() - r.slogger.Log(context.TODO(), slog.LevelInfo, + // The osquery instance either exited on its own, or we called `Restart`. + // Either way, we wait for exit to complete, and then restart the instance. + err := instance.WaitShutdown() + slogger.Log(context.TODO(), slog.LevelInfo, "unexpected restart of instance", "err", err, ) r.instanceLock.Lock() - r.instance = newInstance(r.knapsack, r.serviceClient, r.opts...) - if err := r.instance.Launch(); err != nil { - r.slogger.Log(context.TODO(), slog.LevelWarn, - "fatal error restarting instance, shutting down", - "err", err, - ) + instance = newInstance(r.knapsack, r.serviceClient, r.opts...) + r.instances[registrationId] = instance + if err := instance.Launch(); err != nil { r.instanceLock.Unlock() - if err := r.Shutdown(); err != nil { - r.slogger.Log(context.TODO(), slog.LevelWarn, - "could not perform shutdown", - "err", err, - ) - } - - // Failed to restart instance -- exit rungroup so launcher can reload - return fmt.Errorf("restarting instance after unexpected exit: %w", err) + return fmt.Errorf("could not restart osquery instance after unexpected exit: %w", err) } r.instanceLock.Unlock() @@ -103,7 +141,14 @@ func (r *Runner) Run() error { func (r *Runner) Query(query string) ([]map[string]string, error) { r.instanceLock.Lock() defer r.instanceLock.Unlock() - return r.instance.Query(query) + + // For now, grab the default (i.e. only) instance + instance, ok := r.instances[defaultRegistrationId] + if !ok { + return nil, errors.New("no default instance exists, cannot query") + } + + return instance.Query(query) } func (r *Runner) Interrupt(_ error) { @@ -125,12 +170,31 @@ func (r *Runner) Shutdown() error { r.interrupted = true close(r.shutdown) + + if err := r.triggerShutdownForInstances(); err != nil { + return fmt.Errorf("triggering shutdown for instances during runner shutdown: %w", err) + } + + return nil +} + +// triggerShutdownForInstances asks all instances in `r.instances` to shut down. 
+func (r *Runner) triggerShutdownForInstances() error { r.instanceLock.Lock() defer r.instanceLock.Unlock() - r.instance.BeginShutdown() - if err := r.instance.WaitShutdown(); err != context.Canceled && err != nil { - return fmt.Errorf("while shutting down instance: %w", err) + + shutdownErrs := make([]error, 0) + for registrationId, instance := range r.instances { + instance.BeginShutdown() + if err := instance.WaitShutdown(); err != context.Canceled && err != nil { + shutdownErrs = append(shutdownErrs, fmt.Errorf("shutting down instance %s: %w", registrationId, err)) + } + } + + if len(shutdownErrs) > 0 { + return fmt.Errorf("shutting down all instances: %+v", shutdownErrs) } + return nil } @@ -172,11 +236,11 @@ func (r *Runner) Restart() error { r.slogger.Log(context.TODO(), slog.LevelDebug, "runner.Restart called", ) - r.instanceLock.Lock() - defer r.instanceLock.Unlock() - // Shut down the instance -- `Run` will start a new one. - r.instance.BeginShutdown() - r.instance.WaitShutdown() + + // Shut down the instances -- this will trigger a restart in each `runInstance`. + if err := r.triggerShutdownForInstances(); err != nil { + return fmt.Errorf("triggering shutdown for instances during runner restart: %w", err) + } return nil } @@ -186,5 +250,17 @@ func (r *Runner) Restart() error { func (r *Runner) Healthy() error { r.instanceLock.Lock() defer r.instanceLock.Unlock() - return r.instance.Healthy() + + healthcheckErrs := make([]error, 0) + for registrationId, instance := range r.instances { + if err := instance.Healthy(); err != nil { + healthcheckErrs = append(healthcheckErrs, fmt.Errorf("healthcheck error for %s: %w", registrationId, err)) + } + } + + if len(healthcheckErrs) > 0 { + return fmt.Errorf("healthchecking all instances: %+v", healthcheckErrs) + } + + return nil } diff --git a/pkg/osquery/runtime/runtime_posix_test.go b/pkg/osquery/runtime/runtime_posix_test.go index aa1ccd91d..f0fa70bae 100644 --- a/pkg/osquery/runtime/runtime_posix_test.go +++ b/pkg/osquery/runtime/runtime_posix_test.go @@ -134,24 +134,24 @@ func TestRestart(t *testing.T) { runner, logBytes, teardown := setupOsqueryInstanceForTests(t) defer teardown() - previousStats := runner.instance.stats + previousStats := runner.instances[defaultRegistrationId].stats require.NoError(t, runner.Restart()) waitHealthy(t, runner, logBytes) - require.NotEmpty(t, runner.instance.stats.StartTime, "start time should be set on latest instance stats after restart") - require.NotEmpty(t, runner.instance.stats.ConnectTime, "connect time should be set on latest instance stats after restart") + require.NotEmpty(t, runner.instances[defaultRegistrationId].stats.StartTime, "start time should be set on latest instance stats after restart") + require.NotEmpty(t, runner.instances[defaultRegistrationId].stats.ConnectTime, "connect time should be set on latest instance stats after restart") require.NotEmpty(t, previousStats.ExitTime, "exit time should be set on last instance stats when restarted") require.NotEmpty(t, previousStats.Error, "stats instance should have an error on restart") - previousStats = runner.instance.stats + previousStats = runner.instances[defaultRegistrationId].stats require.NoError(t, runner.Restart()) waitHealthy(t, runner, logBytes) - require.NotEmpty(t, runner.instance.stats.StartTime, "start time should be added to latest instance stats after restart") - require.NotEmpty(t, runner.instance.stats.ConnectTime, "connect time should be added to latest instance stats after restart") + require.NotEmpty(t, 
runner.instances[defaultRegistrationId].stats.StartTime, "start time should be added to latest instance stats after restart") + require.NotEmpty(t, runner.instances[defaultRegistrationId].stats.ConnectTime, "connect time should be added to latest instance stats after restart") require.NotEmpty(t, previousStats.ExitTime, "exit time should be set on instance stats when restarted") require.NotEmpty(t, previousStats.Error, "stats instance should have an error on restart") diff --git a/pkg/osquery/runtime/runtime_test.go b/pkg/osquery/runtime/runtime_test.go index 32629349c..143183f65 100644 --- a/pkg/osquery/runtime/runtime_test.go +++ b/pkg/osquery/runtime/runtime_test.go @@ -409,7 +409,7 @@ func TestFlagsChanged(t *testing.T) { // Confirm watchdog is disabled watchdogDisabled := false - for _, a := range runner.instance.cmd.Args { + for _, a := range runner.instances[defaultRegistrationId].cmd.Args { if strings.Contains(a, "disable_watchdog") { watchdogDisabled = true break @@ -417,7 +417,7 @@ func TestFlagsChanged(t *testing.T) { } require.True(t, watchdogDisabled, "instance not set up with watchdog disabled") - startingInstance := runner.instance + startingInstance := runner.instances[defaultRegistrationId] runner.FlagsChanged(keys.WatchdogEnabled) @@ -426,13 +426,13 @@ func TestFlagsChanged(t *testing.T) { waitHealthy(t, runner, logBytes) // Now confirm that the instance is new - require.NotEqual(t, startingInstance, runner.instance, "instance not replaced") + require.NotEqual(t, startingInstance, runner.instances[defaultRegistrationId], "instance not replaced") // Confirm osquery watchdog is now enabled watchdogMemoryLimitMBFound := false watchdogUtilizationLimitPercentFound := false watchdogDelaySecFound := false - for _, a := range runner.instance.cmd.Args { + for _, a := range runner.instances[defaultRegistrationId].cmd.Args { if strings.Contains(a, "disable_watchdog") { t.Error("disable_watchdog flag set") t.FailNow() @@ -493,10 +493,10 @@ func waitHealthy(t *testing.T, runner *Runner, logBytes *threadsafebuffer.Thread } // Confirm osquery instance setup is complete - if runner.instance == nil { + if runner.instances[defaultRegistrationId] == nil { return errors.New("instance does not exist yet") } - if runner.instance.stats.ConnectTime == "" { + if runner.instances[defaultRegistrationId].stats.ConnectTime == "" { return errors.New("no connect time set yet") } @@ -534,8 +534,8 @@ func TestSimplePath(t *testing.T) { waitHealthy(t, runner, logBytes) - require.NotEmpty(t, runner.instance.stats.StartTime, "start time should be added to instance stats on start up") - require.NotEmpty(t, runner.instance.stats.ConnectTime, "connect time should be added to instance stats on start up") + require.NotEmpty(t, runner.instances[defaultRegistrationId].stats.StartTime, "start time should be added to instance stats on start up") + require.NotEmpty(t, runner.instances[defaultRegistrationId].stats.ConnectTime, "connect time should be added to instance stats on start up") waitShutdown(t, runner, logBytes) } @@ -597,12 +597,12 @@ func TestOsqueryDies(t *testing.T) { waitHealthy(t, runner, logBytes) - previousStats := runner.instance.stats + previousStats := runner.instances[defaultRegistrationId].stats // Simulate the osquery process unexpectedly dying runner.instanceLock.Lock() - require.NoError(t, killProcessGroup(runner.instance.cmd)) - runner.instance.errgroup.Wait() + require.NoError(t, killProcessGroup(runner.instances[defaultRegistrationId].cmd)) + 
runner.instances[defaultRegistrationId].errgroup.Wait() runner.instanceLock.Unlock() waitHealthy(t, runner, logBytes) @@ -646,10 +646,10 @@ func TestExtensionIsCleanedUp(t *testing.T) { runner, logBytes, teardown := setupOsqueryInstanceForTests(t) defer teardown() - requirePgidMatch(t, runner.instance.cmd.Process.Pid) + requirePgidMatch(t, runner.instances[defaultRegistrationId].cmd.Process.Pid) // kill the current osquery process but not the extension - require.NoError(t, runner.instance.cmd.Process.Kill()) + require.NoError(t, runner.instances[defaultRegistrationId].cmd.Process.Kill()) // We need to (a) let the runner restart osquery, and (b) wait for // the extension to die. Both of these may take up to 30s. We'll @@ -694,7 +694,7 @@ func setupOsqueryInstanceForTests(t *testing.T) (runner *Runner, logBytes *threa go runner.Run() waitHealthy(t, runner, logBytes) - requirePgidMatch(t, runner.instance.cmd.Process.Pid) + requirePgidMatch(t, runner.instances[defaultRegistrationId].cmd.Process.Pid) teardown = func() { waitShutdown(t, runner, logBytes) From f94b739e87c4ea1c711f0cdb65e7732a01e9958d Mon Sep 17 00:00:00 2001 From: RebeccaMahany Date: Tue, 5 Nov 2024 17:03:12 -0500 Subject: [PATCH 2/7] Rename osquery database to avoid collisions --- ee/agent/timemachine/timemachine_darwin_test.go | 12 +++++++----- pkg/osquery/runtime/osqueryinstance.go | 2 +- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/ee/agent/timemachine/timemachine_darwin_test.go b/ee/agent/timemachine/timemachine_darwin_test.go index cfb5a3458..d6d98a30b 100644 --- a/ee/agent/timemachine/timemachine_darwin_test.go +++ b/ee/agent/timemachine/timemachine_darwin_test.go @@ -53,12 +53,14 @@ func TestAddExclusions(t *testing.T) { "metadata.plist": true, "osquery.autoload": true, "osquery.db/": true, - "osquery.pid": true, + fmt.Sprintf("osquery-%s.db/", ulid.New()): true, + "osquery.pid": true, fmt.Sprintf("osquery-%s.pid", ulid.New()): true, - "osquery.sock": true, - "osquery.sock.51807": true, - "osquery.sock.63071": true, - "osqueryd-tuf/": true, + "osquery.sock": true, + fmt.Sprintf("osquery-%s.sock", ulid.New()): true, + "osquery.sock.51807": true, + "osquery.sock.63071": true, + "osqueryd-tuf/": true, // these should NOT be excluded "launcher-tuf/": false, diff --git a/pkg/osquery/runtime/osqueryinstance.go b/pkg/osquery/runtime/osqueryinstance.go index 6e9173dd6..16bd6b299 100644 --- a/pkg/osquery/runtime/osqueryinstance.go +++ b/pkg/osquery/runtime/osqueryinstance.go @@ -679,7 +679,7 @@ func calculateOsqueryPaths(rootDirectory string, runId string, opts osqueryOptio // See: https://github.com/kolide/launcher/issues/1599 osqueryFilePaths := &osqueryFilePaths{ pidfilePath: filepath.Join(rootDirectory, fmt.Sprintf("osquery-%s.pid", runId)), - databasePath: filepath.Join(rootDirectory, "osquery.db"), + databasePath: filepath.Join(rootDirectory, fmt.Sprintf("osquery-%s.db", runId)), augeasPath: filepath.Join(rootDirectory, "augeas-lenses"), extensionSocketPath: extensionSocketPath, extensionAutoloadPath: extensionAutoloadPath, From 68d19da68fd310ada86dc3d61e3ec2e0df2e30ad Mon Sep 17 00:00:00 2001 From: RebeccaMahany Date: Tue, 5 Nov 2024 17:03:22 -0500 Subject: [PATCH 3/7] Add test for multiple osquery instances --- pkg/osquery/runtime/runtime_test.go | 45 ++++++++++++++++++++++++++++- 1 file changed, 44 insertions(+), 1 deletion(-) diff --git a/pkg/osquery/runtime/runtime_test.go b/pkg/osquery/runtime/runtime_test.go index 143183f65..703aae37e 100644 --- a/pkg/osquery/runtime/runtime_test.go +++ 
b/pkg/osquery/runtime/runtime_test.go @@ -494,7 +494,7 @@ func waitHealthy(t *testing.T, runner *Runner, logBytes *threadsafebuffer.Thread // Confirm osquery instance setup is complete if runner.instances[defaultRegistrationId] == nil { - return errors.New("instance does not exist yet") + return errors.New("default instance does not exist yet") } if runner.instances[defaultRegistrationId].stats.ConnectTime == "" { return errors.New("no connect time set yet") @@ -540,6 +540,49 @@ func TestSimplePath(t *testing.T) { waitShutdown(t, runner, logBytes) } +func TestMultipleInstances(t *testing.T) { + t.Parallel() + rootDirectory := testRootDirectory(t) + + logBytes, slogger, opts := setUpTestSlogger(rootDirectory) + + k := typesMocks.NewKnapsack(t) + k.On("OsqueryHealthcheckStartupDelay").Return(0 * time.Second).Maybe() + k.On("WatchdogEnabled").Return(false) + k.On("RegisterChangeObserver", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) + k.On("Slogger").Return(slogger) + k.On("LatestOsquerydPath", mock.Anything).Return(testOsqueryBinaryDirectory) + k.On("RootDirectory").Return(rootDirectory).Maybe() + k.On("OsqueryFlags").Return([]string{}) + k.On("OsqueryVerbose").Return(true) + k.On("LoggingInterval").Return(5 * time.Minute).Maybe() + k.On("LogMaxBytesPerBatch").Return(0).Maybe() + k.On("Transport").Return("jsonrpc").Maybe() + k.On("ReadEnrollSecret").Return("", nil).Maybe() + setUpMockStores(t, k) + serviceClient := mockServiceClient() + + runner := New(k, serviceClient, opts...) + + // Add in an extra instance + extraRegistrationId := ulid.New() + runner.instances[extraRegistrationId] = newInstance(k, serviceClient, opts...) + + // Start the instance + go runner.Run() + waitHealthy(t, runner, logBytes) + + // Confirm the default instance was started + require.NotEmpty(t, runner.instances[defaultRegistrationId].stats.StartTime, "start time should be added to default instance stats on start up") + require.NotEmpty(t, runner.instances[defaultRegistrationId].stats.ConnectTime, "connect time should be added to default instance stats on start up") + + // Confirm the additional instance was started + require.NotEmpty(t, runner.instances[extraRegistrationId].stats.StartTime, "start time should be added to secondary instance stats on start up") + require.NotEmpty(t, runner.instances[extraRegistrationId].stats.ConnectTime, "connect time should be added to secondary instance stats on start up") + + waitShutdown(t, runner, logBytes) +} + func TestMultipleShutdowns(t *testing.T) { t.Parallel() rootDirectory := testRootDirectory(t) From baab253df6d97f2a088f0970476a1ba8af3a336e Mon Sep 17 00:00:00 2001 From: RebeccaMahany Date: Tue, 5 Nov 2024 17:13:50 -0500 Subject: [PATCH 4/7] Avoid data race --- pkg/osquery/runtime/runner.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/osquery/runtime/runner.go b/pkg/osquery/runtime/runner.go index b594b11c6..858a98233 100644 --- a/pkg/osquery/runtime/runner.go +++ b/pkg/osquery/runtime/runner.go @@ -54,6 +54,7 @@ func (r *Runner) Run() error { // Start each worker for each instance for registrationId := range r.instances { + registrationId := registrationId wg.Go(func() error { if err := r.runInstance(registrationId); err != nil { r.slogger.Log(ctx, slog.LevelWarn, From b3e2ee91c443c3c5623418b50df615b6e5a21f67 Mon Sep 17 00:00:00 2001 From: RebeccaMahany Date: Wed, 6 Nov 2024 10:29:45 -0500 Subject: [PATCH 5/7] Add registration id to slogger --- pkg/osquery/runtime/osqueryinstance.go | 2 +- 1 file changed, 1 insertion(+), 1 
deletion(-) diff --git a/pkg/osquery/runtime/osqueryinstance.go b/pkg/osquery/runtime/osqueryinstance.go index 7a51193c1..d439a5b75 100644 --- a/pkg/osquery/runtime/osqueryinstance.go +++ b/pkg/osquery/runtime/osqueryinstance.go @@ -194,7 +194,7 @@ func newInstance(registrationId string, knapsack types.Knapsack, serviceClient s i := &OsqueryInstance{ registrationId: registrationId, knapsack: knapsack, - slogger: knapsack.Slogger().With("component", "osquery_instance", "instance_run_id", runId), + slogger: knapsack.Slogger().With("component", "osquery_instance", "registration_id", registrationId, "instance_run_id", runId), serviceClient: serviceClient, runId: runId, } From 343d5d005dc747fdedf7ea4b6b2adf6171f18da1 Mon Sep 17 00:00:00 2001 From: RebeccaMahany Date: Wed, 6 Nov 2024 11:11:19 -0500 Subject: [PATCH 6/7] Shut down instances in parallel --- pkg/osquery/runtime/runner.go | 20 +++++++++++++------- pkg/osquery/runtime/runtime_test.go | 4 ++++ 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/pkg/osquery/runtime/runner.go b/pkg/osquery/runtime/runner.go index b99e050b5..18f5cb530 100644 --- a/pkg/osquery/runtime/runner.go +++ b/pkg/osquery/runtime/runner.go @@ -184,16 +184,22 @@ func (r *Runner) triggerShutdownForInstances() error { r.instanceLock.Lock() defer r.instanceLock.Unlock() - shutdownErrs := make([]error, 0) + // Shut down the instances in parallel + shutdownWg, _ := errgroup.WithContext(context.Background()) for registrationId, instance := range r.instances { - instance.BeginShutdown() - if err := instance.WaitShutdown(); err != context.Canceled && err != nil { - shutdownErrs = append(shutdownErrs, fmt.Errorf("shutting down instance %s: %w", registrationId, err)) - } + registrationId := registrationId + instance := instance + shutdownWg.Go(func() error { + instance.BeginShutdown() + if err := instance.WaitShutdown(); err != context.Canceled && err != nil { + return fmt.Errorf("shutting down instance %s: %w", registrationId, err) + } + return nil + }) } - if len(shutdownErrs) > 0 { - return fmt.Errorf("shutting down all instances: %+v", shutdownErrs) + if err := shutdownWg.Wait(); err != nil { + return fmt.Errorf("shutting down all instances: %+v", err) } return nil diff --git a/pkg/osquery/runtime/runtime_test.go b/pkg/osquery/runtime/runtime_test.go index 998f32502..bdf6bb0b6 100644 --- a/pkg/osquery/runtime/runtime_test.go +++ b/pkg/osquery/runtime/runtime_test.go @@ -581,6 +581,10 @@ func TestMultipleInstances(t *testing.T) { require.NotEmpty(t, runner.instances[extraRegistrationId].stats.ConnectTime, "connect time should be added to secondary instance stats on start up") waitShutdown(t, runner, logBytes) + + // Confirm both instances exited + require.NotEmpty(t, runner.instances[defaultRegistrationId].stats.ExitTime, "exit time should be added to default instance stats on shutdown") + require.NotEmpty(t, runner.instances[extraRegistrationId].stats.ExitTime, "exit time should be added to secondary instance stats on shutdown") } func TestMultipleShutdowns(t *testing.T) { From 31d3b6b6ed3ed873be79bbe128a49b4e22f3ffc7 Mon Sep 17 00:00:00 2001 From: RebeccaMahany Date: Wed, 6 Nov 2024 11:24:19 -0500 Subject: [PATCH 7/7] Cleanup --- pkg/osquery/runtime/runner.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/pkg/osquery/runtime/runner.go b/pkg/osquery/runtime/runner.go index 18f5cb530..521bfd415 100644 --- a/pkg/osquery/runtime/runner.go +++ b/pkg/osquery/runtime/runner.go @@ -54,9 +54,9 @@ func (r *Runner) Run() error { // 
Start each worker for each instance for registrationId := range r.instances { - registrationId := registrationId + id := registrationId wg.Go(func() error { - if err := r.runInstance(registrationId); err != nil { + if err := r.runInstance(id); err != nil { r.slogger.Log(ctx, slog.LevelWarn, "runner terminated running osquery instance unexpectedly, shutting down runner", "err", err, @@ -77,7 +77,7 @@ func (r *Runner) Run() error { // Wait for all workers to exit if err := wg.Wait(); err != nil { - return fmt.Errorf("running instances: %w", err) + return fmt.Errorf("running osquery instances: %w", err) } return nil @@ -108,7 +108,6 @@ func (r *Runner) runInstance(registrationId string) error { <-instance.Exited() slogger.Log(context.TODO(), slog.LevelInfo, "osquery instance exited", - "registration_id", registrationId, ) select { @@ -187,12 +186,12 @@ func (r *Runner) triggerShutdownForInstances() error { // Shut down the instances in parallel shutdownWg, _ := errgroup.WithContext(context.Background()) for registrationId, instance := range r.instances { - registrationId := registrationId - instance := instance + id := registrationId + i := instance shutdownWg.Go(func() error { - instance.BeginShutdown() - if err := instance.WaitShutdown(); err != context.Canceled && err != nil { - return fmt.Errorf("shutting down instance %s: %w", registrationId, err) + i.BeginShutdown() + if err := i.WaitShutdown(); err != context.Canceled && err != nil { + return fmt.Errorf("shutting down instance %s: %w", id, err) } return nil })
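
The pattern this series converges on (one supervising errgroup worker per registration ID, plus per-iteration copies of the range variables from patches 4 and 7) can be distilled into a standalone sketch. The following is illustrative only: the map of exit channels and the names id and done are stand-ins rather than launcher's actual types, and the variable copies matter only on toolchains, or go.mod language versions, before Go 1.22, where range variables were shared across loop iterations.

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	// Stand-in for runner.instances: each registration ID maps to a channel
	// that closes when that instance exits (hypothetical, for illustration).
	instances := map[string]chan struct{}{
		"default": make(chan struct{}),
		"extra":   make(chan struct{}),
	}

	wg, ctx := errgroup.WithContext(context.Background())

	for registrationId, exited := range instances {
		// Copy the range variables so each closure captures its own values.
		// Before Go 1.22, the loop reused one variable per range clause, so
		// omitting these copies is the data race patch 4 addresses.
		id := registrationId
		done := exited
		wg.Go(func() error {
			select {
			case <-done:
				return fmt.Errorf("instance %s exited", id)
			case <-ctx.Done():
				return ctx.Err()
			}
		})
	}

	// Simulate one instance dying; errgroup cancels ctx, and the remaining
	// worker unwinds, loosely mirroring Run()'s shutdown-on-failure path.
	go func() {
		time.Sleep(100 * time.Millisecond)
		close(instances["default"])
	}()

	if err := wg.Wait(); err != nil {
		fmt.Println("runner exited:", err)
	}
}

Note that in the actual runner, a worker whose instance cannot be restarted calls Shutdown() to stop its peers; the sketch leans on errgroup's derived ctx for brevity.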