Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reconcile event policies for mt-broker #8090

Merged
merged 10 commits into from
Jul 12, 2024
76 changes: 76 additions & 0 deletions pkg/reconciler/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
duckv1 "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/eventing/pkg/apis/eventing"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
"knative.dev/eventing/pkg/apis/feature"
messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
"knative.dev/eventing/pkg/auth"
Expand Down Expand Up @@ -264,6 +265,11 @@
return fmt.Errorf("could not update broker status with EventPolicies: %v", err)
}

// Reconcile the EventPolicy for the Broker.
if err := r.reconcileEventPolicy(ctx, b, chanMan, featureFlags); err != nil {
return fmt.Errorf("failed to reconcile EventPolicy for Broker %s: %w", b.Name, err)
}

// So, at this point the Broker is ready and everything should be solid
// for the triggers to act upon.
return nil
Expand Down Expand Up @@ -428,6 +434,44 @@
return channelable, nil
}

func (r *Reconciler) reconcileEventPolicy(ctx context.Context, b *eventingv1.Broker, chanMan *channelTemplate, featureFlags feature.Flags) error {
eventPolicyObjRef := &corev1.ObjectReference{
Name: b.Name,
Namespace: b.Namespace,
}

if featureFlags.IsOIDCAuthentication() {
// get the eventpolicy , create if not exists.
_, err := r.eventPolicyLister.EventPolicies(b.Namespace).Get(b.Name)
switch {
case apierrs.IsNotFound(err):
// create the EventPolicy if it doesn't exists.
logging.FromContext(ctx).Info("Creating EventPolicy for Broker %s", eventPolicyObjRef.Name)
eventPolicy, err := r.getEventPolicyForBroker(b, chanMan)
if err != nil {
return fmt.Errorf("failed to create EventPolicy for Broker %s: %w", eventPolicyObjRef.Name, err)
}
_, err = r.eventingClientSet.EventingV1alpha1().EventPolicies(eventPolicyObjRef.Namespace).Create(ctx, eventPolicy, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("failed to create EventPolicy for Broker %s: %w", eventPolicyObjRef.Name, err)
}
logging.FromContext(ctx).Info("Created EventPolicy for Broker %s", eventPolicyObjRef.Name)
case err != nil:
return fmt.Errorf("failed to get EventPolicy for Broker %s: %w", eventPolicyObjRef.Name, err)
}
} else {
// list the eventpolicy, delete if exists.
err := r.eventingClientSet.EventingV1alpha1().EventPolicies(eventPolicyObjRef.Namespace).Delete(ctx, eventPolicyObjRef.Name, metav1.DeleteOptions{})
switch {
case apierrs.IsNotFound(err):
logging.FromContext(ctx).Info("EventPolicy for Broker %s does not exist", eventPolicyObjRef.Name)
case err != nil:
return fmt.Errorf("failed to delete EventPolicy for Broker %s: %w", eventPolicyObjRef.Name, err)
}
}
return nil
}

// TriggerChannelLabels are all the labels placed on the Trigger Channel for the given brokerName. This
// should only be used by Broker and Trigger code.
func TriggerChannelLabels(brokerName, brokerNamespace string) map[string]string {
Expand Down Expand Up @@ -469,3 +513,35 @@
httpsAddress.URL.Path = fmt.Sprintf("/%s/%s", b.Namespace, b.Name)
return httpsAddress
}

// getEventPolicyForBroker returns the EventPolicy for the Broker.
func (r *Reconciler) getEventPolicyForBroker(b *eventingv1.Broker, chanMan *channelTemplate) (*eventingv1alpha1.EventPolicy, error) {

Check failure on line 518 in pkg/reconciler/broker/broker.go

View workflow job for this annotation

GitHub Actions / style / Golang / Lint

(*Reconciler).getEventPolicyForBroker - result 1 (error) is always nil (unparam)
rahulii marked this conversation as resolved.
Show resolved Hide resolved
oidcBrokerSub := "system:serviceaccount:knative-eventing:mt-broker-ingress-oidc"
rahulii marked this conversation as resolved.
Show resolved Hide resolved
ep := &eventingv1alpha1.EventPolicy{
ObjectMeta: metav1.ObjectMeta{
// TODO: This is a temporary name, we need to come up with a better name for the EventPolicy.
Name: b.Name,
Namespace: b.Namespace,
OwnerReferences: []metav1.OwnerReference{
*kmeta.NewControllerRef(b),
},
},
Spec: eventingv1alpha1.EventPolicySpec{
To: []eventingv1alpha1.EventPolicySpecTo{
{
Ref: &eventingv1alpha1.EventPolicyToReference{
APIVersion: chanMan.template.APIVersion,
Kind: chanMan.template.Kind,
Name: chanMan.ref.Name,
},
},
},
From: []eventingv1alpha1.EventPolicySpecFrom{
{
Sub: &oidcBrokerSub,
},
},
},
}
return ep, nil
}
Loading