From 7dced0a2fa643394ed224d1fc1fd9d79f44c9e06 Mon Sep 17 00:00:00 2001 From: Phillip Wittrock Date: Mon, 2 Apr 2018 22:52:34 -0700 Subject: [PATCH] Support for ReconcileKeys in transformations --- pkg/controller/controller.go | 30 +++++++ pkg/controller/controller_test.go | 39 ++++++++- pkg/controller/eventhandlers/eventhandlers.go | 8 +- pkg/controller/example_watchandmap_test.go | 87 +++++++++++++++++++ 4 files changed, 160 insertions(+), 4 deletions(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 2acf18d7b8..f01940d228 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -126,6 +126,36 @@ func (gc *GenericController) WatchTransformationOf(obj metav1.Object, mapFn even // WatchTransformationsOf watches objects matching obj's type and enqueues the keys returned by mapFn. func (gc *GenericController) WatchTransformationsOf(obj metav1.Object, mapFn eventhandlers.ObjToKeys, + p ...predicates.Predicate) error { + gc.once.Do(gc.init) + return gc.queue.addEventHandler(obj, + eventhandlers.MapAndEnqueue{MultiMap: func(i interface{}) []types.ReconcileKey { + result := []types.ReconcileKey{} + for _, k := range mapFn(i) { + if namespace, name, err := cache.SplitMetaNamespaceKey(k); err == nil { + result = append(result, types.ReconcileKey{namespace, name}) + } + } + return result + }, Predicates: p}) +} + +// WatchTransformationKeyOf watches objects matching obj's type and enqueues the key returned by mapFn. +func (gc *GenericController) WatchTransformationKeyOf(obj metav1.Object, mapFn eventhandlers.ObjToReconcileKey, + p ...predicates.Predicate) error { + gc.once.Do(gc.init) + return gc.queue.addEventHandler(obj, + eventhandlers.MapAndEnqueue{MultiMap: func(i interface{}) []types.ReconcileKey { + if k := mapFn(i); len(k.Name) > 0 { + return []types.ReconcileKey{k} + } else { + return []types.ReconcileKey{} + } + }, Predicates: p}) +} + +// WatchTransformationKeysOf watches objects matching obj's type and enqueues the keys returned by mapFn. +func (gc *GenericController) WatchTransformationKeysOf(obj metav1.Object, mapFn eventhandlers.ObjToReconcileKeys, p ...predicates.Predicate) error { gc.once.Do(gc.init) return gc.queue.addEventHandler(obj, diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 2b20fbb700..dd71a472e6 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -219,7 +219,7 @@ var _ = Describe("GenericController", func() { Expect(instance.GetMetrics().QueueLength).Should(Equal(0)) }) - It("should use the map function to reconcile a different key", func() { + It("should use the transformation function to reconcile a different key", func() { // Listen for Pod changes Expect(instance.WatchTransformationOf(&corev1.Pod{}, func(obj interface{}) string { p := obj.(*corev1.Pod) @@ -234,7 +234,22 @@ var _ = Describe("GenericController", func() { Expect(instance.GetMetrics().QueueLength).Should(Equal(0)) }) - It("should use the map function to reconcile multiple different keys", func() { + It("should use the transformationkey function to reconcile a different key", func() { + // Listen for Pod changes + Expect(instance.WatchTransformationKeyOf(&corev1.Pod{}, func(obj interface{}) types.ReconcileKey { + p := obj.(*corev1.Pod) + return types.ReconcileKey{p.Namespace + "-namespace", p.Name + "-name"} + })).Should(Succeed()) + + fakePodInformer.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-pod", Namespace: "default"}}) + + val := ChannelResult{} + Eventually(result).Should(Receive(&val.result)) + Expect(val.result).Should(Equal("default-namespace/test-pod-name")) + Expect(instance.GetMetrics().QueueLength).Should(Equal(0)) + }) + + It("should use the transformationsof function to reconcile multiple different keys", func() { // Listen for Pod changes Expect(instance.WatchTransformationsOf(&corev1.Pod{}, func(obj interface{}) []string { p := obj.(*corev1.Pod) @@ -253,6 +268,26 @@ var _ = Describe("GenericController", func() { Expect(instance.GetMetrics().QueueLength).Should(Equal(0)) }) + It("should use the transformationkeysof function to reconcile multiple different keys", func() { + // Listen for Pod changes + Expect(instance.WatchTransformationKeysOf(&corev1.Pod{}, func(obj interface{}) []types.ReconcileKey { + p := obj.(*corev1.Pod) + return []types.ReconcileKey{ + {p.Namespace + "-namespace", p.Name + "-name-1"}, + {p.Namespace + "-namespace", p.Name + "-name-2"}, + } + })).Should(Succeed()) + + fakePodInformer.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-pod", Namespace: "default"}}) + + val := ChannelResult{} + Eventually(result).Should(Receive(&val.result)) + Expect(val.result).Should(Equal("default-namespace/test-pod-name-1")) + Eventually(result).Should(Receive(&val.result)) + Expect(val.result).Should(Equal("default-namespace/test-pod-name-2")) + Expect(instance.GetMetrics().QueueLength).Should(Equal(0)) + }) + It("should call the event handling add function", func() { // Listen for Pod changes Expect(instance.WatchEvents(&corev1.Pod{}, diff --git a/pkg/controller/eventhandlers/eventhandlers.go b/pkg/controller/eventhandlers/eventhandlers.go index 143c94e895..3fcd8772e7 100644 --- a/pkg/controller/eventhandlers/eventhandlers.go +++ b/pkg/controller/eventhandlers/eventhandlers.go @@ -40,7 +40,7 @@ type MapAndEnqueue struct { // Map maps an object to a key that can be enqueued Map func(interface{}) string - MultiMap func(interface{}) []string + MultiMap func(interface{}) []types.ReconcileKey } // Get returns ResourceEventHandlerFuncs that Map an object to a Key and enqueue the key if it is non-empty @@ -83,7 +83,7 @@ func (mp MapAndEnqueue) addRateLimited(r workqueue.RateLimitingInterface, obj in } if mp.MultiMap != nil { for _, k := range mp.MultiMap(obj) { - r.AddRateLimited(k) + r.AddRateLimited(k.Namespace + "/" + k.Name) } } } @@ -151,6 +151,10 @@ type ObjToKey func(interface{}) string type ObjToKeys func(interface{}) []string +type ObjToReconcileKey func(interface{}) types.ReconcileKey + +type ObjToReconcileKeys func(interface{}) []types.ReconcileKey + // MapToSelf returns the namespace/name key of obj func MapToSelf(obj interface{}) string { if key, err := cache.MetaNamespaceKeyFunc(obj); err != nil { diff --git a/pkg/controller/example_watchandmap_test.go b/pkg/controller/example_watchandmap_test.go index 7399abb9e8..f8de998413 100644 --- a/pkg/controller/example_watchandmap_test.go +++ b/pkg/controller/example_watchandmap_test.go @@ -116,3 +116,90 @@ func ExampleGenericController_WatchTransformationsOf() { // One time for program controller.RunInformersAndControllers(run.CreateRunArguments()) } + +func ExampleGenericController_WatchTransformationKeyOf() { + // One time setup for program + flag.Parse() + informerFactory := config.GetKubernetesInformersOrDie() + if err := controller.AddInformerProvider(&corev1.Pod{}, informerFactory.Core().V1().Pods()); err != nil { + log.Fatalf("Could not set informer %v", err) + } + if err := controller.AddInformerProvider(&appsv1.ReplicaSet{}, informerFactory.Apps().V1().ReplicaSets()); err != nil { + log.Fatalf("Could not set informer %v", err) + } + + // Per-controller setup + c := &controller.GenericController{ + Reconcile: func(key types.ReconcileKey) error { + fmt.Printf("Reconciling Pod %s\n", key) + return nil + }, + } + err := c.Watch(&appsv1.ReplicaSet{}) + if err != nil { + log.Fatalf("%v", err) + } + err = c.WatchTransformationKeyOf(&corev1.Pod{}, + func(i interface{}) types.ReconcileKey { + p, ok := i.(*corev1.Pod) + if !ok { + return types.ReconcileKey{} + } + + // Find multiple parents based off the name + return types.ReconcileKey{p.Namespace, strings.Split(p.Name, "-")[0]} + }, + ) + if err != nil { + log.Fatalf("%v", err) + } + controller.AddController(c) + + // One time for program + controller.RunInformersAndControllers(run.CreateRunArguments()) +} + +func ExampleGenericController_WatchTransformationKeysOf() { + // One time setup for program + flag.Parse() + informerFactory := config.GetKubernetesInformersOrDie() + if err := controller.AddInformerProvider(&corev1.Pod{}, informerFactory.Core().V1().Pods()); err != nil { + log.Fatalf("Could not set informer %v", err) + } + if err := controller.AddInformerProvider(&appsv1.ReplicaSet{}, informerFactory.Apps().V1().ReplicaSets()); err != nil { + log.Fatalf("Could not set informer %v", err) + } + + // Per-controller setup + c := &controller.GenericController{ + Reconcile: func(key types.ReconcileKey) error { + fmt.Printf("Reconciling Pod %s\n", key) + return nil + }, + } + err := c.Watch(&appsv1.ReplicaSet{}) + if err != nil { + log.Fatalf("%v", err) + } + err = c.WatchTransformationKeysOf(&corev1.Pod{}, + func(i interface{}) []types.ReconcileKey { + p, ok := i.(*corev1.Pod) + if !ok { + return []types.ReconcileKey{} + } + + // Find multiple parents based off the name + return []types.ReconcileKey{ + {p.Namespace, strings.Split(p.Name, "-")[0] + "-parent-1"}, + {p.Namespace, strings.Split(p.Name, "-")[0] + "-parent-2"}, + } + }, + ) + if err != nil { + log.Fatalf("%v", err) + } + controller.AddController(c) + + // One time for program + controller.RunInformersAndControllers(run.CreateRunArguments()) +}