From dad9a7cdce31fd7552c77c7fe661246063c0d490 Mon Sep 17 00:00:00 2001 From: Phillip Wittrock Date: Mon, 19 Mar 2018 11:27:51 -0700 Subject: [PATCH] Update controller libraries - Lookup controller and compare uid when watchingcontrollerof - Rename packages and functions - Better test coverage --- pkg/controller/common_suite_test.go | 2 +- pkg/controller/controller.go | 45 +- pkg/controller/controller_test.go | 278 +++++++++--- .../{handlefunctions => eventhandlers}/doc.go | 2 +- pkg/controller/eventhandlers/eventhandlers.go | 152 +++++++ .../eventhandlers/eventhandlers_suite_test.go | 29 ++ .../eventhandlers/eventhandlers_test.go | 419 ++++++++++++++++++ pkg/controller/example_controller_test.go | 7 +- .../example_watchandhandleevents_test.go | 10 +- pkg/controller/example_watchandmap_test.go | 2 +- .../example_watchandmaptocontroller_test.go | 9 +- .../handlefunctions/handlerfuncs.go | 177 -------- pkg/controller/listeningqueue.go | 41 +- pkg/controller/listeningqueue_test.go | 142 ------ pkg/controller/manager_test.go | 10 - pkg/controller/predicates/predicates.go | 83 ++++ .../predicates_suite_test.go} | 13 +- pkg/controller/predicates/predicates_test.go | 117 +++++ 18 files changed, 1073 insertions(+), 465 deletions(-) rename pkg/controller/{handlefunctions => eventhandlers}/doc.go (96%) create mode 100644 pkg/controller/eventhandlers/eventhandlers.go create mode 100644 pkg/controller/eventhandlers/eventhandlers_suite_test.go create mode 100644 pkg/controller/eventhandlers/eventhandlers_test.go delete mode 100644 pkg/controller/handlefunctions/handlerfuncs.go delete mode 100644 pkg/controller/manager_test.go create mode 100644 pkg/controller/predicates/predicates.go rename pkg/controller/{handlefunctions/handlerfuncs_test.go => predicates/predicates_suite_test.go} (79%) create mode 100644 pkg/controller/predicates/predicates_test.go diff --git a/pkg/controller/common_suite_test.go b/pkg/controller/common_suite_test.go index 0c9041781a..434350bc63 100644 --- a/pkg/controller/common_suite_test.go +++ b/pkg/controller/common_suite_test.go @@ -23,7 +23,7 @@ import ( "testing" ) -func Testcontroller(t *testing.T) { +func TestController(t *testing.T) { RegisterFailHandler(Fail) RunSpecs(t, "controller Suite") } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index d32cc8891f..37f946dfed 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -25,9 +25,10 @@ import ( "time" "github.com/golang/glog" - "github.com/kubernetes-sigs/kubebuilder/pkg/controller/handlefunctions" + "github.com/kubernetes-sigs/kubebuilder/pkg/controller/eventhandlers" "github.com/kubernetes-sigs/kubebuilder/pkg/controller/informers" "github.com/kubernetes-sigs/kubebuilder/pkg/controller/metrics" + "github.com/kubernetes-sigs/kubebuilder/pkg/controller/predicates" "github.com/kubernetes-sigs/kubebuilder/pkg/controller/types" "github.com/kubernetes-sigs/kubebuilder/pkg/inject/run" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -41,7 +42,7 @@ import ( var ( // DefaultReconcileFn is used by GenericController if Reconcile is not set DefaultReconcileFn = func(k types.ReconcileKey) error { - log.Printf("No ReconcileFn definded - skipping %+v", k) + log.Printf("No ReconcileFn defined - skipping %+v", k) return nil } @@ -87,34 +88,38 @@ func (gc *GenericController) GetMetrics() metrics.Metrics { } } -// Watch watches objects matching obj's type and enqueues their keys. -func (gc *GenericController) Watch(obj metav1.Object) error { +// Watch watches objects matching obj's type and enqueues their keys to be reconcild. +func (gc *GenericController) Watch(obj metav1.Object, p ...predicates.Predicate) error { gc.once.Do(gc.init) - return gc.queue.watchFor(obj) + return gc.queue.addEventHandler(obj, + eventhandlers.MapAndEnqueue{Map: eventhandlers.MapToSelf, Predicates: p}) } -// WatchAndMapToController watches objects matching obj's type and enqueues the keys of their controllers. -func (gc *GenericController) WatchAndMapToController(obj metav1.Object, gvks ...metav1.GroupVersionKind) error { +// WatchControllerOf watches for events for objects matching obj's type and enqueues events for the +// controller of the object if the controller UID matches the ownerref UID. +// Will walk the owners references looking up the controller using the path function and comparing the UID of +// the object to the ownersref UID. +// e.g. if obj was a Pod and the path contained lookup functions for ReplicaSet, Deployment, Foo it would walk +// Pod -> (controller) ReplicaSet -> (controller) Deployment -> (controller) Foo and reconcile Foo only. +func (gc *GenericController) WatchControllerOf(obj metav1.Object, path eventhandlers.Path, + p ...predicates.Predicate) error { gc.once.Do(gc.init) - return gc.queue.watchForAndMapToController(obj, gvks...) + return gc.queue.addEventHandler(obj, + eventhandlers.MapAndEnqueue{Map: eventhandlers.MapToController{Path: path}.Map, Predicates: p}) } -func (gc *GenericController) WatchAndMapToControllerIf(obj metav1.Object, - p handlefunctions.Predicate, gvks ...metav1.GroupVersionKind) error { +// WatchTransformationOf watches objects matching obj's type and enqueues the key returned by mapFn. +func (gc *GenericController) WatchTransformationOf(obj metav1.Object, mapFn eventhandlers.ObjToKey, + p ...predicates.Predicate) error { gc.once.Do(gc.init) - return gc.queue.watchForAndMapToControllerIf(obj, p, gvks...) + return gc.queue.addEventHandler(obj, + eventhandlers.MapAndEnqueue{Map: mapFn, Predicates: p}) } -// WatchAndMap watches objects matching obj's type and enqueues the key returned by mapFn. -func (gc *GenericController) WatchAndMap(obj metav1.Object, mapFn handlefunctions.ObjToKey) error { +// WatchEvents watches objects matching obj's type and uses the functions from provider to handle events. +func (gc *GenericController) WatchEvents(obj metav1.Object, provider types.HandleFnProvider) error { gc.once.Do(gc.init) - return gc.queue.watchForAndMapToNewObjectKey(obj, mapFn) -} - -// WatchAndHandleEvents watches objects matching obj's type and uses the functions from provider to handle events. -func (gc *GenericController) WatchAndHandleEvents(obj metav1.Object, provider types.HandleFnProvider) error { - gc.once.Do(gc.init) - return gc.queue.watchForAndHandleEvent(obj, fnToInterfaceAdapter{provider}) + return gc.queue.addEventHandler(obj, fnToInterfaceAdapter{provider}) } // WatchChannel enqueues object keys read from the channel. diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 5a236e71cd..3b30d51a71 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -14,13 +14,13 @@ See the License for the specific language governing permissions and limitations under the License. */ -package controller_test +package controller import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/kubernetes-sigs/kubebuilder/pkg/controller" + "github.com/kubernetes-sigs/kubebuilder/pkg/controller/eventhandlers" "github.com/kubernetes-sigs/kubebuilder/pkg/controller/test" "github.com/kubernetes-sigs/kubebuilder/pkg/controller/types" "github.com/kubernetes-sigs/kubebuilder/pkg/inject/run" @@ -29,20 +29,23 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" + "time" ) var _ = Describe("GenericController", func() { var ( - instance *controller.GenericController - mgr *controller.ControllerManager + instance *GenericController + mgr *ControllerManager fakePodInformer *test.FakeInformer fakeReplicaSetInformer *test.FakeInformer result chan string stop chan struct{} + ch chan string + t = true ) BeforeEach(func() { - mgr = &controller.ControllerManager{} + mgr = &ControllerManager{} // Create a new informers map with fake informers fakePodInformer = &test.FakeInformer{Synced: true} @@ -58,10 +61,10 @@ var _ = Describe("GenericController", func() { stop = make(chan struct{}) }) - Describe("Listening to a Pod SharedInformer", func() { + Describe("Watching a Pod from a controller", func() { BeforeEach(func() { // Create a new listeningQueue - instance = &controller.GenericController{ + instance = &GenericController{ Name: "TestInstance", InformerRegistry: mgr, Reconcile: func(k types.ReconcileKey) error { @@ -75,31 +78,79 @@ var _ = Describe("GenericController", func() { }) Context("Where a Pod has been added", func() { + It("should be able to lookup the controller", func() { + Expect(mgr.GetController("TestInstance")).Should(Equal(instance)) + }) + + It("should be able to lookup the informer provider", func() { + Expect(mgr.GetInformerProvider(&corev1.Pod{})).Should(Equal(fakePodInformer)) + }) + + It("should be able to lookup the informer provider", func() { + Expect(mgr.GetInformer(&corev1.Pod{})).Should(Equal(fakePodInformer)) + }) + It("should reconcile the Pod namespace/name", func() { // Listen for Pod changes Expect(instance.Watch(&corev1.Pod{})).Should(Succeed()) // Create a Pod event + fakePodInformer.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-pod", Namespace: "default"}}) + + val := ChannelResult{} + Eventually(result).Should(Receive(&val.result)) + Expect(val.result).Should(Equal("default/test-pod")) + Expect(instance.GetMetrics().QueueLength).Should(Equal(0)) + }) + + It("should reconcile the Controller namespace/name if the UID matches", func() { + // Function to lookup the ReplicaSet based on the key + fn := func(k types.ReconcileKey) (interface{}, error) { + return &appsv1.ReplicaSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-replicaset", + Namespace: "default", + UID: "uid5", + }, + }, nil + } + // Listen for Pod changes + Expect(instance.WatchControllerOf(&corev1.Pod{}, eventhandlers.Path{fn})).Should(Succeed()) + fakePodInformer.Add(&corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "test-pod", Namespace: "default", + OwnerReferences: []metav1.OwnerReference{ + { + Name: "test-replicaset", + Controller: &t, + UID: "uid5", + }, + }, }, }) val := ChannelResult{} Eventually(result).Should(Receive(&val.result)) - Expect(val.result).Should(Equal("default/test-pod")) + Expect(val.result).Should(Equal("default/test-replicaset")) Expect(instance.GetMetrics().QueueLength).Should(Equal(0)) }) - }) - Context("Where a Pod has been added", func() { - It("should reconcile the Controller namespace/name", func() { + It("should not reconcile the Controller namespace/name if the UID doesn't match", func() { + // Function to lookup the ReplicaSet based on the key + fn := func(k types.ReconcileKey) (interface{}, error) { + return &appsv1.ReplicaSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-replicaset", + Namespace: "default", + UID: "uid5", + }, + }, nil + } // Listen for Pod changes - Expect(instance.WatchAndMapToController(&corev1.Pod{})).Should(Succeed()) + Expect(instance.WatchControllerOf(&corev1.Pod{}, eventhandlers.Path{fn})).Should(Succeed()) - c := true fakePodInformer.Add(&corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "test-pod", @@ -107,45 +158,84 @@ var _ = Describe("GenericController", func() { OwnerReferences: []metav1.OwnerReference{ { Name: "test-replicaset", - Controller: &c, + Controller: &t, + UID: "uid3", // UID doesn't match }, }, }, }) val := ChannelResult{} - Eventually(result).Should(Receive(&val.result)) - Expect(val.result).Should(Equal("default/test-replicaset")) - Expect(instance.GetMetrics().QueueLength).Should(Equal(0)) + Consistently(result).Should(Not(Receive(&val.result))) }) - }) - Context("Where a Pod has been added", func() { - It("should reconcile the mapped key", func() { - // Listen for Pod changes - Expect(instance.WatchAndMap(&corev1.Pod{}, func(obj interface{}) string { - p := obj.(*corev1.Pod) - return p.Namespace + "-namespace/" + p.Name + "-name" - })).Should(Succeed()) + It("should reconcile the Controller-Controller namespace/name", func() { + // Function to lookup the ReplicaSet based on the key + fn1 := func(k types.ReconcileKey) (interface{}, error) { + return &appsv1.ReplicaSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-replicaset", + Namespace: "default", + UID: "uid5", + OwnerReferences: []metav1.OwnerReference{ + { + Name: "test-deployment", + UID: "uid7", + Controller: &t, + }, + }, + }, + }, nil + } + fn2 := func(k types.ReconcileKey) (interface{}, error) { + return &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-deployment", + Namespace: "default", + UID: "uid7", + }, + }, nil + } + Expect(instance.WatchControllerOf(&corev1.Pod{}, eventhandlers.Path{fn1, fn2})).Should(Succeed()) fakePodInformer.Add(&corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "test-pod", Namespace: "default", + OwnerReferences: []metav1.OwnerReference{ + { + Name: "test-replicaset", + Controller: &t, + UID: "uid5", + }, + }, }, }) + val := ChannelResult{} + Eventually(result).Should(Receive(&val.result)) + Expect(val.result).Should(Equal("default/test-deployment")) + Expect(instance.GetMetrics().QueueLength).Should(Equal(0)) + }) + + It("should use the map function to reconcile a different key", func() { + // Listen for Pod changes + Expect(instance.WatchTransformationOf(&corev1.Pod{}, func(obj interface{}) string { + p := obj.(*corev1.Pod) + return p.Namespace + "-namespace/" + p.Name + "-name" + })).Should(Succeed()) + + fakePodInformer.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-pod", Namespace: "default"}}) + val := ChannelResult{} Eventually(result).Should(Receive(&val.result)) Expect(val.result).Should(Equal("default-namespace/test-pod-name")) Expect(instance.GetMetrics().QueueLength).Should(Equal(0)) }) - }) - Context("Where a Pod has been added", func() { It("should call the event handling add function", func() { // Listen for Pod changes - Expect(instance.WatchAndHandleEvents(&corev1.Pod{}, + Expect(instance.WatchEvents(&corev1.Pod{}, func(w workqueue.RateLimitingInterface) cache.ResourceEventHandlerFuncs { return cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { w.AddRateLimited("key/value") }, @@ -154,12 +244,7 @@ var _ = Describe("GenericController", func() { } })).Should(Succeed()) - fakePodInformer.Add(&corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-pod", - Namespace: "default", - }, - }) + fakePodInformer.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-pod", Namespace: "default"}}) val := ChannelResult{} Eventually(result).Should(Receive(&val.result)) @@ -171,7 +256,7 @@ var _ = Describe("GenericController", func() { Context("Where a Pod has been updated", func() { It("should call the event handling update function", func() { // Listen for Pod changes - Expect(instance.WatchAndHandleEvents(&corev1.Pod{}, + Expect(instance.WatchEvents(&corev1.Pod{}, func(w workqueue.RateLimitingInterface) cache.ResourceEventHandlerFuncs { return cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { Fail("Add function called") }, @@ -202,7 +287,7 @@ var _ = Describe("GenericController", func() { Context("Where a Pod has been deleted", func() { It("should call the event handling delete function", func() { // Listen for Pod changes - Expect(instance.WatchAndHandleEvents(&corev1.Pod{}, + Expect(instance.WatchEvents(&corev1.Pod{}, func(w workqueue.RateLimitingInterface) cache.ResourceEventHandlerFuncs { return cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { Fail("Add function called") }, @@ -211,12 +296,7 @@ var _ = Describe("GenericController", func() { } })).Should(Succeed()) - fakePodInformer.Delete(&corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-pod", - Namespace: "default", - }, - }) + fakePodInformer.Delete(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-pod", Namespace: "default"}}) val := ChannelResult{} Eventually(result).Should(Receive(&val.result)) @@ -226,10 +306,10 @@ var _ = Describe("GenericController", func() { }) }) - Describe("Checking Metrics to a Pod SharedInformer", func() { + Describe("Watching a channel", func() { BeforeEach(func() { - // Create a new listeningQueue - instance = &controller.GenericController{ + ch = make(chan string) + instance = &GenericController{ Name: "TestInstance", InformerRegistry: mgr, Reconcile: func(k types.ReconcileKey) error { @@ -239,28 +319,106 @@ var _ = Describe("GenericController", func() { }, } mgr.AddController(instance) + Expect(instance.WatchChannel(ch)).Should(Succeed()) mgr.RunInformersAndControllers(run.RunArguments{Stop: stop}) }) - Context("Where a Pod has been added", func() { - It("should reconcile the Pod namespace/name", func() { - // Listen for Pod changes - Expect(instance.Watch(&corev1.Pod{})).Should(Succeed()) - - // Create a Pod event - fakePodInformer.Add(&corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-pod", - Namespace: "default", - }, - }) - + Context("Where a key is added to the channel", func() { + It("should reconcile the added namespace/name", func() { + go func() { ch <- "hello/world" }() val := ChannelResult{} - Eventually(result).Should(Receive(&val.result)) - Expect(val.result).Should(Equal("default/test-pod")) + Eventually(result, time.Second*1).Should(Receive(&val.result)) + Expect(val.result).Should(Equal("hello/world")) Expect(instance.GetMetrics().QueueLength).Should(Equal(0)) }) }) + + Context("Where a key does not have a namespace/name", func() { + It("should not reconcile the any namespace/name", func() { + go func() { ch <- "hello/world/foo" }() + val := ChannelResult{} + Consistently(result, time.Second*1).Should(Not(Receive(&val.result))) + }) + }) + }) + + Describe("Creating an empty controller", func() { + BeforeEach(func() { + instance = &GenericController{ + AfterReconcile: func(k types.ReconcileKey, err error) { + defer GinkgoRecover() + Expect(err).Should(BeNil()) + result <- k.Namespace + "/" + k.Name + }, + } + defaultManager = ControllerManager{} + AddInformerProvider(&corev1.Pod{}, fakePodInformer) + Expect(GetInformerProvider(&corev1.Pod{})).Should(Equal(fakePodInformer)) + AddController(instance) + RunInformersAndControllers(run.RunArguments{Stop: stop}) + }) + + It("should create a name for the controller", func() { + Expect(instance.Watch(&corev1.Pod{})).Should(Succeed()) + Expect(instance.Name).Should(Not(BeEmpty())) + }) + + It("should use the default informer registry", func() { + Expect(instance.Watch(&corev1.Pod{})).Should(Succeed()) + + // Create a Pod event + fakePodInformer.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-pod", Namespace: "default"}}) + + val := ChannelResult{} + Eventually(result).Should(Receive(&val.result)) + Expect(val.result).Should(Equal("default/test-pod")) + Expect(instance.GetMetrics().QueueLength).Should(Equal(0)) + }) + }) + + Describe("Adding a non-string item to the queue", func() { + BeforeEach(func() { + instance = &GenericController{ + Name: "TestInstance", + InformerRegistry: mgr, + Reconcile: func(k types.ReconcileKey) error { + // Write the result to a channel + result <- k.Namespace + "/" + k.Name + return nil + }, + } + mgr.AddController(instance) + mgr.RunInformersAndControllers(run.RunArguments{Stop: stop}) + }) + It("should not call reconcile", func() { + instance.Watch(&corev1.Pod{}) + instance.queue.AddRateLimited(fakePodInformer) + val := ChannelResult{} + Consistently(result).Should(Not(Receive(&val.result))) + }) + }) + + Describe("Adding string where the namespace/name cannot be parsed", func() { + BeforeEach(func() { + instance = &GenericController{ + Name: "TestInstance", + InformerRegistry: mgr, + Reconcile: func(k types.ReconcileKey) error { + // Write the result to a channel + result <- k.Namespace + "/" + k.Name + return nil + }, + } + mgr.AddController(instance) + mgr.RunInformersAndControllers(run.RunArguments{Stop: stop}) + }) + + It("should not call reconcile", func() { + instance.Watch(&corev1.Pod{}) + instance.queue.AddRateLimited("1/2/3") + val := ChannelResult{} + Consistently(result).Should(Not(Receive(&val.result))) + }) }) AfterEach(func() { diff --git a/pkg/controller/handlefunctions/doc.go b/pkg/controller/eventhandlers/doc.go similarity index 96% rename from pkg/controller/handlefunctions/doc.go rename to pkg/controller/eventhandlers/doc.go index a0f3a26867..809e18e8de 100644 --- a/pkg/controller/handlefunctions/doc.go +++ b/pkg/controller/eventhandlers/doc.go @@ -15,4 +15,4 @@ limitations under the License. */ // The handlefunctions defines mapping and event handling functions for controllers -package handlefunctions +package eventhandlers diff --git a/pkg/controller/eventhandlers/eventhandlers.go b/pkg/controller/eventhandlers/eventhandlers.go new file mode 100644 index 0000000000..c2f990bbec --- /dev/null +++ b/pkg/controller/eventhandlers/eventhandlers.go @@ -0,0 +1,152 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eventhandlers + +import ( + "fmt" + + "github.com/golang/glog" + "github.com/kubernetes-sigs/kubebuilder/pkg/controller/predicates" + "github.com/kubernetes-sigs/kubebuilder/pkg/controller/types" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" +) + +// EventHandler accepts a workqueue and returns ResourceEventHandlerFuncs that enqueue messages to it +// for add / update / delete events +type EventHandler interface { + Get(r workqueue.RateLimitingInterface) cache.ResourceEventHandlerFuncs +} + +// MapAndEnqueue provides Fns to map objects to name/namespace keys and enqueue them as messages +type MapAndEnqueue struct { + Predicates []predicates.Predicate + // Map maps an object to a key that can be enqueued + Map func(interface{}) string +} + +// Get returns ResourceEventHandlerFuncs that Map an object to a Key and enqueue the key if it is non-empty +func (mp MapAndEnqueue) Get(r workqueue.RateLimitingInterface) cache.ResourceEventHandlerFuncs { + // Enqueue the mapped key for updates to the object + return cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + for _, p := range mp.Predicates { + if !p.HandleCreate(obj) { + return + } + } + mp.addRateLimited(r, obj) + }, + UpdateFunc: func(old, obj interface{}) { + for _, p := range mp.Predicates { + if !p.HandleUpdate(old, obj) { + return + } + } + mp.addRateLimited(r, obj) + }, + DeleteFunc: func(obj interface{}) { + for _, p := range mp.Predicates { + if !p.HandleDelete(obj) { + return + } + } + mp.addRateLimited(r, obj) + }, + } +} + +// addRateLimited maps the obj to a string. If the string is non-empty, it is enqueued. +func (mp MapAndEnqueue) addRateLimited(r workqueue.RateLimitingInterface, obj interface{}) { + k := mp.Map(obj) + if len(k) > 0 { + r.AddRateLimited(k) + } +} + +type ControllerLookup func(types.ReconcileKey) (interface{}, error) + +type Path []ControllerLookup + +type MapToController struct { + Path Path +} + +// MapToController returns the namespace/name key of the controller for obj +func (m MapToController) Map(obj interface{}) string { + var object metav1.Object + var ok bool + if object, ok = obj.(metav1.Object); !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + runtime.HandleError(fmt.Errorf("error decoding object, invalid type")) + return "" + } + object, ok = tombstone.Obj.(metav1.Object) + if !ok { + runtime.HandleError(fmt.Errorf("error decoding object tombstone, invalid type")) + return "" + } + glog.V(4).Infof("Recovered deleted object '%s' from tombstone", object.GetName()) + } + glog.V(4).Infof("Processing object: %s", object.GetName()) + // Walk the controller path to the root + o := object + for len(m.Path) > 0 { + // Get the owner reference + if ownerRef := metav1.GetControllerOf(o); ownerRef != nil { + // Resolve the owner object and check if the UID of the looked up object matches the reference. + owner, err := m.Path[0](types.ReconcileKey{Name: ownerRef.Name, Namespace: o.GetNamespace()}) + if err != nil || owner == nil { + glog.V(2).Infof("Could not lookup owner %v %v", owner, err) + return "" + } + var ownerObject metav1.Object + if ownerObject, ok = owner.(metav1.Object); !ok { + glog.V(2).Infof("No ObjectMeta for owner %v %v", owner, err) + return "" + } + if ownerObject.GetUID() != ownerRef.UID { + return "" + } + + // Pop the path element or return the value + if len(m.Path) > 1 { + o = ownerObject + m.Path = m.Path[1:] + } else { + return object.GetNamespace() + "/" + ownerRef.Name + } + } + } + return "" +} + +// ObjToKey returns a string namespace/name key for an object +type ObjToKey func(interface{}) string + +// MapToSelf returns the namespace/name key of obj +func MapToSelf(obj interface{}) string { + if key, err := cache.MetaNamespaceKeyFunc(obj); err != nil { + runtime.HandleError(err) + return "" + } else { + return key + } +} diff --git a/pkg/controller/eventhandlers/eventhandlers_suite_test.go b/pkg/controller/eventhandlers/eventhandlers_suite_test.go new file mode 100644 index 0000000000..8da28f7c58 --- /dev/null +++ b/pkg/controller/eventhandlers/eventhandlers_suite_test.go @@ -0,0 +1,29 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eventhandlers + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "testing" +) + +func TestEventhandlers(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Eventhandlers Suite") +} diff --git a/pkg/controller/eventhandlers/eventhandlers_test.go b/pkg/controller/eventhandlers/eventhandlers_test.go new file mode 100644 index 0000000000..f9fa8c2448 --- /dev/null +++ b/pkg/controller/eventhandlers/eventhandlers_test.go @@ -0,0 +1,419 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eventhandlers_test + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "github.com/kubernetes-sigs/kubebuilder/pkg/controller/eventhandlers" + "github.com/kubernetes-sigs/kubebuilder/pkg/controller/types" + + "fmt" + //"github.com/kubernetes-sigs/kubebuilder/pkg/controller/predicates" + "github.com/kubernetes-sigs/kubebuilder/pkg/controller/predicates" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/util/workqueue" +) + +var _ = Describe("Eventhandlers", func() { + var ( + t = true + mae = eventhandlers.MapAndEnqueue{} + q = workqueue.NewNamedRateLimitingQueue(workqueue.NewMaxOfRateLimiter(), "world") + ) + + BeforeEach(func() { + mae = eventhandlers.MapAndEnqueue{ + Map: func(i interface{}) string { return fmt.Sprintf("p-%v", i) }, + } + q = workqueue.NewNamedRateLimitingQueue(workqueue.NewMaxOfRateLimiter( + workqueue.NewItemExponentialFailureRateLimiter(0, 0), + ), "world") + }) + + Describe("When mapping and enqueuing an event", func() { + Context("Where there are no Predicates", func() { + It("should set the Add function", func() { + fns := mae.Get(q) + fns.AddFunc("add") + Eventually(q.Len).Should(Equal(1)) + Expect(q.Get()).Should(Equal("p-add")) + }) + + It("should set the Delete function", func() { + fns := mae.Get(q) + fns.DeleteFunc("delete") + Eventually(q.Len()).Should(Equal(1)) + Expect(q.Get()).Should(Equal("p-delete")) + }) + + It("should set the Update function", func() { + fns := mae.Get(q) + fns.UpdateFunc("old", "update") + Eventually(q.Len()).Should(Equal(1)) + Expect(q.Get()).Should(Equal("p-update")) + }) + }) + + Context("Where there is one true Predicate", func() { + It("should set the Add function", func() { + mae.Predicates = []predicates.Predicate{FakePredicates{create: true}} + fns := mae.Get(q) + fns.AddFunc("add") + Eventually(q.Len()).Should(Equal(1)) + Expect(q.Get()).Should(Equal("p-add")) + + fns.DeleteFunc("delete") + fns.UpdateFunc("old", "update") + Consistently(q.Len).Should(Equal(0)) + }) + + It("should set the Delete function", func() { + mae.Predicates = []predicates.Predicate{FakePredicates{delete: true}} + fns := mae.Get(q) + fns.DeleteFunc("delete") + Eventually(q.Len()).Should(Equal(1)) + Expect(q.Get()).Should(Equal("p-delete")) + + fns.AddFunc("add") + fns.UpdateFunc("old", "add") + Consistently(q.Len).Should(Equal(0)) + }) + + It("should set the Update function", func() { + mae.Predicates = []predicates.Predicate{FakePredicates{update: true}} + fns := mae.Get(q) + fns.UpdateFunc("old", "update") + Eventually(q.Len()).Should(Equal(1)) + Expect(q.Get()).Should(Equal("p-update")) + + fns.AddFunc("add") + fns.DeleteFunc("delete") + Consistently(q.Len).Should(Equal(0)) + }) + }) + + Context("Where there are both true and false Predicates", func() { + Context("Where there is one false Predicate", func() { + It("should not Add", func() { + mae.Predicates = []predicates.Predicate{FakePredicates{create: true}, FakePredicates{}} + fns := mae.Get(q) + fns.AddFunc("add") + Consistently(q.Len).Should(Equal(0)) + }) + + It("should not Delete", func() { + mae.Predicates = []predicates.Predicate{FakePredicates{delete: true}, FakePredicates{}} + fns := mae.Get(q) + fns.DeleteFunc("delete") + Consistently(q.Len).Should(Equal(0)) + }) + + It("should not Update", func() { + mae.Predicates = []predicates.Predicate{FakePredicates{update: true}, FakePredicates{}} + fns := mae.Get(q) + fns.UpdateFunc("old", "update") + Consistently(q.Len).Should(Equal(0)) + }) + + It("should not Add", func() { + mae.Predicates = []predicates.Predicate{FakePredicates{}, FakePredicates{create: true}} + fns := mae.Get(q) + fns.AddFunc("add") + Consistently(q.Len).Should(Equal(0)) + }) + + It("should not Delete", func() { + mae.Predicates = []predicates.Predicate{FakePredicates{}, FakePredicates{delete: true}} + fns := mae.Get(q) + fns.DeleteFunc("delete") + Consistently(q.Len).Should(Equal(0)) + }) + + It("should not Update", func() { + mae.Predicates = []predicates.Predicate{FakePredicates{}, FakePredicates{update: true}} + fns := mae.Get(q) + fns.UpdateFunc("old", "update") + Consistently(q.Len).Should(Equal(0)) + }) + }) + }) + }) + + Describe("When mapping an object to itself", func() { + Context("Where the object has key metadata", func() { + It("should return the reconcile key for itself", func() { + result := eventhandlers.MapToSelf(&corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: "not-default", + OwnerReferences: []metav1.OwnerReference{ + { + Name: "test-replicaset", + Controller: &t, + UID: "uid5", + }, + }, + }, + }) + Expect(result).Should(Equal("not-default/test-pod")) + }) + }) + + Context("Where the object does not have key metadata", func() { + It("should return the empty string", func() { + obj := "" + result := eventhandlers.MapToSelf(&obj) + Expect(result).Should(Equal("")) + }) + }) + }) + + Describe("When mapping events for an object to the objects controller", func() { + var ( + mtc = eventhandlers.MapToController{} + ) + BeforeEach(func() { + mtc = eventhandlers.MapToController{} + }) + + Context("Where the object doesn't have metadata", func() { + It("should return the empty string", func() { + s := "" + result := mtc.Map(&s) + Expect(result).Should(Equal("")) + }) + }) + + Context("Where the path is empty", func() { + It("should return the empty string", func() { + result := mtc.Map(&corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: "default", + OwnerReferences: []metav1.OwnerReference{ + { + Name: "test-replicaset", + Controller: &t, + UID: "uid5", + }, + }, + }, + }) + Expect(result).Should(Equal("")) + }) + }) + + Context("Where the controller isn't found", func() { + It("should return the empty string", func() { + mtc.Path = eventhandlers.Path{ + func(k types.ReconcileKey) (interface{}, error) { + return nil, nil + }, + } + result := mtc.Map(&corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: "default", + OwnerReferences: []metav1.OwnerReference{ + { + Name: "test-replicaset", + Controller: &t, + UID: "uid5", + }, + }, + }, + }) + Expect(result).Should(Equal("")) + }) + }) + + Context("Where an error is returned when looking up the controller", func() { + It("should return the empty string", func() { + mtc.Path = eventhandlers.Path{ + func(k types.ReconcileKey) (interface{}, error) { + return &appsv1.ReplicaSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-replicaset", + Namespace: "default", + UID: "uid5", + }, + }, fmt.Errorf("error") + }, + } + result := mtc.Map(&corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: "default", + OwnerReferences: []metav1.OwnerReference{ + { + Name: "test-replicaset", + Controller: &t, + UID: "uid5", + }, + }, + }, + }) + Expect(result).Should(Equal("")) + }) + }) + + Context("Where the returned controller doesn't have metadata", func() { + It("should return the empty string", func() { + mtc.Path = eventhandlers.Path{ + func(k types.ReconcileKey) (interface{}, error) { + s := "" + return &s, nil + }, + } + result := mtc.Map(&corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: "default", + OwnerReferences: []metav1.OwnerReference{ + { + Name: "test-replicaset", + Controller: &t, + UID: "uid5", + }, + }, + }, + }) + Expect(result).Should(Equal("")) + }) + }) + + Context("Where the controller UID matches", func() { + It("should return the controller's namespace/name", func() { + mtc.Path = eventhandlers.Path{ + func(k types.ReconcileKey) (interface{}, error) { + return &appsv1.ReplicaSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-replicaset", + Namespace: "default", + UID: "uid5", + }, + }, nil + }, + } + result := mtc.Map(&corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: "default", + OwnerReferences: []metav1.OwnerReference{ + { + Name: "test-replicaset", + Controller: &t, + UID: "uid5", + }, + }, + }, + }) + Expect(result).Should(Equal("default/test-replicaset")) + }) + }) + + Context("Where the controller UID doesn't match", func() { + It("should not return the controller's namespace/name", func() { + mtc.Path = eventhandlers.Path{ + func(k types.ReconcileKey) (interface{}, error) { + defer GinkgoRecover() + Expect(k).Should(Equal(types.ReconcileKey{Name: "test-replicaset", Namespace: "default"})) + return &appsv1.ReplicaSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-replicaset", + Namespace: "default", + UID: "uid5", + }, + }, nil + }, + } + result := mtc.Map(&corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: "default", + OwnerReferences: []metav1.OwnerReference{ + { + Name: "test-replicaset", + Controller: &t, + UID: "uid3", + }, + }, + }, + }) + Expect(result).Should(Equal("")) + }) + }) + + Context("Where the controller maps to another controller", func() { + It("should return the controller's-controller's namespace/name", func() { + mtc.Path = eventhandlers.Path{ + func(k types.ReconcileKey) (interface{}, error) { + return &appsv1.ReplicaSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-replicaset", + Namespace: "default", + UID: "uid5", + OwnerReferences: []metav1.OwnerReference{ + { + Name: "test-deployment", + UID: "uid7", + Controller: &t, + }, + }, + }, + }, nil + }, + func(k types.ReconcileKey) (interface{}, error) { + return &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-deployment", + Namespace: "default", + UID: "uid7", + }, + }, nil + }, + } + result := mtc.Map(&corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: "default", + OwnerReferences: []metav1.OwnerReference{ + { + Name: "test-replicaset", + Controller: &t, + UID: "uid5", + }, + }, + }, + }) + Expect(result).Should(Equal("default/test-deployment")) + }) + }) + }) +}) + +type FakePredicates struct { + update, delete, create bool +} + +func (h FakePredicates) HandleUpdate(old, new interface{}) bool { return h.update } +func (h FakePredicates) HandleDelete(obj interface{}) bool { return h.delete } +func (h FakePredicates) HandleCreate(obj interface{}) bool { return h.create } diff --git a/pkg/controller/example_controller_test.go b/pkg/controller/example_controller_test.go index a2bccb046d..93cfb995f9 100644 --- a/pkg/controller/example_controller_test.go +++ b/pkg/controller/example_controller_test.go @@ -23,6 +23,7 @@ import ( "github.com/kubernetes-sigs/kubebuilder/pkg/config" "github.com/kubernetes-sigs/kubebuilder/pkg/controller" + "github.com/kubernetes-sigs/kubebuilder/pkg/controller/eventhandlers" "github.com/kubernetes-sigs/kubebuilder/pkg/controller/types" "github.com/kubernetes-sigs/kubebuilder/pkg/inject/run" appsv1 "k8s.io/api/apps/v1" @@ -59,7 +60,11 @@ func ExampleGenericController() { return nil }, } - if err := rsController.WatchAndMapToController(&corev1.Pod{}); err != nil { + + fn := func(k types.ReconcileKey) (interface{}, error) { + return informerFactory.Apps().V1().ReplicaSets().Lister().ReplicaSets(k.Namespace).Get(k.Name) + } + if err := rsController.WatchControllerOf(&corev1.Pod{}, eventhandlers.Path{fn}); err != nil { log.Fatalf("%v", err) } if err := rsController.Watch(&appsv1.ReplicaSet{}); err != nil { diff --git a/pkg/controller/example_watchandhandleevents_test.go b/pkg/controller/example_watchandhandleevents_test.go index 4f6e3aae2f..9146b631ac 100644 --- a/pkg/controller/example_watchandhandleevents_test.go +++ b/pkg/controller/example_watchandhandleevents_test.go @@ -23,7 +23,7 @@ import ( "github.com/kubernetes-sigs/kubebuilder/pkg/config" "github.com/kubernetes-sigs/kubebuilder/pkg/controller" - "github.com/kubernetes-sigs/kubebuilder/pkg/controller/handlefunctions" + "github.com/kubernetes-sigs/kubebuilder/pkg/controller/eventhandlers" "github.com/kubernetes-sigs/kubebuilder/pkg/controller/types" "github.com/kubernetes-sigs/kubebuilder/pkg/inject/run" corev1 "k8s.io/api/core/v1" @@ -47,14 +47,14 @@ func ExampleGenericController_WatchAndHandleEvents() { return nil }, } - err := c.WatchAndHandleEvents(&corev1.Pod{}, + err := c.WatchEvents(&corev1.Pod{}, // This function returns the callbacks that will be invoked for events func(q workqueue.RateLimitingInterface) cache.ResourceEventHandlerFuncs { // This function implements the same functionality as GenericController.Watch return cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { q.AddRateLimited(handlefunctions.MapToSelf(obj)) }, - UpdateFunc: func(old, obj interface{}) { q.AddRateLimited(handlefunctions.MapToSelf(obj)) }, - DeleteFunc: func(obj interface{}) { q.AddRateLimited(handlefunctions.MapToSelf(obj)) }, + AddFunc: func(obj interface{}) { q.AddRateLimited(eventhandlers.MapToSelf(obj)) }, + UpdateFunc: func(old, obj interface{}) { q.AddRateLimited(eventhandlers.MapToSelf(obj)) }, + DeleteFunc: func(obj interface{}) { q.AddRateLimited(eventhandlers.MapToSelf(obj)) }, } }) if err != nil { diff --git a/pkg/controller/example_watchandmap_test.go b/pkg/controller/example_watchandmap_test.go index 3588339bc9..84d9fe529b 100644 --- a/pkg/controller/example_watchandmap_test.go +++ b/pkg/controller/example_watchandmap_test.go @@ -52,7 +52,7 @@ func ExampleGenericController_WatchAndMap() { if err != nil { log.Fatalf("%v", err) } - err = c.WatchAndMap(&corev1.Pod{}, + err = c.WatchTransformationOf(&corev1.Pod{}, func(i interface{}) string { p, ok := i.(*corev1.Pod) if !ok { diff --git a/pkg/controller/example_watchandmaptocontroller_test.go b/pkg/controller/example_watchandmaptocontroller_test.go index 1fe8cd3aa0..55c994a395 100644 --- a/pkg/controller/example_watchandmaptocontroller_test.go +++ b/pkg/controller/example_watchandmaptocontroller_test.go @@ -23,10 +23,10 @@ import ( "github.com/kubernetes-sigs/kubebuilder/pkg/config" "github.com/kubernetes-sigs/kubebuilder/pkg/controller" + "github.com/kubernetes-sigs/kubebuilder/pkg/controller/eventhandlers" "github.com/kubernetes-sigs/kubebuilder/pkg/controller/types" "github.com/kubernetes-sigs/kubebuilder/pkg/inject/run" corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) func ExampleGenericController_WatchAndMapToController() { @@ -44,9 +44,10 @@ func ExampleGenericController_WatchAndMapToController() { return nil }, } - err := c.WatchAndMapToController(&corev1.Pod{}, - metav1.GroupVersionKind{Group: "apps", Version: "v1", Kind: "ReplicaSet"}, - ) + fn := func(k types.ReconcileKey) (interface{}, error) { + return informerFactory.Apps().V1().ReplicaSets().Lister().ReplicaSets(k.Namespace).Get(k.Name) + } + err := c.WatchControllerOf(&corev1.Pod{}, eventhandlers.Path{fn}) if err != nil { log.Fatalf("%v", err) } diff --git a/pkg/controller/handlefunctions/handlerfuncs.go b/pkg/controller/handlefunctions/handlerfuncs.go deleted file mode 100644 index 9d83f15ade..0000000000 --- a/pkg/controller/handlefunctions/handlerfuncs.go +++ /dev/null @@ -1,177 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package handlefunctions - -import ( - "fmt" - - "github.com/golang/glog" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/client-go/tools/cache" - "k8s.io/client-go/util/workqueue" -) - -var ( - ResourceVersionChanged = ResourceVersionChangedPredicate{} -) - -// MappingEnqueuingFnProvider provides Fns to map objects to name/namespace keys and enqueue them as messages -type MappingEnqueuingFnProvider struct { - Predicate Predicate - // Map maps an object to a key that can be enqueued - Map func(interface{}) string -} - -// HandlingFnsForQueue accepts a workqueue and returns ResourceEventHandlerFuncs that enqueue messages to it -// for add / update / delete events -type HandlingFnsForQueue interface { - Get(r workqueue.RateLimitingInterface) cache.ResourceEventHandlerFuncs -} - -// Get returns ResourceEventHandlerFuncs that Map an object to a Key and enqueue the key if it is non-empty -func (mp MappingEnqueuingFnProvider) Get(r workqueue.RateLimitingInterface) cache.ResourceEventHandlerFuncs { - // Enqueue the mapped key for updates to the object - return cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { mp.addRateLimited(r, obj) }, - UpdateFunc: func(old, obj interface{}) { mp.addRateLimited(r, obj) }, - DeleteFunc: func(obj interface{}) { mp.addRateLimited(r, obj) }, - } -} - -// addRateLimited maps the obj to a string. If the string is non-empty, it is enqueued. -func (mp MappingEnqueuingFnProvider) addRateLimited(r workqueue.RateLimitingInterface, obj interface{}) { - k := mp.Map(obj) - if len(k) > 0 { - r.AddRateLimited(k) - } -} - -type MapToController struct { - GVK []metav1.GroupVersionKind -} - -// MapToController returns the namespace/name key of the controller for obj -func (m MapToController) Map(obj interface{}) string { - var object metav1.Object - var ok bool - if object, ok = obj.(metav1.Object); !ok { - tombstone, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - runtime.HandleError(fmt.Errorf("error decoding object, invalid type")) - return "" - } - object, ok = tombstone.Obj.(metav1.Object) - if !ok { - runtime.HandleError(fmt.Errorf("error decoding object tombstone, invalid type")) - return "" - } - glog.V(4).Infof("Recovered deleted object '%s' from tombstone", object.GetName()) - } - glog.V(4).Infof("Processing object: %s", object.GetName()) - if ownerRef := metav1.GetControllerOf(object); ownerRef != nil { - // If GVK is empty don't filter - found := len(m.GVK) == 0 - - // Only notify the resource if its gvk matches - for _, gvk := range m.GVK { - if ownerRef.Kind == gvk.Kind && ownerRef.APIVersion == gvk.Group+"/"+gvk.Version { - found = true - } - } - - if !found { - return "" - } - return object.GetNamespace() + "/" + ownerRef.Name - } - return "" -} - -// MapToSelf returns the namespace/name key of obj -func MapToSelf(obj interface{}) string { - if key, err := cache.MetaNamespaceKeyFunc(obj); err != nil { - runtime.HandleError(err) - return "" - } else { - return key - } -} - -// ObjToKey returns a string namespace/name key for an object -type ObjToKey func(interface{}) string - -type Predicate interface { - HandleUpdate(old, new interface{}) bool - HandleDelete(obj interface{}) bool - HandleCreate(obj interface{}) bool -} - -type TrueMixin struct{} - -func (TrueMixin) HandleUpdate(old, new interface{}) bool { - return true -} - -func (TrueMixin) HandleDelete(obj interface{}) bool { - return true -} - -func (TrueMixin) HandleCreate(obj interface{}) bool { - return true -} - -type FalseMixin struct{} - -func (FalseMixin) HandleUpdate(old, new interface{}) bool { - return false -} - -func (FalseMixin) HandleDelete(obj interface{}) bool { - return false -} - -func (FalseMixin) HandleCreate(obj interface{}) bool { - return false -} - -type ConditionalMappingHandler struct { - MappingEnqueuingFnProvider -} - -type ResourceVersionChangedPredicate struct { - TrueMixin -} - -func (ResourceVersionChangedPredicate) HandleUpdate(old, new interface{}) bool { - oldObject, ok := old.(metav1.Object) - if !ok { - fmt.Errorf("Cannot handle %T because old is not an Object: %v\n", oldObject, oldObject) - return false - } - newObject, ok := new.(metav1.Object) - if !ok { - fmt.Errorf("Cannot handle %T because new is not an Object: %v\n", newObject, newObject) - return false - } - - if oldObject.GetResourceVersion() == newObject.GetResourceVersion() { - // Periodic resync will send update events for all resources Deployments. - return false - } - return true -} diff --git a/pkg/controller/listeningqueue.go b/pkg/controller/listeningqueue.go index 62b59eaefe..572e9f0c48 100644 --- a/pkg/controller/listeningqueue.go +++ b/pkg/controller/listeningqueue.go @@ -19,7 +19,7 @@ package controller import ( "fmt" - "github.com/kubernetes-sigs/kubebuilder/pkg/controller/handlefunctions" + "github.com/kubernetes-sigs/kubebuilder/pkg/controller/eventhandlers" "github.com/kubernetes-sigs/kubebuilder/pkg/controller/informers" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/cache" @@ -41,40 +41,6 @@ type listeningQueue struct { synced []cache.InformerSynced } -// watchFor watches objects matching obj's type and enqueues their keys. -func (q *listeningQueue) watchFor(obj metav1.Object) error { - return q.addEventHandler(obj, handlefunctions.MappingEnqueuingFnProvider{Map: handlefunctions.MapToSelf}) -} - -// watchForAndMapToController watches objects matching obj's type and enqueues the keys of their controllers. -func (q *listeningQueue) watchForAndMapToController(obj metav1.Object, gvks ...metav1.GroupVersionKind) error { - return q.addEventHandler(obj, handlefunctions.MappingEnqueuingFnProvider{ - Map: handlefunctions.MapToController{GVK: gvks}.Map, - }) -} - -// watchForAndMapToControllerIf watches objects matching obj's type and enqueues the keys of their controllers. -func (q *listeningQueue) watchForAndMapToControllerIf(obj metav1.Object, predicate handlefunctions.Predicate, - gvks ...metav1.GroupVersionKind) error { - return q.addEventHandler(obj, handlefunctions.MappingEnqueuingFnProvider{ - Map: handlefunctions.MapToController{GVK: gvks}.Map, - Predicate: predicate, - }) -} - -// WatchAndMap watches objects matching obj's type and maps them to keys that it then enqueues. -func (q *listeningQueue) watchForAndMapToNewObjectKey( - obj metav1.Object, mappingFn handlefunctions.ObjToKey) error { - - return q.addEventHandler(obj, handlefunctions.MappingEnqueuingFnProvider{Map: mappingFn}) -} - -// watchForAndHandleEvent watches objects matching obj's type and uses the functions from provider to handle events. -func (q *listeningQueue) watchForAndHandleEvent( - obj metav1.Object, provider handlefunctions.HandlingFnsForQueue) error { - return q.addEventHandler(obj, provider) -} - // watchChannel enqueues message from a channel func (q *listeningQueue) watchChannel(source <-chan string) error { go func() { @@ -86,14 +52,13 @@ func (q *listeningQueue) watchChannel(source <-chan string) error { } // addEventHandler uses the provider functions to add an event handler for events to objects matching obj's type -func (q *listeningQueue) addEventHandler( - obj metav1.Object, provider handlefunctions.HandlingFnsForQueue) error { +func (q *listeningQueue) addEventHandler(obj metav1.Object, eh eventhandlers.EventHandler) error { i, err := q.lookupInformer(obj) if err != nil { return err } - fns := provider.Get(q.RateLimitingInterface) + fns := eh.Get(q.RateLimitingInterface) q.synced = append(q.synced, i.HasSynced) i.AddEventHandler(fns) return nil diff --git a/pkg/controller/listeningqueue_test.go b/pkg/controller/listeningqueue_test.go index 68dc246cc6..317f981917 100644 --- a/pkg/controller/listeningqueue_test.go +++ b/pkg/controller/listeningqueue_test.go @@ -25,7 +25,6 @@ import ( "github.com/kubernetes-sigs/kubebuilder/pkg/controller/test" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/util/workqueue" ) @@ -54,147 +53,6 @@ var _ = Describe("ListeningQueue", func() { } }) - Describe("Listening to a Pod SharedInformer", func() { - Context("Where a Pod has been added", func() { - It("should add the Pod namespace/name key to the queue", func() { - // Listen for Pod changes - Expect(instance.watchFor(&corev1.Pod{})).Should(Succeed()) - - // Create a Pod event - fakePodInformer.Add(&corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-pod", - Namespace: "default", - }, - }) - - Eventually(instance.Len, time.Second*2).Should(Equal(1)) - key, shutdown := instance.Get() - Expect(shutdown).To(Equal(false)) - Expect(key).To(Equal("default/test-pod")) - Expect(instance.Len()).To(Equal(0)) - }) - }) - - Context("Where several Pods have been added", func() { - It("should add all the Pod namespace/name keys to the queue", func() { - // Listen for Pod changes - Expect(instance.watchFor(&corev1.Pod{})).Should(Succeed()) - - // Create a Pod event - fakePodInformer.Add(&corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-pod-1", - Namespace: "default-1", - }, - }) - - fakePodInformer.Add(&corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-pod-2", - Namespace: "default-2", - }, - }) - - keys := []string{} - Eventually(instance.Len, time.Second*2).Should(Equal(2)) - - key, shutdown := instance.Get() - Expect(shutdown).To(Equal(false)) - keys = append(keys, key.(string)) - - key, shutdown = instance.Get() - Expect(shutdown).To(Equal(false)) - keys = append(keys, key.(string)) - - Expect(instance.Len()).To(Equal(0)) - Expect(keys).Should(ConsistOf("default-1/test-pod-1", "default-2/test-pod-2")) - }) - }) - - Context("Where the same Pod is added multiple times", func() { - It("should add the Pod namespace/name to the queue exactly once", func() { - // Listen for Pod changes - Expect(instance.watchFor(&corev1.Pod{})).Should(Succeed()) - - // Create a Pod event - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-pod", - Namespace: "default", - }, - } - - // Add the Pod a bunch of times - for i := 0; i < 10; i++ { - fakePodInformer.Add(pod) - } - - Consistently(instance.Len, time.Second*2).Should(BeNumerically("<=", 1)) - - Expect(instance.Len()).To(Equal(1)) - key, shutdown := instance.Get() - Expect(shutdown).To(Equal(false)) - Expect(key).To(Equal("default/test-pod")) - Expect(instance.Len()).To(Equal(0)) - }) - }) - }) - - Describe("Listening to a ReplicaSet SharedInformer", func() { - Context("Where a Pod has been added by the RS", func() { - It("should add the parent ReplicaSet namespace/name to the queue", func() { - // Listen for Pod changes - Expect(instance.watchForAndMapToController(&corev1.Pod{})).Should(Succeed()) - - // Create a Pod event - c := true - fakePodInformer.Add(&corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-pod", - Namespace: "default", - OwnerReferences: []metav1.OwnerReference{ - { - Name: "test-replicaset", - Controller: &c, - }, - }, - }, - }) - - Eventually(instance.Len, time.Second*1).Should(Equal(1)) - key, shutdown := instance.Get() - Expect(shutdown).To(Equal(false)) - Expect(key).To(Equal("default/test-replicaset")) - Expect(instance.Len()).To(Equal(0)) - }) - }) - - Context("Where a Pod has been added but not by the RS", func() { - It("should add the parent ReplicaSet namespace/name to the queue", func() { - // Listen for Pod changes - Expect(instance.watchForAndMapToController(&corev1.Pod{})).Should(Succeed()) - - // Create a Pod event - fakePodInformer.Add(&corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-pod", - Namespace: "default", - OwnerReferences: []metav1.OwnerReference{ - { - Name: "test-replicaset", - // Doesn't have controller set - }, - }, - }, - }) - - // Should not enqueue a message since controller isn't set - Consistently(instance.Len, time.Second*1).Should(Equal(0)) - }) - }) - }) - Describe("Listening to a Channel", func() { Context("Where a message is sent", func() { It("should enqueue the message", func() { diff --git a/pkg/controller/manager_test.go b/pkg/controller/manager_test.go deleted file mode 100644 index 03bc551c5e..0000000000 --- a/pkg/controller/manager_test.go +++ /dev/null @@ -1,10 +0,0 @@ -package controller - -import ( - . "github.com/onsi/ginkgo" - //. "github.com/onsi/gomega" -) - -var _ = Describe("Singleton", func() { - -}) diff --git a/pkg/controller/predicates/predicates.go b/pkg/controller/predicates/predicates.go new file mode 100644 index 0000000000..7f2693e742 --- /dev/null +++ b/pkg/controller/predicates/predicates.go @@ -0,0 +1,83 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package predicates + +import ( + "fmt" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +var ( + ResourceVersionChanged = ResourceVersionChangedPredicate{} +) + +type Predicate interface { + HandleUpdate(old, new interface{}) bool + HandleDelete(obj interface{}) bool + HandleCreate(obj interface{}) bool +} + +type TrueMixin struct{} + +func (TrueMixin) HandleUpdate(old, new interface{}) bool { + return true +} + +func (TrueMixin) HandleDelete(obj interface{}) bool { + return true +} + +func (TrueMixin) HandleCreate(obj interface{}) bool { + return true +} + +type FalseMixin struct{} + +func (FalseMixin) HandleUpdate(old, new interface{}) bool { + return false +} + +func (FalseMixin) HandleDelete(obj interface{}) bool { + return false +} + +func (FalseMixin) HandleCreate(obj interface{}) bool { + return false +} + +type ResourceVersionChangedPredicate struct { + TrueMixin +} + +func (ResourceVersionChangedPredicate) HandleUpdate(old, new interface{}) bool { + oldObject, ok := old.(metav1.Object) + if !ok { + fmt.Errorf("Cannot handle %T because old is not an Object: %v\n", oldObject, oldObject) + return false + } + newObject, ok := new.(metav1.Object) + if !ok { + fmt.Errorf("Cannot handle %T because new is not an Object: %v\n", newObject, newObject) + return false + } + + if oldObject.GetResourceVersion() == newObject.GetResourceVersion() { + // Periodic resync will send update events for all resources Deployments. + return false + } + return true +} diff --git a/pkg/controller/handlefunctions/handlerfuncs_test.go b/pkg/controller/predicates/predicates_suite_test.go similarity index 79% rename from pkg/controller/handlefunctions/handlerfuncs_test.go rename to pkg/controller/predicates/predicates_suite_test.go index 5b214c8669..abe50afa6e 100644 --- a/pkg/controller/handlefunctions/handlerfuncs_test.go +++ b/pkg/controller/predicates/predicates_suite_test.go @@ -14,13 +14,16 @@ See the License for the specific language governing permissions and limitations under the License. */ -package handlefunctions +package predicates import ( . "github.com/onsi/ginkgo" - //. "github.com/onsi/gomega" -) + . "github.com/onsi/gomega" -var _ = Describe("Handlerfuncs", func() { + "testing" +) -}) +func TestPredicates(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Predicates Suite") +} diff --git a/pkg/controller/predicates/predicates_test.go b/pkg/controller/predicates/predicates_test.go new file mode 100644 index 0000000000..e91785e359 --- /dev/null +++ b/pkg/controller/predicates/predicates_test.go @@ -0,0 +1,117 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package predicates + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +var _ = Describe("Predicates", func() { + var () + + BeforeEach(func() { + }) + + Describe("When checking the TrueMixin Predicate", func() { + It("should return true for Add", func() { + Expect(TrueMixin{}.HandleCreate("")).Should(BeTrue()) + }) + It("should return true for Update", func() { + Expect(TrueMixin{}.HandleUpdate("", "")).Should(BeTrue()) + }) + It("should return true for Delete", func() { + Expect(TrueMixin{}.HandleDelete("")).Should(BeTrue()) + }) + }) + + Describe("When checking the FalseMixin Predicate", func() { + It("should return true for Add", func() { + Expect(FalseMixin{}.HandleCreate("")).Should(BeFalse()) + }) + It("should return true for Update", func() { + Expect(FalseMixin{}.HandleUpdate("", "")).Should(BeFalse()) + }) + It("should return true for Delete", func() { + Expect(FalseMixin{}.HandleDelete("")).Should(BeFalse()) + }) + }) + + Describe("When checking a ResourceVersionChangedPredicate", func() { + Context("Where the old object doesn't have a ResourceVersion", func() { + It("should return false", func() { + instance := ResourceVersionChangedPredicate{} + Expect(instance.HandleDelete("")).Should(BeTrue()) + Expect(instance.HandleCreate("")).Should(BeTrue()) + Expect(instance.HandleUpdate(&corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "1", + }, + }, "")).Should(BeFalse()) + }) + }) + + Context("Where the new object doesn't have a ResourceVersion", func() { + It("should return false", func() { + instance := ResourceVersionChangedPredicate{} + Expect(instance.HandleDelete("")).Should(BeTrue()) + Expect(instance.HandleCreate("")).Should(BeTrue()) + Expect(instance.HandleUpdate("", &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "1", + }, + })).Should(BeFalse()) + }) + }) + + Context("Where the ResourceVersion hasn't changed", func() { + It("should return false", func() { + instance := ResourceVersionChangedPredicate{} + Expect(instance.HandleDelete("")).Should(BeTrue()) + Expect(instance.HandleCreate("")).Should(BeTrue()) + Expect(instance.HandleUpdate(&corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "1", + }, + }, &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "1", + }, + })).Should(BeFalse()) + }) + }) + + Context("Where the ResourceVersion has changed", func() { + It("should return true", func() { + instance := ResourceVersionChangedPredicate{} + Expect(instance.HandleDelete("")).Should(BeTrue()) + Expect(instance.HandleCreate("")).Should(BeTrue()) + Expect(instance.HandleUpdate(&corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "1", + }, + }, &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "2", + }, + })).Should(BeTrue()) + }) + }) + }) +})