Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(benchmark): support graceful stop #527

Merged
merged 1 commit into from
Jul 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions internal/benchmark/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Benchmark struct {
config
loaders []*Loader
metrics Metrics
stopC chan struct{}
}

// New creates a new Benchmark and returns it. Users must call Close to release resources if it returns successfully.
Expand All @@ -34,6 +35,7 @@ func New(opts ...Option) (bm *Benchmark, err error) {

bm = &Benchmark{
config: cfg,
stopC: make(chan struct{}),
}

defer func() {
Expand All @@ -52,6 +54,7 @@ func New(opts ...Option) (bm *Benchmark, err error) {
mraddrs: bm.mraddrs,
metrics: loaderMetrics,
singleConnPerTarget: bm.singleConnPerTarget,
stopC: bm.stopC,
})
if err != nil {
return bm, err
Expand All @@ -65,8 +68,7 @@ func New(opts ...Option) (bm *Benchmark, err error) {

// Run starts Loaders and metric reporter. It blocks until the loaders are finished.
func (bm *Benchmark) Run() error {
ctx, cancel := context.WithCancel(context.Background())
g, ctx := errgroup.WithContext(ctx)
g, ctx := errgroup.WithContext(context.Background())

benchmarkTimer := time.NewTimer(bm.duration)

Expand All @@ -81,7 +83,7 @@ func (bm *Benchmark) Run() error {
wg.Add(1)
go func() {
defer func() {
cancel()
close(bm.stopC)
wg.Done()
fmt.Println(MustEncode(bm.reportEncoder, bm.metrics.Flush()))
}()
Expand Down
58 changes: 40 additions & 18 deletions internal/benchmark/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package benchmark

import (
"context"
"errors"
"fmt"
"time"

Expand All @@ -21,6 +22,7 @@ type loaderConfig struct {
mraddrs []string
metrics *LoaderMetrics
singleConnPerTarget bool
stopC <-chan struct{}
}

type Loader struct {
Expand Down Expand Up @@ -95,12 +97,10 @@ func NewLoader(cfg loaderConfig) (loader *Loader, err error) {
func (loader *Loader) Run(ctx context.Context) (err error) {
loader.metrics.Reset(time.Now())

ctx, cancel := context.WithCancel(ctx)
g, ctx := errgroup.WithContext(ctx)

defer func() {
err = multierr.Append(err, g.Wait())
cancel()
}()

for i := 0; i < len(loader.apps); i++ {
Expand Down Expand Up @@ -137,7 +137,7 @@ func (loader *Loader) Close() error {
return err
}

func (loader *Loader) makeAppendFunc(ctx context.Context, c varlog.Log, am *AppendMetrics) (appendFunc func(), closeFunc func()) {
func (loader *Loader) makeAppendFunc(ctx context.Context, c varlog.Log, am *AppendMetrics) (appendFunc func() error, closeFunc func(), err error) {
begin := true
notifyBegin := func(meta varlogpb.LogEntryMeta) {
loader.begin.ch <- varlogpb.LogSequenceNumber{
Expand Down Expand Up @@ -172,49 +172,59 @@ func (loader *Loader) makeAppendFunc(ctx context.Context, c varlog.Log, am *Appe

closeFunc = func() {}
if lsid.Invalid() {
return func() {
appendFunc = func() error {
ts := time.Now()
res := c.Append(ctx, loader.TopicID, loader.batch)
if res.Err != nil {
panic(res.Err)
return res.Err
}
dur := time.Since(ts)
recordMetrics(dur)
if begin {
notifyBegin(res.Metadata[0])
}
debugLog(res.Metadata)
}, closeFunc
return nil
}
return appendFunc, closeFunc, nil
}

if loader.PipelineSize == 0 {
return func() {
appendFunc = func() error {
ts := time.Now()
res := c.AppendTo(ctx, tpid, lsid, loader.batch)
if res.Err != nil {
panic(res.Err)
return res.Err
}
dur := time.Since(ts)
recordMetrics(dur)
if begin {
notifyBegin(res.Metadata[0])
}
debugLog(res.Metadata)
}, closeFunc
return nil
}
return appendFunc, closeFunc, nil
}

lsa, err := c.NewLogStreamAppender(loader.TopicID, loader.LogStreamID,
varlog.WithPipelineSize(loader.PipelineSize),
)
if err != nil {
panic(err)
return nil, nil, err
}
closeFunc = lsa.Close
return func() {

appendFunc = func() error {
ts := time.Now()
err := lsa.AppendBatch(loader.batch, func(lem []varlogpb.LogEntryMeta, err error) {
if err != nil {
panic(err)
if errors.Is(err, varlog.ErrClosed) {
loader.logger.Debug("closed client", err, slog.Any("tpid", tpid), slog.Any("lsid", lsid))
} else {
loader.logger.Error("could not append", err, slog.Any("tpid", tpid), slog.Any("lsid", lsid))
}
return
}
dur := time.Since(ts)
recordMetrics(dur)
Expand All @@ -223,23 +233,29 @@ func (loader *Loader) makeAppendFunc(ctx context.Context, c varlog.Log, am *Appe
}
debugLog(lem)
})
if err != nil {
panic(err)
}
}, closeFunc
return err
}
return appendFunc, closeFunc, nil
}

func (loader *Loader) appendLoop(ctx context.Context, c varlog.Log) error {
var am AppendMetrics
appendFunc, closeFunc := loader.makeAppendFunc(ctx, c, &am)
appendFunc, closeFunc, err := loader.makeAppendFunc(ctx, c, &am)
if err != nil {
return err
}
defer closeFunc()

for {
select {
case <-loader.stopC:
return nil
case <-ctx.Done():
return ctx.Err()
default:
appendFunc()
if err := appendFunc(); err != nil {
return err
}
}
}
}
Expand Down Expand Up @@ -268,6 +284,8 @@ func (loader *Loader) subscribeLoop(ctx context.Context, c varlog.Log) error {
defer closer()

select {
case <-loader.stopC:
return nil
case <-stop:
if subErr != nil {
return fmt.Errorf("subscribe: %w", subErr)
Expand All @@ -284,6 +302,8 @@ func (loader *Loader) subscribeLoop(ctx context.Context, c varlog.Log) error {
}()
for {
select {
case <-loader.stopC:
return nil
case <-ctx.Done():
return ctx.Err()
default:
Expand All @@ -308,6 +328,8 @@ func (loader *Loader) setBeginLSN(ctx context.Context) error {
}
for i := uint(0); i < loader.AppendersCount; i++ {
select {
case <-loader.stopC:
return nil
case lsn := <-loader.begin.ch:
if lsn.GLSN < beginLSN.GLSN {
beginLSN = lsn
Expand Down