Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make report propagation more reliable #459

Merged
merged 2 commits into from
Sep 9, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion probe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,15 @@ func main() {
log.Printf("warning: process reporting enabled, but that requires root to find everything")
}

publisherFactory := func(target string) (xfer.Publisher, error) { return xfer.NewHTTPPublisher(target, *token, probeID) }
publisherFactory := func(target string) (xfer.Publisher, error) {
publisher, err := xfer.NewHTTPPublisher(target, *token, probeID)
if err != nil {
return nil, err
}
return xfer.NewBackgroundPublisher(publisher), nil
}
publishers := xfer.NewMultiPublisher(publisherFactory)
defer publishers.Stop()
resolver := newStaticResolver(targets, publishers.Add)
defer resolver.Stop()

Expand Down
82 changes: 82 additions & 0 deletions xfer/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,20 @@ import (
"net/url"
"strings"
"sync"
"time"

"github.com/weaveworks/scope/report"
)

const (
initialBackoff = 1 * time.Second
maxBackoff = 60 * time.Second
)

// Publisher is something which can send a report to a remote collector.
type Publisher interface {
Publish(report.Report) error
Stop()
}

// HTTPPublisher publishes reports by POST to a fixed endpoint.
Expand Down Expand Up @@ -51,6 +58,10 @@ func NewHTTPPublisher(target, token, id string) (*HTTPPublisher, error) {
}, nil
}

func (p HTTPPublisher) String() string {
return p.url
}

// Publish publishes the report to the URL.
func (p HTTPPublisher) Publish(rpt report.Report) error {
gzbuf := bytes.Buffer{}
Expand Down Expand Up @@ -83,12 +94,73 @@ func (p HTTPPublisher) Publish(rpt report.Report) error {
return nil
}

// Stop implements Publisher
func (p HTTPPublisher) Stop() {}

This comment was marked as abuse.

This comment was marked as abuse.


// AuthorizationHeader returns a value suitable for an HTTP Authorization
// header, based on the passed token string.
func AuthorizationHeader(token string) string {
return fmt.Sprintf("Scope-Probe token=%s", token)
}

// BackgroundPublisher is a publisher which does the publish asynchronously.
// It will only do one publish at once; if there is an ongoing publish,
// concurrent publishes are dropped.
type BackgroundPublisher struct {
publisher Publisher
reports chan report.Report
quit chan struct{}
}

// NewBackgroundPublisher creates a new BackgroundPublisher with the given publisher
func NewBackgroundPublisher(p Publisher) *BackgroundPublisher {
result := &BackgroundPublisher{
publisher: p,
reports: make(chan report.Report),
quit: make(chan struct{}),
}
go result.loop()
return result
}

func (b *BackgroundPublisher) loop() {
backoff := initialBackoff

for r := range b.reports {
err := b.publisher.Publish(r)
if err == nil {
backoff = initialBackoff
continue
}

log.Printf("Error publishing to %s, backing off %s: %v", b.publisher, backoff, err)
select {
case <-time.After(backoff):
case <-b.quit:
}
backoff = backoff * 2
if backoff > maxBackoff {
backoff = maxBackoff
}
}
}

// Publish implements Publisher
func (b *BackgroundPublisher) Publish(r report.Report) error {
select {
case b.reports <- r:
default:
}
return nil
}

// Stop implements Publisher
func (b *BackgroundPublisher) Stop() {
close(b.reports)
close(b.quit)
b.publisher.Stop()
}

// MultiPublisher implements Publisher over a set of publishers.
type MultiPublisher struct {
mtx sync.RWMutex
Expand Down Expand Up @@ -142,3 +214,13 @@ func (p *MultiPublisher) Publish(rpt report.Report) error {
}
return nil
}

// Stop implements Publisher
func (p *MultiPublisher) Stop() {
p.mtx.RLock()
defer p.mtx.RUnlock()

for _, publisher := range p.m {
publisher.Stop()
}
}
1 change: 1 addition & 0 deletions xfer/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,4 @@ func TestMultiPublisher(t *testing.T) {
type mockPublisher struct{ count int }

func (p *mockPublisher) Publish(report.Report) error { p.count++; return nil }
func (p *mockPublisher) Stop() {}