feat(benchmark): rework benchmark
This patch reworks the benchmark tool; this PR contains its first version.

Resolves kakao#209
ijsong committed Nov 22, 2022
1 parent 55703a4 commit 9c3f84a
Showing 38 changed files with 4,528 additions and 226 deletions.
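
For context, here is a hedged sketch (not part of the commit) of how the reworked benchmark could be driven with the per-target options this diff introduces. Every option, Target field, and method name below appears in cmd/benchmark/benchmark.go further down; the addresses, IDs, and sizes are illustrative, and internal/benchmark is importable only from inside the varlog module (canonical module path github.com/kakao/varlog).

```go
// Sketch only: values are illustrative, and the internal/benchmark package
// can only be imported from within the varlog repository itself.
//
// A roughly equivalent CLI invocation, using the flag names from the diff,
// would look something like:
//   benchmark --cluster 1 --address 127.0.0.1:10000 --target 1:1 \
//     --message-size 1024 --batch-size 10 --appenders 2 --subscribers 1 \
//     --duration 1m --report-interval 3s
package main

import (
	"log"
	"time"

	"github.com/kakao/varlog/internal/benchmark"
	"github.com/kakao/varlog/pkg/types"
)

func main() {
	// One load target per "topic:logstream" pair given via --target.
	targets := []benchmark.Target{
		{
			TopicID:          types.TopicID(1),
			LogStreamID:      types.LogStreamID(1),
			MessageSize:      1024,
			BatchSize:        10,
			AppendersCount:   2,
			SubscribersCount: 1,
		},
	}

	bm, err := benchmark.New(
		benchmark.WithClusterID(benchmark.DefaultClusterID),
		benchmark.WithTargets(targets...),
		benchmark.WithMetadataRepository([]string{"127.0.0.1:10000"}),
		benchmark.WithDuration(time.Minute),
		benchmark.WithReportInterval(3*time.Second),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		_ = bm.Close()
	}()
	if err := bm.Run(); err != nil {
		log.Fatal(err)
	}
}
```
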
158 changes: 110 additions & 48 deletions cmd/benchmark/benchmark.go
@@ -3,14 +3,62 @@ package main
import (
"fmt"
"os"
"strings"

"github.com/urfave/cli/v2"

"github.com/kakao/varlog/internal/benchmark"
"github.com/kakao/varlog/internal/flags"
"github.com/kakao/varlog/pkg/types"
)

var (
flagClusterID = &cli.StringFlag{
Name: "cluster",
Usage: "Cluster ID",
Value: benchmark.DefaultClusterID.String(),
}
flagTarget = &cli.StringSliceFlag{
Name: "target",
Required: true,
Usage: "The target of the benchmark load formatted by \"topic1:logstream1,topic2:logstream2,...<topic_id:logstream_id>\"",
}
flagMRAddrs = &cli.StringSliceFlag{
Name: "address",
Required: true,
}
flagMsgSize = &cli.UintSliceFlag{
Name: "message-size",
Aliases: []string{"msg-size"},
Value: cli.NewUintSlice(benchmark.DefaultMessageSize),
Usage: "Message sizes for each load target",
}
flagBatchSize = &cli.UintSliceFlag{
Name: "batch-size",
Value: cli.NewUintSlice(benchmark.DefaultBatchSize),
Usage: "Batch sizes for each load target",
}
flagAppenders = &cli.UintSliceFlag{
Name: "appenders",
Aliases: []string{"appenders-count"},
Value: cli.NewUintSlice(benchmark.DefaultConcurrency),
Usage: "The number of appenders for each load target",
}
flagSubscribers = &cli.UintSliceFlag{
Name: "subscribers",
Aliases: []string{"subscribers-count"},
Value: cli.NewUintSlice(benchmark.DefaultConcurrency),
Usage: "The number of subscribers for each load target",
}
flagDuration = &cli.DurationFlag{
Name: "duration",
Value: benchmark.DefaultDuration,
}
flagReportInterval = &cli.DurationFlag{
Name: "report-interval",
Value: benchmark.DefaultReportInterval,
}
)

func main() {
os.Exit(run())
}
@@ -28,33 +76,21 @@ func newApp() *cli.App {
app := &cli.App{
Name: "benchmark",
Flags: []cli.Flag{
flagClusterID.StringFlag(false, benchmark.DefaultClusterID.String()),
flagTopicID.StringFlag(true, ""),
flagLogStreamID.StringFlag(true, ""),
flagMRAddrs.StringSliceFlag(true, nil),
flagMsgSize.IntFlag(true, 0),
flagBatchSize.IntFlag(false, benchmark.DefaultBatchSize),
flagConcurrency.IntFlag(false, benchmark.DefaultConcurrency),
flagDuration.DurationFlag(false, benchmark.DefaultDuration),
flagReportInterval.DurationFlag(false, benchmark.DefaultReportInterval),
flagClusterID,
flagTarget,
flagMRAddrs,
flagMsgSize,
flagBatchSize,
flagAppenders,
flagSubscribers,
flagDuration,
flagReportInterval,
},
Action: start,
}
return app
}

var (
flagClusterID = flags.ClusterID()
flagTopicID = flags.TopicID()
flagLogStreamID = flags.LogStreamID()
flagMRAddrs = flags.FlagDesc{Name: "address"}
flagMsgSize = flags.FlagDesc{Name: "msg-size"}
flagBatchSize = flags.FlagDesc{Name: "batch-size"}
flagConcurrency = flags.FlagDesc{Name: "concurrency"}
flagDuration = flags.FlagDesc{Name: "duration"}
flagReportInterval = flags.FlagDesc{Name: "report-interval"}
)

func start(c *cli.Context) error {
if c.NArg() > 0 {
return fmt.Errorf("unexpected args: %v", c.Args().Slice())
@@ -65,44 +101,70 @@ func start(c *cli.Context) error {
return err
}

topicID, err := types.ParseTopicID(c.String(flagTopicID.Name))
if err != nil {
return err
targets := make([]benchmark.Target, len(c.StringSlice(flagTarget.Name)))
for idx, str := range c.StringSlice(flagTarget.Name) {
toks := strings.Split(str, ":")
if len(toks) != 2 {
return fmt.Errorf("malformed target %s", str)
}
var target benchmark.Target
target.TopicID, err = types.ParseTopicID(toks[0])
if err != nil {
return fmt.Errorf("malformed target %s: invalid topic %s", str, toks[0])
}
if toks[1] != "*" {
target.LogStreamID, err = types.ParseLogStreamID(toks[1])
if err != nil {
return fmt.Errorf("malformed target %s: invalid log stream %s", str, toks[1])
}
}
targets[idx] = target
}

logStreamID, err := types.ParseLogStreamID(c.String(flagLogStreamID.Name))
if err != nil {
return err
}
targets = setSizes(targets, c.UintSlice(flagMsgSize.Name), func(idx int, size uint) {
targets[idx].MessageSize = size
})

msgSize := c.Int(flagMsgSize.Name)
if msgSize < 0 {
return fmt.Errorf("stress: invalid msg size %d", msgSize)
}
targets = setSizes(targets, c.UintSlice(flagBatchSize.Name), func(idx int, size uint) {
targets[idx].BatchSize = size
})

batchSize := c.Int(flagBatchSize.Name)
if batchSize <= 0 {
return fmt.Errorf("stress: invalid batch size %d", batchSize)
}

concurrency := c.Int(flagConcurrency.Name)
if concurrency < 1 {
return fmt.Errorf("stress: invalid concurrency %d", concurrency)
}
targets = setSizes(targets, c.UintSlice(flagAppenders.Name), func(idx int, size uint) {
targets[idx].AppendersCount = size
})
targets = setSizes(targets, c.UintSlice(flagSubscribers.Name), func(idx int, size uint) {
targets[idx].SubscribersCount = size
})

duration := c.Duration(flagDuration.Name)

reportInterval := c.Duration(flagReportInterval.Name)

return benchmark.Append(
bm, err := benchmark.New(
benchmark.WithClusterID(clusterID),
benchmark.WithTopicID(topicID),
benchmark.WithLogStreamID(logStreamID),
benchmark.WithTargets(targets...),
benchmark.WithMetadataRepository(c.StringSlice(flagMRAddrs.Name)),
benchmark.WithMessageSize(msgSize),
benchmark.WithBatchSize(batchSize),
benchmark.WithConcurrency(concurrency),
benchmark.WithDuration(duration),
benchmark.WithReportInterval(reportInterval),
)
if err != nil {
return err
}
defer func() {
_ = bm.Close()
}()
return bm.Run()
}

func setSizes(targets []benchmark.Target, sizes []uint, setter func(idx int, size uint)) []benchmark.Target {
for idx := range targets {
var size uint
if idx < len(sizes) {
size = sizes[idx]
} else {
size = sizes[len(sizes)-1]
}
setter(idx, size)
}
return targets
}
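
The setSizes helper above spreads per-flag values over the load targets and, when fewer values than targets are supplied, reuses the last value for the remaining targets. Below is a minimal self-contained sketch of that padding behavior; the target struct is a stand-in, not the real benchmark.Target.

```go
// Self-contained illustration of the padding behavior in setSizes: with three
// targets and only two batch sizes, the last size (50) is reused for the
// third target.
package main

import "fmt"

type target struct {
	BatchSize uint
}

func setSizes(targets []target, sizes []uint, setter func(idx int, size uint)) []target {
	for idx := range targets {
		var size uint
		if idx < len(sizes) {
			size = sizes[idx]
		} else {
			// Fewer values than targets: reuse the last provided value.
			size = sizes[len(sizes)-1]
		}
		setter(idx, size)
	}
	return targets
}

func main() {
	targets := make([]target, 3)
	sizes := []uint{10, 50}
	targets = setSizes(targets, sizes, func(idx int, size uint) {
		targets[idx].BatchSize = size
	})
	fmt.Println(targets) // prints: [{10} {50} {50}]
}
```

Because each of these per-target flags carries a default via cli.NewUintSlice in the diff, the sizes slice should never be empty in practice, so the sizes[len(sizes)-1] lookup is safe for CLI use.
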
2 changes: 1 addition & 1 deletion go.mod
@@ -32,6 +32,7 @@ require (
go.uber.org/goleak v1.2.0
go.uber.org/multierr v1.8.0
go.uber.org/zap v1.23.0
golang.org/x/exp v0.0.0-20221114191408-850992195362
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5
golang.org/x/sync v0.1.0
golang.org/x/sys v0.2.0
@@ -104,7 +105,6 @@ require (
go.opentelemetry.io/otel/trace v1.11.1 // indirect
go.opentelemetry.io/proto/otlp v0.11.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
golang.org/x/exp v0.0.0-20200513190911-00229845015e // indirect
golang.org/x/mod v0.7.0 // indirect
golang.org/x/net v0.2.0 // indirect
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect
4 changes: 2 additions & 2 deletions go.sum
@@ -568,8 +568,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/exp v0.0.0-20200513190911-00229845015e h1:rMqLP+9XLy+LdbCXHjJHAmTfXCr93W7oruWA6Hq1Alc=
golang.org/x/exp v0.0.0-20200513190911-00229845015e/go.mod h1:4M0jN8W1tt0AVLNr8HDosyJCDCDuyL9N9+3m7wDWgKw=
golang.org/x/exp v0.0.0-20221114191408-850992195362 h1:NoHlPRbyl1VFI6FjwHtPQCN7wAMXI6cKcqrmXhOOfBQ=
golang.org/x/exp v0.0.0-20221114191408-850992195362/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=