Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Time travel control #2524

Merged
merged 21 commits into from
Jun 12, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions app/api_report.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
// Raw report handler
func makeRawReportHandler(rep Reporter) CtxHandlerFunc {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) {
report, err := rep.Report(ctx)
report, err := rep.Report(ctx, time.Now())
if err != nil {
respondWith(w, http.StatusInternalServerError, err)
return
Expand All @@ -32,7 +32,7 @@ type probeDesc struct {
// Probe handler
func makeProbeHandler(rep Reporter) CtxHandlerFunc {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) {
rpt, err := rep.Report(ctx)
rpt, err := rep.Report(ctx, time.Now())
if err != nil {
respondWith(w, http.StatusInternalServerError, err)
return
Expand Down
5 changes: 3 additions & 2 deletions app/api_topologies.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sort"
"strings"
"sync"
"time"

log "github.com/Sirupsen/logrus"
"github.com/gorilla/mux"
Expand Down Expand Up @@ -476,7 +477,7 @@ func (r *Registry) walk(f func(APITopologyDesc)) {
// makeTopologyList returns a handler that yields an APITopologyList.
func (r *Registry) makeTopologyList(rep Reporter) CtxHandlerFunc {
return func(ctx context.Context, w http.ResponseWriter, req *http.Request) {
report, err := rep.Report(ctx)
report, err := rep.Report(ctx, time.Now())
if err != nil {
respondWith(w, http.StatusInternalServerError, err)
return
Expand Down Expand Up @@ -568,7 +569,7 @@ func (r *Registry) captureRenderer(rep Reporter, f rendererHandler) CtxHandlerFu
http.NotFound(w, req)
return
}
rpt, err := rep.Report(ctx)
rpt, err := rep.Report(ctx, time.Now())
if err != nil {
respondWith(w, http.StatusInternalServerError, err)
return
Expand Down
27 changes: 22 additions & 5 deletions app/api_topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,33 @@ func handleWebsocket(
}(conn)

var (
previousTopo detailed.NodeSummaries
tick = time.Tick(loop)
wait = make(chan struct{}, 1)
topologyID = mux.Vars(r)["topology"]
previousTopo detailed.NodeSummaries
tick = time.Tick(loop)
wait = make(chan struct{}, 1)
topologyID = mux.Vars(r)["topology"]
channelOpenedAt = time.Now()
// By default we will always be reporting the most recent state.
startReportingAt = time.Now()
)

// If the timestamp is provided explicitly by the UI, we start reporting from there.
if timestampStr := r.Form.Get("timestamp"); timestampStr != "" {
startReportingAt, _ = time.Parse(time.RFC3339, timestampStr)
}

rep.WaitOn(ctx, wait)
defer rep.UnWait(ctx, wait)

for {
report, err := rep.Report(ctx)
// We measure how much time has passed since the channel was opened
// and add it to the initial report timestamp to get the timestamp
// of the snapshot we want to report right now.
// NOTE: Multiplying `timestampDelta` by a constant factor here
// would have an effect of fast-forward, which is something we
// might be interested in implementing in the future.
timestampDelta := time.Since(channelOpenedAt)
reportTimestamp := startReportingAt.Add(timestampDelta)
report, err := rep.Report(ctx, reportTimestamp)
if err != nil {
log.Errorf("Error generating report: %v", err)
return
Expand Down
3 changes: 2 additions & 1 deletion app/benchmark_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net/http"
"net/url"
"testing"
"time"

"golang.org/x/net/context"

Expand All @@ -27,7 +28,7 @@ func loadReport() (report.Report, error) {
return fixture.Report, err
}

return c.Report(context.Background())
return c.Report(context.Background(), time.Now())
}

func BenchmarkTopologyList(b *testing.B) {
Expand Down
10 changes: 6 additions & 4 deletions app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const reportQuantisationInterval = 3 * time.Second
// Reporter is something that can produce reports on demand. It's a convenient
// interface for parts of the app, and several experimental components.
type Reporter interface {
Report(context.Context) (report.Report, error)
Report(context.Context, time.Time) (report.Report, error)
WaitOn(context.Context, chan struct{})
UnWait(context.Context, chan struct{})
}
Expand Down Expand Up @@ -118,14 +118,14 @@ func (c *collector) Add(_ context.Context, rpt report.Report, _ []byte) error {

// Report returns a merged report over all added reports. It implements
// Reporter.
func (c *collector) Report(_ context.Context) (report.Report, error) {
func (c *collector) Report(_ context.Context, timestamp time.Time) (report.Report, error) {
c.mtx.Lock()
defer c.mtx.Unlock()

// If the oldest report is still within range,
// and there is a cached report, return that.
if c.cached != nil && len(c.reports) > 0 {
oldest := mtime.Now().Add(-c.window)
oldest := timestamp.Add(-c.window)
if c.timestamps[0].After(oldest) {
return *c.cached, nil
}
Expand Down Expand Up @@ -191,7 +191,9 @@ type StaticCollector report.Report

// Report returns a merged report over all added reports. It implements
// Reporter.
func (c StaticCollector) Report(context.Context) (report.Report, error) { return report.Report(c), nil }
func (c StaticCollector) Report(context.Context, time.Time) (report.Report, error) {
return report.Report(c), nil
}

// Add adds a report to the collector's internal state. It implements Adder.
func (c StaticCollector) Add(context.Context, report.Report, []byte) error { return nil }
Expand Down
29 changes: 23 additions & 6 deletions app/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@ func TestCollector(t *testing.T) {
window := 10 * time.Second
c := app.NewCollector(window)

now := time.Now()
mtime.NowForce(now)
defer mtime.NowReset()

r1 := report.MakeReport()
r1.Endpoint.AddNode(report.MakeNode("foo"))

r2 := report.MakeReport()
r2.Endpoint.AddNode(report.MakeNode("foo"))

have, err := c.Report(ctx)
have, err := c.Report(ctx, mtime.Now())
if err != nil {
t.Error(err)
}
Expand All @@ -33,25 +37,38 @@ func TestCollector(t *testing.T) {
}

c.Add(ctx, r1, nil)
have, err = c.Report(ctx)
have, err = c.Report(ctx, mtime.Now())
if err != nil {
t.Error(err)
}
if want := r1; !reflect.DeepEqual(want, have) {
t.Error(test.Diff(want, have))
}

timeBefore := mtime.Now()
mtime.NowForce(now.Add(time.Second))

c.Add(ctx, r2, nil)
merged := report.MakeReport()
merged = merged.Merge(r1)
merged = merged.Merge(r2)
have, err = c.Report(ctx)
have, err = c.Report(ctx, mtime.Now())
if err != nil {
t.Error(err)
}
if want := merged; !reflect.DeepEqual(want, have) {
t.Error(test.Diff(want, have))
}

// Since the timestamp given is before r2 was added,
// it shouldn't be included in the final report.
have, err = c.Report(ctx, timeBefore)
if err != nil {
t.Error(err)
}
if want := r1; !reflect.DeepEqual(want, have) {
t.Error(test.Diff(want, have))
}
}

func TestCollectorExpire(t *testing.T) {
Expand All @@ -64,7 +81,7 @@ func TestCollectorExpire(t *testing.T) {
c := app.NewCollector(window)

// 1st check the collector is empty
have, err := c.Report(ctx)
have, err := c.Report(ctx, mtime.Now())
if err != nil {
t.Error(err)
}
Expand All @@ -76,7 +93,7 @@ func TestCollectorExpire(t *testing.T) {
r1 := report.MakeReport()
r1.Endpoint.AddNode(report.MakeNode("foo"))
c.Add(ctx, r1, nil)
have, err = c.Report(ctx)
have, err = c.Report(ctx, mtime.Now())
if err != nil {
t.Error(err)
}
Expand All @@ -86,7 +103,7 @@ func TestCollectorExpire(t *testing.T) {

// Finally move time forward to expire the report
mtime.NowForce(now.Add(window))
have, err = c.Report(ctx)
have, err = c.Report(ctx, mtime.Now())
if err != nil {
t.Error(err)
}
Expand Down
19 changes: 10 additions & 9 deletions app/multitenant/aws_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,12 +297,13 @@ func (c *awsCollector) getReports(ctx context.Context, reportKeys []string) ([]r
return reports, nil
}

func (c *awsCollector) Report(ctx context.Context) (report.Report, error) {
func (c *awsCollector) Report(ctx context.Context, timestamp time.Time) (report.Report, error) {
var (
now = time.Now()
start = now.Add(-c.window)
rowStart, rowEnd = start.UnixNano() / time.Hour.Nanoseconds(), now.UnixNano() / time.Hour.Nanoseconds()
userid, err = c.userIDer(ctx)
end = timestamp
start = end.Add(-c.window)
rowStart = start.UnixNano() / time.Hour.Nanoseconds()
rowEnd = end.UnixNano() / time.Hour.Nanoseconds()
userid, err = c.userIDer(ctx)
)
if err != nil {
return report.MakeReport(), err
Expand All @@ -311,25 +312,25 @@ func (c *awsCollector) Report(ctx context.Context) (report.Report, error) {
// Queries will only every span 2 rows max.
var reportKeys []string
if rowStart != rowEnd {
reportKeys1, err := c.getReportKeys(ctx, userid, rowStart, start, now)
reportKeys1, err := c.getReportKeys(ctx, userid, rowStart, start, end)
if err != nil {
return report.MakeReport(), err
}

reportKeys2, err := c.getReportKeys(ctx, userid, rowEnd, start, now)
reportKeys2, err := c.getReportKeys(ctx, userid, rowEnd, start, end)
if err != nil {
return report.MakeReport(), err
}

reportKeys = append(reportKeys, reportKeys1...)
reportKeys = append(reportKeys, reportKeys2...)
} else {
if reportKeys, err = c.getReportKeys(ctx, userid, rowEnd, start, now); err != nil {
if reportKeys, err = c.getReportKeys(ctx, userid, rowEnd, start, end); err != nil {
return report.MakeReport(), err
}
}

log.Debugf("Fetching %d reports from %v to %v", len(reportKeys), start, now)
log.Debugf("Fetching %d reports from %v to %v", len(reportKeys), start, end)
reports, err := c.getReports(ctx, reportKeys)
if err != nil {
return report.MakeReport(), err
Expand Down
3 changes: 2 additions & 1 deletion app/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net/url"
"strings"
"sync"
"time"

"github.com/PuerkitoBio/ghost/handlers"
log "github.com/Sirupsen/logrus"
Expand Down Expand Up @@ -179,7 +180,7 @@ func NewVersion(version, downloadURL string) {

func apiHandler(rep Reporter) CtxHandlerFunc {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) {
report, err := rep.Report(ctx)
report, err := rep.Report(ctx, time.Now())
if err != nil {
respondWith(w, http.StatusInternalServerError, err)
return
Expand Down
2 changes: 1 addition & 1 deletion app/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestReportPostHandler(t *testing.T) {
}

ctx := context.Background()
report, err := c.Report(ctx)
report, err := c.Report(ctx, time.Now())
if err != nil {
t.Error(err)
}
Expand Down
Loading