Skip to content

Commit

Permalink
Add --watch option to see progress while benchmark is running
Browse files Browse the repository at this point in the history
  • Loading branch information
fridrik01 committed Apr 26, 2023
1 parent cbd1b99 commit 1161d5e
Showing 1 changed file with 78 additions and 24 deletions.
102 changes: 78 additions & 24 deletions cmd/lotus-bench/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ Here are some real examples:
Usage: `Method to benchmark, you can specify multiple methods by repeating this flag. You can also specify method specific options to set the concurrency and qps for each method (see usage).
`,
},
&cli.DurationFlag{
Name: "watch",
Value: 0 * time.Second,
Usage: "If >0 then generates reports every N seconds (only supports linux/unix)",
},
&cli.BoolFlag{
Name: "print-response",
Value: false,
Expand Down Expand Up @@ -135,6 +140,7 @@ Here are some real examples:
}

rpcMethods = append(rpcMethods, &RPCMethod{
w: os.Stdout,
uri: cctx.String("endpoint"),
method: entries[0],
concurrency: concurrency,
Expand Down Expand Up @@ -177,8 +183,42 @@ Here are some real examples:
}(e)
}

// if watch is set then print a report every N seconds
var progressCh chan bool
if cctx.Duration("watch") > 0 {
progressCh = make(chan bool, 1)
go func(progressCh chan bool) {
ticker := time.NewTicker(time.Duration(cctx.Duration("watch")))
for {
clearAndPrintReport := func() {
for _, e := range rpcMethods {
// clear the screen move the curser to the top left
fmt.Print("\033[2J")
fmt.Printf("\033[%d;%dH", 1, 1)
e.Report()
}
}
select {
case <-ticker.C:
clearAndPrintReport()
case <-progressCh:
clearAndPrintReport()
return
}
}
}(progressCh)
}

wg.Wait()

if progressCh != nil {
// wait for the watch go routine to return
progressCh <- true

// no need to print the report again
return nil
}

// print the report for each endpoint
for i, e := range rpcMethods {
e.Report()
Expand All @@ -193,6 +233,7 @@ Here are some real examples:

// RPCMethod handles the benchmarking of a single endpoint method.
type RPCMethod struct {
w io.Writer
// the endpoint uri
uri string
// the rpc method we want to benchmark
Expand Down Expand Up @@ -234,7 +275,7 @@ func (rpc *RPCMethod) Run() error {
rpc.stopCh = make(chan struct{}, rpc.concurrency)

go func() {
rpc.reporter = NewReporter(rpc.results)
rpc.reporter = NewReporter(rpc.results, rpc.w)
rpc.reporter.Run()
}()

Expand Down Expand Up @@ -376,20 +417,25 @@ func (rpc *RPCMethod) Stop() {

func (rpc *RPCMethod) Report() {
total := time.Since(rpc.start)
fmt.Printf("[%s]:\n", rpc.method)
fmt.Printf("- Options:\n")
fmt.Printf(" - concurrency: %d\n", rpc.concurrency)
fmt.Printf(" - params: %s\n", rpc.params)
fmt.Printf(" - qps: %d\n", rpc.qps)
rpc.reporter.Print(total)
fmt.Fprintf(rpc.w, "[%s]:\n", rpc.method)
fmt.Fprintf(rpc.w, "- Options:\n")
fmt.Fprintf(rpc.w, " - concurrency: %d\n", rpc.concurrency)
fmt.Fprintf(rpc.w, " - params: %s\n", rpc.params)
fmt.Fprintf(rpc.w, " - qps: %d\n", rpc.qps)
rpc.reporter.Print(total, rpc.w)
}

// Reporter reads the results from the workers through the results channel and aggregates the results.
type Reporter struct {
// write the report to this writer
w io.Writer
// the reporter read the results from this channel
results chan *result
// doneCh is used to signal that the reporter has finished reading the results (channel has closed)
doneCh chan bool

// lock protect the following fields during critical sections (if --watch was specified)
lock sync.Mutex
// the latencies of all requests
latencies []int64
// the number of requests that returned each status code
Expand All @@ -398,8 +444,9 @@ type Reporter struct {
errors map[string]int
}

func NewReporter(results chan *result) *Reporter {
func NewReporter(results chan *result, w io.Writer) *Reporter {
return &Reporter{
w: w,
results: results,
doneCh: make(chan bool, 1),
statusCodes: make(map[int]int),
Expand All @@ -409,6 +456,8 @@ func NewReporter(results chan *result) *Reporter {

func (r *Reporter) Run() {
for res := range r.results {
r.lock.Lock()

r.latencies = append(r.latencies, res.duration.Milliseconds())

if res.statusCode != nil {
Expand All @@ -425,12 +474,17 @@ func (r *Reporter) Run() {
} else {
r.errors["nil"]++
}

r.lock.Unlock()
}

r.doneCh <- true
}

func (r *Reporter) Print(elapsed time.Duration) {
func (r *Reporter) Print(elapsed time.Duration, w io.Writer) {
r.lock.Lock()
defer r.lock.Unlock()

nrReq := int64(len(r.latencies))
if nrReq == 0 {
fmt.Println("No requests were made")
Expand All @@ -447,16 +501,16 @@ func (r *Reporter) Print(elapsed time.Duration) {
totalLatency += latency
}

fmt.Printf("- Total Requests: %d\n", nrReq)
fmt.Printf("- Total Duration: %dms\n", elapsed.Milliseconds())
fmt.Printf("- Requests/sec: %f\n", float64(nrReq)/elapsed.Seconds())
fmt.Printf("- Avg latency: %dms\n", totalLatency/nrReq)
fmt.Printf("- Median latency: %dms\n", r.latencies[nrReq/2])
fmt.Printf("- Latency distribution:\n")
fmt.Fprintf(w, "- Total Requests: %d\n", nrReq)
fmt.Fprintf(w, "- Total Duration: %dms\n", elapsed.Milliseconds())
fmt.Fprintf(w, "- Requests/sec: %f\n", float64(nrReq)/elapsed.Seconds())
fmt.Fprintf(w, "- Avg latency: %dms\n", totalLatency/nrReq)
fmt.Fprintf(w, "- Median latency: %dms\n", r.latencies[nrReq/2])
fmt.Fprintf(w, "- Latency distribution:\n")
percentiles := []float64{0.1, 0.5, 0.9, 0.95, 0.99, 0.999}
for _, p := range percentiles {
idx := int64(p * float64(nrReq))
fmt.Printf(" %s%% in %dms\n", fmt.Sprintf("%.2f", p*100.0), r.latencies[idx])
fmt.Fprintf(w, " %s%% in %dms\n", fmt.Sprintf("%.2f", p*100.0), r.latencies[idx])
}

// create a simple histogram with 10 buckets spanning the range of latency
Expand Down Expand Up @@ -489,19 +543,19 @@ func (r *Reporter) Print(elapsed time.Duration) {
}

// print the histogram using a tabwriter which will align the columns nicely
fmt.Printf("- Histogram:\n")
fmt.Fprintf(w, "- Histogram:\n")
const padding = 2
w := tabwriter.NewWriter(os.Stdout, 0, 0, padding, ' ', tabwriter.AlignRight|tabwriter.Debug)
tabWriter := tabwriter.NewWriter(w, 0, 0, padding, ' ', tabwriter.AlignRight|tabwriter.Debug)
for i := 0; i < nrBucket; i++ {
ratio := float64(buckets[i].cnt) / float64(nrReq)
bars := strings.Repeat("#", int(ratio*100))
fmt.Fprintf(w, " %d-%dms\t%d\t%s (%s%%)\n", buckets[i].start, buckets[i].end, buckets[i].cnt, bars, fmt.Sprintf("%.2f", ratio*100))
fmt.Fprintf(tabWriter, " %d-%dms\t%d\t%s (%s%%)\n", buckets[i].start, buckets[i].end, buckets[i].cnt, bars, fmt.Sprintf("%.2f", ratio*100))
}
w.Flush() //nolint:errcheck
tabWriter.Flush() //nolint:errcheck

fmt.Printf("- Status codes:\n")
fmt.Fprintf(w, "- Status codes:\n")
for code, cnt := range r.statusCodes {
fmt.Printf(" [%d]: %d\n", code, cnt)
fmt.Fprintf(w, " [%d]: %d\n", code, cnt)
}

// print the 10 most occurring errors (in case error values are not unique)
Expand All @@ -517,12 +571,12 @@ func (r *Reporter) Print(elapsed time.Duration) {
sort.Slice(sortedErrors, func(i, j int) bool {
return sortedErrors[i].cnt > sortedErrors[j].cnt
})
fmt.Printf("- Errors (top 10):\n")
fmt.Fprintf(w, "- Errors (top 10):\n")
for i, se := range sortedErrors {
if i > 10 {
break
}
fmt.Printf(" [%s]: %d\n", se.err, se.cnt)
fmt.Fprintf(w, " [%s]: %d\n", se.err, se.cnt)
}
}

Expand Down

0 comments on commit 1161d5e

Please sign in to comment.