Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

allow metrictank to run as graphite 1.0 cluster node #611

Merged
merged 2 commits into from
Apr 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ and bugs to fix. It should be considered an *alpha* project.

* no performance/availability isolation between tenants per instance. (only data isolation)
* clustering is basic: statically defined peers, master promotions are manual, etc. See [clustering](https://github.com/raintank/metrictank/blob/master/docs/clustering.md) for more.
* no computation locality: we move the data from storage to processing code, which is both metrictank and graphite-api.
* minimal computation locality: we move the data from storage to processing code, which is both metrictank and graphite.
* the datastructures can use performance engineering. [A Go GC issue may occasionally inflate response times](https://github.com/golang/go/issues/14812).
* the native input protocol is inefficient. Should not send all metadata with each point.
* we use metrics2.0 in native input protocol and indexes, but [barely do anything with it yet](https://github.com/raintank/metrictank/blob/master/docs/tags.md).
Expand All @@ -37,8 +37,7 @@ Otherwise data loss of current chunks will be incurred. See [operations guide](
## main features

* 100% open source
* graphite is a first class citizen (note: currently requires a [fork of graphite-api](https://github.com/raintank/graphite-api/)
and the [graphite-metrictank](https://github.com/raintank/graphite-metrictank) plugin)
* graphite is a first class citizen. As of graphite-1.0.1, metrictank can be used as a graphite CLUSTER_SERVER.
* accurate, flexible rollups by storing min/max/sum/count (which also gives us average).
So we can do consolidation (combined runtime+archived) accurately and correctly,
[unlike most other graphite backends like whisper](https://blog.raintank.io/25-graphite-grafana-and-statsd-gotchas/#runtime.consolidation)
Expand Down Expand Up @@ -71,7 +70,7 @@ So we can do consolidation (combined runtime+archived) accurately and correctly,
* [Consolidation](https://github.com/raintank/metrictank/blob/master/docs/consolidation.md)
* [Multi-tenancy](https://github.com/raintank/metrictank/blob/master/docs/multi-tenancy.md)
* [HTTP api](https://github.com/raintank/metrictank/blob/master/docs/http-api.md)
* [Graphite-api](https://github.com/raintank/metrictank/blob/master/docs/graphite-api.md)
* [Graphite](https://github.com/raintank/metrictank/blob/master/docs/graphite.md)
* [Metadata](https://github.com/raintank/metrictank/blob/master/docs/metadata.md)
* [Tags](https://github.com/raintank/metrictank/blob/master/docs/tags.md)
* [Usage reporting](https://github.com/raintank/metrictank/blob/master/docs/usage-reporting.md)
Expand Down
44 changes: 29 additions & 15 deletions api/graphite.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,10 @@ func (s *Server) renderMetrics(ctx *middleware.Context, request models.GraphiteR
plan, err := expr.NewPlan(exprs, fromUnix, toUnix, request.MaxDataPoints, stable, nil)
if err != nil {
if fun, ok := err.(expr.ErrUnknownFunction); ok {
if request.NoProxy {
ctx.Error(http.StatusBadRequest, "localOnly requested, but the request cant be handled locally")
return
}
ctx.Req.Request.Body = ctx.Body
graphiteProxy.ServeHTTP(ctx.Resp, ctx.Req.Request)
proxyStats.Miss(string(fun))
Expand All @@ -202,9 +206,12 @@ func (s *Server) renderMetrics(ctx *middleware.Context, request models.GraphiteR
}
sort.Sort(models.SeriesByTarget(out))

if request.Format == "msgp" {
switch request.Format {
case "msgp":
response.Write(ctx, response.NewMsgp(200, models.SeriesByTarget(out)))
} else {
case "pickle":
response.Write(ctx, response.NewPickle(200, models.SeriesPickleFormat(out)))
default:
response.Write(ctx, response.NewFastJson(200, models.SeriesByTarget(out)))
}
plan.Clean()
Expand Down Expand Up @@ -232,19 +239,14 @@ func (s *Server) metricsFind(ctx *middleware.Context, request models.GraphiteFin
}
}

var b interface{}
switch request.Format {
case "", "treejson", "json":
b, err = findTreejson(request.Query, nodes)
response.Write(ctx, response.NewJson(200, findTreejson(request.Query, nodes), request.Jsonp))
case "completer":
b, err = findCompleter(nodes)
}

if err != nil {
response.Write(ctx, response.WrapError(err))
return
response.Write(ctx, response.NewJson(200, findCompleter(nodes), request.Jsonp))
case "pickle":
response.Write(ctx, response.NewPickle(200, findPickle(nodes, request)))
}
response.Write(ctx, response.NewJson(200, b, request.Jsonp))
}

func (s *Server) listLocal(orgId int) []idx.Archive {
Expand Down Expand Up @@ -326,7 +328,7 @@ func (s *Server) metricsIndex(ctx *middleware.Context) {
response.Write(ctx, response.NewFastJson(200, models.MetricNames(series)))
}

func findCompleter(nodes []idx.Node) (models.SeriesCompleter, error) {
func findCompleter(nodes []idx.Node) models.SeriesCompleter {
var result = models.NewSeriesCompleter()
for _, g := range nodes {
c := models.SeriesCompleterItem{
Expand All @@ -347,12 +349,24 @@ func findCompleter(nodes []idx.Node) (models.SeriesCompleter, error) {
result.Add(c)
}

return result, nil
return result
}

// findPickle renders find results in the pickle format that graphite-web
// expects from a cluster server: one (path, isLeaf, intervals) item per node.
// When the request carries both a non-zero From and Until, every item
// advertises that single (from, until) interval; otherwise the interval
// list stays empty.
func findPickle(nodes []idx.Node, request models.GraphiteFind) models.SeriesPickle {
	var intervals [][]int64
	if request.From != 0 && request.Until != 0 {
		intervals = [][]int64{{request.From, request.Until}}
	}
	items := make([]models.SeriesPickleItem, len(nodes))
	for pos, node := range nodes {
		items[pos] = models.NewSeriesPickleItem(node.Path, node.Leaf, intervals)
	}
	return items
}

var treejsonContext = make(map[string]int)

func findTreejson(query string, nodes []idx.Node) (models.SeriesTree, error) {
func findTreejson(query string, nodes []idx.Node) models.SeriesTree {
tree := models.NewSeriesTree()
seen := make(map[string]struct{})

Expand Down Expand Up @@ -392,7 +406,7 @@ func findTreejson(query string, nodes []idx.Node) (models.SeriesTree, error) {
}
tree.Add(&t)
}
return *tree, nil
return *tree
}

func (s *Server) metricsDelete(ctx *middleware.Context, req models.MetricsDelete) {
Expand Down
21 changes: 19 additions & 2 deletions api/models/graphite.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ type GraphiteRender struct {
From string `json:"from" form:"from"`
Until string `json:"until" form:"until"`
To string `json:"to" form:"to"`
Format string `json:"format" form:"format" binding:"In(,json,msgp)"`
Format string `json:"format" form:"format" binding:"In(,json,msgp,pickle)"`
NoProxy bool `json:"local" form:"local"` //this is set to true by graphite-web when it passes request to cluster servers
Process string `json:"process" form:"process" binding:"In(,none,stable,any);Default(stable)"`
}

Expand Down Expand Up @@ -44,7 +45,7 @@ type GraphiteFind struct {
Query string `json:"query" form:"query" binding:"Required"`
From int64 `json:"from" form:"from"`
Until int64 `json:"until" form:"until"`
Format string `json:"format" form:"format" binding:"In(,completer,json,treejson)"`
Format string `json:"format" form:"format" binding:"In(,completer,json,treejson,pickle)"`
Jsonp string `json:"jsonp" form:"jsonp"`
}

Expand Down Expand Up @@ -99,6 +100,22 @@ type SeriesCompleterItem struct {
IsLeaf string `json:"is_leaf"`
}

// SeriesPickle is the pickle-encodable payload for a find response.
type SeriesPickle []SeriesPickleItem

// SeriesPickleItem describes one tree node of a find response in the
// field layout the pickle encoder serializes.
type SeriesPickleItem struct {
	Path      string    `pickle:"path"`
	IsLeaf    bool      `pickle:"isLeaf"`
	Intervals [][]int64 `pickle:"intervals"` // list of (start,end) tuples
}

// NewSeriesPickleItem assembles a SeriesPickleItem from its three fields.
func NewSeriesPickleItem(path string, isLeaf bool, intervals [][]int64) SeriesPickleItem {
	var item SeriesPickleItem
	item.Path = path
	item.IsLeaf = isLeaf
	item.Intervals = intervals
	return item
}

type SeriesTree []SeriesTreeItem

func NewSeriesTree() *SeriesTree {
Expand Down
28 changes: 28 additions & 0 deletions api/models/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,31 @@ func (series SeriesByTarget) MarshalJSONFast(b []byte) ([]byte, error) {
func (series SeriesByTarget) MarshalJSON() ([]byte, error) {
return series.MarshalJSONFast(nil)
}

// SeriesForPickle is a single series in the shape serialized for
// pickle-formatted render responses.
type SeriesForPickle struct {
	Name           string    `pickle:"name"`
	Start          uint32    `pickle:"start"`
	End            uint32    `pickle:"end"`
	Step           uint32    `pickle:"step"`
	Values         []float64 `pickle:"values"`
	PathExpression string    `pickle:"pathExpression"`
}

// SeriesPickleFormat converts series into their pickle representation.
// Only the point values are kept; the per-point timestamps are dropped
// (the receiver can reconstruct them from Start and Step).
func SeriesPickleFormat(data []Series) []SeriesForPickle {
	result := make([]SeriesForPickle, len(data))
	for i, s := range data {
		datapoints := make([]float64, len(s.Datapoints))
		// use a distinct index name here: the original shadowed the outer
		// loop variable i, which is legal but error-prone.
		for j, p := range s.Datapoints {
			datapoints[j] = p.Val
		}
		result[i] = SeriesForPickle{
			Name:           s.Target,
			Start:          s.QueryFrom,
			End:            s.QueryTo,
			Step:           s.Interval,
			Values:         datapoints,
			PathExpression: s.QueryPatt,
		}
	}
	return result
}
39 changes: 39 additions & 0 deletions api/response/pickle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package response

import (
"bytes"
pickle "github.com/kisielk/og-rek"
)

// Pickle is a response that serializes its body with the python pickle
// protocol via the og-rek encoder.
type Pickle struct {
	code int         // HTTP status code to report via Code()
	body interface{} // payload handed to the pickle encoder in Body()
	buf  []byte      // scratch buffer borrowed from BufferPool; returned in Close()
}

// NewPickle returns a Pickle response for the given status code and body.
// The caller must call Close() to return the internal buffer to the pool.
func NewPickle(code int, body interface{}) *Pickle {
	return &Pickle{
		code: code,
		body: body,
		buf:  BufferPool.Get(),
	}
}

// Code returns the HTTP status code of the response.
func (r *Pickle) Code() int {
	return r.code
}

// Close returns the scratch buffer to the shared BufferPool.
func (r *Pickle) Close() {
	BufferPool.Put(r.buf)
}

// Body pickle-encodes the response body and returns the resulting bytes.
// The encoder writes into the pooled scratch buffer; we store the
// (possibly re-grown) backing slice back into r.buf so that Close returns
// the grown buffer to the pool rather than the original, smaller one.
// NOTE(review): assumes BufferPool.Put resets the slice length on return —
// confirm against util.NewBufferPool, which fastjson/msgp share.
func (r *Pickle) Body() ([]byte, error) {
	buffer := bytes.NewBuffer(r.buf)
	encoder := pickle.NewEncoder(buffer)
	err := encoder.Encode(r.body)
	r.buf = buffer.Bytes()
	return r.buf, err
}

// Headers declares the content-type for pickled payloads.
func (r *Pickle) Headers() map[string]string {
	headers := make(map[string]string, 1)
	headers["content-type"] = "application/pickle"
	return headers
}
138 changes: 138 additions & 0 deletions api/response/pickle_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package response

import (
"math"
"testing"

"github.com/raintank/metrictank/api/models"
"gopkg.in/raintank/schema.v1"
)

// BenchmarkHttpRespPickleEmptySeries measures the full encode+close cycle
// for a single series with zero datapoints, i.e. pure per-response overhead.
func BenchmarkHttpRespPickleEmptySeries(b *testing.B) {
	data := []models.Series{
		{
			Target:     "an.empty.series",
			Datapoints: make([]schema.Point, 0),
			Interval:   10,
		},
	}
	var resp *Pickle
	for n := 0; n < b.N; n++ {
		resp = NewPickle(200, models.SeriesPickleFormat(data))
		resp.Body()
		resp.Close()
	}
}

// BenchmarkHttpRespPickleEmptySeriesNeedsEscaping is the same as
// BenchmarkHttpRespPickleEmptySeries but with a backslash in the target
// name, to capture any extra cost of string escaping in the encoder.
func BenchmarkHttpRespPickleEmptySeriesNeedsEscaping(b *testing.B) {
	data := []models.Series{
		{
			Target:     `an.empty\series`,
			Datapoints: make([]schema.Point, 0),
			Interval:   10,
		},
	}
	var resp *Pickle
	for n := 0; n < b.N; n++ {
		resp = NewPickle(200, models.SeriesPickleFormat(data))
		resp.Body()
		resp.Close()
	}
}

// BenchmarkHttpRespPickleIntegers measures encoding 1000 points whose
// values are integral floats.
func BenchmarkHttpRespPickleIntegers(b *testing.B) {
	// make(T, n, n) had a redundant capacity argument; len implies cap.
	points := make([]schema.Point, 1000)
	baseTs := 1500000000
	for i := 0; i < 1000; i++ {
		points[i] = schema.Point{Val: float64(10000 * i), Ts: uint32(baseTs + 10*i)}
	}
	data := []models.Series{
		{
			Target:     "some.metric.with.a-whole-bunch-of.integers",
			Datapoints: points,
			Interval:   10,
		},
	}
	// 12 bytes per point: 8 byte float64 value + 4 byte uint32 timestamp.
	b.SetBytes(int64(len(points) * 12))

	b.ResetTimer()
	var resp *Pickle
	for n := 0; n < b.N; n++ {
		resp = NewPickle(200, models.SeriesPickleFormat(data))
		resp.Body()
		resp.Close()
	}
}

// BenchmarkHttpRespPickleFloats measures encoding 1000 points with
// non-integral float values.
func BenchmarkHttpRespPickleFloats(b *testing.B) {
	// make(T, n, n) had a redundant capacity argument; len implies cap.
	points := make([]schema.Point, 1000)
	baseTs := 1500000000
	for i := 0; i < 1000; i++ {
		points[i] = schema.Point{Val: 12.34 * float64(i), Ts: uint32(baseTs + 10*i)}
	}
	data := []models.Series{
		{
			Target:     "some.metric.with.a-whole-bunch-of.floats",
			Datapoints: points,
			Interval:   10,
		},
	}
	// 12 bytes per point: 8 byte float64 value + 4 byte uint32 timestamp.
	b.SetBytes(int64(len(points) * 12))

	b.ResetTimer()
	var resp *Pickle
	for n := 0; n < b.N; n++ {
		resp = NewPickle(200, models.SeriesPickleFormat(data))
		resp.Body()
		resp.Close()
	}
}

func BenchmarkHttpRespPickleNulls(b *testing.B) {
points := make([]schema.Point, 1000, 1000)
baseTs := 1500000000
for i := 0; i < 1000; i++ {
points[i] = schema.Point{Val: math.NaN(), Ts: uint32(baseTs + 10*i)}
}
data := []models.Series{
{
Target: "some.metric.with.a-whole-bunch-of.nulls",
Datapoints: points,
Interval: 10,
},
}
b.SetBytes(int64(len(points) * 12))

b.ResetTimer()
var resp *Pickle
for n := 0; n < b.N; n++ {
resp = NewPickle(200, models.SeriesPickleFormat(data))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'm curious what the cost of models.SeriesPickleFormat itself is, that would be very interesting to also have as a separate benchmark.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i added the benchmark.

resp.Body()
resp.Close()
}
}

// foo is a package-level sink: assigning the benchmark result to it keeps
// the compiler from optimizing the conversion away.
var foo []models.SeriesForPickle

// BenchmarkConvertSeriesToPickleFormat10k isolates the cost of
// models.SeriesPickleFormat itself for one series of 10k points,
// separate from the cost of pickle encoding.
func BenchmarkConvertSeriesToPickleFormat10k(b *testing.B) {
	points := make([]schema.Point, 10000)
	baseTs := 1500000000
	for i := 0; i < 10000; i++ {
		points[i] = schema.Point{Val: 1.2345 * float64(i), Ts: uint32(baseTs + 10*i)}
	}
	data := []models.Series{
		{
			Target:     "foo",
			Datapoints: points,
			Interval:   10,
		},
	}
	// 12 bytes per point: 8 byte float64 value + 4 byte uint32 timestamp.
	b.SetBytes(int64(len(points) * 12))

	b.ResetTimer()
	var bar []models.SeriesForPickle
	for n := 0; n < b.N; n++ {
		bar = models.SeriesPickleFormat(data)
	}
	foo = bar
}
2 changes: 1 addition & 1 deletion api/response/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

var ErrMetricNotFound = errors.New("metric not found")

var BufferPool = util.NewBufferPool() // used by fastjson and mspg responses to serialize into
var BufferPool = util.NewBufferPool() // used by pickle, fastjson and msgp responses to serialize into

func Write(w http.ResponseWriter, resp Response) {
defer resp.Close()
Expand Down
2 changes: 1 addition & 1 deletion cmd/mt-explain/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func main() {
plan, err := expr.NewPlan(exps, fromUnix, toUnix, uint32(*mdp), *stable, nil)
if err != nil {
if fun, ok := err.(expr.ErrUnknownFunction); ok {
fmt.Printf("Unsupported function %q: must defer query to graphite-api\n", string(fun))
fmt.Printf("Unsupported function %q: must defer query to graphite\n", string(fun))
plan.Dump(os.Stdout)
return
}
Expand Down
Loading