Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[m3query] Add graphite function support - groupByNodes #2579

Merged
merged 3 commits into from
Sep 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions src/query/graphite/native/aggregation_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,55 @@ func groupByNode(ctx *common.Context, series singlePathSpec, node int, fname str
metaSeries[key] = append(metaSeries[key], s)
}

return applyFnToMetaSeries(ctx, series, metaSeries, fname)
}

// Takes a serieslist and maps a callback to subgroups within as defined by multiple nodes
//
// &target=groupByNodes(ganglia.server*.*.cpu.load*,"sum",1,4)
//
// Would return multiple series which are each the result of applying the “sum” aggregation to groups joined on the
// nodes’ list (0 indexed) resulting in a list of targets like
//
// sumSeries(ganglia.server1.*.cpu.load5),sumSeries(ganglia.server1.*.cpu.load10),sumSeries(ganglia.server1.*.cpu.load15),
// sumSeries(ganglia.server2.*.cpu.load5),sumSeries(ganglia.server2.*.cpu.load10),sumSeries(ganglia.server2.*.cpu.load15),...
//
// NOTE: if len(nodes) = 0, aggregate all series into 1 series.
func groupByNodes(ctx *common.Context, series singlePathSpec, fname string, nodes ...int) (ts.SeriesList, error) {
metaSeries := make(map[string][]*ts.Series)

nodeLen := len(nodes)
if nodeLen == 0 {
key := "*" // put into single group, not ideal, but more graphite-ish.
for _, s := range series.Values {
metaSeries[key] = append(metaSeries[key], s)
}
} else {
for _, s := range series.Values {
parts := strings.Split(s.Name(), ".")

var keys []string
for _, n := range nodes {
if n < 0 {
n = len(parts) + n
}

if n >= len(parts) || n < 0 {
err := errors.NewInvalidParamsError(fmt.Errorf("could not group %s by nodes %v; not enough parts", s.Name(), nodes))
return ts.NewSeriesList(), err
}

keys = append(keys, parts[n])
}
key := strings.Join(keys, ".")
metaSeries[key] = append(metaSeries[key], s)
}
}

return applyFnToMetaSeries(ctx, series, metaSeries, fname)
}

func applyFnToMetaSeries(ctx *common.Context, series singlePathSpec, metaSeries map[string][]*ts.Series, fname string) (ts.SeriesList, error) {
if fname == "" {
fname = "sum"
}
Expand Down
79 changes: 79 additions & 0 deletions src/query/graphite/native/aggregation_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,85 @@ func TestGroupByNode(t *testing.T) {
}
}

func TestGroupByNodes(t *testing.T) {
var (
start, _ = time.Parse(time.RFC1123, "Mon, 27 Jul 2015 19:41:19 GMT")
end, _ = time.Parse(time.RFC1123, "Mon, 27 Jul 2015 19:43:19 GMT")
ctx = common.NewContext(common.ContextOptions{Start: start, End: end})
inputs = []*ts.Series{
ts.NewSeries(ctx, "servers.foo-1.pod1.status.500", start,
ts.NewConstantValues(ctx, 2, 12, 10000)),
ts.NewSeries(ctx, "servers.foo-2.pod1.status.500", start,
ts.NewConstantValues(ctx, 4, 12, 10000)),
ts.NewSeries(ctx, "servers.foo-3.pod1.status.500", start,
ts.NewConstantValues(ctx, 6, 12, 10000)),
ts.NewSeries(ctx, "servers.foo-1.pod2.status.500", start,
ts.NewConstantValues(ctx, 8, 12, 10000)),
ts.NewSeries(ctx, "servers.foo-2.pod2.status.500", start,
ts.NewConstantValues(ctx, 10, 12, 10000)),

ts.NewSeries(ctx, "servers.foo-1.pod1.status.400", start,
ts.NewConstantValues(ctx, 20, 12, 10000)),
ts.NewSeries(ctx, "servers.foo-2.pod1.status.400", start,
ts.NewConstantValues(ctx, 30, 12, 10000)),
ts.NewSeries(ctx, "servers.foo-3.pod2.status.400", start,
ts.NewConstantValues(ctx, 40, 12, 10000)),
}
)
defer ctx.Close()

type result struct {
name string
sumOfVals float64
}

tests := []struct {
fname string
nodes []int
expectedResults []result
}{
{"avg", []int{2, 4}, []result{ // test normal group by nodes
{"pod1.400", ((20 + 30) / 2) * 12},
{"pod1.500", ((2 + 4 + 6) / 3) * 12},
{"pod2.400", (40 / 1) * 12},
{"pod2.500", ((8 + 10) / 2) * 12},
}},
{"max", []int{2, 4}, []result{ // test with different function
{"pod1.400", 30 * 12},
{"pod1.500", 6 * 12},
{"pod2.400", 40 * 12},
{"pod2.500", 10 * 12},
}},
{"min", []int{2, -1}, []result{ // test negative index handling
{"pod1.400", 20 * 12},
{"pod1.500", 2 * 12},
{"pod2.400", 40 * 12},
{"pod2.500", 8 * 12},
}},
{"sum", []int{}, []result{ // test empty slice handing.
{"*", (2 + 4 + 6 + 8 + 10 + 20 + 30 + 40) * 12},
}},
}

for _, test := range tests {
outSeries, err := groupByNodes(ctx, singlePathSpec{
Values: inputs,
}, test.fname, test.nodes...)
require.NoError(t, err)
require.Equal(t, len(test.expectedResults), len(outSeries.Values))

outSeries, _ = sortByName(ctx, singlePathSpec(outSeries))

for i, expected := range test.expectedResults {
series := outSeries.Values[i]
assert.Equal(t, expected.name, series.Name(),
"wrong name for %v %s (%d)", test.nodes, test.fname, i)
assert.Equal(t, expected.sumOfVals, series.SafeSum(),
"wrong result for %v %s (%d)", test.nodes, test.fname, i)
}
}
}

func TestWeightedAverage(t *testing.T) {
ctx, _ := newConsolidationTestSeries()
defer ctx.Close()
Expand Down
1 change: 1 addition & 0 deletions src/query/graphite/native/builtin_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -1892,6 +1892,7 @@ func init() {
MustRegisterFunction(fallbackSeries)
MustRegisterFunction(group)
MustRegisterFunction(groupByNode)
MustRegisterFunction(groupByNodes)
MustRegisterFunction(highestAverage)
MustRegisterFunction(highestCurrent)
MustRegisterFunction(highestMax)
Expand Down
1 change: 1 addition & 0 deletions src/query/graphite/native/builtin_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2934,6 +2934,7 @@ func TestFunctionsRegistered(t *testing.T) {
"fallbackSeries",
"group",
"groupByNode",
"groupByNodes",
"highestAverage",
"highestCurrent",
"highestMax",
Expand Down