diff --git a/cmd/benchmark/test_command.go b/cmd/benchmark/test_command.go index 32beac2c3..da1f2f3f3 100644 --- a/cmd/benchmark/test_command.go +++ b/cmd/benchmark/test_command.go @@ -65,6 +65,10 @@ var ( Usage: "Pipeline size, no pipelined requests if zero. Not support per-target pipeline size yet.", Value: 0, } + flagSingleConnPerTarget = &cli.BoolFlag{ + Name: "single-conn-per-target", + Usage: "Use single connection shared by appenders in a target. Each target uses different connection.", + } ) func newCommandTest() *cli.Command { @@ -83,6 +87,7 @@ func newCommandTest() *cli.Command { flagReportInterval, flagPrintJSON, flagPipelineSize, + flagSingleConnPerTarget, }, Action: runCommandTest, } @@ -145,14 +150,18 @@ func runCommandTest(c *cli.Context) error { enc = benchmark.StringEncoder{} } - bm, err := benchmark.New( + opts := []benchmark.Option{ benchmark.WithClusterID(clusterID), benchmark.WithTargets(targets...), benchmark.WithMetadataRepository(c.StringSlice(flagMRAddrs.Name)), benchmark.WithDuration(duration), benchmark.WithReportInterval(reportInterval), benchmark.WithReportEncoder(enc), - ) + } + if c.Bool(flagSingleConnPerTarget.Name) { + opts = append(opts, benchmark.WithSingleConnPerTarget()) + } + bm, err := benchmark.New(opts...) if err != nil { return err } diff --git a/internal/benchmark/benchmark.go b/internal/benchmark/benchmark.go index e74794566..4e53e53ed 100644 --- a/internal/benchmark/benchmark.go +++ b/internal/benchmark/benchmark.go @@ -47,10 +47,11 @@ func New(opts ...Option) (bm *Benchmark, err error) { target := bm.targets[idx] loaderMetrics := &LoaderMetrics{tgt: target} loader, err = NewLoader(loaderConfig{ - Target: target, - cid: bm.cid, - mraddrs: bm.mraddrs, - metrics: loaderMetrics, + Target: target, + cid: bm.cid, + mraddrs: bm.mraddrs, + metrics: loaderMetrics, + singleConnPerTarget: bm.singleConnPerTarget, }) if err != nil { return bm, err diff --git a/internal/benchmark/config.go b/internal/benchmark/config.go index 7c1d2ec82..f7711d19e 100644 --- a/internal/benchmark/config.go +++ b/internal/benchmark/config.go @@ -17,12 +17,13 @@ const ( ) type config struct { - cid types.ClusterID - targets []Target - mraddrs []string - duration time.Duration - reportInterval time.Duration - reportEncoder ReportEncoder + cid types.ClusterID + targets []Target + mraddrs []string + duration time.Duration + reportInterval time.Duration + reportEncoder ReportEncoder + singleConnPerTarget bool } func newConfig(opts []Option) (config, error) { @@ -106,3 +107,9 @@ func WithReportEncoder(enc ReportEncoder) Option { cfg.reportEncoder = enc }) } + +func WithSingleConnPerTarget() Option { + return newFuncOption(func(cfg *config) { + cfg.singleConnPerTarget = true + }) +} diff --git a/internal/benchmark/loader.go b/internal/benchmark/loader.go index 3df4f2ecc..bbbf3038b 100644 --- a/internal/benchmark/loader.go +++ b/internal/benchmark/loader.go @@ -17,9 +17,10 @@ import ( type loaderConfig struct { Target - cid types.ClusterID - mraddrs []string - metrics *LoaderMetrics + cid types.ClusterID + mraddrs []string + metrics *LoaderMetrics + singleConnPerTarget bool } type Loader struct { @@ -47,9 +48,25 @@ func NewLoader(cfg loaderConfig) (loader *Loader, err error) { } }() + var scli varlog.Log + getClient := func() (varlog.Log, error) { + if loader.singleConnPerTarget { + if scli != nil { + return scli, nil + } + cli, err := varlog.Open(context.TODO(), loader.cid, loader.mraddrs) + if err != nil { + return nil, err + } + scli = cli + return scli, nil + } + return varlog.Open(context.TODO(), loader.cid, loader.mraddrs) + } + var c varlog.Log for i := uint(0); i < loader.AppendersCount; i++ { - c, err = varlog.Open(context.TODO(), loader.cid, loader.mraddrs) + c, err = getClient() if err != nil { return loader, err } @@ -57,7 +74,7 @@ func NewLoader(cfg loaderConfig) (loader *Loader, err error) { } for i := uint(0); i < loader.SubscribersCount; i++ { - c, err = varlog.Open(context.TODO(), loader.cid, loader.mraddrs) + c, err = getClient() if err != nil { return loader, err }