Skip to content

Commit

Permalink
feat(benchmark): share a connection between appenders in a target
Browse files Browse the repository at this point in the history
This change adds a new flag, `--single-conn-per-target`, which makes appenders in a benchmark target
share a connection.
  • Loading branch information
ijsong committed Jul 12, 2023
1 parent 8b9823f commit 1f8f464
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 17 deletions.
13 changes: 11 additions & 2 deletions cmd/benchmark/test_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ var (
Usage: "Pipeline size, no pipelined requests if zero. Not support per-target pipeline size yet.",
Value: 0,
}
flagSingleConnPerTarget = &cli.BoolFlag{
Name: "single-conn-per-target",
Usage: "Use single connection shared by appenders in a target. Each target uses different connection.",
}
)

func newCommandTest() *cli.Command {
Expand All @@ -83,6 +87,7 @@ func newCommandTest() *cli.Command {
flagReportInterval,
flagPrintJSON,
flagPipelineSize,
flagSingleConnPerTarget,
},
Action: runCommandTest,
}
Expand Down Expand Up @@ -145,14 +150,18 @@ func runCommandTest(c *cli.Context) error {
enc = benchmark.StringEncoder{}
}

bm, err := benchmark.New(
opts := []benchmark.Option{
benchmark.WithClusterID(clusterID),
benchmark.WithTargets(targets...),
benchmark.WithMetadataRepository(c.StringSlice(flagMRAddrs.Name)),
benchmark.WithDuration(duration),
benchmark.WithReportInterval(reportInterval),
benchmark.WithReportEncoder(enc),
)
}
if c.Bool(flagSingleConnPerTarget.Name) {
opts = append(opts, benchmark.WithSingleConnPerTarget())
}
bm, err := benchmark.New(opts...)
if err != nil {
return err
}
Expand Down
9 changes: 5 additions & 4 deletions internal/benchmark/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,11 @@ func New(opts ...Option) (bm *Benchmark, err error) {
target := bm.targets[idx]
loaderMetrics := &LoaderMetrics{tgt: target}
loader, err = NewLoader(loaderConfig{
Target: target,
cid: bm.cid,
mraddrs: bm.mraddrs,
metrics: loaderMetrics,
Target: target,
cid: bm.cid,
mraddrs: bm.mraddrs,
metrics: loaderMetrics,
singleConnPerTarget: bm.singleConnPerTarget,
})
if err != nil {
return bm, err
Expand Down
19 changes: 13 additions & 6 deletions internal/benchmark/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ const (
)

type config struct {
cid types.ClusterID
targets []Target
mraddrs []string
duration time.Duration
reportInterval time.Duration
reportEncoder ReportEncoder
cid types.ClusterID
targets []Target
mraddrs []string
duration time.Duration
reportInterval time.Duration
reportEncoder ReportEncoder
singleConnPerTarget bool
}

func newConfig(opts []Option) (config, error) {
Expand Down Expand Up @@ -106,3 +107,9 @@ func WithReportEncoder(enc ReportEncoder) Option {
cfg.reportEncoder = enc
})
}

func WithSingleConnPerTarget() Option {
return newFuncOption(func(cfg *config) {
cfg.singleConnPerTarget = true
})
}
27 changes: 22 additions & 5 deletions internal/benchmark/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ import (

type loaderConfig struct {
Target
cid types.ClusterID
mraddrs []string
metrics *LoaderMetrics
cid types.ClusterID
mraddrs []string
metrics *LoaderMetrics
singleConnPerTarget bool
}

type Loader struct {
Expand Down Expand Up @@ -47,17 +48,33 @@ func NewLoader(cfg loaderConfig) (loader *Loader, err error) {
}
}()

var scli varlog.Log
getClient := func() (varlog.Log, error) {
if loader.singleConnPerTarget {
if scli != nil {
return scli, nil
}
cli, err := varlog.Open(context.TODO(), loader.cid, loader.mraddrs)
if err != nil {
return nil, err
}
scli = cli
return scli, nil
}
return varlog.Open(context.TODO(), loader.cid, loader.mraddrs)
}

var c varlog.Log
for i := uint(0); i < loader.AppendersCount; i++ {
c, err = varlog.Open(context.TODO(), loader.cid, loader.mraddrs)
c, err = getClient()
if err != nil {
return loader, err
}
loader.apps = append(loader.apps, c)
}

for i := uint(0); i < loader.SubscribersCount; i++ {
c, err = varlog.Open(context.TODO(), loader.cid, loader.mraddrs)
c, err = getClient()
if err != nil {
return loader, err
}
Expand Down

0 comments on commit 1f8f464

Please sign in to comment.