Skip to content

Commit

Permalink
Add EventPolicy reconciliation for Sequence (#8106)
Browse files Browse the repository at this point in the history
* feat: initial commit

Signed-off-by: Leo Li <leoli@redhat.com>

* feat: add the test for eventpolicy in sequence reconciler

Signed-off-by: Leo Li <leoli@redhat.com>

* fix: fix the typo and remove the unused helper function

Signed-off-by: Leo Li <leoli@redhat.com>

* fix: trying to fix the git diff issue

Signed-off-by: Leo Li <leoli@redhat.com>

* fix: trying to fix the git diff issue

Signed-off-by: Leo Li <leoli@redhat.com>

* fix: fix the nit minor comments

Signed-off-by: Leo Li <leoli@redhat.com>

* fix: update the reconcilation mechanism

Signed-off-by: Leo Li <leoli@redhat.com>

* fix: fix the goimports and remove unused helper functions and input parameters

Signed-off-by: Leo Li <leoli@redhat.com>

* fix: add more unit tests to test out remove steps from the sequence

Signed-off-by: Leo Li <leoli@redhat.com>

* Update pkg/reconciler/sequence/sequence.go

Co-authored-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>

* Update pkg/reconciler/sequence/sequence.go

Co-authored-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>

* Apply suggestions from code review

Co-authored-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>

* fix: fix the nit review comments from pierdipi and rahul

Signed-off-by: Leo Li <leoli@redhat.com>

* fix: using auth.GetEventPoliciesForResource when trying to list all Sequence's eventPolicy (not for the immediate channels)

Signed-off-by: Leo Li <leoli@redhat.com>

* feat: add the sorting to avoid flaky test when there are multiple eventplocies per sequence

Signed-off-by: Leo Li <leoli@redhat.com>

* feat: remove the nil condition for channel name when creating the sequence policy name

Signed-off-by: Leo Li <leoli@redhat.com>

* feat: add more unit tests

Signed-off-by: Leo Li <leoli@redhat.com>

* fix: lint & goimports

Signed-off-by: Leo Li <leoli@redhat.com>

* fix: fix the review comments

* fix: fix Christoph's review comments

Signed-off-by: Leo Li <leoli@redhat.com>

* feat: adding a test for sequence with existing intermediate eventpolicies requiring update and cleanup.

Signed-off-by: Leo Li <leoli@redhat.com>

* fix: the deepDerivative failed to compare the eventpolicy's From.Spec, change to reflect.DeepEqual

Signed-off-by: Leo Li <leoli@redhat.com>

* fix: change back to use DeepDerivative

Signed-off-by: Leo Li <leoli@redhat.com>

* fix: fix the test case to make the eventpolicy has a valid spec

Signed-off-by: Leo Li <leoli@redhat.com>

* fix: fix the flaky issue by soring the policies

Signed-off-by: Leo Li <leoli@redhat.com>

* fix: change input channel's ownerref to sequence's eventpolicy

---------

Signed-off-by: Leo Li <leoli@redhat.com>
Co-authored-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>
  • Loading branch information
Leo6Leo and pierDipi authored Aug 9, 2024
1 parent ecb6c01 commit 5c81d76
Show file tree
Hide file tree
Showing 4 changed files with 2,360 additions and 1 deletion.
6 changes: 6 additions & 0 deletions pkg/auth/event_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package auth

import (
"fmt"
"sort"
"strings"

eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
Expand Down Expand Up @@ -93,6 +94,11 @@ func GetEventPoliciesForResource(lister listerseventingv1alpha1.EventPolicyListe
}
}

// Sort the policies by name to ensure deterministic order
sort.Slice(relevantPolicies, func(i, j int) bool {
return relevantPolicies[i].Name < relevantPolicies[j].Name
})

return relevantPolicies, nil
}

Expand Down
118 changes: 118 additions & 0 deletions pkg/reconciler/sequence/resources/eventpolicy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
Copyright 2024 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package resources

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
flowsv1 "knative.dev/eventing/pkg/apis/flows/v1"
messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
"knative.dev/pkg/kmeta"
)

const (
SequenceChannelEventPolicyLabelPrefix = "flows.knative.dev/"
sequenceKind = "Sequence"
subscriptionKind = "Subscription"
eventPolicyKind = "EventPolicy"
)

func MakeEventPolicyForSequenceChannel(s *flowsv1.Sequence, channel *eventingduckv1.Channelable, subscription *messagingv1.Subscription) *eventingv1alpha1.EventPolicy {
return &eventingv1alpha1.EventPolicy{
ObjectMeta: metav1.ObjectMeta{
Namespace: channel.Namespace,
Name: SequenceEventPolicyName(s.Name, channel.Name),
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: flowsv1.SchemeGroupVersion.String(),
Kind: sequenceKind,
Name: s.Name,
UID: s.UID,
},
},
Labels: LabelsForSequenceChannelsEventPolicy(s.Name),
},
Spec: eventingv1alpha1.EventPolicySpec{
To: []eventingv1alpha1.EventPolicySpecTo{
{
Ref: &eventingv1alpha1.EventPolicyToReference{
APIVersion: channel.APIVersion,
Kind: channel.Kind,
Name: channel.Name,
},
},
},
From: []eventingv1alpha1.EventPolicySpecFrom{
{
Ref: &eventingv1alpha1.EventPolicyFromReference{
APIVersion: messagingv1.SchemeGroupVersion.String(),
Kind: subscriptionKind,
Name: subscription.Name,
Namespace: subscription.Namespace,
},
},
},
},
}
}

func LabelsForSequenceChannelsEventPolicy(sequenceName string) map[string]string {
return map[string]string{
SequenceChannelEventPolicyLabelPrefix + "sequence-name": sequenceName,
}
}

func SequenceEventPolicyName(sequenceName, postfix string) string {

if postfix == "" {
return sequenceName
}
return kmeta.ChildName(sequenceName, "-"+postfix)

}

// MakeEventPolicyForSequenceInputChannel creates an EventPolicy for the input channel of a Sequence
func MakeEventPolicyForSequenceInputChannel(s *flowsv1.Sequence, inputChannel *eventingduckv1.Channelable, sequencePolicy *eventingv1alpha1.EventPolicy) *eventingv1alpha1.EventPolicy {
return &eventingv1alpha1.EventPolicy{
ObjectMeta: metav1.ObjectMeta{
Namespace: inputChannel.Namespace,
Name: SequenceEventPolicyName(s.Name, sequencePolicy.Name),
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: eventingv1alpha1.SchemeGroupVersion.String(),
Kind: eventPolicyKind,
Name: sequencePolicy.Name,
UID: sequencePolicy.UID,
},
},
Labels: LabelsForSequenceChannelsEventPolicy(s.Name),
},
Spec: eventingv1alpha1.EventPolicySpec{
To: []eventingv1alpha1.EventPolicySpecTo{
{
Ref: &eventingv1alpha1.EventPolicyToReference{
APIVersion: inputChannel.APIVersion,
Kind: inputChannel.Kind,
Name: inputChannel.Name,
},
},
},
From: sequencePolicy.Spec.From,
},
}
}
148 changes: 148 additions & 0 deletions pkg/reconciler/sequence/sequence.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -37,6 +38,7 @@ import (
"knative.dev/pkg/kmp"

eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
"knative.dev/eventing/pkg/apis/feature"
v1 "knative.dev/eventing/pkg/apis/flows/v1"
messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
Expand Down Expand Up @@ -130,6 +132,10 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, s *v1.Sequence) pkgrecon
return err
}

if err := r.reconcileEventPolicies(ctx, s, channels, subs, featureFlags); err != nil {
return fmt.Errorf("failed to reconcile EventPolicies: %w", err)
}

err := auth.UpdateStatusWithEventPolicies(featureFlags, &s.Status.AppliedEventPoliciesStatus, &s.Status, r.eventPolicyLister, v1.SchemeGroupVersion.WithKind("Sequence"), s.ObjectMeta)
if err != nil {
return fmt.Errorf("could not update Sequence status with EventPolicies: %v", err)
Expand Down Expand Up @@ -333,3 +339,145 @@ func (r *Reconciler) removeUnwantedSubscriptions(ctx context.Context, seq *v1.Se

return nil
}

func (r *Reconciler) reconcileEventPolicies(ctx context.Context, s *v1.Sequence, channels []*eventingduckv1.Channelable, subs []*messagingv1.Subscription, featureFlags feature.Flags) error {
if !featureFlags.IsOIDCAuthentication() {
return r.cleanupAllEventPolicies(ctx, s)
}

existingPolicies, err := r.listEventPoliciesForSequence(s)
if err != nil {
return fmt.Errorf("failed to list existing EventPolicies: %w", err)
}

// Prepare maps for efficient lookups, updates, and deletions of policies
existingPolicyMap := make(map[string]*eventingv1alpha1.EventPolicy)
for _, policy := range existingPolicies {
existingPolicyMap[policy.Name] = policy
}

// Prepare lists for different actions so that policies can be categorized
var policiesToUpdate, policiesToCreate []*eventingv1alpha1.EventPolicy
policiesToDelete := make([]*eventingv1alpha1.EventPolicy, 0, len(existingPolicies))

// Handle intermediate channel policies (skip the first channel as it's the input channel!)
for i := 1; i < len(channels); i++ {
expectedPolicy := resources.MakeEventPolicyForSequenceChannel(s, channels[i], subs[i-1])
existingPolicy, exists := existingPolicyMap[expectedPolicy.Name]

if exists {
if !equality.Semantic.DeepDerivative(expectedPolicy, existingPolicy) {
expectedPolicy.SetResourceVersion(existingPolicy.ResourceVersion)
policiesToUpdate = append(policiesToUpdate, expectedPolicy)
}
delete(existingPolicyMap, expectedPolicy.Name)
} else {
policiesToCreate = append(policiesToCreate, expectedPolicy)
}
}

// Handle input channel policies
inputPolicies, err := r.prepareInputChannelEventPolicy(s, channels[0])
if err != nil {
return fmt.Errorf("failed to prepare input channel EventPolicies: %w", err)
}
for _, inputPolicy := range inputPolicies {
existingInputPolicy, exists := existingPolicyMap[inputPolicy.Name]
if exists {
if !equality.Semantic.DeepDerivative(inputPolicy, existingInputPolicy) {
inputPolicy.SetResourceVersion(existingInputPolicy.ResourceVersion)
policiesToUpdate = append(policiesToUpdate, inputPolicy)
}
delete(existingPolicyMap, inputPolicy.Name)
} else {
policiesToCreate = append(policiesToCreate, inputPolicy)
}
}

// Any remaining policies in the map should be deleted
for _, policy := range existingPolicyMap {
policiesToDelete = append(policiesToDelete, policy)
}

// Perform the actual CRUD operations
if err := r.createEventPolicies(ctx, policiesToCreate); err != nil {
return fmt.Errorf("failed to create EventPolicies: %w", err)
}
if err := r.updateEventPolicies(ctx, policiesToUpdate); err != nil {
return fmt.Errorf("failed to update EventPolicies: %w", err)
}
if err := r.deleteEventPolicies(ctx, policiesToDelete); err != nil {
return fmt.Errorf("failed to delete EventPolicies: %w", err)
}

return nil
}

// listEventPoliciesForSequence lists all EventPolicies (e.g. the policies for the input channel and the intermediate channels) created during reconcileKind that are associated with the given Sequence.
func (r *Reconciler) listEventPoliciesForSequence(s *v1.Sequence) ([]*eventingv1alpha1.EventPolicy, error) {
labelSelector := labels.SelectorFromSet(map[string]string{
resources.SequenceChannelEventPolicyLabelPrefix + "sequence-name": s.Name,
})
return r.eventPolicyLister.EventPolicies(s.Namespace).List(labelSelector)
}

func (r *Reconciler) prepareInputChannelEventPolicy(s *v1.Sequence, inputChannel *eventingduckv1.Channelable) ([]*eventingv1alpha1.EventPolicy, error) {
matchingPolicies, err := auth.GetEventPoliciesForResource(
r.eventPolicyLister,
v1.SchemeGroupVersion.WithKind("Sequence"),
s.ObjectMeta,
)
if err != nil {
return nil, fmt.Errorf("failed to get matching EventPolicies for Sequence: %w", err)
}

if len(matchingPolicies) == 0 {
return nil, nil
}

inputChannelPolicies := make([]*eventingv1alpha1.EventPolicy, 0, len(matchingPolicies))
for _, policy := range matchingPolicies {
inputChannelPolicy := resources.MakeEventPolicyForSequenceInputChannel(s, inputChannel, policy)
inputChannelPolicies = append(inputChannelPolicies, inputChannelPolicy)
}

return inputChannelPolicies, nil
}

func (r *Reconciler) createEventPolicies(ctx context.Context, policies []*eventingv1alpha1.EventPolicy) error {
for _, policy := range policies {
_, err := r.eventingClientSet.EventingV1alpha1().EventPolicies(policy.Namespace).Create(ctx, policy, metav1.CreateOptions{})
if err != nil {
return err
}
}
return nil
}

func (r *Reconciler) updateEventPolicies(ctx context.Context, policies []*eventingv1alpha1.EventPolicy) error {
for _, policy := range policies {
_, err := r.eventingClientSet.EventingV1alpha1().EventPolicies(policy.Namespace).Update(ctx, policy, metav1.UpdateOptions{})
if err != nil {
return err
}
}
return nil
}

func (r *Reconciler) deleteEventPolicies(ctx context.Context, policies []*eventingv1alpha1.EventPolicy) error {
for _, policy := range policies {
err := r.eventingClientSet.EventingV1alpha1().EventPolicies(policy.Namespace).Delete(ctx, policy.Name, metav1.DeleteOptions{})
if err != nil && !apierrs.IsNotFound(err) {
return err
}
}
return nil
}

func (r *Reconciler) cleanupAllEventPolicies(ctx context.Context, s *v1.Sequence) error {
policies, err := r.listEventPoliciesForSequence(s)
if err != nil {
return fmt.Errorf("failed to list EventPolicies for cleanup: %w", err)
}
return r.deleteEventPolicies(ctx, policies)
}
Loading

0 comments on commit 5c81d76

Please sign in to comment.