Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
add endpoint for tag based findSeries
Browse files Browse the repository at this point in the history
  • Loading branch information
replay committed Sep 29, 2017
1 parent ba2a7b4 commit e30aee4
Show file tree
Hide file tree
Showing 12 changed files with 415 additions and 33 deletions.
9 changes: 9 additions & 0 deletions api/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,15 @@ func (s *Server) indexTagList(ctx *middleware.Context, req models.IndexTagList)
response.Write(ctx, response.NewMsgp(200, &models.IndexTagListResp{Tags: tags}))
}

func (s *Server) indexTagFindSeries(ctx *middleware.Context, req models.IndexTagFindSeries) {
ids, err := s.MetricIndex.IdsByTagExpressions(ctx.OrgId, req.Expressions)
if err != nil {
response.Write(ctx, response.NewError(http.StatusBadRequest, err.Error()))
return
}
response.Write(ctx, response.NewMsgp(200, &models.IndexTagFindSeriesResp{Series: ids}))
}

// IndexGet returns a msgp encoded schema.MetricDefinition
func (s *Server) indexGet(ctx *middleware.Context, req models.IndexGet) {
def, ok := s.MetricIndex.Get(req.Id)
Expand Down
87 changes: 87 additions & 0 deletions api/graphite.go
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,93 @@ func (s *Server) clusterTag(ctx context.Context, orgId int, tag string) (map[str
return result, nil
}

func (s *Server) graphiteTagFindSeries(ctx *middleware.Context, request models.GraphiteTagFindSeries) {
series, err := s.clusterTagFindSeries(ctx.Req.Context(), ctx.OrgId, request.Expr)
if err != nil {
response.Write(ctx, response.WrapError(err))
return
}

response.Write(ctx, response.NewJson(200, series, ""))
}

func (s *Server) clusterTagFindSeries(ctx context.Context, orgId int, expressions []string) ([]string, error) {
peers, err := cluster.MembersForQuery()
if err != nil {
log.Error(3, "HTTP tagList unable to get peers, %s", err)
return nil, err
}

log.Debug("HTTP tagFindSeries across %d instances", len(peers))
seriesSet := make(map[string]struct{})
var errors []error
var mu sync.Mutex
var wg sync.WaitGroup
for _, peer := range peers {
wg.Add(1)
if peer.IsLocal() {
go func() {
result, err := s.MetricIndex.IdsByTagExpressions(orgId, expressions)
if err != nil {
log.Error(4, "HTTP Render error querying %s/index/tags/findSeries: %q", peer.Name, err)
mu.Lock()
errors = append(errors, err)
mu.Unlock()
return
}
mu.Lock()
for _, series := range result {
seriesSet[series] = struct{}{}
}
mu.Unlock()
wg.Done()
}()
} else {
go func(peer cluster.Node) {
log.Debug("HTTP Render querying %s/index/tags/findSeries for %d", peer.Name, orgId)
data := models.IndexTagFindSeries{OrgId: orgId, Expressions: expressions}
buf, err := peer.Post(ctx, "indexTagFindSeries", "/index/tags/findSeries", data)

if err != nil {
log.Error(4, "HTTP Render error querying %s/index/tags/findSeries: %q", peer.Name, err)
mu.Lock()
errors = append(errors, err)
mu.Unlock()
return
}

resp := models.IndexTagFindSeriesResp{}
_, err = resp.UnmarshalMsg(buf)
if err != nil {
log.Error(4, "HTTP Find() error unmarshaling body from %s/index/tags/findSeries: %q", peer.Name, err)
mu.Lock()
errors = append(errors, err)
mu.Unlock()
return
}

mu.Lock()
for _, series := range resp.Series {
seriesSet[series] = struct{}{}
}
mu.Unlock()
wg.Done()
}(peer)
}
}
wg.Wait()
if len(errors) > 0 {
return nil, errors[0]
}

series := make([]string, 0, len(seriesSet))
for s := range seriesSet {
series = append(series, s)
}

return series, nil
}

func (s *Server) graphiteTagList(ctx *middleware.Context, request models.GraphiteTagList) {
tags, err := s.clusterTagList(ctx.Req.Context(), ctx.OrgId, request.From)
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions api/models/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,8 @@ type IndexTagListResp struct {
type IndexTagResp struct {
Values map[string]uint32 `json:"values"`
}

//go:generate msgp
type IndexTagFindSeriesResp struct {
Series []string `json:"series"`
}
187 changes: 158 additions & 29 deletions api/models/cluster_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,135 @@ func (z *IndexFindResp) Msgsize() (s int) {
return
}

// DecodeMsg implements msgp.Decodable
func (z *IndexTagFindSeriesResp) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var isz uint32
isz, err = dc.ReadMapHeader()
if err != nil {
return
}
for isz > 0 {
isz--
field, err = dc.ReadMapKeyPtr()
if err != nil {
return
}
switch msgp.UnsafeString(field) {
case "Series":
var xsz uint32
xsz, err = dc.ReadArrayHeader()
if err != nil {
return
}
if cap(z.Series) >= int(xsz) {
z.Series = z.Series[:xsz]
} else {
z.Series = make([]string, xsz)
}
for ajw := range z.Series {
z.Series[ajw], err = dc.ReadString()
if err != nil {
return
}
}
default:
err = dc.Skip()
if err != nil {
return
}
}
}
return
}

// EncodeMsg implements msgp.Encodable
func (z *IndexTagFindSeriesResp) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 1
// write "Series"
err = en.Append(0x81, 0xa6, 0x53, 0x65, 0x72, 0x69, 0x65, 0x73)
if err != nil {
return err
}
err = en.WriteArrayHeader(uint32(len(z.Series)))
if err != nil {
return
}
for ajw := range z.Series {
err = en.WriteString(z.Series[ajw])
if err != nil {
return
}
}
return
}

// MarshalMsg implements msgp.Marshaler
func (z *IndexTagFindSeriesResp) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 1
// string "Series"
o = append(o, 0x81, 0xa6, 0x53, 0x65, 0x72, 0x69, 0x65, 0x73)
o = msgp.AppendArrayHeader(o, uint32(len(z.Series)))
for ajw := range z.Series {
o = msgp.AppendString(o, z.Series[ajw])
}
return
}

// UnmarshalMsg implements msgp.Unmarshaler
func (z *IndexTagFindSeriesResp) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var isz uint32
isz, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
return
}
for isz > 0 {
isz--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
return
}
switch msgp.UnsafeString(field) {
case "Series":
var xsz uint32
xsz, bts, err = msgp.ReadArrayHeaderBytes(bts)
if err != nil {
return
}
if cap(z.Series) >= int(xsz) {
z.Series = z.Series[:xsz]
} else {
z.Series = make([]string, xsz)
}
for ajw := range z.Series {
z.Series[ajw], bts, err = msgp.ReadStringBytes(bts)
if err != nil {
return
}
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
return
}
}
}
o = bts
return
}

func (z *IndexTagFindSeriesResp) Msgsize() (s int) {
s = 1 + 7 + msgp.ArrayHeaderSize
for ajw := range z.Series {
s += msgp.StringPrefixSize + len(z.Series[ajw])
}
return
}

// DecodeMsg implements msgp.Decodable
func (z *IndexTagListResp) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
Expand Down Expand Up @@ -364,8 +493,8 @@ func (z *IndexTagListResp) DecodeMsg(dc *msgp.Reader) (err error) {
} else {
z.Tags = make([]string, xsz)
}
for ajw := range z.Tags {
z.Tags[ajw], err = dc.ReadString()
for wht := range z.Tags {
z.Tags[wht], err = dc.ReadString()
if err != nil {
return
}
Expand All @@ -392,8 +521,8 @@ func (z *IndexTagListResp) EncodeMsg(en *msgp.Writer) (err error) {
if err != nil {
return
}
for ajw := range z.Tags {
err = en.WriteString(z.Tags[ajw])
for wht := range z.Tags {
err = en.WriteString(z.Tags[wht])
if err != nil {
return
}
Expand All @@ -408,8 +537,8 @@ func (z *IndexTagListResp) MarshalMsg(b []byte) (o []byte, err error) {
// string "Tags"
o = append(o, 0x81, 0xa4, 0x54, 0x61, 0x67, 0x73)
o = msgp.AppendArrayHeader(o, uint32(len(z.Tags)))
for ajw := range z.Tags {
o = msgp.AppendString(o, z.Tags[ajw])
for wht := range z.Tags {
o = msgp.AppendString(o, z.Tags[wht])
}
return
}
Expand Down Expand Up @@ -441,8 +570,8 @@ func (z *IndexTagListResp) UnmarshalMsg(bts []byte) (o []byte, err error) {
} else {
z.Tags = make([]string, xsz)
}
for ajw := range z.Tags {
z.Tags[ajw], bts, err = msgp.ReadStringBytes(bts)
for wht := range z.Tags {
z.Tags[wht], bts, err = msgp.ReadStringBytes(bts)
if err != nil {
return
}
Expand All @@ -460,8 +589,8 @@ func (z *IndexTagListResp) UnmarshalMsg(bts []byte) (o []byte, err error) {

func (z *IndexTagListResp) Msgsize() (s int) {
s = 1 + 5 + msgp.ArrayHeaderSize
for ajw := range z.Tags {
s += msgp.StringPrefixSize + len(z.Tags[ajw])
for wht := range z.Tags {
s += msgp.StringPrefixSize + len(z.Tags[wht])
}
return
}
Expand Down Expand Up @@ -497,17 +626,17 @@ func (z *IndexTagResp) DecodeMsg(dc *msgp.Reader) (err error) {
}
for msz > 0 {
msz--
var wht string
var hct uint32
wht, err = dc.ReadString()
var hct string
var cua uint32
hct, err = dc.ReadString()
if err != nil {
return
}
hct, err = dc.ReadUint32()
cua, err = dc.ReadUint32()
if err != nil {
return
}
z.Values[wht] = hct
z.Values[hct] = cua
}
default:
err = dc.Skip()
Expand All @@ -531,12 +660,12 @@ func (z *IndexTagResp) EncodeMsg(en *msgp.Writer) (err error) {
if err != nil {
return
}
for wht, hct := range z.Values {
err = en.WriteString(wht)
for hct, cua := range z.Values {
err = en.WriteString(hct)
if err != nil {
return
}
err = en.WriteUint32(hct)
err = en.WriteUint32(cua)
if err != nil {
return
}
Expand All @@ -551,9 +680,9 @@ func (z *IndexTagResp) MarshalMsg(b []byte) (o []byte, err error) {
// string "Values"
o = append(o, 0x81, 0xa6, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x73)
o = msgp.AppendMapHeader(o, uint32(len(z.Values)))
for wht, hct := range z.Values {
o = msgp.AppendString(o, wht)
o = msgp.AppendUint32(o, hct)
for hct, cua := range z.Values {
o = msgp.AppendString(o, hct)
o = msgp.AppendUint32(o, cua)
}
return
}
Expand Down Expand Up @@ -588,18 +717,18 @@ func (z *IndexTagResp) UnmarshalMsg(bts []byte) (o []byte, err error) {
}
}
for msz > 0 {
var wht string
var hct uint32
var hct string
var cua uint32
msz--
wht, bts, err = msgp.ReadStringBytes(bts)
hct, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
return
}
hct, bts, err = msgp.ReadUint32Bytes(bts)
cua, bts, err = msgp.ReadUint32Bytes(bts)
if err != nil {
return
}
z.Values[wht] = hct
z.Values[hct] = cua
}
default:
bts, err = msgp.Skip(bts)
Expand All @@ -615,9 +744,9 @@ func (z *IndexTagResp) UnmarshalMsg(bts []byte) (o []byte, err error) {
func (z *IndexTagResp) Msgsize() (s int) {
s = 1 + 7 + msgp.MapHeaderSize
if z.Values != nil {
for wht, hct := range z.Values {
_ = hct
s += msgp.StringPrefixSize + len(wht) + msgp.Uint32Size
for hct, cua := range z.Values {
_ = cua
s += msgp.StringPrefixSize + len(hct) + msgp.Uint32Size
}
}
return
Expand Down
Loading

0 comments on commit e30aee4

Please sign in to comment.