Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: sequence updates subscription where possible, instead of recreating it #7948

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions pkg/reconciler/sequence/sequence.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (
"context"
"fmt"

"github.com/google/go-cmp/cmp/cmpopts"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -43,6 +43,7 @@ import (
listers "knative.dev/eventing/pkg/client/listers/flows/v1"
messaginglisters "knative.dev/eventing/pkg/client/listers/messaging/v1"
"knative.dev/eventing/pkg/duck"
"knative.dev/pkg/kmp"

"knative.dev/eventing/pkg/reconciler/sequence/resources"
)
Expand Down Expand Up @@ -174,6 +175,7 @@ func (r *Reconciler) reconcileSubscription(ctx context.Context, step int, p *v1.
subName := resources.SequenceSubscriptionName(p.Name, step)
sub, err := r.subscriptionLister.Subscriptions(p.Namespace).Get(subName)

ignoreArguments := cmpopts.IgnoreFields(messagingv1.SubscriptionSpec{}, "Subscriber", "Reply", "Delivery")
// If the resource doesn't exist, we'll create it.
if apierrs.IsNotFound(err) {
sub = expected
Expand All @@ -190,7 +192,7 @@ func (r *Reconciler) reconcileSubscription(ctx context.Context, step int, p *v1.
// TODO: Send events here, or elsewhere?
//r.Recorder.Eventf(p, corev1.EventTypeWarning, subscriptionCreateFailed, "Create Sequences's subscription failed: %v", err)
return nil, fmt.Errorf("failed to get subscription: %s", err)
} else if !equality.Semantic.DeepDerivative(expected.Spec, sub.Spec) {
} else if equal, err := kmp.SafeEqual(sub.Spec, expected.Spec, ignoreArguments); !equal || err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As only .spec.channel is relevant here, we could also compare only these fields (instead of using ignoreArguments). So this might be a bit more self speaking directly 🤷

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is spec.channel the only immutable field? I just took this check from the validation webhook to make sure that the change is allowable. Maybe we cold just refactor that into its own function and call it here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just had it from the comment Given that spec.channel is immutable.... Also in the SubscriptionSpec this is the only one marked as immutable:

type SubscriptionSpec struct {
// Reference to a channel that will be used to create the subscription
// You can specify only the following fields of the KReference:
// - Kind
// - APIVersion
// - Name
// - Namespace
// The resource pointed by this KReference must meet the
// contract to the ChannelableSpec duck type. If the resource does not
// meet this contract it will be reflected in the Subscription's status.
//
// This field is immutable. We have no good answer on what happens to
// the events that are currently in the channel being consumed from
// and what the semantics there should be. For now, you can always
// delete the Subscription and recreate it to point to a different
// channel, giving the user more control over what semantics should
// be used (drain the channel first, possibly have events dropped,
// etc.)
Channel duckv1.KReference `json:"channel"`
// Subscriber is reference to function for processing events.
// Events from the Channel will be delivered here and replies are
// sent to a Destination as specified by the Reply.
Subscriber *duckv1.Destination `json:"subscriber,omitempty"`
// Reply specifies (optionally) how to handle events returned from
// the Subscriber target.
// +optional
Reply *duckv1.Destination `json:"reply,omitempty"`
// Delivery configuration
// +optional
Delivery *eventingduckv1.DeliverySpec `json:"delivery,omitempty"`
}

But aligning with the webhook code (via function call) makes sense to me

// Given that spec.channel is immutable, we cannot just update the subscription. We delete
// it instead, and re-create it.
err = r.eventingClientSet.MessagingV1().Subscriptions(sub.Namespace).Delete(ctx, sub.Name, metav1.DeleteOptions{})
Expand All @@ -204,6 +206,14 @@ func (r *Reconciler) reconcileSubscription(ctx context.Context, step int, p *v1.
return nil, err
}
return newSub, nil
} else if equal, err := kmp.SafeEqual(sub.Spec, expected.Spec); !equal || err != nil {
// only the mutable fields were changed, so we can update the subscription
updatedSub, err := r.eventingClientSet.MessagingV1().Subscriptions(sub.Namespace).Update(ctx, expected, metav1.UpdateOptions{})
if err != nil {
logging.FromContext(ctx).Infow("Cannot update subscription", zap.Error(err))
return nil, err
}
return updatedSub, nil
}
return sub, nil
}
Expand Down
45 changes: 39 additions & 6 deletions pkg/reconciler/sequence/sequence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -887,18 +887,15 @@ func TestAllCases(t *testing.T) {
WithSequenceSteps([]v1.SequenceStep{{Destination: createDestination(0)}}))),
},
WantErr: false,
WantDeletes: []clientgotesting.DeleteActionImpl{{
WantUpdates: []clientgotesting.UpdateActionImpl{{
ActionImpl: clientgotesting.ActionImpl{
Namespace: testNS,
Resource: v1.SchemeGroupVersion.WithResource("subscriptions"),
},
Name: resources.SequenceChannelName(sequenceName, 0),
}},
WantCreates: []runtime.Object{
resources.NewSubscription(0, NewSequence(sequenceName, testNS,
Object: resources.NewSubscription(0, NewSequence(sequenceName, testNS,
WithSequenceChannelTemplateSpec(imc),
WithSequenceSteps([]v1.SequenceStep{{Destination: createDestination(1)}}))),
},
}},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewSequence(sequenceName, testNS,
WithInitSequenceConditions,
Expand Down Expand Up @@ -991,6 +988,18 @@ func TestAllCases(t *testing.T) {
Name: resources.SequenceChannelName(sequenceName, 2),
},
},
WantUpdates: []clientgotesting.UpdateActionImpl{{
ActionImpl: clientgotesting.ActionImpl{
Namespace: testNS,
Resource: v1.SchemeGroupVersion.WithResource("subscriptions"),
},
Object: resources.NewSubscription(1, NewSequence(sequenceName, testNS,
WithSequenceChannelTemplateSpec(imc),
WithSequenceSteps([]v1.SequenceStep{
{Destination: createDestination(0)},
{Destination: createDestination(1)}},
))),
}},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewSequence(sequenceName, testNS,
WithInitSequenceConditions,
Expand Down Expand Up @@ -1120,6 +1129,18 @@ func TestAllCases(t *testing.T) {
Name: resources.SequenceChannelName(sequenceName, 2),
},
},
WantUpdates: []clientgotesting.UpdateActionImpl{{
ActionImpl: clientgotesting.ActionImpl{
Namespace: testNS,
Resource: v1.SchemeGroupVersion.WithResource("subscriptions"),
},
Object: resources.NewSubscription(1, NewSequence(sequenceName, testNS,
WithSequenceChannelTemplateSpec(imc),
WithSequenceSteps([]v1.SequenceStep{
{Destination: createDestination(0)},
{Destination: createDestination(1)}},
))),
}},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewSequence(sequenceName, testNS,
WithInitSequenceConditions,
Expand Down Expand Up @@ -1233,6 +1254,18 @@ func TestAllCases(t *testing.T) {
WantEvents: []string{
Eventf(corev1.EventTypeWarning, "InternalError", "inducing failure for delete inmemorychannels"),
},
WantUpdates: []clientgotesting.UpdateActionImpl{{
ActionImpl: clientgotesting.ActionImpl{
Namespace: testNS,
Resource: v1.SchemeGroupVersion.WithResource("subscriptions"),
},
Object: resources.NewSubscription(1, NewSequence(sequenceName, testNS,
WithSequenceChannelTemplateSpec(imc),
WithSequenceSteps([]v1.SequenceStep{
{Destination: createDestination(0)},
{Destination: createDestination(1)}},
))),
}},

WantDeletes: []clientgotesting.DeleteActionImpl{
{
Expand Down
Loading