Commit

Merge pull request #125 from hashicorp/gh-124
plugin: fix Nomad APM bug when querying groups on multiple clients.
jrasell authored May 18, 2020
2 parents 6edf5f5 + 00be9d2 commit 0057b81
Showing 2 changed files with 180 additions and 92 deletions.
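
For context, the plugin's Query entry point shown in the diff below accepts a query string of the form <operation>_<metric>/<group>/<job>, where the operation is one of sum, avg, min or max and the metric is cpu or memory. A minimal usage sketch follows; the apmPlugin variable, the group name "cache" and the job name "example" are assumptions made purely for illustration:

// avg_cpu requests the average CPU percentage across every running
// allocation of task group "cache" in job "example", regardless of which
// client node each allocation is placed on.
value, err := apmPlugin.Query("avg_cpu/cache/example")
if err != nil {
	// handle an unparsable query or a Nomad API failure
}
// value now holds the single data point handed back to the autoscaler.
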
plugins/builtin/apm/nomad/plugin/metrics.go (123 additions, 65 deletions)
@@ -4,107 +4,165 @@ import (
 	"fmt"
 	"math"
 	"strings"
-)
-
-type Metrics struct {
-	Counters []Counter
-	Gauges   []Gauge
-	Samples  []Sample
-}
-
-type Counter struct {
-	Count  float64
-	Labels map[string]string
-	Max    float64
-	Mean   float64
-	Min    float64
-	Name   string
-	Rate   float64
-	Stddev float64
-	Sum    float64
-}
+
+	"github.com/hashicorp/nomad/api"
+)
 
-type Gauge struct {
-	Labels map[string]string
-	Name   string
-	Value  float64
+// query is the plugins internal representation of a query and contains all the
+// information needed to perform a Nomad APM query.
+type query struct {
+	metric    string
+	job       string
+	group     string
+	operation string
 }
 
-type Sample struct {
-	Counter
-}
+const (
+	// queryOps are the supported operators.
+	queryOpSum = "sum"
+	queryOpAvg = "avg"
+	queryOpMax = "max"
+	queryOpMin = "min"
 
-type Query struct {
-	Metric    string
-	Job       string
-	Group     string
-	Operation string
-}
+	// queryMetrics are the supported resources for querying.
+	queryMetricCPU = "cpu"
+	queryMetricMem = "memory"
+)

 func (a *APMPlugin) Query(q string) (float64, error) {
+
+	// Parse the query ensuring we have all information available to make all
+	// subsequent calls.
 	query, err := parseQuery(q)
 	if err != nil {
 		return 0, fmt.Errorf("failed to parse query: %v", err)
 	}
 
 	a.logger.Debug("expanded query", "from", q, "to", fmt.Sprintf("%# v", query))
 
-	var resp Metrics
-	_, err = a.client.Raw().Query("/v1/metrics", &resp, nil)
+	metrics, err := a.getTaskGroupResourceUsage(query)
 	if err != nil {
 		return 0, err
 	}
 
-	metrics := []Gauge{}
-	for _, g := range resp.Gauges {
-		if g.Name == query.Metric && g.Labels["job"] == query.Job && g.Labels["task_group"] == query.Group {
-			metrics = append(metrics, g)
+	if len(metrics) == 0 {
+		return 0, fmt.Errorf("metric not found: %s", q)
 	}
+	a.logger.Debug("metrics found", "num_data_points", len(metrics), "query", q)
+
+	return calculateResult(query.operation, metrics), nil
+}
 
+// getTaskGroupResourceUsage iterates the allocations within a job and
+// identifies those which meet the criteria for being part of the calculation.
+func (a *APMPlugin) getTaskGroupResourceUsage(query *query) ([]float64, error) {
+
+	// Grab the list of allocations assigned to the job in question.
+	allocs, _, err := a.client.Jobs().Allocations(query.job, false, nil)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get alloc listing for job: %v", err)
+	}
+
+	// The response is a list of data points from each allocation running in
+	// the task group.
+	var resp []float64
+
+	// Define a function that manages updating our response.
+	metricFunc := func(m *[]float64, ru *api.ResourceUsage) {}
+
+	// Depending on the desired metric, the function will append different data
+	// to the response. Using a function means we only have to perform the
+	// switch a single time, rather than on a per allocation basis.
+	switch query.metric {
+	case queryMetricCPU:
+		metricFunc = func(m *[]float64, ru *api.ResourceUsage) {
+			*m = append(*m, ru.CpuStats.Percent)
+		}
+	case queryMetricMem:
+		metricFunc = func(m *[]float64, ru *api.ResourceUsage) {
+			*m = append(*m, float64(ru.MemoryStats.Usage))
+		}
+	}
 
-	if len(metrics) == 0 {
-		return 0, fmt.Errorf("metric not found: %s", q)
+	for _, alloc := range allocs {
+
+		// If the allocation is not running, or is not part of the target task
+		// group then we should skip and move onto the next allocation.
+		if alloc.ClientStatus != api.AllocClientStatusRunning || alloc.TaskGroup != query.group {
+			continue
+		}
+
+		// Obtains the statistics for the task group allocation. If we get a
+		// single error during the iteration, we cannot reliably make a scaling
+		// calculation.
+		//
+		// When calling Stats an entire Allocation object is needed, but only
+		// the ID is used within the call. Further details:
+		// https://github.com/hashicorp/nomad/issues/7955
+		allocStats, err := a.client.Allocations().Stats(&api.Allocation{ID: alloc.ID}, nil)
+		if err != nil {
+			return nil, fmt.Errorf("failed to get alloc stats: %v", err)
+		}
+
+		// Be safe, be sensible.
+		if allocStats == nil {
+			continue
+		}
+
+		// Call the metric function to append the allocation resource metric to
+		// the response.
+		metricFunc(&resp, allocStats.ResourceUsage)
+	}
+
+	return resp, nil
+}

+// calculateResult determines the query result based on the metrics and
+// operation to perform.
+func calculateResult(op string, metrics []float64) float64 {
+
 	var result float64
-	switch query.Operation {
-	case "sum":
 
+	switch op {
+	case queryOpSum:
 		for _, m := range metrics {
-			result += m.Value
+			result += m
 		}
-	case "avg":
+	case queryOpAvg:
 		for _, m := range metrics {
-			result += m.Value
+			result += m
 		}
 		result /= float64(len(metrics))
-	case "max":
+	case queryOpMax:
 		result = math.SmallestNonzeroFloat64
 		for _, m := range metrics {
-			if m.Value > result {
-				result = m.Value
+			if m > result {
+				result = m
 			}
 		}
-	case "min":
+	case queryOpMin:
 		result = math.MaxFloat64
 		for _, m := range metrics {
-			if m.Value < result {
-				result = m.Value
+			if m < result {
+				result = m
 			}
 		}
 	}
 
-	return result, nil
+	return result
 }
 
-func parseQuery(q string) (*Query, error) {
+// parseQuery takes the query string and transforms it into our internal query
+// representation. Parsing validates that the returned query is usable by all
+// subsequent calls but cannot ensure the job or group will actually be found
+// on the cluster.
+func parseQuery(q string) (*query, error) {
 	mainParts := strings.SplitN(q, "/", 3)
 	if len(mainParts) != 3 {
 		return nil, fmt.Errorf("expected <query>/<job>/<group>, received %s", q)
 	}
 
-	query := &Query{
-		Group: mainParts[1],
-		Job:   mainParts[2],
+	query := &query{
+		group: mainParts[1],
+		job:   mainParts[2],
 	}
 
 	opMetricParts := strings.SplitN(mainParts[0], "_", 2)
@@ -116,19 +174,19 @@ func parseQuery(q string) (*Query, error) {
 	metric := opMetricParts[1]
 
 	switch metric {
-	case "cpu":
-		query.Metric = "nomad.client.allocs.cpu.total_percent"
-	case "memory":
-		query.Metric = "nomad.client.allocs.memory.usage"
+	case queryMetricCPU, queryMetricMem:
+		query.metric = metric
 	default:
-		query.Metric = metric
+		return nil, fmt.Errorf(`invalid metric %q, allowed values are %s or %s`,
+			metric, queryMetricCPU, queryMetricMem)
 	}
 
 	switch op {
-	case "sum", "avg", "min", "max":
-		query.Operation = op
+	case queryOpSum, queryOpAvg, queryOpMin, queryOpMax:
+		query.operation = op
 	default:
-		return nil, fmt.Errorf(`invalid operation "%s", allowed values are sum, avg, min or max`, op)
+		return nil, fmt.Errorf(`invalid operation %q, allowed values are %s, %s, %s or %s`,
+			op, queryOpSum, queryOpAvg, queryOpMin, queryOpMax)
 	}
 
 	return query, nil
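
As a worked illustration of the parsing and calculation logic above, a query string such as "max_memory/cache/example" (the group name, job name and sample data points are invented for the example) is handled as follows:

q, err := parseQuery("max_memory/cache/example")
// err == nil
// q == &query{metric: "memory", job: "example", group: "cache", operation: "max"}

// With per-allocation memory usage data points collected by
// getTaskGroupResourceUsage, the max operation returns the largest value.
result := calculateResult(queryOpMax, []float64{128, 256, 192})
// result == 256
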
plugins/builtin/apm/nomad/plugin/metrics_test.go (57 additions, 27 deletions)
@@ -6,54 +6,84 @@ import (
 	"github.com/stretchr/testify/assert"
 )
 
+func Test_calculateResult(t *testing.T) {
+	testCases := []struct {
+		inputOp        string
+		inputMetrics   []float64
+		expectedOutput float64
+		name           string
+	}{
+		{
+			inputOp:        queryOpSum,
+			inputMetrics:   []float64{76.34, 13.13, 24.50},
+			expectedOutput: 113.97,
+			name:           "sum operation",
+		},
+		{
+			inputOp:        queryOpAvg,
+			inputMetrics:   []float64{76.34, 13.13, 24.50},
+			expectedOutput: 37.99,
+			name:           "avg operation",
+		},
+		{
+			inputOp:        queryOpMax,
+			inputMetrics:   []float64{76.34, 13.13, 24.50, 76.33},
+			expectedOutput: 76.34,
+			name:           "max operation",
+		},
+		{
+			inputOp:        queryOpMin,
+			inputMetrics:   []float64{76.34, 13.13, 24.50, 13.14},
+			expectedOutput: 13.13,
+			name:           "min operation",
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			actualOutput := calculateResult(tc.inputOp, tc.inputMetrics)
+			assert.Equal(t, tc.expectedOutput, actualOutput, tc.name)
+		})
+	}
+}
+
 func Test_parseQuery(t *testing.T) {
 	testCases := []struct {
 		name        string
 		input       string
-		expected    *Query
+		expected    *query
 		expectError bool
 	}{
 		{
 			name:  "avg_cpu",
 			input: "avg_cpu/group/job",
-			expected: &Query{
-				Metric:    "nomad.client.allocs.cpu.total_percent",
-				Job:       "job",
-				Group:     "group",
-				Operation: "avg",
+			expected: &query{
+				metric:    "cpu",
+				job:       "job",
+				group:     "group",
+				operation: "avg",
 			},
 			expectError: false,
 		},
 		{
 			name:  "avg_memory",
 			input: "avg_memory/group/job",
-			expected: &Query{
-				Metric:    "nomad.client.allocs.memory.usage",
-				Job:       "job",
-				Group:     "group",
-				Operation: "avg",
-			},
-			expectError: false,
-		},
-		{
-			name:  "arbritary metric",
-			input: "avg_nomad.client.allocs.cpu.total_percent/group/job",
-			expected: &Query{
-				Metric:    "nomad.client.allocs.cpu.total_percent",
-				Job:       "job",
-				Group:     "group",
-				Operation: "avg",
+			expected: &query{
+				metric:    "memory",
+				job:       "job",
+				group:     "group",
+				operation: "avg",
 			},
 			expectError: false,
 		},
 		{
 			name:  "job with fwd slashes",
 			input: "avg_cpu/group/my/super/job//",
-			expected: &Query{
-				Metric:    "nomad.client.allocs.cpu.total_percent",
-				Job:       "my/super/job//",
-				Group:     "group",
-				Operation: "avg",
+			expected: &query{
+				metric:    "cpu",
+				job:       "my/super/job//",
+				group:     "group",
+				operation: "avg",
 			},
 			expectError: false,
 		},
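
The cases shown above all exercise valid queries. A sketch of one further table entry covering the error path for an unsupported metric (not part of this change, and assuming the omitted test body asserts on expectError) could look like:

{
	name:        "invalid metric",
	input:       "avg_disk/group/job",
	expected:    nil,
	expectError: true,
},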