Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NETOBSERV-1354: fix concurrent access on watches #458

Merged
merged 1 commit into from
Oct 13, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 29 additions & 14 deletions pkg/watchers/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package watchers
import (
"context"
"fmt"
"sync"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -28,7 +29,8 @@ var (
type Watcher struct {
ctrl controller.Controller
cache cache.Cache
watched map[string]interface{}
watches map[string]bool
wmut sync.RWMutex
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually started with sync.Map but this isn't type safe (works with any) and the API is less convenient than the typical map API .. since I'm only using the mutex in a couple of places, all in all I find it easier to use traditional map + mutex

defaultNamespace string
}

Expand All @@ -39,7 +41,7 @@ func NewWatcher(ctrl controller.Controller, cache cache.Cache) *Watcher {
return &Watcher{
ctrl: ctrl,
cache: cache,
watched: make(map[string]interface{}),
watches: make(map[string]bool),
}
}

Expand All @@ -52,16 +54,32 @@ func kindToWatchable(kind flowslatest.MountableType) Watchable {

func (w *Watcher) Reset(namespace string) {
w.defaultNamespace = namespace
w.watched = make(map[string]interface{})
// Reset all registered watches as inactive
w.wmut.Lock()
for k := range w.watches {
w.watches[k] = false
}
w.wmut.Unlock()
}

func key(kind flowslatest.MountableType, name, namespace string) string {
return string(kind) + "/" + namespace + "/" + name
}

func (w *Watcher) setActiveWatch(key string) bool {
w.wmut.Lock()
_, exists := w.watches[key]
w.watches[key] = true
w.wmut.Unlock()
return exists
}

func (w *Watcher) watch(ctx context.Context, kind flowslatest.MountableType, obj client.Object) error {
if w.isWatched(kind, obj.GetName(), obj.GetNamespace()) {
// This watcher was already registered
k := key(kind, obj.GetName(), obj.GetNamespace())
// Mark as active
exists := w.setActiveWatch(k)
if exists {
// Don't register again
return nil
}
i, err := w.cache.GetInformer(ctx, obj)
Expand All @@ -74,7 +92,12 @@ func (w *Watcher) watch(ctx context.Context, kind flowslatest.MountableType, obj
err = w.ctrl.Watch(
&source.Informer{Informer: i},
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, o client.Object) []reconcile.Request {
if w.isWatched(kind, o.GetName(), o.GetNamespace()) {
// The watch might be registered, but inactive
k := key(kind, o.GetName(), o.GetNamespace())
w.wmut.RLock()
active := w.watches[k]
w.wmut.RUnlock()
if active {
// Trigger FlowCollector reconcile
return []reconcile.Request{{NamespacedName: constants.FlowCollectorName}}
}
Expand All @@ -84,17 +107,9 @@ func (w *Watcher) watch(ctx context.Context, kind flowslatest.MountableType, obj
if err != nil {
return err
}
w.watched[key(kind, obj.GetName(), obj.GetNamespace())] = true
return nil
}

func (w *Watcher) isWatched(kind flowslatest.MountableType, name, namespace string) bool {
if _, ok := w.watched[key(kind, name, namespace)]; ok {
return true
}
return false
}

func (w *Watcher) ProcessMTLSCerts(ctx context.Context, cl helper.Client, tls *flowslatest.ClientTLS, targetNamespace string) (caDigest string, userDigest string, err error) {
if tls.Enable && tls.CACert.Name != "" {
caRef := w.refFromCert(&tls.CACert)
Expand Down
Loading