This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

WIP Graphitey consolidation step 2 and 3 #557

Closed · wants to merge 25 commits

Commits (25)
9580fe2  go-carbon: support multiple aggregationMethods per pattern (Dieterbe, Feb 20, 2017)
2b3e201  go-carbon: support extra fields for retentions (Dieterbe, Feb 28, 2017)
22a1e6e  go-carbon: upon schema/agg match, also return index (Dieterbe, Mar 1, 2017)
0d51ea6  go-carbon: expose WhisperAggregationItem and properties (Dieterbe, Mar 1, 2017)
e1b502f  rework archives & consolidation (Dieterbe, Feb 20, 2017)
59590d8  fix runtime consolidation and normalization (Dieterbe, Mar 3, 2017)
b40a1e8  fix tests (Dieterbe, Mar 1, 2017)
5f553a9  go-carbon: stop the pointer abuse (Dieterbe, Mar 1, 2017)
b708d8e  add storage-aggregation.conf to our various deployments (Dieterbe, Mar 3, 2017)
9004262  use /etc/metrictank instead of /etc/raintank config directory (Dieterbe, Mar 3, 2017)
41de181  don't whine if build directory already exists (Dieterbe, Mar 3, 2017)
f451b69  be more graphite-like and document it (Dieterbe, Mar 3, 2017)
c2b2017  update config-to-doc.sh script to publish new config files (Dieterbe, Mar 3, 2017)
82b256e  better output (Dieterbe, Mar 6, 2017)
a09ea73  make name a public property (Dieterbe, Mar 8, 2017)
fa7a302  fix : set aggregation/schema correctly (Dieterbe, Mar 8, 2017)
6523e39  support custom output formats (Dieterbe, Mar 8, 2017)
f1a16de  clarify `from` parameter (Dieterbe, Mar 8, 2017)
d1fc89d  simplify a bit (Dieterbe, Mar 8, 2017)
345eca5  back schemas and aggs matching with a cache (Dieterbe, Mar 9, 2017)
60c2ba6  split up input Processing tests to compare same metrics vs new ones (Dieterbe, Mar 9, 2017)
02d9f42  replace the already-quite-patched lomik libraries with our own version (Dieterbe, Mar 9, 2017)
be0b16e  hit/miss stats for matchcache (Dieterbe, Mar 9, 2017)
8ddd548  Revert "hit/miss stats for matchcache" (Dieterbe, Mar 9, 2017)
14d0c71  add a mode to mt-store-cat to see a full dump (Dieterbe, Mar 10, 2017)
1 change: 1 addition & 0 deletions .gitignore
@@ -2,6 +2,7 @@
 .notouch
 build/
 /metrictank
+/cmd/mt-index-cat/mt-index-cat
 /cmd/mt-split-metrics-by-ttl/mt-split-metrics-by-ttl
 /cmd/mt-update-ttl/mt-update-ttl
 /cmd/mt-store-cat/mt-store-cat
18 changes: 9 additions & 9 deletions api/dataprocessor.go
@@ -316,22 +316,22 @@ func (s *Server) getTargetsLocal(reqs []models.Req) ([]models.Series, error) {
 
 func (s *Server) getTarget(req models.Req) (points []schema.Point, interval uint32, err error) {
 	defer doRecover(&err)
-	readConsolidated := req.Archive != 0   // do we need to read from a downsampled series?
-	runtimeConsolidation := req.AggNum > 1 // do we need to compress any points at runtime?
+	readRollup := req.Archive != 0 // do we need to read from a downsampled series?
+	normalize := req.AggNum > 1    // do we need to compress any points at runtime?
 
 	if LogLevel < 2 {
-		if runtimeConsolidation {
-			log.Debug("DP getTarget() %s runtimeConsolidation: true. agg factor: %d -> output interval: %d", req, req.AggNum, req.OutInterval)
+		if normalize {
+			log.Debug("DP getTarget() %s normalize: true. agg factor: %d -> output interval: %d", req, req.AggNum, req.OutInterval)
 		} else {
-			log.Debug("DP getTarget() %s runtimeConsolidation: false. output interval: %d", req, req.OutInterval)
+			log.Debug("DP getTarget() %s normalize: false. output interval: %d", req, req.OutInterval)
 		}
 	}
 
-	if !readConsolidated && !runtimeConsolidation {
+	if !readRollup && !normalize {
 		return s.getSeriesFixed(req, consolidation.None), req.OutInterval, nil
-	} else if !readConsolidated && runtimeConsolidation {
+	} else if !readRollup && normalize {
 		return consolidate(s.getSeriesFixed(req, consolidation.None), req.AggNum, req.Consolidator), req.OutInterval, nil
-	} else if readConsolidated && !runtimeConsolidation {
+	} else if readRollup && !normalize {
 		if req.Consolidator == consolidation.Avg {
 			return divide(
 				s.getSeriesFixed(req, consolidation.Sum),
@@ -341,7 +341,7 @@ func (s *Server) getTarget(req models.Req) (points []schema.Point, interval uint
 		return s.getSeriesFixed(req, req.Consolidator), req.OutInterval, nil
 		}
 	} else {
-		// readConsolidated && runtimeConsolidation
+		// readRollup && normalize
 		if req.Consolidator == consolidation.Avg {
 			return divide(
 				consolidate(s.getSeriesFixed(req, consolidation.Sum), req.AggNum, consolidation.Sum),
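Editor's note: the four branches above combine two independent flags, and the Avg rollup cases are reconstructed as sum/cnt so averages stay correct whichever archive is read and however much is consolidated afterwards. Below is a minimal, self-contained Go sketch of that dispatch, assuming hypothetical stand-ins (fetch for getSeriesFixed, a sum-only consolidate); it is an illustration, not metrictank's actual code, and shows only the Avg flavor of the two rollup branches.

package main

import "fmt"

type point struct {
	val float64
	ts  uint32
}

// fetch stands in for getSeriesFixed: it would return either the raw series
// or a rollup archive pre-aggregated with the given method ("sum", "cnt", ...).
func fetch(method string) []point {
	return []point{{2, 10}, {4, 20}, {6, 30}, {8, 40}}
}

func minInt(a, b int) int {
	if a < b {
		return a
	}
	return b
}

// consolidate compresses every aggNum points into one (sum, for brevity),
// keeping the timestamp of the last point in each bucket.
func consolidate(in []point, aggNum int) []point {
	var out []point
	for i := 0; i < len(in); i += aggNum {
		end := minInt(i+aggNum, len(in))
		p := point{0, in[end-1].ts}
		for _, q := range in[i:end] {
			p.val += q.val
		}
		out = append(out, p)
	}
	return out
}

// divide computes pointwise a/b; used to turn sum and cnt series into avg.
func divide(a, b []point) []point {
	out := make([]point, len(a))
	for i := range a {
		out[i] = point{a[i].val / b[i].val, a[i].ts}
	}
	return out
}

// getTarget mirrors the four-way dispatch in the hunk above.
func getTarget(readRollup, normalize bool, aggNum int) []point {
	switch {
	case !readRollup && !normalize:
		return fetch("raw")
	case !readRollup && normalize:
		return consolidate(fetch("raw"), aggNum)
	case readRollup && !normalize:
		return divide(fetch("sum"), fetch("cnt"))
	default: // readRollup && normalize: consolidate sum and cnt separately, then divide
		return divide(
			consolidate(fetch("sum"), aggNum),
			consolidate(fetch("cnt"), aggNum),
		)
	}
}

func main() {
	fmt.Println(getTarget(true, true, 2)) // [{1 20} {1 40}]
}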
75 changes: 48 additions & 27 deletions api/dataprocessor_test.go
@@ -2,19 +2,21 @@ package api
 
 import (
 	"fmt"
+	"math"
+	"math/rand"
+	"reflect"
+	"testing"
+	"time"
 
 	"github.com/raintank/metrictank/api/models"
 	"github.com/raintank/metrictank/cluster"
+	"github.com/raintank/metrictank/conf"
 	"github.com/raintank/metrictank/consolidation"
 	"github.com/raintank/metrictank/mdata"
 	"github.com/raintank/metrictank/mdata/cache"
 	"github.com/raintank/metrictank/mdata/cache/accnt"
 	"github.com/raintank/metrictank/mdata/chunk"
 	"gopkg.in/raintank/schema.v1"
-	"math"
-	"math/rand"
-	"reflect"
-	"testing"
-	"time"
 )
 
 type testCase struct {
@@ -155,6 +157,20 @@ func TestConsolidationFunctions(t *testing.T) {
 			{Val: 3, Ts: 1449178151},
 			{Val: 4, Ts: 1449178161},
 		},
+		consolidation.Lst,
+		2,
+		[]schema.Point{
+			{2, 1449178141},
+			{4, 1449178161},
+		},
+	},
+	{
+		[]schema.Point{
+			{1, 1449178131},
+			{2, 1449178141},
+			{3, 1449178151},
+			{4, 1449178161},
+		},
 		consolidation.Min,
 		2,
 		[]schema.Point{
@@ -542,20 +558,24 @@ func TestPrevBoundary(t *testing.T) {
 }
 
 // TestGetSeriesFixed assures that series data is returned in proper form.
+// for each case, we generate a new series of 5 points to cover every possible combination of:
+// * every possible data offset (against its quantized version) e.g. offset between 0 and interval-1
+// * every possible `from` offset (against its quantized query results) e.g. offset between 0 and interval-1
+// * every possible `to` offset (against its quantized query results) e.g. offset between 0 and interval-1
+// and asserts that we get the appropriate data back in all possible query (based on to/from) of the raw data
+
 func TestGetSeriesFixed(t *testing.T) {
 	cluster.Init("default", "test", time.Now(), "http", 6060)
 	store := mdata.NewDevnullStore()
-	metrics := mdata.NewAggMetrics(store, &cache.MockCache{}, 600, 10, 0, 0, 0, 0, []mdata.AggSetting{})
+
+	mdata.SetSingleAgg(conf.Avg, conf.Min, conf.Max)
+	mdata.SetSingleSchema(conf.NewRetentionMT(10, 100, 600, 10, true))
+
+	metrics := mdata.NewAggMetrics(store, &cache.MockCache{}, 0, 0, 0)
 	srv, _ := NewServer()
 	srv.BindBackendStore(store)
 	srv.BindMemoryStore(metrics)
 
-	// the tests below cycles through every possible combination of:
-	// * every possible data offset (against its quantized version) e.g. offset between 0 and interval-1
-	// * every possible `from` offset (against its quantized query results) e.g. offset between 0 and interval-1
-	// * every possible `to` offset (against its quantized query results) e.g. offset between 0 and interval-1
-	// and asserts that we get the appropriate data back in all possible scenarios.
-
 	expected := []schema.Point{
 		{Val: 20, Ts: 20},
 		{Val: 30, Ts: 30},
@@ -566,13 +586,13 @@ func TestGetSeriesFixed(t *testing.T) {
 		for to := uint32(31); to <= 40; to++ { // should always yield result with last point at 30 (because to is exclusive)
 			name := fmt.Sprintf("case.data.offset.%d.query:%d-%d", offset, from, to)
 
-			metric := metrics.GetOrCreate(name)
+			metric := metrics.GetOrCreate(name, name, 0, 0)
 			metric.Add(offset, 10)    // this point will always be quantized to 10
 			metric.Add(10+offset, 20) // this point will always be quantized to 20, so it should be selected
 			metric.Add(20+offset, 30) // this point will always be quantized to 30, so it should be selected
 			metric.Add(30+offset, 40) // this point will always be quantized to 40
 			metric.Add(40+offset, 50) // this point will always be quantized to 50
-			req := models.NewReq(name, name, from, to, 1000, 10, consolidation.Avg, cluster.Manager.ThisNode())
+			req := models.NewReq(name, name, from, to, 1000, 10, consolidation.Avg, cluster.Manager.ThisNode(), 0, 0)
 			req.ArchInterval = 10
 			points := srv.getSeriesFixed(req, consolidation.None)
 			if !reflect.DeepEqual(expected, points) {
@@ -583,14 +603,15 @@ func TestGetSeriesFixed(t *testing.T) {
 	}
 }
 
-func reqRaw(key string, from, to, maxPoints, rawInterval uint32, consolidator consolidation.Consolidator) models.Req {
-	req := models.NewReq(key, key, from, to, maxPoints, rawInterval, consolidator, cluster.Manager.ThisNode())
+func reqRaw(key string, from, to, maxPoints, rawInterval uint32, consolidator consolidation.Consolidator, schemaI, aggI uint16) models.Req {
+	req := models.NewReq(key, key, from, to, maxPoints, rawInterval, consolidator, cluster.Manager.ThisNode(), schemaI, aggI)
 	return req
 }
-func reqOut(key string, from, to, maxPoints, rawInterval uint32, consolidator consolidation.Consolidator, archive int, archInterval, outInterval, aggNum uint32) models.Req {
-	req := models.NewReq(key, key, from, to, maxPoints, rawInterval, consolidator, cluster.Manager.ThisNode())
+func reqOut(key string, from, to, maxPoints, rawInterval uint32, consolidator consolidation.Consolidator, schemaI, aggI uint16, archive int, archInterval, ttl, outInterval, aggNum uint32) models.Req {
	req := models.NewReq(key, key, from, to, maxPoints, rawInterval, consolidator, cluster.Manager.ThisNode(), schemaI, aggI)
 	req.Archive = archive
 	req.ArchInterval = archInterval
+	req.TTL = ttl
 	req.OutInterval = outInterval
 	req.AggNum = aggNum
 	return req
@@ -686,7 +707,7 @@ func TestMergeSeries(t *testing.T) {
 func TestRequestContextWithoutConsolidator(t *testing.T) {
 	metric := "metric1"
 	archInterval := uint32(10)
-	req := reqRaw(metric, 44, 88, 100, 10, consolidation.None)
+	req := reqRaw(metric, 44, 88, 100, 10, consolidation.None, 0, 0)
 	req.ArchInterval = archInterval
 	ctx := newRequestContext(&req, consolidation.None)
 
@@ -711,7 +732,7 @@ func TestRequestContextWithConsolidator(t *testing.T) {
 	archInterval := uint32(10)
 	from := uint32(44)
 	to := uint32(88)
-	req := reqRaw(metric, from, to, 100, 10, consolidation.Sum)
+	req := reqRaw(metric, from, to, 100, 10, consolidation.Sum, 0, 0)
 	req.ArchInterval = archInterval
 	ctx := newRequestContext(&req, consolidation.Sum)
 
@@ -778,7 +799,8 @@ func TestGetSeriesCachedStore(t *testing.T) {
 	srv, _ := NewServer()
 	store := mdata.NewMockStore()
 	srv.BindBackendStore(store)
-	metrics := mdata.NewAggMetrics(store, &cache.MockCache{}, 1, 1, 0, 0, 0, 0, []mdata.AggSetting{})
+
+	metrics := mdata.NewAggMetrics(store, &cache.MockCache{}, 0, 0, 0)
 	srv.BindMemoryStore(metrics)
 	metric := "metric1"
 	var c *cache.CCache
@@ -837,7 +859,7 @@ func TestGetSeriesCachedStore(t *testing.T) {
 	}
 
 	// create a request for the current range
-	req := reqRaw(metric, from, to, span, 1, consolidation.None)
+	req := reqRaw(metric, from, to, span, 1, consolidation.None, 0, 0)
 	req.ArchInterval = 1
 	ctx := newRequestContext(&req, consolidation.None)
 	iters := srv.getSeriesCachedStore(ctx, to)
@@ -952,21 +974,20 @@ func TestGetSeriesCachedStore(t *testing.T) {
 func TestGetSeriesAggMetrics(t *testing.T) {
 	cluster.Init("default", "test", time.Now(), "http", 6060)
 	store := mdata.NewMockStore()
-	chunkSpan := uint32(600)
-	numChunks := uint32(10)
-	metrics := mdata.NewAggMetrics(store, &cache.MockCache{}, chunkSpan, numChunks, 0, 0, 0, 0, []mdata.AggSetting{})
+
+	metrics := mdata.NewAggMetrics(store, &cache.MockCache{}, 0, 0, 0)
 	srv, _ := NewServer()
 	srv.BindBackendStore(store)
 	srv.BindMemoryStore(metrics)
 	from := uint32(1744)
 	to := uint32(1888)
 	metricKey := "metric1"
 	archInterval := uint32(10)
-	req := reqRaw(metricKey, from, to, 100, 10, consolidation.None)
+	req := reqRaw(metricKey, from, to, 100, 10, consolidation.None, 0, 0)
 	req.ArchInterval = archInterval
 	ctx := newRequestContext(&req, consolidation.None)
 
-	metric := metrics.GetOrCreate(metricKey)
+	metric := metrics.GetOrCreate(metricKey, metricKey, 0, 0)
 	for i := uint32(50); i < 3000; i++ {
 		metric.Add(i, float64(i^2))
 	}
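Editor's note: the fixed offsets in TestGetSeriesFixed work because ingested timestamps are quantized up to the next multiple of the interval. A small self-contained illustration of that rule, using a hypothetical quantize helper (not metrictank's API):

package main

import "fmt"

// quantize rounds ts up to a multiple of interval: anything in
// (boundary-interval, boundary] is stored at boundary.
func quantize(ts, interval uint32) uint32 {
	return ((ts + interval - 1) / interval) * interval
}

func main() {
	interval := uint32(10)
	for _, offset := range []uint32{1, 5, 10} {
		// mirrors metric.Add(offset, 10) above: any offset in 1..10 lands on
		// ts=10, 10+offset lands on ts=20, and so on.
		fmt.Println(quantize(offset, interval), quantize(10+offset, interval)) // 10 20
	}
}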
27 changes: 13 additions & 14 deletions api/graphite.go
@@ -15,6 +15,7 @@ import (
 	"github.com/raintank/metrictank/cluster"
 	"github.com/raintank/metrictank/consolidation"
 	"github.com/raintank/metrictank/idx"
+	"github.com/raintank/metrictank/mdata"
 	"github.com/raintank/metrictank/stats"
 	"github.com/raintank/worldping-api/pkg/log"
 	"gopkg.in/raintank/schema.v1"
@@ -209,25 +210,24 @@ func (s *Server) renderMetrics(ctx *middleware.Context, request models.GraphiteR
 
 	reqs := make([]models.Req, 0)
 
-	// consolidatorForPattern[<pattern>]<consolidateBy>
-	consolidatorForPattern := make(map[string]string)
 	patterns := make([]string, 0)
 	type locatedDef struct {
-		def  schema.MetricDefinition
-		node cluster.Node
+		def     schema.MetricDefinition
+		node    cluster.Node
+		SchemaI uint16
+		AggI    uint16
 	}
 
 	//locatedDefs[<pattern>][<def.id>]locatedDef
 	locatedDefs := make(map[string]map[string]locatedDef)
 	//targetForPattern[<pattern>]<target>
 	targetForPattern := make(map[string]string)
 	for _, target := range targets {
-		pattern, consolidateBy, err := parseTarget(target)
+		pattern, _, err := parseTarget(target)
 		if err != nil {
 			ctx.Error(http.StatusBadRequest, err.Error())
 			return
 		}
-		consolidatorForPattern[pattern] = consolidateBy
 		patterns = append(patterns, pattern)
 		targetForPattern[pattern] = target
 		locatedDefs[pattern] = make(map[string]locatedDef)
@@ -245,25 +245,24 @@ func (s *Server) renderMetrics(ctx *middleware.Context, request models.GraphiteR
 				continue
 			}
 			for _, def := range metric.Defs {
-				locatedDefs[s.Pattern][def.Id] = locatedDef{def, s.Node}
+				locatedDefs[s.Pattern][def.Id] = locatedDef{def, s.Node, metric.SchemaI, metric.AggI}
 			}
 		}
 	}
 
 	for pattern, ldefs := range locatedDefs {
 		for _, locdef := range ldefs {
 			def := locdef.def
-			consolidator, err := consolidation.GetConsolidator(&def, consolidatorForPattern[pattern])
-			if err != nil {
-				response.Write(ctx, response.NewError(http.StatusBadRequest, err.Error()))
-				return
-			}
+			// set consolidator that will be used to normalize raw data before feeding into processing functions
+			// not to be confused with runtime consolidation which happens in the graphite api, after all processing.
+			fn := mdata.Aggregations.Get(locdef.AggI).AggregationMethod[0]
+			consolidator := consolidation.Consolidator(fn) // we use the same number assignments so we can cast them
 			// target is like foo.bar or foo.* or consolidateBy(foo.*,'sum')
 			// pattern is like foo.bar or foo.*
 			// def.Name is like foo.concretebar
 			// so we want target to contain the concrete graphite name, potentially wrapped with consolidateBy().
 			target := strings.Replace(targetForPattern[pattern], pattern, def.Name, -1)
-			reqs = append(reqs, models.NewReq(def.Id, target, fromUnix, toUnix, request.MaxDataPoints, uint32(def.Interval), consolidator, locdef.node))
+			reqs = append(reqs, models.NewReq(def.Id, target, fromUnix, toUnix, request.MaxDataPoints, uint32(def.Interval), consolidator, locdef.node, locdef.SchemaI, locdef.AggI))
 		}
 	}
 
@@ -279,7 +278,7 @@ func (s *Server) renderMetrics(ctx *middleware.Context, request models.GraphiteR
 			ctx.Req.Method, from, to, len(request.Targets), request.MaxDataPoints)
 	}
 
-	reqs, err = alignRequests(uint32(time.Now().Unix()), reqs, s.MemoryStore.AggSettings())
+	reqs, err = alignRequests(uint32(time.Now().Unix()), reqs)
 	if err != nil {
 		log.Error(3, "HTTP Render alignReq error: %s", err)
 		response.Write(ctx, response.WrapError(err))
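Editor's note: the replacement of GetConsolidator above leans on the comment "we use the same number assignments so we can cast them": the aggregation-method enum parsed from storage-aggregation.conf and consolidation.Consolidator are kept number-compatible, so the first configured method can be cast directly. A self-contained sketch of that trick with illustrative constants (not the real definitions):

package main

import "fmt"

type Method int       // stand-in for the conf package's aggregation-method enum
type Consolidator int // stand-in for consolidation.Consolidator

// Both enums must assign the same numbers to the same methods for the cast to hold.
const (
	Avg Method = iota + 1
	Sum
	Lst
	Max
	Min
)

func (c Consolidator) String() string {
	return [...]string{"", "avg", "sum", "lst", "max", "min"}[c]
}

func main() {
	// like mdata.Aggregations.Get(locdef.AggI).AggregationMethod[0]:
	// the first method configured for the matched pattern wins.
	methods := []Method{Sum, Max}
	consolidator := Consolidator(methods[0]) // the cast: no lookup table needed
	fmt.Println(consolidator)                // sum
}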
14 changes: 9 additions & 5 deletions api/models/request.go
@@ -14,9 +14,11 @@ type Req struct {
 	From      uint32 `json:"from"`
 	To        uint32 `json:"to"`
 	MaxPoints uint32 `json:"maxPoints"`
-	RawInterval  uint32                     `json:"rawInterval"` // the interval of the raw metric before any consolidation
-	Consolidator consolidation.Consolidator `json:"consolidator"`
+	RawInterval  uint32                     `json:"rawInterval"`  // the interval of the raw metric before any consolidation
+	Consolidator consolidation.Consolidator `json:"consolidator"` // consolidation method for rollup archive and normalization. (not runtime consolidation)
 	Node         cluster.Node               `json:"-"`
+	SchemaI      uint16                     `json:"schemaI"`
+	AggI         uint16                     `json:"aggI"`
 
 	// these fields need some more coordination and are typically set later
 	Archive int `json:"archive"` // 0 means original data, 1 means first agg level, 2 means 2nd, etc.
@@ -26,7 +28,7 @@ type Req struct {
 	AggNum uint32 `json:"aggNum"` // how many points to consolidate together at runtime, after fetching from the archive
 }
 
-func NewReq(key, target string, from, to, maxPoints, rawInterval uint32, consolidator consolidation.Consolidator, node cluster.Node) Req {
+func NewReq(key, target string, from, to, maxPoints, rawInterval uint32, consolidator consolidation.Consolidator, node cluster.Node, schemaI, aggI uint16) Req {
 	return Req{
 		key,
 		target,
@@ -36,6 +38,8 @@ func NewReq(key, target string, from, to, maxPoints, rawInterval uint32, consoli
 		rawInterval,
 		consolidator,
 		node,
+		schemaI,
+		aggI,
 		-1, // this is supposed to be updated still!
 		0,  // this is supposed to be updated still
 		0,  // this is supposed to be updated still
@@ -49,6 +53,6 @@ func (r Req) String() string {
 }
 
 func (r Req) DebugString() string {
-	return fmt.Sprintf("%s %d - %d . points <= %d. %s - archive %d, rawInt %d, archInt %d, outInt %d, aggNum %d",
-		r.Key, r.From, r.To, r.MaxPoints, r.Consolidator, r.Archive, r.RawInterval, r.ArchInterval, r.OutInterval, r.AggNum)
+	return fmt.Sprintf("Req key=%s %d - %d maxPoints=%d rawInt=%d cons=%s schemaI=%d aggI=%d archive=%d archInt=%d ttl=%d outInt=%d aggNum=%d",
+		r.Key, r.From, r.To, r.MaxPoints, r.RawInterval, r.Consolidator, r.SchemaI, r.AggI, r.Archive, r.ArchInterval, r.TTL, r.OutInterval, r.AggNum)
 }
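Editor's note: a hedged usage sketch of the extended constructor, assuming the signatures exactly as they appear in this diff (NewReq with trailing schemaI/aggI, the indices of the matched storage-schemas and storage-aggregation rules, plus the reworked DebugString):

package main

import (
	"fmt"
	"time"

	"github.com/raintank/metrictank/api/models"
	"github.com/raintank/metrictank/cluster"
	"github.com/raintank/metrictank/consolidation"
)

func main() {
	cluster.Init("default", "test", time.Now(), "http", 6060)
	// schemaI=0, aggI=0: indices into the parsed schema/aggregation configs,
	// carried along so later stages need not re-match the metric name.
	req := models.NewReq("some.id", "some.target", 0, 3600, 800, 10,
		consolidation.Avg, cluster.Manager.ThisNode(), 0, 0)
	fmt.Println(req.DebugString())
}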