Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

report playback #2301

Merged
merged 1 commit into from
Mar 5, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/benchmark_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func loadReport() (report.Report, error) {
return fixture.Report, nil
}

c, err := NewFileCollector(*benchReportFile)
c, err := NewFileCollector(*benchReportFile, 0)
if err != nil {
return fixture.Report, err
}
Expand Down
101 changes: 92 additions & 9 deletions app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -161,17 +162,74 @@ func (c StaticCollector) WaitOn(context.Context, chan struct{}) {}
// implements Reporter.
func (c StaticCollector) UnWait(context.Context, chan struct{}) {}

// NewFileCollector reads and parses the given path, returning a collector
// which always returns that report.
func NewFileCollector(path string) (Collector, error) {
// NewFileCollector reads and parses the files at path (a file or
// directory) as reports. If there are multiple files, and they all
// have names representing "nanoseconds since epoch" timestamps,
// e.g. "1488557088545489008.msgpack.gz", then the collector will
// return merged reports resulting from replaying the file reports in
// a loop at a sequence and speed determined by the timestamps.
// Otherwise the collector always returns the merger of all reports.
func NewFileCollector(path string, window time.Duration) (Collector, error) {
var (
timestamps []time.Time
reports []report.Report
)
allTimestamped := true
if err := filepath.Walk(path,
func(p string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() {
return nil
}
t, err := timestampFromFilepath(p)
if err != nil {
allTimestamped = false
}
timestamps = append(timestamps, t)

rpt, err := readReport(p)
if err != nil {
return err
}
reports = append(reports, rpt)
return nil
}); err != nil {
return nil, err
}
if len(reports) > 1 && allTimestamped {
collector := NewCollector(window)
go replay(collector, timestamps, reports)
return collector, nil
}
return StaticCollector(NewSmartMerger().Merge(reports).Upgrade()), nil
}

func timestampFromFilepath(path string) (time.Time, error) {
name := filepath.Base(path)
for {
ext := filepath.Ext(name)
if ext == "" {
break
}
name = strings.TrimSuffix(name, ext)
}
nanosecondsSinceEpoch, err := strconv.ParseInt(name, 10, 64)
if err != nil {
return time.Time{}, fmt.Errorf("filename '%s' is not a number (representing nanoseconds since epoch): %v", name, err)
}
return time.Unix(0, nanosecondsSinceEpoch), nil
}

func readReport(path string) (rpt report.Report, _ error) {
f, err := os.Open(path)
if err != nil {
return nil, err
return rpt, err
}
defer f.Close()

var (
rpt report.Report
handle codec.Handle
gzipped bool
)
Expand All @@ -186,12 +244,37 @@ func NewFileCollector(path string) (Collector, error) {
case ".msgpack":
handle = &codec.MsgpackHandle{}
default:
return nil, fmt.Errorf("Unsupported file extension: %v", fileType)
return rpt, fmt.Errorf("Unsupported file extension: %v", fileType)
}

if err := rpt.ReadBinary(f, gzipped, handle); err != nil {
return nil, err
err = rpt.ReadBinary(f, gzipped, handle)

return rpt, err
}

func replay(a Adder, timestamps []time.Time, reports []report.Report) {
// calculate delays between report n and n+1
l := len(timestamps)
delays := make([]time.Duration, l, l)
for i, t := range timestamps[0 : l-1] {
delays[i] = timestamps[i+1].Sub(t)
if delays[i] < 0 {
panic(fmt.Errorf("replay timestamps are not in order! %v", timestamps))
}
}
// We don't know how long to wait before looping round, so make a
// good guess.
delays[l-1] = timestamps[l-1].Sub(timestamps[0]) / time.Duration(l)

return StaticCollector(rpt), nil
due := time.Now()
for {
for i, r := range reports {
a.Add(nil, r, nil)
due = due.Add(delays[i])
delay := due.Sub(time.Now())
if delay > 0 {
time.Sleep(delay)
}
}
}
}
2 changes: 1 addition & 1 deletion prog/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func collectorFactory(userIDer multitenant.UserIDer, collectorURL, s3URL, natsHo

switch parsed.Scheme {
case "file":
return app.NewFileCollector(parsed.Path)
return app.NewFileCollector(parsed.Path, window)
case "dynamodb":
s3, err := url.Parse(s3URL)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion prog/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func main() {
flag.Var(&containerLabelFilterFlags, "app.container-label-filter", "Add container label-based view filter, specified as title:label. Multiple flags are accepted. Example: --app.container-label-filter='Database Containers:role=db'")
flag.Var(&containerLabelFilterFlagsExclude, "app.container-label-filter-exclude", "Add container label-based view filter that excludes containers with the given label, specified as title:label. Multiple flags are accepted. Example: --app.container-label-filter-exclude='Database Containers:role=db'")

flag.StringVar(&flags.app.collectorURL, "app.collector", "local", "Collector to use (local, dynamodb, or file)")
flag.StringVar(&flags.app.collectorURL, "app.collector", "local", "Collector to use (local, dynamodb, or file/directory)")
flag.StringVar(&flags.app.s3URL, "app.collector.s3", "local", "S3 URL to use (when collector is dynamodb)")
flag.StringVar(&flags.app.controlRouterURL, "app.control.router", "local", "Control router to use (local or sqs)")
flag.StringVar(&flags.app.pipeRouterURL, "app.pipe.router", "local", "Pipe router to use (local)")
Expand Down