From 3ee2400189c16b4941242857ffc926dc1d5230f3 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Mon, 3 Jun 2024 12:30:18 -0400 Subject: [PATCH] fix: sequence updates subscription where possible, instead of recreating it (#7948) * feat: sequences only recreate subscriptions when the changes are to immutable fields Signed-off-by: Calum Murray * fix(tests): sequence unit tests correctly check for updates to subscriptions Signed-off-by: Calum Murray * cleanup: use method from api folder to check immutable fields Signed-off-by: Calum Murray --------- Signed-off-by: Calum Murray --- pkg/reconciler/sequence/sequence.go | 12 +++++-- pkg/reconciler/sequence/sequence_test.go | 45 ++++++++++++++++++++---- 2 files changed, 49 insertions(+), 8 deletions(-) diff --git a/pkg/reconciler/sequence/sequence.go b/pkg/reconciler/sequence/sequence.go index d37dd3b793a..4919ffe3b7d 100644 --- a/pkg/reconciler/sequence/sequence.go +++ b/pkg/reconciler/sequence/sequence.go @@ -22,7 +22,6 @@ import ( "go.uber.org/zap" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/equality" apierrs "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -43,6 +42,7 @@ import ( listers "knative.dev/eventing/pkg/client/listers/flows/v1" messaginglisters "knative.dev/eventing/pkg/client/listers/messaging/v1" "knative.dev/eventing/pkg/duck" + "knative.dev/pkg/kmp" "knative.dev/eventing/pkg/reconciler/sequence/resources" ) @@ -190,7 +190,7 @@ func (r *Reconciler) reconcileSubscription(ctx context.Context, step int, p *v1. // TODO: Send events here, or elsewhere? //r.Recorder.Eventf(p, corev1.EventTypeWarning, subscriptionCreateFailed, "Create Sequences's subscription failed: %v", err) return nil, fmt.Errorf("failed to get subscription: %s", err) - } else if !equality.Semantic.DeepDerivative(expected.Spec, sub.Spec) { + } else if immutableFieldsChanged := expected.CheckImmutableFields(ctx, sub); immutableFieldsChanged != nil { // Given that spec.channel is immutable, we cannot just update the subscription. We delete // it instead, and re-create it. err = r.eventingClientSet.MessagingV1().Subscriptions(sub.Namespace).Delete(ctx, sub.Name, metav1.DeleteOptions{}) @@ -204,6 +204,14 @@ func (r *Reconciler) reconcileSubscription(ctx context.Context, step int, p *v1. return nil, err } return newSub, nil + } else if equal, err := kmp.SafeEqual(sub.Spec, expected.Spec); !equal || err != nil { + // only the mutable fields were changed, so we can update the subscription + updatedSub, err := r.eventingClientSet.MessagingV1().Subscriptions(sub.Namespace).Update(ctx, expected, metav1.UpdateOptions{}) + if err != nil { + logging.FromContext(ctx).Infow("Cannot update subscription", zap.Error(err)) + return nil, err + } + return updatedSub, nil } return sub, nil } diff --git a/pkg/reconciler/sequence/sequence_test.go b/pkg/reconciler/sequence/sequence_test.go index b149a72ea01..ab0fdd84259 100644 --- a/pkg/reconciler/sequence/sequence_test.go +++ b/pkg/reconciler/sequence/sequence_test.go @@ -887,18 +887,15 @@ func TestAllCases(t *testing.T) { WithSequenceSteps([]v1.SequenceStep{{Destination: createDestination(0)}}))), }, WantErr: false, - WantDeletes: []clientgotesting.DeleteActionImpl{{ + WantUpdates: []clientgotesting.UpdateActionImpl{{ ActionImpl: clientgotesting.ActionImpl{ Namespace: testNS, Resource: v1.SchemeGroupVersion.WithResource("subscriptions"), }, - Name: resources.SequenceChannelName(sequenceName, 0), - }}, - WantCreates: []runtime.Object{ - resources.NewSubscription(0, NewSequence(sequenceName, testNS, + Object: resources.NewSubscription(0, NewSequence(sequenceName, testNS, WithSequenceChannelTemplateSpec(imc), WithSequenceSteps([]v1.SequenceStep{{Destination: createDestination(1)}}))), - }, + }}, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewSequence(sequenceName, testNS, WithInitSequenceConditions, @@ -991,6 +988,18 @@ func TestAllCases(t *testing.T) { Name: resources.SequenceChannelName(sequenceName, 2), }, }, + WantUpdates: []clientgotesting.UpdateActionImpl{{ + ActionImpl: clientgotesting.ActionImpl{ + Namespace: testNS, + Resource: v1.SchemeGroupVersion.WithResource("subscriptions"), + }, + Object: resources.NewSubscription(1, NewSequence(sequenceName, testNS, + WithSequenceChannelTemplateSpec(imc), + WithSequenceSteps([]v1.SequenceStep{ + {Destination: createDestination(0)}, + {Destination: createDestination(1)}}, + ))), + }}, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewSequence(sequenceName, testNS, WithInitSequenceConditions, @@ -1120,6 +1129,18 @@ func TestAllCases(t *testing.T) { Name: resources.SequenceChannelName(sequenceName, 2), }, }, + WantUpdates: []clientgotesting.UpdateActionImpl{{ + ActionImpl: clientgotesting.ActionImpl{ + Namespace: testNS, + Resource: v1.SchemeGroupVersion.WithResource("subscriptions"), + }, + Object: resources.NewSubscription(1, NewSequence(sequenceName, testNS, + WithSequenceChannelTemplateSpec(imc), + WithSequenceSteps([]v1.SequenceStep{ + {Destination: createDestination(0)}, + {Destination: createDestination(1)}}, + ))), + }}, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewSequence(sequenceName, testNS, WithInitSequenceConditions, @@ -1233,6 +1254,18 @@ func TestAllCases(t *testing.T) { WantEvents: []string{ Eventf(corev1.EventTypeWarning, "InternalError", "inducing failure for delete inmemorychannels"), }, + WantUpdates: []clientgotesting.UpdateActionImpl{{ + ActionImpl: clientgotesting.ActionImpl{ + Namespace: testNS, + Resource: v1.SchemeGroupVersion.WithResource("subscriptions"), + }, + Object: resources.NewSubscription(1, NewSequence(sequenceName, testNS, + WithSequenceChannelTemplateSpec(imc), + WithSequenceSteps([]v1.SequenceStep{ + {Destination: createDestination(0)}, + {Destination: createDestination(1)}}, + ))), + }}, WantDeletes: []clientgotesting.DeleteActionImpl{ {