Skip to content

Commit

Permalink
Remove usage of channels
Browse files Browse the repository at this point in the history
Signed-off-by: chrismark <chrismarkou92@gmail.com>
  • Loading branch information
ChrsMark committed Jul 21, 2021
1 parent f778936 commit 6f39378
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 37 deletions.
26 changes: 3 additions & 23 deletions x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,21 +70,7 @@ func (p *pod) emitRunning(pod *kubernetes.Pod) {
}

func (p *pod) emitContainers(pod *kubernetes.Pod, containers []kubernetes.Container, containerstatuses []kubernetes.PodContainerStatus) {

providerDataChan := make(chan providerData)
done := make(chan bool, 1)
go generateContainerData(pod, containers, containerstatuses, providerDataChan, done, p.config)

for {
select {
case data := <-providerDataChan:
// Emit the container
data.mapping["scope"] = p.scope
p.comm.AddOrUpdate(data.uid, ContainerPriority, data.mapping, data.processors)
case <-done:
return
}
}
generateContainerData(p.comm, pod, containers, containerstatuses, p.config)
}

func (p *pod) emitStopped(pod *kubernetes.Pod) {
Expand Down Expand Up @@ -185,11 +171,10 @@ func generatePodData(pod *kubernetes.Pod, cfg *Config) providerData {
}

func generateContainerData(
comm composable.DynamicProviderComm,
pod *kubernetes.Pod,
containers []kubernetes.Container,
containerstatuses []kubernetes.PodContainerStatus,
dataChan chan providerData,
done chan bool,
cfg *Config) {
//TODO: add metadata here too ie -> meta := s.metagen.Generate()

Expand Down Expand Up @@ -247,11 +232,6 @@ func generateContainerData(
},
},
}
dataChan <- providerData{
uid: eventID,
mapping: mapping,
processors: processors,
}
comm.AddOrUpdate(eventID, ContainerPriority, mapping, processors)
}
done <- true
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package kubernetes

import (
"context"
"fmt"
"testing"

Expand Down Expand Up @@ -97,8 +98,7 @@ func TestGenerateContainerPodData(t *testing.T) {
Status: kubernetes.PodStatus{PodIP: "127.0.0.5"},
}

providerDataChan := make(chan providerData)
done := make(chan bool, 1)
providerDataChan := make(chan providerData, 1)

containers := []kubernetes.Container{
{
Expand All @@ -120,12 +120,16 @@ func TestGenerateContainerPodData(t *testing.T) {
ContainerID: "crio://asdfghdeadbeef",
},
}
go generateContainerData(
comm := MockDynamicComm{
context.TODO(),
providerDataChan,
}
generateContainerData(
&comm,
pod,
containers,
containerStatuses,
providerDataChan,
done, &Config{LabelsDedot: true, AnnotationsDedot: true})
&Config{LabelsDedot: true, AnnotationsDedot: true})

mapping := map[string]interface{}{
"namespace": pod.GetNamespace(),
Expand Down Expand Up @@ -155,15 +159,29 @@ func TestGenerateContainerPodData(t *testing.T) {
}

cuid := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), "nginx")
for {
select {
case data := <-providerDataChan:
assert.Equal(t, cuid, data.uid)
assert.Equal(t, mapping, data.mapping)
assert.Equal(t, processors, data.processors)
case <-done:
return
}
data := <-providerDataChan
assert.Equal(t, cuid, data.uid)
assert.Equal(t, mapping, data.mapping)
assert.Equal(t, processors, data.processors)

}

// MockDynamicComm is used in tests.
type MockDynamicComm struct {
context.Context
providerDataChan chan providerData
}

// AddOrUpdate adds or updates a current mapping.
func (t *MockDynamicComm) AddOrUpdate(id string, priority int, mapping map[string]interface{}, processors []map[string]interface{}) error {
t.providerDataChan <- providerData{
id,
mapping,
processors,
}
return nil
}

// Remove
func (t *MockDynamicComm) Remove(id string) {
}

0 comments on commit 6f39378

Please sign in to comment.