From b99a20f0349e1ad0e2ca1200ca78669ec30804f5 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Tue, 5 Jan 2021 17:27:19 -0500 Subject: [PATCH] add Nomad APM query relative to allocated memory values --- plugins/builtin/apm/nomad/plugin/job.go | 60 ++++++++++++++++--- plugins/builtin/apm/nomad/plugin/job_test.go | 11 ++++ plugins/builtin/apm/nomad/plugin/metrics.go | 1 + .../builtin/apm/nomad/plugin/metrics_test.go | 16 +++-- 4 files changed, 75 insertions(+), 13 deletions(-) diff --git a/plugins/builtin/apm/nomad/plugin/job.go b/plugins/builtin/apm/nomad/plugin/job.go index 5e094de7..100191e5 100644 --- a/plugins/builtin/apm/nomad/plugin/job.go +++ b/plugins/builtin/apm/nomad/plugin/job.go @@ -87,6 +87,23 @@ func (a *APMPlugin) getTaskGroupResourceUsage(query *taskGroupQuery) ([]float64, metricFunc = func(m *[]float64, ru *api.ResourceUsage) { *m = append(*m, float64(ru.MemoryStats.Usage)) } + case queryMetricMemAllocated: + + // Similarly to `queryMetricCPUAllocated` we must calculate the alloacted + // memory since it's not provided as a metric. + allocatedMem, err := a.getAllocatedMemForTaskGroup(query.job, query.group) + if err != nil { + return nil, fmt.Errorf("failed to get total alloacted memory for taskgroup: %v", err) + } + + // Create the metric function now that the total allocated memory is known. + // The job info has total memory in memibytes (incorrectly labeled as MB) and + // the `MemoryStats.Usage` metric is reported in bytes, so we must convert one + // of them. + metricFunc = func(m *[]float64, ru *api.ResourceUsage) { + usageMiB := ru.MemoryStats.Usage / 1024 / 1024 + *m = append(*m, (float64(usageMiB)/float64(allocatedMem))*100) + } } for _, alloc := range allocs { @@ -124,14 +141,9 @@ func (a *APMPlugin) getTaskGroupResourceUsage(query *taskGroupQuery) ([]float64, // getAllocatedCPUForTaskGroup calculates the total allocated CPU in MHz for a taskgroup func (a *APMPlugin) getAllocatedCPUForTaskGroup(job, taskgroup string) (int, error) { - jobInfo, _, err := a.client.Jobs().Info(job, nil) + taskGroupConfig, err := a.getTaskGroup(job, taskgroup) if err != nil { - return -1, fmt.Errorf("failed to get info for job: %v", err) - } - - taskGroupConfig := jobInfo.LookupTaskGroup(taskgroup) - if taskGroupConfig == nil { - return -1, fmt.Errorf("specified taskgroup was not found in job config") + return -1, err } taskGroupAllocatedCPU := 0 @@ -144,6 +156,38 @@ func (a *APMPlugin) getAllocatedCPUForTaskGroup(job, taskgroup string) (int, err return taskGroupAllocatedCPU, nil } +// getAllocatedMemForTaskGroup calculates the total allocated memory in MiB for a taskgroup +func (a *APMPlugin) getAllocatedMemForTaskGroup(job, taskgroup string) (int, error) { + taskGroupConfig, err := a.getTaskGroup(job, taskgroup) + if err != nil { + return -1, err + } + + taskGroupAllocatedMem := 0 + for _, task := range taskGroupConfig.Tasks { + if task.Resources == nil || task.Resources.MemoryMB == nil { + continue + } + taskGroupAllocatedMem += *task.Resources.MemoryMB + } + return taskGroupAllocatedMem, nil +} + +// getTaskGroup returns a task group configuration from a job. +func (a *APMPlugin) getTaskGroup(job, taskgroup string) (*api.TaskGroup, error) { + jobInfo, _, err := a.client.Jobs().Info(job, nil) + if err != nil { + return nil, fmt.Errorf("failed to get info for job: %v", err) + } + + taskGroupConfig := jobInfo.LookupTaskGroup(taskgroup) + if taskGroupConfig == nil { + return nil, fmt.Errorf("task group %q not found in job %q", taskgroup, job) + } + + return taskGroupConfig, nil +} + // calculateTaskGroupResult determines the query result based on the metrics // and operation to perform. func calculateTaskGroupResult(op string, metrics []float64) sdk.TimestampedMetrics { @@ -223,5 +267,5 @@ func parseTaskGroupQuery(q string) (*taskGroupQuery, error) { } func validateMetricTaskGroupQuery(metric string) error { - return validateMetric(metric, []string{queryMetricCPU, queryMetricCPUAllocated, queryMetricMem}) + return validateMetric(metric, []string{queryMetricCPU, queryMetricCPUAllocated, queryMetricMem, queryMetricMemAllocated}) } diff --git a/plugins/builtin/apm/nomad/plugin/job_test.go b/plugins/builtin/apm/nomad/plugin/job_test.go index 8dcdae5a..21bd6680 100644 --- a/plugins/builtin/apm/nomad/plugin/job_test.go +++ b/plugins/builtin/apm/nomad/plugin/job_test.go @@ -87,6 +87,17 @@ func Test_parseTaskGroupQuery(t *testing.T) { }, expectError: false, }, + { + name: "avg_memory-allocated", + input: "taskgroup_avg_memory-allocated/group/job", + expected: &taskGroupQuery{ + metric: "memory-allocated", + job: "job", + group: "group", + operation: "avg", + }, + expectError: false, + }, { name: "job with fwd slashes", input: "taskgroup_avg_cpu/group/my/super/job//", diff --git a/plugins/builtin/apm/nomad/plugin/metrics.go b/plugins/builtin/apm/nomad/plugin/metrics.go index b87e1195..232e133e 100644 --- a/plugins/builtin/apm/nomad/plugin/metrics.go +++ b/plugins/builtin/apm/nomad/plugin/metrics.go @@ -27,6 +27,7 @@ const ( queryMetricCPU = "cpu" queryMetricCPUAllocated = "cpu-allocated" queryMetricMem = "memory" + queryMetricMemAllocated = "memory-allocated" ) // Query satisfies the Query function on the apm.APM interface. diff --git a/plugins/builtin/apm/nomad/plugin/metrics_test.go b/plugins/builtin/apm/nomad/plugin/metrics_test.go index 59094067..46dee610 100644 --- a/plugins/builtin/apm/nomad/plugin/metrics_test.go +++ b/plugins/builtin/apm/nomad/plugin/metrics_test.go @@ -16,26 +16,32 @@ func Test_validateMetric(t *testing.T) { }{ { inputMetric: "memory", - inputValidMetrics: []string{queryMetricCPU, queryMetricCPUAllocated, queryMetricMem}, + inputValidMetrics: []string{queryMetricCPU, queryMetricCPUAllocated, queryMetricMem, queryMetricMemAllocated}, expectedOutput: nil, name: "memory metric", }, + { + inputMetric: "memory-allocated", + inputValidMetrics: []string{queryMetricCPU, queryMetricCPUAllocated, queryMetricMem, queryMetricMemAllocated}, + expectedOutput: nil, + name: "memory-allocated metric", + }, { inputMetric: "cpu", - inputValidMetrics: []string{queryMetricCPU, queryMetricCPUAllocated, queryMetricMem}, + inputValidMetrics: []string{queryMetricCPU, queryMetricCPUAllocated, queryMetricMem, queryMetricMemAllocated}, expectedOutput: nil, name: "cpu metric", }, { inputMetric: "cpu-allocated", - inputValidMetrics: []string{queryMetricCPU, queryMetricCPUAllocated, queryMetricMem}, + inputValidMetrics: []string{queryMetricCPU, queryMetricCPUAllocated, queryMetricMem, queryMetricMemAllocated}, expectedOutput: nil, name: "cpu-allocated metric", }, { inputMetric: "cost-of-server", - inputValidMetrics: []string{queryMetricCPU, queryMetricCPUAllocated, queryMetricMem}, - expectedOutput: errors.New("invalid metric \"cost-of-server\", allowed values are: cpu, cpu-allocated, memory"), + inputValidMetrics: []string{queryMetricCPU, queryMetricCPUAllocated, queryMetricMem, queryMetricMemAllocated}, + expectedOutput: errors.New("invalid metric \"cost-of-server\", allowed values are: cpu, cpu-allocated, memory, memory-allocated"), name: "invalid metric", }, }