Skip to content

Commit

Permalink
[system] - Add a flag to the system module to control whether metrics…
Browse files Browse the repository at this point in the history
… failures mark agent as degraded (#42160)

* chore: initial commit

* doc and test cases

* lint

* remove errors.Is

* chore: notice

* update docs

* Update metricbeat/mb/event.go

Co-authored-by: Craig MacKenzie <craig.mackenzie@elastic.co>

* remove references

* remove sleep

---------

Co-authored-by: Craig MacKenzie <craig.mackenzie@elastic.co>
(cherry picked from commit 7a2f8d4)
  • Loading branch information
VihasMakwana authored and mergify[bot] committed Jan 17, 2025
1 parent 58f7c8c commit 05b2e1e
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 10 deletions.
3 changes: 3 additions & 0 deletions metricbeat/mb/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ type PartialMetricsError struct {
}

// Error implements the error interface. It returns the message of the wrapped
// error, or the empty string when no error is wrapped.
func (p PartialMetricsError) Error() string {
	if wrapped := p.Err; wrapped != nil {
		return wrapped.Error()
	}
	return ""
}

Expand Down
1 change: 1 addition & 0 deletions metricbeat/module/system/_meta/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
process.include_top_n:
by_cpu: 5 # include top 5 processes by CPU
by_memory: 5 # include top 5 processes by memory
degrade_on_partial: false # mark metricset as degraded if partial metrics are emitted
# Configure the mount point of the host’s filesystem for use in monitoring a host from within a container
# hostfs: "/hostfs"

Expand Down
23 changes: 18 additions & 5 deletions metricbeat/module/system/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,10 @@ func init() {
// MetricSet fetches process metrics.
type MetricSet struct {
	mb.BaseMetricSet

	// stats is the process.Stats instance constructed in New.
	stats *process.Stats

	// perCPU is taken from the IncludePerCPU config value.
	perCPU bool

	// setpid is the configured `process.pid` value (config.Pid).
	setpid int

	// degradeOnPartial is taken from the module-level `degrade_on_partial`
	// option. When true, Fetch returns non-fatal process errors as ordinary
	// errors instead of wrapping them in mb.PartialMetricsError.
	degradeOnPartial bool
}

// New creates and returns a new MetricSet.
Expand All @@ -72,7 +73,12 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
if config.Pid != 0 && config.Procs[0] != ".*" {
logp.L().Warnf("`process.pid` set to %d, but `processes` is set to a non-default value. Metricset will only report metrics for pid %d", config.Pid, config.Pid)
}

degradedConf := struct {
DegradeOnPartial bool `config:"degrade_on_partial"`
}{}
if err := base.Module().UnpackConfig(&degradedConf); err != nil {
logp.L().Warnf("Failed to unpack config; degraded mode will be disabled for partial metrics: %v", err)
}
m := &MetricSet{
BaseMetricSet: base,
stats: &process.Stats{
Expand All @@ -88,7 +94,8 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
IgnoreRootCgroups: true,
},
},
perCPU: config.IncludePerCPU,
perCPU: config.IncludePerCPU,
degradeOnPartial: degradedConf.DegradeOnPartial,
}

m.setpid = config.Pid
Expand Down Expand Up @@ -119,6 +126,9 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
// return only if the error is fatal in nature
return fmt.Errorf("process stats: %w", err)
} else if (err != nil && errors.Is(err, process.NonFatalErr{})) {
if m.degradeOnPartial {
return fmt.Errorf("error fetching process list: %w", err)
}
err = mb.PartialMetricsError{Err: err}
}

Expand All @@ -138,6 +148,9 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
// return only if the error is fatal in nature
return fmt.Errorf("error fetching pid %d: %w", m.setpid, err)
} else if (err != nil && errors.Is(err, process.NonFatalErr{})) {
if m.degradeOnPartial {
return fmt.Errorf("error fetching process list: %w", err)
}
err = mb.PartialMetricsError{Err: err}
}
// if error is non-fatal, emit partial metrics.
Expand Down
27 changes: 27 additions & 0 deletions metricbeat/module/system/process/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/metricbeat/mb"
mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing"
_ "github.com/elastic/beats/v7/metricbeat/module/system"
"github.com/elastic/elastic-agent-libs/logp"
Expand Down Expand Up @@ -55,6 +56,32 @@ func TestFetch(t *testing.T) {
events[0].BeatEvent("system", "process").Fields.StringToPrint())
}

// TestFetchDegradeOnPartial runs the process metricset with
// `degrade_on_partial` enabled. When the first fetch reports errors, each one
// is checked against mb.PartialMetricsError. Otherwise events must be present
// and a second fetch is performed; any errors from it must match
// process.NonFatalErr and events must again be present.
func TestFetchDegradeOnPartial(t *testing.T) {
	logp.DevelopmentSetup()

	cfg := getConfig()
	cfg["degrade_on_partial"] = true
	ms := mbtest.NewReportingMetricSetV2Error(t, cfg)

	events, errs := mbtest.ReportingFetchV2Error(ms)
	if len(errs) != 0 {
		// NOTE(review): the target is a freshly allocated pointer, so this
		// check presumably can never fail unless the error type defines an Is
		// method — confirm whether an errors.As based check was intended.
		for i := range errs {
			assert.NotErrorIsf(t, errs[i], &mb.PartialMetricsError{}, "Expected non-fatal error, got %v", errs[i])
		}
		return
	}

	assert.NotEmpty(t, events)

	// Fetch a second time and re-validate the reported errors and events.
	events, errs = mbtest.ReportingFetchV2Error(ms)
	for i := range errs {
		assert.ErrorIsf(t, errs[i], process.NonFatalErr{}, "Expected non-fatal error, got %v", errs[i])
	}
	assert.NotEmpty(t, events)

	t.Logf("fetched %d events, showing events[0]:", len(events))
	t.Logf(
		"%s/%s event: %+v",
		ms.Module().Name(),
		ms.Name(),
		events[0].BeatEvent("system", "process").Fields.StringToPrint(),
	)
}

func TestFetchSinglePid(t *testing.T) {
logp.DevelopmentSetup()

Expand Down
23 changes: 19 additions & 4 deletions metricbeat/module/system/process_summary/process_summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/elastic/beats/v7/libbeat/common/transform/typeconv"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/mb/parse"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-system-metrics/metric/system/process"
"github.com/elastic/elastic-agent-system-metrics/metric/system/resolve"
Expand All @@ -50,17 +51,28 @@ func init() {
// multiple fetch calls.
type MetricSet struct {
	mb.BaseMetricSet

	// sys is the module viewed as a resolve.Resolver; New obtains it by
	// type-asserting base.Module().
	sys resolve.Resolver

	// degradeOnPartial is taken from the module-level `degrade_on_partial`
	// option. When true, Fetch returns non-fatal process errors as ordinary
	// errors instead of wrapping them in mb.PartialMetricsError.
	degradeOnPartial bool
}

// New creates a new instance of the process_summary MetricSet.
//
// The module behind base must implement resolve.Resolver; if it does not, an
// error is returned instead of panicking on the type assertion. The
// module-level `degrade_on_partial` option is also read here. If the module
// config cannot be unpacked, a warning is logged and the option keeps its
// zero value (false).
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
	sys, ok := base.Module().(resolve.Resolver)
	if !ok {
		return nil, fmt.Errorf("resolver cannot be cast from the module")
	}

	degradedConf := struct {
		DegradeOnPartial bool `config:"degrade_on_partial"`
	}{}
	if err := base.Module().UnpackConfig(&degradedConf); err != nil {
		// Best effort: a failed unpack only disables the optional flag.
		logp.L().Warnf("Failed to unpack config; degraded mode will be disabled for partial metrics: %v", err)
	}

	return &MetricSet{
		BaseMetricSet:    base,
		sys:              sys,
		degradeOnPartial: degradedConf.DegradeOnPartial,
	}, nil
}

Expand All @@ -74,6 +86,9 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
// return only if the error is fatal in nature
return fmt.Errorf("error fetching process list: %w", degradeErr)
} else if (degradeErr != nil && errors.Is(degradeErr, process.NonFatalErr{})) {
if m.degradeOnPartial {
return fmt.Errorf("error fetching process list: %w", degradeErr)
}
degradeErr = mb.PartialMetricsError{Err: degradeErr}
}

Expand Down
25 changes: 24 additions & 1 deletion metricbeat/module/system/process_summary/process_summary_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/metricbeat/mb"
mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing"
_ "github.com/elastic/beats/v7/metricbeat/module/system"
"github.com/elastic/elastic-agent-libs/logp"
Expand Down Expand Up @@ -58,6 +59,27 @@ func TestFetch(t *testing.T) {
require.NoError(t, err)

}
// TestFetchDegradeOnPartial runs the process_summary metricset with
// `degrade_on_partial` enabled. When the fetch reports errors, each one is
// checked against mb.PartialMetricsError. Otherwise the first event must
// contain the "system.process.summary" key.
func TestFetchDegradeOnPartial(t *testing.T) {
	logp.DevelopmentSetup()

	cfg := getConfig()
	cfg["degrade_on_partial"] = true
	ms := mbtest.NewReportingMetricSetV2Error(t, cfg)

	events, errs := mbtest.ReportingFetchV2Error(ms)
	if len(errs) != 0 {
		// NOTE(review): the target is a freshly allocated pointer, so this
		// check presumably can never fail unless the error type defines an Is
		// method — confirm whether an errors.As based check was intended.
		for i := range errs {
			assert.NotErrorIsf(t, errs[i], &mb.PartialMetricsError{}, "Expected non-fatal error, got %v", errs[i])
		}
		return
	}

	require.NotEmpty(t, events)
	fields := events[0].BeatEvent("system", "process_summary").Fields
	t.Logf("%s/%s event: %+v", ms.Module().Name(), ms.Name(), fields.StringToPrint())

	_, lookupErr := fields.GetValue("system.process.summary")
	require.NoError(t, lookupErr)
}

func TestStateNames(t *testing.T) {
logp.DevelopmentSetup()
Expand All @@ -80,7 +102,8 @@ func TestStateNames(t *testing.T) {
assert.NotZero(t, event["total"])

var sum int
total := event["total"].(int)
total, ok := event["total"].(int)
require.Truef(t, ok, "Expected int got %T", event["total"])
for key, val := range event {
if key == "total" {
continue
Expand Down
1 change: 1 addition & 0 deletions metricbeat/modules.d/system.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
process.include_top_n:
by_cpu: 5 # include top 5 processes by CPU
by_memory: 5 # include top 5 processes by memory
degrade_on_partial: false # mark metricset as degraded if partial metrics are emitted
# Configure the mount point of the host’s filesystem for use in monitoring a host from within a container
# hostfs: "/hostfs"

Expand Down

0 comments on commit 05b2e1e

Please sign in to comment.