diff --git a/probe/main.go b/probe/main.go index db0c24971d..a43b99a848 100644 --- a/probe/main.go +++ b/probe/main.go @@ -86,8 +86,15 @@ func main() { log.Printf("warning: process reporting enabled, but that requires root to find everything") } - publisherFactory := func(target string) (xfer.Publisher, error) { return xfer.NewHTTPPublisher(target, *token, probeID) } + publisherFactory := func(target string) (xfer.Publisher, error) { + publisher, err := xfer.NewHTTPPublisher(target, *token, probeID) + if err != nil { + return nil, err + } + return xfer.NewBackgroundPublisher(publisher), nil + } publishers := xfer.NewMultiPublisher(publisherFactory) + defer publishers.Stop() resolver := newStaticResolver(targets, publishers.Add) defer resolver.Stop() diff --git a/xfer/publisher.go b/xfer/publisher.go index bceaf0c08b..1eb1fcdd26 100644 --- a/xfer/publisher.go +++ b/xfer/publisher.go @@ -10,13 +10,20 @@ import ( "net/url" "strings" "sync" + "time" "github.com/weaveworks/scope/report" ) +const ( + initialBackoff = 1 * time.Second + maxBackoff = 60 * time.Second +) + // Publisher is something which can send a report to a remote collector. type Publisher interface { Publish(report.Report) error + Stop() } // HTTPPublisher publishes reports by POST to a fixed endpoint. @@ -51,6 +58,10 @@ func NewHTTPPublisher(target, token, id string) (*HTTPPublisher, error) { }, nil } +func (p HTTPPublisher) String() string { + return p.url +} + // Publish publishes the report to the URL. func (p HTTPPublisher) Publish(rpt report.Report) error { gzbuf := bytes.Buffer{} @@ -83,12 +94,73 @@ func (p HTTPPublisher) Publish(rpt report.Report) error { return nil } +// Stop implements Publisher +func (p HTTPPublisher) Stop() {} + // AuthorizationHeader returns a value suitable for an HTTP Authorization // header, based on the passed token string. func AuthorizationHeader(token string) string { return fmt.Sprintf("Scope-Probe token=%s", token) } +// BackgroundPublisher is a publisher which does the publish asynchronously. +// It will only do one publish at once; if there is an ongoing publish, +// concurrent publishes are dropped. +type BackgroundPublisher struct { + publisher Publisher + reports chan report.Report + quit chan struct{} +} + +// NewBackgroundPublisher creates a new BackgroundPublisher with the given publisher +func NewBackgroundPublisher(p Publisher) *BackgroundPublisher { + result := &BackgroundPublisher{ + publisher: p, + reports: make(chan report.Report), + quit: make(chan struct{}), + } + go result.loop() + return result +} + +func (b *BackgroundPublisher) loop() { + backoff := initialBackoff + + for r := range b.reports { + err := b.publisher.Publish(r) + if err == nil { + backoff = initialBackoff + continue + } + + log.Printf("Error publishing to %s, backing off %s: %v", b.publisher, backoff, err) + select { + case <-time.After(backoff): + case <-b.quit: + } + backoff = backoff * 2 + if backoff > maxBackoff { + backoff = maxBackoff + } + } +} + +// Publish implements Publisher +func (b *BackgroundPublisher) Publish(r report.Report) error { + select { + case b.reports <- r: + default: + } + return nil +} + +// Stop implements Publisher +func (b *BackgroundPublisher) Stop() { + close(b.reports) + close(b.quit) + b.publisher.Stop() +} + // MultiPublisher implements Publisher over a set of publishers. type MultiPublisher struct { mtx sync.RWMutex @@ -142,3 +214,13 @@ func (p *MultiPublisher) Publish(rpt report.Report) error { } return nil } + +// Stop implements Publisher +func (p *MultiPublisher) Stop() { + p.mtx.RLock() + defer p.mtx.RUnlock() + + for _, publisher := range p.m { + publisher.Stop() + } +} diff --git a/xfer/publisher_test.go b/xfer/publisher_test.go index 6316c2e032..ecf0148b0f 100644 --- a/xfer/publisher_test.go +++ b/xfer/publisher_test.go @@ -102,3 +102,4 @@ func TestMultiPublisher(t *testing.T) { type mockPublisher struct{ count int } func (p *mockPublisher) Publish(report.Report) error { p.count++; return nil } +func (p *mockPublisher) Stop() {}