Feature/remove k8s cache (#32539) (#32670)
* Replaced the internal expiring cache with a non-expiring in-memory dictionary (`MetricsRepo`); a sketch of the new flow follows below.
* Fixed a bug that prevented pod/container metrics from being exported when the node/state_node metricsets were disabled.
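The change swaps `util.NewPerfMetricsCache` for `util.NewMetricsRepo()`, a plain in-memory store keyed by node → pod → container that, as the diffs below suggest, the metadata enrichers populate and `eventMapping` reads. A minimal sketch of that flow, using only the helpers visible in the diffs; the import path and the standalone `main` wrapper are assumptions for illustration, not part of the change:

```go
package main

import (
	"fmt"

	// Assumed import path for the shared kubernetes helpers; adjust to the Beats module path.
	"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"
)

func main() {
	// One repo per module instance, shared by all of its metricsets (see kubernetes.go below).
	metricsRepo := util.NewMetricsRepo()

	// Write side: store the node allocatable values (done by the enrichers in the real module).
	nodeName := "gke-beats-default-pool-a5b33e2e-hdww"
	nodeStore, _ := metricsRepo.AddNodeStore(nodeName)
	nodeMetrics := util.NewNodeMetrics()
	nodeMetrics.CoresAllocatable = util.NewFloat64Metric(2)
	nodeMetrics.MemoryAllocatable = util.NewFloat64Metric(146227200)
	nodeStore.SetNodeMetrics(nodeMetrics)

	// Read side: missing values stay nil, so callers default explicitly (see container/data.go).
	nodeCores := 0.0
	if m := metricsRepo.GetNodeStore(nodeName).GetNodeMetrics(); m.CoresAllocatable != nil {
		nodeCores = m.CoresAllocatable.Value
	}
	fmt.Println("node cores allocatable:", nodeCores)
}
```

Unlike the old `PerfMetricsCache`, nothing here expires, which is what removes the `Period * 2` timeout bookkeeping from `kubernetes.go` (see below).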

(cherry picked from commit 5503761)

Co-authored-by: Giuseppe Santoro <giuseppe.santoro@elastic.co>
mergify[bot] and gsantoro authored Aug 15, 2022
1 parent 50f1b63 commit 27dd4d2
Showing 27 changed files with 1,337 additions and 295 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -72,6 +72,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Fix to ARN parsing for Cloudwatch resource names with leading slashes {pull}32358[32358]
- Fix an infinite loop in AWS billing metricset. {pull}32626[32626]
- Add missing metrics in AWS Transit Gateway module {pull}32617[32617]
- Replace internal expiring cache used by the Kubernetes module with in-memory dictionary {pull}32539[32539]

*Packetbeat*

@@ -0,0 +1,163 @@
{
"node": {
"nodeName": "gke-beats-default-pool-a5b33e2e-hdww",
"systemContainers": [
{
"name": "kubelet",
"startTime": "2017-04-18T12:53:49Z",
"cpu": {
"time": "2017-04-20T08:06:46Z",
"usageNanoCores": 11263994,
"usageCoreNanoSeconds": 2357800908948
},
"memory": {
"time": "2017-04-20T08:06:46Z",
"usageBytes": 36683776,
"workingSetBytes": 36495360,
"rssBytes": 35512320,
"pageFaults": 100835242,
"majorPageFaults": 0
},
"userDefinedMetrics": null
}
],
"startTime": "2017-04-18T12:53:49Z",
"cpu": {
"time": "2017-04-20T08:06:41Z",
"usageNanoCores": 18691146,
"usageCoreNanoSeconds": 4189523881380
},
"memory": {
"time": "2017-04-20T08:06:41Z",
"availableBytes": 1768316928,
"usageBytes": 2764943360,
"workingSetBytes": 2111090688,
"rssBytes": 2150400,
"pageFaults": 131567,
"majorPageFaults": 103
},
"network": {
"time": "2017-04-20T08:06:41Z",
"rxBytes": 1115133198,
"rxErrors": 0,
"txBytes": 812729002,
"txErrors": 0
},
"fs": {
"availableBytes": 98727014400,
"capacityBytes": 101258067968,
"usedBytes": 2514276352,
"inodesFree": 18446744073709551615,
"inodes": 6258720,
"inodesUsed": 138624
},
"runtime": {
"imageFs": {
"availableBytes": 98727014400,
"capacityBytes": 101258067968,
"usedBytes": 860204379,
"inodesFree": 18446744073709551615,
"inodes": 6258720,
"inodesUsed": 138624
}
}
},
"pods": [
{
"podRef": {
"name": "nginx-deployment-2303442956-pcqfc",
"namespace": "default",
"uid": "beabc196-2456-11e7-a3ad-42010a840235"
},
"startTime": "2017-04-18T16:47:44Z",
"containers": [
{
"name": "nginx",
"startTime": "2017-04-18T16:47:44Z",
"cpu": {
"time": "2017-04-20T08:06:34Z",
"usageNanoCores": 11263994,
"usageCoreNanoSeconds": 43959424
},
"memory": {
"time": "2017-04-20T08:06:34Z",
"usageBytes": 1462272,
"workingSetBytes": 1454080,
"rssBytes": 1409024,
"pageFaults": 841,
"majorPageFaults": 0
},
"rootfs": {
"availableBytes": 98727014400,
"capacityBytes": 101258067968,
"usedBytes": 61440,
"inodesFree": 6120096,
"inodes": 6258720,
"inodesUsed": 21
},
"logs": {
"availableBytes": 98727014400,
"capacityBytes": 101258067968,
"usedBytes": 28672,
"inodesFree": 6120096,
"inodes": 6258720,
"inodesUsed": 138624
},
"userDefinedMetrics": null
},
{
"name": "sidecar",
"startTime": "2017-04-18T16:47:44Z",
"cpu": {
"time": "2017-04-20T08:06:34Z",
"usageNanoCores": 11263994,
"usageCoreNanoSeconds": 43959424
},
"memory": {
"time": "2017-04-20T08:06:34Z",
"usageBytes": 1462272,
"workingSetBytes": 1454080,
"rssBytes": 1409024,
"pageFaults": 841,
"majorPageFaults": 0
},
"rootfs": {
"availableBytes": 98727014400,
"capacityBytes": 101258067968,
"usedBytes": 61440,
"inodesFree": 6120096,
"inodes": 6258720,
"inodesUsed": 21
},
"logs": {
"availableBytes": 98727014400,
"capacityBytes": 101258067968,
"usedBytes": 28672,
"inodesFree": 6120096,
"inodes": 6258720,
"inodesUsed": 138624
},
"userDefinedMetrics": null
}
],
"network": {
"time": "2017-04-20T08:06:41Z",
"rxBytes": 107056,
"rxErrors": 0,
"txBytes": 72447,
"txErrors": 0
},
"volume": [
{
"availableBytes": 1939689472,
"capacityBytes": 1939701760,
"usedBytes": 12288,
"inodesFree": 473551,
"inodes": 473560,
"inodesUsed": 9,
"name": "default-token-sg8x5"
}
]
}
]
}
4 changes: 2 additions & 2 deletions metricbeat/module/kubernetes/container/container.go
@@ -75,7 +75,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
http: http,
enricher: util.NewContainerMetadataEnricher(base, mod.GetPerfMetricsCache(), true),
enricher: util.NewContainerMetadataEnricher(base, mod.GetMetricsRepo(), true),
mod: mod,
}, nil
}
@@ -93,7 +93,7 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
return
}

events, err := eventMapping(body, m.mod.GetPerfMetricsCache(), m.Logger())
events, err := eventMapping(body, m.mod.GetMetricsRepo(), m.Logger())
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
36 changes: 30 additions & 6 deletions metricbeat/module/kubernetes/container/container_test.go
@@ -24,7 +24,6 @@ import (
"io/ioutil"
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"

@@ -44,12 +43,25 @@ func TestEventMapping(t *testing.T) {
body, err := ioutil.ReadAll(f)
assert.NoError(t, err, "cannot read test file "+testFile)

cache := util.NewPerfMetricsCache(120 * time.Second)
cache.NodeCoresAllocatable.Set("gke-beats-default-pool-a5b33e2e-hdww", 2)
cache.NodeMemAllocatable.Set("gke-beats-default-pool-a5b33e2e-hdww", 146227200)
cache.ContainerMemLimit.Set(util.ContainerUID("default", "nginx-deployment-2303442956-pcqfc", "nginx"), 14622720)
metricsRepo := util.NewMetricsRepo()

events, err := eventMapping(body, cache, logger)
nodeName := "gke-beats-default-pool-a5b33e2e-hdww"

nodeMetrics := util.NewNodeMetrics()
nodeMetrics.CoresAllocatable = util.NewFloat64Metric(2)
nodeMetrics.MemoryAllocatable = util.NewFloat64Metric(146227200)
addNodeMetric(metricsRepo, nodeName, nodeMetrics)

namespace := "default"
podName := "nginx-deployment-2303442956-pcqfc"
podId := util.NewPodId(namespace, podName)
containerName := "nginx"

containerMetrics := util.NewContainerMetrics()
containerMetrics.MemoryLimit = util.NewFloat64Metric(14622720)
addContainerMetric(metricsRepo, nodeName, podId, containerName, containerMetrics)

events, err := eventMapping(body, metricsRepo, logger)
assert.NoError(t, err, "error mapping "+testFile)

assert.Len(t, events, 1, "got wrong number of events")
@@ -107,3 +119,15 @@ func testValue(t *testing.T, event mapstr.M, field string, value interface{}) {
assert.NoError(t, err, "Could not read field "+field)
assert.EqualValues(t, data, value, "Wrong value for field "+field)
}

func addContainerMetric(metricsRepo *util.MetricsRepo, nodeName string, podId util.PodId, containerName string, metrics *util.ContainerMetrics) {
nodeStore, _ := metricsRepo.AddNodeStore(nodeName)
podStore, _ := nodeStore.AddPodStore(podId)
containerStore, _ := podStore.AddContainerStore(containerName)
containerStore.SetContainerMetrics(metrics)
}

func addNodeMetric(metricsRepo *util.MetricsRepo, nodeName string, nodeMetrics *util.NodeMetrics) {
nodeStore, _ := metricsRepo.AddNodeStore(nodeName)
nodeStore.SetNodeMetrics(nodeMetrics)
}
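The two helpers above cover the write side; the read side that `eventMapping` exercises is the mirror-image chain of store getters. A hypothetical helper (`getContainerMemoryLimit` is not part of the change) that could sit next to the helpers above, reusing the test file's imports:

```go
// getContainerMemoryLimit mirrors the lookups eventMapping performs in data.go:
// the store getters are chained without nil checks (as data.go does), but the
// individual metric fields may be nil when nothing has been stored for them yet.
func getContainerMemoryLimit(metricsRepo *util.MetricsRepo, nodeName string, podId util.PodId, containerName string) float64 {
	containerMetrics := metricsRepo.GetNodeStore(nodeName).
		GetPodStore(podId).
		GetContainerStore(containerName).
		GetContainerMetrics()
	if containerMetrics.MemoryLimit == nil {
		return 0 // data.go falls back to the node allocatable value instead
	}
	return containerMetrics.MemoryLimit.Value
}
```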
50 changes: 39 additions & 11 deletions metricbeat/module/kubernetes/container/data.go
@@ -29,7 +29,7 @@ import (
"github.com/elastic/elastic-agent-libs/mapstr"
)

func eventMapping(content []byte, perfMetrics *util.PerfMetricsCache, logger *logp.Logger) ([]mapstr.M, error) {
func eventMapping(content []byte, metricsRepo *util.MetricsRepo, logger *logp.Logger) ([]mapstr.M, error) {
events := []mapstr.M{}
var summary kubernetes.Summary

@@ -39,9 +39,23 @@ func eventMapping(content []byte, perfMetrics *util.PerfMetricsCache, logger *lo
}

node := summary.Node
nodeCores := perfMetrics.NodeCoresAllocatable.Get(node.NodeName)
nodeMem := perfMetrics.NodeMemAllocatable.Get(node.NodeName)

nodeCores := 0.0
nodeMem := 0.0

nodeStore := metricsRepo.GetNodeStore(node.NodeName)
nodeMetrics := nodeStore.GetNodeMetrics()
if nodeMetrics.CoresAllocatable != nil {
nodeCores = nodeMetrics.CoresAllocatable.Value
}
if nodeMetrics.MemoryAllocatable != nil {
nodeMem = nodeMetrics.MemoryAllocatable.Value
}

for _, pod := range summary.Pods {
podId := util.NewPodId(pod.PodRef.Namespace, pod.PodRef.Name)
podStore := nodeStore.GetPodStore(podId)

for _, container := range pod.Containers {
containerEvent := mapstr.M{
mb.ModuleDataKey: mapstr.M{
@@ -127,17 +141,31 @@ func eventMapping(content []byte, perfMetrics *util.PerfMetricsCache, logger *lo
kubernetes2.ShouldPut(containerEvent, "memory.usage.node.pct", float64(container.Memory.UsageBytes)/nodeMem, logger)
}

cuid := util.ContainerUID(pod.PodRef.Namespace, pod.PodRef.Name, container.Name)
coresLimit := perfMetrics.ContainerCoresLimit.GetWithDefault(cuid, nodeCores)
memLimit := perfMetrics.ContainerMemLimit.GetWithDefault(cuid, nodeMem)
containerStore := podStore.GetContainerStore(container.Name)
containerMetrics := containerStore.GetContainerMetrics()

containerCoresLimit := nodeCores
if containerMetrics.CoresLimit != nil {
containerCoresLimit = containerMetrics.CoresLimit.Value
}

containerMemLimit := nodeMem
if containerMetrics.MemoryLimit != nil {
containerMemLimit = containerMetrics.MemoryLimit.Value
}

// NOTE:
// we don't currently check if `containerMemLimit` > `nodeMem` as we do in `kubernetes/pod/data.go`.
// There we do check, since if a container doesn't have a limit set, it will inherit the node limits and the sum of all
// the container limits can be greater than the node limits. We assume here the user can set correct limits on containers.

if coresLimit > 0 {
kubernetes2.ShouldPut(containerEvent, "cpu.usage.limit.pct", float64(container.CPU.UsageNanoCores)/1e9/coresLimit, logger)
if containerCoresLimit > 0 {
kubernetes2.ShouldPut(containerEvent, "cpu.usage.limit.pct", float64(container.CPU.UsageNanoCores)/1e9/containerCoresLimit, logger)
}

if memLimit > 0 {
kubernetes2.ShouldPut(containerEvent, "memory.usage.limit.pct", float64(container.Memory.UsageBytes)/memLimit, logger)
kubernetes2.ShouldPut(containerEvent, "memory.workingset.limit.pct", float64(container.Memory.WorkingSetBytes)/memLimit, logger)
if containerMemLimit > 0 {
kubernetes2.ShouldPut(containerEvent, "memory.usage.limit.pct", float64(container.Memory.UsageBytes)/containerMemLimit, logger)
kubernetes2.ShouldPut(containerEvent, "memory.workingset.limit.pct", float64(container.Memory.WorkingSetBytes)/containerMemLimit, logger)
}

events = append(events, containerEvent)
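The defaulting above reduces to: node allocatable values seed the per-container limits, explicit container limits override them, and a zero result suppresses the `*.limit.pct` fields. A standalone restatement of that logic (the name `containerLimits` is illustrative, not from the change):

```go
// containerLimits restates the fallback from data.go above: limits default to the
// node allocatable values and are overridden by explicit per-container limits.
// A zero result means the corresponding *.limit.pct fields are not emitted.
func containerLimits(node *util.NodeMetrics, container *util.ContainerMetrics) (cores, mem float64) {
	if node.CoresAllocatable != nil {
		cores = node.CoresAllocatable.Value
	}
	if node.MemoryAllocatable != nil {
		mem = node.MemoryAllocatable.Value
	}
	if container.CoresLimit != nil {
		cores = container.CoresLimit.Value
	}
	if container.MemoryLimit != nil {
		mem = container.MemoryLimit.Value
	}
	return cores, mem
}
```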
19 changes: 6 additions & 13 deletions metricbeat/module/kubernetes/kubernetes.go
@@ -42,7 +42,7 @@ type Module interface {
mb.Module
GetStateMetricsFamilies(prometheus p.Prometheus) ([]*dto.MetricFamily, error)
GetKubeletStats(http *helper.HTTP) ([]byte, error)
GetPerfMetricsCache() *util.PerfMetricsCache
GetMetricsRepo() *util.MetricsRepo
}

type familiesCache struct {
@@ -86,7 +86,7 @@ type module struct {

kubeStateMetricsCache *kubeStateMetricsCache
kubeletStatsCache *kubeletStatsCache
perfMetrics *util.PerfMetricsCache
metricsRepo *util.MetricsRepo
cacheHash uint64
}

@@ -97,25 +97,18 @@ func ModuleBuilder() func(base mb.BaseModule) (mb.Module, error) {
kubeletStatsCache := &kubeletStatsCache{
cacheMap: make(map[uint64]*statsCache),
}
perfMetrics := util.NewPerfMetricsCache(0)
metricsRepo := util.NewMetricsRepo()
return func(base mb.BaseModule) (mb.Module, error) {
hash, err := generateCacheHash(base.Config().Hosts)
if err != nil {
return nil, fmt.Errorf("error generating cache hash for kubeStateMetricsCache: %w", err)
}

// NOTE: `Period * 2` is an arbitrary value to make the cache NEVER to expire before the next scraping run
// if different metricsets have different periods, we will effectively set (timeout = max(Period) * 2)
minCacheExpirationTime := base.Config().Period * 2
if perfMetrics.GetTimeout() < minCacheExpirationTime {
perfMetrics.SetOrUpdateTimeout(minCacheExpirationTime)
}

m := module{
BaseModule: base,
kubeStateMetricsCache: kubeStateMetricsCache,
kubeletStatsCache: kubeletStatsCache,
perfMetrics: perfMetrics,
metricsRepo: metricsRepo,
cacheHash: hash,
}
return &m, nil
@@ -167,6 +160,6 @@ func generateCacheHash(host []string) (uint64, error) {
return id, nil
}

func (m *module) GetPerfMetricsCache() *util.PerfMetricsCache {
return m.perfMetrics
func (m *module) GetMetricsRepo() *util.MetricsRepo {
return m.metricsRepo
}
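With `NewPerfMetricsCache(0)` gone, `ModuleBuilder` no longer needs the `Period * 2` timeout handling: the repo is created once in the builder and captured by the returned closure, so every module instance (and every metricset calling `GetMetricsRepo()`) shares the same dictionary. A minimal sketch of that sharing, with the illustrative names `repoBuilder`/`build`:

```go
package main

import (
	"fmt"

	// Assumed import path; adjust to the Beats module path.
	"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"
)

// repoBuilder mimics the pattern in ModuleBuilder above: the repo is created once
// and captured by the returned closure, so everything built through that closure
// shares the same *util.MetricsRepo.
func repoBuilder() func() *util.MetricsRepo {
	metricsRepo := util.NewMetricsRepo()
	return func() *util.MetricsRepo { return metricsRepo }
}

func main() {
	build := repoBuilder()
	a, b := build(), build()
	fmt.Println(a == b) // true: one shared repo, no expiry to configure
}
```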
2 changes: 1 addition & 1 deletion metricbeat/module/kubernetes/node/node.go
@@ -76,7 +76,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
http: http,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Node{}, mod.GetPerfMetricsCache(), false),
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Node{}, mod.GetMetricsRepo(), false),
mod: mod,
}, nil
}