Skip to content

Commit

Permalink
Refactor kubelet metricsets to share response from endpoint (elastic#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrsMark authored May 28, 2021
1 parent 0ec13ab commit a39dd00
Show file tree
Hide file tree
Showing 20 changed files with 121 additions and 35 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -937,6 +937,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Refactor state_* metricsets to share response from endpoint. {pull}25640[25640]
- Add server id to zookeeper events. {pull}25550[25550]
- Add additional network metrics to docker/network {pull}25354[25354]
- Reduce number of requests done by kubernetes metricsets to kubelet. {pull}25782[25782]

*Packetbeat*

Expand Down
11 changes: 10 additions & 1 deletion metricbeat/module/kubernetes/container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@
package container

import (
"fmt"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/metricbeat/helper"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/mb/parse"
k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes"
"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"
)

Expand Down Expand Up @@ -58,6 +61,7 @@ type MetricSet struct {
mb.BaseMetricSet
http *helper.HTTP
enricher util.Enricher
mod k8smod.Module
}

// New create a new instance of the MetricSet
Expand All @@ -68,10 +72,15 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
if err != nil {
return nil, err
}
mod, ok := base.Module().(k8smod.Module)
if !ok {
return nil, fmt.Errorf("must be child of kubernetes module")
}
return &MetricSet{
BaseMetricSet: base,
http: http,
enricher: util.NewContainerMetadataEnricher(base, true),
mod: mod,
}, nil
}

Expand All @@ -81,7 +90,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
m.enricher.Start()

body, err := m.http.FetchContent()
body, err := m.mod.GetKubeletStats(m.http)
if err != nil {
return errors.Wrap(err, "error doing HTTP request to fetch 'container' Metricset data")

Expand Down
70 changes: 56 additions & 14 deletions metricbeat/module/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pkg/errors"
dto "github.com/prometheus/client_model/go"

"github.com/elastic/beats/v7/metricbeat/helper"
p "github.com/elastic/beats/v7/metricbeat/helper/prometheus"
"github.com/elastic/beats/v7/metricbeat/mb"
)
Expand All @@ -38,7 +39,8 @@ func init() {

type Module interface {
mb.Module
GetSharedFamilies(prometheus p.Prometheus) ([]*dto.MetricFamily, error)
GetStateMetricsFamilies(prometheus p.Prometheus) ([]*dto.MetricFamily, error)
GetKubeletStats(http *helper.HTTP) ([]byte, error)
}

type familiesCache struct {
Expand All @@ -53,55 +55,95 @@ type kubeStateMetricsCache struct {
}

func (c *kubeStateMetricsCache) getCacheMapEntry(hash uint64) *familiesCache {
c.lock.Lock()
defer c.lock.Unlock()
if _, ok := c.cacheMap[hash]; !ok {
c.cacheMap[hash] = &familiesCache{}
}
return c.cacheMap[hash]
}

type statsCache struct {
sharedStats []byte
lastFetchErr error
lastFetchTimestamp time.Time
}

type kubeletStatsCache struct {
cacheMap map[uint64]*statsCache
lock sync.Mutex
}

func (c *kubeletStatsCache) getCacheMapEntry(hash uint64) *statsCache {
if _, ok := c.cacheMap[hash]; !ok {
c.cacheMap[hash] = &statsCache{}
}
return c.cacheMap[hash]
}

type module struct {
mb.BaseModule

kubeStateMetricsCache *kubeStateMetricsCache
familiesCache *familiesCache
kubeletStatsCache *kubeletStatsCache
cacheHash uint64
}

func ModuleBuilder() func(base mb.BaseModule) (mb.Module, error) {
kubeStateMetricsCache := &kubeStateMetricsCache{
cacheMap: make(map[uint64]*familiesCache),
}
kubeletStatsCache := &kubeletStatsCache{
cacheMap: make(map[uint64]*statsCache),
}
return func(base mb.BaseModule) (mb.Module, error) {
hash, err := generateCacheHash(base.Config().Hosts)
if err != nil {
return nil, errors.Wrap(err, "error generating cache hash for kubeStateMetricsCache")
}
// NOTE: These entries will be never removed, this can be a leak if
// metricbeat is used to monitor clusters dynamically created.
// (https://github.com/elastic/beats/pull/25640#discussion_r633395213)
familiesCache := kubeStateMetricsCache.getCacheMapEntry(hash)
m := module{
BaseModule: base,
kubeStateMetricsCache: kubeStateMetricsCache,
familiesCache: familiesCache,
kubeletStatsCache: kubeletStatsCache,
cacheHash: hash,
}
return &m, nil
}
}

func (m *module) GetSharedFamilies(prometheus p.Prometheus) ([]*dto.MetricFamily, error) {
func (m *module) GetStateMetricsFamilies(prometheus p.Prometheus) ([]*dto.MetricFamily, error) {
m.kubeStateMetricsCache.lock.Lock()
defer m.kubeStateMetricsCache.lock.Unlock()

now := time.Now()
// NOTE: These entries will be never removed, this can be a leak if
// metricbeat is used to monitor clusters dynamically created.
// (https://github.com/elastic/beats/pull/25640#discussion_r633395213)
familiesCache := m.kubeStateMetricsCache.getCacheMapEntry(m.cacheHash)

if familiesCache.lastFetchTimestamp.IsZero() || now.Sub(familiesCache.lastFetchTimestamp) > m.Config().Period {
familiesCache.sharedFamilies, familiesCache.lastFetchErr = prometheus.GetFamilies()
familiesCache.lastFetchTimestamp = now
}

return familiesCache.sharedFamilies, familiesCache.lastFetchErr
}

func (m *module) GetKubeletStats(http *helper.HTTP) ([]byte, error) {
m.kubeletStatsCache.lock.Lock()
defer m.kubeletStatsCache.lock.Unlock()

now := time.Now()

// NOTE: These entries will be never removed, this can be a leak if
// metricbeat is used to monitor clusters dynamically created.
// (https://github.com/elastic/beats/pull/25640#discussion_r633395213)
statsCache := m.kubeletStatsCache.getCacheMapEntry(m.cacheHash)

if m.familiesCache.lastFetchTimestamp.IsZero() || now.Sub(m.familiesCache.lastFetchTimestamp) > m.Config().Period {
m.familiesCache.sharedFamilies, m.familiesCache.lastFetchErr = prometheus.GetFamilies()
m.familiesCache.lastFetchTimestamp = now
if statsCache.lastFetchTimestamp.IsZero() || now.Sub(statsCache.lastFetchTimestamp) > m.Config().Period {
statsCache.sharedStats, statsCache.lastFetchErr = http.FetchContent()
statsCache.lastFetchTimestamp = now
}

return m.familiesCache.sharedFamilies, m.familiesCache.lastFetchErr
return statsCache.sharedStats, statsCache.lastFetchErr
}

func generateCacheHash(host []string) (uint64, error) {
Expand Down
12 changes: 10 additions & 2 deletions metricbeat/module/kubernetes/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package node

import (
"fmt"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/common"
Expand All @@ -26,6 +28,7 @@ import (
"github.com/elastic/beats/v7/metricbeat/helper"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/mb/parse"
k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes"
"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"
)

Expand Down Expand Up @@ -60,6 +63,7 @@ type MetricSet struct {
mb.BaseMetricSet
http *helper.HTTP
enricher util.Enricher
mod k8smod.Module
}

// New create a new instance of the MetricSet
Expand All @@ -70,11 +74,15 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
if err != nil {
return nil, err
}

mod, ok := base.Module().(k8smod.Module)
if !ok {
return nil, fmt.Errorf("must be child of kubernetes module")
}
return &MetricSet{
BaseMetricSet: base,
http: http,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Node{}, false),
mod: mod,
}, nil
}

Expand All @@ -84,7 +92,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
m.enricher.Start()

body, err := m.http.FetchContent()
body, err := m.mod.GetKubeletStats(m.http)
if err != nil {
return errors.Wrap(err, "error doing HTTP request to fetch 'node' Metricset data")

Expand Down
14 changes: 11 additions & 3 deletions metricbeat/module/kubernetes/pod/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@
package pod

import (
"fmt"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/common/kubernetes"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/metricbeat/helper"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/mb/parse"
k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes"
"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"
)

Expand Down Expand Up @@ -59,6 +62,7 @@ type MetricSet struct {
mb.BaseMetricSet
http *helper.HTTP
enricher util.Enricher
mod k8smod.Module
}

// New create a new instance of the MetricSet
Expand All @@ -69,11 +73,15 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
if err != nil {
return nil, err
}

mod, ok := base.Module().(k8smod.Module)
if !ok {
return nil, fmt.Errorf("must be child of kubernetes module")
}
return &MetricSet{
BaseMetricSet: base,
http: http,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Pod{}, true),
mod: mod,
}, nil
}

Expand All @@ -83,9 +91,9 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
m.enricher.Start()

body, err := m.http.FetchContent()
body, err := m.mod.GetKubeletStats(m.http)
if err != nil {
return errors.Wrap(err, "error doing HTTP request to fetch 'pod' Metricset data")
return errors.Wrap(err, "error fetching shared data for 'pod' Metricset")
}

events, err := eventMapping(body, util.PerfMetrics)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
m.enricher.Start()

families, err := m.mod.GetSharedFamilies(m.prometheus)
families, err := m.mod.GetStateMetricsFamilies(m.prometheus)
if err != nil {
return errors.Wrap(err, "error getting families")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func NewCronJobMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) {
//
// Copied from other kube state metrics.
func (m *CronJobMetricSet) Fetch(reporter mb.ReporterV2) error {
families, err := m.mod.GetSharedFamilies(m.prometheus)
families, err := m.mod.GetStateMetricsFamilies(m.prometheus)
if err != nil {
return errors.Wrap(err, "error getting family metrics")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
m.enricher.Start()

families, err := m.mod.GetSharedFamilies(m.prometheus)
families, err := m.mod.GetStateMetricsFamilies(m.prometheus)
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
m.enricher.Start()

families, err := m.mod.GetSharedFamilies(m.prometheus)
families, err := m.mod.GetStateMetricsFamilies(m.prometheus)
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/kubernetes/state_node/state_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
m.enricher.Start()

families, err := m.mod.GetSharedFamilies(m.prometheus)
families, err := m.mod.GetStateMetricsFamilies(m.prometheus)
if err != nil {
return errors.Wrap(err, "error doing HTTP request to fetch 'state_node' Metricset data")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func NewPersistentVolumeMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Fetch prometheus metrics and treats those prefixed by mb.ModuleDataKey as
// module rooted fields at the event that gets reported
func (m *PersistentVolumeMetricSet) Fetch(reporter mb.ReporterV2) {
families, err := m.mod.GetSharedFamilies(m.prometheus)
families, err := m.mod.GetStateMetricsFamilies(m.prometheus)
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func NewpersistentvolumeclaimMetricSet(base mb.BaseMetricSet) (mb.MetricSet, err
// module rooted fields at the event that gets reported
func (m *persistentvolumeclaimMetricSet) Fetch(reporter mb.ReporterV2) error {

families, err := m.mod.GetSharedFamilies(m.prometheus)
families, err := m.mod.GetStateMetricsFamilies(m.prometheus)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/kubernetes/state_pod/state_pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
m.enricher.Start()

families, err := m.mod.GetSharedFamilies(m.prometheus)
families, err := m.mod.GetStateMetricsFamilies(m.prometheus)
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
m.enricher.Start()

families, err := m.mod.GetSharedFamilies(m.prometheus)
families, err := m.mod.GetStateMetricsFamilies(m.prometheus)
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func NewResourceQuotaMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Fetch prometheus metrics and treats those prefixed by mb.ModuleDataKey as
// module rooted fields at the event that gets reported
func (m *ResourceQuotaMetricSet) Fetch(reporter mb.ReporterV2) {
families, err := m.mod.GetSharedFamilies(m.prometheus)
families, err := m.mod.GetStateMetricsFamilies(m.prometheus)
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func NewServiceMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) {
func (m *ServiceMetricSet) Fetch(reporter mb.ReporterV2) {
m.enricher.Start()

families, err := m.mod.GetSharedFamilies(m.prometheus)
families, err := m.mod.GetStateMetricsFamilies(m.prometheus)
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
m.enricher.Start()

families, err := m.mod.GetSharedFamilies(m.prometheus)
families, err := m.mod.GetStateMetricsFamilies(m.prometheus)
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
Expand Down
Loading

0 comments on commit a39dd00

Please sign in to comment.