Skip to content

Commit

Permalink
Reconcile event policies for mt-broker (#8090)
Browse files Browse the repository at this point in the history
* reconcile event policies for broker

Signed-off-by: rahulii <r.sawra@gmail.com>

* delete the orphaned event policies by owner references and changes as per review comments

Signed-off-by: rahulii <r.sawra@gmail.com>

* fix tests and move event policy creation to new file

Signed-off-by: rahulii <r.sawra@gmail.com>

* add test cases

Signed-off-by: rahulii <r.sawra@gmail.com>

* add test cases

Signed-off-by: rahulii <r.sawra@gmail.com>

* changes to review comments

Signed-off-by: rahulii <r.sawra@gmail.com>

* changes to review comments

Signed-off-by: rahulii <r.sawra@gmail.com>

* refactor the code to reduce redundant if else statements

Signed-off-by: rahulii <r.sawra@gmail.com>

* change logs from info to debug

Signed-off-by: rahulii <r.sawra@gmail.com>

* use a single logger object instead of creating it everytime from context and some minor fixes

Signed-off-by: rahulii <r.sawra@gmail.com>

---------

Signed-off-by: rahulii <r.sawra@gmail.com>
  • Loading branch information
rahulii authored Jul 12, 2024
1 parent 5d6c780 commit a61107c
Show file tree
Hide file tree
Showing 5 changed files with 309 additions and 0 deletions.
65 changes: 65 additions & 0 deletions pkg/reconciler/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"
corev1listers "k8s.io/client-go/listers/core/v1"
Expand All @@ -46,6 +47,7 @@ import (
duckv1 "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/eventing/pkg/apis/eventing"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
"knative.dev/eventing/pkg/apis/feature"
messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
"knative.dev/eventing/pkg/auth"
Expand Down Expand Up @@ -264,6 +266,11 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, b *eventingv1.Broker) pk
return fmt.Errorf("could not update broker status with EventPolicies: %v", err)
}

// Reconcile the EventPolicy for the Broker.
if err := r.reconcileBrokerChannelEventPolicies(ctx, b, triggerChan, featureFlags); err != nil {
return fmt.Errorf("failed to reconcile EventPolicy for Broker %s: %w", b.Name, err)
}

// So, at this point the Broker is ready and everything should be solid
// for the triggers to act upon.
return nil
Expand Down Expand Up @@ -428,6 +435,64 @@ func (r *Reconciler) reconcileChannel(ctx context.Context, channelResourceInterf
return channelable, nil
}

func (r *Reconciler) reconcileBrokerChannelEventPolicies(ctx context.Context, b *eventingv1.Broker, triggerChan *duckv1.Channelable, featureFlags feature.Flags) error {
logger := logging.FromContext(ctx)

expected := resources.MakeEventPolicyForBackingChannel(b, triggerChan)
if featureFlags.IsOIDCAuthentication() {
// Get the EventPolicy, create if not exists.
foundEP, err := r.eventPolicyLister.EventPolicies(expected.Namespace).Get(expected.Name)
if apierrs.IsNotFound(err) {
// Create the EventPolicy since it doesn't exist.
logger.Debugw("Creating EventPolicy for Broker %s", expected.Name)

_, err = r.eventingClientSet.EventingV1alpha1().EventPolicies(expected.Namespace).Create(ctx, expected, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("failed to create EventPolicy for Broker %s: %w", expected.Name, err)
}
return nil
}
if err != nil {
return fmt.Errorf("failed to get EventPolicy for Broker %s: %w", expected.Name, err)
}
if policyNeedsUpdate(foundEP, expected) {
// Update the EventPolicy since it exists and needs update.
logger.Debugw("Updating EventPolicy for Broker %s", expected.Name)
expected.SetResourceVersion(foundEP.GetResourceVersion())
_, err = r.eventingClientSet.EventingV1alpha1().EventPolicies(expected.Namespace).Update(ctx, expected, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("failed to update EventPolicy for Broker %s: %w", expected.Name, err)
}
}
return nil
}

// List all the orphaned EventPolicies that have owner reference set to the Broker and delete them.
selector, err := labels.ValidatedSelectorFromSet(resources.LabelsForBackingChannelsEventPolicy(b))
if err != nil {
return fmt.Errorf("could not get valid selector for broker's channel EventPolicy %s/%s: %w", b.Namespace, b.Name, err)
}
eventPolicies, err := r.eventPolicyLister.EventPolicies(expected.Namespace).List(selector)
if err != nil {
return fmt.Errorf("failed to list EventPolicies for Broker %s: %w", expected.Name, err)
}
for _, ep := range eventPolicies {
if metav1.IsControlledBy(ep, b) {
logger.Debugw("Deleting EventPolicy for Broker %s", expected.Name)
err := r.eventingClientSet.EventingV1alpha1().EventPolicies(ep.Namespace).Delete(ctx, ep.Name, metav1.DeleteOptions{})
if err != nil {
return fmt.Errorf("failed to delete EventPolicy for Broker %s: %w", expected.Name, err)
}
logger.Debugw("Deleted EventPolicy for Broker %s", expected.Name)
}
}
return nil
}

func policyNeedsUpdate(foundEP, expected *eventingv1alpha1.EventPolicy) bool {
return !equality.Semantic.DeepDerivative(expected, foundEP)
}

// TriggerChannelLabels are all the labels placed on the Trigger Channel for the given brokerName. This
// should only be used by Broker and Trigger code.
func TriggerChannelLabels(brokerName, brokerNamespace string) map[string]string {
Expand Down
81 changes: 81 additions & 0 deletions pkg/reconciler/broker/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (

"knative.dev/eventing/pkg/apis/eventing"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
"knative.dev/eventing/pkg/apis/feature"
"knative.dev/eventing/pkg/auth"
fakeeventingclient "knative.dev/eventing/pkg/client/injection/client/fake"
Expand All @@ -57,6 +58,7 @@ import (
. "knative.dev/pkg/reconciler/testing"

_ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger/fake"
"knative.dev/eventing/pkg/reconciler/broker/resources"
. "knative.dev/eventing/pkg/reconciler/testing/v1"
)

Expand Down Expand Up @@ -144,6 +146,12 @@ var (
Version: "v1",
Kind: "Broker",
}

channelV1GVK = metav1.GroupVersionKind{
Group: "messaging.knative.dev",
Version: "v1",
Kind: "InMemoryChannel",
}
)

func TestReconcile(t *testing.T) {
Expand Down Expand Up @@ -780,6 +788,9 @@ func TestReconcile(t *testing.T) {
WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})),
},
WantErr: false,
WantCreates: []runtime.Object{
makeEventPolicy(),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewBroker(brokerName, testNS,
WithBrokerClass(eventing.MTChannelBrokerClassValue),
Expand Down Expand Up @@ -824,6 +835,9 @@ func TestReconcile(t *testing.T) {
WithEventPolicyToRef(brokerV1GVK, brokerName),
),
},
WantCreates: []runtime.Object{
makeEventPolicy(),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewBroker(brokerName, testNS,
WithBrokerClass(eventing.MTChannelBrokerClassValue),
Expand Down Expand Up @@ -866,6 +880,9 @@ func TestReconcile(t *testing.T) {
WithEventPolicyToRef(brokerV1GVK, brokerName),
),
},
WantCreates: []runtime.Object{
makeEventPolicy(),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewBroker(brokerName, testNS,
WithBrokerClass(eventing.MTChannelBrokerClassValue),
Expand Down Expand Up @@ -911,6 +928,9 @@ func TestReconcile(t *testing.T) {
WithEventPolicyToRef(brokerV1GVK, brokerName),
),
},
WantCreates: []runtime.Object{
makeEventPolicy(),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewBroker(brokerName, testNS,
WithBrokerClass(eventing.MTChannelBrokerClassValue),
Expand All @@ -932,6 +952,47 @@ func TestReconcile(t *testing.T) {
Ctx: feature.ToContext(context.Background(), feature.Flags{
feature.OIDCAuthentication: feature.Enabled,
}),
}, {
Name: "Should create an Event Policy for a Broker's underlying Channel",
Key: testKey,
Objects: []runtime.Object{
NewBroker(brokerName, testNS,
WithBrokerClass(eventing.MTChannelBrokerClassValue),
WithBrokerConfig(config()),
WithInitBrokerConditions),
createChannel(withChannelReady),
imcConfigMap(),
NewEndpoints(filterServiceName, systemNS,
WithEndpointsLabels(FilterLabels()),
WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})),
NewEndpoints(ingressServiceName, systemNS,
WithEndpointsLabels(IngressLabels()),
WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})),
},
WantCreates: []runtime.Object{
makeEventPolicy(),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewBroker(brokerName, testNS,
WithBrokerClass(eventing.MTChannelBrokerClassValue),
WithBrokerConfig(config()),
WithBrokerReady,
WithBrokerAddress(&duckv1.Addressable{
URL: brokerAddress,
Audience: &brokerAudience,
}),
WithChannelAddressAnnotation(triggerChannelURL),
WithChannelAPIVersionAnnotation(triggerChannelAPIVersion),
WithChannelKindAnnotation(triggerChannelKind),
WithChannelNameAnnotation(triggerChannelName),
WithDLSNotConfigured(),
WithBrokerEventPoliciesReadyBecauseNoPolicyAndOIDCEnabled(),
),
}},
Ctx: feature.ToContext(context.Background(), feature.Flags{
feature.OIDCAuthentication: feature.Enabled,
feature.AuthorizationDefaultMode: feature.AuthorizationAllowSameNamespace,
}),
},
}

Expand Down Expand Up @@ -1227,3 +1288,23 @@ func makeTLSSecret() *corev1.Secret {
Type: corev1.SecretTypeTLS,
}
}

func makeEventPolicy() *eventingv1alpha1.EventPolicy {
return NewEventPolicy(resources.BrokerEventPolicyName(brokerName, triggerChannelName), testNS,
WithEventPolicyToRef(channelV1GVK, triggerChannelName),
WithEventPolicyFromSub(resources.OIDCBrokerSub),
WithEventPolicyOwnerReferences([]metav1.OwnerReference{
{
APIVersion: "eventing.knative.dev/v1",
Kind: "Broker",
Name: brokerName,
},
}...),
WithEventPolicyLabels(map[string]string{
"eventing.knative.dev/" + "broker-group": brokerV1GVK.Group,
"eventing.knative.dev/" + "broker-version": brokerV1GVK.Version,
"eventing.knative.dev/" + "broker-kind": brokerV1GVK.Kind,
"eventing.knative.dev/" + "broker-name": brokerName,
}),
)
}
78 changes: 78 additions & 0 deletions pkg/reconciler/broker/resources/eventpolicy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
Copyright 2024 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package resources

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"
eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
"knative.dev/pkg/kmeta"
)

const (
BackingChannelEventPolicyLabelPrefix = "eventing.knative.dev/"
OIDCBrokerSub = "system:serviceaccount:knative-eventing:mt-broker-ingress-oidc"
brokerKind = "Broker"
)

func MakeEventPolicyForBackingChannel(b *eventingv1.Broker, backingChannel *eventingduckv1.Channelable) *eventingv1alpha1.EventPolicy {
return &eventingv1alpha1.EventPolicy{
ObjectMeta: metav1.ObjectMeta{
Namespace: backingChannel.Namespace,
Name: BrokerEventPolicyName(b.Name, backingChannel.Name),
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: eventingv1.SchemeGroupVersion.String(),
Kind: brokerKind,
Name: b.Name,
},
},
Labels: LabelsForBackingChannelsEventPolicy(b),
},
Spec: eventingv1alpha1.EventPolicySpec{
To: []eventingv1alpha1.EventPolicySpecTo{
{
Ref: &eventingv1alpha1.EventPolicyToReference{
APIVersion: backingChannel.APIVersion,
Kind: backingChannel.Kind,
Name: backingChannel.Name,
},
},
},
From: []eventingv1alpha1.EventPolicySpecFrom{
{
Sub: ptr.To(OIDCBrokerSub),
},
},
},
}
}

func LabelsForBackingChannelsEventPolicy(broker *eventingv1.Broker) map[string]string {
return map[string]string{
BackingChannelEventPolicyLabelPrefix + "broker-group": eventingv1.SchemeGroupVersion.Group,
BackingChannelEventPolicyLabelPrefix + "broker-version": eventingv1.SchemeGroupVersion.Version,
BackingChannelEventPolicyLabelPrefix + "broker-kind": brokerKind,
BackingChannelEventPolicyLabelPrefix + "broker-name": broker.Name,
}
}

func BrokerEventPolicyName(brokerName, channelName string) string {
return kmeta.ChildName(brokerName, "-ep-"+channelName)
}
77 changes: 77 additions & 0 deletions pkg/reconciler/broker/resources/eventpolicy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
Copyright 2024 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package resources

import (
"testing"

"github.com/google/go-cmp/cmp"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"
eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
)

func TestMakeEventPolicyForBackingChannel(t *testing.T) {
broker := &eventingv1.Broker{
ObjectMeta: metav1.ObjectMeta{
Name: "test-broker",
Namespace: "test-namespace",
},
}
backingChannel := &eventingduckv1.Channelable{
ObjectMeta: metav1.ObjectMeta{
Name: "test-channel",
Namespace: "test-namespace",
},
}
want := &eventingv1alpha1.EventPolicy{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test-namespace",
Name: BrokerEventPolicyName(broker.Name, backingChannel.Name),
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "eventing.knative.dev/v1",
Kind: "Broker",
Name: "test-broker",
},
},
Labels: LabelsForBackingChannelsEventPolicy(broker),
},
Spec: eventingv1alpha1.EventPolicySpec{
To: []eventingv1alpha1.EventPolicySpecTo{
{
Ref: &eventingv1alpha1.EventPolicyToReference{
APIVersion: backingChannel.APIVersion,
Kind: backingChannel.Kind,
Name: backingChannel.Name,
},
},
},
From: []eventingv1alpha1.EventPolicySpecFrom{
{
Sub: ptr.To(OIDCBrokerSub),
},
},
},
}
got := MakeEventPolicyForBackingChannel(broker, backingChannel)
if diff := cmp.Diff(want, got); diff != "" {
t.Errorf("MakeEventPolicyForBackingChannel() (-want, +got) = %v", diff)
}
}
Loading

0 comments on commit a61107c

Please sign in to comment.