From b32c137ce2bc51b41ccce53aa70b6af700d2ef0f Mon Sep 17 00:00:00 2001 From: Richard Wall Date: Thu, 3 Feb 2022 19:27:04 +0000 Subject: [PATCH 1/4] Test that gvkFixupWatcher matches the behaviour of watch.FakeWatcher Signed-off-by: Richard Wall --- pkg/cache/internal/informers_map_test.go | 94 +++++++++++++++++++++++ pkg/cache/internal/internal_suite_test.go | 31 ++++++++ 2 files changed, 125 insertions(+) create mode 100644 pkg/cache/internal/informers_map_test.go create mode 100644 pkg/cache/internal/internal_suite_test.go diff --git a/pkg/cache/internal/informers_map_test.go b/pkg/cache/internal/informers_map_test.go new file mode 100644 index 0000000000..bf6b902744 --- /dev/null +++ b/pkg/cache/internal/informers_map_test.go @@ -0,0 +1,94 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package internal + +import ( + "fmt" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" +) + +// Test that gvkFixupWatcher behaves like watch.FakeWatcher +// and that it overrides the GVK. +// These tests are adapted from the watch.FakeWatcher tests in: +// https://github.com/kubernetes/kubernetes/blob/adbda068c1808fcc8a64a94269e0766b5c46ec41/staging/src/k8s.io/apimachinery/pkg/watch/watch_test.go#L33-L78 +var _ = Describe("gvkFixupWatcher", func() { + It("behaves like watch.FakeWatcher", func() { + newTestType := func(name string) runtime.Object { + return &metav1.PartialObjectMetadata{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + } + } + + f := watch.NewFake() + // This is the GVK which we expect the wrapper to set on all the events + expectedGVK := schema.GroupVersionKind{ + Group: "testgroup", + Version: "v1test2", + Kind: "TestKind", + } + gvkfw := newGVKFixupWatcher(expectedGVK, f) + + table := []struct { + t watch.EventType + s runtime.Object + }{ + {watch.Added, newTestType("foo")}, + {watch.Modified, newTestType("qux")}, + {watch.Modified, newTestType("bar")}, + {watch.Deleted, newTestType("bar")}, + {watch.Error, newTestType("error: blah")}, + } + + consumer := func(w watch.Interface) { + for _, expect := range table { + By(fmt.Sprintf("Fixing up watch.EventType: %v and passing it on", expect.t)) + got, ok := <-w.ResultChan() + Expect(ok).To(BeTrue(), "closed early") + Expect(expect.t).To(Equal(got.Type), "unexpected Event.Type or out-of-order Event") + Expect(got.Object).To(BeAssignableToTypeOf(&metav1.PartialObjectMetadata{}), "unexpected Event.Object type") + a := got.Object.(*metav1.PartialObjectMetadata) + Expect(got.Object.GetObjectKind().GroupVersionKind()).To(Equal(expectedGVK), "GVK was not fixed up") + expected := expect.s.DeepCopyObject() + expected.GetObjectKind().SetGroupVersionKind(schema.GroupVersionKind{}) + actual := a.DeepCopyObject() + actual.GetObjectKind().SetGroupVersionKind(schema.GroupVersionKind{}) + Expect(actual).To(Equal(expected), "unexpected change to the Object") + } + Eventually(w.ResultChan()).Should(BeClosed()) + } + + sender := func() { + f.Add(newTestType("foo")) + f.Action(watch.Modified, newTestType("qux")) + f.Modify(newTestType("bar")) + f.Delete(newTestType("bar")) + f.Error(newTestType("error: blah")) + f.Stop() + } + + go sender() + consumer(gvkfw) + }) +}) diff --git a/pkg/cache/internal/internal_suite_test.go b/pkg/cache/internal/internal_suite_test.go new file mode 100644 index 0000000000..fd83315e19 --- /dev/null +++ b/pkg/cache/internal/internal_suite_test.go @@ -0,0 +1,31 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package internal + +import ( + "testing" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "sigs.k8s.io/controller-runtime/pkg/envtest/printer" +) + +func TestSource(t *testing.T) { + RegisterFailHandler(Fail) + suiteName := "Cache Internal Suite" + RunSpecsWithDefaultAndCustomReporters(t, suiteName, []Reporter{printer.NewlineReporter{}, printer.NewProwReporter(suiteName)}) +} From fb739b00533fb01dbf3b97e1bfcc277b03e9c24b Mon Sep 17 00:00:00 2001 From: Richard Wall Date: Thu, 3 Feb 2022 21:15:29 +0000 Subject: [PATCH 2/4] Re-implement gvkFixupWatcher as a watch.FilterFunc Signed-off-by: Richard Wall --- pkg/cache/internal/informers_map.go | 60 +++++++++++++---------------- 1 file changed, 26 insertions(+), 34 deletions(-) diff --git a/pkg/cache/internal/informers_map.go b/pkg/cache/internal/informers_map.go index 07f2f1261f..031ba94029 100644 --- a/pkg/cache/internal/informers_map.go +++ b/pkg/cache/internal/informers_map.go @@ -409,41 +409,33 @@ func createMetadataListWatch(gvk schema.GroupVersionKind, ip *specificInformersM }, nil } -type gvkFixupWatcher struct { - watcher watch.Interface - ch chan watch.Event - gvk schema.GroupVersionKind - wg sync.WaitGroup -} - +// newGVKFixupWatcher adds a wrapper that preserves the GVK information when +// events come in. +// +// This works around a bug where GVK information is not passed into mapping +// functions when using the OnlyMetadata option in the builder. +// This issue is most likely caused by kubernetes/kubernetes#80609. +// See kubernetes-sigs/controller-runtime#1484. +// +// This was originally implemented as a cache.ResourceEventHandler wrapper but +// that contained a data race which was resolved by setting the GVK in a watch +// wrapper, before the objects are written to the cache. +// See kubernetes-sigs/controller-runtime#1650. +// +// The original watch wrapper was found to be incompatible with +// k8s.io/client-go/tools/cache.Reflector so it has been re-implemented as a +// watch.Filter which is compatible. +// See kubernetes-sigs/controller-runtime#1789. func newGVKFixupWatcher(gvk schema.GroupVersionKind, watcher watch.Interface) watch.Interface { - ch := make(chan watch.Event) - w := &gvkFixupWatcher{ - gvk: gvk, - watcher: watcher, - ch: ch, - } - w.wg.Add(1) - go w.run() - return w -} - -func (w *gvkFixupWatcher) run() { - for e := range w.watcher.ResultChan() { - e.Object.GetObjectKind().SetGroupVersionKind(w.gvk) - w.ch <- e - } - w.wg.Done() -} - -func (w *gvkFixupWatcher) Stop() { - w.watcher.Stop() - w.wg.Wait() - close(w.ch) -} - -func (w *gvkFixupWatcher) ResultChan() <-chan watch.Event { - return w.ch + return watch.Filter( + watcher, + func(in watch.Event) (out watch.Event, keep bool) { + keep = true + in.DeepCopyInto(&out) + out.Object.GetObjectKind().SetGroupVersionKind(gvk) + return out, keep + }, + ) } // resyncPeriod returns a function which generates a duration each time it is From bad563a71e7d9e225fd4e37d9d15d0efd1fa35b5 Mon Sep 17 00:00:00 2001 From: Richard Wall Date: Mon, 7 Feb 2022 18:36:05 +0000 Subject: [PATCH 3/4] Update copyright headers Signed-off-by: Richard Wall --- pkg/cache/internal/informers_map_test.go | 2 +- pkg/cache/internal/internal_suite_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/cache/internal/informers_map_test.go b/pkg/cache/internal/informers_map_test.go index bf6b902744..32a26fff4e 100644 --- a/pkg/cache/internal/informers_map_test.go +++ b/pkg/cache/internal/informers_map_test.go @@ -1,5 +1,5 @@ /* -Copyright 2018 The Kubernetes Authors. +Copyright 2022 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/pkg/cache/internal/internal_suite_test.go b/pkg/cache/internal/internal_suite_test.go index fd83315e19..4e7c2b2de5 100644 --- a/pkg/cache/internal/internal_suite_test.go +++ b/pkg/cache/internal/internal_suite_test.go @@ -1,5 +1,5 @@ /* -Copyright 2018 The Kubernetes Authors. +Copyright 2022 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. From e11297bb169d28a84948a3b5fa1195e167cf1898 Mon Sep 17 00:00:00 2001 From: Richard Wall Date: Mon, 7 Feb 2022 21:01:23 +0000 Subject: [PATCH 4/4] Remove DeepCopy, as suggested during code review Signed-off-by: Richard Wall --- pkg/cache/internal/informers_map.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/pkg/cache/internal/informers_map.go b/pkg/cache/internal/informers_map.go index 031ba94029..2eb68e840a 100644 --- a/pkg/cache/internal/informers_map.go +++ b/pkg/cache/internal/informers_map.go @@ -429,11 +429,9 @@ func createMetadataListWatch(gvk schema.GroupVersionKind, ip *specificInformersM func newGVKFixupWatcher(gvk schema.GroupVersionKind, watcher watch.Interface) watch.Interface { return watch.Filter( watcher, - func(in watch.Event) (out watch.Event, keep bool) { - keep = true - in.DeepCopyInto(&out) - out.Object.GetObjectKind().SetGroupVersionKind(gvk) - return out, keep + func(in watch.Event) (watch.Event, bool) { + in.Object.GetObjectKind().SetGroupVersionKind(gvk) + return in, true }, ) }