diff --git a/pkg/builder/controller_test.go b/pkg/builder/controller_test.go index bdb2512767..3b3aba30ca 100644 --- a/pkg/builder/controller_test.go +++ b/pkg/builder/controller_test.go @@ -497,7 +497,7 @@ var _ = Describe("application", func() { For(&appsv1.Deployment{}, OnlyMetadata). Owns(&appsv1.ReplicaSet{}, OnlyMetadata). Watches(&appsv1.StatefulSet{}, - handler.EnqueueRequestsFromMapFunc(func(o client.Object) []reconcile.Request { + handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, o client.Object) []reconcile.Request { defer GinkgoRecover() ometa := o.(*metav1.PartialObjectMetadata) diff --git a/pkg/handler/enqueue.go b/pkg/handler/enqueue.go index e6d3a4eaab..c72b2e1ebb 100644 --- a/pkg/handler/enqueue.go +++ b/pkg/handler/enqueue.go @@ -17,6 +17,8 @@ limitations under the License. package handler import ( + "context" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/event" @@ -36,7 +38,7 @@ var _ EventHandler = &EnqueueRequestForObject{} type EnqueueRequestForObject struct{} // Create implements EventHandler. -func (e *EnqueueRequestForObject) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) { +func (e *EnqueueRequestForObject) Create(ctx context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) { if evt.Object == nil { enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt) return @@ -48,7 +50,7 @@ func (e *EnqueueRequestForObject) Create(evt event.CreateEvent, q workqueue.Rate } // Update implements EventHandler. -func (e *EnqueueRequestForObject) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) { +func (e *EnqueueRequestForObject) Update(ctx context.Context, evt event.UpdateEvent, q workqueue.RateLimitingInterface) { switch { case evt.ObjectNew != nil: q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ @@ -66,7 +68,7 @@ func (e *EnqueueRequestForObject) Update(evt event.UpdateEvent, q workqueue.Rate } // Delete implements EventHandler. -func (e *EnqueueRequestForObject) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) { +func (e *EnqueueRequestForObject) Delete(ctx context.Context, evt event.DeleteEvent, q workqueue.RateLimitingInterface) { if evt.Object == nil { enqueueLog.Error(nil, "DeleteEvent received with no metadata", "event", evt) return @@ -78,7 +80,7 @@ func (e *EnqueueRequestForObject) Delete(evt event.DeleteEvent, q workqueue.Rate } // Generic implements EventHandler. -func (e *EnqueueRequestForObject) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) { +func (e *EnqueueRequestForObject) Generic(ctx context.Context, evt event.GenericEvent, q workqueue.RateLimitingInterface) { if evt.Object == nil { enqueueLog.Error(nil, "GenericEvent received with no metadata", "event", evt) return diff --git a/pkg/handler/enqueue_mapped.go b/pkg/handler/enqueue_mapped.go index bdbc24bfa6..b55fdde6ba 100644 --- a/pkg/handler/enqueue_mapped.go +++ b/pkg/handler/enqueue_mapped.go @@ -17,6 +17,8 @@ limitations under the License. package handler import ( + "context" + "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" @@ -25,7 +27,7 @@ import ( // MapFunc is the signature required for enqueueing requests from a generic function. // This type is usually used with EnqueueRequestsFromMapFunc when registering an event handler. -type MapFunc func(client.Object) []reconcile.Request +type MapFunc func(context.Context, client.Object) []reconcile.Request // EnqueueRequestsFromMapFunc enqueues Requests by running a transformation function that outputs a collection // of reconcile.Requests on each Event. The reconcile.Requests may be for an arbitrary set of objects @@ -51,32 +53,32 @@ type enqueueRequestsFromMapFunc struct { } // Create implements EventHandler. -func (e *enqueueRequestsFromMapFunc) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) { +func (e *enqueueRequestsFromMapFunc) Create(ctx context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) { reqs := map[reconcile.Request]empty{} - e.mapAndEnqueue(q, evt.Object, reqs) + e.mapAndEnqueue(ctx, q, evt.Object, reqs) } // Update implements EventHandler. -func (e *enqueueRequestsFromMapFunc) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) { +func (e *enqueueRequestsFromMapFunc) Update(ctx context.Context, evt event.UpdateEvent, q workqueue.RateLimitingInterface) { reqs := map[reconcile.Request]empty{} - e.mapAndEnqueue(q, evt.ObjectOld, reqs) - e.mapAndEnqueue(q, evt.ObjectNew, reqs) + e.mapAndEnqueue(ctx, q, evt.ObjectOld, reqs) + e.mapAndEnqueue(ctx, q, evt.ObjectNew, reqs) } // Delete implements EventHandler. -func (e *enqueueRequestsFromMapFunc) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) { +func (e *enqueueRequestsFromMapFunc) Delete(ctx context.Context, evt event.DeleteEvent, q workqueue.RateLimitingInterface) { reqs := map[reconcile.Request]empty{} - e.mapAndEnqueue(q, evt.Object, reqs) + e.mapAndEnqueue(ctx, q, evt.Object, reqs) } // Generic implements EventHandler. -func (e *enqueueRequestsFromMapFunc) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) { +func (e *enqueueRequestsFromMapFunc) Generic(ctx context.Context, evt event.GenericEvent, q workqueue.RateLimitingInterface) { reqs := map[reconcile.Request]empty{} - e.mapAndEnqueue(q, evt.Object, reqs) + e.mapAndEnqueue(ctx, q, evt.Object, reqs) } -func (e *enqueueRequestsFromMapFunc) mapAndEnqueue(q workqueue.RateLimitingInterface, object client.Object, reqs map[reconcile.Request]empty) { - for _, req := range e.toRequests(object) { +func (e *enqueueRequestsFromMapFunc) mapAndEnqueue(ctx context.Context, q workqueue.RateLimitingInterface, object client.Object, reqs map[reconcile.Request]empty) { + for _, req := range e.toRequests(ctx, object) { _, ok := reqs[req] if !ok { q.Add(req) diff --git a/pkg/handler/enqueue_owner.go b/pkg/handler/enqueue_owner.go index a7cf6f2c4b..02e7d756f8 100644 --- a/pkg/handler/enqueue_owner.go +++ b/pkg/handler/enqueue_owner.go @@ -17,6 +17,7 @@ limitations under the License. package handler import ( + "context" "fmt" "k8s.io/apimachinery/pkg/api/meta" @@ -82,7 +83,7 @@ type enqueueRequestForOwner struct { } // Create implements EventHandler. -func (e *enqueueRequestForOwner) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) { +func (e *enqueueRequestForOwner) Create(ctx context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) { reqs := map[reconcile.Request]empty{} e.getOwnerReconcileRequest(evt.Object, reqs) for req := range reqs { @@ -91,7 +92,7 @@ func (e *enqueueRequestForOwner) Create(evt event.CreateEvent, q workqueue.RateL } // Update implements EventHandler. -func (e *enqueueRequestForOwner) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) { +func (e *enqueueRequestForOwner) Update(ctx context.Context, evt event.UpdateEvent, q workqueue.RateLimitingInterface) { reqs := map[reconcile.Request]empty{} e.getOwnerReconcileRequest(evt.ObjectOld, reqs) e.getOwnerReconcileRequest(evt.ObjectNew, reqs) @@ -101,7 +102,7 @@ func (e *enqueueRequestForOwner) Update(evt event.UpdateEvent, q workqueue.RateL } // Delete implements EventHandler. -func (e *enqueueRequestForOwner) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) { +func (e *enqueueRequestForOwner) Delete(ctx context.Context, evt event.DeleteEvent, q workqueue.RateLimitingInterface) { reqs := map[reconcile.Request]empty{} e.getOwnerReconcileRequest(evt.Object, reqs) for req := range reqs { @@ -110,7 +111,7 @@ func (e *enqueueRequestForOwner) Delete(evt event.DeleteEvent, q workqueue.RateL } // Generic implements EventHandler. -func (e *enqueueRequestForOwner) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) { +func (e *enqueueRequestForOwner) Generic(ctx context.Context, evt event.GenericEvent, q workqueue.RateLimitingInterface) { reqs := map[reconcile.Request]empty{} e.getOwnerReconcileRequest(evt.Object, reqs) for req := range reqs { diff --git a/pkg/handler/eventhandler.go b/pkg/handler/eventhandler.go index 8652d22d72..2f380f4fc4 100644 --- a/pkg/handler/eventhandler.go +++ b/pkg/handler/eventhandler.go @@ -17,6 +17,8 @@ limitations under the License. package handler import ( + "context" + "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/event" ) @@ -41,17 +43,17 @@ import ( // Most users shouldn't need to implement their own EventHandler. type EventHandler interface { // Create is called in response to an create event - e.g. Pod Creation. - Create(event.CreateEvent, workqueue.RateLimitingInterface) + Create(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) // Update is called in response to an update event - e.g. Pod Updated. - Update(event.UpdateEvent, workqueue.RateLimitingInterface) + Update(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) // Delete is called in response to a delete event - e.g. Pod Deleted. - Delete(event.DeleteEvent, workqueue.RateLimitingInterface) + Delete(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) // Generic is called in response to an event of an unknown type or a synthetic event triggered as a cron or // external trigger request - e.g. reconcile Autoscaling, or a Webhook. - Generic(event.GenericEvent, workqueue.RateLimitingInterface) + Generic(context.Context, event.GenericEvent, workqueue.RateLimitingInterface) } var _ EventHandler = Funcs{} @@ -60,45 +62,45 @@ var _ EventHandler = Funcs{} type Funcs struct { // Create is called in response to an add event. Defaults to no-op. // RateLimitingInterface is used to enqueue reconcile.Requests. - CreateFunc func(event.CreateEvent, workqueue.RateLimitingInterface) + CreateFunc func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) // Update is called in response to an update event. Defaults to no-op. // RateLimitingInterface is used to enqueue reconcile.Requests. - UpdateFunc func(event.UpdateEvent, workqueue.RateLimitingInterface) + UpdateFunc func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) // Delete is called in response to a delete event. Defaults to no-op. // RateLimitingInterface is used to enqueue reconcile.Requests. - DeleteFunc func(event.DeleteEvent, workqueue.RateLimitingInterface) + DeleteFunc func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) // GenericFunc is called in response to a generic event. Defaults to no-op. // RateLimitingInterface is used to enqueue reconcile.Requests. - GenericFunc func(event.GenericEvent, workqueue.RateLimitingInterface) + GenericFunc func(context.Context, event.GenericEvent, workqueue.RateLimitingInterface) } // Create implements EventHandler. -func (h Funcs) Create(e event.CreateEvent, q workqueue.RateLimitingInterface) { +func (h Funcs) Create(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) { if h.CreateFunc != nil { - h.CreateFunc(e, q) + h.CreateFunc(ctx, e, q) } } // Delete implements EventHandler. -func (h Funcs) Delete(e event.DeleteEvent, q workqueue.RateLimitingInterface) { +func (h Funcs) Delete(ctx context.Context, e event.DeleteEvent, q workqueue.RateLimitingInterface) { if h.DeleteFunc != nil { - h.DeleteFunc(e, q) + h.DeleteFunc(ctx, e, q) } } // Update implements EventHandler. -func (h Funcs) Update(e event.UpdateEvent, q workqueue.RateLimitingInterface) { +func (h Funcs) Update(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) { if h.UpdateFunc != nil { - h.UpdateFunc(e, q) + h.UpdateFunc(ctx, e, q) } } // Generic implements EventHandler. -func (h Funcs) Generic(e event.GenericEvent, q workqueue.RateLimitingInterface) { +func (h Funcs) Generic(ctx context.Context, e event.GenericEvent, q workqueue.RateLimitingInterface) { if h.GenericFunc != nil { - h.GenericFunc(e, q) + h.GenericFunc(ctx, e, q) } } diff --git a/pkg/handler/eventhandler_test.go b/pkg/handler/eventhandler_test.go index 6eeb26854d..f8dd1c5ddb 100644 --- a/pkg/handler/eventhandler_test.go +++ b/pkg/handler/eventhandler_test.go @@ -17,6 +17,8 @@ package handler_test import ( + "context" + . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" appsv1 "k8s.io/api/apps/v1" @@ -37,6 +39,7 @@ import ( ) var _ = Describe("Eventhandler", func() { + var ctx = context.Background() var q workqueue.RateLimitingInterface var instance handler.EnqueueRequestForObject var pod *corev1.Pod @@ -58,7 +61,7 @@ var _ = Describe("Eventhandler", func() { evt := event.CreateEvent{ Object: pod, } - instance.Create(evt, q) + instance.Create(ctx, evt, q) Expect(q.Len()).To(Equal(1)) i, _ := q.Get() @@ -72,7 +75,7 @@ var _ = Describe("Eventhandler", func() { evt := event.DeleteEvent{ Object: pod, } - instance.Delete(evt, q) + instance.Delete(ctx, evt, q) Expect(q.Len()).To(Equal(1)) i, _ := q.Get() @@ -92,7 +95,7 @@ var _ = Describe("Eventhandler", func() { ObjectOld: pod, ObjectNew: newPod, } - instance.Update(evt, q) + instance.Update(ctx, evt, q) Expect(q.Len()).To(Equal(1)) i, _ := q.Get() @@ -106,7 +109,7 @@ var _ = Describe("Eventhandler", func() { evt := event.GenericEvent{ Object: pod, } - instance.Generic(evt, q) + instance.Generic(ctx, evt, q) Expect(q.Len()).To(Equal(1)) i, _ := q.Get() Expect(i).NotTo(BeNil()) @@ -120,7 +123,7 @@ var _ = Describe("Eventhandler", func() { evt := event.CreateEvent{ Object: nil, } - instance.Create(evt, q) + instance.Create(ctx, evt, q) Expect(q.Len()).To(Equal(0)) }) @@ -133,7 +136,7 @@ var _ = Describe("Eventhandler", func() { ObjectNew: newPod, ObjectOld: nil, } - instance.Update(evt, q) + instance.Update(ctx, evt, q) Expect(q.Len()).To(Equal(1)) i, _ := q.Get() Expect(i).NotTo(BeNil()) @@ -143,7 +146,7 @@ var _ = Describe("Eventhandler", func() { evt.ObjectNew = nil evt.ObjectOld = pod - instance.Update(evt, q) + instance.Update(ctx, evt, q) Expect(q.Len()).To(Equal(1)) i, _ = q.Get() Expect(i).NotTo(BeNil()) @@ -156,7 +159,7 @@ var _ = Describe("Eventhandler", func() { evt := event.DeleteEvent{ Object: nil, } - instance.Delete(evt, q) + instance.Delete(ctx, evt, q) Expect(q.Len()).To(Equal(0)) }) @@ -164,7 +167,7 @@ var _ = Describe("Eventhandler", func() { evt := event.GenericEvent{ Object: nil, } - instance.Generic(evt, q) + instance.Generic(ctx, evt, q) Expect(q.Len()).To(Equal(0)) }) }) @@ -173,7 +176,7 @@ var _ = Describe("Eventhandler", func() { Describe("EnqueueRequestsFromMapFunc", func() { It("should enqueue a Request with the function applied to the CreateEvent.", func() { req := []reconcile.Request{} - instance := handler.EnqueueRequestsFromMapFunc(func(a client.Object) []reconcile.Request { + instance := handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a client.Object) []reconcile.Request { defer GinkgoRecover() Expect(a).To(Equal(pod)) req = []reconcile.Request{ @@ -190,7 +193,7 @@ var _ = Describe("Eventhandler", func() { evt := event.CreateEvent{ Object: pod, } - instance.Create(evt, q) + instance.Create(ctx, evt, q) Expect(q.Len()).To(Equal(2)) i1, _ := q.Get() @@ -205,7 +208,7 @@ var _ = Describe("Eventhandler", func() { It("should enqueue a Request with the function applied to the DeleteEvent.", func() { req := []reconcile.Request{} - instance := handler.EnqueueRequestsFromMapFunc(func(a client.Object) []reconcile.Request { + instance := handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a client.Object) []reconcile.Request { defer GinkgoRecover() Expect(a).To(Equal(pod)) req = []reconcile.Request{ @@ -222,7 +225,7 @@ var _ = Describe("Eventhandler", func() { evt := event.DeleteEvent{ Object: pod, } - instance.Delete(evt, q) + instance.Delete(ctx, evt, q) Expect(q.Len()).To(Equal(2)) i1, _ := q.Get() @@ -241,7 +244,7 @@ var _ = Describe("Eventhandler", func() { req := []reconcile.Request{} - instance := handler.EnqueueRequestsFromMapFunc(func(a client.Object) []reconcile.Request { + instance := handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a client.Object) []reconcile.Request { defer GinkgoRecover() req = []reconcile.Request{ { @@ -258,7 +261,7 @@ var _ = Describe("Eventhandler", func() { ObjectOld: pod, ObjectNew: newPod, } - instance.Update(evt, q) + instance.Update(ctx, evt, q) Expect(q.Len()).To(Equal(2)) i, _ := q.Get() @@ -270,7 +273,7 @@ var _ = Describe("Eventhandler", func() { It("should enqueue a Request with the function applied to the GenericEvent.", func() { req := []reconcile.Request{} - instance := handler.EnqueueRequestsFromMapFunc(func(a client.Object) []reconcile.Request { + instance := handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a client.Object) []reconcile.Request { defer GinkgoRecover() Expect(a).To(Equal(pod)) req = []reconcile.Request{ @@ -287,7 +290,7 @@ var _ = Describe("Eventhandler", func() { evt := event.GenericEvent{ Object: pod, } - instance.Generic(evt, q) + instance.Generic(ctx, evt, q) Expect(q.Len()).To(Equal(2)) i1, _ := q.Get() @@ -315,7 +318,7 @@ var _ = Describe("Eventhandler", func() { evt := event.CreateEvent{ Object: pod, } - instance.Create(evt, q) + instance.Create(ctx, evt, q) Expect(q.Len()).To(Equal(1)) i, _ := q.Get() @@ -336,7 +339,7 @@ var _ = Describe("Eventhandler", func() { evt := event.DeleteEvent{ Object: pod, } - instance.Delete(evt, q) + instance.Delete(ctx, evt, q) Expect(q.Len()).To(Equal(1)) i, _ := q.Get() @@ -369,7 +372,7 @@ var _ = Describe("Eventhandler", func() { ObjectOld: pod, ObjectNew: newPod, } - instance.Update(evt, q) + instance.Update(ctx, evt, q) Expect(q.Len()).To(Equal(2)) i1, _ := q.Get() @@ -406,7 +409,7 @@ var _ = Describe("Eventhandler", func() { ObjectOld: pod, ObjectNew: newPod, } - instance.Update(evt, q) + instance.Update(ctx, evt, q) Expect(q.Len()).To(Equal(1)) i, _ := q.Get() @@ -426,7 +429,7 @@ var _ = Describe("Eventhandler", func() { evt := event.GenericEvent{ Object: pod, } - instance.Generic(evt, q) + instance.Generic(ctx, evt, q) Expect(q.Len()).To(Equal(1)) i, _ := q.Get() @@ -451,7 +454,7 @@ var _ = Describe("Eventhandler", func() { evt := event.CreateEvent{ Object: pod, } - instance.Create(evt, q) + instance.Create(ctx, evt, q) Expect(q.Len()).To(Equal(0)) }) @@ -468,7 +471,7 @@ var _ = Describe("Eventhandler", func() { evt := event.CreateEvent{ Object: pod, } - instance.Create(evt, q) + instance.Create(ctx, evt, q) Expect(q.Len()).To(Equal(1)) i, _ := q.Get() @@ -488,7 +491,7 @@ var _ = Describe("Eventhandler", func() { evt := event.CreateEvent{ Object: pod, } - instance.Create(evt, q) + instance.Create(ctx, evt, q) Expect(q.Len()).To(Equal(1)) i, _ := q.Get() @@ -502,7 +505,7 @@ var _ = Describe("Eventhandler", func() { evt := event.CreateEvent{ Object: pod, } - instance.Create(evt, q) + instance.Create(ctx, evt, q) Expect(q.Len()).To(Equal(0)) }) @@ -542,7 +545,7 @@ var _ = Describe("Eventhandler", func() { evt := event.CreateEvent{ Object: pod, } - instance.Create(evt, q) + instance.Create(ctx, evt, q) Expect(q.Len()).To(Equal(1)) i, _ := q.Get() Expect(i).To(Equal(reconcile.Request{ @@ -571,7 +574,7 @@ var _ = Describe("Eventhandler", func() { evt := event.CreateEvent{ Object: pod, } - instance.Create(evt, q) + instance.Create(ctx, evt, q) Expect(q.Len()).To(Equal(0)) }) @@ -580,7 +583,7 @@ var _ = Describe("Eventhandler", func() { evt := event.CreateEvent{ Object: pod, } - instance.Create(evt, q) + instance.Create(ctx, evt, q) Expect(q.Len()).To(Equal(0)) }) }) @@ -608,7 +611,7 @@ var _ = Describe("Eventhandler", func() { evt := event.CreateEvent{ Object: pod, } - instance.Create(evt, q) + instance.Create(ctx, evt, q) Expect(q.Len()).To(Equal(3)) i1, _ := q.Get() @@ -638,7 +641,7 @@ var _ = Describe("Eventhandler", func() { evt := event.CreateEvent{ Object: nil, } - instance.Create(evt, q) + instance.Create(ctx, evt, q) Expect(q.Len()).To(Equal(0)) }) }) @@ -664,7 +667,7 @@ var _ = Describe("Eventhandler", func() { evt := event.CreateEvent{ Object: pod, } - instance.Create(evt, q) + instance.Create(ctx, evt, q) Expect(q.Len()).To(Equal(0)) }) }) @@ -672,19 +675,19 @@ var _ = Describe("Eventhandler", func() { Describe("Funcs", func() { failingFuncs := handler.Funcs{ - CreateFunc: func(event.CreateEvent, workqueue.RateLimitingInterface) { + CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Did not expect CreateEvent to be called.") }, - DeleteFunc: func(event.DeleteEvent, workqueue.RateLimitingInterface) { + DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Did not expect DeleteEvent to be called.") }, - UpdateFunc: func(event.UpdateEvent, workqueue.RateLimitingInterface) { + UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Did not expect UpdateEvent to be called.") }, - GenericFunc: func(event.GenericEvent, workqueue.RateLimitingInterface) { + GenericFunc: func(context.Context, event.GenericEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Did not expect GenericEvent to be called.") }, @@ -695,12 +698,12 @@ var _ = Describe("Eventhandler", func() { evt := event.CreateEvent{ Object: pod, } - instance.CreateFunc = func(evt2 event.CreateEvent, q2 workqueue.RateLimitingInterface) { + instance.CreateFunc = func(ctx context.Context, evt2 event.CreateEvent, q2 workqueue.RateLimitingInterface) { defer GinkgoRecover() Expect(q2).To(Equal(q)) Expect(evt2).To(Equal(evt)) } - instance.Create(evt, q) + instance.Create(ctx, evt, q) }) It("should NOT call CreateFunc for a CreateEvent if NOT provided.", func() { @@ -709,7 +712,7 @@ var _ = Describe("Eventhandler", func() { evt := event.CreateEvent{ Object: pod, } - instance.Create(evt, q) + instance.Create(ctx, evt, q) }) It("should call UpdateFunc for an UpdateEvent if provided.", func() { @@ -722,13 +725,13 @@ var _ = Describe("Eventhandler", func() { } instance := failingFuncs - instance.UpdateFunc = func(evt2 event.UpdateEvent, q2 workqueue.RateLimitingInterface) { + instance.UpdateFunc = func(ctx context.Context, evt2 event.UpdateEvent, q2 workqueue.RateLimitingInterface) { defer GinkgoRecover() Expect(q2).To(Equal(q)) Expect(evt2).To(Equal(evt)) } - instance.Update(evt, q) + instance.Update(ctx, evt, q) }) It("should NOT call UpdateFunc for an UpdateEvent if NOT provided.", func() { @@ -739,7 +742,7 @@ var _ = Describe("Eventhandler", func() { ObjectOld: pod, ObjectNew: newPod, } - instance.Update(evt, q) + instance.Update(ctx, evt, q) }) It("should call DeleteFunc for a DeleteEvent if provided.", func() { @@ -747,12 +750,12 @@ var _ = Describe("Eventhandler", func() { evt := event.DeleteEvent{ Object: pod, } - instance.DeleteFunc = func(evt2 event.DeleteEvent, q2 workqueue.RateLimitingInterface) { + instance.DeleteFunc = func(ctx context.Context, evt2 event.DeleteEvent, q2 workqueue.RateLimitingInterface) { defer GinkgoRecover() Expect(q2).To(Equal(q)) Expect(evt2).To(Equal(evt)) } - instance.Delete(evt, q) + instance.Delete(ctx, evt, q) }) It("should NOT call DeleteFunc for a DeleteEvent if NOT provided.", func() { @@ -761,7 +764,7 @@ var _ = Describe("Eventhandler", func() { evt := event.DeleteEvent{ Object: pod, } - instance.Delete(evt, q) + instance.Delete(ctx, evt, q) }) It("should call GenericFunc for a GenericEvent if provided.", func() { @@ -769,12 +772,12 @@ var _ = Describe("Eventhandler", func() { evt := event.GenericEvent{ Object: pod, } - instance.GenericFunc = func(evt2 event.GenericEvent, q2 workqueue.RateLimitingInterface) { + instance.GenericFunc = func(ctx context.Context, evt2 event.GenericEvent, q2 workqueue.RateLimitingInterface) { defer GinkgoRecover() Expect(q2).To(Equal(q)) Expect(evt2).To(Equal(evt)) } - instance.Generic(evt, q) + instance.Generic(ctx, evt, q) }) It("should NOT call GenericFunc for a GenericEvent if NOT provided.", func() { @@ -783,7 +786,7 @@ var _ = Describe("Eventhandler", func() { evt := event.GenericEvent{ Object: pod, } - instance.Generic(evt, q) + instance.Generic(ctx, evt, q) }) }) }) diff --git a/pkg/handler/example_test.go b/pkg/handler/example_test.go index 9e1ad0a1d4..ad07e4e31d 100644 --- a/pkg/handler/example_test.go +++ b/pkg/handler/example_test.go @@ -17,6 +17,8 @@ limitations under the License. package handler_test import ( + "context" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" @@ -65,7 +67,7 @@ func ExampleEnqueueRequestsFromMapFunc() { // controller is a controller.controller err := c.Watch( source.Kind(mgr.GetCache(), &appsv1.Deployment{}), - handler.EnqueueRequestsFromMapFunc(func(a client.Object) []reconcile.Request { + handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a client.Object) []reconcile.Request { return []reconcile.Request{ {NamespacedName: types.NamespacedName{ Name: a.GetName() + "-1", @@ -89,25 +91,25 @@ func ExampleFuncs() { err := c.Watch( source.Kind(mgr.GetCache(), &corev1.Pod{}), handler.Funcs{ - CreateFunc: func(e event.CreateEvent, q workqueue.RateLimitingInterface) { + CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) { q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ Name: e.Object.GetName(), Namespace: e.Object.GetNamespace(), }}) }, - UpdateFunc: func(e event.UpdateEvent, q workqueue.RateLimitingInterface) { + UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) { q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ Name: e.ObjectNew.GetName(), Namespace: e.ObjectNew.GetNamespace(), }}) }, - DeleteFunc: func(e event.DeleteEvent, q workqueue.RateLimitingInterface) { + DeleteFunc: func(ctx context.Context, e event.DeleteEvent, q workqueue.RateLimitingInterface) { q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ Name: e.Object.GetName(), Namespace: e.Object.GetNamespace(), }}) }, - GenericFunc: func(e event.GenericEvent, q workqueue.RateLimitingInterface) { + GenericFunc: func(ctx context.Context, e event.GenericEvent, q workqueue.RateLimitingInterface) { q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ Name: e.Object.GetName(), Namespace: e.Object.GetNamespace(), diff --git a/pkg/internal/controller/controller_test.go b/pkg/internal/controller/controller_test.go index 83a9e61a6d..9024556fd0 100644 --- a/pkg/internal/controller/controller_test.go +++ b/pkg/internal/controller/controller_test.go @@ -235,7 +235,7 @@ var _ = Describe("controller", func() { ctrl.startWatches = []watchDescription{{ src: ins, handler: handler.Funcs{ - GenericFunc: func(evt event.GenericEvent, q workqueue.RateLimitingInterface) { + GenericFunc: func(ctx context.Context, evt event.GenericEvent, q workqueue.RateLimitingInterface) { defer GinkgoRecover() close(processed) }, diff --git a/pkg/internal/source/eventsource.go b/pkg/internal/source/event_handler.go similarity index 72% rename from pkg/internal/source/eventsource.go rename to pkg/internal/source/event_handler.go index f0cfe212ed..8449a9dc75 100644 --- a/pkg/internal/source/eventsource.go +++ b/pkg/internal/source/event_handler.go @@ -17,6 +17,7 @@ limitations under the License. package internal import ( + "context" "fmt" "k8s.io/client-go/tools/cache" @@ -31,17 +32,31 @@ import ( var log = logf.RuntimeLog.WithName("source").WithName("EventHandler") -var _ cache.ResourceEventHandler = EventHandler{} +var _ cache.ResourceEventHandler = &EventHandler{} + +// NewEventHandler creates a new EventHandler. +func NewEventHandler(ctx context.Context, queue workqueue.RateLimitingInterface, handler handler.EventHandler, predicates []predicate.Predicate) *EventHandler { + return &EventHandler{ + ctx: ctx, + handler: handler, + queue: queue, + predicates: predicates, + } +} // EventHandler adapts a handler.EventHandler interface to a cache.ResourceEventHandler interface. type EventHandler struct { - EventHandler handler.EventHandler - Queue workqueue.RateLimitingInterface - Predicates []predicate.Predicate + // ctx stores the context that created the event handler + // that is used to propagate cancellation signals to each handler function. + ctx context.Context + + handler handler.EventHandler + queue workqueue.RateLimitingInterface + predicates []predicate.Predicate } // OnAdd creates CreateEvent and calls Create on EventHandler. -func (e EventHandler) OnAdd(obj interface{}) { +func (e *EventHandler) OnAdd(obj interface{}) { c := event.CreateEvent{} // Pull Object out of the object @@ -53,18 +68,20 @@ func (e EventHandler) OnAdd(obj interface{}) { return } - for _, p := range e.Predicates { + for _, p := range e.predicates { if !p.Create(c) { return } } // Invoke create handler - e.EventHandler.Create(c, e.Queue) + ctx, cancel := context.WithCancel(e.ctx) + defer cancel() + e.handler.Create(ctx, c, e.queue) } // OnUpdate creates UpdateEvent and calls Update on EventHandler. -func (e EventHandler) OnUpdate(oldObj, newObj interface{}) { +func (e *EventHandler) OnUpdate(oldObj, newObj interface{}) { u := event.UpdateEvent{} if o, ok := oldObj.(client.Object); ok { @@ -84,18 +101,20 @@ func (e EventHandler) OnUpdate(oldObj, newObj interface{}) { return } - for _, p := range e.Predicates { + for _, p := range e.predicates { if !p.Update(u) { return } } // Invoke update handler - e.EventHandler.Update(u, e.Queue) + ctx, cancel := context.WithCancel(e.ctx) + defer cancel() + e.handler.Update(ctx, u, e.queue) } // OnDelete creates DeleteEvent and calls Delete on EventHandler. -func (e EventHandler) OnDelete(obj interface{}) { +func (e *EventHandler) OnDelete(obj interface{}) { d := event.DeleteEvent{} // Deal with tombstone events by pulling the object out. Tombstone events wrap the object in a @@ -127,12 +146,14 @@ func (e EventHandler) OnDelete(obj interface{}) { return } - for _, p := range e.Predicates { + for _, p := range e.predicates { if !p.Delete(d) { return } } // Invoke delete handler - e.EventHandler.Delete(d, e.Queue) + ctx, cancel := context.WithCancel(e.ctx) + defer cancel() + e.handler.Delete(ctx, d, e.queue) } diff --git a/pkg/internal/source/internal_test.go b/pkg/internal/source/internal_test.go index 8a2e1b6fd2..9203879ac8 100644 --- a/pkg/internal/source/internal_test.go +++ b/pkg/internal/source/internal_test.go @@ -17,6 +17,8 @@ limitations under the License. package internal_test import ( + "context" + . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "k8s.io/client-go/tools/cache" @@ -34,48 +36,45 @@ import ( ) var _ = Describe("Internal", func() { - - var instance internal.EventHandler + var ctx = context.Background() + var instance *internal.EventHandler var funcs, setfuncs *handler.Funcs var set bool BeforeEach(func() { funcs = &handler.Funcs{ - CreateFunc: func(event.CreateEvent, workqueue.RateLimitingInterface) { + CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Did not expect CreateEvent to be called.") }, - DeleteFunc: func(e event.DeleteEvent, q workqueue.RateLimitingInterface) { + DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Did not expect DeleteEvent to be called.") }, - UpdateFunc: func(event.UpdateEvent, workqueue.RateLimitingInterface) { + UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Did not expect UpdateEvent to be called.") }, - GenericFunc: func(event.GenericEvent, workqueue.RateLimitingInterface) { + GenericFunc: func(context.Context, event.GenericEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Did not expect GenericEvent to be called.") }, } setfuncs = &handler.Funcs{ - CreateFunc: func(event.CreateEvent, workqueue.RateLimitingInterface) { + CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) { set = true }, - DeleteFunc: func(e event.DeleteEvent, q workqueue.RateLimitingInterface) { + DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) { set = true }, - UpdateFunc: func(event.UpdateEvent, workqueue.RateLimitingInterface) { + UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) { set = true }, - GenericFunc: func(event.GenericEvent, workqueue.RateLimitingInterface) { + GenericFunc: func(context.Context, event.GenericEvent, workqueue.RateLimitingInterface) { set = true }, } - instance = internal.EventHandler{ - Queue: controllertest.Queue{}, - EventHandler: funcs, - } + instance = internal.NewEventHandler(ctx, controllertest.Queue{}, funcs, nil) }) Describe("EventHandler", func() { @@ -92,55 +91,49 @@ var _ = Describe("Internal", func() { }) It("should create a CreateEvent", func() { - funcs.CreateFunc = func(evt event.CreateEvent, q workqueue.RateLimitingInterface) { + funcs.CreateFunc = func(ctx context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) { defer GinkgoRecover() - Expect(q).To(Equal(instance.Queue)) Expect(evt.Object).To(Equal(pod)) } instance.OnAdd(pod) }) It("should used Predicates to filter CreateEvents", func() { - instance = internal.EventHandler{ - Queue: controllertest.Queue{}, - EventHandler: setfuncs, - } - - set = false - instance.Predicates = []predicate.Predicate{ + instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{ predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return false }}, - } + }) + set = false instance.OnAdd(pod) Expect(set).To(BeFalse()) set = false - instance.Predicates = []predicate.Predicate{ + instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{ predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }}, - } + }) instance.OnAdd(pod) Expect(set).To(BeTrue()) set = false - instance.Predicates = []predicate.Predicate{ + instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{ predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }}, predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return false }}, - } + }) instance.OnAdd(pod) Expect(set).To(BeFalse()) set = false - instance.Predicates = []predicate.Predicate{ + instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{ predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return false }}, predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }}, - } + }) instance.OnAdd(pod) Expect(set).To(BeFalse()) set = false - instance.Predicates = []predicate.Predicate{ + instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{ predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }}, predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }}, - } + }) instance.OnAdd(pod) Expect(set).To(BeTrue()) }) @@ -154,10 +147,8 @@ var _ = Describe("Internal", func() { }) It("should create an UpdateEvent", func() { - funcs.UpdateFunc = func(evt event.UpdateEvent, q workqueue.RateLimitingInterface) { + funcs.UpdateFunc = func(ctx context.Context, evt event.UpdateEvent, q workqueue.RateLimitingInterface) { defer GinkgoRecover() - Expect(q).To(Equal(instance.Queue)) - Expect(evt.ObjectOld).To(Equal(pod)) Expect(evt.ObjectNew).To(Equal(newPod)) } @@ -165,46 +156,41 @@ var _ = Describe("Internal", func() { }) It("should used Predicates to filter UpdateEvents", func() { - instance = internal.EventHandler{ - Queue: controllertest.Queue{}, - EventHandler: setfuncs, - } - set = false - instance.Predicates = []predicate.Predicate{ + instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{ predicate.Funcs{UpdateFunc: func(updateEvent event.UpdateEvent) bool { return false }}, - } + }) instance.OnUpdate(pod, newPod) Expect(set).To(BeFalse()) set = false - instance.Predicates = []predicate.Predicate{ + instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{ predicate.Funcs{UpdateFunc: func(event.UpdateEvent) bool { return true }}, - } + }) instance.OnUpdate(pod, newPod) Expect(set).To(BeTrue()) set = false - instance.Predicates = []predicate.Predicate{ + instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{ predicate.Funcs{UpdateFunc: func(event.UpdateEvent) bool { return true }}, predicate.Funcs{UpdateFunc: func(event.UpdateEvent) bool { return false }}, - } + }) instance.OnUpdate(pod, newPod) Expect(set).To(BeFalse()) set = false - instance.Predicates = []predicate.Predicate{ + instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{ predicate.Funcs{UpdateFunc: func(event.UpdateEvent) bool { return false }}, predicate.Funcs{UpdateFunc: func(event.UpdateEvent) bool { return true }}, - } + }) instance.OnUpdate(pod, newPod) Expect(set).To(BeFalse()) set = false - instance.Predicates = []predicate.Predicate{ + instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{ predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }}, predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }}, - } + }) instance.OnUpdate(pod, newPod) Expect(set).To(BeTrue()) }) @@ -220,56 +206,49 @@ var _ = Describe("Internal", func() { }) It("should create a DeleteEvent", func() { - funcs.DeleteFunc = func(evt event.DeleteEvent, q workqueue.RateLimitingInterface) { + funcs.DeleteFunc = func(ctx context.Context, evt event.DeleteEvent, q workqueue.RateLimitingInterface) { defer GinkgoRecover() - Expect(q).To(Equal(instance.Queue)) - Expect(evt.Object).To(Equal(pod)) } instance.OnDelete(pod) }) It("should used Predicates to filter DeleteEvents", func() { - instance = internal.EventHandler{ - Queue: controllertest.Queue{}, - EventHandler: setfuncs, - } - set = false - instance.Predicates = []predicate.Predicate{ + instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{ predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return false }}, - } + }) instance.OnDelete(pod) Expect(set).To(BeFalse()) set = false - instance.Predicates = []predicate.Predicate{ + instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{ predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return true }}, - } + }) instance.OnDelete(pod) Expect(set).To(BeTrue()) set = false - instance.Predicates = []predicate.Predicate{ + instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{ predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return true }}, predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return false }}, - } + }) instance.OnDelete(pod) Expect(set).To(BeFalse()) set = false - instance.Predicates = []predicate.Predicate{ + instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{ predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return false }}, predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return true }}, - } + }) instance.OnDelete(pod) Expect(set).To(BeFalse()) set = false - instance.Predicates = []predicate.Predicate{ + instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{ predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return true }}, predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return true }}, - } + }) instance.OnDelete(pod) Expect(set).To(BeTrue()) }) @@ -287,9 +266,8 @@ var _ = Describe("Internal", func() { tombstone := cache.DeletedFinalStateUnknown{ Obj: pod, } - funcs.DeleteFunc = func(evt event.DeleteEvent, q workqueue.RateLimitingInterface) { + funcs.DeleteFunc = func(ctx context.Context, evt event.DeleteEvent, q workqueue.RateLimitingInterface) { defer GinkgoRecover() - Expect(q).To(Equal(instance.Queue)) Expect(evt.Object).To(Equal(pod)) } diff --git a/pkg/internal/source/kind.go b/pkg/internal/source/kind.go index d33d97d571..2e765acce8 100644 --- a/pkg/internal/source/kind.go +++ b/pkg/internal/source/kind.go @@ -79,7 +79,7 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w return } - _, err := i.AddEventHandler(EventHandler{Queue: queue, EventHandler: handler, Predicates: prct}) + _, err := i.AddEventHandler(NewEventHandler(ctx, queue, handler, prct)) if err != nil { ks.started <- err return diff --git a/pkg/source/source.go b/pkg/source/source.go index e824613074..5fb7c439b6 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -133,7 +133,11 @@ func (cs *Channel) Start( } if shouldHandle { - handler.Generic(evt, queue) + func() { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + handler.Generic(ctx, evt, queue) + }() } } }() @@ -200,7 +204,7 @@ func (is *Informer) Start(ctx context.Context, handler handler.EventHandler, que return fmt.Errorf("must specify Informer.Informer") } - _, err := is.Informer.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct}) + _, err := is.Informer.AddEventHandler(internal.NewEventHandler(ctx, queue, handler, prct)) if err != nil { return err } diff --git a/pkg/source/source_integration_test.go b/pkg/source/source_integration_test.go index 2527cf0034..594d3c9a9c 100644 --- a/pkg/source/source_integration_test.go +++ b/pkg/source/source_integration_test.go @@ -17,6 +17,7 @@ limitations under the License. package source_test import ( + "context" "fmt" "time" @@ -102,17 +103,17 @@ var _ = Describe("Source", func() { // Create an event handler to verify the events newHandler := func(c chan interface{}) handler.Funcs { return handler.Funcs{ - CreateFunc: func(evt event.CreateEvent, rli workqueue.RateLimitingInterface) { + CreateFunc: func(ctx context.Context, evt event.CreateEvent, rli workqueue.RateLimitingInterface) { defer GinkgoRecover() Expect(rli).To(Equal(q)) c <- evt }, - UpdateFunc: func(evt event.UpdateEvent, rli workqueue.RateLimitingInterface) { + UpdateFunc: func(ctx context.Context, evt event.UpdateEvent, rli workqueue.RateLimitingInterface) { defer GinkgoRecover() Expect(rli).To(Equal(q)) c <- evt }, - DeleteFunc: func(evt event.DeleteEvent, rli workqueue.RateLimitingInterface) { + DeleteFunc: func(ctx context.Context, evt event.DeleteEvent, rli workqueue.RateLimitingInterface) { defer GinkgoRecover() Expect(rli).To(Equal(q)) c <- evt @@ -242,7 +243,7 @@ var _ = Describe("Source", func() { q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") instance := &source.Informer{Informer: depInformer} err := instance.Start(ctx, handler.Funcs{ - CreateFunc: func(evt event.CreateEvent, q2 workqueue.RateLimitingInterface) { + CreateFunc: func(ctx context.Context, evt event.CreateEvent, q2 workqueue.RateLimitingInterface) { defer GinkgoRecover() var err error rs, err := clientset.AppsV1().ReplicaSets("default").Get(ctx, rs.Name, metav1.GetOptions{}) @@ -252,15 +253,15 @@ var _ = Describe("Source", func() { Expect(evt.Object).To(Equal(rs)) close(c) }, - UpdateFunc: func(event.UpdateEvent, workqueue.RateLimitingInterface) { + UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected UpdateEvent") }, - DeleteFunc: func(event.DeleteEvent, workqueue.RateLimitingInterface) { + DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected DeleteEvent") }, - GenericFunc: func(event.GenericEvent, workqueue.RateLimitingInterface) { + GenericFunc: func(context.Context, event.GenericEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected GenericEvent") }, @@ -283,9 +284,9 @@ var _ = Describe("Source", func() { q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") instance := &source.Informer{Informer: depInformer} err = instance.Start(ctx, handler.Funcs{ - CreateFunc: func(evt event.CreateEvent, q2 workqueue.RateLimitingInterface) { + CreateFunc: func(ctx context.Context, evt event.CreateEvent, q2 workqueue.RateLimitingInterface) { }, - UpdateFunc: func(evt event.UpdateEvent, q2 workqueue.RateLimitingInterface) { + UpdateFunc: func(ctx context.Context, evt event.UpdateEvent, q2 workqueue.RateLimitingInterface) { defer GinkgoRecover() var err error rs2, err := clientset.AppsV1().ReplicaSets("default").Get(ctx, rs.Name, metav1.GetOptions{}) @@ -298,11 +299,11 @@ var _ = Describe("Source", func() { close(c) }, - DeleteFunc: func(event.DeleteEvent, workqueue.RateLimitingInterface) { + DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected DeleteEvent") }, - GenericFunc: func(event.GenericEvent, workqueue.RateLimitingInterface) { + GenericFunc: func(context.Context, event.GenericEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected GenericEvent") }, @@ -320,17 +321,17 @@ var _ = Describe("Source", func() { q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") instance := &source.Informer{Informer: depInformer} err := instance.Start(ctx, handler.Funcs{ - CreateFunc: func(event.CreateEvent, workqueue.RateLimitingInterface) { + CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) { }, - UpdateFunc: func(event.UpdateEvent, workqueue.RateLimitingInterface) { + UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) { }, - DeleteFunc: func(evt event.DeleteEvent, q2 workqueue.RateLimitingInterface) { + DeleteFunc: func(ctx context.Context, evt event.DeleteEvent, q2 workqueue.RateLimitingInterface) { defer GinkgoRecover() Expect(q2).To(Equal(q)) Expect(evt.Object.GetName()).To(Equal(rs.Name)) close(c) }, - GenericFunc: func(event.GenericEvent, workqueue.RateLimitingInterface) { + GenericFunc: func(context.Context, event.GenericEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected GenericEvent") }, diff --git a/pkg/source/source_test.go b/pkg/source/source_test.go index f80a00f02d..1e0e3afed3 100644 --- a/pkg/source/source_test.go +++ b/pkg/source/source_test.go @@ -66,21 +66,21 @@ var _ = Describe("Source", func() { q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") instance := source.Kind(ic, &corev1.Pod{}) err := instance.Start(ctx, handler.Funcs{ - CreateFunc: func(evt event.CreateEvent, q2 workqueue.RateLimitingInterface) { + CreateFunc: func(ctx context.Context, evt event.CreateEvent, q2 workqueue.RateLimitingInterface) { defer GinkgoRecover() Expect(q2).To(Equal(q)) Expect(evt.Object).To(Equal(p)) close(c) }, - UpdateFunc: func(event.UpdateEvent, workqueue.RateLimitingInterface) { + UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected UpdateEvent") }, - DeleteFunc: func(event.DeleteEvent, workqueue.RateLimitingInterface) { + DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected DeleteEvent") }, - GenericFunc: func(event.GenericEvent, workqueue.RateLimitingInterface) { + GenericFunc: func(context.Context, event.GenericEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected GenericEvent") }, @@ -103,11 +103,11 @@ var _ = Describe("Source", func() { q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") instance := source.Kind(ic, &corev1.Pod{}) err := instance.Start(ctx, handler.Funcs{ - CreateFunc: func(evt event.CreateEvent, q2 workqueue.RateLimitingInterface) { + CreateFunc: func(ctx context.Context, evt event.CreateEvent, q2 workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected CreateEvent") }, - UpdateFunc: func(evt event.UpdateEvent, q2 workqueue.RateLimitingInterface) { + UpdateFunc: func(ctx context.Context, evt event.UpdateEvent, q2 workqueue.RateLimitingInterface) { defer GinkgoRecover() Expect(q2).To(BeIdenticalTo(q)) Expect(evt.ObjectOld).To(Equal(p)) @@ -116,11 +116,11 @@ var _ = Describe("Source", func() { close(c) }, - DeleteFunc: func(event.DeleteEvent, workqueue.RateLimitingInterface) { + DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected DeleteEvent") }, - GenericFunc: func(event.GenericEvent, workqueue.RateLimitingInterface) { + GenericFunc: func(context.Context, event.GenericEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected GenericEvent") }, @@ -148,21 +148,21 @@ var _ = Describe("Source", func() { q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") instance := source.Kind(ic, &corev1.Pod{}) err := instance.Start(ctx, handler.Funcs{ - CreateFunc: func(event.CreateEvent, workqueue.RateLimitingInterface) { + CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected DeleteEvent") }, - UpdateFunc: func(event.UpdateEvent, workqueue.RateLimitingInterface) { + UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected UpdateEvent") }, - DeleteFunc: func(evt event.DeleteEvent, q2 workqueue.RateLimitingInterface) { + DeleteFunc: func(ctx context.Context, evt event.DeleteEvent, q2 workqueue.RateLimitingInterface) { defer GinkgoRecover() Expect(q2).To(BeIdenticalTo(q)) Expect(evt.Object).To(Equal(p)) close(c) }, - GenericFunc: func(event.GenericEvent, workqueue.RateLimitingInterface) { + GenericFunc: func(context.Context, event.GenericEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected GenericEvent") }, @@ -290,19 +290,19 @@ var _ = Describe("Source", func() { q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") instance := &source.Channel{Source: ch} err := instance.Start(ctx, handler.Funcs{ - CreateFunc: func(event.CreateEvent, workqueue.RateLimitingInterface) { + CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected CreateEvent") }, - UpdateFunc: func(event.UpdateEvent, workqueue.RateLimitingInterface) { + UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected UpdateEvent") }, - DeleteFunc: func(event.DeleteEvent, workqueue.RateLimitingInterface) { + DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected DeleteEvent") }, - GenericFunc: func(evt event.GenericEvent, q2 workqueue.RateLimitingInterface) { + GenericFunc: func(ctx context.Context, evt event.GenericEvent, q2 workqueue.RateLimitingInterface) { defer GinkgoRecover() // The empty event should have been filtered out by the predicates, // and will not be passed to the handler. @@ -329,19 +329,19 @@ var _ = Describe("Source", func() { instance := &source.Channel{Source: ch} instance.DestBufferSize = 1 err := instance.Start(ctx, handler.Funcs{ - CreateFunc: func(event.CreateEvent, workqueue.RateLimitingInterface) { + CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected CreateEvent") }, - UpdateFunc: func(event.UpdateEvent, workqueue.RateLimitingInterface) { + UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected UpdateEvent") }, - DeleteFunc: func(event.DeleteEvent, workqueue.RateLimitingInterface) { + DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected DeleteEvent") }, - GenericFunc: func(evt event.GenericEvent, q2 workqueue.RateLimitingInterface) { + GenericFunc: func(ctx context.Context, evt event.GenericEvent, q2 workqueue.RateLimitingInterface) { defer GinkgoRecover() // Block for the first time if eventCount == 0 { @@ -386,19 +386,19 @@ var _ = Describe("Source", func() { instance.DestBufferSize = 1 err := instance.Start(ctx, handler.Funcs{ - CreateFunc: func(event.CreateEvent, workqueue.RateLimitingInterface) { + CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected CreateEvent") }, - UpdateFunc: func(event.UpdateEvent, workqueue.RateLimitingInterface) { + UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected UpdateEvent") }, - DeleteFunc: func(event.DeleteEvent, workqueue.RateLimitingInterface) { + DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected DeleteEvent") }, - GenericFunc: func(evt event.GenericEvent, q2 workqueue.RateLimitingInterface) { + GenericFunc: func(ctx context.Context, evt event.GenericEvent, q2 workqueue.RateLimitingInterface) { defer GinkgoRecover() close(processed) @@ -427,19 +427,19 @@ var _ = Describe("Source", func() { defer close(processed) err := src.Start(ctx, handler.Funcs{ - CreateFunc: func(event.CreateEvent, workqueue.RateLimitingInterface) { + CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected CreateEvent") }, - UpdateFunc: func(event.UpdateEvent, workqueue.RateLimitingInterface) { + UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected UpdateEvent") }, - DeleteFunc: func(event.DeleteEvent, workqueue.RateLimitingInterface) { + DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected DeleteEvent") }, - GenericFunc: func(evt event.GenericEvent, q2 workqueue.RateLimitingInterface) { + GenericFunc: func(ctx context.Context, evt event.GenericEvent, q2 workqueue.RateLimitingInterface) { defer GinkgoRecover() processed <- struct{}{} @@ -475,19 +475,19 @@ var _ = Describe("Source", func() { q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") instance := &source.Channel{Source: ch} err := instance.Start(ctx, handler.Funcs{ - CreateFunc: func(event.CreateEvent, workqueue.RateLimitingInterface) { + CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected CreateEvent") }, - UpdateFunc: func(event.UpdateEvent, workqueue.RateLimitingInterface) { + UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected UpdateEvent") }, - DeleteFunc: func(event.DeleteEvent, workqueue.RateLimitingInterface) { + DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected DeleteEvent") }, - GenericFunc: func(evt event.GenericEvent, q2 workqueue.RateLimitingInterface) { + GenericFunc: func(ctx context.Context, evt event.GenericEvent, q2 workqueue.RateLimitingInterface) { defer GinkgoRecover() Expect(q2).To(BeIdenticalTo(q)) Expect(evt.Object).To(Equal(p)) @@ -498,19 +498,19 @@ var _ = Describe("Source", func() { Expect(err).NotTo(HaveOccurred()) err = instance.Start(ctx, handler.Funcs{ - CreateFunc: func(event.CreateEvent, workqueue.RateLimitingInterface) { + CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected CreateEvent") }, - UpdateFunc: func(event.UpdateEvent, workqueue.RateLimitingInterface) { + UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected UpdateEvent") }, - DeleteFunc: func(event.DeleteEvent, workqueue.RateLimitingInterface) { + DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected DeleteEvent") }, - GenericFunc: func(evt event.GenericEvent, q2 workqueue.RateLimitingInterface) { + GenericFunc: func(ctx context.Context, evt event.GenericEvent, q2 workqueue.RateLimitingInterface) { defer GinkgoRecover() Expect(q2).To(BeIdenticalTo(q)) Expect(evt.Object).To(Equal(p))