Skip to content

Commit

Permalink
Fix filtered time function (#149)
Browse files Browse the repository at this point in the history
When a pre-analysis segmentation is added like `--analyze.host=xyz` don't expect all threads to have completed one operation at each end.

Since we don't have the operations any more, there may be significant gaps.

Instead use the simpler time function for these.
  • Loading branch information
klauspost authored Nov 19, 2020
1 parent 21a0080 commit 24668c7
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 14 deletions.
5 changes: 4 additions & 1 deletion api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,10 @@ func (s *Server) handleAggregated(w http.ResponseWriter, req *http.Request) {
return
}
if s.agrr == nil || s.aggrDur != segmentDur {
aggr := aggregate.Aggregate(s.ops, durFn, 0)
aggr := aggregate.Aggregate(s.ops, aggregate.Options{
DurFunc: durFn,
SkipDur: 0,
})
s.agrr = &aggr
s.aggrDur = segmentDur
}
Expand Down
20 changes: 18 additions & 2 deletions cli/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ func printMixedOpAnalysis(ctx *cli.Context, aggr aggregate.Aggregated, details b
func printAnalysis(ctx *cli.Context, o bench.Operations) {
details := ctx.Bool("analyze.v")
var wrSegs io.Writer
prefiltered := false
if fn := ctx.String("analyze.out"); fn != "" {
if fn == "-" {
wrSegs = os.Stdout
Expand All @@ -229,10 +230,21 @@ func printAnalysis(ctx *cli.Context, o bench.Operations) {
}
}
if onlyHost := ctx.String("analyze.host"); onlyHost != "" {
o = o.FilterByEndpoint(onlyHost)
o2 := o.FilterByEndpoint(onlyHost)
if len(o2) == 0 {
hosts := o.Endpoints()
console.Println("Host not found, valid hosts are:")
for _, h := range hosts {
console.Println("\t* %s", h)
}
return
}
prefiltered = true
o = o2
}

if wantOp := ctx.String("analyze.op"); wantOp != "" {
prefiltered = prefiltered || o.IsMixed()
o = o.FilterByOp(wantOp)
}
durFn := func(total time.Duration) time.Duration {
Expand All @@ -241,7 +253,11 @@ func printAnalysis(ctx *cli.Context, o bench.Operations) {
}
return analysisDur(ctx, total)
}
aggr := aggregate.Aggregate(o, durFn, ctx.Duration("analyze.skip"))
aggr := aggregate.Aggregate(o, aggregate.Options{
Prefiltered: prefiltered,
DurFunc: durFn,
SkipDur: ctx.Duration("analyze.skip"),
})
if wrSegs != nil {
for _, ops := range aggr.Operations {
writeSegs(ctx, wrSegs, o.FilterByOp(ops.Type), aggr.Mixed, details)
Expand Down
28 changes: 17 additions & 11 deletions pkg/aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,14 @@ type Operation struct {
// SegmentDurFn accepts a total time and should return the duration used for each segment.
type SegmentDurFn func(total time.Duration) time.Duration

type Options struct {
Prefiltered bool
DurFunc SegmentDurFn
SkipDur time.Duration
}

// Aggregate returns statistics when only a single operation was running concurrently.
func Aggregate(o bench.Operations, dFn SegmentDurFn, skipDur time.Duration) Aggregated {
func Aggregate(o bench.Operations, opts Options) Aggregated {
o.SortByStartTime()
types := o.OpTypes()
a := Aggregated{
Expand All @@ -90,14 +96,14 @@ func Aggregate(o bench.Operations, dFn SegmentDurFn, skipDur time.Duration) Aggr
a.Type = "mixed"
o.SortByStartTime()
start, end := o.ActiveTimeRange(true)
start.Add(skipDur)
start.Add(opts.SkipDur)
total := o.FilterInsideRange(start, end).Total(false)
a.MixedServerStats = &Throughput{}
a.MixedServerStats.fill(total)

segmentDur := dFn(total.Duration())
segmentDur := opts.DurFunc(total.Duration())
segs := o.Segment(bench.SegmentOptions{
From: start.Add(skipDur),
From: start.Add(opts.SkipDur),
PerSegDuration: segmentDur,
AllThreads: true,
MultiOp: true,
Expand Down Expand Up @@ -143,9 +149,9 @@ func Aggregate(o bench.Operations, dFn SegmentDurFn, skipDur time.Duration) Aggr
}()
a.Type = typ
ops := o.FilterByOp(typ)
if skipDur > 0 {
if opts.SkipDur > 0 {
start, end := ops.TimeRange()
start = start.Add(skipDur)
start = start.Add(opts.SkipDur)
ops = ops.FilterInsideRange(start, end)
}

Expand All @@ -166,18 +172,18 @@ func Aggregate(o bench.Operations, dFn SegmentDurFn, skipDur time.Duration) Aggr
a.Skipped = true
return
}
segmentDur := dFn(ops.Duration())
segmentDur := opts.DurFunc(ops.Duration())
segs := ops.Segment(bench.SegmentOptions{
From: time.Time{},
PerSegDuration: segmentDur,
AllThreads: !isMixed,
AllThreads: !isMixed && !opts.Prefiltered,
})
a.N = len(ops)
if len(segs) <= 1 {
a.Skipped = true
return
}
total := ops.Total(!isMixed)
total := ops.Total(!isMixed && !opts.Prefiltered)
a.StartTime, a.EndTime = ops.TimeRange()
a.Throughput.fill(total)
a.Throughput.Segmented = &ThroughputSegmented{
Expand All @@ -190,9 +196,9 @@ func Aggregate(o bench.Operations, dFn SegmentDurFn, skipDur time.Duration) Aggr
a.Hosts = ops.Hosts()

if !ops.MultipleSizes() {
a.SingleSizedRequests = RequestAnalysisSingleSized(ops, !isMixed)
a.SingleSizedRequests = RequestAnalysisSingleSized(ops, !isMixed && !opts.Prefiltered)
} else {
a.MultiSizedRequests = RequestAnalysisMultiSized(ops, !isMixed)
a.MultiSizedRequests = RequestAnalysisMultiSized(ops, !isMixed && !opts.Prefiltered)
}

eps := ops.Endpoints()
Expand Down

0 comments on commit 24668c7

Please sign in to comment.