From 6f393782f44c3ec983b88f055d77a27159fa2551 Mon Sep 17 00:00:00 2001 From: chrismark Date: Wed, 21 Jul 2021 13:37:08 +0300 Subject: [PATCH] Remove usage of channels Signed-off-by: chrismark --- .../composable/providers/kubernetes/pod.go | 26 ++--------- .../providers/kubernetes/pod_test.go | 46 +++++++++++++------ 2 files changed, 35 insertions(+), 37 deletions(-) diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go index d9559963a78..56e24eca70f 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go @@ -70,21 +70,7 @@ func (p *pod) emitRunning(pod *kubernetes.Pod) { } func (p *pod) emitContainers(pod *kubernetes.Pod, containers []kubernetes.Container, containerstatuses []kubernetes.PodContainerStatus) { - - providerDataChan := make(chan providerData) - done := make(chan bool, 1) - go generateContainerData(pod, containers, containerstatuses, providerDataChan, done, p.config) - - for { - select { - case data := <-providerDataChan: - // Emit the container - data.mapping["scope"] = p.scope - p.comm.AddOrUpdate(data.uid, ContainerPriority, data.mapping, data.processors) - case <-done: - return - } - } + generateContainerData(p.comm, pod, containers, containerstatuses, p.config) } func (p *pod) emitStopped(pod *kubernetes.Pod) { @@ -185,11 +171,10 @@ func generatePodData(pod *kubernetes.Pod, cfg *Config) providerData { } func generateContainerData( + comm composable.DynamicProviderComm, pod *kubernetes.Pod, containers []kubernetes.Container, containerstatuses []kubernetes.PodContainerStatus, - dataChan chan providerData, - done chan bool, cfg *Config) { //TODO: add metadata here too ie -> meta := s.metagen.Generate() @@ -247,11 +232,6 @@ func generateContainerData( }, }, } - dataChan <- providerData{ - uid: eventID, - mapping: mapping, - processors: processors, - } + comm.AddOrUpdate(eventID, ContainerPriority, mapping, processors) } - done <- true } diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod_test.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod_test.go index 3d71d838ed1..5a00cf9e2df 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod_test.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod_test.go @@ -5,6 +5,7 @@ package kubernetes import ( + "context" "fmt" "testing" @@ -97,8 +98,7 @@ func TestGenerateContainerPodData(t *testing.T) { Status: kubernetes.PodStatus{PodIP: "127.0.0.5"}, } - providerDataChan := make(chan providerData) - done := make(chan bool, 1) + providerDataChan := make(chan providerData, 1) containers := []kubernetes.Container{ { @@ -120,12 +120,16 @@ func TestGenerateContainerPodData(t *testing.T) { ContainerID: "crio://asdfghdeadbeef", }, } - go generateContainerData( + comm := MockDynamicComm{ + context.TODO(), + providerDataChan, + } + generateContainerData( + &comm, pod, containers, containerStatuses, - providerDataChan, - done, &Config{LabelsDedot: true, AnnotationsDedot: true}) + &Config{LabelsDedot: true, AnnotationsDedot: true}) mapping := map[string]interface{}{ "namespace": pod.GetNamespace(), @@ -155,15 +159,29 @@ func TestGenerateContainerPodData(t *testing.T) { } cuid := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), "nginx") - for { - select { - case data := <-providerDataChan: - assert.Equal(t, cuid, data.uid) - assert.Equal(t, mapping, data.mapping) - assert.Equal(t, processors, data.processors) - case <-done: - return - } + data := <-providerDataChan + assert.Equal(t, cuid, data.uid) + assert.Equal(t, mapping, data.mapping) + assert.Equal(t, processors, data.processors) + +} + +// MockDynamicComm is used in tests. +type MockDynamicComm struct { + context.Context + providerDataChan chan providerData +} + +// AddOrUpdate adds or updates a current mapping. +func (t *MockDynamicComm) AddOrUpdate(id string, priority int, mapping map[string]interface{}, processors []map[string]interface{}) error { + t.providerDataChan <- providerData{ + id, + mapping, + processors, } + return nil +} +// Remove +func (t *MockDynamicComm) Remove(id string) { }