[query] Fix Graphite asPercent function to handle nodes and multiple total divisor series #3142

Merged Mar 18, 2021 · 18 commits
Changes from 9 commits
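Before the diff itself, here is a minimal, self-contained Go sketch of the behaviour this change targets. It is not code from this PR; the names asPercentByNodes, aggregationKey and series are purely illustrative. The idea: input and total series are grouped by the selected name nodes, totals within a group are summed, and each input series is divided by its group's total, with NaN wherever a value or divisor is missing or zero.

package main

import (
	"fmt"
	"math"
	"sort"
	"strings"
)

// series is a simplified stand-in for a timeseries: a metric name plus values.
type series struct {
	name   string
	values []float64
}

// aggregationKey keeps only the requested dot-delimited nodes of a metric name,
// mirroring how asPercent groups series when node indices are supplied.
func aggregationKey(name string, nodes []int) string {
	parts := strings.Split(name, ".")
	key := make([]string, 0, len(nodes))
	for _, n := range nodes {
		if n < len(parts) {
			key = append(key, parts[n])
		}
	}
	return strings.Join(key, ".")
}

// asPercentByNodes sums the totals per aggregation key and divides every input
// series by the summed total of its group. Points with a missing or zero
// divisor become NaN.
func asPercentByNodes(inputs, totals []series, nodes []int) []series {
	totalByKey := map[string][]float64{}
	for _, t := range totals {
		k := aggregationKey(t.name, nodes)
		if sum, ok := totalByKey[k]; ok {
			for i := range sum {
				sum[i] += t.values[i]
			}
		} else {
			totalByKey[k] = append([]float64(nil), t.values...)
		}
	}

	results := make([]series, 0, len(inputs))
	for _, s := range inputs {
		total, ok := totalByKey[aggregationKey(s.name, nodes)]
		out := make([]float64, len(s.values))
		for i, v := range s.values {
			if !ok || i >= len(total) || math.IsNaN(v) || math.IsNaN(total[i]) || total[i] == 0 {
				out[i] = math.NaN()
				continue
			}
			out[i] = v / total[i] * 100.0
		}
		results = append(results, series{name: "asPercent(" + s.name + ",total)", values: out})
	}
	// Sort by name so the output order is deterministic.
	sort.Slice(results, func(i, j int) bool { return results[i].name < results[j].name })
	return results
}

func main() {
	inputs := []series{
		{name: "web.host-1.requests", values: []float64{10, 20}},
		{name: "web.host-2.requests", values: []float64{30, 20}},
	}
	totals := []series{
		{name: "web.host-1.capacity", values: []float64{100, 100}},
		{name: "web.host-2.capacity", values: []float64{60, 40}},
	}
	// Group by node 1 (the host), so each host's requests are divided by that host's capacity.
	// Expected output:
	//   asPercent(web.host-1.requests,total) [10 20]
	//   asPercent(web.host-2.requests,total) [50 50]
	for _, r := range asPercentByNodes(inputs, totals, []int{1}) {
		fmt.Println(r.name, r.values)
	}
}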
259 changes: 177 additions & 82 deletions src/query/graphite/native/builtin_functions.go
@@ -1013,111 +1013,206 @@ func asPercent(ctx *common.Context, input singlePathSpec, total genericInterface
return ts.SeriesList(input), nil
}

if len(nodes) > 0 {
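// Group the input series by the selected node indices; series sharing those nodes form one group.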
metaSeries := getMetaSeriesGrouping(input, nodes)
totalSeries := make(map[string]*ts.Series)

switch totalArg := total.(type) {
case nil:
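// No explicit total: the divisor for each group is the sum of the series in that group.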
for k, series := range metaSeries {
if len(series) == 1 {
totalSeries[k] = series[0]
} else {
group, err := sumSeries(ctx, multiplePathSpecs(ts.NewSeriesListWithSeries(series...)))
if err != nil {
return ts.NewSeriesList(), err
}
totalSeries[k] = group.Values[0]
}
}
case ts.SeriesList, singlePathSpec:
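// Explicit total series: group them by the same nodes and sum each group to form one divisor per key.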
var total ts.SeriesList
switch v := totalArg.(type) {
case ts.SeriesList:
total = v
case singlePathSpec:
total = ts.SeriesList(v)
}
totalGroups := getMetaSeriesGrouping(singlePathSpec(total), nodes)
for k, series := range totalGroups {
if len(series) == 1 {
totalSeries[k] = series[0]
} else {
group, err := sumSeries(ctx, multiplePathSpecs(ts.NewSeriesListWithSeries(series...)))
if err != nil {
return ts.NewSeriesList(), err
}
totalSeries[k] = group.Values[0]
}
}
default:
return ts.NewSeriesList(), errors.NewInvalidParamsError(fmt.Errorf("total must be nil or series list"))
}

keys := make([]string, 0, len(metaSeries)+len(totalSeries))
// Copy the meta series keys.
for k := range metaSeries {
keys = append(keys, k)
}
// Add any missing keys from totals.
for k := range totalSeries {
if _, ok := metaSeries[k]; ok {
continue
}
keys = append(keys, k)
}
// Sort keys for deterministic output.
sort.Strings(keys)

results := make([]*ts.Series, 0, len(metaSeries))
for _, k := range keys {
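// Divide every series in each group by that group's total; where either side is missing, emit a NaN series.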
seriesList, ok := metaSeries[k]
if !ok {
total := totalSeries[k]
newName := fmt.Sprintf("asPercent(MISSING,%s)", total.Name())
nanValues := ts.NewConstantValues(ctx, math.NaN(), total.Len(), total.MillisPerStep())
newSeries := ts.NewSeries(ctx, newName, total.StartTime(), nanValues)
results = append(results, newSeries)
continue
}

for _, series := range seriesList {
total, ok := totalSeries[k]
if !ok {
newName := fmt.Sprintf("asPercent(%s,MISSING)", series.Name())
nanValues := ts.NewConstantValues(ctx, math.NaN(), series.Len(), series.MillisPerStep())
newSeries := ts.NewSeries(ctx, newName, series.StartTime(), nanValues)
results = append(results, newSeries)
continue
}

normalized, start, _, millisPerStep, err := common.Normalize(ctx,
ts.NewSeriesListWithSeries(series, total))
if err != nil {
return ts.NewSeriesList(), err
}
steps := normalized.Values[0].Len()
values := ts.NewValues(ctx, millisPerStep, steps)
for i := 0; i < steps; i++ {
v, t := normalized.Values[0].ValueAt(i), normalized.Values[1].ValueAt(i)
if !math.IsNaN(v) && !math.IsNaN(t) && t != 0 {
values.SetValueAt(i, (v/t)*100.0)
}
}

newName := fmt.Sprintf("asPercent(%s,%s)", series.Name(), total.Name())
divided := ts.NewSeries(ctx, newName, start, values)
results = append(results, divided)
}
}

result := ts.NewSeriesList()
result.Values = results
return result, nil
}

var totalSeriesList ts.SeriesList
switch totalArg := total.(type) {
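// Without nodes, the total may be a constant, nil (sum of all inputs), or an explicit series list.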
case float64:
// No normalization required, simply divide each by constant.
results := make([]*ts.Series, 0, len(input.Values))
totalText := fmt.Sprintf(common.FloatingPointFormat, totalArg)
for _, series := range input.Values {
steps := series.Len()
values := ts.NewValues(ctx, series.MillisPerStep(), steps)
for i := 0; i < steps; i++ {
v, t := series.ValueAt(i), totalArg
if !math.IsNaN(v) && !math.IsNaN(t) && t != 0 {
values.SetValueAt(i, (v/t)*100.0)
}
}
newName := fmt.Sprintf("asPercent(%s,%s)", series.Name(), totalText)
divided := ts.NewSeries(ctx, newName, series.StartTime(), values)
results = append(results, divided)
}
result := ts.NewSeriesList()
result.Values = results
return result, nil
case nil:
// A nil total means the total is the sum of all input series.
sum, err := sumSeries(ctx, multiplePathSpecs(input))
if err != nil {
return ts.NewSeriesList(), err
}
totalSeriesList = sum
case ts.SeriesList, singlePathSpec:
// Total may be a single series, or one total per input series (paired after sorting both by name).
var total ts.SeriesList
switch v := totalArg.(type) {
case ts.SeriesList:
total = v
case singlePathSpec:
total = ts.SeriesList(v)
}
if total.Len() != 0 && total.Len() != 1 && total.Len() != len(input.Values) {
return ts.NewSeriesList(), errors.NewInvalidParamsError(fmt.Errorf(
"require total to be nil, float, single series or same number of series: series=%d, total=%d",
len(input.Values), total.Len()))
}
if total.Len() == 0 {
// Same as nil: take the sum of the input as the total.
sum, err := sumSeries(ctx, multiplePathSpecs(input))
if err != nil {
return ts.NewSeriesList(), err
}
totalSeriesList = sum
} else {
// Sort both by name so they get divided by same name if same length
// or closest match.
sort.Slice(input.Values, func(i, j int) bool {
return strings.Compare(input.Values[i].Name(), input.Values[j].Name()) < 0
})
sort.Slice(total.Values, func(i, j int) bool {
return strings.Compare(total.Values[i].Name(), total.Values[j].Name()) < 0
})
totalSeriesList = total
}
default:
err := errors.NewInvalidParamsError(errors.New(
"total must be either nil, a float, a single series, or the same number of series"))
return ts.NewSeriesList(), err
}

results := make([]*ts.Series, 0, len(input.Values))
for idx, series := range input.Values {
totalSeries := totalSeriesList.Values[0]
if totalSeriesList.Len() == len(input.Values) {
// Divide each series by its index-matched total when the number
// of totals equals the number of input series.
totalSeries = totalSeriesList.Values[idx]
}
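// Normalize the series/total pair to a common start time and step before dividing.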
normalized, start, _, millisPerStep, err := common.Normalize(ctx,
ts.NewSeriesListWithSeries(series, totalSeries))
if err != nil {
return ts.NewSeriesList(), err
}

steps := normalized.Values[0].Len()
values := ts.NewValues(ctx, millisPerStep, steps)
for i := 0; i < steps; i++ {
v, t := normalized.Values[0].ValueAt(i), normalized.Values[1].ValueAt(i)
if !math.IsNaN(v) && !math.IsNaN(t) && t != 0 {
values.SetValueAt(i, (v/t)*100.0)
}
}
newName := fmt.Sprintf("asPercent(%s,%s)", series.Name(), totalSeries.Name())
divided := ts.NewSeries(ctx, newName, start, values)
results = append(results, divided)
}

result := ts.NewSeriesList()
result.Values = results
return result, nil
}

// exclude takes a metric or a wildcard seriesList, followed by a regular
@@ -2116,7 +2211,7 @@ func newMovingBinaryTransform(

results := make([]*ts.Series, 0, original.Len())
maxWindowPoints := 0
for i := range bootstrapList.Values {
series := original.Values[i]
windowPoints := windowPointsLength(series, interval)
if windowPoints <= 0 {