From 4f06ad056de0ed20f3e544bc0692f7ad969ed5c1 Mon Sep 17 00:00:00 2001 From: rahulii Date: Wed, 24 Jul 2024 17:39:53 +0530 Subject: [PATCH 01/17] create eventpolicies for parallel channel Signed-off-by: rahulii --- .../parallel/resources/eventpolicy.go | 114 ++++++++++++++++++ 1 file changed, 114 insertions(+) create mode 100644 pkg/reconciler/parallel/resources/eventpolicy.go diff --git a/pkg/reconciler/parallel/resources/eventpolicy.go b/pkg/reconciler/parallel/resources/eventpolicy.go new file mode 100644 index 00000000000..36611940f6c --- /dev/null +++ b/pkg/reconciler/parallel/resources/eventpolicy.go @@ -0,0 +1,114 @@ +/* +Copyright 2024 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resources + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" + eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" + flowsv1 "knative.dev/eventing/pkg/apis/flows/v1" + messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" + "knative.dev/pkg/kmeta" +) + +const ( + ParallelChannelEventPolicyLabelPrefix = "flows.knative.dev/" + parallelKind = "Parallel" +) + +func MakeEventPolicyForParallelChannel(p *flowsv1.Parallel, channel *eventingduckv1.Channelable, subscription *messagingv1.Subscription) *eventingv1alpha1.EventPolicy { + return &eventingv1alpha1.EventPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: channel.Namespace, + Name: ParallelEventPolicyName(p.Name, channel.Name), + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: flowsv1.SchemeGroupVersion.String(), + Kind: parallelKind, + Name: p.Name, + }, + }, + Labels: LabelsForParallelChannelsEventPolicy(p.Name), + }, + Spec: eventingv1alpha1.EventPolicySpec{ + To: []eventingv1alpha1.EventPolicySpecTo{ + { + Ref: &eventingv1alpha1.EventPolicyToReference{ + APIVersion: channel.APIVersion, + Kind: channel.Kind, + Name: channel.Name, + }, + }, + }, + From: []eventingv1alpha1.EventPolicySpecFrom{ + { + Ref: &eventingv1alpha1.EventPolicyFromReference{ + APIVersion: subscription.APIVersion, + Kind: subscription.Kind, + Name: subscription.Name, + Namespace: subscription.Namespace, + }, + }, + }, + }, + } +} + +func LabelsForParallelChannelsEventPolicy(parallelName string) map[string]string { + return map[string]string{ + ParallelChannelEventPolicyLabelPrefix + "parallel-group": flowsv1.SchemeGroupVersion.Group, + ParallelChannelEventPolicyLabelPrefix + "parallel-version": flowsv1.SchemeGroupVersion.Version, + ParallelChannelEventPolicyLabelPrefix + "parallel-kind": parallelKind, + ParallelChannelEventPolicyLabelPrefix + "parallel-name": parallelName, + } +} + +func ParallelEventPolicyName(parallelName, channelName string) string { + // if channel name is empty, it means the event policy is for the output channel + if channelName == "" { + return kmeta.ChildName(parallelName, "-ep") // no need to add the channel name + } else { + return kmeta.ChildName(parallelName, "-ep-"+channelName) + } +} + +// MakeEventPolicyForParallelInputChannel creates an EventPolicy for the input channel of a Parallel. +func MakeEventPolicyForParallelInputChannel(p *flowsv1.Parallel, inputChannel *eventingduckv1.Channelable, parallelPolicy *eventingv1alpha1.EventPolicy) *eventingv1alpha1.EventPolicy { + return &eventingv1alpha1.EventPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: inputChannel.Namespace, + Name: ParallelEventPolicyName(p.Name, inputChannel.Name), + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: flowsv1.SchemeGroupVersion.String(), + Kind: parallelKind, + Name: p.Name, + }, + }, + Labels: LabelsForParallelChannelsEventPolicy(p.Name), + }, + Spec: eventingv1alpha1.EventPolicySpec{ + To: []eventingv1alpha1.EventPolicySpecTo{ + { + Ref: &eventingv1alpha1.EventPolicyToReference{ + APIVersion: inputChannel.APIVersion, + Kind: inputChannel.Kind, + Name: inputChannel.Name, + }, + }, + }, + From: parallelPolicy.Spec.From, + }, + } +} From 3e327f190bcde42558af7121681704093ae9e0d8 Mon Sep 17 00:00:00 2001 From: rahulii Date: Mon, 29 Jul 2024 12:42:38 +0530 Subject: [PATCH 02/17] reconcile event policies for parallel Signed-off-by: rahulii --- pkg/reconciler/parallel/parallel.go | 143 ++++++++++++++++++ .../parallel/resources/eventpolicy.go | 14 +- 2 files changed, 150 insertions(+), 7 deletions(-) diff --git a/pkg/reconciler/parallel/parallel.go b/pkg/reconciler/parallel/parallel.go index eb2bb010f92..1d9d78dfde1 100644 --- a/pkg/reconciler/parallel/parallel.go +++ b/pkg/reconciler/parallel/parallel.go @@ -22,6 +22,7 @@ import ( "go.uber.org/zap" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" apierrs "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -36,6 +37,7 @@ import ( pkgreconciler "knative.dev/pkg/reconciler" duckv1 "knative.dev/eventing/pkg/apis/duck/v1" + eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" "knative.dev/eventing/pkg/apis/feature" v1 "knative.dev/eventing/pkg/apis/flows/v1" messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" @@ -144,6 +146,11 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, p *v1.Parallel) pkgrecon return fmt.Errorf("error removing unwanted Subscriptions: %w", err) } + // Reconcile EventPolicies for the parallel. + if err := r.reconcileEventPolicies(ctx, p, ingressChannel, channels, filterSubs, featureFlags); err != nil { + return fmt.Errorf("failed to reconcile EventPolicies for Parallel: %w", err) + } + err := auth.UpdateStatusWithEventPolicies(featureFlags, &p.Status.AppliedEventPoliciesStatus, &p.Status, r.eventPolicyLister, v1.SchemeGroupVersion.WithKind("Parallel"), p.ObjectMeta) if err != nil { return fmt.Errorf("could not update parallel status with EventPolicies: %v", err) @@ -349,3 +356,139 @@ func (r *Reconciler) removeUnwantedSubscriptions(ctx context.Context, p *v1.Para return nil } + +func (r *Reconciler) reconcileEventPolicies(ctx context.Context, p *v1.Parallel, ingressChannel *duckv1.Channelable, + channels []*duckv1.Channelable, filterSubs []*messagingv1.Subscription, featureFlags feature.Flags) error { + + if !featureFlags.IsOIDCAuthentication() { + return r.cleanupAllEventPolicies(ctx, p) + } + // list all the existing event policies for the parallel. + existingPolicies, err := r.listEventPoliciesForParallel(p) + if err != nil { + return fmt.Errorf("failed to list existing event policies for parallel: %w", err) + } + // make a map of existing event policies for easy and efficient lookup. + existingPolicyMap := make(map[string]*eventingv1alpha1.EventPolicy) + for _, policy := range existingPolicies { + existingPolicyMap[policy.Name] = policy + } + + // prepare the list of event policies to create, update and delete. + var policiesToCreate, policiesToUpdate, policiesToDelete []*eventingv1alpha1.EventPolicy + policiesToDelete = make([]*eventingv1alpha1.EventPolicy, 0, len(existingPolicyMap)) + + for i, channel := range channels { + filterSub := filterSubs[i] + expectedPolicy := resources.MakeEventPolicyForParallelChannel(p, channel, filterSub) + if existingPolicy, ok := existingPolicyMap[expectedPolicy.Name]; ok { + if !equality.Semantic.DeepDerivative(existingPolicy.Spec, expectedPolicy.Spec) { + policiesToUpdate = append(policiesToUpdate, expectedPolicy) + } + delete(existingPolicyMap, expectedPolicy.Name) + } else { + policiesToCreate = append(policiesToCreate, expectedPolicy) + } + } + + // prepare the event policies for the ingress channel. + ingressChannelEventPolicies, err := r.prepareIngressChannelEventpolicies(p, ingressChannel) + if err != nil { + return fmt.Errorf("failed to prepare event policies for ingress channel: %w", err) + } + + for _, policy := range ingressChannelEventPolicies { + if existingPolicy, ok := existingPolicyMap[policy.Name]; ok { + if !equality.Semantic.DeepDerivative(existingPolicy.Spec, policy.Spec) { + policiesToUpdate = append(policiesToUpdate, policy) + } + delete(existingPolicyMap, policy.Name) + } else { + policiesToCreate = append(policiesToCreate, policy) + } + } + + // delete the remaining event policies in the map. + for _, policy := range existingPolicyMap { + policiesToDelete = append(policiesToDelete, policy) + } + + // now that we have the list of event policies to create, update and delete, we can perform the operations. + if err := r.createEventPolicies(ctx, policiesToCreate); err != nil { + return fmt.Errorf("failed to create event policies: %w", err) + } + if err := r.updateEventPolicies(ctx, policiesToUpdate); err != nil { + return fmt.Errorf("failed to update event policies: %w", err) + } + if err := r.deleteEventPolicies(ctx, policiesToDelete); err != nil { + return fmt.Errorf("failed to delete event policies: %w", err) + } + + return nil +} + +func (r *Reconciler) createEventPolicies(ctx context.Context, policies []*eventingv1alpha1.EventPolicy) error { + for _, policy := range policies { + _, err := r.eventingClientSet.EventingV1alpha1().EventPolicies(policy.Namespace).Create(ctx, policy, metav1.CreateOptions{}) + if err != nil { + return err + } + } + return nil +} + +func (r *Reconciler) updateEventPolicies(ctx context.Context, policies []*eventingv1alpha1.EventPolicy) error { + for _, policy := range policies { + _, err := r.eventingClientSet.EventingV1alpha1().EventPolicies(policy.Namespace).Update(ctx, policy, metav1.UpdateOptions{}) + if err != nil { + return err + } + } + return nil +} + +func (r *Reconciler) deleteEventPolicies(ctx context.Context, policies []*eventingv1alpha1.EventPolicy) error { + for _, policy := range policies { + err := r.eventingClientSet.EventingV1alpha1().EventPolicies(policy.Namespace).Delete(ctx, policy.Name, metav1.DeleteOptions{}) + if err != nil && !apierrs.IsNotFound(err) { + return err + } + } + return nil +} + +func (r *Reconciler) prepareIngressChannelEventpolicies(p *v1.Parallel, ingressChannel *duckv1.Channelable) ([]*eventingv1alpha1.EventPolicy, error) { + applyingEventPoliciesForParallel, err := auth.GetEventPoliciesForResource(r.eventPolicyLister, v1.SchemeGroupVersion.WithKind("Parallel"), p.ObjectMeta) + if err != nil { + return nil, fmt.Errorf("could not get EventPolicies for Parallel %s/%s: %w", p.Namespace, p.Name, err) + } + + if len(applyingEventPoliciesForParallel) == 0 { + return nil, nil + } + + var ingressChannelEventPolicies []*eventingv1alpha1.EventPolicy + for _, eventPolicy := range applyingEventPoliciesForParallel { + ingressChannelEventPolicies = append(ingressChannelEventPolicies, resources.MakeEventPolicyForParallelIngressChannel(p, ingressChannel, eventPolicy)) + } + + return ingressChannelEventPolicies, nil +} + +func (r *Reconciler) cleanupAllEventPolicies(ctx context.Context, p *v1.Parallel) error { + // list all the event policies for the parallel. + eventPolicies, err := r.listEventPoliciesForParallel(p) + if err != nil { + return err + } + return r.deleteEventPolicies(ctx, eventPolicies) +} + +// listEventPoliciesForParallel lists all EventPolicies (e.g. the policies for the input channel and the intermediate channels) +// created during reconcileKind that are associated with the given Parallel. +func (r *Reconciler) listEventPoliciesForParallel(p *v1.Parallel) ([]*eventingv1alpha1.EventPolicy, error) { + labelSelector := labels.SelectorFromSet(map[string]string{ + resources.ParallelChannelEventPolicyLabelPrefix + "parallel-name": p.Name, + }) + return r.eventPolicyLister.EventPolicies(p.Namespace).List(labelSelector) +} diff --git a/pkg/reconciler/parallel/resources/eventpolicy.go b/pkg/reconciler/parallel/resources/eventpolicy.go index 36611940f6c..b1bbd0e85f4 100644 --- a/pkg/reconciler/parallel/resources/eventpolicy.go +++ b/pkg/reconciler/parallel/resources/eventpolicy.go @@ -83,12 +83,12 @@ func ParallelEventPolicyName(parallelName, channelName string) string { } } -// MakeEventPolicyForParallelInputChannel creates an EventPolicy for the input channel of a Parallel. -func MakeEventPolicyForParallelInputChannel(p *flowsv1.Parallel, inputChannel *eventingduckv1.Channelable, parallelPolicy *eventingv1alpha1.EventPolicy) *eventingv1alpha1.EventPolicy { +// MakeEventPolicyForParallelIngressChannel creates an EventPolicy for the ingress channel of a Parallel. +func MakeEventPolicyForParallelIngressChannel(p *flowsv1.Parallel, ingressChannel *eventingduckv1.Channelable, parallelPolicy *eventingv1alpha1.EventPolicy) *eventingv1alpha1.EventPolicy { return &eventingv1alpha1.EventPolicy{ ObjectMeta: metav1.ObjectMeta{ - Namespace: inputChannel.Namespace, - Name: ParallelEventPolicyName(p.Name, inputChannel.Name), + Namespace: ingressChannel.Namespace, + Name: ParallelEventPolicyName(p.Name, ingressChannel.Name), OwnerReferences: []metav1.OwnerReference{ { APIVersion: flowsv1.SchemeGroupVersion.String(), @@ -102,9 +102,9 @@ func MakeEventPolicyForParallelInputChannel(p *flowsv1.Parallel, inputChannel *e To: []eventingv1alpha1.EventPolicySpecTo{ { Ref: &eventingv1alpha1.EventPolicyToReference{ - APIVersion: inputChannel.APIVersion, - Kind: inputChannel.Kind, - Name: inputChannel.Name, + APIVersion: ingressChannel.APIVersion, + Kind: ingressChannel.Kind, + Name: ingressChannel.Name, }, }, }, From be56d254ddf6fe1179cb1b65af37f7a9d242d205 Mon Sep 17 00:00:00 2001 From: rahulii Date: Mon, 29 Jul 2024 15:14:29 +0530 Subject: [PATCH 03/17] add unit test cases Signed-off-by: rahulii --- pkg/reconciler/parallel/parallel.go | 2 +- pkg/reconciler/parallel/parallel_test.go | 145 ++++++++++++++++++++++- 2 files changed, 145 insertions(+), 2 deletions(-) diff --git a/pkg/reconciler/parallel/parallel.go b/pkg/reconciler/parallel/parallel.go index 1d9d78dfde1..61d24011f71 100644 --- a/pkg/reconciler/parallel/parallel.go +++ b/pkg/reconciler/parallel/parallel.go @@ -377,7 +377,7 @@ func (r *Reconciler) reconcileEventPolicies(ctx context.Context, p *v1.Parallel, // prepare the list of event policies to create, update and delete. var policiesToCreate, policiesToUpdate, policiesToDelete []*eventingv1alpha1.EventPolicy policiesToDelete = make([]*eventingv1alpha1.EventPolicy, 0, len(existingPolicyMap)) - + for i, channel := range channels { filterSub := filterSubs[i] expectedPolicy := resources.MakeEventPolicyForParallelChannel(p, channel, filterSub) diff --git a/pkg/reconciler/parallel/parallel_test.go b/pkg/reconciler/parallel/parallel_test.go index 1c4fe0310ee..83a5a27b28c 100644 --- a/pkg/reconciler/parallel/parallel_test.go +++ b/pkg/reconciler/parallel/parallel_test.go @@ -35,6 +35,8 @@ import ( clientgotesting "k8s.io/client-go/testing" eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" + eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" + "knative.dev/eventing/pkg/apis/feature" "knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable" "knative.dev/eventing/pkg/duck" "knative.dev/pkg/apis" @@ -65,7 +67,7 @@ var ( subscriberGVK = metav1.GroupVersionKind{ Group: "messaging.knative.dev", Version: "v1", - Kind: "Subscriber", + Kind: "Subscription", } parallelGVK = metav1.GroupVersionKind{ @@ -73,6 +75,12 @@ var ( Version: "v1", Kind: "Parallel", } + + channelV1GVK = metav1.GroupVersionKind{ + Group: "messaging.knative.dev", + Version: "v1", + Kind: "InMemoryChannel", + } ) func TestAllBranches(t *testing.T) { @@ -708,6 +716,97 @@ func TestAllBranches(t *testing.T) { SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), }})), }}, + }, { + Name: "AuthZ Enablled with single branch, with filter, no EventPolicies", + Key: pKey, + Objects: []runtime.Object{ + NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Filter: createFilter(0), Subscriber: createSubscriber(0)}, + }))}, + WantErr: false, + WantCreates: []runtime.Object{ + createChannel(parallelName), + createBranchChannel(parallelName, 0), + resources.NewFilterSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ + {Filter: createFilter(0), Subscriber: createSubscriber(0)}, + }))), + resources.NewSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ + {Filter: createFilter(0), Subscriber: createSubscriber(0)}, + }))), + makeEventPolicy(parallelName, resources.ParallelBranchChannelName(parallelName, 0), 0), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{{Filter: createFilter(0), Subscriber: createSubscriber(0)}}), + WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), + WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), + WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), + WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelEventPoliciesReadyBecauseNoPolicyAndOIDCEnabled(), + WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ + FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), + SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + }})), + }}, + Ctx: feature.ToContext(context.Background(), feature.Flags{ + feature.OIDCAuthentication: feature.Enabled, + feature.AuthorizationDefaultMode: feature.AuthorizationAllowSameNamespace, + }), + }, { + Name: "AuthZ Enablled with single branch, with filter, with Parallel EventPolicy", + Key: pKey, + Objects: []runtime.Object{ + NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Filter: createFilter(0), Subscriber: createSubscriber(0)}, + })), + NewEventPolicy(readyEventPolicyName, testNS, + WithReadyEventPolicyCondition, + WithEventPolicyToRef(parallelGVK, parallelName), + ), + }, + WantErr: false, + WantCreates: []runtime.Object{ + createChannel(parallelName), + createBranchChannel(parallelName, 0), + resources.NewFilterSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ + {Filter: createFilter(0), Subscriber: createSubscriber(0)}, + }))), + resources.NewSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ + {Filter: createFilter(0), Subscriber: createSubscriber(0)}, + }))), + makeEventPolicy(parallelName, resources.ParallelBranchChannelName(parallelName, 0), 0), + makeIngressChannelEventPolicy(parallelName, resources.ParallelChannelName(parallelName)), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{{Filter: createFilter(0), Subscriber: createSubscriber(0)}}), + WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), + WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), + WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), + WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelEventPoliciesReady(), + WithFlowsParallelEventPoliciesListed(readyEventPolicyName), + WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ + FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), + SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + }})), + }}, + Ctx: feature.ToContext(context.Background(), feature.Flags{ + feature.OIDCAuthentication: feature.Enabled, + feature.AuthorizationDefaultMode: feature.AuthorizationAllowSameNamespace, + }), }, } @@ -890,3 +989,47 @@ func createDelivery(gvk metav1.GroupVersionKind, name, namespace string) *eventi }, } } + +func makeEventPolicy(parallelName, channelName string, branch int) *eventingv1alpha1.EventPolicy { + return NewEventPolicy(resources.ParallelEventPolicyName(parallelName, channelName), testNS, + WithEventPolicyToRef(channelV1GVK, channelName), + // from a subscription + WithEventPolicyFrom(subscriberGVK, resources.ParallelFilterSubscriptionName(parallelName, branch), testNS), + WithEventPolicyOwnerReferences([]metav1.OwnerReference{ + { + APIVersion: "flows.knative.dev/v1", + Kind: "Parallel", + Name: parallelName, + }, + }...), + WithEventPolicyLabels(resources.LabelsForParallelChannelsEventPolicy(parallelName)), + ) +} + +func makeParallelEventPolicy(parallelName string) *eventingv1alpha1.EventPolicy { + return NewEventPolicy(resources.ParallelEventPolicyName(parallelName, ""), testNS, + // from a subscription + WithEventPolicyOwnerReferences([]metav1.OwnerReference{ + { + APIVersion: "flows.knative.dev/v1", + Kind: "Parallel", + Name: parallelName, + }, + }...), + ) +} + +func makeIngressChannelEventPolicy(parallelName, channelName string) *eventingv1alpha1.EventPolicy { + return NewEventPolicy(resources.ParallelEventPolicyName(parallelName, channelName), testNS, + WithEventPolicyToRef(channelV1GVK, channelName), + // from a subscription + WithEventPolicyOwnerReferences([]metav1.OwnerReference{ + { + APIVersion: "flows.knative.dev/v1", + Kind: "Parallel", + Name: parallelName, + }, + }...), + WithEventPolicyLabels(resources.LabelsForParallelChannelsEventPolicy(parallelName)), + ) +} From 36e56d6c24209c12b9295b5d0ecb062a5bc71614 Mon Sep 17 00:00:00 2001 From: rahulii Date: Tue, 30 Jul 2024 18:01:50 +0530 Subject: [PATCH 04/17] add more unit test cases Signed-off-by: rahulii --- pkg/reconciler/parallel/parallel_test.go | 250 ++++++++++++++++++++++- 1 file changed, 248 insertions(+), 2 deletions(-) diff --git a/pkg/reconciler/parallel/parallel_test.go b/pkg/reconciler/parallel/parallel_test.go index 83a5a27b28c..07348489c11 100644 --- a/pkg/reconciler/parallel/parallel_test.go +++ b/pkg/reconciler/parallel/parallel_test.go @@ -717,7 +717,7 @@ func TestAllBranches(t *testing.T) { }})), }}, }, { - Name: "AuthZ Enablled with single branch, with filter, no EventPolicies", + Name: "AuthZ Enabled with single branch, with filter, no EventPolicies", Key: pKey, Objects: []runtime.Object{ NewFlowsParallel(parallelName, testNS, @@ -759,7 +759,7 @@ func TestAllBranches(t *testing.T) { feature.AuthorizationDefaultMode: feature.AuthorizationAllowSameNamespace, }), }, { - Name: "AuthZ Enablled with single branch, with filter, with Parallel EventPolicy", + Name: "AuthZ Enabled with single branch, with filter, with Parallel EventPolicy", Key: pKey, Objects: []runtime.Object{ NewFlowsParallel(parallelName, testNS, @@ -807,6 +807,252 @@ func TestAllBranches(t *testing.T) { feature.OIDCAuthentication: feature.Enabled, feature.AuthorizationDefaultMode: feature.AuthorizationAllowSameNamespace, }), + }, { + Name: "AuthZ Enabled two branches, no filters, no EventPolicy", + Key: pKey, + Objects: []runtime.Object{ + NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + }))}, + WantErr: false, + WantCreates: []runtime.Object{ + createChannel(parallelName), + createBranchChannel(parallelName, 0), + createBranchChannel(parallelName, 1), + resources.NewFilterSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + }))), + resources.NewSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + }))), + resources.NewFilterSubscription(1, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + }))), + resources.NewSubscription(1, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + }))), + makeEventPolicy(parallelName, resources.ParallelBranchChannelName(parallelName, 0), 0), + makeEventPolicy(parallelName, resources.ParallelBranchChannelName(parallelName, 1), 1), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + }), + WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), + WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), + WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), + WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelEventPoliciesReadyBecauseNoPolicyAndOIDCEnabled(), + WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{ + { + FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), + SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + }, + { + FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 1, corev1.ConditionFalse), + FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 1, corev1.ConditionFalse), + SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 1, corev1.ConditionFalse), + }})), + }}, + Ctx: feature.ToContext(context.Background(), feature.Flags{ + feature.OIDCAuthentication: feature.Enabled, + feature.AuthorizationDefaultMode: feature.AuthorizationAllowSameNamespace, + }), + }, { + Name: "AuthZ Enabled two branches, no filters, with EventPolicy", + Key: pKey, + Objects: []runtime.Object{ + NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + })), + NewEventPolicy(readyEventPolicyName, testNS, + WithReadyEventPolicyCondition, + WithEventPolicyToRef(parallelGVK, parallelName), + ), + }, + WantErr: false, + WantCreates: []runtime.Object{ + createChannel(parallelName), + createBranchChannel(parallelName, 0), + createBranchChannel(parallelName, 1), + resources.NewFilterSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + }))), + resources.NewSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + }))), + resources.NewFilterSubscription(1, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + }))), + resources.NewSubscription(1, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + }))), + makeEventPolicy(parallelName, resources.ParallelBranchChannelName(parallelName, 0), 0), + makeEventPolicy(parallelName, resources.ParallelBranchChannelName(parallelName, 1), 1), + makeIngressChannelEventPolicy(parallelName, resources.ParallelChannelName(parallelName)), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + }), + WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), + WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), + WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), + WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelEventPoliciesReady(), + WithFlowsParallelEventPoliciesListed(readyEventPolicyName), + WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{ + { + FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), + SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + }, + { + FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 1, corev1.ConditionFalse), + FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 1, corev1.ConditionFalse), + SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 1, corev1.ConditionFalse), + }})), + }}, + Ctx: feature.ToContext(context.Background(), feature.Flags{ + feature.OIDCAuthentication: feature.Enabled, + feature.AuthorizationDefaultMode: feature.AuthorizationAllowSameNamespace, + }), + }, + { + Name: "two branches, update: remove one branch, with AuthZ enabled and parallel doesn't have EventPolicies", + Key: pKey, + Objects: []runtime.Object{ + NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + }), + WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), + WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), + WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), + WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{ + { + FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), + SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + }, + { + FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 1, corev1.ConditionFalse), + FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 1, corev1.ConditionFalse), + SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 1, corev1.ConditionFalse), + }, + })), + + createChannel(parallelName), + createBranchChannel(parallelName, 0), + createBranchChannel(parallelName, 1), + + resources.NewSubscription(0, NewFlowsParallel(parallelName, testNS, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + }))), + resources.NewFilterSubscription(0, NewFlowsParallel(parallelName, testNS, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + }))), + + resources.NewSubscription(1, NewFlowsParallel(parallelName, testNS, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + }))), + resources.NewFilterSubscription(1, NewFlowsParallel(parallelName, testNS, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + }))), + + makeEventPolicy(parallelName, resources.ParallelBranchChannelName(parallelName, 0), 0), + makeEventPolicy(parallelName, resources.ParallelBranchChannelName(parallelName, 1), 1), + }, + WantErr: false, + WantDeletes: []clientgotesting.DeleteActionImpl{ + { + ActionImpl: clientgotesting.ActionImpl{ + Namespace: testNS, + Resource: v1.SchemeGroupVersion.WithResource("subscriptions"), + }, + Name: resources.ParallelSubscriptionName(parallelName, 1), + }, { + ActionImpl: clientgotesting.ActionImpl{ + Namespace: testNS, + Resource: v1.SchemeGroupVersion.WithResource("subscriptions"), + }, + Name: resources.ParallelFilterSubscriptionName(parallelName, 1), + }, { + ActionImpl: clientgotesting.ActionImpl{ + Namespace: testNS, + Resource: v1.SchemeGroupVersion.WithResource("inmemorychannels"), + }, + Name: resources.ParallelBranchChannelName(parallelName, 1), + }, { + ActionImpl: clientgotesting.ActionImpl{ + Namespace: testNS, + Resource: v1.SchemeGroupVersion.WithResource("eventpolicies"), + }, + Name: resources.ParallelEventPolicyName(parallelName, resources.ParallelBranchChannelName(parallelName, 1)), + }, + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + }), + WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), + WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), + WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), + WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelEventPoliciesReadyBecauseNoPolicyAndOIDCEnabled(), + WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ + FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), + SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + }})), + }}, + Ctx: feature.ToContext(context.Background(), feature.Flags{ + feature.OIDCAuthentication: feature.Enabled, + feature.AuthorizationDefaultMode: feature.AuthorizationAllowSameNamespace, + }), }, } From 32faf47ec67d509a96c9945f4ea28144df2a1f56 Mon Sep 17 00:00:00 2001 From: rahulii Date: Tue, 30 Jul 2024 18:05:44 +0530 Subject: [PATCH 05/17] remove unused function Signed-off-by: rahulii --- pkg/reconciler/parallel/parallel_test.go | 13 ------------- pkg/reconciler/parallel/resources/eventpolicy.go | 7 +------ 2 files changed, 1 insertion(+), 19 deletions(-) diff --git a/pkg/reconciler/parallel/parallel_test.go b/pkg/reconciler/parallel/parallel_test.go index 07348489c11..ec0ddcc7f1f 100644 --- a/pkg/reconciler/parallel/parallel_test.go +++ b/pkg/reconciler/parallel/parallel_test.go @@ -1252,19 +1252,6 @@ func makeEventPolicy(parallelName, channelName string, branch int) *eventingv1al ) } -func makeParallelEventPolicy(parallelName string) *eventingv1alpha1.EventPolicy { - return NewEventPolicy(resources.ParallelEventPolicyName(parallelName, ""), testNS, - // from a subscription - WithEventPolicyOwnerReferences([]metav1.OwnerReference{ - { - APIVersion: "flows.knative.dev/v1", - Kind: "Parallel", - Name: parallelName, - }, - }...), - ) -} - func makeIngressChannelEventPolicy(parallelName, channelName string) *eventingv1alpha1.EventPolicy { return NewEventPolicy(resources.ParallelEventPolicyName(parallelName, channelName), testNS, WithEventPolicyToRef(channelV1GVK, channelName), diff --git a/pkg/reconciler/parallel/resources/eventpolicy.go b/pkg/reconciler/parallel/resources/eventpolicy.go index b1bbd0e85f4..8d2a3c96e70 100644 --- a/pkg/reconciler/parallel/resources/eventpolicy.go +++ b/pkg/reconciler/parallel/resources/eventpolicy.go @@ -75,12 +75,7 @@ func LabelsForParallelChannelsEventPolicy(parallelName string) map[string]string } func ParallelEventPolicyName(parallelName, channelName string) string { - // if channel name is empty, it means the event policy is for the output channel - if channelName == "" { - return kmeta.ChildName(parallelName, "-ep") // no need to add the channel name - } else { - return kmeta.ChildName(parallelName, "-ep-"+channelName) - } + return kmeta.ChildName(parallelName, "-ep-"+channelName) } // MakeEventPolicyForParallelIngressChannel creates an EventPolicy for the ingress channel of a Parallel. From 9df5d0c588c27639221090056a4442b0a15182f1 Mon Sep 17 00:00:00 2001 From: rahulii Date: Tue, 30 Jul 2024 18:16:19 +0530 Subject: [PATCH 06/17] add more unit test cases Signed-off-by: rahulii --- pkg/reconciler/parallel/parallel_test.go | 140 +++++++++++++++++++++++ 1 file changed, 140 insertions(+) diff --git a/pkg/reconciler/parallel/parallel_test.go b/pkg/reconciler/parallel/parallel_test.go index ec0ddcc7f1f..552ad3c650a 100644 --- a/pkg/reconciler/parallel/parallel_test.go +++ b/pkg/reconciler/parallel/parallel_test.go @@ -1053,6 +1053,146 @@ func TestAllBranches(t *testing.T) { feature.OIDCAuthentication: feature.Enabled, feature.AuthorizationDefaultMode: feature.AuthorizationAllowSameNamespace, }), + }, { + Name: "three branches, update: remove one branch, with AuthZ enabled and parallel doesn't have EventPolicies", + Key: pKey, + Objects: []runtime.Object{ + NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + }), + WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), + WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), + WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), + WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{ + { + FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), + SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + }, + { + FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 1, corev1.ConditionFalse), + FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 1, corev1.ConditionFalse), + SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 1, corev1.ConditionFalse), + }, + { + FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 2, corev1.ConditionFalse), + FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 2, corev1.ConditionFalse), + SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 2, corev1.ConditionFalse), + }, + })), + + createChannel(parallelName), + createBranchChannel(parallelName, 0), + createBranchChannel(parallelName, 1), + createBranchChannel(parallelName, 2), + + resources.NewSubscription(0, NewFlowsParallel(parallelName, testNS, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + {Subscriber: createSubscriber(2)}, + }))), + resources.NewFilterSubscription(0, NewFlowsParallel(parallelName, testNS, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + {Subscriber: createSubscriber(2)}, + }))), + + resources.NewSubscription(1, NewFlowsParallel(parallelName, testNS, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + {Subscriber: createSubscriber(2)}, + }))), + resources.NewFilterSubscription(1, NewFlowsParallel(parallelName, testNS, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + {Subscriber: createSubscriber(2)}, + }))), + resources.NewSubscription(2, NewFlowsParallel(parallelName, testNS, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + {Subscriber: createSubscriber(2)}, + }))), + resources.NewFilterSubscription(2, NewFlowsParallel(parallelName, testNS, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + {Subscriber: createSubscriber(2)}, + }))), + makeEventPolicy(parallelName, resources.ParallelBranchChannelName(parallelName, 0), 0), + makeEventPolicy(parallelName, resources.ParallelBranchChannelName(parallelName, 1), 1), + makeEventPolicy(parallelName, resources.ParallelBranchChannelName(parallelName, 2), 2), + }, + WantErr: false, + WantDeletes: []clientgotesting.DeleteActionImpl{ + { + ActionImpl: clientgotesting.ActionImpl{ + Namespace: testNS, + Resource: v1.SchemeGroupVersion.WithResource("subscriptions"), + }, + Name: resources.ParallelSubscriptionName(parallelName, 2), + }, { + ActionImpl: clientgotesting.ActionImpl{ + Namespace: testNS, + Resource: v1.SchemeGroupVersion.WithResource("subscriptions"), + }, + Name: resources.ParallelFilterSubscriptionName(parallelName, 2), + }, { + ActionImpl: clientgotesting.ActionImpl{ + Namespace: testNS, + Resource: v1.SchemeGroupVersion.WithResource("inmemorychannels"), + }, + Name: resources.ParallelBranchChannelName(parallelName, 2), + }, { + ActionImpl: clientgotesting.ActionImpl{ + Namespace: testNS, + Resource: v1.SchemeGroupVersion.WithResource("eventpolicies"), + }, + Name: resources.ParallelEventPolicyName(parallelName, resources.ParallelBranchChannelName(parallelName, 2)), + }, + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + }), + WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), + WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), + WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), + WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelEventPoliciesReadyBecauseNoPolicyAndOIDCEnabled(), + WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ + FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), + SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + }, { + FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 1, corev1.ConditionFalse), + FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 1, corev1.ConditionFalse), + SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 1, corev1.ConditionFalse), + }})), + }}, + Ctx: feature.ToContext(context.Background(), feature.Flags{ + feature.OIDCAuthentication: feature.Enabled, + feature.AuthorizationDefaultMode: feature.AuthorizationAllowSameNamespace, + }), }, } From de6205c5dc60e75966866980edf6de2a8b4ae16e Mon Sep 17 00:00:00 2001 From: rahulii Date: Wed, 31 Jul 2024 12:54:21 +0530 Subject: [PATCH 07/17] add more unit test case - with multiple event policies Signed-off-by: rahulii --- pkg/reconciler/parallel/parallel_test.go | 67 ++++++++++++++++++++++-- 1 file changed, 63 insertions(+), 4 deletions(-) diff --git a/pkg/reconciler/parallel/parallel_test.go b/pkg/reconciler/parallel/parallel_test.go index 552ad3c650a..3348f9347bd 100644 --- a/pkg/reconciler/parallel/parallel_test.go +++ b/pkg/reconciler/parallel/parallel_test.go @@ -784,7 +784,7 @@ func TestAllBranches(t *testing.T) { {Filter: createFilter(0), Subscriber: createSubscriber(0)}, }))), makeEventPolicy(parallelName, resources.ParallelBranchChannelName(parallelName, 0), 0), - makeIngressChannelEventPolicy(parallelName, resources.ParallelChannelName(parallelName)), + makeIngressChannelEventPolicy(parallelName, resources.ParallelChannelName(parallelName), readyEventPolicyName), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewFlowsParallel(parallelName, testNS, @@ -910,7 +910,7 @@ func TestAllBranches(t *testing.T) { }))), makeEventPolicy(parallelName, resources.ParallelBranchChannelName(parallelName, 0), 0), makeEventPolicy(parallelName, resources.ParallelBranchChannelName(parallelName, 1), 1), - makeIngressChannelEventPolicy(parallelName, resources.ParallelChannelName(parallelName)), + makeIngressChannelEventPolicy(parallelName, resources.ParallelChannelName(parallelName), readyEventPolicyName), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewFlowsParallel(parallelName, testNS, @@ -1193,6 +1193,65 @@ func TestAllBranches(t *testing.T) { feature.OIDCAuthentication: feature.Enabled, feature.AuthorizationDefaultMode: feature.AuthorizationAllowSameNamespace, }), + }, { + Name: "Parallel with multiple event policies", + Key: pKey, + Objects: []runtime.Object{ + NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Filter: createFilter(0), Subscriber: createSubscriber(0)}, + })), + NewEventPolicy(readyEventPolicyName, testNS, + WithReadyEventPolicyCondition, + WithEventPolicyToRef(parallelGVK, parallelName), + ), + NewEventPolicy(readyEventPolicyName+"-1", testNS, + WithReadyEventPolicyCondition, + WithEventPolicyToRef(parallelGVK, parallelName), + ), + NewEventPolicy(readyEventPolicyName+"-2", testNS, + WithReadyEventPolicyCondition, + WithEventPolicyToRef(parallelGVK, parallelName), + ), + }, + WantErr: false, + WantCreates: []runtime.Object{ + createChannel(parallelName), + createBranchChannel(parallelName, 0), + resources.NewFilterSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ + {Filter: createFilter(0), Subscriber: createSubscriber(0)}, + }))), + resources.NewSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ + {Filter: createFilter(0), Subscriber: createSubscriber(0)}, + }))), + makeEventPolicy(parallelName, resources.ParallelBranchChannelName(parallelName, 0), 0), + makeIngressChannelEventPolicy(parallelName, resources.ParallelChannelName(parallelName), readyEventPolicyName), + makeIngressChannelEventPolicy(parallelName, resources.ParallelChannelName(parallelName), readyEventPolicyName+"-1"), + makeIngressChannelEventPolicy(parallelName, resources.ParallelChannelName(parallelName), readyEventPolicyName+"-2"), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{{Filter: createFilter(0), Subscriber: createSubscriber(0)}}), + WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), + WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), + WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), + WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelEventPoliciesReady(), + WithFlowsParallelEventPoliciesListed(readyEventPolicyName, readyEventPolicyName+"-1", readyEventPolicyName+"-2"), + WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ + FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), + SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + }})), + }}, + Ctx: feature.ToContext(context.Background(), feature.Flags{ + feature.OIDCAuthentication: feature.Enabled, + feature.AuthorizationDefaultMode: feature.AuthorizationAllowSameNamespace, + }), }, } @@ -1392,8 +1451,8 @@ func makeEventPolicy(parallelName, channelName string, branch int) *eventingv1al ) } -func makeIngressChannelEventPolicy(parallelName, channelName string) *eventingv1alpha1.EventPolicy { - return NewEventPolicy(resources.ParallelEventPolicyName(parallelName, channelName), testNS, +func makeIngressChannelEventPolicy(parallelName, channelName, parallelEventPolicyName string) *eventingv1alpha1.EventPolicy { + return NewEventPolicy(resources.ParallelEventPolicyName(parallelName, parallelEventPolicyName), testNS, WithEventPolicyToRef(channelV1GVK, channelName), // from a subscription WithEventPolicyOwnerReferences([]metav1.OwnerReference{ From f1f23cd08938c1a645b19afdf0eddb706f34b7ec Mon Sep 17 00:00:00 2001 From: rahulii Date: Wed, 31 Jul 2024 12:54:58 +0530 Subject: [PATCH 08/17] fix gofmt Signed-off-by: rahulii --- pkg/reconciler/parallel/parallel.go | 6 +++--- pkg/reconciler/parallel/resources/eventpolicy.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/reconciler/parallel/parallel.go b/pkg/reconciler/parallel/parallel.go index 61d24011f71..5a4b5da594f 100644 --- a/pkg/reconciler/parallel/parallel.go +++ b/pkg/reconciler/parallel/parallel.go @@ -375,8 +375,8 @@ func (r *Reconciler) reconcileEventPolicies(ctx context.Context, p *v1.Parallel, } // prepare the list of event policies to create, update and delete. - var policiesToCreate, policiesToUpdate, policiesToDelete []*eventingv1alpha1.EventPolicy - policiesToDelete = make([]*eventingv1alpha1.EventPolicy, 0, len(existingPolicyMap)) + var policiesToCreate, policiesToUpdate []*eventingv1alpha1.EventPolicy + policiesToDelete := make([]*eventingv1alpha1.EventPolicy, 0, len(existingPolicyMap)) for i, channel := range channels { filterSub := filterSubs[i] @@ -467,7 +467,7 @@ func (r *Reconciler) prepareIngressChannelEventpolicies(p *v1.Parallel, ingressC return nil, nil } - var ingressChannelEventPolicies []*eventingv1alpha1.EventPolicy + ingressChannelEventPolicies := make([]*eventingv1alpha1.EventPolicy, 0, len(applyingEventPoliciesForParallel)) for _, eventPolicy := range applyingEventPoliciesForParallel { ingressChannelEventPolicies = append(ingressChannelEventPolicies, resources.MakeEventPolicyForParallelIngressChannel(p, ingressChannel, eventPolicy)) } diff --git a/pkg/reconciler/parallel/resources/eventpolicy.go b/pkg/reconciler/parallel/resources/eventpolicy.go index 8d2a3c96e70..7c006a0e4f8 100644 --- a/pkg/reconciler/parallel/resources/eventpolicy.go +++ b/pkg/reconciler/parallel/resources/eventpolicy.go @@ -83,7 +83,7 @@ func MakeEventPolicyForParallelIngressChannel(p *flowsv1.Parallel, ingressChanne return &eventingv1alpha1.EventPolicy{ ObjectMeta: metav1.ObjectMeta{ Namespace: ingressChannel.Namespace, - Name: ParallelEventPolicyName(p.Name, ingressChannel.Name), + Name: ParallelEventPolicyName(p.Name, parallelPolicy.Name), OwnerReferences: []metav1.OwnerReference{ { APIVersion: flowsv1.SchemeGroupVersion.String(), From 7b97071142ed464d285989c48681b7c1266326b1 Mon Sep 17 00:00:00 2001 From: rahulii Date: Wed, 31 Jul 2024 13:23:25 +0530 Subject: [PATCH 09/17] add more test cases Signed-off-by: rahulii --- pkg/reconciler/parallel/parallel_test.go | 28 +++++++++---------- .../parallel/resources/eventpolicy.go | 6 ++-- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/pkg/reconciler/parallel/parallel_test.go b/pkg/reconciler/parallel/parallel_test.go index 3348f9347bd..ab68022a544 100644 --- a/pkg/reconciler/parallel/parallel_test.go +++ b/pkg/reconciler/parallel/parallel_test.go @@ -1194,7 +1194,7 @@ func TestAllBranches(t *testing.T) { feature.AuthorizationDefaultMode: feature.AuthorizationAllowSameNamespace, }), }, { - Name: "Parallel with multiple event policies", + Name: "Parallel Event Policy Deleted, corresponding Ingress Channel Policy should be deleted", Key: pKey, Objects: []runtime.Object{ NewFlowsParallel(parallelName, testNS, @@ -1207,14 +1207,9 @@ func TestAllBranches(t *testing.T) { WithReadyEventPolicyCondition, WithEventPolicyToRef(parallelGVK, parallelName), ), - NewEventPolicy(readyEventPolicyName+"-1", testNS, - WithReadyEventPolicyCondition, - WithEventPolicyToRef(parallelGVK, parallelName), - ), - NewEventPolicy(readyEventPolicyName+"-2", testNS, - WithReadyEventPolicyCondition, - WithEventPolicyToRef(parallelGVK, parallelName), - ), + makeEventPolicy(parallelName, resources.ParallelBranchChannelName(parallelName, 0), 0), + makeIngressChannelEventPolicy(parallelName, resources.ParallelChannelName(parallelName), readyEventPolicyName), + makeIngressChannelEventPolicy(parallelName, resources.ParallelChannelName(parallelName), readyEventPolicyName+"-1"), }, WantErr: false, WantCreates: []runtime.Object{ @@ -1226,10 +1221,15 @@ func TestAllBranches(t *testing.T) { resources.NewSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ {Filter: createFilter(0), Subscriber: createSubscriber(0)}, }))), - makeEventPolicy(parallelName, resources.ParallelBranchChannelName(parallelName, 0), 0), - makeIngressChannelEventPolicy(parallelName, resources.ParallelChannelName(parallelName), readyEventPolicyName), - makeIngressChannelEventPolicy(parallelName, resources.ParallelChannelName(parallelName), readyEventPolicyName+"-1"), - makeIngressChannelEventPolicy(parallelName, resources.ParallelChannelName(parallelName), readyEventPolicyName+"-2"), + }, + WantDeletes: []clientgotesting.DeleteActionImpl{ + { + ActionImpl: clientgotesting.ActionImpl{ + Namespace: testNS, + Resource: v1.SchemeGroupVersion.WithResource("eventpolicies"), + }, + Name: resources.ParallelEventPolicyName(parallelName, readyEventPolicyName+"-1"), + }, }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewFlowsParallel(parallelName, testNS, @@ -1241,7 +1241,7 @@ func TestAllBranches(t *testing.T) { WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), WithFlowsParallelEventPoliciesReady(), - WithFlowsParallelEventPoliciesListed(readyEventPolicyName, readyEventPolicyName+"-1", readyEventPolicyName+"-2"), + WithFlowsParallelEventPoliciesListed(readyEventPolicyName), WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), diff --git a/pkg/reconciler/parallel/resources/eventpolicy.go b/pkg/reconciler/parallel/resources/eventpolicy.go index 7c006a0e4f8..c028d017775 100644 --- a/pkg/reconciler/parallel/resources/eventpolicy.go +++ b/pkg/reconciler/parallel/resources/eventpolicy.go @@ -74,8 +74,10 @@ func LabelsForParallelChannelsEventPolicy(parallelName string) map[string]string } } -func ParallelEventPolicyName(parallelName, channelName string) string { - return kmeta.ChildName(parallelName, "-ep-"+channelName) +// ParallelEventPolicyName returns the name of the EventPolicy for the Parallel. +// suffix is either channel name or parent event policy name. +func ParallelEventPolicyName(parallelName, suffix string) string { + return kmeta.ChildName(parallelName, "-ep-"+suffix) } // MakeEventPolicyForParallelIngressChannel creates an EventPolicy for the ingress channel of a Parallel. From 3a2c5429e66616f8ff104f224896c871abff3c68 Mon Sep 17 00:00:00 2001 From: rahulii Date: Wed, 31 Jul 2024 13:26:00 +0530 Subject: [PATCH 10/17] sort the slice for deterministic order in unit tests Signed-off-by: rahulii --- pkg/reconciler/parallel/parallel.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/reconciler/parallel/parallel.go b/pkg/reconciler/parallel/parallel.go index 5a4b5da594f..32a21dcfb91 100644 --- a/pkg/reconciler/parallel/parallel.go +++ b/pkg/reconciler/parallel/parallel.go @@ -19,6 +19,7 @@ package parallel import ( "context" "fmt" + "sort" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" @@ -467,6 +468,11 @@ func (r *Reconciler) prepareIngressChannelEventpolicies(p *v1.Parallel, ingressC return nil, nil } + // sort the event policies by name to ensure deterministic order. + sort.Slice(applyingEventPoliciesForParallel, func(i, j int) bool { + return applyingEventPoliciesForParallel[i].Name < applyingEventPoliciesForParallel[j].Name + }) + ingressChannelEventPolicies := make([]*eventingv1alpha1.EventPolicy, 0, len(applyingEventPoliciesForParallel)) for _, eventPolicy := range applyingEventPoliciesForParallel { ingressChannelEventPolicies = append(ingressChannelEventPolicies, resources.MakeEventPolicyForParallelIngressChannel(p, ingressChannel, eventPolicy)) From d88612b8be2a28177b091b434b8d95bc8502260b Mon Sep 17 00:00:00 2001 From: rahulii Date: Wed, 31 Jul 2024 13:34:04 +0530 Subject: [PATCH 11/17] minor fix Signed-off-by: rahulii --- pkg/reconciler/parallel/resources/eventpolicy.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/reconciler/parallel/resources/eventpolicy.go b/pkg/reconciler/parallel/resources/eventpolicy.go index c028d017775..85798c892a6 100644 --- a/pkg/reconciler/parallel/resources/eventpolicy.go +++ b/pkg/reconciler/parallel/resources/eventpolicy.go @@ -1,9 +1,12 @@ /* Copyright 2024 The Knative Authors + Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. From a0c46954c64312cf285474e20484b9365f4d0fdd Mon Sep 17 00:00:00 2001 From: rahulii Date: Thu, 1 Aug 2024 20:58:31 +0530 Subject: [PATCH 12/17] add more test cases Signed-off-by: rahulii --- pkg/reconciler/parallel/parallel_test.go | 59 ++++++++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/pkg/reconciler/parallel/parallel_test.go b/pkg/reconciler/parallel/parallel_test.go index ab68022a544..25353e5d5d2 100644 --- a/pkg/reconciler/parallel/parallel_test.go +++ b/pkg/reconciler/parallel/parallel_test.go @@ -1252,6 +1252,65 @@ func TestAllBranches(t *testing.T) { feature.OIDCAuthentication: feature.Enabled, feature.AuthorizationDefaultMode: feature.AuthorizationAllowSameNamespace, }), + }, { + Name: "Parallel with multiple event policies", + Key: pKey, + Objects: []runtime.Object{ + NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Filter: createFilter(0), Subscriber: createSubscriber(0)}, + })), + NewEventPolicy(readyEventPolicyName, testNS, + WithReadyEventPolicyCondition, + WithEventPolicyToRef(parallelGVK, parallelName), + ), + NewEventPolicy(readyEventPolicyName+"-1", testNS, + WithReadyEventPolicyCondition, + WithEventPolicyToRef(parallelGVK, parallelName), + ), + NewEventPolicy(readyEventPolicyName+"-2", testNS, + WithReadyEventPolicyCondition, + WithEventPolicyToRef(parallelGVK, parallelName), + ), + }, + WantErr: false, + WantCreates: []runtime.Object{ + createChannel(parallelName), + createBranchChannel(parallelName, 0), + resources.NewFilterSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ + {Filter: createFilter(0), Subscriber: createSubscriber(0)}, + }))), + resources.NewSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ + {Filter: createFilter(0), Subscriber: createSubscriber(0)}, + }))), + makeEventPolicy(parallelName, resources.ParallelBranchChannelName(parallelName, 0), 0), + makeIngressChannelEventPolicy(parallelName, resources.ParallelChannelName(parallelName), readyEventPolicyName), + makeIngressChannelEventPolicy(parallelName, resources.ParallelChannelName(parallelName), readyEventPolicyName+"-1"), + makeIngressChannelEventPolicy(parallelName, resources.ParallelChannelName(parallelName), readyEventPolicyName+"-2"), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{{Filter: createFilter(0), Subscriber: createSubscriber(0)}}), + WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), + WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), + WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), + WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelEventPoliciesReady(), + WithFlowsParallelEventPoliciesListed(readyEventPolicyName, readyEventPolicyName+"-1", readyEventPolicyName+"-2"), + WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ + FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), + SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + }})), + }}, + Ctx: feature.ToContext(context.Background(), feature.Flags{ + feature.OIDCAuthentication: feature.Enabled, + feature.AuthorizationDefaultMode: feature.AuthorizationAllowSameNamespace, + }), }, } From 65147005e1b856263d144b99dc1324c97fa85861 Mon Sep 17 00:00:00 2001 From: rahulii Date: Mon, 5 Aug 2024 16:04:34 +0530 Subject: [PATCH 13/17] uid in ownerref Signed-off-by: rahulii --- pkg/reconciler/parallel/resources/eventpolicy.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/reconciler/parallel/resources/eventpolicy.go b/pkg/reconciler/parallel/resources/eventpolicy.go index 85798c892a6..d422a136af7 100644 --- a/pkg/reconciler/parallel/resources/eventpolicy.go +++ b/pkg/reconciler/parallel/resources/eventpolicy.go @@ -40,6 +40,7 @@ func MakeEventPolicyForParallelChannel(p *flowsv1.Parallel, channel *eventingduc APIVersion: flowsv1.SchemeGroupVersion.String(), Kind: parallelKind, Name: p.Name, + UID: p.UID, }, }, Labels: LabelsForParallelChannelsEventPolicy(p.Name), From 43fb57bb399e679d6d22ba9aa30b345c8ae4f02d Mon Sep 17 00:00:00 2001 From: rahulii Date: Thu, 8 Aug 2024 11:57:48 +0530 Subject: [PATCH 14/17] fix review comments from Chris Signed-off-by: rahulii --- pkg/reconciler/parallel/parallel.go | 8 +++++--- pkg/reconciler/parallel/resources/eventpolicy.go | 5 +---- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/pkg/reconciler/parallel/parallel.go b/pkg/reconciler/parallel/parallel.go index 32a21dcfb91..7d12a1b02d9 100644 --- a/pkg/reconciler/parallel/parallel.go +++ b/pkg/reconciler/parallel/parallel.go @@ -383,7 +383,8 @@ func (r *Reconciler) reconcileEventPolicies(ctx context.Context, p *v1.Parallel, filterSub := filterSubs[i] expectedPolicy := resources.MakeEventPolicyForParallelChannel(p, channel, filterSub) if existingPolicy, ok := existingPolicyMap[expectedPolicy.Name]; ok { - if !equality.Semantic.DeepDerivative(existingPolicy.Spec, expectedPolicy.Spec) { + if !equality.Semantic.DeepDerivative(expectedPolicy, existingPolicy) { + expectedPolicy.SetResourceVersion(existingPolicy.ResourceVersion) policiesToUpdate = append(policiesToUpdate, expectedPolicy) } delete(existingPolicyMap, expectedPolicy.Name) @@ -399,8 +400,9 @@ func (r *Reconciler) reconcileEventPolicies(ctx context.Context, p *v1.Parallel, } for _, policy := range ingressChannelEventPolicies { - if existingPolicy, ok := existingPolicyMap[policy.Name]; ok { - if !equality.Semantic.DeepDerivative(existingPolicy.Spec, policy.Spec) { + if existingIngressChannelPolicy, ok := existingPolicyMap[policy.Name]; ok { + if !equality.Semantic.DeepDerivative(policy, existingIngressChannelPolicy) { + policy.SetResourceVersion(existingIngressChannelPolicy.ResourceVersion) policiesToUpdate = append(policiesToUpdate, policy) } delete(existingPolicyMap, policy.Name) diff --git a/pkg/reconciler/parallel/resources/eventpolicy.go b/pkg/reconciler/parallel/resources/eventpolicy.go index d422a136af7..ff216f9cdf3 100644 --- a/pkg/reconciler/parallel/resources/eventpolicy.go +++ b/pkg/reconciler/parallel/resources/eventpolicy.go @@ -71,10 +71,7 @@ func MakeEventPolicyForParallelChannel(p *flowsv1.Parallel, channel *eventingduc func LabelsForParallelChannelsEventPolicy(parallelName string) map[string]string { return map[string]string{ - ParallelChannelEventPolicyLabelPrefix + "parallel-group": flowsv1.SchemeGroupVersion.Group, - ParallelChannelEventPolicyLabelPrefix + "parallel-version": flowsv1.SchemeGroupVersion.Version, - ParallelChannelEventPolicyLabelPrefix + "parallel-kind": parallelKind, - ParallelChannelEventPolicyLabelPrefix + "parallel-name": parallelName, + ParallelChannelEventPolicyLabelPrefix + "parallel-name": parallelName, } } From 35e475b3127979301ed31588fa95b664c77cc8e0 Mon Sep 17 00:00:00 2001 From: rahulii Date: Fri, 9 Aug 2024 00:39:40 +0530 Subject: [PATCH 15/17] add parallel policy as owner references Signed-off-by: rahulii --- pkg/reconciler/parallel/parallel_test.go | 4 ++++ pkg/reconciler/parallel/resources/eventpolicy.go | 6 ++++++ 2 files changed, 10 insertions(+) diff --git a/pkg/reconciler/parallel/parallel_test.go b/pkg/reconciler/parallel/parallel_test.go index 25353e5d5d2..e467b7b6d7c 100644 --- a/pkg/reconciler/parallel/parallel_test.go +++ b/pkg/reconciler/parallel/parallel_test.go @@ -1519,6 +1519,10 @@ func makeIngressChannelEventPolicy(parallelName, channelName, parallelEventPolic APIVersion: "flows.knative.dev/v1", Kind: "Parallel", Name: parallelName, + }, { + APIVersion: "eventing.knative.dev/v1alpha1", + Kind: "EventPolicy", + Name: parallelEventPolicyName, }, }...), WithEventPolicyLabels(resources.LabelsForParallelChannelsEventPolicy(parallelName)), diff --git a/pkg/reconciler/parallel/resources/eventpolicy.go b/pkg/reconciler/parallel/resources/eventpolicy.go index ff216f9cdf3..a7299316a6b 100644 --- a/pkg/reconciler/parallel/resources/eventpolicy.go +++ b/pkg/reconciler/parallel/resources/eventpolicy.go @@ -92,6 +92,12 @@ func MakeEventPolicyForParallelIngressChannel(p *flowsv1.Parallel, ingressChanne APIVersion: flowsv1.SchemeGroupVersion.String(), Kind: parallelKind, Name: p.Name, + UID: p.UID, + }, { + APIVersion: eventingv1alpha1.SchemeGroupVersion.String(), + Kind: "EventPolicy", + Name: parallelPolicy.Name, + UID: parallelPolicy.UID, }, }, Labels: LabelsForParallelChannelsEventPolicy(p.Name), From 1fbac6a0677c10ca95076dca7888b7329ad1d20d Mon Sep 17 00:00:00 2001 From: rahulii Date: Fri, 9 Aug 2024 12:51:51 +0530 Subject: [PATCH 16/17] fix e2e test Signed-off-by: rahulii --- pkg/reconciler/parallel/resources/eventpolicy.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/reconciler/parallel/resources/eventpolicy.go b/pkg/reconciler/parallel/resources/eventpolicy.go index a7299316a6b..b2c1b0a71f6 100644 --- a/pkg/reconciler/parallel/resources/eventpolicy.go +++ b/pkg/reconciler/parallel/resources/eventpolicy.go @@ -58,8 +58,8 @@ func MakeEventPolicyForParallelChannel(p *flowsv1.Parallel, channel *eventingduc From: []eventingv1alpha1.EventPolicySpecFrom{ { Ref: &eventingv1alpha1.EventPolicyFromReference{ - APIVersion: subscription.APIVersion, - Kind: subscription.Kind, + APIVersion: messagingv1.SchemeGroupVersion.String(), + Kind: "Subscription", Name: subscription.Name, Namespace: subscription.Namespace, }, From 0bd3e6943a2fa1bed3f7d5666a0b34d16ce56fcc Mon Sep 17 00:00:00 2001 From: rahulii Date: Fri, 9 Aug 2024 16:43:14 +0530 Subject: [PATCH 17/17] remove sorting of eventpolicies Signed-off-by: rahulii --- pkg/reconciler/parallel/parallel.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/pkg/reconciler/parallel/parallel.go b/pkg/reconciler/parallel/parallel.go index 7d12a1b02d9..6aac944d3f3 100644 --- a/pkg/reconciler/parallel/parallel.go +++ b/pkg/reconciler/parallel/parallel.go @@ -19,7 +19,6 @@ package parallel import ( "context" "fmt" - "sort" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" @@ -470,11 +469,6 @@ func (r *Reconciler) prepareIngressChannelEventpolicies(p *v1.Parallel, ingressC return nil, nil } - // sort the event policies by name to ensure deterministic order. - sort.Slice(applyingEventPoliciesForParallel, func(i, j int) bool { - return applyingEventPoliciesForParallel[i].Name < applyingEventPoliciesForParallel[j].Name - }) - ingressChannelEventPolicies := make([]*eventingv1alpha1.EventPolicy, 0, len(applyingEventPoliciesForParallel)) for _, eventPolicy := range applyingEventPoliciesForParallel { ingressChannelEventPolicies = append(ingressChannelEventPolicies, resources.MakeEventPolicyForParallelIngressChannel(p, ingressChannel, eventPolicy))