Skip to content

Commit

Permalink
Merge pull request #1600 from weaveworks/extract-report-serialzn
Browse files Browse the repository at this point in the history
Helper for reading & writing from binary

Removes gob support from router.
  • Loading branch information
jml authored Jun 22, 2016
2 parents 29054f7 + 9e0b278 commit 37f482c
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 82 deletions.
21 changes: 2 additions & 19 deletions app/multitenant/dynamo_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package multitenant

import (
"bytes"
"compress/gzip"
"crypto/md5"
"fmt"
"io"
Expand All @@ -18,7 +17,6 @@ import (
"github.com/bluele/gcache"
"github.com/nats-io/nats"
"github.com/prometheus/client_golang/prometheus"
"github.com/ugorji/go/codec"
"golang.org/x/net/context"

"github.com/weaveworks/scope/app"
Expand Down Expand Up @@ -312,15 +310,7 @@ func (c *dynamoDBCollector) getNonCachedReport(reportKey string) (*report.Report
if err != nil {
return nil, err
}
reader, err := gzip.NewReader(resp.Body)
if err != nil {
return nil, err
}
rep := report.MakeReport()
if err := codec.NewDecoder(reader, &codec.MsgpackHandle{}).Decode(&rep); err != nil {
return nil, err
}
return &rep, nil
return report.MakeFromBinary(resp.Body)
}

func (c *dynamoDBCollector) getReports(userid string, row int64, start, end time.Time) ([]report.Report, error) {
Expand Down Expand Up @@ -387,14 +377,7 @@ func (c *dynamoDBCollector) Add(ctx context.Context, rep report.Report) error {

// first, encode the report into a buffer and record its size
var buf bytes.Buffer
writer, err := gzip.NewWriterLevel(&buf, gzip.BestCompression)
if err != nil {
return err
}
if err := codec.NewEncoder(writer, &codec.MsgpackHandle{}).Encode(&rep); err != nil {
return err
}
writer.Close()
rep.WriteBinary(&buf)
reportSize.Add(float64(buf.Len()))

// second, put the report on s3
Expand Down
63 changes: 15 additions & 48 deletions app/router.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package app

import (
"compress/gzip"
"encoding/gob"
"io"
"fmt"
"net/http"
"net/url"
"strings"
Expand Down Expand Up @@ -102,63 +100,32 @@ func RegisterTopologyRoutes(router *mux.Router, r Reporter) {
gzipHandler(requestContextDecorator(makeProbeHandler(r))))
}

type byteCounter struct {
next io.ReadCloser
count *uint64
}

func (c byteCounter) Read(p []byte) (n int, err error) {
n, err = c.next.Read(p)
*c.count += uint64(n)
return n, err
}

func (c byteCounter) Close() error {
return c.next.Close()
}

// RegisterReportPostHandler registers the handler for report submission
func RegisterReportPostHandler(a Adder, router *mux.Router) {
post := router.Methods("POST").Subrouter()
post.HandleFunc("/api/report", requestContextDecorator(func(ctx context.Context, w http.ResponseWriter, r *http.Request) {
var (
rpt report.Report
reader = r.Body
err error
compressedSize, uncompressedSize uint64
rpt report.Report
reader = r.Body
)

if log.GetLevel() == log.DebugLevel {
reader = byteCounter{next: reader, count: &compressedSize}
}
if strings.Contains(r.Header.Get("Content-Encoding"), "gzip") {
reader, err = gzip.NewReader(reader)
if err != nil {
respondWith(w, http.StatusBadRequest, err)
return
}
}

if log.GetLevel() == log.DebugLevel {
reader = byteCounter{next: reader, count: &uncompressedSize}
}
decoder := gob.NewDecoder(reader).Decode
if strings.HasPrefix(r.Header.Get("Content-Type"), "application/json") {
decoder = codec.NewDecoder(reader, &codec.JsonHandle{}).Decode
} else if strings.HasPrefix(r.Header.Get("Content-Type"), "application/msgpack") {
decoder = codec.NewDecoder(reader, &codec.MsgpackHandle{}).Decode
gzipped := strings.Contains(r.Header.Get("Content-Encoding"), "gzip")
contentType := r.Header.Get("Content-Type")
var handle codec.Handle
switch {
case strings.HasPrefix(contentType, "application/json"):
handle = &codec.JsonHandle{}
case strings.HasPrefix(contentType, "application/msgpack"):
handle = &codec.MsgpackHandle{}
default:
respondWith(w, http.StatusBadRequest, fmt.Errorf("Unsupported Content-Type: %v", contentType))
return
}

if err := decoder(&rpt); err != nil {
if err := rpt.ReadBinary(reader, gzipped, handle); err != nil {
respondWith(w, http.StatusBadRequest, err)
return
}
log.Debugf(
"Received report sizes: compressed %d bytes, uncompressed %d bytes (%.2f%%)",
compressedSize,
uncompressedSize,
float32(compressedSize)/float32(uncompressedSize)*100,
)

if err := a.Add(ctx, rpt); err != nil {
log.Errorf("Error Adding report: %v", err)
Expand Down
6 changes: 0 additions & 6 deletions app/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package app_test

import (
"bytes"
"encoding/gob"
"io/ioutil"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -84,11 +83,6 @@ func TestReportPostHandler(t *testing.T) {
}
}

test("", func(v interface{}) ([]byte, error) {
buf := &bytes.Buffer{}
err := gob.NewEncoder(buf).Encode(v)
return buf.Bytes(), err
})
test("application/json", func(v interface{}) ([]byte, error) {
buf := &bytes.Buffer{}
err := codec.NewEncoder(buf, &codec.JsonHandle{}).Encode(v)
Expand Down
10 changes: 1 addition & 9 deletions probe/appclient/report_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ package appclient

import (
"bytes"
"compress/gzip"
"github.com/ugorji/go/codec"

"github.com/weaveworks/scope/report"
)

Expand All @@ -24,11 +21,6 @@ func NewReportPublisher(publisher Publisher) *ReportPublisher {
// Publish serialises and compresses a report, then passes it to a publisher
func (p *ReportPublisher) Publish(r report.Report) error {
buf := &bytes.Buffer{}
gzwriter := gzip.NewWriter(buf)
if err := codec.NewEncoder(gzwriter, &codec.MsgpackHandle{}).Encode(r); err != nil {
return err
}
gzwriter.Close() // otherwise the content won't get flushed to the output stream

r.WriteBinary(buf)
return p.publisher.Publish(buf)
}
77 changes: 77 additions & 0 deletions report/marshal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package report

import (
"compress/gzip"
"io"

log "github.com/Sirupsen/logrus"
"github.com/ugorji/go/codec"
)

// WriteBinary writes a Report as a gzipped msgpack.
func (rep Report) WriteBinary(w io.Writer) error {
gzwriter, err := gzip.NewWriterLevel(w, gzip.BestCompression)
if err != nil {
return err
}
if err = codec.NewEncoder(gzwriter, &codec.MsgpackHandle{}).Encode(&rep); err != nil {
return err
}
gzwriter.Close() // otherwise the content won't get flushed to the output stream
return nil
}

type byteCounter struct {
next io.Reader
count *uint64
}

func (c byteCounter) Read(p []byte) (n int, err error) {
n, err = c.next.Read(p)
*c.count += uint64(n)
return n, err
}

// ReadBinary reads bytes into a Report.
//
// Will decompress the binary if gzipped is true, and will use the given
// codecHandle to decode it.
func (rep *Report) ReadBinary(r io.Reader, gzipped bool, codecHandle codec.Handle) error {
var err error
var compressedSize, uncompressedSize uint64

// We have historically had trouble with reports being too large. We are
// keeping this instrumentation around to help us implement
// weaveworks/scope#985.
if log.GetLevel() == log.DebugLevel {
r = byteCounter{next: r, count: &compressedSize}
}
if gzipped {
r, err = gzip.NewReader(r)
if err != nil {
return err
}
}
if log.GetLevel() == log.DebugLevel {
r = byteCounter{next: r, count: &uncompressedSize}
}
if err := codec.NewDecoder(r, codecHandle).Decode(&rep); err != nil {
return err
}
log.Debugf(
"Received report sizes: compressed %d bytes, uncompressed %d bytes (%.2f%%)",
compressedSize,
uncompressedSize,
float32(compressedSize)/float32(uncompressedSize)*100,
)
return nil
}

// MakeFromBinary constructs a Report from a gzipped msgpack.
func MakeFromBinary(r io.Reader) (*Report, error) {
rep := MakeReport()
if err := rep.ReadBinary(r, true, &codec.MsgpackHandle{}); err != nil {
return nil, err
}
return &rep, nil
}

0 comments on commit 37f482c

Please sign in to comment.