Make data collection aggregated throughout (#362)
klauspost authored Jan 30, 2025
1 parent 0f75881 commit 483f844
Showing 39 changed files with 3,055 additions and 642 deletions.
112 changes: 93 additions & 19 deletions cli/analyze.go
@@ -18,6 +18,8 @@
package cli

import (
"bufio"
"bytes"
"encoding/json"
"errors"
"fmt"
@@ -85,6 +87,10 @@ var analyzeFlags = []cli.Flag{
Name: serverFlagName,
Usage: "When running benchmarks open a webserver to fetch results remotely, eg: localhost:7762",
},
cli.BoolFlag{
Name: "full",
Usage: "Record full analysis data with every request stored. Default will aggregate data.",
},
}

var analyzeCmd = cli.Command{
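For context, the new flag is read in mainAnalyze below via ctx.Bool("full"). A hedged usage sketch of the analyze command under that reading (the file name is only illustrative):

    warp analyze warp-benchmark.csv.zst          # new default: aggregated analysis
    warp analyze --full warp-benchmark.csv.zst   # keep the previous per-request analysis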
@@ -117,31 +123,54 @@ func mainAnalyze(ctx *cli.Context) error {
if len(args) > 1 {
console.Fatal("Only one benchmark file can be given")
}
zstdDec, _ := zstd.NewReader(nil)
defer zstdDec.Close()
monitor := api.NewBenchmarkMonitor(ctx.String(serverFlagName))
defer monitor.Done()
log := console.Printf
log := func(format string, data ...interface{}) {
console.Eraseline()
console.Printf("\r"+format, data...)
}
if globalQuiet {
log = nil
}
for _, arg := range args {
var input io.Reader
if arg == "-" {
input = os.Stdin
rc, isAggregate := openInput(arg)
defer rc.Close()
if !ctx.Bool("full") || isAggregate {
var final aggregate.Realtime
if isAggregate {
if err := json.NewDecoder(rc).Decode(&final); err != nil {
fatalIf(probe.NewError(err), "Unable to parse input")
}
if log != nil {
log("Loading %q", arg)
}
} else {
opCh := make(chan bench.Operation, 10000)
go func() {
err := bench.StreamOperationsFromCSV(rc, false, ctx.Int("analyze.offset"), ctx.Int("analyze.limit"), log, opCh)
fatalIf(probe.NewError(err), "Unable to parse input")
}()
final = *aggregate.Live(opCh, nil, "")
}
rep := final.Report(aggregate.ReportOptions{
Details: true,
Color: !globalNoColor,
})
if globalJSON {
b, err := json.MarshalIndent(final, "", " ")
fatalIf(probe.NewError(err), "Unable to parse input")
fmt.Println(string(b))
} else {
console.Println("\n")
console.Println(rep.String())
}
} else {
f, err := os.Open(arg)
fatalIf(probe.NewError(err), "Unable to open input file")
defer f.Close()
input = f
ops, err := bench.OperationsFromCSV(rc, true, ctx.Int("analyze.offset"), ctx.Int("analyze.limit"), log)
fatalIf(probe.NewError(err), "Unable to parse input")
console.Println("")
printAnalysis(ctx, os.Stdout, ops)
monitor.OperationsReady(ops, strings.TrimSuffix(filepath.Base(arg), ".csv.zst"), commandLine(ctx))
}
err := zstdDec.Reset(input)
fatalIf(probe.NewError(err), "Unable to read input")
ops, err := bench.OperationsFromCSV(zstdDec, true, ctx.Int("analyze.offset"), ctx.Int("analyze.limit"), log)
fatalIf(probe.NewError(err), "Unable to parse input")

printAnalysis(ctx, ops)
monitor.OperationsReady(ops, strings.TrimSuffix(filepath.Base(arg), ".csv.zst"), commandLine(ctx))
}
return nil
}
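Because the aggregated form is plain JSON, a saved result can be decoded and re-rendered outside of mainAnalyze using only the calls that appear in this hunk. A minimal sketch, assuming an uncompressed JSON file with a hypothetical name; warp's own saved results may additionally be zstd-compressed, which the openInput helper later in this diff handles:

    package main

    import (
        "encoding/json"
        "fmt"
        "log"
        "os"

        "github.com/minio/warp/pkg/aggregate"
    )

    func main() {
        // Hypothetical, uncompressed aggregated output file.
        f, err := os.Open("warp-aggregated.json")
        if err != nil {
            log.Fatal(err)
        }
        defer f.Close()
        // Decode the aggregated result, as mainAnalyze does for aggregate inputs.
        var final aggregate.Realtime
        if err := json.NewDecoder(f).Decode(&final); err != nil {
            log.Fatal(err)
        }
        // Render the same report the CLI prints.
        rep := final.Report(aggregate.ReportOptions{Details: true, Color: false})
        fmt.Println(rep.String())
    }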
@@ -234,7 +263,46 @@ func printMixedOpAnalysis(ctx *cli.Context, aggr aggregate.Aggregated, details b
}
}

func printAnalysis(ctx *cli.Context, o bench.Operations) {
func openInput(arg string) (rc io.ReadCloser, isJSON bool) {
var input io.Reader
var fileClose func() error
if arg == "-" {
input = os.Stdin
} else {
f, err := os.Open(arg)
fatalIf(probe.NewError(err), "Unable to open input file")
fileClose = f.Close
input = f
}
z, err := zstd.NewReader(input)
fatalIf(probe.NewError(err), "could not read from input")

buf := bufio.NewReader(z)
v, err := buf.Peek(1)
fatalIf(probe.NewError(err), "could not read from input")

return readCloser{
Reader: buf,
closeFn: func() error {
z.Close()
if fileClose != nil {
return fileClose()
}
return nil
},
}, bytes.Equal(v, []byte("{"))
}

type readCloser struct {
io.Reader
closeFn func() error
}

func (rc readCloser) Close() error {
return rc.closeFn()
}

func printAnalysis(ctx *cli.Context, w io.Writer, o bench.Operations) {
details := ctx.Bool("analyze.v")
var wrSegs io.Writer
prefiltered := false
@@ -294,6 +362,12 @@ func printAnalysis(ctx *cli.Context, o bench.Operations) {
return
}

preOutput := color.Output
color.Output = w
defer func() {
color.Output = preOutput
}()

if aggr.Mixed {
printMixedOpAnalysis(ctx, aggr, details)
return
@@ -522,7 +596,7 @@ func printRequestAnalysis(_ *cli.Context, ops aggregate.Operation, details bool)
console.SetColor("Print", color.New(color.FgHiWhite))
console.Println("\nRequests by host:")

for _, ep := range reqs.HostNames {
for _, ep := range reqs.HostNames.Slice() {
reqs := eps[ep]
if reqs.Requests <= 1 {
continue
53 changes: 41 additions & 12 deletions cli/benchclient.go
@@ -28,6 +28,7 @@ import (

"github.com/minio/cli"
"github.com/minio/pkg/v3/console"
"github.com/minio/warp/pkg/aggregate"
"github.com/minio/warp/pkg/bench"
"github.com/minio/websocket"
)
Expand All @@ -38,6 +39,7 @@ type clientReplyType string
const (
clientRespBenchmarkStarted clientReplyType = "benchmark_started"
clientRespStatus clientReplyType = "benchmark_status"
clientRespAborted clientReplyType = "abort_requested"
clientRespOps clientReplyType = "ops"
)

Expand All @@ -50,9 +52,10 @@ type clientReply struct {
Started bool `json:"started"`
Finished bool `json:"finished"`
} `json:"stage_info"`
Type clientReplyType `json:"type"`
Err string `json:"err,omitempty"`
Ops bench.Operations `json:"ops,omitempty"`
Type clientReplyType `json:"type"`
Err string `json:"err,omitempty"`
Ops bench.Operations `json:"ops,omitempty"`
Update *aggregate.Realtime `json:"update,omitempty"`
}
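For illustration only, a status reply carrying an aggregated update would serialize roughly as below; only the fields visible in this hunk are shown, and the update payload is abbreviated to an empty object:

    {"type":"benchmark_status","stage_info":{"started":true,"finished":false},"update":{}}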

// executeBenchmark will execute the benchmark and return any error.
@@ -237,6 +240,20 @@ func serveWs(w http.ResponseWriter, r *http.Request) {
close(info.start)
}()
resp.Type = clientRespStatus
case serverReqAbortStage:
activeBenchmarkMu.Lock()
ab := activeBenchmark
activeBenchmarkMu.Unlock()
resp.Type = clientRespAborted
if ab == nil {
break
}
console.Infoln("Aborting stage", req.Stage)
ab.Lock()
if cancel := ab.info[req.Stage].cancelFn; cancel != nil {
cancel()
}
ab.Unlock()
case serverReqStageStatus:
activeBenchmarkMu.Lock()
ab := activeBenchmark
@@ -249,6 +266,7 @@ func serveWs(w http.ResponseWriter, r *http.Request) {
ab.Lock()
err := ab.err
stageInfo := ab.info
updates := ab.updates
ab.Unlock()
if err != nil {
resp.Err = err.Error()
@@ -270,6 +288,12 @@ func serveWs(w http.ResponseWriter, r *http.Request) {
resp.StageInfo.Custom = info.custom
default:
}
if req.UpdateReq != nil && updates != nil {
u := make(chan *aggregate.Realtime, 1)
req.UpdateReq.C = u
updates <- *req.UpdateReq
resp.Update = <-u
}
case serverReqSendOps:
activeBenchmarkMu.Lock()
ab := activeBenchmark
@@ -278,6 +302,19 @@ func serveWs(w http.ResponseWriter, r *http.Request) {
resp.Err = "no benchmark running"
break
}
ab.Lock()
updates := ab.updates
ab.Unlock()
if req.UpdateReq != nil && updates != nil {
u := make(chan *aggregate.Realtime, 1)
req.UpdateReq.C = u
updates <- *req.UpdateReq
select {
case <-time.After(time.Second):
resp.Err = "timeout fetching update"
case resp.Update = <-u:
}
}
resp.Type = clientRespOps
ab.Lock()
resp.Ops = ab.results
@@ -325,15 +362,7 @@ func runCommand(ctx *cli.Context, c *cli.Command) (err error) {
}()
}

if c.Before != nil {
err = c.Before(ctx)
if err != nil {
fmt.Fprintln(ctx.App.Writer, err)
fmt.Fprintln(ctx.App.Writer)
return err
}
}

// We do not run c.Before, since it only updates global flags, which we don't want to modify.
if c.Action == nil {
return errors.New("no action")
}
