Skip to content

Commit

Permalink
Merge pull request #2301 from weaveworks/report-playback
Browse files Browse the repository at this point in the history
report playback
  • Loading branch information
rade authored Mar 5, 2017
2 parents 6bba59c + 289b4c6 commit a391ae8
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 12 deletions.
2 changes: 1 addition & 1 deletion app/benchmark_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func loadReport() (report.Report, error) {
return fixture.Report, nil
}

c, err := NewFileCollector(*benchReportFile)
c, err := NewFileCollector(*benchReportFile, 0)
if err != nil {
return fixture.Report, err
}
Expand Down
101 changes: 92 additions & 9 deletions app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -161,17 +162,74 @@ func (c StaticCollector) WaitOn(context.Context, chan struct{}) {}
// implements Reporter.
func (c StaticCollector) UnWait(context.Context, chan struct{}) {}

// NewFileCollector reads and parses the given path, returning a collector
// which always returns that report.
func NewFileCollector(path string) (Collector, error) {
// NewFileCollector reads and parses the files at path (a file or
// directory) as reports. If there are multiple files, and they all
// have names representing "nanoseconds since epoch" timestamps,
// e.g. "1488557088545489008.msgpack.gz", then the collector will
// return merged reports resulting from replaying the file reports in
// a loop at a sequence and speed determined by the timestamps.
// Otherwise the collector always returns the merger of all reports.
func NewFileCollector(path string, window time.Duration) (Collector, error) {
var (
timestamps []time.Time
reports []report.Report
)
allTimestamped := true
if err := filepath.Walk(path,
func(p string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() {
return nil
}
t, err := timestampFromFilepath(p)
if err != nil {
allTimestamped = false
}
timestamps = append(timestamps, t)

rpt, err := readReport(p)
if err != nil {
return err
}
reports = append(reports, rpt)
return nil
}); err != nil {
return nil, err
}
if len(reports) > 1 && allTimestamped {
collector := NewCollector(window)
go replay(collector, timestamps, reports)
return collector, nil
}
return StaticCollector(NewSmartMerger().Merge(reports).Upgrade()), nil
}

func timestampFromFilepath(path string) (time.Time, error) {
name := filepath.Base(path)
for {
ext := filepath.Ext(name)
if ext == "" {
break
}
name = strings.TrimSuffix(name, ext)
}
nanosecondsSinceEpoch, err := strconv.ParseInt(name, 10, 64)
if err != nil {
return time.Time{}, fmt.Errorf("filename '%s' is not a number (representing nanoseconds since epoch): %v", name, err)
}
return time.Unix(0, nanosecondsSinceEpoch), nil
}

func readReport(path string) (rpt report.Report, _ error) {
f, err := os.Open(path)
if err != nil {
return nil, err
return rpt, err
}
defer f.Close()

var (
rpt report.Report
handle codec.Handle
gzipped bool
)
Expand All @@ -186,12 +244,37 @@ func NewFileCollector(path string) (Collector, error) {
case ".msgpack":
handle = &codec.MsgpackHandle{}
default:
return nil, fmt.Errorf("Unsupported file extension: %v", fileType)
return rpt, fmt.Errorf("Unsupported file extension: %v", fileType)
}

if err := rpt.ReadBinary(f, gzipped, handle); err != nil {
return nil, err
err = rpt.ReadBinary(f, gzipped, handle)

return rpt, err
}

func replay(a Adder, timestamps []time.Time, reports []report.Report) {
// calculate delays between report n and n+1
l := len(timestamps)
delays := make([]time.Duration, l, l)
for i, t := range timestamps[0 : l-1] {
delays[i] = timestamps[i+1].Sub(t)
if delays[i] < 0 {
panic(fmt.Errorf("replay timestamps are not in order! %v", timestamps))
}
}
// We don't know how long to wait before looping round, so make a
// good guess.
delays[l-1] = timestamps[l-1].Sub(timestamps[0]) / time.Duration(l)

return StaticCollector(rpt), nil
due := time.Now()
for {
for i, r := range reports {
a.Add(nil, r, nil)
due = due.Add(delays[i])
delay := due.Sub(time.Now())
if delay > 0 {
time.Sleep(delay)
}
}
}
}
2 changes: 1 addition & 1 deletion prog/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func collectorFactory(userIDer multitenant.UserIDer, collectorURL, s3URL, natsHo

switch parsed.Scheme {
case "file":
return app.NewFileCollector(parsed.Path)
return app.NewFileCollector(parsed.Path, window)
case "dynamodb":
s3, err := url.Parse(s3URL)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion prog/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func main() {
flag.Var(&containerLabelFilterFlags, "app.container-label-filter", "Add container label-based view filter, specified as title:label. Multiple flags are accepted. Example: --app.container-label-filter='Database Containers:role=db'")
flag.Var(&containerLabelFilterFlagsExclude, "app.container-label-filter-exclude", "Add container label-based view filter that excludes containers with the given label, specified as title:label. Multiple flags are accepted. Example: --app.container-label-filter-exclude='Database Containers:role=db'")

flag.StringVar(&flags.app.collectorURL, "app.collector", "local", "Collector to use (local, dynamodb, or file)")
flag.StringVar(&flags.app.collectorURL, "app.collector", "local", "Collector to use (local, dynamodb, or file/directory)")
flag.StringVar(&flags.app.s3URL, "app.collector.s3", "local", "S3 URL to use (when collector is dynamodb)")
flag.StringVar(&flags.app.controlRouterURL, "app.control.router", "local", "Control router to use (local or sqs)")
flag.StringVar(&flags.app.pipeRouterURL, "app.pipe.router", "local", "Pipe router to use (local)")
Expand Down

0 comments on commit a391ae8

Please sign in to comment.