diff --git a/pkg/reconciler/sequence/sequence.go b/pkg/reconciler/sequence/sequence.go index 7a7d5fdbf7a..c95a75c6788 100644 --- a/pkg/reconciler/sequence/sequence.go +++ b/pkg/reconciler/sequence/sequence.go @@ -22,7 +22,6 @@ import ( "go.uber.org/zap" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/equality" apierrs "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -46,6 +45,7 @@ import ( listers "knative.dev/eventing/pkg/client/listers/flows/v1" messaginglisters "knative.dev/eventing/pkg/client/listers/messaging/v1" "knative.dev/eventing/pkg/duck" + "knative.dev/pkg/kmp" corev1listers "k8s.io/client-go/listers/core/v1" "knative.dev/eventing/pkg/reconciler/sequence/resources" @@ -204,7 +204,7 @@ func (r *Reconciler) reconcileSubscription(ctx context.Context, step int, p *v1. // TODO: Send events here, or elsewhere? //r.Recorder.Eventf(p, corev1.EventTypeWarning, subscriptionCreateFailed, "Create Sequences's subscription failed: %v", err) return nil, fmt.Errorf("failed to get subscription: %s", err) - } else if !equality.Semantic.DeepDerivative(expected.Spec, sub.Spec) { + } else if immutableFieldsChanged := expected.CheckImmutableFields(ctx, sub); immutableFieldsChanged != nil { // Given that spec.channel is immutable, we cannot just update the subscription. We delete // it instead, and re-create it. err = r.eventingClientSet.MessagingV1().Subscriptions(sub.Namespace).Delete(ctx, sub.Name, metav1.DeleteOptions{}) @@ -218,6 +218,14 @@ func (r *Reconciler) reconcileSubscription(ctx context.Context, step int, p *v1. return nil, err } return newSub, nil + } else if equal, err := kmp.SafeEqual(sub.Spec, expected.Spec); !equal || err != nil { + // only the mutable fields were changed, so we can update the subscription + updatedSub, err := r.eventingClientSet.MessagingV1().Subscriptions(sub.Namespace).Update(ctx, expected, metav1.UpdateOptions{}) + if err != nil { + logging.FromContext(ctx).Infow("Cannot update subscription", zap.Error(err)) + return nil, err + } + return updatedSub, nil } return sub, nil } diff --git a/pkg/reconciler/sequence/sequence_test.go b/pkg/reconciler/sequence/sequence_test.go index 0e4f3aa0ee3..e50cf64a5dd 100644 --- a/pkg/reconciler/sequence/sequence_test.go +++ b/pkg/reconciler/sequence/sequence_test.go @@ -896,18 +896,15 @@ func TestAllCases(t *testing.T) { WithSequenceSteps([]v1.SequenceStep{{Destination: createDestination(0)}}))), }, WantErr: false, - WantDeletes: []clientgotesting.DeleteActionImpl{{ + WantUpdates: []clientgotesting.UpdateActionImpl{{ ActionImpl: clientgotesting.ActionImpl{ Namespace: testNS, Resource: v1.SchemeGroupVersion.WithResource("subscriptions"), }, - Name: resources.SequenceChannelName(sequenceName, 0), - }}, - WantCreates: []runtime.Object{ - resources.NewSubscription(0, NewSequence(sequenceName, testNS, + Object: resources.NewSubscription(0, NewSequence(sequenceName, testNS, WithSequenceChannelTemplateSpec(imc), WithSequenceSteps([]v1.SequenceStep{{Destination: createDestination(1)}}))), - }, + }}, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewSequence(sequenceName, testNS, WithInitSequenceConditions, @@ -1001,6 +998,18 @@ func TestAllCases(t *testing.T) { Name: resources.SequenceChannelName(sequenceName, 2), }, }, + WantUpdates: []clientgotesting.UpdateActionImpl{{ + ActionImpl: clientgotesting.ActionImpl{ + Namespace: testNS, + Resource: v1.SchemeGroupVersion.WithResource("subscriptions"), + }, + Object: resources.NewSubscription(1, NewSequence(sequenceName, testNS, + WithSequenceChannelTemplateSpec(imc), + WithSequenceSteps([]v1.SequenceStep{ + {Destination: createDestination(0)}, + {Destination: createDestination(1)}}, + ))), + }}, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewSequence(sequenceName, testNS, WithInitSequenceConditions, @@ -1131,6 +1140,18 @@ func TestAllCases(t *testing.T) { Name: resources.SequenceChannelName(sequenceName, 2), }, }, + WantUpdates: []clientgotesting.UpdateActionImpl{{ + ActionImpl: clientgotesting.ActionImpl{ + Namespace: testNS, + Resource: v1.SchemeGroupVersion.WithResource("subscriptions"), + }, + Object: resources.NewSubscription(1, NewSequence(sequenceName, testNS, + WithSequenceChannelTemplateSpec(imc), + WithSequenceSteps([]v1.SequenceStep{ + {Destination: createDestination(0)}, + {Destination: createDestination(1)}}, + ))), + }}, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewSequence(sequenceName, testNS, WithInitSequenceConditions, @@ -1245,6 +1266,18 @@ func TestAllCases(t *testing.T) { WantEvents: []string{ Eventf(corev1.EventTypeWarning, "InternalError", "inducing failure for delete inmemorychannels"), }, + WantUpdates: []clientgotesting.UpdateActionImpl{{ + ActionImpl: clientgotesting.ActionImpl{ + Namespace: testNS, + Resource: v1.SchemeGroupVersion.WithResource("subscriptions"), + }, + Object: resources.NewSubscription(1, NewSequence(sequenceName, testNS, + WithSequenceChannelTemplateSpec(imc), + WithSequenceSteps([]v1.SequenceStep{ + {Destination: createDestination(0)}, + {Destination: createDestination(1)}}, + ))), + }}, WantDeletes: []clientgotesting.DeleteActionImpl{ {