Skip to content

Commit

Permalink
Merge branch 'main' into rewrite-stats-reporter
Browse files Browse the repository at this point in the history
  • Loading branch information
Leo6Leo committed Aug 9, 2024
2 parents eaad51a + c521efb commit 6ad3213
Show file tree
Hide file tree
Showing 23 changed files with 4,094 additions and 508 deletions.
1 change: 1 addition & 0 deletions OWNERS_ALIASES
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ aliases:
eventing-wg-leads:
- pierDipi
eventing-writers:
- Leo6Leo
- aliok
- cali0707
- creydr
Expand Down
4 changes: 2 additions & 2 deletions hack/update-cert-manager.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ metadata:
EOF

helm template -n cert-manager cert-manager jetstack/cert-manager --create-namespace --version "${cert_manager_version}" --set installCRDs=true > third_party/cert-manager/01-cert-manager.yaml
helm template -n cert-manager cert-manager jetstack/trust-manager --create-namespace --version "${trust_manager_version}" --set installCRDs=true > third_party/cert-manager/02-trust-manager.yaml
helm template -n cert-manager cert-manager jetstack/trust-manager --create-namespace --version "${trust_manager_version}" --set crds.enabled=true > third_party/cert-manager/02-trust-manager.yaml
}

update_cert_manager "v1.13.3" "v0.7.1"
update_cert_manager "v1.13.3" "v0.12.0"
10 changes: 10 additions & 0 deletions pkg/apis/eventing/v1alpha1/eventpolicy_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,14 @@ func (ep *EventPolicy) SetDefaults(ctx context.Context) {
}

func (ets *EventPolicySpec) SetDefaults(ctx context.Context) {
for i := range ets.From {
ets.From[i].SetDefaults(ctx)
}
}

func (from *EventPolicySpecFrom) SetDefaults(ctx context.Context) {
if from.Ref != nil && from.Ref.Namespace == "" {
// default to event policies namespace
from.Ref.Namespace = apis.ParentMeta(ctx).Namespace
}
}
32 changes: 32 additions & 0 deletions pkg/apis/eventing/v1alpha1/eventpolicy_defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"context"
"testing"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/google/go-cmp/cmp"
)

Expand All @@ -32,6 +34,36 @@ func TestEventPolicyDefaults(t *testing.T) {
initial: EventPolicy{},
expected: EventPolicy{},
},
"default .spec.from[].namespace": {
initial: EventPolicy{
ObjectMeta: metav1.ObjectMeta{
Namespace: "my-ns",
},
Spec: EventPolicySpec{
From: []EventPolicySpecFrom{
{
Ref: &EventPolicyFromReference{
Namespace: "",
},
},
},
},
},
expected: EventPolicy{
ObjectMeta: metav1.ObjectMeta{
Namespace: "my-ns",
},
Spec: EventPolicySpec{
From: []EventPolicySpecFrom{
{
Ref: &EventPolicyFromReference{
Namespace: "my-ns",
},
},
},
},
},
},
}
for n, tc := range testCases {
t.Run(n, func(t *testing.T) {
Expand Down
6 changes: 6 additions & 0 deletions pkg/auth/event_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package auth

import (
"fmt"
"sort"
"strings"

eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
Expand Down Expand Up @@ -93,6 +94,11 @@ func GetEventPoliciesForResource(lister listerseventingv1alpha1.EventPolicyListe
}
}

// Sort the policies by name to ensure deterministic order
sort.Slice(relevantPolicies, func(i, j int) bool {
return relevantPolicies[i].Name < relevantPolicies[j].Name
})

return relevantPolicies, nil
}

Expand Down
1 change: 1 addition & 0 deletions pkg/reconciler/broker/resources/eventpolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func MakeEventPolicyForBackingChannel(b *eventingv1.Broker, backingChannel *even
APIVersion: eventingv1.SchemeGroupVersion.String(),
Kind: brokerKind,
Name: b.Name,
UID: b.UID,
},
},
Labels: LabelsForBackingChannelsEventPolicy(b),
Expand Down
145 changes: 145 additions & 0 deletions pkg/reconciler/parallel/parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -36,6 +37,7 @@ import (
pkgreconciler "knative.dev/pkg/reconciler"

duckv1 "knative.dev/eventing/pkg/apis/duck/v1"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
"knative.dev/eventing/pkg/apis/feature"
v1 "knative.dev/eventing/pkg/apis/flows/v1"
messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
Expand Down Expand Up @@ -144,6 +146,11 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, p *v1.Parallel) pkgrecon
return fmt.Errorf("error removing unwanted Subscriptions: %w", err)
}

// Reconcile EventPolicies for the parallel.
if err := r.reconcileEventPolicies(ctx, p, ingressChannel, channels, filterSubs, featureFlags); err != nil {
return fmt.Errorf("failed to reconcile EventPolicies for Parallel: %w", err)
}

err := auth.UpdateStatusWithEventPolicies(featureFlags, &p.Status.AppliedEventPoliciesStatus, &p.Status, r.eventPolicyLister, v1.SchemeGroupVersion.WithKind("Parallel"), p.ObjectMeta)
if err != nil {
return fmt.Errorf("could not update parallel status with EventPolicies: %v", err)
Expand Down Expand Up @@ -349,3 +356,141 @@ func (r *Reconciler) removeUnwantedSubscriptions(ctx context.Context, p *v1.Para

return nil
}

func (r *Reconciler) reconcileEventPolicies(ctx context.Context, p *v1.Parallel, ingressChannel *duckv1.Channelable,
channels []*duckv1.Channelable, filterSubs []*messagingv1.Subscription, featureFlags feature.Flags) error {

if !featureFlags.IsOIDCAuthentication() {
return r.cleanupAllEventPolicies(ctx, p)
}
// list all the existing event policies for the parallel.
existingPolicies, err := r.listEventPoliciesForParallel(p)
if err != nil {
return fmt.Errorf("failed to list existing event policies for parallel: %w", err)
}
// make a map of existing event policies for easy and efficient lookup.
existingPolicyMap := make(map[string]*eventingv1alpha1.EventPolicy)
for _, policy := range existingPolicies {
existingPolicyMap[policy.Name] = policy
}

// prepare the list of event policies to create, update and delete.
var policiesToCreate, policiesToUpdate []*eventingv1alpha1.EventPolicy
policiesToDelete := make([]*eventingv1alpha1.EventPolicy, 0, len(existingPolicyMap))

for i, channel := range channels {
filterSub := filterSubs[i]
expectedPolicy := resources.MakeEventPolicyForParallelChannel(p, channel, filterSub)
if existingPolicy, ok := existingPolicyMap[expectedPolicy.Name]; ok {
if !equality.Semantic.DeepDerivative(expectedPolicy, existingPolicy) {
expectedPolicy.SetResourceVersion(existingPolicy.ResourceVersion)
policiesToUpdate = append(policiesToUpdate, expectedPolicy)
}
delete(existingPolicyMap, expectedPolicy.Name)
} else {
policiesToCreate = append(policiesToCreate, expectedPolicy)
}
}

// prepare the event policies for the ingress channel.
ingressChannelEventPolicies, err := r.prepareIngressChannelEventpolicies(p, ingressChannel)
if err != nil {
return fmt.Errorf("failed to prepare event policies for ingress channel: %w", err)
}

for _, policy := range ingressChannelEventPolicies {
if existingIngressChannelPolicy, ok := existingPolicyMap[policy.Name]; ok {
if !equality.Semantic.DeepDerivative(policy, existingIngressChannelPolicy) {
policy.SetResourceVersion(existingIngressChannelPolicy.ResourceVersion)
policiesToUpdate = append(policiesToUpdate, policy)
}
delete(existingPolicyMap, policy.Name)
} else {
policiesToCreate = append(policiesToCreate, policy)
}
}

// delete the remaining event policies in the map.
for _, policy := range existingPolicyMap {
policiesToDelete = append(policiesToDelete, policy)
}

// now that we have the list of event policies to create, update and delete, we can perform the operations.
if err := r.createEventPolicies(ctx, policiesToCreate); err != nil {
return fmt.Errorf("failed to create event policies: %w", err)
}
if err := r.updateEventPolicies(ctx, policiesToUpdate); err != nil {
return fmt.Errorf("failed to update event policies: %w", err)
}
if err := r.deleteEventPolicies(ctx, policiesToDelete); err != nil {
return fmt.Errorf("failed to delete event policies: %w", err)
}

return nil
}

func (r *Reconciler) createEventPolicies(ctx context.Context, policies []*eventingv1alpha1.EventPolicy) error {
for _, policy := range policies {
_, err := r.eventingClientSet.EventingV1alpha1().EventPolicies(policy.Namespace).Create(ctx, policy, metav1.CreateOptions{})
if err != nil {
return err
}
}
return nil
}

func (r *Reconciler) updateEventPolicies(ctx context.Context, policies []*eventingv1alpha1.EventPolicy) error {
for _, policy := range policies {
_, err := r.eventingClientSet.EventingV1alpha1().EventPolicies(policy.Namespace).Update(ctx, policy, metav1.UpdateOptions{})
if err != nil {
return err
}
}
return nil
}

func (r *Reconciler) deleteEventPolicies(ctx context.Context, policies []*eventingv1alpha1.EventPolicy) error {
for _, policy := range policies {
err := r.eventingClientSet.EventingV1alpha1().EventPolicies(policy.Namespace).Delete(ctx, policy.Name, metav1.DeleteOptions{})
if err != nil && !apierrs.IsNotFound(err) {
return err
}
}
return nil
}

func (r *Reconciler) prepareIngressChannelEventpolicies(p *v1.Parallel, ingressChannel *duckv1.Channelable) ([]*eventingv1alpha1.EventPolicy, error) {
applyingEventPoliciesForParallel, err := auth.GetEventPoliciesForResource(r.eventPolicyLister, v1.SchemeGroupVersion.WithKind("Parallel"), p.ObjectMeta)
if err != nil {
return nil, fmt.Errorf("could not get EventPolicies for Parallel %s/%s: %w", p.Namespace, p.Name, err)
}

if len(applyingEventPoliciesForParallel) == 0 {
return nil, nil
}

ingressChannelEventPolicies := make([]*eventingv1alpha1.EventPolicy, 0, len(applyingEventPoliciesForParallel))
for _, eventPolicy := range applyingEventPoliciesForParallel {
ingressChannelEventPolicies = append(ingressChannelEventPolicies, resources.MakeEventPolicyForParallelIngressChannel(p, ingressChannel, eventPolicy))
}

return ingressChannelEventPolicies, nil
}

func (r *Reconciler) cleanupAllEventPolicies(ctx context.Context, p *v1.Parallel) error {
// list all the event policies for the parallel.
eventPolicies, err := r.listEventPoliciesForParallel(p)
if err != nil {
return err
}
return r.deleteEventPolicies(ctx, eventPolicies)
}

// listEventPoliciesForParallel lists all EventPolicies (e.g. the policies for the input channel and the intermediate channels)
// created during reconcileKind that are associated with the given Parallel.
func (r *Reconciler) listEventPoliciesForParallel(p *v1.Parallel) ([]*eventingv1alpha1.EventPolicy, error) {
labelSelector := labels.SelectorFromSet(map[string]string{
resources.ParallelChannelEventPolicyLabelPrefix + "parallel-name": p.Name,
})
return r.eventPolicyLister.EventPolicies(p.Namespace).List(labelSelector)
}
Loading

0 comments on commit 6ad3213

Please sign in to comment.