From 107203dbab293ab8a8f6e1a352dc57d8119b16bf Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Mon, 31 Oct 2016 14:31:08 +0200 Subject: [PATCH] make getSeries responsible for returning quantized data. Instead of having the somewhat awkward requirement that data returned by getSeries() needs an additional pass through fix(), just have getSeries take care of it. An additional benefit is that it simplifies the code. --- dataprocessor.go | 85 +++++++++++++----------------------------------- 1 file changed, 23 insertions(+), 62 deletions(-) diff --git a/dataprocessor.go b/dataprocessor.go index c3ecf55123..070fab1697 100644 --- a/dataprocessor.go +++ b/dataprocessor.go @@ -223,77 +223,39 @@ func getTarget(store mdata.Store, req Req) (points []schema.Point, interval uint } if !readConsolidated && !runtimeConsolidation { - return fix( - getSeries(store, req.key, consolidation.None, req.archInterval, req.from, req.to), - req.from, - req.to, - req.archInterval, - ), req.outInterval, nil + return getSeries(store, req.key, consolidation.None, req.archInterval, req.from, req.to), + req.outInterval, nil } else if !readConsolidated && runtimeConsolidation { return consolidate( - fix( - getSeries(store, req.key, consolidation.None, req.archInterval, req.from, req.to), - req.from, - req.to, - req.archInterval, - ), + getSeries(store, req.key, consolidation.None, req.archInterval, req.from, req.to), req.aggNum, req.consolidator), req.outInterval, nil } else if readConsolidated && !runtimeConsolidation { if req.consolidator == consolidation.Avg { return divide( - fix( - getSeries(store, req.key, consolidation.Sum, req.archInterval, req.from, req.to), - req.from, - req.to, - req.archInterval, - ), - fix( - getSeries(store, req.key, consolidation.Cnt, req.archInterval, req.from, req.to), - req.from, - req.to, - req.archInterval, - ), + getSeries(store, req.key, consolidation.Sum, req.archInterval, req.from, req.to), + getSeries(store, req.key, consolidation.Cnt, req.archInterval, req.from, req.to), ), req.outInterval, nil } else { - return fix( - getSeries(store, req.key, req.consolidator, req.archInterval, req.from, req.to), - req.from, - req.to, - req.archInterval, - ), req.outInterval, nil + return getSeries(store, req.key, req.consolidator, req.archInterval, req.from, req.to), + req.outInterval, nil } } else { // readConsolidated && runtimeConsolidation if req.consolidator == consolidation.Avg { return divide( consolidate( - fix( - getSeries(store, req.key, consolidation.Sum, req.archInterval, req.from, req.to), - req.from, - req.to, - req.archInterval, - ), + getSeries(store, req.key, consolidation.Sum, req.archInterval, req.from, req.to), req.aggNum, consolidation.Sum), consolidate( - fix( - getSeries(store, req.key, consolidation.Cnt, req.archInterval, req.from, req.to), - req.from, - req.to, - req.archInterval, - ), + getSeries(store, req.key, consolidation.Cnt, req.archInterval, req.from, req.to), req.aggNum, consolidation.Sum), ), req.outInterval, nil } else { return consolidate( - fix( - getSeries(store, req.key, req.consolidator, req.archInterval, req.from, req.to), - req.from, - req.to, - req.archInterval, - ), + getSeries(store, req.key, req.consolidator, req.archInterval, req.from, req.to), req.aggNum, req.consolidator), req.outInterval, nil } } @@ -316,24 +278,23 @@ func prevBoundary(ts uint32, span uint32) uint32 { // getSeries returns points from mem (and cassandra if needed), within the range from (inclusive) - to (exclusive) // it can query for data within aggregated archives, by using fn min/max/sum/cnt and providing the matching agg span as interval // pass consolidation.None as consolidator to mean read from raw interval, otherwise we'll read from aggregated series. -// !! note that raw, unquantized data may be returned outside of the requested range. We expect that callers of getSeries -// will also call fix() afterwards which will fix that. -// here's why: while aggregated archives are quantized, raw intervals are not. And quantizing happens *after* calling this function. (in fix().) -// So we have to be adjust the range: -// (ranges described as a..b include both and b) -// REQ 0[---FROM---60]----------120-----------180[----TO----240] any request from 1..60 to 181..240 should ... -// QUANTD RESULT 0----------[60]---------[120]---------[180] return points 60, 120 and 180 (simply because of to/from and inclusive/exclusive rules) -// STORED DATA 0[----------60][---------120][---------180][---------240] .. but data for 60 may be at ts 1..60, data for 120 at 61..120 and for 180 at 121..180 (due to quantizing) -// to retrieve the stored data, we also use from inclusive and to exclusive, so to make sure that the data after quantization (fix()) is correct), we have to make the following adjustment: -// `from` 1..60 needs data 1..60 -> always adjust `from` to previous boundary+1 (here 1) -// `to` 181..240 needs data 121..180 -> always adjust `to` to previous boundary+1 (here 181) - +// all data will also be quantized. func getSeries(store mdata.Store, key string, consolidator consolidation.Consolidator, interval, fromUnix, toUnix uint32) []schema.Point { iters := make([]iter.Iter, 0) memIters := make([]iter.Iter, 0) oldest := toUnix - // see explanation in function documentation + // while aggregated archives are quantized, raw intervals are not. quantizing happens at the end of this function, **after* this step. + // So we have to be adjust the range to get the right data. + // (ranges described as a..b include both and b) + // REQ 0[---FROM---60]----------120-----------180[----TO----240] any request from 1..60 to 181..240 should ... + // QUANTD RESULT 0----------[60]---------[120]---------[180] return points 60, 120 and 180 (simply because of to/from and inclusive/exclusive rules) .. + // STORED DATA 0[----------60][---------120][---------180][---------240] but data for 60 may be at 1..60, data for 120 at 61..120 and for 180 at 121..180 (due to quantizing) + // to retrieve the stored data, we also use from inclusive and to exclusive, + // so to make sure that the data after quantization (fix()) is correct, we have to make the following adjustment: + // `from` 1..60 needs data 1..60 -> always adjust `from` to previous boundary+1 (here 1) + // `to` 181..240 needs data 121..180 -> always adjust `to` to previous boundary+1 (here 181) + if consolidator == consolidation.None { fromUnix = prevBoundary(fromUnix, interval) + 1 toUnix = prevBoundary(toUnix, interval) + 1 @@ -389,7 +350,7 @@ func getSeries(store mdata.Store, key string, consolidator consolidation.Consoli } } itersToPointsDuration.Value(time.Now().Sub(pre)) - return points + return fix(points, fromUnix, toUnix, interval) } // check for duplicate series names. If found merge the results.