diff --git a/app/router.go b/app/router.go index 414f45f99f..e9efa01568 100644 --- a/app/router.go +++ b/app/router.go @@ -121,13 +121,13 @@ func RegisterReportPostHandler(a Adder, router *mux.Router) { post.HandleFunc("/api/report", requestContextDecorator(func(ctx context.Context, w http.ResponseWriter, r *http.Request) { var ( rpt report.Report - buf bytes.Buffer - reader = io.TeeReader(r.Body, &buf) + buf = &bytes.Buffer{} + reader = io.TeeReader(r.Body, buf) ) gzipped := strings.Contains(r.Header.Get("Content-Encoding"), "gzip") if !gzipped { - reader = io.TeeReader(r.Body, gzip.NewWriter(&buf)) + reader = io.TeeReader(r.Body, gzip.NewWriter(buf)) } contentType := r.Header.Get("Content-Type") @@ -150,8 +150,7 @@ func RegisterReportPostHandler(a Adder, router *mux.Router) { // a.Add(..., buf) assumes buf is gzip'd msgpack if !isMsgpack { - buf = bytes.Buffer{} - rpt.WriteBinary(&buf, gzip.DefaultCompression) + buf, _ = rpt.WriteBinary() } if err := a.Add(ctx, rpt, buf.Bytes()); err != nil { diff --git a/extras/fixprobe/main.go b/extras/fixprobe/main.go index 5b99925cbc..12b80cc708 100644 --- a/extras/fixprobe/main.go +++ b/extras/fixprobe/main.go @@ -2,6 +2,7 @@ package main import ( + "bytes" "flag" "fmt" "io/ioutil" @@ -60,8 +61,11 @@ func main() { log.Fatal(err) } - rp := appclient.NewReportPublisher(client, false) + buf, err := fixedReport.WriteBinary() + if err != nil { + log.Fatal(err) + } for range time.Tick(*publishInterval) { - rp.Publish(fixedReport) + client.Publish(bytes.NewReader(buf.Bytes()), fixedReport.Shortcut) } } diff --git a/probe/appclient/app_client_internal_test.go b/probe/appclient/app_client_internal_test.go index 66bcae51d7..15f4b636d0 100644 --- a/probe/appclient/app_client_internal_test.go +++ b/probe/appclient/app_client_internal_test.go @@ -104,9 +104,9 @@ func TestAppClientPublish(t *testing.T) { defer p.Stop() // First few reports might be dropped as the client is spinning up. - rp := NewReportPublisher(p, false) for i := 0; i < 10; i++ { - if err := rp.Publish(rpt); err != nil { + buf, _ := rpt.WriteBinary() + if err := p.Publish(buf, false); err != nil { t.Error(err) } time.Sleep(10 * time.Millisecond) @@ -204,15 +204,14 @@ func TestStop(t *testing.T) { t.Fatal(err) } - rp := NewReportPublisher(p, false) - // Make sure the app received our report and is stuck for done := false; !done; { select { case <-receivedReport: done = true default: - if err := rp.Publish(rpt); err != nil { + buf, _ := rpt.WriteBinary() + if err := p.Publish(buf, false); err != nil { t.Error(err) } time.Sleep(10 * time.Millisecond) diff --git a/probe/appclient/multi_client.go b/probe/appclient/multi_client.go index c8187a5ed8..79100cd45e 100644 --- a/probe/appclient/multi_client.go +++ b/probe/appclient/multi_client.go @@ -4,8 +4,6 @@ import ( "bytes" "errors" "fmt" - "io" - "io/ioutil" "net/url" "strings" "sync" @@ -37,13 +35,6 @@ type clientTuple struct { AppClient } -// Publisher is something which can send a stream of data somewhere, probably -// to a remote collector. -type Publisher interface { - Publish(io.Reader, bool) error - Stop() -} - // MultiAppClient maintains a set of upstream apps, and ensures we have an // AppClient for each one. type MultiAppClient interface { @@ -51,7 +42,7 @@ type MultiAppClient interface { PipeConnection(appID, pipeID string, pipe xfer.Pipe) error PipeClose(appID, pipeID string) error Stop() - Publish(io.Reader, bool) error + Publish(r report.Report) error } // NewMultiAppClient creates a new MultiAppClient. @@ -165,25 +156,18 @@ func (c *multiClient) Stop() { // underlying publishers sequentially. To do that, it needs to drain the // reader, and recreate new readers for each publisher. Note that it will // publish to one endpoint for each unique ID. Failed publishes don't count. -func (c *multiClient) Publish(r io.Reader, shortcut bool) error { +func (c *multiClient) Publish(r report.Report) error { c.mtx.Lock() defer c.mtx.Unlock() - if len(c.clients) <= 1 { // optimisation - for _, c := range c.clients { - return c.Publish(r, shortcut) - } - return nil - } - - buf, err := ioutil.ReadAll(r) + buf, err := r.WriteBinary() if err != nil { return err } errs := []string{} for _, c := range c.clients { - if err := c.Publish(bytes.NewReader(buf), shortcut); err != nil { + if err := c.Publish(bytes.NewReader(buf.Bytes()), r.Shortcut); err != nil { errs = append(errs, err.Error()) } } diff --git a/probe/appclient/multi_client_test.go b/probe/appclient/multi_client_test.go index e1789a7100..770916557a 100644 --- a/probe/appclient/multi_client_test.go +++ b/probe/appclient/multi_client_test.go @@ -1,7 +1,6 @@ package appclient_test import ( - "bytes" "io" "net/url" "runtime" @@ -9,6 +8,7 @@ import ( "github.com/weaveworks/scope/common/xfer" "github.com/weaveworks/scope/probe/appclient" + "github.com/weaveworks/scope/report" ) type mockClient struct { @@ -105,8 +105,9 @@ func TestMultiClientPublish(t *testing.T) { mp.Set("a", []url.URL{{Host: "a1"}, {Host: "a2"}}) mp.Set("b", []url.URL{{Host: "b2"}, {Host: "b3"}}) + rpt := report.MakeReport() for i := 1; i < 10; i++ { - if err := mp.Publish(&bytes.Buffer{}, false); err != nil { + if err := mp.Publish(rpt); err != nil { t.Error(err) } if want, have := 3*i, sum(); want != have { diff --git a/probe/appclient/report_publisher.go b/probe/appclient/report_publisher.go deleted file mode 100644 index 2f7bdd0b82..0000000000 --- a/probe/appclient/report_publisher.go +++ /dev/null @@ -1,34 +0,0 @@ -package appclient - -import ( - "bytes" - "compress/gzip" - "github.com/weaveworks/scope/report" -) - -// A ReportPublisher uses a buffer pool to serialise reports, which it -// then passes to a publisher -type ReportPublisher struct { - publisher Publisher - noControls bool -} - -// NewReportPublisher creates a new report publisher -func NewReportPublisher(publisher Publisher, noControls bool) *ReportPublisher { - return &ReportPublisher{ - publisher: publisher, - noControls: noControls, - } -} - -// Publish serialises and compresses a report, then passes it to a publisher -func (p *ReportPublisher) Publish(r report.Report) error { - if p.noControls { - r.WalkTopologies(func(t *report.Topology) { - t.Controls = report.Controls{} - }) - } - buf := &bytes.Buffer{} - r.WriteBinary(buf, gzip.DefaultCompression) - return p.publisher.Publish(buf, r.Shortcut) -} diff --git a/probe/controls/pipes.go b/probe/controls/pipes.go index ec4198eeda..0f2779fab6 100644 --- a/probe/controls/pipes.go +++ b/probe/controls/pipes.go @@ -54,3 +54,12 @@ func (p *pipe) Close() error { } return err2 } + +// DummyPipeClient implements PipeClient when running the probe in debugging mode +type DummyPipeClient struct{} + +// PipeConnection implements controls.PipeClient +func (DummyPipeClient) PipeConnection(appID, pipeID string, pipe xfer.Pipe) error { return nil } + +// PipeClose implements controls.PipeClient +func (DummyPipeClient) PipeClose(appID, pipeID string) error { return nil } diff --git a/probe/probe.go b/probe/probe.go index a9139fc4d7..cb7286a8d1 100644 --- a/probe/probe.go +++ b/probe/probe.go @@ -7,7 +7,6 @@ import ( log "github.com/Sirupsen/logrus" "github.com/armon/go-metrics" - "github.com/weaveworks/scope/probe/appclient" "github.com/weaveworks/scope/report" ) @@ -15,10 +14,16 @@ const ( reportBufferSize = 16 ) +// ReportPublisher publishes reports, probably to a remote collector. +type ReportPublisher interface { + Publish(r report.Report) error +} + // Probe sits there, generating and publishing reports. type Probe struct { spyInterval, publishInterval time.Duration - publisher *appclient.ReportPublisher + publisher ReportPublisher + noControls bool tickers []Ticker reporters []Reporter @@ -67,13 +72,14 @@ type Ticker interface { // New makes a new Probe. func New( spyInterval, publishInterval time.Duration, - publisher appclient.Publisher, + publisher ReportPublisher, noControls bool, ) *Probe { result := &Probe{ spyInterval: spyInterval, publishInterval: publishInterval, - publisher: appclient.NewReportPublisher(publisher, noControls), + publisher: publisher, + noControls: noControls, quit: make(chan struct{}), spiedReports: make(chan report.Report, reportBufferSize), shortcutReports: make(chan report.Report, reportBufferSize), @@ -200,6 +206,11 @@ ForLoop: } } + if p.noControls { + rpt.WalkTopologies(func(t *report.Topology) { + t.Controls = report.Controls{} + }) + } if err := p.publisher.Publish(rpt); err != nil { log.Infof("publish: %v", err) } diff --git a/probe/probe_internal_test.go b/probe/probe_internal_test.go index af6404af62..adc5cd143f 100644 --- a/probe/probe_internal_test.go +++ b/probe/probe_internal_test.go @@ -1,12 +1,9 @@ package probe import ( - "compress/gzip" - "io" "testing" "time" - "github.com/ugorji/go/codec" "github.com/weaveworks/common/mtime" "github.com/weaveworks/scope/report" "github.com/weaveworks/scope/test" @@ -53,13 +50,7 @@ type mockPublisher struct { have chan report.Report } -func (m mockPublisher) Publish(in io.Reader, shortcut bool) error { - var r report.Report - if reader, err := gzip.NewReader(in); err != nil { - return err - } else if err := codec.NewDecoder(reader, &codec.MsgpackHandle{}).Decode(&r); err != nil { - return err - } +func (m mockPublisher) Publish(r report.Report) error { m.have <- r return nil } diff --git a/prog/main.go b/prog/main.go index 390171f1e5..f0776b65f7 100644 --- a/prog/main.go +++ b/prog/main.go @@ -93,6 +93,7 @@ type flags struct { } type probeFlags struct { + printOnStdout bool token string httpListen string publishInterval time.Duration @@ -274,6 +275,7 @@ func setupFlags(flags *flags) { flag.Bool("app-only", false, "Only run the app.") // Probe flags + flag.BoolVar(&flags.probe.printOnStdout, "probe.publish.stdout", false, "Print reports on stdout instead of sending to app, for debugging") flag.StringVar(&flags.probe.token, serviceTokenFlag, "", "Token to authenticate with cloud.weave.works") flag.StringVar(&flags.probe.token, probeTokenFlag, "", "Token to authenticate with cloud.weave.works") flag.StringVar(&flags.probe.httpListen, "probe.http.listen", "", "listen address for HTTP profiling and instrumentation server") diff --git a/prog/probe.go b/prog/probe.go index 24f32fde37..15418b7110 100644 --- a/prog/probe.go +++ b/prog/probe.go @@ -130,23 +130,63 @@ func probeMain(flags probeFlags, targets []appclient.Target) { xfer.ControlHandlerFunc(handlerRegistry.HandleControlRequest), ) } - clients := appclient.NewMultiAppClient(clientFactory, flags.noControls) - defer clients.Stop() - dnsLookupFn := net.LookupIP - if flags.resolver != "" { - dnsLookupFn = appclient.LookupUsing(flags.resolver) + var clients interface { + probe.ReportPublisher + controls.PipeClient } - resolver, err := appclient.NewResolver(appclient.ResolverConfig{ - Targets: targets, - Lookup: dnsLookupFn, - Set: clients.Set, - }) - if err != nil { - log.Fatalf("Failed to create resolver: %v", err) - return + if flags.printOnStdout { + if len(targets) > 0 { + log.Warnf("Dumping to stdout only: targets %v will be ignored", targets) + } + clients = new(struct { + report.StdoutPublisher + controls.DummyPipeClient + }) + } else { + multiClients := appclient.NewMultiAppClient(clientFactory, flags.noControls) + defer multiClients.Stop() + + dnsLookupFn := net.LookupIP + if flags.resolver != "" { + dnsLookupFn = appclient.LookupUsing(flags.resolver) + } + resolver, err := appclient.NewResolver(appclient.ResolverConfig{ + Targets: targets, + Lookup: dnsLookupFn, + Set: multiClients.Set, + }) + if err != nil { + log.Fatalf("Failed to create resolver: %v", err) + return + } + defer resolver.Stop() + + if flags.weaveEnabled && flags.weaveHostname != "" { + dockerBridgeIP, err := network.GetFirstAddressOf(flags.dockerBridge) + if err != nil { + log.Errorf("Error getting docker bridge ip: %v", err) + } else { + weaveDNSLookup := appclient.LookupUsing(dockerBridgeIP + ":53") + weaveTargets, err := appclient.ParseTargets([]string{flags.weaveHostname}) + if err != nil { + log.Errorf("Failed to parse weave targets: %v", err) + } else { + weaveResolver, err := appclient.NewResolver(appclient.ResolverConfig{ + Targets: weaveTargets, + Lookup: weaveDNSLookup, + Set: multiClients.Set, + }) + if err != nil { + log.Errorf("Failed to create weave resolver: %v", err) + } else { + defer weaveResolver.Stop() + } + } + } + } + clients = multiClients } - defer resolver.Stop() p := probe.New(flags.spyInterval, flags.publishInterval, clients, flags.noControls) @@ -241,30 +281,6 @@ func probeMain(flags probeFlags, targets []appclient.Target) { defer weave.Stop() p.AddTagger(weave) p.AddReporter(weave) - - if flags.weaveHostname != "" { - dockerBridgeIP, err := network.GetFirstAddressOf(flags.dockerBridge) - if err != nil { - log.Errorf("Error getting docker bridge ip: %v", err) - } else { - weaveDNSLookup := appclient.LookupUsing(dockerBridgeIP + ":53") - weaveTargets, err := appclient.ParseTargets([]string{flags.weaveHostname}) - if err != nil { - log.Errorf("Failed to parse weave targets: %v", err) - } else { - weaveResolver, err := appclient.NewResolver(appclient.ResolverConfig{ - Targets: weaveTargets, - Lookup: weaveDNSLookup, - Set: clients.Set, - }) - if err != nil { - log.Errorf("Failed to create weave resolver: %v", err) - } else { - defer weaveResolver.Stop() - } - } - } - } } } diff --git a/report/marshal.go b/report/marshal.go index f20f30a083..65921e3c5a 100644 --- a/report/marshal.go +++ b/report/marshal.go @@ -26,17 +26,28 @@ func (s *dummySelfer) CodecEncodeSelf(encoder *codec.Encoder) { panic("This shouldn't happen: perhaps something has gone wrong in code generation?") } -// WriteBinary writes a Report as a gzipped msgpack. -func (rep Report) WriteBinary(w io.Writer, compressionLevel int) error { - gzwriter, err := gzip.NewWriterLevel(w, compressionLevel) +// StdoutPublisher is useful when debugging +type StdoutPublisher struct{} + +// Publish implements probe.ReportPublisher +func (StdoutPublisher) Publish(rep Report) error { + handle := &codec.JsonHandle{Indent: 2} + handle.Canonical = true + return codec.NewEncoder(os.Stdout, handle).Encode(rep) +} + +// WriteBinary writes a Report as a gzipped msgpack into a bytes.Buffer +func (rep Report) WriteBinary() (*bytes.Buffer, error) { + w := &bytes.Buffer{} + gzwriter, err := gzip.NewWriterLevel(w, gzip.DefaultCompression) if err != nil { - return err + return nil, err } if err = codec.NewEncoder(gzwriter, &codec.MsgpackHandle{}).Encode(&rep); err != nil { - return err + return nil, err } gzwriter.Close() // otherwise the content won't get flushed to the output stream - return nil + return w, nil } type byteCounter struct { diff --git a/report/marshal_test.go b/report/marshal_test.go index 28615f1298..5fb2381ec0 100644 --- a/report/marshal_test.go +++ b/report/marshal_test.go @@ -1,8 +1,6 @@ package report_test import ( - "bytes" - "compress/gzip" "reflect" "testing" "time" @@ -13,11 +11,10 @@ import ( ) func TestRoundtrip(t *testing.T) { - var buf bytes.Buffer r1 := report.MakeReport() - r1.WriteBinary(&buf, gzip.DefaultCompression) + buf, _ := r1.WriteBinary() bytes := append([]byte{}, buf.Bytes()...) // copy the contents for later - r2, err := report.MakeFromBinary(&buf) + r2, err := report.MakeFromBinary(buf) if err != nil { t.Error(err) } @@ -74,10 +71,9 @@ func makeTestReport() report.Report { } func TestBiggerRoundtrip(t *testing.T) { - var buf bytes.Buffer r1 := makeTestReport() - r1.WriteBinary(&buf, gzip.BestCompression) - r2, err := report.MakeFromBinary(&buf) + buf, _ := r1.WriteBinary() + r2, err := report.MakeFromBinary(buf) if err != nil { t.Error(err) } @@ -85,29 +81,3 @@ func TestBiggerRoundtrip(t *testing.T) { t.Errorf("%v != %v", r1, *r2) } } - -func TestRoundtripNoCompression(t *testing.T) { - // Make sure that we can use our standard routines for decompressing - // something with '0' level compression. - var buf bytes.Buffer - r1 := report.MakeReport() - r1.WriteBinary(&buf, 0) - r2, err := report.MakeFromBinary(&buf) - if err != nil { - t.Error(err) - } - if !reflect.DeepEqual(r1, *r2) { - t.Errorf("%v != %v", r1, *r2) - } -} - -func TestMoreCompressionMeansSmaller(t *testing.T) { - // Make sure that 0 level compression actually does compress less. - var buf1, buf2 bytes.Buffer - r := report.MakeReport() - r.WriteBinary(&buf1, gzip.DefaultCompression) - r.WriteBinary(&buf2, 0) - if buf1.Len() >= buf2.Len() { - t.Errorf("Compression doesn't change size: %v >= %v", buf1.Len(), buf2.Len()) - } -}