Skip to content

Commit

Permalink
✨ Deduplicate events before sending them into the workqueue. (#1390)
Browse files Browse the repository at this point in the history
* ✨ Deduplicate events before sending them into the workqueue.

This avoids race conditions where extra reconciles can happen rarely.

* ✨ Switch to map[string]struct{} to reduce memory usage slightly.

Also make sure that enqueue_mapped preserves order.

* 📝 Update function doc for getOwnerReconcileRequest.

* 🎨 Fix up duplication tests and ensure Update for _mapped dedups between both objects.
  • Loading branch information
coderanger committed Mar 14, 2021
1 parent b125a18 commit 253f275
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 91 deletions.
18 changes: 8 additions & 10 deletions pkg/handler/enqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (

var enqueueLog = logf.RuntimeLog.WithName("eventhandler").WithName("EnqueueRequestForObject")

type empty struct{}

var _ EventHandler = &EnqueueRequestForObject{}

// EnqueueRequestForObject enqueues a Request containing the Name and Namespace of the object that is the source of the Event.
Expand All @@ -47,22 +49,18 @@ func (e *EnqueueRequestForObject) Create(evt event.CreateEvent, q workqueue.Rate

// Update implements EventHandler
func (e *EnqueueRequestForObject) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
if evt.ObjectOld != nil {
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.ObjectOld.GetName(),
Namespace: evt.ObjectOld.GetNamespace(),
}})
} else {
enqueueLog.Error(nil, "UpdateEvent received with no old metadata", "event", evt)
}

if evt.ObjectNew != nil {
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.ObjectNew.GetName(),
Namespace: evt.ObjectNew.GetNamespace(),
}})
} else if evt.ObjectOld != nil {
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.ObjectOld.GetName(),
Namespace: evt.ObjectOld.GetNamespace(),
}})
} else {
enqueueLog.Error(nil, "UpdateEvent received with no new metadata", "event", evt)
enqueueLog.Error(nil, "UpdateEvent received with no metadata", "event", evt)
}
}

Expand Down
22 changes: 15 additions & 7 deletions pkg/handler/enqueue_mapped.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,28 +53,36 @@ type enqueueRequestsFromMapFunc struct {

// Create implements EventHandler
func (e *enqueueRequestsFromMapFunc) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
e.mapAndEnqueue(q, evt.Object)
reqs := map[reconcile.Request]empty{}
e.mapAndEnqueue(q, evt.Object, reqs)
}

// Update implements EventHandler
func (e *enqueueRequestsFromMapFunc) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
e.mapAndEnqueue(q, evt.ObjectOld)
e.mapAndEnqueue(q, evt.ObjectNew)
reqs := map[reconcile.Request]empty{}
e.mapAndEnqueue(q, evt.ObjectOld, reqs)
e.mapAndEnqueue(q, evt.ObjectNew, reqs)
}

// Delete implements EventHandler
func (e *enqueueRequestsFromMapFunc) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
e.mapAndEnqueue(q, evt.Object)
reqs := map[reconcile.Request]empty{}
e.mapAndEnqueue(q, evt.Object, reqs)
}

// Generic implements EventHandler
func (e *enqueueRequestsFromMapFunc) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) {
e.mapAndEnqueue(q, evt.Object)
reqs := map[reconcile.Request]empty{}
e.mapAndEnqueue(q, evt.Object, reqs)
}

func (e *enqueueRequestsFromMapFunc) mapAndEnqueue(q workqueue.RateLimitingInterface, object client.Object) {
func (e *enqueueRequestsFromMapFunc) mapAndEnqueue(q workqueue.RateLimitingInterface, object client.Object, reqs map[reconcile.Request]empty) {
for _, req := range e.toRequests(object) {
q.Add(req)
_, ok := reqs[req]
if !ok {
q.Add(req)
reqs[req] = empty{}
}
}
}

Expand Down
34 changes: 18 additions & 16 deletions pkg/handler/enqueue_owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,31 +59,37 @@ type EnqueueRequestForOwner struct {

// Create implements EventHandler
func (e *EnqueueRequestForOwner) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
for _, req := range e.getOwnerReconcileRequest(evt.Object) {
reqs := map[reconcile.Request]empty{}
e.getOwnerReconcileRequest(evt.Object, reqs)
for req := range reqs {
q.Add(req)
}
}

// Update implements EventHandler
func (e *EnqueueRequestForOwner) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
for _, req := range e.getOwnerReconcileRequest(evt.ObjectOld) {
q.Add(req)
}
for _, req := range e.getOwnerReconcileRequest(evt.ObjectNew) {
reqs := map[reconcile.Request]empty{}
e.getOwnerReconcileRequest(evt.ObjectOld, reqs)
e.getOwnerReconcileRequest(evt.ObjectNew, reqs)
for req := range reqs {
q.Add(req)
}
}

// Delete implements EventHandler
func (e *EnqueueRequestForOwner) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
for _, req := range e.getOwnerReconcileRequest(evt.Object) {
reqs := map[reconcile.Request]empty{}
e.getOwnerReconcileRequest(evt.Object, reqs)
for req := range reqs {
q.Add(req)
}
}

// Generic implements EventHandler
func (e *EnqueueRequestForOwner) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) {
for _, req := range e.getOwnerReconcileRequest(evt.Object) {
reqs := map[reconcile.Request]empty{}
e.getOwnerReconcileRequest(evt.Object, reqs)
for req := range reqs {
q.Add(req)
}
}
Expand All @@ -109,19 +115,18 @@ func (e *EnqueueRequestForOwner) parseOwnerTypeGroupKind(scheme *runtime.Scheme)
return nil
}

// getOwnerReconcileRequest looks at object and returns a slice of reconcile.Request to reconcile
// getOwnerReconcileRequest looks at object and builds a map of reconcile.Request to reconcile
// owners of object that match e.OwnerType.
func (e *EnqueueRequestForOwner) getOwnerReconcileRequest(object metav1.Object) []reconcile.Request {
func (e *EnqueueRequestForOwner) getOwnerReconcileRequest(object metav1.Object, result map[reconcile.Request]empty) {
// Iterate through the OwnerReferences looking for a match on Group and Kind against what was requested
// by the user
var result []reconcile.Request
for _, ref := range e.getOwnersReferences(object) {
// Parse the Group out of the OwnerReference to compare it to what was parsed out of the requested OwnerType
refGV, err := schema.ParseGroupVersion(ref.APIVersion)
if err != nil {
log.Error(err, "Could not parse OwnerReference APIVersion",
"api version", ref.APIVersion)
return nil
return
}

// Compare the OwnerReference Group and Kind against the OwnerType Group and Kind specified by the user.
Expand All @@ -138,18 +143,15 @@ func (e *EnqueueRequestForOwner) getOwnerReconcileRequest(object metav1.Object)
mapping, err := e.mapper.RESTMapping(e.groupKind, refGV.Version)
if err != nil {
log.Error(err, "Could not retrieve rest mapping", "kind", e.groupKind)
return nil
return
}
if mapping.Scope.Name() != meta.RESTScopeNameRoot {
request.Namespace = object.GetNamespace()
}

result = append(result, request)
result[request] = empty{}
}
}

// Return the matches
return result
}

// getOwnersReferences returns the OwnerReferences for an object as specified by the EnqueueRequestForOwner
Expand Down
140 changes: 82 additions & 58 deletions pkg/handler/eventhandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ var _ = Describe("Eventhandler", func() {
close(done)
})

It("should enqueue a Request with the Name / Namespace of both objects in the UpdateEvent.",
It("should enqueue a Request with the Name / Namespace of one object in the UpdateEvent.",
func(done Done) {
newPod := pod.DeepCopy()
newPod.Name = "baz2"
Expand All @@ -97,18 +97,12 @@ var _ = Describe("Eventhandler", func() {
ObjectNew: newPod,
}
instance.Update(evt, q)
Expect(q.Len()).To(Equal(2))
Expect(q.Len()).To(Equal(1))

i, _ := q.Get()
Expect(i).NotTo(BeNil())
req, ok := i.(reconcile.Request)
Expect(ok).To(BeTrue())
Expect(req.NamespacedName).To(Equal(types.NamespacedName{Namespace: "biz", Name: "baz"}))

i, _ = q.Get()
Expect(i).NotTo(BeNil())
req, ok = i.(reconcile.Request)
Expect(ok).To(BeTrue())
Expect(req.NamespacedName).To(Equal(types.NamespacedName{Namespace: "biz2", Name: "baz2"}))

close(done)
Expand Down Expand Up @@ -212,13 +206,14 @@ var _ = Describe("Eventhandler", func() {
instance.Create(evt, q)
Expect(q.Len()).To(Equal(2))

i, _ := q.Get()
Expect(i).To(Equal(reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: "foo", Name: "bar"}}))

i, _ = q.Get()
Expect(i).To(Equal(reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: "biz", Name: "baz"}}))
i1, _ := q.Get()
i2, _ := q.Get()
Expect([]interface{}{i1, i2}).To(ConsistOf(
reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: "foo", Name: "bar"}},
reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: "biz", Name: "baz"}},
))
})

It("should enqueue a Request with the function applied to the DeleteEvent.", func() {
Expand All @@ -243,20 +238,19 @@ var _ = Describe("Eventhandler", func() {
instance.Delete(evt, q)
Expect(q.Len()).To(Equal(2))

i, _ := q.Get()
Expect(i).To(Equal(reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: "foo", Name: "bar"}}))

i, _ = q.Get()
Expect(i).To(Equal(reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: "biz", Name: "baz"}}))
i1, _ := q.Get()
i2, _ := q.Get()
Expect([]interface{}{i1, i2}).To(ConsistOf(
reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: "foo", Name: "bar"}},
reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: "biz", Name: "baz"}},
))
})

It("should enqueue a Request with the function applied to both objects in the UpdateEvent.",
func() {
newPod := pod.DeepCopy()
newPod.Name = pod.Name + "2"
newPod.Namespace = pod.Namespace + "2"

req := []reconcile.Request{}

Expand All @@ -278,23 +272,13 @@ var _ = Describe("Eventhandler", func() {
ObjectNew: newPod,
}
instance.Update(evt, q)
Expect(q.Len()).To(Equal(4))
Expect(q.Len()).To(Equal(2))

i, _ := q.Get()
Expect(i).To(Equal(reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: "foo", Name: "baz-bar"}}))
Expect(i).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Namespace: "foo", Name: "baz-bar"}}))

i, _ = q.Get()
Expect(i).To(Equal(reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: "biz", Name: "baz-baz"}}))

i, _ = q.Get()
Expect(i).To(Equal(reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: "foo", Name: "baz2-bar"}}))

i, _ = q.Get()
Expect(i).To(Equal(reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: "biz", Name: "baz2-baz"}}))
Expect(i).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Namespace: "biz", Name: "baz-baz"}}))
})

It("should enqueue a Request with the function applied to the GenericEvent.", func() {
Expand All @@ -319,13 +303,14 @@ var _ = Describe("Eventhandler", func() {
instance.Generic(evt, q)
Expect(q.Len()).To(Equal(2))

i, _ := q.Get()
Expect(i).To(Equal(reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: "foo", Name: "bar"}}))

i, _ = q.Get()
Expect(i).To(Equal(reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: "biz", Name: "baz"}}))
i1, _ := q.Get()
i2, _ := q.Get()
Expect([]interface{}{i1, i2}).To(ConsistOf(
reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: "foo", Name: "bar"}},
reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: "biz", Name: "baz"}},
))
})
})

Expand Down Expand Up @@ -412,13 +397,50 @@ var _ = Describe("Eventhandler", func() {
instance.Update(evt, q)
Expect(q.Len()).To(Equal(2))

i, _ := q.Get()
Expect(i).To(Equal(reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: pod.GetNamespace(), Name: "foo1-parent"}}))
i1, _ := q.Get()
i2, _ := q.Get()
Expect([]interface{}{i1, i2}).To(ConsistOf(
reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: pod.GetNamespace(), Name: "foo1-parent"}},
reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: newPod.GetNamespace(), Name: "foo2-parent"}},
))
})

i, _ = q.Get()
It("should enqueue a Request with the one duplicate Owner of both objects in the UpdateEvent.", func() {
newPod := pod.DeepCopy()
newPod.Name = pod.Name + "2"

instance := handler.EnqueueRequestForOwner{
OwnerType: &appsv1.ReplicaSet{},
}
Expect(instance.InjectScheme(scheme.Scheme)).To(Succeed())
Expect(instance.InjectMapper(mapper)).To(Succeed())

pod.OwnerReferences = []metav1.OwnerReference{
{
Name: "foo-parent",
Kind: "ReplicaSet",
APIVersion: "apps/v1",
},
}
newPod.OwnerReferences = []metav1.OwnerReference{
{
Name: "foo-parent",
Kind: "ReplicaSet",
APIVersion: "apps/v1",
},
}
evt := event.UpdateEvent{
ObjectOld: pod,
ObjectNew: newPod,
}
instance.Update(evt, q)
Expect(q.Len()).To(Equal(1))

i, _ := q.Get()
Expect(i).To(Equal(reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: newPod.GetNamespace(), Name: "foo2-parent"}}))
NamespacedName: types.NamespacedName{Namespace: pod.GetNamespace(), Name: "foo-parent"}}))
})

It("should enqueue a Request with the Owner of the object in the GenericEvent.", func() {
Expand Down Expand Up @@ -659,15 +681,17 @@ var _ = Describe("Eventhandler", func() {
instance.Create(evt, q)
Expect(q.Len()).To(Equal(3))

i, _ := q.Get()
Expect(i).To(Equal(reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: pod.GetNamespace(), Name: "foo1-parent"}}))
i, _ = q.Get()
Expect(i).To(Equal(reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: pod.GetNamespace(), Name: "foo2-parent"}}))
i, _ = q.Get()
Expect(i).To(Equal(reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: pod.GetNamespace(), Name: "foo3-parent"}}))
i1, _ := q.Get()
i2, _ := q.Get()
i3, _ := q.Get()
Expect([]interface{}{i1, i2, i3}).To(ConsistOf(
reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: pod.GetNamespace(), Name: "foo1-parent"}},
reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: pod.GetNamespace(), Name: "foo2-parent"}},
reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: pod.GetNamespace(), Name: "foo3-parent"}},
))
})
})

Expand Down

0 comments on commit 253f275

Please sign in to comment.