diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 54ad2fa21ec..d85b8f1529c 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -989,6 +989,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Apache: convert status.total_kbytes to status.total_bytes in fleet mode. {pull}23022[23022] - Release MSSQL as GA {pull}23146[23146] - Add support for SASL/SCRAM authentication to the Kafka module. {pull}24810[24810] +- Refactor state_* metricsets to share response from endpoint. {pull}25640[25640] - Add server id to zookeeper events. {pull}25550[25550] - Add additional network metrics to docker/network {pull}25354[25354] diff --git a/metricbeat/helper/prometheus/prometheus.go b/metricbeat/helper/prometheus/prometheus.go index 3568310466e..591ef1c212b 100644 --- a/metricbeat/helper/prometheus/prometheus.go +++ b/metricbeat/helper/prometheus/prometheus.go @@ -44,6 +44,8 @@ type Prometheus interface { GetProcessedMetrics(mapping *MetricsMapping) ([]common.MapStr, error) + ProcessMetrics(families []*dto.MetricFamily, mapping *MetricsMapping) ([]common.MapStr, error) + ReportProcessedMetrics(mapping *MetricsMapping, r mb.ReporterV2) error } @@ -139,11 +141,7 @@ type MetricsMapping struct { ExtraFields map[string]string } -func (p *prometheus) GetProcessedMetrics(mapping *MetricsMapping) ([]common.MapStr, error) { - families, err := p.GetFamilies() - if err != nil { - return nil, err - } +func (p *prometheus) ProcessMetrics(families []*dto.MetricFamily, mapping *MetricsMapping) ([]common.MapStr, error) { eventsMap := map[string]common.MapStr{} infoMetrics := []*infoMetricData{} @@ -260,6 +258,14 @@ func (p *prometheus) GetProcessedMetrics(mapping *MetricsMapping) ([]common.MapS return events, nil } +func (p *prometheus) GetProcessedMetrics(mapping *MetricsMapping) ([]common.MapStr, error) { + families, err := p.GetFamilies() + if err != nil { + return nil, err + } + return p.ProcessMetrics(families, mapping) +} + // infoMetricData keeps data about an infoMetric type infoMetricData struct { Labels common.MapStr diff --git a/metricbeat/module/kubernetes/kubernetes.go b/metricbeat/module/kubernetes/kubernetes.go new file mode 100644 index 00000000000..53cd037f5e0 --- /dev/null +++ b/metricbeat/module/kubernetes/kubernetes.go @@ -0,0 +1,113 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package kubernetes + +import ( + "sync" + "time" + + "github.com/mitchellh/hashstructure" + "github.com/pkg/errors" + dto "github.com/prometheus/client_model/go" + + p "github.com/elastic/beats/v7/metricbeat/helper/prometheus" + "github.com/elastic/beats/v7/metricbeat/mb" +) + +func init() { + // Register the ModuleFactory function for the "kubernetes" module. + if err := mb.Registry.AddModule("kubernetes", ModuleBuilder()); err != nil { + panic(err) + } +} + +type Module interface { + mb.Module + GetSharedFamilies(prometheus p.Prometheus) ([]*dto.MetricFamily, error) +} + +type familiesCache struct { + sharedFamilies []*dto.MetricFamily + lastFetchErr error + lastFetchTimestamp time.Time +} + +type kubeStateMetricsCache struct { + cacheMap map[uint64]*familiesCache + lock sync.Mutex +} + +func (c *kubeStateMetricsCache) getCacheMapEntry(hash uint64) *familiesCache { + c.lock.Lock() + defer c.lock.Unlock() + if _, ok := c.cacheMap[hash]; !ok { + c.cacheMap[hash] = &familiesCache{} + } + return c.cacheMap[hash] +} + +type module struct { + mb.BaseModule + + kubeStateMetricsCache *kubeStateMetricsCache + familiesCache *familiesCache +} + +func ModuleBuilder() func(base mb.BaseModule) (mb.Module, error) { + kubeStateMetricsCache := &kubeStateMetricsCache{ + cacheMap: make(map[uint64]*familiesCache), + } + return func(base mb.BaseModule) (mb.Module, error) { + hash, err := generateCacheHash(base.Config().Hosts) + if err != nil { + return nil, errors.Wrap(err, "error generating cache hash for kubeStateMetricsCache") + } + // NOTE: These entries will be never removed, this can be a leak if + // metricbeat is used to monitor clusters dynamically created. + // (https://github.com/elastic/beats/pull/25640#discussion_r633395213) + familiesCache := kubeStateMetricsCache.getCacheMapEntry(hash) + m := module{ + BaseModule: base, + kubeStateMetricsCache: kubeStateMetricsCache, + familiesCache: familiesCache, + } + return &m, nil + } +} + +func (m *module) GetSharedFamilies(prometheus p.Prometheus) ([]*dto.MetricFamily, error) { + m.kubeStateMetricsCache.lock.Lock() + defer m.kubeStateMetricsCache.lock.Unlock() + + now := time.Now() + + if m.familiesCache.lastFetchTimestamp.IsZero() || now.Sub(m.familiesCache.lastFetchTimestamp) > m.Config().Period { + m.familiesCache.sharedFamilies, m.familiesCache.lastFetchErr = prometheus.GetFamilies() + m.familiesCache.lastFetchTimestamp = now + } + + return m.familiesCache.sharedFamilies, m.familiesCache.lastFetchErr +} + +func generateCacheHash(host []string) (uint64, error) { + id, err := hashstructure.Hash(host, nil) + if err != nil { + return 0, err + } + return id, nil +} diff --git a/metricbeat/module/kubernetes/state_container/state_container.go b/metricbeat/module/kubernetes/state_container/state_container.go index a47e42158a8..d2bab6fc216 100644 --- a/metricbeat/module/kubernetes/state_container/state_container.go +++ b/metricbeat/module/kubernetes/state_container/state_container.go @@ -18,6 +18,7 @@ package state_container import ( + "fmt" "strings" "github.com/pkg/errors" @@ -26,6 +27,7 @@ import ( p "github.com/elastic/beats/v7/metricbeat/helper/prometheus" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/mb/parse" + k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes" "github.com/elastic/beats/v7/metricbeat/module/kubernetes/util" ) @@ -89,6 +91,7 @@ type MetricSet struct { mb.BaseMetricSet prometheus p.Prometheus enricher util.Enricher + mod k8smod.Module } // New create a new instance of the MetricSet @@ -99,10 +102,15 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { if err != nil { return nil, err } + mod, ok := base.Module().(k8smod.Module) + if !ok { + return nil, fmt.Errorf("must be child of kubernetes module") + } return &MetricSet{ BaseMetricSet: base, prometheus: prometheus, enricher: util.NewContainerMetadataEnricher(base, false), + mod: mod, }, nil } @@ -112,7 +120,11 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { m.enricher.Start() - events, err := m.prometheus.GetProcessedMetrics(mapping) + families, err := m.mod.GetSharedFamilies(m.prometheus) + if err != nil { + return errors.Wrap(err, "error getting families") + } + events, err := m.prometheus.ProcessMetrics(families, mapping) if err != nil { return errors.Wrap(err, "error getting event") } diff --git a/metricbeat/module/kubernetes/state_cronjob/state_cronjob.go b/metricbeat/module/kubernetes/state_cronjob/state_cronjob.go index a0fc4f120c8..809a9b82f29 100644 --- a/metricbeat/module/kubernetes/state_cronjob/state_cronjob.go +++ b/metricbeat/module/kubernetes/state_cronjob/state_cronjob.go @@ -18,11 +18,14 @@ package state_cronjob import ( + "fmt" + "github.com/pkg/errors" "github.com/elastic/beats/v7/libbeat/common" p "github.com/elastic/beats/v7/metricbeat/helper/prometheus" "github.com/elastic/beats/v7/metricbeat/mb" + k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes" ) func init() { @@ -40,6 +43,7 @@ type CronJobMetricSet struct { mb.BaseMetricSet prometheus p.Prometheus mapping *p.MetricsMapping + mod k8smod.Module } // NewCronJobMetricSet returns a prometheus based metricset for CronJobs @@ -49,9 +53,15 @@ func NewCronJobMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { return nil, err } + mod, ok := base.Module().(k8smod.Module) + if !ok { + return nil, fmt.Errorf("must be child of kubernetes module") + } + return &CronJobMetricSet{ BaseMetricSet: base, prometheus: prometheus, + mod: mod, mapping: &p.MetricsMapping{ Metrics: map[string]p.MetricMap{ "kube_cronjob_info": p.InfoMetric(), @@ -77,7 +87,11 @@ func NewCronJobMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { // // Copied from other kube state metrics. func (m *CronJobMetricSet) Fetch(reporter mb.ReporterV2) error { - events, err := m.prometheus.GetProcessedMetrics(m.mapping) + families, err := m.mod.GetSharedFamilies(m.prometheus) + if err != nil { + return errors.Wrap(err, "error getting family metrics") + } + events, err := m.prometheus.ProcessMetrics(families, m.mapping) if err != nil { return errors.Wrap(err, "error getting metrics") } diff --git a/metricbeat/module/kubernetes/state_daemonset/state_daemonset.go b/metricbeat/module/kubernetes/state_daemonset/state_daemonset.go index 32c4f84d4a3..484d42dd8c2 100644 --- a/metricbeat/module/kubernetes/state_daemonset/state_daemonset.go +++ b/metricbeat/module/kubernetes/state_daemonset/state_daemonset.go @@ -18,11 +18,14 @@ package state_daemonset import ( + "fmt" + "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/kubernetes" p "github.com/elastic/beats/v7/metricbeat/helper/prometheus" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/mb/parse" + k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes" "github.com/elastic/beats/v7/metricbeat/module/kubernetes/util" ) @@ -69,6 +72,7 @@ type MetricSet struct { mb.BaseMetricSet prometheus p.Prometheus enricher util.Enricher + mod k8smod.Module } // New create a new instance of the MetricSet @@ -79,10 +83,15 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { if err != nil { return nil, err } + mod, ok := base.Module().(k8smod.Module) + if !ok { + return nil, fmt.Errorf("must be child of kubernetes module") + } return &MetricSet{ BaseMetricSet: base, prometheus: prometheus, enricher: util.NewResourceMetadataEnricher(base, &kubernetes.ReplicaSet{}, false), + mod: mod, }, nil } @@ -92,7 +101,13 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { func (m *MetricSet) Fetch(reporter mb.ReporterV2) { m.enricher.Start() - events, err := m.prometheus.GetProcessedMetrics(mapping) + families, err := m.mod.GetSharedFamilies(m.prometheus) + if err != nil { + m.Logger().Error(err) + reporter.Error(err) + return + } + events, err := m.prometheus.ProcessMetrics(families, mapping) if err != nil { m.Logger().Error(err) reporter.Error(err) diff --git a/metricbeat/module/kubernetes/state_deployment/state_deployment.go b/metricbeat/module/kubernetes/state_deployment/state_deployment.go index ded9115d1ce..64704e09d5d 100644 --- a/metricbeat/module/kubernetes/state_deployment/state_deployment.go +++ b/metricbeat/module/kubernetes/state_deployment/state_deployment.go @@ -18,11 +18,14 @@ package state_deployment import ( + "fmt" + "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/kubernetes" p "github.com/elastic/beats/v7/metricbeat/helper/prometheus" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/mb/parse" + k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes" "github.com/elastic/beats/v7/metricbeat/module/kubernetes/util" ) @@ -70,6 +73,7 @@ type MetricSet struct { mb.BaseMetricSet prometheus p.Prometheus enricher util.Enricher + mod k8smod.Module } // New create a new instance of the MetricSet @@ -80,10 +84,15 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { if err != nil { return nil, err } + mod, ok := base.Module().(k8smod.Module) + if !ok { + return nil, fmt.Errorf("must be child of kubernetes module") + } return &MetricSet{ BaseMetricSet: base, prometheus: prometheus, enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Deployment{}, false), + mod: mod, }, nil } @@ -93,7 +102,13 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { func (m *MetricSet) Fetch(reporter mb.ReporterV2) { m.enricher.Start() - events, err := m.prometheus.GetProcessedMetrics(mapping) + families, err := m.mod.GetSharedFamilies(m.prometheus) + if err != nil { + m.Logger().Error(err) + reporter.Error(err) + return + } + events, err := m.prometheus.ProcessMetrics(families, mapping) if err != nil { m.Logger().Error(err) reporter.Error(err) diff --git a/metricbeat/module/kubernetes/state_node/state_node.go b/metricbeat/module/kubernetes/state_node/state_node.go index f2a1a6b965e..4a31b2170a8 100644 --- a/metricbeat/module/kubernetes/state_node/state_node.go +++ b/metricbeat/module/kubernetes/state_node/state_node.go @@ -18,12 +18,15 @@ package state_node import ( + "fmt" + "github.com/pkg/errors" "github.com/elastic/beats/v7/libbeat/common/kubernetes" p "github.com/elastic/beats/v7/metricbeat/helper/prometheus" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/mb/parse" + k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes" "github.com/elastic/beats/v7/metricbeat/module/kubernetes/util" ) @@ -81,6 +84,7 @@ type MetricSet struct { mb.BaseMetricSet prometheus p.Prometheus enricher util.Enricher + mod k8smod.Module } // New create a new instance of the MetricSet @@ -91,11 +95,15 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { if err != nil { return nil, err } - + mod, ok := base.Module().(k8smod.Module) + if !ok { + return nil, fmt.Errorf("must be child of kubernetes module") + } return &MetricSet{ BaseMetricSet: base, prometheus: prometheus, enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Node{}, false), + mod: mod, }, nil } @@ -105,7 +113,11 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { m.enricher.Start() - events, err := m.prometheus.GetProcessedMetrics(mapping) + families, err := m.mod.GetSharedFamilies(m.prometheus) + if err != nil { + return errors.Wrap(err, "error doing HTTP request to fetch 'state_node' Metricset data") + } + events, err := m.prometheus.ProcessMetrics(families, mapping) if err != nil { return errors.Wrap(err, "error doing HTTP request to fetch 'state_node' Metricset data") } diff --git a/metricbeat/module/kubernetes/state_persistentvolume/state_persistentvolume.go b/metricbeat/module/kubernetes/state_persistentvolume/state_persistentvolume.go index 6505e1e0be6..f61172d3b39 100644 --- a/metricbeat/module/kubernetes/state_persistentvolume/state_persistentvolume.go +++ b/metricbeat/module/kubernetes/state_persistentvolume/state_persistentvolume.go @@ -18,8 +18,11 @@ package state_persistentvolume import ( + "fmt" + p "github.com/elastic/beats/v7/metricbeat/helper/prometheus" "github.com/elastic/beats/v7/metricbeat/mb" + k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes" ) func init() { @@ -34,6 +37,7 @@ type PersistentVolumeMetricSet struct { mb.BaseMetricSet prometheus p.Prometheus mapping *p.MetricsMapping + mod k8smod.Module } // NewPersistentVolumeMetricSet returns a prometheus based metricset for Persistent Volumes @@ -42,10 +46,14 @@ func NewPersistentVolumeMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { if err != nil { return nil, err } - + mod, ok := base.Module().(k8smod.Module) + if !ok { + return nil, fmt.Errorf("must be child of kubernetes module") + } return &PersistentVolumeMetricSet{ BaseMetricSet: base, prometheus: prometheus, + mod: mod, mapping: &p.MetricsMapping{ Metrics: map[string]p.MetricMap{ "kube_persistentvolume_capacity_bytes": p.Metric("capacity.bytes"), @@ -69,7 +77,13 @@ func NewPersistentVolumeMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch prometheus metrics and treats those prefixed by mb.ModuleDataKey as // module rooted fields at the event that gets reported func (m *PersistentVolumeMetricSet) Fetch(reporter mb.ReporterV2) { - events, err := m.prometheus.GetProcessedMetrics(m.mapping) + families, err := m.mod.GetSharedFamilies(m.prometheus) + if err != nil { + m.Logger().Error(err) + reporter.Error(err) + return + } + events, err := m.prometheus.ProcessMetrics(families, m.mapping) if err != nil { m.Logger().Error(err) reporter.Error(err) diff --git a/metricbeat/module/kubernetes/state_persistentvolumeclaim/state_persistentvolumeclaim.go b/metricbeat/module/kubernetes/state_persistentvolumeclaim/state_persistentvolumeclaim.go index b7685089a83..54d26c57a3a 100644 --- a/metricbeat/module/kubernetes/state_persistentvolumeclaim/state_persistentvolumeclaim.go +++ b/metricbeat/module/kubernetes/state_persistentvolumeclaim/state_persistentvolumeclaim.go @@ -18,8 +18,11 @@ package state_persistentvolumeclaim import ( + "fmt" + p "github.com/elastic/beats/v7/metricbeat/helper/prometheus" "github.com/elastic/beats/v7/metricbeat/mb" + k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes" ) func init() { @@ -34,6 +37,7 @@ type persistentvolumeclaimMetricSet struct { mb.BaseMetricSet prometheus p.Prometheus mapping *p.MetricsMapping + mod k8smod.Module } // NewpersistentvolumeclaimMetricSet returns a prometheus based metricset for Persistent Volumes @@ -42,10 +46,14 @@ func NewpersistentvolumeclaimMetricSet(base mb.BaseMetricSet) (mb.MetricSet, err if err != nil { return nil, err } - + mod, ok := base.Module().(k8smod.Module) + if !ok { + return nil, fmt.Errorf("must be child of kubernetes module") + } return &persistentvolumeclaimMetricSet{ BaseMetricSet: base, prometheus: prometheus, + mod: mod, mapping: &p.MetricsMapping{ Metrics: map[string]p.MetricMap{ @@ -73,7 +81,12 @@ func NewpersistentvolumeclaimMetricSet(base mb.BaseMetricSet) (mb.MetricSet, err // Fetch prometheus metrics and treats those prefixed by mb.ModuleDataKey as // module rooted fields at the event that gets reported func (m *persistentvolumeclaimMetricSet) Fetch(reporter mb.ReporterV2) error { - events, err := m.prometheus.GetProcessedMetrics(m.mapping) + + families, err := m.mod.GetSharedFamilies(m.prometheus) + if err != nil { + return err + } + events, err := m.prometheus.ProcessMetrics(families, m.mapping) if err != nil { return err } diff --git a/metricbeat/module/kubernetes/state_pod/state_pod.go b/metricbeat/module/kubernetes/state_pod/state_pod.go index 7a531e0a5e8..4be4aea5023 100644 --- a/metricbeat/module/kubernetes/state_pod/state_pod.go +++ b/metricbeat/module/kubernetes/state_pod/state_pod.go @@ -18,11 +18,14 @@ package state_pod import ( + "fmt" + "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/kubernetes" p "github.com/elastic/beats/v7/metricbeat/helper/prometheus" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/mb/parse" + k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes" "github.com/elastic/beats/v7/metricbeat/module/kubernetes/util" ) @@ -72,6 +75,7 @@ type MetricSet struct { mb.BaseMetricSet prometheus p.Prometheus enricher util.Enricher + mod k8smod.Module } // New create a new instance of the MetricSet @@ -82,11 +86,15 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { if err != nil { return nil, err } - + mod, ok := base.Module().(k8smod.Module) + if !ok { + return nil, fmt.Errorf("must be child of kubernetes module") + } return &MetricSet{ BaseMetricSet: base, prometheus: prometheus, enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Pod{}, false), + mod: mod, }, nil } @@ -96,7 +104,13 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { func (m *MetricSet) Fetch(reporter mb.ReporterV2) { m.enricher.Start() - events, err := m.prometheus.GetProcessedMetrics(mapping) + families, err := m.mod.GetSharedFamilies(m.prometheus) + if err != nil { + m.Logger().Error(err) + reporter.Error(err) + return + } + events, err := m.prometheus.ProcessMetrics(families, mapping) if err != nil { m.Logger().Error(err) reporter.Error(err) diff --git a/metricbeat/module/kubernetes/state_replicaset/state_replicaset.go b/metricbeat/module/kubernetes/state_replicaset/state_replicaset.go index cc24af630bd..ee955f691a3 100644 --- a/metricbeat/module/kubernetes/state_replicaset/state_replicaset.go +++ b/metricbeat/module/kubernetes/state_replicaset/state_replicaset.go @@ -18,11 +18,14 @@ package state_replicaset import ( + "fmt" + "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/kubernetes" p "github.com/elastic/beats/v7/metricbeat/helper/prometheus" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/mb/parse" + k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes" "github.com/elastic/beats/v7/metricbeat/module/kubernetes/util" ) @@ -70,6 +73,7 @@ type MetricSet struct { mb.BaseMetricSet prometheus p.Prometheus enricher util.Enricher + mod k8smod.Module } // New create a new instance of the MetricSet @@ -80,10 +84,15 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { if err != nil { return nil, err } + mod, ok := base.Module().(k8smod.Module) + if !ok { + return nil, fmt.Errorf("must be child of kubernetes module") + } return &MetricSet{ BaseMetricSet: base, prometheus: prometheus, enricher: util.NewResourceMetadataEnricher(base, &kubernetes.ReplicaSet{}, false), + mod: mod, }, nil } @@ -93,7 +102,13 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { func (m *MetricSet) Fetch(reporter mb.ReporterV2) { m.enricher.Start() - events, err := m.prometheus.GetProcessedMetrics(mapping) + families, err := m.mod.GetSharedFamilies(m.prometheus) + if err != nil { + m.Logger().Error(err) + reporter.Error(err) + return + } + events, err := m.prometheus.ProcessMetrics(families, mapping) if err != nil { m.Logger().Error(err) reporter.Error(err) diff --git a/metricbeat/module/kubernetes/state_resourcequota/state_resourcequota.go b/metricbeat/module/kubernetes/state_resourcequota/state_resourcequota.go index 8901714bf33..ccb3d94a935 100644 --- a/metricbeat/module/kubernetes/state_resourcequota/state_resourcequota.go +++ b/metricbeat/module/kubernetes/state_resourcequota/state_resourcequota.go @@ -18,8 +18,11 @@ package state_resourcequota import ( + "fmt" + p "github.com/elastic/beats/v7/metricbeat/helper/prometheus" "github.com/elastic/beats/v7/metricbeat/mb" + k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes" ) func init() { @@ -37,6 +40,7 @@ type ResourceQuotaMetricSet struct { mb.BaseMetricSet prometheus p.Prometheus mapping *p.MetricsMapping + mod k8smod.Module } // NewResourceQuotaMetricSet returns a prometheus based metricset for ResourceQuotas @@ -45,10 +49,14 @@ func NewResourceQuotaMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { if err != nil { return nil, err } - + mod, ok := base.Module().(k8smod.Module) + if !ok { + return nil, fmt.Errorf("must be child of kubernetes module") + } return &ResourceQuotaMetricSet{ BaseMetricSet: base, prometheus: prometheus, + mod: mod, mapping: &p.MetricsMapping{ Metrics: map[string]p.MetricMap{ "kube_resourcequota_created": p.Metric("created.sec"), @@ -68,7 +76,13 @@ func NewResourceQuotaMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch prometheus metrics and treats those prefixed by mb.ModuleDataKey as // module rooted fields at the event that gets reported func (m *ResourceQuotaMetricSet) Fetch(reporter mb.ReporterV2) { - events, err := m.prometheus.GetProcessedMetrics(m.mapping) + families, err := m.mod.GetSharedFamilies(m.prometheus) + if err != nil { + m.Logger().Error(err) + reporter.Error(err) + return + } + events, err := m.prometheus.ProcessMetrics(families, m.mapping) if err != nil { m.Logger().Error(err) reporter.Error(err) diff --git a/metricbeat/module/kubernetes/state_service/state_service.go b/metricbeat/module/kubernetes/state_service/state_service.go index 5aef61e2c3a..a656d21e86c 100644 --- a/metricbeat/module/kubernetes/state_service/state_service.go +++ b/metricbeat/module/kubernetes/state_service/state_service.go @@ -18,9 +18,12 @@ package state_service import ( + "fmt" + "github.com/elastic/beats/v7/libbeat/common/kubernetes" p "github.com/elastic/beats/v7/metricbeat/helper/prometheus" "github.com/elastic/beats/v7/metricbeat/mb" + k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes" "github.com/elastic/beats/v7/metricbeat/module/kubernetes/util" ) @@ -40,6 +43,7 @@ type ServiceMetricSet struct { prometheus p.Prometheus mapping *p.MetricsMapping enricher util.Enricher + mod k8smod.Module } // NewServiceMetricSet returns a prometheus based metricset for Services @@ -48,10 +52,14 @@ func NewServiceMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { if err != nil { return nil, err } - + mod, ok := base.Module().(k8smod.Module) + if !ok { + return nil, fmt.Errorf("must be child of kubernetes module") + } return &ServiceMetricSet{ BaseMetricSet: base, prometheus: prometheus, + mod: mod, mapping: &p.MetricsMapping{ Metrics: map[string]p.MetricMap{ "kube_service_info": p.InfoMetric(), @@ -86,7 +94,14 @@ func NewServiceMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { // module rooted fields at the event that gets reported func (m *ServiceMetricSet) Fetch(reporter mb.ReporterV2) { m.enricher.Start() - events, err := m.prometheus.GetProcessedMetrics(m.mapping) + + families, err := m.mod.GetSharedFamilies(m.prometheus) + if err != nil { + m.Logger().Error(err) + reporter.Error(err) + return + } + events, err := m.prometheus.ProcessMetrics(families, m.mapping) if err != nil { m.Logger().Error(err) reporter.Error(err) diff --git a/metricbeat/module/kubernetes/state_statefulset/state_statefulset.go b/metricbeat/module/kubernetes/state_statefulset/state_statefulset.go index cadaf4a7238..9641b62df51 100644 --- a/metricbeat/module/kubernetes/state_statefulset/state_statefulset.go +++ b/metricbeat/module/kubernetes/state_statefulset/state_statefulset.go @@ -18,11 +18,14 @@ package state_statefulset import ( + "fmt" + "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/kubernetes" p "github.com/elastic/beats/v7/metricbeat/helper/prometheus" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/mb/parse" + k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes" "github.com/elastic/beats/v7/metricbeat/module/kubernetes/util" ) @@ -69,6 +72,7 @@ type MetricSet struct { mb.BaseMetricSet prometheus p.Prometheus enricher util.Enricher + mod k8smod.Module } // New create a new instance of the MetricSet @@ -79,10 +83,15 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { if err != nil { return nil, err } + mod, ok := base.Module().(k8smod.Module) + if !ok { + return nil, fmt.Errorf("must be child of kubernetes module") + } return &MetricSet{ BaseMetricSet: base, prometheus: prometheus, enricher: util.NewResourceMetadataEnricher(base, &kubernetes.StatefulSet{}, false), + mod: mod, }, nil } @@ -92,7 +101,13 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { func (m *MetricSet) Fetch(reporter mb.ReporterV2) { m.enricher.Start() - events, err := m.prometheus.GetProcessedMetrics(mapping) + families, err := m.mod.GetSharedFamilies(m.prometheus) + if err != nil { + m.Logger().Error(err) + reporter.Error(err) + return + } + events, err := m.prometheus.ProcessMetrics(families, mapping) if err != nil { m.Logger().Error(err) reporter.Error(err) diff --git a/metricbeat/module/kubernetes/state_storageclass/state_storageclass.go b/metricbeat/module/kubernetes/state_storageclass/state_storageclass.go index 98db701cfeb..85e598d388d 100644 --- a/metricbeat/module/kubernetes/state_storageclass/state_storageclass.go +++ b/metricbeat/module/kubernetes/state_storageclass/state_storageclass.go @@ -18,8 +18,11 @@ package state_storageclass import ( + "fmt" + p "github.com/elastic/beats/v7/metricbeat/helper/prometheus" "github.com/elastic/beats/v7/metricbeat/mb" + k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes" ) func init() { @@ -34,6 +37,7 @@ type StorageClassMetricSet struct { mb.BaseMetricSet prometheus p.Prometheus mapping *p.MetricsMapping + mod k8smod.Module } // NewStorageClassMetricSet returns a prometheus based metricset for Storage classes @@ -42,10 +46,14 @@ func NewStorageClassMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { if err != nil { return nil, err } - + mod, ok := base.Module().(k8smod.Module) + if !ok { + return nil, fmt.Errorf("must be child of kubernetes module") + } return &StorageClassMetricSet{ BaseMetricSet: base, prometheus: prometheus, + mod: mod, mapping: &p.MetricsMapping{ Metrics: map[string]p.MetricMap{ "kube_storageclass_info": p.InfoMetric(), @@ -70,7 +78,13 @@ func NewStorageClassMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch prometheus metrics and treats those prefixed by mb.ModuleDataKey as // module rooted fields at the event that gets reported func (m *StorageClassMetricSet) Fetch(reporter mb.ReporterV2) { - events, err := m.prometheus.GetProcessedMetrics(m.mapping) + families, err := m.mod.GetSharedFamilies(m.prometheus) + if err != nil { + m.Logger().Error(err) + reporter.Error(err) + return + } + events, err := m.prometheus.ProcessMetrics(families, m.mapping) if err != nil { m.Logger().Error(err) reporter.Error(err)