Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query Nomad APM for memory values relative to task group #334

Merged
merged 1 commit into from
Jan 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 52 additions & 8 deletions plugins/builtin/apm/nomad/plugin/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,23 @@ func (a *APMPlugin) getTaskGroupResourceUsage(query *taskGroupQuery) ([]float64,
metricFunc = func(m *[]float64, ru *api.ResourceUsage) {
*m = append(*m, float64(ru.MemoryStats.Usage))
}
case queryMetricMemAllocated:

// Similarly to `queryMetricCPUAllocated` we must calculate the allocated
// memory since it's not provided as a metric.
allocatedMem, err := a.getAllocatedMemForTaskGroup(query.job, query.group)
if err != nil {
return nil, fmt.Errorf("failed to get total alloacted memory for taskgroup: %v", err)
}

// Create the metric function now that the total allocated memory is known.
// The job info has total memory in mebibytes (incorrectly labeled as MB) and
// the `MemoryStats.Usage` metric is reported in bytes, so we must convert one
// of them.
metricFunc = func(m *[]float64, ru *api.ResourceUsage) {
usageMiB := ru.MemoryStats.Usage / 1024 / 1024
*m = append(*m, (float64(usageMiB)/float64(allocatedMem))*100)
}
}

for _, alloc := range allocs {
Expand Down Expand Up @@ -124,14 +141,9 @@ func (a *APMPlugin) getTaskGroupResourceUsage(query *taskGroupQuery) ([]float64,

// getAllocatedCPUForTaskGroup calculates the total allocated CPU in MHz for a taskgroup
func (a *APMPlugin) getAllocatedCPUForTaskGroup(job, taskgroup string) (int, error) {
jobInfo, _, err := a.client.Jobs().Info(job, nil)
taskGroupConfig, err := a.getTaskGroup(job, taskgroup)
if err != nil {
return -1, fmt.Errorf("failed to get info for job: %v", err)
}

taskGroupConfig := jobInfo.LookupTaskGroup(taskgroup)
if taskGroupConfig == nil {
return -1, fmt.Errorf("specified taskgroup was not found in job config")
return -1, err
}

taskGroupAllocatedCPU := 0
Expand All @@ -144,6 +156,38 @@ func (a *APMPlugin) getAllocatedCPUForTaskGroup(job, taskgroup string) (int, err
return taskGroupAllocatedCPU, nil
}

// getAllocatedMemForTaskGroup returns the total allocated memory in MiB for a
// task group, computed by summing the MemoryMB value of every task in the
// group. Tasks with no resources, or with no memory value set, add nothing to
// the total. If the task group cannot be retrieved, -1 is returned together
// with the lookup error.
func (a *APMPlugin) getAllocatedMemForTaskGroup(job, taskgroup string) (int, error) {
	tg, err := a.getTaskGroup(job, taskgroup)
	if err != nil {
		return -1, err
	}

	var totalMem int
	for i := range tg.Tasks {
		// Only tasks that declare a memory value contribute to the sum.
		if res := tg.Tasks[i].Resources; res != nil && res.MemoryMB != nil {
			totalMem += *res.MemoryMB
		}
	}
	return totalMem, nil
}

// getTaskGroup looks up the named job through the plugin's client and returns
// the configuration of the requested task group from it.
//
// An error is returned when the job lookup fails, or when the job contains no
// task group with the given name. The lookup error is wrapped with %w so that
// callers can still inspect the underlying cause with errors.Is / errors.As;
// the rendered message is unchanged.
func (a *APMPlugin) getTaskGroup(job, taskgroup string) (*api.TaskGroup, error) {
	jobInfo, _, err := a.client.Jobs().Info(job, nil)
	if err != nil {
		return nil, fmt.Errorf("failed to get info for job: %w", err)
	}

	taskGroupConfig := jobInfo.LookupTaskGroup(taskgroup)
	if taskGroupConfig == nil {
		return nil, fmt.Errorf("task group %q not found in job %q", taskgroup, job)
	}

	return taskGroupConfig, nil
}

// calculateTaskGroupResult determines the query result based on the metrics
// and operation to perform.
func calculateTaskGroupResult(op string, metrics []float64) sdk.TimestampedMetrics {
Expand Down Expand Up @@ -223,5 +267,5 @@ func parseTaskGroupQuery(q string) (*taskGroupQuery, error) {
}

func validateMetricTaskGroupQuery(metric string) error {
return validateMetric(metric, []string{queryMetricCPU, queryMetricCPUAllocated, queryMetricMem})
return validateMetric(metric, []string{queryMetricCPU, queryMetricCPUAllocated, queryMetricMem, queryMetricMemAllocated})
}
11 changes: 11 additions & 0 deletions plugins/builtin/apm/nomad/plugin/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,17 @@ func Test_parseTaskGroupQuery(t *testing.T) {
},
expectError: false,
},
{
name: "avg_memory-allocated",
input: "taskgroup_avg_memory-allocated/group/job",
expected: &taskGroupQuery{
metric: "memory-allocated",
job: "job",
group: "group",
operation: "avg",
},
expectError: false,
},
{
name: "job with fwd slashes",
input: "taskgroup_avg_cpu/group/my/super/job//",
Expand Down
1 change: 1 addition & 0 deletions plugins/builtin/apm/nomad/plugin/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const (
queryMetricCPU = "cpu"
queryMetricCPUAllocated = "cpu-allocated"
queryMetricMem = "memory"
queryMetricMemAllocated = "memory-allocated"
)

// Query satisfies the Query function on the apm.APM interface.
Expand Down
16 changes: 11 additions & 5 deletions plugins/builtin/apm/nomad/plugin/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,32 @@ func Test_validateMetric(t *testing.T) {
}{
{
inputMetric: "memory",
inputValidMetrics: []string{queryMetricCPU, queryMetricCPUAllocated, queryMetricMem},
inputValidMetrics: []string{queryMetricCPU, queryMetricCPUAllocated, queryMetricMem, queryMetricMemAllocated},
expectedOutput: nil,
name: "memory metric",
},
{
inputMetric: "memory-allocated",
inputValidMetrics: []string{queryMetricCPU, queryMetricCPUAllocated, queryMetricMem, queryMetricMemAllocated},
expectedOutput: nil,
name: "memory-allocated metric",
},
{
inputMetric: "cpu",
inputValidMetrics: []string{queryMetricCPU, queryMetricCPUAllocated, queryMetricMem},
inputValidMetrics: []string{queryMetricCPU, queryMetricCPUAllocated, queryMetricMem, queryMetricMemAllocated},
expectedOutput: nil,
name: "cpu metric",
},
{
inputMetric: "cpu-allocated",
inputValidMetrics: []string{queryMetricCPU, queryMetricCPUAllocated, queryMetricMem},
inputValidMetrics: []string{queryMetricCPU, queryMetricCPUAllocated, queryMetricMem, queryMetricMemAllocated},
expectedOutput: nil,
name: "cpu-allocated metric",
},
{
inputMetric: "cost-of-server",
inputValidMetrics: []string{queryMetricCPU, queryMetricCPUAllocated, queryMetricMem},
expectedOutput: errors.New("invalid metric \"cost-of-server\", allowed values are: cpu, cpu-allocated, memory"),
inputValidMetrics: []string{queryMetricCPU, queryMetricCPUAllocated, queryMetricMem, queryMetricMemAllocated},
expectedOutput: errors.New("invalid metric \"cost-of-server\", allowed values are: cpu, cpu-allocated, memory, memory-allocated"),
name: "invalid metric",
},
}
Expand Down