From fe79ea45a69dd31b9e26280f3f67ff1bf51df9e9 Mon Sep 17 00:00:00 2001 From: Giuseppe Santoro Date: Fri, 17 Jun 2022 15:29:17 +0100 Subject: [PATCH 1/5] Feature/cache expiration (#31785) * fixed cache expiration bug (cherry picked from commit 4b625fc4111b6dc9dc95635f9bd7f08905067d1d) # Conflicts: # metricbeat/module/kubernetes/container/container.go # metricbeat/module/kubernetes/pod/pod.go # metricbeat/module/kubernetes/state_daemonset/state_daemonset.go # metricbeat/module/kubernetes/state_persistentvolume/state_persistentvolume.go # metricbeat/module/kubernetes/state_persistentvolumeclaim/state_persistentvolumeclaim.go # metricbeat/module/kubernetes/util/kubernetes.go # metricbeat/module/kubernetes/volume/volume.go --- CHANGELOG.next.asciidoc | 3 + libbeat/common/cache.go | 4 +- .../module/kubernetes/container/container.go | 6 +- .../kubernetes/container/container_test.go | 3 +- metricbeat/module/kubernetes/kubernetes.go | 17 +++++ metricbeat/module/kubernetes/node/node.go | 2 +- metricbeat/module/kubernetes/pod/pod.go | 6 +- metricbeat/module/kubernetes/pod/pod_test.go | 3 +- .../state_container/state_container.go | 2 +- .../kubernetes/state_cronjob/state_cronjob.go | 4 +- .../state_daemonset/state_daemonset.go | 4 ++ .../state_deployment/state_deployment.go | 4 +- .../module/kubernetes/state_job/state_job.go | 4 +- .../kubernetes/state_node/state_node.go | 4 +- .../state_persistentvolume.go | 4 ++ .../state_persistentvolumeclaim.go | 4 ++ .../module/kubernetes/state_pod/state_pod.go | 4 +- .../state_replicaset/state_replicaset.go | 4 +- .../state_resourcequota.go | 1 - .../kubernetes/state_service/state_service.go | 3 +- .../state_statefulset/state_statefulset.go | 4 +- .../state_storageclass/state_storageclass.go | 1 - .../module/kubernetes/util/kubernetes.go | 56 +++++++++++++--- .../module/kubernetes/util/metrics_cache.go | 64 +++++++++++++++---- .../kubernetes/util/metrics_cache_test.go | 5 +- metricbeat/module/kubernetes/volume/volume.go | 24 +++++++ 26 files changed, 182 insertions(+), 58 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index d546b1e7291..bf71117a0a6 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -48,6 +48,9 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] *Metricbeat* +- make `system/filesystem` code sensitive to `hostfs` and migrate libraries to `elastic-agent-opts` {pull}31001[31001] +- Fix kubernetes module's internal cache expiration issue. This avoid metrics like `kubernetes.container.cpu.usage.limit.pct` from not being populated. {pull}31785[31785] +- add missing HealthyHostCount and UnHealthyHostCount for application ELB. {pull}31853[31853] *Packetbeat* diff --git a/libbeat/common/cache.go b/libbeat/common/cache.go index c06aa2427b8..d75a8471c5f 100644 --- a/libbeat/common/cache.go +++ b/libbeat/common/cache.go @@ -255,7 +255,9 @@ func (c *Cache) StartJanitor(interval time.Duration) { // StopJanitor stops the goroutine created by StartJanitor. func (c *Cache) StopJanitor() { - close(c.janitorQuit) + if c.janitorQuit != nil { + close(c.janitorQuit) + } } // get returns the non-expired values from the cache. diff --git a/metricbeat/module/kubernetes/container/container.go b/metricbeat/module/kubernetes/container/container.go index 346d161a7b3..050f41072c1 100644 --- a/metricbeat/module/kubernetes/container/container.go +++ b/metricbeat/module/kubernetes/container/container.go @@ -75,7 +75,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ BaseMetricSet: base, http: http, - enricher: util.NewContainerMetadataEnricher(base, true), + enricher: util.NewContainerMetadataEnricher(base, mod.GetPerfMetricsCache(), true), mod: mod, }, nil } @@ -93,7 +93,11 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) { return } +<<<<<<< HEAD events, err := eventMapping(body, util.PerfMetrics) +======= + events, err := eventMapping(body, m.mod.GetPerfMetricsCache(), m.Logger()) +>>>>>>> 4b625fc411 (Feature/cache expiration (#31785)) if err != nil { m.Logger().Error(err) reporter.Error(err) diff --git a/metricbeat/module/kubernetes/container/container_test.go b/metricbeat/module/kubernetes/container/container_test.go index 67ed44cac2f..9e9a94e9ff5 100644 --- a/metricbeat/module/kubernetes/container/container_test.go +++ b/metricbeat/module/kubernetes/container/container_test.go @@ -24,6 +24,7 @@ import ( "io/ioutil" "os" "testing" + "time" "github.com/stretchr/testify/assert" @@ -40,7 +41,7 @@ func TestEventMapping(t *testing.T) { body, err := ioutil.ReadAll(f) assert.NoError(t, err, "cannot read test file "+testFile) - cache := util.NewPerfMetricsCache() + cache := util.NewPerfMetricsCache(120 * time.Second) cache.NodeCoresAllocatable.Set("gke-beats-default-pool-a5b33e2e-hdww", 2) cache.NodeMemAllocatable.Set("gke-beats-default-pool-a5b33e2e-hdww", 146227200) cache.ContainerMemLimit.Set(util.ContainerUID("default", "nginx-deployment-2303442956-pcqfc", "nginx"), 14622720) diff --git a/metricbeat/module/kubernetes/kubernetes.go b/metricbeat/module/kubernetes/kubernetes.go index f9bd0e29e37..594dcf58907 100644 --- a/metricbeat/module/kubernetes/kubernetes.go +++ b/metricbeat/module/kubernetes/kubernetes.go @@ -28,6 +28,7 @@ import ( "github.com/elastic/beats/v7/metricbeat/helper" p "github.com/elastic/beats/v7/metricbeat/helper/prometheus" "github.com/elastic/beats/v7/metricbeat/mb" + "github.com/elastic/beats/v7/metricbeat/module/kubernetes/util" ) func init() { @@ -41,6 +42,7 @@ type Module interface { mb.Module GetStateMetricsFamilies(prometheus p.Prometheus) ([]*dto.MetricFamily, error) GetKubeletStats(http *helper.HTTP) ([]byte, error) + GetPerfMetricsCache() *util.PerfMetricsCache } type familiesCache struct { @@ -84,6 +86,7 @@ type module struct { kubeStateMetricsCache *kubeStateMetricsCache kubeletStatsCache *kubeletStatsCache + perfMetrics *util.PerfMetricsCache cacheHash uint64 } @@ -94,15 +97,25 @@ func ModuleBuilder() func(base mb.BaseModule) (mb.Module, error) { kubeletStatsCache := &kubeletStatsCache{ cacheMap: make(map[uint64]*statsCache), } + perfMetrics := util.NewPerfMetricsCache(0) return func(base mb.BaseModule) (mb.Module, error) { hash, err := generateCacheHash(base.Config().Hosts) if err != nil { return nil, errors.Wrap(err, "error generating cache hash for kubeStateMetricsCache") } + + // NOTE: `Period * 2` is an arbitrary value to make the cache NEVER to expire before the next scraping run + // if different metricsets have different periods, we will effectively set (timeout = max(Period) * 2) + minCacheExpirationTime := base.Config().Period * 2 + if perfMetrics.GetTimeout() < minCacheExpirationTime { + perfMetrics.SetOrUpdateTimeout(minCacheExpirationTime) + } + m := module{ BaseModule: base, kubeStateMetricsCache: kubeStateMetricsCache, kubeletStatsCache: kubeletStatsCache, + perfMetrics: perfMetrics, cacheHash: hash, } return &m, nil @@ -153,3 +166,7 @@ func generateCacheHash(host []string) (uint64, error) { } return id, nil } + +func (m *module) GetPerfMetricsCache() *util.PerfMetricsCache { + return m.perfMetrics +} diff --git a/metricbeat/module/kubernetes/node/node.go b/metricbeat/module/kubernetes/node/node.go index 1995b63267c..f6f0a7d4cab 100644 --- a/metricbeat/module/kubernetes/node/node.go +++ b/metricbeat/module/kubernetes/node/node.go @@ -79,7 +79,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ BaseMetricSet: base, http: http, - enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Node{}, false), + enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Node{}, mod.GetPerfMetricsCache(), false), mod: mod, }, nil } diff --git a/metricbeat/module/kubernetes/pod/pod.go b/metricbeat/module/kubernetes/pod/pod.go index 470f3e98165..75aa66dfe59 100644 --- a/metricbeat/module/kubernetes/pod/pod.go +++ b/metricbeat/module/kubernetes/pod/pod.go @@ -76,7 +76,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ BaseMetricSet: base, http: http, - enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Pod{}, true), + enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Pod{}, mod.GetPerfMetricsCache(), true), mod: mod, }, nil } @@ -94,7 +94,11 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) { return } +<<<<<<< HEAD events, err := eventMapping(body, util.PerfMetrics) +======= + events, err := eventMapping(body, m.mod.GetPerfMetricsCache(), m.Logger()) +>>>>>>> 4b625fc411 (Feature/cache expiration (#31785)) if err != nil { m.Logger().Error(err) reporter.Error(err) diff --git a/metricbeat/module/kubernetes/pod/pod_test.go b/metricbeat/module/kubernetes/pod/pod_test.go index 37486ad58e8..48645e81b27 100644 --- a/metricbeat/module/kubernetes/pod/pod_test.go +++ b/metricbeat/module/kubernetes/pod/pod_test.go @@ -24,6 +24,7 @@ import ( "io/ioutil" "os" "testing" + "time" "github.com/stretchr/testify/assert" @@ -40,7 +41,7 @@ func TestEventMapping(t *testing.T) { body, err := ioutil.ReadAll(f) assert.NoError(t, err, "cannot read test file "+testFile) - cache := util.NewPerfMetricsCache() + cache := util.NewPerfMetricsCache(120 * time.Second) cache.NodeCoresAllocatable.Set("gke-beats-default-pool-a5b33e2e-hdww", 2) cache.NodeMemAllocatable.Set("gke-beats-default-pool-a5b33e2e-hdww", 146227200) cache.ContainerMemLimit.Set(util.ContainerUID("default", "nginx-deployment-2303442956-pcqfc", "nginx"), 14622720) diff --git a/metricbeat/module/kubernetes/state_container/state_container.go b/metricbeat/module/kubernetes/state_container/state_container.go index 7938922ebe2..d997e5c2c75 100644 --- a/metricbeat/module/kubernetes/state_container/state_container.go +++ b/metricbeat/module/kubernetes/state_container/state_container.go @@ -122,7 +122,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ BaseMetricSet: base, prometheus: prometheus, - enricher: util.NewContainerMetadataEnricher(base, false), + enricher: util.NewContainerMetadataEnricher(base, mod.GetPerfMetricsCache(), false), mod: mod, }, nil } diff --git a/metricbeat/module/kubernetes/state_cronjob/state_cronjob.go b/metricbeat/module/kubernetes/state_cronjob/state_cronjob.go index 49d191580fd..a5f8bd12897 100644 --- a/metricbeat/module/kubernetes/state_cronjob/state_cronjob.go +++ b/metricbeat/module/kubernetes/state_cronjob/state_cronjob.go @@ -63,7 +63,7 @@ func NewCronJobMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { BaseMetricSet: base, prometheus: prometheus, mod: mod, - enricher: util.NewResourceMetadataEnricher(base, &kubernetes.CronJob{}, false), + enricher: util.NewResourceMetadataEnricher(base, &kubernetes.CronJob{}, mod.GetPerfMetricsCache(), false), mapping: &p.MetricsMapping{ Metrics: map[string]p.MetricMap{ "kube_cronjob_info": p.InfoMetric(), @@ -122,8 +122,6 @@ func (m *CronJobMetricSet) Fetch(reporter mb.ReporterV2) { return } } - - return } // Close stops this metricset diff --git a/metricbeat/module/kubernetes/state_daemonset/state_daemonset.go b/metricbeat/module/kubernetes/state_daemonset/state_daemonset.go index 1066afa2195..992d433568f 100644 --- a/metricbeat/module/kubernetes/state_daemonset/state_daemonset.go +++ b/metricbeat/module/kubernetes/state_daemonset/state_daemonset.go @@ -89,7 +89,11 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ BaseMetricSet: base, prometheus: prometheus, +<<<<<<< HEAD enricher: util.NewResourceMetadataEnricher(base, &kubernetes.ReplicaSet{}, false), +======= + enricher: util.NewResourceMetadataEnricher(base, &kubernetes.DaemonSet{}, mod.GetPerfMetricsCache(), false), +>>>>>>> 4b625fc411 (Feature/cache expiration (#31785)) mod: mod, }, nil } diff --git a/metricbeat/module/kubernetes/state_deployment/state_deployment.go b/metricbeat/module/kubernetes/state_deployment/state_deployment.go index 02448afdc9a..9d8c5d84e8e 100644 --- a/metricbeat/module/kubernetes/state_deployment/state_deployment.go +++ b/metricbeat/module/kubernetes/state_deployment/state_deployment.go @@ -90,7 +90,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ BaseMetricSet: base, prometheus: prometheus, - enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Deployment{}, false), + enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Deployment{}, mod.GetPerfMetricsCache(), false), mod: mod, }, nil } @@ -128,8 +128,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) { return } } - - return } // Close stops this metricset diff --git a/metricbeat/module/kubernetes/state_job/state_job.go b/metricbeat/module/kubernetes/state_job/state_job.go index 1b072eae742..0223b40887d 100644 --- a/metricbeat/module/kubernetes/state_job/state_job.go +++ b/metricbeat/module/kubernetes/state_job/state_job.go @@ -108,7 +108,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ BaseMetricSet: base, prometheus: prometheus, - enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Job{}, false), + enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Job{}, mod.GetPerfMetricsCache(), false), mod: mod, }, nil } @@ -145,8 +145,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) { return } } - - return } // Close stops this metricset diff --git a/metricbeat/module/kubernetes/state_node/state_node.go b/metricbeat/module/kubernetes/state_node/state_node.go index 99d1f351367..a0e70dc13a4 100644 --- a/metricbeat/module/kubernetes/state_node/state_node.go +++ b/metricbeat/module/kubernetes/state_node/state_node.go @@ -114,7 +114,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ BaseMetricSet: base, prometheus: prometheus, - enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Node{}, false), + enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Node{}, mod.GetPerfMetricsCache(), false), mod: mod, }, nil } @@ -151,8 +151,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) { return } } - - return } // Close stops this metricset diff --git a/metricbeat/module/kubernetes/state_persistentvolume/state_persistentvolume.go b/metricbeat/module/kubernetes/state_persistentvolume/state_persistentvolume.go index 90847c63137..da8e41c2ec1 100644 --- a/metricbeat/module/kubernetes/state_persistentvolume/state_persistentvolume.go +++ b/metricbeat/module/kubernetes/state_persistentvolume/state_persistentvolume.go @@ -54,6 +54,10 @@ func NewPersistentVolumeMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { BaseMetricSet: base, prometheus: prometheus, mod: mod, +<<<<<<< HEAD +======= + enricher: util.NewResourceMetadataEnricher(base, &kubernetes.PersistentVolume{}, mod.GetPerfMetricsCache(), false), +>>>>>>> 4b625fc411 (Feature/cache expiration (#31785)) mapping: &p.MetricsMapping{ Metrics: map[string]p.MetricMap{ "kube_persistentvolume_capacity_bytes": p.Metric("capacity.bytes"), diff --git a/metricbeat/module/kubernetes/state_persistentvolumeclaim/state_persistentvolumeclaim.go b/metricbeat/module/kubernetes/state_persistentvolumeclaim/state_persistentvolumeclaim.go index 3cf414c498a..bbbecfa4c5b 100644 --- a/metricbeat/module/kubernetes/state_persistentvolumeclaim/state_persistentvolumeclaim.go +++ b/metricbeat/module/kubernetes/state_persistentvolumeclaim/state_persistentvolumeclaim.go @@ -54,6 +54,10 @@ func NewpersistentvolumeclaimMetricSet(base mb.BaseMetricSet) (mb.MetricSet, err BaseMetricSet: base, prometheus: prometheus, mod: mod, +<<<<<<< HEAD +======= + enricher: util.NewResourceMetadataEnricher(base, &kubernetes.PersistentVolumeClaim{}, mod.GetPerfMetricsCache(), false), +>>>>>>> 4b625fc411 (Feature/cache expiration (#31785)) mapping: &p.MetricsMapping{ Metrics: map[string]p.MetricMap{ diff --git a/metricbeat/module/kubernetes/state_pod/state_pod.go b/metricbeat/module/kubernetes/state_pod/state_pod.go index ea742ab26bd..09d9184af1c 100644 --- a/metricbeat/module/kubernetes/state_pod/state_pod.go +++ b/metricbeat/module/kubernetes/state_pod/state_pod.go @@ -92,7 +92,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ BaseMetricSet: base, prometheus: prometheus, - enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Pod{}, false), + enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Pod{}, mod.GetPerfMetricsCache(), false), mod: mod, }, nil } @@ -130,8 +130,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) { return } } - - return } // Close stops this metricset diff --git a/metricbeat/module/kubernetes/state_replicaset/state_replicaset.go b/metricbeat/module/kubernetes/state_replicaset/state_replicaset.go index 7ee8e62864a..a2622dec883 100644 --- a/metricbeat/module/kubernetes/state_replicaset/state_replicaset.go +++ b/metricbeat/module/kubernetes/state_replicaset/state_replicaset.go @@ -90,7 +90,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ BaseMetricSet: base, prometheus: prometheus, - enricher: util.NewResourceMetadataEnricher(base, &kubernetes.ReplicaSet{}, false), + enricher: util.NewResourceMetadataEnricher(base, &kubernetes.ReplicaSet{}, mod.GetPerfMetricsCache(), false), mod: mod, }, nil } @@ -127,8 +127,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) { return } } - - return } // Close stops this metricset diff --git a/metricbeat/module/kubernetes/state_resourcequota/state_resourcequota.go b/metricbeat/module/kubernetes/state_resourcequota/state_resourcequota.go index eb8bc6ddf35..f4a95a6d25b 100644 --- a/metricbeat/module/kubernetes/state_resourcequota/state_resourcequota.go +++ b/metricbeat/module/kubernetes/state_resourcequota/state_resourcequota.go @@ -97,5 +97,4 @@ func (m *ResourceQuotaMetricSet) Fetch(reporter mb.ReporterV2) { return } } - return } diff --git a/metricbeat/module/kubernetes/state_service/state_service.go b/metricbeat/module/kubernetes/state_service/state_service.go index 6794c36c44f..e2ff41fea85 100644 --- a/metricbeat/module/kubernetes/state_service/state_service.go +++ b/metricbeat/module/kubernetes/state_service/state_service.go @@ -86,7 +86,7 @@ func NewServiceMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { "hostname": p.Label("ingress_hostname"), }, }, - enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Service{}, false), + enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Service{}, mod.GetPerfMetricsCache(), false), }, nil } @@ -122,7 +122,6 @@ func (m *ServiceMetricSet) Fetch(reporter mb.ReporterV2) { return } } - return } // Close stops this metricset diff --git a/metricbeat/module/kubernetes/state_statefulset/state_statefulset.go b/metricbeat/module/kubernetes/state_statefulset/state_statefulset.go index c2c02195206..807e53fe696 100644 --- a/metricbeat/module/kubernetes/state_statefulset/state_statefulset.go +++ b/metricbeat/module/kubernetes/state_statefulset/state_statefulset.go @@ -90,7 +90,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ BaseMetricSet: base, prometheus: prometheus, - enricher: util.NewResourceMetadataEnricher(base, &kubernetes.StatefulSet{}, false), + enricher: util.NewResourceMetadataEnricher(base, &kubernetes.StatefulSet{}, mod.GetPerfMetricsCache(), false), mod: mod, }, nil } @@ -127,8 +127,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) { return } } - - return } // Close stops this metricset diff --git a/metricbeat/module/kubernetes/state_storageclass/state_storageclass.go b/metricbeat/module/kubernetes/state_storageclass/state_storageclass.go index 820a052a05a..5a0ce3d0109 100644 --- a/metricbeat/module/kubernetes/state_storageclass/state_storageclass.go +++ b/metricbeat/module/kubernetes/state_storageclass/state_storageclass.go @@ -99,5 +99,4 @@ func (m *StorageClassMetricSet) Fetch(reporter mb.ReporterV2) { return } } - return } diff --git a/metricbeat/module/kubernetes/util/kubernetes.go b/metricbeat/module/kubernetes/util/kubernetes.go index e20b4da74c0..739c6890f8a 100644 --- a/metricbeat/module/kubernetes/util/kubernetes.go +++ b/metricbeat/module/kubernetes/util/kubernetes.go @@ -18,6 +18,7 @@ package util import ( + "errors" "fmt" "strings" "sync" @@ -78,10 +79,16 @@ const selector = "kubernetes" func NewResourceMetadataEnricher( base mb.BaseMetricSet, res kubernetes.Resource, + perfMetrics *PerfMetricsCache, nodeScope bool) Enricher { +<<<<<<< HEAD config := validatedConfig(base) if config == nil { +======= + config, err := GetValidatedConfig(base) + if err != nil { +>>>>>>> 4b625fc411 (Feature/cache expiration (#31785)) logp.Info("Kubernetes metricset enriching is disabled") return &nilEnricher{} } @@ -120,12 +127,12 @@ func NewResourceMetadataEnricher( name := r.GetObjectMeta().GetName() if cpu, ok := r.Status.Capacity["cpu"]; ok { if q, err := resource.ParseQuantity(cpu.String()); err == nil { - PerfMetrics.NodeCoresAllocatable.Set(name, float64(q.MilliValue())/1000) + perfMetrics.NodeCoresAllocatable.Set(name, float64(q.MilliValue())/1000) } } if memory, ok := r.Status.Capacity["memory"]; ok { if q, err := resource.ParseQuantity(memory.String()); err == nil { - PerfMetrics.NodeMemAllocatable.Set(name, float64(q.Value())) + perfMetrics.NodeMemAllocatable.Set(name, float64(q.Value())) } } @@ -173,10 +180,16 @@ func NewResourceMetadataEnricher( // NewContainerMetadataEnricher returns an Enricher configured for container events func NewContainerMetadataEnricher( base mb.BaseMetricSet, + perfMetrics *PerfMetricsCache, nodeScope bool) Enricher { +<<<<<<< HEAD config := validatedConfig(base) if config == nil { +======= + config, err := GetValidatedConfig(base) + if err != nil { +>>>>>>> 4b625fc411 (Feature/cache expiration (#31785)) logp.Info("Kubernetes metricset enriching is disabled") return &nilEnricher{} } @@ -215,12 +228,12 @@ func NewContainerMetadataEnricher( // Report container limits to PerfMetrics cache if cpu, ok := container.Resources.Limits["cpu"]; ok { if q, err := resource.ParseQuantity(cpu.String()); err == nil { - PerfMetrics.ContainerCoresLimit.Set(cuid, float64(q.MilliValue())/1000) + perfMetrics.ContainerCoresLimit.Set(cuid, float64(q.MilliValue())/1000) } } if memory, ok := container.Resources.Limits["memory"]; ok { if q, err := resource.ParseQuantity(memory.String()); err == nil { - PerfMetrics.ContainerMemLimit.Set(cuid, float64(q.Value())) + perfMetrics.ContainerMemLimit.Set(cuid, float64(q.Value())) } } @@ -314,21 +327,44 @@ func GetDefaultDisabledMetaConfig() *kubernetesConfig { } } +<<<<<<< HEAD func validatedConfig(base mb.BaseMetricSet) *kubernetesConfig { config := kubernetesConfig{ +======= +func GetValidatedConfig(base mb.BaseMetricSet) (*kubernetesConfig, error) { + config, err := GetConfig(base) + if err != nil { + logp.Err("Error while getting config: %v", err) + return nil, err + } + + config, err = validateConfig(config) + if err != nil { + logp.Err("Error while validating config: %v", err) + return nil, err + } + return config, nil +} + +func validateConfig(config *kubernetesConfig) (*kubernetesConfig, error) { + if !config.AddMetadata { + return nil, errors.New("metadata enriching is disabled") + } + return config, nil +} + +func GetConfig(base mb.BaseMetricSet) (*kubernetesConfig, error) { + config := &kubernetesConfig{ +>>>>>>> 4b625fc411 (Feature/cache expiration (#31785)) AddMetadata: true, SyncPeriod: time.Minute * 10, AddResourceMetadata: metadata.GetDefaultResourceMetadataConfig(), } if err := base.Module().UnpackConfig(&config); err != nil { - return nil + return nil, errors.New("error unpacking configs") } - // Return nil if metadata enriching is disabled: - if !config.AddMetadata { - return nil - } - return &config + return config, nil } func getString(m common.MapStr, key string) string { diff --git a/metricbeat/module/kubernetes/util/metrics_cache.go b/metricbeat/module/kubernetes/util/metrics_cache.go index 3fdf59d8b16..60bde73336e 100644 --- a/metricbeat/module/kubernetes/util/metrics_cache.go +++ b/metricbeat/module/kubernetes/util/metrics_cache.go @@ -23,23 +23,14 @@ import ( "github.com/elastic/beats/v7/libbeat/common" ) -// PerfMetrics stores known metrics from Kubernetes nodes and containers -var PerfMetrics = NewPerfMetricsCache() - -func init() { - PerfMetrics.Start() -} - -const defaultTimeout = 120 * time.Second - // NewPerfMetricsCache initializes and returns a new PerfMetricsCache -func NewPerfMetricsCache() *PerfMetricsCache { +func NewPerfMetricsCache(timeout time.Duration) *PerfMetricsCache { return &PerfMetricsCache{ - NodeMemAllocatable: newValueMap(defaultTimeout), - NodeCoresAllocatable: newValueMap(defaultTimeout), + NodeMemAllocatable: newValueMap(timeout), + NodeCoresAllocatable: newValueMap(timeout), - ContainerMemLimit: newValueMap(defaultTimeout), - ContainerCoresLimit: newValueMap(defaultTimeout), + ContainerMemLimit: newValueMap(timeout), + ContainerCoresLimit: newValueMap(timeout), } } @@ -68,6 +59,43 @@ func (c *PerfMetricsCache) Stop() { c.ContainerCoresLimit.Stop() } +// Returns the maximum timeout of all the caches under PerfMetricsCache +func (c *PerfMetricsCache) GetTimeout() time.Duration { + var ans time.Duration = 0 + + nmATimeout := c.NodeMemAllocatable.GetTimeout() + if nmATimeout > ans { + ans = nmATimeout + } + + ncATimeout := c.NodeCoresAllocatable.GetTimeout() + if ncATimeout > ans { + ans = ncATimeout + } + + cmLTimeout := c.ContainerMemLimit.GetTimeout() + if cmLTimeout > ans { + ans = cmLTimeout + } + + ccLTimeout := c.ContainerCoresLimit.GetTimeout() + if ccLTimeout > ans { + ans = ccLTimeout + } + return ans +} + +// Set the timeout of all the caches under PerfMetricsCache, then Stop and Start all the cache janitors +func (c *PerfMetricsCache) SetOrUpdateTimeout(timeout time.Duration) { + c.NodeMemAllocatable.SetTimeout(timeout) + c.NodeCoresAllocatable.SetTimeout(timeout) + c.ContainerMemLimit.SetTimeout(timeout) + c.ContainerCoresLimit.SetTimeout(timeout) + + c.Stop() + c.Start() +} + type valueMap struct { cache *common.Cache timeout time.Duration @@ -109,6 +137,14 @@ func (m *valueMap) Stop() { m.cache.StopJanitor() } +func (m *valueMap) GetTimeout() time.Duration { + return m.timeout +} + +func (m *valueMap) SetTimeout(timeout time.Duration) { + m.timeout = timeout +} + // ContainerUID creates an unique ID for from namespace, pod name and container name func ContainerUID(namespace, pod, container string) string { return namespace + "/" + pod + "/" + container diff --git a/metricbeat/module/kubernetes/util/metrics_cache_test.go b/metricbeat/module/kubernetes/util/metrics_cache_test.go index 649c1f5fb86..07b447c5fe7 100644 --- a/metricbeat/module/kubernetes/util/metrics_cache_test.go +++ b/metricbeat/module/kubernetes/util/metrics_cache_test.go @@ -19,12 +19,13 @@ package util import ( "testing" + "time" "github.com/stretchr/testify/assert" ) func TestValueMap(t *testing.T) { - test := newValueMap(defaultTimeout) + test := newValueMap(120 * time.Second) // no value assert.Equal(t, 0.0, test.Get("foo")) @@ -35,7 +36,7 @@ func TestValueMap(t *testing.T) { } func TestGetWithDefault(t *testing.T) { - test := newValueMap(defaultTimeout) + test := newValueMap(120 * time.Second) // Empty + default assert.Equal(t, 0.0, test.Get("foo")) diff --git a/metricbeat/module/kubernetes/volume/volume.go b/metricbeat/module/kubernetes/volume/volume.go index 77b446a56f1..8ade1a7a718 100644 --- a/metricbeat/module/kubernetes/volume/volume.go +++ b/metricbeat/module/kubernetes/volume/volume.go @@ -78,7 +78,31 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { BaseMetricSet: base, http: http, mod: mod, +<<<<<<< HEAD }, nil +======= + } + + // add ECS orchestrator fields + config, err := util.GetValidatedConfig(base) + if err != nil { + logp.Info("Kubernetes metricset enriching is disabled") + } else { + client, err := kubernetes.GetKubernetesClient(config.KubeConfig, config.KubeClientOptions) + if err != nil { + return nil, fmt.Errorf("fail to get kubernetes client: %w", err) + } + cfg, _ := conf.NewConfigFrom(&config) + ecsClusterMeta, err := util.GetClusterECSMeta(cfg, client, ms.Logger()) + if err != nil { + ms.Logger().Debugf("could not retrieve cluster metadata: %w", err) + } + if ecsClusterMeta != nil { + ms.clusterMeta = ecsClusterMeta + } + } + return ms, nil +>>>>>>> 4b625fc411 (Feature/cache expiration (#31785)) } // Fetch methods implements the data gathering and data conversion to the right From 0993eaca983cd03d6726e278c83dc63f71d4eaff Mon Sep 17 00:00:00 2001 From: Giuseppe Santoro Date: Mon, 20 Jun 2022 10:50:20 +0200 Subject: [PATCH 2/5] resolved conflicts --- .../module/kubernetes/container/container.go | 6 +---- metricbeat/module/kubernetes/pod/pod.go | 6 +---- .../state_daemonset/state_daemonset.go | 6 +---- .../state_persistentvolume.go | 4 ---- .../state_persistentvolumeclaim.go | 4 ---- .../module/kubernetes/util/kubernetes.go | 15 ------------ metricbeat/module/kubernetes/volume/volume.go | 24 ------------------- 7 files changed, 3 insertions(+), 62 deletions(-) diff --git a/metricbeat/module/kubernetes/container/container.go b/metricbeat/module/kubernetes/container/container.go index 050f41072c1..ef18f5d4484 100644 --- a/metricbeat/module/kubernetes/container/container.go +++ b/metricbeat/module/kubernetes/container/container.go @@ -93,11 +93,7 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) { return } -<<<<<<< HEAD - events, err := eventMapping(body, util.PerfMetrics) -======= - events, err := eventMapping(body, m.mod.GetPerfMetricsCache(), m.Logger()) ->>>>>>> 4b625fc411 (Feature/cache expiration (#31785)) + events, err := eventMapping(body, m.mod.GetPerfMetricsCache()) if err != nil { m.Logger().Error(err) reporter.Error(err) diff --git a/metricbeat/module/kubernetes/pod/pod.go b/metricbeat/module/kubernetes/pod/pod.go index 75aa66dfe59..0cfc6ad5f21 100644 --- a/metricbeat/module/kubernetes/pod/pod.go +++ b/metricbeat/module/kubernetes/pod/pod.go @@ -94,11 +94,7 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) { return } -<<<<<<< HEAD - events, err := eventMapping(body, util.PerfMetrics) -======= - events, err := eventMapping(body, m.mod.GetPerfMetricsCache(), m.Logger()) ->>>>>>> 4b625fc411 (Feature/cache expiration (#31785)) + events, err := eventMapping(body, m.mod.GetPerfMetricsCache()) if err != nil { m.Logger().Error(err) reporter.Error(err) diff --git a/metricbeat/module/kubernetes/state_daemonset/state_daemonset.go b/metricbeat/module/kubernetes/state_daemonset/state_daemonset.go index 992d433568f..9a6aaeca944 100644 --- a/metricbeat/module/kubernetes/state_daemonset/state_daemonset.go +++ b/metricbeat/module/kubernetes/state_daemonset/state_daemonset.go @@ -89,11 +89,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ BaseMetricSet: base, prometheus: prometheus, -<<<<<<< HEAD - enricher: util.NewResourceMetadataEnricher(base, &kubernetes.ReplicaSet{}, false), -======= - enricher: util.NewResourceMetadataEnricher(base, &kubernetes.DaemonSet{}, mod.GetPerfMetricsCache(), false), ->>>>>>> 4b625fc411 (Feature/cache expiration (#31785)) + enricher: util.NewResourceMetadataEnricher(base, &kubernetes.ReplicaSet{}, mod.GetPerfMetricsCache(), false), mod: mod, }, nil } diff --git a/metricbeat/module/kubernetes/state_persistentvolume/state_persistentvolume.go b/metricbeat/module/kubernetes/state_persistentvolume/state_persistentvolume.go index da8e41c2ec1..90847c63137 100644 --- a/metricbeat/module/kubernetes/state_persistentvolume/state_persistentvolume.go +++ b/metricbeat/module/kubernetes/state_persistentvolume/state_persistentvolume.go @@ -54,10 +54,6 @@ func NewPersistentVolumeMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { BaseMetricSet: base, prometheus: prometheus, mod: mod, -<<<<<<< HEAD -======= - enricher: util.NewResourceMetadataEnricher(base, &kubernetes.PersistentVolume{}, mod.GetPerfMetricsCache(), false), ->>>>>>> 4b625fc411 (Feature/cache expiration (#31785)) mapping: &p.MetricsMapping{ Metrics: map[string]p.MetricMap{ "kube_persistentvolume_capacity_bytes": p.Metric("capacity.bytes"), diff --git a/metricbeat/module/kubernetes/state_persistentvolumeclaim/state_persistentvolumeclaim.go b/metricbeat/module/kubernetes/state_persistentvolumeclaim/state_persistentvolumeclaim.go index bbbecfa4c5b..3cf414c498a 100644 --- a/metricbeat/module/kubernetes/state_persistentvolumeclaim/state_persistentvolumeclaim.go +++ b/metricbeat/module/kubernetes/state_persistentvolumeclaim/state_persistentvolumeclaim.go @@ -54,10 +54,6 @@ func NewpersistentvolumeclaimMetricSet(base mb.BaseMetricSet) (mb.MetricSet, err BaseMetricSet: base, prometheus: prometheus, mod: mod, -<<<<<<< HEAD -======= - enricher: util.NewResourceMetadataEnricher(base, &kubernetes.PersistentVolumeClaim{}, mod.GetPerfMetricsCache(), false), ->>>>>>> 4b625fc411 (Feature/cache expiration (#31785)) mapping: &p.MetricsMapping{ Metrics: map[string]p.MetricMap{ diff --git a/metricbeat/module/kubernetes/util/kubernetes.go b/metricbeat/module/kubernetes/util/kubernetes.go index 739c6890f8a..19f3a710284 100644 --- a/metricbeat/module/kubernetes/util/kubernetes.go +++ b/metricbeat/module/kubernetes/util/kubernetes.go @@ -82,13 +82,8 @@ func NewResourceMetadataEnricher( perfMetrics *PerfMetricsCache, nodeScope bool) Enricher { -<<<<<<< HEAD - config := validatedConfig(base) - if config == nil { -======= config, err := GetValidatedConfig(base) if err != nil { ->>>>>>> 4b625fc411 (Feature/cache expiration (#31785)) logp.Info("Kubernetes metricset enriching is disabled") return &nilEnricher{} } @@ -183,13 +178,8 @@ func NewContainerMetadataEnricher( perfMetrics *PerfMetricsCache, nodeScope bool) Enricher { -<<<<<<< HEAD - config := validatedConfig(base) - if config == nil { -======= config, err := GetValidatedConfig(base) if err != nil { ->>>>>>> 4b625fc411 (Feature/cache expiration (#31785)) logp.Info("Kubernetes metricset enriching is disabled") return &nilEnricher{} } @@ -327,10 +317,6 @@ func GetDefaultDisabledMetaConfig() *kubernetesConfig { } } -<<<<<<< HEAD -func validatedConfig(base mb.BaseMetricSet) *kubernetesConfig { - config := kubernetesConfig{ -======= func GetValidatedConfig(base mb.BaseMetricSet) (*kubernetesConfig, error) { config, err := GetConfig(base) if err != nil { @@ -355,7 +341,6 @@ func validateConfig(config *kubernetesConfig) (*kubernetesConfig, error) { func GetConfig(base mb.BaseMetricSet) (*kubernetesConfig, error) { config := &kubernetesConfig{ ->>>>>>> 4b625fc411 (Feature/cache expiration (#31785)) AddMetadata: true, SyncPeriod: time.Minute * 10, AddResourceMetadata: metadata.GetDefaultResourceMetadataConfig(), diff --git a/metricbeat/module/kubernetes/volume/volume.go b/metricbeat/module/kubernetes/volume/volume.go index 8ade1a7a718..77b446a56f1 100644 --- a/metricbeat/module/kubernetes/volume/volume.go +++ b/metricbeat/module/kubernetes/volume/volume.go @@ -78,31 +78,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { BaseMetricSet: base, http: http, mod: mod, -<<<<<<< HEAD }, nil -======= - } - - // add ECS orchestrator fields - config, err := util.GetValidatedConfig(base) - if err != nil { - logp.Info("Kubernetes metricset enriching is disabled") - } else { - client, err := kubernetes.GetKubernetesClient(config.KubeConfig, config.KubeClientOptions) - if err != nil { - return nil, fmt.Errorf("fail to get kubernetes client: %w", err) - } - cfg, _ := conf.NewConfigFrom(&config) - ecsClusterMeta, err := util.GetClusterECSMeta(cfg, client, ms.Logger()) - if err != nil { - ms.Logger().Debugf("could not retrieve cluster metadata: %w", err) - } - if ecsClusterMeta != nil { - ms.clusterMeta = ecsClusterMeta - } - } - return ms, nil ->>>>>>> 4b625fc411 (Feature/cache expiration (#31785)) } // Fetch methods implements the data gathering and data conversion to the right From c4d17fc6c8a3839c088bb723542091c4ae7fe5d7 Mon Sep 17 00:00:00 2001 From: Giuseppe Santoro Date: Mon, 20 Jun 2022 12:11:07 +0200 Subject: [PATCH 3/5] fixed changelog and backported daemonset --- CHANGELOG.next.asciidoc | 2 -- libbeat/common/kubernetes/informer.go | 12 ++++++++++++ libbeat/common/kubernetes/types.go | 3 +++ .../kubernetes/state_daemonset/state_daemonset.go | 2 +- metricbeat/module/kubernetes/util/kubernetes.go | 2 ++ 5 files changed, 18 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index bf71117a0a6..0ac4eba2683 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -48,9 +48,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] *Metricbeat* -- make `system/filesystem` code sensitive to `hostfs` and migrate libraries to `elastic-agent-opts` {pull}31001[31001] - Fix kubernetes module's internal cache expiration issue. This avoid metrics like `kubernetes.container.cpu.usage.limit.pct` from not being populated. {pull}31785[31785] -- add missing HealthyHostCount and UnHealthyHostCount for application ELB. {pull}31853[31853] *Packetbeat* diff --git a/libbeat/common/kubernetes/informer.go b/libbeat/common/kubernetes/informer.go index 690c8882fef..2ceb1bedb3c 100644 --- a/libbeat/common/kubernetes/informer.go +++ b/libbeat/common/kubernetes/informer.go @@ -137,6 +137,18 @@ func NewInformer(client kubernetes.Interface, resource Resource, opts WatchOptio } objType = "statefulset" + case *DaemonSet: + ss := client.AppsV1().DaemonSets(opts.Namespace) + listwatch = &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return ss.List(ctx, options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return ss.Watch(ctx, options) + }, + } + + objType = "daemonset" case *Service: svc := client.CoreV1().Services(opts.Namespace) listwatch = &cache.ListWatch{ diff --git a/libbeat/common/kubernetes/types.go b/libbeat/common/kubernetes/types.go index 65db664d82c..cb506f4bc38 100644 --- a/libbeat/common/kubernetes/types.go +++ b/libbeat/common/kubernetes/types.go @@ -73,6 +73,9 @@ type ReplicaSet = appsv1.ReplicaSet // StatefulSet data type StatefulSet = appsv1.StatefulSet +// DaemonSet data +type DaemonSet = appsv1.DaemonSet + // Service data type Service = v1.Service diff --git a/metricbeat/module/kubernetes/state_daemonset/state_daemonset.go b/metricbeat/module/kubernetes/state_daemonset/state_daemonset.go index 9a6aaeca944..5d61d4fbdef 100644 --- a/metricbeat/module/kubernetes/state_daemonset/state_daemonset.go +++ b/metricbeat/module/kubernetes/state_daemonset/state_daemonset.go @@ -89,7 +89,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ BaseMetricSet: base, prometheus: prometheus, - enricher: util.NewResourceMetadataEnricher(base, &kubernetes.ReplicaSet{}, mod.GetPerfMetricsCache(), false), + enricher: util.NewResourceMetadataEnricher(base, &kubernetes.DaemonSet{}, mod.GetPerfMetricsCache(), false), mod: mod, }, nil } diff --git a/metricbeat/module/kubernetes/util/kubernetes.go b/metricbeat/module/kubernetes/util/kubernetes.go index 19f3a710284..32b58dd231a 100644 --- a/metricbeat/module/kubernetes/util/kubernetes.go +++ b/metricbeat/module/kubernetes/util/kubernetes.go @@ -147,6 +147,8 @@ func NewResourceMetadataEnricher( m[id] = metaGen.Generate("namespace", r) case *kubernetes.ReplicaSet: m[id] = metaGen.Generate("replicaset", r) + case *kubernetes.DaemonSet: + m[id] = metaGen.Generate("daemonset", r) default: m[id] = metaGen.Generate(r.GetObjectKind().GroupVersionKind().Kind, r) } From 34e7b5ef90dc278a28c1faa1e10273588c8b10c5 Mon Sep 17 00:00:00 2001 From: Giuseppe Santoro Date: Mon, 20 Jun 2022 12:39:22 +0200 Subject: [PATCH 4/5] fixed linter issues --- metricbeat/module/kubernetes/kubernetes.go | 4 ++-- metricbeat/module/kubernetes/node/node.go | 5 ---- .../state_container/state_container.go | 16 +++++-------- .../state_daemonset/state_daemonset.go | 2 -- .../module/kubernetes/util/kubernetes.go | 24 ++++++++++++++++--- 5 files changed, 29 insertions(+), 22 deletions(-) diff --git a/metricbeat/module/kubernetes/kubernetes.go b/metricbeat/module/kubernetes/kubernetes.go index 594dcf58907..dc09f5a4a34 100644 --- a/metricbeat/module/kubernetes/kubernetes.go +++ b/metricbeat/module/kubernetes/kubernetes.go @@ -18,11 +18,11 @@ package kubernetes import ( + "fmt" "sync" "time" "github.com/mitchellh/hashstructure" - "github.com/pkg/errors" dto "github.com/prometheus/client_model/go" "github.com/elastic/beats/v7/metricbeat/helper" @@ -101,7 +101,7 @@ func ModuleBuilder() func(base mb.BaseModule) (mb.Module, error) { return func(base mb.BaseModule) (mb.Module, error) { hash, err := generateCacheHash(base.Config().Hosts) if err != nil { - return nil, errors.Wrap(err, "error generating cache hash for kubeStateMetricsCache") + return nil, fmt.Errorf("error generating cache hash for kubeStateMetricsCache: %w", err) } // NOTE: `Period * 2` is an arbitrary value to make the cache NEVER to expire before the next scraping run diff --git a/metricbeat/module/kubernetes/node/node.go b/metricbeat/module/kubernetes/node/node.go index f6f0a7d4cab..9859394b39c 100644 --- a/metricbeat/module/kubernetes/node/node.go +++ b/metricbeat/module/kubernetes/node/node.go @@ -22,7 +22,6 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/kubernetes" - "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/metricbeat/helper" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/mb/parse" @@ -40,8 +39,6 @@ var ( DefaultScheme: defaultScheme, DefaultPath: defaultPath, }.Build() - - logger = logp.NewLogger("kubernetes.node") ) // init registers the MetricSet with the central registry. @@ -115,8 +112,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) { m.Logger().Debug("error trying to emit event") return } - - return } // Close stops this metricset diff --git a/metricbeat/module/kubernetes/state_container/state_container.go b/metricbeat/module/kubernetes/state_container/state_container.go index d997e5c2c75..3e2a0b17a5c 100644 --- a/metricbeat/module/kubernetes/state_container/state_container.go +++ b/metricbeat/module/kubernetes/state_container/state_container.go @@ -21,8 +21,6 @@ import ( "fmt" "strings" - "github.com/pkg/errors" - "github.com/elastic/beats/v7/libbeat/common" p "github.com/elastic/beats/v7/metricbeat/helper/prometheus" "github.com/elastic/beats/v7/metricbeat/mb" @@ -34,8 +32,6 @@ import ( const ( defaultScheme = "http" defaultPath = "/metrics" - // Nanocores conversion 10^9 - nanocores = 1000000000 ) var ( @@ -135,11 +131,11 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { families, err := m.mod.GetStateMetricsFamilies(m.prometheus) if err != nil { - return errors.Wrap(err, "error getting families") + return fmt.Errorf("error getting families: %w", err) } events, err := m.prometheus.ProcessMetrics(families, mapping) if err != nil { - return errors.Wrap(err, "error getting event") + return fmt.Errorf("error getting event: %w", err) } m.enricher.Enrich(events) @@ -154,15 +150,15 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { cID := (containerID).(string) split := strings.Index(cID, "://") if split != -1 { - containerFields.Put("runtime", cID[:split]) - containerFields.Put("id", cID[split+3:]) + util.ShouldPut(containerFields, "runtime", cID[:split], m.Logger()) + util.ShouldPut(containerFields, "id", cID[split+3:], m.Logger()) } } if containerImage, ok := event["image"]; ok { cImage := (containerImage).(string) - containerFields.Put("image.name", cImage) + util.ShouldPut(containerFields, "image.name", cImage, m.Logger()) // remove kubernetes.container.image field as value is the same as ECS container.image.name field - event.Delete("image") + util.ShouldDelete(event, "image", m.Logger()) } e, err := util.CreateEvent(event, "kubernetes.container") diff --git a/metricbeat/module/kubernetes/state_daemonset/state_daemonset.go b/metricbeat/module/kubernetes/state_daemonset/state_daemonset.go index 5d61d4fbdef..05b70d3f74b 100644 --- a/metricbeat/module/kubernetes/state_daemonset/state_daemonset.go +++ b/metricbeat/module/kubernetes/state_daemonset/state_daemonset.go @@ -126,8 +126,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) { return } } - - return } // Close stops this metricset diff --git a/metricbeat/module/kubernetes/util/kubernetes.go b/metricbeat/module/kubernetes/util/kubernetes.go index 32b58dd231a..82d31e27d2a 100644 --- a/metricbeat/module/kubernetes/util/kubernetes.go +++ b/metricbeat/module/kubernetes/util/kubernetes.go @@ -234,8 +234,9 @@ func NewContainerMetadataEnricher( // which is in the form of :// split := strings.Index(s.ContainerID, "://") if split != -1 { - meta.Put("container.id", s.ContainerID[split+3:]) - meta.Put("container.runtime", s.ContainerID[:split]) + ShouldPut(meta, "container.id", s.ContainerID[split+3:], base.Logger()) + + ShouldPut(meta, "container.runtime", s.ContainerID[:split], base.Logger()) } } id := join(pod.GetObjectMeta().GetNamespace(), pod.GetObjectMeta().GetName(), container.Name) @@ -472,7 +473,10 @@ func (m *enricher) Enrich(events []common.MapStr) { delete(k8sMeta, "pod") } ecsMeta := meta.Clone() - ecsMeta.Delete("kubernetes") + err = ecsMeta.Delete("kubernetes") + if err != nil { + logp.Debug("kubernetes", "Failed to delete field '%s': %s", "kubernetes", err) + } event.DeepUpdate(common.MapStr{ mb.ModuleDataKey: k8sMeta, "meta": ecsMeta, @@ -520,3 +524,17 @@ func CreateEvent(event common.MapStr, namespace string) (mb.Event, error) { } return e, err } + +func ShouldPut(event common.MapStr, field string, value interface{}, logger *logp.Logger) { + _, err := event.Put(field, value) + if err != nil { + logger.Debugf("Failed to put field '%s' with value '%s': %s", field, value, err) + } +} + +func ShouldDelete(event common.MapStr, field string, logger *logp.Logger) { + err := event.Delete(field) + if err != nil { + logger.Debugf("Failed to delete field '%s': %s", field, err) + } +} From ad3e87723a8f61c46b55ca127c32ec97990cff45 Mon Sep 17 00:00:00 2001 From: Giuseppe Santoro Date: Mon, 20 Jun 2022 12:59:01 +0200 Subject: [PATCH 5/5] fixed more linter errors --- .../kubernetes/state_container/state_container.go | 11 +++++++++-- metricbeat/module/kubernetes/util/kubernetes.go | 10 ++++++++-- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/metricbeat/module/kubernetes/state_container/state_container.go b/metricbeat/module/kubernetes/state_container/state_container.go index 3e2a0b17a5c..523dec36bc3 100644 --- a/metricbeat/module/kubernetes/state_container/state_container.go +++ b/metricbeat/module/kubernetes/state_container/state_container.go @@ -147,7 +147,10 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { if containerID, ok := event["id"]; ok { // we don't expect errors here, but if any we would obtain an // empty string - cID := (containerID).(string) + cID, ok := (containerID).(string) + if !ok { + m.Logger().Debugf("Error while casting containerID: %s", ok) + } split := strings.Index(cID, "://") if split != -1 { util.ShouldPut(containerFields, "runtime", cID[:split], m.Logger()) @@ -155,7 +158,11 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { } } if containerImage, ok := event["image"]; ok { - cImage := (containerImage).(string) + cImage, ok := (containerImage).(string) + if !ok { + m.Logger().Debugf("Error while casting containerImage: %s", ok) + } + util.ShouldPut(containerFields, "image.name", cImage, m.Logger()) // remove kubernetes.container.image field as value is the same as ECS container.image.name field util.ShouldDelete(event, "image", m.Logger()) diff --git a/metricbeat/module/kubernetes/util/kubernetes.go b/metricbeat/module/kubernetes/util/kubernetes.go index 82d31e27d2a..deac93945a1 100644 --- a/metricbeat/module/kubernetes/util/kubernetes.go +++ b/metricbeat/module/kubernetes/util/kubernetes.go @@ -203,7 +203,10 @@ func NewContainerMetadataEnricher( enricher := buildMetadataEnricher(watcher, nodeWatcher, namespaceWatcher, // update func(m map[string]common.MapStr, r kubernetes.Resource) { - pod := r.(*kubernetes.Pod) + pod, ok := r.(*kubernetes.Pod) + if !ok { + base.Logger().Debugf("Error while casting event: %s", ok) + } meta := metaGen.Generate(pod) statuses := make(map[string]*kubernetes.PodContainerStatus) @@ -245,7 +248,10 @@ func NewContainerMetadataEnricher( }, // delete func(m map[string]common.MapStr, r kubernetes.Resource) { - pod := r.(*kubernetes.Pod) + pod, ok := r.(*kubernetes.Pod) + if !ok { + base.Logger().Debugf("Error while casting event: %s", ok) + } for _, container := range append(pod.Spec.Containers, pod.Spec.InitContainers...) { id := join(pod.ObjectMeta.GetNamespace(), pod.GetObjectMeta().GetName(), container.Name) delete(m, id)