Fix empty field error in the iis/application pool metricset #19537

Merged
merged 11 commits into from
Jul 14, 2020
Changes from 8 commits
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -260,6 +260,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix incorrect usage of hints builder when exposed port is a substring of the hint {pull}19052[19052]
- Remove dedot for tag values in aws module. {issue}19112[19112] {pull}19221[19221]
- Stop counterCache only when already started {pull}19103[19103]
- Fix empty field name errors in the application pool metricset. {pull}19537[19537]
- Set tags correctly if the dimension value is ARN {issue}19111[19111] {pull}19433[19433]
- Fix bug incorrect parsing of float numbers as integers in Couchbase module {issue}18949[18949] {pull}19055[19055]

22 changes: 13 additions & 9 deletions x-pack/metricbeat/module/iis/application_pool/application_pool.go
@@ -46,7 +46,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return nil, err
}
// instantiate reader object
reader, err := newReader()
reader, err := newReader(config)
if err != nil {
return nil, err
}
@@ -55,22 +55,25 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
log: logp.NewLogger("application pool"),
reader: reader,
}
if err := ms.reader.initCounters(config.Names); err != nil {
return ms, err
}

return ms, nil
}

// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(report mb.ReporterV2) error {
var config Config
if err := m.Module().UnpackConfig(&config); err != nil {
return nil
// refresh performance counter list
// Some counters, such as rate counters, require two counter values in order to compute a displayable value. In this case we must call PdhCollectQueryData twice before calling PdhGetFormattedCounterValue.
// For more information, see Collecting Performance Data (https://docs.microsoft.com/en-us/windows/desktop/PerfCtrs/collecting-performance-data).
// A flag is set if the second call has been executed else refresh will fail (reader.executed)
Member

The code added to newReader looks the same as a refresh, and it runs before reader.executed is set. Could it also fail?

Contributor Author

We try to run one refresh at a time; running them one after another seems to yield exceptions.
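
The exchange above is about when the counter list is refreshed relative to the first collection. A minimal sketch of that gating follows, assuming a hypothetical `fakeReader` in place of the PDH-backed Reader. Only the `executed` flag and the method names `refreshCounterPaths` and `read` come from the diff; the counters and the `failRead` switch are illustration only.

```go
package main

import "errors"

// fakeReader is a hypothetical stand-in for the PDH-backed Reader in the diff.
// It only records how often each step ran; no Windows API is involved.
type fakeReader struct {
	executed  bool // set once a read has completed
	refreshes int  // number of refreshCounterPaths calls
	reads     int  // number of successful read calls
	failRead  bool // when true, read returns an error
}

// refreshCounterPaths stands in for re-expanding the counter paths.
func (r *fakeReader) refreshCounterPaths() error {
	r.refreshes++
	return nil
}

// read stands in for collecting and formatting counter values.
func (r *fakeReader) read() error {
	if r.failRead {
		return errors.New("failed reading counters")
	}
	r.reads++
	r.executed = true
	return nil
}

// fetch mirrors the shape of Fetch in the diff: the counter list is refreshed
// only after a first read has completed, so each cycle runs one refresh at most.
func fetch(r *fakeReader) error {
	if r.executed {
		if err := r.refreshCounterPaths(); err != nil {
			return err
		}
	}
	return r.read()
}
```

Under these assumptions the first call to `fetch` performs no refresh, because the constructor has already expanded the counters, and every later call performs exactly one.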

if m.reader.executed {
err := m.reader.refreshCounterPaths()
if err != nil {
return errors.Wrap(err, "failed retrieving counters")
}
}

events, err := m.reader.fetch(config.Names)
events, err := m.reader.read()
if err != nil {
return errors.Wrap(err, "failed reading counters")
}
@@ -81,6 +84,7 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error {
break
}
}

Member

nit: is this change unrelated?

return nil
}

199 changes: 121 additions & 78 deletions x-pack/metricbeat/module/iis/application_pool/reader.go
@@ -9,40 +9,41 @@ package application_pool
import (
"strings"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/metricbeat/helper/windows/pdh"
Member

nit: please fix the import order.

Contributor Author

I ran fmt, so that should be the order.

Member

I don't think mage fmt touches the import order. I ran a small local test and it doesn't seem to. You can verify this by putting the imports in order and running fmt again.
I'm not sure how this change occurred, and preserving the order is not crucial, but it's a convention we follow.

"github.com/elastic/go-sysinfo"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/metricbeat/helper/windows/pdh"
"github.com/elastic/beats/v7/metricbeat/mb"
)

// Reader strucr will contain the pdh query and config options
const ecsProcessId = "process.pid"

// Reader will contain the config options
type Reader struct {
Query pdh.Query // PDH Query
ApplicationPools []ApplicationPool // Mapping of counter path to key used for the label (e.g. processor.name)
log *logp.Logger // logger
hasRun bool // will check if the reader has run a first time
WorkerProcesses map[string]string
applicationPools []ApplicationPool
workerProcesses map[string]string
query pdh.Query // PDH Query
executed bool // Indicates if the query has been executed.
log *logp.Logger //
config Config // Metricset configuration
}

// ApplicationPool struct contains the list of applications and their worker processes
type ApplicationPool struct {
Name string
WorkerProcessIds []int
name string
workerProcessIds []int
counters map[string]string
}

// WorkerProcess struct contains the worker process details
type WorkerProcess struct {
ProcessId int
InstanceName string
processId int
instanceName string
}

const ecsProcessId = "process.pid"

var appPoolCounters = map[string]string{
"process.pid": "\\Process(w3wp*)\\ID Process",
"process.cpu_usage_perc": "\\Process(w3wp*)\\% Processor Time",
@@ -62,87 +63,138 @@ var appPoolCounters = map[string]string{
}

// newReader creates a new instance of Reader.
func newReader() (*Reader, error) {
func newReader(config Config) (*Reader, error) {
var query pdh.Query
if err := query.Open(); err != nil {
return nil, err
}
reader := &Reader{
Query: query,
log: logp.NewLogger("website"),
r := &Reader{
query: query,
log: logp.NewLogger("application_pool"),
config: config,
workerProcesses: make(map[string]string),
}
apps, err := getApplicationPools(config.Names)
if err != nil {
return r, errors.Wrap(err, "failed retrieving running worker processes")
Member

If something fails here, should we close the query to avoid leaks?

Contributor Author

Indeed, added the call.
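
The leak concern raised above can be sketched as follows. This is an illustration under stated assumptions, not the code that was added to the PR: `resource` is a hypothetical stand-in for `pdh.Query`, `errSetup` is a made-up error, and the already opened resource is passed in as an argument only so the behaviour is easy to check.

```go
package main

import (
	"errors"
	"fmt"
)

// errSetup is a hypothetical error used to simulate a failing setup step.
var errSetup = errors.New("setup failed")

// resource is a hypothetical stand-in for pdh.Query; it only records whether
// Close was called.
type resource struct {
	closed bool
}

// Close marks the resource as released.
func (q *resource) Close() error {
	q.closed = true
	return nil
}

// newReaderSketch runs a setup step against an already opened resource. If
// setup fails, the resource is closed before the error is returned, so the
// handle does not outlive the failed constructor.
func newReaderSketch(q *resource, setup func() error) (*resource, error) {
	if err := setup(); err != nil {
		// Best effort: the setup error is the one worth reporting.
		_ = q.Close()
		return nil, fmt.Errorf("failed retrieving running worker processes: %w", err)
	}
	return q, nil
}
```

Returning `nil` together with the error, rather than a half-initialised value, also keeps callers from using a reader whose query has already been closed.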

}
if len(apps) == 0 {
r.log.Info("no running application pools found")
return r, nil
}
r.applicationPools = apps

for key, value := range appPoolCounters {
childQueries, err := query.GetCounterPaths(value)
if err != nil {
if err == pdh.PDH_CSTATUS_NO_COUNTER || err == pdh.PDH_CSTATUS_NO_COUNTERNAME || err == pdh.PDH_CSTATUS_NO_INSTANCE || err == pdh.PDH_CSTATUS_NO_OBJECT {
r.log.Infow("Ignoring non existent counter", "error", err,
logp.Namespace("application pool"), "query", value)
continue
} else {
return nil, errors.Wrapf(err, `failed to expand counter (query="%v")`, value)
}
}
// check if the pdhexpandcounterpath/pdhexpandwildcardpath functions have expanded the counter successfully.
if len(childQueries) == 0 || (len(childQueries) == 1 && strings.Contains(childQueries[0], "*")) {
// covering cases when PdhExpandWildCardPathW returns no counter paths or is unable to expand and the ignore_non_existent_counters flag is set
r.log.Debugw("No counter paths returned but PdhExpandWildCardPathW returned no errors", "initial query", value,
logp.Namespace("application pool"), "expanded query", childQueries)
continue
}
for _, v := range childQueries {
if err := query.AddCounter(v, "", "float", len(childQueries) > 1); err != nil {
return nil, errors.Wrapf(err, `failed to add counter (query="%v")`, v)
}
r.workerProcesses[v] = key
}
Member

The code added here is mostly the same as in refreshCounterPaths(). Could it be reused?

Contributor Author

Refactored the code so the function is reused.
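
One way the duplicated expansion loop could be shared between the constructor and the refresh is sketched below. The refactored code itself is not visible in this view, so everything here is an assumption: `mapCounterPaths`, `expandFunc` and `errNoInstance` are hypothetical names, with `errNoInstance` standing in for the `PDH_CSTATUS_NO_*` errors and `expandFunc` for `query.GetCounterPaths`.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// errNoInstance is a hypothetical sentinel standing in for the
// PDH_CSTATUS_NO_* errors that the diff treats as ignorable.
var errNoInstance = errors.New("no instance")

// expandFunc is a hypothetical stand-in for query.GetCounterPaths.
type expandFunc func(path string) ([]string, error)

// mapCounterPaths expands every wildcard path and returns a map of expanded
// counter path to field key. Non-existent counters and wildcards that were
// not expanded are skipped; any other error aborts the whole operation.
func mapCounterPaths(counters map[string]string, expand expandFunc) (map[string]string, error) {
	result := make(map[string]string)
	for key, path := range counters {
		children, err := expand(path)
		if err != nil {
			if errors.Is(err, errNoInstance) {
				continue
			}
			return nil, fmt.Errorf("failed to expand counter (query=%q): %w", path, err)
		}
		// An empty result or a single path that still contains "*" means the
		// wildcard was not expanded.
		if len(children) == 0 || (len(children) == 1 && strings.Contains(children[0], "*")) {
			continue
		}
		for _, child := range children {
			result[child] = key
		}
	}
	return result, nil
}
```

With a helper of this shape, both call sites would differ only in what they do with the returned map, such as adding counters to the query or removing the unused ones.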

}

return reader, nil
return r, nil
}

// initCounters func retrieves the running application worker processes and adds the counters to the pdh query
func (re *Reader) initCounters(filtered []string) error {
apps, err := getApplicationPools(filtered)
// refreshCounterPaths will recheck for any new instances and add them to the counter list
func (r *Reader) refreshCounterPaths() error {
apps, err := getApplicationPools(r.config.Names)
if err != nil {
return errors.Wrap(err, "failed retrieving running worker processes")
}
r.applicationPools = apps
if len(apps) == 0 {
re.log.Info("no running application pools found")
r.log.Info("no running application pools found")
return nil
}
re.ApplicationPools = apps
re.WorkerProcesses = make(map[string]string)

var newQueries []string
r.workerProcesses = make(map[string]string)
for key, value := range appPoolCounters {
counters, err := re.Query.ExpandWildCardPath(value)
childQueries, err := r.query.GetCounterPaths(value)
if err != nil {
re.log.Error(err, `failed to expand counter path (query="%v")`, value)
if err == pdh.PDH_CSTATUS_NO_COUNTER || err == pdh.PDH_CSTATUS_NO_COUNTERNAME || err == pdh.PDH_CSTATUS_NO_INSTANCE || err == pdh.PDH_CSTATUS_NO_OBJECT {
r.log.Infow("Ignoring non existent counter", "error", err,
logp.Namespace("application pool"), "query", value)
continue
} else {
return errors.Wrapf(err, `failed to expand counter (query="%v")`, value)
}
}
newQueries = append(newQueries, childQueries...)
// check if the pdhexpandcounterpath/pdhexpandwildcardpath functions have expanded the counter successfully.
if len(childQueries) == 0 || (len(childQueries) == 1 && strings.Contains(childQueries[0], "*")) {
// covering cases when PdhExpandWildCardPathW returns no counter paths or is unable to expand and the ignore_non_existent_counters flag is set
r.log.Debugw("No counter paths returned but PdhExpandWildCardPathW returned no errors", "initial query", value,
logp.Namespace("perfmon"), "expanded query", childQueries)
continue
}
for _, count := range counters {
if err = re.Query.AddCounter(count, "", "float", true); err != nil {
return errors.Wrapf(err, `failed to add counter (query="%v")`, count)
for _, v := range childQueries {
if err := r.query.AddCounter(v, "", "float", len(childQueries) > 1); err != nil {
return errors.Wrapf(err, `failed to add counter (query="%v")`, v)
}
newQueries = append(newQueries, count)
re.WorkerProcesses[count] = key
r.workerProcesses[v] = key
}
}
err = re.Query.RemoveUnusedCounters(newQueries)
err = r.query.RemoveUnusedCounters(newQueries)
if err != nil {
return errors.Wrap(err, "failed removing unused counter values")
}
return nil
}

// fetch executes collects the query data and maps the counter values to events.
func (re *Reader) fetch(names []string) ([]mb.Event, error) {
// refresh performance counter list
// Some counters, such as rate counters, require two counter values in order to compute a displayable value. In this case we must call PdhCollectQueryData twice before calling PdhGetFormattedCounterValue.
// For more information, see Collecting Performance Data (https://docs.microsoft.com/en-us/windows/desktop/PerfCtrs/collecting-performance-data).
// A flag is set if the second call has been executed else refresh will fail (reader.executed)
if re.hasRun || len(re.Query.Counters) == 0 {
err := re.initCounters(names)
if err != nil {
return nil, errors.Wrap(err, "failed retrieving counters")
}
}
// if the ignore_non_existent_counters flag is set and no valid counter paths are found the Read func will still execute, a check is done before
if len(re.Query.Counters) == 0 {
// read executes a query and returns those values in an event.
func (r *Reader) read() ([]mb.Event, error) {
if len(r.applicationPools) == 0 {
r.executed = true
return nil, nil
}

// Some counters, such as rate counters, require two counter values in order to compute a displayable value. In this case we must call PdhCollectQueryData twice before calling PdhGetFormattedCounterValue.
// For more information, see Collecting Performance Data (https://docs.microsoft.com/en-us/windows/desktop/PerfCtrs/collecting-performance-data).
if err := re.Query.CollectData(); err != nil {
if err := r.query.CollectData(); err != nil {
return nil, errors.Wrap(err, "failed querying counter values")
ChrsMark marked this conversation as resolved.
}

// Get the values.
values, err := re.Query.GetFormattedCounterValues()
values, err := r.query.GetFormattedCounterValues()
if err != nil {
return nil, errors.Wrap(err, "failed formatting counter values")
}
var events []mb.Event
eventGroup := r.mapEvents(values)
r.executed = true
results := make([]mb.Event, 0, len(events))
for _, val := range eventGroup {
results = append(results, val)
}
return results, nil
}

func (r *Reader) mapEvents(values map[string][]pdh.CounterValue) map[string]mb.Event {
workers := getProcessIds(values)
events := make(map[string]mb.Event)
for _, appPool := range re.ApplicationPools {
events[appPool.Name] = mb.Event{
for _, appPool := range r.applicationPools {
events[appPool.name] = mb.Event{
MetricSetFields: common.MapStr{
"name": appPool.Name,
"name": appPool.name,
},
RootFields: common.MapStr{},
}
@@ -151,45 +203,36 @@ func (re *Reader) fetch(names []string) ([]mb.Event, error) {
// Some counters, such as rate counters, require two counter values in order to compute a displayable value. In this case we must call PdhCollectQueryData twice before calling PdhGetFormattedCounterValue.
// For more information, see Collecting Performance Data (https://docs.microsoft.com/en-us/windows/desktop/PerfCtrs/collecting-performance-data).
if val.Err.Error != nil {
if !re.hasRun {
re.log.Debugw("Ignoring the first measurement because the data isn't ready",
"error", val.Err, logp.Namespace("application_pool"), "query", counterPath)
if !r.executed {
continue
}
// The counter has a negative value or the counter was successfully found, but the data returned is not valid.
// This error can occur if the counter value is less than the previous value. (Because counter values always increment, the counter value rolls over to zero when it reaches its maximum value.)
// This is not an error that stops the application from running successfully and a positive counter value should be retrieved in the later calls.
if val.Err.Error == pdh.PDH_CALC_NEGATIVE_VALUE || val.Err.Error == pdh.PDH_INVALID_DATA {
re.log.Debugw("Counter value retrieval returned",
r.log.Debugw("Counter value retrieval returned",
"error", val.Err.Error, "cstatus", pdh.PdhErrno(val.Err.CStatus), logp.Namespace("application_pool"), "query", counterPath)
continue
}
}
if val.Instance == appPool.Name {
events[appPool.Name].MetricSetFields.Put(appPool.counters[counterPath], val.Measurement)
} else if hasWorkerProcess(val.Instance, workers, appPool.WorkerProcessIds) {
if re.WorkerProcesses[counterPath] == ecsProcessId {
events[appPool.Name].RootFields.Put(re.WorkerProcesses[counterPath], val.Measurement)
} else {
events[appPool.Name].MetricSetFields.Put(re.WorkerProcesses[counterPath], val.Measurement)
if hasWorkerProcess(val.Instance, workers, appPool.workerProcessIds) {
if r.workerProcesses[counterPath] == ecsProcessId {
events[appPool.name].RootFields.Put(r.workerProcesses[counterPath], val.Measurement)
} else if len(r.workerProcesses[counterPath]) != 0 {
events[appPool.name].MetricSetFields.Put(r.workerProcesses[counterPath], val.Measurement)
}
}
}

}
}

re.hasRun = true
results := make([]mb.Event, 0, len(events))
for _, val := range events {
results = append(results, val)
}
return results, nil
return events
}

// Close will close the PDH query for now.
func (re *Reader) close() error {
return re.Query.Close()
// close will close the PDH query for now.
func (r *Reader) close() error {
return r.query.Close()
}

// getApplicationPools method retrieves the w3wp.exe processes and the application pool name, also filters on the application pool names configured by users
@@ -204,15 +247,15 @@ func getApplicationPools(names []string) ([]ApplicationPool, error) {
}
var applicationPools []ApplicationPool
for key, value := range appPools {
applicationPools = append(applicationPools, ApplicationPool{Name: key, WorkerProcessIds: value})
applicationPools = append(applicationPools, ApplicationPool{name: key, workerProcessIds: value})
}
if len(names) == 0 {
return applicationPools, nil
}
var filtered []ApplicationPool
for _, n := range names {
for _, w3 := range applicationPools {
if n == w3.Name {
if n == w3.name {
filtered = append(filtered, w3)
}
}
@@ -253,18 +296,18 @@ func getProcessIds(counterValues map[string][]pdh.CounterValue) []WorkerProcess
var workers []WorkerProcess
for key, values := range counterValues {
if strings.Contains(key, "\\ID Process") {
workers = append(workers, WorkerProcess{InstanceName: values[0].Instance, ProcessId: int(values[0].Measurement.(float64))})
workers = append(workers, WorkerProcess{instanceName: values[0].Instance, processId: int(values[0].Measurement.(float64))})
}
}
return workers
}

// hasWorkerProcess func checks if workerprocess list contains the process id
// hasWorkerProcess func checks if worker process list contains the process id
func hasWorkerProcess(instance string, workers []WorkerProcess, pids []int) bool {
for _, worker := range workers {
if worker.InstanceName == instance {
if worker.instanceName == instance {
for _, pid := range pids {
if pid == worker.ProcessId {
if pid == worker.processId {
return true
}
}
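
The guard added in mapEvents, `len(r.workerProcesses[counterPath]) != 0`, appears to be what addresses the empty field name error in the title. A minimal sketch of the idea follows, assuming plain Go maps in place of `common.MapStr`; the helper names `fieldFor` and `putMeasurement` are hypothetical and do not appear in the PR.

```go
package main

// fieldFor returns the field name registered for a counter path and whether a
// value should be stored under it. Looking up a path that is missing from the
// map yields the empty string in Go, which is not a usable field name, so it
// is reported as absent.
func fieldFor(workerProcesses map[string]string, counterPath string) (string, bool) {
	name := workerProcesses[counterPath]
	if len(name) == 0 {
		return "", false
	}
	return name, true
}

// putMeasurement stores the value only when a non-empty field name exists for
// the counter path; unknown paths are silently skipped.
func putMeasurement(fields map[string]interface{}, workerProcesses map[string]string, counterPath string, value interface{}) {
	if name, ok := fieldFor(workerProcesses, counterPath); ok {
		fields[name] = value
	}
}
```

In this sketch a counter path with no registered field leaves the event untouched instead of writing a value under an empty key.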