diff --git a/x-pack/elastic-agent/CHANGELOG.next.asciidoc b/x-pack/elastic-agent/CHANGELOG.next.asciidoc index a443566ce80..15b4ce745b1 100644 --- a/x-pack/elastic-agent/CHANGELOG.next.asciidoc +++ b/x-pack/elastic-agent/CHANGELOG.next.asciidoc @@ -123,3 +123,4 @@ - Add proxy support to enroll command. {pull}26514[26514] - Enable configuring monitoring namespace {issue}26439[26439] - Communicate with Fleet Server over HTTP2. {pull}26474[26474] +- Support Node and Service autodiscovery in kubernetes dynamic provider. {pull}26801[26801] diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go index 1a85005df19..d0538f43363 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go @@ -9,24 +9,57 @@ package kubernetes import ( "time" + + "github.com/elastic/beats/v7/libbeat/logp" ) // Config for kubernetes provider type Config struct { + Scope string `config:"scope"` + Resources Resources `config:"resources"` + KubeConfig string `config:"kube_config"` + Namespace string `config:"namespace"` SyncPeriod time.Duration `config:"sync_period"` CleanupTimeout time.Duration `config:"cleanup_timeout" validate:"positive"` - // Needed when resource is a pod + // Needed when resource is a Pod or Node Node string `config:"node"` +} - // Scope of the provider (cluster or node) - Scope string `config:"scope"` +// Resources config section for resources' config blocks +type Resources struct { + Pod Enabled `config:"pod"` + Node Enabled `config:"node"` + Service Enabled `config:"service"` +} + +// Enabled config section for resources' config blocks +type Enabled struct { + Enabled bool `config:"enabled"` } // InitDefaults initializes the default values for the config. func (c *Config) InitDefaults() { - c.SyncPeriod = 10 * time.Minute c.CleanupTimeout = 60 * time.Second + c.SyncPeriod = 10 * time.Minute c.Scope = "node" } + +// Validate ensures correctness of config +func (c *Config) Validate() error { + // Check if resource is service. If yes then default the scope to "cluster". + if c.Resources.Service.Enabled { + if c.Scope == "node" { + logp.L().Warnf("can not set scope to `node` when using resource `Service`. resetting scope to `cluster`") + } + c.Scope = "cluster" + } + + if !c.Resources.Pod.Enabled && !c.Resources.Node.Enabled && !c.Resources.Service.Enabled { + c.Resources.Pod = Enabled{true} + c.Resources.Node = Enabled{true} + } + + return nil +} diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go index 9cea442dc6b..bb08c813c21 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go @@ -6,7 +6,8 @@ package kubernetes import ( "fmt" - "time" + + k8s "k8s.io/client-go/kubernetes" "github.com/elastic/beats/v7/libbeat/common/kubernetes" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" @@ -16,10 +17,14 @@ import ( ) const ( + // NodePriority is the priority that node mappings are added to the provider. + NodePriority = 0 // PodPriority is the priority that pod mappings are added to the provider. - PodPriority = 0 + PodPriority = 1 // ContainerPriority is the priority that container mappings are added to the provider. 
- ContainerPriority = 1 + ContainerPriority = 2 + // ServicePriority is the priority that service mappings are added to the provider. + ServicePriority = 3 ) func init() { @@ -31,12 +36,6 @@ type dynamicProvider struct { config *Config } -type eventWatcher struct { - logger *logger.Logger - cleanupTimeout time.Duration - comm composable.DynamicProviderComm -} - // DynamicProviderBuilder builds the dynamic provider. func DynamicProviderBuilder(logger *logger.Logger, c *config.Config) (composable.DynamicProvider, error) { var cfg Config @@ -50,172 +49,96 @@ func DynamicProviderBuilder(logger *logger.Logger, c *config.Config) (composable return &dynamicProvider{logger, &cfg}, nil } -// Run runs the environment context provider. +// Run runs the kubernetes context provider. func (p *dynamicProvider) Run(comm composable.DynamicProviderComm) error { - client, err := kubernetes.GetKubernetesClient(p.config.KubeConfig) + if p.config.Resources.Pod.Enabled { + err := p.watchResource(comm, "pod", p.config) + if err != nil { + return err + } + } + if p.config.Resources.Node.Enabled { + err := p.watchResource(comm, "node", p.config) + if err != nil { + return err + } + } + if p.config.Resources.Service.Enabled { + err := p.watchResource(comm, "service", p.config) + if err != nil { + return err + } + } + return nil +} + +// watchResource initializes the proper watcher according to the given resource (pod, node, service) +// and starts watching for such resource's events. +func (p *dynamicProvider) watchResource( + comm composable.DynamicProviderComm, + resourceType string, + config *Config) error { + client, err := kubernetes.GetKubernetesClient(config.KubeConfig) if err != nil { // info only; return nil (do nothing) - p.logger.Debugf("Kubernetes provider skipped, unable to connect: %s", err) + p.logger.Debugf("Kubernetes provider for resource %s skipped, unable to connect: %s", resourceType, err) return nil } // Ensure that node is set correctly whenever the scope is set to "node". Make sure that node is empty // when cluster scope is enforced. 
- p.logger.Infof("Kubernetes provider started with %s scope", p.config.Scope) + p.logger.Infof("Kubernetes provider started for resource %s with %s scope", resourceType, p.config.Scope) if p.config.Scope == "node" { - p.logger.Debugf("Initializing Kubernetes watcher using node: %v", p.config.Node) - p.config.Node = kubernetes.DiscoverKubernetesNode(p.logger, p.config.Node, kubernetes.IsInCluster(p.config.KubeConfig), client) + p.logger.Debugf( + "Initializing Kubernetes watcher for resource %s using node: %v", + resourceType, + config.Node) + config.Node = kubernetes.DiscoverKubernetesNode( + p.logger, config.Node, + kubernetes.IsInCluster(config.KubeConfig), + client) } else { - p.config.Node = "" + config.Node = "" } - watcher, err := kubernetes.NewWatcher(client, &kubernetes.Pod{}, kubernetes.WatchOptions{ - SyncTimeout: p.config.SyncPeriod, - Node: p.config.Node, - //Namespace: p.config.Namespace, - }, nil) + watcher, err := p.newWatcher(resourceType, comm, client, config) if err != nil { - return errors.New(err, "couldn't create kubernetes watcher") + return errors.New(err, "couldn't create kubernetes watcher for resource %s", resourceType) } - watcher.AddEventHandler(&eventWatcher{p.logger, p.config.CleanupTimeout, comm}) err = watcher.Start() if err != nil { - return errors.New(err, "couldn't start kubernetes watcher") + return errors.New(err, "couldn't start kubernetes watcher for resource %s", resourceType) } - return nil } -func (p *eventWatcher) emitRunning(pod *kubernetes.Pod) { - mapping := map[string]interface{}{ - "namespace": pod.GetNamespace(), - "pod": map[string]interface{}{ - "uid": string(pod.GetUID()), - "name": pod.GetName(), - "labels": pod.GetLabels(), - "ip": pod.Status.PodIP, - }, - } - - processors := []map[string]interface{}{ - { - "add_fields": map[string]interface{}{ - "fields": mapping, - "target": "kubernetes", - }, - }, - } - - // Emit the pod - // We emit Pod + containers to ensure that configs matching Pod only - // get Pod metadata (not specific to any container) - p.comm.AddOrUpdate(string(pod.GetUID()), PodPriority, mapping, processors) - - // Emit all containers in the pod - p.emitContainers(pod, pod.Spec.Containers, pod.Status.ContainerStatuses) - - // TODO deal with init containers stopping after initialization - p.emitContainers(pod, pod.Spec.InitContainers, pod.Status.InitContainerStatuses) -} - -func (p *eventWatcher) emitContainers(pod *kubernetes.Pod, containers []kubernetes.Container, containerstatuses []kubernetes.PodContainerStatus) { - // Collect all runtimes from status information. - containerIDs := map[string]string{} - runtimes := map[string]string{} - for _, c := range containerstatuses { - cid, runtime := kubernetes.ContainerIDWithRuntime(c) - containerIDs[c.Name] = cid - runtimes[c.Name] = runtime - } - - for _, c := range containers { - // If it doesn't have an ID, container doesn't exist in - // the runtime, emit only an event if we are stopping, so - // we are sure of cleaning up configurations. - cid := containerIDs[c.Name] - if cid == "" { - continue +// newWatcher initializes the proper watcher according to the given resource (pod, node, service). 
+func (p *dynamicProvider) newWatcher( + resourceType string, + comm composable.DynamicProviderComm, + client k8s.Interface, + config *Config) (kubernetes.Watcher, error) { + switch resourceType { + case "pod": + watcher, err := NewPodWatcher(comm, config, p.logger, client, p.config.Scope) + if err != nil { + return nil, err } - - // ID is the combination of pod UID + container name - eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.Name) - - mapping := map[string]interface{}{ - "namespace": pod.GetNamespace(), - "pod": map[string]interface{}{ - "uid": string(pod.GetUID()), - "name": pod.GetName(), - "labels": pod.GetLabels(), - "ip": pod.Status.PodIP, - }, - "container": map[string]interface{}{ - "id": cid, - "name": c.Name, - "image": c.Image, - "runtime": runtimes[c.Name], - }, + return watcher, nil + case "node": + watcher, err := NewNodeWatcher(comm, config, p.logger, client, p.config.Scope) + if err != nil { + return nil, err } - - processors := []map[string]interface{}{ - { - "add_fields": map[string]interface{}{ - "fields": mapping, - "target": "kubernetes", - }, - }, + return watcher, nil + case "service": + watcher, err := NewServiceWatcher(comm, config, p.logger, client, p.config.Scope) + if err != nil { + return nil, err } - - // Emit the container - p.comm.AddOrUpdate(eventID, ContainerPriority, mapping, processors) + return watcher, nil + default: + return nil, fmt.Errorf("unsupported autodiscover resource %s", resourceType) } } - -func (p *eventWatcher) emitStopped(pod *kubernetes.Pod) { - p.comm.Remove(string(pod.GetUID())) - - for _, c := range pod.Spec.Containers { - // ID is the combination of pod UID + container name - eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.Name) - p.comm.Remove(eventID) - } - - for _, c := range pod.Spec.InitContainers { - // ID is the combination of pod UID + container name - eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.Name) - p.comm.Remove(eventID) - } -} - -// OnAdd ensures processing of pod objects that are newly added -func (p *eventWatcher) OnAdd(obj interface{}) { - p.logger.Debugf("pod add: %+v", obj) - p.emitRunning(obj.(*kubernetes.Pod)) -} - -// OnUpdate emits events for a given pod depending on the state of the pod, -// if it is terminating, a stop event is scheduled, if not, a stop and a start -// events are sent sequentially to recreate the resources assotiated to the pod. -func (p *eventWatcher) OnUpdate(obj interface{}) { - pod := obj.(*kubernetes.Pod) - - p.logger.Debugf("pod update for pod: %+v, status: %+v", pod.Name, pod.Status.Phase) - switch pod.Status.Phase { - case kubernetes.PodSucceeded, kubernetes.PodFailed: - time.AfterFunc(p.cleanupTimeout, func() { p.emitStopped(pod) }) - return - case kubernetes.PodPending: - p.logger.Debugf("pod update (pending): don't know what to do with this pod yet, skipping for now: %+v", obj) - return - } - - p.logger.Debugf("pod update: %+v", obj) - p.emitRunning(pod) -} - -// OnDelete stops pod objects that are deleted -func (p *eventWatcher) OnDelete(obj interface{}) { - p.logger.Debugf("pod delete: %+v", obj) - pod := obj.(*kubernetes.Pod) - time.AfterFunc(p.cleanupTimeout, func() { p.emitStopped(pod) }) -} diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go new file mode 100644 index 00000000000..455a06107ef --- /dev/null +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go @@ -0,0 +1,218 @@ +// Copyright Elasticsearch B.V. 
and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package kubernetes + +import ( + "time" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" + k8s "k8s.io/client-go/kubernetes" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/kubernetes" + "github.com/elastic/beats/v7/libbeat/common/safemapstr" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable" +) + +type node struct { + logger *logp.Logger + cleanupTimeout time.Duration + comm composable.DynamicProviderComm + scope string + config *Config +} + +type nodeData struct { + node *kubernetes.Node + mapping map[string]interface{} + processors []map[string]interface{} +} + +// NewNodeWatcher creates a watcher that can discover and process node objects +func NewNodeWatcher( + comm composable.DynamicProviderComm, + cfg *Config, + logger *logp.Logger, + client k8s.Interface, + scope string) (kubernetes.Watcher, error) { + watcher, err := kubernetes.NewWatcher(client, &kubernetes.Node{}, kubernetes.WatchOptions{ + SyncTimeout: cfg.SyncPeriod, + Node: cfg.Node, + IsUpdated: isUpdated, + HonorReSyncs: true, + }, nil) + if err != nil { + return nil, errors.New(err, "couldn't create kubernetes watcher") + } + watcher.AddEventHandler(&node{logger, cfg.CleanupTimeout, comm, scope, cfg}) + + return watcher, nil +} + +func (n *node) emitRunning(node *kubernetes.Node) { + data := generateNodeData(node, n.config) + if data == nil { + return + } + data.mapping["scope"] = n.scope + + // Emit the node + n.comm.AddOrUpdate(string(node.GetUID()), NodePriority, data.mapping, data.processors) +} + +func (n *node) emitStopped(node *kubernetes.Node) { + n.comm.Remove(string(node.GetUID())) +} + +// OnAdd ensures processing of node objects that are newly created +func (n *node) OnAdd(obj interface{}) { + n.logger.Debugf("Watcher Node add: %+v", obj) + n.emitRunning(obj.(*kubernetes.Node)) +} + +// OnUpdate ensures processing of node objects that are updated +func (n *node) OnUpdate(obj interface{}) { + node := obj.(*kubernetes.Node) + if node.GetObjectMeta().GetDeletionTimestamp() != nil { + n.logger.Debugf("Watcher Node update (terminating): %+v", obj) + // Node is terminating, don't reload its configuration and ignore the event as long as node is Ready. + if isNodeReady(node) { + return + } + time.AfterFunc(n.cleanupTimeout, func() { n.emitStopped(node) }) + } else { + n.logger.Debugf("Watcher Node update: %+v", obj) + n.emitRunning(node) + } +} + +// OnDelete ensures processing of node objects that are deleted +func (n *node) OnDelete(obj interface{}) { + n.logger.Debugf("Watcher Node delete: %+v", obj) + node := obj.(*kubernetes.Node) + time.AfterFunc(n.cleanupTimeout, func() { n.emitStopped(node) }) +} + +func isUpdated(o, n interface{}) bool { + old, _ := o.(*kubernetes.Node) + new, _ := n.(*kubernetes.Node) + + // Consider as not update in case one of the two objects is not a Node + if old == nil || new == nil { + return true + } + + // This is a resync. 
It is not an update + if old.ResourceVersion == new.ResourceVersion { + return false + } + + // Compare the old and new objects ignoring their resource versions + oldCopy := old.DeepCopy() + oldCopy.ResourceVersion = "" + + newCopy := new.DeepCopy() + newCopy.ResourceVersion = "" + + // If the old object and new object are different in either meta or spec then there is a valid change + if !equality.Semantic.DeepEqual(oldCopy.Spec, newCopy.Spec) || !equality.Semantic.DeepEqual(oldCopy.ObjectMeta, newCopy.ObjectMeta) { + return true + } + + // If there is a change in the node status then there is a valid change. + if isNodeReady(old) != isNodeReady(new) { + return true + } + return false +} + +// getAddress returns the IP of the node resource. If there is a +// NodeExternalIP then it is returned, if not then it will try to find +// an address of NodeInternalIP type and if not found it looks for a NodeHostName address type +func getAddress(node *kubernetes.Node) string { + for _, address := range node.Status.Addresses { + if address.Type == v1.NodeExternalIP && address.Address != "" { + return address.Address + } + } + + for _, address := range node.Status.Addresses { + if address.Type == v1.NodeInternalIP && address.Address != "" { + return address.Address + } + } + + for _, address := range node.Status.Addresses { + if address.Type == v1.NodeHostName && address.Address != "" { + return address.Address + } + } + + return "" +} + +func isNodeReady(node *kubernetes.Node) bool { + for _, c := range node.Status.Conditions { + if c.Type == v1.NodeReady { + return c.Status == v1.ConditionTrue + } + } + return false +} + +func generateNodeData(node *kubernetes.Node, cfg *Config) *nodeData { + host := getAddress(node) + + // If a node doesn't have an IP then don't monitor it + if host == "" { + return nil + } + + // If the node is not in ready state then don't monitor it + if !isNodeReady(node) { + return nil + } + + //TODO: add metadata here too ie -> meta := n.metagen.Generate(node) + + // Pass annotations to all events so that they can be used in templating and by annotation builders. + annotations := common.MapStr{} + for k, v := range node.GetObjectMeta().GetAnnotations() { + safemapstr.Put(annotations, k, v) + } + + labels := common.MapStr{} + for k, v := range node.GetObjectMeta().GetLabels() { + // TODO: add dedoting option + safemapstr.Put(labels, k, v) + } + + mapping := map[string]interface{}{ + "node": map[string]interface{}{ + "uid": string(node.GetUID()), + "name": node.GetName(), + "labels": labels, + "annotations": annotations, + "ip": host, + }, + } + + processors := []map[string]interface{}{ + { + "add_fields": map[string]interface{}{ + "fields": mapping, + "target": "kubernetes", + }, + }, + } + return &nodeData{ + node: node, + mapping: mapping, + processors: processors, + } +} diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node_test.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node_test.go new file mode 100644 index 00000000000..68c35878490 --- /dev/null +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node_test.go @@ -0,0 +1,72 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License.
+ +package kubernetes + +import ( + "testing" + + "github.com/elastic/beats/v7/libbeat/common" + + "github.com/stretchr/testify/assert" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + + "github.com/elastic/beats/v7/libbeat/common/kubernetes" +) + +func TestGenerateNodeData(t *testing.T) { + uid := "005f3b90-4b9d-12f8-acf0-31020a840133" + node := &kubernetes.Node{ + ObjectMeta: kubernetes.ObjectMeta{ + Name: "testnode", + UID: types.UID(uid), + Labels: map[string]string{ + "foo": "bar", + }, + Annotations: map[string]string{ + "baz": "ban", + }, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Node", + APIVersion: "v1", + }, + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}}, + Addresses: []v1.NodeAddress{{Type: v1.NodeHostName, Address: "node1"}}, + }, + } + + data := generateNodeData(node, &Config{}) + + mapping := map[string]interface{}{ + "node": map[string]interface{}{ + "uid": string(node.GetUID()), + "name": node.GetName(), + "labels": common.MapStr{ + "foo": "bar", + }, + "annotations": common.MapStr{ + "baz": "ban", + }, + "ip": "node1", + }, + } + + processors := []map[string]interface{}{ + { + "add_fields": map[string]interface{}{ + "fields": mapping, + "target": "kubernetes", + }, + }, + } + + assert.Equal(t, node, data.node) + assert.Equal(t, mapping, data.mapping) + assert.Equal(t, processors, data.processors) +} diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go new file mode 100644 index 00000000000..b0b3ab3b525 --- /dev/null +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go @@ -0,0 +1,224 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package kubernetes + +import ( + "fmt" + "time" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/safemapstr" + + k8s "k8s.io/client-go/kubernetes" + + "github.com/elastic/beats/v7/libbeat/common/kubernetes" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable" +) + +type pod struct { + logger *logp.Logger + cleanupTimeout time.Duration + comm composable.DynamicProviderComm + scope string + config *Config +} + +type providerData struct { + uid string + mapping map[string]interface{} + processors []map[string]interface{} +} + +// NewPodWatcher creates a watcher that can discover and process pod objects +func NewPodWatcher( + comm composable.DynamicProviderComm, + cfg *Config, + logger *logp.Logger, + client k8s.Interface, + scope string) (kubernetes.Watcher, error) { + watcher, err := kubernetes.NewWatcher(client, &kubernetes.Pod{}, kubernetes.WatchOptions{ + SyncTimeout: cfg.SyncPeriod, + Node: cfg.Node, + Namespace: cfg.Namespace, + HonorReSyncs: true, + }, nil) + if err != nil { + return nil, errors.New(err, "couldn't create kubernetes watcher") + } + watcher.AddEventHandler(&pod{logger, cfg.CleanupTimeout, comm, scope, cfg}) + + return watcher, nil +} + +func (p *pod) emitRunning(pod *kubernetes.Pod) { + data := generatePodData(pod, p.config) + data.mapping["scope"] = p.scope + // Emit the pod + // We emit Pod + containers to ensure that configs matching Pod only + // get Pod metadata (not specific to any container) + p.comm.AddOrUpdate(data.uid, PodPriority, data.mapping, data.processors) + + // Emit all containers in the pod + p.emitContainers(pod, pod.Spec.Containers, pod.Status.ContainerStatuses) + + // TODO: deal with init containers stopping after initialization + p.emitContainers(pod, pod.Spec.InitContainers, pod.Status.InitContainerStatuses) + // TODO: deal with ephemeral containers +} + +func (p *pod) emitContainers(pod *kubernetes.Pod, containers []kubernetes.Container, containerstatuses []kubernetes.PodContainerStatus) { + generateContainerData(p.comm, pod, containers, containerstatuses, p.config) +} + +func (p *pod) emitStopped(pod *kubernetes.Pod) { + p.comm.Remove(string(pod.GetUID())) + + for _, c := range pod.Spec.Containers { + // ID is the combination of pod UID + container name + eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.Name) + p.comm.Remove(eventID) + } + + for _, c := range pod.Spec.InitContainers { + // ID is the combination of pod UID + container name + eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.Name) + p.comm.Remove(eventID) + } +} + +// OnAdd ensures processing of pod objects that are newly added +func (p *pod) OnAdd(obj interface{}) { + p.logger.Debugf("pod add: %+v", obj) + p.emitRunning(obj.(*kubernetes.Pod)) +} + +// OnUpdate emits events for a given pod depending on the state of the pod, +// if it is terminating, a stop event is scheduled, if not, a stop and a start +// event are sent sequentially to recreate the resources associated with the pod.
+func (p *pod) OnUpdate(obj interface{}) { + pod := obj.(*kubernetes.Pod) + + p.logger.Debugf("pod update for pod: %+v, status: %+v", pod.Name, pod.Status.Phase) + switch pod.Status.Phase { + case kubernetes.PodSucceeded, kubernetes.PodFailed: + time.AfterFunc(p.cleanupTimeout, func() { p.emitStopped(pod) }) + return + case kubernetes.PodPending: + p.logger.Debugf("pod update (pending): don't know what to do with this pod yet, skipping for now: %+v", obj) + return + } + + p.logger.Debugf("pod update: %+v", obj) + p.emitRunning(pod) +} + +// OnDelete stops pod objects that are deleted +func (p *pod) OnDelete(obj interface{}) { + p.logger.Debugf("pod delete: %+v", obj) + pod := obj.(*kubernetes.Pod) + time.AfterFunc(p.cleanupTimeout, func() { p.emitStopped(pod) }) +} + +func generatePodData(pod *kubernetes.Pod, cfg *Config) providerData { + // TODO: add metadata here too ie -> meta := s.metagen.Generate(pod) + + // Pass annotations to all events so that it can be used in templating and by annotation builders. + annotations := common.MapStr{} + for k, v := range pod.GetObjectMeta().GetAnnotations() { + safemapstr.Put(annotations, k, v) + } + + labels := common.MapStr{} + for k, v := range pod.GetObjectMeta().GetLabels() { + // TODO: add dedoting option + safemapstr.Put(labels, k, v) + } + + mapping := map[string]interface{}{ + "namespace": pod.GetNamespace(), + "pod": map[string]interface{}{ + "uid": string(pod.GetUID()), + "name": pod.GetName(), + "labels": labels, + "annotations": annotations, + "ip": pod.Status.PodIP, + }, + } + return providerData{ + uid: string(pod.GetUID()), + mapping: mapping, + processors: []map[string]interface{}{ + { + "add_fields": map[string]interface{}{ + "fields": mapping, + "target": "kubernetes", + }, + }, + }, + } +} + +func generateContainerData( + comm composable.DynamicProviderComm, + pod *kubernetes.Pod, + containers []kubernetes.Container, + containerstatuses []kubernetes.PodContainerStatus, + cfg *Config) { + //TODO: add metadata here too ie -> meta := s.metagen.Generate() + + containerIDs := map[string]string{} + runtimes := map[string]string{} + for _, c := range containerstatuses { + cid, runtime := kubernetes.ContainerIDWithRuntime(c) + containerIDs[c.Name] = cid + runtimes[c.Name] = runtime + } + + labels := common.MapStr{} + for k, v := range pod.GetObjectMeta().GetLabels() { + safemapstr.Put(labels, k, v) + } + + for _, c := range containers { + // If it doesn't have an ID, container doesn't exist in + // the runtime, emit only an event if we are stopping, so + // we are sure of cleaning up configurations. 
+ cid := containerIDs[c.Name] + if cid == "" { + continue + } + + // ID is the combination of pod UID + container name + eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.Name) + + mapping := map[string]interface{}{ + "namespace": pod.GetNamespace(), + "pod": map[string]interface{}{ + "uid": string(pod.GetUID()), + "name": pod.GetName(), + "labels": labels, + "ip": pod.Status.PodIP, + }, + "container": map[string]interface{}{ + "id": cid, + "name": c.Name, + "image": c.Image, + "runtime": runtimes[c.Name], + }, + } + + processors := []map[string]interface{}{ + { + "add_fields": map[string]interface{}{ + "fields": mapping, + "target": "kubernetes", + }, + }, + } + comm.AddOrUpdate(eventID, ContainerPriority, mapping, processors) + } +} diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod_test.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod_test.go new file mode 100644 index 00000000000..1e85557a2d6 --- /dev/null +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod_test.go @@ -0,0 +1,187 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package kubernetes + +import ( + "context" + "fmt" + "testing" + + "github.com/elastic/beats/v7/libbeat/common" + + "github.com/stretchr/testify/assert" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + + "github.com/elastic/beats/v7/libbeat/common/kubernetes" +) + +func TestGeneratePodData(t *testing.T) { + uid := "005f3b90-4b9d-12f8-acf0-31020a840133" + pod := &kubernetes.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testpod", + UID: types.UID(uid), + Namespace: "testns", + Labels: map[string]string{ + "foo": "bar", + }, + Annotations: map[string]string{ + "app": "production", + }, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + Spec: kubernetes.PodSpec{ + NodeName: "testnode", + }, + Status: kubernetes.PodStatus{PodIP: "127.0.0.5"}, + } + + data := generatePodData(pod, &Config{}) + + mapping := map[string]interface{}{ + "namespace": pod.GetNamespace(), + "pod": map[string]interface{}{ + "uid": string(pod.GetUID()), + "name": pod.GetName(), + "labels": common.MapStr{ + "foo": "bar", + }, + "annotations": common.MapStr{ + "app": "production", + }, + "ip": pod.Status.PodIP, + }, + } + processors := []map[string]interface{}{ + { + "add_fields": map[string]interface{}{ + "fields": mapping, + "target": "kubernetes", + }, + }, + } + + assert.Equal(t, string(pod.GetUID()), data.uid) + assert.Equal(t, mapping, data.mapping) + assert.Equal(t, processors, data.processors) +} + +func TestGenerateContainerPodData(t *testing.T) { + uid := "005f3b90-4b9d-12f8-acf0-31020a840133" + pod := &kubernetes.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testpod", + UID: types.UID(uid), + Namespace: "testns", + Labels: map[string]string{ + "foo": "bar", + }, + Annotations: map[string]string{ + "app": "production", + }, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + Spec: kubernetes.PodSpec{ + NodeName: "testnode", + }, + Status: kubernetes.PodStatus{PodIP: "127.0.0.5"}, + } + + providerDataChan := make(chan providerData, 1) + + containers := []kubernetes.Container{ + { + Name: "nginx", + Image: "nginx:1.120", + Ports: []kubernetes.ContainerPort{ + { + Name: "http", + Protocol: v1.ProtocolTCP, + 
ContainerPort: 80, + }, + }, + }, + } + containerStatuses := []kubernetes.PodContainerStatus{ + { + Name: "nginx", + Ready: true, + ContainerID: "crio://asdfghdeadbeef", + }, + } + comm := MockDynamicComm{ + context.TODO(), + providerDataChan, + } + generateContainerData( + &comm, + pod, + containers, + containerStatuses, + &Config{}) + + mapping := map[string]interface{}{ + "namespace": pod.GetNamespace(), + "pod": map[string]interface{}{ + "uid": string(pod.GetUID()), + "name": pod.GetName(), + "labels": common.MapStr{ + "foo": "bar", + }, + "ip": pod.Status.PodIP, + }, + "container": map[string]interface{}{ + "id": "asdfghdeadbeef", + "name": "nginx", + "image": "nginx:1.120", + "runtime": "crio", + }, + } + + processors := []map[string]interface{}{ + { + "add_fields": map[string]interface{}{ + "fields": mapping, + "target": "kubernetes", + }, + }, + } + + cuid := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), "nginx") + data := <-providerDataChan + assert.Equal(t, cuid, data.uid) + assert.Equal(t, mapping, data.mapping) + assert.Equal(t, processors, data.processors) + +} + +// MockDynamicComm is used in tests. +type MockDynamicComm struct { + context.Context + providerDataChan chan providerData +} + +// AddOrUpdate adds or updates a current mapping. +func (t *MockDynamicComm) AddOrUpdate(id string, priority int, mapping map[string]interface{}, processors []map[string]interface{}) error { + t.providerDataChan <- providerData{ + id, + mapping, + processors, + } + return nil +} + +// Remove +func (t *MockDynamicComm) Remove(id string) { +} diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service.go new file mode 100644 index 00000000000..a0f73b16382 --- /dev/null +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service.go @@ -0,0 +1,140 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package kubernetes + +import ( + "time" + + k8s "k8s.io/client-go/kubernetes" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/kubernetes" + "github.com/elastic/beats/v7/libbeat/common/safemapstr" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable" +) + +type service struct { + logger *logp.Logger + cleanupTimeout time.Duration + comm composable.DynamicProviderComm + scope string + config *Config +} + +type serviceData struct { + service *kubernetes.Service + mapping map[string]interface{} + processors []map[string]interface{} +} + +// NewServiceWatcher creates a watcher that can discover and process service objects +func NewServiceWatcher( + comm composable.DynamicProviderComm, + cfg *Config, + logger *logp.Logger, + client k8s.Interface, + scope string) (kubernetes.Watcher, error) { + watcher, err := kubernetes.NewWatcher(client, &kubernetes.Service{}, kubernetes.WatchOptions{ + SyncTimeout: cfg.SyncPeriod, + Node: cfg.Node, + HonorReSyncs: true, + }, nil) + if err != nil { + return nil, errors.New(err, "couldn't create kubernetes watcher") + } + watcher.AddEventHandler(&service{logger, cfg.CleanupTimeout, comm, scope, cfg}) + + return watcher, nil +} + +func (s *service) emitRunning(service *kubernetes.Service) { + data := generateServiceData(service, s.config) + if data == nil { + return + } + data.mapping["scope"] = s.scope + + // Emit the service + s.comm.AddOrUpdate(string(service.GetUID()), ServicePriority, data.mapping, data.processors) +} + +func (s *service) emitStopped(service *kubernetes.Service) { + s.comm.Remove(string(service.GetUID())) +} + +// OnAdd ensures processing of service objects that are newly created +func (s *service) OnAdd(obj interface{}) { + s.logger.Debugf("Watcher Service add: %+v", obj) + s.emitRunning(obj.(*kubernetes.Service)) +} + +// OnUpdate ensures processing of service objects that are updated +func (s *service) OnUpdate(obj interface{}) { + service := obj.(*kubernetes.Service) + // Once the service is terminating, mark it for deletion + if service.GetObjectMeta().GetDeletionTimestamp() != nil { + s.logger.Debugf("Watcher Service update (terminating): %+v", obj) + time.AfterFunc(s.cleanupTimeout, func() { s.emitStopped(service) }) + } else { + s.logger.Debugf("Watcher Service update: %+v", obj) + s.emitRunning(service) + } +} + +// OnDelete ensures processing of service objects that are deleted +func (s *service) OnDelete(obj interface{}) { + s.logger.Debugf("Watcher Service delete: %+v", obj) + service := obj.(*kubernetes.Service) + time.AfterFunc(s.cleanupTimeout, func() { s.emitStopped(service) }) +} + +func generateServiceData(service *kubernetes.Service, cfg *Config) *serviceData { + host := service.Spec.ClusterIP + + // If a service doesn't have an IP then don't monitor it + if host == "" { + return nil + } + + //TODO: add metadata here too ie -> meta := s.metagen.Generate(service) + + // Pass annotations to all events so that they can be used in templating and by annotation builders.
+ annotations := common.MapStr{} + for k, v := range service.GetObjectMeta().GetAnnotations() { + safemapstr.Put(annotations, k, v) + } + + labels := common.MapStr{} + for k, v := range service.GetObjectMeta().GetLabels() { + // TODO: add dedoting option + safemapstr.Put(labels, k, v) + } + + mapping := map[string]interface{}{ + "service": map[string]interface{}{ + "uid": string(service.GetUID()), + "name": service.GetName(), + "labels": labels, + "annotations": annotations, + "ip": host, + }, + } + + processors := []map[string]interface{}{ + { + "add_fields": map[string]interface{}{ + "fields": mapping, + "target": "kubernetes", + }, + }, + } + return &serviceData{ + service: service, + mapping: mapping, + processors: processors, + } +} diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service_test.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service_test.go new file mode 100644 index 00000000000..c52a1069728 --- /dev/null +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service_test.go @@ -0,0 +1,76 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package kubernetes + +import ( + "testing" + + "github.com/elastic/beats/v7/libbeat/common" + + "github.com/stretchr/testify/assert" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + + "github.com/elastic/beats/v7/libbeat/common/kubernetes" +) + +func TestGenerateServiceData(t *testing.T) { + uid := "005f3b90-4b9d-12f8-acf0-31020a840133" + service := &kubernetes.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testsvc", + UID: types.UID(uid), + Namespace: "testns", + Labels: map[string]string{ + "foo": "bar", + }, + Annotations: map[string]string{ + "baz": "ban", + }, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Service", + APIVersion: "v1", + }, + Spec: v1.ServiceSpec{ + ClusterIP: "1.2.3.4", + Selector: map[string]string{ + "app": "istiod", + "istio": "pilot", + }, + }, + } + + data := generateServiceData(service, &Config{}) + + mapping := map[string]interface{}{ + "service": map[string]interface{}{ + "uid": string(service.GetUID()), + "name": service.GetName(), + "labels": common.MapStr{ + "foo": "bar", + }, + "annotations": common.MapStr{ + "baz": "ban", + }, + "ip": service.Spec.ClusterIP, + }, + } + + processors := []map[string]interface{}{ + { + "add_fields": map[string]interface{}{ + "fields": mapping, + "target": "kubernetes", + }, + }, + } + + assert.Equal(t, service, data.service) + assert.Equal(t, mapping, data.mapping) + assert.Equal(t, processors, data.processors) +}
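
Reviewer note (not part of the patch): a minimal sketch of how the new resources block in config.go interacts with InitDefaults and Validate, assuming it is placed in a _test.go file of the same kubernetes provider package so the Config, Resources and Enabled types are in scope. The function name and values are illustrative only.

package kubernetes

import "fmt"

// ExampleConfigDefaults shows the defaulting behaviour added in config.go:
// with no resources block, pod and node discovery are enabled; enabling the
// service resource forces the provider into cluster scope.
func ExampleConfigDefaults() {
	cfg := &Config{}
	cfg.InitDefaults()
	_ = cfg.Validate() // no resources configured: pod and node get enabled by default
	fmt.Println(cfg.Scope, cfg.Resources.Pod.Enabled, cfg.Resources.Node.Enabled, cfg.Resources.Service.Enabled)

	svcCfg := &Config{Scope: "node", Resources: Resources{Service: Enabled{true}}}
	_ = svcCfg.Validate() // service resource requested: scope is reset to cluster
	fmt.Println(svcCfg.Scope)

	// Output:
	// node true true false
	// cluster
}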
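
Reviewer note (not part of the patch): a sketch of the two guards in generateNodeData from node.go, assuming the same package as node_test.go; the node literal and addresses are made up. A node that is not Ready is skipped, and once Ready the external IP is preferred over the hostname address.

package kubernetes

import (
	"fmt"

	v1 "k8s.io/api/core/v1"

	"github.com/elastic/beats/v7/libbeat/common/kubernetes"
)

func ExampleGenerateNodeData_guards() {
	node := &kubernetes.Node{
		ObjectMeta: kubernetes.ObjectMeta{Name: "worker-0"},
		Status: v1.NodeStatus{
			Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionFalse}},
			Addresses: []v1.NodeAddress{
				{Type: v1.NodeHostName, Address: "worker-0"},
				{Type: v1.NodeExternalIP, Address: "203.0.113.10"},
			},
		},
	}

	// Not Ready: generateNodeData returns nil and the node is not monitored.
	fmt.Println(generateNodeData(node, &Config{}) == nil)

	// Ready: the external IP wins over the hostname address.
	node.Status.Conditions[0].Status = v1.ConditionTrue
	fmt.Println(generateNodeData(node, &Config{}).mapping["node"].(map[string]interface{})["ip"])

	// Output:
	// true
	// 203.0.113.10
}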
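
Reviewer note (not part of the patch): a sketch of the container emission rule in pod.go, reusing the MockDynamicComm helper defined in pod_test.go; the pod literal is illustrative. Containers with no container ID in the status (not yet created in the runtime) are skipped, and the key of each emitted mapping is the pod UID plus the container name.

package kubernetes

import (
	"context"
	"fmt"

	"k8s.io/apimachinery/pkg/types"

	"github.com/elastic/beats/v7/libbeat/common/kubernetes"
)

func ExampleGenerateContainerData_skipsPending() {
	pod := &kubernetes.Pod{
		ObjectMeta: kubernetes.ObjectMeta{Name: "testpod", UID: types.UID("pod-uid")},
		Spec: kubernetes.PodSpec{
			Containers: []kubernetes.Container{{Name: "running"}, {Name: "pending"}},
		},
		Status: kubernetes.PodStatus{
			// Only "running" has a status with a container ID, so only it produces a mapping.
			ContainerStatuses: []kubernetes.PodContainerStatus{
				{Name: "running", ContainerID: "containerd://deadbeef"},
			},
		},
	}

	ch := make(chan providerData, 2)
	comm := MockDynamicComm{context.TODO(), ch}
	generateContainerData(&comm, pod, pod.Spec.Containers, pod.Status.ContainerStatuses, &Config{})

	fmt.Println(len(ch), (<-ch).uid)
	// Output: 1 pod-uid.running
}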
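
Reviewer note (not part of the patch): a sketch of the ClusterIP guard in generateServiceData from service.go, in the style of service_test.go; the service literal is illustrative. Services without an assigned ClusterIP (for example ExternalName services) are not monitored.

package kubernetes

import (
	"fmt"

	v1 "k8s.io/api/core/v1"

	"github.com/elastic/beats/v7/libbeat/common/kubernetes"
)

func ExampleGenerateServiceData_noClusterIP() {
	svc := &kubernetes.Service{
		ObjectMeta: kubernetes.ObjectMeta{Name: "external-name-svc"},
		// ExternalName services have no ClusterIP, so generateServiceData returns nil.
		Spec: v1.ServiceSpec{Type: v1.ServiceTypeExternalName, ExternalName: "example.com"},
	}
	fmt.Println(generateServiceData(svc, &Config{}) == nil)
	// Output: true
}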