This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Graphitey consolidation step and 3 (redo) #570

Merged: 29 commits, merged Mar 20, 2017
Commits (29), changes shown from all commits:
b2d7fbf
go-carbon: support multiple aggregationMethods per pattern
Dieterbe Feb 20, 2017
eca8031
go-carbon: support extra fields for retentions
Dieterbe Feb 28, 2017
8d14264
go-carbon: upon schema/agg match, also return index
Dieterbe Mar 1, 2017
dcf7116
go-carbon: expose WhisperAggregationItem and properties
Dieterbe Mar 1, 2017
8f800f3
rework archives & consolidation
Dieterbe Feb 20, 2017
49214b5
fix runtime consolidation and normalization
Dieterbe Mar 3, 2017
99e686b
fix tests
Dieterbe Mar 1, 2017
59e70b0
go-carbon: stop the pointer abuse
Dieterbe Mar 1, 2017
6e92dc4
add storage-aggregation.conf to our various deployments
Dieterbe Mar 3, 2017
b36e4ee
use /etc/metrictank instead of /etc/raintank config directory
Dieterbe Mar 3, 2017
d451b5d
don't whine if build directory already exists
Dieterbe Mar 3, 2017
6e364f9
be more graphite-like and document it
Dieterbe Mar 3, 2017
036e099
update config-to-doc.sh script to publish new config files
Dieterbe Mar 3, 2017
0613804
better output
Dieterbe Mar 6, 2017
eaf10ad
make name a public property
Dieterbe Mar 8, 2017
839ff60
fix : set aggregation/schema correctly
Dieterbe Mar 8, 2017
c10137a
support custom output formats
Dieterbe Mar 8, 2017
a3ea153
clarify `from` parameter
Dieterbe Mar 8, 2017
164a9c3
simplify a bit
Dieterbe Mar 8, 2017
7cd7aea
back schemas and aggs matching with a cache
Dieterbe Mar 9, 2017
7475789
split up input Processing tests to compare same metrics vs new ones
Dieterbe Mar 9, 2017
800b430
replace the already-quite-patched lomik libraries with our own version
Dieterbe Mar 9, 2017
b61de1b
hit/miss stats for matchcache
Dieterbe Mar 9, 2017
22bc4ca
Revert "hit/miss stats for matchcache"
Dieterbe Mar 9, 2017
fb2f6b1
add a mode to mt-store-cat to see a full dump
Dieterbe Mar 10, 2017
f912baa
bugfix: make sure MemoryIdx also updates Partition
Dieterbe Mar 16, 2017
1673644
fixups based on woodsaj's feedback and stuff
Dieterbe Mar 15, 2017
28cc30b
carbon-in: instead of matching every point, use idx.Find
Dieterbe Mar 17, 2017
e6db43a
add a quick path to get the archives under a path
Dieterbe Mar 17, 2017
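
Several of the go-carbon and consolidation commits above introduce per-pattern storage schemas and aggregation settings; the reworked tests in the diff below pin them to a single global default instead. The following is a hedged sketch of that setup, using only calls that appear later in this diff (SetSingleAgg, SetSingleSchema, NewRetentionMT); the meaning of the NewRetentionMT arguments is inferred from the test values, not taken from documentation.

```go
package apitest

import (
	"github.com/raintank/metrictank/conf"
	"github.com/raintank/metrictank/mdata"
)

// setupSingleSchemaAndAgg mirrors the test setup used in this PR's dataprocessor tests:
// one aggregation definition and one retention definition applied to every metric.
func setupSingleSchemaAndAgg() {
	// aggregation methods to roll up with: avg, plus min and max archives (assumed semantics)
	mdata.SetSingleAgg(conf.Avg, conf.Min, conf.Max)
	// retention: inferred as 10s interval, 100s TTL, 600s chunk span, 10 chunks kept, ready=true
	mdata.SetSingleSchema(conf.NewRetentionMT(10, 100, 600, 10, true))
}
```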
18 changes: 9 additions & 9 deletions api/dataprocessor.go
@@ -316,22 +316,22 @@ func (s *Server) getTargetsLocal(reqs []models.Req) ([]models.Series, error) {

func (s *Server) getTarget(req models.Req) (points []schema.Point, interval uint32, err error) {
defer doRecover(&err)
readConsolidated := req.Archive != 0 // do we need to read from a downsampled series?
runtimeConsolidation := req.AggNum > 1 // do we need to compress any points at runtime?
readRollup := req.Archive != 0 // do we need to read from a downsampled series?
normalize := req.AggNum > 1 // do we need to compress any points at runtime?

if LogLevel < 2 {
if runtimeConsolidation {
log.Debug("DP getTarget() %s runtimeConsolidation: true. agg factor: %d -> output interval: %d", req, req.AggNum, req.OutInterval)
if normalize {
log.Debug("DP getTarget() %s normalize: true. agg factor: %d -> output interval: %d", req, req.AggNum, req.OutInterval)
} else {
log.Debug("DP getTarget() %s runtimeConsolidation: false. output interval: %d", req, req.OutInterval)
log.Debug("DP getTarget() %s normalize: false. output interval: %d", req, req.OutInterval)
}
}

if !readConsolidated && !runtimeConsolidation {
if !readRollup && !normalize {
return s.getSeriesFixed(req, consolidation.None), req.OutInterval, nil
} else if !readConsolidated && runtimeConsolidation {
} else if !readRollup && normalize {
return consolidate(s.getSeriesFixed(req, consolidation.None), req.AggNum, req.Consolidator), req.OutInterval, nil
} else if readConsolidated && !runtimeConsolidation {
} else if readRollup && !normalize {
if req.Consolidator == consolidation.Avg {
return divide(
s.getSeriesFixed(req, consolidation.Sum),
@@ -341,7 +341,7 @@ func (s *Server) getTarget(req models.Req) (points []schema.Point, interval uint
return s.getSeriesFixed(req, req.Consolidator), req.OutInterval, nil
}
} else {
// readConsolidated && runtimeConsolidation
// readRollup && normalize
if req.Consolidator == consolidation.Avg {
return divide(
consolidate(s.getSeriesFixed(req, consolidation.Sum), req.AggNum, consolidation.Sum),
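
In the getTarget branch above, readRollup says whether a downsampled archive is read and normalize says whether points still need to be compressed at runtime; for the Avg consolidator the result is rebuilt by dividing a sum series by a matching count series (the second argument to divide is elided in this view). The following is a minimal, self-contained sketch of that idea — the point type and helpers are stand-ins, not metrictank's implementation:

```go
package main

import "fmt"

type point struct {
	val float64
	ts  uint32
}

// consolidateSum merges every aggNum consecutive points into one by summing values,
// keeping the timestamp of the last point in each bucket.
func consolidateSum(in []point, aggNum int) []point {
	var out []point
	for i := 0; i < len(in); i += aggNum {
		end := i + aggNum
		if end > len(in) {
			end = len(in)
		}
		p := point{ts: in[end-1].ts}
		for _, q := range in[i:end] {
			p.val += q.val
		}
		out = append(out, p)
	}
	return out
}

// divide builds the average series from matching sum and count series.
func divide(sum, cnt []point) []point {
	out := make([]point, len(sum))
	for i := range sum {
		out[i] = point{val: sum[i].val / cnt[i].val, ts: sum[i].ts}
	}
	return out
}

func main() {
	sum := []point{{10, 10}, {30, 20}, {20, 30}, {40, 40}} // rollup: sum per bucket
	cnt := []point{{2, 10}, {3, 20}, {4, 30}, {5, 40}}     // rollup: count per bucket
	// normalize both series by 2, then divide: {40/5, ts 20}, {60/9, ts 40}
	fmt.Println(divide(consolidateSum(sum, 2), consolidateSum(cnt, 2)))
}
```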
75 changes: 48 additions & 27 deletions api/dataprocessor_test.go
@@ -2,19 +2,21 @@ package api

import (
"fmt"
"math"
"math/rand"
"reflect"
"testing"
"time"

"github.com/raintank/metrictank/api/models"
"github.com/raintank/metrictank/cluster"
"github.com/raintank/metrictank/conf"
"github.com/raintank/metrictank/consolidation"
"github.com/raintank/metrictank/mdata"
"github.com/raintank/metrictank/mdata/cache"
"github.com/raintank/metrictank/mdata/cache/accnt"
"github.com/raintank/metrictank/mdata/chunk"
"gopkg.in/raintank/schema.v1"
"math"
"math/rand"
"reflect"
"testing"
"time"
)

type testCase struct {
@@ -155,6 +157,20 @@ func TestConsolidationFunctions(t *testing.T) {
{Val: 3, Ts: 1449178151},
{Val: 4, Ts: 1449178161},
},
consolidation.Lst,
2,
[]schema.Point{
{2, 1449178141},
{4, 1449178161},
},
},
{
[]schema.Point{
{1, 1449178131},
{2, 1449178141},
{3, 1449178151},
{4, 1449178161},
},
consolidation.Min,
2,
[]schema.Point{
@@ -542,20 +558,24 @@ func TestPrevBoundary(t *testing.T) {
}

// TestGetSeriesFixed assures that series data is returned in proper form.
// for each case, we generate a new series of 5 points to cover every possible combination of:
// * every possible data offset (against its quantized version) e.g. offset between 0 and interval-1
// * every possible `from` offset (against its quantized query results) e.g. offset between 0 and interval-1
// * every possible `to` offset (against its quantized query results) e.g. offset between 0 and interval-1
// and asserts that we get the appropriate data back for every possible query (based on from/to) of the raw data

func TestGetSeriesFixed(t *testing.T) {
cluster.Init("default", "test", time.Now(), "http", 6060)
store := mdata.NewDevnullStore()
metrics := mdata.NewAggMetrics(store, &cache.MockCache{}, 600, 10, 0, 0, 0, 0, []mdata.AggSetting{})

mdata.SetSingleAgg(conf.Avg, conf.Min, conf.Max)
mdata.SetSingleSchema(conf.NewRetentionMT(10, 100, 600, 10, true))

metrics := mdata.NewAggMetrics(store, &cache.MockCache{}, 0, 0, 0)
srv, _ := NewServer()
srv.BindBackendStore(store)
srv.BindMemoryStore(metrics)

// the tests below cycle through every possible combination of:
// * every possible data offset (against its quantized version) e.g. offset between 0 and interval-1
// * every possible `from` offset (against its quantized query results) e.g. offset between 0 and interval-1
// * every possible `to` offset (against its quantized query results) e.g. offset between 0 and interval-1
// and asserts that we get the appropriate data back in all possible scenarios.

expected := []schema.Point{
{Val: 20, Ts: 20},
{Val: 30, Ts: 30},
@@ -566,13 +586,13 @@ func TestGetSeriesFixed(t *testing.T) {
for to := uint32(31); to <= 40; to++ { // should always yield result with last point at 30 (because to is exclusive)
name := fmt.Sprintf("case.data.offset.%d.query:%d-%d", offset, from, to)

metric := metrics.GetOrCreate(name)
metric := metrics.GetOrCreate(name, name, 0, 0)
metric.Add(offset, 10) // this point will always be quantized to 10
metric.Add(10+offset, 20) // this point will always be quantized to 20, so it should be selected
metric.Add(20+offset, 30) // this point will always be quantized to 30, so it should be selected
metric.Add(30+offset, 40) // this point will always be quantized to 40
metric.Add(40+offset, 50) // this point will always be quantized to 50
req := models.NewReq(name, name, from, to, 1000, 10, consolidation.Avg, cluster.Manager.ThisNode())
req := models.NewReq(name, name, from, to, 1000, 10, consolidation.Avg, cluster.Manager.ThisNode(), 0, 0)
req.ArchInterval = 10
points := srv.getSeriesFixed(req, consolidation.None)
if !reflect.DeepEqual(expected, points) {
@@ -583,14 +603,15 @@ func TestGetSeriesFixed(t *testing.T) {
}
}

func reqRaw(key string, from, to, maxPoints, rawInterval uint32, consolidator consolidation.Consolidator) models.Req {
req := models.NewReq(key, key, from, to, maxPoints, rawInterval, consolidator, cluster.Manager.ThisNode())
func reqRaw(key string, from, to, maxPoints, rawInterval uint32, consolidator consolidation.Consolidator, schemaId, aggId uint16) models.Req {
req := models.NewReq(key, key, from, to, maxPoints, rawInterval, consolidator, cluster.Manager.ThisNode(), schemaId, aggId)
return req
}
func reqOut(key string, from, to, maxPoints, rawInterval uint32, consolidator consolidation.Consolidator, archive int, archInterval, outInterval, aggNum uint32) models.Req {
req := models.NewReq(key, key, from, to, maxPoints, rawInterval, consolidator, cluster.Manager.ThisNode())
func reqOut(key string, from, to, maxPoints, rawInterval uint32, consolidator consolidation.Consolidator, schemaId, aggId uint16, archive int, archInterval, ttl, outInterval, aggNum uint32) models.Req {
req := models.NewReq(key, key, from, to, maxPoints, rawInterval, consolidator, cluster.Manager.ThisNode(), schemaId, aggId)
req.Archive = archive
req.ArchInterval = archInterval
req.TTL = ttl
req.OutInterval = outInterval
req.AggNum = aggNum
return req
@@ -686,7 +707,7 @@ func TestMergeSeries(t *testing.T) {
func TestRequestContextWithoutConsolidator(t *testing.T) {
metric := "metric1"
archInterval := uint32(10)
req := reqRaw(metric, 44, 88, 100, 10, consolidation.None)
req := reqRaw(metric, 44, 88, 100, 10, consolidation.None, 0, 0)
req.ArchInterval = archInterval
ctx := newRequestContext(&req, consolidation.None)

Expand All @@ -711,7 +732,7 @@ func TestRequestContextWithConsolidator(t *testing.T) {
archInterval := uint32(10)
from := uint32(44)
to := uint32(88)
req := reqRaw(metric, from, to, 100, 10, consolidation.Sum)
req := reqRaw(metric, from, to, 100, 10, consolidation.Sum, 0, 0)
req.ArchInterval = archInterval
ctx := newRequestContext(&req, consolidation.Sum)

Expand Down Expand Up @@ -778,7 +799,8 @@ func TestGetSeriesCachedStore(t *testing.T) {
srv, _ := NewServer()
store := mdata.NewMockStore()
srv.BindBackendStore(store)
metrics := mdata.NewAggMetrics(store, &cache.MockCache{}, 1, 1, 0, 0, 0, 0, []mdata.AggSetting{})

metrics := mdata.NewAggMetrics(store, &cache.MockCache{}, 0, 0, 0)
srv.BindMemoryStore(metrics)
metric := "metric1"
var c *cache.CCache
@@ -837,7 +859,7 @@ }
}

// create a request for the current range
req := reqRaw(metric, from, to, span, 1, consolidation.None)
req := reqRaw(metric, from, to, span, 1, consolidation.None, 0, 0)
req.ArchInterval = 1
ctx := newRequestContext(&req, consolidation.None)
iters := srv.getSeriesCachedStore(ctx, to)
@@ -952,21 +974,20 @@ func TestGetSeriesCachedStore(t *testing.T) {
func TestGetSeriesAggMetrics(t *testing.T) {
cluster.Init("default", "test", time.Now(), "http", 6060)
store := mdata.NewMockStore()
chunkSpan := uint32(600)
numChunks := uint32(10)
metrics := mdata.NewAggMetrics(store, &cache.MockCache{}, chunkSpan, numChunks, 0, 0, 0, 0, []mdata.AggSetting{})

metrics := mdata.NewAggMetrics(store, &cache.MockCache{}, 0, 0, 0)
srv, _ := NewServer()
srv.BindBackendStore(store)
srv.BindMemoryStore(metrics)
from := uint32(1744)
to := uint32(1888)
metricKey := "metric1"
archInterval := uint32(10)
req := reqRaw(metricKey, from, to, 100, 10, consolidation.None)
req := reqRaw(metricKey, from, to, 100, 10, consolidation.None, 0, 0)
req.ArchInterval = archInterval
ctx := newRequestContext(&req, consolidation.None)

metric := metrics.GetOrCreate(metricKey)
metric := metrics.GetOrCreate(metricKey, metricKey, 0, 0)
for i := uint32(50); i < 3000; i++ {
metric.Add(i, float64(i^2))
}
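
The TestGetSeriesFixed comments above rely on timestamp quantization: each raw point is snapped onto an interval boundary, which is why Add(offset, ...) always lands on 10 and Add(10+offset, ...) on 20, regardless of the offset. Below is a small hedged sketch of the rule those comments imply (round a timestamp up to the next multiple of the interval; timestamps already on a boundary stay put) — the real quantization lives elsewhere in metrictank and is not shown in this diff:

```go
package main

import "fmt"

// quantize rounds ts up to the next multiple of interval; exact multiples are unchanged.
func quantize(ts, interval uint32) uint32 {
	if ts%interval == 0 {
		return ts
	}
	return ts + interval - ts%interval
}

func main() {
	for _, offset := range []uint32{1, 5, 10} {
		// mirrors metric.Add(offset, ...), metric.Add(10+offset, ...), metric.Add(20+offset, ...)
		fmt.Println(quantize(offset, 10), quantize(10+offset, 10), quantize(20+offset, 10))
		// prints: 10 20 30, for every offset in 1..10
	}
}
```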
36 changes: 16 additions & 20 deletions api/graphite.go
@@ -15,9 +15,9 @@ import (
"github.com/raintank/metrictank/cluster"
"github.com/raintank/metrictank/consolidation"
"github.com/raintank/metrictank/idx"
"github.com/raintank/metrictank/mdata"
"github.com/raintank/metrictank/stats"
"github.com/raintank/worldping-api/pkg/log"
"gopkg.in/raintank/schema.v1"
)

var MissingOrgHeaderErr = errors.New("orgId not set in headers")
@@ -209,11 +209,9 @@ func (s *Server) renderMetrics(ctx *middleware.Context, request models.GraphiteR

reqs := make([]models.Req, 0)

// consolidatorForPattern[<pattern>]<consolidateBy>
consolidatorForPattern := make(map[string]string)
patterns := make([]string, 0)
type locatedDef struct {
def schema.MetricDefinition
def idx.Archive
node cluster.Node
}

@@ -222,12 +220,11 @@ func (s *Server) renderMetrics(ctx *middleware.Context, request models.GraphiteR
//targetForPattern[<pattern>]<target>
targetForPattern := make(map[string]string)
for _, target := range targets {
pattern, consolidateBy, err := parseTarget(target)
pattern, _, err := parseTarget(target)
if err != nil {
ctx.Error(http.StatusBadRequest, err.Error())
return
}
consolidatorForPattern[pattern] = consolidateBy
patterns = append(patterns, pattern)
targetForPattern[pattern] = target
locatedDefs[pattern] = make(map[string]locatedDef)
@@ -252,18 +249,17 @@ func (s *Server) renderMetrics(ctx *middleware.Context, request models.GraphiteR

for pattern, ldefs := range locatedDefs {
for _, locdef := range ldefs {
def := locdef.def
consolidator, err := consolidation.GetConsolidator(&def, consolidatorForPattern[pattern])
if err != nil {
response.Write(ctx, response.NewError(http.StatusBadRequest, err.Error()))
return
}
archive := locdef.def
// set consolidator that will be used to normalize raw data before feeding into processing functions
// not to be confused with runtime consolidation which happens in the graphite api, after all processing.
fn := mdata.Aggregations.Get(archive.AggId).AggregationMethod[0]
consolidator := consolidation.Consolidator(fn) // we use the same number assignments so we can cast them
// target is like foo.bar or foo.* or consolidateBy(foo.*,'sum')
// pattern is like foo.bar or foo.*
// def.Name is like foo.concretebar
// so we want target to contain the concrete graphite name, potentially wrapped with consolidateBy().
target := strings.Replace(targetForPattern[pattern], pattern, def.Name, -1)
reqs = append(reqs, models.NewReq(def.Id, target, fromUnix, toUnix, request.MaxDataPoints, uint32(def.Interval), consolidator, locdef.node))
target := strings.Replace(targetForPattern[pattern], pattern, archive.Name, -1)
reqs = append(reqs, models.NewReq(archive.Id, target, fromUnix, toUnix, request.MaxDataPoints, uint32(archive.Interval), consolidator, locdef.node, archive.SchemaId, archive.AggId))
}
}
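
The comment block above describes how a submitted target (possibly wrapped in consolidateBy()) relates to the bare pattern and to the concrete series name the index returns, and the strings.Replace call substitutes the concrete name back into the original expression. Here is a tiny, standard-library-only illustration of that rewrite, using the example names from the comment:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	target := "consolidateBy(foo.*,'sum')" // as submitted by the user
	pattern := "foo.*"                     // the bare pattern parsed out of the target
	name := "foo.concretebar"              // a concrete series name matched by the index
	// same substitution as targetForPattern[pattern] -> archive.Name in the diff above
	fmt.Println(strings.Replace(target, pattern, name, -1))
	// Output: consolidateBy(foo.concretebar,'sum')
}
```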

@@ -279,7 +275,7 @@ func (s *Server) renderMetrics(ctx *middleware.Context, request models.GraphiteR
ctx.Req.Method, from, to, len(request.Targets), request.MaxDataPoints)
}

reqs, err = alignRequests(uint32(time.Now().Unix()), reqs, s.MemoryStore.AggSettings())
reqs, err = alignRequests(uint32(time.Now().Unix()), reqs)
if err != nil {
log.Error(3, "HTTP Render alignReq error: %s", err)
response.Write(ctx, response.WrapError(err))
@@ -351,20 +347,20 @@ func (s *Server) metricsFind(ctx *middleware.Context, request models.GraphiteFin
response.Write(ctx, response.NewJson(200, b, request.Jsonp))
}

func (s *Server) listLocal(orgId int) []schema.MetricDefinition {
func (s *Server) listLocal(orgId int) []idx.Archive {
return s.MetricIndex.List(orgId)
}

func (s *Server) listRemote(orgId int, peer cluster.Node) ([]schema.MetricDefinition, error) {
func (s *Server) listRemote(orgId int, peer cluster.Node) ([]idx.Archive, error) {
log.Debug("HTTP IndexJson() querying %s/index/list for %d", peer.Name, orgId)
buf, err := peer.Post("/index/list", models.IndexList{OrgId: orgId})
if err != nil {
log.Error(4, "HTTP IndexJson() error querying %s/index/list: %q", peer.Name, err)
return nil, err
}
result := make([]schema.MetricDefinition, 0)
result := make([]idx.Archive, 0)
for len(buf) != 0 {
var def schema.MetricDefinition
var def idx.Archive
buf, err = def.UnmarshalMsg(buf)
if err != nil {
log.Error(3, "HTTP IndexJson() error unmarshaling body from %s/index/list: %q", peer.Name, err)
@@ -378,7 +374,7 @@ func (s *Server) listRemote(orgId int, peer cluster.Node) ([]schema.MetricDefini
func (s *Server) metricsIndex(ctx *middleware.Context) {
peers := cluster.MembersForQuery()
errors := make([]error, 0)
series := make([]schema.MetricDefinition, 0)
series := make([]idx.Archive, 0)
seenDefs := make(map[string]struct{})
var mu sync.Mutex
var wg sync.WaitGroup
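
The listRemote loop above decodes a peer response that is a concatenation of msgp-encoded records: each UnmarshalMsg call returns the unconsumed remainder of the buffer, and decoding continues until the buffer is empty. Below is a hedged, generic sketch of that pattern; the unmarshaler interface is an assumption standing in for the generated idx.Archive methods, not the real type:

```go
package sketch

// unmarshaler stands in for any msgp-generated type whose UnmarshalMsg
// consumes one record and returns the rest of the buffer.
type unmarshaler interface {
	UnmarshalMsg(b []byte) (remaining []byte, err error)
}

// decodeAll decodes a buffer that is a back-to-back concatenation of records,
// calling next() to allocate each new record, until the buffer is exhausted.
func decodeAll(buf []byte, next func() unmarshaler) ([]unmarshaler, error) {
	out := make([]unmarshaler, 0)
	for len(buf) != 0 {
		rec := next()
		var err error
		buf, err = rec.UnmarshalMsg(buf)
		if err != nil {
			return nil, err
		}
		out = append(out, rec)
	}
	return out, nil
}
```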