From e5f6c1a8a857006b74e49346a999a867fa82c85e Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Tue, 28 May 2024 14:11:26 -0400 Subject: [PATCH 1/3] feat: sequences only recreate subscriptions when the changes are to immutable fields Signed-off-by: Calum Murray --- pkg/reconciler/sequence/sequence.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/pkg/reconciler/sequence/sequence.go b/pkg/reconciler/sequence/sequence.go index d37dd3b793a..cdf30881a83 100644 --- a/pkg/reconciler/sequence/sequence.go +++ b/pkg/reconciler/sequence/sequence.go @@ -20,9 +20,9 @@ import ( "context" "fmt" + "github.com/google/go-cmp/cmp/cmpopts" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/equality" apierrs "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -43,6 +43,7 @@ import ( listers "knative.dev/eventing/pkg/client/listers/flows/v1" messaginglisters "knative.dev/eventing/pkg/client/listers/messaging/v1" "knative.dev/eventing/pkg/duck" + "knative.dev/pkg/kmp" "knative.dev/eventing/pkg/reconciler/sequence/resources" ) @@ -174,6 +175,7 @@ func (r *Reconciler) reconcileSubscription(ctx context.Context, step int, p *v1. subName := resources.SequenceSubscriptionName(p.Name, step) sub, err := r.subscriptionLister.Subscriptions(p.Namespace).Get(subName) + ignoreArguments := cmpopts.IgnoreFields(messagingv1.SubscriptionSpec{}, "Subscriber", "Reply", "Delivery") // If the resource doesn't exist, we'll create it. if apierrs.IsNotFound(err) { sub = expected @@ -190,7 +192,7 @@ func (r *Reconciler) reconcileSubscription(ctx context.Context, step int, p *v1. // TODO: Send events here, or elsewhere? //r.Recorder.Eventf(p, corev1.EventTypeWarning, subscriptionCreateFailed, "Create Sequences's subscription failed: %v", err) return nil, fmt.Errorf("failed to get subscription: %s", err) - } else if !equality.Semantic.DeepDerivative(expected.Spec, sub.Spec) { + } else if equal, err := kmp.SafeEqual(sub.Spec, expected.Spec, ignoreArguments); !equal || err != nil { // Given that spec.channel is immutable, we cannot just update the subscription. We delete // it instead, and re-create it. err = r.eventingClientSet.MessagingV1().Subscriptions(sub.Namespace).Delete(ctx, sub.Name, metav1.DeleteOptions{}) @@ -204,6 +206,14 @@ func (r *Reconciler) reconcileSubscription(ctx context.Context, step int, p *v1. return nil, err } return newSub, nil + } else if equal, err := kmp.SafeEqual(sub.Spec, expected.Spec); !equal || err != nil { + // only the mutable fields were changed, so we can update the subscription + updatedSub, err := r.eventingClientSet.MessagingV1().Subscriptions(sub.Namespace).Update(ctx, expected, metav1.UpdateOptions{}) + if err != nil { + logging.FromContext(ctx).Infow("Cannot update subscription", zap.Error(err)) + return nil, err + } + return updatedSub, nil } return sub, nil } From b1b3d0710df0657d68aff84bf59e80e1f8fad581 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Tue, 28 May 2024 14:12:07 -0400 Subject: [PATCH 2/3] fix(tests): sequence unit tests correctly check for updates to subscriptions Signed-off-by: Calum Murray --- pkg/reconciler/sequence/sequence_test.go | 45 ++++++++++++++++++++---- 1 file changed, 39 insertions(+), 6 deletions(-) diff --git a/pkg/reconciler/sequence/sequence_test.go b/pkg/reconciler/sequence/sequence_test.go index b149a72ea01..ab0fdd84259 100644 --- a/pkg/reconciler/sequence/sequence_test.go +++ b/pkg/reconciler/sequence/sequence_test.go @@ -887,18 +887,15 @@ func TestAllCases(t *testing.T) { WithSequenceSteps([]v1.SequenceStep{{Destination: createDestination(0)}}))), }, WantErr: false, - WantDeletes: []clientgotesting.DeleteActionImpl{{ + WantUpdates: []clientgotesting.UpdateActionImpl{{ ActionImpl: clientgotesting.ActionImpl{ Namespace: testNS, Resource: v1.SchemeGroupVersion.WithResource("subscriptions"), }, - Name: resources.SequenceChannelName(sequenceName, 0), - }}, - WantCreates: []runtime.Object{ - resources.NewSubscription(0, NewSequence(sequenceName, testNS, + Object: resources.NewSubscription(0, NewSequence(sequenceName, testNS, WithSequenceChannelTemplateSpec(imc), WithSequenceSteps([]v1.SequenceStep{{Destination: createDestination(1)}}))), - }, + }}, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewSequence(sequenceName, testNS, WithInitSequenceConditions, @@ -991,6 +988,18 @@ func TestAllCases(t *testing.T) { Name: resources.SequenceChannelName(sequenceName, 2), }, }, + WantUpdates: []clientgotesting.UpdateActionImpl{{ + ActionImpl: clientgotesting.ActionImpl{ + Namespace: testNS, + Resource: v1.SchemeGroupVersion.WithResource("subscriptions"), + }, + Object: resources.NewSubscription(1, NewSequence(sequenceName, testNS, + WithSequenceChannelTemplateSpec(imc), + WithSequenceSteps([]v1.SequenceStep{ + {Destination: createDestination(0)}, + {Destination: createDestination(1)}}, + ))), + }}, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewSequence(sequenceName, testNS, WithInitSequenceConditions, @@ -1120,6 +1129,18 @@ func TestAllCases(t *testing.T) { Name: resources.SequenceChannelName(sequenceName, 2), }, }, + WantUpdates: []clientgotesting.UpdateActionImpl{{ + ActionImpl: clientgotesting.ActionImpl{ + Namespace: testNS, + Resource: v1.SchemeGroupVersion.WithResource("subscriptions"), + }, + Object: resources.NewSubscription(1, NewSequence(sequenceName, testNS, + WithSequenceChannelTemplateSpec(imc), + WithSequenceSteps([]v1.SequenceStep{ + {Destination: createDestination(0)}, + {Destination: createDestination(1)}}, + ))), + }}, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewSequence(sequenceName, testNS, WithInitSequenceConditions, @@ -1233,6 +1254,18 @@ func TestAllCases(t *testing.T) { WantEvents: []string{ Eventf(corev1.EventTypeWarning, "InternalError", "inducing failure for delete inmemorychannels"), }, + WantUpdates: []clientgotesting.UpdateActionImpl{{ + ActionImpl: clientgotesting.ActionImpl{ + Namespace: testNS, + Resource: v1.SchemeGroupVersion.WithResource("subscriptions"), + }, + Object: resources.NewSubscription(1, NewSequence(sequenceName, testNS, + WithSequenceChannelTemplateSpec(imc), + WithSequenceSteps([]v1.SequenceStep{ + {Destination: createDestination(0)}, + {Destination: createDestination(1)}}, + ))), + }}, WantDeletes: []clientgotesting.DeleteActionImpl{ { From 924f7a69daa18ce8f1fe49afaec65ee618c4be9a Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Thu, 30 May 2024 11:47:45 -0400 Subject: [PATCH 3/3] cleanup: use method from api folder to check immutable fields Signed-off-by: Calum Murray --- pkg/reconciler/sequence/sequence.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pkg/reconciler/sequence/sequence.go b/pkg/reconciler/sequence/sequence.go index cdf30881a83..4919ffe3b7d 100644 --- a/pkg/reconciler/sequence/sequence.go +++ b/pkg/reconciler/sequence/sequence.go @@ -20,7 +20,6 @@ import ( "context" "fmt" - "github.com/google/go-cmp/cmp/cmpopts" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" apierrs "k8s.io/apimachinery/pkg/api/errors" @@ -175,7 +174,6 @@ func (r *Reconciler) reconcileSubscription(ctx context.Context, step int, p *v1. subName := resources.SequenceSubscriptionName(p.Name, step) sub, err := r.subscriptionLister.Subscriptions(p.Namespace).Get(subName) - ignoreArguments := cmpopts.IgnoreFields(messagingv1.SubscriptionSpec{}, "Subscriber", "Reply", "Delivery") // If the resource doesn't exist, we'll create it. if apierrs.IsNotFound(err) { sub = expected @@ -192,7 +190,7 @@ func (r *Reconciler) reconcileSubscription(ctx context.Context, step int, p *v1. // TODO: Send events here, or elsewhere? //r.Recorder.Eventf(p, corev1.EventTypeWarning, subscriptionCreateFailed, "Create Sequences's subscription failed: %v", err) return nil, fmt.Errorf("failed to get subscription: %s", err) - } else if equal, err := kmp.SafeEqual(sub.Spec, expected.Spec, ignoreArguments); !equal || err != nil { + } else if immutableFieldsChanged := expected.CheckImmutableFields(ctx, sub); immutableFieldsChanged != nil { // Given that spec.channel is immutable, we cannot just update the subscription. We delete // it instead, and re-create it. err = r.eventingClientSet.MessagingV1().Subscriptions(sub.Namespace).Delete(ctx, sub.Name, metav1.DeleteOptions{})