Skip to content

Commit

Permalink
fix(benchmark): support graceful stop
Browse files Browse the repository at this point in the history
This change makes the benchmark loader stop gracefully. Previously the benchmark loader stopped
regardless of inflight requests. In terms of performance testing, it is not important though it is
not intended. From now, the benchmark tool will stop its performance testing gracefully; that is, it
won't terminate ongoing requests.
  • Loading branch information
ijsong committed Jul 12, 2023
1 parent 913e638 commit b3ebf17
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 20 deletions.
8 changes: 5 additions & 3 deletions internal/benchmark/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Benchmark struct {
config
loaders []*Loader
metrics Metrics
stopC chan struct{}
}

// New creates a new Benchmark and returns it. Users must call Close to release resources if it returns successfully.
Expand All @@ -34,6 +35,7 @@ func New(opts ...Option) (bm *Benchmark, err error) {

bm = &Benchmark{
config: cfg,
stopC: make(chan struct{}),
}

defer func() {
Expand All @@ -52,6 +54,7 @@ func New(opts ...Option) (bm *Benchmark, err error) {
mraddrs: bm.mraddrs,
metrics: loaderMetrics,
singleConnPerTarget: bm.singleConnPerTarget,
stopC: bm.stopC,
})
if err != nil {
return bm, err
Expand All @@ -65,8 +68,7 @@ func New(opts ...Option) (bm *Benchmark, err error) {

// Run starts Loaders and metric reporter. It blocks until the loaders are finished.
func (bm *Benchmark) Run() error {
ctx, cancel := context.WithCancel(context.Background())
g, ctx := errgroup.WithContext(ctx)
g, ctx := errgroup.WithContext(context.Background())

benchmarkTimer := time.NewTimer(bm.duration)

Expand All @@ -81,13 +83,13 @@ func (bm *Benchmark) Run() error {
wg.Add(1)
go func() {
defer func() {
cancel()
wg.Done()
fmt.Println(MustEncode(bm.reportEncoder, bm.metrics.Flush()))
}()
for {
select {
case <-benchmarkTimer.C:
close(bm.stopC)
finished = true
slog.Debug("benchmark is finished")
return
Expand Down
54 changes: 37 additions & 17 deletions internal/benchmark/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package benchmark

import (
"context"
"errors"
"fmt"
"time"

Expand All @@ -21,6 +22,7 @@ type loaderConfig struct {
mraddrs []string
metrics *LoaderMetrics
singleConnPerTarget bool
stopC <-chan struct{}
}

type Loader struct {
Expand Down Expand Up @@ -95,12 +97,10 @@ func NewLoader(cfg loaderConfig) (loader *Loader, err error) {
func (loader *Loader) Run(ctx context.Context) (err error) {
loader.metrics.Reset(time.Now())

ctx, cancel := context.WithCancel(ctx)
g, ctx := errgroup.WithContext(ctx)

defer func() {
err = multierr.Append(err, g.Wait())
cancel()
}()

for i := 0; i < len(loader.apps); i++ {
Expand Down Expand Up @@ -137,7 +137,7 @@ func (loader *Loader) Close() error {
return err
}

func (loader *Loader) makeAppendFunc(ctx context.Context, c varlog.Log, am *AppendMetrics) (appendFunc func(), closeFunc func()) {
func (loader *Loader) makeAppendFunc(ctx context.Context, c varlog.Log, am *AppendMetrics) (appendFunc func() error, closeFunc func(), err error) {
begin := true
notifyBegin := func(meta varlogpb.LogEntryMeta) {
loader.begin.ch <- varlogpb.LogSequenceNumber{
Expand Down Expand Up @@ -172,49 +172,59 @@ func (loader *Loader) makeAppendFunc(ctx context.Context, c varlog.Log, am *Appe

closeFunc = func() {}
if lsid.Invalid() {
return func() {
appendFunc = func() error {
ts := time.Now()
res := c.Append(ctx, loader.TopicID, loader.batch)
if res.Err != nil {
panic(res.Err)
return res.Err
}
dur := time.Since(ts)
recordMetrics(dur)
if begin {
notifyBegin(res.Metadata[0])
}
debugLog(res.Metadata)
}, closeFunc
return nil
}
return appendFunc, closeFunc, nil
}

if loader.PipelineSize == 0 {
return func() {
appendFunc = func() error {
ts := time.Now()
res := c.AppendTo(ctx, tpid, lsid, loader.batch)
if res.Err != nil {
panic(res.Err)
return res.Err
}
dur := time.Since(ts)
recordMetrics(dur)
if begin {
notifyBegin(res.Metadata[0])
}
debugLog(res.Metadata)
}, closeFunc
return nil
}
return appendFunc, closeFunc, nil
}

lsa, err := c.NewLogStreamAppender(loader.TopicID, loader.LogStreamID,
varlog.WithPipelineSize(loader.PipelineSize),
)
if err != nil {
panic(err)
return nil, nil, err
}
closeFunc = lsa.Close
return func() {

appendFunc = func() error {
ts := time.Now()
err := lsa.AppendBatch(loader.batch, func(lem []varlogpb.LogEntryMeta, err error) {
if err != nil {
panic(err)
if errors.Is(err, varlog.ErrClosed) {
loader.logger.Debug("closed client", err, slog.Any("tpid", tpid), slog.Any("lsid", lsid))
} else {
loader.logger.Error("could not append", err, slog.Any("tpid", tpid), slog.Any("lsid", lsid))
}
return
}
dur := time.Since(ts)
recordMetrics(dur)
Expand All @@ -223,19 +233,23 @@ func (loader *Loader) makeAppendFunc(ctx context.Context, c varlog.Log, am *Appe
}
debugLog(lem)
})
if err != nil {
panic(err)
}
}, closeFunc
return err
}
return appendFunc, closeFunc, nil
}

func (loader *Loader) appendLoop(ctx context.Context, c varlog.Log) error {
var am AppendMetrics
appendFunc, closeFunc := loader.makeAppendFunc(ctx, c, &am)
appendFunc, closeFunc, err := loader.makeAppendFunc(ctx, c, &am)
if err != nil {
return err
}
defer closeFunc()

for {
select {
case <-loader.stopC:
return nil
case <-ctx.Done():
return ctx.Err()
default:
Expand Down Expand Up @@ -268,6 +282,8 @@ func (loader *Loader) subscribeLoop(ctx context.Context, c varlog.Log) error {
defer closer()

select {
case <-loader.stopC:
return nil
case <-stop:
if subErr != nil {
return fmt.Errorf("subscribe: %w", subErr)
Expand All @@ -284,6 +300,8 @@ func (loader *Loader) subscribeLoop(ctx context.Context, c varlog.Log) error {
}()
for {
select {
case <-loader.stopC:
return nil
case <-ctx.Done():
return ctx.Err()
default:
Expand All @@ -308,6 +326,8 @@ func (loader *Loader) setBeginLSN(ctx context.Context) error {
}
for i := uint(0); i < loader.AppendersCount; i++ {
select {
case <-loader.stopC:
return nil
case lsn := <-loader.begin.ch:
if lsn.GLSN < beginLSN.GLSN {
beginLSN = lsn
Expand Down

0 comments on commit b3ebf17

Please sign in to comment.