From 7f4300c58d437a483588715f32b007568e610e7d Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Tue, 28 May 2024 14:11:26 -0400 Subject: [PATCH 1/3] feat: sequences only recreate subscriptions when the changes are to immutable fields Signed-off-by: Calum Murray --- pkg/reconciler/sequence/sequence.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/pkg/reconciler/sequence/sequence.go b/pkg/reconciler/sequence/sequence.go index 7a7d5fdbf7a..fe0df8fa183 100644 --- a/pkg/reconciler/sequence/sequence.go +++ b/pkg/reconciler/sequence/sequence.go @@ -20,9 +20,9 @@ import ( "context" "fmt" + "github.com/google/go-cmp/cmp/cmpopts" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/equality" apierrs "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -46,6 +46,7 @@ import ( listers "knative.dev/eventing/pkg/client/listers/flows/v1" messaginglisters "knative.dev/eventing/pkg/client/listers/messaging/v1" "knative.dev/eventing/pkg/duck" + "knative.dev/pkg/kmp" corev1listers "k8s.io/client-go/listers/core/v1" "knative.dev/eventing/pkg/reconciler/sequence/resources" @@ -188,6 +189,7 @@ func (r *Reconciler) reconcileSubscription(ctx context.Context, step int, p *v1. subName := resources.SequenceSubscriptionName(p.Name, step) sub, err := r.subscriptionLister.Subscriptions(p.Namespace).Get(subName) + ignoreArguments := cmpopts.IgnoreFields(messagingv1.SubscriptionSpec{}, "Subscriber", "Reply", "Delivery") // If the resource doesn't exist, we'll create it. if apierrs.IsNotFound(err) { sub = expected @@ -204,7 +206,7 @@ func (r *Reconciler) reconcileSubscription(ctx context.Context, step int, p *v1. // TODO: Send events here, or elsewhere? //r.Recorder.Eventf(p, corev1.EventTypeWarning, subscriptionCreateFailed, "Create Sequences's subscription failed: %v", err) return nil, fmt.Errorf("failed to get subscription: %s", err) - } else if !equality.Semantic.DeepDerivative(expected.Spec, sub.Spec) { + } else if equal, err := kmp.SafeEqual(sub.Spec, expected.Spec, ignoreArguments); !equal || err != nil { // Given that spec.channel is immutable, we cannot just update the subscription. We delete // it instead, and re-create it. err = r.eventingClientSet.MessagingV1().Subscriptions(sub.Namespace).Delete(ctx, sub.Name, metav1.DeleteOptions{}) @@ -218,6 +220,14 @@ func (r *Reconciler) reconcileSubscription(ctx context.Context, step int, p *v1. return nil, err } return newSub, nil + } else if equal, err := kmp.SafeEqual(sub.Spec, expected.Spec); !equal || err != nil { + // only the mutable fields were changed, so we can update the subscription + updatedSub, err := r.eventingClientSet.MessagingV1().Subscriptions(sub.Namespace).Update(ctx, expected, metav1.UpdateOptions{}) + if err != nil { + logging.FromContext(ctx).Infow("Cannot update subscription", zap.Error(err)) + return nil, err + } + return updatedSub, nil } return sub, nil } From a7f95f9951cc05586d1ad345fff0223350596537 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Tue, 28 May 2024 14:12:07 -0400 Subject: [PATCH 2/3] fix(tests): sequence unit tests correctly check for updates to subscriptions Signed-off-by: Calum Murray --- pkg/reconciler/sequence/sequence_test.go | 45 ++++++++++++++++++++---- 1 file changed, 39 insertions(+), 6 deletions(-) diff --git a/pkg/reconciler/sequence/sequence_test.go b/pkg/reconciler/sequence/sequence_test.go index 0e4f3aa0ee3..e50cf64a5dd 100644 --- a/pkg/reconciler/sequence/sequence_test.go +++ b/pkg/reconciler/sequence/sequence_test.go @@ -896,18 +896,15 @@ func TestAllCases(t *testing.T) { WithSequenceSteps([]v1.SequenceStep{{Destination: createDestination(0)}}))), }, WantErr: false, - WantDeletes: []clientgotesting.DeleteActionImpl{{ + WantUpdates: []clientgotesting.UpdateActionImpl{{ ActionImpl: clientgotesting.ActionImpl{ Namespace: testNS, Resource: v1.SchemeGroupVersion.WithResource("subscriptions"), }, - Name: resources.SequenceChannelName(sequenceName, 0), - }}, - WantCreates: []runtime.Object{ - resources.NewSubscription(0, NewSequence(sequenceName, testNS, + Object: resources.NewSubscription(0, NewSequence(sequenceName, testNS, WithSequenceChannelTemplateSpec(imc), WithSequenceSteps([]v1.SequenceStep{{Destination: createDestination(1)}}))), - }, + }}, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewSequence(sequenceName, testNS, WithInitSequenceConditions, @@ -1001,6 +998,18 @@ func TestAllCases(t *testing.T) { Name: resources.SequenceChannelName(sequenceName, 2), }, }, + WantUpdates: []clientgotesting.UpdateActionImpl{{ + ActionImpl: clientgotesting.ActionImpl{ + Namespace: testNS, + Resource: v1.SchemeGroupVersion.WithResource("subscriptions"), + }, + Object: resources.NewSubscription(1, NewSequence(sequenceName, testNS, + WithSequenceChannelTemplateSpec(imc), + WithSequenceSteps([]v1.SequenceStep{ + {Destination: createDestination(0)}, + {Destination: createDestination(1)}}, + ))), + }}, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewSequence(sequenceName, testNS, WithInitSequenceConditions, @@ -1131,6 +1140,18 @@ func TestAllCases(t *testing.T) { Name: resources.SequenceChannelName(sequenceName, 2), }, }, + WantUpdates: []clientgotesting.UpdateActionImpl{{ + ActionImpl: clientgotesting.ActionImpl{ + Namespace: testNS, + Resource: v1.SchemeGroupVersion.WithResource("subscriptions"), + }, + Object: resources.NewSubscription(1, NewSequence(sequenceName, testNS, + WithSequenceChannelTemplateSpec(imc), + WithSequenceSteps([]v1.SequenceStep{ + {Destination: createDestination(0)}, + {Destination: createDestination(1)}}, + ))), + }}, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewSequence(sequenceName, testNS, WithInitSequenceConditions, @@ -1245,6 +1266,18 @@ func TestAllCases(t *testing.T) { WantEvents: []string{ Eventf(corev1.EventTypeWarning, "InternalError", "inducing failure for delete inmemorychannels"), }, + WantUpdates: []clientgotesting.UpdateActionImpl{{ + ActionImpl: clientgotesting.ActionImpl{ + Namespace: testNS, + Resource: v1.SchemeGroupVersion.WithResource("subscriptions"), + }, + Object: resources.NewSubscription(1, NewSequence(sequenceName, testNS, + WithSequenceChannelTemplateSpec(imc), + WithSequenceSteps([]v1.SequenceStep{ + {Destination: createDestination(0)}, + {Destination: createDestination(1)}}, + ))), + }}, WantDeletes: []clientgotesting.DeleteActionImpl{ { From 19cbbb4cac5decef44a4ab6ac96a505c676de9be Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Thu, 30 May 2024 11:47:45 -0400 Subject: [PATCH 3/3] cleanup: use method from api folder to check immutable fields Signed-off-by: Calum Murray --- pkg/reconciler/sequence/sequence.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pkg/reconciler/sequence/sequence.go b/pkg/reconciler/sequence/sequence.go index fe0df8fa183..c95a75c6788 100644 --- a/pkg/reconciler/sequence/sequence.go +++ b/pkg/reconciler/sequence/sequence.go @@ -20,7 +20,6 @@ import ( "context" "fmt" - "github.com/google/go-cmp/cmp/cmpopts" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" apierrs "k8s.io/apimachinery/pkg/api/errors" @@ -189,7 +188,6 @@ func (r *Reconciler) reconcileSubscription(ctx context.Context, step int, p *v1. subName := resources.SequenceSubscriptionName(p.Name, step) sub, err := r.subscriptionLister.Subscriptions(p.Namespace).Get(subName) - ignoreArguments := cmpopts.IgnoreFields(messagingv1.SubscriptionSpec{}, "Subscriber", "Reply", "Delivery") // If the resource doesn't exist, we'll create it. if apierrs.IsNotFound(err) { sub = expected @@ -206,7 +204,7 @@ func (r *Reconciler) reconcileSubscription(ctx context.Context, step int, p *v1. // TODO: Send events here, or elsewhere? //r.Recorder.Eventf(p, corev1.EventTypeWarning, subscriptionCreateFailed, "Create Sequences's subscription failed: %v", err) return nil, fmt.Errorf("failed to get subscription: %s", err) - } else if equal, err := kmp.SafeEqual(sub.Spec, expected.Spec, ignoreArguments); !equal || err != nil { + } else if immutableFieldsChanged := expected.CheckImmutableFields(ctx, sub); immutableFieldsChanged != nil { // Given that spec.channel is immutable, we cannot just update the subscription. We delete // it instead, and re-create it. err = r.eventingClientSet.MessagingV1().Subscriptions(sub.Namespace).Delete(ctx, sub.Name, metav1.DeleteOptions{})