Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 set partialmetadata gvk in list/watch funcs to avoid data race in cache #1650

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 26 additions & 14 deletions pkg/builder/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ var _ = Describe("application", func() {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
doReconcileTest(ctx, "3", bldr, m, false)
doReconcileTest(ctx, "3", m, false, bldr)
}, 10)

It("should Reconcile Watches objects", func() {
Expand All @@ -322,7 +322,7 @@ var _ = Describe("application", func() {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
doReconcileTest(ctx, "4", bldr, m, true)
doReconcileTest(ctx, "4", m, true, bldr)
}, 10)
})

Expand Down Expand Up @@ -378,7 +378,7 @@ var _ = Describe("application", func() {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
doReconcileTest(ctx, "5", bldr, m, true)
doReconcileTest(ctx, "5", m, true, bldr)

Expect(deployPrctExecuted).To(BeTrue(), "Deploy predicated should be called at least once")
Expect(replicaSetPrctExecuted).To(BeTrue(), "ReplicaSet predicated should be called at least once")
Expand All @@ -396,6 +396,16 @@ var _ = Describe("application", func() {
Expect(err).NotTo(HaveOccurred())
})

It("should support multiple controllers watching the same metadata kind", func() {
	// The context bounds the lifetime of everything started by doReconcileTest.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Two independent builders on the same manager, both asking for
	// metadata-only Deployments, so both controllers target the same kind.
	first := ControllerManagedBy(mgr).For(&appsv1.Deployment{}, OnlyMetadata)
	second := ControllerManagedBy(mgr).For(&appsv1.Deployment{}, OnlyMetadata)

	doReconcileTest(ctx, "6", mgr, true, first, second)
})

It("should support watching For, Owns, and Watch as metadata", func() {
statefulSetMaps := make(chan *metav1.PartialObjectMetadata)

Expand All @@ -421,7 +431,7 @@ var _ = Describe("application", func() {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
doReconcileTest(ctx, "8", bldr, mgr, true)
doReconcileTest(ctx, "8", mgr, true, bldr)

By("Creating a new stateful set")
set := &appsv1.StatefulSet{
Expand Down Expand Up @@ -496,7 +506,7 @@ func (c *nonTypedOnlyCache) GetInformerForKind(ctx context.Context, gvk schema.G

// TODO(directxman12): this function has too many arguments, and the whole
// "nameSuffix" thing is a bit of a hack. It should be cleaned up significantly by someone with a bit of time.
func doReconcileTest(ctx context.Context, nameSuffix string, blder *Builder, mgr manager.Manager, complete bool) {
func doReconcileTest(ctx context.Context, nameSuffix string, mgr manager.Manager, complete bool, blders ...*Builder) {
deployName := "deploy-name-" + nameSuffix
rsName := "rs-name-" + nameSuffix

Expand All @@ -512,15 +522,17 @@ func doReconcileTest(ctx context.Context, nameSuffix string, blder *Builder, mgr
return reconcile.Result{}, nil
})

if complete {
err := blder.Complete(fn)
Expect(err).NotTo(HaveOccurred())
} else {
var err error
var c controller.Controller
c, err = blder.Build(fn)
Expect(err).NotTo(HaveOccurred())
Expect(c).NotTo(BeNil())
for _, blder := range blders {
if complete {
err := blder.Complete(fn)
Expect(err).NotTo(HaveOccurred())
} else {
var err error
var c controller.Controller
c, err = blder.Build(fn)
Expect(err).NotTo(HaveOccurred())
Expect(c).NotTo(BeNil())
}
}

By("Starting the application")
Expand Down
74 changes: 64 additions & 10 deletions pkg/cache/internal/informers_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"k8s.io/client-go/metadata"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"

"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
)

Expand Down Expand Up @@ -231,12 +232,6 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob
return nil, false, err
}

switch obj.(type) {
case *metav1.PartialObjectMetadata, *metav1.PartialObjectMetadataList:
ni = metadataSharedIndexInformerPreserveGVK(gvk, ni)
default:
}

i := &MapEntry{
Informer: ni,
Reader: CacheReader{
Expand Down Expand Up @@ -372,26 +367,85 @@ func createMetadataListWatch(gvk schema.GroupVersionKind, ip *specificInformersM
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
ip.selectors[gvk].ApplyToList(&opts)

var (
list *metav1.PartialObjectMetadataList
err error
)
namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors[gvk])
if namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
return client.Resource(mapping.Resource).Namespace(namespace).List(ctx, opts)
list, err = client.Resource(mapping.Resource).Namespace(namespace).List(ctx, opts)
} else {
list, err = client.Resource(mapping.Resource).List(ctx, opts)
}
if list != nil {
for i := range list.Items {
list.Items[i].SetGroupVersionKind(gvk)
}
}
return client.Resource(mapping.Resource).List(ctx, opts)
return list, err
},
// Setup the watch function
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
ip.selectors[gvk].ApplyToList(&opts)
// Watch needs to be set to true separately
opts.Watch = true

var (
watcher watch.Interface
err error
)
namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors[gvk])
if namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
return client.Resource(mapping.Resource).Namespace(namespace).Watch(ctx, opts)
watcher, err = client.Resource(mapping.Resource).Namespace(namespace).Watch(ctx, opts)
} else {
watcher, err = client.Resource(mapping.Resource).Watch(ctx, opts)
}
if watcher != nil {
watcher = newGVKFixupWatcher(gvk, watcher)
}
return client.Resource(mapping.Resource).Watch(ctx, opts)
return watcher, err
},
}, nil
}

// gvkFixupWatcher wraps a watch.Interface and stamps a fixed
// GroupVersionKind onto every object it forwards.
//
// Events are read from the wrapped watcher by a single goroutine (run) and
// re-published on ch. That goroutine is the only sender on ch and is therefore
// the only place where ch is closed.
type gvkFixupWatcher struct {
	watcher watch.Interface
	ch      chan watch.Event
	gvk     schema.GroupVersionKind
	wg      sync.WaitGroup

	// stopCh is closed by Stop to release run even when nobody is reading ch.
	stopCh chan struct{}
	// stopOnce makes Stop safe to call more than once.
	stopOnce sync.Once
}

var _ watch.Interface = (*gvkFixupWatcher)(nil)

// newGVKFixupWatcher returns a watch.Interface that forwards every event from
// watcher after setting gvk on the event's object. It starts one goroutine,
// which exits when the wrapped watcher's result channel is closed or when Stop
// is called on the returned value.
func newGVKFixupWatcher(gvk schema.GroupVersionKind, watcher watch.Interface) watch.Interface {
	w := &gvkFixupWatcher{
		gvk:     gvk,
		watcher: watcher,
		ch:      make(chan watch.Event),
		stopCh:  make(chan struct{}),
	}
	w.wg.Add(1)
	go w.run()
	return w
}

// run pumps events from the wrapped watcher to ch until the upstream channel
// is closed or Stop is called. It closes ch on exit so that consumers see the
// end of the stream in both cases.
func (w *gvkFixupWatcher) run() {
	defer w.wg.Done()
	defer close(w.ch)

	in := w.watcher.ResultChan()
	for {
		select {
		case <-w.stopCh:
			return
		case e, ok := <-in:
			if !ok {
				// Upstream ended on its own; propagate the close downstream.
				return
			}
			// Guard against events that carry no object before touching it.
			if e.Object != nil {
				e.Object.GetObjectKind().SetGroupVersionKind(w.gvk)
			}
			// The send must be abortable: a consumer may stop reading
			// before calling Stop, and ch is unbuffered.
			select {
			case w.ch <- e:
			case <-w.stopCh:
				return
			}
		}
	}
}

// Stop stops the wrapped watcher and waits for the forwarding goroutine to
// exit. When Stop returns, the channel returned by ResultChan has been closed.
// Stop may be called multiple times.
func (w *gvkFixupWatcher) Stop() {
	w.stopOnce.Do(func() {
		// Signal run first so it cannot stay blocked on a send to ch.
		close(w.stopCh)
		w.watcher.Stop()
	})
	w.wg.Wait()
}

// ResultChan returns the channel on which GVK-corrected events are delivered.
func (w *gvkFixupWatcher) ResultChan() <-chan watch.Event {
	return w.ch
}
Comment on lines +419 to +447
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the only way we can fix this behavior? Having an extra reader seems a bit overkill, given that it's a reader on a reader, but I also don't have other options that I can think of.


// resyncPeriod returns a function which generates a duration each time it is
// invoked; this is so that multiple controllers don't get into lock-step and all
// hammer the apiserver with list requests simultaneously.
Expand Down
71 changes: 0 additions & 71 deletions pkg/cache/internal/metadata_infomer_wrapper.go

This file was deleted.