Skip to content

Commit

Permalink
NETOBSERV-1805: threads are leaking with continous adding and deletin…
Browse files Browse the repository at this point in the history
…g pods

Signed-off-by: Mohamed Mahmoud <mmahmoud@redhat.com>
(cherry picked from commit 07d0a84)
  • Loading branch information
msherif1234 committed Oct 3, 2024
1 parent 1e8979b commit 3973f3f
Showing 1 changed file with 14 additions and 4 deletions.
18 changes: 14 additions & 4 deletions pkg/ifaces/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Watcher struct {
linkSubscriberAt func(ns netns.NsHandle, ch chan<- netlink.LinkUpdate, done <-chan struct{}) error
mutex *sync.Mutex
netnsWatcher *fsnotify.Watcher
nsDone map[string]chan struct{}
}

func NewWatcher(bufLen int) *Watcher {
Expand All @@ -41,17 +42,19 @@ func NewWatcher(bufLen int) *Watcher {
linkSubscriberAt: netlink.LinkSubscribeAt,
mutex: &sync.Mutex{},
netnsWatcher: &fsnotify.Watcher{},
nsDone: make(map[string]chan struct{}),
}
}

func (w *Watcher) Subscribe(ctx context.Context) (<-chan Event, error) {
out := make(chan Event, w.bufLen)

netns, err := getNetNS()
if err != nil {
w.nsDone[""] = make(chan struct{})
go w.sendUpdates(ctx, "", out)
} else {
for _, n := range netns {
w.nsDone[n] = make(chan struct{})
go w.sendUpdates(ctx, n, out)
}
}
Expand All @@ -64,9 +67,9 @@ func (w *Watcher) sendUpdates(ctx context.Context, ns string, out chan Event) {
var netnsHandle netns.NsHandle
var err error
log := logrus.WithField("component", "ifaces.Watcher")
doneChan := w.nsDone[ns]
// subscribe for interface events
links := make(chan netlink.LinkUpdate)
doneChan := make(chan struct{})
if err = wait.PollUntilContextTimeout(ctx, 50*time.Microsecond, time.Second, true, func(ctx context.Context) (done bool, err error) {
if ns == "" {
netnsHandle = netns.None()
Expand Down Expand Up @@ -113,6 +116,7 @@ func (w *Watcher) sendUpdates(ctx context.Context, ns string, out chan Event) {
}
}
}

for link := range links {
attrs := link.Attrs()
if attrs == nil {
Expand Down Expand Up @@ -191,8 +195,14 @@ func (w *Watcher) netnsNotify(ctx context.Context, out chan Event) {
}
if event.Op&fsnotify.Create == fsnotify.Create {
ns := filepath.Base(event.Name)
log.WithField("netns", ns).Debug("netns notification")
w.sendUpdates(ctx, ns, out)
log.WithField("netns", ns).Debug("netns create notification")
w.nsDone[ns] = make(chan struct{})
go w.sendUpdates(ctx, ns, out)
}
if event.Op&fsnotify.Remove == fsnotify.Remove {
ns := filepath.Base(event.Name)
log.WithField("netns", ns).Debug("netns delete notification")
w.nsDone[ns] <- struct{}{}
}
case err, ok := <-w.netnsWatcher.Errors:
if !ok {
Expand Down

0 comments on commit 3973f3f

Please sign in to comment.