From fc42acd74f83ecc94de238ce1a6a9630174d912a Mon Sep 17 00:00:00 2001 From: Chris Mark Date: Fri, 28 May 2021 10:46:57 +0300 Subject: [PATCH] Refactor kubelet metricsets to share response from endpoint (#25782) (cherry picked from commit a39dd00e002caa47e63b9f10a83d6c2c031f17b0) --- CHANGELOG.next.asciidoc | 1 + .../module/kubernetes/container/container.go | 11 ++- metricbeat/module/kubernetes/kubernetes.go | 70 +++++++++++++++---- metricbeat/module/kubernetes/node/node.go | 12 +++- metricbeat/module/kubernetes/pod/pod.go | 14 +++- .../state_container/state_container.go | 2 +- .../kubernetes/state_cronjob/state_cronjob.go | 2 +- .../state_daemonset/state_daemonset.go | 2 +- .../state_deployment/state_deployment.go | 2 +- .../kubernetes/state_node/state_node.go | 2 +- .../state_persistentvolume.go | 2 +- .../state_persistentvolumeclaim.go | 2 +- .../module/kubernetes/state_pod/state_pod.go | 2 +- .../state_replicaset/state_replicaset.go | 2 +- .../state_resourcequota.go | 2 +- .../kubernetes/state_service/state_service.go | 2 +- .../state_statefulset/state_statefulset.go | 2 +- .../state_storageclass/state_storageclass.go | 2 +- metricbeat/module/kubernetes/system/system.go | 11 ++- metricbeat/module/kubernetes/volume/volume.go | 11 ++- 20 files changed, 121 insertions(+), 35 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index afc908dd604..aafaff9f4f8 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -616,6 +616,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Refactor state_* metricsets to share response from endpoint. {pull}25640[25640] - Add server id to zookeeper events. {pull}25550[25550] - Add additional network metrics to docker/network {pull}25354[25354] +- Reduce number of requests done by kubernetes metricsets to kubelet. {pull}25782[25782] *Packetbeat* diff --git a/metricbeat/module/kubernetes/container/container.go b/metricbeat/module/kubernetes/container/container.go index 718696a0a4f..8aa30094c70 100644 --- a/metricbeat/module/kubernetes/container/container.go +++ b/metricbeat/module/kubernetes/container/container.go @@ -18,12 +18,15 @@ package container import ( + "fmt" + "github.com/pkg/errors" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/metricbeat/helper" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/mb/parse" + k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes" "github.com/elastic/beats/v7/metricbeat/module/kubernetes/util" ) @@ -58,6 +61,7 @@ type MetricSet struct { mb.BaseMetricSet http *helper.HTTP enricher util.Enricher + mod k8smod.Module } // New create a new instance of the MetricSet @@ -68,10 +72,15 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { if err != nil { return nil, err } + mod, ok := base.Module().(k8smod.Module) + if !ok { + return nil, fmt.Errorf("must be child of kubernetes module") + } return &MetricSet{ BaseMetricSet: base, http: http, enricher: util.NewContainerMetadataEnricher(base, true), + mod: mod, }, nil } @@ -81,7 +90,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { m.enricher.Start() - body, err := m.http.FetchContent() + body, err := m.mod.GetKubeletStats(m.http) if err != nil { return errors.Wrap(err, "error doing HTTP request to fetch 'container' Metricset data") diff --git a/metricbeat/module/kubernetes/kubernetes.go b/metricbeat/module/kubernetes/kubernetes.go index 53cd037f5e0..f9bd0e29e37 100644 --- a/metricbeat/module/kubernetes/kubernetes.go +++ b/metricbeat/module/kubernetes/kubernetes.go @@ -25,6 +25,7 @@ import ( "github.com/pkg/errors" dto "github.com/prometheus/client_model/go" + "github.com/elastic/beats/v7/metricbeat/helper" p "github.com/elastic/beats/v7/metricbeat/helper/prometheus" "github.com/elastic/beats/v7/metricbeat/mb" ) @@ -38,7 +39,8 @@ func init() { type Module interface { mb.Module - GetSharedFamilies(prometheus p.Prometheus) ([]*dto.MetricFamily, error) + GetStateMetricsFamilies(prometheus p.Prometheus) ([]*dto.MetricFamily, error) + GetKubeletStats(http *helper.HTTP) ([]byte, error) } type familiesCache struct { @@ -53,55 +55,95 @@ type kubeStateMetricsCache struct { } func (c *kubeStateMetricsCache) getCacheMapEntry(hash uint64) *familiesCache { - c.lock.Lock() - defer c.lock.Unlock() if _, ok := c.cacheMap[hash]; !ok { c.cacheMap[hash] = &familiesCache{} } return c.cacheMap[hash] } +type statsCache struct { + sharedStats []byte + lastFetchErr error + lastFetchTimestamp time.Time +} + +type kubeletStatsCache struct { + cacheMap map[uint64]*statsCache + lock sync.Mutex +} + +func (c *kubeletStatsCache) getCacheMapEntry(hash uint64) *statsCache { + if _, ok := c.cacheMap[hash]; !ok { + c.cacheMap[hash] = &statsCache{} + } + return c.cacheMap[hash] +} + type module struct { mb.BaseModule kubeStateMetricsCache *kubeStateMetricsCache - familiesCache *familiesCache + kubeletStatsCache *kubeletStatsCache + cacheHash uint64 } func ModuleBuilder() func(base mb.BaseModule) (mb.Module, error) { kubeStateMetricsCache := &kubeStateMetricsCache{ cacheMap: make(map[uint64]*familiesCache), } + kubeletStatsCache := &kubeletStatsCache{ + cacheMap: make(map[uint64]*statsCache), + } return func(base mb.BaseModule) (mb.Module, error) { hash, err := generateCacheHash(base.Config().Hosts) if err != nil { return nil, errors.Wrap(err, "error generating cache hash for kubeStateMetricsCache") } - // NOTE: These entries will be never removed, this can be a leak if - // metricbeat is used to monitor clusters dynamically created. - // (https://github.com/elastic/beats/pull/25640#discussion_r633395213) - familiesCache := kubeStateMetricsCache.getCacheMapEntry(hash) m := module{ BaseModule: base, kubeStateMetricsCache: kubeStateMetricsCache, - familiesCache: familiesCache, + kubeletStatsCache: kubeletStatsCache, + cacheHash: hash, } return &m, nil } } -func (m *module) GetSharedFamilies(prometheus p.Prometheus) ([]*dto.MetricFamily, error) { +func (m *module) GetStateMetricsFamilies(prometheus p.Prometheus) ([]*dto.MetricFamily, error) { m.kubeStateMetricsCache.lock.Lock() defer m.kubeStateMetricsCache.lock.Unlock() now := time.Now() + // NOTE: These entries will be never removed, this can be a leak if + // metricbeat is used to monitor clusters dynamically created. + // (https://github.com/elastic/beats/pull/25640#discussion_r633395213) + familiesCache := m.kubeStateMetricsCache.getCacheMapEntry(m.cacheHash) + + if familiesCache.lastFetchTimestamp.IsZero() || now.Sub(familiesCache.lastFetchTimestamp) > m.Config().Period { + familiesCache.sharedFamilies, familiesCache.lastFetchErr = prometheus.GetFamilies() + familiesCache.lastFetchTimestamp = now + } + + return familiesCache.sharedFamilies, familiesCache.lastFetchErr +} + +func (m *module) GetKubeletStats(http *helper.HTTP) ([]byte, error) { + m.kubeletStatsCache.lock.Lock() + defer m.kubeletStatsCache.lock.Unlock() + + now := time.Now() + + // NOTE: These entries will be never removed, this can be a leak if + // metricbeat is used to monitor clusters dynamically created. + // (https://github.com/elastic/beats/pull/25640#discussion_r633395213) + statsCache := m.kubeletStatsCache.getCacheMapEntry(m.cacheHash) - if m.familiesCache.lastFetchTimestamp.IsZero() || now.Sub(m.familiesCache.lastFetchTimestamp) > m.Config().Period { - m.familiesCache.sharedFamilies, m.familiesCache.lastFetchErr = prometheus.GetFamilies() - m.familiesCache.lastFetchTimestamp = now + if statsCache.lastFetchTimestamp.IsZero() || now.Sub(statsCache.lastFetchTimestamp) > m.Config().Period { + statsCache.sharedStats, statsCache.lastFetchErr = http.FetchContent() + statsCache.lastFetchTimestamp = now } - return m.familiesCache.sharedFamilies, m.familiesCache.lastFetchErr + return statsCache.sharedStats, statsCache.lastFetchErr } func generateCacheHash(host []string) (uint64, error) { diff --git a/metricbeat/module/kubernetes/node/node.go b/metricbeat/module/kubernetes/node/node.go index 9868f3ec2cd..14e606fc887 100644 --- a/metricbeat/module/kubernetes/node/node.go +++ b/metricbeat/module/kubernetes/node/node.go @@ -18,6 +18,8 @@ package node import ( + "fmt" + "github.com/pkg/errors" "github.com/elastic/beats/v7/libbeat/common" @@ -26,6 +28,7 @@ import ( "github.com/elastic/beats/v7/metricbeat/helper" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/mb/parse" + k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes" "github.com/elastic/beats/v7/metricbeat/module/kubernetes/util" ) @@ -60,6 +63,7 @@ type MetricSet struct { mb.BaseMetricSet http *helper.HTTP enricher util.Enricher + mod k8smod.Module } // New create a new instance of the MetricSet @@ -70,11 +74,15 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { if err != nil { return nil, err } - + mod, ok := base.Module().(k8smod.Module) + if !ok { + return nil, fmt.Errorf("must be child of kubernetes module") + } return &MetricSet{ BaseMetricSet: base, http: http, enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Node{}, false), + mod: mod, }, nil } @@ -84,7 +92,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { m.enricher.Start() - body, err := m.http.FetchContent() + body, err := m.mod.GetKubeletStats(m.http) if err != nil { return errors.Wrap(err, "error doing HTTP request to fetch 'node' Metricset data") diff --git a/metricbeat/module/kubernetes/pod/pod.go b/metricbeat/module/kubernetes/pod/pod.go index 4c47cb6863c..d9cabf5a414 100644 --- a/metricbeat/module/kubernetes/pod/pod.go +++ b/metricbeat/module/kubernetes/pod/pod.go @@ -18,6 +18,8 @@ package pod import ( + "fmt" + "github.com/pkg/errors" "github.com/elastic/beats/v7/libbeat/common/kubernetes" @@ -25,6 +27,7 @@ import ( "github.com/elastic/beats/v7/metricbeat/helper" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/mb/parse" + k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes" "github.com/elastic/beats/v7/metricbeat/module/kubernetes/util" ) @@ -59,6 +62,7 @@ type MetricSet struct { mb.BaseMetricSet http *helper.HTTP enricher util.Enricher + mod k8smod.Module } // New create a new instance of the MetricSet @@ -69,11 +73,15 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { if err != nil { return nil, err } - + mod, ok := base.Module().(k8smod.Module) + if !ok { + return nil, fmt.Errorf("must be child of kubernetes module") + } return &MetricSet{ BaseMetricSet: base, http: http, enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Pod{}, true), + mod: mod, }, nil } @@ -83,9 +91,9 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { m.enricher.Start() - body, err := m.http.FetchContent() + body, err := m.mod.GetKubeletStats(m.http) if err != nil { - return errors.Wrap(err, "error doing HTTP request to fetch 'pod' Metricset data") + return errors.Wrap(err, "error fetching shared data for 'pod' Metricset") } events, err := eventMapping(body, util.PerfMetrics) diff --git a/metricbeat/module/kubernetes/state_container/state_container.go b/metricbeat/module/kubernetes/state_container/state_container.go index d2bab6fc216..4e6c416fca3 100644 --- a/metricbeat/module/kubernetes/state_container/state_container.go +++ b/metricbeat/module/kubernetes/state_container/state_container.go @@ -120,7 +120,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { m.enricher.Start() - families, err := m.mod.GetSharedFamilies(m.prometheus) + families, err := m.mod.GetStateMetricsFamilies(m.prometheus) if err != nil { return errors.Wrap(err, "error getting families") } diff --git a/metricbeat/module/kubernetes/state_cronjob/state_cronjob.go b/metricbeat/module/kubernetes/state_cronjob/state_cronjob.go index 809a9b82f29..272dc58aca6 100644 --- a/metricbeat/module/kubernetes/state_cronjob/state_cronjob.go +++ b/metricbeat/module/kubernetes/state_cronjob/state_cronjob.go @@ -87,7 +87,7 @@ func NewCronJobMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { // // Copied from other kube state metrics. func (m *CronJobMetricSet) Fetch(reporter mb.ReporterV2) error { - families, err := m.mod.GetSharedFamilies(m.prometheus) + families, err := m.mod.GetStateMetricsFamilies(m.prometheus) if err != nil { return errors.Wrap(err, "error getting family metrics") } diff --git a/metricbeat/module/kubernetes/state_daemonset/state_daemonset.go b/metricbeat/module/kubernetes/state_daemonset/state_daemonset.go index 484d42dd8c2..7a97692f120 100644 --- a/metricbeat/module/kubernetes/state_daemonset/state_daemonset.go +++ b/metricbeat/module/kubernetes/state_daemonset/state_daemonset.go @@ -101,7 +101,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { func (m *MetricSet) Fetch(reporter mb.ReporterV2) { m.enricher.Start() - families, err := m.mod.GetSharedFamilies(m.prometheus) + families, err := m.mod.GetStateMetricsFamilies(m.prometheus) if err != nil { m.Logger().Error(err) reporter.Error(err) diff --git a/metricbeat/module/kubernetes/state_deployment/state_deployment.go b/metricbeat/module/kubernetes/state_deployment/state_deployment.go index 64704e09d5d..fecad904c72 100644 --- a/metricbeat/module/kubernetes/state_deployment/state_deployment.go +++ b/metricbeat/module/kubernetes/state_deployment/state_deployment.go @@ -102,7 +102,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { func (m *MetricSet) Fetch(reporter mb.ReporterV2) { m.enricher.Start() - families, err := m.mod.GetSharedFamilies(m.prometheus) + families, err := m.mod.GetStateMetricsFamilies(m.prometheus) if err != nil { m.Logger().Error(err) reporter.Error(err) diff --git a/metricbeat/module/kubernetes/state_node/state_node.go b/metricbeat/module/kubernetes/state_node/state_node.go index 4a31b2170a8..3a14dce8357 100644 --- a/metricbeat/module/kubernetes/state_node/state_node.go +++ b/metricbeat/module/kubernetes/state_node/state_node.go @@ -113,7 +113,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { m.enricher.Start() - families, err := m.mod.GetSharedFamilies(m.prometheus) + families, err := m.mod.GetStateMetricsFamilies(m.prometheus) if err != nil { return errors.Wrap(err, "error doing HTTP request to fetch 'state_node' Metricset data") } diff --git a/metricbeat/module/kubernetes/state_persistentvolume/state_persistentvolume.go b/metricbeat/module/kubernetes/state_persistentvolume/state_persistentvolume.go index f61172d3b39..90847c63137 100644 --- a/metricbeat/module/kubernetes/state_persistentvolume/state_persistentvolume.go +++ b/metricbeat/module/kubernetes/state_persistentvolume/state_persistentvolume.go @@ -77,7 +77,7 @@ func NewPersistentVolumeMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch prometheus metrics and treats those prefixed by mb.ModuleDataKey as // module rooted fields at the event that gets reported func (m *PersistentVolumeMetricSet) Fetch(reporter mb.ReporterV2) { - families, err := m.mod.GetSharedFamilies(m.prometheus) + families, err := m.mod.GetStateMetricsFamilies(m.prometheus) if err != nil { m.Logger().Error(err) reporter.Error(err) diff --git a/metricbeat/module/kubernetes/state_persistentvolumeclaim/state_persistentvolumeclaim.go b/metricbeat/module/kubernetes/state_persistentvolumeclaim/state_persistentvolumeclaim.go index 54d26c57a3a..3cf414c498a 100644 --- a/metricbeat/module/kubernetes/state_persistentvolumeclaim/state_persistentvolumeclaim.go +++ b/metricbeat/module/kubernetes/state_persistentvolumeclaim/state_persistentvolumeclaim.go @@ -82,7 +82,7 @@ func NewpersistentvolumeclaimMetricSet(base mb.BaseMetricSet) (mb.MetricSet, err // module rooted fields at the event that gets reported func (m *persistentvolumeclaimMetricSet) Fetch(reporter mb.ReporterV2) error { - families, err := m.mod.GetSharedFamilies(m.prometheus) + families, err := m.mod.GetStateMetricsFamilies(m.prometheus) if err != nil { return err } diff --git a/metricbeat/module/kubernetes/state_pod/state_pod.go b/metricbeat/module/kubernetes/state_pod/state_pod.go index 4be4aea5023..89d7325537a 100644 --- a/metricbeat/module/kubernetes/state_pod/state_pod.go +++ b/metricbeat/module/kubernetes/state_pod/state_pod.go @@ -104,7 +104,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { func (m *MetricSet) Fetch(reporter mb.ReporterV2) { m.enricher.Start() - families, err := m.mod.GetSharedFamilies(m.prometheus) + families, err := m.mod.GetStateMetricsFamilies(m.prometheus) if err != nil { m.Logger().Error(err) reporter.Error(err) diff --git a/metricbeat/module/kubernetes/state_replicaset/state_replicaset.go b/metricbeat/module/kubernetes/state_replicaset/state_replicaset.go index ee955f691a3..72f5e50fb33 100644 --- a/metricbeat/module/kubernetes/state_replicaset/state_replicaset.go +++ b/metricbeat/module/kubernetes/state_replicaset/state_replicaset.go @@ -102,7 +102,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { func (m *MetricSet) Fetch(reporter mb.ReporterV2) { m.enricher.Start() - families, err := m.mod.GetSharedFamilies(m.prometheus) + families, err := m.mod.GetStateMetricsFamilies(m.prometheus) if err != nil { m.Logger().Error(err) reporter.Error(err) diff --git a/metricbeat/module/kubernetes/state_resourcequota/state_resourcequota.go b/metricbeat/module/kubernetes/state_resourcequota/state_resourcequota.go index ccb3d94a935..eb8bc6ddf35 100644 --- a/metricbeat/module/kubernetes/state_resourcequota/state_resourcequota.go +++ b/metricbeat/module/kubernetes/state_resourcequota/state_resourcequota.go @@ -76,7 +76,7 @@ func NewResourceQuotaMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch prometheus metrics and treats those prefixed by mb.ModuleDataKey as // module rooted fields at the event that gets reported func (m *ResourceQuotaMetricSet) Fetch(reporter mb.ReporterV2) { - families, err := m.mod.GetSharedFamilies(m.prometheus) + families, err := m.mod.GetStateMetricsFamilies(m.prometheus) if err != nil { m.Logger().Error(err) reporter.Error(err) diff --git a/metricbeat/module/kubernetes/state_service/state_service.go b/metricbeat/module/kubernetes/state_service/state_service.go index a656d21e86c..efc489da8e0 100644 --- a/metricbeat/module/kubernetes/state_service/state_service.go +++ b/metricbeat/module/kubernetes/state_service/state_service.go @@ -95,7 +95,7 @@ func NewServiceMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { func (m *ServiceMetricSet) Fetch(reporter mb.ReporterV2) { m.enricher.Start() - families, err := m.mod.GetSharedFamilies(m.prometheus) + families, err := m.mod.GetStateMetricsFamilies(m.prometheus) if err != nil { m.Logger().Error(err) reporter.Error(err) diff --git a/metricbeat/module/kubernetes/state_statefulset/state_statefulset.go b/metricbeat/module/kubernetes/state_statefulset/state_statefulset.go index 9641b62df51..eb59e4127f4 100644 --- a/metricbeat/module/kubernetes/state_statefulset/state_statefulset.go +++ b/metricbeat/module/kubernetes/state_statefulset/state_statefulset.go @@ -101,7 +101,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { func (m *MetricSet) Fetch(reporter mb.ReporterV2) { m.enricher.Start() - families, err := m.mod.GetSharedFamilies(m.prometheus) + families, err := m.mod.GetStateMetricsFamilies(m.prometheus) if err != nil { m.Logger().Error(err) reporter.Error(err) diff --git a/metricbeat/module/kubernetes/state_storageclass/state_storageclass.go b/metricbeat/module/kubernetes/state_storageclass/state_storageclass.go index 85e598d388d..820a052a05a 100644 --- a/metricbeat/module/kubernetes/state_storageclass/state_storageclass.go +++ b/metricbeat/module/kubernetes/state_storageclass/state_storageclass.go @@ -78,7 +78,7 @@ func NewStorageClassMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch prometheus metrics and treats those prefixed by mb.ModuleDataKey as // module rooted fields at the event that gets reported func (m *StorageClassMetricSet) Fetch(reporter mb.ReporterV2) { - families, err := m.mod.GetSharedFamilies(m.prometheus) + families, err := m.mod.GetStateMetricsFamilies(m.prometheus) if err != nil { m.Logger().Error(err) reporter.Error(err) diff --git a/metricbeat/module/kubernetes/system/system.go b/metricbeat/module/kubernetes/system/system.go index b10c9c4afb3..80e520d22a7 100644 --- a/metricbeat/module/kubernetes/system/system.go +++ b/metricbeat/module/kubernetes/system/system.go @@ -18,12 +18,15 @@ package system import ( + "fmt" + "github.com/pkg/errors" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/metricbeat/helper" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/mb/parse" + k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes" ) const ( @@ -56,6 +59,7 @@ func init() { type MetricSet struct { mb.BaseMetricSet http *helper.HTTP + mod k8smod.Module } // New create a new instance of the MetricSet @@ -66,9 +70,14 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { if err != nil { return nil, err } + mod, ok := base.Module().(k8smod.Module) + if !ok { + return nil, fmt.Errorf("must be child of kubernetes module") + } return &MetricSet{ BaseMetricSet: base, http: http, + mod: mod, }, nil } @@ -76,7 +85,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // format. It publishes the event which is then forwarded to the output. In case // of an error set the Error field of mb.Event or simply call report.Error(). func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { - body, err := m.http.FetchContent() + body, err := m.mod.GetKubeletStats(m.http) if err != nil { return errors.Wrap(err, "error doing HTTP request to fetch 'system' Metricset data") } diff --git a/metricbeat/module/kubernetes/volume/volume.go b/metricbeat/module/kubernetes/volume/volume.go index 102d8edfabc..77b446a56f1 100644 --- a/metricbeat/module/kubernetes/volume/volume.go +++ b/metricbeat/module/kubernetes/volume/volume.go @@ -18,12 +18,15 @@ package volume import ( + "fmt" + "github.com/pkg/errors" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/metricbeat/helper" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/mb/parse" + k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes" ) const ( @@ -56,6 +59,7 @@ func init() { type MetricSet struct { mb.BaseMetricSet http *helper.HTTP + mod k8smod.Module } // New create a new instance of the MetricSet @@ -66,9 +70,14 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { if err != nil { return nil, err } + mod, ok := base.Module().(k8smod.Module) + if !ok { + return nil, fmt.Errorf("must be child of kubernetes module") + } return &MetricSet{ BaseMetricSet: base, http: http, + mod: mod, }, nil } @@ -76,7 +85,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // format. It publishes the event which is then forwarded to the output. In case // of an error set the Error field of mb.Event or simply call report.Error(). func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { - body, err := m.http.FetchContent() + body, err := m.mod.GetKubeletStats(m.http) if err != nil { return errors.Wrap(err, "error doing HTTP request to fetch 'volume' Metricset data") }