From 273e608d8a6cd7f34b6b56a6bd4626878319aed7 Mon Sep 17 00:00:00 2001 From: Richard Wall Date: Tue, 8 Feb 2022 02:30:55 +0000 Subject: [PATCH] :bug: Fixed a bug in newGVKFixupWatcher which caused the metadata informer to hang (#1790) * Test that gvkFixupWatcher matches the behaviour of watch.FakeWatcher Signed-off-by: Richard Wall * Re-implement gvkFixupWatcher as a watch.FilterFunc Signed-off-by: Richard Wall * Update copyright headers Signed-off-by: Richard Wall * Remove DeepCopy, as suggested during code review Signed-off-by: Richard Wall --- pkg/cache/internal/informers_map.go | 58 ++++++-------- pkg/cache/internal/informers_map_test.go | 94 +++++++++++++++++++++++ pkg/cache/internal/internal_suite_test.go | 31 ++++++++ 3 files changed, 149 insertions(+), 34 deletions(-) create mode 100644 pkg/cache/internal/informers_map_test.go create mode 100644 pkg/cache/internal/internal_suite_test.go diff --git a/pkg/cache/internal/informers_map.go b/pkg/cache/internal/informers_map.go index 07f2f1261f..2eb68e840a 100644 --- a/pkg/cache/internal/informers_map.go +++ b/pkg/cache/internal/informers_map.go @@ -409,41 +409,31 @@ func createMetadataListWatch(gvk schema.GroupVersionKind, ip *specificInformersM }, nil } -type gvkFixupWatcher struct { - watcher watch.Interface - ch chan watch.Event - gvk schema.GroupVersionKind - wg sync.WaitGroup -} - +// newGVKFixupWatcher adds a wrapper that preserves the GVK information when +// events come in. +// +// This works around a bug where GVK information is not passed into mapping +// functions when using the OnlyMetadata option in the builder. +// This issue is most likely caused by kubernetes/kubernetes#80609. +// See kubernetes-sigs/controller-runtime#1484. +// +// This was originally implemented as a cache.ResourceEventHandler wrapper but +// that contained a data race which was resolved by setting the GVK in a watch +// wrapper, before the objects are written to the cache. +// See kubernetes-sigs/controller-runtime#1650. +// +// The original watch wrapper was found to be incompatible with +// k8s.io/client-go/tools/cache.Reflector so it has been re-implemented as a +// watch.Filter which is compatible. +// See kubernetes-sigs/controller-runtime#1789. func newGVKFixupWatcher(gvk schema.GroupVersionKind, watcher watch.Interface) watch.Interface { - ch := make(chan watch.Event) - w := &gvkFixupWatcher{ - gvk: gvk, - watcher: watcher, - ch: ch, - } - w.wg.Add(1) - go w.run() - return w -} - -func (w *gvkFixupWatcher) run() { - for e := range w.watcher.ResultChan() { - e.Object.GetObjectKind().SetGroupVersionKind(w.gvk) - w.ch <- e - } - w.wg.Done() -} - -func (w *gvkFixupWatcher) Stop() { - w.watcher.Stop() - w.wg.Wait() - close(w.ch) -} - -func (w *gvkFixupWatcher) ResultChan() <-chan watch.Event { - return w.ch + return watch.Filter( + watcher, + func(in watch.Event) (watch.Event, bool) { + in.Object.GetObjectKind().SetGroupVersionKind(gvk) + return in, true + }, + ) } // resyncPeriod returns a function which generates a duration each time it is diff --git a/pkg/cache/internal/informers_map_test.go b/pkg/cache/internal/informers_map_test.go new file mode 100644 index 0000000000..32a26fff4e --- /dev/null +++ b/pkg/cache/internal/informers_map_test.go @@ -0,0 +1,94 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package internal + +import ( + "fmt" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" +) + +// Test that gvkFixupWatcher behaves like watch.FakeWatcher +// and that it overrides the GVK. +// These tests are adapted from the watch.FakeWatcher tests in: +// https://github.com/kubernetes/kubernetes/blob/adbda068c1808fcc8a64a94269e0766b5c46ec41/staging/src/k8s.io/apimachinery/pkg/watch/watch_test.go#L33-L78 +var _ = Describe("gvkFixupWatcher", func() { + It("behaves like watch.FakeWatcher", func() { + newTestType := func(name string) runtime.Object { + return &metav1.PartialObjectMetadata{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + } + } + + f := watch.NewFake() + // This is the GVK which we expect the wrapper to set on all the events + expectedGVK := schema.GroupVersionKind{ + Group: "testgroup", + Version: "v1test2", + Kind: "TestKind", + } + gvkfw := newGVKFixupWatcher(expectedGVK, f) + + table := []struct { + t watch.EventType + s runtime.Object + }{ + {watch.Added, newTestType("foo")}, + {watch.Modified, newTestType("qux")}, + {watch.Modified, newTestType("bar")}, + {watch.Deleted, newTestType("bar")}, + {watch.Error, newTestType("error: blah")}, + } + + consumer := func(w watch.Interface) { + for _, expect := range table { + By(fmt.Sprintf("Fixing up watch.EventType: %v and passing it on", expect.t)) + got, ok := <-w.ResultChan() + Expect(ok).To(BeTrue(), "closed early") + Expect(expect.t).To(Equal(got.Type), "unexpected Event.Type or out-of-order Event") + Expect(got.Object).To(BeAssignableToTypeOf(&metav1.PartialObjectMetadata{}), "unexpected Event.Object type") + a := got.Object.(*metav1.PartialObjectMetadata) + Expect(got.Object.GetObjectKind().GroupVersionKind()).To(Equal(expectedGVK), "GVK was not fixed up") + expected := expect.s.DeepCopyObject() + expected.GetObjectKind().SetGroupVersionKind(schema.GroupVersionKind{}) + actual := a.DeepCopyObject() + actual.GetObjectKind().SetGroupVersionKind(schema.GroupVersionKind{}) + Expect(actual).To(Equal(expected), "unexpected change to the Object") + } + Eventually(w.ResultChan()).Should(BeClosed()) + } + + sender := func() { + f.Add(newTestType("foo")) + f.Action(watch.Modified, newTestType("qux")) + f.Modify(newTestType("bar")) + f.Delete(newTestType("bar")) + f.Error(newTestType("error: blah")) + f.Stop() + } + + go sender() + consumer(gvkfw) + }) +}) diff --git a/pkg/cache/internal/internal_suite_test.go b/pkg/cache/internal/internal_suite_test.go new file mode 100644 index 0000000000..4e7c2b2de5 --- /dev/null +++ b/pkg/cache/internal/internal_suite_test.go @@ -0,0 +1,31 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package internal + +import ( + "testing" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "sigs.k8s.io/controller-runtime/pkg/envtest/printer" +) + +func TestSource(t *testing.T) { + RegisterFailHandler(Fail) + suiteName := "Cache Internal Suite" + RunSpecsWithDefaultAndCustomReporters(t, suiteName, []Reporter{printer.NewlineReporter{}, printer.NewProwReporter(suiteName)}) +}