Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
Merge pull request #675 from raintank/write_ahead_buffer
Browse files Browse the repository at this point in the history
Reorder buffer
  • Loading branch information
Dieterbe authored Jul 17, 2017
2 parents 7e9bf99 + 6570db5 commit c7d2cbf
Show file tree
Hide file tree
Showing 20 changed files with 752 additions and 69 deletions.
37 changes: 18 additions & 19 deletions api/dataprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/raintank/metrictank/api/models"
"github.com/raintank/metrictank/consolidation"
"github.com/raintank/metrictank/mdata"
"github.com/raintank/metrictank/mdata/chunk"
"github.com/raintank/metrictank/util"
"github.com/raintank/worldping-api/pkg/log"
Expand Down Expand Up @@ -314,26 +315,27 @@ func AggMetricKey(key, archive string, aggSpan uint32) string {

func (s *Server) getSeriesFixed(req models.Req, consolidator consolidation.Consolidator) []schema.Point {
ctx := newRequestContext(&req, consolidator)
iters := s.getSeries(ctx)
points := s.itersToPoints(ctx, iters)
return Fix(points, req.From, req.To, req.ArchInterval)
res := s.getSeries(ctx)
res.Points = append(s.itersToPoints(ctx, res.Iters), res.Points...)
return Fix(res.Points, req.From, req.To, req.ArchInterval)
}

func (s *Server) getSeries(ctx *requestContext) []chunk.Iter {
func (s *Server) getSeries(ctx *requestContext) mdata.Result {

oldest, memIters := s.getSeriesAggMetrics(ctx)
log.Debug("oldest from aggmetrics is %d", oldest)
res := s.getSeriesAggMetrics(ctx)
log.Debug("oldest from aggmetrics is %d", res.Oldest)

if oldest <= ctx.From {
if res.Oldest <= ctx.From {
reqSpanMem.ValueUint32(ctx.To - ctx.From)
return memIters
return res
}

// if oldest < to -> search until oldest, we already have the rest from mem
// if to < oldest -> no need to search until oldest, only search until to
until := util.Min(oldest, ctx.To)
until := util.Min(res.Oldest, ctx.To)

return append(s.getSeriesCachedStore(ctx, until), memIters...)
res.Iters = append(s.getSeriesCachedStore(ctx, until), res.Iters...)
return res
}

// getSeries returns points from mem (and cassandra if needed), within the range from (inclusive) - to (exclusive)
Expand Down Expand Up @@ -363,24 +365,21 @@ func (s *Server) itersToPoints(ctx *requestContext, iters []chunk.Iter) []schema
return points
}

func (s *Server) getSeriesAggMetrics(ctx *requestContext) (uint32, []chunk.Iter) {
oldest := ctx.Req.To
var memIters []chunk.Iter

func (s *Server) getSeriesAggMetrics(ctx *requestContext) mdata.Result {
metric, ok := s.MemoryStore.Get(ctx.Key)
if !ok {
return oldest, memIters
return mdata.Result{
Oldest: ctx.Req.To,
}
}

if ctx.Cons != consolidation.None {
logLoad("memory", ctx.AggKey, ctx.From, ctx.To)
oldest, memIters = metric.GetAggregated(ctx.Cons, ctx.Req.ArchInterval, ctx.From, ctx.To)
return metric.GetAggregated(ctx.Cons, ctx.Req.ArchInterval, ctx.From, ctx.To)
} else {
logLoad("memory", ctx.Req.Key, ctx.From, ctx.To)
oldest, memIters = metric.Get(ctx.From, ctx.To)
return metric.Get(ctx.From, ctx.To)
}

return oldest, memIters
}

// will only fetch until until, but uses ctx.To for debug logging
Expand Down
8 changes: 4 additions & 4 deletions api/dataprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -767,11 +767,11 @@ func TestGetSeriesAggMetrics(t *testing.T) {
metric.Add(i, float64(i^2))
}

oldest, iters := srv.getSeriesAggMetrics(ctx)
res := srv.getSeriesAggMetrics(ctx)
timestamps := make([]uint32, 0)
values := make([]float64, 0)

for _, it := range iters {
for _, it := range res.Iters {
for it.Next() {
ts, val := it.Values()
timestamps = append(timestamps, ts)
Expand All @@ -782,8 +782,8 @@ func TestGetSeriesAggMetrics(t *testing.T) {
// should be the T0 of the chunk from (1744) is in
// 1744 - (1744 % 600) = 1200
expected := uint32(1200)
if oldest != expected {
t.Errorf("Expected oldest to be %d but got %d", expected, oldest)
if res.Oldest != expected {
t.Errorf("Expected oldest to be %d but got %d", expected, res.Oldest)
}

// number of returned ts should be the number of chunks the searched range spans across * chunkspan
Expand Down
22 changes: 18 additions & 4 deletions conf/schemas.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ func (s SchemaSlice) Less(i, j int) bool { return s[i].Priority >= s[j].Priority

// Schema represents one schema setting
type Schema struct {
Name string
Pattern *regexp.Regexp
Retentions Retentions
Priority int64
Name string
Pattern *regexp.Regexp
Retentions Retentions
Priority int64
ReorderWindow uint32
}

func NewSchemas(schemas []Schema) Schemas {
Expand Down Expand Up @@ -116,6 +117,19 @@ func ReadSchemas(file string) (Schemas, error) {
}
schema.Priority = int64(p)<<32 - int64(i) // to sort records with same priority by position in file

reorderBufferStr := sec.ValueOf("reorderBuffer")
if len(reorderBufferStr) > 0 {
reorderWindow, err := strconv.ParseUint(reorderBufferStr, 10, 32)
if err != nil {
return Schemas{}, fmt.Errorf("[%s]: Failed to parse reorder buffer conf, expected a number: %s", schema.Name, reorderBufferStr)
}

// if reorderWindow == 0 we just disable the buffer
if reorderWindow > 0 {
schema.ReorderWindow = uint32(reorderWindow)
}
}

schemas = append(schemas, schema)
}

Expand Down
53 changes: 53 additions & 0 deletions docker/docker-cluster/storage-schemas.conf
Original file line number Diff line number Diff line change
@@ -1,3 +1,56 @@
# This config file sets up your retention rules.
# It is an extension of http://graphite.readthedocs.io/en/latest/config-carbon.html#storage-schemas-conf
# Note:
# * You can have 0 to N sections
# * The first match wins, starting from the top. If no match found, we default to single archive of minutely points, retained for 7 days in 2h chunks
# * The patterns are unanchored regular expressions, add '^' or '$' to match the beginning or end of a pattern.
# * When running a cluster of metrictank instances, all instances should have the same agg-settings.
# * Unlike whisper (graphite), the config doesn't stick: if you restart metrictank with updated settings, then those
# will be applied. The configured rollups will be saved by primary nodes and served in responses if they are ready.
# (note in particular that if you remove archives here, we will no longer read from them)
# * Retentions must be specified in order of increasing interval and retention
# * The reorderBuffer is an optional buffer that temporarily keeps data points in memory as raw data and allows insertion at random order. The specified value is how many datapoints, based on the raw interval specified in the first defined retention, should be kept before they are flushed out. This is useful if the metric producers cannot guarantee that the data will arrive in order, but it is relatively memory intensive. If you are unsure whether you need this, better leave it disabled to not waste memory.
#
# A given rule is made up of at least 3 lines: the name, regex pattern, retentions and optionally the reorder buffer size.
# The retentions line can specify multiple retention definitions. You need one or more, space separated.
#
# There are 2 formats for a single retention definition:
# 1) 'series-interval:count-of-datapoints' legacy and not easy to read
# 2) 'series-interval:retention[:chunkspan:numchunks:ready]' more friendly format with optionally 3 extra fields
#
#Series intervals and retentions are specified using the following suffixes:
#
#s - second
#m - minute
#h - hour
#d - day
#y - year
#
# The final 3 fields are specific to metrictank and if unspecified, use sane defaults.
# See https://github.com/raintank/metrictank/blob/master/docs/memory-server.md for more details
#
# chunkspan: duration of chunks. e.g. 10min, 30min, 1h, 90min...
# must be valid value as described here https://github.com/raintank/metrictank/blob/master/docs/memory-server.md#valid-chunk-spans
# Defaults to the smallest chunkspan that can hold at least 100 points.
#
# numchunks: number of raw chunks to keep in in-memory ring buffer
# See https://github.com/raintank/metrictank/blob/master/docs/memory-server.md for details and trade-offs, especially when compared to chunk-cache
# which may be a more effective method to cache data and alleviate workload for cassandra.
# Defaults to 2
#
# ready: whether the archive is ready for querying. This is useful if you recently introduced a new archive, but it's still being populated
# so you rather query other archives, even if they don't have the retention to serve your queries
# Defaults to true
#
# Here's an example with multiple retentions:
# [apache_busyWorkers]
# pattern = ^servers\.www.*\.workers\.busyWorkers$
# retentions = 1s:1d:10min:1,1m:21d,15m:5y:2h:1:false
#
# This example has 3 retention definitions: the first and last override some default options (to use 10-minutely and 2-hourly chunks and only keep one of them in memory),
# and the last rollup is marked as not ready yet for querying.

[default]
pattern = .*
retentions = 1s:1d
# reorderBuffer = 20
13 changes: 13 additions & 0 deletions docker/docker-dev-custom-cfg-kafka/storage-aggregation.conf
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
# This config file controls which summaries are created (using which consolidation functions) for your lower-precision archives, as defined in storage-schemas.conf
# It is an extension of http://graphite.readthedocs.io/en/latest/config-carbon.html#storage-aggregation-conf
# Note:
# * This file is optional. If it is not present, we will use avg for everything
# * Anything not matched also uses avg for everything
# * xFilesFactor is not honored yet. What it is in graphite is a floating point number between 0 and 1 specifying what fraction of the previous retention level's slots must have non-null values in order to aggregate to a non-null value. The default is 0.5.
# * aggregationMethod specifies the functions used to aggregate values for the next retention level. Legal methods are avg/average, sum, min, max, and last. The default is average.
# Unlike Graphite, you can specify multiple, as it is often handy to have different summaries available depending on what analysis you need to do.
# When using multiple, the first one is used for reading. In the future, we will add capabilities to select the different archives for reading.
# * the settings configured when metrictank starts are what is applied. So you can enable or disable archives by restarting metrictank.
#
# see https://github.com/raintank/metrictank/blob/master/docs/consolidation.md for related info.

[default]
pattern = .*
xFilesFactor = 0.5
Expand Down
53 changes: 53 additions & 0 deletions docker/docker-dev-custom-cfg-kafka/storage-schemas.conf
Original file line number Diff line number Diff line change
@@ -1,3 +1,56 @@
# This config file sets up your retention rules.
# It is an extension of http://graphite.readthedocs.io/en/latest/config-carbon.html#storage-schemas-conf
# Note:
# * You can have 0 to N sections
# * The first match wins, starting from the top. If no match found, we default to single archive of minutely points, retained for 7 days in 2h chunks
# * The patterns are unanchored regular expressions, add '^' or '$' to match the beginning or end of a pattern.
# * When running a cluster of metrictank instances, all instances should have the same agg-settings.
# * Unlike whisper (graphite), the config doesn't stick: if you restart metrictank with updated settings, then those
# will be applied. The configured rollups will be saved by primary nodes and served in responses if they are ready.
# (note in particular that if you remove archives here, we will no longer read from them)
# * Retentions must be specified in order of increasing interval and retention
# * The reorderBuffer is an optional buffer that temporarily keeps data points in memory as raw data and allows insertion at random order. The specified value is how many datapoints, based on the raw interval specified in the first defined retention, should be kept before they are flushed out. This is useful if the metric producers cannot guarantee that the data will arrive in order, but it is relatively memory intensive. If you are unsure whether you need this, better leave it disabled to not waste memory.
#
# A given rule is made up of at least 3 lines: the name, regex pattern, retentions and optionally the reorder buffer size.
# The retentions line can specify multiple retention definitions. You need one or more, space separated.
#
# There are 2 formats for a single retention definition:
# 1) 'series-interval:count-of-datapoints' legacy and not easy to read
# 2) 'series-interval:retention[:chunkspan:numchunks:ready]' more friendly format with optionally 3 extra fields
#
#Series intervals and retentions are specified using the following suffixes:
#
#s - second
#m - minute
#h - hour
#d - day
#y - year
#
# The final 3 fields are specific to metrictank and if unspecified, use sane defaults.
# See https://github.com/raintank/metrictank/blob/master/docs/memory-server.md for more details
#
# chunkspan: duration of chunks. e.g. 10min, 30min, 1h, 90min...
# must be valid value as described here https://github.com/raintank/metrictank/blob/master/docs/memory-server.md#valid-chunk-spans
# Defaults to the smallest chunkspan that can hold at least 100 points.
#
# numchunks: number of raw chunks to keep in in-memory ring buffer
# See https://github.com/raintank/metrictank/blob/master/docs/memory-server.md for details and trade-offs, especially when compared to chunk-cache
# which may be a more effective method to cache data and alleviate workload for cassandra.
# Defaults to 2
#
# ready: whether the archive is ready for querying. This is useful if you recently introduced a new archive, but it's still being populated
# so you rather query other archives, even if they don't have the retention to serve your queries
# Defaults to true
#
# Here's an example with multiple retentions:
# [apache_busyWorkers]
# pattern = ^servers\.www.*\.workers\.busyWorkers$
# retentions = 1s:1d:10min:1,1m:21d,15m:5y:2h:1:false
#
# This example has 3 retention definitions: the first and last override some default options (to use 10-minutely and 2-hourly chunks and only keep one of them in memory),
# and the last rollup is marked as not ready yet for querying.

[default]
pattern = .*
retentions = 1s:35d:2min:2
# reorderBuffer = 20
4 changes: 3 additions & 1 deletion docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -358,8 +358,9 @@ enabled = false
# will be applied. The configured rollups will be saved by primary nodes and served in responses if they are ready.
# (note in particular that if you remove archives here, we will no longer read from them)
# * Retentions must be specified in order of increasing interval and retention
# * The reorderBuffer is an optional buffer that temporarily keeps data points in memory as raw data and allows insertion at random order. The specified value is how many datapoints, based on the raw interval specified in the first defined retention, should be kept before they are flushed out. This is useful if the metric producers cannot guarantee that the data will arrive in order, but it is relatively memory intensive. If you are unsure whether you need this, better leave it disabled to not waste memory.
#
# A given rule is made up of 3 lines: the name, regex pattern and retentions.
# A given rule is made up of at least 3 lines: the name, regex pattern, retentions and optionally the reorder buffer size.
# The retentions line can specify multiple retention definitions. You need one or more, space separated.
#
# There are 2 formats for a single retention definition:
Expand Down Expand Up @@ -401,6 +402,7 @@ enabled = false
[default]
pattern = .*
retentions = 1s:35d:10min:7
# reorderBuffer = 20
```

# storage-aggregation.conf
Expand Down
5 changes: 5 additions & 0 deletions docs/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,11 @@ a counter of how many chunks are created
the number of times the metrics GC is about to inspect a metric (series)
* `tank.metrics_active`:
the number of currently known metrics (excl rollup series), measured every second
* `tank.metrics_reordered`:
the number of points received that are going back in time, but are still
within the reorder window. In such a case they will be inserted in the correct order.
E.g. if the reorder window is 60 (datapoints), then points may be inserted in random order as long as their
ts is not older than the 60th datapoint counting from the newest.
* `tank.metrics_too_old`:
points that go back in time.
E.g. for any given series, when a point has a timestamp
Expand Down
Loading

0 comments on commit c7d2cbf

Please sign in to comment.