[7.x](backport #26801) [Agent] Support Node and Service autodiscovery in k8s provider #27014

Merged · 1 commit · Jul 23, 2021

Changes from all commits
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
@@ -123,3 +123,4 @@
- Add proxy support to enroll command. {pull}26514[26514]
- Enable configuring monitoring namespace {issue}26439[26439]
- Communicate with Fleet Server over HTTP2. {pull}26474[26474]
+- Support Node and Service autodiscovery in kubernetes dynamic provider. {pull}26801[26801]
41 changes: 37 additions & 4 deletions x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go
@@ -9,24 +9,57 @@ package kubernetes

import (
"time"

"github.com/elastic/beats/v7/libbeat/logp"
)

// Config for kubernetes provider
type Config struct {
+	Scope     string    `config:"scope"`
+	Resources Resources `config:"resources"`
+
KubeConfig string `config:"kube_config"`
Namespace string `config:"namespace"`
SyncPeriod time.Duration `config:"sync_period"`
CleanupTimeout time.Duration `config:"cleanup_timeout" validate:"positive"`

-	// Needed when resource is a pod
+	// Needed when resource is a Pod or Node
Node string `config:"node"`
}

-	// Scope of the provider (cluster or node)
-	Scope string `config:"scope"`
+// Resources config section for resources' config blocks
+type Resources struct {
+	Pod     Enabled `config:"pod"`
+	Node    Enabled `config:"node"`
+	Service Enabled `config:"service"`
+}
+
+// Enabled config section for resources' config blocks
+type Enabled struct {
+	Enabled bool `config:"enabled"`
+}

// InitDefaults initializes the default values for the config.
func (c *Config) InitDefaults() {
-	c.SyncPeriod = 10 * time.Minute
	c.CleanupTimeout = 60 * time.Second
+	c.SyncPeriod = 10 * time.Minute
c.Scope = "node"
}

+// Validate ensures correctness of config
+func (c *Config) Validate() error {
+	// Check if resource is service. If yes then default the scope to "cluster".
+	if c.Resources.Service.Enabled {
+		if c.Scope == "node" {
+			logp.L().Warnf("can not set scope to `node` when using resource `Service`. resetting scope to `cluster`")
+		}
+		c.Scope = "cluster"
+	}
+
+	if !c.Resources.Pod.Enabled && !c.Resources.Node.Enabled && !c.Resources.Service.Enabled {
+		c.Resources.Pod = Enabled{true}
+		c.Resources.Node = Enabled{true}
+	}
+
+	return nil
+}
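The net effect of InitDefaults plus Validate: scope defaults to "node"; enabling the service resource forces "cluster" scope (with a warning if node scope was requested); and if no resource is enabled at all, pod and node watching are switched on. The following is a minimal standalone sketch of that behavior, with log.Printf standing in for the agent's logp logger and an assumed YAML shape in the comment; it is not the actual provider wiring:

package main

import (
	"fmt"
	"log"
	"time"
)

// Illustrative copies of the types in config.go above; the real structs
// carry config tags and are unpacked from the agent configuration.
type Enabled struct{ Enabled bool }

type Resources struct {
	Pod, Node, Service Enabled
}

type Config struct {
	Scope          string
	Resources      Resources
	SyncPeriod     time.Duration
	CleanupTimeout time.Duration
}

// initDefaults mirrors Config.InitDefaults.
func (c *Config) initDefaults() {
	c.CleanupTimeout = 60 * time.Second
	c.SyncPeriod = 10 * time.Minute
	c.Scope = "node"
}

// validate mirrors Config.Validate, with log.Printf standing in for logp.
func (c *Config) validate() {
	if c.Resources.Service.Enabled {
		if c.Scope == "node" {
			log.Printf("can not set scope to `node` when using resource `Service`. resetting scope to `cluster`")
		}
		c.Scope = "cluster"
	}
	if !c.Resources.Pod.Enabled && !c.Resources.Node.Enabled && !c.Resources.Service.Enabled {
		c.Resources.Pod = Enabled{true}
		c.Resources.Node = Enabled{true}
	}
}

func main() {
	// Roughly what a provider block like the following would unpack to;
	// the YAML shape is an assumption, not taken from this PR:
	//
	//   providers:
	//     kubernetes:
	//       resources:
	//         service:
	//           enabled: true
	cfg := &Config{}
	cfg.initDefaults()
	cfg.Resources.Service.Enabled = true
	cfg.validate()
	fmt.Println(cfg.Scope) // "cluster": enabling the service resource forces cluster scope
}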
227 changes: 75 additions & 152 deletions x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go
@@ -6,7 +6,8 @@ package kubernetes

import (
"fmt"
"time"

k8s "k8s.io/client-go/kubernetes"

"github.com/elastic/beats/v7/libbeat/common/kubernetes"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
@@ -16,10 +17,14 @@ import (
)

const (
+	// NodePriority is the priority that node mappings are added to the provider.
+	NodePriority = 0
	// PodPriority is the priority that pod mappings are added to the provider.
-	PodPriority = 0
+	PodPriority = 1
	// ContainerPriority is the priority that container mappings are added to the provider.
-	ContainerPriority = 1
+	ContainerPriority = 2
+	// ServicePriority is the priority that service mappings are added to the provider.
+	ServicePriority = 3
)

func init() {
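Previously only PodPriority (0) and ContainerPriority (1) existed; the renumbering gives node, pod, container, and service mappings distinct slots. The priority is the second argument of the communicator's AddOrUpdate call, visible in the removed emitRunning/emitContainers code further down. A toy, self-contained sketch of that registration pattern (the recorder type is illustrative, not the real composable.DynamicProviderComm interface):

package main

import "fmt"

// Priorities as defined in the const block above.
const (
	NodePriority      = 0
	PodPriority       = 1
	ContainerPriority = 2
	ServicePriority   = 3
)

// recorder is a toy stand-in for the provider communicator; the real
// composable.DynamicProviderComm also carries processors and returns an error.
type recorder struct {
	byPriority map[int][]string
}

func (r *recorder) AddOrUpdate(id string, priority int, mapping map[string]interface{}) {
	r.byPriority[priority] = append(r.byPriority[priority], id)
}

func main() {
	r := &recorder{byPriority: map[int][]string{}}
	// As in the removed emitRunning/emitContainers code further down,
	// a pod and its containers register under different priorities.
	r.AddOrUpdate("pod-uid", PodPriority, map[string]interface{}{"pod": "x"})
	r.AddOrUpdate("pod-uid.container-1", ContainerPriority, map[string]interface{}{"container": "1"})
	fmt.Println(r.byPriority) // map[1:[pod-uid] 2:[pod-uid.container-1]]
}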
@@ -31,12 +36,6 @@ type dynamicProvider struct {
config *Config
}

-type eventWatcher struct {
-	logger         *logger.Logger
-	cleanupTimeout time.Duration
-	comm           composable.DynamicProviderComm
-}

// DynamicProviderBuilder builds the dynamic provider.
func DynamicProviderBuilder(logger *logger.Logger, c *config.Config) (composable.DynamicProvider, error) {
var cfg Config
@@ -50,172 +49,96 @@ func DynamicProviderBuilder(logger *logger.Logger, c *config.Config) (composable
return &dynamicProvider{logger, &cfg}, nil
}

-// Run runs the environment context provider.
+// Run runs the kubernetes context provider.
func (p *dynamicProvider) Run(comm composable.DynamicProviderComm) error {
-	client, err := kubernetes.GetKubernetesClient(p.config.KubeConfig)
+	if p.config.Resources.Pod.Enabled {
+		err := p.watchResource(comm, "pod", p.config)
+		if err != nil {
+			return err
+		}
+	}
+	if p.config.Resources.Node.Enabled {
+		err := p.watchResource(comm, "node", p.config)
+		if err != nil {
+			return err
+		}
+	}
+	if p.config.Resources.Service.Enabled {
+		err := p.watchResource(comm, "service", p.config)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// watchResource initializes the proper watcher according to the given resource (pod, node, service)
+// and starts watching for such resource's events.
+func (p *dynamicProvider) watchResource(
+	comm composable.DynamicProviderComm,
+	resourceType string,
+	config *Config) error {
+	client, err := kubernetes.GetKubernetesClient(config.KubeConfig)
if err != nil {
// info only; return nil (do nothing)
p.logger.Debugf("Kubernetes provider skipped, unable to connect: %s", err)
p.logger.Debugf("Kubernetes provider for resource %s skipped, unable to connect: %s", resourceType, err)
return nil
}

// Ensure that node is set correctly whenever the scope is set to "node". Make sure that node is empty
// when cluster scope is enforced.
p.logger.Infof("Kubernetes provider started with %s scope", p.config.Scope)
p.logger.Infof("Kubernetes provider started for resource %s with %s scope", resourceType, p.config.Scope)
if p.config.Scope == "node" {
p.logger.Debugf("Initializing Kubernetes watcher using node: %v", p.config.Node)
p.config.Node = kubernetes.DiscoverKubernetesNode(p.logger, p.config.Node, kubernetes.IsInCluster(p.config.KubeConfig), client)
p.logger.Debugf(
"Initializing Kubernetes watcher for resource %s using node: %v",
resourceType,
config.Node)
config.Node = kubernetes.DiscoverKubernetesNode(
p.logger, config.Node,
kubernetes.IsInCluster(config.KubeConfig),
client)
} else {
p.config.Node = ""
config.Node = ""
}

-	watcher, err := kubernetes.NewWatcher(client, &kubernetes.Pod{}, kubernetes.WatchOptions{
-		SyncTimeout: p.config.SyncPeriod,
-		Node:        p.config.Node,
-		//Namespace:  p.config.Namespace,
-	}, nil)
+	watcher, err := p.newWatcher(resourceType, comm, client, config)
if err != nil {
return errors.New(err, "couldn't create kubernetes watcher")
return errors.New(err, "couldn't create kubernetes watcher for resource %s", resourceType)
}
-	watcher.AddEventHandler(&eventWatcher{p.logger, p.config.CleanupTimeout, comm})

err = watcher.Start()
if err != nil {
return errors.New(err, "couldn't start kubernetes watcher")
return errors.New(err, "couldn't start kubernetes watcher for resource %s", resourceType)
}

return nil
}

-func (p *eventWatcher) emitRunning(pod *kubernetes.Pod) {
-	mapping := map[string]interface{}{
-		"namespace": pod.GetNamespace(),
-		"pod": map[string]interface{}{
-			"uid":    string(pod.GetUID()),
-			"name":   pod.GetName(),
-			"labels": pod.GetLabels(),
-			"ip":     pod.Status.PodIP,
-		},
-	}
-
-	processors := []map[string]interface{}{
-		{
-			"add_fields": map[string]interface{}{
-				"fields": mapping,
-				"target": "kubernetes",
-			},
-		},
-	}
-
-	// Emit the pod
-	// We emit Pod + containers to ensure that configs matching Pod only
-	// get Pod metadata (not specific to any container)
-	p.comm.AddOrUpdate(string(pod.GetUID()), PodPriority, mapping, processors)
-
-	// Emit all containers in the pod
-	p.emitContainers(pod, pod.Spec.Containers, pod.Status.ContainerStatuses)
-
-	// TODO deal with init containers stopping after initialization
-	p.emitContainers(pod, pod.Spec.InitContainers, pod.Status.InitContainerStatuses)
-}
-
-func (p *eventWatcher) emitContainers(pod *kubernetes.Pod, containers []kubernetes.Container, containerstatuses []kubernetes.PodContainerStatus) {
-	// Collect all runtimes from status information.
-	containerIDs := map[string]string{}
-	runtimes := map[string]string{}
-	for _, c := range containerstatuses {
-		cid, runtime := kubernetes.ContainerIDWithRuntime(c)
-		containerIDs[c.Name] = cid
-		runtimes[c.Name] = runtime
-	}
-
-	for _, c := range containers {
-		// If it doesn't have an ID, container doesn't exist in
-		// the runtime, emit only an event if we are stopping, so
-		// we are sure of cleaning up configurations.
-		cid := containerIDs[c.Name]
-		if cid == "" {
-			continue
-		}
-
-		// ID is the combination of pod UID + container name
-		eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.Name)
-
-		mapping := map[string]interface{}{
-			"namespace": pod.GetNamespace(),
-			"pod": map[string]interface{}{
-				"uid":    string(pod.GetUID()),
-				"name":   pod.GetName(),
-				"labels": pod.GetLabels(),
-				"ip":     pod.Status.PodIP,
-			},
-			"container": map[string]interface{}{
-				"id":      cid,
-				"name":    c.Name,
-				"image":   c.Image,
-				"runtime": runtimes[c.Name],
-			},
-		}
-
-		processors := []map[string]interface{}{
-			{
-				"add_fields": map[string]interface{}{
-					"fields": mapping,
-					"target": "kubernetes",
-				},
-			},
-		}
-
-		// Emit the container
-		p.comm.AddOrUpdate(eventID, ContainerPriority, mapping, processors)
-	}
-}
+// newWatcher initializes the proper watcher according to the given resource (pod, node, service).
+func (p *dynamicProvider) newWatcher(
+	resourceType string,
+	comm composable.DynamicProviderComm,
+	client k8s.Interface,
+	config *Config) (kubernetes.Watcher, error) {
+	switch resourceType {
+	case "pod":
+		watcher, err := NewPodWatcher(comm, config, p.logger, client, p.config.Scope)
+		if err != nil {
+			return nil, err
+		}
+		return watcher, nil
+	case "node":
+		watcher, err := NewNodeWatcher(comm, config, p.logger, client, p.config.Scope)
+		if err != nil {
+			return nil, err
+		}
+		return watcher, nil
+	case "service":
+		watcher, err := NewServiceWatcher(comm, config, p.logger, client, p.config.Scope)
+		if err != nil {
+			return nil, err
+		}
+		return watcher, nil
+	default:
+		return nil, fmt.Errorf("unsupported autodiscover resource %s", resourceType)
+	}
+}
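NewPodWatcher, NewNodeWatcher, and NewServiceWatcher are defined in the new pod.go, node.go, and service.go files, which this diff does not include. Judging only from the watcher setup removed above (kubernetes.NewWatcher plus AddEventHandler), a constructor plausibly looks something like the sketch below; the nodeEventer handler type and every detail not visible in this diff are assumptions:

// Sketch only — not the code merged in this PR. Assumes the imports of
// kubernetes.go shown above plus a hypothetical nodeEventer handler type.
func NewNodeWatcher(
	comm composable.DynamicProviderComm,
	cfg *Config,
	logger *logger.Logger,
	client k8s.Interface,
	scope string) (kubernetes.Watcher, error) {
	// &kubernetes.Node{} assumed available from libbeat's kubernetes package,
	// mirroring the &kubernetes.Pod{} usage in the removed code above.
	watcher, err := kubernetes.NewWatcher(client, &kubernetes.Node{}, kubernetes.WatchOptions{
		SyncTimeout: cfg.SyncPeriod, // resync period from the provider config
		Node:        cfg.Node,       // empty when scope is "cluster"
	}, nil)
	if err != nil {
		return nil, errors.New(err, "couldn't create kubernetes watcher")
	}
	// The handler maps Node events to AddOrUpdate/Remove calls on comm,
	// much as the removed eventWatcher below does for Pods.
	watcher.AddEventHandler(&nodeEventer{logger, cfg.CleanupTimeout, comm})
	return watcher, nil
}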

-func (p *eventWatcher) emitStopped(pod *kubernetes.Pod) {
-	p.comm.Remove(string(pod.GetUID()))
-
-	for _, c := range pod.Spec.Containers {
-		// ID is the combination of pod UID + container name
-		eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.Name)
-		p.comm.Remove(eventID)
-	}
-
-	for _, c := range pod.Spec.InitContainers {
-		// ID is the combination of pod UID + container name
-		eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.Name)
-		p.comm.Remove(eventID)
-	}
-}
-
-// OnAdd ensures processing of pod objects that are newly added
-func (p *eventWatcher) OnAdd(obj interface{}) {
-	p.logger.Debugf("pod add: %+v", obj)
-	p.emitRunning(obj.(*kubernetes.Pod))
-}
-
-// OnUpdate emits events for a given pod depending on its state: if the pod
-// is terminating, a stop event is scheduled; if not, a stop and a start
-// event are sent sequentially to recreate the resources associated with the pod.
-func (p *eventWatcher) OnUpdate(obj interface{}) {
-	pod := obj.(*kubernetes.Pod)
-
-	p.logger.Debugf("pod update for pod: %+v, status: %+v", pod.Name, pod.Status.Phase)
-	switch pod.Status.Phase {
-	case kubernetes.PodSucceeded, kubernetes.PodFailed:
-		time.AfterFunc(p.cleanupTimeout, func() { p.emitStopped(pod) })
-		return
-	case kubernetes.PodPending:
-		p.logger.Debugf("pod update (pending): don't know what to do with this pod yet, skipping for now: %+v", obj)
-		return
-	}
-
-	p.logger.Debugf("pod update: %+v", obj)
-	p.emitRunning(pod)
-}
-
-// OnDelete stops pod objects that are deleted
-func (p *eventWatcher) OnDelete(obj interface{}) {
-	p.logger.Debugf("pod delete: %+v", obj)
-	pod := obj.(*kubernetes.Pod)
-	time.AfterFunc(p.cleanupTimeout, func() { p.emitStopped(pod) })
-}