From 96f51c47af0168526a7104b75807cf8cdc2714fa Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Thu, 31 May 2018 11:05:07 +0000 Subject: [PATCH 1/5] probe: Eliminate Publisher interface from app_client Simplification: everything now implements Publish(Report), and we do away with writing/reading/writing in the MultiAppClient. --- extras/fixprobe/main.go | 10 +++++-- probe/appclient/app_client_internal_test.go | 9 +++---- probe/appclient/multi_client.go | 24 +++-------------- probe/appclient/multi_client_test.go | 5 ++-- probe/appclient/report_publisher.go | 30 +++++++++++++-------- probe/probe.go | 4 +-- probe/probe_internal_test.go | 11 +------- 7 files changed, 41 insertions(+), 52 deletions(-) diff --git a/extras/fixprobe/main.go b/extras/fixprobe/main.go index 5b99925cbc..7dcc9a821c 100644 --- a/extras/fixprobe/main.go +++ b/extras/fixprobe/main.go @@ -2,6 +2,8 @@ package main import ( + "bytes" + "compress/gzip" "flag" "fmt" "io/ioutil" @@ -60,8 +62,12 @@ func main() { log.Fatal(err) } - rp := appclient.NewReportPublisher(client, false) + buf := &bytes.Buffer{} + err = fixedReport.WriteBinary(buf, gzip.DefaultCompression) + if err != nil { + log.Fatal(err) + } for range time.Tick(*publishInterval) { - rp.Publish(fixedReport) + client.Publish(bytes.NewReader(buf.Bytes()), fixedReport.Shortcut) } } diff --git a/probe/appclient/app_client_internal_test.go b/probe/appclient/app_client_internal_test.go index 66bcae51d7..210878b12f 100644 --- a/probe/appclient/app_client_internal_test.go +++ b/probe/appclient/app_client_internal_test.go @@ -104,9 +104,9 @@ func TestAppClientPublish(t *testing.T) { defer p.Stop() // First few reports might be dropped as the client is spinning up. - rp := NewReportPublisher(p, false) for i := 0; i < 10; i++ { - if err := rp.Publish(rpt); err != nil { + buf, _ := serializeReport(rpt) + if err := p.Publish(buf, false); err != nil { t.Error(err) } time.Sleep(10 * time.Millisecond) @@ -204,15 +204,14 @@ func TestStop(t *testing.T) { t.Fatal(err) } - rp := NewReportPublisher(p, false) - // Make sure the app received our report and is stuck for done := false; !done; { select { case <-receivedReport: done = true default: - if err := rp.Publish(rpt); err != nil { + buf, _ := serializeReport(rpt) + if err := p.Publish(buf, false); err != nil { t.Error(err) } time.Sleep(10 * time.Millisecond) diff --git a/probe/appclient/multi_client.go b/probe/appclient/multi_client.go index c8187a5ed8..e93b33674b 100644 --- a/probe/appclient/multi_client.go +++ b/probe/appclient/multi_client.go @@ -4,8 +4,6 @@ import ( "bytes" "errors" "fmt" - "io" - "io/ioutil" "net/url" "strings" "sync" @@ -37,13 +35,6 @@ type clientTuple struct { AppClient } -// Publisher is something which can send a stream of data somewhere, probably -// to a remote collector. -type Publisher interface { - Publish(io.Reader, bool) error - Stop() -} - // MultiAppClient maintains a set of upstream apps, and ensures we have an // AppClient for each one. type MultiAppClient interface { @@ -51,7 +42,7 @@ type MultiAppClient interface { PipeConnection(appID, pipeID string, pipe xfer.Pipe) error PipeClose(appID, pipeID string) error Stop() - Publish(io.Reader, bool) error + ReportPublisher } // NewMultiAppClient creates a new MultiAppClient. @@ -165,25 +156,18 @@ func (c *multiClient) Stop() { // underlying publishers sequentially. To do that, it needs to drain the // reader, and recreate new readers for each publisher. Note that it will // publish to one endpoint for each unique ID. Failed publishes don't count. -func (c *multiClient) Publish(r io.Reader, shortcut bool) error { +func (c *multiClient) Publish(r report.Report) error { c.mtx.Lock() defer c.mtx.Unlock() - if len(c.clients) <= 1 { // optimisation - for _, c := range c.clients { - return c.Publish(r, shortcut) - } - return nil - } - - buf, err := ioutil.ReadAll(r) + buf, err := serializeReport(r) if err != nil { return err } errs := []string{} for _, c := range c.clients { - if err := c.Publish(bytes.NewReader(buf), shortcut); err != nil { + if err := c.Publish(bytes.NewReader(buf.Bytes()), r.Shortcut); err != nil { errs = append(errs, err.Error()) } } diff --git a/probe/appclient/multi_client_test.go b/probe/appclient/multi_client_test.go index e1789a7100..770916557a 100644 --- a/probe/appclient/multi_client_test.go +++ b/probe/appclient/multi_client_test.go @@ -1,7 +1,6 @@ package appclient_test import ( - "bytes" "io" "net/url" "runtime" @@ -9,6 +8,7 @@ import ( "github.com/weaveworks/scope/common/xfer" "github.com/weaveworks/scope/probe/appclient" + "github.com/weaveworks/scope/report" ) type mockClient struct { @@ -105,8 +105,9 @@ func TestMultiClientPublish(t *testing.T) { mp.Set("a", []url.URL{{Host: "a1"}, {Host: "a2"}}) mp.Set("b", []url.URL{{Host: "b2"}, {Host: "b3"}}) + rpt := report.MakeReport() for i := 1; i < 10; i++ { - if err := mp.Publish(&bytes.Buffer{}, false); err != nil { + if err := mp.Publish(rpt); err != nil { t.Error(err) } if want, have := 3*i, sum(); want != have { diff --git a/probe/appclient/report_publisher.go b/probe/appclient/report_publisher.go index 2f7bdd0b82..8d373f7106 100644 --- a/probe/appclient/report_publisher.go +++ b/probe/appclient/report_publisher.go @@ -3,32 +3,40 @@ package appclient import ( "bytes" "compress/gzip" + "github.com/weaveworks/scope/report" ) -// A ReportPublisher uses a buffer pool to serialise reports, which it -// then passes to a publisher -type ReportPublisher struct { - publisher Publisher +// ReportPublisher publishes reports, probably to a remote collector. +type ReportPublisher interface { + Publish(r report.Report) error +} + +type reportPublisher struct { + publisher ReportPublisher noControls bool } // NewReportPublisher creates a new report publisher -func NewReportPublisher(publisher Publisher, noControls bool) *ReportPublisher { - return &ReportPublisher{ +func NewReportPublisher(publisher ReportPublisher, noControls bool) ReportPublisher { + return &reportPublisher{ publisher: publisher, noControls: noControls, } } -// Publish serialises and compresses a report, then passes it to a publisher -func (p *ReportPublisher) Publish(r report.Report) error { +func serializeReport(r report.Report) (*bytes.Buffer, error) { + buf := &bytes.Buffer{} + err := r.WriteBinary(buf, gzip.DefaultCompression) + return buf, err +} + +// Publish sanitises a report, then passes it to a publisher +func (p *reportPublisher) Publish(r report.Report) error { if p.noControls { r.WalkTopologies(func(t *report.Topology) { t.Controls = report.Controls{} }) } - buf := &bytes.Buffer{} - r.WriteBinary(buf, gzip.DefaultCompression) - return p.publisher.Publish(buf, r.Shortcut) + return p.publisher.Publish(r) } diff --git a/probe/probe.go b/probe/probe.go index a9139fc4d7..d9c07bd9ab 100644 --- a/probe/probe.go +++ b/probe/probe.go @@ -18,7 +18,7 @@ const ( // Probe sits there, generating and publishing reports. type Probe struct { spyInterval, publishInterval time.Duration - publisher *appclient.ReportPublisher + publisher appclient.ReportPublisher tickers []Ticker reporters []Reporter @@ -67,7 +67,7 @@ type Ticker interface { // New makes a new Probe. func New( spyInterval, publishInterval time.Duration, - publisher appclient.Publisher, + publisher appclient.ReportPublisher, noControls bool, ) *Probe { result := &Probe{ diff --git a/probe/probe_internal_test.go b/probe/probe_internal_test.go index af6404af62..adc5cd143f 100644 --- a/probe/probe_internal_test.go +++ b/probe/probe_internal_test.go @@ -1,12 +1,9 @@ package probe import ( - "compress/gzip" - "io" "testing" "time" - "github.com/ugorji/go/codec" "github.com/weaveworks/common/mtime" "github.com/weaveworks/scope/report" "github.com/weaveworks/scope/test" @@ -53,13 +50,7 @@ type mockPublisher struct { have chan report.Report } -func (m mockPublisher) Publish(in io.Reader, shortcut bool) error { - var r report.Report - if reader, err := gzip.NewReader(in); err != nil { - return err - } else if err := codec.NewDecoder(reader, &codec.MsgpackHandle{}).Decode(&r); err != nil { - return err - } +func (m mockPublisher) Publish(r report.Report) error { m.have <- r return nil } From 56137211b58fbb279d5489e950095c77639207bc Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Thu, 31 May 2018 11:20:03 +0000 Subject: [PATCH 2/5] probe: Eliminate appclient.reportPublisher Simplification: move the 'noControls' functionality into the probe, as we don't need a whole struct to do that. The ReportPublisher interface also moves into probe where it belongs: "the consumer should define the interface" - Dave Cheney --- probe/appclient/multi_client.go | 9 ++++++- probe/appclient/report_publisher.go | 42 ----------------------------- probe/probe.go | 19 ++++++++++--- 3 files changed, 23 insertions(+), 47 deletions(-) delete mode 100644 probe/appclient/report_publisher.go diff --git a/probe/appclient/multi_client.go b/probe/appclient/multi_client.go index e93b33674b..668d234d1c 100644 --- a/probe/appclient/multi_client.go +++ b/probe/appclient/multi_client.go @@ -2,6 +2,7 @@ package appclient import ( "bytes" + "compress/gzip" "errors" "fmt" "net/url" @@ -42,7 +43,7 @@ type MultiAppClient interface { PipeConnection(appID, pipeID string, pipe xfer.Pipe) error PipeClose(appID, pipeID string) error Stop() - ReportPublisher + Publish(r report.Report) error } // NewMultiAppClient creates a new MultiAppClient. @@ -155,6 +156,12 @@ func (c *multiClient) Stop() { // Publish implements Publisher by publishing the reader to all of the // underlying publishers sequentially. To do that, it needs to drain the // reader, and recreate new readers for each publisher. Note that it will +func serializeReport(r report.Report) (*bytes.Buffer, error) { + buf := &bytes.Buffer{} + err := r.WriteBinary(buf, gzip.DefaultCompression) + return buf, err +} + // publish to one endpoint for each unique ID. Failed publishes don't count. func (c *multiClient) Publish(r report.Report) error { c.mtx.Lock() diff --git a/probe/appclient/report_publisher.go b/probe/appclient/report_publisher.go deleted file mode 100644 index 8d373f7106..0000000000 --- a/probe/appclient/report_publisher.go +++ /dev/null @@ -1,42 +0,0 @@ -package appclient - -import ( - "bytes" - "compress/gzip" - - "github.com/weaveworks/scope/report" -) - -// ReportPublisher publishes reports, probably to a remote collector. -type ReportPublisher interface { - Publish(r report.Report) error -} - -type reportPublisher struct { - publisher ReportPublisher - noControls bool -} - -// NewReportPublisher creates a new report publisher -func NewReportPublisher(publisher ReportPublisher, noControls bool) ReportPublisher { - return &reportPublisher{ - publisher: publisher, - noControls: noControls, - } -} - -func serializeReport(r report.Report) (*bytes.Buffer, error) { - buf := &bytes.Buffer{} - err := r.WriteBinary(buf, gzip.DefaultCompression) - return buf, err -} - -// Publish sanitises a report, then passes it to a publisher -func (p *reportPublisher) Publish(r report.Report) error { - if p.noControls { - r.WalkTopologies(func(t *report.Topology) { - t.Controls = report.Controls{} - }) - } - return p.publisher.Publish(r) -} diff --git a/probe/probe.go b/probe/probe.go index d9c07bd9ab..cb7286a8d1 100644 --- a/probe/probe.go +++ b/probe/probe.go @@ -7,7 +7,6 @@ import ( log "github.com/Sirupsen/logrus" "github.com/armon/go-metrics" - "github.com/weaveworks/scope/probe/appclient" "github.com/weaveworks/scope/report" ) @@ -15,10 +14,16 @@ const ( reportBufferSize = 16 ) +// ReportPublisher publishes reports, probably to a remote collector. +type ReportPublisher interface { + Publish(r report.Report) error +} + // Probe sits there, generating and publishing reports. type Probe struct { spyInterval, publishInterval time.Duration - publisher appclient.ReportPublisher + publisher ReportPublisher + noControls bool tickers []Ticker reporters []Reporter @@ -67,13 +72,14 @@ type Ticker interface { // New makes a new Probe. func New( spyInterval, publishInterval time.Duration, - publisher appclient.ReportPublisher, + publisher ReportPublisher, noControls bool, ) *Probe { result := &Probe{ spyInterval: spyInterval, publishInterval: publishInterval, - publisher: appclient.NewReportPublisher(publisher, noControls), + publisher: publisher, + noControls: noControls, quit: make(chan struct{}), spiedReports: make(chan report.Report, reportBufferSize), shortcutReports: make(chan report.Report, reportBufferSize), @@ -200,6 +206,11 @@ ForLoop: } } + if p.noControls { + rpt.WalkTopologies(func(t *report.Topology) { + t.Controls = report.Controls{} + }) + } if err := p.publisher.Publish(rpt); err != nil { log.Infof("publish: %v", err) } From 06c895267c85b9017940dd8b17c08e67e8f3daf9 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Fri, 1 Jun 2018 19:57:59 +0000 Subject: [PATCH 3/5] Reports: streamline report serialization Move the creation of the buffer and the choice of compression level (which never changes) into WriteBinary(), to simplify the code. --- app/router.go | 9 +++-- extras/fixprobe/main.go | 4 +-- probe/appclient/app_client_internal_test.go | 4 +-- probe/appclient/multi_client.go | 9 +---- report/marshal.go | 13 +++---- report/marshal_test.go | 38 +++------------------ 6 files changed, 19 insertions(+), 58 deletions(-) diff --git a/app/router.go b/app/router.go index 414f45f99f..e9efa01568 100644 --- a/app/router.go +++ b/app/router.go @@ -121,13 +121,13 @@ func RegisterReportPostHandler(a Adder, router *mux.Router) { post.HandleFunc("/api/report", requestContextDecorator(func(ctx context.Context, w http.ResponseWriter, r *http.Request) { var ( rpt report.Report - buf bytes.Buffer - reader = io.TeeReader(r.Body, &buf) + buf = &bytes.Buffer{} + reader = io.TeeReader(r.Body, buf) ) gzipped := strings.Contains(r.Header.Get("Content-Encoding"), "gzip") if !gzipped { - reader = io.TeeReader(r.Body, gzip.NewWriter(&buf)) + reader = io.TeeReader(r.Body, gzip.NewWriter(buf)) } contentType := r.Header.Get("Content-Type") @@ -150,8 +150,7 @@ func RegisterReportPostHandler(a Adder, router *mux.Router) { // a.Add(..., buf) assumes buf is gzip'd msgpack if !isMsgpack { - buf = bytes.Buffer{} - rpt.WriteBinary(&buf, gzip.DefaultCompression) + buf, _ = rpt.WriteBinary() } if err := a.Add(ctx, rpt, buf.Bytes()); err != nil { diff --git a/extras/fixprobe/main.go b/extras/fixprobe/main.go index 7dcc9a821c..12b80cc708 100644 --- a/extras/fixprobe/main.go +++ b/extras/fixprobe/main.go @@ -3,7 +3,6 @@ package main import ( "bytes" - "compress/gzip" "flag" "fmt" "io/ioutil" @@ -62,8 +61,7 @@ func main() { log.Fatal(err) } - buf := &bytes.Buffer{} - err = fixedReport.WriteBinary(buf, gzip.DefaultCompression) + buf, err := fixedReport.WriteBinary() if err != nil { log.Fatal(err) } diff --git a/probe/appclient/app_client_internal_test.go b/probe/appclient/app_client_internal_test.go index 210878b12f..15f4b636d0 100644 --- a/probe/appclient/app_client_internal_test.go +++ b/probe/appclient/app_client_internal_test.go @@ -105,7 +105,7 @@ func TestAppClientPublish(t *testing.T) { // First few reports might be dropped as the client is spinning up. for i := 0; i < 10; i++ { - buf, _ := serializeReport(rpt) + buf, _ := rpt.WriteBinary() if err := p.Publish(buf, false); err != nil { t.Error(err) } @@ -210,7 +210,7 @@ func TestStop(t *testing.T) { case <-receivedReport: done = true default: - buf, _ := serializeReport(rpt) + buf, _ := rpt.WriteBinary() if err := p.Publish(buf, false); err != nil { t.Error(err) } diff --git a/probe/appclient/multi_client.go b/probe/appclient/multi_client.go index 668d234d1c..79100cd45e 100644 --- a/probe/appclient/multi_client.go +++ b/probe/appclient/multi_client.go @@ -2,7 +2,6 @@ package appclient import ( "bytes" - "compress/gzip" "errors" "fmt" "net/url" @@ -156,18 +155,12 @@ func (c *multiClient) Stop() { // Publish implements Publisher by publishing the reader to all of the // underlying publishers sequentially. To do that, it needs to drain the // reader, and recreate new readers for each publisher. Note that it will -func serializeReport(r report.Report) (*bytes.Buffer, error) { - buf := &bytes.Buffer{} - err := r.WriteBinary(buf, gzip.DefaultCompression) - return buf, err -} - // publish to one endpoint for each unique ID. Failed publishes don't count. func (c *multiClient) Publish(r report.Report) error { c.mtx.Lock() defer c.mtx.Unlock() - buf, err := serializeReport(r) + buf, err := r.WriteBinary() if err != nil { return err } diff --git a/report/marshal.go b/report/marshal.go index f20f30a083..255f5a6fd0 100644 --- a/report/marshal.go +++ b/report/marshal.go @@ -26,17 +26,18 @@ func (s *dummySelfer) CodecEncodeSelf(encoder *codec.Encoder) { panic("This shouldn't happen: perhaps something has gone wrong in code generation?") } -// WriteBinary writes a Report as a gzipped msgpack. -func (rep Report) WriteBinary(w io.Writer, compressionLevel int) error { - gzwriter, err := gzip.NewWriterLevel(w, compressionLevel) +// WriteBinary writes a Report as a gzipped msgpack into a bytes.Buffer +func (rep Report) WriteBinary() (*bytes.Buffer, error) { + w := &bytes.Buffer{} + gzwriter, err := gzip.NewWriterLevel(w, gzip.DefaultCompression) if err != nil { - return err + return nil, err } if err = codec.NewEncoder(gzwriter, &codec.MsgpackHandle{}).Encode(&rep); err != nil { - return err + return nil, err } gzwriter.Close() // otherwise the content won't get flushed to the output stream - return nil + return w, nil } type byteCounter struct { diff --git a/report/marshal_test.go b/report/marshal_test.go index 28615f1298..5fb2381ec0 100644 --- a/report/marshal_test.go +++ b/report/marshal_test.go @@ -1,8 +1,6 @@ package report_test import ( - "bytes" - "compress/gzip" "reflect" "testing" "time" @@ -13,11 +11,10 @@ import ( ) func TestRoundtrip(t *testing.T) { - var buf bytes.Buffer r1 := report.MakeReport() - r1.WriteBinary(&buf, gzip.DefaultCompression) + buf, _ := r1.WriteBinary() bytes := append([]byte{}, buf.Bytes()...) // copy the contents for later - r2, err := report.MakeFromBinary(&buf) + r2, err := report.MakeFromBinary(buf) if err != nil { t.Error(err) } @@ -74,10 +71,9 @@ func makeTestReport() report.Report { } func TestBiggerRoundtrip(t *testing.T) { - var buf bytes.Buffer r1 := makeTestReport() - r1.WriteBinary(&buf, gzip.BestCompression) - r2, err := report.MakeFromBinary(&buf) + buf, _ := r1.WriteBinary() + r2, err := report.MakeFromBinary(buf) if err != nil { t.Error(err) } @@ -85,29 +81,3 @@ func TestBiggerRoundtrip(t *testing.T) { t.Errorf("%v != %v", r1, *r2) } } - -func TestRoundtripNoCompression(t *testing.T) { - // Make sure that we can use our standard routines for decompressing - // something with '0' level compression. - var buf bytes.Buffer - r1 := report.MakeReport() - r1.WriteBinary(&buf, 0) - r2, err := report.MakeFromBinary(&buf) - if err != nil { - t.Error(err) - } - if !reflect.DeepEqual(r1, *r2) { - t.Errorf("%v != %v", r1, *r2) - } -} - -func TestMoreCompressionMeansSmaller(t *testing.T) { - // Make sure that 0 level compression actually does compress less. - var buf1, buf2 bytes.Buffer - r := report.MakeReport() - r.WriteBinary(&buf1, gzip.DefaultCompression) - r.WriteBinary(&buf2, 0) - if buf1.Len() >= buf2.Len() { - t.Errorf("Compression doesn't change size: %v >= %v", buf1.Len(), buf2.Len()) - } -} From 31490e4dcb72f965403d72f289cd73c7042b6bdd Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Fri, 1 Jun 2018 19:17:18 +0000 Subject: [PATCH 4/5] probe: Refactor: Move Weave resolver up to where other apps are added This makes the next code change easier to see. --- prog/probe.go | 48 ++++++++++++++++++++++++------------------------ 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/prog/probe.go b/prog/probe.go index 24f32fde37..e4275aa7db 100644 --- a/prog/probe.go +++ b/prog/probe.go @@ -148,6 +148,30 @@ func probeMain(flags probeFlags, targets []appclient.Target) { } defer resolver.Stop() + if flags.weaveEnabled && flags.weaveHostname != "" { + dockerBridgeIP, err := network.GetFirstAddressOf(flags.dockerBridge) + if err != nil { + log.Errorf("Error getting docker bridge ip: %v", err) + } else { + weaveDNSLookup := appclient.LookupUsing(dockerBridgeIP + ":53") + weaveTargets, err := appclient.ParseTargets([]string{flags.weaveHostname}) + if err != nil { + log.Errorf("Failed to parse weave targets: %v", err) + } else { + weaveResolver, err := appclient.NewResolver(appclient.ResolverConfig{ + Targets: weaveTargets, + Lookup: weaveDNSLookup, + Set: clients.Set, + }) + if err != nil { + log.Errorf("Failed to create weave resolver: %v", err) + } else { + defer weaveResolver.Stop() + } + } + } + } + p := probe.New(flags.spyInterval, flags.publishInterval, clients, flags.noControls) hostReporter := host.NewReporter(hostID, hostName, probeID, version, clients, handlerRegistry) @@ -241,30 +265,6 @@ func probeMain(flags probeFlags, targets []appclient.Target) { defer weave.Stop() p.AddTagger(weave) p.AddReporter(weave) - - if flags.weaveHostname != "" { - dockerBridgeIP, err := network.GetFirstAddressOf(flags.dockerBridge) - if err != nil { - log.Errorf("Error getting docker bridge ip: %v", err) - } else { - weaveDNSLookup := appclient.LookupUsing(dockerBridgeIP + ":53") - weaveTargets, err := appclient.ParseTargets([]string{flags.weaveHostname}) - if err != nil { - log.Errorf("Failed to parse weave targets: %v", err) - } else { - weaveResolver, err := appclient.NewResolver(appclient.ResolverConfig{ - Targets: weaveTargets, - Lookup: weaveDNSLookup, - Set: clients.Set, - }) - if err != nil { - log.Errorf("Failed to create weave resolver: %v", err) - } else { - defer weaveResolver.Stop() - } - } - } - } } } From 80dbd3443c544b63629567231c573746b733bd1d Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Thu, 31 May 2018 11:31:36 +0000 Subject: [PATCH 5/5] probe: Add -probe.publish.stdout option for debugging This option gives a crude way to view the raw probe data as json in the container logs, so that you can check exactly what it would have sent. We stub out the PipeClient interface with a dummy implementation in this mode. --- probe/controls/pipes.go | 9 +++++ prog/main.go | 2 ++ prog/probe.go | 74 +++++++++++++++++++++++++---------------- report/marshal.go | 10 ++++++ 4 files changed, 66 insertions(+), 29 deletions(-) diff --git a/probe/controls/pipes.go b/probe/controls/pipes.go index ec4198eeda..0f2779fab6 100644 --- a/probe/controls/pipes.go +++ b/probe/controls/pipes.go @@ -54,3 +54,12 @@ func (p *pipe) Close() error { } return err2 } + +// DummyPipeClient implements PipeClient when running the probe in debugging mode +type DummyPipeClient struct{} + +// PipeConnection implements controls.PipeClient +func (DummyPipeClient) PipeConnection(appID, pipeID string, pipe xfer.Pipe) error { return nil } + +// PipeClose implements controls.PipeClient +func (DummyPipeClient) PipeClose(appID, pipeID string) error { return nil } diff --git a/prog/main.go b/prog/main.go index 390171f1e5..f0776b65f7 100644 --- a/prog/main.go +++ b/prog/main.go @@ -93,6 +93,7 @@ type flags struct { } type probeFlags struct { + printOnStdout bool token string httpListen string publishInterval time.Duration @@ -274,6 +275,7 @@ func setupFlags(flags *flags) { flag.Bool("app-only", false, "Only run the app.") // Probe flags + flag.BoolVar(&flags.probe.printOnStdout, "probe.publish.stdout", false, "Print reports on stdout instead of sending to app, for debugging") flag.StringVar(&flags.probe.token, serviceTokenFlag, "", "Token to authenticate with cloud.weave.works") flag.StringVar(&flags.probe.token, probeTokenFlag, "", "Token to authenticate with cloud.weave.works") flag.StringVar(&flags.probe.httpListen, "probe.http.listen", "", "listen address for HTTP profiling and instrumentation server") diff --git a/prog/probe.go b/prog/probe.go index e4275aa7db..15418b7110 100644 --- a/prog/probe.go +++ b/prog/probe.go @@ -130,46 +130,62 @@ func probeMain(flags probeFlags, targets []appclient.Target) { xfer.ControlHandlerFunc(handlerRegistry.HandleControlRequest), ) } - clients := appclient.NewMultiAppClient(clientFactory, flags.noControls) - defer clients.Stop() - dnsLookupFn := net.LookupIP - if flags.resolver != "" { - dnsLookupFn = appclient.LookupUsing(flags.resolver) + var clients interface { + probe.ReportPublisher + controls.PipeClient } - resolver, err := appclient.NewResolver(appclient.ResolverConfig{ - Targets: targets, - Lookup: dnsLookupFn, - Set: clients.Set, - }) - if err != nil { - log.Fatalf("Failed to create resolver: %v", err) - return - } - defer resolver.Stop() + if flags.printOnStdout { + if len(targets) > 0 { + log.Warnf("Dumping to stdout only: targets %v will be ignored", targets) + } + clients = new(struct { + report.StdoutPublisher + controls.DummyPipeClient + }) + } else { + multiClients := appclient.NewMultiAppClient(clientFactory, flags.noControls) + defer multiClients.Stop() - if flags.weaveEnabled && flags.weaveHostname != "" { - dockerBridgeIP, err := network.GetFirstAddressOf(flags.dockerBridge) + dnsLookupFn := net.LookupIP + if flags.resolver != "" { + dnsLookupFn = appclient.LookupUsing(flags.resolver) + } + resolver, err := appclient.NewResolver(appclient.ResolverConfig{ + Targets: targets, + Lookup: dnsLookupFn, + Set: multiClients.Set, + }) if err != nil { - log.Errorf("Error getting docker bridge ip: %v", err) - } else { - weaveDNSLookup := appclient.LookupUsing(dockerBridgeIP + ":53") - weaveTargets, err := appclient.ParseTargets([]string{flags.weaveHostname}) + log.Fatalf("Failed to create resolver: %v", err) + return + } + defer resolver.Stop() + + if flags.weaveEnabled && flags.weaveHostname != "" { + dockerBridgeIP, err := network.GetFirstAddressOf(flags.dockerBridge) if err != nil { - log.Errorf("Failed to parse weave targets: %v", err) + log.Errorf("Error getting docker bridge ip: %v", err) } else { - weaveResolver, err := appclient.NewResolver(appclient.ResolverConfig{ - Targets: weaveTargets, - Lookup: weaveDNSLookup, - Set: clients.Set, - }) + weaveDNSLookup := appclient.LookupUsing(dockerBridgeIP + ":53") + weaveTargets, err := appclient.ParseTargets([]string{flags.weaveHostname}) if err != nil { - log.Errorf("Failed to create weave resolver: %v", err) + log.Errorf("Failed to parse weave targets: %v", err) } else { - defer weaveResolver.Stop() + weaveResolver, err := appclient.NewResolver(appclient.ResolverConfig{ + Targets: weaveTargets, + Lookup: weaveDNSLookup, + Set: multiClients.Set, + }) + if err != nil { + log.Errorf("Failed to create weave resolver: %v", err) + } else { + defer weaveResolver.Stop() + } } } } + clients = multiClients } p := probe.New(flags.spyInterval, flags.publishInterval, clients, flags.noControls) diff --git a/report/marshal.go b/report/marshal.go index 255f5a6fd0..65921e3c5a 100644 --- a/report/marshal.go +++ b/report/marshal.go @@ -26,6 +26,16 @@ func (s *dummySelfer) CodecEncodeSelf(encoder *codec.Encoder) { panic("This shouldn't happen: perhaps something has gone wrong in code generation?") } +// StdoutPublisher is useful when debugging +type StdoutPublisher struct{} + +// Publish implements probe.ReportPublisher +func (StdoutPublisher) Publish(rep Report) error { + handle := &codec.JsonHandle{Indent: 2} + handle.Canonical = true + return codec.NewEncoder(os.Stdout, handle).Encode(rep) +} + // WriteBinary writes a Report as a gzipped msgpack into a bytes.Buffer func (rep Report) WriteBinary() (*bytes.Buffer, error) { w := &bytes.Buffer{}