Skip to content

Commit

Permalink
Merge pull request #46 from pwittrock/transformations
Browse files Browse the repository at this point in the history
Support mapping an object to multiple keys
  • Loading branch information
Phillip Wittrock authored Apr 4, 2018
2 parents 12f1fba + 7dced0a commit f634b54
Show file tree
Hide file tree
Showing 4 changed files with 244 additions and 5 deletions.
38 changes: 38 additions & 0 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,44 @@ func (gc *GenericController) WatchTransformationOf(obj metav1.Object, mapFn even
eventhandlers.MapAndEnqueue{Map: mapFn, Predicates: p})
}

// WatchTransformationsOf watches objects matching obj's type and enqueues the keys returned by mapFn.
func (gc *GenericController) WatchTransformationsOf(obj metav1.Object, mapFn eventhandlers.ObjToKeys,
p ...predicates.Predicate) error {
gc.once.Do(gc.init)
return gc.queue.addEventHandler(obj,
eventhandlers.MapAndEnqueue{MultiMap: func(i interface{}) []types.ReconcileKey {
result := []types.ReconcileKey{}
for _, k := range mapFn(i) {
if namespace, name, err := cache.SplitMetaNamespaceKey(k); err == nil {
result = append(result, types.ReconcileKey{namespace, name})
}
}
return result
}, Predicates: p})
}

// WatchTransformationKeyOf watches objects matching obj's type and enqueues the key returned by mapFn.
func (gc *GenericController) WatchTransformationKeyOf(obj metav1.Object, mapFn eventhandlers.ObjToReconcileKey,
p ...predicates.Predicate) error {
gc.once.Do(gc.init)
return gc.queue.addEventHandler(obj,
eventhandlers.MapAndEnqueue{MultiMap: func(i interface{}) []types.ReconcileKey {
if k := mapFn(i); len(k.Name) > 0 {
return []types.ReconcileKey{k}
} else {
return []types.ReconcileKey{}
}
}, Predicates: p})
}

// WatchTransformationKeysOf watches objects matching obj's type and enqueues the keys returned by mapFn.
func (gc *GenericController) WatchTransformationKeysOf(obj metav1.Object, mapFn eventhandlers.ObjToReconcileKeys,
p ...predicates.Predicate) error {
gc.once.Do(gc.init)
return gc.queue.addEventHandler(obj,
eventhandlers.MapAndEnqueue{MultiMap: mapFn, Predicates: p})
}

// WatchEvents watches objects matching obj's type and uses the functions from provider to handle events.
func (gc *GenericController) WatchEvents(obj metav1.Object, provider types.HandleFnProvider) error {
gc.once.Do(gc.init)
Expand Down
59 changes: 57 additions & 2 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

"time"

"github.com/kubernetes-sigs/kubebuilder/pkg/controller/eventhandlers"
"github.com/kubernetes-sigs/kubebuilder/pkg/controller/test"
"github.com/kubernetes-sigs/kubebuilder/pkg/controller/types"
Expand All @@ -29,7 +31,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"time"
)

var _ = Describe("GenericController", func() {
Expand Down Expand Up @@ -218,7 +219,7 @@ var _ = Describe("GenericController", func() {
Expect(instance.GetMetrics().QueueLength).Should(Equal(0))
})

It("should use the map function to reconcile a different key", func() {
It("should use the transformation function to reconcile a different key", func() {
// Listen for Pod changes
Expect(instance.WatchTransformationOf(&corev1.Pod{}, func(obj interface{}) string {
p := obj.(*corev1.Pod)
Expand All @@ -233,6 +234,60 @@ var _ = Describe("GenericController", func() {
Expect(instance.GetMetrics().QueueLength).Should(Equal(0))
})

It("should use the transformationkey function to reconcile a different key", func() {
// Listen for Pod changes
Expect(instance.WatchTransformationKeyOf(&corev1.Pod{}, func(obj interface{}) types.ReconcileKey {
p := obj.(*corev1.Pod)
return types.ReconcileKey{p.Namespace + "-namespace", p.Name + "-name"}
})).Should(Succeed())

fakePodInformer.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-pod", Namespace: "default"}})

val := ChannelResult{}
Eventually(result).Should(Receive(&val.result))
Expect(val.result).Should(Equal("default-namespace/test-pod-name"))
Expect(instance.GetMetrics().QueueLength).Should(Equal(0))
})

It("should use the transformationsof function to reconcile multiple different keys", func() {
// Listen for Pod changes
Expect(instance.WatchTransformationsOf(&corev1.Pod{}, func(obj interface{}) []string {
p := obj.(*corev1.Pod)
return []string{
p.Namespace + "-namespace/" + p.Name + "-name-1",
p.Namespace + "-namespace/" + p.Name + "-name-2"}
})).Should(Succeed())

fakePodInformer.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-pod", Namespace: "default"}})

val := ChannelResult{}
Eventually(result).Should(Receive(&val.result))
Expect(val.result).Should(Equal("default-namespace/test-pod-name-1"))
Eventually(result).Should(Receive(&val.result))
Expect(val.result).Should(Equal("default-namespace/test-pod-name-2"))
Expect(instance.GetMetrics().QueueLength).Should(Equal(0))
})

It("should use the transformationkeysof function to reconcile multiple different keys", func() {
// Listen for Pod changes
Expect(instance.WatchTransformationKeysOf(&corev1.Pod{}, func(obj interface{}) []types.ReconcileKey {
p := obj.(*corev1.Pod)
return []types.ReconcileKey{
{p.Namespace + "-namespace", p.Name + "-name-1"},
{p.Namespace + "-namespace", p.Name + "-name-2"},
}
})).Should(Succeed())

fakePodInformer.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-pod", Namespace: "default"}})

val := ChannelResult{}
Eventually(result).Should(Receive(&val.result))
Expect(val.result).Should(Equal("default-namespace/test-pod-name-1"))
Eventually(result).Should(Receive(&val.result))
Expect(val.result).Should(Equal("default-namespace/test-pod-name-2"))
Expect(instance.GetMetrics().QueueLength).Should(Equal(0))
})

It("should call the event handling add function", func() {
// Listen for Pod changes
Expect(instance.WatchEvents(&corev1.Pod{},
Expand Down
20 changes: 17 additions & 3 deletions pkg/controller/eventhandlers/eventhandlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type MapAndEnqueue struct {
Predicates []predicates.Predicate
// Map maps an object to a key that can be enqueued
Map func(interface{}) string

MultiMap func(interface{}) []types.ReconcileKey
}

// Get returns ResourceEventHandlerFuncs that Map an object to a Key and enqueue the key if it is non-empty
Expand Down Expand Up @@ -74,9 +76,15 @@ func (mp MapAndEnqueue) Get(r workqueue.RateLimitingInterface) cache.ResourceEve

// addRateLimited maps the obj to a string. If the string is non-empty, it is enqueued.
func (mp MapAndEnqueue) addRateLimited(r workqueue.RateLimitingInterface, obj interface{}) {
k := mp.Map(obj)
if len(k) > 0 {
r.AddRateLimited(k)
if mp.Map != nil {
if k := mp.Map(obj); len(k) > 0 {
r.AddRateLimited(k)
}
}
if mp.MultiMap != nil {
for _, k := range mp.MultiMap(obj) {
r.AddRateLimited(k.Namespace + "/" + k.Name)
}
}
}

Expand Down Expand Up @@ -141,6 +149,12 @@ func (m MapToController) Map(obj interface{}) string {
// ObjToKey returns a string namespace/name key for an object
type ObjToKey func(interface{}) string

type ObjToKeys func(interface{}) []string

type ObjToReconcileKey func(interface{}) types.ReconcileKey

type ObjToReconcileKeys func(interface{}) []types.ReconcileKey

// MapToSelf returns the namespace/name key of obj
func MapToSelf(obj interface{}) string {
if key, err := cache.MetaNamespaceKeyFunc(obj); err != nil {
Expand Down
132 changes: 132 additions & 0 deletions pkg/controller/example_watchandmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,135 @@ func ExampleGenericController_WatchTransformationOf() {
// One time for program
controller.RunInformersAndControllers(run.CreateRunArguments())
}

func ExampleGenericController_WatchTransformationsOf() {
// One time setup for program
flag.Parse()
informerFactory := config.GetKubernetesInformersOrDie()
if err := controller.AddInformerProvider(&corev1.Pod{}, informerFactory.Core().V1().Pods()); err != nil {
log.Fatalf("Could not set informer %v", err)
}
if err := controller.AddInformerProvider(&appsv1.ReplicaSet{}, informerFactory.Apps().V1().ReplicaSets()); err != nil {
log.Fatalf("Could not set informer %v", err)
}

// Per-controller setup
c := &controller.GenericController{
Reconcile: func(key types.ReconcileKey) error {
fmt.Printf("Reconciling Pod %s\n", key)
return nil
},
}
err := c.Watch(&appsv1.ReplicaSet{})
if err != nil {
log.Fatalf("%v", err)
}
err = c.WatchTransformationsOf(&corev1.Pod{},
func(i interface{}) []string {
p, ok := i.(*corev1.Pod)
if !ok {
return []string{}
}

// Find multiple parents based off the name
return []string{
p.Namespace + "/" + strings.Split(p.Name, "-")[0] + "-parent-1",
p.Namespace + "/" + strings.Split(p.Name, "-")[0] + "-parent-2",
}
},
)
if err != nil {
log.Fatalf("%v", err)
}
controller.AddController(c)

// One time for program
controller.RunInformersAndControllers(run.CreateRunArguments())
}

func ExampleGenericController_WatchTransformationKeyOf() {
// One time setup for program
flag.Parse()
informerFactory := config.GetKubernetesInformersOrDie()
if err := controller.AddInformerProvider(&corev1.Pod{}, informerFactory.Core().V1().Pods()); err != nil {
log.Fatalf("Could not set informer %v", err)
}
if err := controller.AddInformerProvider(&appsv1.ReplicaSet{}, informerFactory.Apps().V1().ReplicaSets()); err != nil {
log.Fatalf("Could not set informer %v", err)
}

// Per-controller setup
c := &controller.GenericController{
Reconcile: func(key types.ReconcileKey) error {
fmt.Printf("Reconciling Pod %s\n", key)
return nil
},
}
err := c.Watch(&appsv1.ReplicaSet{})
if err != nil {
log.Fatalf("%v", err)
}
err = c.WatchTransformationKeyOf(&corev1.Pod{},
func(i interface{}) types.ReconcileKey {
p, ok := i.(*corev1.Pod)
if !ok {
return types.ReconcileKey{}
}

// Find multiple parents based off the name
return types.ReconcileKey{p.Namespace, strings.Split(p.Name, "-")[0]}
},
)
if err != nil {
log.Fatalf("%v", err)
}
controller.AddController(c)

// One time for program
controller.RunInformersAndControllers(run.CreateRunArguments())
}

func ExampleGenericController_WatchTransformationKeysOf() {
// One time setup for program
flag.Parse()
informerFactory := config.GetKubernetesInformersOrDie()
if err := controller.AddInformerProvider(&corev1.Pod{}, informerFactory.Core().V1().Pods()); err != nil {
log.Fatalf("Could not set informer %v", err)
}
if err := controller.AddInformerProvider(&appsv1.ReplicaSet{}, informerFactory.Apps().V1().ReplicaSets()); err != nil {
log.Fatalf("Could not set informer %v", err)
}

// Per-controller setup
c := &controller.GenericController{
Reconcile: func(key types.ReconcileKey) error {
fmt.Printf("Reconciling Pod %s\n", key)
return nil
},
}
err := c.Watch(&appsv1.ReplicaSet{})
if err != nil {
log.Fatalf("%v", err)
}
err = c.WatchTransformationKeysOf(&corev1.Pod{},
func(i interface{}) []types.ReconcileKey {
p, ok := i.(*corev1.Pod)
if !ok {
return []types.ReconcileKey{}
}

// Find multiple parents based off the name
return []types.ReconcileKey{
{p.Namespace, strings.Split(p.Name, "-")[0] + "-parent-1"},
{p.Namespace, strings.Split(p.Name, "-")[0] + "-parent-2"},
}
},
)
if err != nil {
log.Fatalf("%v", err)
}
controller.AddController(c)

// One time for program
controller.RunInformersAndControllers(run.CreateRunArguments())
}

0 comments on commit f634b54

Please sign in to comment.