From 13269e81105fd4d18d298b6d10698e23ecade921 Mon Sep 17 00:00:00 2001 From: Jonathan Lange Date: Thu, 16 Jun 2016 18:17:27 +0100 Subject: [PATCH 1/8] Helper for reading & writing from binary --- app/multitenant/dynamo_collector.go | 21 ++------------- probe/appclient/report_publisher.go | 10 +------ report/marshal.go | 42 +++++++++++++++++++++++++++++ 3 files changed, 45 insertions(+), 28 deletions(-) create mode 100644 report/marshal.go diff --git a/app/multitenant/dynamo_collector.go b/app/multitenant/dynamo_collector.go index c8cec91688..0d5181d14f 100644 --- a/app/multitenant/dynamo_collector.go +++ b/app/multitenant/dynamo_collector.go @@ -2,7 +2,6 @@ package multitenant import ( "bytes" - "compress/gzip" "crypto/md5" "fmt" "io" @@ -18,7 +17,6 @@ import ( "github.com/bluele/gcache" "github.com/nats-io/nats" "github.com/prometheus/client_golang/prometheus" - "github.com/ugorji/go/codec" "golang.org/x/net/context" "github.com/weaveworks/scope/app" @@ -312,15 +310,7 @@ func (c *dynamoDBCollector) getNonCachedReport(reportKey string) (*report.Report if err != nil { return nil, err } - reader, err := gzip.NewReader(resp.Body) - if err != nil { - return nil, err - } - rep := report.MakeReport() - if err := codec.NewDecoder(reader, &codec.MsgpackHandle{}).Decode(&rep); err != nil { - return nil, err - } - return &rep, nil + return report.MakeFromBinary(resp.Body) } func (c *dynamoDBCollector) getReports(userid string, row int64, start, end time.Time) ([]report.Report, error) { @@ -387,14 +377,7 @@ func (c *dynamoDBCollector) Add(ctx context.Context, rep report.Report) error { // first, encode the report into a buffer and record its size var buf bytes.Buffer - writer, err := gzip.NewWriterLevel(&buf, gzip.BestCompression) - if err != nil { - return err - } - if err := codec.NewEncoder(writer, &codec.MsgpackHandle{}).Encode(&rep); err != nil { - return err - } - writer.Close() + rep.WriteBinary(&buf) reportSize.Add(float64(buf.Len())) // second, put the report on s3 diff --git a/probe/appclient/report_publisher.go b/probe/appclient/report_publisher.go index 5e713ebb8d..2e21341473 100644 --- a/probe/appclient/report_publisher.go +++ b/probe/appclient/report_publisher.go @@ -2,9 +2,6 @@ package appclient import ( "bytes" - "compress/gzip" - "github.com/ugorji/go/codec" - "github.com/weaveworks/scope/report" ) @@ -24,11 +21,6 @@ func NewReportPublisher(publisher Publisher) *ReportPublisher { // Publish serialises and compresses a report, then passes it to a publisher func (p *ReportPublisher) Publish(r report.Report) error { buf := &bytes.Buffer{} - gzwriter := gzip.NewWriter(buf) - if err := codec.NewEncoder(gzwriter, &codec.MsgpackHandle{}).Encode(r); err != nil { - return err - } - gzwriter.Close() // otherwise the content won't get flushed to the output stream - + r.WriteBinary(buf) return p.publisher.Publish(buf) } diff --git a/report/marshal.go b/report/marshal.go new file mode 100644 index 0000000000..39fa7704d3 --- /dev/null +++ b/report/marshal.go @@ -0,0 +1,42 @@ +package report + +import ( + "compress/gzip" + "io" + + "github.com/ugorji/go/codec" +) + +// WriteBinary writes a Report as a gzipped msgpack. +func (rep Report) WriteBinary(w io.Writer) error { + gzwriter, err := gzip.NewWriterLevel(w, gzip.BestCompression) + if err != nil { + return err + } + if err = codec.NewEncoder(gzwriter, &codec.MsgpackHandle{}).Encode(&rep); err != nil { + return err + } + gzwriter.Close() // otherwise the content won't get flushed to the output stream + return nil +} + +// ReadBinary reads into a Report from a gzipped msgpack. +func (rep *Report) ReadBinary(r io.Reader) error { + reader, err := gzip.NewReader(r) + if err != nil { + return err + } + if err := codec.NewDecoder(reader, &codec.MsgpackHandle{}).Decode(&rep); err != nil { + return err + } + return nil +} + +// MakeFromBinary constructs a Report from a gzipped msgpack. +func MakeFromBinary(r io.Reader) (*Report, error) { + rep := MakeReport() + if err := rep.ReadBinary(r); err != nil { + return nil, err + } + return &rep, nil +} From 81b05a33eea8dfe228d7c205ab3e6dad838c56ef Mon Sep 17 00:00:00 2001 From: Jonathan Lange Date: Mon, 20 Jun 2016 18:02:23 +0100 Subject: [PATCH 2/8] Make ReadBinary more general and re-use in router --- app/router.go | 37 +++++++------------------------------ report/marshal.go | 23 +++++++++++++++++------ 2 files changed, 24 insertions(+), 36 deletions(-) diff --git a/app/router.go b/app/router.go index fce7c7311d..f818a52ad2 100644 --- a/app/router.go +++ b/app/router.go @@ -1,8 +1,6 @@ package app import ( - "compress/gzip" - "encoding/gob" "io" "net/http" "net/url" @@ -122,43 +120,22 @@ func RegisterReportPostHandler(a Adder, router *mux.Router) { post := router.Methods("POST").Subrouter() post.HandleFunc("/api/report", requestContextDecorator(func(ctx context.Context, w http.ResponseWriter, r *http.Request) { var ( - rpt report.Report - reader = r.Body - err error - compressedSize, uncompressedSize uint64 + rpt report.Report + reader = r.Body ) - if log.GetLevel() == log.DebugLevel { - reader = byteCounter{next: reader, count: &compressedSize} - } - if strings.Contains(r.Header.Get("Content-Encoding"), "gzip") { - reader, err = gzip.NewReader(reader) - if err != nil { - respondWith(w, http.StatusBadRequest, err) - return - } - } - - if log.GetLevel() == log.DebugLevel { - reader = byteCounter{next: reader, count: &uncompressedSize} - } - decoder := gob.NewDecoder(reader).Decode + gzipped := strings.Contains(r.Header.Get("Content-Encoding"), "gzip") + var handle codec.Handle if strings.HasPrefix(r.Header.Get("Content-Type"), "application/json") { - decoder = codec.NewDecoder(reader, &codec.JsonHandle{}).Decode + handle = &codec.JsonHandle{} } else if strings.HasPrefix(r.Header.Get("Content-Type"), "application/msgpack") { - decoder = codec.NewDecoder(reader, &codec.MsgpackHandle{}).Decode + handle = &codec.MsgpackHandle{} } - if err := decoder(&rpt); err != nil { + if err := rpt.ReadBinary(reader, gzipped, handle); err != nil { respondWith(w, http.StatusBadRequest, err) return } - log.Debugf( - "Received report sizes: compressed %d bytes, uncompressed %d bytes (%.2f%%)", - compressedSize, - uncompressedSize, - float32(compressedSize)/float32(uncompressedSize)*100, - ) if err := a.Add(ctx, rpt); err != nil { log.Errorf("Error Adding report: %v", err) diff --git a/report/marshal.go b/report/marshal.go index 39fa7704d3..e477b57712 100644 --- a/report/marshal.go +++ b/report/marshal.go @@ -2,6 +2,7 @@ package report import ( "compress/gzip" + "encoding/gob" "io" "github.com/ugorji/go/codec" @@ -21,12 +22,22 @@ func (rep Report) WriteBinary(w io.Writer) error { } // ReadBinary reads into a Report from a gzipped msgpack. -func (rep *Report) ReadBinary(r io.Reader) error { - reader, err := gzip.NewReader(r) - if err != nil { - return err +// +// Will decompress the binary if gzipped is true, and will use the given +// codecHandle to decode it. If codecHandle is nil, will decode as a gob. +func (rep *Report) ReadBinary(r io.Reader, gzipped bool, codecHandle codec.Handle) error { + var err error + if gzipped { + r, err = gzip.NewReader(r) + if err != nil { + return err + } + } + decoder := gob.NewDecoder(r).Decode + if codecHandle != nil { + decoder = codec.NewDecoder(r, codecHandle).Decode } - if err := codec.NewDecoder(reader, &codec.MsgpackHandle{}).Decode(&rep); err != nil { + if err := decoder(&rep); err != nil { return err } return nil @@ -35,7 +46,7 @@ func (rep *Report) ReadBinary(r io.Reader) error { // MakeFromBinary constructs a Report from a gzipped msgpack. func MakeFromBinary(r io.Reader) (*Report, error) { rep := MakeReport() - if err := rep.ReadBinary(r); err != nil { + if err := rep.ReadBinary(r, true, &codec.MsgpackHandle{}); err != nil { return nil, err } return &rep, nil From e5417342baf222068e1e6dd342c9d306a0aa4c0e Mon Sep 17 00:00:00 2001 From: Jonathan Lange Date: Tue, 21 Jun 2016 10:33:15 +0100 Subject: [PATCH 3/8] Review comments --- report/marshal.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/report/marshal.go b/report/marshal.go index e477b57712..3630482d21 100644 --- a/report/marshal.go +++ b/report/marshal.go @@ -21,7 +21,7 @@ func (rep Report) WriteBinary(w io.Writer) error { return nil } -// ReadBinary reads into a Report from a gzipped msgpack. +// ReadBinary reads bytes into a Report. // // Will decompress the binary if gzipped is true, and will use the given // codecHandle to decode it. If codecHandle is nil, will decode as a gob. @@ -33,9 +33,11 @@ func (rep *Report) ReadBinary(r io.Reader, gzipped bool, codecHandle codec.Handl return err } } - decoder := gob.NewDecoder(r).Decode + var decoder func(interface{}) error if codecHandle != nil { decoder = codec.NewDecoder(r, codecHandle).Decode + } else { + decoder = gob.NewDecoder(r).Decode } if err := decoder(&rep); err != nil { return err From 8bd8f883a16976476da1b9c0b9d43dc636af7fac Mon Sep 17 00:00:00 2001 From: Jonathan Lange Date: Tue, 21 Jun 2016 11:08:55 +0100 Subject: [PATCH 4/8] Restore debugging logic --- app/router.go | 15 --------------- report/marshal.go | 29 +++++++++++++++++++++++++++++ 2 files changed, 29 insertions(+), 15 deletions(-) diff --git a/app/router.go b/app/router.go index f818a52ad2..7ce3748baa 100644 --- a/app/router.go +++ b/app/router.go @@ -100,21 +100,6 @@ func RegisterTopologyRoutes(router *mux.Router, r Reporter) { gzipHandler(requestContextDecorator(makeProbeHandler(r)))) } -type byteCounter struct { - next io.ReadCloser - count *uint64 -} - -func (c byteCounter) Read(p []byte) (n int, err error) { - n, err = c.next.Read(p) - *c.count += uint64(n) - return n, err -} - -func (c byteCounter) Close() error { - return c.next.Close() -} - // RegisterReportPostHandler registers the handler for report submission func RegisterReportPostHandler(a Adder, router *mux.Router) { post := router.Methods("POST").Subrouter() diff --git a/report/marshal.go b/report/marshal.go index 3630482d21..7c35f05f27 100644 --- a/report/marshal.go +++ b/report/marshal.go @@ -5,6 +5,7 @@ import ( "encoding/gob" "io" + log "github.com/Sirupsen/logrus" "github.com/ugorji/go/codec" ) @@ -21,18 +22,40 @@ func (rep Report) WriteBinary(w io.Writer) error { return nil } +type byteCounter struct { + next io.Reader + count *uint64 +} + +func (c byteCounter) Read(p []byte) (n int, err error) { + n, err = c.next.Read(p) + *c.count += uint64(n) + return n, err +} + // ReadBinary reads bytes into a Report. // // Will decompress the binary if gzipped is true, and will use the given // codecHandle to decode it. If codecHandle is nil, will decode as a gob. func (rep *Report) ReadBinary(r io.Reader, gzipped bool, codecHandle codec.Handle) error { var err error + var compressedSize, uncompressedSize uint64 + + // We have historically had trouble with reports being too large. We are + // keeping this instrumentation around to help us implement + // weaveworks/scope#985. + if log.GetLevel() == log.DebugLevel { + r = byteCounter{next: r, count: &compressedSize} + } if gzipped { r, err = gzip.NewReader(r) if err != nil { return err } } + if log.GetLevel() == log.DebugLevel { + r = byteCounter{next: r, count: &uncompressedSize} + } var decoder func(interface{}) error if codecHandle != nil { decoder = codec.NewDecoder(r, codecHandle).Decode @@ -42,6 +65,12 @@ func (rep *Report) ReadBinary(r io.Reader, gzipped bool, codecHandle codec.Handl if err := decoder(&rep); err != nil { return err } + log.Debugf( + "Received report sizes: compressed %d bytes, uncompressed %d bytes (%.2f%%)", + compressedSize, + uncompressedSize, + float32(compressedSize)/float32(uncompressedSize)*100, + ) return nil } From ce5c933d3c64face142a20c58ad656d7b9ce15a0 Mon Sep 17 00:00:00 2001 From: Jonathan Lange Date: Tue, 21 Jun 2016 11:14:14 +0100 Subject: [PATCH 5/8] Remove unused import --- app/router.go | 1 - 1 file changed, 1 deletion(-) diff --git a/app/router.go b/app/router.go index 7ce3748baa..37a3ffbed1 100644 --- a/app/router.go +++ b/app/router.go @@ -1,7 +1,6 @@ package app import ( - "io" "net/http" "net/url" "strings" From 23faf583b3e1f5f7afeff2490116db108c7c3399 Mon Sep 17 00:00:00 2001 From: Jonathan Lange Date: Tue, 21 Jun 2016 11:56:56 +0100 Subject: [PATCH 6/8] Drop gob support --- report/marshal.go | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/report/marshal.go b/report/marshal.go index 7c35f05f27..2cafd0568a 100644 --- a/report/marshal.go +++ b/report/marshal.go @@ -2,7 +2,6 @@ package report import ( "compress/gzip" - "encoding/gob" "io" log "github.com/Sirupsen/logrus" @@ -36,7 +35,7 @@ func (c byteCounter) Read(p []byte) (n int, err error) { // ReadBinary reads bytes into a Report. // // Will decompress the binary if gzipped is true, and will use the given -// codecHandle to decode it. If codecHandle is nil, will decode as a gob. +// codecHandle to decode it. func (rep *Report) ReadBinary(r io.Reader, gzipped bool, codecHandle codec.Handle) error { var err error var compressedSize, uncompressedSize uint64 @@ -56,13 +55,7 @@ func (rep *Report) ReadBinary(r io.Reader, gzipped bool, codecHandle codec.Handl if log.GetLevel() == log.DebugLevel { r = byteCounter{next: r, count: &uncompressedSize} } - var decoder func(interface{}) error - if codecHandle != nil { - decoder = codec.NewDecoder(r, codecHandle).Decode - } else { - decoder = gob.NewDecoder(r).Decode - } - if err := decoder(&rep); err != nil { + if err := codec.NewDecoder(r, codecHandle).Decode(&rep); err != nil { return err } log.Debugf( From 40cbf119d3ceb711606d843fda945c05893274a6 Mon Sep 17 00:00:00 2001 From: Jonathan Lange Date: Wed, 22 Jun 2016 10:02:18 +0100 Subject: [PATCH 7/8] Nice error on unsupported content type --- app/router.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/app/router.go b/app/router.go index 37a3ffbed1..27e71ce482 100644 --- a/app/router.go +++ b/app/router.go @@ -1,6 +1,7 @@ package app import ( + "fmt" "net/http" "net/url" "strings" @@ -109,11 +110,16 @@ func RegisterReportPostHandler(a Adder, router *mux.Router) { ) gzipped := strings.Contains(r.Header.Get("Content-Encoding"), "gzip") + contentType := r.Header.Get("Content-Type") var handle codec.Handle - if strings.HasPrefix(r.Header.Get("Content-Type"), "application/json") { + switch { + case strings.HasPrefix(contentType, "application/json"): handle = &codec.JsonHandle{} - } else if strings.HasPrefix(r.Header.Get("Content-Type"), "application/msgpack") { + case strings.HasPrefix(contentType, "application/msgpack"): handle = &codec.MsgpackHandle{} + default: + respondWith(w, http.StatusBadRequest, fmt.Errorf("Unsupported Content-Type: %v", contentType)) + return } if err := rpt.ReadBinary(reader, gzipped, handle); err != nil { From 9e0b27840bb2a80203f252f037546c5f328e3a66 Mon Sep 17 00:00:00 2001 From: Jonathan Lange Date: Wed, 22 Jun 2016 11:19:19 +0100 Subject: [PATCH 8/8] Delete test for unsupported functionality --- app/router_test.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/app/router_test.go b/app/router_test.go index 563320ca47..37980f37be 100644 --- a/app/router_test.go +++ b/app/router_test.go @@ -2,7 +2,6 @@ package app_test import ( "bytes" - "encoding/gob" "io/ioutil" "net/http" "net/http/httptest" @@ -84,11 +83,6 @@ func TestReportPostHandler(t *testing.T) { } } - test("", func(v interface{}) ([]byte, error) { - buf := &bytes.Buffer{} - err := gob.NewEncoder(buf).Encode(v) - return buf.Bytes(), err - }) test("application/json", func(v interface{}) ([]byte, error) { buf := &bytes.Buffer{} err := codec.NewEncoder(buf, &codec.JsonHandle{}).Encode(v)