From b3ebf1713a51dd99d970838649ad10b1584d601b Mon Sep 17 00:00:00 2001 From: Injun Song Date: Wed, 12 Jul 2023 18:16:09 +0900 Subject: [PATCH] fix(benchmark): support graceful stop This change makes the benchmark loader stop gracefully. Previously the benchmark loader stopped regardless of inflight requests. In terms of performance testing, it is not important though it is not intended. From now, the benchmark tool will stop its performance testing gracefully; that is, it won't terminate ongoing requests. --- internal/benchmark/benchmark.go | 8 +++-- internal/benchmark/loader.go | 54 ++++++++++++++++++++++----------- 2 files changed, 42 insertions(+), 20 deletions(-) diff --git a/internal/benchmark/benchmark.go b/internal/benchmark/benchmark.go index 4e53e53ed..ea750f9ff 100644 --- a/internal/benchmark/benchmark.go +++ b/internal/benchmark/benchmark.go @@ -18,6 +18,7 @@ type Benchmark struct { config loaders []*Loader metrics Metrics + stopC chan struct{} } // New creates a new Benchmark and returns it. Users must call Close to release resources if it returns successfully. @@ -34,6 +35,7 @@ func New(opts ...Option) (bm *Benchmark, err error) { bm = &Benchmark{ config: cfg, + stopC: make(chan struct{}), } defer func() { @@ -52,6 +54,7 @@ func New(opts ...Option) (bm *Benchmark, err error) { mraddrs: bm.mraddrs, metrics: loaderMetrics, singleConnPerTarget: bm.singleConnPerTarget, + stopC: bm.stopC, }) if err != nil { return bm, err @@ -65,8 +68,7 @@ func New(opts ...Option) (bm *Benchmark, err error) { // Run starts Loaders and metric reporter. It blocks until the loaders are finished. func (bm *Benchmark) Run() error { - ctx, cancel := context.WithCancel(context.Background()) - g, ctx := errgroup.WithContext(ctx) + g, ctx := errgroup.WithContext(context.Background()) benchmarkTimer := time.NewTimer(bm.duration) @@ -81,13 +83,13 @@ func (bm *Benchmark) Run() error { wg.Add(1) go func() { defer func() { - cancel() wg.Done() fmt.Println(MustEncode(bm.reportEncoder, bm.metrics.Flush())) }() for { select { case <-benchmarkTimer.C: + close(bm.stopC) finished = true slog.Debug("benchmark is finished") return diff --git a/internal/benchmark/loader.go b/internal/benchmark/loader.go index bbbf3038b..7f3f65e6e 100644 --- a/internal/benchmark/loader.go +++ b/internal/benchmark/loader.go @@ -2,6 +2,7 @@ package benchmark import ( "context" + "errors" "fmt" "time" @@ -21,6 +22,7 @@ type loaderConfig struct { mraddrs []string metrics *LoaderMetrics singleConnPerTarget bool + stopC <-chan struct{} } type Loader struct { @@ -95,12 +97,10 @@ func NewLoader(cfg loaderConfig) (loader *Loader, err error) { func (loader *Loader) Run(ctx context.Context) (err error) { loader.metrics.Reset(time.Now()) - ctx, cancel := context.WithCancel(ctx) g, ctx := errgroup.WithContext(ctx) defer func() { err = multierr.Append(err, g.Wait()) - cancel() }() for i := 0; i < len(loader.apps); i++ { @@ -137,7 +137,7 @@ func (loader *Loader) Close() error { return err } -func (loader *Loader) makeAppendFunc(ctx context.Context, c varlog.Log, am *AppendMetrics) (appendFunc func(), closeFunc func()) { +func (loader *Loader) makeAppendFunc(ctx context.Context, c varlog.Log, am *AppendMetrics) (appendFunc func() error, closeFunc func(), err error) { begin := true notifyBegin := func(meta varlogpb.LogEntryMeta) { loader.begin.ch <- varlogpb.LogSequenceNumber{ @@ -172,11 +172,11 @@ func (loader *Loader) makeAppendFunc(ctx context.Context, c varlog.Log, am *Appe closeFunc = func() {} if lsid.Invalid() { - return func() { + appendFunc = func() error { ts := time.Now() res := c.Append(ctx, loader.TopicID, loader.batch) if res.Err != nil { - panic(res.Err) + return res.Err } dur := time.Since(ts) recordMetrics(dur) @@ -184,15 +184,17 @@ func (loader *Loader) makeAppendFunc(ctx context.Context, c varlog.Log, am *Appe notifyBegin(res.Metadata[0]) } debugLog(res.Metadata) - }, closeFunc + return nil + } + return appendFunc, closeFunc, nil } if loader.PipelineSize == 0 { - return func() { + appendFunc = func() error { ts := time.Now() res := c.AppendTo(ctx, tpid, lsid, loader.batch) if res.Err != nil { - panic(res.Err) + return res.Err } dur := time.Since(ts) recordMetrics(dur) @@ -200,21 +202,29 @@ func (loader *Loader) makeAppendFunc(ctx context.Context, c varlog.Log, am *Appe notifyBegin(res.Metadata[0]) } debugLog(res.Metadata) - }, closeFunc + return nil + } + return appendFunc, closeFunc, nil } lsa, err := c.NewLogStreamAppender(loader.TopicID, loader.LogStreamID, varlog.WithPipelineSize(loader.PipelineSize), ) if err != nil { - panic(err) + return nil, nil, err } closeFunc = lsa.Close - return func() { + + appendFunc = func() error { ts := time.Now() err := lsa.AppendBatch(loader.batch, func(lem []varlogpb.LogEntryMeta, err error) { if err != nil { - panic(err) + if errors.Is(err, varlog.ErrClosed) { + loader.logger.Debug("closed client", err, slog.Any("tpid", tpid), slog.Any("lsid", lsid)) + } else { + loader.logger.Error("could not append", err, slog.Any("tpid", tpid), slog.Any("lsid", lsid)) + } + return } dur := time.Since(ts) recordMetrics(dur) @@ -223,19 +233,23 @@ func (loader *Loader) makeAppendFunc(ctx context.Context, c varlog.Log, am *Appe } debugLog(lem) }) - if err != nil { - panic(err) - } - }, closeFunc + return err + } + return appendFunc, closeFunc, nil } func (loader *Loader) appendLoop(ctx context.Context, c varlog.Log) error { var am AppendMetrics - appendFunc, closeFunc := loader.makeAppendFunc(ctx, c, &am) + appendFunc, closeFunc, err := loader.makeAppendFunc(ctx, c, &am) + if err != nil { + return err + } defer closeFunc() for { select { + case <-loader.stopC: + return nil case <-ctx.Done(): return ctx.Err() default: @@ -268,6 +282,8 @@ func (loader *Loader) subscribeLoop(ctx context.Context, c varlog.Log) error { defer closer() select { + case <-loader.stopC: + return nil case <-stop: if subErr != nil { return fmt.Errorf("subscribe: %w", subErr) @@ -284,6 +300,8 @@ func (loader *Loader) subscribeLoop(ctx context.Context, c varlog.Log) error { }() for { select { + case <-loader.stopC: + return nil case <-ctx.Done(): return ctx.Err() default: @@ -308,6 +326,8 @@ func (loader *Loader) setBeginLSN(ctx context.Context) error { } for i := uint(0); i < loader.AppendersCount; i++ { select { + case <-loader.stopC: + return nil case lsn := <-loader.begin.ch: if lsn.GLSN < beginLSN.GLSN { beginLSN = lsn