diff --git a/pkg/osquery/runtime/osqueryinstance.go b/pkg/osquery/runtime/osqueryinstance.go index 8fa121722..09b4871dd 100644 --- a/pkg/osquery/runtime/osqueryinstance.go +++ b/pkg/osquery/runtime/osqueryinstance.go @@ -596,8 +596,7 @@ func (o *OsqueryInstance) StartOsqueryExtensionManagerServer(name string, socket <-o.doneCtx.Done() o.slogger.Log(context.TODO(), slog.LevelDebug, - "exiting errgroup", - "errgroup", "starting extension shutdown", + "starting extension shutdown", "extension_name", name, ) diff --git a/pkg/osquery/runtime/runner.go b/pkg/osquery/runtime/runner.go index 1f02d1fa7..1a9f063d7 100644 --- a/pkg/osquery/runtime/runner.go +++ b/pkg/osquery/runtime/runner.go @@ -481,13 +481,6 @@ func (r *Runner) launchOsqueryInstance() error { span.AddEvent("extension_server_created") } - if err := o.stats.Connected(o); err != nil { - r.slogger.Log(ctx, slog.LevelWarn, - "could not set connection time for osquery instance history", - "err", err, - ) - } - // Now spawn an extension manager for the tables. We need to // start this one in the background, because the runner.Start // function needs to return promptly enough for osquery to use @@ -517,6 +510,15 @@ func (r *Runner) launchOsqueryInstance() error { return nil }) + // All done with osquery setup! Mark instance as connected, then proceed + // with setting up remaining errgroups. + if err := o.stats.Connected(o); err != nil { + r.slogger.Log(ctx, slog.LevelWarn, + "could not set connection time for osquery instance history", + "err", err, + ) + } + // Health check on interval o.errgroup.Go(func() error { defer r.slogger.Log(ctx, slog.LevelInfo, @@ -612,9 +614,16 @@ func (r *Runner) launchOsqueryInstance() error { ) <-o.doneCtx.Done() - if err := os.Remove(paths.pidfilePath); err != nil { + // We do a couple retries -- on Windows, the PID file may still be in use + // and therefore unable to be removed. + if err := backoff.WaitFor(func() error { + if err := os.Remove(paths.pidfilePath); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("removing PID file: %w", err) + } + return nil + }, 5*time.Second, 500*time.Millisecond); err != nil { r.slogger.Log(ctx, slog.LevelInfo, - "could not remove PID file", + "could not remove PID file, despite retries", "pid_file", paths.pidfilePath, "err", err, ) diff --git a/pkg/osquery/runtime/runtime_helpers_windows.go b/pkg/osquery/runtime/runtime_helpers_windows.go index cfbe90799..5c5dda948 100644 --- a/pkg/osquery/runtime/runtime_helpers_windows.go +++ b/pkg/osquery/runtime/runtime_helpers_windows.go @@ -60,7 +60,7 @@ func SocketPath(rootDir string) string { // launcher and osquery. We would like to be able to run multiple // launchers. // - // We could use something based on the laumcher root, but given the + // We could use something based on the launcher root, but given the // context this runs in a ulid seems simpler. return fmt.Sprintf(`\\.\pipe\kolide-osquery-%s`, ulid.New()) } diff --git a/pkg/osquery/runtime/runtime_posix_test.go b/pkg/osquery/runtime/runtime_posix_test.go index e2ee780f0..bbca95fb2 100644 --- a/pkg/osquery/runtime/runtime_posix_test.go +++ b/pkg/osquery/runtime/runtime_posix_test.go @@ -78,11 +78,11 @@ func TestOsquerySlowStart(t *testing.T) { }), ) go runner.Run() - waitHealthy(t, runner) + waitHealthy(t, runner, &logBytes) // ensure that we actually had to wait on the socket require.Contains(t, logBytes.String(), "osquery extension socket not created yet") - require.NoError(t, runner.Shutdown()) + waitShutdown(t, runner, &logBytes) } // TestExtensionSocketPath tests that the launcher can start osqueryd with a custom extension socket path. @@ -94,11 +94,17 @@ func TestExtensionSocketPath(t *testing.T) { require.NoError(t, err) defer rmRootDirectory() + var logBytes threadsafebuffer.ThreadSafeBuffer + slogger := slog.New(slog.NewTextHandler(&logBytes, &slog.HandlerOptions{ + AddSource: true, + Level: slog.LevelDebug, + })) + k := typesMocks.NewKnapsack(t) k.On("OsqueryHealthcheckStartupDelay").Return(0 * time.Second).Maybe() k.On("WatchdogEnabled").Return(false) k.On("RegisterChangeObserver", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) - k.On("Slogger").Return(multislogger.NewNopLogger()) + k.On("Slogger").Return(slogger) k.On("LatestOsquerydPath", mock.Anything).Return(testOsqueryBinaryDirectory) store, err := storageci.NewStore(t, multislogger.NewNopLogger(), storage.KatcConfigStore.String()) require.NoError(t, err) @@ -113,7 +119,7 @@ func TestExtensionSocketPath(t *testing.T) { ) go runner.Run() - waitHealthy(t, runner) + waitHealthy(t, runner, &logBytes) // wait for the launcher-provided extension to register time.Sleep(2 * time.Second) @@ -127,7 +133,7 @@ func TestExtensionSocketPath(t *testing.T) { assert.Equal(t, int32(0), resp.Status.Code) assert.Equal(t, "OK", resp.Status.Message) - require.NoError(t, runner.Shutdown()) + waitShutdown(t, runner, &logBytes) } // TestRestart tests that the launcher can restart the osqueryd process. @@ -135,13 +141,13 @@ func TestExtensionSocketPath(t *testing.T) { // Should investigate why this is the case. func TestRestart(t *testing.T) { t.Parallel() - runner, teardown := setupOsqueryInstanceForTests(t) + runner, logBytes, teardown := setupOsqueryInstanceForTests(t) defer teardown() previousStats := runner.instance.stats require.NoError(t, runner.Restart()) - waitHealthy(t, runner) + waitHealthy(t, runner, logBytes) require.NotEmpty(t, runner.instance.stats.StartTime, "start time should be set on latest instance stats after restart") require.NotEmpty(t, runner.instance.stats.ConnectTime, "connect time should be set on latest instance stats after restart") @@ -152,7 +158,7 @@ func TestRestart(t *testing.T) { previousStats = runner.instance.stats require.NoError(t, runner.Restart()) - waitHealthy(t, runner) + waitHealthy(t, runner, logBytes) require.NotEmpty(t, runner.instance.stats.StartTime, "start time should be added to latest instance stats after restart") require.NotEmpty(t, runner.instance.stats.ConnectTime, "connect time should be added to latest instance stats after restart") diff --git a/pkg/osquery/runtime/runtime_test.go b/pkg/osquery/runtime/runtime_test.go index 25d80f171..48e49cce8 100644 --- a/pkg/osquery/runtime/runtime_test.go +++ b/pkg/osquery/runtime/runtime_test.go @@ -6,6 +6,7 @@ import ( "context" "errors" "fmt" + "log/slog" "os" "os/exec" "path/filepath" @@ -24,6 +25,7 @@ import ( "github.com/kolide/launcher/pkg/log/multislogger" "github.com/kolide/launcher/pkg/osquery/runtime/history" "github.com/kolide/launcher/pkg/packaging" + "github.com/kolide/launcher/pkg/threadsafebuffer" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -330,11 +332,17 @@ func TestWithOsqueryFlags(t *testing.T) { require.NoError(t, err) defer rmRootDirectory() + var logBytes threadsafebuffer.ThreadSafeBuffer + slogger := slog.New(slog.NewTextHandler(&logBytes, &slog.HandlerOptions{ + AddSource: true, + Level: slog.LevelDebug, + })) + k := typesMocks.NewKnapsack(t) k.On("OsqueryHealthcheckStartupDelay").Return(0 * time.Second).Maybe() k.On("WatchdogEnabled").Return(false) k.On("RegisterChangeObserver", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) - k.On("Slogger").Return(multislogger.NewNopLogger()) + k.On("Slogger").Return(slogger) k.On("LatestOsquerydPath", mock.Anything).Return(testOsqueryBinaryDirectory) store, err := storageci.NewStore(t, multislogger.NewNopLogger(), storage.KatcConfigStore.String()) require.NoError(t, err) @@ -347,10 +355,10 @@ func TestWithOsqueryFlags(t *testing.T) { WithOsqueryFlags([]string{"verbose=false"}), ) go runner.Run() - waitHealthy(t, runner) + waitHealthy(t, runner, &logBytes) assert.Equal(t, []string{"verbose=false"}, runner.instance.opts.osqueryFlags) - runner.Interrupt(errors.New("test error")) + waitShutdown(t, runner, &logBytes) } func TestFlagsChanged(t *testing.T) { @@ -360,6 +368,12 @@ func TestFlagsChanged(t *testing.T) { require.NoError(t, err) defer rmRootDirectory() + var logBytes threadsafebuffer.ThreadSafeBuffer + slogger := slog.New(slog.NewTextHandler(&logBytes, &slog.HandlerOptions{ + AddSource: true, + Level: slog.LevelDebug, + })) + k := typesMocks.NewKnapsack(t) k.On("OsqueryHealthcheckStartupDelay").Return(0 * time.Second).Maybe() // First, it should return false, then on the next call, it should return true @@ -369,7 +383,7 @@ func TestFlagsChanged(t *testing.T) { k.On("WatchdogUtilizationLimitPercent").Return(20) k.On("WatchdogDelaySec").Return(120) k.On("RegisterChangeObserver", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) - k.On("Slogger").Return(multislogger.NewNopLogger()) + k.On("Slogger").Return(slogger) k.On("LatestOsquerydPath", mock.Anything).Return(testOsqueryBinaryDirectory) store, err := storageci.NewStore(t, multislogger.NewNopLogger(), storage.KatcConfigStore.String()) require.NoError(t, err) @@ -386,7 +400,7 @@ func TestFlagsChanged(t *testing.T) { // Wait for the instance to start time.Sleep(2 * time.Second) - waitHealthy(t, runner) + waitHealthy(t, runner, &logBytes) // Confirm watchdog is disabled watchdogDisabled := false @@ -404,7 +418,7 @@ func TestFlagsChanged(t *testing.T) { // Wait for the instance to restart time.Sleep(2 * time.Second) - waitHealthy(t, runner) + waitHealthy(t, runner, &logBytes) // Now confirm that the instance is new require.NotEqual(t, startingInstance, runner.instance, "instance not replaced") @@ -441,18 +455,49 @@ func TestFlagsChanged(t *testing.T) { k.AssertExpectations(t) - runner.Interrupt(errors.New("test error")) + waitShutdown(t, runner, &logBytes) +} + +func waitShutdown(t *testing.T, runner *Runner, logBytes *threadsafebuffer.ThreadSafeBuffer) { + // We don't want to retry shutdowns because subsequent shutdown calls don't do anything -- + // they return nil immediately, which would give `backoff` the impression that shutdown has + // completed when it hasn't. + // Instead, call `Shutdown` once, wait for our timeout (1 minute), and report failure if + // `Shutdown` has not returned. + shutdownErr := make(chan error) + go func() { + shutdownErr <- runner.Shutdown() + }() + + select { + case err := <-shutdownErr: + require.NoError(t, err, fmt.Sprintf("runner logs: %s", logBytes.String())) + case <-time.After(1 * time.Minute): + t.Error("runner did not shut down within timeout", fmt.Sprintf("runner logs: %s", logBytes.String())) + t.FailNow() + } } // waitHealthy expects the instance to be healthy within 30 seconds, or else -// fatals the test -func waitHealthy(t *testing.T, runner *Runner) { +// fatals the test. +func waitHealthy(t *testing.T, runner *Runner, logBytes *threadsafebuffer.ThreadSafeBuffer) { require.NoError(t, backoff.WaitFor(func() error { - if runner.Healthy() == nil { - return nil + // Instance self-reports as healthy + if err := runner.Healthy(); err != nil { + return fmt.Errorf("instance not healthy: %w", err) } - return fmt.Errorf("instance not healthy") - }, 30*time.Second, 1*time.Second)) + + // Confirms osquery instance setup is complete + if runner.instance != nil && runner.instance.stats.ConnectTime == "" { + return errors.New("no connect time set yet") + } + + // Good to go + return nil + }, 30*time.Second, 1*time.Second), fmt.Sprintf("runner logs: %s", logBytes.String())) + + // Give the instance just a little bit of buffer before we proceed + time.Sleep(2 * time.Second) } func TestSimplePath(t *testing.T) { @@ -461,11 +506,17 @@ func TestSimplePath(t *testing.T) { require.NoError(t, err) defer rmRootDirectory() + var logBytes threadsafebuffer.ThreadSafeBuffer + slogger := slog.New(slog.NewTextHandler(&logBytes, &slog.HandlerOptions{ + AddSource: true, + Level: slog.LevelDebug, + })) + k := typesMocks.NewKnapsack(t) k.On("OsqueryHealthcheckStartupDelay").Return(0 * time.Second).Maybe() k.On("WatchdogEnabled").Return(false) k.On("RegisterChangeObserver", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) - k.On("Slogger").Return(multislogger.NewNopLogger()) + k.On("Slogger").Return(slogger) k.On("LatestOsquerydPath", mock.Anything).Return(testOsqueryBinaryDirectory) store, err := storageci.NewStore(t, multislogger.NewNopLogger(), storage.KatcConfigStore.String()) require.NoError(t, err) @@ -478,12 +529,12 @@ func TestSimplePath(t *testing.T) { ) go runner.Run() - waitHealthy(t, runner) + waitHealthy(t, runner, &logBytes) require.NotEmpty(t, runner.instance.stats.StartTime, "start time should be added to instance stats on start up") require.NotEmpty(t, runner.instance.stats.ConnectTime, "connect time should be added to instance stats on start up") - require.NoError(t, runner.Shutdown()) + waitShutdown(t, runner, &logBytes) } func TestMultipleShutdowns(t *testing.T) { @@ -492,11 +543,17 @@ func TestMultipleShutdowns(t *testing.T) { require.NoError(t, err) defer rmRootDirectory() + var logBytes threadsafebuffer.ThreadSafeBuffer + slogger := slog.New(slog.NewTextHandler(&logBytes, &slog.HandlerOptions{ + AddSource: true, + Level: slog.LevelDebug, + })) + k := typesMocks.NewKnapsack(t) k.On("OsqueryHealthcheckStartupDelay").Return(0 * time.Second).Maybe() k.On("WatchdogEnabled").Return(false) k.On("RegisterChangeObserver", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) - k.On("Slogger").Return(multislogger.NewNopLogger()) + k.On("Slogger").Return(slogger) k.On("LatestOsquerydPath", mock.Anything).Return(testOsqueryBinaryDirectory) store, err := storageci.NewStore(t, multislogger.NewNopLogger(), storage.KatcConfigStore.String()) require.NoError(t, err) @@ -509,10 +566,10 @@ func TestMultipleShutdowns(t *testing.T) { ) go runner.Run() - waitHealthy(t, runner) + waitHealthy(t, runner, &logBytes) for i := 0; i < 3; i += 1 { - require.NoError(t, runner.Shutdown(), "expected no error on calling shutdown but received error on attempt: ", i) + waitShutdown(t, runner, &logBytes) } } @@ -522,11 +579,17 @@ func TestOsqueryDies(t *testing.T) { require.NoError(t, err) defer rmRootDirectory() + var logBytes threadsafebuffer.ThreadSafeBuffer + slogger := slog.New(slog.NewTextHandler(&logBytes, &slog.HandlerOptions{ + AddSource: true, + Level: slog.LevelDebug, + })) + k := typesMocks.NewKnapsack(t) k.On("OsqueryHealthcheckStartupDelay").Return(0 * time.Second).Maybe() k.On("WatchdogEnabled").Return(false) k.On("RegisterChangeObserver", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) - k.On("Slogger").Return(multislogger.NewNopLogger()) + k.On("Slogger").Return(slogger) k.On("LatestOsquerydPath", mock.Anything).Return(testOsqueryBinaryDirectory) store, err := storageci.NewStore(t, multislogger.NewNopLogger(), storage.KatcConfigStore.String()) require.NoError(t, err) @@ -540,7 +603,7 @@ func TestOsqueryDies(t *testing.T) { go runner.Run() require.NoError(t, err) - waitHealthy(t, runner) + waitHealthy(t, runner, &logBytes) previousStats := runner.instance.stats @@ -550,11 +613,11 @@ func TestOsqueryDies(t *testing.T) { runner.instance.errgroup.Wait() runner.instanceLock.Unlock() - waitHealthy(t, runner) + waitHealthy(t, runner, &logBytes) require.NotEmpty(t, previousStats.Error, "error should be added to stats when unexpected shutdown") require.NotEmpty(t, previousStats.ExitTime, "exit time should be added to instance when unexpected shutdown") - require.NoError(t, runner.Shutdown()) + waitShutdown(t, runner, &logBytes) } func TestNotStarted(t *testing.T) { @@ -588,7 +651,7 @@ func TestExtensionIsCleanedUp(t *testing.T) { t.Skip("https://github.com/kolide/launcher/issues/478") t.Parallel() - runner, teardown := setupOsqueryInstanceForTests(t) + runner, logBytes, teardown := setupOsqueryInstanceForTests(t) defer teardown() requirePgidMatch(t, runner.instance.cmd.Process.Pid) @@ -605,17 +668,23 @@ func TestExtensionIsCleanedUp(t *testing.T) { timer1 := time.NewTimer(35 * time.Second) // Wait for osquery to respawn - waitHealthy(t, runner) + waitHealthy(t, runner, logBytes) // Ensure we've waited at least 32s <-timer1.C } // sets up an osquery instance with a running extension to be used in tests. -func setupOsqueryInstanceForTests(t *testing.T) (runner *Runner, teardown func()) { +func setupOsqueryInstanceForTests(t *testing.T) (runner *Runner, logBytes *threadsafebuffer.ThreadSafeBuffer, teardown func()) { rootDirectory, rmRootDirectory, err := osqueryTempDir() require.NoError(t, err) + logBytes = &threadsafebuffer.ThreadSafeBuffer{} + slogger := slog.New(slog.NewTextHandler(logBytes, &slog.HandlerOptions{ + AddSource: true, + Level: slog.LevelDebug, + })) + k := typesMocks.NewKnapsack(t) k.On("OsqueryHealthcheckStartupDelay").Return(0 * time.Second).Maybe() k.On("WatchdogEnabled").Return(true) @@ -623,7 +692,7 @@ func setupOsqueryInstanceForTests(t *testing.T) (runner *Runner, teardown func() k.On("WatchdogUtilizationLimitPercent").Return(20) k.On("WatchdogDelaySec").Return(120) k.On("RegisterChangeObserver", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Maybe() - k.On("Slogger").Return(multislogger.NewNopLogger()) + k.On("Slogger").Return(slogger) k.On("LatestOsquerydPath", mock.Anything).Return(testOsqueryBinaryDirectory) store, err := storageci.NewStore(t, multislogger.NewNopLogger(), storage.KatcConfigStore.String()) require.NoError(t, err) @@ -635,13 +704,13 @@ func setupOsqueryInstanceForTests(t *testing.T) (runner *Runner, teardown func() WithRootDirectory(rootDirectory), ) go runner.Run() - waitHealthy(t, runner) + waitHealthy(t, runner, logBytes) requirePgidMatch(t, runner.instance.cmd.Process.Pid) teardown = func() { defer rmRootDirectory() - require.NoError(t, runner.Shutdown()) + waitShutdown(t, runner, logBytes) } - return runner, teardown + return runner, logBytes, teardown }