diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go index 08d335cfd29..71f11191ec0 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go @@ -25,8 +25,8 @@ type pod struct { comm composable.DynamicProviderComm } -type podData struct { - pod *kubernetes.Pod +type providerData struct { + uid string mapping map[string]interface{} processors []map[string]interface{} } @@ -52,7 +52,7 @@ func (p *pod) emitRunning(pod *kubernetes.Pod) { // Emit the pod // We emit Pod + containers to ensure that configs matching Pod only // get Pod metadata (not specific to any container) - p.comm.AddOrUpdate(string(data.pod.GetUID()), PodPriority, data.mapping, data.processors) + p.comm.AddOrUpdate(data.uid, PodPriority, data.mapping, data.processors) // Emit all containers in the pod p.emitContainers(pod, pod.Spec.Containers, pod.Status.ContainerStatuses) @@ -62,54 +62,19 @@ func (p *pod) emitRunning(pod *kubernetes.Pod) { } func (p *pod) emitContainers(pod *kubernetes.Pod, containers []kubernetes.Container, containerstatuses []kubernetes.PodContainerStatus) { - // Collect all runtimes from status information. - containerIDs := map[string]string{} - runtimes := map[string]string{} - for _, c := range containerstatuses { - cid, runtime := kubernetes.ContainerIDWithRuntime(c) - containerIDs[c.Name] = cid - runtimes[c.Name] = runtime - } - - for _, c := range containers { - // If it doesn't have an ID, container doesn't exist in - // the runtime, emit only an event if we are stopping, so - // we are sure of cleaning up configurations. - cid := containerIDs[c.Name] - if cid == "" { - continue - } - - // ID is the combination of pod UID + container name - eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.Name) - - mapping := map[string]interface{}{ - "namespace": pod.GetNamespace(), - "pod": map[string]interface{}{ - "uid": string(pod.GetUID()), - "name": pod.GetName(), - "labels": pod.GetLabels(), - "ip": pod.Status.PodIP, - }, - "container": map[string]interface{}{ - "id": cid, - "name": c.Name, - "image": c.Image, - "runtime": runtimes[c.Name], - }, - } - processors := []map[string]interface{}{ - { - "add_fields": map[string]interface{}{ - "fields": mapping, - "target": "kubernetes", - }, - }, + providerDataChan := make(chan providerData) + done := make(chan bool, 1) + go generateContainerData(pod, containers, containerstatuses, providerDataChan, done) + + for { + select { + case data := <-providerDataChan: + // Emit the container + p.comm.AddOrUpdate(data.uid, ContainerPriority, data.mapping, data.processors) + case <-done: + return } - - // Emit the container - p.comm.AddOrUpdate(eventID, ContainerPriority, mapping, processors) } } @@ -162,7 +127,7 @@ func (p *pod) OnDelete(obj interface{}) { time.AfterFunc(p.cleanupTimeout, func() { p.emitStopped(pod) }) } -func generatePodData(pod *kubernetes.Pod) podData { +func generatePodData(pod *kubernetes.Pod) providerData { //TODO: add metadata here too ie -> meta := s.metagen.Generate(pod) // Pass annotations to all events so that it can be used in templating and by annotation builders. @@ -181,8 +146,8 @@ func generatePodData(pod *kubernetes.Pod) podData { "ip": pod.Status.PodIP, }, } - return podData{ - pod: pod, + return providerData{ + uid: string(pod.GetUID()), mapping: mapping, processors: []map[string]interface{}{ { @@ -194,3 +159,64 @@ func generatePodData(pod *kubernetes.Pod) podData { }, } } + +func generateContainerData( + pod *kubernetes.Pod, + containers []kubernetes.Container, + containerstatuses []kubernetes.PodContainerStatus, + dataChan chan providerData, + done chan bool) { + //TODO: add metadata here too ie -> meta := s.metagen.Generate() + + containerIDs := map[string]string{} + runtimes := map[string]string{} + for _, c := range containerstatuses { + cid, runtime := kubernetes.ContainerIDWithRuntime(c) + containerIDs[c.Name] = cid + runtimes[c.Name] = runtime + } + + for _, c := range containers { + // If it doesn't have an ID, container doesn't exist in + // the runtime, emit only an event if we are stopping, so + // we are sure of cleaning up configurations. + cid := containerIDs[c.Name] + if cid == "" { + continue + } + + // ID is the combination of pod UID + container name + eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.Name) + + mapping := map[string]interface{}{ + "namespace": pod.GetNamespace(), + "pod": map[string]interface{}{ + "uid": string(pod.GetUID()), + "name": pod.GetName(), + "labels": pod.GetLabels(), + "ip": pod.Status.PodIP, + }, + "container": map[string]interface{}{ + "id": cid, + "name": c.Name, + "image": c.Image, + "runtime": runtimes[c.Name], + }, + } + + processors := []map[string]interface{}{ + { + "add_fields": map[string]interface{}{ + "fields": mapping, + "target": "kubernetes", + }, + }, + } + dataChan <- providerData{ + uid: eventID, + mapping: mapping, + processors: processors, + } + } + done <- true +} diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod_test.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod_test.go index 9e802a7d6d0..2bffabfd55c 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod_test.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod_test.go @@ -5,12 +5,14 @@ package kubernetes import ( + "fmt" "testing" "github.com/elastic/beats/v7/libbeat/common" "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -64,7 +66,95 @@ func TestGeneratePodData(t *testing.T) { }, } - assert.Equal(t, pod, data.pod) + assert.Equal(t, string(pod.GetUID()), data.uid) assert.Equal(t, mapping, data.mapping) assert.Equal(t, processors, data.processors) } + +func TestGenerateContainerPodData(t *testing.T) { + uid := "005f3b90-4b9d-12f8-acf0-31020a840133" + pod := &kubernetes.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testpod", + UID: types.UID(uid), + Namespace: "testns", + Labels: map[string]string{ + "foo": "bar", + }, + Annotations: map[string]string{ + "app": "production", + }, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + Spec: kubernetes.PodSpec{ + NodeName: "testnode", + }, + Status: kubernetes.PodStatus{PodIP: "127.0.0.5"}, + } + + providerDataChan := make(chan providerData) + done := make(chan bool, 1) + + containers := []kubernetes.Container{ + { + Name: "nginx", + Image: "nginx:1.120", + Ports: []kubernetes.ContainerPort{ + { + Name: "http", + Protocol: v1.ProtocolTCP, + ContainerPort: 80, + }, + }, + }, + } + containerStatuses := []kubernetes.PodContainerStatus{ + { + Name: "nginx", + Ready: true, + ContainerID: "crio://asdfghdeadbeef", + }, + } + go generateContainerData(pod, containers, containerStatuses, providerDataChan, done) + + mapping := map[string]interface{}{ + "namespace": pod.GetNamespace(), + "pod": map[string]interface{}{ + "uid": string(pod.GetUID()), + "name": pod.GetName(), + "labels": pod.GetLabels(), + "ip": pod.Status.PodIP, + }, + "container": map[string]interface{}{ + "id": "asdfghdeadbeef", + "name": "nginx", + "image": "nginx:1.120", + "runtime": "crio", + }, + } + + processors := []map[string]interface{}{ + { + "add_fields": map[string]interface{}{ + "fields": mapping, + "target": "kubernetes", + }, + }, + } + + cuid := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), "nginx") + for { + select { + case data := <-providerDataChan: + assert.Equal(t, cuid, data.uid) + assert.Equal(t, mapping, data.mapping) + assert.Equal(t, processors, data.processors) + case <-done: + return + } + } + +}