diff --git a/api/dataprocessor.go b/api/dataprocessor.go index 586982b2d0..9939938141 100644 --- a/api/dataprocessor.go +++ b/api/dataprocessor.go @@ -10,6 +10,7 @@ import ( "github.com/raintank/metrictank/api/models" "github.com/raintank/metrictank/consolidation" + "github.com/raintank/metrictank/mdata" "github.com/raintank/metrictank/mdata/chunk" "github.com/raintank/metrictank/util" "github.com/raintank/worldping-api/pkg/log" @@ -314,26 +315,27 @@ func AggMetricKey(key, archive string, aggSpan uint32) string { func (s *Server) getSeriesFixed(req models.Req, consolidator consolidation.Consolidator) []schema.Point { ctx := newRequestContext(&req, consolidator) - iters := s.getSeries(ctx) - points := s.itersToPoints(ctx, iters) - return Fix(points, req.From, req.To, req.ArchInterval) + res := s.getSeries(ctx) + res.Points = append(s.itersToPoints(ctx, res.Iters), res.Points...) + return Fix(res.Points, req.From, req.To, req.ArchInterval) } -func (s *Server) getSeries(ctx *requestContext) []chunk.Iter { +func (s *Server) getSeries(ctx *requestContext) mdata.Result { - oldest, memIters := s.getSeriesAggMetrics(ctx) - log.Debug("oldest from aggmetrics is %d", oldest) + res := s.getSeriesAggMetrics(ctx) + log.Debug("oldest from aggmetrics is %d", res.Oldest) - if oldest <= ctx.From { + if res.Oldest <= ctx.From { reqSpanMem.ValueUint32(ctx.To - ctx.From) - return memIters + return res } // if oldest < to -> search until oldest, we already have the rest from mem // if to < oldest -> no need to search until oldest, only search until to - until := util.Min(oldest, ctx.To) + until := util.Min(res.Oldest, ctx.To) - return append(s.getSeriesCachedStore(ctx, until), memIters...) + res.Iters = append(s.getSeriesCachedStore(ctx, until), res.Iters...) 
+ return res } // getSeries returns points from mem (and cassandra if needed), within the range from (inclusive) - to (exclusive) @@ -363,24 +365,21 @@ func (s *Server) itersToPoints(ctx *requestContext, iters []chunk.Iter) []schema return points } -func (s *Server) getSeriesAggMetrics(ctx *requestContext) (uint32, []chunk.Iter) { - oldest := ctx.Req.To - var memIters []chunk.Iter - +func (s *Server) getSeriesAggMetrics(ctx *requestContext) mdata.Result { metric, ok := s.MemoryStore.Get(ctx.Key) if !ok { - return oldest, memIters + return mdata.Result{ + Oldest: ctx.Req.To, + } } if ctx.Cons != consolidation.None { logLoad("memory", ctx.AggKey, ctx.From, ctx.To) - oldest, memIters = metric.GetAggregated(ctx.Cons, ctx.Req.ArchInterval, ctx.From, ctx.To) + return metric.GetAggregated(ctx.Cons, ctx.Req.ArchInterval, ctx.From, ctx.To) } else { logLoad("memory", ctx.Req.Key, ctx.From, ctx.To) - oldest, memIters = metric.Get(ctx.From, ctx.To) + return metric.Get(ctx.From, ctx.To) } - - return oldest, memIters } // will only fetch until until, but uses ctx.To for debug logging diff --git a/api/dataprocessor_test.go b/api/dataprocessor_test.go index 4e9025b94d..27de9c33c4 100644 --- a/api/dataprocessor_test.go +++ b/api/dataprocessor_test.go @@ -767,11 +767,11 @@ func TestGetSeriesAggMetrics(t *testing.T) { metric.Add(i, float64(i^2)) } - oldest, iters := srv.getSeriesAggMetrics(ctx) + res := srv.getSeriesAggMetrics(ctx) timestamps := make([]uint32, 0) values := make([]float64, 0) - for _, it := range iters { + for _, it := range res.Iters { for it.Next() { ts, val := it.Values() timestamps = append(timestamps, ts) @@ -782,8 +782,8 @@ func TestGetSeriesAggMetrics(t *testing.T) { // should be the T0 of the chunk from (1744) is in // 1744 - (1744 % 600) = 1200 expected := uint32(1200) - if oldest != expected { - t.Errorf("Expected oldest to be %d but got %d", expected, oldest) + if res.Oldest != expected { + t.Errorf("Expected oldest to be %d but got %d", expected, 
res.Oldest) } // number of returned ts should be the number of chunks the searched range spans across * chunkspan diff --git a/conf/schemas.go b/conf/schemas.go index 6db5022850..b458b2fbad 100644 --- a/conf/schemas.go +++ b/conf/schemas.go @@ -26,10 +26,11 @@ func (s SchemaSlice) Less(i, j int) bool { return s[i].Priority >= s[j].Priority // Schema represents one schema setting type Schema struct { - Name string - Pattern *regexp.Regexp - Retentions Retentions - Priority int64 + Name string + Pattern *regexp.Regexp + Retentions Retentions + Priority int64 + ReorderWindow uint32 } func NewSchemas(schemas []Schema) Schemas { @@ -116,6 +117,19 @@ func ReadSchemas(file string) (Schemas, error) { } schema.Priority = int64(p)<<32 - int64(i) // to sort records with same priority by position in file + reorderBufferStr := sec.ValueOf("reorderBuffer") + if len(reorderBufferStr) > 0 { + reorderWindow, err := strconv.ParseUint(reorderBufferStr, 10, 32) + if err != nil { + return Schemas{}, fmt.Errorf("[%s]: Failed to parse reorder buffer conf, expected a number: %s", schema.Name, reorderBufferStr) + } + + // if reorderWindow == 0 we just disable the buffer + if reorderWindow > 0 { + schema.ReorderWindow = uint32(reorderWindow) + } + } + schemas = append(schemas, schema) } diff --git a/docker/docker-cluster/storage-schemas.conf b/docker/docker-cluster/storage-schemas.conf index d13cde4086..759ee61eb4 100644 --- a/docker/docker-cluster/storage-schemas.conf +++ b/docker/docker-cluster/storage-schemas.conf @@ -1,3 +1,56 @@ +# This config file sets up your retention rules. +# It is an extension of http://graphite.readthedocs.io/en/latest/config-carbon.html#storage-schemas-conf +# Note: +# * You can have 0 to N sections +# * The first match wins, starting from the top. If no match found, we default to single archive of minutely points, retained for 7 days in 2h chunks +# * The patterns are unanchored regular expressions, add '^' or '$' to match the beginning or end of a pattern. 
+# * When running a cluster of metrictank instances, all instances should have the same agg-settings. +# * Unlike whisper (graphite), the config doesn't stick: if you restart metrictank with updated settings, then those +# will be applied. The configured rollups will be saved by primary nodes and served in responses if they are ready. +# (note in particular that if you remove archives here, we will no longer read from them) +# * Retentions must be specified in order of increasing interval and retention +# * The reorderBuffer is an optional buffer that temporarily keeps data points in memory as raw data and allows insertion at random order. The specified value is how many datapoints, based on the raw interval specified in the first defined retention, should be kept before they are flushed out. This is useful if the metric producers cannot guarantee that the data will arrive in order, but it is relatively memory intensive. If you are unsure whether you need this, better leave it disabled to not waste memory. +# +# A given rule is made up of at least 3 lines: the name, regex pattern, retentions and optionally the reorder buffer size. +# The retentions line can specify multiple retention definitions. You need one or more, comma separated. +# +# There are 2 formats for a single retention definition: +# 1) 'series-interval:count-of-datapoints' legacy and not easy to read +# 2) 'series-interval:retention[:chunkspan:numchunks:ready]' more friendly format with optionally 3 extra fields +# +#Series intervals and retentions are specified using the following suffixes: +# +#s - second +#m - minute +#h - hour +#d - day +#y - year +# +# The final 3 fields are specific to metrictank and if unspecified, use sane defaults. +# See https://github.com/raintank/metrictank/blob/master/docs/memory-server.md for more details +# +# chunkspan: duration of chunks. e.g. 10min, 30min, 1h, 90min... 
+# must be a valid value as described here https://github.com/raintank/metrictank/blob/master/docs/memory-server.md#valid-chunk-spans +# Defaults to the smallest chunkspan that can hold at least 100 points. +# +# numchunks: number of raw chunks to keep in in-memory ring buffer +# See https://github.com/raintank/metrictank/blob/master/docs/memory-server.md for details and trade-offs, especially when compared to chunk-cache +# which may be a more effective method to cache data and alleviate workload for cassandra. +# Defaults to 2 +# +# ready: whether the archive is ready for querying. This is useful if you recently introduced a new archive, but it's still being populated +# so you rather query other archives, even if they don't have the retention to serve your queries +# Defaults to true +# +# Here's an example with multiple retentions: +# [apache_busyWorkers] +# pattern = ^servers\.www.*\.workers\.busyWorkers$ +# retentions = 1s:1d:10min:1,1m:21d,15m:5y:2h:1:false +# +# This example has 3 retention definitions, the first and last override some default options (to use 10minutely and 2hourly chunks and only keep one of them in memory) +# and the last rollup is marked as not ready yet for querying. + [default] pattern = .* retentions = 1s:1d +# reorderBuffer = 20 diff --git a/docker/docker-dev-custom-cfg-kafka/storage-aggregation.conf b/docker/docker-dev-custom-cfg-kafka/storage-aggregation.conf index 465906149c..804e096218 100644 --- a/docker/docker-dev-custom-cfg-kafka/storage-aggregation.conf +++ b/docker/docker-dev-custom-cfg-kafka/storage-aggregation.conf @@ -1,3 +1,16 @@ +# This config file controls which summaries are created (using which consolidation functions) for your lower-precision archives, as defined in storage-schemas.conf +# It is an extension of http://graphite.readthedocs.io/en/latest/config-carbon.html#storage-aggregation-conf +# Note: +# * This file is optional. 
If it is not present, we will use avg for everything +# * Anything not matched also uses avg for everything +# * xFilesFactor is not honored yet. What it is in graphite is a floating point number between 0 and 1 specifying what fraction of the previous retention level's slots must have non-null values in order to aggregate to a non-null value. The default is 0.5. +# * aggregationMethod specifies the functions used to aggregate values for the next retention level. Legal methods are avg/average, sum, min, max, and last. The default is average. +# Unlike Graphite, you can specify multiple, as it is often handy to have different summaries available depending on what analysis you need to do. +# When using multiple, the first one is used for reading. In the future, we will add capabilities to select the different archives for reading. +# * the settings configured when metrictank starts are what is applied. So you can enable or disable archives by restarting metrictank. +# +# see https://github.com/raintank/metrictank/blob/master/docs/consolidation.md for related info. + [default] pattern = .* xFilesFactor = 0.5 diff --git a/docker/docker-dev-custom-cfg-kafka/storage-schemas.conf b/docker/docker-dev-custom-cfg-kafka/storage-schemas.conf index 77c2614104..ccd6eb803d 100644 --- a/docker/docker-dev-custom-cfg-kafka/storage-schemas.conf +++ b/docker/docker-dev-custom-cfg-kafka/storage-schemas.conf @@ -1,3 +1,56 @@ +# This config file sets up your retention rules. +# It is an extension of http://graphite.readthedocs.io/en/latest/config-carbon.html#storage-schemas-conf +# Note: +# * You can have 0 to N sections +# * The first match wins, starting from the top. If no match found, we default to single archive of minutely points, retained for 7 days in 2h chunks +# * The patterns are unanchored regular expressions, add '^' or '$' to match the beginning or end of a pattern. +# * When running a cluster of metrictank instances, all instances should have the same agg-settings. 
+# * Unlike whisper (graphite), the config doesn't stick: if you restart metrictank with updated settings, then those +# will be applied. The configured rollups will be saved by primary nodes and served in responses if they are ready. +# (note in particular that if you remove archives here, we will no longer read from them) +# * Retentions must be specified in order of increasing interval and retention +# * The reorderBuffer is an optional buffer that temporarily keeps data points in memory as raw data and allows insertion at random order. The specified value is how many datapoints, based on the raw interval specified in the first defined retention, should be kept before they are flushed out. This is useful if the metric producers cannot guarantee that the data will arrive in order, but it is relatively memory intensive. If you are unsure whether you need this, better leave it disabled to not waste memory. +# +# A given rule is made up of at least 3 lines: the name, regex pattern, retentions and optionally the reorder buffer size. +# The retentions line can specify multiple retention definitions. You need one or more, comma separated. +# +# There are 2 formats for a single retention definition: +# 1) 'series-interval:count-of-datapoints' legacy and not easy to read +# 2) 'series-interval:retention[:chunkspan:numchunks:ready]' more friendly format with optionally 3 extra fields +# +#Series intervals and retentions are specified using the following suffixes: +# +#s - second +#m - minute +#h - hour +#d - day +#y - year +# +# The final 3 fields are specific to metrictank and if unspecified, use sane defaults. +# See https://github.com/raintank/metrictank/blob/master/docs/memory-server.md for more details +# +# chunkspan: duration of chunks. e.g. 10min, 30min, 1h, 90min... 
+# must be a valid value as described here https://github.com/raintank/metrictank/blob/master/docs/memory-server.md#valid-chunk-spans +# Defaults to the smallest chunkspan that can hold at least 100 points. +# +# numchunks: number of raw chunks to keep in in-memory ring buffer +# See https://github.com/raintank/metrictank/blob/master/docs/memory-server.md for details and trade-offs, especially when compared to chunk-cache +# which may be a more effective method to cache data and alleviate workload for cassandra. +# Defaults to 2 +# +# ready: whether the archive is ready for querying. This is useful if you recently introduced a new archive, but it's still being populated +# so you rather query other archives, even if they don't have the retention to serve your queries +# Defaults to true +# +# Here's an example with multiple retentions: +# [apache_busyWorkers] +# pattern = ^servers\.www.*\.workers\.busyWorkers$ +# retentions = 1s:1d:10min:1,1m:21d,15m:5y:2h:1:false +# +# This example has 3 retention definitions, the first and last override some default options (to use 10minutely and 2hourly chunks and only keep one of them in memory) +# and the last rollup is marked as not ready yet for querying. + [default] pattern = .* retentions = 1s:35d:2min:2 +# reorderBuffer = 20 diff --git a/docs/config.md b/docs/config.md index f3c6deddc8..ceedb5c686 100644 --- a/docs/config.md +++ b/docs/config.md @@ -358,8 +358,9 @@ enabled = false # will be applied. The configured rollups will be saved by primary nodes and served in responses if they are ready. # (note in particular that if you remove archives here, we will no longer read from them) # * Retentions must be specified in order of increasing interval and retention +# * The reorderBuffer is an optional buffer that temporarily keeps data points in memory as raw data and allows insertion at random order. 
The specified value is how many datapoints, based on the raw interval specified in the first defined retention, should be kept before they are flushed out. This is useful if the metric producers cannot guarantee that the data will arrive in order, but it is relatively memory intensive. If you are unsure whether you need this, better leave it disabled to not waste memory. # -# A given rule is made up of 3 lines: the name, regex pattern and retentions. +# A given rule is made up of at least 3 lines: the name, regex pattern, retentions and optionally the reorder buffer size. # The retentions line can specify multiple retention definitions. You need one or more, space separated. # # There are 2 formats for a single retention definition: @@ -401,6 +402,7 @@ enabled = false [default] pattern = .* retentions = 1s:35d:10min:7 +# reorderBuffer = 20 ``` # storage-aggregation.conf diff --git a/docs/metrics.md b/docs/metrics.md index d5c749944f..51a40d6bb9 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -229,6 +229,11 @@ a counter of how many chunks are created the number of times the metrics GC is about to inspect a metric (series) * `tank.metrics_active`: the number of currently known metrics (excl rollup series), measured every second +* `tank.metrics_reordered`: +the number of points received that are going back in time, but are still +within the reorder window. in such a case they will be inserted in the correct order. +E.g. if the reorder window is 60 (datapoints) then points may be inserted at random order as long as their +ts is not older than the 60th datapoint counting from the newest. * `tank.metrics_too_old`: points that go back in time. E.g. 
for any given series, when a point has a timestamp diff --git a/mdata/aggmetric.go b/mdata/aggmetric.go index ebe0d5ea1f..7bd826779d 100644 --- a/mdata/aggmetric.go +++ b/mdata/aggmetric.go @@ -26,6 +26,7 @@ type AggMetric struct { cachePusher cache.CachePusher sync.RWMutex Key string + rob *ReorderBuffer CurrentChunkPos int // element in []Chunks that is active. All others are either finished or nil. NumChunks uint32 // max size of the circular buffer ChunkSpan uint32 // span of individual chunks in seconds @@ -43,7 +44,7 @@ type AggMetric struct { // it optionally also creates aggregations with the given settings // the 0th retention is the native archive of this metric. if there's several others, we create aggregators, using agg. // it's the callers responsibility to make sure agg is not nil in that case! -func NewAggMetric(store Store, cachePusher cache.CachePusher, key string, retentions conf.Retentions, agg *conf.Aggregation, dropFirstChunk bool) *AggMetric { +func NewAggMetric(store Store, cachePusher cache.CachePusher, key string, retentions conf.Retentions, reorderWindow uint32, agg *conf.Aggregation, dropFirstChunk bool) *AggMetric { // note: during parsing of retentions, we assure there's at least 1. ret := retentions[0] @@ -61,6 +62,10 @@ func NewAggMetric(store Store, cachePusher cache.CachePusher, key string, retent // garbage collected right after creating it, before we can push to it. 
lastWrite: uint32(time.Now().Unix()), } + if reorderWindow != 0 { + m.rob = NewReorderBuffer(reorderWindow, ret.SecondsPerPoint) + } + for _, ret := range retentions[1:] { m.aggregators = append(m.aggregators, NewAggregator(store, cachePusher, key, ret, *agg, dropFirstChunk)) } @@ -132,7 +137,7 @@ func (a *AggMetric) getChunk(pos int) *chunk.Chunk { return a.Chunks[pos] } -func (a *AggMetric) GetAggregated(consolidator consolidation.Consolidator, aggSpan, from, to uint32) (uint32, []chunk.Iter) { +func (a *AggMetric) GetAggregated(consolidator consolidation.Consolidator, aggSpan, from, to uint32) Result { // no lock needed cause aggregators don't change at runtime for _, a := range a.aggregators { if a.span == aggSpan { @@ -161,7 +166,7 @@ func (a *AggMetric) GetAggregated(consolidator consolidation.Consolidator, aggSp // Get all data between the requested time ranges. From is inclusive, to is exclusive. from <= x < to // more data then what's requested may be included // also returns oldest point we have, so that if your query needs data before it, the caller knows when to query cassandra -func (a *AggMetric) Get(from, to uint32) (uint32, []chunk.Iter) { +func (a *AggMetric) Get(from, to uint32) Result { pre := time.Now() if LogLevel < 2 { log.Debug("AM %s Get(): %d - %d (%s - %s) span:%ds", a.Key, from, to, TS(from), TS(to), to-from-1) @@ -172,12 +177,26 @@ func (a *AggMetric) Get(from, to uint32) (uint32, []chunk.Iter) { a.RLock() defer a.RUnlock() + result := Result{ + Oldest: math.MaxInt32, + } + + if a.rob != nil { + result.Points = a.rob.Get() + if len(result.Points) > 0 { + result.Oldest = result.Points[0].Ts + if result.Oldest <= from { + return result + } + } + } + if len(a.Chunks) == 0 { // we dont have any data yet. 
if LogLevel < 2 { log.Debug("AM %s Get(): no data for requested range.", a.Key) } - return math.MaxInt32, make([]chunk.Iter, 0) + return result } newestChunk := a.getChunk(a.CurrentChunkPos) @@ -197,7 +216,8 @@ func (a *AggMetric) Get(from, to uint32) (uint32, []chunk.Iter) { if LogLevel < 2 { log.Debug("AM %s Get(): no data for requested range.", a.Key) } - return from, make([]chunk.Iter, 0) + result.Oldest = from + return result } // get the oldest chunk we have. @@ -219,7 +239,7 @@ func (a *AggMetric) Get(from, to uint32) (uint32, []chunk.Iter) { oldestChunk := a.getChunk(oldestPos) if oldestChunk == nil { log.Error(3, "unexpected nil chunk.") - return math.MaxInt32, make([]chunk.Iter, 0) + return result } // The first chunk is likely only a partial chunk. If we are not the primary node @@ -233,7 +253,7 @@ func (a *AggMetric) Get(from, to uint32) (uint32, []chunk.Iter) { oldestChunk = a.getChunk(oldestPos) if oldestChunk == nil { log.Error(3, "unexpected nil chunk.") - return math.MaxInt32, make([]chunk.Iter, 0) + return result } } @@ -242,7 +262,8 @@ func (a *AggMetric) Get(from, to uint32) (uint32, []chunk.Iter) { if LogLevel < 2 { log.Debug("AM %s Get(): no data for requested range", a.Key) } - return oldestChunk.T0, make([]chunk.Iter, 0) + result.Oldest = oldestChunk.T0 + return result } // Find the oldest Chunk that the "from" ts falls in. 
If from extends before the oldest @@ -255,7 +276,8 @@ func (a *AggMetric) Get(from, to uint32) (uint32, []chunk.Iter) { oldestChunk = a.getChunk(oldestPos) if oldestChunk == nil { log.Error(3, "unexpected nil chunk.") - return to, make([]chunk.Iter, 0) + result.Oldest = to + return result } } @@ -274,15 +296,15 @@ func (a *AggMetric) Get(from, to uint32) (uint32, []chunk.Iter) { newestChunk = a.getChunk(newestPos) if newestChunk == nil { log.Error(3, "unexpected nil chunk.") - return to, make([]chunk.Iter, 0) + result.Oldest = to + return result } } // now just start at oldestPos and move through the Chunks circular Buffer to newestPos - iters := make([]chunk.Iter, 0, a.NumChunks) for { c := a.getChunk(oldestPos) - iters = append(iters, chunk.NewIter(c.Iter())) + result.Iters = append(result.Iters, chunk.NewIter(c.Iter())) if oldestPos == newestPos { break @@ -295,7 +317,8 @@ func (a *AggMetric) Get(from, to uint32) (uint32, []chunk.Iter) { } memToIterDuration.Value(time.Now().Sub(pre)) - return oldestChunk.T0, iters + result.Oldest = oldestChunk.T0 + return result } // this function must only be called while holding the lock @@ -407,6 +430,21 @@ func (a *AggMetric) Add(ts uint32, val float64) { a.Lock() defer a.Unlock() + if a.rob == nil { + // write directly + a.add(ts, val) + } else { + // write through reorder buffer + res := a.rob.Add(ts, val) + for _, p := range res { + a.add(p.Ts, p.Val) + } + } +} + +// don't ever call with a ts of 0, cause we use 0 to mean not initialized! 
+// assumes a write lock is held by the call-site +func (a *AggMetric) add(ts uint32, val float64) { t0 := ts - (ts % a.ChunkSpan) if len(a.Chunks) == 0 { diff --git a/mdata/aggmetric_test.go b/mdata/aggmetric_test.go index 849269a733..5609ecf304 100644 --- a/mdata/aggmetric_test.go +++ b/mdata/aggmetric_test.go @@ -2,6 +2,8 @@ package mdata import ( "fmt" + "regexp" + "sort" "testing" "time" @@ -17,6 +19,12 @@ type point struct { val float64 } +type ByTs []point + +func (a ByTs) Len() int { return len(a) } +func (a ByTs) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a ByTs) Less(i, j int) bool { return a[i].ts < a[j].ts } + func (p point) String() string { return fmt.Sprintf("point{%0.f at %d}", p.val, p.ts) } @@ -31,19 +39,29 @@ func NewChecker(t *testing.T, agg *AggMetric) *Checker { return &Checker{t, agg, make([]point, 0)} } -// always add points in ascending order, never same ts! func (c *Checker) Add(ts uint32, val float64) { c.agg.Add(ts, val) c.points = append(c.points, point{ts, val}) } +func (c *Checker) DropPointByTs(ts uint32) { + for i := 0; i != len(c.points); { + if c.points[i].ts == ts { + c.points = append(c.points[:i], c.points[i+1:]...) 
+ } else { + i++ + } + } +} + // from to is the range that gets requested from AggMetric // first/last is what we use as data range to compare to (both inclusive) // these may be different because AggMetric returns broader rangers (due to packed format), func (c *Checker) Verify(primary bool, from, to, first, last uint32) { currentClusterStatus := cluster.Manager.IsPrimary() + sort.Sort(ByTs(c.points)) cluster.Manager.SetPrimary(primary) - _, iters := c.agg.Get(from, to) + res := c.agg.Get(from, to) // we don't do checking or fancy logic, it is assumed that the caller made sure first and last are ts of actual points var pi int // index of first point we want var pj int // index of last point we want @@ -53,18 +71,27 @@ func (c *Checker) Verify(primary bool, from, to, first, last uint32) { } c.t.Logf("verifying AggMetric.Get(%d,%d) =?= %d <= ts <= %d", from, to, first, last) index := pi - 1 - for _, iter := range iters { + for _, iter := range res.Iters { for iter.Next() { index++ tt, vv := iter.Values() if index > pj { - c.t.Fatalf("Values()=(%v,%v), want end of stream\n", tt, vv) + c.t.Fatalf("Iters: Values()=(%v,%v), want end of stream\n", tt, vv) } if c.points[index].ts != tt || c.points[index].val != vv { - c.t.Fatalf("Values()=(%v,%v), want (%v,%v)\n", tt, vv, c.points[index].ts, c.points[index].val) + c.t.Fatalf("Iters: Values()=(%v,%v), want (%v,%v)\n", tt, vv, c.points[index].ts, c.points[index].val) } } } + for _, point := range res.Points { + index++ + if index > pj { + c.t.Fatalf("Points: Values()=(%v,%v), want end of stream\n", point.Ts, point.Val) + } + if c.points[index].ts != point.Ts || c.points[index].val != point.Val { + c.t.Fatalf("Points: Values()=(%v,%v), want (%v,%v)\n", point.Ts, point.Val, c.points[index].ts, c.points[index].val) + } + } if index != pj { c.t.Fatalf("not all values returned. 
missing %v", c.points[index:pj+1]) } @@ -95,7 +122,7 @@ func testMetricPersistOptionalPrimary(t *testing.T, primary bool) { numChunks, chunkAddCount, chunkSpan := uint32(5), uint32(10), uint32(300) ret := []conf.Retention{conf.NewRetentionMT(1, 1, chunkSpan, numChunks, true)} - agg := NewAggMetric(dnstore, &mockCache, "foo", ret, nil, false) + agg := NewAggMetric(dnstore, &mockCache, "foo", ret, 0, nil, false) for ts := chunkSpan; ts <= chunkSpan*chunkAddCount; ts += chunkSpan { agg.Add(ts, 1) @@ -131,7 +158,7 @@ func TestAggMetric(t *testing.T) { cluster.Init("default", "test", time.Now(), "http", 6060) ret := []conf.Retention{conf.NewRetentionMT(1, 1, 100, 5, true)} - c := NewChecker(t, NewAggMetric(dnstore, &cache.MockCache{}, "foo", ret, nil, false)) + c := NewChecker(t, NewAggMetric(dnstore, &cache.MockCache{}, "foo", ret, 0, nil, false)) // basic case, single range c.Add(101, 101) @@ -199,6 +226,49 @@ func TestAggMetric(t *testing.T) { // c.Verify(true, 800, 1299, 1299, 1299) } +func TestAggMetricWithReorderBuffer(t *testing.T) { + cluster.Init("default", "test", time.Now(), "http", 6060) + + agg := conf.Aggregation{ + Name: "Default", + Pattern: regexp.MustCompile(".*"), + XFilesFactor: 0.5, + AggregationMethod: []conf.Method{conf.Avg}, + } + ret := []conf.Retention{conf.NewRetentionMT(1, 1, 100, 5, true)} + c := NewChecker(t, NewAggMetric(dnstore, &cache.MockCache{}, "foo", ret, 10, &agg, false)) + + // basic adds and verifies with test data + c.Add(101, 101) + c.Verify(true, 100, 200, 101, 101) + c.Add(105, 105) + c.Verify(true, 100, 199, 101, 105) + c.Add(115, 115) + c.Add(125, 125) + c.Add(135, 135) + c.Verify(true, 100, 199, 101, 135) + c.Add(200, 200) + c.Add(315, 315) + c.Verify(true, 100, 399, 101, 315) + + metricsTooOld.SetUint32(0) + + // adds 10 entries that are out of order and the reorder buffer should order the first 9 + // the last item (305) will be too old, so it increases metricsTooOld counter + for i := uint32(314); i > 304; i-- { + 
c.Add(i, float64(i)) + } + c.DropPointByTs(305) + + // get subranges + c.Verify(true, 100, 320, 101, 315) + + // one point has been added out of order and too old for the buffer to reorder + if metricsTooOld.Peek() != 1 { + t.Fatalf("Expected the out of order count to be 1, not %d", metricsTooOld.Peek()) + } +} + func TestAggMetricDropFirstChunk(t *testing.T) { cluster.Init("default", "test", time.Now(), "http", 6060) cluster.Manager.SetPrimary(true) @@ -206,7 +276,7 @@ func TestAggMetricDropFirstChunk(t *testing.T) { chunkSpan := uint32(10) numChunks := uint32(5) ret := []conf.Retention{conf.NewRetentionMT(1, 1, chunkSpan, numChunks, true)} - m := NewAggMetric(store, &cache.MockCache{}, "foo", ret, nil, true) + m := NewAggMetric(store, &cache.MockCache{}, "foo", ret, 0, nil, true) m.Add(10, 10) m.Add(11, 11) m.Add(12, 12) diff --git a/mdata/aggmetrics.go b/mdata/aggmetrics.go index a21eae6c53..55ccf7f14b 100644 --- a/mdata/aggmetrics.go +++ b/mdata/aggmetrics.go @@ -86,7 +86,8 @@ func (ms *AggMetrics) GetOrCreate(key, name string, schemaId, aggId uint16) Metr m, ok := ms.Metrics[key] if !ok { agg := Aggregations.Get(aggId) - m = NewAggMetric(ms.store, ms.cachePusher, key, Schemas.Get(schemaId).Retentions, &agg, ms.dropFirstChunk) + schema := Schemas.Get(schemaId) + m = NewAggMetric(ms.store, ms.cachePusher, key, schema.Retentions, schema.ReorderWindow, &agg, ms.dropFirstChunk) ms.Metrics[key] = m metricsActive.Set(len(ms.Metrics)) } diff --git a/mdata/aggregator.go b/mdata/aggregator.go index 8714c85656..39dc620199 100644 --- a/mdata/aggregator.go +++ b/mdata/aggregator.go @@ -45,26 +45,26 @@ func NewAggregator(store Store, cachePusher cache.CachePusher, key string, ret c switch agg { case conf.Avg: if aggregator.sumMetric == nil { - aggregator.sumMetric = NewAggMetric(store, cachePusher, fmt.Sprintf("%s_sum_%d", key, span), conf.Retentions{ret}, nil, dropFirstChunk) + aggregator.sumMetric = NewAggMetric(store, cachePusher, fmt.Sprintf("%s_sum_%d", key, span), 
conf.Retentions{ret}, 0, nil, dropFirstChunk) } if aggregator.cntMetric == nil { - aggregator.cntMetric = NewAggMetric(store, cachePusher, fmt.Sprintf("%s_cnt_%d", key, span), conf.Retentions{ret}, nil, dropFirstChunk) + aggregator.cntMetric = NewAggMetric(store, cachePusher, fmt.Sprintf("%s_cnt_%d", key, span), conf.Retentions{ret}, 0, nil, dropFirstChunk) } case conf.Sum: if aggregator.sumMetric == nil { - aggregator.sumMetric = NewAggMetric(store, cachePusher, fmt.Sprintf("%s_sum_%d", key, span), conf.Retentions{ret}, nil, dropFirstChunk) + aggregator.sumMetric = NewAggMetric(store, cachePusher, fmt.Sprintf("%s_sum_%d", key, span), conf.Retentions{ret}, 0, nil, dropFirstChunk) } case conf.Lst: if aggregator.lstMetric == nil { - aggregator.lstMetric = NewAggMetric(store, cachePusher, fmt.Sprintf("%s_lst_%d", key, span), conf.Retentions{ret}, nil, dropFirstChunk) + aggregator.lstMetric = NewAggMetric(store, cachePusher, fmt.Sprintf("%s_lst_%d", key, span), conf.Retentions{ret}, 0, nil, dropFirstChunk) } case conf.Max: if aggregator.maxMetric == nil { - aggregator.maxMetric = NewAggMetric(store, cachePusher, fmt.Sprintf("%s_max_%d", key, span), conf.Retentions{ret}, nil, dropFirstChunk) + aggregator.maxMetric = NewAggMetric(store, cachePusher, fmt.Sprintf("%s_max_%d", key, span), conf.Retentions{ret}, 0, nil, dropFirstChunk) } case conf.Min: if aggregator.minMetric == nil { - aggregator.minMetric = NewAggMetric(store, cachePusher, fmt.Sprintf("%s_min_%d", key, span), conf.Retentions{ret}, nil, dropFirstChunk) + aggregator.minMetric = NewAggMetric(store, cachePusher, fmt.Sprintf("%s_min_%d", key, span), conf.Retentions{ret}, 0, nil, dropFirstChunk) } } } diff --git a/mdata/aggregator_test.go b/mdata/aggregator_test.go index 91db808382..dff6a73fcc 100644 --- a/mdata/aggregator_test.go +++ b/mdata/aggregator_test.go @@ -44,9 +44,9 @@ func TestAggregator(t *testing.T) { cluster.Init("default", "test", time.Now(), "http", 6060) compare := func(key string, metric Metric, 
expected []schema.Point) { cluster.Manager.SetPrimary(true) - _, iters := metric.Get(0, 1000) + res := metric.Get(0, 1000) got := make([]schema.Point, 0, len(expected)) - for _, iter := range iters { + for _, iter := range res.Iters { for iter.Next() { ts, val := iter.Values() got = append(got, schema.Point{Val: val, Ts: ts}) diff --git a/mdata/ifaces.go b/mdata/ifaces.go index 823124f9f5..cc90fe2512 100644 --- a/mdata/ifaces.go +++ b/mdata/ifaces.go @@ -2,7 +2,6 @@ package mdata import ( "github.com/raintank/metrictank/consolidation" - "github.com/raintank/metrictank/mdata/chunk" ) type Metrics interface { @@ -12,6 +11,6 @@ type Metrics interface { type Metric interface { Add(ts uint32, val float64) - Get(from, to uint32) (uint32, []chunk.Iter) - GetAggregated(consolidator consolidation.Consolidator, aggSpan, from, to uint32) (uint32, []chunk.Iter) + Get(from, to uint32) Result + GetAggregated(consolidator consolidation.Consolidator, aggSpan, from, to uint32) Result } diff --git a/mdata/init.go b/mdata/init.go index ca75a68ba2..4b746a9d9a 100644 --- a/mdata/init.go +++ b/mdata/init.go @@ -22,9 +22,14 @@ var ( // metric tank.chunk_operations.clear is a counter of how many chunks are cleared (replaced by new chunks) chunkClear = stats.NewCounter32("tank.chunk_operations.clear") - // metric tank.metrics_too_old is points that go back in time. - // E.g. for any given series, when a point has a timestamp - // that is not higher than the timestamp of the last written timestamp for that series. + // metric tank.metrics_reordered is the number of points received that are going back in time, but are still + // within the reorder window. in such a case they will be inserted in the correct order. + // E.g. if the reorder window is 60 (datapoints) then points may be inserted at random order as long as their + // ts is not older than the 60th datapoint counting from the newest. 
+ metricsReordered = stats.NewCounter32("tank.metrics_reordered") + + // metric tank.metrics_too_old is points that go back in time beyond the scope of the optional reorder window. + // these points will end up being dropped and lost. metricsTooOld = stats.NewCounter32("tank.metrics_too_old") // metric tank.add_to_closed_chunk is points received for the most recent chunk diff --git a/mdata/reorder_buffer.go b/mdata/reorder_buffer.go new file mode 100644 index 0000000000..6bf8c992d2 --- /dev/null +++ b/mdata/reorder_buffer.go @@ -0,0 +1,81 @@ +package mdata + +import ( + "gopkg.in/raintank/schema.v1" +) + +// ReorderBuffer keeps a window of data during which it is ok to send data out of order. +// Once the reorder window has passed Add() will return the old data and delete it from the buffer. +// The reorder buffer itself is not thread safe because it is only used by AggMetric, +// which is thread safe, so there is no locking in the buffer. +type ReorderBuffer struct { + len uint32 // length of buffer in number of datapoints + newest uint32 // index of newest buffer entry + interval uint32 // metric interval + buf []schema.Point // the actual buffer holding the data +} + +func NewReorderBuffer(reorderWindow uint32, interval int) *ReorderBuffer { + buf := &ReorderBuffer{ + len: reorderWindow, + interval: uint32(interval), + buf: make([]schema.Point, reorderWindow), + } + return buf +} + +func (rob *ReorderBuffer) Add(ts uint32, val float64) []schema.Point { + ts = aggBoundary(ts, rob.interval) + + // out of order and too old + if rob.buf[rob.newest].Ts != 0 && ts <= rob.buf[rob.newest].Ts-(rob.len*rob.interval) { + metricsTooOld.Inc() + return nil + } + + var res []schema.Point + oldest := (rob.newest + 1) % rob.len + index := (ts / rob.interval) % rob.len + if ts > rob.buf[rob.newest].Ts { + flushCount := (ts - rob.buf[rob.newest].Ts) / rob.interval + if flushCount > rob.len { + flushCount = rob.len + } + + for i := uint32(0); i < flushCount; i++ { + if 
rob.buf[oldest].Ts != 0 { + res = append(res, rob.buf[oldest]) + } + rob.buf[oldest].Ts = 0 + rob.buf[oldest].Val = 0 + oldest = (oldest + 1) % rob.len + } + rob.buf[index].Ts = ts + rob.buf[index].Val = val + rob.newest = index + } else { + metricsReordered.Inc() + rob.buf[index].Ts = ts + rob.buf[index].Val = val + } + + return res +} + +// returns all the data in the buffer as a raw list of points +func (rob *ReorderBuffer) Get() []schema.Point { + res := make([]schema.Point, 0, rob.len) + oldest := (rob.newest + 1) % rob.len + + for { + if rob.buf[oldest].Ts != 0 { + res = append(res, rob.buf[oldest]) + } + if oldest == rob.newest { + break + } + oldest = (oldest + 1) % rob.len + } + + return res +} diff --git a/mdata/reorder_buffer_test.go b/mdata/reorder_buffer_test.go new file mode 100644 index 0000000000..68362ff497 --- /dev/null +++ b/mdata/reorder_buffer_test.go @@ -0,0 +1,336 @@ +package mdata + +import ( + "reflect" + "testing" + + "gopkg.in/raintank/schema.v1" +) + +func testAddAndGet(t *testing.T, reorderWindow uint32, testData, expectedData []schema.Point, expectAddFail, expectReordered uint32) []schema.Point { + var flushed []schema.Point + b := NewReorderBuffer(reorderWindow, 1) + metricsTooOld.SetUint32(0) + metricsReordered.SetUint32(0) + for _, point := range testData { + addRes := b.Add(point.Ts, point.Val) + flushed = append(flushed, addRes...) 
+ } + if expectAddFail != metricsTooOld.Peek() { + t.Fatalf("Expected %d failures, but had %d", expectAddFail, metricsTooOld.Peek()) + } + if metricsReordered.Peek() != expectReordered { + t.Fatalf("Expected %d metrics to get reordered, but had %d", expectReordered, metricsReordered.Peek()) + } + returned := b.Get() + + if !reflect.DeepEqual(expectedData, returned) { + t.Fatalf("Returned data does not match expected data\n%+v\n %+v", testData, expectedData) + } + return flushed +} + +// mixes up a sorted slice, but only within a certain range +// it simply reverses the order of bunches of a fixed size that's defined by the unsortBy parameter +// for example if unsortBy is 3, then this: +// [0,1,2,3,4,5,6,7,8,9] +// will be turned into this: +// [2,1,0,5,4,3,8,7,6,9] +func unsort(data []schema.Point, unsortBy int) []schema.Point { + out := make([]schema.Point, len(data)) + i := 0 + for ; i < len(data)-unsortBy; i = i + unsortBy { + for j := 0; j < unsortBy; j++ { + out[i+j] = data[i+unsortBy-j-1] + } + } + for ; i < len(data); i++ { + out[i] = data[i] + } + return out +} + +func TestReorderBufferUnsort(t *testing.T) { + testData := []schema.Point{ + {Ts: 0, Val: 0}, + {Ts: 1, Val: 100}, + {Ts: 2, Val: 200}, + {Ts: 3, Val: 300}, + {Ts: 4, Val: 400}, + {Ts: 5, Val: 500}, + {Ts: 6, Val: 600}, + {Ts: 7, Val: 700}, + {Ts: 8, Val: 800}, + {Ts: 9, Val: 900}, + } + expectedData := []schema.Point{ + {Ts: 2, Val: 200}, + {Ts: 1, Val: 100}, + {Ts: 0, Val: 0}, + {Ts: 5, Val: 500}, + {Ts: 4, Val: 400}, + {Ts: 3, Val: 300}, + {Ts: 8, Val: 800}, + {Ts: 7, Val: 700}, + {Ts: 6, Val: 600}, + {Ts: 9, Val: 900}, + } + unsortedData := unsort(testData, 3) + + for i := 0; i < len(expectedData); i++ { + if unsortedData[i] != expectedData[i] { + t.Fatalf("unsort function returned unexpected data %+v", unsortedData) + } + } +} + +func TestReorderBufferAddAndGetInOrder(t *testing.T) { + testData := []schema.Point{ + {Ts: 1001, Val: 100}, + {Ts: 1002, Val: 200}, + {Ts: 1003, Val: 300}, + } + 
expectedData := []schema.Point{ + {Ts: 1001, Val: 100}, + {Ts: 1002, Val: 200}, + {Ts: 1003, Val: 300}, + } + testAddAndGet(t, 600, testData, expectedData, 0, 0) +} + +func TestReorderBufferAddAndGetInReverseOrderOutOfWindow(t *testing.T) { + testData := []schema.Point{ + {Ts: 1003, Val: 300}, + {Ts: 1002, Val: 200}, + {Ts: 1001, Val: 100}, + } + expectedData := []schema.Point{ + {Ts: 1003, Val: 300}, + } + testAddAndGet(t, 1, testData, expectedData, 2, 0) +} + +func TestReorderBufferAddAndGetOutOfOrderInsideWindow(t *testing.T) { + testData := []schema.Point{ + {Ts: 1001, Val: 100}, + {Ts: 1002, Val: 200}, + {Ts: 1004, Val: 400}, + {Ts: 1003, Val: 300}, + {Ts: 1005, Val: 500}, + {Ts: 1006, Val: 600}, + {Ts: 1008, Val: 800}, + {Ts: 1007, Val: 700}, + {Ts: 1009, Val: 900}, + } + expectedData := []schema.Point{ + {Ts: 1001, Val: 100}, + {Ts: 1002, Val: 200}, + {Ts: 1003, Val: 300}, + {Ts: 1004, Val: 400}, + {Ts: 1005, Val: 500}, + {Ts: 1006, Val: 600}, + {Ts: 1007, Val: 700}, + {Ts: 1008, Val: 800}, + {Ts: 1009, Val: 900}, + } + testAddAndGet(t, 600, testData, expectedData, 0, 2) +} + +func TestReorderBufferAddAndGetOutOfOrderInsideWindowAsFirstPoint(t *testing.T) { + testData := []schema.Point{ + {Ts: 1002, Val: 200}, + {Ts: 1004, Val: 400}, + {Ts: 1003, Val: 300}, + {Ts: 1005, Val: 500}, + {Ts: 1001, Val: 100}, + {Ts: 1006, Val: 600}, + {Ts: 1008, Val: 800}, + {Ts: 1007, Val: 700}, + {Ts: 1009, Val: 900}, + } + expectedData := []schema.Point{ + {Ts: 1001, Val: 100}, + {Ts: 1002, Val: 200}, + {Ts: 1003, Val: 300}, + {Ts: 1004, Val: 400}, + {Ts: 1005, Val: 500}, + {Ts: 1006, Val: 600}, + {Ts: 1007, Val: 700}, + {Ts: 1008, Val: 800}, + {Ts: 1009, Val: 900}, + } + testAddAndGet(t, 600, testData, expectedData, 0, 3) +} + +func TestReorderBufferOmitFlushIfNotEnoughData(t *testing.T) { + b := NewReorderBuffer(9, 1) + for i := uint32(1); i < 10; i++ { + flushed := b.Add(i, float64(i*100)) + if len(flushed) > 0 { + t.Fatalf("Expected no data to get flushed out") + } + } +} 
+ +func TestReorderBufferAddAndGetOutOfOrderOutOfWindow(t *testing.T) { + testData := []schema.Point{ + {Ts: 1001, Val: 100}, + {Ts: 1004, Val: 400}, + {Ts: 1003, Val: 300}, + {Ts: 1005, Val: 500}, + {Ts: 1006, Val: 600}, + {Ts: 1008, Val: 800}, + {Ts: 1007, Val: 700}, + {Ts: 1009, Val: 900}, + {Ts: 1002, Val: 200}, + } + expectedData := []schema.Point{ + {Ts: 1005, Val: 500}, + {Ts: 1006, Val: 600}, + {Ts: 1007, Val: 700}, + {Ts: 1008, Val: 800}, + {Ts: 1009, Val: 900}, + } + // point 2 should be missing because out of reorder window + expectedFlushedData := []schema.Point{ + {Ts: 1001, Val: 100}, + {Ts: 1003, Val: 300}, + {Ts: 1004, Val: 400}, + } + flushedData := testAddAndGet(t, 5, testData, expectedData, 1, 2) + if !reflect.DeepEqual(flushedData, expectedFlushedData) { + t.Fatalf("Flushed data does not match expected flushed data:\n%+v\n%+v", flushedData, expectedFlushedData) + } +} + +func TestReorderBufferFlushSortedData(t *testing.T) { + var results []schema.Point + buf := NewReorderBuffer(600, 1) + metricsTooOld.SetUint32(0) + for i := 1100; i < 2100; i++ { + flushed := buf.Add(uint32(i), float64(i)) + if metricsTooOld.Peek() != 0 { + t.Fatalf("Adding failed") + } + results = append(results, flushed...) + } + + for i := 0; i < 400; i++ { + if results[i].Ts != uint32(i+1100) || results[i].Val != float64(i+1100) { + t.Fatalf("Unexpected results %+v", results) + } + } +} + +func TestReorderBufferFlushUnsortedData1(t *testing.T) { + var results []schema.Point + buf := NewReorderBuffer(3, 1) + data := []schema.Point{ + {10, 10}, + {11, 11}, + {9, 9}, + {12, 12}, + {13, 13}, + {20, 20}, + {11, 11}, + {19, 19}, + } + failedCount := 0 + metricsTooOld.SetUint32(0) + for _, p := range data { + flushed := buf.Add(p.Ts, p.Val) + if metricsTooOld.Peek() != 0 { + failedCount++ + metricsTooOld.SetUint32(0) + } else { + results = append(results, flushed...) 
+ } + } + expecting := []schema.Point{ + {9, 9}, + {10, 10}, + {11, 11}, + {12, 12}, + {13, 13}, + } + for i := range expecting { + if expecting[i] != results[i] { + t.Fatalf("Unexpected results %+v, %+v", expecting, results) + } + } + if failedCount != 1 { + t.Fatalf("expecting failed count to be 1, not %d", failedCount) + } +} + +func TestReorderBufferFlushUnsortedData2(t *testing.T) { + var results []schema.Point + buf := NewReorderBuffer(600, 1) + data := make([]schema.Point, 1000) + for i := 0; i < 1000; i++ { + data[i] = schema.Point{Ts: uint32(i + 1000), Val: float64(i + 1000)} + } + unsortedData := unsort(data, 10) + for i := 0; i < len(data); i++ { + flushed := buf.Add(unsortedData[i].Ts, unsortedData[i].Val) + results = append(results, flushed...) + } + for i := 0; i < 400; i++ { + if results[i].Ts != uint32(i+1000) || results[i].Val != float64(i+1000) { + t.Fatalf("Unexpected results %+v", results) + } + } +} + +func BenchmarkAddInOrder(b *testing.B) { + data := make([]schema.Point, b.N) + buf := NewReorderBuffer(uint32(b.N), 1) + b.ResetTimer() + + for i := 0; i < b.N; i++ { + buf.Add(data[i].Ts, data[i].Val) + } +} + +func BenchmarkAddOutOfOrder(b *testing.B) { + data := make([]schema.Point, b.N) + unsortedData := unsort(data, 10) + buf := NewReorderBuffer(uint32(b.N), 1) + b.ResetTimer() + + for i := 0; i < b.N; i++ { + buf.Add(unsortedData[i].Ts, unsortedData[i].Val) + } +} + +func benchmarkAddAndFlushX(b *testing.B, datapoints, reorderWindow uint32) { + buf := NewReorderBuffer( + reorderWindow, + 1, + ) + ts := uint32(1) + for ; ts <= datapoints; ts++ { + buf.Add(ts, float64(ts*100)) + } + + b.ResetTimer() + + for run := 0; run < b.N; run++ { + ts := uint32(1) + for ; ts <= datapoints; ts++ { + buf.Add(ts, float64(ts*100)) + } + } +} + +func BenchmarkAddAndFlush10000(b *testing.B) { + benchmarkAddAndFlushX(b, 10000, 1000) +} + +func BenchmarkAddAndFlush1000(b *testing.B) { + benchmarkAddAndFlushX(b, 1000, 100) +} + +func BenchmarkAddAndFlush100(b 
*testing.B) { + benchmarkAddAndFlushX(b, 100, 10) +} diff --git a/mdata/result.go b/mdata/result.go new file mode 100644 index 0000000000..886d2d92d9 --- /dev/null +++ b/mdata/result.go @@ -0,0 +1,12 @@ +package mdata + +import ( + "github.com/raintank/metrictank/mdata/chunk" + "gopkg.in/raintank/schema.v1" +) + +type Result struct { + Points []schema.Point + Iters []chunk.Iter + Oldest uint32 +} diff --git a/scripts/config/storage-schemas.conf b/scripts/config/storage-schemas.conf index 2228841c62..93fee450ef 100644 --- a/scripts/config/storage-schemas.conf +++ b/scripts/config/storage-schemas.conf @@ -9,8 +9,9 @@ # will be applied. The configured rollups will be saved by primary nodes and served in responses if they are ready. # (note in particular that if you remove archives here, we will no longer read from them) # * Retentions must be specified in order of increasing interval and retention +# * The reorderBuffer is an optional buffer that temporarily keeps data points in memory as raw data and allows insertion in arbitrary order. The specified value is how many datapoints, based on the raw interval specified in the first defined retention, should be kept before they are flushed out. This is useful if the metric producers cannot guarantee that the data will arrive in order, but it is relatively memory intensive. If you are unsure whether you need this, it is better to leave it disabled to avoid wasting memory. # -# A given rule is made up of 3 lines: the name, regex pattern and retentions. +# A given rule is made up of at least 3 lines: the name, regex pattern and retentions, and optionally the reorder buffer size. # The retentions line can specify multiple retention definitions. You need one or more, space separated. 
# # There are 2 formats for a single retention definition: @@ -52,3 +53,4 @@ [default] pattern = .* retentions = 1s:35d:10min:7 +# reorderBuffer = 20 diff --git a/usage/usage_test.go b/usage/usage_test.go index 090d6476df..2dcb4a7f48 100644 --- a/usage/usage_test.go +++ b/usage/usage_test.go @@ -53,11 +53,11 @@ func (f *FakeAggMetric) Add(ts uint32, val float64) { } // we won't use this -func (f *FakeAggMetric) Get(from, to uint32) (uint32, []chunk.Iter) { - return 0, make([]chunk.Iter, 0) +func (f *FakeAggMetric) Get(from, to uint32) mdata.Result { + return mdata.Result{Oldest: 0, Iters: make([]chunk.Iter, 0)} } -func (f *FakeAggMetric) GetAggregated(consolidator consolidation.Consolidator, aggSpan, from, to uint32) (uint32, []chunk.Iter) { - return 0, make([]chunk.Iter, 0) +func (f *FakeAggMetric) GetAggregated(consolidator consolidation.Consolidator, aggSpan, from, to uint32) mdata.Result { + return mdata.Result{Oldest: 0, Iters: make([]chunk.Iter, 0)} } func idFor(org int, metric, unit, mtype string, tags []string, interval uint32) string {