From fc43a826393e1f1b67ff66eb557c5159c52a47a0 Mon Sep 17 00:00:00 2001 From: Mohamed Mahmoud Date: Tue, 1 Oct 2024 13:14:38 -0400 Subject: [PATCH] NETOBSERV-1805: threads are leaking with continous adding and deleting pods Signed-off-by: Mohamed Mahmoud (cherry picked from commit 55c3dec076cc2c2bfb436414e47a3e586f92a761) --- pkg/ifaces/watcher.go | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/pkg/ifaces/watcher.go b/pkg/ifaces/watcher.go index 25274d6d..f701c4ca 100644 --- a/pkg/ifaces/watcher.go +++ b/pkg/ifaces/watcher.go @@ -31,6 +31,7 @@ type Watcher struct { linkSubscriberAt func(ns netns.NsHandle, ch chan<- netlink.LinkUpdate, done <-chan struct{}) error mutex *sync.Mutex netnsWatcher *fsnotify.Watcher + nsDone map[string]chan struct{} } func NewWatcher(bufLen int) *Watcher { @@ -41,17 +42,19 @@ func NewWatcher(bufLen int) *Watcher { linkSubscriberAt: netlink.LinkSubscribeAt, mutex: &sync.Mutex{}, netnsWatcher: &fsnotify.Watcher{}, + nsDone: make(map[string]chan struct{}), } } func (w *Watcher) Subscribe(ctx context.Context) (<-chan Event, error) { out := make(chan Event, w.bufLen) - netns, err := getNetNS() if err != nil { + w.nsDone[""] = make(chan struct{}) go w.sendUpdates(ctx, "", out) } else { for _, n := range netns { + w.nsDone[n] = make(chan struct{}) go w.sendUpdates(ctx, n, out) } } @@ -64,9 +67,13 @@ func (w *Watcher) sendUpdates(ctx context.Context, ns string, out chan Event) { var netnsHandle netns.NsHandle var err error log := logrus.WithField("component", "ifaces.Watcher") + doneChan := w.nsDone[ns] + defer func() { + close(doneChan) + delete(w.nsDone, ns) + }() // subscribe for interface events links := make(chan netlink.LinkUpdate) - doneChan := make(chan struct{}) if err = wait.PollUntilContextTimeout(ctx, 50*time.Microsecond, time.Second, true, func(ctx context.Context) (done bool, err error) { if ns == "" { netnsHandle = netns.None() @@ -113,6 +120,7 @@ func (w *Watcher) sendUpdates(ctx context.Context, ns string, out chan Event) { } } } + for link := range links { attrs := link.Attrs() if attrs == nil { @@ -191,9 +199,19 @@ func (w *Watcher) netnsNotify(ctx context.Context, out chan Event) { } if event.Op&fsnotify.Create == fsnotify.Create { ns := filepath.Base(event.Name) - log.WithField("netns", ns).Debug("netns notification") + log.WithField("netns", ns).Debug("netns create notification") + if _, ok := w.nsDone[ns]; ok { + log.WithField("netns", ns).Debug("netns channel already exists, delete it") + delete(w.nsDone, ns) + } + w.nsDone[ns] = make(chan struct{}) go w.sendUpdates(ctx, ns, out) } + if event.Op&fsnotify.Remove == fsnotify.Remove { + ns := filepath.Base(event.Name) + log.WithField("netns", ns).Debug("netns delete notification") + w.nsDone[ns] <- struct{}{} + } case err, ok := <-w.netnsWatcher.Errors: if !ok { return