From 3264b212fed393eaf0735bbc151fb825c3a36e65 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Thu, 4 Jul 2024 12:47:55 +0200 Subject: [PATCH] List applying EventPolicies in Brokers status (#8060) List applying event policies in Brokers status --- pkg/apis/eventing/v1/broker_lifecycle.go | 18 ++ pkg/apis/eventing/v1/broker_lifecycle_test.go | 45 +++++ pkg/apis/eventing/v1/test_helper.go | 2 + pkg/reconciler/broker/broker.go | 8 + pkg/reconciler/broker/broker_test.go | 167 +++++++++++++++++- pkg/reconciler/broker/controller.go | 11 ++ pkg/reconciler/broker/controller_test.go | 1 + pkg/reconciler/testing/v1/broker.go | 39 ++++ 8 files changed, 283 insertions(+), 8 deletions(-) diff --git a/pkg/apis/eventing/v1/broker_lifecycle.go b/pkg/apis/eventing/v1/broker_lifecycle.go index 9bc846e89e7..ab4d0fdb253 100644 --- a/pkg/apis/eventing/v1/broker_lifecycle.go +++ b/pkg/apis/eventing/v1/broker_lifecycle.go @@ -32,6 +32,7 @@ const ( BrokerConditionFilter apis.ConditionType = "FilterReady" BrokerConditionAddressable apis.ConditionType = "Addressable" BrokerConditionDeadLetterSinkResolved apis.ConditionType = "DeadLetterSinkResolved" + BrokerConditionEventPoliciesReady apis.ConditionType = "EventPoliciesReady" ) var brokerCondSet = apis.NewLivingConditionSet( @@ -40,6 +41,7 @@ var brokerCondSet = apis.NewLivingConditionSet( BrokerConditionFilter, BrokerConditionAddressable, BrokerConditionDeadLetterSinkResolved, + BrokerConditionEventPoliciesReady, ) var brokerCondSetLock = sync.RWMutex{} @@ -118,3 +120,19 @@ func (bs *BrokerStatus) MarkDeadLetterSinkResolvedFailed(reason, messageFormat s bs.DeliveryStatus = eventingduck.DeliveryStatus{} bs.GetConditionSet().Manage(bs).MarkFalse(BrokerConditionDeadLetterSinkResolved, reason, messageFormat, messageA...) } + +func (bs *BrokerStatus) MarkEventPoliciesTrue() { + bs.GetConditionSet().Manage(bs).MarkTrue(BrokerConditionEventPoliciesReady) +} + +func (bs *BrokerStatus) MarkEventPoliciesTrueWithReason(reason, messageFormat string, messageA ...interface{}) { + bs.GetConditionSet().Manage(bs).MarkTrueWithReason(BrokerConditionEventPoliciesReady, reason, messageFormat, messageA...) +} + +func (bs *BrokerStatus) MarkEventPoliciesFailed(reason, messageFormat string, messageA ...interface{}) { + bs.GetConditionSet().Manage(bs).MarkFalse(BrokerConditionEventPoliciesReady, reason, messageFormat, messageA...) +} + +func (bs *BrokerStatus) MarkEventPoliciesUnknown(reason, messageFormat string, messageA ...interface{}) { + bs.GetConditionSet().Manage(bs).MarkUnknown(BrokerConditionEventPoliciesReady, reason, messageFormat, messageA...) +} diff --git a/pkg/apis/eventing/v1/broker_lifecycle_test.go b/pkg/apis/eventing/v1/broker_lifecycle_test.go index df3c3c509bd..dbc2ca014de 100644 --- a/pkg/apis/eventing/v1/broker_lifecycle_test.go +++ b/pkg/apis/eventing/v1/broker_lifecycle_test.go @@ -193,6 +193,9 @@ func TestBrokerInitializeConditions(t *testing.T) { }, { Type: BrokerConditionDeadLetterSinkResolved, Status: corev1.ConditionUnknown, + }, { + Type: BrokerConditionEventPoliciesReady, + Status: corev1.ConditionUnknown, }, { Type: BrokerConditionFilter, Status: corev1.ConditionUnknown, @@ -226,6 +229,9 @@ func TestBrokerInitializeConditions(t *testing.T) { }, { Type: BrokerConditionDeadLetterSinkResolved, Status: corev1.ConditionUnknown, + }, { + Type: BrokerConditionEventPoliciesReady, + Status: corev1.ConditionUnknown, }, { Type: BrokerConditionFilter, Status: corev1.ConditionUnknown, @@ -259,6 +265,9 @@ func TestBrokerInitializeConditions(t *testing.T) { }, { Type: BrokerConditionDeadLetterSinkResolved, Status: corev1.ConditionUnknown, + }, { + Type: BrokerConditionEventPoliciesReady, + Status: corev1.ConditionUnknown, }, { Type: BrokerConditionFilter, Status: corev1.ConditionTrue, @@ -287,6 +296,9 @@ func TestBrokerInitializeConditions(t *testing.T) { }, { Type: BrokerConditionDeadLetterSinkResolved, Status: corev1.ConditionTrue, + }, { + Type: BrokerConditionEventPoliciesReady, + Status: corev1.ConditionTrue, }, { Type: BrokerConditionFilter, Status: corev1.ConditionTrue, @@ -316,6 +328,9 @@ func TestBrokerInitializeConditions(t *testing.T) { }, { Type: BrokerConditionDeadLetterSinkResolved, Status: corev1.ConditionTrue, + }, { + Type: BrokerConditionEventPoliciesReady, + Status: corev1.ConditionTrue, }, { Type: BrokerConditionFilter, Status: corev1.ConditionTrue, @@ -350,6 +365,7 @@ func TestBrokerIsReady(t *testing.T) { markTriggerChannelReady *bool markFilterReady *bool markDLSResolved *bool + markEventPolicyReady *bool markAddressable *bool address *apis.URL markIngressSubscriptionOwned bool @@ -361,6 +377,7 @@ func TestBrokerIsReady(t *testing.T) { markTriggerChannelReady: &trueVal, markFilterReady: &trueVal, markDLSResolved: &trueVal, + markEventPolicyReady: &trueVal, address: &apis.URL{Scheme: "http", Host: "hostname"}, markIngressSubscriptionOwned: true, markIngressSubscriptionReady: &trueVal, @@ -371,6 +388,7 @@ func TestBrokerIsReady(t *testing.T) { markTriggerChannelReady: &trueVal, markFilterReady: &trueVal, markDLSResolved: &trueVal, + markEventPolicyReady: &trueVal, address: &apis.URL{Scheme: "http", Host: "hostname"}, markIngressSubscriptionOwned: true, markIngressSubscriptionReady: &trueVal, @@ -381,6 +399,7 @@ func TestBrokerIsReady(t *testing.T) { markTriggerChannelReady: &trueVal, markFilterReady: &trueVal, markDLSResolved: &trueVal, + markEventPolicyReady: &trueVal, address: &apis.URL{Scheme: "http", Host: "hostname"}, markIngressSubscriptionOwned: true, markIngressSubscriptionReady: &trueVal, @@ -391,6 +410,7 @@ func TestBrokerIsReady(t *testing.T) { markTriggerChannelReady: &falseVal, markFilterReady: &trueVal, markDLSResolved: &trueVal, + markEventPolicyReady: &trueVal, address: &apis.URL{Scheme: "http", Host: "hostname"}, markIngressSubscriptionOwned: true, markIngressSubscriptionReady: &trueVal, @@ -401,6 +421,7 @@ func TestBrokerIsReady(t *testing.T) { markTriggerChannelReady: &trueVal, markFilterReady: &falseVal, markDLSResolved: &trueVal, + markEventPolicyReady: &trueVal, address: &apis.URL{Scheme: "http", Host: "hostname"}, markIngressSubscriptionOwned: true, markIngressSubscriptionReady: &trueVal, @@ -411,6 +432,7 @@ func TestBrokerIsReady(t *testing.T) { markTriggerChannelReady: &trueVal, markFilterReady: &trueVal, markDLSResolved: &trueVal, + markEventPolicyReady: &trueVal, address: nil, markIngressSubscriptionOwned: true, markIngressSubscriptionReady: &trueVal, @@ -421,6 +443,7 @@ func TestBrokerIsReady(t *testing.T) { markTriggerChannelReady: &trueVal, markFilterReady: &trueVal, markDLSResolved: &trueVal, + markEventPolicyReady: &trueVal, markAddressable: nil, address: nil, markIngressSubscriptionOwned: true, @@ -432,6 +455,7 @@ func TestBrokerIsReady(t *testing.T) { markTriggerChannelReady: &trueVal, markFilterReady: &trueVal, markDLSResolved: &falseVal, + markEventPolicyReady: &trueVal, address: &apis.URL{Scheme: "http", Host: "hostname"}, markIngressSubscriptionOwned: true, markIngressSubscriptionReady: &trueVal, @@ -442,16 +466,29 @@ func TestBrokerIsReady(t *testing.T) { markTriggerChannelReady: &trueVal, markFilterReady: &trueVal, markDLSResolved: nil, + markEventPolicyReady: &trueVal, address: &apis.URL{Scheme: "http", Host: "hostname"}, markIngressSubscriptionOwned: true, markIngressSubscriptionReady: &trueVal, wantReady: true, + }, { + name: "eventpolicy sad", + markIngressReady: &trueVal, + markTriggerChannelReady: &trueVal, + markFilterReady: &trueVal, + markDLSResolved: &trueVal, + markEventPolicyReady: &falseVal, + address: &apis.URL{Scheme: "http", Host: "hostname"}, + markIngressSubscriptionOwned: true, + markIngressSubscriptionReady: &trueVal, + wantReady: false, }, { name: "all sad", markIngressReady: &falseVal, markTriggerChannelReady: &falseVal, markFilterReady: &falseVal, markDLSResolved: &falseVal, + markEventPolicyReady: &falseVal, address: nil, markIngressSubscriptionOwned: true, markIngressSubscriptionReady: &falseVal, @@ -488,6 +525,14 @@ func TestBrokerIsReady(t *testing.T) { bs.MarkDeadLetterSinkNotConfigured() } + if test.markEventPolicyReady == &trueVal { + bs.MarkEventPoliciesTrue() + } else if test.markEventPolicyReady == &falseVal { + bs.MarkEventPoliciesFailed("", "") + } else { + bs.MarkEventPoliciesUnknown("", "") + } + if test.markFilterReady != nil { var ep *corev1.Endpoints if *test.markFilterReady { diff --git a/pkg/apis/eventing/v1/test_helper.go b/pkg/apis/eventing/v1/test_helper.go index 9775f84dd72..e2337e94066 100644 --- a/pkg/apis/eventing/v1/test_helper.go +++ b/pkg/apis/eventing/v1/test_helper.go @@ -66,6 +66,7 @@ func (t testHelper) ReadyBrokerStatus() *BrokerStatus { URL: apis.HTTP("example.com"), }) bs.MarkDeadLetterSinkResolvedSucceeded(eventingduckv1.DeliveryStatus{}) + bs.MarkEventPoliciesTrue() return bs } @@ -77,6 +78,7 @@ func (t testHelper) ReadyBrokerStatusWithoutDLS() *BrokerStatus { bs.SetAddress(&duckv1.Addressable{ URL: apis.HTTP("example.com"), }) + bs.MarkEventPoliciesTrue() bs.MarkDeadLetterSinkNotConfigured() return bs } diff --git a/pkg/reconciler/broker/broker.go b/pkg/reconciler/broker/broker.go index c741e815049..94f28ad6dc0 100644 --- a/pkg/reconciler/broker/broker.go +++ b/pkg/reconciler/broker/broker.go @@ -51,6 +51,7 @@ import ( "knative.dev/eventing/pkg/auth" clientset "knative.dev/eventing/pkg/client/clientset/versioned" brokerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/broker" + eventingv1alpha1listers "knative.dev/eventing/pkg/client/listers/eventing/v1alpha1" messaginglisters "knative.dev/eventing/pkg/client/listers/messaging/v1" ducklib "knative.dev/eventing/pkg/duck" "knative.dev/eventing/pkg/eventingtls" @@ -79,6 +80,8 @@ type Reconciler struct { // If specified, only reconcile brokers with these labels brokerClass string + + eventPolicyLister eventingv1alpha1listers.EventPolicyLister } // Check that our Reconciler implements Interface @@ -256,6 +259,11 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, b *eventingv1.Broker) pk b.GetConditionSet().Manage(b.GetStatus()).MarkTrue(eventingv1.BrokerConditionAddressable) + err = auth.UpdateStatusWithEventPolicies(featureFlags, &b.Status.AppliedEventPoliciesStatus, &b.Status, r.eventPolicyLister, eventingv1.SchemeGroupVersion.WithKind("Broker"), b.ObjectMeta) + if err != nil { + return fmt.Errorf("could not update broker status with EventPolicies: %v", err) + } + // So, at this point the Broker is ready and everything should be solid // for the triggers to act upon. return nil diff --git a/pkg/reconciler/broker/broker_test.go b/pkg/reconciler/broker/broker_test.go index 0bfe7b27e09..82e72f7d768 100644 --- a/pkg/reconciler/broker/broker_test.go +++ b/pkg/reconciler/broker/broker_test.go @@ -79,6 +79,8 @@ const ( apiVersion: "messaging.knative.dev/v1" kind: "InMemoryChannel" ` + readyEventPolicyName = "test-event-policy-ready" + unreadyEventPolicyName = "test-event-policy-unready" ) var ( @@ -136,6 +138,12 @@ var ( dls = duckv1.Addressable{ URL: apis.HTTP("test-dls.test-namespace.svc.cluster.local"), } + + brokerV1GVK = metav1.GroupVersionKind{ + Group: "eventing.knative.dev", + Version: "v1", + Kind: "Broker", + } ) func TestReconcile(t *testing.T) { @@ -386,7 +394,8 @@ func TestReconcile(t *testing.T) { WithChannelAPIVersionAnnotation(triggerChannelAPIVersion), WithChannelKindAnnotation(triggerChannelKind), WithChannelNameAnnotation(triggerChannelName), - WithDLSNotConfigured()), + WithDLSNotConfigured(), + WithBrokerEventPoliciesReadyBecauseOIDCDisabled()), }}, }, { Name: "Successful Reconciliation with a Channel with CA certs", @@ -416,7 +425,8 @@ func TestReconcile(t *testing.T) { WithChannelAPIVersionAnnotation(triggerChannelAPIVersion), WithChannelKindAnnotation(triggerChannelKind), WithChannelNameAnnotation(triggerChannelName), - WithDLSNotConfigured()), + WithDLSNotConfigured(), + WithBrokerEventPoliciesReadyBecauseOIDCDisabled()), }}, }, { Name: "Successful Reconciliation with a Channel with Audience", @@ -446,7 +456,8 @@ func TestReconcile(t *testing.T) { WithChannelAPIVersionAnnotation(triggerChannelAPIVersion), WithChannelKindAnnotation(triggerChannelKind), WithChannelNameAnnotation(triggerChannelName), - WithDLSNotConfigured()), + WithDLSNotConfigured(), + WithBrokerEventPoliciesReadyBecauseOIDCDisabled()), }}, }, { Name: "Successful Reconciliation. Using legacy channel template config element.", @@ -475,7 +486,8 @@ func TestReconcile(t *testing.T) { WithChannelAPIVersionAnnotation(triggerChannelAPIVersion), WithChannelKindAnnotation(triggerChannelKind), WithChannelNameAnnotation(triggerChannelName), - WithDLSNotConfigured()), + WithDLSNotConfigured(), + WithBrokerEventPoliciesReadyBecauseOIDCDisabled()), }}, }, { Name: "Successful Reconciliation, status update fails", @@ -507,7 +519,8 @@ func TestReconcile(t *testing.T) { WithChannelAPIVersionAnnotation(triggerChannelAPIVersion), WithChannelKindAnnotation(triggerChannelKind), WithChannelNameAnnotation(triggerChannelName), - WithDLSNotConfigured()), + WithDLSNotConfigured(), + WithBrokerEventPoliciesReadyBecauseOIDCDisabled()), }}, WantEvents: []string{ Eventf(corev1.EventTypeWarning, "UpdateFailed", `Failed to update status for "test-broker": inducing failure for update brokers`), @@ -569,7 +582,8 @@ func TestReconcile(t *testing.T) { WithChannelAddressAnnotation(triggerChannelURL), WithChannelAPIVersionAnnotation(triggerChannelAPIVersion), WithChannelKindAnnotation(triggerChannelKind), - WithChannelNameAnnotation(triggerChannelName)), + WithChannelNameAnnotation(triggerChannelName), + WithBrokerEventPoliciesReadyBecauseOIDCDisabled()), }}, WantErr: false, }, { @@ -602,7 +616,8 @@ func TestReconcile(t *testing.T) { WithChannelAddressAnnotation(triggerChannelURL), WithChannelAPIVersionAnnotation(triggerChannelAPIVersion), WithChannelKindAnnotation(triggerChannelKind), - WithChannelNameAnnotation(triggerChannelName)), + WithChannelNameAnnotation(triggerChannelName), + WithBrokerEventPoliciesReadyBecauseOIDCDisabled()), }}, WantPatches: []clientgotesting.PatchActionImpl{ makeChannelDLSRefNamePatch(sinkSVCDest.Ref.Name), @@ -636,7 +651,8 @@ func TestReconcile(t *testing.T) { WithChannelAddressAnnotation(triggerChannelURL), WithChannelAPIVersionAnnotation(triggerChannelAPIVersion), WithChannelKindAnnotation(triggerChannelKind), - WithChannelNameAnnotation(triggerChannelName)), + WithChannelNameAnnotation(triggerChannelName), + WithBrokerEventPoliciesReadyBecauseOIDCDisabled()), }}, WantPatches: []clientgotesting.PatchActionImpl{ makeChannelDeliveryRetryPatch(deliveryRetries), @@ -685,6 +701,7 @@ func TestReconcile(t *testing.T) { URL: brokerAddress, }, }), + WithBrokerEventPoliciesReadyBecauseOIDCDisabled(), ), }}, Ctx: feature.ToContext(context.Background(), feature.Flags{ @@ -736,6 +753,7 @@ func TestReconcile(t *testing.T) { URL: httpsURL(brokerName, testNS), CACerts: pointer.String(testCaCerts), }), + WithBrokerEventPoliciesReadyBecauseOIDCDisabled(), ), }}, Ctx: feature.ToContext(context.Background(), feature.Flags{ @@ -777,6 +795,138 @@ func TestReconcile(t *testing.T) { WithChannelAPIVersionAnnotation(triggerChannelAPIVersion), WithChannelKindAnnotation(triggerChannelKind), WithChannelNameAnnotation(triggerChannelName), + WithBrokerEventPoliciesReadyBecauseNoPolicyAndOIDCEnabled(), + ), + }}, + Ctx: feature.ToContext(context.Background(), feature.Flags{ + feature.OIDCAuthentication: feature.Enabled, + feature.AuthorizationDefaultMode: feature.AuthorizationAllowSameNamespace, + }), + }, + { + Name: "Should list applying EventPolicies", + Key: testKey, + Objects: []runtime.Object{ + NewBroker(brokerName, testNS, + WithBrokerClass(eventing.MTChannelBrokerClassValue), + WithBrokerConfig(config()), + WithInitBrokerConditions), + createChannel(withChannelReady), + imcConfigMap(), + NewEndpoints(filterServiceName, systemNS, + WithEndpointsLabels(FilterLabels()), + WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), + NewEndpoints(ingressServiceName, systemNS, + WithEndpointsLabels(IngressLabels()), + WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), + NewEventPolicy(readyEventPolicyName, testNS, + WithReadyEventPolicyCondition, + WithEventPolicyToRef(brokerV1GVK, brokerName), + ), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewBroker(brokerName, testNS, + WithBrokerClass(eventing.MTChannelBrokerClassValue), + WithBrokerConfig(config()), + WithBrokerReady, + WithBrokerAddress(&duckv1.Addressable{ + URL: brokerAddress, + Audience: &brokerAudience, + }), + WithChannelAddressAnnotation(triggerChannelURL), + WithChannelAPIVersionAnnotation(triggerChannelAPIVersion), + WithChannelKindAnnotation(triggerChannelKind), + WithChannelNameAnnotation(triggerChannelName), + WithDLSNotConfigured(), + WithBrokerEventPoliciesReady(), + WithBrokerEventPoliciesListed(readyEventPolicyName)), + }}, + Ctx: feature.ToContext(context.Background(), feature.Flags{ + feature.OIDCAuthentication: feature.Enabled, + }), + }, + { + Name: "Should mark as NotReady on unready EventPolicies", + Key: testKey, + Objects: []runtime.Object{ + NewBroker(brokerName, testNS, + WithBrokerClass(eventing.MTChannelBrokerClassValue), + WithBrokerConfig(config()), + WithInitBrokerConditions), + createChannel(withChannelReady), + imcConfigMap(), + NewEndpoints(filterServiceName, systemNS, + WithEndpointsLabels(FilterLabels()), + WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), + NewEndpoints(ingressServiceName, systemNS, + WithEndpointsLabels(IngressLabels()), + WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), + NewEventPolicy(unreadyEventPolicyName, testNS, + WithUnreadyEventPolicyCondition("", ""), + WithEventPolicyToRef(brokerV1GVK, brokerName), + ), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewBroker(brokerName, testNS, + WithBrokerClass(eventing.MTChannelBrokerClassValue), + WithBrokerConfig(config()), + WithBrokerReady, + WithBrokerAddress(&duckv1.Addressable{ + URL: brokerAddress, + Audience: &brokerAudience, + }), + WithChannelAddressAnnotation(triggerChannelURL), + WithChannelAPIVersionAnnotation(triggerChannelAPIVersion), + WithChannelKindAnnotation(triggerChannelKind), + WithChannelNameAnnotation(triggerChannelName), + WithDLSNotConfigured(), + WithBrokerEventPoliciesNotReady("EventPoliciesNotReady", fmt.Sprintf("event policies %s are not ready", unreadyEventPolicyName))), + }}, + Ctx: feature.ToContext(context.Background(), feature.Flags{ + feature.OIDCAuthentication: feature.Enabled, + }), + }, + { + Name: "Should list only Ready EventPolicies", + Key: testKey, + Objects: []runtime.Object{ + NewBroker(brokerName, testNS, + WithBrokerClass(eventing.MTChannelBrokerClassValue), + WithBrokerConfig(config()), + WithInitBrokerConditions), + createChannel(withChannelReady), + imcConfigMap(), + NewEndpoints(filterServiceName, systemNS, + WithEndpointsLabels(FilterLabels()), + WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), + NewEndpoints(ingressServiceName, systemNS, + WithEndpointsLabels(IngressLabels()), + WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), + NewEventPolicy(readyEventPolicyName, testNS, + WithReadyEventPolicyCondition, + WithEventPolicyToRef(brokerV1GVK, brokerName), + ), + NewEventPolicy(unreadyEventPolicyName, testNS, + WithUnreadyEventPolicyCondition("", ""), + WithEventPolicyToRef(brokerV1GVK, brokerName), + ), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewBroker(brokerName, testNS, + WithBrokerClass(eventing.MTChannelBrokerClassValue), + WithBrokerConfig(config()), + WithBrokerReady, + WithBrokerAddress(&duckv1.Addressable{ + URL: brokerAddress, + Audience: &brokerAudience, + }), + WithChannelAddressAnnotation(triggerChannelURL), + WithChannelAPIVersionAnnotation(triggerChannelAPIVersion), + WithChannelKindAnnotation(triggerChannelKind), + WithChannelNameAnnotation(triggerChannelName), + WithDLSNotConfigured(), + WithBrokerEventPoliciesNotReady("EventPoliciesNotReady", fmt.Sprintf("event policies %s are not ready", unreadyEventPolicyName)), + WithBrokerEventPoliciesListed(readyEventPolicyName), ), }}, Ctx: feature.ToContext(context.Background(), feature.Flags{ @@ -810,6 +960,7 @@ func TestReconcile(t *testing.T) { secretLister: listers.GetSecretLister(), channelableTracker: duck.NewListableTrackerFromTracker(ctx, channelable.Get, tracker.New(func(types.NamespacedName) {}, 0)), uriResolver: resolver.NewURIResolverFromTracker(ctx, tracker.New(func(types.NamespacedName) {}, 0)), + eventPolicyLister: listers.GetEventPolicyLister(), } return broker.NewReconciler(ctx, logger, fakeeventingclient.Get(ctx), listers.GetBrokerLister(), diff --git a/pkg/reconciler/broker/controller.go b/pkg/reconciler/broker/controller.go index d581a90cdb2..ebb0a513735 100644 --- a/pkg/reconciler/broker/controller.go +++ b/pkg/reconciler/broker/controller.go @@ -19,6 +19,8 @@ package broker import ( "context" + "knative.dev/eventing/pkg/auth" + "go.uber.org/zap" "k8s.io/client-go/tools/cache" "knative.dev/pkg/apis" @@ -41,6 +43,7 @@ import ( eventingclient "knative.dev/eventing/pkg/client/injection/client" "knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable" brokerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker" + eventpolicyinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy" subscriptioninformer "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription" brokerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/broker" "knative.dev/eventing/pkg/duck" @@ -69,6 +72,7 @@ func NewController( endpointsInformer := endpointsinformer.Get(ctx) configmapInformer := configmapinformer.Get(ctx) secretInformer := secretinformer.Get(ctx) + eventPolicyInformer := eventpolicyinformer.Get(ctx) var globalResync func(obj interface{}) @@ -101,6 +105,7 @@ func NewController( brokerClass: eventing.MTChannelBrokerClassValue, configmapLister: configmapInformer.Lister(), secretLister: secretInformer.Lister(), + eventPolicyLister: eventPolicyInformer.Lister(), } impl := brokerreconciler.NewImpl(ctx, r, eventing.MTChannelBrokerClassValue, func(impl *controller.Impl) controller.Options { return controller.Options{ @@ -145,5 +150,11 @@ func NewController( Handler: controller.HandleAll(globalResync), }) + brokerGK := eventingv1.SchemeGroupVersion.WithKind("Broker").GroupKind() + + // Enqueue the Broker, if we have an EventPolicy which was referencing + // or got updated and now is referencing the Broker + eventPolicyInformer.Informer().AddEventHandler(auth.EventPolicyEventHandler(brokerInformer.Informer().GetIndexer(), brokerGK, impl.EnqueueKey)) + return impl } diff --git a/pkg/reconciler/broker/controller_test.go b/pkg/reconciler/broker/controller_test.go index 0e39281751c..80e5a7120f5 100644 --- a/pkg/reconciler/broker/controller_test.go +++ b/pkg/reconciler/broker/controller_test.go @@ -29,6 +29,7 @@ import ( // Fake injection informers _ "knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable/fake" _ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker/fake" + _ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake" _ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription/fake" _ "knative.dev/pkg/client/injection/ducks/duck/v1/conditions/fake" _ "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment/fake" diff --git a/pkg/reconciler/testing/v1/broker.go b/pkg/reconciler/testing/v1/broker.go index d7ddbd31c90..96b4f9d9f43 100644 --- a/pkg/reconciler/testing/v1/broker.go +++ b/pkg/reconciler/testing/v1/broker.go @@ -18,10 +18,14 @@ import ( "fmt" "time" + eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" + "knative.dev/eventing/pkg/apis/feature" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" + eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" eventingv1 "knative.dev/eventing/pkg/apis/duck/v1" "knative.dev/eventing/pkg/apis/eventing" v1 "knative.dev/eventing/pkg/apis/eventing/v1" @@ -288,3 +292,38 @@ func WithBrokersAddresses(addresses []duckv1.Addressable) BrokerOption { b.GetConditionSet().Manage(b.GetStatus()).MarkTrue(v1.BrokerConditionAddressable) } } + +func WithBrokerEventPoliciesReady() BrokerOption { + return func(b *v1.Broker) { + b.Status.MarkEventPoliciesTrue() + } +} + +func WithBrokerEventPoliciesNotReady(reason, message string) BrokerOption { + return func(b *v1.Broker) { + b.Status.MarkEventPoliciesFailed(reason, message) + } +} + +func WithBrokerEventPoliciesListed(policyNames ...string) BrokerOption { + return func(b *v1.Broker) { + for _, name := range policyNames { + b.Status.Policies = append(b.Status.Policies, eventingduckv1.AppliedEventPolicyRef{ + APIVersion: eventingv1alpha1.SchemeGroupVersion.String(), + Name: name, + }) + } + } +} + +func WithBrokerEventPoliciesReadyBecauseOIDCDisabled() BrokerOption { + return func(b *v1.Broker) { + b.Status.MarkEventPoliciesTrueWithReason("OIDCDisabled", "Feature %q must be enabled to support Authorization", feature.OIDCAuthentication) + } +} + +func WithBrokerEventPoliciesReadyBecauseNoPolicyAndOIDCEnabled() BrokerOption { + return func(b *v1.Broker) { + b.Status.MarkEventPoliciesTrueWithReason("DefaultAuthorizationMode", "Default authz mode is %q", feature.AuthorizationAllowSameNamespace) + } +}