This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Make summarize() output canonical in the !alignToFrom case #1849

Merged
merged 7 commits into from
Jun 17, 2020
7 changes: 6 additions & 1 deletion api/dataprocessor.go
@@ -696,6 +696,9 @@ type requestContext struct {
AMKey schema.AMKey // set by combining Req's key, consolidator and archive info
}

// newRequestContext creates a requestContext; in particular from/to are crafted such that:
// * raw (non-quantized) data, after Fix(), will honor the requested from/to
// * the series is pre-canonical wrt the interval it will have after the after-fetch runtime normalization
func newRequestContext(ctx context.Context, req *models.Req, consolidator consolidation.Consolidator) *requestContext {

rc := requestContext{
@@ -750,8 +753,10 @@ func newRequestContext(ctx context.Context, req *models.Req, consolidator consol
rc.AMKey = schema.GetAMKey(req.MKey, consolidator.Archive(), req.ArchInterval)
}

// if the request has after-fetch runtime normalization, plan to make the series pre-canonical.
// (note: unfortunately we cannot make inputs pre-canonical for on-demand runtime normalization, because by definition those normalizations are not predictable!)
if req.AggNum > 1 {
// the series needs to be pre-canonical. There's 2 aspects to this
// There's 2 aspects to this

// 1) `from` adjustment.
// we may need to rewind the from so that we make sure to include all the necessary input raw data
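To make the `from` adjustment above concrete, here is a minimal sketch of the idea, not the code in this PR. It assumes postmarking (a consolidated point takes the timestamp of its last input point) and that Fix() quantizes raw data to multiples of ArchInterval; the helper name `preCanonicalFrom` is hypothetical.

```go
// Sketch only: rewind `from` so the fetched series is pre-canonical for aggNum.
package main

import "fmt"

func preCanonicalFrom(from, archInterval, aggNum uint32) uint32 {
	outInterval := archInterval * aggNum
	// first quantized raw point at or after `from`
	firstPoint := ((from + archInterval - 1) / archInterval) * archInterval
	// first canonical output timestamp at or after that point
	firstOut := ((firstPoint + outInterval - 1) / outInterval) * outInterval
	// the consolidation group ending at firstOut starts here; rewind `from` to cover it fully
	return firstOut - outInterval + archInterval
}

func main() {
	// raw interval 10s, normalized with aggNum 3 (output interval 30s):
	fmt.Println(preCanonicalFrom(30, 10, 3)) // 10: rewound so the group (10,20,30) is complete
	fmt.Println(preCanonicalFrom(35, 10, 3)) // 40: the first quantized point already starts a group
}
```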
11 changes: 6 additions & 5 deletions api/models/request.go
@@ -45,8 +45,8 @@ type Req struct {
Archive uint8 `json:"archive"` // 0 means original data, 1 means first agg level, 2 means 2nd, etc.
ArchInterval uint32 `json:"archInterval"` // the interval corresponding to the archive we'll fetch
TTL uint32 `json:"ttl"` // the ttl of the archive we'll fetch
OutInterval uint32 `json:"outInterval"` // the interval of the output data, after any runtime consolidation
AggNum uint32 `json:"aggNum"` // how many points to consolidate together at runtime, after fetching from the archive (normalization)
OutInterval uint32 `json:"outInterval"` // the interval of the series after fetching (possibly with pre-normalization) and after-fetch runtime normalization. not aware of further normalization or runtime consolidation
AggNum uint32 `json:"aggNum"` // how many points to consolidate together for after-fetch runtime normalization (see docs/consolidation.md)
}

// PNGroup is an identifier for a pre-normalization group: data that can be pre-normalized together
@@ -92,7 +92,7 @@ func (r *Req) Plan(i int, a archives.Archive) {
r.AggNum = 1
}

// PlanNormalization updates the planning parameters to accommodate normalization to the specified interval
// PlanNormalization updates the planning parameters to accommodate after-fetch runtime normalization to the specified interval
func (r *Req) PlanNormalization(interval uint32) {
r.OutInterval = interval
r.AggNum = interval / r.ArchInterval
@@ -132,11 +132,12 @@ func (r Req) PointsFetch() uint32 {
}

// PointsReturn estimates the amount of points that will be returned for this request
// best effort: not aware of summarize(), runtime normalization. but does account for runtime consolidation
// It is not aware of summarize() or on-demand runtime normalization.
// but does account for after-fetch runtime normalization and runtime consolidation
func (r Req) PointsReturn(planMDP uint32) uint32 {
points := (r.To - r.From) / r.OutInterval
if planMDP > 0 && points > planMDP {
// note that we don't assign to req.AggNum here, because that's only for normalization.
// note that we don't assign to req.AggNum here, because that's only for after-fetch runtime normalization.
// MDP runtime consolidation doesn't look at req.AggNum
aggNum := consolidation.AggEvery(points, planMDP)
points /= aggNum
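A small worked sketch (assumed numbers, not code from the repo) of how PlanNormalization and PointsReturn interact: a 10s archive normalized to 30s yields AggNum=3, and MDP runtime consolidation then reduces the estimated point count by a ceil factor, mirroring what consolidation.AggEvery computes.

```go
// Sketch with assumed values: after-fetch runtime normalization via OutInterval/AggNum,
// followed by an MDP runtime-consolidation estimate.
package main

import "fmt"

func main() {
	var archInterval uint32 = 10  // interval of the archive we fetch
	var from, to uint32 = 0, 3600 // one hour
	var planMDP uint32 = 100      // MaxDataPoints

	// PlanNormalization(30) on a 10s archive:
	outInterval := uint32(30)
	aggNum := outInterval / archInterval // 3 points normalized into each output point

	// PointsReturn-style estimate: normalization is reflected via outInterval,
	// runtime consolidation reduces the count further without touching aggNum.
	points := (to - from) / outInterval // 120
	if planMDP > 0 && points > planMDP {
		aggEvery := (points + planMDP - 1) / planMDP // ceil division
		points /= aggEvery
	}
	fmt.Println(aggNum, points) // 3 60
}
```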
17 changes: 17 additions & 0 deletions api/models/series.go
@@ -11,6 +11,7 @@ import (
"github.com/grafana/metrictank/expr/tagquery"
"github.com/grafana/metrictank/mdata"
"github.com/grafana/metrictank/schema"
"github.com/grafana/metrictank/util/align"
pickle "github.com/kisielk/og-rek"
)

@@ -238,6 +239,22 @@ func (s *Series) CopyTagsWith(key, val string) map[string]string {
return out
}

// IsCanonical checks whether the series is canonical wrt its interval and from/to
func (s Series) IsCanonical() bool {
firstTs := align.ForwardIfNotAligned(s.QueryFrom, s.Interval)
lastTs := align.Backward(s.QueryTo, s.Interval)
num := int((lastTs-firstTs)/s.Interval) + 1
if len(s.Datapoints) != num {
return false
}
for i := 0; i < num; i++ {
if s.Datapoints[i].Ts != firstTs+uint32(i)*s.Interval {
return false
}
}
return true
}

type SeriesByTarget []Series

func (g SeriesByTarget) Len() int { return len(g) }
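As a usage sketch for the new IsCanonical helper (assumed values; this also assumes align.ForwardIfNotAligned leaves already-aligned timestamps untouched and align.Backward rounds down): a 10s series queried over QueryFrom=10, QueryTo=31 is canonical iff its points sit exactly on 10, 20, 30.

```go
package models_test

import (
	"fmt"

	"github.com/grafana/metrictank/api/models"
	"github.com/grafana/metrictank/schema"
)

func ExampleSeries_IsCanonical() {
	s := models.Series{
		Interval:  10,
		QueryFrom: 10,
		QueryTo:   31,
		Datapoints: []schema.Point{
			{Ts: 10, Val: 1}, {Ts: 20, Val: 2}, {Ts: 30, Val: 3},
		},
	}
	// dropping a point, or moving one off the 10s grid, would make this false
	fmt.Println(s.IsCanonical())
	// Output: true
}
```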
11 changes: 9 additions & 2 deletions devdocs/render-request-handling.md
@@ -2,7 +2,8 @@

`/render` maps to `Server.renderMetrics` which has these main sections/steps:

* `expr.parseMany` validates the target strings (queries) and parses them grammatically.
* `expr.parseMany` validates the target strings (queries) and parses them grammatically (into `*[]expr.expr`)
* Proxy to Graphite if request could not be parsed

* `expr.NewPlan` sets up the needed processing functions, their arguments (validates input to match the signature) and lets function adjust the context as it flows between the processing steps
note: context holds from/to timestamp (may change across processing pipeline, e.g. to handle movingAverage queries) and consolidateBy setting, see NOTES in expr directory.
@@ -12,13 +13,19 @@
* finds all series by fanning out the query patterns to all other shards.
this basically gives idx.Node back: it has the path, leaf, metric definition, schema/aggregation (rollup) settings for each series, as well as which node it can be found on.
* construct models.Req objects for each series. this uses the MKey to identify series, also sets from/to, maxdatapoints, etc.
* `planRequests`: this plans all models.Req objects, deciding which archive to read from, whether to apply normalization, etc.
* `planRequests`: this plans all models.Req objects, deciding which archive to read from, sets up normalization parameters (if they can be pre-determined), etc
(see NOTES in expr directory for more info)
* `getTargets`: gets the data from the local node and peer nodes based on the models.Req objects
this is where we create a models.Series based on a models.Req. and we call:
* getSeriesFixed
* newRequestContext // sets from/to a bit specially to accommodate non-quantized raw data and to try to make pre-canonical. see code comments
* getSeries
* Fix()
* `mergeSeries`: if there are multiple series with the same name/tags, from, to and consolidator (e.g. because users switched intervals), merge them together into one series
* Sort each merged series so that the output of a function is well-defined and repeatable.
* `plan.Run`: invoke all function processing, followed by runtime consolidation as necessary


## MDP-optimization

MDP at the leaf of the expr tree (fetch request) of 0 means don't optimize. If set it to >0 it means the request can be optimized.
17 changes: 17 additions & 0 deletions docs/consolidation.md
@@ -124,3 +124,20 @@ LONGER CHUNK (vs SHORTER CHUNK):
* more decoding overhead
* to accommodate newest chunk, cost of keeping 1 extra old chunk in RAM increases (or hitting cassandra more)
* for aggbands, generally means storing more data in RAM that we don't really need in ram




## Consolidation throughout the response path


* Fetching of chunk data (raw or rollup), mainly based on time range queried and max-points-per-req restrictions
* Normalization: consolidates when series need to be combined together (e.g. in sumSeries(), divideSeries(), asPercent(), groupByTags(), etc.). There are 2 different cases here:
- Pre-normalization: a specific optimization that, when enabled, can detect when series will definitely get combined together and not used elsewhere (e.g. when feeding into sumSeries), and, if the output of the normalization would use an interval that a series also has available as a rollup, will set up the request to use the rollup instead.
- Runtime normalization: used if pre-normalization is disabled, could not guarantee the series will be solely normalized together (e.g. groupByTags(), asPercent()), or we didn't have the right archives available.
This takes two forms:
- on-demand runtime normalization: during function processing, when functions need to combine inputs of different intervals
- after-fetch runtime normalization (by using req.AggNum): when we could predict that normalization will always be needed on a series, but could not use pre-normalization.

* Series run through the function-processing API. Many functions don't invoke consolidation but e.g. summarize() does.
* Runtime consolidation: To honor the MaxDataPoints value
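A minimal sketch of what after-fetch runtime normalization amounts to (not the actual Consolidate() implementation; it averages and, for brevity, drops an incomplete trailing group instead of padding it): every aggNum input points are merged into one output point carrying the timestamp of the last input point.

```go
package main

import "fmt"

type point struct {
	Ts  uint32
	Val float64
}

// normalize merges every aggNum points into one, postmarking the output point
// with the timestamp of the last input point of its group.
func normalize(in []point, aggNum int) []point {
	var out []point
	for i := 0; i+aggNum <= len(in); i += aggNum {
		sum := 0.0
		for _, p := range in[i : i+aggNum] {
			sum += p.Val
		}
		out = append(out, point{Ts: in[i+aggNum-1].Ts, Val: sum / float64(aggNum)})
	}
	return out
}

func main() {
	in := []point{{Ts: 10, Val: 1}, {Ts: 20, Val: 2}, {Ts: 30, Val: 3}, {Ts: 40, Val: 4}, {Ts: 50, Val: 5}, {Ts: 60, Val: 6}}
	fmt.Println(normalize(in, 3)) // [{30 2} {60 5}]
}
```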
11 changes: 7 additions & 4 deletions docs/render-path.md
@@ -24,7 +24,7 @@ canonical form comes into play when we need to normalize (through consolidation
It essentially means a series looks like a "native" fixed series of that higher interval,
with respect to how many points it contains and which timestamps they have.

It is important here to keep in mind that consolidated points get the timestamp of the last of its input points.
It is important here to keep in mind that consolidated points get the timestamp of the last of its input points (postmarking).

Continuing the above example, if we need to normalize the above series with aggNum 3 (OutInterval is 30s)
we would normally get a series of (60,70,80), (90 -null-, 100, 110 -null-), so the 30-second timestamps become 80 and 110.
@@ -55,15 +55,18 @@ Continuing the example again, it could be another series that had a raw interval

## pre-canonical

a pre-canonical series is simply a series that after normalizing, will be canonical.
a series that is pre-canonical (wrt a given interval) is simply a series that after normalizing (to that interval), will be canonical.
I.O.W. is a series that is fetched in such a way that when it is fed to Consolidate(), will produce a canonical series.
See above for more details.
Note: this can only be done to the extent we know what the normalization looks like.
(by setting up req.AggNum and req.OutInterval for normalization). For series that get (further) normalized at runtime,
we can't predict this at fetch time and have to remove points to make the output canonical, or do what Graphite also does,
which is to add null points at the beginning or end as needed, which may lead to inaccurate leading or trailing points that
we can't predict this at fetch time and have to either:
A) remove points to make the output canonical, which removes some information
B) add null points at the beginning or end as needed, which may lead to inaccurate leading or trailing points that
go potentially out of the bounds of the query.

Graphite does B, so we do the same.
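A sketch of option B (a hypothetical helper, not code from this repo): pad the normalized series with nulls so its points cover the full canonical timestamp range for its interval.

```go
package main

import (
	"fmt"
	"math"
)

type point struct {
	Ts  uint32
	Val float64
}

// padToCanonical fills in null (NaN) points wherever the series is missing a
// timestamp in the canonical range firstTs..lastTs (stepping by interval).
func padToCanonical(in []point, firstTs, lastTs, interval uint32) []point {
	var out []point
	i := 0
	for ts := firstTs; ts <= lastTs; ts += interval {
		if i < len(in) && in[i].Ts == ts {
			out = append(out, in[i])
			i++
		} else {
			out = append(out, point{Ts: ts, Val: math.NaN()})
		}
	}
	return out
}

func main() {
	// a normalized 30s series that falls short of the canonical range 30..120
	in := []point{{Ts: 60, Val: 1}, {Ts: 90, Val: 2}}
	fmt.Println(padToCanonical(in, 30, 120, 30)) // [{30 NaN} {60 1} {90 2} {120 NaN}]
}
```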


## nudging

24 changes: 24 additions & 0 deletions expr/data_test.go
@@ -313,27 +313,51 @@ func getCopy(in []schema.Point) []schema.Point {
}

func getSeries(target, patt string, data []schema.Point) models.Series {
from := uint32(10)
to := uint32(61)
if len(data) > 0 {
from = data[0].Ts
to = data[len(data)-1].Ts + 1
}
return models.Series{
Target: target,
QueryPatt: patt,
QueryFrom: from,
QueryTo: to,
Datapoints: getCopy(data),
Interval: 10,
}
}

func getSeriesNamed(name string, data []schema.Point) models.Series {
from := uint32(10)
to := uint32(61)
if len(data) > 0 {
from = data[0].Ts
to = data[len(data)-1].Ts + 1
}
return models.Series{
Target: name,
QueryPatt: name,
QueryFrom: from,
QueryTo: to,
Datapoints: getCopy(data),
Interval: 10,
}
}

func getModel(name string, data []schema.Point) models.Series {
from := uint32(10)
to := uint32(61)
if len(data) > 0 {
from = data[0].Ts
to = data[len(data)-1].Ts + 1
}
series := models.Series{
Target: name,
QueryPatt: name,
QueryFrom: from,
QueryTo: to,
Datapoints: getCopy(data),
Interval: 10,
}
22 changes: 10 additions & 12 deletions expr/func_absolute_test.go
@@ -52,24 +52,14 @@ func TestAbsoluteZeroInput(t *testing.T) {
func TestAbsoluteRandom(t *testing.T) {

input := []models.Series{
{
Interval: 10,
QueryPatt: "random",
Target: "rand",
Datapoints: getCopy(random),
},
getSeries("rand", "random", random),
}
inputCopy := make([]models.Series, len(input))
copy(inputCopy, input)

f := getNewAbsolute(input)
out := []models.Series{
{
Interval: 10,
QueryPatt: "absolute(random)",
Target: "absolute(rand)",
Datapoints: getCopy(randomAbsolute),
},
getSeries("absolute(rand)", "absolute(random)", randomAbsolute),
}

dataMap := initDataMap(input)
@@ -89,6 +79,14 @@ func TestAbsoluteRandom(t *testing.T) {
t.Fatal("Point slices in datamap overlap: ", err)
}
})
t.Run("OutputIsCanonical", func(t *testing.T) {
for i, s := range got {
if !s.IsCanonical() {
t.Fatalf("Case %s: output series %d is not canonical: %v", "main", i, s)
}
}
})

}
func BenchmarkAbsolute10k_1NoNulls(b *testing.B) {
benchmarkAbsolute(b, 1, test.RandFloats10k, test.RandFloats10k)
8 changes: 8 additions & 0 deletions expr/func_aggregate_test.go
@@ -284,6 +284,14 @@ func testAggregate(name, agg string, in [][]models.Series, out models.Series, t
t.Fatalf("Case %s: Point slices in datamap overlap, err = %s", name, err)
}
})
t.Run("OutputIsCanonical", func(t *testing.T) {
for i, s := range got {
if !s.IsCanonical() {
t.Fatalf("Case %s: output series %d is not canonical: %v", name, i, s)
}
}
})

}

func BenchmarkAggregate10k_1NoNulls(b *testing.B) {
8 changes: 8 additions & 0 deletions expr/func_alias_test.go
@@ -71,6 +71,14 @@ func testAlias(name string, in []models.Series, out []models.Series, t *testing.
t.Fatalf("Case %s: Point slices in datamap overlap, err = %s", name, err)
}
})
t.Run("OutputIsCanonical", func(t *testing.T) {
for i, s := range got {
if !s.IsCanonical() {
t.Fatalf("Case %s: output series %d is not canonical: %v", name, i, s)
}
}
})

}

func BenchmarkAlias_1(b *testing.B) {
7 changes: 7 additions & 0 deletions expr/func_aliasbymetric_test.go
@@ -109,4 +109,11 @@ func testAliasByMetric(in []models.Series, out []models.Series, t *testing.T) {
t.Fatalf("Point slices in datamap overlap, err = %s", err)
}
})
t.Run("OutputIsCanonical", func(t *testing.T) {
for i, s := range got {
if !s.IsCanonical() {
t.Fatalf("Case %s: output series %d is not canonical: %v", "main", i, s)
}
}
})
}
7 changes: 7 additions & 0 deletions expr/func_aliasbynode_test.go
@@ -72,6 +72,13 @@ func testAliasByNode(name string, in []models.Series, out []models.Series, t *te
t.Fatalf("Case %s: Point slices in datamap overlap, err = %s", name, err)
}
})
t.Run("OutputIsCanonical", func(t *testing.T) {
for i, s := range got {
if !s.IsCanonical() {
t.Fatalf("Case %s: output series %d is not canonical: %v", name, i, s)
}
}
})
}

func BenchmarkAliasByNode_1(b *testing.B) {