
Commit
make getSeries responsible for returning quantized data.
Instead of having the somewhat awkward requirement that
data returned by getSeries() needs an additional pass through fix(),
just have getSeries take care of it.

An additional benefit is that it simplifies the code.
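The fix() function itself does not appear in this diff. As a rough illustration of the quantization it performs (and that getSeries() now guarantees), here is a minimal, self-contained sketch; the Point type and quantize helper below are simplified stand-ins for this example, not the project's schema.Point or actual fix():

package main

import (
	"fmt"
	"math"
)

// Point is a simplified stand-in for schema.Point.
type Point struct {
	Val float64
	Ts  uint32
}

// quantize aligns raw points to interval boundaries within [from, to),
// emitting NaN for boundaries with no data, so callers always get a
// regular, gap-free series. This mirrors what fix() is described to do.
func quantize(in []Point, from, to, interval uint32) []Point {
	// first quantized boundary >= from
	first := from + interval - 1
	first -= first % interval
	var out []Point
	i := 0
	for ts := first; ts < to; ts += interval {
		p := Point{Val: math.NaN(), Ts: ts}
		// raw points in (ts-interval, ts] belong to boundary ts; the last one wins
		for i < len(in) && in[i].Ts <= ts {
			if in[i].Ts > ts-interval {
				p.Val = in[i].Val
			}
			i++
		}
		out = append(out, p)
	}
	return out
}

func main() {
	raw := []Point{{Val: 1, Ts: 58}, {Val: 2, Ts: 115}, {Val: 3, Ts: 180}}
	fmt.Println(quantize(raw, 1, 240, 60)) // [{1 60} {2 120} {3 180}]
}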
Dieterbe committed Oct 31, 2016
1 parent bceb415 commit 107203d
Showing 1 changed file with 23 additions and 62 deletions.
85 changes: 23 additions & 62 deletions dataprocessor.go
@@ -223,77 +223,39 @@ func getTarget(store mdata.Store, req Req) (points []schema.Point, interval uint
 	}

 	if !readConsolidated && !runtimeConsolidation {
-		return fix(
-			getSeries(store, req.key, consolidation.None, req.archInterval, req.from, req.to),
-			req.from,
-			req.to,
-			req.archInterval,
-		), req.outInterval, nil
+		return getSeries(store, req.key, consolidation.None, req.archInterval, req.from, req.to),
+			req.outInterval, nil
 	} else if !readConsolidated && runtimeConsolidation {
 		return consolidate(
-			fix(
-				getSeries(store, req.key, consolidation.None, req.archInterval, req.from, req.to),
-				req.from,
-				req.to,
-				req.archInterval,
-			),
+			getSeries(store, req.key, consolidation.None, req.archInterval, req.from, req.to),
 			req.aggNum,
 			req.consolidator), req.outInterval, nil
 	} else if readConsolidated && !runtimeConsolidation {
 		if req.consolidator == consolidation.Avg {
 			return divide(
-				fix(
-					getSeries(store, req.key, consolidation.Sum, req.archInterval, req.from, req.to),
-					req.from,
-					req.to,
-					req.archInterval,
-				),
-				fix(
-					getSeries(store, req.key, consolidation.Cnt, req.archInterval, req.from, req.to),
-					req.from,
-					req.to,
-					req.archInterval,
-				),
+				getSeries(store, req.key, consolidation.Sum, req.archInterval, req.from, req.to),
+				getSeries(store, req.key, consolidation.Cnt, req.archInterval, req.from, req.to),
 			), req.outInterval, nil
 		} else {
-			return fix(
-				getSeries(store, req.key, req.consolidator, req.archInterval, req.from, req.to),
-				req.from,
-				req.to,
-				req.archInterval,
-			), req.outInterval, nil
+			return getSeries(store, req.key, req.consolidator, req.archInterval, req.from, req.to),
+				req.outInterval, nil
 		}
 	} else {
 		// readConsolidated && runtimeConsolidation
 		if req.consolidator == consolidation.Avg {
 			return divide(
 				consolidate(
-					fix(
-						getSeries(store, req.key, consolidation.Sum, req.archInterval, req.from, req.to),
-						req.from,
-						req.to,
-						req.archInterval,
-					),
+					getSeries(store, req.key, consolidation.Sum, req.archInterval, req.from, req.to),
 					req.aggNum,
 					consolidation.Sum),
 				consolidate(
-					fix(
-						getSeries(store, req.key, consolidation.Cnt, req.archInterval, req.from, req.to),
-						req.from,
-						req.to,
-						req.archInterval,
-					),
+					getSeries(store, req.key, consolidation.Cnt, req.archInterval, req.from, req.to),
 					req.aggNum,
 					consolidation.Sum),
 			), req.outInterval, nil
 		} else {
 			return consolidate(
-				fix(
-					getSeries(store, req.key, req.consolidator, req.archInterval, req.from, req.to),
-					req.from,
-					req.to,
-					req.archInterval,
-				),
+				getSeries(store, req.key, req.consolidator, req.archInterval, req.from, req.to),
 				req.aggNum, req.consolidator), req.outInterval, nil
 		}
 	}
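A note on the Avg branches above: an average cannot be read from a single consolidated archive (an average of averages is wrong when bucket counts differ), so the code fetches the Sum and Cnt archives and recombines them with divide(). A minimal sketch of that idea, using a hypothetical divide over plain float64 slices; the real divide operates on []schema.Point:

package main

import "fmt"

// divide recovers per-bucket averages from independently aggregated
// sums and counts: sum[i]/cnt[i] is the true average for bucket i.
func divide(sum, cnt []float64) []float64 {
	out := make([]float64, len(sum))
	for i := range sum {
		out[i] = sum[i] / cnt[i]
	}
	return out
}

func main() {
	sum := []float64{30, 12}      // per-bucket sums from the Sum archive
	cnt := []float64{3, 4}        // per-bucket counts from the Cnt archive
	fmt.Println(divide(sum, cnt)) // [10 3]
}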
@@ -316,24 +278,23 @@ func prevBoundary(ts uint32, span uint32) uint32 {
 // getSeries returns points from mem (and cassandra if needed), within the range from (inclusive) - to (exclusive)
 // it can query for data within aggregated archives, by using fn min/max/sum/cnt and providing the matching agg span as interval
 // pass consolidation.None as consolidator to mean read from raw interval, otherwise we'll read from aggregated series.
-// !! note that raw, unquantized data may be returned outside of the requested range. We expect that callers of getSeries
-// will also call fix() afterwards, which will fix that.
-// here's why: while aggregated archives are quantized, raw intervals are not. And quantizing happens *after* calling this function (in fix()).
-// So we have to adjust the range:
-// (ranges described as a..b include both a and b)
-// REQ           0[---FROM---60]----------120-----------180[----TO----240]  any request from 1..60 to 181..240 should ...
-// QUANTD RESULT 0----------[60]---------[120]---------[180]                return points 60, 120 and 180 (simply because of to/from and inclusive/exclusive rules)
-// STORED DATA   0[----------60][---------120][---------180][---------240]  .. but data for 60 may be at ts 1..60, data for 120 at 61..120 and for 180 at 121..180 (due to quantizing)
-// to retrieve the stored data, we also use from inclusive and to exclusive, so to make sure that the data after quantization (fix()) is correct, we have to make the following adjustment:
-// `from` 1..60    needs data 1..60    -> always adjust `from` to previous boundary+1 (here 1)
-// `to`   181..240 needs data 121..180 -> always adjust `to` to previous boundary+1 (here 181)
-
+// all data will also be quantized.
 func getSeries(store mdata.Store, key string, consolidator consolidation.Consolidator, interval, fromUnix, toUnix uint32) []schema.Point {
 	iters := make([]iter.Iter, 0)
 	memIters := make([]iter.Iter, 0)
 	oldest := toUnix

+	// see explanation in function documentation
+	// while aggregated archives are quantized, raw intervals are not. quantizing happens at the end of this function, *after* this step.
+	// So we have to adjust the range to get the right data.
+	// (ranges described as a..b include both a and b)
+	// REQ           0[---FROM---60]----------120-----------180[----TO----240]  any request from 1..60 to 181..240 should ...
+	// QUANTD RESULT 0----------[60]---------[120]---------[180]                return points 60, 120 and 180 (simply because of to/from and inclusive/exclusive rules) ..
+	// STORED DATA   0[----------60][---------120][---------180][---------240]  but data for 60 may be at 1..60, data for 120 at 61..120 and for 180 at 121..180 (due to quantizing)
+	// to retrieve the stored data, we also use from inclusive and to exclusive,
+	// so to make sure that the data after quantization (fix()) is correct, we have to make the following adjustment:
+	// `from` 1..60    needs data 1..60    -> always adjust `from` to previous boundary+1 (here 1)
+	// `to`   181..240 needs data 121..180 -> always adjust `to` to previous boundary+1 (here 181)

 	if consolidator == consolidation.None {
 		fromUnix = prevBoundary(fromUnix, interval) + 1
 		toUnix = prevBoundary(toUnix, interval) + 1
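The comment block above is the heart of the change: for raw reads (consolidator == consolidation.None) the requested range must be widened to previous-boundary+1 on both ends before querying the store. prevBoundary's body sits outside the visible hunks; the version below is an assumption consistent with the comment (the closest quantized boundary strictly below ts), shown with the adjustment applied:

package main

import "fmt"

// prevBoundary returns the closest multiple of span strictly below ts.
// Assumed implementation for illustration; the real body is not in this diff.
func prevBoundary(ts uint32, span uint32) uint32 {
	return ts - ((ts-1)%span + 1)
}

func main() {
	interval := uint32(60)
	from, to := uint32(45), uint32(210) // request: from inclusive, to exclusive
	// raw data for boundary 60 may sit anywhere in 1..60, so widen `from`;
	// `to` 210 only needs boundaries up to 180, whose data sits in 121..180.
	from = prevBoundary(from, interval) + 1 // 45  -> 1
	to = prevBoundary(to, interval) + 1     // 210 -> 181
	fmt.Println(from, to)                   // 1 181
}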
@@ -389,7 +350,7 @@ func getSeries(store mdata.Store, key string, consolidator consolidation.Consoli
 		}
 	}
 	itersToPointsDuration.Value(time.Now().Sub(pre))
-	return points
+	return fix(points, fromUnix, toUnix, interval)
 }

 // check for duplicate series names. If found merge the results.
