Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🌱 Make event handler type aware #1

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion pkg/builder/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ func (blder *Builder) WatchesMetadata(object client.Object, eventHandler handler
//
// STOP! Consider using For(...), Owns(...), Watches(...), WatchesMetadata(...) instead.
// This method is only exposed for more advanced use cases, most users should use one of the higher level functions.
//
// Example:
// WatchesRawSource(source.Kind(cache, &corev1.Pod{}), eventHandler, opts...) // ensure that source propagates only valid Pod objects.
func (blder *Builder) WatchesRawSource(src source.Source, eventHandler handler.EventHandler, opts ...WatchesOption) *Builder {
input := WatchesInput{src: src, eventHandler: eventHandler}
for _, opt := range opts {
Expand Down Expand Up @@ -313,7 +316,7 @@ func (blder *Builder) doWatch() error {
}
for _, w := range blder.watchesInput {
// If the source of this watch is of type Kind, project it.
if srcKind, ok := w.src.(*internalsource.Kind); ok {
if srcKind, ok := w.src.(*internalsource.Kind[client.Object]); ok {
typeForSrc, err := blder.project(srcKind.Type, w.objectProjection)
if err != nil {
return err
Expand Down
24 changes: 12 additions & 12 deletions pkg/internal/source/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ import (
var log = logf.RuntimeLog.WithName("source").WithName("EventHandler")

// NewEventHandler creates a new EventHandler.
func NewEventHandler(ctx context.Context, queue workqueue.RateLimitingInterface, handler handler.EventHandler, predicates []predicate.Predicate) *EventHandler {
return &EventHandler{
func NewEventHandler[T client.Object](ctx context.Context, queue workqueue.RateLimitingInterface, handler handler.EventHandler, predicates []predicate.Predicate) *EventHandler[T] {
return &EventHandler[T]{
ctx: ctx,
handler: handler,
queue: queue,
Expand All @@ -43,7 +43,7 @@ func NewEventHandler(ctx context.Context, queue workqueue.RateLimitingInterface,
}

// EventHandler adapts a handler.EventHandler interface to a cache.ResourceEventHandler interface.
type EventHandler struct {
type EventHandler[T client.Object] struct {
// ctx stores the context that created the event handler
// that is used to propagate cancellation signals to each handler function.
ctx context.Context
Expand All @@ -55,7 +55,7 @@ type EventHandler struct {

// HandlerFuncs converts EventHandler to a ResourceEventHandlerFuncs
// TODO: switch to ResourceEventHandlerDetailedFuncs with client-go 1.27
func (e *EventHandler) HandlerFuncs() cache.ResourceEventHandlerFuncs {
func (e *EventHandler[T]) HandlerFuncs() cache.ResourceEventHandlerFuncs {
return cache.ResourceEventHandlerFuncs{
AddFunc: e.OnAdd,
UpdateFunc: e.OnUpdate,
Expand All @@ -64,11 +64,11 @@ func (e *EventHandler) HandlerFuncs() cache.ResourceEventHandlerFuncs {
}

// OnAdd creates CreateEvent and calls Create on EventHandler.
func (e *EventHandler) OnAdd(obj interface{}) {
func (e *EventHandler[T]) OnAdd(obj interface{}) {
c := event.CreateEvent{}

// Pull Object out of the object
if o, ok := obj.(client.Object); ok {
if o, ok := obj.(T); ok {
c.Object = o
} else {
log.Error(nil, "OnAdd missing Object",
Expand All @@ -89,10 +89,10 @@ func (e *EventHandler) OnAdd(obj interface{}) {
}

// OnUpdate creates UpdateEvent and calls Update on EventHandler.
func (e *EventHandler) OnUpdate(oldObj, newObj interface{}) {
func (e *EventHandler[T]) OnUpdate(oldObj, newObj interface{}) {
u := event.UpdateEvent{}

if o, ok := oldObj.(client.Object); ok {
if o, ok := oldObj.(T); ok {
u.ObjectOld = o
} else {
log.Error(nil, "OnUpdate missing ObjectOld",
Expand All @@ -101,7 +101,7 @@ func (e *EventHandler) OnUpdate(oldObj, newObj interface{}) {
}

// Pull Object out of the object
if o, ok := newObj.(client.Object); ok {
if o, ok := newObj.(T); ok {
u.ObjectNew = o
} else {
log.Error(nil, "OnUpdate missing ObjectNew",
Expand All @@ -122,7 +122,7 @@ func (e *EventHandler) OnUpdate(oldObj, newObj interface{}) {
}

// OnDelete creates DeleteEvent and calls Delete on EventHandler.
func (e *EventHandler) OnDelete(obj interface{}) {
func (e *EventHandler[T]) OnDelete(obj interface{}) {
d := event.DeleteEvent{}

// Deal with tombstone events by pulling the object out. Tombstone events wrap the object in a
Expand All @@ -131,7 +131,7 @@ func (e *EventHandler) OnDelete(obj interface{}) {
// This should never happen if we aren't missing events, which we have concluded that we are not
// and made decisions off of this belief. Maybe this shouldn't be here?
var ok bool
if _, ok = obj.(client.Object); !ok {
if _, ok = obj.(T); !ok {
// If the object doesn't have Metadata, assume it is a tombstone object of type DeletedFinalStateUnknown
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
Expand All @@ -149,7 +149,7 @@ func (e *EventHandler) OnDelete(obj interface{}) {
}

// Pull Object out of the object
if o, ok := obj.(client.Object); ok {
if o, ok := obj.(T); ok {
d.Object = o
} else {
log.Error(nil, "OnDelete missing Object",
Expand Down
50 changes: 32 additions & 18 deletions pkg/internal/source/internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (

var _ = Describe("Internal", func() {
var ctx = context.Background()
var instance *internal.EventHandler
var instance *internal.EventHandler[*corev1.Pod]
var funcs, setfuncs *handler.Funcs
var set bool
BeforeEach(func() {
Expand Down Expand Up @@ -74,7 +74,7 @@ var _ = Describe("Internal", func() {
set = true
},
}
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, funcs, nil)
instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, funcs, nil)
})

Describe("EventHandler", func() {
Expand All @@ -99,38 +99,38 @@ var _ = Describe("Internal", func() {
})

It("should used Predicates to filter CreateEvents", func() {
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return false }},
})
set = false
instance.OnAdd(pod)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }},
})
instance.OnAdd(pod)
Expect(set).To(BeTrue())

set = false
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }},
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return false }},
})
instance.OnAdd(pod)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return false }},
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }},
})
instance.OnAdd(pod)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }},
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }},
})
Expand All @@ -142,6 +142,10 @@ var _ = Describe("Internal", func() {
instance.OnAdd(&metav1.ObjectMeta{})
})

It("should not call Create EventHandler if an object is not 'that' object", func() {
instance.OnAdd(&corev1.Secret{})
})

It("should not call Create EventHandler if the object does not have metadata", func() {
instance.OnAdd(FooRuntimeObject{})
})
Expand All @@ -157,37 +161,37 @@ var _ = Describe("Internal", func() {

It("should used Predicates to filter UpdateEvents", func() {
set = false
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{UpdateFunc: func(updateEvent event.UpdateEvent) bool { return false }},
})
instance.OnUpdate(pod, newPod)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{UpdateFunc: func(event.UpdateEvent) bool { return true }},
})
instance.OnUpdate(pod, newPod)
Expect(set).To(BeTrue())

set = false
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{UpdateFunc: func(event.UpdateEvent) bool { return true }},
predicate.Funcs{UpdateFunc: func(event.UpdateEvent) bool { return false }},
})
instance.OnUpdate(pod, newPod)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{UpdateFunc: func(event.UpdateEvent) bool { return false }},
predicate.Funcs{UpdateFunc: func(event.UpdateEvent) bool { return true }},
})
instance.OnUpdate(pod, newPod)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }},
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }},
})
Expand All @@ -200,9 +204,15 @@ var _ = Describe("Internal", func() {
instance.OnUpdate(&corev1.Pod{}, &metav1.ObjectMeta{})
})

It("should not call Update EventHandler if an object is not 'that' object", func() {
instance.OnUpdate(&corev1.Secret{}, &corev1.Pod{})
instance.OnUpdate(&corev1.Pod{}, &corev1.ConfigMap{})
})

It("should not call Update EventHandler if the object does not have metadata", func() {
instance.OnUpdate(FooRuntimeObject{}, &corev1.Pod{})
instance.OnUpdate(&corev1.Pod{}, FooRuntimeObject{})
instance.OnUpdate(FooRuntimeObject{}, &corev1.Pod{})
instance.OnUpdate(FooRuntimeObject{}, FooRuntimeObject{})
})

It("should create a DeleteEvent", func() {
Expand All @@ -215,37 +225,37 @@ var _ = Describe("Internal", func() {

It("should used Predicates to filter DeleteEvents", func() {
set = false
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return false }},
})
instance.OnDelete(pod)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return true }},
})
instance.OnDelete(pod)
Expect(set).To(BeTrue())

set = false
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return true }},
predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return false }},
})
instance.OnDelete(pod)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return false }},
predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return true }},
})
instance.OnDelete(pod)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return true }},
predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return true }},
})
Expand All @@ -257,6 +267,10 @@ var _ = Describe("Internal", func() {
instance.OnDelete(&metav1.ObjectMeta{})
})

It("should not call Delete EventHandler if an object is not 'that' object", func() {
instance.OnDelete(&corev1.Secret{})
})

It("should not call Delete EventHandler if the object does not have metadata", func() {
instance.OnDelete(FooRuntimeObject{})
})
Expand Down
17 changes: 9 additions & 8 deletions pkg/internal/source/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"reflect"
"time"

"k8s.io/apimachinery/pkg/api/meta"
Expand All @@ -17,9 +18,9 @@ import (
)

// Kind is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create).
type Kind struct {
type Kind[T client.Object] struct {
// Type is the type of object to watch. e.g. &v1.Pod{}
Type client.Object
Type T

// Cache used to watch APIs
Cache cache.Cache
Expand All @@ -32,9 +33,9 @@ type Kind struct {

// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
// to enqueue reconcile.Requests.
func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
func (ks *Kind[T]) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
prct ...predicate.Predicate) error {
if ks.Type == nil {
if reflect.DeepEqual(ks.Type, *new(T)) {
return fmt.Errorf("must create Kind with a non-nil object")
}
if ks.Cache == nil {
Expand Down Expand Up @@ -79,7 +80,7 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w
return
}

_, err := i.AddEventHandler(NewEventHandler(ctx, queue, handler, prct).HandlerFuncs())
_, err := i.AddEventHandler(NewEventHandler[T](ctx, queue, handler, prct).HandlerFuncs())
if err != nil {
ks.started <- err
return
Expand All @@ -94,16 +95,16 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w
return nil
}

func (ks *Kind) String() string {
if ks.Type != nil {
func (ks *Kind[T]) String() string {
if !reflect.DeepEqual(ks.Type, *new(T)) {
return fmt.Sprintf("kind source: %T", ks.Type)
}
return "kind source: unknown type"
}

// WaitForSync implements SyncingSource to allow controllers to wait with starting
// workers until the cache is synced.
func (ks *Kind) WaitForSync(ctx context.Context) error {
func (ks *Kind[T]) WaitForSync(ctx context.Context) error {
select {
case err := <-ks.started:
return err
Expand Down
6 changes: 3 additions & 3 deletions pkg/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ type SyncingSource interface {
}

// Kind creates a KindSource with the given cache provider.
func Kind(cache cache.Cache, object client.Object) SyncingSource {
return &internal.Kind{Type: object, Cache: cache}
func Kind[T client.Object](cache cache.Cache, object T) SyncingSource {
return &internal.Kind[T]{Type: object, Cache: cache}
}

var _ Source = &Channel{}
Expand Down Expand Up @@ -198,7 +198,7 @@ func (is *Informer) Start(ctx context.Context, handler handler.EventHandler, que
return fmt.Errorf("must specify Informer.Informer")
}

_, err := is.Informer.AddEventHandler(internal.NewEventHandler(ctx, queue, handler, prct).HandlerFuncs())
_, err := is.Informer.AddEventHandler(internal.NewEventHandler[client.Object](ctx, queue, handler, prct).HandlerFuncs())
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/source/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
. "github.com/onsi/gomega"

"sigs.k8s.io/controller-runtime/pkg/cache/informertest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
Expand Down Expand Up @@ -187,7 +188,7 @@ var _ = Describe("Source", func() {
})

It("should return an error from Start if a type was not provided", func() {
instance := source.Kind(ic, nil)
instance := source.Kind[client.Object](ic, nil)
err := instance.Start(ctx, nil, nil)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("must create Kind with a non-nil object"))
Expand Down