From e30aee47c8cdb88a70f32013fcb15d96c3d43455 Mon Sep 17 00:00:00 2001 From: Mauro Stettler Date: Fri, 29 Sep 2017 13:53:45 +0200 Subject: [PATCH] add endpoint for tag based findSeries --- api/cluster.go | 9 ++ api/graphite.go | 87 +++++++++++++++ api/models/cluster.go | 5 + api/models/cluster_gen.go | 187 ++++++++++++++++++++++++++++----- api/models/cluster_gen_test.go | 113 ++++++++++++++++++++ api/models/graphite.go | 8 ++ api/models/node.go | 10 ++ api/routes.go | 2 + idx/cassandra/cassandra.go | 4 + idx/idx.go | 1 + idx/memory/memory.go | 16 ++- idx/memory/memory_test.go | 6 +- 12 files changed, 415 insertions(+), 33 deletions(-) diff --git a/api/cluster.go b/api/cluster.go index 51d7a82bc0..9d4b0c0ddf 100644 --- a/api/cluster.go +++ b/api/cluster.go @@ -116,6 +116,15 @@ func (s *Server) indexTagList(ctx *middleware.Context, req models.IndexTagList) response.Write(ctx, response.NewMsgp(200, &models.IndexTagListResp{Tags: tags})) } +func (s *Server) indexTagFindSeries(ctx *middleware.Context, req models.IndexTagFindSeries) { + ids, err := s.MetricIndex.IdsByTagExpressions(ctx.OrgId, req.Expressions) + if err != nil { + response.Write(ctx, response.NewError(http.StatusBadRequest, err.Error())) + return + } + response.Write(ctx, response.NewMsgp(200, &models.IndexTagFindSeriesResp{Series: ids})) +} + // IndexGet returns a msgp encoded schema.MetricDefinition func (s *Server) indexGet(ctx *middleware.Context, req models.IndexGet) { def, ok := s.MetricIndex.Get(req.Id) diff --git a/api/graphite.go b/api/graphite.go index 9d021c1fff..47d4038e4f 100644 --- a/api/graphite.go +++ b/api/graphite.go @@ -765,6 +765,93 @@ func (s *Server) clusterTag(ctx context.Context, orgId int, tag string) (map[str return result, nil } +func (s *Server) graphiteTagFindSeries(ctx *middleware.Context, request models.GraphiteTagFindSeries) { + series, err := s.clusterTagFindSeries(ctx.Req.Context(), ctx.OrgId, request.Expr) + if err != nil { + response.Write(ctx, response.WrapError(err)) + return + } + + response.Write(ctx, response.NewJson(200, series, "")) +} + +func (s *Server) clusterTagFindSeries(ctx context.Context, orgId int, expressions []string) ([]string, error) { + peers, err := cluster.MembersForQuery() + if err != nil { + log.Error(3, "HTTP tagList unable to get peers, %s", err) + return nil, err + } + + log.Debug("HTTP tagFindSeries across %d instances", len(peers)) + seriesSet := make(map[string]struct{}) + var errors []error + var mu sync.Mutex + var wg sync.WaitGroup + for _, peer := range peers { + wg.Add(1) + if peer.IsLocal() { + go func() { + result, err := s.MetricIndex.IdsByTagExpressions(orgId, expressions) + if err != nil { + log.Error(4, "HTTP Render error querying %s/index/tags/findSeries: %q", peer.Name, err) + mu.Lock() + errors = append(errors, err) + mu.Unlock() + return + } + mu.Lock() + for _, series := range result { + seriesSet[series] = struct{}{} + } + mu.Unlock() + wg.Done() + }() + } else { + go func(peer cluster.Node) { + log.Debug("HTTP Render querying %s/index/tags/findSeries for %d", peer.Name, orgId) + data := models.IndexTagFindSeries{OrgId: orgId, Expressions: expressions} + buf, err := peer.Post(ctx, "indexTagFindSeries", "/index/tags/findSeries", data) + + if err != nil { + log.Error(4, "HTTP Render error querying %s/index/tags/findSeries: %q", peer.Name, err) + mu.Lock() + errors = append(errors, err) + mu.Unlock() + return + } + + resp := models.IndexTagFindSeriesResp{} + _, err = resp.UnmarshalMsg(buf) + if err != nil { + log.Error(4, "HTTP Find() error unmarshaling body from %s/index/tags/findSeries: %q", peer.Name, err) + mu.Lock() + errors = append(errors, err) + mu.Unlock() + return + } + + mu.Lock() + for _, series := range resp.Series { + seriesSet[series] = struct{}{} + } + mu.Unlock() + wg.Done() + }(peer) + } + } + wg.Wait() + if len(errors) > 0 { + return nil, errors[0] + } + + series := make([]string, 0, len(seriesSet)) + for s := range seriesSet { + series = append(series, s) + } + + return series, nil +} + func (s *Server) graphiteTagList(ctx *middleware.Context, request models.GraphiteTagList) { tags, err := s.clusterTagList(ctx.Req.Context(), ctx.OrgId, request.From) if err != nil { diff --git a/api/models/cluster.go b/api/models/cluster.go index ba98e1060c..8449a3d621 100644 --- a/api/models/cluster.go +++ b/api/models/cluster.go @@ -34,3 +34,8 @@ type IndexTagListResp struct { type IndexTagResp struct { Values map[string]uint32 `json:"values"` } + +//go:generate msgp +type IndexTagFindSeriesResp struct { + Series []string `json:"series"` +} diff --git a/api/models/cluster_gen.go b/api/models/cluster_gen.go index e1d55dae10..16a49afb3f 100644 --- a/api/models/cluster_gen.go +++ b/api/models/cluster_gen.go @@ -337,6 +337,135 @@ func (z *IndexFindResp) Msgsize() (s int) { return } +// DecodeMsg implements msgp.Decodable +func (z *IndexTagFindSeriesResp) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var isz uint32 + isz, err = dc.ReadMapHeader() + if err != nil { + return + } + for isz > 0 { + isz-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + return + } + switch msgp.UnsafeString(field) { + case "Series": + var xsz uint32 + xsz, err = dc.ReadArrayHeader() + if err != nil { + return + } + if cap(z.Series) >= int(xsz) { + z.Series = z.Series[:xsz] + } else { + z.Series = make([]string, xsz) + } + for ajw := range z.Series { + z.Series[ajw], err = dc.ReadString() + if err != nil { + return + } + } + default: + err = dc.Skip() + if err != nil { + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *IndexTagFindSeriesResp) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 1 + // write "Series" + err = en.Append(0x81, 0xa6, 0x53, 0x65, 0x72, 0x69, 0x65, 0x73) + if err != nil { + return err + } + err = en.WriteArrayHeader(uint32(len(z.Series))) + if err != nil { + return + } + for ajw := range z.Series { + err = en.WriteString(z.Series[ajw]) + if err != nil { + return + } + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *IndexTagFindSeriesResp) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 1 + // string "Series" + o = append(o, 0x81, 0xa6, 0x53, 0x65, 0x72, 0x69, 0x65, 0x73) + o = msgp.AppendArrayHeader(o, uint32(len(z.Series))) + for ajw := range z.Series { + o = msgp.AppendString(o, z.Series[ajw]) + } + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *IndexTagFindSeriesResp) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var isz uint32 + isz, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + return + } + for isz > 0 { + isz-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + return + } + switch msgp.UnsafeString(field) { + case "Series": + var xsz uint32 + xsz, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + return + } + if cap(z.Series) >= int(xsz) { + z.Series = z.Series[:xsz] + } else { + z.Series = make([]string, xsz) + } + for ajw := range z.Series { + z.Series[ajw], bts, err = msgp.ReadStringBytes(bts) + if err != nil { + return + } + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + return + } + } + } + o = bts + return +} + +func (z *IndexTagFindSeriesResp) Msgsize() (s int) { + s = 1 + 7 + msgp.ArrayHeaderSize + for ajw := range z.Series { + s += msgp.StringPrefixSize + len(z.Series[ajw]) + } + return +} + // DecodeMsg implements msgp.Decodable func (z *IndexTagListResp) DecodeMsg(dc *msgp.Reader) (err error) { var field []byte @@ -364,8 +493,8 @@ func (z *IndexTagListResp) DecodeMsg(dc *msgp.Reader) (err error) { } else { z.Tags = make([]string, xsz) } - for ajw := range z.Tags { - z.Tags[ajw], err = dc.ReadString() + for wht := range z.Tags { + z.Tags[wht], err = dc.ReadString() if err != nil { return } @@ -392,8 +521,8 @@ func (z *IndexTagListResp) EncodeMsg(en *msgp.Writer) (err error) { if err != nil { return } - for ajw := range z.Tags { - err = en.WriteString(z.Tags[ajw]) + for wht := range z.Tags { + err = en.WriteString(z.Tags[wht]) if err != nil { return } @@ -408,8 +537,8 @@ func (z *IndexTagListResp) MarshalMsg(b []byte) (o []byte, err error) { // string "Tags" o = append(o, 0x81, 0xa4, 0x54, 0x61, 0x67, 0x73) o = msgp.AppendArrayHeader(o, uint32(len(z.Tags))) - for ajw := range z.Tags { - o = msgp.AppendString(o, z.Tags[ajw]) + for wht := range z.Tags { + o = msgp.AppendString(o, z.Tags[wht]) } return } @@ -441,8 +570,8 @@ func (z *IndexTagListResp) UnmarshalMsg(bts []byte) (o []byte, err error) { } else { z.Tags = make([]string, xsz) } - for ajw := range z.Tags { - z.Tags[ajw], bts, err = msgp.ReadStringBytes(bts) + for wht := range z.Tags { + z.Tags[wht], bts, err = msgp.ReadStringBytes(bts) if err != nil { return } @@ -460,8 +589,8 @@ func (z *IndexTagListResp) UnmarshalMsg(bts []byte) (o []byte, err error) { func (z *IndexTagListResp) Msgsize() (s int) { s = 1 + 5 + msgp.ArrayHeaderSize - for ajw := range z.Tags { - s += msgp.StringPrefixSize + len(z.Tags[ajw]) + for wht := range z.Tags { + s += msgp.StringPrefixSize + len(z.Tags[wht]) } return } @@ -497,17 +626,17 @@ func (z *IndexTagResp) DecodeMsg(dc *msgp.Reader) (err error) { } for msz > 0 { msz-- - var wht string - var hct uint32 - wht, err = dc.ReadString() + var hct string + var cua uint32 + hct, err = dc.ReadString() if err != nil { return } - hct, err = dc.ReadUint32() + cua, err = dc.ReadUint32() if err != nil { return } - z.Values[wht] = hct + z.Values[hct] = cua } default: err = dc.Skip() @@ -531,12 +660,12 @@ func (z *IndexTagResp) EncodeMsg(en *msgp.Writer) (err error) { if err != nil { return } - for wht, hct := range z.Values { - err = en.WriteString(wht) + for hct, cua := range z.Values { + err = en.WriteString(hct) if err != nil { return } - err = en.WriteUint32(hct) + err = en.WriteUint32(cua) if err != nil { return } @@ -551,9 +680,9 @@ func (z *IndexTagResp) MarshalMsg(b []byte) (o []byte, err error) { // string "Values" o = append(o, 0x81, 0xa6, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x73) o = msgp.AppendMapHeader(o, uint32(len(z.Values))) - for wht, hct := range z.Values { - o = msgp.AppendString(o, wht) - o = msgp.AppendUint32(o, hct) + for hct, cua := range z.Values { + o = msgp.AppendString(o, hct) + o = msgp.AppendUint32(o, cua) } return } @@ -588,18 +717,18 @@ func (z *IndexTagResp) UnmarshalMsg(bts []byte) (o []byte, err error) { } } for msz > 0 { - var wht string - var hct uint32 + var hct string + var cua uint32 msz-- - wht, bts, err = msgp.ReadStringBytes(bts) + hct, bts, err = msgp.ReadStringBytes(bts) if err != nil { return } - hct, bts, err = msgp.ReadUint32Bytes(bts) + cua, bts, err = msgp.ReadUint32Bytes(bts) if err != nil { return } - z.Values[wht] = hct + z.Values[hct] = cua } default: bts, err = msgp.Skip(bts) @@ -615,9 +744,9 @@ func (z *IndexTagResp) UnmarshalMsg(bts []byte) (o []byte, err error) { func (z *IndexTagResp) Msgsize() (s int) { s = 1 + 7 + msgp.MapHeaderSize if z.Values != nil { - for wht, hct := range z.Values { - _ = hct - s += msgp.StringPrefixSize + len(wht) + msgp.Uint32Size + for hct, cua := range z.Values { + _ = cua + s += msgp.StringPrefixSize + len(hct) + msgp.Uint32Size } } return diff --git a/api/models/cluster_gen_test.go b/api/models/cluster_gen_test.go index 9b5b8ebe73..41395c5e64 100644 --- a/api/models/cluster_gen_test.go +++ b/api/models/cluster_gen_test.go @@ -237,6 +237,119 @@ func BenchmarkDecodeIndexFindResp(b *testing.B) { } } +func TestMarshalUnmarshalIndexTagFindSeriesResp(t *testing.T) { + v := IndexTagFindSeriesResp{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgIndexTagFindSeriesResp(b *testing.B) { + v := IndexTagFindSeriesResp{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgIndexTagFindSeriesResp(b *testing.B) { + v := IndexTagFindSeriesResp{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalIndexTagFindSeriesResp(b *testing.B) { + v := IndexTagFindSeriesResp{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodeIndexTagFindSeriesResp(t *testing.T) { + v := IndexTagFindSeriesResp{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Logf("WARNING: Msgsize() for %v is inaccurate", v) + } + + vn := IndexTagFindSeriesResp{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeIndexTagFindSeriesResp(b *testing.B) { + v := IndexTagFindSeriesResp{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeIndexTagFindSeriesResp(b *testing.B) { + v := IndexTagFindSeriesResp{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} + func TestMarshalUnmarshalIndexTagListResp(t *testing.T) { v := IndexTagListResp{} bts, err := v.MarshalMsg(nil) diff --git a/api/models/graphite.go b/api/models/graphite.go index dccac5aaf5..db348081a6 100644 --- a/api/models/graphite.go +++ b/api/models/graphite.go @@ -74,6 +74,14 @@ type GraphiteTagResp struct { Values []GraphiteTagValueResp `json:"values"` } +type GraphiteTagFindSeries struct { + Expr []string `json:"expr" form:"expr"` +} + +type GraphiteTagFindSeriesResp struct { + Series []string `json:"series"` +} + type GraphiteFind struct { FromTo Query string `json:"query" form:"query" binding:"Required"` diff --git a/api/models/node.go b/api/models/node.go index b19960f5e3..5b68557d86 100644 --- a/api/models/node.go +++ b/api/models/node.go @@ -32,6 +32,16 @@ func (i IndexList) Trace(span opentracing.Span) { span.SetTag("org", i.OrgId) } +type IndexTagFindSeries struct { + OrgId int `json:"orgId" form:"orgId" binding:"Required"` + Expressions []string `json:"expressions"` +} + +func (t IndexTagFindSeries) Trace(span opentracing.Span) { + span.SetTag("org", t.OrgId) + span.SetTag("expressions", t.Expressions) +} + type IndexTag struct { OrgId int `json:"orgId" form:"orgId" binding:"Required"` Tag string `json:"tag"` diff --git a/api/routes.go b/api/routes.go index 52ccffca46..2b8c3ff540 100644 --- a/api/routes.go +++ b/api/routes.go @@ -38,6 +38,7 @@ func (s *Server) RegisterRoutes() { r.Combo("/index/delete", ready, bind(models.IndexDelete{})).Get(s.indexDelete).Post(s.indexDelete) r.Combo("/index/get", ready, bind(models.IndexGet{})).Get(s.indexGet).Post(s.indexGet) r.Combo("/index/tags", ready, bind(models.IndexTagList{})).Get(s.indexTagList).Post(s.indexTagList) + r.Combo("/index/tags/findSeries", ready, bind(models.IndexTagFindSeries{})).Get(s.indexTagFindSeries).Post(s.indexTagFindSeries) r.Combo("/index/tags/:tag([0-9a-zA-Z]+)", ready, bind(models.IndexTag{})).Get(s.indexTag).Post(s.indexTag) r.Options("/*", func(ctx *macaron.Context) { @@ -51,4 +52,5 @@ func (s *Server) RegisterRoutes() { r.Post("/metrics/delete", withOrg, ready, bind(models.MetricsDelete{}), s.metricsDelete) r.Combo("/tags", withOrg, ready, bind(models.GraphiteTagList{})).Get(s.graphiteTagList).Post(s.graphiteTagList) r.Combo("/tags/:tag([0-9a-zA-Z]+)", withOrg, ready, bind(models.GraphiteTag{})).Get(s.graphiteTag).Post(s.graphiteTag) + r.Combo("/tags/findSeries", withOrg, ready, bind(models.GraphiteTagFindSeries{})).Get(s.graphiteTagFindSeries).Post(s.graphiteTagFindSeries) } diff --git a/idx/cassandra/cassandra.go b/idx/cassandra/cassandra.go index 3d8e537a31..405e770d40 100644 --- a/idx/cassandra/cassandra.go +++ b/idx/cassandra/cassandra.go @@ -505,3 +505,7 @@ func (c *CasIdx) TagList(orgId int, from uint32) []string { func (c *CasIdx) Tag(orgId int, tag string) map[string]uint32 { return c.MemoryIdx.Tag(orgId, tag) } + +func (c *CasIdx) IdsByTagExpressions(orgId int, expressions []string) ([]string, error) { + return c.MemoryIdx.IdsByTagExpressions(orgId, expressions) +} diff --git a/idx/idx.go b/idx/idx.go index 4ebaff9054..a0f7f506f1 100644 --- a/idx/idx.go +++ b/idx/idx.go @@ -114,4 +114,5 @@ type MetricIndex interface { Prune(int, time.Time) ([]Archive, error) TagList(int, uint32) []string Tag(int, string) map[string]uint32 + IdsByTagExpressions(int, []string) ([]string, error) } diff --git a/idx/memory/memory.go b/idx/memory/memory.go index 4e9598b361..eace265ae7 100644 --- a/idx/memory/memory.go +++ b/idx/memory/memory.go @@ -400,7 +400,21 @@ func (m *MemoryIdx) TagList(orgId int, from uint32) []string { return results } -func (m *MemoryIdx) IdsByTagQuery(orgId int, query TagQuery) map[string]struct{} { +func (m *MemoryIdx) IdsByTagExpressions(orgId int, expressions []string) ([]string, error) { + query, err := NewTagQuery(expressions) + if err != nil { + return nil, err + } + + seriesMap := m.IdsByTagQuery(orgId, query) + res := make([]string, 0, len(seriesMap)) + for s := range seriesMap { + res = append(res, s) + } + return res, nil +} + +func (m *MemoryIdx) IdsByTagQuery(orgId int, query *TagQuery) map[string]struct{} { m.RLock() defer m.RUnlock() diff --git a/idx/memory/memory_test.go b/idx/memory/memory_test.go index 3b1193cb9f..e65be3901d 100644 --- a/idx/memory/memory_test.go +++ b/idx/memory/memory_test.go @@ -571,7 +571,7 @@ func TestGetByTag(t *testing.T) { if err != nil { t.Fatalf("Got an unexpected error with query %s: %s", expressions[i], err) } - res := ix.IdsByTagQuery(1, *tagQuery) + res := ix.IdsByTagQuery(1, tagQuery) if len(res) != expecting[i] { t.Fatalf("Expected %d in test %d results, but got %d: %q", expecting[i], i, len(res), res) } @@ -596,7 +596,7 @@ func TestDeleteTaggedSeries(t *testing.T) { } tagQuery, _ := NewTagQuery([]string{"key1=value1", "key2=value2"}) - res := ix.IdsByTagQuery(orgId, *tagQuery) + res := ix.IdsByTagQuery(orgId, tagQuery) if len(res) != 1 { t.Fatalf(fmt.Sprintf("Expected to get 1 result, but got %d", len(res))) @@ -615,7 +615,7 @@ func TestDeleteTaggedSeries(t *testing.T) { t.Fatalf(fmt.Sprintf("Expected 1 metric to get deleted, but got %d", len(deleted))) } - res = ix.IdsByTagQuery(orgId, *tagQuery) + res = ix.IdsByTagQuery(orgId, tagQuery) if len(res) != 0 { t.Fatalf(fmt.Sprintf("Expected to get 0 results, but got %d", len(res)))