From 1ee3b82d999eeb02e5def89c65e80e817e40e774 Mon Sep 17 00:00:00 2001 From: 7h3-3mp7y-m4n Date: Wed, 5 Jun 2024 23:45:42 +0530 Subject: [PATCH 1/2] Fix: Parallels updates subscription where possible, instead of recreating it --- pkg/reconciler/parallel/parallel.go | 13 +++++++++++-- pkg/reconciler/parallel/parallel_test.go | 21 ++++++++++++--------- 2 files changed, 23 insertions(+), 11 deletions(-) diff --git a/pkg/reconciler/parallel/parallel.go b/pkg/reconciler/parallel/parallel.go index 26f3521b03c..f7b94764451 100644 --- a/pkg/reconciler/parallel/parallel.go +++ b/pkg/reconciler/parallel/parallel.go @@ -22,7 +22,8 @@ import ( "go.uber.org/zap" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/equality" + + //"k8s.io/apimachinery/pkg/api/equality" apierrs "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -45,6 +46,7 @@ import ( messaginglisters "knative.dev/eventing/pkg/client/listers/messaging/v1" ducklib "knative.dev/eventing/pkg/duck" "knative.dev/eventing/pkg/reconciler/parallel/resources" + "knative.dev/pkg/kmp" ) type Reconciler struct { @@ -216,7 +218,7 @@ func (r *Reconciler) reconcileSubscription(ctx context.Context, branchNumber int // TODO: Send events here, or elsewhere? //r.Recorder.Eventf(p, corev1.EventTypeWarning, subscriptionCreateFailed, "Create Parallels's subscription failed: %v", err) return nil, fmt.Errorf("failed to get Subscription: %s", err) - } else if !equality.Semantic.DeepDerivative(expected.Spec, sub.Spec) { + } else if immutableFieldsChanged := expected.CheckImmutableFields(ctx, sub); immutableFieldsChanged != nil { // Given that spec.channel is immutable, we cannot just update the subscription. We delete // it instead, and re-create it. err = r.eventingClientSet.MessagingV1().Subscriptions(sub.Namespace).Delete(ctx, sub.Name, metav1.DeleteOptions{}) @@ -230,6 +232,13 @@ func (r *Reconciler) reconcileSubscription(ctx context.Context, branchNumber int return nil, err } return newSub, nil + } else if equal, err := kmp.SafeEqual(sub.Spec, expected.Spec); !equal || err != nil { + updatedSub, err := r.eventingClientSet.MessagingV1().Subscriptions(sub.Namespace).Update(ctx, expected, metav1.UpdateOptions{}) + if err != nil { + logging.FromContext(ctx).Infow("Cannot update subscription", zap.Error(err)) + return nil, err + } + return updatedSub, nil } return sub, nil } diff --git a/pkg/reconciler/parallel/parallel_test.go b/pkg/reconciler/parallel/parallel_test.go index 54dbec0d76f..8a3dbee44b8 100644 --- a/pkg/reconciler/parallel/parallel_test.go +++ b/pkg/reconciler/parallel/parallel_test.go @@ -427,22 +427,25 @@ func TestAllBranches(t *testing.T) { {Subscriber: createSubscriber(0)}, })))}, WantErr: false, - WantDeletes: []clientgotesting.DeleteActionImpl{{ - ActionImpl: clientgotesting.ActionImpl{ - Namespace: testNS, - Resource: v1.SchemeGroupVersion.WithResource("subscriptions"), + WantUpdates: []clientgotesting.UpdateActionImpl{ + { + ActionImpl: clientgotesting.ActionImpl{ + Namespace: testNS, + Resource: v1.SchemeGroupVersion.WithResource("subscriptions"), + }, + Object: resources.NewSubscription(0, NewFlowsParallel(parallelName, testNS, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(1)}, + }))), }, - Name: resources.ParallelBranchChannelName(parallelName, 0), - }}, + }, WantCreates: []runtime.Object{ createChannel(parallelName), createBranchChannel(parallelName, 0), resources.NewFilterSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ {Subscriber: createSubscriber(1)}, }))), - resources.NewSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ - {Subscriber: createSubscriber(1)}, - }))), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewFlowsParallel(parallelName, testNS, From 5553cf3f91fc52ffebaf53b03b0b6d1ede072ec1 Mon Sep 17 00:00:00 2001 From: 7h3-3mp7y-m4n Date: Mon, 10 Jun 2024 16:23:07 +0530 Subject: [PATCH 2/2] Fix: removing not imported pkg --- pkg/reconciler/parallel/parallel.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/reconciler/parallel/parallel.go b/pkg/reconciler/parallel/parallel.go index f7b94764451..1e6d674a6a2 100644 --- a/pkg/reconciler/parallel/parallel.go +++ b/pkg/reconciler/parallel/parallel.go @@ -22,8 +22,6 @@ import ( "go.uber.org/zap" corev1 "k8s.io/api/core/v1" - - //"k8s.io/apimachinery/pkg/api/equality" apierrs "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"