From cfa32f7c55e1b39957a7d6f719d8c2fee4e781c9 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Mon, 7 Sep 2015 20:58:47 +0000 Subject: [PATCH] Push reports to the app in the background and in parallel. --- probe/main.go | 9 ++++++- xfer/publisher.go | 56 ++++++++++++++++++++++++++++++++++++++++++ xfer/publisher_test.go | 1 + 3 files changed, 65 insertions(+), 1 deletion(-) diff --git a/probe/main.go b/probe/main.go index 40de9fdb2e..86d8249c86 100644 --- a/probe/main.go +++ b/probe/main.go @@ -86,8 +86,15 @@ func main() { log.Printf("warning: process reporting enabled, but that requires root to find everything") } - publisherFactory := func(target string) (xfer.Publisher, error) { return xfer.NewHTTPPublisher(target, *token, probeID) } + publisherFactory := func(target string) (xfer.Publisher, error) { + publisher, err := xfer.NewHTTPPublisher(target, *token, probeID) + if err != nil { + return nil, err + } + return xfer.NewBackgroundPublisher(publisher), nil + } publishers := xfer.NewMultiPublisher(publisherFactory) + defer publishers.Stop() resolver := newStaticResolver(targets, publishers.Add) defer resolver.Stop() diff --git a/xfer/publisher.go b/xfer/publisher.go index bceaf0c08b..015cd01057 100644 --- a/xfer/publisher.go +++ b/xfer/publisher.go @@ -17,6 +17,7 @@ import ( // Publisher is something which can send a report to a remote collector. type Publisher interface { Publish(report.Report) error + Stop() } // HTTPPublisher publishes reports by POST to a fixed endpoint. @@ -83,12 +84,57 @@ func (p HTTPPublisher) Publish(rpt report.Report) error { return nil } +// Stop implements Publisher +func (p HTTPPublisher) Stop() {} + // AuthorizationHeader returns a value suitable for an HTTP Authorization // header, based on the passed token string. func AuthorizationHeader(token string) string { return fmt.Sprintf("Scope-Probe token=%s", token) } +// BackgroundPublisher is a publisher which does the publish asynchronously. +// It will only do one publish at once; if there is an ongoing publish, +// concurrent publishes are dropped. +type BackgroundPublisher struct { + publisher Publisher + reports chan report.Report +} + +// NewBackgroundPublisher creates a new BackgroundPublisher with the given publisher +func NewBackgroundPublisher(p Publisher) *BackgroundPublisher { + result := &BackgroundPublisher{ + publisher: p, + reports: make(chan report.Report), + } + go result.loop() + return result +} + +func (b *BackgroundPublisher) loop() { + for r := range b.reports { + if err := b.publisher.Publish(r); err != nil { + log.Printf("Error publishing: %v", err) + } + } +} + +// Publish implements Publisher +func (b *BackgroundPublisher) Publish(r report.Report) error { + select { + case b.reports <- r: + return nil + default: + return fmt.Errorf("Dropping report - can't push fast enough") + } +} + +// Stop implements Publisher +func (b *BackgroundPublisher) Stop() { + close(b.reports) + b.publisher.Stop() +} + // MultiPublisher implements Publisher over a set of publishers. type MultiPublisher struct { mtx sync.RWMutex @@ -142,3 +188,13 @@ func (p *MultiPublisher) Publish(rpt report.Report) error { } return nil } + +// Stop implements Publisher +func (p *MultiPublisher) Stop() { + p.mtx.RLock() + defer p.mtx.RUnlock() + + for _, publisher := range p.m { + publisher.Stop() + } +} diff --git a/xfer/publisher_test.go b/xfer/publisher_test.go index 6316c2e032..ecf0148b0f 100644 --- a/xfer/publisher_test.go +++ b/xfer/publisher_test.go @@ -102,3 +102,4 @@ func TestMultiPublisher(t *testing.T) { type mockPublisher struct{ count int } func (p *mockPublisher) Publish(report.Report) error { p.count++; return nil } +func (p *mockPublisher) Stop() {}