diff --git a/api/dataprocessor.go b/api/dataprocessor.go index 5365fbea94..01a2f2acf5 100644 --- a/api/dataprocessor.go +++ b/api/dataprocessor.go @@ -696,6 +696,9 @@ type requestContext struct { AMKey schema.AMKey // set by combining Req's key, consolidator and archive info } +// newRequestContext creates a requestContext; in particular, its from/to are crafted such that: +// * raw (non-quantized) data, after Fix(), will honor the requested from/to +// * the series is pre-canonical wrt the interval it will have after after-fetch runtime normalization func newRequestContext(ctx context.Context, req *models.Req, consolidator consolidation.Consolidator) *requestContext { rc := requestContext{ @@ -750,8 +753,10 @@ func newRequestContext(ctx context.Context, req *models.Req, consolidator consol rc.AMKey = schema.GetAMKey(req.MKey, consolidator.Archive(), req.ArchInterval) } + // if the request has after-fetch runtime normalization, plan to make the series pre-canonical. + // (note: unfortunately we cannot do this for inputs to on-demand runtime normalization, because by definition those are not predictable!) if req.AggNum > 1 { - // the series needs to be pre-canonical. There's 2 aspects to this + // There are 2 aspects to this // 1) `from` adjustment. // we may need to rewind the from so that we make sure to include all the necessary input raw data diff --git a/api/models/request.go b/api/models/request.go index 79f39ec507..3a4d60f2d9 100644 --- a/api/models/request.go +++ b/api/models/request.go @@ -45,8 +45,8 @@ type Req struct { Archive uint8 `json:"archive"` // 0 means original data, 1 means first agg level, 2 means 2nd, etc. ArchInterval uint32 `json:"archInterval"` // the interval corresponding to the archive we'll fetch TTL uint32 `json:"ttl"` // the ttl of the archive we'll fetch - OutInterval uint32 `json:"outInterval"` // the interval of the output data, after any runtime consolidation - AggNum uint32 `json:"aggNum"` // how many points to consolidate together at runtime, after fetching from the archive (normalization) + OutInterval uint32 `json:"outInterval"` // the interval of the series after fetching (possibly with pre-normalization) and after-fetch runtime normalization. not aware of further (on-demand) normalization or runtime consolidation + AggNum uint32 `json:"aggNum"` // how many points to consolidate together for after-fetch runtime normalization (see docs/consolidation.md) } // PNGroup is an identifier for a pre-normalization group: data that can be pre-normalized together @@ -92,7 +92,7 @@ func (r *Req) Plan(i int, a archives.Archive) { r.AggNum = 1 } -// PlanNormalization updates the planning parameters to accommodate normalization to the specified interval +// PlanNormalization updates the planning parameters to accommodate after-fetch runtime normalization to the specified interval func (r *Req) PlanNormalization(interval uint32) { r.OutInterval = interval r.AggNum = interval / r.ArchInterval @@ -132,11 +132,12 @@ func (r Req) PointsFetch() uint32 { } // PointsReturn estimates the amount of points that will be returned for this request -// best effort: not aware of summarize(), runtime normalization. but does account for runtime consolidation +// It is not aware of summarize() or on-demand runtime normalization, +// but does account for after-fetch runtime normalization and runtime consolidation func (r Req) PointsReturn(planMDP uint32) uint32 { points := (r.To - r.From) / r.OutInterval if planMDP > 0 && points > planMDP { - // note that we don't assign to req.AggNum here, because that's only for normalization. + // note that we don't assign to req.AggNum here, because that's only for after-fetch runtime normalization. // MDP runtime consolidation doesn't look at req.AggNum aggNum := consolidation.AggEvery(points, planMDP) points /= aggNum
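To make the new OutInterval/AggNum semantics concrete, here is a small standalone sketch. The `req` type and `planNormalization` helper are hypothetical stand-ins, not the actual models.Req code; they only mirror the arithmetic described in the PlanNormalization comment above.

```go
package main

import "fmt"

// req is a hypothetical, minimal stand-in for the planning fields discussed above;
// it is not the actual models.Req type.
type req struct {
	ArchInterval uint32 // interval of the archive we will fetch
	OutInterval  uint32 // interval of the series after after-fetch runtime normalization
	AggNum       uint32 // how many fetched points get consolidated into one output point
}

// planNormalization mirrors what the PlanNormalization comment above describes:
// normalize the fetched data up to the given interval.
func (r *req) planNormalization(interval uint32) {
	r.OutInterval = interval
	r.AggNum = interval / r.ArchInterval
}

func main() {
	r := req{ArchInterval: 10}
	r.planNormalization(30)
	// prints "30 3": every 3 fetched 10s points are consolidated into one 30s output point
	fmt.Println(r.OutInterval, r.AggNum)
}
```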
diff --git a/api/models/series.go b/api/models/series.go index b5a99a613b..5e4d668449 100644 --- a/api/models/series.go +++ b/api/models/series.go @@ -11,6 +11,7 @@ import ( "github.com/grafana/metrictank/expr/tagquery" "github.com/grafana/metrictank/mdata" "github.com/grafana/metrictank/schema" + "github.com/grafana/metrictank/util/align" pickle "github.com/kisielk/og-rek" ) @@ -238,6 +239,22 @@ func (s *Series) CopyTagsWith(key, val string) map[string]string { return out } +// IsCanonical checks whether the series is canonical wrt its interval and from/to +func (s Series) IsCanonical() bool { + firstTs := align.ForwardIfNotAligned(s.QueryFrom, s.Interval) + lastTs := align.Backward(s.QueryTo, s.Interval) + num := int((lastTs-firstTs)/s.Interval) + 1 + if len(s.Datapoints) != num { + return false + } + for i := 0; i < num; i++ { + if s.Datapoints[i].Ts != firstTs+uint32(i)*s.Interval { + return false + } + } + return true +} + type SeriesByTarget []Series func (g SeriesByTarget) Len() int { return len(g) }
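The IsCanonical check added above can be illustrated with a self-contained sketch: a series is canonical when it has exactly one point per interval, on interval-aligned timestamps, within its query range. The `forwardIfNotAligned` and `backward` helpers below are simplified assumptions standing in for util/align, not the real implementations.

```go
package main

import "fmt"

type point struct {
	Val float64
	Ts  uint32
}

// simplified stand-ins for util/align helpers; assumptions for illustration only.
func forwardIfNotAligned(ts, interval uint32) uint32 {
	if ts%interval == 0 {
		return ts
	}
	return ts + interval - ts%interval
}

func backward(ts, interval uint32) uint32 {
	return ts - ts%interval // assumes ts is not already aligned (true for the exclusive "to" used here)
}

// isCanonical sketches the check IsCanonical performs: one point per interval,
// on aligned timestamps, covering the query range.
func isCanonical(points []point, from, to, interval uint32) bool {
	first := forwardIfNotAligned(from, interval)
	last := backward(to, interval)
	num := int((last-first)/interval) + 1
	if len(points) != num {
		return false
	}
	for i := 0; i < num; i++ {
		if points[i].Ts != first+uint32(i)*interval {
			return false
		}
	}
	return true
}

func main() {
	canonical := []point{{1, 10}, {2, 20}, {3, 30}, {4, 40}, {5, 50}, {6, 60}}
	fmt.Println(isCanonical(canonical, 10, 61, 10)) // true: 6 points at ts 10..60

	shifted := []point{{1, 15}, {2, 25}, {3, 35}, {4, 45}, {5, 55}}
	fmt.Println(isCanonical(shifted, 10, 61, 10)) // false: timestamps not aligned to the interval
}
```

This is also why the test fixtures in this diff switch QueryTo from 60 to 61: with an exclusive "to", a series whose last point sits at ts 60 needs to > 60 to count as canonical.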
diff --git a/devdocs/render-request-handling.md b/devdocs/render-request-handling.md index 97769e8733..0451065889 100644 --- a/devdocs/render-request-handling.md +++ b/devdocs/render-request-handling.md @@ -2,7 +2,8 @@ `/render` maps to `Server.renderMetrics` which has these main sections/steps: -* `expr.parseMany` validates the target strings (queries) and parses them grammatically. +* `expr.parseMany` validates the target strings (queries) and parses them grammatically (into `*[]expr.expr`) +* Proxy to Graphite if the request could not be parsed * `expr.NewPlan` sets up the needed processing functions, their arguments (validates input to match the signature) and lets function adjust the context as it flows between the processing steps note: context holds from/to timestamp (may change across processing pipeline, e.g. to handle movingAverage queries) and consolidateBy setting, see NOTES in expr directory. @@ -12,13 +13,19 @@ * finds all series by fanning out the query patterns to all other shards. this gives basically idx.Node back. has the path, leaf, metricdefinition, schema/aggregation(rollup) settings, for each series, as well as on which node it can be found. * construct models.Req objects for each serie. this uses the MKey to identify series, also sets from/to, maxdatapoints, etc. - * `planRequests`: this plans all models.Req objects, deciding which archive to read from, whether to apply normalization, etc. + * `planRequests`: this plans all models.Req objects, deciding which archive to read from, setting up normalization parameters (if they can be pre-determined), etc. (see NOTES in expr directory for more info) * `getTargets`: gets the data from the local node and peer nodes based on the models.Req objects + this is where we create a models.Series based on a models.Req, and where we call: + * getSeriesFixed + * newRequestContext // sets from/to a bit specially to accommodate non-quantized raw data and to try to make the series pre-canonical. see code comments + * getSeries + * Fix() * `mergeSeries`: if there's multiple series with same name/tags, from, to and consolidator (e.g. because there's multiple series because users switched intervals), merge them together into one series * Sort each merged series so that the output of a function is well-defined and repeatable. * `plan.Run`: invoke all function processing, followed by runtime consolidation as necessary + ## MDP-optimization MDP at the leaf of the expr tree (fetch request) of 0 means don't optimize. If set it to >0 it means the request can be optimized. diff --git a/docs/consolidation.md b/docs/consolidation.md index 10dd6d092d..f3ad60554a 100644 --- a/docs/consolidation.md +++ b/docs/consolidation.md @@ -124,3 +124,20 @@ LONGER CHUNK (vs SHORTER CHUNK): * more decoding overhead * to accommodate newest chunk, cost of keeping 1 extra old chunk in RAM increases (or hitting cassandra more) * for aggbands, generally means storing more data in RAM that we don't really need in ram + + + + +## Consolidation throughout the response path + + +* Fetching of chunk data (raw or rollup), mainly based on time range queried and max-points-per-req restrictions +* Normalization: consolidates when series need to be combined together (e.g., in sumSeries(), divideSeries(), asPercent(), groupByTags() etc.). There are 2 different cases here: + - Pre-normalization: a specific optimization that, when enabled, detects when series will definitely get combined together and not be used elsewhere (e.g. when feeding into sumSeries); if the output of the normalization would use an interval that a series also has available as a rollup, it sets up the request to use that rollup instead. + - Runtime normalization: used if pre-normalization is disabled, if we could not guarantee that the series will be solely normalized together (e.g. groupByTags(), asPercent()), or if we didn't have the right archives available. + This takes two forms: + - on-demand runtime normalization: during function processing, when functions need to combine inputs of different intervals + - after-fetch runtime normalization (by using req.AggNum): when we could predict that normalization will always be needed on a series, but could not use pre-normalization. + +* Series run through the function-processing API. Many functions don't invoke consolidation but e.g. summarize() does. +* Runtime consolidation: to honor the MaxDataPoints value diff --git a/docs/render-path.md b/docs/render-path.md index 1e0b4d1d3d..1044669376 100644 --- a/docs/render-path.md +++ b/docs/render-path.md @@ -24,7 +24,7 @@ canonical form comes into play when we need to normalize (through consolidation It essentially means a series looks like a "native" fixed series of that higher interval, with respect to how many points it contains and which timestamps they have. -It is important here to keep in mind that consolidated points get the timestamp of the last of its input points. +It is important here to keep in mind that consolidated points get the timestamp of the last of their input points (postmarking). Continuing the above example, if we need to normalize the above series with aggNum 3 (OutInterval is 30s) we would normally get a series of (60,70,80), (90 -null -, 100, 110 - null), so the 30-second timestamps become 80 and 110. @@ -55,15 +55,18 @@ Continuing the example again, it could be another series that had a raw interval ## pre-canonical -a pre-canonical series is simply a series that after normalizing, will be canonical.
+a series that is pre-canonical (wrt a given interval) is simply a series that after normalizing (to that interval), will be canonical. I.O.W. is a series that is fetched in such a way that when it is fed to Consolidate(), will produce a canonical series. See above for more details. Note: this can only be done to the extent we know what the normalization looks like. (by setting up req.AggNum and req.OutInterval for normalization). For series that get (further) normalized at runtime, -we can't predict this at fetch time and have to remove points to make the output canonical, or do what Graphite also does, -which is to add null points at the beginning or end as needed, which may lead to inaccurate leading or trailing points that +we can't predict this at fetch time and have to either: +A) remove points to make the output canonical, which removes some information +B) add null points at the beginning or end as needed, which may lead to inaccurate leading or trailing points that go potentially out of the bounds of the query. +Graphite does B, so we do the same. + ## nudging diff --git a/expr/data_test.go b/expr/data_test.go index b001bdc4e6..54627f3ed8 100644 --- a/expr/data_test.go +++ b/expr/data_test.go @@ -313,27 +313,51 @@ func getCopy(in []schema.Point) []schema.Point { } func getSeries(target, patt string, data []schema.Point) models.Series { + from := uint32(10) + to := uint32(61) + if len(data) > 0 { + from = data[0].Ts + to = data[len(data)-1].Ts + 1 + } return models.Series{ Target: target, QueryPatt: patt, + QueryFrom: from, + QueryTo: to, Datapoints: getCopy(data), Interval: 10, } } func getSeriesNamed(name string, data []schema.Point) models.Series { + from := uint32(10) + to := uint32(61) + if len(data) > 0 { + from = data[0].Ts + to = data[len(data)-1].Ts + 1 + } return models.Series{ Target: name, QueryPatt: name, + QueryFrom: from, + QueryTo: to, Datapoints: getCopy(data), Interval: 10, } } func getModel(name string, data []schema.Point) models.Series { + from := uint32(10) + to := uint32(61) + if len(data) > 0 { + from = data[0].Ts + to = data[len(data)-1].Ts + 1 + } series := models.Series{ Target: name, QueryPatt: name, + QueryFrom: from, + QueryTo: to, Datapoints: getCopy(data), Interval: 10, } diff --git a/expr/func_absolute_test.go b/expr/func_absolute_test.go index bf735c3e89..0a3fea98eb 100644 --- a/expr/func_absolute_test.go +++ b/expr/func_absolute_test.go @@ -52,24 +52,14 @@ func TestAbsoluteZeroInput(t *testing.T) { func TestAbsoluteRandom(t *testing.T) { input := []models.Series{ - { - Interval: 10, - QueryPatt: "random", - Target: "rand", - Datapoints: getCopy(random), - }, + getSeries("rand", "random", random), } inputCopy := make([]models.Series, len(input)) copy(inputCopy, input) f := getNewAbsolute(input) out := []models.Series{ - { - Interval: 10, - QueryPatt: "absolute(random)", - Target: "absolute(rand)", - Datapoints: getCopy(randomAbsolute), - }, + getSeries("absolute(rand)", "absolute(random)", randomAbsolute), } dataMap := initDataMap(input) @@ -89,6 +79,14 @@ func TestAbsoluteRandom(t *testing.T) { t.Fatal("Point slices in datamap overlap: ", err) } }) + t.Run("OutputIsCanonical", func(t *testing.T) { + for i, s := range got { + if !s.IsCanonical() { + t.Fatalf("Case %s: output series %d is not canonical: %v", "main", i, s) + } + } + }) + } func BenchmarkAbsolute10k_1NoNulls(b *testing.B) { benchmarkAbsolute(b, 1, test.RandFloats10k, test.RandFloats10k) diff --git a/expr/func_aggregate_test.go b/expr/func_aggregate_test.go index f421663ef3..2d08952609 
100644 --- a/expr/func_aggregate_test.go +++ b/expr/func_aggregate_test.go @@ -284,6 +284,14 @@ func testAggregate(name, agg string, in [][]models.Series, out models.Series, t t.Fatalf("Case %s: Point slices in datamap overlap, err = %s", name, err) } }) + t.Run("OutputIsCanonical", func(t *testing.T) { + for i, s := range got { + if !s.IsCanonical() { + t.Fatalf("Case %s: output series %d is not canonical: %v", name, i, s) + } + } + }) + } func BenchmarkAggregate10k_1NoNulls(b *testing.B) { diff --git a/expr/func_alias_test.go b/expr/func_alias_test.go index de5ab2a8ef..5cfc44525a 100644 --- a/expr/func_alias_test.go +++ b/expr/func_alias_test.go @@ -71,6 +71,14 @@ func testAlias(name string, in []models.Series, out []models.Series, t *testing. t.Fatalf("Case %s: Point slices in datamap overlap, err = %s", name, err) } }) + t.Run("OutputIsCanonical", func(t *testing.T) { + for i, s := range got { + if !s.IsCanonical() { + t.Fatalf("Case %s: output series %d is not canonical: %v", name, i, s) + } + } + }) + } func BenchmarkAlias_1(b *testing.B) { diff --git a/expr/func_aliasbymetric_test.go b/expr/func_aliasbymetric_test.go index 4e3691348d..2d8b3611f8 100644 --- a/expr/func_aliasbymetric_test.go +++ b/expr/func_aliasbymetric_test.go @@ -109,4 +109,11 @@ func testAliasByMetric(in []models.Series, out []models.Series, t *testing.T) { t.Fatalf("Point slices in datamap overlap, err = %s", err) } }) + t.Run("OutputIsCanonical", func(t *testing.T) { + for i, s := range got { + if !s.IsCanonical() { + t.Fatalf("Case %s: output series %d is not canonical: %v", "main", i, s) + } + } + }) } diff --git a/expr/func_aliasbynode_test.go b/expr/func_aliasbynode_test.go index 52cdfa8b85..8d950e57dc 100644 --- a/expr/func_aliasbynode_test.go +++ b/expr/func_aliasbynode_test.go @@ -72,6 +72,13 @@ func testAliasByNode(name string, in []models.Series, out []models.Series, t *te t.Fatalf("Case %s: Point slices in datamap overlap, err = %s", name, err) } }) + t.Run("OutputIsCanonical", func(t *testing.T) { + for i, s := range got { + if !s.IsCanonical() { + t.Fatalf("Case %s: output series %d is not canonical: %v", name, i, s) + } + } + }) } func BenchmarkAliasByNode_1(b *testing.B) { diff --git a/expr/func_aspercent_test.go b/expr/func_aspercent_test.go index 59d563da7a..9715337697 100644 --- a/expr/func_aspercent_test.go +++ b/expr/func_aspercent_test.go @@ -226,6 +226,13 @@ func execAndCheck(in, out []models.Series, f GraphiteFunc, t *testing.T) { t.Fatalf("Point slices in datamap overlap, err = %s", err) } }) + t.Run("OutputIsCanonical", func(t *testing.T) { + for i, s := range got { + if !s.IsCanonical() { + t.Fatalf("Case %s: output series %d is not canonical: %v", "main", i, s) + } + } + }) } func TestAsPercentSingleNoArg(t *testing.T) { @@ -238,6 +245,8 @@ func TestAsPercentSingleNoArg(t *testing.T) { out := []models.Series{ { Interval: 10, + QueryFrom: 10, + QueryTo: 61, QueryPatt: "asPercent(func(tag=something;tag2=anything),sumSeries(func(tag=something;tag2=anything)))", Target: "asPercent(a;tag=something;tag2=anything,sumSeries(func(tag=something;tag2=anything)))", Datapoints: []schema.Point{ @@ -282,12 +291,16 @@ func TestAsPercentMultipleNoArg(t *testing.T) { out := []models.Series{ { Interval: 10, + QueryFrom: 10, + QueryTo: 61, QueryPatt: "asPercent(func(tag=something;tag2=anything),sumSeries(func(tag=something;tag2=anything)))", Target: "asPercent(a;tag=something;tag2=anything,sumSeries(func(tag=something;tag2=anything)))", Datapoints: out1, }, { Interval: 10, + QueryFrom: 10, + QueryTo: 61, 
QueryPatt: "asPercent(func(tag=something;tag2=anything),sumSeries(func(tag=something;tag2=anything)))", Target: "asPercent(b;tag=something;tag2=anything,sumSeries(func(tag=something;tag2=anything)))", Datapoints: out2, @@ -308,6 +321,8 @@ func TestAsPercentTotalFloat(t *testing.T) { out := []models.Series{ { Interval: 10, + QueryFrom: 10, + QueryTo: 61, QueryPatt: "asPercent(func(a;tag=something;tag2=anything),123.456)", Target: "asPercent(a;tag=something;tag2=anything,123.456)", Datapoints: []schema.Point{ @@ -354,12 +369,16 @@ func TestAsPercentTotalSerie(t *testing.T) { out := []models.Series{ { Interval: 10, + QueryFrom: 10, + QueryTo: 61, QueryPatt: "asPercent(func(tag=something;tag2=anything),func(tag=some;tag2=totalSerie))", Target: "asPercent(b;tag=something;tag2=anything,a;tag=some;tag2=totalSerie)", Datapoints: out1, }, { Interval: 10, + QueryFrom: 10, + QueryTo: 61, QueryPatt: "asPercent(func(tag=something;tag2=anything),func(tag=some;tag2=totalSerie))", Target: "asPercent(d;tag=something;tag2=anything,a;tag=some;tag2=totalSerie)", Datapoints: out2, @@ -402,12 +421,16 @@ func TestAsPercentTotalSeries(t *testing.T) { out := []models.Series{ { Interval: 10, + QueryFrom: 10, + QueryTo: 61, QueryPatt: "asPercent(func(tag=something;tag2=anything),func(tag=some;tag2=totalSerie))", Target: "asPercent(b;tag=something;tag2=anything,a;tag=some;tag2=totalSerie)", Datapoints: out1, }, { Interval: 10, + QueryFrom: 10, + QueryTo: 61, QueryPatt: "asPercent(func(tag=something;tag2=anything),func(tag=some;tag2=totalSerie))", Target: "asPercent(d;tag=something;tag2=anything,c;tag=some;tag2=totalSerie)", Datapoints: out2, @@ -454,18 +477,24 @@ func TestAsPercentNoArgNodes(t *testing.T) { out := []models.Series{ { Interval: 10, + QueryFrom: 10, + QueryTo: 61, QueryPatt: "asPercent(func(tag=something;tag2=anything),sumSeries(func(tag=something;tag2=anything)))", Target: "asPercent(this.that.a;tag=something;tag2=anything,sumSeries(func(tag=something;tag2=anything)))", Datapoints: out1, }, { Interval: 10, + QueryFrom: 10, + QueryTo: 61, QueryPatt: "asPercent(func(tag=something;tag2=anything),sumSeries(func(tag=something;tag2=anything)))", Target: "asPercent(this.that.b;tag=something;tag2=anything,sumSeries(func(tag=something;tag2=anything)))", Datapoints: out2, }, { Interval: 10, + QueryFrom: 10, + QueryTo: 61, QueryPatt: "asPercent(func(tag=something;tag2=anything),func(tag=something;tag2=anything))", Target: "asPercent(this.this.c;tag=something;tag2=anything,this.this.c;tag=something;tag2=anything)", Datapoints: out3, @@ -518,18 +547,24 @@ func TestAsPercentNoArgTagNodes(t *testing.T) { out := []models.Series{ { Interval: 10, + QueryFrom: 10, + QueryTo: 61, QueryPatt: "asPercent(func(tag=something1;tag2=anything),sumSeries(func(tag=something1;tag2=anything)))", Target: "asPercent(this.that.a;tag=something1;tag2=anything,sumSeries(func(tag=something1;tag2=anything)))", Datapoints: out1, }, { Interval: 10, + QueryFrom: 10, + QueryTo: 61, QueryPatt: "asPercent(func(tag=something1;tag2=anything),sumSeries(func(tag=something1;tag2=anything)))", Target: "asPercent(this.those.b;tag=something1;tag2=anything,sumSeries(func(tag=something1;tag2=anything)))", Datapoints: out2, }, { Interval: 10, + QueryFrom: 10, + QueryTo: 61, QueryPatt: "asPercent(func(tag=something2;tag2=anything),func(tag=something2;tag2=anything))", Target: "asPercent(this.this.c;tag=something2;tag2=anything,this.this.c;tag=something2;tag2=anything)", Datapoints: out3, @@ -588,24 +623,32 @@ func TestAsPercentSeriesByNodes(t *testing.T) { 
out := []models.Series{ { Interval: 10, + QueryFrom: 10, + QueryTo: 61, QueryPatt: "asPercent(func(tag=something;tag2=anything),sumSeries(func(tag=something;tag2=totalSerie)))", Target: "asPercent(this.that.a;tag=something;tag2=anything,sumSeries(func(tag=something;tag2=totalSerie)))", Datapoints: out1, }, { Interval: 10, + QueryFrom: 10, + QueryTo: 61, QueryPatt: "asPercent(func(tag=something;tag2=anything),sumSeries(func(tag=something;tag2=totalSerie)))", Target: "asPercent(this.that.b;tag=something;tag2=anything,sumSeries(func(tag=something;tag2=totalSerie)))", Datapoints: out2, }, { Interval: 10, + QueryFrom: 10, + QueryTo: 61, QueryPatt: "asPercent(func(tag=something;tag2=anything),MISSING)", Target: "asPercent(this.this.c;tag=something;tag2=anything,MISSING)", Datapoints: allNaN, }, { Interval: 10, + QueryFrom: 10, + QueryTo: 61, QueryPatt: "asPercent(MISSING,func(tag=something;tag2=totalSerie))", Target: "asPercent(MISSING,this.those.ab;tag=something;tag2=totalSerie)", Datapoints: allNaN, diff --git a/expr/func_consolidateby_test.go b/expr/func_consolidateby_test.go index 1637d99000..a70b970a00 100644 --- a/expr/func_consolidateby_test.go +++ b/expr/func_consolidateby_test.go @@ -78,4 +78,11 @@ func testConsolidateBy(name string, in []models.Series, out []models.Series, t * t.Fatalf("Case %s: Point slices in datamap overlap, err = %s", name, err) } }) + t.Run("OutputIsCanonical", func(t *testing.T) { + for i, s := range got { + if !s.IsCanonical() { + t.Fatalf("Case %s: output series %d is not canonical: %v", name, i, s) + } + } + }) } diff --git a/expr/func_countseries_test.go b/expr/func_countseries_test.go index b92317567d..1f6034b0a4 100644 --- a/expr/func_countseries_test.go +++ b/expr/func_countseries_test.go @@ -74,6 +74,13 @@ func testCountSeries(name string, in [][]models.Series, out []models.Series, t * t.Fatalf("Case %s: Point slices in datamap overlap, err = %s", name, err) } }) + t.Run("OutputIsCanonical", func(t *testing.T) { + for i, s := range got { + if !s.IsCanonical() { + t.Fatalf("Case %s: output series %d is not canonical: %v", name, i, s) + } + } + }) } func BenchmarkCountSeries10k_1NoNulls(b *testing.B) { diff --git a/expr/func_derivative_test.go b/expr/func_derivative_test.go index 58dee8fbc7..228929acca 100644 --- a/expr/func_derivative_test.go +++ b/expr/func_derivative_test.go @@ -100,6 +100,13 @@ func testDerivative(name string, in []models.Series, out []models.Series, t *tes t.Fatalf("Case %s: Point slices in datamap overlap, err = %s", name, err) } }) + t.Run("OutputIsCanonical", func(t *testing.T) { + for i, s := range got { + if !s.IsCanonical() { + t.Fatalf("Case %s: output series %d is not canonical: %v", name, i, s) + } + } + }) } func BenchmarkDerivative10k_1NoNulls(b *testing.B) { benchmarkDerivative(b, 1, test.RandFloats10k, test.RandFloats10k) diff --git a/expr/func_divideseries_test.go b/expr/func_divideseries_test.go index 091207bb2b..6ba92559e6 100644 --- a/expr/func_divideseries_test.go +++ b/expr/func_divideseries_test.go @@ -28,6 +28,8 @@ func TestDivideSeriesSingle(t *testing.T) { []models.Series{ { Interval: 10, + QueryFrom: 10, + QueryTo: 21, Target: "divideSeries(foo;a=a;b=b,bar;a=a1;b=b)", QueryPatt: "divideSeries(foo;a=a;b=b,bar;a=a1;b=b)", Datapoints: []schema.Point{ @@ -65,6 +67,8 @@ func TestDivideSeriesMultiple(t *testing.T) { []models.Series{ { Interval: 10, + QueryFrom: 10, + QueryTo: 21, Target: "divideSeries(foo-1;a=1;b=2;c=3,overbar;a=3;b=2;c=1)", QueryPatt: "divideSeries(foo-1;a=1;b=2;c=3,overbar;a=3;b=2;c=1)", Datapoints: 
[]schema.Point{ @@ -77,6 +81,8 @@ func TestDivideSeriesMultiple(t *testing.T) { }, { Interval: 10, + QueryFrom: 10, + QueryTo: 21, Target: "divideSeries(foo-2;a=2;b=2;b=2,overbar;a=3;b=2;c=1)", QueryPatt: "divideSeries(foo-2;a=2;b=2;b=2,overbar;a=3;b=2;c=1)", Datapoints: []schema.Point{ diff --git a/expr/func_divideserieslists_test.go b/expr/func_divideserieslists_test.go index 0ecca4b0f0..1777ec6d63 100644 --- a/expr/func_divideserieslists_test.go +++ b/expr/func_divideserieslists_test.go @@ -32,6 +32,8 @@ func TestDivideSeriesListsSingle(t *testing.T) { []models.Series{ { Interval: 10, + QueryFrom: 10, + QueryTo: 41, Target: "divideSeries(foo;a=a;b=b,bar;a=a1;b=b)", QueryPatt: "divideSeries(foo;a=a;b=b,bar;a=a1;b=b)", Datapoints: []schema.Point{ @@ -75,6 +77,8 @@ func TestDivideSeriesListsMultiple(t *testing.T) { []models.Series{ { Interval: 10, + QueryFrom: 10, + QueryTo: 21, Target: "divideSeries(foo-1;a=1;b=2;c=3,overbar;a=3;b=2;c=1)", QueryPatt: "divideSeries(foo-1;a=1;b=2;c=3,overbar;a=3;b=2;c=1)", Datapoints: []schema.Point{ @@ -87,6 +91,8 @@ func TestDivideSeriesListsMultiple(t *testing.T) { }, { Interval: 10, + QueryFrom: 10, + QueryTo: 21, Target: "divideSeries(foo-2;a=2;b=2;b=2,overbar-2;a=3;b=2;c=1)", QueryPatt: "divideSeries(foo-2;a=2;b=2;b=2,overbar-2;a=3;b=2;c=1)", Datapoints: []schema.Point{ @@ -138,6 +144,13 @@ func testDivideSeriesLists(name string, dividend, divisor []models.Series, out [ t.Fatalf("Case %s: Point slices in datamap overlap, err = %s", name, err) } }) + t.Run("OutputIsCanonical", func(t *testing.T) { + for i, s := range got { + if !s.IsCanonical() { + t.Fatalf("Case %s: output series %d is not canonical: %v", name, i, s) + } + } + }) } func BenchmarkDivideSeriesLists10k_1AllSeriesHalfNulls(b *testing.B) { diff --git a/expr/func_offset_test.go b/expr/func_offset_test.go index cd9d4293a6..d9c0394cb3 100644 --- a/expr/func_offset_test.go +++ b/expr/func_offset_test.go @@ -89,6 +89,14 @@ func testOffset(name string, factor float64, in []models.Series, out []models.Se t.Fatalf("Case %s: Point slices in datamap overlap, err = %s", name, err) } }) + + t.Run("OutputIsCanonical", func(t *testing.T) { + for i, s := range got { + if !s.IsCanonical() { + t.Fatalf("Case %s: output series %d is not canonical: %v", name, i, s) + } + } + }) } func BenchmarkOffset10k_1NoNulls(b *testing.B) { diff --git a/expr/func_round_test.go b/expr/func_round_test.go index c2a80452a3..8283fe6c09 100644 --- a/expr/func_round_test.go +++ b/expr/func_round_test.go @@ -97,12 +97,7 @@ type TestCase struct { } func TestRoundLowPrecInput(t *testing.T) { - input := models.Series{ - Interval: 10, - QueryPatt: "lowPrec", - Target: "lowPrec", - Datapoints: getCopy(lowPrec), - } + input := getSeriesNamed("lowPrec", lowPrec) testData := []TestCase{ { @@ -128,12 +123,7 @@ func TestRoundLowPrecInput(t *testing.T) { } func TestRoundHighPrecInput(t *testing.T) { - input := models.Series{ - Interval: 10, - QueryPatt: "highPrec", - Target: "highPrec", - Datapoints: getCopy(highPrec), - } + input := getSeriesNamed("highPrec", highPrec) testData := []TestCase{ { @@ -168,12 +158,7 @@ func TestRoundOverflow(t *testing.T) { {Val: 1.0e+306, Ts: 10}, {Val: -1.0e+306, Ts: 20}, } - input := models.Series{ - Interval: 10, - QueryPatt: "overflow", - Target: "overflow", - Datapoints: getCopy(massiveDatapoints), - } + input := getSeriesNamed("overflow", massiveDatapoints) testData := []TestCase{ { @@ -197,12 +182,9 @@ func TestRoundOverflow(t *testing.T) { for _, data := range testData { f := 
getNewRound([]models.Series{input}, data.precision) - out := []models.Series{{ - Interval: 10, - QueryPatt: data.expectedName, - Target: data.expectedName, - Datapoints: data.expectedOutput, - }} + out := []models.Series{ + getSeriesNamed(data.expectedName, data.expectedOutput), + } got, err := f.Exec(make(map[Req][]models.Series)) if err := equalOutput(out, got, nil, err); err != nil { t.Fatal("Failed test:", data.expectedName, err) @@ -211,7 +193,7 @@ func TestRoundOverflow(t *testing.T) { } func TestRoundUnderflow(t *testing.T) { - minisculeDatapoint := []schema.Point{ + minisculeDatapoints := []schema.Point{ {Val: 1.0e-306, Ts: 10}, {Val: -1.0e-306, Ts: 20}, } @@ -219,12 +201,7 @@ func TestRoundUnderflow(t *testing.T) { {Val: 0, Ts: 10}, {Val: 0, Ts: 20}, } - input := models.Series{ - Interval: 10, - QueryPatt: "underflow", - Target: "underflow", - Datapoints: getCopy(minisculeDatapoint), - } + input := getSeriesNamed("underflow", minisculeDatapoints) testData := []TestCase{ { @@ -242,7 +219,7 @@ func TestRoundUnderflow(t *testing.T) { } func TestRoundTiny(t *testing.T) { - minisculeDatapoint := []schema.Point{ + minisculeDatapoints := []schema.Point{ {Val: 1.123e-36, Ts: 10}, {Val: -1.123e-36, Ts: 20}, } @@ -250,12 +227,7 @@ func TestRoundTiny(t *testing.T) { {Val: 1.12e-36, Ts: 10}, {Val: -1.12e-36, Ts: 20}, } - input := models.Series{ - Interval: 10, - QueryPatt: "underflow", - Target: "underflow", - Datapoints: getCopy(minisculeDatapoint), - } + input := getSeriesNamed("underflow", minisculeDatapoints) testData := []TestCase{ { diff --git a/expr/func_summarize.go b/expr/func_summarize.go index 9221b23b74..8bc3d6073f 100644 --- a/expr/func_summarize.go +++ b/expr/func_summarize.go @@ -95,6 +95,8 @@ func summarizeValues(serie models.Series, aggFunc batch.AggFunc, interval, start numPoints := len(serie.Datapoints) + // graphite-compatible bit + for ts, i := start, 0; i < numPoints && ts < end; ts += interval { s := i for ; i < numPoints && serie.Datapoints[i].Ts < ts+interval; i++ { @@ -111,5 +113,11 @@ func summarizeValues(serie models.Series, aggFunc batch.AggFunc, interval, start out = append(out, aggPoint) } + // MT specific bit: if !s.alignToFrom we want the output to be canonical + // only thing needed is strip out the first point if its TS < from + if len(out) != 0 && out[0].Ts < serie.QueryFrom { + out = out[1:] + } + return out } diff --git a/expr/func_summarize_test.go b/expr/func_summarize_test.go index ffec7d6211..c305775f23 100644 --- a/expr/func_summarize_test.go +++ b/expr/func_summarize_test.go @@ -9,6 +9,7 @@ import ( "github.com/grafana/metrictank/api/models" "github.com/grafana/metrictank/schema" "github.com/grafana/metrictank/test" + "github.com/grafana/metrictank/util/align" ) var abSummarize = []schema.Point{ @@ -63,7 +64,7 @@ func TestSummarizeDefaultInterval(t *testing.T) { Target: "a", QueryPatt: "a", QueryFrom: 10, - QueryTo: 60, + QueryTo: 61, Interval: 10, Datapoints: getCopy(a), }, @@ -74,7 +75,7 @@ func TestSummarizeDefaultInterval(t *testing.T) { Target: "summarize(a, \"10\", \"sum\")", QueryPatt: "summarize(a, \"10\", \"sum\")", QueryFrom: 10, - QueryTo: 60, + QueryTo: 61, Interval: 10, Datapoints: getCopy(a), }, @@ -84,7 +85,7 @@ func TestSummarizeDefaultInterval(t *testing.T) { Target: "summarize(a, \"10\", \"sum\", true)", QueryPatt: "summarize(a, \"10\", \"sum\", true)", QueryFrom: 10, - QueryTo: 60, + QueryTo: 61, Interval: 10, Datapoints: getCopy(a), }, @@ -96,7 +97,7 @@ func TestSummarizeDefaultInterval(t *testing.T) { Target: "summarize(a, 
\"10\", \"max\")", QueryPatt: "summarize(a, \"10\", \"max\")", QueryFrom: 10, - QueryTo: 60, + QueryTo: 61, Interval: 10, Datapoints: getCopy(a), }, @@ -106,7 +107,7 @@ func TestSummarizeDefaultInterval(t *testing.T) { Target: "summarize(a, \"10\", \"max\", true)", QueryPatt: "summarize(a, \"10\", \"max\", true)", QueryFrom: 10, - QueryTo: 60, + QueryTo: 61, Interval: 10, Datapoints: getCopy(a), }, @@ -140,7 +141,7 @@ func TestSummarizeOversampled(t *testing.T) { Target: "a", QueryPatt: "a", QueryFrom: 10, - QueryTo: 60, + QueryTo: 61, Interval: 10, Datapoints: getCopy(a), }, @@ -151,7 +152,7 @@ func TestSummarizeOversampled(t *testing.T) { Target: "summarize(a, \"5\", \"sum\")", QueryPatt: "summarize(a, \"5\", \"sum\")", QueryFrom: 10, - QueryTo: 60, + QueryTo: 61, Interval: 5, Datapoints: getCopy(aOversampled), }, @@ -161,7 +162,7 @@ func TestSummarizeOversampled(t *testing.T) { Target: "summarize(a, \"5\", \"sum\", true)", QueryPatt: "summarize(a, \"5\", \"sum\", true)", QueryFrom: 10, - QueryTo: 60, + QueryTo: 61, Interval: 5, Datapoints: getCopy(aOversampled), }, @@ -173,7 +174,7 @@ func TestSummarizeOversampled(t *testing.T) { Target: "summarize(a, \"5\", \"max\")", QueryPatt: "summarize(a, \"5\", \"max\")", QueryFrom: 10, - QueryTo: 60, + QueryTo: 61, Interval: 5, Datapoints: getCopy(aOversampled), }, @@ -183,7 +184,7 @@ func TestSummarizeOversampled(t *testing.T) { Target: "summarize(a, \"5\", \"max\", true)", QueryPatt: "summarize(a, \"5\", \"max\", true)", QueryFrom: 10, - QueryTo: 60, + QueryTo: 61, Interval: 5, Datapoints: getCopy(aOversampled), }, @@ -202,7 +203,7 @@ func TestSummarizeNyquistSingleIdentity(t *testing.T) { Target: "a", QueryPatt: "a", QueryFrom: 10, - QueryTo: 60, + QueryTo: 61, Interval: 10, Datapoints: getCopy(a), }, @@ -213,7 +214,7 @@ func TestSummarizeNyquistSingleIdentity(t *testing.T) { Target: "summarize(a, \"10s\", \"sum\")", QueryPatt: "summarize(a, \"10s\", \"sum\")", QueryFrom: 10, - QueryTo: 60, + QueryTo: 61, Interval: 10, Datapoints: getCopy(a), }, @@ -223,7 +224,7 @@ func TestSummarizeNyquistSingleIdentity(t *testing.T) { Target: "summarize(a, \"10s\", \"sum\", true)", QueryPatt: "summarize(a, \"10s\", \"sum\", true)", QueryFrom: 10, - QueryTo: 60, + QueryTo: 61, Interval: 10, Datapoints: getCopy(a), }, @@ -235,7 +236,7 @@ func TestSummarizeNyquistSingleIdentity(t *testing.T) { Target: "summarize(a, \"10s\", \"max\")", QueryPatt: "summarize(a, \"10s\", \"max\")", QueryFrom: 10, - QueryTo: 60, + QueryTo: 61, Interval: 10, Datapoints: getCopy(a), }, @@ -245,7 +246,7 @@ func TestSummarizeNyquistSingleIdentity(t *testing.T) { Target: "summarize(a, \"10s\", \"max\", true)", QueryPatt: "summarize(a, \"10s\", \"max\", true)", QueryFrom: 10, - QueryTo: 60, + QueryTo: 61, Interval: 10, Datapoints: getCopy(a), }, @@ -263,7 +264,7 @@ func TestSummarizeAllFuncs(t *testing.T) { Target: "a", QueryPatt: "a", QueryFrom: 10, - QueryTo: 60, + QueryTo: 61, Interval: 10, Datapoints: []schema.Point{ {Val: 0, Ts: 10}, @@ -364,7 +365,7 @@ func TestSummarizeAllFuncs(t *testing.T) { Target: target, QueryPatt: target, QueryFrom: 10, - QueryTo: 60, + QueryTo: 61, Interval: 30, Datapoints: c.points, }} @@ -378,7 +379,7 @@ func TestSummarizeMultipleIdentity(t *testing.T) { Target: "a", QueryPatt: "a", QueryFrom: 10, - QueryTo: 60, + QueryTo: 61, Interval: 10, Datapoints: getCopy(a), }, @@ -386,7 +387,7 @@ func TestSummarizeMultipleIdentity(t *testing.T) { Target: "b", QueryPatt: "b", QueryFrom: 10, - QueryTo: 60, + QueryTo: 61, Interval: 10, Datapoints: getCopy(b), }, @@ 
-397,7 +398,7 @@ func TestSummarizeMultipleIdentity(t *testing.T) { Target: "summarize(a, \"10s\", \"sum\")", QueryPatt: "summarize(a, \"10s\", \"sum\")", QueryFrom: 10, - QueryTo: 60, + QueryTo: 61, Interval: 10, Datapoints: getCopy(a), }, @@ -405,7 +406,7 @@ func TestSummarizeMultipleIdentity(t *testing.T) { Target: "summarize(b, \"10s\", \"sum\")", QueryPatt: "summarize(b, \"10s\", \"sum\")", QueryFrom: 10, - QueryTo: 60, + QueryTo: 61, Interval: 10, Datapoints: getCopy(b), }, @@ -415,7 +416,7 @@ func TestSummarizeMultipleIdentity(t *testing.T) { Target: "summarize(a, \"10s\", \"sum\", true)", QueryPatt: "summarize(a, \"10s\", \"sum\", true)", QueryFrom: 10, - QueryTo: 60, + QueryTo: 61, Interval: 10, Datapoints: getCopy(a), }, @@ -423,7 +424,7 @@ func TestSummarizeMultipleIdentity(t *testing.T) { Target: "summarize(b, \"10s\", \"sum\", true)", QueryPatt: "summarize(b, \"10s\", \"sum\", true)", QueryFrom: 10, - QueryTo: 60, + QueryTo: 61, Interval: 10, Datapoints: getCopy(b), }, @@ -435,7 +436,7 @@ func TestSummarizeMultipleIdentity(t *testing.T) { Target: "summarize(a, \"10s\", \"max\")", QueryPatt: "summarize(a, \"10s\", \"max\")", QueryFrom: 10, - QueryTo: 60, + QueryTo: 61, Interval: 10, Datapoints: getCopy(a), }, @@ -443,7 +444,7 @@ func TestSummarizeMultipleIdentity(t *testing.T) { Target: "summarize(b, \"10s\", \"max\")", QueryPatt: "summarize(b, \"10s\", \"max\")", QueryFrom: 10, - QueryTo: 60, + QueryTo: 61, Interval: 10, Datapoints: getCopy(b), }, @@ -453,7 +454,7 @@ func TestSummarizeMultipleIdentity(t *testing.T) { Target: "summarize(a, \"10s\", \"max\", true)", QueryPatt: "summarize(a, \"10s\", \"max\", true)", QueryFrom: 10, - QueryTo: 60, + QueryTo: 61, Interval: 10, Datapoints: getCopy(a), }, @@ -461,7 +462,7 @@ func TestSummarizeMultipleIdentity(t *testing.T) { Target: "summarize(b, \"10s\", \"max\", true)", QueryPatt: "summarize(b, \"10s\", \"max\", true)", QueryFrom: 10, - QueryTo: 60, + QueryTo: 61, Interval: 10, Datapoints: getCopy(b), }, @@ -690,7 +691,6 @@ func TestSummarizeAlignToFrom(t *testing.T) { {Val: 8, Ts: 240}, } var unaligned45sum, unaligned45max = []schema.Point{ - {Val: 1, Ts: 0}, {Val: 2, Ts: 45}, {Val: 7, Ts: 90}, {Val: 5, Ts: 135}, @@ -698,7 +698,6 @@ func TestSummarizeAlignToFrom(t *testing.T) { {Val: 8, Ts: 225}, }, []schema.Point{ - {Val: 1, Ts: 0}, {Val: 2, Ts: 45}, {Val: 4, Ts: 90}, {Val: 5, Ts: 135}, @@ -725,7 +724,7 @@ func TestSummarizeAlignToFrom(t *testing.T) { Target: "align", QueryPatt: "align", QueryFrom: 30, - QueryTo: 240, + QueryTo: 241, Interval: 30, Datapoints: getCopy(aligned30), }, @@ -736,7 +735,7 @@ func TestSummarizeAlignToFrom(t *testing.T) { Target: "summarize(align, \"45s\", \"sum\")", QueryPatt: "summarize(align, \"45s\", \"sum\")", QueryFrom: 30, - QueryTo: 240, + QueryTo: 241, Interval: 45, Datapoints: getCopy(unaligned45sum), }, @@ -746,7 +745,7 @@ func TestSummarizeAlignToFrom(t *testing.T) { Target: "summarize(align, \"45s\", \"sum\", true)", QueryPatt: "summarize(align, \"45s\", \"sum\", true)", QueryFrom: 30, - QueryTo: 240, + QueryTo: 241, Interval: 45, Datapoints: getCopy(aligned45sum), }, @@ -758,7 +757,7 @@ func TestSummarizeAlignToFrom(t *testing.T) { Target: "summarize(align, \"45s\", \"max\")", QueryPatt: "summarize(align, \"45s\", \"max\")", QueryFrom: 30, - QueryTo: 240, + QueryTo: 241, Interval: 45, Datapoints: getCopy(unaligned45max), }, @@ -768,7 +767,7 @@ func TestSummarizeAlignToFrom(t *testing.T) { Target: "summarize(align, \"45s\", \"max\", true)", QueryPatt: "summarize(align, \"45s\", \"max\", 
true)", QueryFrom: 30, - QueryTo: 240, + QueryTo: 241, Interval: 45, Datapoints: getCopy(aligned45max), }, @@ -799,12 +798,11 @@ func TestSummarizeLargeIntervalTimestamps(t *testing.T) { outputInterval := uint32(30 * 24 * 60 * 60) // alignToFrom = false - starting timestamp is a multiple of `outputInterval` - unalignedStart := startTime - (startTime % outputInterval) + unalignedStart := align.ForwardIfNotAligned(startTime, outputInterval) var unalignedExpected = []schema.Point{ {Val: 0, Ts: unalignedStart}, {Val: 0, Ts: unalignedStart + outputInterval}, {Val: 0, Ts: unalignedStart + 2*outputInterval}, - {Val: 0, Ts: unalignedStart + 3*outputInterval}, } // alignToFrom = true - starting timestamp is unchanged from input @@ -882,6 +880,15 @@ func testSummarize(name string, in []models.Series, out []models.Series, interva t.Fatalf("Case %s: Point slices in datamap overlap, err = %s", name, err) } }) + if !alignToFrom { + t.Run("OutputIsCanonical", func(t *testing.T) { + for i, s := range got { + if !s.IsCanonical() { + t.Fatalf("Case %s: output series %d is not canonical: %v", name, i, s) + } + } + }) + } } func BenchmarkSummarize10k_1NoNulls(b *testing.B) { diff --git a/expr/normalize.go b/expr/normalize.go index adc21bfb09..ec38423214 100644 --- a/expr/normalize.go +++ b/expr/normalize.go @@ -50,6 +50,7 @@ func NormalizeTwo(dataMap DataMap, a, b models.Series) (models.Series, models.Se } // NormalizeTo normalizes the given series to the desired interval +// will pad front and strip from back as needed, to assure the output is canonical for the given interval // the following MUST be true when calling this: // * interval > in.Interval // * interval % in.Interval == 0 @@ -60,24 +61,38 @@ func NormalizeTo(dataMap DataMap, in models.Series, interval uint32) models.Seri } // we need to copy the datapoints first because the consolidater will reuse the input slice - // also, the input may not be pre-canonical. so add nulls in front and at the back to make it pre-canonical. + // also, for the consolidator's output to be canonical, the input must be pre-canonical. + // so add nulls in front and at the back to make it pre-canonical. // this may make points in front and at the back less accurate when consolidated (e.g. summing when some of the points are null results in a lower value) // but this is what graphite does.... datapoints := pointSlicePool.Get().([]schema.Point) + datapoints = makePreCanonicalCopy(in, interval, datapoints) + // series may have been created by a function that didn't know which consolidation function to default to. + // in the future maybe we can do more clever things here. e.g. perSecond maybe consolidate by max. + if in.Consolidator == 0 { + in.Consolidator = consolidation.Avg + } + in.Datapoints = consolidation.Consolidate(datapoints, interval/in.Interval, in.Consolidator) + in.Interval = interval + dataMap.Add(Req{}, in) + return in +} + +// makePreCanonicalCopy returns a copy of in's datapoints slice, but adjusted to be pre-canonical with respect to interval. +// for this, it reuses the 'datapoints' slice. +func makePreCanonicalCopy(in models.Series, interval uint32, datapoints []schema.Point) []schema.Point { + // to achieve this we need to assure our input starts and ends with the right timestamp. + + // we need to figure out what is the ts of the first point to feed into the consolidator // example of how this works: - // if in.Interval is 5, and interval is 15, then for example, to generate point 15, you want inputs 5, 10 and 15. 
+ // if in.Interval is 5, and interval is 15, then for example, to generate point 15, because we postmark and we want a full input going into this point, + // you want inputs 5, 10 and 15. // or more generally (you can follow any example vertically): // 5 10 15 20 25 30 35 40 45 50 <-- if any of these timestamps are your first point in `in` // 5 5 5 20 20 20 35 35 35 50 <-- then these are the corresponding timestamps of the first values we want as input for the consolidator // 15 15 15 30 30 30 45 45 45 60 <-- which, when fed through alignForwardIfNotAligned(), result in these numbers - // 5 5 5 20 20 20 35 35 35 50 <-- subtract (aggnum-1)* in.interval or equivalent -interval + in.Interval = -15 + 5 = -10. these are our desired numbers! - - // now, for the final value, it's important to be aware of cases like this: - // until=47, interval=10, in.Interval = 5 - // a canonical 10s series would have as last point 40. whereas our input series will have 45, which will consolidate into a point with timestamp 50, which is incorrect - // (it breaches `to`, and may have more points than other series it needs to be combined with) - // thus, we also need to potentially trim points from the back until the last point has the same Ts as a canonical series would + // 5 5 5 20 20 20 35 35 35 50 <-- subtract (aggnum-1)* in.interval or equivalent -interval + in.Interval = -15 + 5 = -10. this is our initial timestamp. canonicalStart := align.ForwardIfNotAligned(in.Datapoints[0].Ts, interval) - interval + in.Interval for ts := canonicalStart; ts < in.Datapoints[0].Ts; ts += in.Interval { @@ -86,17 +101,13 @@ func NormalizeTo(dataMap DataMap, in models.Series, interval uint32) models.Seri datapoints = append(datapoints, in.Datapoints...) + // for the desired last input ts, it's important to be aware of cases like this: + // until=47, interval=10, in.Interval = 5 + // a canonical 10s series would have as last point 40. whereas our input series will have 45, which will consolidate into a point with timestamp 50, which is incorrect + // (it breaches `to`, and may have more points than other series it needs to be combined with) + // thus, we also need to potentially trim points from the back until the last point has the same Ts as a canonical series would + canonicalTs := (datapoints[len(datapoints)-1].Ts / interval) * interval numDrop := int((datapoints[len(datapoints)-1].Ts - canonicalTs) / in.Interval) - datapoints = datapoints[0 : len(datapoints)-numDrop] - - // series may have been created by a function that didn't know which consolidation function to default to. - // in the future maybe we can do more clever things here. e.g. perSecond maybe consolidate by max. - if in.Consolidator == 0 { - in.Consolidator = consolidation.Avg - } - in.Datapoints = consolidation.Consolidate(datapoints, interval/in.Interval, in.Consolidator) - in.Interval = interval - dataMap.Add(Req{}, in) - return in + return datapoints[0 : len(datapoints)-numDrop] }
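The front-padding and back-trimming that makePreCanonicalCopy's comments describe can be illustrated with a self-contained sketch. The types and helpers below are assumptions for illustration only, not the metrictank code: a 5s series starting at ts 25 gets a null padded at ts 20 and its ts-50 point trimmed, so that consolidating every 3 points (postmarking) yields canonical 15s timestamps 30 and 45.

```go
package main

import (
	"fmt"
	"math"
)

type point struct {
	Val float64
	Ts  uint32
}

// alignForwardIfNotAligned is a simplified stand-in for util/align.ForwardIfNotAligned.
func alignForwardIfNotAligned(ts, interval uint32) uint32 {
	if ts%interval == 0 {
		return ts
	}
	return ts + interval - ts%interval
}

// preCanonical sketches the idea behind makePreCanonicalCopy (assuming a non-empty input):
// pad nulls at the front so the first consolidation bucket has a full set of inputs,
// and trim trailing points that would consolidate into a timestamp beyond what a
// canonical series of the target interval would have.
// inInterval is the series' own interval, interval the normalization target (a multiple of inInterval).
func preCanonical(in []point, inInterval, interval uint32) []point {
	out := make([]point, 0, len(in))

	// first input ts to feed the consolidator, so that the first output point
	// (which gets the timestamp of the *last* input point, i.e. postmarking)
	// is built from a complete bucket.
	canonicalStart := alignForwardIfNotAligned(in[0].Ts, interval) - interval + inInterval
	for ts := canonicalStart; ts < in[0].Ts; ts += inInterval {
		out = append(out, point{Val: math.NaN(), Ts: ts})
	}
	out = append(out, in...)

	// trim trailing points that would form an output point past the canonical last timestamp.
	canonicalTs := (out[len(out)-1].Ts / interval) * interval
	numDrop := int((out[len(out)-1].Ts - canonicalTs) / inInterval)
	return out[:len(out)-numDrop]
}

func main() {
	// raw 5s series starting at ts 25, normalization target interval 15.
	in := []point{{1, 25}, {2, 30}, {3, 35}, {4, 40}, {5, 45}, {6, 50}}
	for _, p := range preCanonical(in, 5, 15) {
		// prints ts 20 (NaN), 25, 30, 35, 40, 45: two full buckets ending at ts 30 and 45
		fmt.Println(p.Ts, p.Val)
	}
}
```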