diff --git a/internal/benchmark/benchmark.go b/internal/benchmark/benchmark.go index 4e53e53ed..ea750f9ff 100644 --- a/internal/benchmark/benchmark.go +++ b/internal/benchmark/benchmark.go @@ -18,6 +18,7 @@ type Benchmark struct { config loaders []*Loader metrics Metrics + stopC chan struct{} } // New creates a new Benchmark and returns it. Users must call Close to release resources if it returns successfully. @@ -34,6 +35,7 @@ func New(opts ...Option) (bm *Benchmark, err error) { bm = &Benchmark{ config: cfg, + stopC: make(chan struct{}), } defer func() { @@ -52,6 +54,7 @@ func New(opts ...Option) (bm *Benchmark, err error) { mraddrs: bm.mraddrs, metrics: loaderMetrics, singleConnPerTarget: bm.singleConnPerTarget, + stopC: bm.stopC, }) if err != nil { return bm, err @@ -65,8 +68,7 @@ func New(opts ...Option) (bm *Benchmark, err error) { // Run starts Loaders and metric reporter. It blocks until the loaders are finished. func (bm *Benchmark) Run() error { - ctx, cancel := context.WithCancel(context.Background()) - g, ctx := errgroup.WithContext(ctx) + g, ctx := errgroup.WithContext(context.Background()) benchmarkTimer := time.NewTimer(bm.duration) @@ -81,13 +83,13 @@ func (bm *Benchmark) Run() error { wg.Add(1) go func() { defer func() { - cancel() wg.Done() fmt.Println(MustEncode(bm.reportEncoder, bm.metrics.Flush())) }() for { select { case <-benchmarkTimer.C: + close(bm.stopC) finished = true slog.Debug("benchmark is finished") return diff --git a/internal/benchmark/loader.go b/internal/benchmark/loader.go index bbbf3038b..7f3f65e6e 100644 --- a/internal/benchmark/loader.go +++ b/internal/benchmark/loader.go @@ -2,6 +2,7 @@ package benchmark import ( "context" + "errors" "fmt" "time" @@ -21,6 +22,7 @@ type loaderConfig struct { mraddrs []string metrics *LoaderMetrics singleConnPerTarget bool + stopC <-chan struct{} } type Loader struct { @@ -95,12 +97,10 @@ func NewLoader(cfg loaderConfig) (loader *Loader, err error) { func (loader *Loader) Run(ctx context.Context) (err error) { loader.metrics.Reset(time.Now()) - ctx, cancel := context.WithCancel(ctx) g, ctx := errgroup.WithContext(ctx) defer func() { err = multierr.Append(err, g.Wait()) - cancel() }() for i := 0; i < len(loader.apps); i++ { @@ -137,7 +137,7 @@ func (loader *Loader) Close() error { return err } -func (loader *Loader) makeAppendFunc(ctx context.Context, c varlog.Log, am *AppendMetrics) (appendFunc func(), closeFunc func()) { +func (loader *Loader) makeAppendFunc(ctx context.Context, c varlog.Log, am *AppendMetrics) (appendFunc func() error, closeFunc func(), err error) { begin := true notifyBegin := func(meta varlogpb.LogEntryMeta) { loader.begin.ch <- varlogpb.LogSequenceNumber{ @@ -172,11 +172,11 @@ func (loader *Loader) makeAppendFunc(ctx context.Context, c varlog.Log, am *Appe closeFunc = func() {} if lsid.Invalid() { - return func() { + appendFunc = func() error { ts := time.Now() res := c.Append(ctx, loader.TopicID, loader.batch) if res.Err != nil { - panic(res.Err) + return res.Err } dur := time.Since(ts) recordMetrics(dur) @@ -184,15 +184,17 @@ func (loader *Loader) makeAppendFunc(ctx context.Context, c varlog.Log, am *Appe notifyBegin(res.Metadata[0]) } debugLog(res.Metadata) - }, closeFunc + return nil + } + return appendFunc, closeFunc, nil } if loader.PipelineSize == 0 { - return func() { + appendFunc = func() error { ts := time.Now() res := c.AppendTo(ctx, tpid, lsid, loader.batch) if res.Err != nil { - panic(res.Err) + return res.Err } dur := time.Since(ts) recordMetrics(dur) @@ -200,21 +202,29 @@ func (loader *Loader) makeAppendFunc(ctx context.Context, c varlog.Log, am *Appe notifyBegin(res.Metadata[0]) } debugLog(res.Metadata) - }, closeFunc + return nil + } + return appendFunc, closeFunc, nil } lsa, err := c.NewLogStreamAppender(loader.TopicID, loader.LogStreamID, varlog.WithPipelineSize(loader.PipelineSize), ) if err != nil { - panic(err) + return nil, nil, err } closeFunc = lsa.Close - return func() { + + appendFunc = func() error { ts := time.Now() err := lsa.AppendBatch(loader.batch, func(lem []varlogpb.LogEntryMeta, err error) { if err != nil { - panic(err) + if errors.Is(err, varlog.ErrClosed) { + loader.logger.Debug("closed client", err, slog.Any("tpid", tpid), slog.Any("lsid", lsid)) + } else { + loader.logger.Error("could not append", err, slog.Any("tpid", tpid), slog.Any("lsid", lsid)) + } + return } dur := time.Since(ts) recordMetrics(dur) @@ -223,19 +233,23 @@ func (loader *Loader) makeAppendFunc(ctx context.Context, c varlog.Log, am *Appe } debugLog(lem) }) - if err != nil { - panic(err) - } - }, closeFunc + return err + } + return appendFunc, closeFunc, nil } func (loader *Loader) appendLoop(ctx context.Context, c varlog.Log) error { var am AppendMetrics - appendFunc, closeFunc := loader.makeAppendFunc(ctx, c, &am) + appendFunc, closeFunc, err := loader.makeAppendFunc(ctx, c, &am) + if err != nil { + return err + } defer closeFunc() for { select { + case <-loader.stopC: + return nil case <-ctx.Done(): return ctx.Err() default: @@ -268,6 +282,8 @@ func (loader *Loader) subscribeLoop(ctx context.Context, c varlog.Log) error { defer closer() select { + case <-loader.stopC: + return nil case <-stop: if subErr != nil { return fmt.Errorf("subscribe: %w", subErr) @@ -284,6 +300,8 @@ func (loader *Loader) subscribeLoop(ctx context.Context, c varlog.Log) error { }() for { select { + case <-loader.stopC: + return nil case <-ctx.Done(): return ctx.Err() default: @@ -308,6 +326,8 @@ func (loader *Loader) setBeginLSN(ctx context.Context) error { } for i := uint(0); i < loader.AppendersCount; i++ { select { + case <-loader.stopC: + return nil case lsn := <-loader.begin.ch: if lsn.GLSN < beginLSN.GLSN { beginLSN = lsn