Skip to content

Commit

Permalink
Make event handler type aware
Browse files Browse the repository at this point in the history
Signed-off-by: Danil Grigorev <danil.grigorev@suse.com>
  • Loading branch information
Danil-Grigorev committed Feb 29, 2024
1 parent 5615941 commit ddea5ef
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 57 deletions.
2 changes: 1 addition & 1 deletion pkg/builder/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func (blder *Builder) doWatch() error {
}
for _, w := range blder.watchesInput {
// If the source of this watch is of type Kind, project it.
if srcKind, ok := w.src.(*internalsource.Kind); ok {
if srcKind, ok := w.src.(*internalsource.Kind[client.Object]); ok {
typeForSrc, err := blder.project(srcKind.Type, w.objectProjection)
if err != nil {
return err
Expand Down
24 changes: 12 additions & 12 deletions pkg/internal/source/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ import (
var log = logf.RuntimeLog.WithName("source").WithName("EventHandler")

// NewEventHandler creates a new EventHandler.
func NewEventHandler(ctx context.Context, queue workqueue.RateLimitingInterface, handler handler.EventHandler, predicates []predicate.Predicate) *EventHandler {
return &EventHandler{
func NewEventHandler[T client.Object](ctx context.Context, queue workqueue.RateLimitingInterface, handler handler.EventHandler, predicates []predicate.Predicate) *EventHandler[T] {
return &EventHandler[T]{
ctx: ctx,
handler: handler,
queue: queue,
Expand All @@ -43,7 +43,7 @@ func NewEventHandler(ctx context.Context, queue workqueue.RateLimitingInterface,
}

// EventHandler adapts a handler.EventHandler interface to a cache.ResourceEventHandler interface.
type EventHandler struct {
type EventHandler[T client.Object] struct {
// ctx stores the context that created the event handler
// that is used to propagate cancellation signals to each handler function.
ctx context.Context
Expand All @@ -55,7 +55,7 @@ type EventHandler struct {

// HandlerFuncs converts EventHandler to a ResourceEventHandlerFuncs
// TODO: switch to ResourceEventHandlerDetailedFuncs with client-go 1.27
func (e *EventHandler) HandlerFuncs() cache.ResourceEventHandlerFuncs {
func (e *EventHandler[T]) HandlerFuncs() cache.ResourceEventHandlerFuncs {
return cache.ResourceEventHandlerFuncs{
AddFunc: e.OnAdd,
UpdateFunc: e.OnUpdate,
Expand All @@ -64,11 +64,11 @@ func (e *EventHandler) HandlerFuncs() cache.ResourceEventHandlerFuncs {
}

// OnAdd creates CreateEvent and calls Create on EventHandler.
func (e *EventHandler) OnAdd(obj interface{}) {
func (e *EventHandler[T]) OnAdd(obj interface{}) {
c := event.CreateEvent{}

// Pull Object out of the object
if o, ok := obj.(client.Object); ok {
if o, ok := obj.(T); ok {
c.Object = o
} else {
log.Error(nil, "OnAdd missing Object",
Expand All @@ -89,10 +89,10 @@ func (e *EventHandler) OnAdd(obj interface{}) {
}

// OnUpdate creates UpdateEvent and calls Update on EventHandler.
func (e *EventHandler) OnUpdate(oldObj, newObj interface{}) {
func (e *EventHandler[T]) OnUpdate(oldObj, newObj interface{}) {
u := event.UpdateEvent{}

if o, ok := oldObj.(client.Object); ok {
if o, ok := oldObj.(T); ok {
u.ObjectOld = o
} else {
log.Error(nil, "OnUpdate missing ObjectOld",
Expand All @@ -101,7 +101,7 @@ func (e *EventHandler) OnUpdate(oldObj, newObj interface{}) {
}

// Pull Object out of the object
if o, ok := newObj.(client.Object); ok {
if o, ok := newObj.(T); ok {
u.ObjectNew = o
} else {
log.Error(nil, "OnUpdate missing ObjectNew",
Expand All @@ -122,7 +122,7 @@ func (e *EventHandler) OnUpdate(oldObj, newObj interface{}) {
}

// OnDelete creates DeleteEvent and calls Delete on EventHandler.
func (e *EventHandler) OnDelete(obj interface{}) {
func (e *EventHandler[T]) OnDelete(obj interface{}) {
d := event.DeleteEvent{}

// Deal with tombstone events by pulling the object out. Tombstone events wrap the object in a
Expand All @@ -131,7 +131,7 @@ func (e *EventHandler) OnDelete(obj interface{}) {
// This should never happen if we aren't missing events, which we have concluded that we are not
// and made decisions off of this belief. Maybe this shouldn't be here?
var ok bool
if _, ok = obj.(client.Object); !ok {
if _, ok = obj.(T); !ok {
// If the object doesn't have Metadata, assume it is a tombstone object of type DeletedFinalStateUnknown
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
Expand All @@ -149,7 +149,7 @@ func (e *EventHandler) OnDelete(obj interface{}) {
}

// Pull Object out of the object
if o, ok := obj.(client.Object); ok {
if o, ok := obj.(T); ok {
d.Object = o
} else {
log.Error(nil, "OnDelete missing Object",
Expand Down
58 changes: 24 additions & 34 deletions pkg/internal/source/internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (

var _ = Describe("Internal", func() {
var ctx = context.Background()
var instance *internal.EventHandler
var instance *internal.EventHandler[*corev1.Pod]
var funcs, setfuncs *handler.Funcs
var set bool
BeforeEach(func() {
Expand Down Expand Up @@ -74,7 +74,7 @@ var _ = Describe("Internal", func() {
set = true
},
}
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, funcs, nil)
instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, funcs, nil)
})

Describe("EventHandler", func() {
Expand All @@ -99,38 +99,38 @@ var _ = Describe("Internal", func() {
})

It("should used Predicates to filter CreateEvents", func() {
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return false }},
})
set = false
instance.OnAdd(pod)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }},
})
instance.OnAdd(pod)
Expect(set).To(BeTrue())

set = false
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }},
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return false }},
})
instance.OnAdd(pod)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return false }},
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }},
})
instance.OnAdd(pod)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }},
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }},
})
Expand All @@ -142,8 +142,8 @@ var _ = Describe("Internal", func() {
instance.OnAdd(&metav1.ObjectMeta{})
})

It("should not call Create EventHandler if the object does not have metadata", func() {
instance.OnAdd(FooRuntimeObject{})
It("should not call Create EventHandler if an object is not 'that' object", func() {
instance.OnAdd(&corev1.Secret{})
})

It("should create an UpdateEvent", func() {
Expand All @@ -157,37 +157,37 @@ var _ = Describe("Internal", func() {

It("should used Predicates to filter UpdateEvents", func() {
set = false
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{UpdateFunc: func(updateEvent event.UpdateEvent) bool { return false }},
})
instance.OnUpdate(pod, newPod)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{UpdateFunc: func(event.UpdateEvent) bool { return true }},
})
instance.OnUpdate(pod, newPod)
Expect(set).To(BeTrue())

set = false
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{UpdateFunc: func(event.UpdateEvent) bool { return true }},
predicate.Funcs{UpdateFunc: func(event.UpdateEvent) bool { return false }},
})
instance.OnUpdate(pod, newPod)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{UpdateFunc: func(event.UpdateEvent) bool { return false }},
predicate.Funcs{UpdateFunc: func(event.UpdateEvent) bool { return true }},
})
instance.OnUpdate(pod, newPod)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }},
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }},
})
Expand All @@ -200,9 +200,9 @@ var _ = Describe("Internal", func() {
instance.OnUpdate(&corev1.Pod{}, &metav1.ObjectMeta{})
})

It("should not call Update EventHandler if the object does not have metadata", func() {
instance.OnUpdate(FooRuntimeObject{}, &corev1.Pod{})
instance.OnUpdate(&corev1.Pod{}, FooRuntimeObject{})
It("should not call Update EventHandler if an object is not 'that' object", func() {
instance.OnUpdate(&corev1.Secret{}, &corev1.Pod{})
instance.OnUpdate(&corev1.Pod{}, &corev1.ConfigMap{})
})

It("should create a DeleteEvent", func() {
Expand All @@ -215,37 +215,37 @@ var _ = Describe("Internal", func() {

It("should used Predicates to filter DeleteEvents", func() {
set = false
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return false }},
})
instance.OnDelete(pod)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return true }},
})
instance.OnDelete(pod)
Expect(set).To(BeTrue())

set = false
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return true }},
predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return false }},
})
instance.OnDelete(pod)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return false }},
predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return true }},
})
instance.OnDelete(pod)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler[*corev1.Pod](ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return true }},
predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return true }},
})
Expand All @@ -257,8 +257,8 @@ var _ = Describe("Internal", func() {
instance.OnDelete(&metav1.ObjectMeta{})
})

It("should not call Delete EventHandler if the object does not have metadata", func() {
instance.OnDelete(FooRuntimeObject{})
It("should not call Delete EventHandler if an object is not 'that' object", func() {
instance.OnDelete(&corev1.Secret{})
})

It("should create a DeleteEvent from a tombstone", func() {
Expand All @@ -274,16 +274,6 @@ var _ = Describe("Internal", func() {

instance.OnDelete(tombstone)
})

It("should ignore tombstone objects without meta", func() {
tombstone := cache.DeletedFinalStateUnknown{Obj: Foo{}}
instance.OnDelete(tombstone)
})
It("should ignore objects without meta", func() {
instance.OnAdd(Foo{})
instance.OnUpdate(Foo{}, Foo{})
instance.OnDelete(Foo{})
})
})
})

Expand Down
17 changes: 9 additions & 8 deletions pkg/internal/source/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"reflect"
"time"

"k8s.io/apimachinery/pkg/api/meta"
Expand All @@ -17,9 +18,9 @@ import (
)

// Kind is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create).
type Kind struct {
type Kind[T client.Object] struct {
// Type is the type of object to watch. e.g. &v1.Pod{}
Type client.Object
Type T

// Cache used to watch APIs
Cache cache.Cache
Expand All @@ -32,9 +33,9 @@ type Kind struct {

// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
// to enqueue reconcile.Requests.
func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
func (ks *Kind[T]) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
prct ...predicate.Predicate) error {
if ks.Type == nil {
if reflect.DeepEqual(ks.Type, *new(T)) {
return fmt.Errorf("must create Kind with a non-nil object")
}
if ks.Cache == nil {
Expand Down Expand Up @@ -79,7 +80,7 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w
return
}

_, err := i.AddEventHandler(NewEventHandler(ctx, queue, handler, prct).HandlerFuncs())
_, err := i.AddEventHandler(NewEventHandler[T](ctx, queue, handler, prct).HandlerFuncs())
if err != nil {
ks.started <- err
return
Expand All @@ -94,16 +95,16 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w
return nil
}

func (ks *Kind) String() string {
if ks.Type != nil {
func (ks *Kind[T]) String() string {
if !reflect.DeepEqual(ks.Type, *new(T)) {
return fmt.Sprintf("kind source: %T", ks.Type)
}
return "kind source: unknown type"
}

// WaitForSync implements SyncingSource to allow controllers to wait with starting
// workers until the cache is synced.
func (ks *Kind) WaitForSync(ctx context.Context) error {
func (ks *Kind[T]) WaitForSync(ctx context.Context) error {
select {
case err := <-ks.started:
return err
Expand Down
9 changes: 7 additions & 2 deletions pkg/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,12 @@ type SyncingSource interface {

// Kind creates a KindSource with the given cache provider.
func Kind(cache cache.Cache, object client.Object) SyncingSource {
return &internal.Kind{Type: object, Cache: cache}
return &internal.Kind[client.Object]{Type: object, Cache: cache}
}

// ObjectKind creates a typed KindSource with the given cache provider.
func ObjectKind[T client.Object](cache cache.Cache, object T) SyncingSource {
return &internal.Kind[T]{Type: object, Cache: cache}
}

var _ Source = &Channel{}
Expand Down Expand Up @@ -198,7 +203,7 @@ func (is *Informer) Start(ctx context.Context, handler handler.EventHandler, que
return fmt.Errorf("must specify Informer.Informer")
}

_, err := is.Informer.AddEventHandler(internal.NewEventHandler(ctx, queue, handler, prct).HandlerFuncs())
_, err := is.Informer.AddEventHandler(internal.NewEventHandler[client.Object](ctx, queue, handler, prct).HandlerFuncs())
if err != nil {
return err
}
Expand Down

0 comments on commit ddea5ef

Please sign in to comment.