Skip to content

Commit

Permalink
Add group_measurements_by_instance_label flag to perfmon configuration (
Browse files Browse the repository at this point in the history
#11002)

This flag will send all perfmon measurements with a matching instance label as part of the same event (i.e. all metrics for C:, Processor X, etc.). This addresses some of the issues raised in #6584.

In most cases enabling this flag considerably reduces the number of events sent by metricbeat.

Co-Authored-By: Josh Smith <j_smith95@live.com>
  • Loading branch information
jsoriano and ramenjosh authored Mar 1, 2019
1 parent f527cd5 commit 81f00f7
Show file tree
Hide file tree
Showing 9 changed files with 117 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-developer.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,4 @@ The list below covers the major changes between 6.3.0 and 7.0.0-alpha2 only.
- Update Beats to use go 1.11.2 {pull}8746[8746]
- Allow/Merge fields.yml overrides {pull}9188[9188]
- Filesets can now define multiple ingest pipelines, with the first one considered as the entry point pipeline. {pull}8914[8914]
- Add `group_measurements_by_instance` option to windows perfmon metricset. {pull}8688[8688]
3 changes: 2 additions & 1 deletion metricbeat/docs/modules/windows.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ metricbeat.modules:
metricsets: ["perfmon"]
enabled: true
period: 10s
perfmon.ignore_non_existent_counters: true
perfmon.ignore_non_existent_counters: false
perfmon.group_measurements_by_instance: false
perfmon.counters:
# - instance_label: processor.name
# instance_name: total
Expand Down
3 changes: 2 additions & 1 deletion metricbeat/metricbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,8 @@ metricbeat.modules:
metricsets: ["perfmon"]
enabled: true
period: 10s
perfmon.ignore_non_existent_counters: true
perfmon.ignore_non_existent_counters: false
perfmon.group_measurements_by_instance: false
perfmon.counters:
# - instance_label: processor.name
# instance_name: total
Expand Down
3 changes: 2 additions & 1 deletion metricbeat/module/windows/_meta/config.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
metricsets: ["perfmon"]
enabled: true
period: 10s
perfmon.ignore_non_existent_counters: true
perfmon.ignore_non_existent_counters: false
perfmon.group_measurements_by_instance: false
perfmon.counters:
# - instance_label: processor.name
# instance_name: total
Expand Down
7 changes: 7 additions & 0 deletions metricbeat/module/windows/perfmon/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ to collect. The example below collects processor time and disk writes every
metricsets: [perfmon]
period: 10s
perfmon.ignore_non_existent_counters: true
perfmon.group_measurements_by_instance: true
perfmon.counters:
- instance_label: processor.name
instance_name: total
Expand All @@ -34,6 +35,12 @@ metricset to ignore errors caused by counters that do not exist when set to
true. Instead of an error, a message will be logged at the info level stating
that the counter does not exist.

*`group_measurements_by_instance`*:: A boolean option that causes metricbeat
to send all measurements with a matching perfmon instance as part of a single
event. In the above example, this will cause the physical_disk.write.per_sec
and physical_disk.write.time.pct measurements to be sent as a single event.
The default behaviour is for all measurements to be sent as separate events.

*`counters`*:: Counters specifies a list of queries to perform. Each individual
counter requires three config options - `instance_label`, `measurement_label`,
and `query`.
Expand Down
59 changes: 59 additions & 0 deletions metricbeat/module/windows/perfmon/pdh_integration_windows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,3 +336,62 @@ func TestWildcardQuery(t *testing.T) {

t.Log(values)
}

func TestGroupByInstance(t *testing.T) {
config := Config{
CounterConfig: make([]CounterConfig, 3),
GroupMeasurements: true,
}
config.CounterConfig[0].InstanceLabel = "processor.name"
config.CounterConfig[0].MeasurementLabel = "processor.time.pct"
config.CounterConfig[0].Query = `\Processor Information(_Total)\% Processor Time`
config.CounterConfig[0].Format = "float"

config.CounterConfig[1].InstanceLabel = "processor.name"
config.CounterConfig[1].MeasurementLabel = "processor.time.user.pct"
config.CounterConfig[1].Query = `\Processor Information(_Total)\% User Time`
config.CounterConfig[1].Format = "float"

config.CounterConfig[2].InstanceLabel = "processor.name"
config.CounterConfig[2].MeasurementLabel = "processor.time.idle.average.ns"
config.CounterConfig[2].Query = `\Processor Information(_Total)\Average Idle Time`
config.CounterConfig[2].Format = "float"

handle, err := NewPerfmonReader(config)
if err != nil {
t.Fatal(err)
}
defer handle.query.Close()

values, _ := handle.Read()

time.Sleep(time.Millisecond * 1000)

values, err = handle.Read()
if err != nil {
t.Fatal(err)
}

assert.EqualValues(t, 1, len(values)) // Assert all metrics have been grouped into a single event

// Test all keys exist in the event
pctKey, err := values[0].MetricSetFields.HasKey("processor.time.pct")
if err != nil {
t.Fatal(err)
}
assert.True(t, pctKey)

pctKey, err = values[0].MetricSetFields.HasKey("processor.time.user.pct")
if err != nil {
t.Fatal(err)
}
assert.True(t, pctKey)

pctKey, err = values[0].MetricSetFields.HasKey("processor.time.idle.average.ns")
if err != nil {
t.Fatal(err)
}
assert.True(t, pctKey)

t.Log(values)
}
58 changes: 39 additions & 19 deletions metricbeat/module/windows/perfmon/pdh_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,11 +326,12 @@ func (q *Query) Close() error {
}

type PerfmonReader struct {
query *Query // PDH Query
instanceLabel map[string]string // Mapping of counter path to key used for the label (e.g. processor.name)
measurement map[string]string // Mapping of counter path to key used for the value (e.g. processor.cpu_time).
executed bool // Indicates if the query has been executed.
log *logp.Logger
query *Query // PDH Query
instanceLabel map[string]string // Mapping of counter path to key used for the label (e.g. processor.name)
measurement map[string]string // Mapping of counter path to key used for the value (e.g. processor.cpu_time).
executed bool // Indicates if the query has been executed.
log *logp.Logger //
groupMeasurements bool // Indicates if measurements with the same instance label should be sent in the same event
}

// NewPerfmonReader creates a new instance of PerfmonReader.
Expand All @@ -341,10 +342,11 @@ func NewPerfmonReader(config Config) (*PerfmonReader, error) {
}

r := &PerfmonReader{
query: query,
instanceLabel: map[string]string{},
measurement: map[string]string{},
log: logp.NewLogger("perfmon"),
query: query,
instanceLabel: map[string]string{},
measurement: map[string]string{},
log: logp.NewLogger("perfmon"),
groupMeasurements: config.GroupMeasurements,
}

for _, counter := range config.CounterConfig {
Expand Down Expand Up @@ -388,36 +390,54 @@ func (r *PerfmonReader) Read() ([]mb.Event, error) {
return nil, errors.Wrap(err, "failed formatting counter values")
}

// Write the values into the map.
events := make([]mb.Event, 0, len(values))
eventMap := make(map[string]*mb.Event)

for counterPath, values := range values {
for _, val := range values {
for ind, val := range values {
if val.Err != nil && !r.executed {
r.log.Debugw("Ignoring the first measurement because the data isn't ready",
"error", val.Err, logp.Namespace("perfmon"), "query", counterPath)
continue
}

event := mb.Event{
MetricSetFields: common.MapStr{},
Error: errors.Wrapf(val.Err, "failed on query=%v", counterPath),
var eventKey string
if r.groupMeasurements && val.Err == nil {
// Send measurements with the same instance label as part of the same event
eventKey = val.Instance
} else {
// Send every measurement as an individual event
// If a counter contains an error, it will always be sent as an individual event
eventKey = counterPath + strconv.Itoa(ind)
}

if val.Instance != "" {
event.MetricSetFields.Put(r.instanceLabel[counterPath], val.Instance)
// Create a new event if the key doesn't exist in the map
if _, ok := eventMap[eventKey]; !ok {
eventMap[eventKey] = &mb.Event{
MetricSetFields: common.MapStr{},
Error: errors.Wrapf(val.Err, "failed on query=%v", counterPath),
}

if val.Instance != "" {
eventMap[eventKey].MetricSetFields.Put(r.instanceLabel[counterPath], val.Instance)
}
}

event := eventMap[eventKey]

if val.Measurement != nil {
event.MetricSetFields.Put(r.measurement[counterPath], val.Measurement)
} else {
event.MetricSetFields.Put(r.measurement[counterPath], 0)
}

events = append(events, event)
}
}

// Write the values into the map.
events := make([]mb.Event, 0, len(eventMap))
for _, val := range eventMap {
events = append(events, *val)
}

r.executed = true
return events, nil
}
Expand Down
5 changes: 3 additions & 2 deletions metricbeat/module/windows/perfmon/perfmon.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ type CounterConfig struct {

// Config for the windows perfmon metricset.
type Config struct {
IgnoreNECounters bool `config:"perfmon.ignore_non_existent_counters"`
CounterConfig []CounterConfig `config:"perfmon.counters" validate:"required"`
IgnoreNECounters bool `config:"perfmon.ignore_non_existent_counters"`
GroupMeasurements bool `config:"perfmon.group_measurements_by_instance"`
CounterConfig []CounterConfig `config:"perfmon.counters" validate:"required"`
}

func init() {
Expand Down
3 changes: 2 additions & 1 deletion x-pack/metricbeat/metricbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -725,7 +725,8 @@ metricbeat.modules:
metricsets: ["perfmon"]
enabled: true
period: 10s
perfmon.ignore_non_existent_counters: true
perfmon.ignore_non_existent_counters: false
perfmon.group_measurements_by_instance: false
perfmon.counters:
# - instance_label: processor.name
# instance_name: total
Expand Down

0 comments on commit 81f00f7

Please sign in to comment.