From 3877cdcfaf866907b74364f8067276dfc33b7725 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Tue, 18 Jun 2024 13:26:01 +0200 Subject: [PATCH 01/13] List applying EventPolicies in InMemoryChannels status --- .../roles/controller-clusterrole.yaml | 8 ++++ .../v1/in_memory_channel_lifecycle.go | 21 +++++++++ .../inmemorychannel/controller/controller.go | 3 ++ .../controller/controller_test.go | 1 + .../controller/inmemorychannel.go | 44 +++++++++++++++++++ 5 files changed, 77 insertions(+) diff --git a/config/channels/in-memory-channel/roles/controller-clusterrole.yaml b/config/channels/in-memory-channel/roles/controller-clusterrole.yaml index 9e1ab6812b4..6164e834f41 100644 --- a/config/channels/in-memory-channel/roles/controller-clusterrole.yaml +++ b/config/channels/in-memory-channel/roles/controller-clusterrole.yaml @@ -45,6 +45,14 @@ rules: - inmemorychannels verbs: - patch + - apiGroups: + - eventing.knative.dev + resources: + - eventpolicies + verbs: + - get + - list + - watch - apiGroups: - "" resources: diff --git a/pkg/apis/messaging/v1/in_memory_channel_lifecycle.go b/pkg/apis/messaging/v1/in_memory_channel_lifecycle.go index 6be9e29f338..3b6441a305a 100644 --- a/pkg/apis/messaging/v1/in_memory_channel_lifecycle.go +++ b/pkg/apis/messaging/v1/in_memory_channel_lifecycle.go @@ -33,6 +33,7 @@ var imcCondSet = apis.NewLivingConditionSet( InMemoryChannelConditionAddressable, InMemoryChannelConditionChannelServiceReady, InMemoryChannelConditionDeadLetterSinkResolved, + InMemoryChannelConditionEventPoliciesReady, ) const ( @@ -64,6 +65,10 @@ const ( // InMemoryChannelConditionDeadLetterSinkResolved has status True when there is a Dead Letter Sink ref or URI // defined in the Spec.Delivery, is a valid destination and its correctly resolved into a valid URI InMemoryChannelConditionDeadLetterSinkResolved apis.ConditionType = "DeadLetterSinkResolved" + + // InMemoryChannelConditionEventPoliciesReady has status True when all the applying EventPolicies for this + // InMemoryChannel are ready. + InMemoryChannelConditionEventPoliciesReady apis.ConditionType = "EventPoliciesReady" ) // GetConditionSet retrieves the condition set for this resource. Implements the KRShaped interface. @@ -182,3 +187,19 @@ func (imcs *InMemoryChannelStatus) MarkDeadLetterSinkResolvedFailed(reason, mess imcs.DeliveryStatus = eventingduck.DeliveryStatus{} imcCondSet.Manage(imcs).MarkFalse(InMemoryChannelConditionDeadLetterSinkResolved, reason, messageFormat, messageA...) } + +func (imcs *InMemoryChannelStatus) MarkEventPoliciesFailed(reason, messageFormat string, messageA ...interface{}) { + imcCondSet.Manage(imcs).MarkFalse(InMemoryChannelConditionEventPoliciesReady, reason, messageFormat, messageA...) +} + +func (imcs *InMemoryChannelStatus) MarkEventPoliciesUnknown(reason, messageFormat string, messageA ...interface{}) { + imcCondSet.Manage(imcs).MarkUnknown(InMemoryChannelConditionEventPoliciesReady, reason, messageFormat, messageA...) +} + +func (imcs *InMemoryChannelStatus) MarkEventPoliciesTrue() { + imcCondSet.Manage(imcs).MarkTrue(InMemoryChannelConditionEventPoliciesReady) +} + +func (imcs *InMemoryChannelStatus) MarkEventPoliciesTrueWithReason(reason, messageFormat string, messageA ...interface{}) { + imcCondSet.Manage(imcs).MarkTrueWithReason(InMemoryChannelConditionEventPoliciesReady, reason, messageFormat, messageA...) +} diff --git a/pkg/reconciler/inmemorychannel/controller/controller.go b/pkg/reconciler/inmemorychannel/controller/controller.go index e60a5414e7b..e9966968b09 100644 --- a/pkg/reconciler/inmemorychannel/controller/controller.go +++ b/pkg/reconciler/inmemorychannel/controller/controller.go @@ -31,6 +31,7 @@ import ( "knative.dev/pkg/resolver" "knative.dev/eventing/pkg/apis/feature" + "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy" "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/inmemorychannel" inmemorychannelreconciler "knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1/inmemorychannel" "knative.dev/eventing/pkg/eventingtls" @@ -140,6 +141,8 @@ func NewController( Handler: controller.HandleAll(globalResync), }) + eventPolicyInformer.Informer().AddEventHandler(controller.HandleAll(globalResync)) + // Setup the watch on the config map of dispatcher config configStore := config.NewEventDispatcherConfigStore(logging.FromContext(ctx)) configStore.WatchConfigs(cmw) diff --git a/pkg/reconciler/inmemorychannel/controller/controller_test.go b/pkg/reconciler/inmemorychannel/controller/controller_test.go index f6c60bda7fa..4a6f1f3d2f0 100644 --- a/pkg/reconciler/inmemorychannel/controller/controller_test.go +++ b/pkg/reconciler/inmemorychannel/controller/controller_test.go @@ -31,6 +31,7 @@ import ( . "knative.dev/pkg/reconciler/testing" // Fake injection informers + _ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake" _ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/inmemorychannel/fake" "knative.dev/eventing/pkg/reconciler/inmemorychannel/controller/config" diff --git a/pkg/reconciler/inmemorychannel/controller/inmemorychannel.go b/pkg/reconciler/inmemorychannel/controller/inmemorychannel.go index 9795dd31fff..d5b851aa710 100644 --- a/pkg/reconciler/inmemorychannel/controller/inmemorychannel.go +++ b/pkg/reconciler/inmemorychannel/controller/inmemorychannel.go @@ -20,6 +20,11 @@ import ( "context" "errors" "fmt" + "strings" + + eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" + messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" + "knative.dev/eventing/pkg/client/listers/eventing/v1alpha1" "k8s.io/client-go/kubernetes" "k8s.io/utils/pointer" @@ -93,6 +98,8 @@ type Reconciler struct { eventDispatcherConfigStore *config.EventDispatcherConfigStore uriResolver *resolver.URIResolver + + eventPolicyLister v1alpha1.EventPolicyLister } // Check that our Reconciler implements Interface @@ -231,6 +238,43 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, imc *v1.InMemoryChannel) imc.GetConditionSet().Manage(imc.GetStatus()).MarkTrue(v1.InMemoryChannelConditionAddressable) + imc.Status.Policies = nil + applyingEvenPolicies, err := auth.GetEventPoliciesForResource(r.eventPolicyLister, messagingv1.SchemeGroupVersion.WithKind("InMemoryChannel"), imc.ObjectMeta) + if err != nil { + logging.FromContext(ctx).Errorw("Unable to get applying event policies for InMemoryChannel", zap.Error(err)) + imc.Status.MarkEventPoliciesFailed("EventPoliciesGetFailed", "Failed to get applying event policies") + } + + if len(applyingEvenPolicies) > 0 { + unreadyEventPolicies := []string{} + for _, policy := range applyingEvenPolicies { + imc.Status.Policies = append(imc.Status.Policies, eventingduck.AppliedEventPolicyRef{ + Name: policy.Name, + APIVersion: eventingv1alpha1.SchemeGroupVersion.String(), + }) + + if !policy.Status.IsReady() { + unreadyEventPolicies = append(unreadyEventPolicies, policy.Name) + } + } + + if len(unreadyEventPolicies) == 0 { + imc.Status.MarkEventPoliciesTrue() + } else { + imc.Status.MarkEventPoliciesFailed("NotReady", "event policies %s are not ready", strings.Join(unreadyEventPolicies, ", ")) + } + + } else { + // we have no applying event policy. So we set the EP condition to True + if featureFlags.IsOIDCAuthentication() { + // in case of OIDC auth, we also set the message with the default authorization mode + imc.Status.MarkEventPoliciesTrueWithReason("DefaultAuthorizationMode", "Default authz mode is %q", featureFlags[feature.AuthorizationDefaultMode]) + } else { + // in case OIDC is disabled, we set EP condition to true too, but give some message that authz (EPs) require OIDC + imc.Status.MarkEventPoliciesTrueWithReason("OIDCDisabled", "Feature %q must be enabled to support Authorization", feature.OIDCAuthentication) + } + } + // Ok, so now the Dispatcher Deployment & Service have been created, we're golden since the // dispatcher watches the Channel and where it needs to dispatch events to. logging.FromContext(ctx).Debugw("Reconciled InMemoryChannel", zap.Any("InMemoryChannel", imc)) From bf55645ff19350261073ada99af84a2cf0c34661 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Tue, 18 Jun 2024 13:27:13 +0200 Subject: [PATCH 02/13] Add & update integration tests --- .../v1/in_memory_channel_lifecycle_test.go | 34 +++++ pkg/reconciler/channel/channel_test.go | 12 +- .../controller/inmemorychannel_test.go | 141 ++++++++++++++++-- .../dispatcher/inmemorychannel_test.go | 23 ++- .../dispatcher/readiness_test.go | 1 + pkg/reconciler/testing/v1/inmemorychannel.go | 39 +++++ pkg/reconciler/testing/v1/listers.go | 6 + 7 files changed, 233 insertions(+), 23 deletions(-) diff --git a/pkg/apis/messaging/v1/in_memory_channel_lifecycle_test.go b/pkg/apis/messaging/v1/in_memory_channel_lifecycle_test.go index 7c97a86ed21..f1785342672 100644 --- a/pkg/apis/messaging/v1/in_memory_channel_lifecycle_test.go +++ b/pkg/apis/messaging/v1/in_memory_channel_lifecycle_test.go @@ -137,6 +137,9 @@ func TestInMemoryChannelInitializeConditions(t *testing.T) { }, { Type: InMemoryChannelConditionEndpointsReady, Status: corev1.ConditionUnknown, + }, { + Type: InMemoryChannelConditionEventPoliciesReady, + Status: corev1.ConditionUnknown, }, { Type: InMemoryChannelConditionReady, Status: corev1.ConditionUnknown, @@ -177,6 +180,9 @@ func TestInMemoryChannelInitializeConditions(t *testing.T) { }, { Type: InMemoryChannelConditionEndpointsReady, Status: corev1.ConditionUnknown, + }, { + Type: InMemoryChannelConditionEventPoliciesReady, + Status: corev1.ConditionUnknown, }, { Type: InMemoryChannelConditionReady, Status: corev1.ConditionUnknown, @@ -217,6 +223,9 @@ func TestInMemoryChannelInitializeConditions(t *testing.T) { }, { Type: InMemoryChannelConditionEndpointsReady, Status: corev1.ConditionUnknown, + }, { + Type: InMemoryChannelConditionEventPoliciesReady, + Status: corev1.ConditionUnknown, }, { Type: InMemoryChannelConditionReady, Status: corev1.ConditionUnknown, @@ -244,6 +253,7 @@ func TestInMemoryChannelIsReady(t *testing.T) { name string markServiceReady bool markChannelServiceReady bool + markEventPolicyReady bool setAddress bool markEndpointsReady bool DLSResolved *bool @@ -253,6 +263,7 @@ func TestInMemoryChannelIsReady(t *testing.T) { name: "all happy", markServiceReady: true, markChannelServiceReady: true, + markEventPolicyReady: true, markEndpointsReady: true, dispatcherStatus: deploymentStatusReady, setAddress: true, @@ -262,6 +273,7 @@ func TestInMemoryChannelIsReady(t *testing.T) { name: "service not ready", markServiceReady: false, markChannelServiceReady: false, + markEventPolicyReady: true, markEndpointsReady: true, dispatcherStatus: deploymentStatusReady, setAddress: true, @@ -271,6 +283,7 @@ func TestInMemoryChannelIsReady(t *testing.T) { name: "endpoints not ready", markServiceReady: true, markChannelServiceReady: false, + markEventPolicyReady: true, markEndpointsReady: false, dispatcherStatus: deploymentStatusReady, setAddress: true, @@ -281,6 +294,7 @@ func TestInMemoryChannelIsReady(t *testing.T) { markServiceReady: true, markEndpointsReady: true, markChannelServiceReady: false, + markEventPolicyReady: true, dispatcherStatus: deploymentStatusNotReady, setAddress: true, wantReady: false, @@ -289,6 +303,7 @@ func TestInMemoryChannelIsReady(t *testing.T) { name: "address not set", markServiceReady: true, markChannelServiceReady: false, + markEventPolicyReady: true, markEndpointsReady: true, dispatcherStatus: deploymentStatusReady, setAddress: false, @@ -298,6 +313,7 @@ func TestInMemoryChannelIsReady(t *testing.T) { name: "channel service not ready", markServiceReady: true, markChannelServiceReady: false, + markEventPolicyReady: true, markEndpointsReady: true, dispatcherStatus: deploymentStatusReady, setAddress: true, @@ -307,6 +323,7 @@ func TestInMemoryChannelIsReady(t *testing.T) { name: "dls sad", markServiceReady: true, markChannelServiceReady: false, + markEventPolicyReady: true, markEndpointsReady: true, dispatcherStatus: deploymentStatusReady, setAddress: true, @@ -316,6 +333,17 @@ func TestInMemoryChannelIsReady(t *testing.T) { name: "dls not configured", markServiceReady: true, markChannelServiceReady: false, + markEventPolicyReady: true, + markEndpointsReady: true, + dispatcherStatus: deploymentStatusReady, + setAddress: true, + wantReady: false, + DLSResolved: &trueVal, + }, { + name: "EventPolicy not ready", + markServiceReady: true, + markChannelServiceReady: true, + markEventPolicyReady: false, markEndpointsReady: true, dispatcherStatus: deploymentStatusReady, setAddress: true, @@ -336,6 +364,11 @@ func TestInMemoryChannelIsReady(t *testing.T) { } else { cs.MarkChannelServiceFailed("NotReadyChannelService", "testing") } + if test.markEventPolicyReady { + cs.MarkEventPoliciesTrue() + } else { + cs.MarkEndpointsFailed("NotReadyEventPolicy", "testing") + } if test.setAddress { cs.SetAddress(&duckv1.Addressable{URL: &apis.URL{Scheme: "http", Host: "foo.bar"}}) } @@ -437,6 +470,7 @@ func TestInMemoryChannelStatus_SetAddressable(t *testing.T) { func ReadyBrokerStatusWithoutDLS() *InMemoryChannelStatus { imcs := &InMemoryChannelStatus{} imcs.MarkChannelServiceTrue() + imcs.MarkEventPoliciesTrue() imcs.MarkDeadLetterSinkNotConfigured() imcs.MarkEndpointsTrue() imcs.SetAddress(&duckv1.Addressable{URL: apis.HTTP("example.com")}) diff --git a/pkg/reconciler/channel/channel_test.go b/pkg/reconciler/channel/channel_test.go index fe06a2dd633..91e0e469ffd 100644 --- a/pkg/reconciler/channel/channel_test.go +++ b/pkg/reconciler/channel/channel_test.go @@ -135,7 +135,8 @@ func TestReconcile(t *testing.T) { WithInMemoryChannelEndpointsReady(), WithInMemoryChannelChannelServiceReady(), WithInMemoryChannelAddress(backingChannelAddressable), - WithInMemoryChannelDLSUnknown()), + WithInMemoryChannelDLSUnknown(), + WithInMemoryChannelEventPoliciesReady()), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewChannel(channelName, testNS, @@ -165,7 +166,8 @@ func TestReconcile(t *testing.T) { WithInMemoryChannelChannelServiceReady(), WithInMemoryChannelSubscribers(subscribers()), WithInMemoryChannelAddress(backingChannelAddressable), - WithInMemoryChannelDLSUnknown()), + WithInMemoryChannelDLSUnknown(), + WithInMemoryChannelEventPoliciesReady()), }, }, { Name: "Backing channel created", @@ -259,7 +261,8 @@ func TestReconcile(t *testing.T) { WithInMemoryChannelChannelServiceReady(), WithInMemoryChannelSubscribers(subscribers()), WithInMemoryChannelAddress(backingChannelAddressable), - WithInMemoryChannelDLSUnknown()), + WithInMemoryChannelDLSUnknown(), + WithInMemoryChannelEventPoliciesReady()), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewChannel(channelName, testNS, @@ -293,7 +296,8 @@ func TestReconcile(t *testing.T) { WithInMemoryChannelAddress(backingChannelAddressable), WithInMemoryChannelSubscribers(subscribers()), WithInMemoryChannelStatusSubscribers(subscriberStatuses()), - WithInMemoryChannelDLSUnknown()), + WithInMemoryChannelDLSUnknown(), + WithInMemoryChannelEventPoliciesReady()), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewChannel(channelName, testNS, diff --git a/pkg/reconciler/inmemorychannel/controller/inmemorychannel_test.go b/pkg/reconciler/inmemorychannel/controller/inmemorychannel_test.go index ac06a594d52..485cfd82504 100644 --- a/pkg/reconciler/inmemorychannel/controller/inmemorychannel_test.go +++ b/pkg/reconciler/inmemorychannel/controller/inmemorychannel_test.go @@ -22,6 +22,8 @@ import ( "strconv" "testing" + "knative.dev/eventing/pkg/apis/eventing/v1alpha1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" "k8s.io/utils/pointer" @@ -75,6 +77,8 @@ const ( maxIdleConnsPerHost = 200 imcGeneration = 7 + + eventPolicyName = "test-event-policy" ) var ( @@ -183,7 +187,8 @@ func TestAllCases(t *testing.T) { WithInMemoryChannelEndpointsReady(), WithInMemoryChannelChannelServiceReady(), WithInMemoryChannelAddress(channelServiceAddress), - WithInMemoryChannelDLSUnknown()), + WithInMemoryChannelDLSUnknown(), + WithInMemoryChannelEventPoliciesReadyBecauseOIDCDisabled()), }}, }, { Name: "the status of deployment is unknown", @@ -206,7 +211,8 @@ func TestAllCases(t *testing.T) { WithInMemoryChannelEndpointsReady(), WithInMemoryChannelChannelServiceReady(), WithInMemoryChannelAddress(channelServiceAddress), - WithInMemoryChannelDLSUnknown()), + WithInMemoryChannelDLSUnknown(), + WithInMemoryChannelEventPoliciesReadyBecauseOIDCDisabled()), }}, }, { Name: "Service does not exist", @@ -326,6 +332,7 @@ func TestAllCases(t *testing.T) { WithInMemoryChannelAddress(channelServiceAddress), WithDeadLetterSink(imcDest), WithInMemoryChannelStatusDLS(dlsStatus), + WithInMemoryChannelEventPoliciesReadyBecauseOIDCDisabled(), ), }}, }, { @@ -368,6 +375,7 @@ func TestAllCases(t *testing.T) { URL: apis.HTTPS(dlsHost), CACerts: pointer.String(string(eventingtlstesting.CA)), }), + WithInMemoryChannelEventPoliciesReadyBecauseOIDCDisabled(), ), }}, }, { @@ -393,6 +401,7 @@ func TestAllCases(t *testing.T) { WithInMemoryChannelAddress(channelServiceAddress), WithDeadLetterSink(imcDest), WithInMemoryChannelStatusDLS(dlsStatus), + WithInMemoryChannelEventPoliciesReadyBecauseOIDCDisabled(), ), }}, }, { @@ -443,6 +452,7 @@ func TestAllCases(t *testing.T) { WithInMemoryChannelAddress(channelServiceAddress), WithDeadLetterSink(imcDest), WithInMemoryChannelStatusDLS(dlsStatus), + WithInMemoryChannelEventPoliciesReadyBecauseOIDCDisabled(), ), }}, }, { @@ -473,6 +483,7 @@ func TestAllCases(t *testing.T) { WithInMemoryChannelAddress(channelServiceAddress), WithDeadLetterSink(imcDest), WithInMemoryChannelStatusDLS(dlsStatus), + WithInMemoryChannelEventPoliciesReadyBecauseOIDCDisabled(), ), }}, }, { @@ -541,6 +552,7 @@ func TestAllCases(t *testing.T) { }), WithDeadLetterSink(imcDest), WithInMemoryChannelStatusDLS(dlsStatus), + WithInMemoryChannelEventPoliciesReadyBecauseOIDCDisabled(), ), }}, Ctx: feature.ToContext(context.Background(), feature.Flags{ @@ -588,6 +600,7 @@ func TestAllCases(t *testing.T) { }), WithDeadLetterSink(imcDest), WithInMemoryChannelStatusDLS(dlsStatus), + WithInMemoryChannelEventPoliciesReadyBecauseOIDCDisabled(), ), }}, Ctx: feature.ToContext(context.Background(), feature.Flags{ @@ -617,18 +630,83 @@ func TestAllCases(t *testing.T) { WithInMemoryChannelServiceReady(), WithInMemoryChannelEndpointsReady(), WithInMemoryChannelChannelServiceReady(), - WithInMemoryChannelAddress(channelServiceAddress), WithDeadLetterSink(imcDest), WithInMemoryChannelStatusDLS(dlsStatus), WithInMemoryChannelAddress(duckv1.Addressable{ URL: channelServiceAddress.URL, Audience: &channelAudience, }), + WithInMemoryChannelEventPoliciesReadyBecauseNoPolicyAndOIDCEnabled(), ), }}, Ctx: feature.ToContext(context.Background(), feature.Flags{ - feature.OIDCAuthentication: feature.Enabled, + feature.OIDCAuthentication: feature.Enabled, + feature.AuthorizationDefaultMode: feature.AuthorizationAllowSameNamespace, }), + }, { + Name: "Should provision applying EventPolicies", + Key: imcKey, + Objects: []runtime.Object{ + makeDLSServiceAsUnstructured(), + makeReadyDeployment(), + makeService(), + makeReadyEndpoints(), + NewInMemoryChannel(imcName, testNS, + WithDeadLetterSink(imcDest), + WithInMemoryChannelGeneration(imcGeneration), + ), + makeChannelService(NewInMemoryChannel(imcName, testNS)), + makeReadyEventPolicy(), + }, + WantErr: false, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewInMemoryChannel(imcName, testNS, + WithInitInMemoryChannelConditions, + WithInMemoryChannelDeploymentReady(), + WithInMemoryChannelGeneration(imcGeneration), + WithInMemoryChannelStatusObservedGeneration(imcGeneration), + WithInMemoryChannelServiceReady(), + WithInMemoryChannelEndpointsReady(), + WithInMemoryChannelChannelServiceReady(), + WithInMemoryChannelAddress(channelServiceAddress), + WithDeadLetterSink(imcDest), + WithInMemoryChannelStatusDLS(dlsStatus), + WithInMemoryChannelEventPoliciesReady(), + WithInMemoryChannelEventPoliciesListed(makeReadyEventPolicy()), + ), + }}, + }, { + Name: "Should mark NotReady on unread EventPolicy", + Key: imcKey, + Objects: []runtime.Object{ + makeDLSServiceAsUnstructured(), + makeReadyDeployment(), + makeService(), + makeReadyEndpoints(), + NewInMemoryChannel(imcName, testNS, + WithDeadLetterSink(imcDest), + WithInMemoryChannelGeneration(imcGeneration), + ), + makeChannelService(NewInMemoryChannel(imcName, testNS)), + makeUnreadyEventPolicy(), + }, + WantErr: false, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewInMemoryChannel(imcName, testNS, + WithInitInMemoryChannelConditions, + WithInMemoryChannelDeploymentReady(), + WithInMemoryChannelGeneration(imcGeneration), + WithInMemoryChannelStatusObservedGeneration(imcGeneration), + WithInMemoryChannelServiceReady(), + WithInMemoryChannelEndpointsReady(), + WithInMemoryChannelChannelServiceReady(), + WithInMemoryChannelAddress(channelServiceAddress), + WithDeadLetterSink(imcDest), + WithInMemoryChannelStatusDLS(dlsStatus), + WithInMemoryChannelEventPoliciesNotReady("NotReady", fmt.Sprintf("event policies %s are not ready", eventPolicyName)), + WithInMemoryChannelEventPoliciesListed(makeUnreadyEventPolicy()), + ), + }}, }} logger := logtesting.TestLogger(t) @@ -645,13 +723,14 @@ func TestAllCases(t *testing.T) { } r := &Reconciler{ - kubeClientSet: fakekubeclient.Get(ctx), - systemNamespace: testNS, - deploymentLister: listers.GetDeploymentLister(), - serviceLister: listers.GetServiceLister(), - endpointsLister: listers.GetEndpointsLister(), - secretLister: listers.GetSecretLister(), - uriResolver: resolver.NewURIResolverFromTracker(ctx, tracker.New(func(types.NamespacedName) {}, 0)), + kubeClientSet: fakekubeclient.Get(ctx), + systemNamespace: testNS, + deploymentLister: listers.GetDeploymentLister(), + serviceLister: listers.GetServiceLister(), + endpointsLister: listers.GetEndpointsLister(), + secretLister: listers.GetSecretLister(), + eventPolicyLister: listers.GetEventPolicyLister(), + uriResolver: resolver.NewURIResolverFromTracker(ctx, tracker.New(func(types.NamespacedName) {}, 0)), } return inmemorychannel.NewReconciler(ctx, logger, fakeeventingclient.Get(ctx), listers.GetInMemoryChannelLister(), @@ -703,6 +782,7 @@ func TestInNamespace(t *testing.T) { WithInMemoryChannelAddress(channelServiceAddress), WithDeadLetterSink(imcDest), WithInMemoryChannelStatusDLS(dlsStatus), + WithInMemoryChannelEventPoliciesReadyBecauseOIDCDisabled(), ), }}, WantEvents: []string{ @@ -742,6 +822,7 @@ func TestInNamespace(t *testing.T) { WithInMemoryChannelAddress(channelServiceAddress), WithDeadLetterSink(imcDest), WithInMemoryChannelStatusDLS(dlsStatus), + WithInMemoryChannelEventPoliciesReadyBecauseOIDCDisabled(), ), }}, }, @@ -763,6 +844,7 @@ func TestInNamespace(t *testing.T) { serviceAccountLister: listers.GetServiceAccountLister(), roleBindingLister: listers.GetRoleBindingLister(), secretLister: listers.GetSecretLister(), + eventPolicyLister: listers.GetEventPolicyLister(), eventDispatcherConfigStore: eventDispatcherConfigStore, uriResolver: resolver.NewURIResolverFromTracker(ctx, tracker.New(func(types.NamespacedName) {}, 0)), } @@ -807,6 +889,43 @@ func makeUnknownDeployment() *appsv1.Deployment { return d } +func makeEventPolicy() *v1alpha1.EventPolicy { + return &v1alpha1.EventPolicy{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "eventing.knative.dev/v1alpha1", + Kind: "EventPolicy", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: testNS, + Name: eventPolicyName, + }, + Spec: v1alpha1.EventPolicySpec{ + To: []v1alpha1.EventPolicySpecTo{ + { + Ref: &v1alpha1.EventPolicyToReference{ + APIVersion: v1.SchemeGroupVersion.String(), + Kind: "InMemoryChannel", + Name: imcName, + }, + }, + }, + }, + Status: v1alpha1.EventPolicyStatus{}, + } +} + +func makeReadyEventPolicy() *v1alpha1.EventPolicy { + policy := makeEventPolicy() + policy.Status.Conditions = []apis.Condition{{Type: v1alpha1.EventPolicyConditionReady, Status: corev1.ConditionTrue}} + return policy +} + +func makeUnreadyEventPolicy() *v1alpha1.EventPolicy { + policy := makeEventPolicy() + policy.Status.Conditions = []apis.Condition{{Type: v1alpha1.EventPolicyConditionReady, Status: corev1.ConditionFalse}} + return policy +} + func makeService() *corev1.Service { return &corev1.Service{ TypeMeta: metav1.TypeMeta{ diff --git a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go index 5e5d8862291..b50a27c351d 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go @@ -268,7 +268,8 @@ func TestAllCases(t *testing.T) { }, }), WithInMemoryChannelAddress(channelServiceAddress), - WithInMemoryChannelDLSUnknown()), + WithInMemoryChannelDLSUnknown(), + WithInMemoryChannelEventPoliciesReady()), }, WantEvents: []string{ Eventf(corev1.EventTypeWarning, "InternalError", "failed to parse Spec.BackoffDelay: expected 'P' period mark at the start: garbage"), @@ -346,7 +347,8 @@ func TestReconciler_ReconcileKind(t *testing.T) { WithInMemoryChannelChannelServiceReady(), WithInMemoryChannelSubscribers(subscribers), WithInMemoryChannelAddress(channelServiceAddress), - WithInMemoryChannelDLSUnknown()), + WithInMemoryChannelDLSUnknown(), + WithInMemoryChannelEventPoliciesReady()), wantSubs: []fanout.Subscription{{ Subscriber: duckv1.Addressable{ URL: apis.HTTP("call1"), @@ -375,7 +377,8 @@ func TestReconciler_ReconcileKind(t *testing.T) { WithInMemoryChannelChannelServiceReady(), WithInMemoryChannelSubscribers(subscribers), WithInMemoryChannelAddress(channelServiceAddress), - WithInMemoryChannelDLSUnknown()), + WithInMemoryChannelDLSUnknown(), + WithInMemoryChannelEventPoliciesReady()), subs: []fanout.Subscription{*subscription1}, wantSubs: []fanout.Subscription{{ Namespace: testNS, @@ -403,7 +406,8 @@ func TestReconciler_ReconcileKind(t *testing.T) { WithInMemoryChannelChannelServiceReady(), WithInMemoryChannelSubscribers(subscribers), WithInMemoryChannelAddress(channelServiceAddress), - WithInMemoryChannelDLSUnknown()), + WithInMemoryChannelDLSUnknown(), + WithInMemoryChannelEventPoliciesReady()), subs: []fanout.Subscription{*subscription1, *subscription2}, wantSubs: []fanout.Subscription{{ Namespace: testNS, @@ -431,7 +435,8 @@ func TestReconciler_ReconcileKind(t *testing.T) { WithInMemoryChannelChannelServiceReady(), WithInMemoryChannelSubscribers([]eventingduckv1.SubscriberSpec{subscriber1}), WithInMemoryChannelAddress(channelServiceAddress), - WithInMemoryChannelDLSUnknown()), + WithInMemoryChannelDLSUnknown(), + WithInMemoryChannelEventPoliciesReady()), subs: []fanout.Subscription{*subscription1, *subscription2}, wantSubs: []fanout.Subscription{ { @@ -453,7 +458,8 @@ func TestReconciler_ReconcileKind(t *testing.T) { WithInMemoryChannelChannelServiceReady(), WithInMemoryChannelSubscribers([]eventingduckv1.SubscriberSpec{subscriber1, subscriber3}), WithInMemoryChannelAddress(channelServiceAddress), - WithInMemoryChannelDLSUnknown()), + WithInMemoryChannelDLSUnknown(), + WithInMemoryChannelEventPoliciesReady()), subs: []fanout.Subscription{*subscription1, *subscription2}, wantSubs: []fanout.Subscription{ { @@ -482,7 +488,8 @@ func TestReconciler_ReconcileKind(t *testing.T) { WithInMemoryChannelChannelServiceReady(), WithInMemoryChannelSubscribers([]eventingduckv1.SubscriberSpec{subscriber1WithLinearRetry}), WithInMemoryChannelAddress(channelServiceAddress), - WithInMemoryChannelDLSUnknown()), + WithInMemoryChannelDLSUnknown(), + WithInMemoryChannelEventPoliciesReady()), subs: []fanout.Subscription{{ Subscriber: duckv1.Addressable{ URL: apis.HTTP("call1"), @@ -537,7 +544,7 @@ func TestReconciler_ReconcileKind(t *testing.T) { } channelHandler := handler.GetChannelHandler(channelServiceAddress.URL.Host) if channelHandler == nil { - t.Errorf("Did not get handler for %s", channelServiceAddress.URL.Host) + t.Fatalf("Did not get handler for %s", channelServiceAddress.URL.Host) } if diff := cmp.Diff(tc.wantSubs, channelHandler.GetSubscriptions(context.TODO()), cmpopts.IgnoreFields(kncloudevents.RetryConfig{}, "Backoff", "CheckRetry"), cmpopts.IgnoreFields(fanout.Subscription{}, "UID")); diff != "" { t.Error("unexpected subs (+want/-got)", diff) diff --git a/pkg/reconciler/inmemorychannel/dispatcher/readiness_test.go b/pkg/reconciler/inmemorychannel/dispatcher/readiness_test.go index e0ec28d8ce1..9bc37da21b2 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/readiness_test.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/readiness_test.go @@ -40,6 +40,7 @@ func TestReadinessChecker(t *testing.T) { WithInMemoryChannelChannelServiceReady(), WithInMemoryChannelAddress(duckv1.Addressable{URL: apis.HTTP("fake-address")}), WithInMemoryChannelDLSUnknown(), + WithInMemoryChannelEventPoliciesReady(), ), }) diff --git a/pkg/reconciler/testing/v1/inmemorychannel.go b/pkg/reconciler/testing/v1/inmemorychannel.go index bd6398e67bf..408904c1928 100644 --- a/pkg/reconciler/testing/v1/inmemorychannel.go +++ b/pkg/reconciler/testing/v1/inmemorychannel.go @@ -18,6 +18,9 @@ import ( "fmt" "time" + "knative.dev/eventing/pkg/apis/eventing/v1alpha1" + "knative.dev/eventing/pkg/apis/feature" + "k8s.io/apimachinery/pkg/types" appsv1 "k8s.io/api/apps/v1" @@ -25,6 +28,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" duckv1 "knative.dev/pkg/apis/duck/v1" + eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" eventingv1 "knative.dev/eventing/pkg/apis/duck/v1" "knative.dev/eventing/pkg/apis/eventing" "knative.dev/eventing/pkg/apis/messaging" @@ -138,6 +142,41 @@ func WithInMemoryChannelEndpointsReady() InMemoryChannelOption { } } +func WithInMemoryChannelEventPoliciesReady() InMemoryChannelOption { + return func(imc *v1.InMemoryChannel) { + imc.Status.MarkEventPoliciesTrue() + } +} + +func WithInMemoryChannelEventPoliciesNotReady(reason, message string) InMemoryChannelOption { + return func(imc *v1.InMemoryChannel) { + imc.Status.MarkEventPoliciesFailed(reason, message) + } +} + +func WithInMemoryChannelEventPoliciesListed(policy ...*v1alpha1.EventPolicy) InMemoryChannelOption { + return func(imc *v1.InMemoryChannel) { + for _, p := range policy { + imc.Status.Policies = append(imc.Status.Policies, eventingduckv1.AppliedEventPolicyRef{ + APIVersion: p.APIVersion, + Name: p.Name, + }) + } + } +} + +func WithInMemoryChannelEventPoliciesReadyBecauseOIDCDisabled() InMemoryChannelOption { + return func(imc *v1.InMemoryChannel) { + imc.Status.MarkEventPoliciesTrueWithReason("OIDCDisabled", "Feature %q must be enabled to support Authorization", feature.OIDCAuthentication) + } +} + +func WithInMemoryChannelEventPoliciesReadyBecauseNoPolicyAndOIDCEnabled() InMemoryChannelOption { + return func(imc *v1.InMemoryChannel) { + imc.Status.MarkEventPoliciesTrueWithReason("DefaultAuthorizationMode", "Default authz mode is %q", feature.AuthorizationAllowSameNamespace) + } +} + func WithInMemoryChannelAddress(a duckv1.Addressable) InMemoryChannelOption { return func(imc *v1.InMemoryChannel) { imc.Status.SetAddress(&a) diff --git a/pkg/reconciler/testing/v1/listers.go b/pkg/reconciler/testing/v1/listers.go index 788b2fab2cc..0c7c3546d2c 100644 --- a/pkg/reconciler/testing/v1/listers.go +++ b/pkg/reconciler/testing/v1/listers.go @@ -31,12 +31,14 @@ import ( "k8s.io/client-go/tools/cache" eventingduck "knative.dev/eventing/pkg/apis/duck/v1" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" eventingv1beta2 "knative.dev/eventing/pkg/apis/eventing/v1beta2" flowsv1 "knative.dev/eventing/pkg/apis/flows/v1" messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1" fakeeventingclientset "knative.dev/eventing/pkg/client/clientset/versioned/fake" eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1" + eventingv1alpha1listers "knative.dev/eventing/pkg/client/listers/eventing/v1alpha1" eventingv1beta2listers "knative.dev/eventing/pkg/client/listers/eventing/v1beta2" flowslisters "knative.dev/eventing/pkg/client/listers/flows/v1" messaginglisters "knative.dev/eventing/pkg/client/listers/messaging/v1" @@ -112,6 +114,10 @@ func (l *Listers) GetEventTypeLister() eventingv1beta2listers.EventTypeLister { return eventingv1beta2listers.NewEventTypeLister(l.indexerFor(&eventingv1beta2.EventType{})) } +func (l *Listers) GetEventPolicyLister() eventingv1alpha1listers.EventPolicyLister { + return eventingv1alpha1listers.NewEventPolicyLister(l.indexerFor(&eventingv1alpha1.EventPolicy{})) +} + func (l *Listers) GetPingSourceLister() sourcelisters.PingSourceLister { return sourcelisters.NewPingSourceLister(l.indexerFor(&sourcesv1.PingSource{})) } From 98f6c795c01bcfec176b0d41f660acb3a74cd588 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Tue, 18 Jun 2024 13:27:42 +0200 Subject: [PATCH 03/13] Reconcile IMC only on relevant EventPolicy changes --- .../inmemorychannel/controller/controller.go | 92 ++++++++++++++++++- 1 file changed, 88 insertions(+), 4 deletions(-) diff --git a/pkg/reconciler/inmemorychannel/controller/controller.go b/pkg/reconciler/inmemorychannel/controller/controller.go index e9966968b09..d4ffd55b960 100644 --- a/pkg/reconciler/inmemorychannel/controller/controller.go +++ b/pkg/reconciler/inmemorychannel/controller/controller.go @@ -18,14 +18,20 @@ package controller import ( "context" - - kubeclient "knative.dev/pkg/client/injection/kube/client" - "knative.dev/pkg/logging" + "strings" "github.com/kelseyhightower/envconfig" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/cache" + "knative.dev/eventing/pkg/apis/eventing/v1alpha1" + "knative.dev/eventing/pkg/apis/messaging" + v1 "knative.dev/eventing/pkg/client/listers/messaging/v1" + kubeclient "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" + "knative.dev/pkg/logging" "knative.dev/pkg/system" "knative.dev/pkg/resolver" @@ -66,6 +72,7 @@ func NewController( serviceAccountInformer := serviceaccount.Get(ctx) roleBindingInformer := rolebinding.Get(ctx) secretInformer := secretinformer.Get(ctx) + eventPolicyInformer := eventpolicy.Get(ctx) r := &Reconciler{ kubeClientSet: kubeclient.Get(ctx), @@ -76,6 +83,7 @@ func NewController( serviceAccountLister: serviceAccountInformer.Lister(), roleBindingLister: roleBindingInformer.Lister(), secretLister: secretInformer.Lister(), + eventPolicyLister: eventPolicyInformer.Lister(), } env := &envConfig{} @@ -141,7 +149,24 @@ func NewController( Handler: controller.HandleAll(globalResync), }) - eventPolicyInformer.Informer().AddEventHandler(controller.HandleAll(globalResync)) + // Enqueue the InMemoryChannel, if we have an EventPolicy which was referencing + // or got updated and now is referencing the InMemoryChannel + eventPolicyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + enqueueApplyingChannelOfEventPolicy(inmemorychannelInformer.Lister(), obj, impl.EnqueueKey) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + // Here we need to check if either the old or the new EventPolicy was referencing the InMemoryChannel + + alreadyEnqueued := enqueueApplyingChannelOfEventPolicy(inmemorychannelInformer.Lister(), oldObj, impl.EnqueueKey) + if !alreadyEnqueued { + enqueueApplyingChannelOfEventPolicy(inmemorychannelInformer.Lister(), newObj, impl.EnqueueKey) + } + }, + DeleteFunc: func(obj interface{}) { + enqueueApplyingChannelOfEventPolicy(inmemorychannelInformer.Lister(), obj, impl.EnqueueKey) + }, + }) // Setup the watch on the config map of dispatcher config configStore := config.NewEventDispatcherConfigStore(logging.FromContext(ctx)) @@ -150,3 +175,62 @@ func NewController( return impl } + +// enqueueApplyingChannelOfEventPolicy checks if an InMemoryChannel is referenced in the given EventPolicy. +// If so, it enqueues the channel into the enqueueFn() and returns true. +func enqueueApplyingChannelOfEventPolicy(imcLister v1.InMemoryChannelLister, obj interface{}, enqueueFn func(key types.NamespacedName)) bool { + eventPolicy, ok := obj.(*v1alpha1.EventPolicy) + if !ok { + return false + } + + for _, to := range eventPolicy.Spec.To { + if to.Ref != nil { + toGV, err := schema.ParseGroupVersion(to.Ref.APIVersion) + if err != nil { + return false + } + + if strings.EqualFold(toGV.Group, messaging.GroupName) && + strings.EqualFold(to.Ref.Kind, "InMemoryChannel") { + + enqueueFn(types.NamespacedName{ + Namespace: eventPolicy.Namespace, + Name: to.Ref.Name, + }) + return true + } + } + + if to.Selector != nil { + selectorGV, err := schema.ParseGroupVersion(to.Selector.APIVersion) + if err != nil { + return false + } + + if strings.EqualFold(selectorGV.Group, messaging.GroupName) && + strings.EqualFold(to.Selector.Kind, "InMemoryChannel") { + + selector, err := metav1.LabelSelectorAsSelector(to.Selector.LabelSelector) + if err != nil { + return false + } + + imcs, err := imcLister.InMemoryChannels(eventPolicy.Namespace).List(selector) + if err != nil { + return false + } + + for _, imc := range imcs { + enqueueFn(types.NamespacedName{ + Namespace: eventPolicy.Namespace, + Name: imc.Name, + }) + } + return true + } + } + } + + return false +} From 594e3d7526cb6a8c6c61048b080cd9b01a0ef6fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Tue, 18 Jun 2024 13:52:40 +0200 Subject: [PATCH 04/13] Only list ready EventPolicies in .status.policies --- .../controller/inmemorychannel.go | 11 +++-- .../controller/inmemorychannel_test.go | 47 ++++++++++++++++--- 2 files changed, 46 insertions(+), 12 deletions(-) diff --git a/pkg/reconciler/inmemorychannel/controller/inmemorychannel.go b/pkg/reconciler/inmemorychannel/controller/inmemorychannel.go index d5b851aa710..34d290981ed 100644 --- a/pkg/reconciler/inmemorychannel/controller/inmemorychannel.go +++ b/pkg/reconciler/inmemorychannel/controller/inmemorychannel.go @@ -248,13 +248,14 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, imc *v1.InMemoryChannel) if len(applyingEvenPolicies) > 0 { unreadyEventPolicies := []string{} for _, policy := range applyingEvenPolicies { - imc.Status.Policies = append(imc.Status.Policies, eventingduck.AppliedEventPolicyRef{ - Name: policy.Name, - APIVersion: eventingv1alpha1.SchemeGroupVersion.String(), - }) - if !policy.Status.IsReady() { unreadyEventPolicies = append(unreadyEventPolicies, policy.Name) + } else { + // only add Ready policies to the list + imc.Status.Policies = append(imc.Status.Policies, eventingduck.AppliedEventPolicyRef{ + Name: policy.Name, + APIVersion: eventingv1alpha1.SchemeGroupVersion.String(), + }) } } diff --git a/pkg/reconciler/inmemorychannel/controller/inmemorychannel_test.go b/pkg/reconciler/inmemorychannel/controller/inmemorychannel_test.go index 485cfd82504..4ef273027c8 100644 --- a/pkg/reconciler/inmemorychannel/controller/inmemorychannel_test.go +++ b/pkg/reconciler/inmemorychannel/controller/inmemorychannel_test.go @@ -78,7 +78,8 @@ const ( imcGeneration = 7 - eventPolicyName = "test-event-policy" + readyEventPolicyName = "test-event-policy-ready" + unreadyEventPolicyName = "test-event-policy-unready" ) var ( @@ -676,7 +677,7 @@ func TestAllCases(t *testing.T) { ), }}, }, { - Name: "Should mark NotReady on unread EventPolicy", + Name: "Should mark NotReady on unready EventPolicy", Key: imcKey, Objects: []runtime.Object{ makeDLSServiceAsUnstructured(), @@ -703,8 +704,40 @@ func TestAllCases(t *testing.T) { WithInMemoryChannelAddress(channelServiceAddress), WithDeadLetterSink(imcDest), WithInMemoryChannelStatusDLS(dlsStatus), - WithInMemoryChannelEventPoliciesNotReady("NotReady", fmt.Sprintf("event policies %s are not ready", eventPolicyName)), - WithInMemoryChannelEventPoliciesListed(makeUnreadyEventPolicy()), + WithInMemoryChannelEventPoliciesNotReady("NotReady", fmt.Sprintf("event policies %s are not ready", makeUnreadyEventPolicy().Name)), + ), + }}, + }, { + Name: "Should list only Ready EventPolicy", + Key: imcKey, + Objects: []runtime.Object{ + makeDLSServiceAsUnstructured(), + makeReadyDeployment(), + makeService(), + makeReadyEndpoints(), + NewInMemoryChannel(imcName, testNS, + WithDeadLetterSink(imcDest), + WithInMemoryChannelGeneration(imcGeneration), + ), + makeChannelService(NewInMemoryChannel(imcName, testNS)), + makeReadyEventPolicy(), + makeUnreadyEventPolicy(), + }, + WantErr: false, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewInMemoryChannel(imcName, testNS, + WithInitInMemoryChannelConditions, + WithInMemoryChannelDeploymentReady(), + WithInMemoryChannelGeneration(imcGeneration), + WithInMemoryChannelStatusObservedGeneration(imcGeneration), + WithInMemoryChannelServiceReady(), + WithInMemoryChannelEndpointsReady(), + WithInMemoryChannelChannelServiceReady(), + WithInMemoryChannelAddress(channelServiceAddress), + WithDeadLetterSink(imcDest), + WithInMemoryChannelStatusDLS(dlsStatus), + WithInMemoryChannelEventPoliciesNotReady("NotReady", fmt.Sprintf("event policies %s are not ready", makeUnreadyEventPolicy().Name)), + WithInMemoryChannelEventPoliciesListed(makeReadyEventPolicy()), ), }}, }} @@ -889,7 +922,7 @@ func makeUnknownDeployment() *appsv1.Deployment { return d } -func makeEventPolicy() *v1alpha1.EventPolicy { +func makeEventPolicy(eventPolicyName string) *v1alpha1.EventPolicy { return &v1alpha1.EventPolicy{ TypeMeta: metav1.TypeMeta{ APIVersion: "eventing.knative.dev/v1alpha1", @@ -915,13 +948,13 @@ func makeEventPolicy() *v1alpha1.EventPolicy { } func makeReadyEventPolicy() *v1alpha1.EventPolicy { - policy := makeEventPolicy() + policy := makeEventPolicy(readyEventPolicyName) policy.Status.Conditions = []apis.Condition{{Type: v1alpha1.EventPolicyConditionReady, Status: corev1.ConditionTrue}} return policy } func makeUnreadyEventPolicy() *v1alpha1.EventPolicy { - policy := makeEventPolicy() + policy := makeEventPolicy(unreadyEventPolicyName) policy.Status.Conditions = []apis.Condition{{Type: v1alpha1.EventPolicyConditionReady, Status: corev1.ConditionFalse}} return policy } From fd3530917f57b608eda16df87eff15157d0b7858 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Tue, 18 Jun 2024 18:08:51 +0200 Subject: [PATCH 05/13] Make reason for not being ready a bit more speaking (NotReady -> EventPoliciesNotReady) --- pkg/reconciler/inmemorychannel/controller/inmemorychannel.go | 2 +- .../inmemorychannel/controller/inmemorychannel_test.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/reconciler/inmemorychannel/controller/inmemorychannel.go b/pkg/reconciler/inmemorychannel/controller/inmemorychannel.go index 34d290981ed..3d818463dd6 100644 --- a/pkg/reconciler/inmemorychannel/controller/inmemorychannel.go +++ b/pkg/reconciler/inmemorychannel/controller/inmemorychannel.go @@ -262,7 +262,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, imc *v1.InMemoryChannel) if len(unreadyEventPolicies) == 0 { imc.Status.MarkEventPoliciesTrue() } else { - imc.Status.MarkEventPoliciesFailed("NotReady", "event policies %s are not ready", strings.Join(unreadyEventPolicies, ", ")) + imc.Status.MarkEventPoliciesFailed("EventPoliciesNotReady", "event policies %s are not ready", strings.Join(unreadyEventPolicies, ", ")) } } else { diff --git a/pkg/reconciler/inmemorychannel/controller/inmemorychannel_test.go b/pkg/reconciler/inmemorychannel/controller/inmemorychannel_test.go index 4ef273027c8..dc6611e43ee 100644 --- a/pkg/reconciler/inmemorychannel/controller/inmemorychannel_test.go +++ b/pkg/reconciler/inmemorychannel/controller/inmemorychannel_test.go @@ -704,7 +704,7 @@ func TestAllCases(t *testing.T) { WithInMemoryChannelAddress(channelServiceAddress), WithDeadLetterSink(imcDest), WithInMemoryChannelStatusDLS(dlsStatus), - WithInMemoryChannelEventPoliciesNotReady("NotReady", fmt.Sprintf("event policies %s are not ready", makeUnreadyEventPolicy().Name)), + WithInMemoryChannelEventPoliciesNotReady("EventPoliciesNotReady", fmt.Sprintf("event policies %s are not ready", makeUnreadyEventPolicy().Name)), ), }}, }, { @@ -736,7 +736,7 @@ func TestAllCases(t *testing.T) { WithInMemoryChannelAddress(channelServiceAddress), WithDeadLetterSink(imcDest), WithInMemoryChannelStatusDLS(dlsStatus), - WithInMemoryChannelEventPoliciesNotReady("NotReady", fmt.Sprintf("event policies %s are not ready", makeUnreadyEventPolicy().Name)), + WithInMemoryChannelEventPoliciesNotReady("EventPoliciesNotReady", fmt.Sprintf("event policies %s are not ready", makeUnreadyEventPolicy().Name)), WithInMemoryChannelEventPoliciesListed(makeReadyEventPolicy()), ), }}, From 999f4fc22f1063ba4cd023b4e1469bfc5b6f9764 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Wed, 19 Jun 2024 10:09:06 +0200 Subject: [PATCH 06/13] Make EventPolicy EventHandler generic --- pkg/reconciler/eventpolicy.go | 113 ++++++++++++++++++ .../inmemorychannel/controller/controller.go | 87 +------------- 2 files changed, 118 insertions(+), 82 deletions(-) create mode 100644 pkg/reconciler/eventpolicy.go diff --git a/pkg/reconciler/eventpolicy.go b/pkg/reconciler/eventpolicy.go new file mode 100644 index 00000000000..82bd022e27f --- /dev/null +++ b/pkg/reconciler/eventpolicy.go @@ -0,0 +1,113 @@ +/* +Copyright 2023 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package reconciler + +import ( + "strings" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/cache" + "knative.dev/eventing/pkg/apis/eventing/v1alpha1" +) + +// enqueueApplyingResourcesOfEventPolicy checks if the given GVK is referenced in the given EventPolicy. +// If so, it enqueues it into the enqueueFn(). +func enqueueApplyingResourcesOfEventPolicy(indexer cache.Indexer, gvk schema.GroupVersionKind, policyObj interface{}, enqueueFn func(key types.NamespacedName)) { + eventPolicy, ok := policyObj.(*v1alpha1.EventPolicy) + if !ok { + return + } + + for _, to := range eventPolicy.Spec.To { + if to.Ref != nil { + toGV, err := schema.ParseGroupVersion(to.Ref.APIVersion) + if err != nil { + continue + } + + if strings.EqualFold(toGV.Group, gvk.Group) && + strings.EqualFold(to.Ref.Kind, gvk.Kind) { + + enqueueFn(types.NamespacedName{ + Namespace: eventPolicy.Namespace, + Name: to.Ref.Name, + }) + } + } + + if to.Selector != nil { + selectorGV, err := schema.ParseGroupVersion(to.Selector.APIVersion) + if err != nil { + continue + } + + if strings.EqualFold(selectorGV.Group, gvk.Group) && + strings.EqualFold(to.Selector.Kind, gvk.Kind) { + + selector, err := metav1.LabelSelectorAsSelector(to.Selector.LabelSelector) + if err != nil { + continue + } + + resources := []metav1.Object{} + err = cache.ListAllByNamespace(indexer, eventPolicy.Namespace, selector, func(i interface{}) { + resources = append(resources, i.(metav1.Object)) + }) + if err != nil { + continue + } + + for _, resource := range resources { + enqueueFn(types.NamespacedName{ + Namespace: resource.GetNamespace(), + Name: resource.GetName(), + }) + } + } + } + } +} + +// EventPolicyEventHandler returns an ResourceEventHandler, which enqueues the referencing resources of the EventPolicy +// if the EventPolicy was referencing or got updated and now is referencing the resource of the given GVK. +func EventPolicyEventHandler(indexer cache.Indexer, gvk schema.GroupVersionKind, enqueueFn func(key types.NamespacedName)) cache.ResourceEventHandler { + return cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + enqueueApplyingResourcesOfEventPolicy(indexer, gvk, obj, enqueueFn) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + // Here we need to check if the old or the new EventPolicy was referencing the given GVK + + // make sure, we enqueue the keys only once + toEnqueue := map[types.NamespacedName]struct{}{} + addToEnqueueList := func(key types.NamespacedName) { + toEnqueue[key] = struct{}{} + } + enqueueApplyingResourcesOfEventPolicy(indexer, gvk, oldObj, addToEnqueueList) + enqueueApplyingResourcesOfEventPolicy(indexer, gvk, newObj, addToEnqueueList) + + for k := range toEnqueue { + enqueueFn(k) + } + }, + DeleteFunc: func(obj interface{}) { + enqueueApplyingResourcesOfEventPolicy(indexer, gvk, obj, enqueueFn) + }, + } +} diff --git a/pkg/reconciler/inmemorychannel/controller/controller.go b/pkg/reconciler/inmemorychannel/controller/controller.go index d4ffd55b960..fb6bdb96458 100644 --- a/pkg/reconciler/inmemorychannel/controller/controller.go +++ b/pkg/reconciler/inmemorychannel/controller/controller.go @@ -18,16 +18,11 @@ package controller import ( "context" - "strings" "github.com/kelseyhightower/envconfig" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/cache" - "knative.dev/eventing/pkg/apis/eventing/v1alpha1" - "knative.dev/eventing/pkg/apis/messaging" - v1 "knative.dev/eventing/pkg/client/listers/messaging/v1" + messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" + "knative.dev/eventing/pkg/reconciler" kubeclient "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" @@ -149,24 +144,11 @@ func NewController( Handler: controller.HandleAll(globalResync), }) + imcGVK := messagingv1.SchemeGroupVersion.WithKind("InMemoryChannel") + // Enqueue the InMemoryChannel, if we have an EventPolicy which was referencing // or got updated and now is referencing the InMemoryChannel - eventPolicyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - enqueueApplyingChannelOfEventPolicy(inmemorychannelInformer.Lister(), obj, impl.EnqueueKey) - }, - UpdateFunc: func(oldObj, newObj interface{}) { - // Here we need to check if either the old or the new EventPolicy was referencing the InMemoryChannel - - alreadyEnqueued := enqueueApplyingChannelOfEventPolicy(inmemorychannelInformer.Lister(), oldObj, impl.EnqueueKey) - if !alreadyEnqueued { - enqueueApplyingChannelOfEventPolicy(inmemorychannelInformer.Lister(), newObj, impl.EnqueueKey) - } - }, - DeleteFunc: func(obj interface{}) { - enqueueApplyingChannelOfEventPolicy(inmemorychannelInformer.Lister(), obj, impl.EnqueueKey) - }, - }) + eventPolicyInformer.Informer().AddEventHandler(reconciler.EventPolicyEventHandler(inmemorychannelInformer.Informer().GetIndexer(), imcGVK, impl.EnqueueKey)) // Setup the watch on the config map of dispatcher config configStore := config.NewEventDispatcherConfigStore(logging.FromContext(ctx)) @@ -175,62 +157,3 @@ func NewController( return impl } - -// enqueueApplyingChannelOfEventPolicy checks if an InMemoryChannel is referenced in the given EventPolicy. -// If so, it enqueues the channel into the enqueueFn() and returns true. -func enqueueApplyingChannelOfEventPolicy(imcLister v1.InMemoryChannelLister, obj interface{}, enqueueFn func(key types.NamespacedName)) bool { - eventPolicy, ok := obj.(*v1alpha1.EventPolicy) - if !ok { - return false - } - - for _, to := range eventPolicy.Spec.To { - if to.Ref != nil { - toGV, err := schema.ParseGroupVersion(to.Ref.APIVersion) - if err != nil { - return false - } - - if strings.EqualFold(toGV.Group, messaging.GroupName) && - strings.EqualFold(to.Ref.Kind, "InMemoryChannel") { - - enqueueFn(types.NamespacedName{ - Namespace: eventPolicy.Namespace, - Name: to.Ref.Name, - }) - return true - } - } - - if to.Selector != nil { - selectorGV, err := schema.ParseGroupVersion(to.Selector.APIVersion) - if err != nil { - return false - } - - if strings.EqualFold(selectorGV.Group, messaging.GroupName) && - strings.EqualFold(to.Selector.Kind, "InMemoryChannel") { - - selector, err := metav1.LabelSelectorAsSelector(to.Selector.LabelSelector) - if err != nil { - return false - } - - imcs, err := imcLister.InMemoryChannels(eventPolicy.Namespace).List(selector) - if err != nil { - return false - } - - for _, imc := range imcs { - enqueueFn(types.NamespacedName{ - Namespace: eventPolicy.Namespace, - Name: imc.Name, - }) - } - return true - } - } - } - - return false -} From ed231fdcaa2a25f22e0102a6b326aa9e3bd3dd59 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Thu, 20 Jun 2024 11:21:33 +0200 Subject: [PATCH 07/13] Move EventPolicyEventHandler to auth package --- pkg/auth/event_policy.go | 123 ++++++++++++++++++ pkg/reconciler/eventpolicy.go | 113 ---------------- .../inmemorychannel/controller/controller.go | 6 +- 3 files changed, 126 insertions(+), 116 deletions(-) delete mode 100644 pkg/reconciler/eventpolicy.go diff --git a/pkg/auth/event_policy.go b/pkg/auth/event_policy.go index 26efd163409..96e0923a7cf 100644 --- a/pkg/auth/event_policy.go +++ b/pkg/auth/event_policy.go @@ -18,6 +18,8 @@ package auth import ( "fmt" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/cache" "strings" corev1 "k8s.io/api/core/v1" @@ -86,6 +88,52 @@ func GetEventPoliciesForResource(lister listerseventingv1alpha1.EventPolicyListe return relevantPolicies, nil } +// GetApplyingResourcesOfEventPolicyForGK returns all applying resource names of GK of the given event policy. +// It returns only the names, as the resources are part of the same namespace as the event policy. +// +// This function is kind of the "inverse" of GetEventPoliciesForResource. +func GetApplyingResourcesOfEventPolicyForGK(eventPolicy *v1alpha1.EventPolicy, gk schema.GroupKind, gkIndexer cache.Indexer) ([]string, error) { + applyingResources := []string{} + for _, to := range eventPolicy.Spec.To { + if to.Ref != nil { + toGV, err := schema.ParseGroupVersion(to.Ref.APIVersion) + if err != nil { + return nil, fmt.Errorf("could not parse group version of %q: %w", to.Ref.APIVersion, err) + } + + if strings.EqualFold(toGV.Group, gk.Group) && + strings.EqualFold(to.Ref.Kind, gk.Kind) { + + applyingResources = append(applyingResources, to.Ref.Name) + } + } + + if to.Selector != nil { + selectorGV, err := schema.ParseGroupVersion(to.Selector.APIVersion) + if err != nil { + return nil, fmt.Errorf("could not parse group version of %q: %w", to.Selector.APIVersion, err) + } + + if strings.EqualFold(selectorGV.Group, gk.Group) && + strings.EqualFold(to.Selector.Kind, gk.Kind) { + + selector, err := metav1.LabelSelectorAsSelector(to.Selector.LabelSelector) + if err != nil { + return nil, fmt.Errorf("could not parse label selector %v: %w", to.Selector.LabelSelector, err) + } + + err = cache.ListAllByNamespace(gkIndexer, eventPolicy.Namespace, selector, func(i interface{}) { + applyingResources = append(applyingResources, i.(metav1.Object).GetName()) + }) + if err != nil { + return nil, fmt.Errorf("could not list resources of GK in %q namespace for selector %v: %w", eventPolicy.Namespace, selector, err) + } + } + } + } + return applyingResources, nil +} + // ResolveSubjects returns the OIDC service accounts names for the objects referenced in the EventPolicySpecFrom. func ResolveSubjects(resolver *resolver.AuthenticatableResolver, eventPolicy *v1alpha1.EventPolicy) ([]string, error) { allSAs := []string{} @@ -145,3 +193,78 @@ func SubjectContained(sub string, allowedSubs []string) bool { return false } + +func handleApplyingResourcesOfEventPolicy(eventPolicy *v1alpha1.EventPolicy, gk schema.GroupKind, indexer cache.Indexer, handlerFn func(key types.NamespacedName) error) error { + applyingResources, err := GetApplyingResourcesOfEventPolicyForGK(eventPolicy, gk, indexer) + if err != nil { + return fmt.Errorf("could not get applying resources of eventpolicy: %w", err) + } + + for _, resourceName := range applyingResources { + err := handlerFn(types.NamespacedName{ + Namespace: eventPolicy.Namespace, + Name: resourceName, + }) + + if err != nil { + return fmt.Errorf("could not handle resource %q: %w", resourceName, err) + } + } + + return nil +} + +// EventPolicyEventHandler returns an ResourceEventHandler, which passes the referencing resources of the EventPolicy +// to the enqueueFn if the EventPolicy was referencing or got updated and now is referencing the resource of the given GVK. +func EventPolicyEventHandler(indexer cache.Indexer, gk schema.GroupKind, enqueueFn func(key types.NamespacedName)) cache.ResourceEventHandler { + + return cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + eventPolicy, ok := obj.(*v1alpha1.EventPolicy) + if !ok { + return + } + + handleApplyingResourcesOfEventPolicy(eventPolicy, gk, indexer, func(key types.NamespacedName) error { + enqueueFn(key) + return nil + }) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + // Here we need to check if the old or the new EventPolicy was referencing the given GVK + oldEventPolicy, ok := oldObj.(*v1alpha1.EventPolicy) + if !ok { + return + } + newEventPolicy, ok := newObj.(*v1alpha1.EventPolicy) + if !ok { + return + } + + // make sure, we handle the keys only once + toHandle := map[types.NamespacedName]struct{}{} + addToHandleList := func(key types.NamespacedName) error { + toHandle[key] = struct{}{} + return nil + } + + handleApplyingResourcesOfEventPolicy(oldEventPolicy, gk, indexer, addToHandleList) + handleApplyingResourcesOfEventPolicy(newEventPolicy, gk, indexer, addToHandleList) + + for k := range toHandle { + enqueueFn(k) + } + }, + DeleteFunc: func(obj interface{}) { + eventPolicy, ok := obj.(*v1alpha1.EventPolicy) + if !ok { + return + } + + handleApplyingResourcesOfEventPolicy(eventPolicy, gk, indexer, func(key types.NamespacedName) error { + enqueueFn(key) + return nil + }) + }, + } +} diff --git a/pkg/reconciler/eventpolicy.go b/pkg/reconciler/eventpolicy.go deleted file mode 100644 index 82bd022e27f..00000000000 --- a/pkg/reconciler/eventpolicy.go +++ /dev/null @@ -1,113 +0,0 @@ -/* -Copyright 2023 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package reconciler - -import ( - "strings" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/tools/cache" - "knative.dev/eventing/pkg/apis/eventing/v1alpha1" -) - -// enqueueApplyingResourcesOfEventPolicy checks if the given GVK is referenced in the given EventPolicy. -// If so, it enqueues it into the enqueueFn(). -func enqueueApplyingResourcesOfEventPolicy(indexer cache.Indexer, gvk schema.GroupVersionKind, policyObj interface{}, enqueueFn func(key types.NamespacedName)) { - eventPolicy, ok := policyObj.(*v1alpha1.EventPolicy) - if !ok { - return - } - - for _, to := range eventPolicy.Spec.To { - if to.Ref != nil { - toGV, err := schema.ParseGroupVersion(to.Ref.APIVersion) - if err != nil { - continue - } - - if strings.EqualFold(toGV.Group, gvk.Group) && - strings.EqualFold(to.Ref.Kind, gvk.Kind) { - - enqueueFn(types.NamespacedName{ - Namespace: eventPolicy.Namespace, - Name: to.Ref.Name, - }) - } - } - - if to.Selector != nil { - selectorGV, err := schema.ParseGroupVersion(to.Selector.APIVersion) - if err != nil { - continue - } - - if strings.EqualFold(selectorGV.Group, gvk.Group) && - strings.EqualFold(to.Selector.Kind, gvk.Kind) { - - selector, err := metav1.LabelSelectorAsSelector(to.Selector.LabelSelector) - if err != nil { - continue - } - - resources := []metav1.Object{} - err = cache.ListAllByNamespace(indexer, eventPolicy.Namespace, selector, func(i interface{}) { - resources = append(resources, i.(metav1.Object)) - }) - if err != nil { - continue - } - - for _, resource := range resources { - enqueueFn(types.NamespacedName{ - Namespace: resource.GetNamespace(), - Name: resource.GetName(), - }) - } - } - } - } -} - -// EventPolicyEventHandler returns an ResourceEventHandler, which enqueues the referencing resources of the EventPolicy -// if the EventPolicy was referencing or got updated and now is referencing the resource of the given GVK. -func EventPolicyEventHandler(indexer cache.Indexer, gvk schema.GroupVersionKind, enqueueFn func(key types.NamespacedName)) cache.ResourceEventHandler { - return cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - enqueueApplyingResourcesOfEventPolicy(indexer, gvk, obj, enqueueFn) - }, - UpdateFunc: func(oldObj, newObj interface{}) { - // Here we need to check if the old or the new EventPolicy was referencing the given GVK - - // make sure, we enqueue the keys only once - toEnqueue := map[types.NamespacedName]struct{}{} - addToEnqueueList := func(key types.NamespacedName) { - toEnqueue[key] = struct{}{} - } - enqueueApplyingResourcesOfEventPolicy(indexer, gvk, oldObj, addToEnqueueList) - enqueueApplyingResourcesOfEventPolicy(indexer, gvk, newObj, addToEnqueueList) - - for k := range toEnqueue { - enqueueFn(k) - } - }, - DeleteFunc: func(obj interface{}) { - enqueueApplyingResourcesOfEventPolicy(indexer, gvk, obj, enqueueFn) - }, - } -} diff --git a/pkg/reconciler/inmemorychannel/controller/controller.go b/pkg/reconciler/inmemorychannel/controller/controller.go index fb6bdb96458..634faf51f63 100644 --- a/pkg/reconciler/inmemorychannel/controller/controller.go +++ b/pkg/reconciler/inmemorychannel/controller/controller.go @@ -18,11 +18,11 @@ package controller import ( "context" + "knative.dev/eventing/pkg/auth" "github.com/kelseyhightower/envconfig" "k8s.io/client-go/tools/cache" messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" - "knative.dev/eventing/pkg/reconciler" kubeclient "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" @@ -144,11 +144,11 @@ func NewController( Handler: controller.HandleAll(globalResync), }) - imcGVK := messagingv1.SchemeGroupVersion.WithKind("InMemoryChannel") + imcGK := messagingv1.SchemeGroupVersion.WithKind("InMemoryChannel").GroupKind() // Enqueue the InMemoryChannel, if we have an EventPolicy which was referencing // or got updated and now is referencing the InMemoryChannel - eventPolicyInformer.Informer().AddEventHandler(reconciler.EventPolicyEventHandler(inmemorychannelInformer.Informer().GetIndexer(), imcGVK, impl.EnqueueKey)) + eventPolicyInformer.Informer().AddEventHandler(auth.EventPolicyEventHandler(inmemorychannelInformer.Informer().GetIndexer(), imcGK, impl.EnqueueKey)) // Setup the watch on the config map of dispatcher config configStore := config.NewEventDispatcherConfigStore(logging.FromContext(ctx)) From a1e0b72b75528a1e644a93af4ac3bc731083c0fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Thu, 20 Jun 2024 12:01:04 +0200 Subject: [PATCH 08/13] Refactor to use EventPolicy helper functions from reconciler/testing package --- .../controller/inmemorychannel_test.go | 81 ++++++-------- pkg/reconciler/testing/v1/eventpolicy.go | 104 ++++++++++++++++++ pkg/reconciler/testing/v1/inmemorychannel.go | 8 +- 3 files changed, 144 insertions(+), 49 deletions(-) create mode 100644 pkg/reconciler/testing/v1/eventpolicy.go diff --git a/pkg/reconciler/inmemorychannel/controller/inmemorychannel_test.go b/pkg/reconciler/inmemorychannel/controller/inmemorychannel_test.go index dc6611e43ee..11c0aca30cd 100644 --- a/pkg/reconciler/inmemorychannel/controller/inmemorychannel_test.go +++ b/pkg/reconciler/inmemorychannel/controller/inmemorychannel_test.go @@ -657,7 +657,14 @@ func TestAllCases(t *testing.T) { WithInMemoryChannelGeneration(imcGeneration), ), makeChannelService(NewInMemoryChannel(imcName, testNS)), - makeReadyEventPolicy(), + NewEventPolicy(readyEventPolicyName, testNS, + WithReadyEventPolicyCondition, + WithEventPolicyToRef(v1alpha1.EventPolicyToReference{ + APIVersion: v1.SchemeGroupVersion.String(), + Kind: "InMemoryChannel", + Name: imcName, + }), + ), }, WantErr: false, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ @@ -673,7 +680,7 @@ func TestAllCases(t *testing.T) { WithDeadLetterSink(imcDest), WithInMemoryChannelStatusDLS(dlsStatus), WithInMemoryChannelEventPoliciesReady(), - WithInMemoryChannelEventPoliciesListed(makeReadyEventPolicy()), + WithInMemoryChannelEventPoliciesListed(readyEventPolicyName), ), }}, }, { @@ -689,7 +696,14 @@ func TestAllCases(t *testing.T) { WithInMemoryChannelGeneration(imcGeneration), ), makeChannelService(NewInMemoryChannel(imcName, testNS)), - makeUnreadyEventPolicy(), + NewEventPolicy(unreadyEventPolicyName, testNS, + WithUnreadyEventPolicyCondition, + WithEventPolicyToRef(v1alpha1.EventPolicyToReference{ + APIVersion: v1.SchemeGroupVersion.String(), + Kind: "InMemoryChannel", + Name: imcName, + }), + ), }, WantErr: false, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ @@ -704,7 +718,7 @@ func TestAllCases(t *testing.T) { WithInMemoryChannelAddress(channelServiceAddress), WithDeadLetterSink(imcDest), WithInMemoryChannelStatusDLS(dlsStatus), - WithInMemoryChannelEventPoliciesNotReady("EventPoliciesNotReady", fmt.Sprintf("event policies %s are not ready", makeUnreadyEventPolicy().Name)), + WithInMemoryChannelEventPoliciesNotReady("EventPoliciesNotReady", fmt.Sprintf("event policies %s are not ready", unreadyEventPolicyName)), ), }}, }, { @@ -720,8 +734,22 @@ func TestAllCases(t *testing.T) { WithInMemoryChannelGeneration(imcGeneration), ), makeChannelService(NewInMemoryChannel(imcName, testNS)), - makeReadyEventPolicy(), - makeUnreadyEventPolicy(), + NewEventPolicy(readyEventPolicyName, testNS, + WithReadyEventPolicyCondition, + WithEventPolicyToRef(v1alpha1.EventPolicyToReference{ + APIVersion: v1.SchemeGroupVersion.String(), + Kind: "InMemoryChannel", + Name: imcName, + }), + ), + NewEventPolicy(unreadyEventPolicyName, testNS, + WithUnreadyEventPolicyCondition, + WithEventPolicyToRef(v1alpha1.EventPolicyToReference{ + APIVersion: v1.SchemeGroupVersion.String(), + Kind: "InMemoryChannel", + Name: imcName, + }), + ), }, WantErr: false, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ @@ -736,8 +764,8 @@ func TestAllCases(t *testing.T) { WithInMemoryChannelAddress(channelServiceAddress), WithDeadLetterSink(imcDest), WithInMemoryChannelStatusDLS(dlsStatus), - WithInMemoryChannelEventPoliciesNotReady("EventPoliciesNotReady", fmt.Sprintf("event policies %s are not ready", makeUnreadyEventPolicy().Name)), - WithInMemoryChannelEventPoliciesListed(makeReadyEventPolicy()), + WithInMemoryChannelEventPoliciesNotReady("EventPoliciesNotReady", fmt.Sprintf("event policies %s are not ready", unreadyEventPolicyName)), + WithInMemoryChannelEventPoliciesListed(readyEventPolicyName), ), }}, }} @@ -922,43 +950,6 @@ func makeUnknownDeployment() *appsv1.Deployment { return d } -func makeEventPolicy(eventPolicyName string) *v1alpha1.EventPolicy { - return &v1alpha1.EventPolicy{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "eventing.knative.dev/v1alpha1", - Kind: "EventPolicy", - }, - ObjectMeta: metav1.ObjectMeta{ - Namespace: testNS, - Name: eventPolicyName, - }, - Spec: v1alpha1.EventPolicySpec{ - To: []v1alpha1.EventPolicySpecTo{ - { - Ref: &v1alpha1.EventPolicyToReference{ - APIVersion: v1.SchemeGroupVersion.String(), - Kind: "InMemoryChannel", - Name: imcName, - }, - }, - }, - }, - Status: v1alpha1.EventPolicyStatus{}, - } -} - -func makeReadyEventPolicy() *v1alpha1.EventPolicy { - policy := makeEventPolicy(readyEventPolicyName) - policy.Status.Conditions = []apis.Condition{{Type: v1alpha1.EventPolicyConditionReady, Status: corev1.ConditionTrue}} - return policy -} - -func makeUnreadyEventPolicy() *v1alpha1.EventPolicy { - policy := makeEventPolicy(unreadyEventPolicyName) - policy.Status.Conditions = []apis.Condition{{Type: v1alpha1.EventPolicyConditionReady, Status: corev1.ConditionFalse}} - return policy -} - func makeService() *corev1.Service { return &corev1.Service{ TypeMeta: metav1.TypeMeta{ diff --git a/pkg/reconciler/testing/v1/eventpolicy.go b/pkg/reconciler/testing/v1/eventpolicy.go new file mode 100644 index 00000000000..95c4344a79a --- /dev/null +++ b/pkg/reconciler/testing/v1/eventpolicy.go @@ -0,0 +1,104 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + "context" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "knative.dev/eventing/pkg/apis/eventing/v1alpha1" + "knative.dev/pkg/apis" +) + +// EventPolicyOption enables further configuration of an EventPolicy. +type EventPolicyOption func(*v1alpha1.EventPolicy) + +// NewEventPolicy creates a EventPolicy with EventPolicyOptions. +func NewEventPolicy(name, namespace string, o ...EventPolicyOption) *v1alpha1.EventPolicy { + ep := &v1alpha1.EventPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: name, + }, + } + for _, opt := range o { + opt(ep) + } + ep.SetDefaults(context.Background()) + + return ep +} + +func WithInitEventPolicyConditions(et *v1alpha1.EventPolicy) { + et.Status.InitializeConditions() +} + +func WithReadyEventPolicyCondition(ep *v1alpha1.EventPolicy) { + ep.Status.Conditions = []apis.Condition{ + { + Type: v1alpha1.EventPolicyConditionReady, + Status: corev1.ConditionTrue, + }, + } +} + +func WithUnreadyEventPolicyCondition(ep *v1alpha1.EventPolicy) { + ep.Status.Conditions = []apis.Condition{ + { + Type: v1alpha1.EventPolicyConditionReady, + Status: corev1.ConditionFalse, + }, + } +} + +func WithEventPolicyTo(tos ...v1alpha1.EventPolicySpecTo) EventPolicyOption { + return func(ep *v1alpha1.EventPolicy) { + for _, to := range tos { + ep.Spec.To = append(ep.Spec.To, to) + } + } +} + +func WithEventPolicyToRef(ref v1alpha1.EventPolicyToReference) EventPolicyOption { + return func(ep *v1alpha1.EventPolicy) { + ep.Spec.To = append(ep.Spec.To, v1alpha1.EventPolicySpecTo{ + Ref: &ref, + }) + } +} + +func WithEventPolicyFrom(froms ...v1alpha1.EventPolicySpecFrom) EventPolicyOption { + return func(ep *v1alpha1.EventPolicy) { + for _, from := range froms { + ep.Spec.From = append(ep.Spec.From, from) + } + } +} + +func WithEventPolicyLabels(labels map[string]string) EventPolicyOption { + return func(ep *v1alpha1.EventPolicy) { + ep.ObjectMeta.Labels = labels + } +} + +func WithEventPolicyOwnerReference(ownerRef metav1.OwnerReference) EventPolicyOption { + return func(ep *v1alpha1.EventPolicy) { + ep.ObjectMeta.OwnerReferences = []metav1.OwnerReference{ + ownerRef, + } + } +} diff --git a/pkg/reconciler/testing/v1/inmemorychannel.go b/pkg/reconciler/testing/v1/inmemorychannel.go index 408904c1928..4405cfaf533 100644 --- a/pkg/reconciler/testing/v1/inmemorychannel.go +++ b/pkg/reconciler/testing/v1/inmemorychannel.go @@ -154,12 +154,12 @@ func WithInMemoryChannelEventPoliciesNotReady(reason, message string) InMemoryCh } } -func WithInMemoryChannelEventPoliciesListed(policy ...*v1alpha1.EventPolicy) InMemoryChannelOption { +func WithInMemoryChannelEventPoliciesListed(policyNames ...string) InMemoryChannelOption { return func(imc *v1.InMemoryChannel) { - for _, p := range policy { + for _, names := range policyNames { imc.Status.Policies = append(imc.Status.Policies, eventingduckv1.AppliedEventPolicyRef{ - APIVersion: p.APIVersion, - Name: p.Name, + APIVersion: v1alpha1.SchemeGroupVersion.String(), + Name: names, }) } } From 777732a4303cf571d26c91c3d4349e5b577981f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Thu, 20 Jun 2024 14:02:27 +0200 Subject: [PATCH 09/13] Add unit test for GetApplyingResourcesOfEventPolicyForGK --- pkg/auth/event_policy.go | 3 +- pkg/auth/event_policy_test.go | 170 ++++++++++++++++++ .../inmemorychannel/controller/controller.go | 1 + pkg/reconciler/testing/v1/eventpolicy.go | 1 + 4 files changed, 174 insertions(+), 1 deletion(-) diff --git a/pkg/auth/event_policy.go b/pkg/auth/event_policy.go index 96e0923a7cf..0de435ac01a 100644 --- a/pkg/auth/event_policy.go +++ b/pkg/auth/event_policy.go @@ -18,9 +18,10 @@ package auth import ( "fmt" + "strings" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/cache" - "strings" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" diff --git a/pkg/auth/event_policy_test.go b/pkg/auth/event_policy_test.go index 64f972c7ba6..c65216e05b2 100644 --- a/pkg/auth/event_policy_test.go +++ b/pkg/auth/event_policy_test.go @@ -18,9 +18,12 @@ package auth import ( "context" + "reflect" "strings" "testing" + "k8s.io/apimachinery/pkg/runtime/schema" + "github.com/google/go-cmp/cmp" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -30,6 +33,7 @@ import ( "knative.dev/eventing/pkg/apis/eventing/v1alpha1" sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1" "knative.dev/eventing/pkg/client/clientset/versioned/scheme" + brokerinformerfake "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker/fake" eventpolicyinformerfake "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake" duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/client/injection/ducks/duck/v1/authstatus" @@ -764,3 +768,169 @@ func TestSubjectContained(t *testing.T) { }) } } + +func TestGetApplyingResourcesOfEventPolicyForGK(t *testing.T) { + + tests := []struct { + name string + eventPolicySpecTo []v1alpha1.EventPolicySpecTo + gk schema.GroupKind + brokerObjects []*eventingv1.Broker + want []string + wantErr bool + }{ + { + name: "Returns resource from direct reference", + eventPolicySpecTo: []v1alpha1.EventPolicySpecTo{ + { + Ref: &v1alpha1.EventPolicyToReference{ + APIVersion: eventingv1.SchemeGroupVersion.String(), + Kind: "Broker", + Name: "my-broker", + }, + }, + }, + gk: schema.GroupKind{ + Group: "eventing.knative.dev", + Kind: "Broker", + }, + brokerObjects: []*eventingv1.Broker{}, //for a direct reference, we don't need the indexer later + want: []string{ + "my-broker", + }, + }, { + name: "Ignores resources of other Group&Kind in direct reference", + eventPolicySpecTo: []v1alpha1.EventPolicySpecTo{ + { + Ref: &v1alpha1.EventPolicyToReference{ + APIVersion: eventingv1.SchemeGroupVersion.String(), + Kind: "Broker", + Name: "my-broker", + }, + }, { + Ref: &v1alpha1.EventPolicyToReference{ + APIVersion: eventingv1.SchemeGroupVersion.String(), + Kind: "Another-Kind", + Name: "another-res", + }, + }, + }, + gk: schema.GroupKind{ + Group: "eventing.knative.dev", + Kind: "Broker", + }, + brokerObjects: []*eventingv1.Broker{}, + want: []string{ + "my-broker", + }, + }, { + name: "Returns object which match selector", + eventPolicySpecTo: []v1alpha1.EventPolicySpecTo{ + { + Selector: &v1alpha1.EventPolicySelector{ + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "key": "value", + }, + }, + TypeMeta: &metav1.TypeMeta{ + APIVersion: eventingv1.SchemeGroupVersion.String(), + Kind: "Broker", + }, + }, + }, + }, + gk: schema.GroupKind{ + Group: "eventing.knative.dev", + Kind: "Broker", + }, + brokerObjects: []*eventingv1.Broker{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "my-broker", + Namespace: "my-ns", + Labels: map[string]string{ + "key": "value", + }, + }, + }, { + ObjectMeta: metav1.ObjectMeta{ + Name: "my-other-broker", + Namespace: "my-ns", + Labels: map[string]string{ + "other-key": "other-value", + }, + }, + }, + }, + want: []string{ + "my-broker", + }, + }, { + name: "Checks on GKs on selector match", + eventPolicySpecTo: []v1alpha1.EventPolicySpecTo{ + { + Selector: &v1alpha1.EventPolicySelector{ + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "key": "value", + }, + }, + TypeMeta: &metav1.TypeMeta{ + APIVersion: eventingv1.SchemeGroupVersion.String(), + Kind: "Broker", + }, + }, + }, + }, + gk: schema.GroupKind{ + Group: "eventing.knative.dev", + Kind: "Other-Kind", + }, + brokerObjects: []*eventingv1.Broker{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "my-broker", + Namespace: "my-ns", + Labels: map[string]string{ + "key": "value", + }, + }, + }, + }, + want: []string{}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx, _ := reconcilertesting.SetupFakeContext(t) + + brokerIndexer := brokerinformerfake.Get(ctx).Informer().GetIndexer() + for _, b := range tt.brokerObjects { + err := brokerIndexer.Add(b) + if err != nil { + t.Fatalf("could not add broker object to indexer: %v", err) + } + } + + eventPolicy := &v1alpha1.EventPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-policy", + Namespace: "my-ns", + }, + Spec: v1alpha1.EventPolicySpec{ + To: tt.eventPolicySpecTo, + }, + } + + got, err := GetApplyingResourcesOfEventPolicyForGK(eventPolicy, tt.gk, brokerIndexer) + if (err != nil) != tt.wantErr { + t.Errorf("GetApplyingResourcesOfEventPolicyForGK() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("GetApplyingResourcesOfEventPolicyForGK() got = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/reconciler/inmemorychannel/controller/controller.go b/pkg/reconciler/inmemorychannel/controller/controller.go index 634faf51f63..2102b3728e7 100644 --- a/pkg/reconciler/inmemorychannel/controller/controller.go +++ b/pkg/reconciler/inmemorychannel/controller/controller.go @@ -18,6 +18,7 @@ package controller import ( "context" + "knative.dev/eventing/pkg/auth" "github.com/kelseyhightower/envconfig" diff --git a/pkg/reconciler/testing/v1/eventpolicy.go b/pkg/reconciler/testing/v1/eventpolicy.go index 95c4344a79a..ee20e7cd0a6 100644 --- a/pkg/reconciler/testing/v1/eventpolicy.go +++ b/pkg/reconciler/testing/v1/eventpolicy.go @@ -18,6 +18,7 @@ package testing import ( "context" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "knative.dev/eventing/pkg/apis/eventing/v1alpha1" From c5fefd028ada3584d6aed479f2c3480f86e0d6b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Thu, 20 Jun 2024 14:07:51 +0200 Subject: [PATCH 10/13] Fix linter issues --- pkg/reconciler/testing/v1/eventpolicy.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/pkg/reconciler/testing/v1/eventpolicy.go b/pkg/reconciler/testing/v1/eventpolicy.go index ee20e7cd0a6..a54b07a03e0 100644 --- a/pkg/reconciler/testing/v1/eventpolicy.go +++ b/pkg/reconciler/testing/v1/eventpolicy.go @@ -68,9 +68,7 @@ func WithUnreadyEventPolicyCondition(ep *v1alpha1.EventPolicy) { func WithEventPolicyTo(tos ...v1alpha1.EventPolicySpecTo) EventPolicyOption { return func(ep *v1alpha1.EventPolicy) { - for _, to := range tos { - ep.Spec.To = append(ep.Spec.To, to) - } + ep.Spec.To = append(ep.Spec.To, tos...) } } @@ -84,9 +82,7 @@ func WithEventPolicyToRef(ref v1alpha1.EventPolicyToReference) EventPolicyOption func WithEventPolicyFrom(froms ...v1alpha1.EventPolicySpecFrom) EventPolicyOption { return func(ep *v1alpha1.EventPolicy) { - for _, from := range froms { - ep.Spec.From = append(ep.Spec.From, from) - } + ep.Spec.From = append(ep.Spec.From, froms...) } } From 77ec8d1cba4f955cb032f5a5b769ad31de33b937 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Thu, 20 Jun 2024 14:30:01 +0200 Subject: [PATCH 11/13] Match everything in namespace on empty .spec.to --- pkg/auth/event_policy.go | 61 +++++++++++++++++++++-------------- pkg/auth/event_policy_test.go | 21 ++++++++++++ 2 files changed, 58 insertions(+), 24 deletions(-) diff --git a/pkg/auth/event_policy.go b/pkg/auth/event_policy.go index 0de435ac01a..756e38dd515 100644 --- a/pkg/auth/event_policy.go +++ b/pkg/auth/event_policy.go @@ -95,43 +95,56 @@ func GetEventPoliciesForResource(lister listerseventingv1alpha1.EventPolicyListe // This function is kind of the "inverse" of GetEventPoliciesForResource. func GetApplyingResourcesOfEventPolicyForGK(eventPolicy *v1alpha1.EventPolicy, gk schema.GroupKind, gkIndexer cache.Indexer) ([]string, error) { applyingResources := []string{} - for _, to := range eventPolicy.Spec.To { - if to.Ref != nil { - toGV, err := schema.ParseGroupVersion(to.Ref.APIVersion) - if err != nil { - return nil, fmt.Errorf("could not parse group version of %q: %w", to.Ref.APIVersion, err) - } - if strings.EqualFold(toGV.Group, gk.Group) && - strings.EqualFold(to.Ref.Kind, gk.Kind) { + if eventPolicy.Spec.To == nil { + // empty .spec.to matches everything in namespace - applyingResources = append(applyingResources, to.Ref.Name) - } + err := cache.ListAllByNamespace(gkIndexer, eventPolicy.Namespace, labels.Everything(), func(i interface{}) { + applyingResources = append(applyingResources, i.(metav1.Object).GetName()) + }) + if err != nil { + return nil, fmt.Errorf("failed to list all %s %s resources in %s: %w", gk.Group, gk.Kind, eventPolicy.Namespace, err) } + } else { + for _, to := range eventPolicy.Spec.To { + if to.Ref != nil { + toGV, err := schema.ParseGroupVersion(to.Ref.APIVersion) + if err != nil { + return nil, fmt.Errorf("could not parse group version of %q: %w", to.Ref.APIVersion, err) + } - if to.Selector != nil { - selectorGV, err := schema.ParseGroupVersion(to.Selector.APIVersion) - if err != nil { - return nil, fmt.Errorf("could not parse group version of %q: %w", to.Selector.APIVersion, err) - } + if strings.EqualFold(toGV.Group, gk.Group) && + strings.EqualFold(to.Ref.Kind, gk.Kind) { - if strings.EqualFold(selectorGV.Group, gk.Group) && - strings.EqualFold(to.Selector.Kind, gk.Kind) { + applyingResources = append(applyingResources, to.Ref.Name) + } + } - selector, err := metav1.LabelSelectorAsSelector(to.Selector.LabelSelector) + if to.Selector != nil { + selectorGV, err := schema.ParseGroupVersion(to.Selector.APIVersion) if err != nil { - return nil, fmt.Errorf("could not parse label selector %v: %w", to.Selector.LabelSelector, err) + return nil, fmt.Errorf("could not parse group version of %q: %w", to.Selector.APIVersion, err) } - err = cache.ListAllByNamespace(gkIndexer, eventPolicy.Namespace, selector, func(i interface{}) { - applyingResources = append(applyingResources, i.(metav1.Object).GetName()) - }) - if err != nil { - return nil, fmt.Errorf("could not list resources of GK in %q namespace for selector %v: %w", eventPolicy.Namespace, selector, err) + if strings.EqualFold(selectorGV.Group, gk.Group) && + strings.EqualFold(to.Selector.Kind, gk.Kind) { + + selector, err := metav1.LabelSelectorAsSelector(to.Selector.LabelSelector) + if err != nil { + return nil, fmt.Errorf("could not parse label selector %v: %w", to.Selector.LabelSelector, err) + } + + err = cache.ListAllByNamespace(gkIndexer, eventPolicy.Namespace, selector, func(i interface{}) { + applyingResources = append(applyingResources, i.(metav1.Object).GetName()) + }) + if err != nil { + return nil, fmt.Errorf("could not list resources of GK in %q namespace for selector %v: %w", eventPolicy.Namespace, selector, err) + } } } } } + return applyingResources, nil } diff --git a/pkg/auth/event_policy_test.go b/pkg/auth/event_policy_test.go index c65216e05b2..9faeade1197 100644 --- a/pkg/auth/event_policy_test.go +++ b/pkg/auth/event_policy_test.go @@ -899,6 +899,27 @@ func TestGetApplyingResourcesOfEventPolicyForGK(t *testing.T) { }, }, want: []string{}, + }, { + name: "Empty .spec.to matches everything in namespace", + eventPolicySpecTo: nil, + gk: schema.GroupKind{ + Group: "eventing.knative.dev", + Kind: "Broker", + }, + brokerObjects: []*eventingv1.Broker{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "my-broker", + Namespace: "my-ns", + Labels: map[string]string{ + "key": "value", + }, + }, + }, + }, + want: []string{ + "my-broker", + }, }, } for _, tt := range tests { From 2e5c596b94612782022db059ad2686d981a38988 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Thu, 20 Jun 2024 14:38:32 +0200 Subject: [PATCH 12/13] GetApplyingResourcesOfEventPolicyForGK should return list of unique elements --- pkg/auth/event_policy.go | 16 ++++++++----- pkg/auth/event_policy_test.go | 42 +++++++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 5 deletions(-) diff --git a/pkg/auth/event_policy.go b/pkg/auth/event_policy.go index 756e38dd515..7af5eceb428 100644 --- a/pkg/auth/event_policy.go +++ b/pkg/auth/event_policy.go @@ -94,13 +94,14 @@ func GetEventPoliciesForResource(lister listerseventingv1alpha1.EventPolicyListe // // This function is kind of the "inverse" of GetEventPoliciesForResource. func GetApplyingResourcesOfEventPolicyForGK(eventPolicy *v1alpha1.EventPolicy, gk schema.GroupKind, gkIndexer cache.Indexer) ([]string, error) { - applyingResources := []string{} + applyingResources := map[string]struct{}{} if eventPolicy.Spec.To == nil { // empty .spec.to matches everything in namespace err := cache.ListAllByNamespace(gkIndexer, eventPolicy.Namespace, labels.Everything(), func(i interface{}) { - applyingResources = append(applyingResources, i.(metav1.Object).GetName()) + name := i.(metav1.Object).GetName() + applyingResources[name] = struct{}{} }) if err != nil { return nil, fmt.Errorf("failed to list all %s %s resources in %s: %w", gk.Group, gk.Kind, eventPolicy.Namespace, err) @@ -116,7 +117,7 @@ func GetApplyingResourcesOfEventPolicyForGK(eventPolicy *v1alpha1.EventPolicy, g if strings.EqualFold(toGV.Group, gk.Group) && strings.EqualFold(to.Ref.Kind, gk.Kind) { - applyingResources = append(applyingResources, to.Ref.Name) + applyingResources[to.Ref.Name] = struct{}{} } } @@ -135,7 +136,8 @@ func GetApplyingResourcesOfEventPolicyForGK(eventPolicy *v1alpha1.EventPolicy, g } err = cache.ListAllByNamespace(gkIndexer, eventPolicy.Namespace, selector, func(i interface{}) { - applyingResources = append(applyingResources, i.(metav1.Object).GetName()) + name := i.(metav1.Object).GetName() + applyingResources[name] = struct{}{} }) if err != nil { return nil, fmt.Errorf("could not list resources of GK in %q namespace for selector %v: %w", eventPolicy.Namespace, selector, err) @@ -145,7 +147,11 @@ func GetApplyingResourcesOfEventPolicyForGK(eventPolicy *v1alpha1.EventPolicy, g } } - return applyingResources, nil + res := []string{} + for name := range applyingResources { + res = append(res, name) + } + return res, nil } // ResolveSubjects returns the OIDC service accounts names for the objects referenced in the EventPolicySpecFrom. diff --git a/pkg/auth/event_policy_test.go b/pkg/auth/event_policy_test.go index 9faeade1197..c958effac10 100644 --- a/pkg/auth/event_policy_test.go +++ b/pkg/auth/event_policy_test.go @@ -920,6 +920,48 @@ func TestGetApplyingResourcesOfEventPolicyForGK(t *testing.T) { want: []string{ "my-broker", }, + }, { + name: "Returns elements only once in slice", + eventPolicySpecTo: []v1alpha1.EventPolicySpecTo{ + { + Ref: &v1alpha1.EventPolicyToReference{ + APIVersion: eventingv1.SchemeGroupVersion.String(), + Kind: "Broker", + Name: "my-broker", + }, + }, + { + Selector: &v1alpha1.EventPolicySelector{ + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "key": "value", + }, + }, + TypeMeta: &metav1.TypeMeta{ + APIVersion: eventingv1.SchemeGroupVersion.String(), + Kind: "Broker", + }, + }, + }, + }, + gk: schema.GroupKind{ + Group: "eventing.knative.dev", + Kind: "Broker", + }, + brokerObjects: []*eventingv1.Broker{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "my-broker", + Namespace: "my-ns", + Labels: map[string]string{ + "key": "value", + }, + }, + }, + }, + want: []string{ + "my-broker", + }, }, } for _, tt := range tests { From 7519033723478673296239055f6b0d70f09cf9ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Thu, 20 Jun 2024 16:02:17 +0200 Subject: [PATCH 13/13] Added very basic unit test for EventPolicyEventHandler --- pkg/auth/event_policy.go | 1 - pkg/auth/event_policy_test.go | 139 +++++++++++++++++++++++++++++++++- 2 files changed, 138 insertions(+), 2 deletions(-) diff --git a/pkg/auth/event_policy.go b/pkg/auth/event_policy.go index 7af5eceb428..e049772f135 100644 --- a/pkg/auth/event_policy.go +++ b/pkg/auth/event_policy.go @@ -237,7 +237,6 @@ func handleApplyingResourcesOfEventPolicy(eventPolicy *v1alpha1.EventPolicy, gk // EventPolicyEventHandler returns an ResourceEventHandler, which passes the referencing resources of the EventPolicy // to the enqueueFn if the EventPolicy was referencing or got updated and now is referencing the resource of the given GVK. func EventPolicyEventHandler(indexer cache.Indexer, gk schema.GroupKind, enqueueFn func(key types.NamespacedName)) cache.ResourceEventHandler { - return cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { eventPolicy, ok := obj.(*v1alpha1.EventPolicy) diff --git a/pkg/auth/event_policy_test.go b/pkg/auth/event_policy_test.go index c958effac10..124f1423173 100644 --- a/pkg/auth/event_policy_test.go +++ b/pkg/auth/event_policy_test.go @@ -770,7 +770,6 @@ func TestSubjectContained(t *testing.T) { } func TestGetApplyingResourcesOfEventPolicyForGK(t *testing.T) { - tests := []struct { name string eventPolicySpecTo []v1alpha1.EventPolicySpecTo @@ -997,3 +996,141 @@ func TestGetApplyingResourcesOfEventPolicyForGK(t *testing.T) { }) } } + +func TestEventPolicyEventHandler_AddAndDelete(t *testing.T) { + eventPolicy := &v1alpha1.EventPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-policy", + Namespace: "my-ns", + }, + Spec: v1alpha1.EventPolicySpec{ + To: []v1alpha1.EventPolicySpecTo{ + { + Ref: &v1alpha1.EventPolicyToReference{ + APIVersion: eventingv1.SchemeGroupVersion.String(), + Kind: "Broker", + Name: "my-broker", + }, + }, + }, + }, + } + + gk := schema.GroupKind{ + Group: eventingv1.SchemeGroupVersion.Group, + Kind: "Broker", + } + + wantCalls := []string{ + "my-broker", + } + + calls := map[string]int{} + callbackFn := func(key types.NamespacedName) { + calls[key.Name]++ + } + + handler := EventPolicyEventHandler(nil, gk, callbackFn) + handler.OnAdd(eventPolicy, false) + + if len(calls) != len(wantCalls) { + t.Errorf("EventPolicyEventHandler() callback in ADD was called on wrong resources. Want to be called on %v, but was called on %v", wantCalls, calls) + } + for _, wantCallForResource := range wantCalls { + num, ok := calls[wantCallForResource] + if !ok { + t.Errorf("EventPolicyEventHandler() callback in ADD was called on %s 0 times. Expected to be called only once", wantCallForResource) + } + + if num != 1 { + t.Errorf("EventPolicyEventHandler() callback in ADD was called on %s %d times. Expected to be called only once", wantCallForResource, num) + } + } + + // do the same for OnDelete + calls = map[string]int{} + handler.OnDelete(eventPolicy) + + if len(calls) != len(wantCalls) { + t.Errorf("EventPolicyEventHandler() callback in DELETE was called on wrong resources. Want to be called on %v, but was called on %v", wantCalls, calls) + } + for _, wantCallForResource := range wantCalls { + num, ok := calls[wantCallForResource] + if !ok { + t.Errorf("EventPolicyEventHandler() callback in DELETE was called on %s 0 times. Expected to be called only once", wantCallForResource) + } + + if num != 1 { + t.Errorf("EventPolicyEventHandler() callback in DELETE was called on %s %d times. Expected to be called only once", wantCallForResource, num) + } + } + +} + +func TestEventPolicyEventHandler_Update(t *testing.T) { + oldEventPolicy := &v1alpha1.EventPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-policy", + Namespace: "my-ns", + }, + Spec: v1alpha1.EventPolicySpec{ + To: []v1alpha1.EventPolicySpecTo{ + { + Ref: &v1alpha1.EventPolicyToReference{ + APIVersion: eventingv1.SchemeGroupVersion.String(), + Kind: "Broker", + Name: "my-broker", + }, + }, + }, + }, + } + newEventPolicy := &v1alpha1.EventPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-policy", + Namespace: "my-ns", + }, + Spec: v1alpha1.EventPolicySpec{ + To: []v1alpha1.EventPolicySpecTo{ + { + Ref: &v1alpha1.EventPolicyToReference{ + APIVersion: eventingv1.SchemeGroupVersion.String(), + Kind: "Broker", + Name: "my-broker", + }, + }, + }, + }, + } + + gk := schema.GroupKind{ + Group: eventingv1.SchemeGroupVersion.Group, + Kind: "Broker", + } + + wantCalls := []string{ + "my-broker", + } + + calls := map[string]int{} + callbackFn := func(key types.NamespacedName) { + calls[key.Name]++ + } + + handler := EventPolicyEventHandler(nil, gk, callbackFn) + handler.OnUpdate(oldEventPolicy, newEventPolicy) + + if len(calls) != len(wantCalls) { + t.Errorf("EventPolicyEventHandler() callback in UPDATE was called on wrong resources. Want to be called on %v, but was called on %v", wantCalls, calls) + } + for _, wantCallForResource := range wantCalls { + num, ok := calls[wantCallForResource] + if !ok { + t.Errorf("EventPolicyEventHandler() callback in UPDATE was called on %s 0 times. Expected to be called only once", wantCallForResource) + } + + if num != 1 { + t.Errorf("EventPolicyEventHandler() callback in UPDATE was called on %s %d times. Expected to be called only once", wantCallForResource, num) + } + } +}