Skip to content

Commit

Permalink
Merge pull request #459 from weaveworks/458-blankety-blank
Browse files Browse the repository at this point in the history
Make report propagation more reliable
  • Loading branch information
tomwilkie committed Sep 9, 2015
2 parents 5f2e632 + d8ac330 commit 57aaf4b
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 1 deletion.
9 changes: 8 additions & 1 deletion probe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,15 @@ func main() {
log.Printf("warning: process reporting enabled, but that requires root to find everything")
}

publisherFactory := func(target string) (xfer.Publisher, error) { return xfer.NewHTTPPublisher(target, *token, probeID) }
publisherFactory := func(target string) (xfer.Publisher, error) {
publisher, err := xfer.NewHTTPPublisher(target, *token, probeID)
if err != nil {
return nil, err
}
return xfer.NewBackgroundPublisher(publisher), nil
}
publishers := xfer.NewMultiPublisher(publisherFactory)
defer publishers.Stop()
resolver := newStaticResolver(targets, publishers.Add)
defer resolver.Stop()

Expand Down
82 changes: 82 additions & 0 deletions xfer/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,20 @@ import (
"net/url"
"strings"
"sync"
"time"

"github.com/weaveworks/scope/report"
)

const (
initialBackoff = 1 * time.Second
maxBackoff = 60 * time.Second
)

// Publisher is something which can send a report to a remote collector.
type Publisher interface {
Publish(report.Report) error
Stop()
}

// HTTPPublisher publishes reports by POST to a fixed endpoint.
Expand Down Expand Up @@ -51,6 +58,10 @@ func NewHTTPPublisher(target, token, id string) (*HTTPPublisher, error) {
}, nil
}

func (p HTTPPublisher) String() string {
return p.url
}

// Publish publishes the report to the URL.
func (p HTTPPublisher) Publish(rpt report.Report) error {
gzbuf := bytes.Buffer{}
Expand Down Expand Up @@ -83,12 +94,73 @@ func (p HTTPPublisher) Publish(rpt report.Report) error {
return nil
}

// Stop implements Publisher
func (p HTTPPublisher) Stop() {}

// AuthorizationHeader returns a value suitable for an HTTP Authorization
// header, based on the passed token string.
func AuthorizationHeader(token string) string {
return fmt.Sprintf("Scope-Probe token=%s", token)
}

// BackgroundPublisher is a publisher which does the publish asynchronously.
// It will only do one publish at once; if there is an ongoing publish,
// concurrent publishes are dropped.
type BackgroundPublisher struct {
publisher Publisher
reports chan report.Report
quit chan struct{}
}

// NewBackgroundPublisher creates a new BackgroundPublisher with the given publisher
func NewBackgroundPublisher(p Publisher) *BackgroundPublisher {
result := &BackgroundPublisher{
publisher: p,
reports: make(chan report.Report),
quit: make(chan struct{}),
}
go result.loop()
return result
}

func (b *BackgroundPublisher) loop() {
backoff := initialBackoff

for r := range b.reports {
err := b.publisher.Publish(r)
if err == nil {
backoff = initialBackoff
continue
}

log.Printf("Error publishing to %s, backing off %s: %v", b.publisher, backoff, err)
select {
case <-time.After(backoff):
case <-b.quit:
}
backoff = backoff * 2
if backoff > maxBackoff {
backoff = maxBackoff
}
}
}

// Publish implements Publisher
func (b *BackgroundPublisher) Publish(r report.Report) error {
select {
case b.reports <- r:
default:
}
return nil
}

// Stop implements Publisher
func (b *BackgroundPublisher) Stop() {
close(b.reports)
close(b.quit)
b.publisher.Stop()
}

// MultiPublisher implements Publisher over a set of publishers.
type MultiPublisher struct {
mtx sync.RWMutex
Expand Down Expand Up @@ -142,3 +214,13 @@ func (p *MultiPublisher) Publish(rpt report.Report) error {
}
return nil
}

// Stop implements Publisher
func (p *MultiPublisher) Stop() {
p.mtx.RLock()
defer p.mtx.RUnlock()

for _, publisher := range p.m {
publisher.Stop()
}
}
1 change: 1 addition & 0 deletions xfer/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,4 @@ func TestMultiPublisher(t *testing.T) {
type mockPublisher struct{ count int }

func (p *mockPublisher) Publish(report.Report) error { p.count++; return nil }
func (p *mockPublisher) Stop() {}

0 comments on commit 57aaf4b

Please sign in to comment.