diff --git a/pkg/reconciler/sequence/controller.go b/pkg/reconciler/sequence/controller.go index 2fa359a4ddb..882199a6e86 100644 --- a/pkg/reconciler/sequence/controller.go +++ b/pkg/reconciler/sequence/controller.go @@ -20,11 +20,16 @@ import ( "context" "k8s.io/client-go/tools/cache" + "knative.dev/pkg/configmap" + "knative.dev/pkg/controller" + "knative.dev/pkg/logging" + + "knative.dev/eventing/pkg/apis/feature" v1 "knative.dev/eventing/pkg/apis/flows/v1" "knative.dev/eventing/pkg/auth" "knative.dev/eventing/pkg/duck" - "knative.dev/pkg/configmap" - "knative.dev/pkg/controller" + + "knative.dev/pkg/injection/clients/dynamicclient" flowsv1 "knative.dev/eventing/pkg/apis/flows/v1" eventingclient "knative.dev/eventing/pkg/client/injection/client" @@ -33,7 +38,6 @@ import ( "knative.dev/eventing/pkg/client/injection/informers/flows/v1/sequence" "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription" sequencereconciler "knative.dev/eventing/pkg/client/injection/reconciler/flows/v1/sequence" - "knative.dev/pkg/injection/clients/dynamicclient" ) // NewController initializes the controller and is called by the generated code @@ -47,6 +51,14 @@ func NewController( subscriptionInformer := subscription.Get(ctx) eventPolicyInformer := eventpolicy.Get(ctx) + var globalResync func() + store := feature.NewStore(logging.FromContext(ctx), func(name string, value interface{}) { + if globalResync != nil { + globalResync() + } + }) + store.WatchConfigs(cmw) + r := &Reconciler{ sequenceLister: sequenceInformer.Lister(), subscriptionLister: subscriptionInformer.Lister(), @@ -54,7 +66,15 @@ func NewController( eventingClientSet: eventingclient.Get(ctx), eventPolicyLister: eventPolicyInformer.Lister(), } - impl := sequencereconciler.NewImpl(ctx, r) + impl := sequencereconciler.NewImpl(ctx, r, func(impl *controller.Impl) controller.Options { + return controller.Options{ + ConfigStore: store, + } + }) + + globalResync = func() { + impl.GlobalResync(sequenceInformer.Informer()) + } r.channelableTracker = duck.NewListableTrackerFromTracker(ctx, channelable.Get, impl.Tracker) sequenceInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue)) diff --git a/pkg/reconciler/sequence/sequence.go b/pkg/reconciler/sequence/sequence.go index 2bedb0510ca..cbb51035197 100644 --- a/pkg/reconciler/sequence/sequence.go +++ b/pkg/reconciler/sequence/sequence.go @@ -34,6 +34,8 @@ import ( "knative.dev/pkg/logging" pkgreconciler "knative.dev/pkg/reconciler" + "knative.dev/pkg/kmp" + eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" "knative.dev/eventing/pkg/apis/feature" v1 "knative.dev/eventing/pkg/apis/flows/v1" @@ -45,7 +47,6 @@ import ( listers "knative.dev/eventing/pkg/client/listers/flows/v1" messaginglisters "knative.dev/eventing/pkg/client/listers/messaging/v1" "knative.dev/eventing/pkg/duck" - "knative.dev/pkg/kmp" "knative.dev/eventing/pkg/reconciler/sequence/resources" ) @@ -217,6 +218,7 @@ func (r *Reconciler) reconcileSubscription(ctx context.Context, step int, p *v1. } return newSub, nil } else if equal, err := kmp.SafeEqual(sub.Spec, expected.Spec); !equal || err != nil { + expected.ResourceVersion = sub.ResourceVersion // only the mutable fields were changed, so we can update the subscription updatedSub, err := r.eventingClientSet.MessagingV1().Subscriptions(sub.Namespace).Update(ctx, expected, metav1.UpdateOptions{}) if err != nil {