Add EventPolicy reconciliation for Sequence #8106

Merged
merged 32 commits into from
Aug 9, 2024
fb45ef6
feat: initial commit
Leo6Leo Jul 19, 2024
3298698
Merge branch 'main' into Create-EventPolicies-for-Sequence
Leo6Leo Jul 22, 2024
043141a
feat: add the test for eventpolicy in sequence reconciler
Leo6Leo Jul 23, 2024
2c99a12
fix: fix the typo and remove the unused helper function
Leo6Leo Jul 23, 2024
1696769
fix: trying to fix the git diff issue
Leo6Leo Jul 24, 2024
0adb907
fix: trying to fix the git diff issue
Leo6Leo Jul 24, 2024
e82a1db
fix: fix the nit minor comments
Leo6Leo Jul 24, 2024
5e73f53
fix: update the reconcilation mechanism
Leo6Leo Jul 24, 2024
ca8b3ae
Merge branch 'main' into Create-EventPolicies-for-Sequence
Leo6Leo Jul 25, 2024
78a8b0a
fix: fix the goimports and remove unused helper functions and input p…
Leo6Leo Jul 25, 2024
fe72c3c
fix: add more unit tests to test out remove steps from the sequence
Leo6Leo Jul 26, 2024
f088cf8
Update pkg/reconciler/sequence/sequence.go
Leo6Leo Jul 26, 2024
22520ed
Update pkg/reconciler/sequence/sequence.go
Leo6Leo Jul 26, 2024
f048559
Apply suggestions from code review
Leo6Leo Jul 26, 2024
092f87e
fix: fix the nit review comments from pierdipi and rahul
Leo6Leo Jul 26, 2024
8126f8c
fix: using auth.GetEventPoliciesForResource when trying to list all S…
Leo6Leo Jul 26, 2024
2f23635
Merge remote-tracking branch 'upstream/main' into Create-EventPolicie…
Leo6Leo Jul 29, 2024
268c595
feat: add the sorting to avoid flaky test when there are multiple eve…
Leo6Leo Jul 31, 2024
38017b8
feat: remove the nil condition for channel name when creating the seq…
Leo6Leo Jul 31, 2024
fc8be57
feat: add more unit tests
Leo6Leo Jul 31, 2024
b7bfc92
fix: lint & goimports
Leo6Leo Jul 31, 2024
56b071b
Merge remote-tracking branch 'upstream/main' into Create-EventPolicie…
Leo6Leo Jul 31, 2024
dce2679
Merge remote-tracking branch 'upstream/main' into Create-EventPolicie…
Leo6Leo Aug 1, 2024
18373ca
Merge branch 'main' into Create-EventPolicies-for-Sequence
Leo6Leo Aug 7, 2024
5da98e0
fix: fix the review comments
Leo6Leo Aug 7, 2024
36a1cb3
fix: fix Christoph's review comments
Leo6Leo Aug 7, 2024
6042cbb
feat: adding a test for sequence with existing intermediate eventpoli…
Leo6Leo Aug 7, 2024
9d1e73b
fix: the deepDerivative failed to compare the eventpolicy's From.Spec…
Leo6Leo Aug 7, 2024
e6cddbe
fix: change back to use DeepDerivative
Leo6Leo Aug 8, 2024
cf4d257
fix: fix the test case to make the eventpolicy has a valid spec
Leo6Leo Aug 8, 2024
ad4df83
fix: fix the flaky issue by soring the policies
Leo6Leo Aug 8, 2024
6c8fa1a
fix: change input channel's ownerref to sequence's eventpolicy
Leo6Leo Aug 9, 2024
6 changes: 6 additions & 0 deletions pkg/auth/event_policy.go
Expand Up @@ -18,6 +18,7 @@ package auth

import (
"fmt"
"sort"
"strings"

eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
Expand Down Expand Up @@ -93,6 +94,11 @@ func GetEventPoliciesForResource(lister listerseventingv1alpha1.EventPolicyListe
}
}

// Sort the policies by name to ensure deterministic order
sort.Slice(relevantPolicies, func(i, j int) bool {
return relevantPolicies[i].Name < relevantPolicies[j].Name
})

return relevantPolicies, nil
}

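Review bot: The new sort makes GetEventPoliciesForResource return a name-ordered slice, but the guarantee is not documented anywhere, and the commit list shows the Sequence reconciler's tests now depend on it; a line such as `// The returned policies are sorted by name so callers observe a deterministic order.` belongs in the function's doc comment, which sits above the first hunk. Ordering by `Name` alone is only total while every listed policy lives in one namespace; if this lookup can ever span namespaces, compare `Namespace` before `Name`. The function's signature is outside the hunk; its body ends here.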
118 changes: 118 additions & 0 deletions pkg/reconciler/sequence/resources/eventpolicy.go
@@ -0,0 +1,118 @@
/*
Copyright 2024 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package resources

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
flowsv1 "knative.dev/eventing/pkg/apis/flows/v1"
messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
"knative.dev/pkg/kmeta"
)

const (
SequenceChannelEventPolicyLabelPrefix = "flows.knative.dev/"
sequenceKind = "Sequence"
subscriptionKind = "Subscription"
eventPolicyKind = "EventPolicy"
)

func MakeEventPolicyForSequenceChannel(s *flowsv1.Sequence, channel *eventingduckv1.Channelable, subscription *messagingv1.Subscription) *eventingv1alpha1.EventPolicy {
return &eventingv1alpha1.EventPolicy{
ObjectMeta: metav1.ObjectMeta{
Namespace: channel.Namespace,
Name: SequenceEventPolicyName(s.Name, channel.Name),
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: flowsv1.SchemeGroupVersion.String(),
Kind: sequenceKind,
Name: s.Name,
UID: s.UID,
},
},
Labels: LabelsForSequenceChannelsEventPolicy(s.Name),
},
Spec: eventingv1alpha1.EventPolicySpec{
To: []eventingv1alpha1.EventPolicySpecTo{
{
Ref: &eventingv1alpha1.EventPolicyToReference{
APIVersion: channel.APIVersion,
Kind: channel.Kind,
Name: channel.Name,
},
},
},
From: []eventingv1alpha1.EventPolicySpecFrom{
{
Ref: &eventingv1alpha1.EventPolicyFromReference{
APIVersion: messagingv1.SchemeGroupVersion.String(),
Kind: subscriptionKind,
Name: subscription.Name,
Namespace: subscription.Namespace,
},
},
},
},
}
}

func LabelsForSequenceChannelsEventPolicy(sequenceName string) map[string]string {
return map[string]string{
SequenceChannelEventPolicyLabelPrefix + "sequence-name": sequenceName,
}
}

func SequenceEventPolicyName(sequenceName, postfix string) string {

if postfix == "" {
return sequenceName
}
return kmeta.ChildName(sequenceName, "-"+postfix)

}

// MakeEventPolicyForSequenceInputChannel creates an EventPolicy for the input channel of a Sequence
func MakeEventPolicyForSequenceInputChannel(s *flowsv1.Sequence, inputChannel *eventingduckv1.Channelable, sequencePolicy *eventingv1alpha1.EventPolicy) *eventingv1alpha1.EventPolicy {
return &eventingv1alpha1.EventPolicy{
ObjectMeta: metav1.ObjectMeta{
Namespace: inputChannel.Namespace,
Name: SequenceEventPolicyName(s.Name, sequencePolicy.Name),
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: eventingv1alpha1.SchemeGroupVersion.String(),
Kind: eventPolicyKind,
Name: sequencePolicy.Name,
UID: sequencePolicy.UID,
},
},
Labels: LabelsForSequenceChannelsEventPolicy(s.Name),
},
Spec: eventingv1alpha1.EventPolicySpec{
To: []eventingv1alpha1.EventPolicySpecTo{
{
Ref: &eventingv1alpha1.EventPolicyToReference{
APIVersion: inputChannel.APIVersion,
Kind: inputChannel.Kind,
Name: inputChannel.Name,
},
},
},
From: sequencePolicy.Spec.From,
},
}
}
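Review bot: `MakeEventPolicyForSequenceInputChannel` assigns `From: sequencePolicy.Spec.From`, so the generated policy shares the backing array of the lister-cached object it was derived from; nothing in this file mutates it today, but any later in-place edit of the generated spec would corrupt the informer cache, and a defensive copy is cheap: `From: sequencePolicy.Spec.DeepCopy().From`. The owner references built in both constructors set neither `Controller` nor `BlockOwnerDeletion`, so the Sequence is a plain owner rather than the controlling one; `*kmeta.NewControllerRef(s)` (kmeta is already imported) fills both fields and matches how other Knative reconcilers mark the objects they manage. Separately, `LabelsForSequenceChannelsEventPolicy` puts the raw Sequence name into a label value, which the API server caps at 63 characters, while the object name is passed through `kmeta.ChildName`; if Sequence names longer than that are accepted by validation, creating the policy would be rejected, so the label value needs the same shortening or the selection should key on something bounded such as the UID.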
148 changes: 148 additions & 0 deletions pkg/reconciler/sequence/sequence.go
Expand Up @@ -22,6 +22,7 @@ import (

"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -37,6 +38,7 @@ import (
"knative.dev/pkg/kmp"

eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
"knative.dev/eventing/pkg/apis/feature"
v1 "knative.dev/eventing/pkg/apis/flows/v1"
messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
Expand Down Expand Up @@ -130,6 +132,10 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, s *v1.Sequence) pkgrecon
return err
}

if err := r.reconcileEventPolicies(ctx, s, channels, subs, featureFlags); err != nil {
return fmt.Errorf("failed to reconcile EventPolicies: %w", err)
}

err := auth.UpdateStatusWithEventPolicies(featureFlags, &s.Status.AppliedEventPoliciesStatus, &s.Status, r.eventPolicyLister, v1.SchemeGroupVersion.WithKind("Sequence"), s.ObjectMeta)
if err != nil {
return fmt.Errorf("could not update Sequence status with EventPolicies: %v", err)
Expand Down Expand Up @@ -333,3 +339,145 @@ func (r *Reconciler) removeUnwantedSubscriptions(ctx context.Context, seq *v1.Se

return nil
}

func (r *Reconciler) reconcileEventPolicies(ctx context.Context, s *v1.Sequence, channels []*eventingduckv1.Channelable, subs []*messagingv1.Subscription, featureFlags feature.Flags) error {
if !featureFlags.IsOIDCAuthentication() {
return r.cleanupAllEventPolicies(ctx, s)
}

existingPolicies, err := r.listEventPoliciesForSequence(s)
if err != nil {
return fmt.Errorf("failed to list existing EventPolicies: %w", err)
}

// Prepare maps for efficient lookups, updates, and deletions of policies
existingPolicyMap := make(map[string]*eventingv1alpha1.EventPolicy)
for _, policy := range existingPolicies {
existingPolicyMap[policy.Name] = policy
}

// Prepare lists for different actions so that policies can be categorized
var policiesToUpdate, policiesToCreate []*eventingv1alpha1.EventPolicy
policiesToDelete := make([]*eventingv1alpha1.EventPolicy, 0, len(existingPolicies))

// Handle intermediate channel policies (skip the first channel as it's the input channel!)
for i := 1; i < len(channels); i++ {
expectedPolicy := resources.MakeEventPolicyForSequenceChannel(s, channels[i], subs[i-1])
existingPolicy, exists := existingPolicyMap[expectedPolicy.Name]

if exists {
if !equality.Semantic.DeepDerivative(expectedPolicy, existingPolicy) {
expectedPolicy.SetResourceVersion(existingPolicy.ResourceVersion)
policiesToUpdate = append(policiesToUpdate, expectedPolicy)
}
delete(existingPolicyMap, expectedPolicy.Name)
} else {
policiesToCreate = append(policiesToCreate, expectedPolicy)
}
}

// Handle input channel policies
inputPolicies, err := r.prepareInputChannelEventPolicy(s, channels[0])
if err != nil {
return fmt.Errorf("failed to prepare input channel EventPolicies: %w", err)
}
for _, inputPolicy := range inputPolicies {
existingInputPolicy, exists := existingPolicyMap[inputPolicy.Name]
if exists {
if !equality.Semantic.DeepDerivative(inputPolicy, existingInputPolicy) {
inputPolicy.SetResourceVersion(existingInputPolicy.ResourceVersion)
policiesToUpdate = append(policiesToUpdate, inputPolicy)
}
delete(existingPolicyMap, inputPolicy.Name)
} else {
policiesToCreate = append(policiesToCreate, inputPolicy)
}
}

// Any remaining policies in the map should be deleted
for _, policy := range existingPolicyMap {
policiesToDelete = append(policiesToDelete, policy)
}

// Perform the actual CRUD operations
if err := r.createEventPolicies(ctx, policiesToCreate); err != nil {
return fmt.Errorf("failed to create EventPolicies: %w", err)
}
if err := r.updateEventPolicies(ctx, policiesToUpdate); err != nil {
return fmt.Errorf("failed to update EventPolicies: %w", err)
}
if err := r.deleteEventPolicies(ctx, policiesToDelete); err != nil {
return fmt.Errorf("failed to delete EventPolicies: %w", err)
}

return nil
}

// listEventPoliciesForSequence lists all EventPolicies (e.g. the policies for the input channel and the intermediate channels) created during reconcileKind that are associated with the given Sequence.
func (r *Reconciler) listEventPoliciesForSequence(s *v1.Sequence) ([]*eventingv1alpha1.EventPolicy, error) {
labelSelector := labels.SelectorFromSet(map[string]string{
resources.SequenceChannelEventPolicyLabelPrefix + "sequence-name": s.Name,
})
return r.eventPolicyLister.EventPolicies(s.Namespace).List(labelSelector)
}

func (r *Reconciler) prepareInputChannelEventPolicy(s *v1.Sequence, inputChannel *eventingduckv1.Channelable) ([]*eventingv1alpha1.EventPolicy, error) {
matchingPolicies, err := auth.GetEventPoliciesForResource(
r.eventPolicyLister,
v1.SchemeGroupVersion.WithKind("Sequence"),
s.ObjectMeta,
)
if err != nil {
return nil, fmt.Errorf("failed to get matching EventPolicies for Sequence: %w", err)
}

if len(matchingPolicies) == 0 {
return nil, nil
}

inputChannelPolicies := make([]*eventingv1alpha1.EventPolicy, 0, len(matchingPolicies))
for _, policy := range matchingPolicies {
inputChannelPolicy := resources.MakeEventPolicyForSequenceInputChannel(s, inputChannel, policy)
inputChannelPolicies = append(inputChannelPolicies, inputChannelPolicy)
}

return inputChannelPolicies, nil
}

func (r *Reconciler) createEventPolicies(ctx context.Context, policies []*eventingv1alpha1.EventPolicy) error {
for _, policy := range policies {
_, err := r.eventingClientSet.EventingV1alpha1().EventPolicies(policy.Namespace).Create(ctx, policy, metav1.CreateOptions{})
if err != nil {
return err
}
}
return nil
}

func (r *Reconciler) updateEventPolicies(ctx context.Context, policies []*eventingv1alpha1.EventPolicy) error {
for _, policy := range policies {
_, err := r.eventingClientSet.EventingV1alpha1().EventPolicies(policy.Namespace).Update(ctx, policy, metav1.UpdateOptions{})
if err != nil {
return err
}
}
return nil
}

func (r *Reconciler) deleteEventPolicies(ctx context.Context, policies []*eventingv1alpha1.EventPolicy) error {
for _, policy := range policies {
err := r.eventingClientSet.EventingV1alpha1().EventPolicies(policy.Namespace).Delete(ctx, policy.Name, metav1.DeleteOptions{})
if err != nil && !apierrs.IsNotFound(err) {
return err
}
}
return nil
}

func (r *Reconciler) cleanupAllEventPolicies(ctx context.Context, s *v1.Sequence) error {
policies, err := r.listEventPoliciesForSequence(s)
if err != nil {
return fmt.Errorf("failed to list EventPolicies for cleanup: %w", err)
}
return r.deleteEventPolicies(ctx, policies)
}
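Review bot: The drift check `if !equality.Semantic.DeepDerivative(expectedPolicy, existingPolicy)` (and its twin for input policies) treats the expected object as a lower bound: DeepDerivative ignores zero-valued fields of its first argument and, for slices, any elements of the second argument beyond the first's length, so an existing generated policy whose `spec.from` or `spec.to` has been extended with extra entries is reported as up to date and the widened grant is never reverted. Because these policies are what decides who may send to the Sequence's channels, compare the authoritative parts exactly, e.g. `if !equality.Semantic.DeepEqual(expectedPolicy.Spec, existingPolicy.Spec) || !equality.Semantic.DeepEqual(expectedPolicy.Labels, existingPolicy.Labels) {` in both places. A second concern is that `listEventPoliciesForSequence` selects on the `flows.knative.dev/sequence-name` label alone, so a user-created EventPolicy that carries that label lands in `existingPolicyMap` and, unless its name happens to match a generated one, is deleted in the final loop; restricting the listed policies to those whose OwnerReferences point at `s.UID` or at one of the policies returned by `auth.GetEventPoliciesForResource` would keep the reconciler to objects it created. `ReconcileKind` begins before its hunk; the new helpers are shown in full.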