Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements for osqueryinstance's errgroup #2017

Merged
merged 14 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 163 additions & 0 deletions ee/errgroup/errgroup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package errgroup

import (
"context"
"fmt"
"log/slog"
"time"

"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)

// LoggedErrgroup bundles an errgroup.Group with the cancel function, derived
// context, and logger that NewLoggedErrgroup sets up alongside it.
type LoggedErrgroup struct {
// errgroup is the underlying group that goroutines are started in and waited on.
errgroup *errgroup.Group
// cancel is the cancel function for the context the errgroup was built from; Shutdown calls it.
cancel context.CancelFunc
// doneCtx is the context returned by errgroup.WithContext; goroutines in this
// file receive from its Done channel to learn that the errgroup is exiting.
doneCtx context.Context // nolint:containedctx
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the same nolint exception we had previously, just moved from osqueryinstance to here. In the future I think we might be able to get rid of this entirely -- but the errgroup is very sensitive when it comes to updating anything that touches doneCtx so I want to do it in a separate PR at a later time.

// slogger is the base logger; goroutines log through it with a goroutine_name attribute added.
slogger *slog.Logger
}

const (
// maxShutdownGoroutineDuration is the run time above which AddShutdownGoroutine
// logs a warning about a slow shutdown routine.
maxShutdownGoroutineDuration = 3 * time.Second
)

// NewLoggedErrgroup builds a LoggedErrgroup on top of a cancellable child of ctx.
// The cancel function is retained so that Shutdown can stop the group, and the
// context derived by errgroup.WithContext is retained so that goroutines can
// watch for the group exiting.
func NewLoggedErrgroup(ctx context.Context, slogger *slog.Logger) *LoggedErrgroup {
	cancellableCtx, cancelFunc := context.WithCancel(ctx)
	group, groupCtx := errgroup.WithContext(cancellableCtx)

	l := new(LoggedErrgroup)
	l.errgroup = group
	l.cancel = cancelFunc
	l.doneCtx = groupCtx
	l.slogger = slogger

	return l
}

// StartGoroutine starts the given goroutine in the errgroup, ensuring that we log its start and exit.
func (l *LoggedErrgroup) StartGoroutine(ctx context.Context, goroutineName string, goroutine func() error) {
l.errgroup.Go(func() (err error) {
slogger := l.slogger.With("goroutine_name", goroutineName)

// Catch any panicking goroutines and log them. We also want to make sure
// we return an error from this goroutine overall if it panics.
defer func() {
James-Pickett marked this conversation as resolved.
Show resolved Hide resolved
if r := recover(); r != nil {
slogger.Log(ctx, slog.LevelError,
"panic occurred in goroutine",
"err", r,
)
if recoveredErr, ok := r.(error); ok {
slogger.Log(ctx, slog.LevelError,
"panic stack trace",
"stack_trace", fmt.Sprintf("%+v", errors.WithStack(recoveredErr)),
)
err = recoveredErr
}
}
}()

slogger.Log(ctx, slog.LevelInfo,
"starting goroutine in errgroup",
)

err = goroutine()

slogger.Log(ctx, slog.LevelInfo,
"exiting goroutine in errgroup",
"goroutine_err", err,
)

return err
})
}

// StartRepeatedGoroutine runs the given goroutine over and over inside the errgroup, with its
// start and exit logged via StartGoroutine.
//
// When delay is non-zero, the first run is held back until the delay has elapsed; if the errgroup
// exits during the delay, the goroutine is never run. After that, the goroutine runs once
// immediately and then again on every tick of interval, until it returns an error (which is
// returned to the errgroup) or the errgroup shuts down (in which case nil is returned).
// interval is handed to time.NewTicker, so it must be greater than zero.
func (l *LoggedErrgroup) StartRepeatedGoroutine(ctx context.Context, goroutineName string, interval time.Duration, delay time.Duration, goroutine func() error) {
	repeated := func() error {
		logger := l.slogger.With("goroutine_name", goroutineName)

		// Optional initial delay; bail out early if the errgroup exits first.
		if delay != 0 {
			select {
			case <-l.doneCtx.Done():
				return nil
			case <-time.After(delay):
				logger.Log(ctx, slog.LevelDebug, "exiting delay before starting repeated goroutine")
			}
		}

		tick := time.NewTicker(interval)
		defer tick.Stop()

		for {
			// Each pass begins with a run, so the very first run happens right away.
			runErr := goroutine()
			if runErr != nil {
				logger.Log(ctx, slog.LevelInfo,
					"exiting repeated goroutine in errgroup due to error",
					"goroutine_err", runErr,
				)
				return runErr
			}

			// Block until either the next tick arrives or the errgroup exits.
			select {
			case <-tick.C:
				// Time for the next run.
			case <-l.doneCtx.Done():
				logger.Log(ctx, slog.LevelInfo, "exiting repeated goroutine in errgroup due to shutdown")
				return nil
			}
		}
	}

	l.StartGoroutine(ctx, goroutineName, repeated)
}

// AddShutdownGoroutine registers a goroutine in the errgroup (with start and exit logging via
// StartGoroutine) that stays idle until the errgroup has received a signal to exit, and only
// then runs the given function.
//
// The function's own error and run time are used for logging only: a warning is logged when it
// returns an error, or otherwise when it ran for longer than maxShutdownGoroutineDuration.
func (l *LoggedErrgroup) AddShutdownGoroutine(ctx context.Context, goroutineName string, goroutine func() error) {
	l.StartGoroutine(ctx, goroutineName, func() error {
		logger := l.slogger.With("goroutine_name", goroutineName)

		// Block until the errgroup is exiting.
		<-l.doneCtx.Done()

		startedAt := time.Now()
		shutdownErr := goroutine()
		runTime := time.Since(startedAt)

		// Report anything amiss about the shutdown routine: an error takes precedence
		// over a slow run, so at most one warning is logged.
		switch {
		case shutdownErr != nil:
			logger.Log(ctx, slog.LevelWarn,
				"shutdown routine returned err",
				"goroutine_run_time", runTime.String(),
				"goroutine_err", shutdownErr,
			)
		case runTime > maxShutdownGoroutineDuration:
			logger.Log(ctx, slog.LevelWarn,
				"noticed slow shutdown routine",
				"goroutine_run_time", runTime.String(),
			)
		}

		// The shutdown routine's own error is deliberately not propagated: shutdown routine
		// errors don't matter for the success of the errgroup overall. What is returned is
		// the errgroup context's error, which is non-nil at this point because Done has
		// already closed.
		return l.doneCtx.Err()
	})
}

// Shutdown signals the errgroup to exit by calling the cancel function created in
// NewLoggedErrgroup. It does not itself wait for goroutines to finish; see Wait.
func (l *LoggedErrgroup) Shutdown() {
l.cancel()
}

// Wait calls Wait on the underlying errgroup and returns its result.
func (l *LoggedErrgroup) Wait() error {
return l.errgroup.Wait()
}

// Exited returns the Done channel of the errgroup's context. Callers can receive
// from it to detect that the errgroup is exiting.
func (l *LoggedErrgroup) Exited() <-chan struct{} {
return l.doneCtx.Done()
}
130 changes: 130 additions & 0 deletions ee/errgroup/errgroup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package errgroup

import (
"context"
"errors"
"fmt"
"strconv"
"testing"
"time"

"github.com/kolide/launcher/pkg/log/multislogger"
"github.com/stretchr/testify/require"
)

// TestWait checks that Wait returns the error from the first failing goroutine (or nil when
// none fail), and that the errgroup reports itself as exited once Wait has returned.
func TestWait(t *testing.T) {
	t.Parallel()

	err1 := errors.New("errgroup_test: 1")
	err2 := errors.New("errgroup_test: 2")

	type testCase struct {
		testCaseName string
		errs         []error
		expectedErr  error
	}

	testCases := []testCase{
		{testCaseName: "no error on exit", errs: []error{nil}, expectedErr: nil},
		{testCaseName: "only first routine has error on exit", errs: []error{err1, nil}, expectedErr: err1},
		{testCaseName: "only second routine has error on exit", errs: []error{nil, err2}, expectedErr: err2},
		{testCaseName: "multiple routines have error on exit", errs: []error{err1, nil, err2}, expectedErr: err1},
	}

	for _, tt := range testCases {
		tt := tt
		t.Run(tt.testCaseName, func(t *testing.T) {
			t.Parallel()

			ctx, cancel := context.WithCancel(context.TODO())
			defer cancel()

			eg := NewLoggedErrgroup(ctx, multislogger.NewNopLogger())

			// Start one goroutine per configured result, each returning its error immediately.
			for i := range tt.errs {
				goroutineErr := tt.errs[i]
				eg.StartGoroutine(ctx, strconv.Itoa(i), func() error { return goroutineErr })
				time.Sleep(500 * time.Millisecond) // try to enforce ordering of goroutines
			}

			// Waiting for the routines to exit should hand back the expected error.
			require.Equal(t, tt.expectedErr, eg.Wait(), "incorrect error returned by errgroup")

			// Non-blocking check that the errgroup has shut down.
			exited := true
			select {
			case <-eg.Exited():
			default:
				exited = false
			}

			require.True(t, exited, "errgroup did not exit")
		})
	}
}

// TestShutdown checks that calling Shutdown on an errgroup whose only goroutine returns nil
// leads to Wait returning nil and to the errgroup reporting itself as exited.
func TestShutdown(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithCancel(context.TODO())
	defer cancel()

	eg := NewLoggedErrgroup(ctx, multislogger.NewNopLogger())

	succeed := func() error { return nil }
	eg.StartGoroutine(ctx, "test_goroutine", succeed)

	// Shutting down and then waiting should not produce an error.
	eg.Shutdown()
	waitErr := eg.Wait()
	require.Nil(t, waitErr, "should not have returned error on shutdown")

	// Non-blocking check that the errgroup has shut down.
	exited := true
	select {
	case <-eg.Exited():
	default:
		exited = false
	}

	require.True(t, exited, "errgroup did not exit")
}

// Test_HandlesPanic checks that a panic inside a goroutine started with StartGoroutine does not
// crash the test, is reported as an error from Wait, and leaves the errgroup exited.
func Test_HandlesPanic(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithCancel(context.TODO())
	defer cancel()

	eg := NewLoggedErrgroup(ctx, multislogger.NewNopLogger())

	panicker := func() error {
		emptySlice := make([]int, 0)
		fmt.Println(emptySlice[2]) // cause out-of-bounds panic
		return nil
	}
	eg.StartGoroutine(ctx, "test_goroutine", panicker)

	// The errgroup should shut down cleanly -- the test itself should not panic.
	eg.Shutdown()
	waitErr := eg.Wait()
	require.Error(t, waitErr, "should have returned error from panicking goroutine")

	// Non-blocking check that the errgroup has shut down.
	exited := true
	select {
	case <-eg.Exited():
	default:
		exited = false
	}

	require.True(t, exited, "errgroup did not exit")
}
Loading
Loading