Commit 88260fd

review comments
Signed-off-by: chrismark <chrismarkou92@gmail.com>
ChrsMark committed Jul 21, 2021
1 parent ea014a1 commit 88260fd
Showing 8 changed files with 147 additions and 87 deletions.
41 changes: 14 additions & 27 deletions x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go
@@ -18,63 +18,50 @@ type Config struct {
 	Scope     string    `config:"scope"`
 	Resources Resources `config:"resources"`
 
 	// expand config settings at root level of config too
 	KubeConfig     string        `config:"kube_config"`
 	Namespace      string        `config:"namespace"`
 	SyncPeriod     time.Duration `config:"sync_period"`
 	CleanupTimeout time.Duration `config:"cleanup_timeout" validate:"positive"`
 
 	// Needed when resource is a Pod or Node
 	Node string `config:"node"`
 
+	LabelsDedot      bool `config:"labels.dedot"`
+	AnnotationsDedot bool `config:"annotations.dedot"`
 }
 
 // Resources config section for resources' config blocks
 type Resources struct {
-	Pod     *ResourceConfig `config:"pod"`
-	Node    *ResourceConfig `config:"node"`
-	Service *ResourceConfig `config:"service"`
+	Pod     Enabled `config:"pod"`
+	Node    Enabled `config:"node"`
+	Service Enabled `config:"service"`
 }
 
-// ResourceConfig for kubernetes resource
-type ResourceConfig struct {
-	KubeConfig     string        `config:"kube_config"`
-	Namespace      string        `config:"namespace"`
-	SyncPeriod     time.Duration `config:"sync_period"`
-	CleanupTimeout time.Duration `config:"cleanup_timeout" validate:"positive"`
-
-	// Needed when resource is a Pod or Node
-	Node string `config:"node"`
+type Enabled struct {
+	Enabled bool `config:"enabled"`
 }
 
 // InitDefaults initializes the default values for the config.
 func (c *Config) InitDefaults() {
 	c.CleanupTimeout = 60 * time.Second
 	c.SyncPeriod = 10 * time.Minute
 	c.Scope = "node"
+	c.Resources.Pod = Enabled{true}
+	c.Resources.Node = Enabled{true}
+	c.Resources.Service = Enabled{true}
+	c.LabelsDedot = true
+	c.AnnotationsDedot = true
 }
 
 // Validate ensures correctness of config
 func (c *Config) Validate() error {
 	// Check if resource is service. If yes then default the scope to "cluster".
-	if c.Resources.Service != nil {
+	if c.Resources.Service.Enabled {
 		if c.Scope == "node" {
 			logp.L().Warnf("can not set scope to `node` when using resource `Service`. resetting scope to `cluster`")
 		}
 		c.Scope = "cluster"
 	}
-	baseCfg := &ResourceConfig{
-		CleanupTimeout: c.CleanupTimeout,
-		SyncPeriod:     c.CleanupTimeout,
-		KubeConfig:     c.KubeConfig,
-		Namespace:      c.Namespace,
-		Node:           c.Node,
-	}
-	if c.Resources.Pod == nil {
-		c.Resources.Pod = baseCfg
-	}
-	if c.Resources.Node == nil {
-		c.Resources.Node = baseCfg
-	}
 
 	return nil
 }
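The net effect of this file is a flatter provider config: shared settings live at the root and each resource keeps only an on/off switch (in agent YAML terms, keys such as kube_config, labels.dedot, and resources.pod.enabled, composed from the struct tags). Below is a minimal, self-contained sketch of these semantics, with stdlib stand-ins for go-ucfg unpacking and logp:

```go
package main

import (
	"fmt"
	"log"
	"time"
)

// Enabled mirrors the new per-resource toggle from the diff.
type Enabled struct {
	Enabled bool
}

// Config mirrors the flattened provider config: the former ResourceConfig
// fields now live at the root, next to the per-resource Enabled switches.
type Config struct {
	Scope          string
	KubeConfig     string
	Namespace      string
	SyncPeriod     time.Duration
	CleanupTimeout time.Duration
	Node           string

	LabelsDedot      bool
	AnnotationsDedot bool

	Resources struct {
		Pod     Enabled
		Node    Enabled
		Service Enabled
	}
}

// InitDefaults reproduces the defaults from the diff: all resources
// enabled, node scope, dedotting on.
func (c *Config) InitDefaults() {
	c.CleanupTimeout = 60 * time.Second
	c.SyncPeriod = 10 * time.Minute
	c.Scope = "node"
	c.Resources.Pod = Enabled{true}
	c.Resources.Node = Enabled{true}
	c.Resources.Service = Enabled{true}
	c.LabelsDedot = true
	c.AnnotationsDedot = true
}

// Validate reproduces the scope coercion: watching Services needs a
// cluster-wide view, so a lingering "node" scope is overridden.
func (c *Config) Validate() error {
	if c.Resources.Service.Enabled {
		if c.Scope == "node" {
			log.Println("can not set scope to `node` when using resource `Service`. resetting scope to `cluster`")
		}
		c.Scope = "cluster"
	}
	return nil
}

func main() {
	var c Config
	c.InitDefaults()
	_ = c.Validate()
	fmt.Println(c.Scope, c.Resources.Pod.Enabled) // cluster true
}
```

Note a side effect visible in the diff: because InitDefaults enables all three resources, Validate's service branch fires unless the policy explicitly sets resources.service.enabled: false, so the effective default scope is "cluster" despite the "node" default assigned two lines earlier.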
x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go

@@ -51,23 +51,20 @@ func DynamicProviderBuilder(logger *logger.Logger, c *config.Config) (composable
 
 // Run runs the kubernetes context provider.
 func (p *dynamicProvider) Run(comm composable.DynamicProviderComm) error {
-	if p.config.Resources.Pod != nil {
-		resourceConfig := p.config.Resources.Pod
-		err := p.watchResource(comm, "pod", resourceConfig)
+	if p.config.Resources.Pod.Enabled {
+		err := p.watchResource(comm, "pod", p.config)
 		if err != nil {
 			return err
 		}
 	}
-	if p.config.Resources.Node != nil {
-		resourceConfig := p.config.Resources.Node
-		err := p.watchResource(comm, "node", resourceConfig)
+	if p.config.Resources.Node.Enabled {
+		err := p.watchResource(comm, "node", p.config)
 		if err != nil {
 			return err
 		}
 	}
-	if p.config.Resources.Service != nil {
-		resourceConfig := p.config.Resources.Service
-		err := p.watchResource(comm, "service", resourceConfig)
+	if p.config.Resources.Service.Enabled {
+		err := p.watchResource(comm, "service", p.config)
 		if err != nil {
 			return err
 		}

@@ -80,8 +77,8 @@ func (p *dynamicProvider) Run(comm composable.DynamicProviderComm) error {
 func (p *dynamicProvider) watchResource(
 	comm composable.DynamicProviderComm,
 	resourceType string,
-	resourceConfig *ResourceConfig) error {
-	client, err := kubernetes.GetKubernetesClient(resourceConfig.KubeConfig)
+	config *Config) error {
+	client, err := kubernetes.GetKubernetesClient(config.KubeConfig)
 	if err != nil {
 		// info only; return nil (do nothing)
 		p.logger.Debugf("Kubernetes provider for resource %s skipped, unable to connect: %s", resourceType, err)

@@ -95,16 +92,16 @@ func (p *dynamicProvider) watchResource(
 		p.logger.Debugf(
 			"Initializing Kubernetes watcher for resource %s using node: %v",
 			resourceType,
-			resourceConfig.Node)
-		resourceConfig.Node = kubernetes.DiscoverKubernetesNode(
-			p.logger, resourceConfig.Node,
-			kubernetes.IsInCluster(resourceConfig.KubeConfig),
+			config.Node)
+		config.Node = kubernetes.DiscoverKubernetesNode(
+			p.logger, config.Node,
+			kubernetes.IsInCluster(config.KubeConfig),
 			client)
 	} else {
-		resourceConfig.Node = ""
+		config.Node = ""
 	}
 
-	watcher, err := p.newWatcher(resourceType, comm, client, resourceConfig)
+	watcher, err := p.newWatcher(resourceType, comm, client, config)
 	if err != nil {
 		return errors.New(err, "couldn't create kubernetes watcher for resource %s", resourceType)
 	}

@@ -121,7 +118,7 @@ func (p *dynamicProvider) newWatcher(
 	resourceType string,
 	comm composable.DynamicProviderComm,
 	client k8s.Interface,
-	config *ResourceConfig) (kubernetes.Watcher, error) {
+	config *Config) (kubernetes.Watcher, error) {
 	switch resourceType {
 	case "pod":
 		watcher, err := NewPodWatcher(comm, config, p.logger, client, p.config.Scope)
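Run now gates each watcher on its Enabled flag and hands all of them the same *Config. Because the three branches are structurally identical, the same gating can also be written table-driven; the sketch below shows that shape with stub types (an illustrative alternative, not what the commit does):

```go
package main

import "fmt"

type Enabled struct{ Enabled bool }

type Config struct {
	Resources struct {
		Pod, Node, Service Enabled
	}
}

type provider struct{ config *Config }

// watchResource stands in for dynamicProvider.watchResource.
func (p *provider) watchResource(resourceType string, cfg *Config) error {
	fmt.Println("watching", resourceType)
	return nil
}

// run collapses Run's three identical if-blocks into one loop over a
// name/flag table.
func (p *provider) run() error {
	resources := []struct {
		name    string
		enabled bool
	}{
		{"pod", p.config.Resources.Pod.Enabled},
		{"node", p.config.Resources.Node.Enabled},
		{"service", p.config.Resources.Service.Enabled},
	}
	for _, r := range resources {
		if !r.enabled {
			continue
		}
		if err := p.watchResource(r.name, p.config); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	cfg := &Config{}
	cfg.Resources.Pod.Enabled = true
	cfg.Resources.Node.Enabled = true
	_ = (&provider{config: cfg}).run() // watching pod, watching node
}
```

One consequence of the shared pointer is worth noting: watchResource writes config.Node (the node-discovery result, or "" for cluster scope), and that field is now shared by all three watchers instead of living in a per-resource ResourceConfig.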
36 changes: 27 additions & 9 deletions x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go
@@ -24,6 +24,7 @@ type node struct {
 	cleanupTimeout time.Duration
 	comm           composable.DynamicProviderComm
 	scope          string
+	config         *Config
 }
 
 type nodeData struct {

@@ -35,7 +36,7 @@ type nodeData struct {
 // NewNodeWatcher creates a watcher that can discover and process node objects
 func NewNodeWatcher(
 	comm composable.DynamicProviderComm,
-	cfg *ResourceConfig,
+	cfg *Config,
 	logger *logp.Logger,
 	client k8s.Interface,
 	scope string) (kubernetes.Watcher, error) {

@@ -48,17 +49,17 @@ func NewNodeWatcher(
 	if err != nil {
 		return nil, errors.New(err, "couldn't create kubernetes watcher")
 	}
-	watcher.AddEventHandler(&node{logger, cfg.CleanupTimeout, comm, scope})
+	watcher.AddEventHandler(&node{logger, cfg.CleanupTimeout, comm, scope, cfg})
 
 	return watcher, nil
 }
 
 func (n *node) emitRunning(node *kubernetes.Node) {
-	data := generateNodeData(node)
-	data.mapping["scope"] = n.scope
+	data := generateNodeData(node, n.config)
 	if data == nil {
 		return
 	}
+	data.mapping["scope"] = n.scope
 
 	// Emit the node
 	n.comm.AddOrUpdate(string(node.GetUID()), NodePriority, data.mapping, data.processors)

@@ -86,7 +87,6 @@ func (n *node) OnUpdate(obj interface{}) {
 		time.AfterFunc(n.cleanupTimeout, func() { n.emitStopped(node) })
 	} else {
 		n.logger.Debugf("Watcher Node update: %+v", obj)
-		n.emitStopped(node)
 		n.emitRunning(node)
 	}
 }

@@ -131,6 +131,9 @@ func isUpdated(o, n interface{}) bool {
 	return false
 }
 
+// getAddress returns the IP of the node Resource. If there is a
+// NodeExternalIP then it is returned, if not then it will try to find
+// an address of NodeExternalIP type and if not found it looks for a NodeHostName address type
 func getAddress(node *kubernetes.Node) string {
 	for _, address := range node.Status.Addresses {
 		if address.Type == v1.NodeExternalIP && address.Address != "" {

@@ -139,7 +142,7 @@ func getAddress(node *kubernetes.Node) string {
 	}
 
 	for _, address := range node.Status.Addresses {
-		if address.Type == v1.NodeInternalIP && address.Address != "" {
+		if address.Type == v1.NodeExternalIP && address.Address != "" {
 			return address.Address
 		}
 	}
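After this hunk both visible loops in getAddress select NodeExternalIP, and the added comment also names NodeExternalIP twice; the NodeHostName fallback it mentions sits outside the visible context. Read that way, the lookup effectively behaves like the sketch below (stdlib stand-ins for the k8s.io/api core/v1 address types; the hostname fallback is assumed from the comment, not shown in the hunk):

```go
package main

import "fmt"

// Minimal stand-ins for the v1.NodeAddress types used by getAddress.
type addressType string

const (
	nodeExternalIP addressType = "ExternalIP"
	nodeHostName   addressType = "Hostname"
)

type nodeAddress struct {
	typ  addressType
	addr string
}

// getAddress sketches the effective post-commit behaviour: both visible
// passes match NodeExternalIP (so they collapse into one here), and the
// NodeHostName fallback described by the new comment runs last.
func getAddress(addrs []nodeAddress) string {
	for _, a := range addrs {
		if a.typ == nodeExternalIP && a.addr != "" {
			return a.addr
		}
	}
	// Assumed fallback, per the added comment; not visible in the hunk.
	for _, a := range addrs {
		if a.typ == nodeHostName && a.addr != "" {
			return a.addr
		}
	}
	return ""
}

func main() {
	fmt.Println(getAddress([]nodeAddress{
		{nodeHostName, "worker-0"},
		{nodeExternalIP, "203.0.113.7"},
	})) // 203.0.113.7: ExternalIP wins regardless of slice order
}
```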
@@ -162,7 +165,7 @@ func isNodeReady(node *kubernetes.Node) bool {
 	return false
 }
 
-func generateNodeData(node *kubernetes.Node) *nodeData {
+func generateNodeData(node *kubernetes.Node, cfg *Config) *nodeData {
 	host := getAddress(node)
 
 	// If a node doesn't have an IP then dont monitor it

@@ -180,14 +183,29 @@ func generateNodeData(node *kubernetes.Node) *nodeData {
 	// Pass annotations to all events so that it can be used in templating and by annotation builders.
 	annotations := common.MapStr{}
 	for k, v := range node.GetObjectMeta().GetAnnotations() {
-		safemapstr.Put(annotations, k, v)
+		if cfg.AnnotationsDedot {
+			annotation := common.DeDot(k)
+			annotations.Put(annotation, v)
+		} else {
+			safemapstr.Put(annotations, k, v)
+		}
+	}
+
+	labels := common.MapStr{}
+	for k, v := range node.GetObjectMeta().GetLabels() {
+		if cfg.LabelsDedot {
+			label := common.DeDot(k)
+			labels.Put(label, v)
+		} else {
+			safemapstr.Put(labels, k, v)
+		}
 	}
 
 	mapping := map[string]interface{}{
 		"node": map[string]interface{}{
 			"uid":         string(node.GetUID()),
 			"name":        node.GetName(),
-			"labels":      node.GetLabels(),
+			"labels":      labels,
 			"annotations": annotations,
 			"ip":          host,
 		},
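This labels/annotations handling is the new dedot machinery (mirrored in pod.go below): with the dedot flags on, dotted Kubernetes keys are flattened before they enter the event mapping. In the sketch, deDot is a stand-in for libbeat's common.DeDot, which I am assuming replaces dots with underscores; the safemapstr.Put fallback instead treats dots as a path, nesting values while guarding against key conflicts:

```go
package main

import (
	"fmt"
	"strings"
)

// deDot is a stand-in for libbeat's common.DeDot (assumed behaviour):
// dots in the key are replaced so the key stays one flat field instead
// of turning into nested objects downstream.
func deDot(key string) string {
	return strings.ReplaceAll(key, ".", "_")
}

func main() {
	labels := map[string]string{
		"app.kubernetes.io/name": "nginx",
		"env":                    "prod",
	}

	dedotted := map[string]interface{}{}
	for k, v := range labels {
		dedotted[deDot(k)] = v
	}
	fmt.Println(dedotted)
	// map[app_kubernetes_io/name:nginx env:prod]
	// With labels.dedot: false the code falls back to safemapstr.Put,
	// which would instead treat the dots as a nesting path.
}
```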
x-pack/elastic-agent/pkg/composable/providers/kubernetes/node_test.go

@@ -41,13 +41,15 @@ func TestGenerateNodeData(t *testing.T) {
 		},
 	}
 
-	data := generateNodeData(node)
+	data := generateNodeData(node, &Config{LabelsDedot: true, AnnotationsDedot: true})
 
 	mapping := map[string]interface{}{
 		"node": map[string]interface{}{
-			"uid":    string(node.GetUID()),
-			"name":   node.GetName(),
-			"labels": node.GetLabels(),
+			"uid":  string(node.GetUID()),
+			"name": node.GetName(),
+			"labels": common.MapStr{
+				"foo": "bar",
+			},
 			"annotations": common.MapStr{
 				"baz": "ban",
 			},
45 changes: 36 additions & 9 deletions x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go
@@ -24,6 +24,7 @@ type pod struct {
 	cleanupTimeout time.Duration
 	comm           composable.DynamicProviderComm
 	scope          string
+	config         *Config
 }
 
 type providerData struct {

@@ -35,7 +36,7 @@ type providerData struct {
 // NewPodWatcher creates a watcher that can discover and process pod objects
 func NewPodWatcher(
 	comm composable.DynamicProviderComm,
-	cfg *ResourceConfig,
+	cfg *Config,
 	logger *logp.Logger,
 	client k8s.Interface,
 	scope string) (kubernetes.Watcher, error) {

@@ -48,13 +49,13 @@ func NewPodWatcher(
 	if err != nil {
 		return nil, errors.New(err, "couldn't create kubernetes watcher")
 	}
-	watcher.AddEventHandler(&pod{logger, cfg.CleanupTimeout, comm, scope})
+	watcher.AddEventHandler(&pod{logger, cfg.CleanupTimeout, comm, scope, cfg})
 
 	return watcher, nil
 }
 
 func (p *pod) emitRunning(pod *kubernetes.Pod) {
-	data := generatePodData(pod)
+	data := generatePodData(pod, p.config)
 	data.mapping["scope"] = p.scope
 	// Emit the pod
 	// We emit Pod + containers to ensure that configs matching Pod only

@@ -72,7 +73,7 @@ func (p *pod) emitContainers(pod *kubernetes.Pod, containers []kubernetes.Contai
 
 	providerDataChan := make(chan providerData)
 	done := make(chan bool, 1)
-	go generateContainerData(pod, containers, containerstatuses, providerDataChan, done)
+	go generateContainerData(pod, containers, containerstatuses, providerDataChan, done, p.config)
 
 	for {
 		select {

@@ -135,21 +136,36 @@ func (p *pod) OnDelete(obj interface{}) {
 	time.AfterFunc(p.cleanupTimeout, func() { p.emitStopped(pod) })
 }
 
-func generatePodData(pod *kubernetes.Pod) providerData {
+func generatePodData(pod *kubernetes.Pod, cfg *Config) providerData {
 	//TODO: add metadata here too ie -> meta := s.metagen.Generate(pod)
 
 	// Pass annotations to all events so that it can be used in templating and by annotation builders.
 	annotations := common.MapStr{}
 	for k, v := range pod.GetObjectMeta().GetAnnotations() {
-		safemapstr.Put(annotations, k, v)
+		if cfg.AnnotationsDedot {
+			annotation := common.DeDot(k)
+			annotations.Put(annotation, v)
+		} else {
+			safemapstr.Put(annotations, k, v)
+		}
+	}
+
+	labels := common.MapStr{}
+	for k, v := range pod.GetObjectMeta().GetLabels() {
+		if cfg.LabelsDedot {
+			label := common.DeDot(k)
+			labels.Put(label, v)
+		} else {
+			safemapstr.Put(labels, k, v)
+		}
 	}
 
 	mapping := map[string]interface{}{
 		"namespace": pod.GetNamespace(),
 		"pod": map[string]interface{}{
 			"uid":         string(pod.GetUID()),
 			"name":        pod.GetName(),
-			"labels":      pod.GetLabels(),
+			"labels":      labels,
 			"annotations": annotations,
 			"ip":          pod.Status.PodIP,
 		},

@@ -173,7 +189,8 @@ func generateContainerData(
 	containers []kubernetes.Container,
 	containerstatuses []kubernetes.PodContainerStatus,
 	dataChan chan providerData,
-	done chan bool) {
+	done chan bool,
+	cfg *Config) {
 	//TODO: add metadata here too ie -> meta := s.metagen.Generate()
 
 	containerIDs := map[string]string{}

@@ -184,6 +201,16 @@ func generateContainerData(
 		runtimes[c.Name] = runtime
 	}
 
+	labels := common.MapStr{}
+	for k, v := range pod.GetObjectMeta().GetLabels() {
+		if cfg.LabelsDedot {
+			label := common.DeDot(k)
+			labels.Put(label, v)
+		} else {
+			safemapstr.Put(labels, k, v)
+		}
+	}
+
 	for _, c := range containers {
 		// If it doesn't have an ID, container doesn't exist in
 		// the runtime, emit only an event if we are stopping, so

@@ -201,7 +228,7 @@ func generateContainerData(
 			"pod": map[string]interface{}{
 				"uid":    string(pod.GetUID()),
 				"name":   pod.GetName(),
-				"labels": pod.GetLabels(),
+				"labels": labels,
 				"ip":     pod.Status.PodIP,
 			},
 			"container": map[string]interface{}{
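generateContainerData now receives the Config so the per-container mapping can reuse the dedotted pod labels; the ID/runtime pair comes from the container statuses indexed just above (the split itself happens in hidden context, presumably via a helper like libbeat's kubernetes.ContainerIDWithRuntime). A stdlib sketch of that flow; the split on "runtime://id" and the container mapping keys are assumptions based on the visible context:

```go
package main

import (
	"fmt"
	"strings"
)

type containerStatus struct {
	Name        string
	ContainerID string // Kubernetes reports "<runtime>://<id>"
}

// containerIDWithRuntime splits a status containerID into its parts; a
// stand-in for the helper the real code uses (assumed behaviour).
func containerIDWithRuntime(s containerStatus) (id, runtime string) {
	parts := strings.SplitN(s.ContainerID, "://", 2)
	if len(parts) != 2 {
		return "", ""
	}
	return parts[1], parts[0]
}

func main() {
	statuses := []containerStatus{
		{Name: "app", ContainerID: "containerd://abc123"}, // placeholder ID
	}

	// Index ID and runtime by container name, as the hunk's context does.
	containerIDs := map[string]string{}
	runtimes := map[string]string{}
	for _, s := range statuses {
		id, runtime := containerIDWithRuntime(s)
		containerIDs[s.Name] = id
		runtimes[s.Name] = runtime
	}

	// Per-container mapping, shaped like the diff's literal; the labels
	// value would be the dedotted pod labels computed before the loop.
	mapping := map[string]interface{}{
		"container": map[string]interface{}{
			"id":      containerIDs["app"],
			"name":    "app",
			"runtime": runtimes["app"],
		},
	}
	fmt.Println(mapping)
}
```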