Skip to content

Commit

Permalink
Make sure k8s watchers close when closing k8s meta processor (#35630)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrsMark authored Jun 1, 2023
1 parent 977d996 commit e310ee2
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ https://github.com/elastic/beats/compare/v8.7.1\...main[Check the HEAD diff]
- In cases where the matcher detects a non-string type in a match statement, report the error as a debug statement, and not a warning statement. {pull}35119[35119]
- 'add_cloud_metadata' processor - add cloud.region field for GCE cloud provider
- 'add_cloud_metadata' processor - update azure metadata api version to get missing `cloud.account.id` field
- Make sure k8s watchers are closed when closing k8s meta processor. {pull}35630[35630]


*Auditbeat*
Expand Down
36 changes: 28 additions & 8 deletions libbeat/processors/add_kubernetes_metadata/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ const (
type kubernetesAnnotator struct {
log *logp.Logger
watcher kubernetes.Watcher
nsWatcher kubernetes.Watcher
nodeWatcher kubernetes.Watcher
rsWatcher kubernetes.Watcher
jobWatcher kubernetes.Watcher
indexers *Indexers
matchers *Matchers
cache *cache
Expand Down Expand Up @@ -222,6 +226,7 @@ func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *config.C) {
if err != nil {
k.log.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.ReplicaSet{}, err)
}
k.rsWatcher = replicaSetWatcher
}
if metaConf.CronJob {
jobWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_job", client, &kubernetes.Job{}, kubernetes.WatchOptions{
Expand All @@ -230,6 +235,7 @@ func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *config.C) {
if err != nil {
k.log.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.Job{}, err)
}
k.jobWatcher = jobWatcher
}

// TODO: refactor the above section to a common function to be used by NeWPodEventer too
Expand All @@ -238,6 +244,8 @@ func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *config.C) {
k.indexers = NewIndexers(config.Indexers, metaGen)
k.watcher = watcher
k.kubernetesAvailable = true
k.nodeWatcher = nodeWatcher
k.nsWatcher = namespaceWatcher

watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
Expand All @@ -259,26 +267,26 @@ func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *config.C) {

// NOTE: order is important here since pod meta will include node meta and hence node.Store() should
// be populated before trying to generate metadata for Pods.
if nodeWatcher != nil {
if err := nodeWatcher.Start(); err != nil {
if k.nodeWatcher != nil {
if err := k.nodeWatcher.Start(); err != nil {
k.log.Debugf("add_kubernetes_metadata", "Couldn't start node watcher: %v", err)
return
}
}
if namespaceWatcher != nil {
if err := namespaceWatcher.Start(); err != nil {
if k.nsWatcher != nil {
if err := k.nsWatcher.Start(); err != nil {
k.log.Debugf("add_kubernetes_metadata", "Couldn't start namespace watcher: %v", err)
return
}
}
if replicaSetWatcher != nil {
if err := replicaSetWatcher.Start(); err != nil {
if k.rsWatcher != nil {
if err := k.rsWatcher.Start(); err != nil {
k.log.Debugf("add_kubernetes_metadata", "Couldn't start replicaSet watcher: %v", err)
return
}
}
if jobWatcher != nil {
if err := jobWatcher.Start(); err != nil {
if k.jobWatcher != nil {
if err := k.jobWatcher.Start(); err != nil {
k.log.Debugf("add_kubernetes_metadata", "Couldn't start job watcher: %v", err)
return
}
Expand Down Expand Up @@ -342,6 +350,18 @@ func (k *kubernetesAnnotator) Close() error {
if k.watcher != nil {
k.watcher.Stop()
}
if k.nodeWatcher != nil {
k.nodeWatcher.Stop()
}
if k.nsWatcher != nil {
k.nsWatcher.Stop()
}
if k.rsWatcher != nil {
k.rsWatcher.Stop()
}
if k.jobWatcher != nil {
k.jobWatcher.Stop()
}
if k.cache != nil {
k.cache.stop()
}
Expand Down

0 comments on commit e310ee2

Please sign in to comment.