diff --git a/app/benchmark_internal_test.go b/app/benchmark_internal_test.go index fe6a479703..0b75c24d65 100644 --- a/app/benchmark_internal_test.go +++ b/app/benchmark_internal_test.go @@ -22,7 +22,7 @@ func loadReport() (report.Report, error) { return fixture.Report, nil } - c, err := NewFileCollector(*benchReportFile) + c, err := NewFileCollector(*benchReportFile, 0) if err != nil { return fixture.Report, err } diff --git a/app/collector.go b/app/collector.go index b37dd85b32..2a57ba10f6 100644 --- a/app/collector.go +++ b/app/collector.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "path/filepath" + "strconv" "strings" "sync" "time" @@ -161,17 +162,74 @@ func (c StaticCollector) WaitOn(context.Context, chan struct{}) {} // implements Reporter. func (c StaticCollector) UnWait(context.Context, chan struct{}) {} -// NewFileCollector reads and parses the given path, returning a collector -// which always returns that report. -func NewFileCollector(path string) (Collector, error) { +// NewFileCollector reads and parses the files at path (a file or +// directory) as reports. If there are multiple files, and they all +// have names representing "nanoseconds since epoch" timestamps, +// e.g. "1488557088545489008.msgpack.gz", then the collector will +// return merged reports resulting from replaying the file reports in +// a loop at a sequence and speed determined by the timestamps. +// Otherwise the collector always returns the merger of all reports. 
+func NewFileCollector(path string, window time.Duration) (Collector, error) { + var ( + timestamps []time.Time + reports []report.Report + ) + allTimestamped := true + if err := filepath.Walk(path, + func(p string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if info.IsDir() { + return nil + } + t, err := timestampFromFilepath(p) + if err != nil { + allTimestamped = false + } + timestamps = append(timestamps, t) + + rpt, err := readReport(p) + if err != nil { + return err + } + reports = append(reports, rpt) + return nil + }); err != nil { + return nil, err + } + if len(reports) > 1 && allTimestamped { + collector := NewCollector(window) + go replay(collector, timestamps, reports) + return collector, nil + } + return StaticCollector(NewSmartMerger().Merge(reports).Upgrade()), nil +} + +func timestampFromFilepath(path string) (time.Time, error) { + name := filepath.Base(path) + for { + ext := filepath.Ext(name) + if ext == "" { + break + } + name = strings.TrimSuffix(name, ext) + } + nanosecondsSinceEpoch, err := strconv.ParseInt(name, 10, 64) + if err != nil { + return time.Time{}, fmt.Errorf("filename '%s' is not a number (representing nanoseconds since epoch): %v", name, err) + } + return time.Unix(0, nanosecondsSinceEpoch), nil +} + +func readReport(path string) (rpt report.Report, _ error) { f, err := os.Open(path) if err != nil { - return nil, err + return rpt, err } defer f.Close() var ( - rpt report.Report handle codec.Handle gzipped bool ) @@ -186,12 +244,37 @@ func NewFileCollector(path string) (Collector, error) { case ".msgpack": handle = &codec.MsgpackHandle{} default: - return nil, fmt.Errorf("Unsupported file extension: %v", fileType) + return rpt, fmt.Errorf("Unsupported file extension: %v", fileType) } - if err := rpt.ReadBinary(f, gzipped, handle); err != nil { - return nil, err + err = rpt.ReadBinary(f, gzipped, handle) + + return rpt, err +} + +func replay(a Adder, timestamps []time.Time, reports []report.Report) { + 
// calculate delays between report n and n+1 + l := len(timestamps) + delays := make([]time.Duration, l) + for i, t := range timestamps[:l-1] { + delays[i] = timestamps[i+1].Sub(t) + if delays[i] < 0 { + panic(fmt.Errorf("replay timestamps are not in order! %v", timestamps)) + } } + // We don't know how long to wait before looping round, so make a + // good guess. + delays[l-1] = timestamps[l-1].Sub(timestamps[0]) / time.Duration(l) - return StaticCollector(rpt), nil + due := time.Now() + for { + for i, r := range reports { + a.Add(nil, r, nil) + due = due.Add(delays[i]) + delay := due.Sub(time.Now()) + if delay > 0 { + time.Sleep(delay) + } + } + } } diff --git a/prog/app.go b/prog/app.go index ffd67d4446..0565688117 100644 --- a/prog/app.go +++ b/prog/app.go @@ -100,7 +100,7 @@ func collectorFactory(userIDer multitenant.UserIDer, collectorURL, s3URL, natsHo switch parsed.Scheme { case "file": - return app.NewFileCollector(parsed.Path) + return app.NewFileCollector(parsed.Path, window) case "dynamodb": s3, err := url.Parse(s3URL) if err != nil { diff --git a/prog/main.go b/prog/main.go index d361e8d70b..1bbafee25b 100644 --- a/prog/main.go +++ b/prog/main.go @@ -326,7 +326,7 @@ func main() { flag.Var(&containerLabelFilterFlags, "app.container-label-filter", "Add container label-based view filter, specified as title:label. Multiple flags are accepted. Example: --app.container-label-filter='Database Containers:role=db'") flag.Var(&containerLabelFilterFlagsExclude, "app.container-label-filter-exclude", "Add container label-based view filter that excludes containers with the given label, specified as title:label. Multiple flags are accepted. 
Example: --app.container-label-filter-exclude='Database Containers:role=db'") - flag.StringVar(&flags.app.collectorURL, "app.collector", "local", "Collector to use (local, dynamodb, or file)") + flag.StringVar(&flags.app.collectorURL, "app.collector", "local", "Collector to use (local, dynamodb, or file/directory)") flag.StringVar(&flags.app.s3URL, "app.collector.s3", "local", "S3 URL to use (when collector is dynamodb)") flag.StringVar(&flags.app.controlRouterURL, "app.control.router", "local", "Control router to use (local or sqs)") flag.StringVar(&flags.app.pipeRouterURL, "app.pipe.router", "local", "Pipe router to use (local)")