Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

fix consolidateBy across clustered requests #707

Merged
merged 3 commits into from
Aug 8, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions api/models/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,6 @@ func (r Req) String() string {
}

func (r Req) DebugString() string {
return fmt.Sprintf("Req key=%s %d - %d (%s - %s) (span %d) maxPoints=%d rawInt=%d cons=%s schemaId=%d aggId=%d archive=%d archInt=%d ttl=%d outInt=%d aggNum=%d",
r.Key, r.From, r.To, util.TS(r.From), util.TS(r.To), r.To-r.From-1, r.MaxPoints, r.RawInterval, r.Consolidator, r.SchemaId, r.AggId, r.Archive, r.ArchInterval, r.TTL, r.OutInterval, r.AggNum)
return fmt.Sprintf("Req key=%q target=%q pattern=%q %d - %d (%s - %s) (span %d) maxPoints=%d rawInt=%d cons=%s consReq=%d schemaId=%d aggId=%d archive=%d archInt=%d ttl=%d outInt=%d aggNum=%d",
r.Key, r.Target, r.Pattern, r.From, r.To, util.TS(r.From), util.TS(r.To), r.To-r.From-1, r.MaxPoints, r.RawInterval, r.Consolidator, r.ConsReq, r.SchemaId, r.AggId, r.Archive, r.ArchInterval, r.TTL, r.OutInterval, r.AggNum)
}
60 changes: 55 additions & 5 deletions api/models/series_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,16 @@ func (z *Series) DecodeMsg(dc *msgp.Reader) (err error) {
if err != nil {
return
}
case "QueryCons":
err = z.QueryCons.DecodeMsg(dc)
if err != nil {
return
}
case "Consolidator":
err = z.Consolidator.DecodeMsg(dc)
if err != nil {
return
}
default:
err = dc.Skip()
if err != nil {
Expand All @@ -79,9 +89,9 @@ func (z *Series) DecodeMsg(dc *msgp.Reader) (err error) {

// EncodeMsg implements msgp.Encodable
func (z *Series) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 6
// map header, size 8
// write "Target"
err = en.Append(0x86, 0xa6, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74)
err = en.Append(0x88, 0xa6, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74)
if err != nil {
return err
}
Expand Down Expand Up @@ -140,15 +150,33 @@ func (z *Series) EncodeMsg(en *msgp.Writer) (err error) {
if err != nil {
return
}
// write "QueryCons"
err = en.Append(0xa9, 0x51, 0x75, 0x65, 0x72, 0x79, 0x43, 0x6f, 0x6e, 0x73)
if err != nil {
return err
}
err = z.QueryCons.EncodeMsg(en)
if err != nil {
return
}
// write "Consolidator"
err = en.Append(0xac, 0x43, 0x6f, 0x6e, 0x73, 0x6f, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x6f, 0x72)
if err != nil {
return err
}
err = z.Consolidator.EncodeMsg(en)
if err != nil {
return
}
return
}

// MarshalMsg implements msgp.Marshaler
func (z *Series) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 6
// map header, size 8
// string "Target"
o = append(o, 0x86, 0xa6, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74)
o = append(o, 0x88, 0xa6, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74)
o = msgp.AppendString(o, z.Target)
// string "Datapoints"
o = append(o, 0xaa, 0x44, 0x61, 0x74, 0x61, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x73)
Expand All @@ -171,6 +199,18 @@ func (z *Series) MarshalMsg(b []byte) (o []byte, err error) {
// string "QueryTo"
o = append(o, 0xa7, 0x51, 0x75, 0x65, 0x72, 0x79, 0x54, 0x6f)
o = msgp.AppendUint32(o, z.QueryTo)
// string "QueryCons"
o = append(o, 0xa9, 0x51, 0x75, 0x65, 0x72, 0x79, 0x43, 0x6f, 0x6e, 0x73)
o, err = z.QueryCons.MarshalMsg(o)
if err != nil {
return
}
// string "Consolidator"
o = append(o, 0xac, 0x43, 0x6f, 0x6e, 0x73, 0x6f, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x6f, 0x72)
o, err = z.Consolidator.MarshalMsg(o)
if err != nil {
return
}
return
}

Expand Down Expand Up @@ -232,6 +272,16 @@ func (z *Series) UnmarshalMsg(bts []byte) (o []byte, err error) {
if err != nil {
return
}
case "QueryCons":
bts, err = z.QueryCons.UnmarshalMsg(bts)
if err != nil {
return
}
case "Consolidator":
bts, err = z.Consolidator.UnmarshalMsg(bts)
if err != nil {
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
Expand All @@ -249,7 +299,7 @@ func (z *Series) Msgsize() (s int) {
for zxvk := range z.Datapoints {
s += z.Datapoints[zxvk].Msgsize()
}
s += 9 + msgp.Uint32Size + 10 + msgp.StringPrefixSize + len(z.QueryPatt) + 10 + msgp.Uint32Size + 8 + msgp.Uint32Size
s += 9 + msgp.Uint32Size + 10 + msgp.StringPrefixSize + len(z.QueryPatt) + 10 + msgp.Uint32Size + 8 + msgp.Uint32Size + 10 + z.QueryCons.Msgsize() + 13 + z.Consolidator.Msgsize()
return
}

Expand Down
1 change: 1 addition & 0 deletions consolidation/consolidation.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

// consolidator is a highlevel description of a point consolidation method
// mostly for use by the http api, but can also be used internally for data processing
//go:generate msgp
type Consolidator int

var errUnknownConsolidationFunction = errors.New("unknown consolidation function")
Expand Down
58 changes: 58 additions & 0 deletions consolidation/consolidation_gen.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package consolidation

// NOTE: THIS FILE WAS PRODUCED BY THE
// MSGP CODE GENERATION TOOL (github.com/tinylib/msgp)
// DO NOT EDIT

import (
"github.com/tinylib/msgp/msgp"
)

// DecodeMsg implements msgp.Decodable
func (z *Consolidator) DecodeMsg(dc *msgp.Reader) (err error) {
{
var zxvk int
zxvk, err = dc.ReadInt()
(*z) = Consolidator(zxvk)
}
if err != nil {
return
}
return
}

// EncodeMsg implements msgp.Encodable
func (z Consolidator) EncodeMsg(en *msgp.Writer) (err error) {
err = en.WriteInt(int(z))
if err != nil {
return
}
return
}

// MarshalMsg implements msgp.Marshaler
func (z Consolidator) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
o = msgp.AppendInt(o, int(z))
return
}

// UnmarshalMsg implements msgp.Unmarshaler
func (z *Consolidator) UnmarshalMsg(bts []byte) (o []byte, err error) {
{
var zbzg int
zbzg, bts, err = msgp.ReadIntBytes(bts)
(*z) = Consolidator(zbzg)
}
if err != nil {
return
}
o = bts
return
}

// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z Consolidator) Msgsize() (s int) {
s = msgp.IntSize
return
}
5 changes: 5 additions & 0 deletions consolidation/consolidation_gen_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package consolidation

// NOTE: THIS FILE WAS PRODUCED BY THE
// MSGP CODE GENERATION TOOL (github.com/tinylib/msgp)
// DO NOT EDIT
2 changes: 1 addition & 1 deletion docker/docker-cluster/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ services:
environment:
GRAPHITE_CLUSTER_SERVERS: metrictank0:6060
GRAPHITE_STATSD_HOST: statsdaemon
SINGLE_TENANT: yes
SINGLE_TENANT: "true"
WSGI_PROCESSES: 4
WSGI_THREADS: 25

Expand Down