Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reconcile event policies for mt-broker #8090

Merged
merged 10 commits into from
Jul 12, 2024
56 changes: 56 additions & 0 deletions pkg/reconciler/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"
corev1listers "k8s.io/client-go/listers/core/v1"
Expand All @@ -46,6 +47,7 @@ import (
duckv1 "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/eventing/pkg/apis/eventing"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
"knative.dev/eventing/pkg/apis/feature"
messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
"knative.dev/eventing/pkg/auth"
Expand Down Expand Up @@ -264,6 +266,11 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, b *eventingv1.Broker) pk
return fmt.Errorf("could not update broker status with EventPolicies: %v", err)
}

// Reconcile the EventPolicy for the Broker.
if err := r.reconcileBrokerChannelEventPolicies(ctx, b, triggerChan, featureFlags); err != nil {
return fmt.Errorf("failed to reconcile EventPolicy for Broker %s: %w", b.Name, err)
}

// So, at this point the Broker is ready and everything should be solid
// for the triggers to act upon.
return nil
Expand Down Expand Up @@ -428,6 +435,55 @@ func (r *Reconciler) reconcileChannel(ctx context.Context, channelResourceInterf
return channelable, nil
}

func (r *Reconciler) reconcileBrokerChannelEventPolicies(ctx context.Context, b *eventingv1.Broker, triggerChan *duckv1.Channelable, featureFlags feature.Flags) error {
expected := resources.MakeEventPolicyForBackingChannel(b, triggerChan)

if featureFlags.IsOIDCAuthentication() {
// get the eventpolicy , create if not exists.
foundEP, err := r.eventPolicyLister.EventPolicies(expected.Namespace).Get(expected.Name)
if apierrs.IsNotFound(err) {
// create the EventPolicy if it doesn't exists.
logging.FromContext(ctx).Info("Creating EventPolicy for Broker %s", expected.Name)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm adding a comment here, but it applies to all of the logging in this function:

I don't think these need to be logged at the Info level, it generally seems to be Debug logs. Additionally, if we are going to be using the logger everywhere, it seems to make sense to create the logger once at the start of the function and then re-use it throughout, instead of fetching it from the context every time we want to log something

Copy link
Contributor Author

@rahulii rahulii Jul 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, will change from Info -> Debug.
On the other point, I was trying to be consistent with the rest of the code base!
So, should I keep it as it is for change to create a single logger object?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would create a single logger object for this function if you are going to have this many log statements. In other places, since there isn't a ton of logging it might be fine to just fetch the logger everytime. But, normally we fetch the logger once when we have this much logging to do in a function

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, PTAL!


_, err = r.eventingClientSet.EventingV1alpha1().EventPolicies(expected.Namespace).Create(ctx, expected, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("failed to create EventPolicy for Broker %s: %w", expected.Name, err)
}
} else if err != nil {
return fmt.Errorf("failed to get EventPolicy for Broker %s: %w", expected.Name, err)
} else if r.policyNeedsUpdate(foundEP, expected) {
// update the EventPolicy if it exists and needs update.
logging.FromContext(ctx).Info("Updating EventPolicy for Broker %s", expected.Name)
expected.SetResourceVersion(foundEP.GetResourceVersion())
_, err = r.eventingClientSet.EventingV1alpha1().EventPolicies(expected.Namespace).Update(ctx, expected, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("failed to update EventPolicy for Broker %s: %w", expected.Name, err)
}
}
} else {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all of these if/else blocks can make it a little unclear what is going on, I might refactor this to return early in many of these cases. For example, if the EventPolicy is successfully created, we can return nil. This gets rid of the need for the else if r.policyNeedsUpdate(...), which can be turned into a simpler if r.policyNeedsUpdate(...).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point, thanks for the feedback, I will make the changes!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, PTAL!

// list all the orphaned eventpolicies that have owner reference set to the broker and delete them.
eventPolicies, err := r.eventPolicyLister.EventPolicies(expected.Namespace).List(labels.Everything())
if err != nil {
return fmt.Errorf("failed to list EventPolicies for Broker %s: %w", expected.Name, err)
}
for _, ep := range eventPolicies {
if metav1.IsControlledBy(ep, b) {
logging.FromContext(ctx).Info("Deleting EventPolicy for Broker %s", expected.Name)
err := r.eventingClientSet.EventingV1alpha1().EventPolicies(ep.Namespace).Delete(ctx, ep.Name, metav1.DeleteOptions{})
if err != nil {
return fmt.Errorf("failed to delete EventPolicy for Broker %s: %w", expected.Name, err)
}
logging.FromContext(ctx).Info("Deleted EventPolicy for Broker %s", expected.Name)
}
}
}
return nil
}

func (r *Reconciler) policyNeedsUpdate(foundEP, expected *eventingv1alpha1.EventPolicy) bool {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this function doesn't really need to be a method on the Reconciler struct

Suggested change
func (r *Reconciler) policyNeedsUpdate(foundEP, expected *eventingv1alpha1.EventPolicy) bool {
func policyNeedsUpdate(foundEP, expected *eventingv1alpha1.EventPolicy) bool {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, PTAL!

return !equality.Semantic.DeepDerivative(expected, foundEP)
}

// TriggerChannelLabels are all the labels placed on the Trigger Channel for the given brokerName. This
// should only be used by Broker and Trigger code.
func TriggerChannelLabels(brokerName, brokerNamespace string) map[string]string {
Expand Down
81 changes: 81 additions & 0 deletions pkg/reconciler/broker/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (

"knative.dev/eventing/pkg/apis/eventing"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
"knative.dev/eventing/pkg/apis/feature"
"knative.dev/eventing/pkg/auth"
fakeeventingclient "knative.dev/eventing/pkg/client/injection/client/fake"
Expand All @@ -57,6 +58,7 @@ import (
. "knative.dev/pkg/reconciler/testing"

_ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger/fake"
"knative.dev/eventing/pkg/reconciler/broker/resources"
. "knative.dev/eventing/pkg/reconciler/testing/v1"
)

Expand Down Expand Up @@ -144,6 +146,12 @@ var (
Version: "v1",
Kind: "Broker",
}

channelV1GVK = metav1.GroupVersionKind{
Group: "messaging.knative.dev",
Version: "v1",
Kind: "InMemoryChannel",
}
)

func TestReconcile(t *testing.T) {
Expand Down Expand Up @@ -780,6 +788,9 @@ func TestReconcile(t *testing.T) {
WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})),
},
WantErr: false,
WantCreates: []runtime.Object{
makeEventPolicy(),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewBroker(brokerName, testNS,
WithBrokerClass(eventing.MTChannelBrokerClassValue),
Expand Down Expand Up @@ -824,6 +835,9 @@ func TestReconcile(t *testing.T) {
WithEventPolicyToRef(brokerV1GVK, brokerName),
),
},
WantCreates: []runtime.Object{
makeEventPolicy(),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewBroker(brokerName, testNS,
WithBrokerClass(eventing.MTChannelBrokerClassValue),
Expand Down Expand Up @@ -866,6 +880,9 @@ func TestReconcile(t *testing.T) {
WithEventPolicyToRef(brokerV1GVK, brokerName),
),
},
WantCreates: []runtime.Object{
makeEventPolicy(),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewBroker(brokerName, testNS,
WithBrokerClass(eventing.MTChannelBrokerClassValue),
Expand Down Expand Up @@ -911,6 +928,9 @@ func TestReconcile(t *testing.T) {
WithEventPolicyToRef(brokerV1GVK, brokerName),
),
},
WantCreates: []runtime.Object{
makeEventPolicy(),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewBroker(brokerName, testNS,
WithBrokerClass(eventing.MTChannelBrokerClassValue),
Expand All @@ -932,6 +952,47 @@ func TestReconcile(t *testing.T) {
Ctx: feature.ToContext(context.Background(), feature.Flags{
feature.OIDCAuthentication: feature.Enabled,
}),
}, {
Name: "Should create an Event Policy for a Broker's underlying Channel",
Key: testKey,
Objects: []runtime.Object{
NewBroker(brokerName, testNS,
WithBrokerClass(eventing.MTChannelBrokerClassValue),
WithBrokerConfig(config()),
WithInitBrokerConditions),
createChannel(withChannelReady),
imcConfigMap(),
NewEndpoints(filterServiceName, systemNS,
WithEndpointsLabels(FilterLabels()),
WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})),
NewEndpoints(ingressServiceName, systemNS,
WithEndpointsLabels(IngressLabels()),
WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})),
},
WantCreates: []runtime.Object{
makeEventPolicy(),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewBroker(brokerName, testNS,
WithBrokerClass(eventing.MTChannelBrokerClassValue),
WithBrokerConfig(config()),
WithBrokerReady,
WithBrokerAddress(&duckv1.Addressable{
URL: brokerAddress,
Audience: &brokerAudience,
}),
WithChannelAddressAnnotation(triggerChannelURL),
WithChannelAPIVersionAnnotation(triggerChannelAPIVersion),
WithChannelKindAnnotation(triggerChannelKind),
WithChannelNameAnnotation(triggerChannelName),
WithDLSNotConfigured(),
WithBrokerEventPoliciesReadyBecauseNoPolicyAndOIDCEnabled(),
),
}},
Ctx: feature.ToContext(context.Background(), feature.Flags{
feature.OIDCAuthentication: feature.Enabled,
feature.AuthorizationDefaultMode: feature.AuthorizationAllowSameNamespace,
}),
},
}

Expand Down Expand Up @@ -1227,3 +1288,23 @@ func makeTLSSecret() *corev1.Secret {
Type: corev1.SecretTypeTLS,
}
}

func makeEventPolicy() *eventingv1alpha1.EventPolicy {
return NewEventPolicy(brokerName+"-event-policy", testNS,
WithEventPolicyToRef(channelV1GVK, triggerChannelName),
WithEventPolicyFromSub(resources.OIDCBrokerSub),
WithEventPolicyOwnerReferences([]metav1.OwnerReference{
{
APIVersion: "eventing.knative.dev/v1",
Kind: "Broker",
Name: brokerName,
},
}...),
WithEventPolicyLabels(map[string]string{
"eventing.knative.dev/" + "broker-group": brokerV1GVK.Group,
"eventing.knative.dev/" + "broker-version": brokerV1GVK.Version,
"eventing.knative.dev/" + "broker-kind": brokerV1GVK.Kind,
"eventing.knative.dev/" + "broker-name": brokerName,
}),
)
}
79 changes: 79 additions & 0 deletions pkg/reconciler/broker/resources/eventpolicy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
Copyright 2024 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package resources

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
)

const (
BackingChannelEventPolicyLabelPrefix = "eventing.knative.dev/"
OIDCBrokerSub = "system:serviceaccount:knative-eventing:mt-broker-ingress-oidc"
brokerAPIVersion = "eventing.knative.dev/v1"
version = "v1"
brokerKind = "Broker"
brokerGroup = "eventing.knative.dev"
)

func MakeEventPolicyForBackingChannel(b *eventingv1.Broker, backingChannel *eventingduckv1.Channelable) *eventingv1alpha1.EventPolicy {
return &eventingv1alpha1.EventPolicy{
ObjectMeta: metav1.ObjectMeta{
Namespace: backingChannel.Namespace,
Name: b.Name + "-event-policy",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a bit more complexity here than just adding the suffix to the name of the broker, since we might run into scenarios where the resulting name exceeds the allowed name length in k8s. Instead, we use kmeta.ChildName(parent, suffix) to generate the names normally. See here: https://pkg.go.dev/knative.dev/pkg/kmeta#ChildName

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, PTAL!

OwnerReferences: []metav1.OwnerReference{
{
APIVersion: brokerAPIVersion,
Kind: brokerKind,
Name: b.Name,
},
},
Labels: LabelsForBackingChannelsEventPolicy(b),
},
Spec: eventingv1alpha1.EventPolicySpec{
To: []eventingv1alpha1.EventPolicySpecTo{
{
Ref: &eventingv1alpha1.EventPolicyToReference{
APIVersion: backingChannel.APIVersion,
Kind: backingChannel.Kind,
Name: backingChannel.Name,
},
},
},
From: []eventingv1alpha1.EventPolicySpecFrom{
{
Sub: toStrPtr(OIDCBrokerSub),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of using our own implementation here, we normally use the k8s ptr package, so this would become ptr.To(OIDCBrokerSub)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, PTAL!

},
},
},
}
}

func LabelsForBackingChannelsEventPolicy(broker *eventingv1.Broker) map[string]string {
return map[string]string{
BackingChannelEventPolicyLabelPrefix + "broker-group": brokerGroup,
BackingChannelEventPolicyLabelPrefix + "broker-version": version,
BackingChannelEventPolicyLabelPrefix + "broker-kind": brokerKind,
BackingChannelEventPolicyLabelPrefix + "broker-name": broker.Name,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious if we actually need all of these labels for the EventPolicies, what exactly are you using them all for?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Cali0707 I took reference from channel - https://github.com/knative/eventing/blob/main/pkg/reconciler/channel/resources/eventpolicy.go#L69
Hence, thought of adding the same here. LMK if you think it shouldn't be there, I will remove it !

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah looking there it seems like those labels are used to correctly select the EventPolicies which were created by the channel. We should also use the labels to make a label selector when checking which EventPolicies are owned by the broker. See here for how it is used in the channel:

applyingEventPoliciesForBackingChannel, err := auth.GetEventPoliciesForResource(r.eventPolicyLister, backingChannel.GroupVersionKind(), backingChannel.ObjectMeta)
if err != nil {
return fmt.Errorf("could not get applying EventPolicies for for backing channel %s/%s: %w", channel.Namespace, channel.Name, err)
}
selector, err := labels.ValidatedSelectorFromSet(resources.LabelsForBackingChannelsEventPolicy(backingChannel))
if err != nil {
return fmt.Errorf("could not get valid selector for backing channels EventPolicy %s/%s: %w", backingChannel.Namespace, backingChannel.Name, err)
}
existingEventPoliciesForBackingChannel, err := r.eventPolicyLister.EventPolicies(backingChannel.Namespace).List(selector)
if err != nil {
return fmt.Errorf("could not get existing EventPolicies in backing channels namespace %q: %w", backingChannel.Namespace, err)
}
for _, policy := range existingEventPoliciesForBackingChannel {
if !r.containsPolicy(policy.Name, applyingEventPoliciesForBackingChannel) {
// the existing policy is not in the list of applying policies anymore --> is outdated --> delete it
err := r.eventingClientSet.EventingV1alpha1().EventPolicies(policy.Namespace).Delete(ctx, policy.Name, metav1.DeleteOptions{})
if err != nil && apierrs.IsNotFound(err) {
return fmt.Errorf("could not delete old EventPolicy %s/%s: %w", policy.Namespace, policy.Name, err)
}
}
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gotcha. So just to clarify: currently we are filtering EventPolicies based on the Owner References of broker, instead filter out based on the labels, right ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, filtering the eventpolicies by ownerreference is good, but when we list the eventpolicies, we should use a label selector with these labels (see line 152 in the code I linked above). So, to summarize what we would do is:

  1. List the EventPolicy resources using a label selector to only retrieve relevant ones
  2. Filter the EventPolicy resources by owner reference

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, PTAL!

}
}

func toStrPtr(s string) *string {
return &s
}
76 changes: 76 additions & 0 deletions pkg/reconciler/broker/resources/eventpolicy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
Copyright 2024 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package resources

import (
"testing"

"github.com/google/go-cmp/cmp"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
)

func TestMakeEventPolicyForBackingChannel(t *testing.T) {
broker := &eventingv1.Broker{
ObjectMeta: metav1.ObjectMeta{
Name: "test-broker",
Namespace: "test-namespace",
},
}
backingChannel := &eventingduckv1.Channelable{
ObjectMeta: metav1.ObjectMeta{
Name: "test-channel",
Namespace: "test-namespace",
},
}
want := &eventingv1alpha1.EventPolicy{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test-namespace",
Name: "test-broker-event-policy",
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "eventing.knative.dev/v1",
Kind: "Broker",
Name: "test-broker",
},
},
Labels: LabelsForBackingChannelsEventPolicy(broker),
},
Spec: eventingv1alpha1.EventPolicySpec{
To: []eventingv1alpha1.EventPolicySpecTo{
{
Ref: &eventingv1alpha1.EventPolicyToReference{
APIVersion: backingChannel.APIVersion,
Kind: backingChannel.Kind,
Name: backingChannel.Name,
},
},
},
From: []eventingv1alpha1.EventPolicySpecFrom{
{
Sub: toStrPtr(OIDCBrokerSub),
},
},
},
}
got := MakeEventPolicyForBackingChannel(broker, backingChannel)
if diff := cmp.Diff(want, got); diff != "" {
t.Errorf("MakeEventPolicyForBackingChannel() (-want, +got) = %v", diff)
}
}
8 changes: 8 additions & 0 deletions pkg/reconciler/testing/v1/eventpolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,14 @@ func WithEventPolicyFrom(gvk metav1.GroupVersionKind, name, namespace string) Ev
}
}

func WithEventPolicyFromSub(sub string) EventPolicyOption {
return func(ep *v1alpha1.EventPolicy) {
ep.Spec.From = append(ep.Spec.From, v1alpha1.EventPolicySpecFrom{
Sub: &sub,
})
}
}

func WithEventPolicyLabels(labels map[string]string) EventPolicyOption {
return func(ep *v1alpha1.EventPolicy) {
ep.ObjectMeta.Labels = labels
Expand Down
Loading