Skip to content

Commit

Permalink
add Nomad APM query relative to allocated memory values
Browse files Browse the repository at this point in the history
  • Loading branch information
lgfa29 committed Jan 5, 2021
1 parent 98a1a1d commit b99a20f
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 13 deletions.
60 changes: 52 additions & 8 deletions plugins/builtin/apm/nomad/plugin/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,23 @@ func (a *APMPlugin) getTaskGroupResourceUsage(query *taskGroupQuery) ([]float64,
metricFunc = func(m *[]float64, ru *api.ResourceUsage) {
*m = append(*m, float64(ru.MemoryStats.Usage))
}
case queryMetricMemAllocated:

// Similarly to `queryMetricCPUAllocated`, we must calculate the allocated
// memory since it's not provided as a metric.
allocatedMem, err := a.getAllocatedMemForTaskGroup(query.job, query.group)
if err != nil {
return nil, fmt.Errorf("failed to get total alloacted memory for taskgroup: %v", err)
}

// Create the metric function now that the total allocated memory is known.
// The job info has total memory in mebibytes (incorrectly labeled as MB) and
// the `MemoryStats.Usage` metric is reported in bytes, so we must convert one
// of them.
metricFunc = func(m *[]float64, ru *api.ResourceUsage) {
usageMiB := ru.MemoryStats.Usage / 1024 / 1024
*m = append(*m, (float64(usageMiB)/float64(allocatedMem))*100)
}
}

for _, alloc := range allocs {
Expand Down Expand Up @@ -124,14 +141,9 @@ func (a *APMPlugin) getTaskGroupResourceUsage(query *taskGroupQuery) ([]float64,

// getAllocatedCPUForTaskGroup calculates the total allocated CPU in MHz for a taskgroup
func (a *APMPlugin) getAllocatedCPUForTaskGroup(job, taskgroup string) (int, error) {
jobInfo, _, err := a.client.Jobs().Info(job, nil)
taskGroupConfig, err := a.getTaskGroup(job, taskgroup)
if err != nil {
return -1, fmt.Errorf("failed to get info for job: %v", err)
}

taskGroupConfig := jobInfo.LookupTaskGroup(taskgroup)
if taskGroupConfig == nil {
return -1, fmt.Errorf("specified taskgroup was not found in job config")
return -1, err
}

taskGroupAllocatedCPU := 0
Expand All @@ -144,6 +156,38 @@ func (a *APMPlugin) getAllocatedCPUForTaskGroup(job, taskgroup string) (int, err
return taskGroupAllocatedCPU, nil
}

// getAllocatedMemForTaskGroup returns the total memory, in MiB, configured
// across all tasks of the given task group within the given job.
//
// Tasks that have no resources block, or no memory value set, contribute
// nothing to the total. If the task group cannot be resolved, -1 is returned
// along with the error from getTaskGroup.
func (a *APMPlugin) getAllocatedMemForTaskGroup(job, taskgroup string) (int, error) {
	tg, err := a.getTaskGroup(job, taskgroup)
	if err != nil {
		return -1, err
	}

	var total int
	for _, t := range tg.Tasks {
		// Only tasks with an explicit memory value are counted.
		if res := t.Resources; res != nil && res.MemoryMB != nil {
			total += *res.MemoryMB
		}
	}
	return total, nil
}

// getTaskGroup looks up the configuration of a single task group.
//
// It fetches the job through the plugin's Nomad client and then searches the
// job for a task group with the given name. An error is returned when the job
// information cannot be retrieved or when the job has no such task group.
func (a *APMPlugin) getTaskGroup(job, taskgroup string) (*api.TaskGroup, error) {
	info, _, err := a.client.Jobs().Info(job, nil)
	if err != nil {
		return nil, fmt.Errorf("failed to get info for job: %v", err)
	}

	if tg := info.LookupTaskGroup(taskgroup); tg != nil {
		return tg, nil
	}
	return nil, fmt.Errorf("task group %q not found in job %q", taskgroup, job)
}

// calculateTaskGroupResult determines the query result based on the metrics
// and operation to perform.
func calculateTaskGroupResult(op string, metrics []float64) sdk.TimestampedMetrics {
Expand Down Expand Up @@ -223,5 +267,5 @@ func parseTaskGroupQuery(q string) (*taskGroupQuery, error) {
}

func validateMetricTaskGroupQuery(metric string) error {
return validateMetric(metric, []string{queryMetricCPU, queryMetricCPUAllocated, queryMetricMem})
return validateMetric(metric, []string{queryMetricCPU, queryMetricCPUAllocated, queryMetricMem, queryMetricMemAllocated})
}
11 changes: 11 additions & 0 deletions plugins/builtin/apm/nomad/plugin/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,17 @@ func Test_parseTaskGroupQuery(t *testing.T) {
},
expectError: false,
},
{
name: "avg_memory-allocated",
input: "taskgroup_avg_memory-allocated/group/job",
expected: &taskGroupQuery{
metric: "memory-allocated",
job: "job",
group: "group",
operation: "avg",
},
expectError: false,
},
{
name: "job with fwd slashes",
input: "taskgroup_avg_cpu/group/my/super/job//",
Expand Down
1 change: 1 addition & 0 deletions plugins/builtin/apm/nomad/plugin/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const (
queryMetricCPU = "cpu"
queryMetricCPUAllocated = "cpu-allocated"
queryMetricMem = "memory"
queryMetricMemAllocated = "memory-allocated"
)

// Query satisfies the Query function on the apm.APM interface.
Expand Down
16 changes: 11 additions & 5 deletions plugins/builtin/apm/nomad/plugin/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,32 @@ func Test_validateMetric(t *testing.T) {
}{
{
inputMetric: "memory",
inputValidMetrics: []string{queryMetricCPU, queryMetricCPUAllocated, queryMetricMem},
inputValidMetrics: []string{queryMetricCPU, queryMetricCPUAllocated, queryMetricMem, queryMetricMemAllocated},
expectedOutput: nil,
name: "memory metric",
},
{
inputMetric: "memory-allocated",
inputValidMetrics: []string{queryMetricCPU, queryMetricCPUAllocated, queryMetricMem, queryMetricMemAllocated},
expectedOutput: nil,
name: "memory-allocated metric",
},
{
inputMetric: "cpu",
inputValidMetrics: []string{queryMetricCPU, queryMetricCPUAllocated, queryMetricMem},
inputValidMetrics: []string{queryMetricCPU, queryMetricCPUAllocated, queryMetricMem, queryMetricMemAllocated},
expectedOutput: nil,
name: "cpu metric",
},
{
inputMetric: "cpu-allocated",
inputValidMetrics: []string{queryMetricCPU, queryMetricCPUAllocated, queryMetricMem},
inputValidMetrics: []string{queryMetricCPU, queryMetricCPUAllocated, queryMetricMem, queryMetricMemAllocated},
expectedOutput: nil,
name: "cpu-allocated metric",
},
{
inputMetric: "cost-of-server",
inputValidMetrics: []string{queryMetricCPU, queryMetricCPUAllocated, queryMetricMem},
expectedOutput: errors.New("invalid metric \"cost-of-server\", allowed values are: cpu, cpu-allocated, memory"),
inputValidMetrics: []string{queryMetricCPU, queryMetricCPUAllocated, queryMetricMem, queryMetricMemAllocated},
expectedOutput: errors.New("invalid metric \"cost-of-server\", allowed values are: cpu, cpu-allocated, memory, memory-allocated"),
name: "invalid metric",
},
}
Expand Down

0 comments on commit b99a20f

Please sign in to comment.