diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index d3682f0ed51..b64bcc480a3 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -67,6 +67,7 @@ https://github.com/elastic/beats/compare/v8.0.1\...main[Check the HEAD diff] - Enhance metricbeat on openshift documentation {pull}30054[30054] - Fixed missing ZooKeeper metrics due compatibility issues with versions >= 3.6.0 {pull}30068[30068] - Fix Docker module: rename fields on dashboards. {pull}30500[30500] +- Fix kubernetes module's internal cache expiration issue. This prevents metrics like `kubernetes.container.cpu.usage.limit.pct` from not being populated. {pull}31785[31785] *Packetbeat* diff --git a/libbeat/common/cache.go b/libbeat/common/cache.go index c06aa2427b8..d75a8471c5f 100644 --- a/libbeat/common/cache.go +++ b/libbeat/common/cache.go @@ -255,7 +255,9 @@ func (c *Cache) StartJanitor(interval time.Duration) { // StopJanitor stops the goroutine created by StartJanitor. func (c *Cache) StopJanitor() { - close(c.janitorQuit) + if c.janitorQuit != nil { + close(c.janitorQuit) + } } // get returns the non-expired values from the cache. 
diff --git a/libbeat/common/kubernetes/informer.go b/libbeat/common/kubernetes/informer.go index cd7fb513cf6..b1b791e948d 100644 --- a/libbeat/common/kubernetes/informer.go +++ b/libbeat/common/kubernetes/informer.go @@ -137,6 +137,18 @@ func NewInformer(client kubernetes.Interface, resource Resource, opts WatchOptio } objType = "statefulset" + case *DaemonSet: + ss := client.AppsV1().DaemonSets(opts.Namespace) + listwatch = &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return ss.List(ctx, options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return ss.Watch(ctx, options) + }, + } + + objType = "daemonset" case *Service: svc := client.CoreV1().Services(opts.Namespace) listwatch = &cache.ListWatch{ diff --git a/libbeat/common/kubernetes/types.go b/libbeat/common/kubernetes/types.go index c3d1fefb01e..8242c4351e0 100644 --- a/libbeat/common/kubernetes/types.go +++ b/libbeat/common/kubernetes/types.go @@ -70,6 +70,9 @@ type ReplicaSet = appsv1.ReplicaSet // StatefulSet data type StatefulSet = appsv1.StatefulSet +// DaemonSet data +type DaemonSet = appsv1.DaemonSet + // Service data type Service = v1.Service diff --git a/metricbeat/module/kubernetes/container/container.go b/metricbeat/module/kubernetes/container/container.go index 0c2d31eada6..5b78b3ea448 100644 --- a/metricbeat/module/kubernetes/container/container.go +++ b/metricbeat/module/kubernetes/container/container.go @@ -20,7 +20,6 @@ package container import ( "fmt" - "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/metricbeat/helper" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/mb/parse" @@ -38,8 +37,6 @@ var ( DefaultScheme: defaultScheme, DefaultPath: defaultPath, }.Build() - - logger = logp.NewLogger("kubernetes.container") ) // init registers the MetricSet with the central registry. 
@@ -77,7 +74,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ BaseMetricSet: base, http: http, - enricher: util.NewContainerMetadataEnricher(base, true), + enricher: util.NewContainerMetadataEnricher(base, mod.GetPerfMetricsCache(), true), mod: mod, }, nil } @@ -95,7 +92,7 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) { return } - events, err := eventMapping(body, util.PerfMetrics) + events, err := eventMapping(body, m.mod.GetPerfMetricsCache()) if err != nil { m.Logger().Error(err) reporter.Error(err) @@ -116,8 +113,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) { return } } - - return } // Close stops this metricset diff --git a/metricbeat/module/kubernetes/container/container_test.go b/metricbeat/module/kubernetes/container/container_test.go index 30009846169..0b9a33afcb3 100644 --- a/metricbeat/module/kubernetes/container/container_test.go +++ b/metricbeat/module/kubernetes/container/container_test.go @@ -24,6 +24,7 @@ import ( "io/ioutil" "os" "testing" + "time" "github.com/stretchr/testify/assert" @@ -40,7 +41,7 @@ func TestEventMapping(t *testing.T) { body, err := ioutil.ReadAll(f) assert.NoError(t, err, "cannot read test file "+testFile) - cache := util.NewPerfMetricsCache() + cache := util.NewPerfMetricsCache(120 * time.Second) cache.NodeCoresAllocatable.Set("gke-beats-default-pool-a5b33e2e-hdww", 2) cache.NodeMemAllocatable.Set("gke-beats-default-pool-a5b33e2e-hdww", 146227200) cache.ContainerMemLimit.Set(util.ContainerUID("default", "nginx-deployment-2303442956-pcqfc", "nginx"), 14622720) diff --git a/metricbeat/module/kubernetes/kubernetes.go b/metricbeat/module/kubernetes/kubernetes.go index f9bd0e29e37..dc09f5a4a34 100644 --- a/metricbeat/module/kubernetes/kubernetes.go +++ b/metricbeat/module/kubernetes/kubernetes.go @@ -18,16 +18,17 @@ package kubernetes import ( + "fmt" "sync" "time" "github.com/mitchellh/hashstructure" - "github.com/pkg/errors" dto "github.com/prometheus/client_model/go" 
"github.com/elastic/beats/v7/metricbeat/helper" p "github.com/elastic/beats/v7/metricbeat/helper/prometheus" "github.com/elastic/beats/v7/metricbeat/mb" + "github.com/elastic/beats/v7/metricbeat/module/kubernetes/util" ) func init() { @@ -41,6 +42,7 @@ type Module interface { mb.Module GetStateMetricsFamilies(prometheus p.Prometheus) ([]*dto.MetricFamily, error) GetKubeletStats(http *helper.HTTP) ([]byte, error) + GetPerfMetricsCache() *util.PerfMetricsCache } type familiesCache struct { @@ -84,6 +86,7 @@ type module struct { kubeStateMetricsCache *kubeStateMetricsCache kubeletStatsCache *kubeletStatsCache + perfMetrics *util.PerfMetricsCache cacheHash uint64 } @@ -94,15 +97,25 @@ func ModuleBuilder() func(base mb.BaseModule) (mb.Module, error) { kubeletStatsCache := &kubeletStatsCache{ cacheMap: make(map[uint64]*statsCache), } + perfMetrics := util.NewPerfMetricsCache(0) return func(base mb.BaseModule) (mb.Module, error) { hash, err := generateCacheHash(base.Config().Hosts) if err != nil { - return nil, errors.Wrap(err, "error generating cache hash for kubeStateMetricsCache") + return nil, fmt.Errorf("error generating cache hash for kubeStateMetricsCache: %w", err) } + + // NOTE: `Period * 2` is an arbitrary value chosen so that the cache NEVER expires before the next scraping run. + // If different metricsets have different periods, we will effectively set (timeout = max(Period) * 2). + minCacheExpirationTime := base.Config().Period * 2 + if perfMetrics.GetTimeout() < minCacheExpirationTime { + perfMetrics.SetOrUpdateTimeout(minCacheExpirationTime) + } + m := module{ BaseModule: base, kubeStateMetricsCache: kubeStateMetricsCache, kubeletStatsCache: kubeletStatsCache, + perfMetrics: perfMetrics, cacheHash: hash, } return &m, nil @@ -153,3 +166,7 @@ func generateCacheHash(host []string) (uint64, error) { } return id, nil } + +func (m *module) GetPerfMetricsCache() *util.PerfMetricsCache { + return m.perfMetrics +} diff --git a/metricbeat/module/kubernetes/node/node.go 
b/metricbeat/module/kubernetes/node/node.go index 1995b63267c..9859394b39c 100644 --- a/metricbeat/module/kubernetes/node/node.go +++ b/metricbeat/module/kubernetes/node/node.go @@ -22,7 +22,6 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/kubernetes" - "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/metricbeat/helper" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/mb/parse" @@ -40,8 +39,6 @@ var ( DefaultScheme: defaultScheme, DefaultPath: defaultPath, }.Build() - - logger = logp.NewLogger("kubernetes.node") ) // init registers the MetricSet with the central registry. @@ -79,7 +76,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ BaseMetricSet: base, http: http, - enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Node{}, false), + enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Node{}, mod.GetPerfMetricsCache(), false), mod: mod, }, nil } @@ -115,8 +112,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) { m.Logger().Debug("error trying to emit event") return } - - return } // Close stops this metricset diff --git a/metricbeat/module/kubernetes/pod/pod.go b/metricbeat/module/kubernetes/pod/pod.go index 32c4997522e..a857a8c358c 100644 --- a/metricbeat/module/kubernetes/pod/pod.go +++ b/metricbeat/module/kubernetes/pod/pod.go @@ -21,7 +21,6 @@ import ( "fmt" "github.com/elastic/beats/v7/libbeat/common/kubernetes" - "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/metricbeat/helper" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/mb/parse" @@ -39,8 +38,6 @@ var ( DefaultScheme: defaultScheme, DefaultPath: defaultPath, }.Build() - - logger = logp.NewLogger("kubernetes.pod") ) // init registers the MetricSet with the central registry. 
@@ -78,7 +75,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ BaseMetricSet: base, http: http, - enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Pod{}, true), + enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Pod{}, mod.GetPerfMetricsCache(), true), mod: mod, }, nil } @@ -96,7 +93,7 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) { return } - events, err := eventMapping(body, util.PerfMetrics) + events, err := eventMapping(body, m.mod.GetPerfMetricsCache()) if err != nil { m.Logger().Error(err) reporter.Error(err) @@ -117,7 +114,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) { return } } - return } // Close stops this metricset diff --git a/metricbeat/module/kubernetes/pod/pod_test.go b/metricbeat/module/kubernetes/pod/pod_test.go index 41340e5dede..9f7c6d2e397 100644 --- a/metricbeat/module/kubernetes/pod/pod_test.go +++ b/metricbeat/module/kubernetes/pod/pod_test.go @@ -24,6 +24,7 @@ import ( "io/ioutil" "os" "testing" + "time" "github.com/stretchr/testify/assert" @@ -40,7 +41,7 @@ func TestEventMapping(t *testing.T) { body, err := ioutil.ReadAll(f) assert.NoError(t, err, "cannot read test file "+testFile) - cache := util.NewPerfMetricsCache() + cache := util.NewPerfMetricsCache(120 * time.Second) cache.NodeCoresAllocatable.Set("gke-beats-default-pool-a5b33e2e-hdww", 2) cache.NodeMemAllocatable.Set("gke-beats-default-pool-a5b33e2e-hdww", 146227200) cache.ContainerMemLimit.Set(util.ContainerUID("default", "nginx-deployment-2303442956-pcqfc", "nginx"), 14622720) diff --git a/metricbeat/module/kubernetes/state_container/state_container.go b/metricbeat/module/kubernetes/state_container/state_container.go index 23c34bf040f..0079d16783b 100644 --- a/metricbeat/module/kubernetes/state_container/state_container.go +++ b/metricbeat/module/kubernetes/state_container/state_container.go @@ -21,8 +21,6 @@ import ( "fmt" "strings" - "github.com/pkg/errors" - "github.com/elastic/beats/v7/libbeat/common" p 
"github.com/elastic/beats/v7/metricbeat/helper/prometheus" "github.com/elastic/beats/v7/metricbeat/mb" @@ -34,8 +32,6 @@ import ( const ( defaultScheme = "http" defaultPath = "/metrics" - // Nanocores conversion 10^9 - nanocores = 1000000000 ) var ( @@ -121,7 +117,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ BaseMetricSet: base, prometheus: prometheus, - enricher: util.NewContainerMetadataEnricher(base, false), + enricher: util.NewContainerMetadataEnricher(base, mod.GetPerfMetricsCache(), false), mod: mod, }, nil } @@ -134,11 +130,11 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { families, err := m.mod.GetStateMetricsFamilies(m.prometheus) if err != nil { - return errors.Wrap(err, "error getting families") + return fmt.Errorf("error getting families: %w", err) } events, err := m.prometheus.ProcessMetrics(families, mapping) if err != nil { - return errors.Wrap(err, "error getting event") + return fmt.Errorf("error getting event: %w", err) } m.enricher.Enrich(events) @@ -153,15 +149,15 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { cID := (containerID).(string) split := strings.Index(cID, "://") if split != -1 { - containerFields.Put("runtime", cID[:split]) - containerFields.Put("id", cID[split+3:]) + util.ShouldPut(containerFields, "runtime", cID[:split], m.Logger()) + util.ShouldPut(containerFields, "id", cID[split+3:], m.Logger()) } } if containerImage, ok := event["image"]; ok { cImage := (containerImage).(string) - containerFields.Put("image.name", cImage) + util.ShouldPut(containerFields, "image.name", cImage, m.Logger()) // remove kubernetes.container.image field as value is the same as ECS container.image.name field - event.Delete("image") + util.ShouldDelete(event, "image", m.Logger()) } e, err := util.CreateEvent(event, "kubernetes.container") diff --git a/metricbeat/module/kubernetes/state_daemonset/state_daemonset.go b/metricbeat/module/kubernetes/state_daemonset/state_daemonset.go index 
1066afa2195..05b70d3f74b 100644 --- a/metricbeat/module/kubernetes/state_daemonset/state_daemonset.go +++ b/metricbeat/module/kubernetes/state_daemonset/state_daemonset.go @@ -89,7 +89,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ BaseMetricSet: base, prometheus: prometheus, - enricher: util.NewResourceMetadataEnricher(base, &kubernetes.ReplicaSet{}, false), + enricher: util.NewResourceMetadataEnricher(base, &kubernetes.DaemonSet{}, mod.GetPerfMetricsCache(), false), mod: mod, }, nil } @@ -126,8 +126,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) { return } } - - return } // Close stops this metricset diff --git a/metricbeat/module/kubernetes/state_deployment/state_deployment.go b/metricbeat/module/kubernetes/state_deployment/state_deployment.go index 02448afdc9a..9d8c5d84e8e 100644 --- a/metricbeat/module/kubernetes/state_deployment/state_deployment.go +++ b/metricbeat/module/kubernetes/state_deployment/state_deployment.go @@ -90,7 +90,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ BaseMetricSet: base, prometheus: prometheus, - enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Deployment{}, false), + enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Deployment{}, mod.GetPerfMetricsCache(), false), mod: mod, }, nil } @@ -128,8 +128,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) { return } } - - return } // Close stops this metricset diff --git a/metricbeat/module/kubernetes/state_job/state_job.go b/metricbeat/module/kubernetes/state_job/state_job.go index 1b072eae742..0223b40887d 100644 --- a/metricbeat/module/kubernetes/state_job/state_job.go +++ b/metricbeat/module/kubernetes/state_job/state_job.go @@ -108,7 +108,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ BaseMetricSet: base, prometheus: prometheus, - enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Job{}, false), + enricher: 
util.NewResourceMetadataEnricher(base, &kubernetes.Job{}, mod.GetPerfMetricsCache(), false), mod: mod, }, nil } @@ -145,8 +145,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) { return } } - - return } // Close stops this metricset diff --git a/metricbeat/module/kubernetes/state_node/state_node.go b/metricbeat/module/kubernetes/state_node/state_node.go index 99d1f351367..a0e70dc13a4 100644 --- a/metricbeat/module/kubernetes/state_node/state_node.go +++ b/metricbeat/module/kubernetes/state_node/state_node.go @@ -114,7 +114,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ BaseMetricSet: base, prometheus: prometheus, - enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Node{}, false), + enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Node{}, mod.GetPerfMetricsCache(), false), mod: mod, }, nil } @@ -151,8 +151,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) { return } } - - return } // Close stops this metricset diff --git a/metricbeat/module/kubernetes/state_pod/state_pod.go b/metricbeat/module/kubernetes/state_pod/state_pod.go index ea742ab26bd..09d9184af1c 100644 --- a/metricbeat/module/kubernetes/state_pod/state_pod.go +++ b/metricbeat/module/kubernetes/state_pod/state_pod.go @@ -92,7 +92,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ BaseMetricSet: base, prometheus: prometheus, - enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Pod{}, false), + enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Pod{}, mod.GetPerfMetricsCache(), false), mod: mod, }, nil } @@ -130,8 +130,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) { return } } - - return } // Close stops this metricset diff --git a/metricbeat/module/kubernetes/state_replicaset/state_replicaset.go b/metricbeat/module/kubernetes/state_replicaset/state_replicaset.go index 7ee8e62864a..a2622dec883 100644 --- a/metricbeat/module/kubernetes/state_replicaset/state_replicaset.go +++ 
b/metricbeat/module/kubernetes/state_replicaset/state_replicaset.go @@ -90,7 +90,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ BaseMetricSet: base, prometheus: prometheus, - enricher: util.NewResourceMetadataEnricher(base, &kubernetes.ReplicaSet{}, false), + enricher: util.NewResourceMetadataEnricher(base, &kubernetes.ReplicaSet{}, mod.GetPerfMetricsCache(), false), mod: mod, }, nil } @@ -127,8 +127,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) { return } } - - return } // Close stops this metricset diff --git a/metricbeat/module/kubernetes/state_resourcequota/state_resourcequota.go b/metricbeat/module/kubernetes/state_resourcequota/state_resourcequota.go index eb8bc6ddf35..f4a95a6d25b 100644 --- a/metricbeat/module/kubernetes/state_resourcequota/state_resourcequota.go +++ b/metricbeat/module/kubernetes/state_resourcequota/state_resourcequota.go @@ -97,5 +97,4 @@ func (m *ResourceQuotaMetricSet) Fetch(reporter mb.ReporterV2) { return } } - return } diff --git a/metricbeat/module/kubernetes/state_service/state_service.go b/metricbeat/module/kubernetes/state_service/state_service.go index 6794c36c44f..e2ff41fea85 100644 --- a/metricbeat/module/kubernetes/state_service/state_service.go +++ b/metricbeat/module/kubernetes/state_service/state_service.go @@ -86,7 +86,7 @@ func NewServiceMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { "hostname": p.Label("ingress_hostname"), }, }, - enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Service{}, false), + enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Service{}, mod.GetPerfMetricsCache(), false), }, nil } @@ -122,7 +122,6 @@ func (m *ServiceMetricSet) Fetch(reporter mb.ReporterV2) { return } } - return } // Close stops this metricset diff --git a/metricbeat/module/kubernetes/state_statefulset/state_statefulset.go b/metricbeat/module/kubernetes/state_statefulset/state_statefulset.go index c2c02195206..807e53fe696 100644 --- 
a/metricbeat/module/kubernetes/state_statefulset/state_statefulset.go +++ b/metricbeat/module/kubernetes/state_statefulset/state_statefulset.go @@ -90,7 +90,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ BaseMetricSet: base, prometheus: prometheus, - enricher: util.NewResourceMetadataEnricher(base, &kubernetes.StatefulSet{}, false), + enricher: util.NewResourceMetadataEnricher(base, &kubernetes.StatefulSet{}, mod.GetPerfMetricsCache(), false), mod: mod, }, nil } @@ -127,8 +127,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) { return } } - - return } // Close stops this metricset diff --git a/metricbeat/module/kubernetes/state_storageclass/state_storageclass.go b/metricbeat/module/kubernetes/state_storageclass/state_storageclass.go index 820a052a05a..5a0ce3d0109 100644 --- a/metricbeat/module/kubernetes/state_storageclass/state_storageclass.go +++ b/metricbeat/module/kubernetes/state_storageclass/state_storageclass.go @@ -99,5 +99,4 @@ func (m *StorageClassMetricSet) Fetch(reporter mb.ReporterV2) { return } } - return } diff --git a/metricbeat/module/kubernetes/util/kubernetes.go b/metricbeat/module/kubernetes/util/kubernetes.go index c037c327526..146df64a007 100644 --- a/metricbeat/module/kubernetes/util/kubernetes.go +++ b/metricbeat/module/kubernetes/util/kubernetes.go @@ -117,6 +117,7 @@ func GetWatcher(base mb.BaseMetricSet, resource kubernetes.Resource, nodeScope b func NewResourceMetadataEnricher( base mb.BaseMetricSet, res kubernetes.Resource, + perfMetrics *PerfMetricsCache, nodeScope bool) Enricher { watcher, err := GetWatcher(base, res, nodeScope) @@ -156,12 +157,12 @@ func NewResourceMetadataEnricher( name := r.GetObjectMeta().GetName() if cpu, ok := r.Status.Capacity["cpu"]; ok { if q, err := resource.ParseQuantity(cpu.String()); err == nil { - PerfMetrics.NodeCoresAllocatable.Set(name, float64(q.MilliValue())/1000) + perfMetrics.NodeCoresAllocatable.Set(name, float64(q.MilliValue())/1000) } } if memory, ok := 
r.Status.Capacity["memory"]; ok { if q, err := resource.ParseQuantity(memory.String()); err == nil { - PerfMetrics.NodeMemAllocatable.Set(name, float64(q.Value())) + perfMetrics.NodeMemAllocatable.Set(name, float64(q.Value())) } } @@ -179,6 +180,8 @@ func NewResourceMetadataEnricher( m[id] = metaGen.Generate("namespace", r) case *kubernetes.ReplicaSet: m[id] = metaGen.Generate("replicaset", r) + case *kubernetes.DaemonSet: + m[id] = metaGen.Generate("daemonset", r) default: m[id] = metaGen.Generate(r.GetObjectKind().GroupVersionKind().Kind, r) } @@ -207,6 +210,7 @@ func NewResourceMetadataEnricher( // NewContainerMetadataEnricher returns an Enricher configured for container events func NewContainerMetadataEnricher( base mb.BaseMetricSet, + perfMetrics *PerfMetricsCache, nodeScope bool) Enricher { watcher, err := GetWatcher(base, &kubernetes.Pod{}, nodeScope) @@ -241,12 +245,12 @@ func NewContainerMetadataEnricher( // Report container limits to PerfMetrics cache if cpu, ok := container.Resources.Limits["cpu"]; ok { if q, err := resource.ParseQuantity(cpu.String()); err == nil { - PerfMetrics.ContainerCoresLimit.Set(cuid, float64(q.MilliValue())/1000) + perfMetrics.ContainerCoresLimit.Set(cuid, float64(q.MilliValue())/1000) } } if memory, ok := container.Resources.Limits["memory"]; ok { if q, err := resource.ParseQuantity(memory.String()); err == nil { - PerfMetrics.ContainerMemLimit.Set(cuid, float64(q.Value())) + perfMetrics.ContainerMemLimit.Set(cuid, float64(q.Value())) } } @@ -364,7 +368,10 @@ func (m *enricher) Enrich(events []common.MapStr) { delete(k8sMeta, "pod") } ecsMeta := meta.Clone() - ecsMeta.Delete("kubernetes") + err = ecsMeta.Delete("kubernetes") + if err != nil { + logp.Debug("kubernetes", "Failed to delete field '%s': %s", "kubernetes", err) + } event.DeepUpdate(common.MapStr{ mb.ModuleDataKey: k8sMeta, "meta": ecsMeta, @@ -412,3 +419,17 @@ func CreateEvent(event common.MapStr, namespace string) (mb.Event, error) { } return e, err } + +func 
ShouldPut(event common.MapStr, field string, value interface{}, logger *logp.Logger) { + _, err := event.Put(field, value) + if err != nil { + logger.Debugf("Failed to put field '%s' with value '%s': %s", field, value, err) + } +} + +func ShouldDelete(event common.MapStr, field string, logger *logp.Logger) { + err := event.Delete(field) + if err != nil { + logger.Debugf("Failed to delete field '%s': %s", field, err) + } +} diff --git a/metricbeat/module/kubernetes/util/metrics_cache.go b/metricbeat/module/kubernetes/util/metrics_cache.go index 3fdf59d8b16..60bde73336e 100644 --- a/metricbeat/module/kubernetes/util/metrics_cache.go +++ b/metricbeat/module/kubernetes/util/metrics_cache.go @@ -23,23 +23,14 @@ import ( "github.com/elastic/beats/v7/libbeat/common" ) -// PerfMetrics stores known metrics from Kubernetes nodes and containers -var PerfMetrics = NewPerfMetricsCache() - -func init() { - PerfMetrics.Start() -} - -const defaultTimeout = 120 * time.Second - // NewPerfMetricsCache initializes and returns a new PerfMetricsCache -func NewPerfMetricsCache() *PerfMetricsCache { +func NewPerfMetricsCache(timeout time.Duration) *PerfMetricsCache { return &PerfMetricsCache{ - NodeMemAllocatable: newValueMap(defaultTimeout), - NodeCoresAllocatable: newValueMap(defaultTimeout), + NodeMemAllocatable: newValueMap(timeout), + NodeCoresAllocatable: newValueMap(timeout), - ContainerMemLimit: newValueMap(defaultTimeout), - ContainerCoresLimit: newValueMap(defaultTimeout), + ContainerMemLimit: newValueMap(timeout), + ContainerCoresLimit: newValueMap(timeout), } } @@ -68,6 +59,43 @@ func (c *PerfMetricsCache) Stop() { c.ContainerCoresLimit.Stop() } +// Returns the maximum timeout of all the caches under PerfMetricsCache +func (c *PerfMetricsCache) GetTimeout() time.Duration { + var ans time.Duration = 0 + + nmATimeout := c.NodeMemAllocatable.GetTimeout() + if nmATimeout > ans { + ans = nmATimeout + } + + ncATimeout := c.NodeCoresAllocatable.GetTimeout() + if ncATimeout > ans { + 
ans = ncATimeout + } + + cmLTimeout := c.ContainerMemLimit.GetTimeout() + if cmLTimeout > ans { + ans = cmLTimeout + } + + ccLTimeout := c.ContainerCoresLimit.GetTimeout() + if ccLTimeout > ans { + ans = ccLTimeout + } + return ans +} + +// Set the timeout of all the caches under PerfMetricsCache, then Stop and Start all the cache janitors +func (c *PerfMetricsCache) SetOrUpdateTimeout(timeout time.Duration) { + c.NodeMemAllocatable.SetTimeout(timeout) + c.NodeCoresAllocatable.SetTimeout(timeout) + c.ContainerMemLimit.SetTimeout(timeout) + c.ContainerCoresLimit.SetTimeout(timeout) + + c.Stop() + c.Start() +} + type valueMap struct { cache *common.Cache timeout time.Duration @@ -109,6 +137,14 @@ func (m *valueMap) Stop() { m.cache.StopJanitor() } +func (m *valueMap) GetTimeout() time.Duration { + return m.timeout +} + +func (m *valueMap) SetTimeout(timeout time.Duration) { + m.timeout = timeout +} + // ContainerUID creates an unique ID for from namespace, pod name and container name func ContainerUID(namespace, pod, container string) string { return namespace + "/" + pod + "/" + container diff --git a/metricbeat/module/kubernetes/util/metrics_cache_test.go b/metricbeat/module/kubernetes/util/metrics_cache_test.go index 649c1f5fb86..07b447c5fe7 100644 --- a/metricbeat/module/kubernetes/util/metrics_cache_test.go +++ b/metricbeat/module/kubernetes/util/metrics_cache_test.go @@ -19,12 +19,13 @@ package util import ( "testing" + "time" "github.com/stretchr/testify/assert" ) func TestValueMap(t *testing.T) { - test := newValueMap(defaultTimeout) + test := newValueMap(120 * time.Second) // no value assert.Equal(t, 0.0, test.Get("foo")) @@ -35,7 +36,7 @@ func TestValueMap(t *testing.T) { } func TestGetWithDefault(t *testing.T) { - test := newValueMap(defaultTimeout) + test := newValueMap(120 * time.Second) // Empty + default assert.Equal(t, 0.0, test.Get("foo"))