Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to print probe reports to stdout, for debugging #3204

Merged
merged 5 commits into from
Jun 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions app/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,13 @@ func RegisterReportPostHandler(a Adder, router *mux.Router) {
post.HandleFunc("/api/report", requestContextDecorator(func(ctx context.Context, w http.ResponseWriter, r *http.Request) {
var (
rpt report.Report
buf bytes.Buffer
reader = io.TeeReader(r.Body, &buf)
buf = &bytes.Buffer{}
reader = io.TeeReader(r.Body, buf)
)

gzipped := strings.Contains(r.Header.Get("Content-Encoding"), "gzip")
if !gzipped {
reader = io.TeeReader(r.Body, gzip.NewWriter(&buf))
reader = io.TeeReader(r.Body, gzip.NewWriter(buf))
}

contentType := r.Header.Get("Content-Type")
Expand All @@ -150,8 +150,7 @@ func RegisterReportPostHandler(a Adder, router *mux.Router) {

// a.Add(..., buf) assumes buf is gzip'd msgpack
if !isMsgpack {
buf = bytes.Buffer{}
rpt.WriteBinary(&buf, gzip.DefaultCompression)
buf, _ = rpt.WriteBinary()
}

if err := a.Add(ctx, rpt, buf.Bytes()); err != nil {
Expand Down
8 changes: 6 additions & 2 deletions extras/fixprobe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package main

import (
"bytes"
"flag"
"fmt"
"io/ioutil"
Expand Down Expand Up @@ -60,8 +61,11 @@ func main() {
log.Fatal(err)
}

rp := appclient.NewReportPublisher(client, false)
buf, err := fixedReport.WriteBinary()
if err != nil {
log.Fatal(err)
}
for range time.Tick(*publishInterval) {
rp.Publish(fixedReport)
client.Publish(bytes.NewReader(buf.Bytes()), fixedReport.Shortcut)
}
}
9 changes: 4 additions & 5 deletions probe/appclient/app_client_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,9 @@ func TestAppClientPublish(t *testing.T) {
defer p.Stop()

// First few reports might be dropped as the client is spinning up.
rp := NewReportPublisher(p, false)
for i := 0; i < 10; i++ {
if err := rp.Publish(rpt); err != nil {
buf, _ := rpt.WriteBinary()
if err := p.Publish(buf, false); err != nil {
t.Error(err)
}
time.Sleep(10 * time.Millisecond)
Expand Down Expand Up @@ -204,15 +204,14 @@ func TestStop(t *testing.T) {
t.Fatal(err)
}

rp := NewReportPublisher(p, false)

// Make sure the app received our report and is stuck
for done := false; !done; {
select {
case <-receivedReport:
done = true
default:
if err := rp.Publish(rpt); err != nil {
buf, _ := rpt.WriteBinary()
if err := p.Publish(buf, false); err != nil {
t.Error(err)
}
time.Sleep(10 * time.Millisecond)
Expand Down
24 changes: 4 additions & 20 deletions probe/appclient/multi_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"net/url"
"strings"
"sync"
Expand Down Expand Up @@ -37,21 +35,14 @@ type clientTuple struct {
AppClient
}

// Publisher is something which can send a stream of data somewhere, probably
// to a remote collector.
type Publisher interface {
Publish(io.Reader, bool) error
Stop()
}

// MultiAppClient maintains a set of upstream apps, and ensures we have an
// AppClient for each one.
type MultiAppClient interface {
Set(hostname string, urls []url.URL)
PipeConnection(appID, pipeID string, pipe xfer.Pipe) error
PipeClose(appID, pipeID string) error
Stop()
Publish(io.Reader, bool) error
Publish(r report.Report) error
}

// NewMultiAppClient creates a new MultiAppClient.
Expand Down Expand Up @@ -165,25 +156,18 @@ func (c *multiClient) Stop() {
// underlying publishers sequentially. To do that, it needs to drain the
// reader, and recreate new readers for each publisher. Note that it will
// publish to one endpoint for each unique ID. Failed publishes don't count.
func (c *multiClient) Publish(r io.Reader, shortcut bool) error {
func (c *multiClient) Publish(r report.Report) error {
c.mtx.Lock()
defer c.mtx.Unlock()

if len(c.clients) <= 1 { // optimisation
for _, c := range c.clients {
return c.Publish(r, shortcut)
}
return nil
}

buf, err := ioutil.ReadAll(r)
buf, err := r.WriteBinary()
if err != nil {
return err
}

errs := []string{}
for _, c := range c.clients {
if err := c.Publish(bytes.NewReader(buf), shortcut); err != nil {
if err := c.Publish(bytes.NewReader(buf.Bytes()), r.Shortcut); err != nil {
errs = append(errs, err.Error())
}
}
Expand Down
5 changes: 3 additions & 2 deletions probe/appclient/multi_client_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package appclient_test

import (
"bytes"
"io"
"net/url"
"runtime"
"testing"

"github.com/weaveworks/scope/common/xfer"
"github.com/weaveworks/scope/probe/appclient"
"github.com/weaveworks/scope/report"
)

type mockClient struct {
Expand Down Expand Up @@ -105,8 +105,9 @@ func TestMultiClientPublish(t *testing.T) {
mp.Set("a", []url.URL{{Host: "a1"}, {Host: "a2"}})
mp.Set("b", []url.URL{{Host: "b2"}, {Host: "b3"}})

rpt := report.MakeReport()
for i := 1; i < 10; i++ {
if err := mp.Publish(&bytes.Buffer{}, false); err != nil {
if err := mp.Publish(rpt); err != nil {
t.Error(err)
}
if want, have := 3*i, sum(); want != have {
Expand Down
34 changes: 0 additions & 34 deletions probe/appclient/report_publisher.go

This file was deleted.

9 changes: 9 additions & 0 deletions probe/controls/pipes.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,12 @@ func (p *pipe) Close() error {
}
return err2
}

// DummyPipeClient implements PipeClient when running the probe in debugging mode
type DummyPipeClient struct{}

// PipeConnection implements controls.PipeClient
func (DummyPipeClient) PipeConnection(appID, pipeID string, pipe xfer.Pipe) error { return nil }

// PipeClose implements controls.PipeClient
func (DummyPipeClient) PipeClose(appID, pipeID string) error { return nil }
19 changes: 15 additions & 4 deletions probe/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,23 @@ import (
log "github.com/Sirupsen/logrus"
"github.com/armon/go-metrics"

"github.com/weaveworks/scope/probe/appclient"
"github.com/weaveworks/scope/report"
)

const (
reportBufferSize = 16
)

// ReportPublisher publishes reports, probably to a remote collector.
type ReportPublisher interface {
Publish(r report.Report) error
}

// Probe sits there, generating and publishing reports.
type Probe struct {
spyInterval, publishInterval time.Duration
publisher *appclient.ReportPublisher
publisher ReportPublisher
noControls bool

tickers []Ticker
reporters []Reporter
Expand Down Expand Up @@ -67,13 +72,14 @@ type Ticker interface {
// New makes a new Probe.
func New(
spyInterval, publishInterval time.Duration,
publisher appclient.Publisher,
publisher ReportPublisher,
noControls bool,
) *Probe {
result := &Probe{
spyInterval: spyInterval,
publishInterval: publishInterval,
publisher: appclient.NewReportPublisher(publisher, noControls),
publisher: publisher,
noControls: noControls,
quit: make(chan struct{}),
spiedReports: make(chan report.Report, reportBufferSize),
shortcutReports: make(chan report.Report, reportBufferSize),
Expand Down Expand Up @@ -200,6 +206,11 @@ ForLoop:
}
}

if p.noControls {
rpt.WalkTopologies(func(t *report.Topology) {
t.Controls = report.Controls{}
})
}
if err := p.publisher.Publish(rpt); err != nil {
log.Infof("publish: %v", err)
}
Expand Down
11 changes: 1 addition & 10 deletions probe/probe_internal_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
package probe

import (
"compress/gzip"
"io"
"testing"
"time"

"github.com/ugorji/go/codec"
"github.com/weaveworks/common/mtime"
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/test"
Expand Down Expand Up @@ -53,13 +50,7 @@ type mockPublisher struct {
have chan report.Report
}

func (m mockPublisher) Publish(in io.Reader, shortcut bool) error {
var r report.Report
if reader, err := gzip.NewReader(in); err != nil {
return err
} else if err := codec.NewDecoder(reader, &codec.MsgpackHandle{}).Decode(&r); err != nil {
return err
}
func (m mockPublisher) Publish(r report.Report) error {
m.have <- r
return nil
}
Expand Down
2 changes: 2 additions & 0 deletions prog/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ type flags struct {
}

type probeFlags struct {
printOnStdout bool
token string
httpListen string
publishInterval time.Duration
Expand Down Expand Up @@ -274,6 +275,7 @@ func setupFlags(flags *flags) {
flag.Bool("app-only", false, "Only run the app.")

// Probe flags
flag.BoolVar(&flags.probe.printOnStdout, "probe.publish.stdout", false, "Print reports on stdout instead of sending to app, for debugging")
flag.StringVar(&flags.probe.token, serviceTokenFlag, "", "Token to authenticate with cloud.weave.works")
flag.StringVar(&flags.probe.token, probeTokenFlag, "", "Token to authenticate with cloud.weave.works")
flag.StringVar(&flags.probe.httpListen, "probe.http.listen", "", "listen address for HTTP profiling and instrumentation server")
Expand Down
Loading