From b2c271c51150b205c42626ab5cf08c505bc3b34d Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Mon, 7 Sep 2015 20:58:47 +0000 Subject: [PATCH 1/2] Push reports to the app in the background and in parallel. --- probe/main.go | 9 ++++++- xfer/publisher.go | 56 ++++++++++++++++++++++++++++++++++++++++++ xfer/publisher_test.go | 1 + 3 files changed, 65 insertions(+), 1 deletion(-) diff --git a/probe/main.go b/probe/main.go index db0c24971d..a43b99a848 100644 --- a/probe/main.go +++ b/probe/main.go @@ -86,8 +86,15 @@ func main() { log.Printf("warning: process reporting enabled, but that requires root to find everything") } - publisherFactory := func(target string) (xfer.Publisher, error) { return xfer.NewHTTPPublisher(target, *token, probeID) } + publisherFactory := func(target string) (xfer.Publisher, error) { + publisher, err := xfer.NewHTTPPublisher(target, *token, probeID) + if err != nil { + return nil, err + } + return xfer.NewBackgroundPublisher(publisher), nil + } publishers := xfer.NewMultiPublisher(publisherFactory) + defer publishers.Stop() resolver := newStaticResolver(targets, publishers.Add) defer resolver.Stop() diff --git a/xfer/publisher.go b/xfer/publisher.go index bceaf0c08b..015cd01057 100644 --- a/xfer/publisher.go +++ b/xfer/publisher.go @@ -17,6 +17,7 @@ import ( // Publisher is something which can send a report to a remote collector. type Publisher interface { Publish(report.Report) error + Stop() } // HTTPPublisher publishes reports by POST to a fixed endpoint. @@ -83,12 +84,57 @@ func (p HTTPPublisher) Publish(rpt report.Report) error { return nil } +// Stop implements Publisher +func (p HTTPPublisher) Stop() {} + // AuthorizationHeader returns a value suitable for an HTTP Authorization // header, based on the passed token string. func AuthorizationHeader(token string) string { return fmt.Sprintf("Scope-Probe token=%s", token) } +// BackgroundPublisher is a publisher which does the publish asynchronously. +// It will only do one publish at once; if there is an ongoing publish, +// concurrent publishes are dropped. +type BackgroundPublisher struct { + publisher Publisher + reports chan report.Report +} + +// NewBackgroundPublisher creates a new BackgroundPublisher with the given publisher +func NewBackgroundPublisher(p Publisher) *BackgroundPublisher { + result := &BackgroundPublisher{ + publisher: p, + reports: make(chan report.Report), + } + go result.loop() + return result +} + +func (b *BackgroundPublisher) loop() { + for r := range b.reports { + if err := b.publisher.Publish(r); err != nil { + log.Printf("Error publishing: %v", err) + } + } +} + +// Publish implements Publisher +func (b *BackgroundPublisher) Publish(r report.Report) error { + select { + case b.reports <- r: + return nil + default: + return fmt.Errorf("Dropping report - can't push fast enough") + } +} + +// Stop implements Publisher +func (b *BackgroundPublisher) Stop() { + close(b.reports) + b.publisher.Stop() +} + // MultiPublisher implements Publisher over a set of publishers. type MultiPublisher struct { mtx sync.RWMutex @@ -142,3 +188,13 @@ func (p *MultiPublisher) Publish(rpt report.Report) error { } return nil } + +// Stop implements Publisher +func (p *MultiPublisher) Stop() { + p.mtx.RLock() + defer p.mtx.RUnlock() + + for _, publisher := range p.m { + publisher.Stop() + } +} diff --git a/xfer/publisher_test.go b/xfer/publisher_test.go index 6316c2e032..ecf0148b0f 100644 --- a/xfer/publisher_test.go +++ b/xfer/publisher_test.go @@ -102,3 +102,4 @@ func TestMultiPublisher(t *testing.T) { type mockPublisher struct{ count int } func (p *mockPublisher) Publish(report.Report) error { p.count++; return nil } +func (p *mockPublisher) Stop() {} From d8ac330589e46858857f4394e2760ed8b565aaab Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Wed, 9 Sep 2015 09:56:39 +0000 Subject: [PATCH 2/2] Backoff on failed report pushes. --- xfer/publisher.go | 34 ++++++++++++++++++++++++++++++---- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/xfer/publisher.go b/xfer/publisher.go index 015cd01057..1eb1fcdd26 100644 --- a/xfer/publisher.go +++ b/xfer/publisher.go @@ -10,10 +10,16 @@ import ( "net/url" "strings" "sync" + "time" "github.com/weaveworks/scope/report" ) +const ( + initialBackoff = 1 * time.Second + maxBackoff = 60 * time.Second +) + // Publisher is something which can send a report to a remote collector. type Publisher interface { Publish(report.Report) error @@ -52,6 +58,10 @@ func NewHTTPPublisher(target, token, id string) (*HTTPPublisher, error) { }, nil } +func (p HTTPPublisher) String() string { + return p.url +} + // Publish publishes the report to the URL. func (p HTTPPublisher) Publish(rpt report.Report) error { gzbuf := bytes.Buffer{} @@ -99,6 +109,7 @@ func AuthorizationHeader(token string) string { type BackgroundPublisher struct { publisher Publisher reports chan report.Report + quit chan struct{} } // NewBackgroundPublisher creates a new BackgroundPublisher with the given publisher @@ -106,15 +117,30 @@ func NewBackgroundPublisher(p Publisher) *BackgroundPublisher { result := &BackgroundPublisher{ publisher: p, reports: make(chan report.Report), + quit: make(chan struct{}), } go result.loop() return result } func (b *BackgroundPublisher) loop() { + backoff := initialBackoff + for r := range b.reports { - if err := b.publisher.Publish(r); err != nil { - log.Printf("Error publishing: %v", err) + err := b.publisher.Publish(r) + if err == nil { + backoff = initialBackoff + continue + } + + log.Printf("Error publishing to %s, backing off %s: %v", b.publisher, backoff, err) + select { + case <-time.After(backoff): + case <-b.quit: + } + backoff = backoff * 2 + if backoff > maxBackoff { + backoff = maxBackoff } } } @@ -123,15 +149,15 @@ func (b *BackgroundPublisher) loop() { func (b *BackgroundPublisher) Publish(r report.Report) error { select { case b.reports <- r: - return nil default: - return fmt.Errorf("Dropping report - can't push fast enough") } + return nil } // Stop implements Publisher func (b *BackgroundPublisher) Stop() { close(b.reports) + close(b.quit) b.publisher.Stop() }