Skip to content

Commit

Permalink
Fix: propagate EventPolicy filter to underlying Channels EventPolicy (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
creydr committed Aug 16, 2024
1 parent 2b92299 commit b57ac3a
Show file tree
Hide file tree
Showing 7 changed files with 226 additions and 11 deletions.
8 changes: 8 additions & 0 deletions pkg/reconciler/channel/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"fmt"
"testing"

eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"

eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
"knative.dev/eventing/pkg/apis/feature"

Expand Down Expand Up @@ -554,6 +556,9 @@ func TestReconcile(t *testing.T) {
NewEventPolicy(readyEventPolicyName, testNS,
WithReadyEventPolicyCondition,
WithEventPolicyToRef(channelV1GVK, channelName),
WithEventPolicyFilter(eventingv1.SubscriptionsAPIFilter{
CESQL: "true",
}),
),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Expand Down Expand Up @@ -587,6 +592,9 @@ func TestReconcile(t *testing.T) {
"messaging.knative.dev/channel-kind": "InMemoryChannel",
"messaging.knative.dev/channel-name": channelName,
}),
WithEventPolicyFilter(eventingv1.SubscriptionsAPIFilter{
CESQL: "true",
}),
),
},
}}
Expand Down
3 changes: 2 additions & 1 deletion pkg/reconciler/channel/resources/eventpolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ func MakeEventPolicyForBackingChannel(backingChannel *eventingduckv1.Channelable
},
},
},
From: parentPolicy.Spec.From,
From: parentPolicy.Spec.From,
Filters: parentPolicy.Spec.Filters,
},
}
}
Expand Down
77 changes: 73 additions & 4 deletions pkg/reconciler/parallel/parallel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"fmt"
"testing"

eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"

fakeeventingclient "knative.dev/eventing/pkg/client/injection/client/fake"
fakedynamicclient "knative.dev/pkg/injection/clients/dynamicclient/fake"
"knative.dev/pkg/tracker"
Expand Down Expand Up @@ -1311,6 +1313,61 @@ func TestAllBranches(t *testing.T) {
feature.OIDCAuthentication: feature.Enabled,
feature.AuthorizationDefaultMode: feature.AuthorizationAllowSameNamespace,
}),
}, {
Name: "Propagates Filter of Parallels EventPolicy to ingress channels EventPolicy",
Key: pKey,
Objects: []runtime.Object{
NewFlowsParallel(parallelName, testNS,
WithInitFlowsParallelConditions,
WithFlowsParallelChannelTemplateSpec(imc),
WithFlowsParallelBranches([]v1.ParallelBranch{
{Filter: createFilter(0), Subscriber: createSubscriber(0)},
})),
NewEventPolicy(readyEventPolicyName, testNS,
WithReadyEventPolicyCondition,
WithEventPolicyToRef(parallelGVK, parallelName),
WithEventPolicyFilter(eventingv1.SubscriptionsAPIFilter{
CESQL: "true",
}),
),
},
WantErr: false,
WantCreates: []runtime.Object{
createChannel(parallelName),
createBranchChannel(parallelName, 0),
resources.NewFilterSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{
{Filter: createFilter(0), Subscriber: createSubscriber(0)},
}))),
resources.NewSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{
{Filter: createFilter(0), Subscriber: createSubscriber(0)},
}))),
makeEventPolicy(parallelName, resources.ParallelBranchChannelName(parallelName, 0), 0),
makeIngressChannelEventPolicy(parallelName, resources.ParallelChannelName(parallelName), readyEventPolicyName,
WithEventPolicyFilter(eventingv1.SubscriptionsAPIFilter{
CESQL: "true",
})),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewFlowsParallel(parallelName, testNS,
WithInitFlowsParallelConditions,
WithFlowsParallelChannelTemplateSpec(imc),
WithFlowsParallelBranches([]v1.ParallelBranch{{Filter: createFilter(0), Subscriber: createSubscriber(0)}}),
WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"),
WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"),
WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"),
WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)),
WithFlowsParallelEventPoliciesReady(),
WithFlowsParallelEventPoliciesListed(readyEventPolicyName),
WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{
FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse),
FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse),
SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 0, corev1.ConditionFalse),
}})),
}},
Ctx: feature.ToContext(context.Background(), feature.Flags{
feature.OIDCAuthentication: feature.Enabled,
feature.AuthorizationDefaultMode: feature.AuthorizationAllowSameNamespace,
}),
},
}

Expand Down Expand Up @@ -1494,8 +1551,8 @@ func createDelivery(gvk metav1.GroupVersionKind, name, namespace string) *eventi
}
}

func makeEventPolicy(parallelName, channelName string, branch int) *eventingv1alpha1.EventPolicy {
return NewEventPolicy(resources.ParallelEventPolicyName(parallelName, channelName), testNS,
func makeEventPolicy(parallelName, channelName string, branch int, opts ...EventPolicyOption) *eventingv1alpha1.EventPolicy {
ep := NewEventPolicy(resources.ParallelEventPolicyName(parallelName, channelName), testNS,
WithEventPolicyToRef(channelV1GVK, channelName),
// from a subscription
WithEventPolicyFrom(subscriberGVK, resources.ParallelFilterSubscriptionName(parallelName, branch), testNS),
Expand All @@ -1508,10 +1565,16 @@ func makeEventPolicy(parallelName, channelName string, branch int) *eventingv1al
}...),
WithEventPolicyLabels(resources.LabelsForParallelChannelsEventPolicy(parallelName)),
)

for _, opt := range opts {
opt(ep)
}

return ep
}

func makeIngressChannelEventPolicy(parallelName, channelName, parallelEventPolicyName string) *eventingv1alpha1.EventPolicy {
return NewEventPolicy(resources.ParallelEventPolicyName(parallelName, parallelEventPolicyName), testNS,
func makeIngressChannelEventPolicy(parallelName, channelName, parallelEventPolicyName string, opts ...EventPolicyOption) *eventingv1alpha1.EventPolicy {
ep := NewEventPolicy(resources.ParallelEventPolicyName(parallelName, parallelEventPolicyName), testNS,
WithEventPolicyToRef(channelV1GVK, channelName),
// from a subscription
WithEventPolicyOwnerReferences([]metav1.OwnerReference{
Expand All @@ -1527,4 +1590,10 @@ func makeIngressChannelEventPolicy(parallelName, channelName, parallelEventPolic
}...),
WithEventPolicyLabels(resources.LabelsForParallelChannelsEventPolicy(parallelName)),
)

for _, opt := range opts {
opt(ep)
}

return ep
}
3 changes: 2 additions & 1 deletion pkg/reconciler/parallel/resources/eventpolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ func MakeEventPolicyForParallelIngressChannel(p *flowsv1.Parallel, ingressChanne
},
},
},
From: parallelPolicy.Spec.From,
From: parallelPolicy.Spec.From,
Filters: parallelPolicy.Spec.Filters,
},
}
}
3 changes: 2 additions & 1 deletion pkg/reconciler/sequence/resources/eventpolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ func MakeEventPolicyForSequenceInputChannel(s *flowsv1.Sequence, inputChannel *e
},
},
},
From: sequencePolicy.Spec.From,
From: sequencePolicy.Spec.From,
Filters: sequencePolicy.Spec.Filters,
},
}
}
135 changes: 131 additions & 4 deletions pkg/reconciler/sequence/sequence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"fmt"
"testing"

eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"

"knative.dev/eventing/pkg/apis/feature"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -3561,6 +3563,119 @@ func TestAllCases(t *testing.T) {
},
})),
}},
}, {
Name: "Propagates Filter of Sequence EventPolicy to ingress channels EventPolicy",
Key: pKey,
Objects: []runtime.Object{
NewSequence(sequenceName, testNS,
WithInitSequenceConditions,
WithSequenceChannelTemplateSpec(imc),
WithSequenceSteps([]v1.SequenceStep{
{Destination: createDestination(0)},
{Destination: createDestination(1)},
})),
createChannel(sequenceName, 0),
createChannel(sequenceName, 1),
resources.NewSubscription(0, NewSequence(sequenceName, testNS,
WithSequenceChannelTemplateSpec(imc),
WithSequenceSteps([]v1.SequenceStep{
{Destination: createDestination(0)},
{Destination: createDestination(1)},
}))),
resources.NewSubscription(1, NewSequence(sequenceName, testNS,
WithSequenceChannelTemplateSpec(imc),
WithSequenceSteps([]v1.SequenceStep{
{Destination: createDestination(0)},
{Destination: createDestination(1)},
}))),
makeSequenceEventPolicy(sequenceName, WithEventPolicyFilter(eventingv1.SubscriptionsAPIFilter{
CESQL: "true",
})),
makeEventPolicy(sequenceName, resources.SequenceChannelName(sequenceName, 1), 1),
},
WantErr: false,
Ctx: feature.ToContext(context.Background(), feature.Flags{
feature.OIDCAuthentication: feature.Enabled,
feature.AuthorizationDefaultMode: feature.AuthorizationAllowSameNamespace,
}),
WantCreates: []runtime.Object{
makeInputChannelEventPolicy(sequenceName, resources.SequenceChannelName(sequenceName, 0), resources.SequenceEventPolicyName(sequenceName, ""), WithEventPolicyFilter(eventingv1.SubscriptionsAPIFilter{
CESQL: "true",
})),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewSequence(sequenceName, testNS,
WithInitSequenceConditions,
WithSequenceChannelTemplateSpec(imc),
WithSequenceSteps([]v1.SequenceStep{
{Destination: createDestination(0)},
{Destination: createDestination(1)},
}),
WithSequenceChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"),
WithSequenceAddressableNotReady("emptyAddress", "addressable is nil"),
WithSequenceSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"),
WithSequenceEventPoliciesNotReady("EventPoliciesNotReady", fmt.Sprintf("event policies %s are not ready", sequenceName)),
WithSequenceChannelStatuses([]v1.SequenceChannelStatus{
{
Channel: corev1.ObjectReference{
APIVersion: "messaging.knative.dev/v1",
Kind: "InMemoryChannel",
Name: resources.SequenceChannelName(sequenceName, 0),
Namespace: testNS,
},
ReadyCondition: apis.Condition{
Type: apis.ConditionReady,
Status: corev1.ConditionUnknown,
Reason: "NoReady",
Message: "Channel does not have Ready condition",
},
},
{
Channel: corev1.ObjectReference{
APIVersion: "messaging.knative.dev/v1",
Kind: "InMemoryChannel",
Name: resources.SequenceChannelName(sequenceName, 1),
Namespace: testNS,
},
ReadyCondition: apis.Condition{
Type: apis.ConditionReady,
Status: corev1.ConditionUnknown,
Reason: "NoReady",
Message: "Channel does not have Ready condition",
},
},
}),
WithSequenceSubscriptionStatuses([]v1.SequenceSubscriptionStatus{
{
Subscription: corev1.ObjectReference{
APIVersion: "messaging.knative.dev/v1",
Kind: "Subscription",
Name: resources.SequenceSubscriptionName(sequenceName, 0),
Namespace: testNS,
},
ReadyCondition: apis.Condition{
Type: apis.ConditionReady,
Status: corev1.ConditionUnknown,
Reason: "NoReady",
Message: "Subscription does not have Ready condition",
},
},
{
Subscription: corev1.ObjectReference{
APIVersion: "messaging.knative.dev/v1",
Kind: "Subscription",
Name: resources.SequenceSubscriptionName(sequenceName, 1),
Namespace: testNS,
},
ReadyCondition: apis.Condition{
Type: apis.ConditionReady,
Status: corev1.ConditionUnknown,
Reason: "NoReady",
Message: "Subscription does not have Ready condition",
},
},
})),
}},
},
}

Expand Down Expand Up @@ -3598,8 +3713,8 @@ func makeEventPolicy(sequenceName, channelName string, step int) *eventingv1alph
}

// Write a function to make the event policy for the sequence
func makeSequenceEventPolicy(sequenceName string) *eventingv1alpha1.EventPolicy {
return NewEventPolicy(resources.SequenceEventPolicyName(sequenceName, ""), testNS,
func makeSequenceEventPolicy(sequenceName string, opts ...EventPolicyOption) *eventingv1alpha1.EventPolicy {
ep := NewEventPolicy(resources.SequenceEventPolicyName(sequenceName, ""), testNS,
// from a subscription
WithEventPolicyOwnerReferences([]metav1.OwnerReference{
{
Expand All @@ -3609,10 +3724,16 @@ func makeSequenceEventPolicy(sequenceName string) *eventingv1alpha1.EventPolicy
},
}...),
)

for _, opt := range opts {
opt(ep)
}

return ep
}

func makeInputChannelEventPolicy(sequenceName, channelName string, sequenceEventPolicyName string) *eventingv1alpha1.EventPolicy {
return NewEventPolicy(resources.SequenceEventPolicyName(sequenceName, sequenceEventPolicyName), testNS,
func makeInputChannelEventPolicy(sequenceName, channelName string, sequenceEventPolicyName string, opts ...EventPolicyOption) *eventingv1alpha1.EventPolicy {
ep := NewEventPolicy(resources.SequenceEventPolicyName(sequenceName, sequenceEventPolicyName), testNS,
WithEventPolicyToRef(channelV1GVK, channelName),
// from a subscription
WithEventPolicyOwnerReferences([]metav1.OwnerReference{
Expand All @@ -3624,6 +3745,12 @@ func makeInputChannelEventPolicy(sequenceName, channelName string, sequenceEvent
}...),
WithEventPolicyLabels(resources.LabelsForSequenceChannelsEventPolicy(sequenceName)),
)

for _, opt := range opts {
opt(ep)
}

return ep
}

func makeInputChannelEventPolicyWithWrongSpec(sequenceName, channelName, policyName string) *eventingv1alpha1.EventPolicy {
Expand Down
8 changes: 8 additions & 0 deletions pkg/reconciler/testing/v1/eventpolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package testing
import (
"context"

eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"knative.dev/eventing/pkg/apis/eventing/v1alpha1"
Expand Down Expand Up @@ -146,6 +148,12 @@ func WithEventPolicyFromSub(sub string) EventPolicyOption {
}
}

func WithEventPolicyFilter(filter eventingv1.SubscriptionsAPIFilter) EventPolicyOption {
return func(ep *v1alpha1.EventPolicy) {
ep.Spec.Filters = append(ep.Spec.Filters, filter)
}
}

func WithEventPolicyLabels(labels map[string]string) EventPolicyOption {
return func(ep *v1alpha1.EventPolicy) {
ep.ObjectMeta.Labels = labels
Expand Down

0 comments on commit b57ac3a

Please sign in to comment.