diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index fbef98864e66..7173d86e206d 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -371,6 +371,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Register additional name for `storage` metricset in the azure module. {pull}28447[28447] - Update reference to gosigar pacakge for filesystem windows fix. {pull}28909[28909] - Override `Host()` on statsd MetricSet {pull}29103[29103] +- Add `add_resource_metadata` configuration to Kubernetes module. {pull}29133[29133] *Packetbeat* diff --git a/libbeat/autodiscover/providers/kubernetes/pod.go b/libbeat/autodiscover/providers/kubernetes/pod.go index 2e637cda41dd..945e39467b56 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod.go +++ b/libbeat/autodiscover/providers/kubernetes/pod.go @@ -100,9 +100,6 @@ func NewPodEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pub options.Namespace = config.Namespace } metaConf := config.AddResourceMetadata - if metaConf == nil { - metaConf = metadata.GetDefaultResourceMetadataConfig() - } nodeWatcher, err := kubernetes.NewNamedWatcher("node", client, &kubernetes.Node{}, options, nil) if err != nil { logger.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Node{}, err) diff --git a/libbeat/autodiscover/providers/kubernetes/pod_test.go b/libbeat/autodiscover/providers/kubernetes/pod_test.go index ed24f4554169..5388eeb4e6dc 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod_test.go +++ b/libbeat/autodiscover/providers/kubernetes/pod_test.go @@ -1907,7 +1907,7 @@ func TestPod_EmitEvent(t *testing.T) { t.Fatal(err) } - metaGen := metadata.NewPodMetadataGenerator(common.NewConfig(), nil, client, nil, nil, true) + metaGen := metadata.NewPodMetadataGenerator(common.NewConfig(), nil, client, nil, nil, nil) p := &Provider{ config: defaultConfig(), bus: bus.New(logp.NewLogger("bus"), "test"), diff --git a/libbeat/common/kubernetes/metadata/metadata.go b/libbeat/common/kubernetes/metadata/metadata.go index 7195d47ab0c4..eb55856081b5 100644 --- a/libbeat/common/kubernetes/metadata/metadata.go +++ b/libbeat/common/kubernetes/metadata/metadata.go @@ -96,8 +96,7 @@ func GetPodMetaGen( if namespaceWatcher != nil && metaConf.Namespace.Enabled() { namespaceMetaGen = NewNamespaceMetadataGenerator(metaConf.Namespace, namespaceWatcher.Store(), namespaceWatcher.Client()) } - metaGen := NewPodMetadataGenerator(cfg, podWatcher.Store(), podWatcher.Client(), nodeMetaGen, namespaceMetaGen, metaConf.Deployment) - + metaGen := NewPodMetadataGenerator(cfg, podWatcher.Store(), podWatcher.Client(), nodeMetaGen, namespaceMetaGen, metaConf) return metaGen } diff --git a/libbeat/common/kubernetes/metadata/namespace_test.go b/libbeat/common/kubernetes/metadata/namespace_test.go index 65ae39d8f5f4..88cc7859cdd7 100644 --- a/libbeat/common/kubernetes/metadata/namespace_test.go +++ b/libbeat/common/kubernetes/metadata/namespace_test.go @@ -51,9 +51,11 @@ func TestNamespace_Generate(t *testing.T) { UID: types.UID(uid), Labels: map[string]string{ "foo": "bar", + "key": "value", }, Annotations: map[string]string{ "spam": "baz", + "key": "value", }, }, TypeMeta: metav1.TypeMeta{ @@ -75,6 +77,7 @@ func TestNamespace_Generate(t *testing.T) { } cfg, err := common.NewConfigFrom(Config{ + IncludeLabels: []string{"foo"}, IncludeAnnotations: []string{"spam"}, }) if err != nil { diff --git a/libbeat/common/kubernetes/metadata/pod.go b/libbeat/common/kubernetes/metadata/pod.go index 5ec63292b6c8..ec6328aee968 100644 --- a/libbeat/common/kubernetes/metadata/pod.go +++ b/libbeat/common/kubernetes/metadata/pod.go @@ -29,12 +29,12 @@ import ( ) type pod struct { - store cache.Store - client k8s.Interface - node MetaGen - namespace MetaGen - resource *Resource - addDeployment bool + store cache.Store + client k8s.Interface + node MetaGen + namespace MetaGen + resource *Resource + addResourceMetadata *AddResourceMetadataConfig } // NewPodMetadataGenerator creates a metagen for pod resources @@ -44,14 +44,19 @@ func NewPodMetadataGenerator( client k8s.Interface, node MetaGen, namespace MetaGen, - addDeploymentMeta bool) MetaGen { + addResourceMetadata *AddResourceMetadataConfig) MetaGen { + + if addResourceMetadata == nil { + addResourceMetadata = GetDefaultResourceMetadataConfig() + } + return &pod{ - resource: NewResourceMetadataGenerator(cfg, client), - store: pods, - node: node, - namespace: namespace, - client: client, - addDeployment: addDeploymentMeta, + resource: NewResourceMetadataGenerator(cfg, client), + store: pods, + node: node, + namespace: namespace, + client: client, + addResourceMetadata: addResourceMetadata, } } @@ -87,7 +92,7 @@ func (p *pod) GenerateK8s(obj kubernetes.Resource, opts ...FieldOptions) common. out := p.resource.GenerateK8s("pod", obj, opts...) // check if Pod is handled by a ReplicaSet which is controlled by a Deployment - if p.addDeployment { + if p.addResourceMetadata.Deployment { rsName, _ := out.GetValue("replicaset.name") if rsName, ok := rsName.(string); ok { dep := p.getRSDeployment(rsName, po.GetNamespace()) diff --git a/libbeat/common/kubernetes/metadata/pod_test.go b/libbeat/common/kubernetes/metadata/pod_test.go index 12e2da4fd3c9..a780d3d2daf4 100644 --- a/libbeat/common/kubernetes/metadata/pod_test.go +++ b/libbeat/common/kubernetes/metadata/pod_test.go @@ -374,7 +374,7 @@ func TestPod_Generate(t *testing.T) { }) assert.NoError(t, err) - metagen := NewPodMetadataGenerator(config, nil, client, nil, nil, true) + metagen := NewPodMetadataGenerator(config, nil, client, nil, nil, nil) for _, test := range tests { t.Run(test.name, func(t *testing.T) { assert.Equal(t, test.output, metagen.Generate(test.input)) @@ -496,7 +496,7 @@ func TestPod_GenerateFromName(t *testing.T) { assert.NoError(t, err) pods := cache.NewStore(cache.MetaNamespaceKeyFunc) pods.Add(test.input) - metagen := NewPodMetadataGenerator(config, pods, client, nil, nil, true) + metagen := NewPodMetadataGenerator(config, pods, client, nil, nil, nil) accessor, err := meta.Accessor(test.input) require.NoError(t, err) @@ -618,7 +618,156 @@ func TestPod_GenerateWithNodeNamespace(t *testing.T) { namespaces.Add(test.namespace) nsMeta := NewNamespaceMetadataGenerator(config, namespaces, client) - metagen := NewPodMetadataGenerator(config, pods, client, nodeMeta, nsMeta, true) + metagen := NewPodMetadataGenerator(config, pods, client, nodeMeta, nsMeta, nil) + t.Run(test.name, func(t *testing.T) { + assert.Equal(t, test.output, metagen.Generate(test.input)) + }) + } +} + +func TestPod_GenerateWithNodeNamespaceWithAddResourceConfig(t *testing.T) { + client := k8sfake.NewSimpleClientset() + uid := "005f3b90-4b9d-12f8-acf0-31020a840133" + namespace := "default" + name := "obj" + boolean := true + + tests := []struct { + input kubernetes.Resource + node kubernetes.Resource + namespace kubernetes.Resource + output common.MapStr + name string + }{ + { + name: "test simple object", + input: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: types.UID(uid), + Namespace: namespace, + Labels: map[string]string{ + "app.kubernetes.io/component": "exporter", + }, + Annotations: map[string]string{ + "app": "production", + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps", + Kind: "ReplicaSet", + Name: "nginx-rs", + UID: "005f3b90-4b9d-12f8-acf0-31020a8409087", + Controller: &boolean, + }, + }, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + + Spec: v1.PodSpec{ + NodeName: "testnode", + }, + Status: v1.PodStatus{PodIP: "127.0.0.5"}, + }, + node: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testnode", + UID: types.UID(uid), + Labels: map[string]string{ + "nodekey": "nodevalue", + "nodekey2": "nodevalue2", + }, + Annotations: map[string]string{}, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Node", + APIVersion: "v1", + }, + Status: v1.NodeStatus{ + Addresses: []v1.NodeAddress{{Type: v1.NodeHostName, Address: "node1"}}, + }, + }, + namespace: &v1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: namespace, + UID: types.UID(uid), + Labels: map[string]string{ + "app.kubernetes.io/name": "kube-state-metrics", + "nskey2": "nsvalue2", + }, + Annotations: map[string]string{}, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Namespace", + APIVersion: "v1", + }, + }, + output: common.MapStr{"kubernetes": common.MapStr{ + "pod": common.MapStr{ + "name": "obj", + "uid": uid, + "ip": "127.0.0.5", + }, + "namespace": "default", + "namespace_uid": uid, + "namespace_labels": common.MapStr{ + "app_kubernetes_io/name": "kube-state-metrics", + }, + "node": common.MapStr{ + "name": "testnode", + "uid": uid, + "labels": common.MapStr{ + "nodekey2": "nodevalue2", + }, + "hostname": "node1", + }, + "labels": common.MapStr{ + "app_kubernetes_io/component": "exporter", + }, + "annotations": common.MapStr{ + "app": "production", + }, + "replicaset": common.MapStr{ + "name": "nginx-rs", + }, + }}, + }, + } + + for _, test := range tests { + config, err := common.NewConfigFrom(map[string]interface{}{ + "include_annotations": []string{"app"}, + }) + + assert.NoError(t, err) + + namespaceConfig, _ := common.NewConfigFrom(map[string]interface{}{ + "include_labels": []string{"app.kubernetes.io/name"}, + }) + nodeConfig, _ := common.NewConfigFrom(map[string]interface{}{ + "include_labels": []string{"nodekey2"}, + }) + metaConfig := AddResourceMetadataConfig{ + Namespace: namespaceConfig, + Node: nodeConfig, + Deployment: false, + } + + pods := cache.NewStore(cache.MetaNamespaceKeyFunc) + pods.Add(test.input) + + nodes := cache.NewStore(cache.MetaNamespaceKeyFunc) + nodes.Add(test.node) + nodeMeta := NewNodeMetadataGenerator(nodeConfig, nodes, client) + + namespaces := cache.NewStore(cache.MetaNamespaceKeyFunc) + namespaces.Add(test.namespace) + nsMeta := NewNamespaceMetadataGenerator(namespaceConfig, namespaces, client) + + metagen := NewPodMetadataGenerator(config, pods, client, nodeMeta, nsMeta, &metaConfig) t.Run(test.name, func(t *testing.T) { assert.Equal(t, test.output, metagen.Generate(test.input)) }) diff --git a/libbeat/common/kubernetes/metadata/resource.go b/libbeat/common/kubernetes/metadata/resource.go index b15e39b3775c..152c2be37a3b 100644 --- a/libbeat/common/kubernetes/metadata/resource.go +++ b/libbeat/common/kubernetes/metadata/resource.go @@ -85,7 +85,7 @@ func (r *Resource) GenerateK8s(kind string, obj kubernetes.Resource, options ... return nil } - labelMap := common.MapStr{} + var labelMap common.MapStr if len(r.config.IncludeLabels) == 0 { labelMap = GenerateMap(accessor.GetLabels(), r.config.LabelsDedot) } else { diff --git a/libbeat/processors/add_kubernetes_metadata/indexers_test.go b/libbeat/processors/add_kubernetes_metadata/indexers_test.go index 853345c8bd57..51ba288974c6 100644 --- a/libbeat/processors/add_kubernetes_metadata/indexers_test.go +++ b/libbeat/processors/add_kubernetes_metadata/indexers_test.go @@ -32,7 +32,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common/kubernetes" ) -var metagen = metadata.NewPodMetadataGenerator(common.NewConfig(), nil, nil, nil, nil, true) +var metagen = metadata.NewPodMetadataGenerator(common.NewConfig(), nil, nil, nil, nil, nil) func TestPodIndexer(t *testing.T) { var testConfig = common.NewConfig() @@ -90,7 +90,7 @@ func TestPodIndexer(t *testing.T) { func TestPodUIDIndexer(t *testing.T) { var testConfig = common.NewConfig() - metaGenWithPodUID := metadata.NewPodMetadataGenerator(common.NewConfig(), nil, nil, nil, nil, true) + metaGenWithPodUID := metadata.NewPodMetadataGenerator(common.NewConfig(), nil, nil, nil, nil, nil) podUIDIndexer, err := NewPodUIDIndexer(*testConfig, metaGenWithPodUID) assert.NoError(t, err) @@ -301,7 +301,7 @@ func TestFilteredGenMeta(t *testing.T) { }) assert.NoError(t, err) - filteredGen := metadata.NewPodMetadataGenerator(config, nil, nil, nil, nil, true) + filteredGen := metadata.NewPodMetadataGenerator(config, nil, nil, nil, nil, nil) podIndexer, err = NewPodNameIndexer(*testConfig, filteredGen) assert.NoError(t, err) @@ -338,7 +338,7 @@ func TestFilteredGenMetaExclusion(t *testing.T) { }) assert.NoError(t, err) - filteredGen := metadata.NewPodMetadataGenerator(config, nil, nil, nil, nil, true) + filteredGen := metadata.NewPodMetadataGenerator(config, nil, nil, nil, nil, nil) podIndexer, err := NewPodNameIndexer(*testConfig, filteredGen) assert.NoError(t, err) diff --git a/libbeat/processors/add_kubernetes_metadata/kubernetes.go b/libbeat/processors/add_kubernetes_metadata/kubernetes.go index 2255bed3f6bd..a4b40b9655d9 100644 --- a/libbeat/processors/add_kubernetes_metadata/kubernetes.go +++ b/libbeat/processors/add_kubernetes_metadata/kubernetes.go @@ -191,9 +191,6 @@ func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *common.Confi } metaConf := config.AddResourceMetadata - if metaConf == nil { - metaConf = metadata.GetDefaultResourceMetadataConfig() - } options := kubernetes.WatchOptions{ SyncTimeout: config.SyncPeriod, diff --git a/metricbeat/docs/modules/kubernetes.asciidoc b/metricbeat/docs/modules/kubernetes.asciidoc index e7b82455189b..52f0b07a75fc 100644 --- a/metricbeat/docs/modules/kubernetes.asciidoc +++ b/metricbeat/docs/modules/kubernetes.asciidoc @@ -222,6 +222,16 @@ metricbeat.modules: # If kube_config is not set, KUBECONFIG environment variable will be checked # and if not present it will fall back to InCluster #kube_config: ~/.kube/config + # To configure additionally node and namespace metadata, added to pod, service and container resource types, + # `add_resource_metadata` can be defined. + # By default all labels will be included while annotations are not added by default. + # add_resource_metadata: + # namespace: + # include_labels: ["namespacelabel1"] + # node: + # include_labels: ["nodelabel2"] + # include_annotations: ["nodeannotation1"] + # deployment: false # Kubernetes client QPS and burst can be configured additionally #kube_client_options: # qps: 5 @@ -256,6 +266,16 @@ metricbeat.modules: # If kube_config is not set, KUBECONFIG environment variable will be checked # and if not present it will fall back to InCluster #kube_config: ~/.kube/config + # To configure additionally node and namespace metadata, added to pod, service and container resource types, + # `add_resource_metadata` can be defined. + # By default all labels will be included while annotations are not added by default. + # add_resource_metadata: + # namespace: + # include_labels: ["namespacelabel1"] + # node: + # include_labels: ["nodelabel2"] + # include_annotations: ["nodeannotation1"] + # deployment: false # Kubernetes client QPS and burst can be configured additionally #kube_client_options: # qps: 5 diff --git a/metricbeat/metricbeat.reference.yml b/metricbeat/metricbeat.reference.yml index 7673d7b01b50..c036073f011a 100644 --- a/metricbeat/metricbeat.reference.yml +++ b/metricbeat/metricbeat.reference.yml @@ -500,6 +500,16 @@ metricbeat.modules: # If kube_config is not set, KUBECONFIG environment variable will be checked # and if not present it will fall back to InCluster #kube_config: ~/.kube/config + # To configure additionally node and namespace metadata, added to pod, service and container resource types, + # `add_resource_metadata` can be defined. + # By default all labels will be included while annotations are not added by default. + # add_resource_metadata: + # namespace: + # include_labels: ["namespacelabel1"] + # node: + # include_labels: ["nodelabel2"] + # include_annotations: ["nodeannotation1"] + # deployment: false # Kubernetes client QPS and burst can be configured additionally #kube_client_options: # qps: 5 @@ -534,6 +544,16 @@ metricbeat.modules: # If kube_config is not set, KUBECONFIG environment variable will be checked # and if not present it will fall back to InCluster #kube_config: ~/.kube/config + # To configure additionally node and namespace metadata, added to pod, service and container resource types, + # `add_resource_metadata` can be defined. + # By default all labels will be included while annotations are not added by default. + # add_resource_metadata: + # namespace: + # include_labels: ["namespacelabel1"] + # node: + # include_labels: ["nodelabel2"] + # include_annotations: ["nodeannotation1"] + # deployment: false # Kubernetes client QPS and burst can be configured additionally #kube_client_options: # qps: 5 diff --git a/metricbeat/module/kubernetes/_meta/config.reference.yml b/metricbeat/module/kubernetes/_meta/config.reference.yml index 09fb14662a21..2a9f4601f869 100644 --- a/metricbeat/module/kubernetes/_meta/config.reference.yml +++ b/metricbeat/module/kubernetes/_meta/config.reference.yml @@ -23,6 +23,16 @@ # If kube_config is not set, KUBECONFIG environment variable will be checked # and if not present it will fall back to InCluster #kube_config: ~/.kube/config + # To configure additionally node and namespace metadata, added to pod, service and container resource types, + # `add_resource_metadata` can be defined. + # By default all labels will be included while annotations are not added by default. + # add_resource_metadata: + # namespace: + # include_labels: ["namespacelabel1"] + # node: + # include_labels: ["nodelabel2"] + # include_annotations: ["nodeannotation1"] + # deployment: false # Kubernetes client QPS and burst can be configured additionally #kube_client_options: # qps: 5 @@ -57,6 +67,16 @@ # If kube_config is not set, KUBECONFIG environment variable will be checked # and if not present it will fall back to InCluster #kube_config: ~/.kube/config + # To configure additionally node and namespace metadata, added to pod, service and container resource types, + # `add_resource_metadata` can be defined. + # By default all labels will be included while annotations are not added by default. + # add_resource_metadata: + # namespace: + # include_labels: ["namespacelabel1"] + # node: + # include_labels: ["nodelabel2"] + # include_annotations: ["nodeannotation1"] + # deployment: false # Kubernetes client QPS and burst can be configured additionally #kube_client_options: # qps: 5 diff --git a/metricbeat/module/kubernetes/_meta/config.yml b/metricbeat/module/kubernetes/_meta/config.yml index dc9b3697ead8..94b0a00427fe 100644 --- a/metricbeat/module/kubernetes/_meta/config.yml +++ b/metricbeat/module/kubernetes/_meta/config.yml @@ -22,6 +22,16 @@ #host: node_name # If kube_config is not set, KUBECONFIG environment variable will be checked # and if not present it will fall back to InCluster + # To configure additionally node and namespace metadata, added to pod, service and container resource types, + # `add_resource_metadata` can be defined. + # By default all labels will be included while annotations are not added by default. + # add_resource_metadata: + # namespace: + # include_labels: ["namespacelabel1"] + # node: + # include_labels: ["nodelabel2"] + # include_annotations: ["nodeannotation1"] + # deployment: false #kube_config: ~/.kube/config # Kubernetes client QPS and burst can be configured additionally #kube_client_options: diff --git a/metricbeat/module/kubernetes/util/kubernetes.go b/metricbeat/module/kubernetes/util/kubernetes.go index 6f4b7c44d281..b173c6de3600 100644 --- a/metricbeat/module/kubernetes/util/kubernetes.go +++ b/metricbeat/module/kubernetes/util/kubernetes.go @@ -48,12 +48,15 @@ type Enricher interface { } type kubernetesConfig struct { - // AddMetadata enables enriching metricset events with metadata from the API server - AddMetadata bool `config:"add_metadata"` KubeConfig string `config:"kube_config"` - Host string `config:"host"` - SyncPeriod time.Duration `config:"sync_period"` KubeClientOptions kubernetes.KubeClientOptions `config:"kube_client_options"` + + Host string `config:"host"` + SyncPeriod time.Duration `config:"sync_period"` + + // AddMetadata enables enriching metricset events with metadata from the API server + AddMetadata bool `config:"add_metadata"` + AddResourceMetadata *metadata.AddResourceMetadataConfig `config:"add_resource_metadata"` } type enricher struct { @@ -63,90 +66,45 @@ type enricher struct { watcher kubernetes.Watcher watcherStarted bool watcherStartedLock sync.Mutex + namespaceWatcher kubernetes.Watcher + nodeWatcher kubernetes.Watcher isPod bool } const selector = "kubernetes" -// GetWatcher initializes a kubernetes watcher with the given -// scope (node or cluster), and resource type -func GetWatcher(base mb.BaseMetricSet, resource kubernetes.Resource, nodeScope bool) (kubernetes.Watcher, error) { - return GetNamedWatcher("", base, resource, nodeScope) -} - -func GetNamedWatcher(name string, base mb.BaseMetricSet, resource kubernetes.Resource, nodeScope bool) (kubernetes.Watcher, error) { - config := kubernetesConfig{ - AddMetadata: true, - SyncPeriod: time.Minute * 10, - } - if err := base.Module().UnpackConfig(&config); err != nil { - return nil, err - } - - // Return nil if metadata enriching is disabled: - if !config.AddMetadata { - return nil, nil - } - - client, err := kubernetes.GetKubernetesClient(config.KubeConfig, config.KubeClientOptions) - if err != nil { - return nil, err - } - - options := kubernetes.WatchOptions{ - SyncTimeout: config.SyncPeriod, - } - - log := logp.NewLogger(selector) - - // Watch objects in the node only - if nodeScope { - nd := &kubernetes.DiscoverKubernetesNodeParams{ - ConfigHost: config.Host, - Client: client, - IsInCluster: kubernetes.IsInCluster(config.KubeConfig), - HostUtils: &kubernetes.DefaultDiscoveryUtils{}, - } - options.Node, err = kubernetes.DiscoverKubernetesNode(log, nd) - if err != nil { - return nil, fmt.Errorf("couldn't discover kubernetes node: %w", err) - } - } - - log.Debugf("Initializing a new Kubernetes watcher using host: %v", config.Host) - - return kubernetes.NewNamedWatcher(name, client, resource, options, nil) -} - // NewResourceMetadataEnricher returns an Enricher configured for kubernetes resource events func NewResourceMetadataEnricher( base mb.BaseMetricSet, res kubernetes.Resource, nodeScope bool) Enricher { - watcher, err := GetNamedWatcher("resource_metadata_enricher", base, res, nodeScope) - if err != nil { - logp.Err("Error initializing Kubernetes metadata enricher: %s", err) + config := validatedConfig(base) + if config == nil { + logp.Info("Kubernetes metricset enriching is disabled") return &nilEnricher{} } + watcher, nodeWatcher, namespaceWatcher := getResourceMetadataWatchers(config, res, nodeScope) + if watcher == nil { - logp.Info("Kubernetes metricset enriching is disabled") return &nilEnricher{} } - metaConfig := metadata.Config{} - if err := base.Module().UnpackConfig(&metaConfig); err != nil { + // GetPodMetaGen requires cfg of type Config + commonMetaConfig := metadata.Config{} + if err := base.Module().UnpackConfig(&commonMetaConfig); err != nil { logp.Err("Error initializing Kubernetes metadata enricher: %s", err) return &nilEnricher{} } - - cfg, _ := common.NewConfigFrom(&metaConfig) + cfg, _ := common.NewConfigFrom(&commonMetaConfig) metaGen := metadata.NewResourceMetadataGenerator(cfg, watcher.Client()) - podMetaGen := metadata.NewPodMetadataGenerator(cfg, nil, watcher.Client(), nil, nil, true) - serviceMetaGen := metadata.NewServiceMetadataGenerator(cfg, nil, nil, watcher.Client()) - enricher := buildMetadataEnricher(watcher, + podMetaGen := metadata.GetPodMetaGen(cfg, watcher, nodeWatcher, namespaceWatcher, config.AddResourceMetadata) + + namespaceMeta := metadata.NewNamespaceMetadataGenerator(config.AddResourceMetadata.Namespace, namespaceWatcher.Store(), watcher.Client()) + serviceMetaGen := metadata.NewServiceMetadataGenerator(cfg, watcher.Store(), namespaceMeta, watcher.Client()) + enricher := buildMetadataEnricher(watcher, nodeWatcher, namespaceWatcher, // update func(m map[string]common.MapStr, r kubernetes.Resource) { accessor, _ := meta.Accessor(r) @@ -214,27 +172,27 @@ func NewContainerMetadataEnricher( base mb.BaseMetricSet, nodeScope bool) Enricher { - watcher, err := GetNamedWatcher("container_metadata_enricher", base, &kubernetes.Pod{}, nodeScope) - if err != nil { - logp.Err("Error initializing Kubernetes metadata enricher: %s", err) + config := validatedConfig(base) + if config == nil { + logp.Info("Kubernetes metricset enriching is disabled") return &nilEnricher{} } + watcher, nodeWatcher, namespaceWatcher := getResourceMetadataWatchers(config, &kubernetes.Pod{}, nodeScope) if watcher == nil { - logp.Info("Kubernetes metricset enriching is disabled") return &nilEnricher{} } - metaConfig := metadata.Config{} - if err := base.Module().UnpackConfig(&metaConfig); err != nil { + commonMetaConfig := metadata.Config{} + if err := base.Module().UnpackConfig(&commonMetaConfig); err != nil { logp.Err("Error initializing Kubernetes metadata enricher: %s", err) return &nilEnricher{} } + cfg, _ := common.NewConfigFrom(&commonMetaConfig) - cfg, _ := common.NewConfigFrom(&metaConfig) + metaGen := metadata.GetPodMetaGen(cfg, watcher, nodeWatcher, namespaceWatcher, config.AddResourceMetadata) - metaGen := metadata.NewPodMetadataGenerator(cfg, nil, watcher.Client(), nil, nil, true) - enricher := buildMetadataEnricher(watcher, + enricher := buildMetadataEnricher(watcher, nodeWatcher, namespaceWatcher, // update func(m map[string]common.MapStr, r kubernetes.Resource) { pod := r.(*kubernetes.Pod) @@ -276,6 +234,76 @@ func NewContainerMetadataEnricher( return enricher } +func getResourceMetadataWatchers(config *kubernetesConfig, resource kubernetes.Resource, nodeScope bool) (kubernetes.Watcher, kubernetes.Watcher, kubernetes.Watcher) { + client, err := kubernetes.GetKubernetesClient(config.KubeConfig, config.KubeClientOptions) + if err != nil { + logp.Err("Error creating Kubernetes client: %s", err) + return nil, nil, nil + } + + options := kubernetes.WatchOptions{ + SyncTimeout: config.SyncPeriod, + } + + log := logp.NewLogger(selector) + + // Watch objects in the node only + if nodeScope { + nd := &kubernetes.DiscoverKubernetesNodeParams{ + ConfigHost: config.Host, + Client: client, + IsInCluster: kubernetes.IsInCluster(config.KubeConfig), + HostUtils: &kubernetes.DefaultDiscoveryUtils{}, + } + options.Node, err = kubernetes.DiscoverKubernetesNode(log, nd) + if err != nil { + logp.Err("Couldn't discover kubernetes node: %s", err) + return nil, nil, nil + } + } + + log.Debugf("Initializing a new Kubernetes watcher using host: %v", config.Host) + + watcher, err := kubernetes.NewNamedWatcher("resource_metadata_enricher", client, resource, options, nil) + if err != nil { + logp.Err("Error initializing Kubernetes watcher: %s", err) + return nil, nil, nil + } + + nodeWatcher, err := kubernetes.NewNamedWatcher("resource_metadata_enricher_node", client, &kubernetes.Node{}, options, nil) + if err != nil { + logp.Err("Error creating watcher for %T due to error %+v", &kubernetes.Node{}, err) + return watcher, nil, nil + } + + namespaceWatcher, err := kubernetes.NewNamedWatcher("resource_metadata_enricher_namespace", client, &kubernetes.Namespace{}, kubernetes.WatchOptions{ + SyncTimeout: config.SyncPeriod, + }, nil) + if err != nil { + logp.Err("Error creating watcher for %T due to error %+v", &kubernetes.Namespace{}, err) + return watcher, nodeWatcher, nil + } + + return watcher, nodeWatcher, namespaceWatcher +} + +func validatedConfig(base mb.BaseMetricSet) *kubernetesConfig { + config := kubernetesConfig{ + AddMetadata: true, + SyncPeriod: time.Minute * 10, + AddResourceMetadata: metadata.GetDefaultResourceMetadataConfig(), + } + if err := base.Module().UnpackConfig(&config); err != nil { + return nil + } + + // Return nil if metadata enriching is disabled: + if !config.AddMetadata { + return nil + } + return &config +} + func getString(m common.MapStr, key string) string { val, err := m.GetValue(key) if err != nil { @@ -292,14 +320,18 @@ func join(fields ...string) string { func buildMetadataEnricher( watcher kubernetes.Watcher, + nodeWatcher kubernetes.Watcher, + namespaceWatcher kubernetes.Watcher, update func(map[string]common.MapStr, kubernetes.Resource), delete func(map[string]common.MapStr, kubernetes.Resource), index func(e common.MapStr) string) *enricher { enricher := enricher{ - metadata: map[string]common.MapStr{}, - index: index, - watcher: watcher, + metadata: map[string]common.MapStr{}, + index: index, + watcher: watcher, + nodeWatcher: nodeWatcher, + namespaceWatcher: namespaceWatcher, } watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{ @@ -326,6 +358,18 @@ func buildMetadataEnricher( func (m *enricher) Start() { m.watcherStartedLock.Lock() defer m.watcherStartedLock.Unlock() + if m.nodeWatcher != nil { + if err := m.nodeWatcher.Start(); err != nil { + logp.Warn("Error starting node watcher: %s", err) + } + } + + if m.namespaceWatcher != nil { + if err := m.namespaceWatcher.Start(); err != nil { + logp.Warn("Error starting namespace watcher: %s", err) + } + } + if !m.watcherStarted { err := m.watcher.Start() if err != nil { @@ -342,6 +386,13 @@ func (m *enricher) Stop() { m.watcher.Stop() m.watcherStarted = false } + if m.namespaceWatcher != nil { + m.namespaceWatcher.Stop() + } + + if m.nodeWatcher != nil { + m.nodeWatcher.Stop() + } } func (m *enricher) Enrich(events []common.MapStr) { diff --git a/metricbeat/module/kubernetes/util/kubernetes_test.go b/metricbeat/module/kubernetes/util/kubernetes_test.go index 4ab9acc9d5d5..83bbd3dd1f7a 100644 --- a/metricbeat/module/kubernetes/util/kubernetes_test.go +++ b/metricbeat/module/kubernetes/util/kubernetes_test.go @@ -36,6 +36,8 @@ import ( func TestBuildMetadataEnricher(t *testing.T) { watcher := mockWatcher{} + nodeWatcher := mockWatcher{} + namespaceWatcher := mockWatcher{} funcs := mockFuncs{} resource := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -48,7 +50,7 @@ func TestBuildMetadataEnricher(t *testing.T) { }, } - enricher := buildMetadataEnricher(&watcher, funcs.update, funcs.delete, funcs.index) + enricher := buildMetadataEnricher(&watcher, &nodeWatcher, &namespaceWatcher, funcs.update, funcs.delete, funcs.index) assert.NotNil(t, watcher.handler) enricher.Start() diff --git a/metricbeat/modules.d/kubernetes.yml.disabled b/metricbeat/modules.d/kubernetes.yml.disabled index 08203350e93d..144d13fb301b 100644 --- a/metricbeat/modules.d/kubernetes.yml.disabled +++ b/metricbeat/modules.d/kubernetes.yml.disabled @@ -25,6 +25,16 @@ #host: node_name # If kube_config is not set, KUBECONFIG environment variable will be checked # and if not present it will fall back to InCluster + # To configure additionally node and namespace metadata, added to pod, service and container resource types, + # `add_resource_metadata` can be defined. + # By default all labels will be included while annotations are not added by default. + # add_resource_metadata: + # namespace: + # include_labels: ["namespacelabel1"] + # node: + # include_labels: ["nodelabel2"] + # include_annotations: ["nodeannotation1"] + # deployment: false #kube_config: ~/.kube/config # Kubernetes client QPS and burst can be configured additionally #kube_client_options: diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go index 3a702b95c0e0..bb9ec52887fe 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go @@ -89,9 +89,7 @@ func NewPodEventer( Node: cfg.Node, } metaConf := cfg.AddResourceMetadata - if metaConf == nil { - metaConf = metadata.GetDefaultResourceMetadataConfig() - } + nodeWatcher, err := kubernetes.NewNamedWatcher("agent-node", client, &kubernetes.Node{}, options, nil) if err != nil { logger.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Node{}, err) diff --git a/x-pack/metricbeat/metricbeat.reference.yml b/x-pack/metricbeat/metricbeat.reference.yml index be4adb144c8a..4d1d41aa0d52 100644 --- a/x-pack/metricbeat/metricbeat.reference.yml +++ b/x-pack/metricbeat/metricbeat.reference.yml @@ -872,6 +872,16 @@ metricbeat.modules: # If kube_config is not set, KUBECONFIG environment variable will be checked # and if not present it will fall back to InCluster #kube_config: ~/.kube/config + # To configure additionally node and namespace metadata, added to pod, service and container resource types, + # `add_resource_metadata` can be defined. + # By default all labels will be included while annotations are not added by default. + # add_resource_metadata: + # namespace: + # include_labels: ["namespacelabel1"] + # node: + # include_labels: ["nodelabel2"] + # include_annotations: ["nodeannotation1"] + # deployment: false # Kubernetes client QPS and burst can be configured additionally #kube_client_options: # qps: 5 @@ -906,6 +916,16 @@ metricbeat.modules: # If kube_config is not set, KUBECONFIG environment variable will be checked # and if not present it will fall back to InCluster #kube_config: ~/.kube/config + # To configure additionally node and namespace metadata, added to pod, service and container resource types, + # `add_resource_metadata` can be defined. + # By default all labels will be included while annotations are not added by default. + # add_resource_metadata: + # namespace: + # include_labels: ["namespacelabel1"] + # node: + # include_labels: ["nodelabel2"] + # include_annotations: ["nodeannotation1"] + # deployment: false # Kubernetes client QPS and burst can be configured additionally #kube_client_options: # qps: 5