[Agent] Support Node and Service autodiscovery in k8s provider (#26801)
ChrsMark authored Jul 22, 2021
1 parent fd59cf7 commit 6635acb
Showing 9 changed files with 1,030 additions and 156 deletions.
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
@@ -122,3 +122,4 @@
 - Add proxy support to enroll command. {pull}26514[26514]
 - Enable configuring monitoring namespace {issue}26439[26439]
 - Communicate with Fleet Server over HTTP2. {pull}26474[26474]
+- Support Node and Service autodiscovery in kubernetes dynamic provider. {pull}26801[26801]
41 changes: 37 additions & 4 deletions x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go
@@ -9,24 +9,57 @@ package kubernetes
 import (
 	"time"
+
+	"github.com/elastic/beats/v7/libbeat/logp"
 )
 
 // Config for kubernetes provider
 type Config struct {
+	Scope     string    `config:"scope"`
+	Resources Resources `config:"resources"`
+
 	KubeConfig     string        `config:"kube_config"`
 	Namespace      string        `config:"namespace"`
 	SyncPeriod     time.Duration `config:"sync_period"`
 	CleanupTimeout time.Duration `config:"cleanup_timeout" validate:"positive"`
 
-	// Needed when resource is a pod
+	// Needed when resource is a Pod or Node
 	Node string `config:"node"`
+}
 
-	// Scope of the provider (cluster or node)
-	Scope string `config:"scope"`
+// Resources config section for resources' config blocks
+type Resources struct {
+	Pod     Enabled `config:"pod"`
+	Node    Enabled `config:"node"`
+	Service Enabled `config:"service"`
 }
 
+// Enabled config section for resources' config blocks
+type Enabled struct {
+	Enabled bool `config:"enabled"`
+}
+
 // InitDefaults initializes the default values for the config.
 func (c *Config) InitDefaults() {
-	c.SyncPeriod = 10 * time.Minute
 	c.CleanupTimeout = 60 * time.Second
+	c.SyncPeriod = 10 * time.Minute
+	c.Scope = "node"
 }
+
+// Validate ensures correctness of config
+func (c *Config) Validate() error {
+	// Check if resource is service. If yes then default the scope to "cluster".
+	if c.Resources.Service.Enabled {
+		if c.Scope == "node" {
+			logp.L().Warnf("can not set scope to `node` when using resource `Service`. resetting scope to `cluster`")
+		}
+		c.Scope = "cluster"
+	}
+
+	if !c.Resources.Pod.Enabled && !c.Resources.Node.Enabled && !c.Resources.Service.Enabled {
+		c.Resources.Pod = Enabled{true}
+		c.Resources.Node = Enabled{true}
+	}
+
+	return nil
+}
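
The validation rules above are easy to misread in diff form. Below is a minimal, standalone Go sketch of the same logic, using trimmed stand-in types rather than the provider's real ones (no logging, no config tags); it illustrates the rules, it is not the provider's actual code:

package main

import "fmt"

// Enabled and Resources are trimmed stand-ins for the provider's config
// types shown in the diff above.
type Enabled struct{ Enabled bool }

type Resources struct{ Pod, Node, Service Enabled }

type Config struct {
	Scope     string
	Resources Resources
}

// validate mirrors Config.Validate from the diff: enabling the service
// resource forces cluster scope (the real code logs a warning first), and
// with no resource enabled the provider falls back to pods and nodes.
func validate(c *Config) {
	if c.Resources.Service.Enabled {
		c.Scope = "cluster" // "node" scope cannot see cluster-wide Service objects
	}
	if !c.Resources.Pod.Enabled && !c.Resources.Node.Enabled && !c.Resources.Service.Enabled {
		c.Resources.Pod = Enabled{true}
		c.Resources.Node = Enabled{true}
	}
}

func main() {
	c := &Config{Scope: "node", Resources: Resources{Service: Enabled{true}}}
	validate(c)
	fmt.Println(c.Scope) // prints "cluster"
}

Running it prints `cluster`, showing how enabling the `service` resource silently overrides a `node` scope.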
227 changes: 75 additions & 152 deletions x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go
@@ -6,7 +6,8 @@ package kubernetes
 import (
 	"fmt"
-	"time"
+
+	k8s "k8s.io/client-go/kubernetes"
 
 	"github.com/elastic/beats/v7/libbeat/common/kubernetes"
 	"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
@@ -16,10 +17,14 @@ import (
 )
 
 const (
+	// NodePriority is the priority that node mappings are added to the provider.
+	NodePriority = 0
 	// PodPriority is the priority that pod mappings are added to the provider.
-	PodPriority = 0
+	PodPriority = 1
 	// ContainerPriority is the priority that container mappings are added to the provider.
-	ContainerPriority = 1
+	ContainerPriority = 2
+	// ServicePriority is the priority that service mappings are added to the provider.
+	ServicePriority = 3
 )
 
 func init() {
@@ -31,12 +36,6 @@ type dynamicProvider struct {
 	config *Config
 }
 
-type eventWatcher struct {
-	logger         *logger.Logger
-	cleanupTimeout time.Duration
-	comm           composable.DynamicProviderComm
-}
-
 // DynamicProviderBuilder builds the dynamic provider.
 func DynamicProviderBuilder(logger *logger.Logger, c *config.Config) (composable.DynamicProvider, error) {
 	var cfg Config
@@ -50,172 +49,96 @@ func DynamicProviderBuilder(logger *logger.Logger, c *config.Config) (composable.DynamicProvider, error) {
 	return &dynamicProvider{logger, &cfg}, nil
 }
 
-// Run runs the environment context provider.
+// Run runs the kubernetes context provider.
 func (p *dynamicProvider) Run(comm composable.DynamicProviderComm) error {
-	client, err := kubernetes.GetKubernetesClient(p.config.KubeConfig)
+	if p.config.Resources.Pod.Enabled {
+		err := p.watchResource(comm, "pod", p.config)
+		if err != nil {
+			return err
+		}
+	}
+	if p.config.Resources.Node.Enabled {
+		err := p.watchResource(comm, "node", p.config)
+		if err != nil {
+			return err
+		}
+	}
+	if p.config.Resources.Service.Enabled {
+		err := p.watchResource(comm, "service", p.config)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// watchResource initializes the proper watcher according to the given resource (pod, node, service)
+// and starts watching for such resource's events.
+func (p *dynamicProvider) watchResource(
+	comm composable.DynamicProviderComm,
+	resourceType string,
+	config *Config) error {
+	client, err := kubernetes.GetKubernetesClient(config.KubeConfig)
 	if err != nil {
 		// info only; return nil (do nothing)
-		p.logger.Debugf("Kubernetes provider skipped, unable to connect: %s", err)
+		p.logger.Debugf("Kubernetes provider for resource %s skipped, unable to connect: %s", resourceType, err)
 		return nil
 	}
 
 	// Ensure that node is set correctly whenever the scope is set to "node". Make sure that node is empty
 	// when cluster scope is enforced.
-	p.logger.Infof("Kubernetes provider started with %s scope", p.config.Scope)
+	p.logger.Infof("Kubernetes provider started for resource %s with %s scope", resourceType, p.config.Scope)
 	if p.config.Scope == "node" {
-		p.logger.Debugf("Initializing Kubernetes watcher using node: %v", p.config.Node)
-		p.config.Node = kubernetes.DiscoverKubernetesNode(p.logger, p.config.Node, kubernetes.IsInCluster(p.config.KubeConfig), client)
+		p.logger.Debugf(
+			"Initializing Kubernetes watcher for resource %s using node: %v",
+			resourceType,
+			config.Node)
+		config.Node = kubernetes.DiscoverKubernetesNode(
+			p.logger, config.Node,
+			kubernetes.IsInCluster(config.KubeConfig),
+			client)
 	} else {
-		p.config.Node = ""
+		config.Node = ""
 	}
 
-	watcher, err := kubernetes.NewWatcher(client, &kubernetes.Pod{}, kubernetes.WatchOptions{
-		SyncTimeout: p.config.SyncPeriod,
-		Node:        p.config.Node,
-		//Namespace: p.config.Namespace,
-	}, nil)
+	watcher, err := p.newWatcher(resourceType, comm, client, config)
 	if err != nil {
-		return errors.New(err, "couldn't create kubernetes watcher")
+		return errors.New(err, "couldn't create kubernetes watcher for resource %s", resourceType)
 	}
-	watcher.AddEventHandler(&eventWatcher{p.logger, p.config.CleanupTimeout, comm})
 
 	err = watcher.Start()
 	if err != nil {
-		return errors.New(err, "couldn't start kubernetes watcher")
+		return errors.New(err, "couldn't start kubernetes watcher for resource %s", resourceType)
 	}
 
 	return nil
 }
 
-func (p *eventWatcher) emitRunning(pod *kubernetes.Pod) {
-	mapping := map[string]interface{}{
-		"namespace": pod.GetNamespace(),
-		"pod": map[string]interface{}{
-			"uid":    string(pod.GetUID()),
-			"name":   pod.GetName(),
-			"labels": pod.GetLabels(),
-			"ip":     pod.Status.PodIP,
-		},
-	}
-
-	processors := []map[string]interface{}{
-		{
-			"add_fields": map[string]interface{}{
-				"fields": mapping,
-				"target": "kubernetes",
-			},
-		},
-	}
-
-	// Emit the pod
-	// We emit Pod + containers to ensure that configs matching Pod only
-	// get Pod metadata (not specific to any container)
-	p.comm.AddOrUpdate(string(pod.GetUID()), PodPriority, mapping, processors)
-
-	// Emit all containers in the pod
-	p.emitContainers(pod, pod.Spec.Containers, pod.Status.ContainerStatuses)
-
-	// TODO deal with init containers stopping after initialization
-	p.emitContainers(pod, pod.Spec.InitContainers, pod.Status.InitContainerStatuses)
-}
-
-func (p *eventWatcher) emitContainers(pod *kubernetes.Pod, containers []kubernetes.Container, containerstatuses []kubernetes.PodContainerStatus) {
-	// Collect all runtimes from status information.
-	containerIDs := map[string]string{}
-	runtimes := map[string]string{}
-	for _, c := range containerstatuses {
-		cid, runtime := kubernetes.ContainerIDWithRuntime(c)
-		containerIDs[c.Name] = cid
-		runtimes[c.Name] = runtime
-	}
-
-	for _, c := range containers {
-		// If it doesn't have an ID, container doesn't exist in
-		// the runtime, emit only an event if we are stopping, so
-		// we are sure of cleaning up configurations.
-		cid := containerIDs[c.Name]
-		if cid == "" {
-			continue
-		}
-
-		// ID is the combination of pod UID + container name
-		eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.Name)
-
-		mapping := map[string]interface{}{
-			"namespace": pod.GetNamespace(),
-			"pod": map[string]interface{}{
-				"uid":    string(pod.GetUID()),
-				"name":   pod.GetName(),
-				"labels": pod.GetLabels(),
-				"ip":     pod.Status.PodIP,
-			},
-			"container": map[string]interface{}{
-				"id":      cid,
-				"name":    c.Name,
-				"image":   c.Image,
-				"runtime": runtimes[c.Name],
-			},
-		}
-
-		processors := []map[string]interface{}{
-			{
-				"add_fields": map[string]interface{}{
-					"fields": mapping,
-					"target": "kubernetes",
-				},
-			},
-		}
-
-		// Emit the container
-		p.comm.AddOrUpdate(eventID, ContainerPriority, mapping, processors)
-	}
-}
-
-func (p *eventWatcher) emitStopped(pod *kubernetes.Pod) {
-	p.comm.Remove(string(pod.GetUID()))
-
-	for _, c := range pod.Spec.Containers {
-		// ID is the combination of pod UID + container name
-		eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.Name)
-		p.comm.Remove(eventID)
-	}
-
-	for _, c := range pod.Spec.InitContainers {
-		// ID is the combination of pod UID + container name
-		eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.Name)
-		p.comm.Remove(eventID)
-	}
-}
-
-// OnAdd ensures processing of pod objects that are newly added
-func (p *eventWatcher) OnAdd(obj interface{}) {
-	p.logger.Debugf("pod add: %+v", obj)
-	p.emitRunning(obj.(*kubernetes.Pod))
-}
-
-// OnUpdate emits events for a given pod depending on the state of the pod,
-// if it is terminating, a stop event is scheduled, if not, a stop and a start
-// events are sent sequentially to recreate the resources assotiated to the pod.
-func (p *eventWatcher) OnUpdate(obj interface{}) {
-	pod := obj.(*kubernetes.Pod)
-
-	p.logger.Debugf("pod update for pod: %+v, status: %+v", pod.Name, pod.Status.Phase)
-	switch pod.Status.Phase {
-	case kubernetes.PodSucceeded, kubernetes.PodFailed:
-		time.AfterFunc(p.cleanupTimeout, func() { p.emitStopped(pod) })
-		return
-	case kubernetes.PodPending:
-		p.logger.Debugf("pod update (pending): don't know what to do with this pod yet, skipping for now: %+v", obj)
-		return
-	}
-
-	p.logger.Debugf("pod update: %+v", obj)
-	p.emitRunning(pod)
-}
-
-// OnDelete stops pod objects that are deleted
-func (p *eventWatcher) OnDelete(obj interface{}) {
-	p.logger.Debugf("pod delete: %+v", obj)
-	pod := obj.(*kubernetes.Pod)
-	time.AfterFunc(p.cleanupTimeout, func() { p.emitStopped(pod) })
-}
+// newWatcher initializes the proper watcher according to the given resource (pod, node, service).
+func (p *dynamicProvider) newWatcher(
+	resourceType string,
+	comm composable.DynamicProviderComm,
+	client k8s.Interface,
+	config *Config) (kubernetes.Watcher, error) {
+	switch resourceType {
+	case "pod":
+		watcher, err := NewPodWatcher(comm, config, p.logger, client, p.config.Scope)
+		if err != nil {
+			return nil, err
+		}
+		return watcher, nil
+	case "node":
+		watcher, err := NewNodeWatcher(comm, config, p.logger, client, p.config.Scope)
+		if err != nil {
+			return nil, err
+		}
+		return watcher, nil
+	case "service":
+		watcher, err := NewServiceWatcher(comm, config, p.logger, client, p.config.Scope)
+		if err != nil {
+			return nil, err
+		}
+		return watcher, nil
+	default:
+		return nil, fmt.Errorf("unsupported autodiscover resource %s", resourceType)
+	}
+}
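
The `NewPodWatcher`, `NewNodeWatcher`, and `NewServiceWatcher` constructors referenced in `newWatcher` are added in other files of this commit that are not shown in this excerpt. As a rough sketch only, a node handler following the shape of the removed pod `eventWatcher` might look like the following; the `comm` interface and `Node` struct here are simplified stand-ins for `composable.DynamicProviderComm` and the libbeat `kubernetes.Node` type, not the actual APIs:

package main

import "fmt"

// comm is a stand-in for composable.DynamicProviderComm: the two calls the
// removed pod eventWatcher made against it.
type comm interface {
	AddOrUpdate(id string, priority int, mapping map[string]interface{}, processors []map[string]interface{})
	Remove(id string)
}

// Node is a stand-in for the libbeat kubernetes.Node resource.
type Node struct {
	UID, Name string
	Labels    map[string]string
}

// NodePriority mirrors the constant introduced in the diff above.
const NodePriority = 0

// nodeEventer follows the removed pod eventWatcher pattern: emit a mapping
// targeted at the "kubernetes" field namespace on add/update, remove it on delete.
type nodeEventer struct{ c comm }

func (e *nodeEventer) OnAdd(obj interface{})    { e.emit(obj.(*Node)) }
func (e *nodeEventer) OnUpdate(obj interface{}) { e.emit(obj.(*Node)) }
func (e *nodeEventer) OnDelete(obj interface{}) { e.c.Remove(obj.(*Node).UID) }

func (e *nodeEventer) emit(n *Node) {
	mapping := map[string]interface{}{
		"node": map[string]interface{}{
			"uid":    n.UID,
			"name":   n.Name,
			"labels": n.Labels,
		},
	}
	processors := []map[string]interface{}{
		{"add_fields": map[string]interface{}{
			"fields": mapping,
			"target": "kubernetes",
		}},
	}
	e.c.AddOrUpdate(n.UID, NodePriority, mapping, processors)
}

// printComm is a toy comm implementation that just prints what it receives.
type printComm struct{}

func (printComm) AddOrUpdate(id string, priority int, mapping map[string]interface{}, _ []map[string]interface{}) {
	fmt.Printf("add/update %s (priority %d): %v\n", id, priority, mapping)
}

func (printComm) Remove(id string) { fmt.Printf("remove %s\n", id) }

func main() {
	e := &nodeEventer{c: printComm{}}
	e.OnAdd(&Node{UID: "abc-123", Name: "node-0", Labels: map[string]string{"zone": "a"}})
	e.OnDelete(&Node{UID: "abc-123"})
}

The real watchers also handle cleanup timeouts and node/cluster scope, which this sketch omits.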