Skip to content

Commit

Permalink
[8.0](backport elastic#31785) Feature/cache expiration (elastic#31975)
Browse files Browse the repository at this point in the history
* Feature/cache expiration (elastic#31785)
  • Loading branch information
mergify[bot] authored Jun 20, 2022
1 parent d84e1f0 commit 77021f2
Show file tree
Hide file tree
Showing 24 changed files with 141 additions and 81 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ https://github.com/elastic/beats/compare/v8.0.1\...main[Check the HEAD diff]
- Enhance metricbeat on openshift documentation {pull}30054[30054]
- Fixed missing ZooKeeper metrics due compatibility issues with versions >= 3.6.0 {pull}30068[30068]
- Fix Docker module: rename fields on dashboards. {pull}30500[30500]
- Fix kubernetes module's internal cache expiration issue. This avoid metrics like `kubernetes.container.cpu.usage.limit.pct` from not being populated. {pull}31785[31785]

*Packetbeat*

Expand Down
4 changes: 3 additions & 1 deletion libbeat/common/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,9 @@ func (c *Cache) StartJanitor(interval time.Duration) {

// StopJanitor stops the goroutine created by StartJanitor.
func (c *Cache) StopJanitor() {
close(c.janitorQuit)
if c.janitorQuit != nil {
close(c.janitorQuit)
}
}

// get returns the non-expired values from the cache.
Expand Down
12 changes: 12 additions & 0 deletions libbeat/common/kubernetes/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,18 @@ func NewInformer(client kubernetes.Interface, resource Resource, opts WatchOptio
}

objType = "statefulset"
case *DaemonSet:
ss := client.AppsV1().DaemonSets(opts.Namespace)
listwatch = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return ss.List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return ss.Watch(ctx, options)
},
}

objType = "daemonset"
case *Service:
svc := client.CoreV1().Services(opts.Namespace)
listwatch = &cache.ListWatch{
Expand Down
3 changes: 3 additions & 0 deletions libbeat/common/kubernetes/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ type ReplicaSet = appsv1.ReplicaSet
// StatefulSet data
type StatefulSet = appsv1.StatefulSet

// DaemonSet data
type DaemonSet = appsv1.DaemonSet

// Service data
type Service = v1.Service

Expand Down
9 changes: 2 additions & 7 deletions metricbeat/module/kubernetes/container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package container
import (
"fmt"

"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/metricbeat/helper"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/mb/parse"
Expand All @@ -38,8 +37,6 @@ var (
DefaultScheme: defaultScheme,
DefaultPath: defaultPath,
}.Build()

logger = logp.NewLogger("kubernetes.container")
)

// init registers the MetricSet with the central registry.
Expand Down Expand Up @@ -77,7 +74,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
http: http,
enricher: util.NewContainerMetadataEnricher(base, true),
enricher: util.NewContainerMetadataEnricher(base, mod.GetPerfMetricsCache(), true),
mod: mod,
}, nil
}
Expand All @@ -95,7 +92,7 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
return
}

events, err := eventMapping(body, util.PerfMetrics)
events, err := eventMapping(body, m.mod.GetPerfMetricsCache())
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
Expand All @@ -116,8 +113,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
return
}
}

return
}

// Close stops this metricset
Expand Down
3 changes: 2 additions & 1 deletion metricbeat/module/kubernetes/container/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"io/ioutil"
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"

Expand All @@ -40,7 +41,7 @@ func TestEventMapping(t *testing.T) {
body, err := ioutil.ReadAll(f)
assert.NoError(t, err, "cannot read test file "+testFile)

cache := util.NewPerfMetricsCache()
cache := util.NewPerfMetricsCache(120 * time.Second)
cache.NodeCoresAllocatable.Set("gke-beats-default-pool-a5b33e2e-hdww", 2)
cache.NodeMemAllocatable.Set("gke-beats-default-pool-a5b33e2e-hdww", 146227200)
cache.ContainerMemLimit.Set(util.ContainerUID("default", "nginx-deployment-2303442956-pcqfc", "nginx"), 14622720)
Expand Down
21 changes: 19 additions & 2 deletions metricbeat/module/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,17 @@
package kubernetes

import (
"fmt"
"sync"
"time"

"github.com/mitchellh/hashstructure"
"github.com/pkg/errors"
dto "github.com/prometheus/client_model/go"

"github.com/elastic/beats/v7/metricbeat/helper"
p "github.com/elastic/beats/v7/metricbeat/helper/prometheus"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"
)

func init() {
Expand All @@ -41,6 +42,7 @@ type Module interface {
mb.Module
GetStateMetricsFamilies(prometheus p.Prometheus) ([]*dto.MetricFamily, error)
GetKubeletStats(http *helper.HTTP) ([]byte, error)
GetPerfMetricsCache() *util.PerfMetricsCache
}

type familiesCache struct {
Expand Down Expand Up @@ -84,6 +86,7 @@ type module struct {

kubeStateMetricsCache *kubeStateMetricsCache
kubeletStatsCache *kubeletStatsCache
perfMetrics *util.PerfMetricsCache
cacheHash uint64
}

Expand All @@ -94,15 +97,25 @@ func ModuleBuilder() func(base mb.BaseModule) (mb.Module, error) {
kubeletStatsCache := &kubeletStatsCache{
cacheMap: make(map[uint64]*statsCache),
}
perfMetrics := util.NewPerfMetricsCache(0)
return func(base mb.BaseModule) (mb.Module, error) {
hash, err := generateCacheHash(base.Config().Hosts)
if err != nil {
return nil, errors.Wrap(err, "error generating cache hash for kubeStateMetricsCache")
return nil, fmt.Errorf("error generating cache hash for kubeStateMetricsCache: %w", err)
}

// NOTE: `Period * 2` is an arbitrary value to make the cache NEVER to expire before the next scraping run
// if different metricsets have different periods, we will effectively set (timeout = max(Period) * 2)
minCacheExpirationTime := base.Config().Period * 2
if perfMetrics.GetTimeout() < minCacheExpirationTime {
perfMetrics.SetOrUpdateTimeout(minCacheExpirationTime)
}

m := module{
BaseModule: base,
kubeStateMetricsCache: kubeStateMetricsCache,
kubeletStatsCache: kubeletStatsCache,
perfMetrics: perfMetrics,
cacheHash: hash,
}
return &m, nil
Expand Down Expand Up @@ -153,3 +166,7 @@ func generateCacheHash(host []string) (uint64, error) {
}
return id, nil
}

func (m *module) GetPerfMetricsCache() *util.PerfMetricsCache {
return m.perfMetrics
}
7 changes: 1 addition & 6 deletions metricbeat/module/kubernetes/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/kubernetes"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/metricbeat/helper"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/mb/parse"
Expand All @@ -40,8 +39,6 @@ var (
DefaultScheme: defaultScheme,
DefaultPath: defaultPath,
}.Build()

logger = logp.NewLogger("kubernetes.node")
)

// init registers the MetricSet with the central registry.
Expand Down Expand Up @@ -79,7 +76,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
http: http,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Node{}, false),
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Node{}, mod.GetPerfMetricsCache(), false),
mod: mod,
}, nil
}
Expand Down Expand Up @@ -115,8 +112,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
m.Logger().Debug("error trying to emit event")
return
}

return
}

// Close stops this metricset
Expand Down
8 changes: 2 additions & 6 deletions metricbeat/module/kubernetes/pod/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"

"github.com/elastic/beats/v7/libbeat/common/kubernetes"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/metricbeat/helper"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/mb/parse"
Expand All @@ -39,8 +38,6 @@ var (
DefaultScheme: defaultScheme,
DefaultPath: defaultPath,
}.Build()

logger = logp.NewLogger("kubernetes.pod")
)

// init registers the MetricSet with the central registry.
Expand Down Expand Up @@ -78,7 +75,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
http: http,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Pod{}, true),
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Pod{}, mod.GetPerfMetricsCache(), true),
mod: mod,
}, nil
}
Expand All @@ -96,7 +93,7 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
return
}

events, err := eventMapping(body, util.PerfMetrics)
events, err := eventMapping(body, m.mod.GetPerfMetricsCache())
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
Expand All @@ -117,7 +114,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
return
}
}
return
}

// Close stops this metricset
Expand Down
3 changes: 2 additions & 1 deletion metricbeat/module/kubernetes/pod/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"io/ioutil"
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"

Expand All @@ -40,7 +41,7 @@ func TestEventMapping(t *testing.T) {
body, err := ioutil.ReadAll(f)
assert.NoError(t, err, "cannot read test file "+testFile)

cache := util.NewPerfMetricsCache()
cache := util.NewPerfMetricsCache(120 * time.Second)
cache.NodeCoresAllocatable.Set("gke-beats-default-pool-a5b33e2e-hdww", 2)
cache.NodeMemAllocatable.Set("gke-beats-default-pool-a5b33e2e-hdww", 146227200)
cache.ContainerMemLimit.Set(util.ContainerUID("default", "nginx-deployment-2303442956-pcqfc", "nginx"), 14622720)
Expand Down
18 changes: 7 additions & 11 deletions metricbeat/module/kubernetes/state_container/state_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (
"fmt"
"strings"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/common"
p "github.com/elastic/beats/v7/metricbeat/helper/prometheus"
"github.com/elastic/beats/v7/metricbeat/mb"
Expand All @@ -34,8 +32,6 @@ import (
const (
defaultScheme = "http"
defaultPath = "/metrics"
// Nanocores conversion 10^9
nanocores = 1000000000
)

var (
Expand Down Expand Up @@ -121,7 +117,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
prometheus: prometheus,
enricher: util.NewContainerMetadataEnricher(base, false),
enricher: util.NewContainerMetadataEnricher(base, mod.GetPerfMetricsCache(), false),
mod: mod,
}, nil
}
Expand All @@ -134,11 +130,11 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {

families, err := m.mod.GetStateMetricsFamilies(m.prometheus)
if err != nil {
return errors.Wrap(err, "error getting families")
return fmt.Errorf("error getting families: %w", err)
}
events, err := m.prometheus.ProcessMetrics(families, mapping)
if err != nil {
return errors.Wrap(err, "error getting event")
return fmt.Errorf("error getting event: %w", err)
}

m.enricher.Enrich(events)
Expand All @@ -153,15 +149,15 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
cID := (containerID).(string)
split := strings.Index(cID, "://")
if split != -1 {
containerFields.Put("runtime", cID[:split])
containerFields.Put("id", cID[split+3:])
util.ShouldPut(containerFields, "runtime", cID[:split], m.Logger())
util.ShouldPut(containerFields, "id", cID[split+3:], m.Logger())
}
}
if containerImage, ok := event["image"]; ok {
cImage := (containerImage).(string)
containerFields.Put("image.name", cImage)
util.ShouldPut(containerFields, "image.name", cImage, m.Logger())
// remove kubernetes.container.image field as value is the same as ECS container.image.name field
event.Delete("image")
util.ShouldDelete(event, "image", m.Logger())
}

e, err := util.CreateEvent(event, "kubernetes.container")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
prometheus: prometheus,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.ReplicaSet{}, false),
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.DaemonSet{}, mod.GetPerfMetricsCache(), false),
mod: mod,
}, nil
}
Expand Down Expand Up @@ -126,8 +126,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
return
}
}

return
}

// Close stops this metricset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
prometheus: prometheus,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Deployment{}, false),
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Deployment{}, mod.GetPerfMetricsCache(), false),
mod: mod,
}, nil
}
Expand Down Expand Up @@ -128,8 +128,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
return
}
}

return
}

// Close stops this metricset
Expand Down
4 changes: 1 addition & 3 deletions metricbeat/module/kubernetes/state_job/state_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
prometheus: prometheus,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Job{}, false),
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Job{}, mod.GetPerfMetricsCache(), false),
mod: mod,
}, nil
}
Expand Down Expand Up @@ -145,8 +145,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
return
}
}

return
}

// Close stops this metricset
Expand Down
Loading

0 comments on commit 77021f2

Please sign in to comment.