From e73f2c91d08a982b43adf67c6644bcd89ac1b071 Mon Sep 17 00:00:00 2001 From: Rahul Sawra Date: Fri, 12 Jul 2024 19:03:27 +0530 Subject: [PATCH] List applying policies in Parallel (#8083) * lifecycle changes for evebtpolicies in parallel Signed-off-by: rahulii * controller changes for eventpolicies and add tests Signed-off-by: rahulii * fix review comments Signed-off-by: rahulii * fix to review comments Signed-off-by: rahulii * add featurestore to watch configmaps Signed-off-by: rahulii * add featurestore to reconciler impl Signed-off-by: rahulii --------- Signed-off-by: rahulii --- pkg/apis/flows/v1/parallel_lifecycle.go | 26 ++- pkg/apis/flows/v1/parallel_lifecycle_test.go | 166 +++++++++++-------- pkg/reconciler/parallel/controller.go | 30 +++- pkg/reconciler/parallel/controller_test.go | 11 +- pkg/reconciler/parallel/parallel.go | 12 ++ pkg/reconciler/parallel/parallel_test.go | 158 +++++++++++++++++- pkg/reconciler/testing/v1/parallel.go | 38 +++++ 7 files changed, 368 insertions(+), 73 deletions(-) diff --git a/pkg/apis/flows/v1/parallel_lifecycle.go b/pkg/apis/flows/v1/parallel_lifecycle.go index 99c283108a2..e8cbb3da132 100644 --- a/pkg/apis/flows/v1/parallel_lifecycle.go +++ b/pkg/apis/flows/v1/parallel_lifecycle.go @@ -25,7 +25,7 @@ import ( pkgduckv1 "knative.dev/pkg/apis/duck/v1" ) -var pCondSet = apis.NewLivingConditionSet(ParallelConditionReady, ParallelConditionChannelsReady, ParallelConditionSubscriptionsReady, ParallelConditionAddressable) +var pCondSet = apis.NewLivingConditionSet(ParallelConditionReady, ParallelConditionChannelsReady, ParallelConditionSubscriptionsReady, ParallelConditionAddressable, ParallelConditionEventPoliciesReady) const ( // ParallelConditionReady has status True when all subconditions below have been set to True. @@ -42,6 +42,10 @@ const ( // ParallelConditionAddressable has status true when this Parallel meets // the Addressable contract and has a non-empty hostname. ParallelConditionAddressable apis.ConditionType = "Addressable" + + // ParallelConditionEventPoliciesReady has status True when applying EventPolicies for this + // Parallel are ready or if there are no EventPolicies. + ParallelConditionEventPoliciesReady apis.ConditionType = "EventPoliciesReady" ) // GetConditionSet retrieves the condition set for this resource. Implements the KRShaped interface. @@ -217,3 +221,23 @@ func (ps *ParallelStatus) setAddress(address *pkgduckv1.Addressable) { pCondSet.Manage(ps).MarkTrue(ParallelConditionAddressable) } } + +// MarkEventPoliciesFailed marks the ParallelConditionEventPoliciesReady as False with the given reason and message. +func (ps *ParallelStatus) MarkEventPoliciesFailed(reason, messageFormat string, messageA ...interface{}) { + pCondSet.Manage(ps).MarkFalse(ParallelConditionEventPoliciesReady, reason, messageFormat, messageA...) +} + +// MarkEventPoliciesUnknown marks the ParallelConditionEventPoliciesReady as Unknown with the given reason and message. +func (ps *ParallelStatus) MarkEventPoliciesUnknown(reason, messageFormat string, messageA ...interface{}) { + pCondSet.Manage(ps).MarkUnknown(ParallelConditionEventPoliciesReady, reason, messageFormat, messageA...) +} + +// MarkEventPoliciesTrue marks the ParallelConditionEventPoliciesReady as True. +func (ps *ParallelStatus) MarkEventPoliciesTrue() { + pCondSet.Manage(ps).MarkTrue(ParallelConditionEventPoliciesReady) +} + +// MarkEventPoliciesTrueWithReason marks the ParallelConditionEventPoliciesReady as True with the given reason and message. +func (ps *ParallelStatus) MarkEventPoliciesTrueWithReason(reason, messageFormat string, messageA ...interface{}) { + pCondSet.Manage(ps).MarkTrueWithReason(ParallelConditionEventPoliciesReady, reason, messageFormat, messageA...) +} diff --git a/pkg/apis/flows/v1/parallel_lifecycle_test.go b/pkg/apis/flows/v1/parallel_lifecycle_test.go index cb9d2957503..351e8b1a0d5 100644 --- a/pkg/apis/flows/v1/parallel_lifecycle_test.go +++ b/pkg/apis/flows/v1/parallel_lifecycle_test.go @@ -90,6 +90,9 @@ func TestParallelInitializeConditions(t *testing.T) { }, { Type: ParallelConditionChannelsReady, Status: corev1.ConditionUnknown, + }, { + Type: ParallelConditionEventPoliciesReady, + Status: corev1.ConditionUnknown, }, { Type: ParallelConditionReady, Status: corev1.ConditionUnknown, @@ -117,6 +120,9 @@ func TestParallelInitializeConditions(t *testing.T) { }, { Type: ParallelConditionChannelsReady, Status: corev1.ConditionFalse, + }, { + Type: ParallelConditionEventPoliciesReady, + Status: corev1.ConditionUnknown, }, { Type: ParallelConditionReady, Status: corev1.ConditionUnknown, @@ -144,6 +150,9 @@ func TestParallelInitializeConditions(t *testing.T) { }, { Type: ParallelConditionChannelsReady, Status: corev1.ConditionUnknown, + }, { + Type: ParallelConditionEventPoliciesReady, + Status: corev1.ConditionUnknown, }, { Type: ParallelConditionReady, Status: corev1.ConditionUnknown, @@ -435,82 +444,101 @@ func TestParallelPropagateSubscriptionStatusUpdated(t *testing.T) { func TestParallelReady(t *testing.T) { tests := []struct { - name string - fsubs []*messagingv1.Subscription - subs []*messagingv1.Subscription - ichannel *eventingduckv1.Channelable - channels []*eventingduckv1.Channelable - want bool + name string + fsubs []*messagingv1.Subscription + subs []*messagingv1.Subscription + ichannel *eventingduckv1.Channelable + channels []*eventingduckv1.Channelable + eventPoliciesReady bool + want bool }{{ - name: "ingress false, empty", - fsubs: []*messagingv1.Subscription{}, - subs: []*messagingv1.Subscription{}, - ichannel: getChannelable(false), - channels: []*eventingduckv1.Channelable{}, - want: false, + name: "ingress false, empty", + fsubs: []*messagingv1.Subscription{}, + subs: []*messagingv1.Subscription{}, + ichannel: getChannelable(false), + channels: []*eventingduckv1.Channelable{}, + eventPoliciesReady: true, + want: false, }, { - name: "ingress true, empty", - fsubs: []*messagingv1.Subscription{}, - subs: []*messagingv1.Subscription{}, - ichannel: getChannelable(true), - channels: []*eventingduckv1.Channelable{}, - want: false, + name: "ingress true, empty", + fsubs: []*messagingv1.Subscription{}, + subs: []*messagingv1.Subscription{}, + ichannel: getChannelable(true), + channels: []*eventingduckv1.Channelable{}, + eventPoliciesReady: true, + want: false, }, { - name: "ingress true, one channelable not ready, one subscription ready", - ichannel: getChannelable(true), - channels: []*eventingduckv1.Channelable{getChannelable(false)}, - fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)}, - subs: []*messagingv1.Subscription{getSubscription("sub0", true)}, - want: false, + name: "ingress true, one channelable not ready, one subscription ready", + ichannel: getChannelable(true), + channels: []*eventingduckv1.Channelable{getChannelable(false)}, + fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)}, + subs: []*messagingv1.Subscription{getSubscription("sub0", true)}, + eventPoliciesReady: true, + want: false, }, { - name: "ingress true, one channelable ready, one subscription not ready", - ichannel: getChannelable(true), - channels: []*eventingduckv1.Channelable{getChannelable(true)}, - fsubs: []*messagingv1.Subscription{getSubscription("fsub0", false)}, - subs: []*messagingv1.Subscription{getSubscription("sub0", false)}, - want: false, + name: "ingress true, one channelable ready, one subscription not ready", + ichannel: getChannelable(true), + channels: []*eventingduckv1.Channelable{getChannelable(true)}, + fsubs: []*messagingv1.Subscription{getSubscription("fsub0", false)}, + subs: []*messagingv1.Subscription{getSubscription("sub0", false)}, + eventPoliciesReady: true, + want: false, }, { - name: "ingress false, one channelable ready, one subscription ready", - ichannel: getChannelable(false), - channels: []*eventingduckv1.Channelable{getChannelable(true)}, - fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)}, - subs: []*messagingv1.Subscription{getSubscription("sub0", true)}, - want: false, + name: "ingress false, one channelable ready, one subscription ready", + ichannel: getChannelable(false), + channels: []*eventingduckv1.Channelable{getChannelable(true)}, + fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)}, + subs: []*messagingv1.Subscription{getSubscription("sub0", true)}, + eventPoliciesReady: true, + want: false, }, { - name: "ingress true, one channelable ready, one subscription ready", - ichannel: getChannelable(true), - channels: []*eventingduckv1.Channelable{getChannelable(true)}, - fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)}, - subs: []*messagingv1.Subscription{getSubscription("sub0", true)}, - want: true, + name: "ingress true, one channelable ready, one subscription ready", + ichannel: getChannelable(true), + channels: []*eventingduckv1.Channelable{getChannelable(true)}, + fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)}, + subs: []*messagingv1.Subscription{getSubscription("sub0", true)}, + eventPoliciesReady: true, + want: true, }, { - name: "ingress true, one channelable ready, one not, two subscriptions ready", - ichannel: getChannelable(true), - channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(false)}, - fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)}, - subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)}, - want: false, + name: "ingress true, one channelable ready, one not, two subscriptions ready", + ichannel: getChannelable(true), + channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(false)}, + fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)}, + subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)}, + eventPoliciesReady: true, + want: false, }, { - name: "ingress true, two channelables ready, one subscription ready, one not", - ichannel: getChannelable(true), - channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)}, - fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", false)}, - subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", false)}, - want: false, + name: "ingress true, two channelables ready, one subscription ready, one not", + ichannel: getChannelable(true), + channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)}, + fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", false)}, + subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", false)}, + eventPoliciesReady: true, + want: false, }, { - name: "ingress false, two channelables ready, two subscriptions ready", - ichannel: getChannelable(false), - channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)}, - fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)}, - subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)}, - want: false, + name: "ingress false, two channelables ready, two subscriptions ready", + ichannel: getChannelable(false), + channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)}, + fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)}, + subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)}, + eventPoliciesReady: true, + want: false, }, { - name: "ingress true, two channelables ready, two subscriptions ready", - ichannel: getChannelable(true), - channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)}, - fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)}, - subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)}, - want: true, + name: "ingress true, two channelables ready, two subscriptions ready", + ichannel: getChannelable(true), + channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)}, + fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)}, + subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)}, + eventPoliciesReady: true, + want: true, + }, { + name: "ingress true, two channelables ready, two subscriptions ready, event policies not ready", + ichannel: getChannelable(true), + channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)}, + fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)}, + subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)}, + eventPoliciesReady: false, + want: false, }} for _, test := range tests { @@ -518,6 +546,12 @@ func TestParallelReady(t *testing.T) { ps := ParallelStatus{} ps.PropagateChannelStatuses(test.ichannel, test.channels) ps.PropagateSubscriptionStatuses(test.fsubs, test.subs) + + if test.eventPoliciesReady { + ps.MarkEventPoliciesTrue() + } else { + ps.MarkEventPoliciesFailed("", "") + } got := ps.IsReady() want := test.want if want != got { diff --git a/pkg/reconciler/parallel/controller.go b/pkg/reconciler/parallel/controller.go index 8a9f6d3ea56..ddebfa6c4e8 100644 --- a/pkg/reconciler/parallel/controller.go +++ b/pkg/reconciler/parallel/controller.go @@ -20,14 +20,18 @@ import ( "context" "k8s.io/client-go/tools/cache" + "knative.dev/eventing/pkg/apis/feature" v1 "knative.dev/eventing/pkg/apis/flows/v1" + "knative.dev/eventing/pkg/auth" "knative.dev/eventing/pkg/duck" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" "knative.dev/pkg/injection/clients/dynamicclient" + "knative.dev/pkg/logging" eventingclient "knative.dev/eventing/pkg/client/injection/client" "knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable" + "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy" "knative.dev/eventing/pkg/client/injection/informers/flows/v1/parallel" "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription" parallelreconciler "knative.dev/eventing/pkg/client/injection/reconciler/flows/v1/parallel" @@ -42,14 +46,29 @@ func NewController( parallelInformer := parallel.Get(ctx) subscriptionInformer := subscription.Get(ctx) + eventPolicyInformer := eventpolicy.Get(ctx) r := &Reconciler{ parallelLister: parallelInformer.Lister(), subscriptionLister: subscriptionInformer.Lister(), dynamicClientSet: dynamicclient.Get(ctx), eventingClientSet: eventingclient.Get(ctx), + eventPolicyLister: eventPolicyInformer.Lister(), } - impl := parallelreconciler.NewImpl(ctx, r) + + var globalResync func() + featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) { + if globalResync != nil { + globalResync() + } + }) + featureStore.WatchConfigs(cmw) + + impl := parallelreconciler.NewImpl(ctx, r, func(impl *controller.Impl) controller.Options { + return controller.Options{ + ConfigStore: featureStore, + } + }) r.channelableTracker = duck.NewListableTrackerFromTracker(ctx, channelable.Get, impl.Tracker) parallelInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue)) @@ -61,5 +80,14 @@ func NewController( Handler: controller.HandleAll(impl.EnqueueControllerOf), }) + parallelGK := v1.Kind("Parallel") + // Enqueue the Parallel, if we have an EventPolicy which was referencing + // or got updated and now is referencing the Parallel + eventPolicyInformer.Informer().AddEventHandler(auth.EventPolicyEventHandler( + parallelInformer.Informer().GetIndexer(), + parallelGK, + impl.EnqueueKey, + )) + return impl } diff --git a/pkg/reconciler/parallel/controller_test.go b/pkg/reconciler/parallel/controller_test.go index 9b6b1e5c618..55dec60eba6 100644 --- a/pkg/reconciler/parallel/controller_test.go +++ b/pkg/reconciler/parallel/controller_test.go @@ -19,11 +19,15 @@ package parallel import ( "testing" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "knative.dev/pkg/configmap" . "knative.dev/pkg/reconciler/testing" // Fake injection informers + "knative.dev/eventing/pkg/apis/feature" _ "knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable/fake" + _ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake" _ "knative.dev/eventing/pkg/client/injection/informers/flows/v1/parallel/fake" _ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription/fake" ) @@ -31,7 +35,12 @@ import ( func TestNew(t *testing.T) { ctx, _ := SetupFakeContext(t) - c := NewController(ctx, configmap.NewStaticWatcher()) + c := NewController(ctx, configmap.NewStaticWatcher(&corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: feature.FlagsConfigName, + Namespace: "knative-eventing", + }, + })) if c == nil { t.Fatal("Expected NewController to return a non-nil value") diff --git a/pkg/reconciler/parallel/parallel.go b/pkg/reconciler/parallel/parallel.go index eeefb989d03..eb2bb010f92 100644 --- a/pkg/reconciler/parallel/parallel.go +++ b/pkg/reconciler/parallel/parallel.go @@ -36,10 +36,13 @@ import ( pkgreconciler "knative.dev/pkg/reconciler" duckv1 "knative.dev/eventing/pkg/apis/duck/v1" + "knative.dev/eventing/pkg/apis/feature" v1 "knative.dev/eventing/pkg/apis/flows/v1" messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" + "knative.dev/eventing/pkg/auth" clientset "knative.dev/eventing/pkg/client/clientset/versioned" parallelreconciler "knative.dev/eventing/pkg/client/injection/reconciler/flows/v1/parallel" + eventingv1alpha1listers "knative.dev/eventing/pkg/client/listers/eventing/v1alpha1" listers "knative.dev/eventing/pkg/client/listers/flows/v1" messaginglisters "knative.dev/eventing/pkg/client/listers/messaging/v1" ducklib "knative.dev/eventing/pkg/duck" @@ -58,6 +61,8 @@ type Reconciler struct { // dynamicClientSet allows us to configure pluggable Build objects dynamicClientSet dynamic.Interface + + eventPolicyLister eventingv1alpha1listers.EventPolicyLister } // Check that our Reconciler implements parallelreconciler.Interface @@ -71,6 +76,8 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, p *v1.Parallel) pkgrecon // 2.2 create a Subscription to the filter Channel, subscribe the subscriber and send reply to // either the branch Reply. If not present, send reply to the global Reply. If not present, do not send reply. // 3. Rinse and repeat step #2 above for each branch in the list + featureFlags := feature.FromContext(ctx) + if p.Status.BranchStatuses == nil { p.Status.BranchStatuses = make([]v1.ParallelBranchStatus, 0) } @@ -137,6 +144,11 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, p *v1.Parallel) pkgrecon return fmt.Errorf("error removing unwanted Subscriptions: %w", err) } + err := auth.UpdateStatusWithEventPolicies(featureFlags, &p.Status.AppliedEventPoliciesStatus, &p.Status, r.eventPolicyLister, v1.SchemeGroupVersion.WithKind("Parallel"), p.ObjectMeta) + if err != nil { + return fmt.Errorf("could not update parallel status with EventPolicies: %v", err) + } + return nil } diff --git a/pkg/reconciler/parallel/parallel_test.go b/pkg/reconciler/parallel/parallel_test.go index b97a077748b..1c4fe0310ee 100644 --- a/pkg/reconciler/parallel/parallel_test.go +++ b/pkg/reconciler/parallel/parallel_test.go @@ -53,10 +53,12 @@ import ( ) const ( - testNS = "test-namespace" - parallelName = "test-parallel" - replyChannelName = "reply-channel" - parallelGeneration = 79 + testNS = "test-namespace" + parallelName = "test-parallel" + replyChannelName = "reply-channel" + parallelGeneration = 79 + readyEventPolicyName = "test-event-policy-ready" + unreadyEventPolicyName = "test-event-policy-unready" ) var ( @@ -65,6 +67,12 @@ var ( Version: "v1", Kind: "Subscriber", } + + parallelGVK = metav1.GroupVersionKind{ + Group: "flows.knative.dev", + Version: "v1", + Kind: "Parallel", + } ) func TestAllBranches(t *testing.T) { @@ -137,6 +145,7 @@ func TestAllBranches(t *testing.T) { WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelEventPoliciesReadyBecauseOIDCDisabled(), WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), @@ -173,6 +182,7 @@ func TestAllBranches(t *testing.T) { WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelEventPoliciesReadyBecauseOIDCDisabled(), WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), @@ -209,6 +219,7 @@ func TestAllBranches(t *testing.T) { WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelEventPoliciesReadyBecauseOIDCDisabled(), WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), @@ -249,6 +260,7 @@ func TestAllBranches(t *testing.T) { WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelEventPoliciesReadyBecauseOIDCDisabled(), WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), @@ -289,6 +301,7 @@ func TestAllBranches(t *testing.T) { WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelEventPoliciesReadyBecauseOIDCDisabled(), WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), @@ -339,6 +352,7 @@ func TestAllBranches(t *testing.T) { WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelEventPoliciesReadyBecauseOIDCDisabled(), WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{ { FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), @@ -398,6 +412,7 @@ func TestAllBranches(t *testing.T) { WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelEventPoliciesReadyBecauseOIDCDisabled(), WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{ { FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), @@ -456,6 +471,7 @@ func TestAllBranches(t *testing.T) { WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelEventPoliciesReadyBecauseOIDCDisabled(), WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), @@ -553,6 +569,139 @@ func TestAllBranches(t *testing.T) { WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelEventPoliciesReadyBecauseOIDCDisabled(), + WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ + FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), + SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + }})), + }}, + }, { + Name: "Should provision applying EventPolicies", + Key: pKey, + Objects: []runtime.Object{ + NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Filter: createFilter(0), Subscriber: createSubscriber(0)}, + })), + NewEventPolicy(readyEventPolicyName, testNS, + WithReadyEventPolicyCondition, + WithEventPolicyToRef(parallelGVK, parallelName), + ), + }, + WantErr: false, + WantCreates: []runtime.Object{ + createChannel(parallelName), + createBranchChannel(parallelName, 0), + resources.NewFilterSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ + {Filter: createFilter(0), Subscriber: createSubscriber(0)}, + }))), + resources.NewSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ + {Filter: createFilter(0), Subscriber: createSubscriber(0)}, + }))), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{{Filter: createFilter(0), Subscriber: createSubscriber(0)}}), + WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), + WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), + WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), + WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelEventPoliciesReady(), + WithFlowsParallelEventPoliciesListed(readyEventPolicyName), + WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ + FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), + SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + }})), + }}, + }, { + Name: "Should mark as NotReady on unready EventPolicies", + Key: pKey, + Objects: []runtime.Object{ + NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Filter: createFilter(0), Subscriber: createSubscriber(0)}, + })), + NewEventPolicy(unreadyEventPolicyName, testNS, + WithUnreadyEventPolicyCondition("", ""), + WithEventPolicyToRef(parallelGVK, parallelName), + ), + }, + WantErr: false, + WantCreates: []runtime.Object{ + createChannel(parallelName), + createBranchChannel(parallelName, 0), + resources.NewFilterSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ + {Filter: createFilter(0), Subscriber: createSubscriber(0)}, + }))), + resources.NewSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ + {Filter: createFilter(0), Subscriber: createSubscriber(0)}, + }))), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{{Filter: createFilter(0), Subscriber: createSubscriber(0)}}), + WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), + WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), + WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), + WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelEventPoliciesNotReady("EventPoliciesNotReady", fmt.Sprintf("event policies %s are not ready", unreadyEventPolicyName)), + WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ + FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), + SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + }})), + }}, + }, { + Name: "should list only Ready EventPolicies", + Key: pKey, + Objects: []runtime.Object{ + NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Filter: createFilter(0), Subscriber: createSubscriber(0)}, + })), + NewEventPolicy(unreadyEventPolicyName, testNS, + WithUnreadyEventPolicyCondition("", ""), + WithEventPolicyToRef(parallelGVK, parallelName), + ), + NewEventPolicy(readyEventPolicyName, testNS, + WithReadyEventPolicyCondition, + WithEventPolicyToRef(parallelGVK, parallelName), + ), + }, + WantErr: false, + WantCreates: []runtime.Object{ + createChannel(parallelName), + createBranchChannel(parallelName, 0), + resources.NewFilterSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ + {Filter: createFilter(0), Subscriber: createSubscriber(0)}, + }))), + resources.NewSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ + {Filter: createFilter(0), Subscriber: createSubscriber(0)}, + }))), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{{Filter: createFilter(0), Subscriber: createSubscriber(0)}}), + WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), + WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), + WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), + WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelEventPoliciesNotReady("EventPoliciesNotReady", fmt.Sprintf("event policies %s are not ready", unreadyEventPolicyName)), + WithFlowsParallelEventPoliciesListed(readyEventPolicyName), WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), @@ -571,6 +720,7 @@ func TestAllBranches(t *testing.T) { subscriptionLister: listers.GetSubscriptionLister(), eventingClientSet: fakeeventingclient.Get(ctx), dynamicClientSet: fakedynamicclient.Get(ctx), + eventPolicyLister: listers.GetEventPolicyLister(), } return parallel.NewReconciler(ctx, logging.FromContext(ctx), fakeeventingclient.Get(ctx), listers.GetParallelLister(), diff --git a/pkg/reconciler/testing/v1/parallel.go b/pkg/reconciler/testing/v1/parallel.go index f77f417079d..936faf823ce 100644 --- a/pkg/reconciler/testing/v1/parallel.go +++ b/pkg/reconciler/testing/v1/parallel.go @@ -21,6 +21,9 @@ import ( "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" + "knative.dev/eventing/pkg/apis/eventing/v1alpha1" + "knative.dev/eventing/pkg/apis/feature" flowsv1 "knative.dev/eventing/pkg/apis/flows/v1" messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" duckv1 "knative.dev/pkg/apis/duck/v1" @@ -113,3 +116,38 @@ func WithFlowsParallelAddressableNotReady(reason, message string) FlowsParallelO p.Status.MarkAddressableNotReady(reason, message) } } + +func WithFlowsParallelEventPoliciesReady() FlowsParallelOption { + return func(p *flowsv1.Parallel) { + p.Status.MarkEventPoliciesTrue() + } +} + +func WithFlowsParallelEventPoliciesNotReady(reason, message string) FlowsParallelOption { + return func(p *flowsv1.Parallel) { + p.Status.MarkEventPoliciesFailed(reason, message) + } +} + +func WithFlowsParallelEventPoliciesReadyBecauseOIDCDisabled() FlowsParallelOption { + return func(p *flowsv1.Parallel) { + p.Status.MarkEventPoliciesTrueWithReason("OIDCDisabled", "Feature %q must be enabled to support Authorization", feature.OIDCAuthentication) + } +} + +func WithFlowsParallelEventPoliciesReadyBecauseNoPolicyAndOIDCEnabled() FlowsParallelOption { + return func(p *flowsv1.Parallel) { + p.Status.MarkEventPoliciesTrueWithReason("DefaultAuthorizationMode", "Default authz mode is %q", feature.AuthorizationAllowSameNamespace) + } +} + +func WithFlowsParallelEventPoliciesListed(policyNames ...string) FlowsParallelOption { + return func(p *flowsv1.Parallel) { + for _, name := range policyNames { + p.Status.Policies = append(p.Status.Policies, eventingduckv1.AppliedEventPolicyRef{ + APIVersion: v1alpha1.SchemeGroupVersion.String(), + Name: name, + }) + } + } +}