diff --git a/pkg/reconciler/broker/broker.go b/pkg/reconciler/broker/broker.go index c631fc50932..7974771a4ca 100644 --- a/pkg/reconciler/broker/broker.go +++ b/pkg/reconciler/broker/broker.go @@ -28,6 +28,7 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/dynamic" corev1listers "k8s.io/client-go/listers/core/v1" @@ -46,6 +47,7 @@ import ( duckv1 "knative.dev/eventing/pkg/apis/duck/v1" "knative.dev/eventing/pkg/apis/eventing" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" "knative.dev/eventing/pkg/apis/feature" messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" "knative.dev/eventing/pkg/auth" @@ -264,6 +266,11 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, b *eventingv1.Broker) pk return fmt.Errorf("could not update broker status with EventPolicies: %v", err) } + // Reconcile the EventPolicy for the Broker. + if err := r.reconcileBrokerChannelEventPolicies(ctx, b, triggerChan, featureFlags); err != nil { + return fmt.Errorf("failed to reconcile EventPolicy for Broker %s: %w", b.Name, err) + } + // So, at this point the Broker is ready and everything should be solid // for the triggers to act upon. return nil @@ -428,6 +435,64 @@ func (r *Reconciler) reconcileChannel(ctx context.Context, channelResourceInterf return channelable, nil } +func (r *Reconciler) reconcileBrokerChannelEventPolicies(ctx context.Context, b *eventingv1.Broker, triggerChan *duckv1.Channelable, featureFlags feature.Flags) error { + logger := logging.FromContext(ctx) + + expected := resources.MakeEventPolicyForBackingChannel(b, triggerChan) + if featureFlags.IsOIDCAuthentication() { + // Get the EventPolicy, create if not exists. + foundEP, err := r.eventPolicyLister.EventPolicies(expected.Namespace).Get(expected.Name) + if apierrs.IsNotFound(err) { + // Create the EventPolicy since it doesn't exist. + logger.Debugw("Creating EventPolicy for Broker %s", expected.Name) + + _, err = r.eventingClientSet.EventingV1alpha1().EventPolicies(expected.Namespace).Create(ctx, expected, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("failed to create EventPolicy for Broker %s: %w", expected.Name, err) + } + return nil + } + if err != nil { + return fmt.Errorf("failed to get EventPolicy for Broker %s: %w", expected.Name, err) + } + if policyNeedsUpdate(foundEP, expected) { + // Update the EventPolicy since it exists and needs update. + logger.Debugw("Updating EventPolicy for Broker %s", expected.Name) + expected.SetResourceVersion(foundEP.GetResourceVersion()) + _, err = r.eventingClientSet.EventingV1alpha1().EventPolicies(expected.Namespace).Update(ctx, expected, metav1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("failed to update EventPolicy for Broker %s: %w", expected.Name, err) + } + } + return nil + } + + // List all the orphaned EventPolicies that have owner reference set to the Broker and delete them. + selector, err := labels.ValidatedSelectorFromSet(resources.LabelsForBackingChannelsEventPolicy(b)) + if err != nil { + return fmt.Errorf("could not get valid selector for broker's channel EventPolicy %s/%s: %w", b.Namespace, b.Name, err) + } + eventPolicies, err := r.eventPolicyLister.EventPolicies(expected.Namespace).List(selector) + if err != nil { + return fmt.Errorf("failed to list EventPolicies for Broker %s: %w", expected.Name, err) + } + for _, ep := range eventPolicies { + if metav1.IsControlledBy(ep, b) { + logger.Debugw("Deleting EventPolicy for Broker %s", expected.Name) + err := r.eventingClientSet.EventingV1alpha1().EventPolicies(ep.Namespace).Delete(ctx, ep.Name, metav1.DeleteOptions{}) + if err != nil { + return fmt.Errorf("failed to delete EventPolicy for Broker %s: %w", expected.Name, err) + } + logger.Debugw("Deleted EventPolicy for Broker %s", expected.Name) + } + } + return nil +} + +func policyNeedsUpdate(foundEP, expected *eventingv1alpha1.EventPolicy) bool { + return !equality.Semantic.DeepDerivative(expected, foundEP) +} + // TriggerChannelLabels are all the labels placed on the Trigger Channel for the given brokerName. This // should only be used by Broker and Trigger code. func TriggerChannelLabels(brokerName, brokerNamespace string) map[string]string { diff --git a/pkg/reconciler/broker/broker_test.go b/pkg/reconciler/broker/broker_test.go index 82e72f7d768..784178d9b12 100644 --- a/pkg/reconciler/broker/broker_test.go +++ b/pkg/reconciler/broker/broker_test.go @@ -46,6 +46,7 @@ import ( "knative.dev/eventing/pkg/apis/eventing" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" "knative.dev/eventing/pkg/apis/feature" "knative.dev/eventing/pkg/auth" fakeeventingclient "knative.dev/eventing/pkg/client/injection/client/fake" @@ -57,6 +58,7 @@ import ( . "knative.dev/pkg/reconciler/testing" _ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger/fake" + "knative.dev/eventing/pkg/reconciler/broker/resources" . "knative.dev/eventing/pkg/reconciler/testing/v1" ) @@ -144,6 +146,12 @@ var ( Version: "v1", Kind: "Broker", } + + channelV1GVK = metav1.GroupVersionKind{ + Group: "messaging.knative.dev", + Version: "v1", + Kind: "InMemoryChannel", + } ) func TestReconcile(t *testing.T) { @@ -780,6 +788,9 @@ func TestReconcile(t *testing.T) { WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), }, WantErr: false, + WantCreates: []runtime.Object{ + makeEventPolicy(), + }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewBroker(brokerName, testNS, WithBrokerClass(eventing.MTChannelBrokerClassValue), @@ -824,6 +835,9 @@ func TestReconcile(t *testing.T) { WithEventPolicyToRef(brokerV1GVK, brokerName), ), }, + WantCreates: []runtime.Object{ + makeEventPolicy(), + }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewBroker(brokerName, testNS, WithBrokerClass(eventing.MTChannelBrokerClassValue), @@ -866,6 +880,9 @@ func TestReconcile(t *testing.T) { WithEventPolicyToRef(brokerV1GVK, brokerName), ), }, + WantCreates: []runtime.Object{ + makeEventPolicy(), + }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewBroker(brokerName, testNS, WithBrokerClass(eventing.MTChannelBrokerClassValue), @@ -911,6 +928,9 @@ func TestReconcile(t *testing.T) { WithEventPolicyToRef(brokerV1GVK, brokerName), ), }, + WantCreates: []runtime.Object{ + makeEventPolicy(), + }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewBroker(brokerName, testNS, WithBrokerClass(eventing.MTChannelBrokerClassValue), @@ -932,6 +952,47 @@ func TestReconcile(t *testing.T) { Ctx: feature.ToContext(context.Background(), feature.Flags{ feature.OIDCAuthentication: feature.Enabled, }), + }, { + Name: "Should create an Event Policy for a Broker's underlying Channel", + Key: testKey, + Objects: []runtime.Object{ + NewBroker(brokerName, testNS, + WithBrokerClass(eventing.MTChannelBrokerClassValue), + WithBrokerConfig(config()), + WithInitBrokerConditions), + createChannel(withChannelReady), + imcConfigMap(), + NewEndpoints(filterServiceName, systemNS, + WithEndpointsLabels(FilterLabels()), + WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), + NewEndpoints(ingressServiceName, systemNS, + WithEndpointsLabels(IngressLabels()), + WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), + }, + WantCreates: []runtime.Object{ + makeEventPolicy(), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewBroker(brokerName, testNS, + WithBrokerClass(eventing.MTChannelBrokerClassValue), + WithBrokerConfig(config()), + WithBrokerReady, + WithBrokerAddress(&duckv1.Addressable{ + URL: brokerAddress, + Audience: &brokerAudience, + }), + WithChannelAddressAnnotation(triggerChannelURL), + WithChannelAPIVersionAnnotation(triggerChannelAPIVersion), + WithChannelKindAnnotation(triggerChannelKind), + WithChannelNameAnnotation(triggerChannelName), + WithDLSNotConfigured(), + WithBrokerEventPoliciesReadyBecauseNoPolicyAndOIDCEnabled(), + ), + }}, + Ctx: feature.ToContext(context.Background(), feature.Flags{ + feature.OIDCAuthentication: feature.Enabled, + feature.AuthorizationDefaultMode: feature.AuthorizationAllowSameNamespace, + }), }, } @@ -1227,3 +1288,23 @@ func makeTLSSecret() *corev1.Secret { Type: corev1.SecretTypeTLS, } } + +func makeEventPolicy() *eventingv1alpha1.EventPolicy { + return NewEventPolicy(resources.BrokerEventPolicyName(brokerName, triggerChannelName), testNS, + WithEventPolicyToRef(channelV1GVK, triggerChannelName), + WithEventPolicyFromSub(resources.OIDCBrokerSub), + WithEventPolicyOwnerReferences([]metav1.OwnerReference{ + { + APIVersion: "eventing.knative.dev/v1", + Kind: "Broker", + Name: brokerName, + }, + }...), + WithEventPolicyLabels(map[string]string{ + "eventing.knative.dev/" + "broker-group": brokerV1GVK.Group, + "eventing.knative.dev/" + "broker-version": brokerV1GVK.Version, + "eventing.knative.dev/" + "broker-kind": brokerV1GVK.Kind, + "eventing.knative.dev/" + "broker-name": brokerName, + }), + ) +} diff --git a/pkg/reconciler/broker/resources/eventpolicy.go b/pkg/reconciler/broker/resources/eventpolicy.go new file mode 100644 index 00000000000..e9cba27c542 --- /dev/null +++ b/pkg/reconciler/broker/resources/eventpolicy.go @@ -0,0 +1,78 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resources + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/ptr" + eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" + "knative.dev/pkg/kmeta" +) + +const ( + BackingChannelEventPolicyLabelPrefix = "eventing.knative.dev/" + OIDCBrokerSub = "system:serviceaccount:knative-eventing:mt-broker-ingress-oidc" + brokerKind = "Broker" +) + +func MakeEventPolicyForBackingChannel(b *eventingv1.Broker, backingChannel *eventingduckv1.Channelable) *eventingv1alpha1.EventPolicy { + return &eventingv1alpha1.EventPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: backingChannel.Namespace, + Name: BrokerEventPolicyName(b.Name, backingChannel.Name), + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: eventingv1.SchemeGroupVersion.String(), + Kind: brokerKind, + Name: b.Name, + }, + }, + Labels: LabelsForBackingChannelsEventPolicy(b), + }, + Spec: eventingv1alpha1.EventPolicySpec{ + To: []eventingv1alpha1.EventPolicySpecTo{ + { + Ref: &eventingv1alpha1.EventPolicyToReference{ + APIVersion: backingChannel.APIVersion, + Kind: backingChannel.Kind, + Name: backingChannel.Name, + }, + }, + }, + From: []eventingv1alpha1.EventPolicySpecFrom{ + { + Sub: ptr.To(OIDCBrokerSub), + }, + }, + }, + } +} + +func LabelsForBackingChannelsEventPolicy(broker *eventingv1.Broker) map[string]string { + return map[string]string{ + BackingChannelEventPolicyLabelPrefix + "broker-group": eventingv1.SchemeGroupVersion.Group, + BackingChannelEventPolicyLabelPrefix + "broker-version": eventingv1.SchemeGroupVersion.Version, + BackingChannelEventPolicyLabelPrefix + "broker-kind": brokerKind, + BackingChannelEventPolicyLabelPrefix + "broker-name": broker.Name, + } +} + +func BrokerEventPolicyName(brokerName, channelName string) string { + return kmeta.ChildName(brokerName, "-ep-"+channelName) +} diff --git a/pkg/reconciler/broker/resources/eventpolicy_test.go b/pkg/reconciler/broker/resources/eventpolicy_test.go new file mode 100644 index 00000000000..148da67b0ec --- /dev/null +++ b/pkg/reconciler/broker/resources/eventpolicy_test.go @@ -0,0 +1,77 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resources + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/ptr" + eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" +) + +func TestMakeEventPolicyForBackingChannel(t *testing.T) { + broker := &eventingv1.Broker{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-broker", + Namespace: "test-namespace", + }, + } + backingChannel := &eventingduckv1.Channelable{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-channel", + Namespace: "test-namespace", + }, + } + want := &eventingv1alpha1.EventPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "test-namespace", + Name: BrokerEventPolicyName(broker.Name, backingChannel.Name), + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "eventing.knative.dev/v1", + Kind: "Broker", + Name: "test-broker", + }, + }, + Labels: LabelsForBackingChannelsEventPolicy(broker), + }, + Spec: eventingv1alpha1.EventPolicySpec{ + To: []eventingv1alpha1.EventPolicySpecTo{ + { + Ref: &eventingv1alpha1.EventPolicyToReference{ + APIVersion: backingChannel.APIVersion, + Kind: backingChannel.Kind, + Name: backingChannel.Name, + }, + }, + }, + From: []eventingv1alpha1.EventPolicySpecFrom{ + { + Sub: ptr.To(OIDCBrokerSub), + }, + }, + }, + } + got := MakeEventPolicyForBackingChannel(broker, backingChannel) + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("MakeEventPolicyForBackingChannel() (-want, +got) = %v", diff) + } +} diff --git a/pkg/reconciler/testing/v1/eventpolicy.go b/pkg/reconciler/testing/v1/eventpolicy.go index 442fd397483..df0dedc9700 100644 --- a/pkg/reconciler/testing/v1/eventpolicy.go +++ b/pkg/reconciler/testing/v1/eventpolicy.go @@ -138,6 +138,14 @@ func WithEventPolicyFrom(gvk metav1.GroupVersionKind, name, namespace string) Ev } } +func WithEventPolicyFromSub(sub string) EventPolicyOption { + return func(ep *v1alpha1.EventPolicy) { + ep.Spec.From = append(ep.Spec.From, v1alpha1.EventPolicySpecFrom{ + Sub: &sub, + }) + } +} + func WithEventPolicyLabels(labels map[string]string) EventPolicyOption { return func(ep *v1alpha1.EventPolicy) { ep.ObjectMeta.Labels = labels