Skip to content

Commit

Permalink
Copy channels EventPolicies to backing channel
Browse files Browse the repository at this point in the history
  • Loading branch information
creydr committed Jun 24, 2024
1 parent 7891138 commit 1373264
Show file tree
Hide file tree
Showing 7 changed files with 310 additions and 45 deletions.
92 changes: 92 additions & 0 deletions pkg/reconciler/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ import (
"context"
"fmt"

"knative.dev/eventing/pkg/reconciler/channel/resources"

"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/labels"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
eventingclientset "knative.dev/eventing/pkg/client/clientset/versioned"

"go.uber.org/zap"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
Expand Down Expand Up @@ -52,6 +59,8 @@ type Reconciler struct {
dynamicClientSet dynamic.Interface

eventPolicyLister eventingv1alpha1listers.EventPolicyLister

eventingClientSet eventingclientset.Interface
}

// Check that our Reconciler implements Interface
Expand Down Expand Up @@ -108,9 +117,92 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, c *v1.Channel) pkgreconc
return fmt.Errorf("could not update channel status with EventPolicies: %v", err)
}

err = r.reconcileBackingChannelEventPolicies(ctx, c, backingChannel)
if err != nil {
return fmt.Errorf("could not reconcile backing channels (%s/%s) event policies: %w", backingChannel.Namespace, backingChannel.Name, err)
}

return nil
}

func (r *Reconciler) reconcileBackingChannelEventPolicies(ctx context.Context, channel *v1.Channel, backingChannel *eventingduckv1.Channelable) error {
applyingEventPoliciesForChannel, err := auth.GetEventPoliciesForResource(r.eventPolicyLister, v1.SchemeGroupVersion.WithKind("Channel"), channel.ObjectMeta)
if err != nil {
return fmt.Errorf("could not get applying EventPolicies for for channel %s/%s: %w", channel.Namespace, channel.Name, err)
}

for _, policy := range applyingEventPoliciesForChannel {
err := r.reconcileBackingChannelEventPolicy(ctx, backingChannel, policy)
if err != nil {
return fmt.Errorf("could not reconcile EventPolicy %s/%s for backing channel %s/%s: %w", policy.Namespace, policy.Name, backingChannel.Namespace, backingChannel.Name, err)
}
}

// Check, if we have old EP for the backing channel, which are not relevant anymore
applyingEventPoliciesForBackingChannel, err := auth.GetEventPoliciesForResource(r.eventPolicyLister, backingChannel.GroupVersionKind(), backingChannel.ObjectMeta)
if err != nil {
return fmt.Errorf("could not get applying EventPolicies for for backing channel %s/%s: %w", channel.Namespace, channel.Name, err)
}

selector, err := labels.ValidatedSelectorFromSet(resources.LabelsForBackingChannelsEventPolicy(backingChannel))
if err != nil {
return fmt.Errorf("could not get valid selector for backing channels EventPolicy %s/%s: %w", backingChannel.Namespace, backingChannel.Name, err)
}

existingEventPoliciesForBackingChannel, err := r.eventPolicyLister.EventPolicies(backingChannel.Namespace).List(selector)
if err != nil {
return fmt.Errorf("could not get existing EventPolicies in backing channels namespace %q: %w", backingChannel.Namespace, err)
}

for _, policy := range existingEventPoliciesForBackingChannel {
if !r.containsPolicy(policy.Name, applyingEventPoliciesForBackingChannel) {

// the existing policy is not in the list of applying policies anymore --> is outdated --> delete it
err := r.eventingClientSet.EventingV1alpha1().EventPolicies(policy.Namespace).Delete(ctx, policy.Name, metav1.DeleteOptions{})
if err != nil && apierrs.IsNotFound(err) {
return fmt.Errorf("could not delete old EventPolicy %s/%s: %w", policy.Namespace, policy.Name, err)
}
}
}

return nil
}

func (r *Reconciler) reconcileBackingChannelEventPolicy(ctx context.Context, backingChannel *eventingduckv1.Channelable, eventpolicy *eventingv1alpha1.EventPolicy) error {
expected := resources.MakeEventPolicyForBackingChannel(backingChannel, eventpolicy)

foundEP, err := r.eventPolicyLister.EventPolicies(expected.Namespace).Get(expected.Name)
if apierrs.IsNotFound(err) {
_, err := r.eventingClientSet.EventingV1alpha1().EventPolicies(expected.Namespace).Create(ctx, expected, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("could not create EventPolicy %s/%s: %w", expected.Namespace, expected.Name, err)
}
} else if err != nil {
return fmt.Errorf("could not get EventPolicy %s/%s: %w", expected.Namespace, expected.Name, err)
} else if r.policyNeedsUpdate(foundEP, expected) {
expected.SetResourceVersion(foundEP.ResourceVersion)
_, err := r.eventingClientSet.EventingV1alpha1().EventPolicies(expected.Namespace).Update(ctx, expected, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("could not update EventPolicy %s/%s: %w", expected.Namespace, expected.Name, err)
}
}

return nil
}

func (r *Reconciler) containsPolicy(name string, policies []*eventingv1alpha1.EventPolicy) bool {
for _, policy := range policies {
if policy.Name == name {
return true
}
}
return false
}

func (r *Reconciler) policyNeedsUpdate(foundEP, expected *eventingv1alpha1.EventPolicy) bool {
return !equality.Semantic.DeepDerivative(expected, foundEP)
}

// reconcileBackingChannel reconciles Channel's 'c' underlying CRD channel.
func (r *Reconciler) reconcileBackingChannel(ctx context.Context, channelResourceInterface dynamic.ResourceInterface, c *v1.Channel, backingChannelObjRef duckv1.KReference) (*eventingduckv1.Channelable, error) {
logger := logging.FromContext(ctx)
Expand Down
173 changes: 134 additions & 39 deletions pkg/reconciler/channel/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"fmt"
"testing"

"knative.dev/eventing/pkg/apis/eventing/v1alpha1"
v1 "knative.dev/eventing/pkg/apis/messaging/v1"

corev1 "k8s.io/api/core/v1"
Expand All @@ -29,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/types"
clientgotesting "k8s.io/client-go/testing"
"k8s.io/utils/pointer"
fakeeventingclient "knative.dev/eventing/pkg/client/injection/client/fake"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
v1addr "knative.dev/pkg/client/injection/ducks/duck/v1/addressable"
Expand All @@ -41,7 +41,6 @@ import (
"knative.dev/pkg/tracker"

eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
fakeeventingclient "knative.dev/eventing/pkg/client/injection/client/fake"
"knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable"
channelreconciler "knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1/channel"
"knative.dev/eventing/pkg/duck"
Expand Down Expand Up @@ -73,6 +72,12 @@ var (
Version: "v1",
Kind: "Channel",
}

imcV1GVK = metav1.GroupVersionKind{
Group: "messaging.knative.dev",
Version: "v1",
Kind: "InMemoryChannel",
}
)

func TestReconcile(t *testing.T) {
Expand Down Expand Up @@ -348,6 +353,24 @@ func TestReconcile(t *testing.T) {
WithReadyEventPolicyCondition,
WithEventPolicyToRef(channelV1GVK, channelName),
),
NewEventPolicy(fmt.Sprintf("%s-%s", readyEventPolicyName, channelName), testNS,
WithEventPolicyToRef(imcV1GVK, channelName),
WithEventPolicyOwnerReferences([]metav1.OwnerReference{
{
APIVersion: v1.SchemeGroupVersion.String(),
Kind: "InMemoryChannel",
Name: channelName,
}, {
Name: readyEventPolicyName,
},
}...),
WithEventPolicyLabels(map[string]string{
"messaging.knative.dev/channel-group": v1.SchemeGroupVersion.Group,
"messaging.knative.dev/channel-version": v1.SchemeGroupVersion.Version,
"messaging.knative.dev/channel-kind": "InMemoryChannel",
"messaging.knative.dev/channel-name": channelName,
}),
),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewChannel(channelName, testNS,
Expand Down Expand Up @@ -380,6 +403,24 @@ func TestReconcile(t *testing.T) {
WithUnreadyEventPolicyCondition,
WithEventPolicyToRef(channelV1GVK, channelName),
),
NewEventPolicy(fmt.Sprintf("%s-%s", unreadyEventPolicyName, channelName), testNS,
WithEventPolicyToRef(imcV1GVK, channelName),
WithEventPolicyOwnerReferences([]metav1.OwnerReference{
{
APIVersion: v1.SchemeGroupVersion.String(),
Kind: "InMemoryChannel",
Name: channelName,
}, {
Name: unreadyEventPolicyName,
},
}...),
WithEventPolicyLabels(map[string]string{
"messaging.knative.dev/channel-group": v1.SchemeGroupVersion.Group,
"messaging.knative.dev/channel-version": v1.SchemeGroupVersion.Version,
"messaging.knative.dev/channel-kind": "InMemoryChannel",
"messaging.knative.dev/channel-name": channelName,
}),
),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewChannel(channelName, testNS,
Expand Down Expand Up @@ -415,6 +456,42 @@ func TestReconcile(t *testing.T) {
WithUnreadyEventPolicyCondition,
WithEventPolicyToRef(channelV1GVK, channelName),
),
NewEventPolicy(fmt.Sprintf("%s-%s", readyEventPolicyName, channelName), testNS,
WithEventPolicyToRef(imcV1GVK, channelName),
WithEventPolicyOwnerReferences([]metav1.OwnerReference{
{
APIVersion: v1.SchemeGroupVersion.String(),
Kind: "InMemoryChannel",
Name: channelName,
}, {
Name: readyEventPolicyName,
},
}...),
WithEventPolicyLabels(map[string]string{
"messaging.knative.dev/channel-group": v1.SchemeGroupVersion.Group,
"messaging.knative.dev/channel-version": v1.SchemeGroupVersion.Version,
"messaging.knative.dev/channel-kind": "InMemoryChannel",
"messaging.knative.dev/channel-name": channelName,
}),
),
NewEventPolicy(fmt.Sprintf("%s-%s", unreadyEventPolicyName, channelName), testNS,
WithEventPolicyToRef(imcV1GVK, channelName),
WithEventPolicyOwnerReferences([]metav1.OwnerReference{
{
APIVersion: v1.SchemeGroupVersion.String(),
Kind: "InMemoryChannel",
Name: channelName,
}, {
Name: unreadyEventPolicyName,
},
}...),
WithEventPolicyLabels(map[string]string{
"messaging.knative.dev/channel-group": v1.SchemeGroupVersion.Group,
"messaging.knative.dev/channel-version": v1.SchemeGroupVersion.Version,
"messaging.knative.dev/channel-kind": "InMemoryChannel",
"messaging.knative.dev/channel-name": channelName,
}),
),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewChannel(channelName, testNS,
Expand All @@ -427,6 +504,60 @@ func TestReconcile(t *testing.T) {
WithChannelEventPoliciesNotReady("EventPoliciesNotReady", fmt.Sprintf("event policies %s are not ready", unreadyEventPolicyName)),
WithChannelEventPoliciesListed(readyEventPolicyName)),
}},
}, {
Name: "should create EventPolicies for backing channel",
Key: testKey,
Objects: []runtime.Object{
NewChannel(channelName, testNS,
WithChannelTemplate(channelCRD()),
WithInitChannelConditions,
WithChannelEventPoliciesReady(),
WithChannelEventPoliciesListed(readyEventPolicyName)),
NewInMemoryChannel(channelName, testNS,
WithInitInMemoryChannelConditions,
WithInMemoryChannelDeploymentReady(),
WithInMemoryChannelServiceReady(),
WithInMemoryChannelEndpointsReady(),
WithInMemoryChannelChannelServiceReady(),
WithInMemoryChannelAddress(backingChannelAddressable),
WithInMemoryChannelDLSUnknown(),
WithInMemoryChannelEventPoliciesReady()),
NewEventPolicy(readyEventPolicyName, testNS,
WithReadyEventPolicyCondition,
WithEventPolicyToRef(channelV1GVK, channelName),
),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewChannel(channelName, testNS,
WithChannelTemplate(channelCRD()),
WithInitChannelConditions,
WithBackingChannelObjRef(backingChannelObjRef()),
WithBackingChannelReady,
WithChannelDLSUnknown(),
WithChannelAddress(&backingChannelAddressable),
WithChannelEventPoliciesReady(),
WithChannelEventPoliciesListed(readyEventPolicyName)),
}},
WantCreates: []runtime.Object{
NewEventPolicy(fmt.Sprintf("%s-%s", readyEventPolicyName, channelName), testNS,
WithEventPolicyToRef(imcV1GVK, channelName),
WithEventPolicyOwnerReferences([]metav1.OwnerReference{
{
APIVersion: v1.SchemeGroupVersion.String(),
Kind: "InMemoryChannel",
Name: channelName,
}, {
Name: readyEventPolicyName,
},
}...),
WithEventPolicyLabels(map[string]string{
"messaging.knative.dev/channel-group": v1.SchemeGroupVersion.Group,
"messaging.knative.dev/channel-version": v1.SchemeGroupVersion.Version,
"messaging.knative.dev/channel-kind": "InMemoryChannel",
"messaging.knative.dev/channel-name": channelName,
}),
),
},
}}

logger := logtesting.TestLogger(t)
Expand All @@ -438,6 +569,7 @@ func TestReconcile(t *testing.T) {
channelLister: listers.GetMessagingChannelLister(),
channelableTracker: &fakeListableTracker{duck.NewListableTrackerFromTracker(ctx, channelable.Get, tracker.New(func(types.NamespacedName) {}, 0))},
eventPolicyLister: listers.GetEventPolicyLister(),
eventingClientSet: fakeeventingclient.Get(ctx),
}
return channelreconciler.NewReconciler(ctx, logger,
fakeeventingclient.Get(ctx), listers.GetMessagingChannelLister(),
Expand Down Expand Up @@ -597,40 +729,3 @@ func createChannel(namespace, name string, ready bool) *unstructured.Unstructure
},
}
}

func makeEventPolicy(eventPolicyName string) *v1alpha1.EventPolicy {
return &v1alpha1.EventPolicy{
TypeMeta: metav1.TypeMeta{
APIVersion: "eventing.knative.dev/v1alpha1",
Kind: "EventPolicy",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: testNS,
Name: eventPolicyName,
},
Spec: v1alpha1.EventPolicySpec{
To: []v1alpha1.EventPolicySpecTo{
{
Ref: &v1alpha1.EventPolicyToReference{
APIVersion: v1.SchemeGroupVersion.String(),
Kind: "Channel",
Name: channelName,
},
},
},
},
Status: v1alpha1.EventPolicyStatus{},
}
}

func makeReadyEventPolicy() *v1alpha1.EventPolicy {
policy := makeEventPolicy(readyEventPolicyName)
policy.Status.Conditions = []apis.Condition{{Type: v1alpha1.EventPolicyConditionReady, Status: corev1.ConditionTrue}}
return policy
}

func makeUnreadyEventPolicy() *v1alpha1.EventPolicy {
policy := makeEventPolicy(unreadyEventPolicyName)
policy.Status.Conditions = []apis.Condition{{Type: v1alpha1.EventPolicyConditionReady, Status: corev1.ConditionFalse}}
return policy
}
2 changes: 2 additions & 0 deletions pkg/reconciler/channel/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"knative.dev/pkg/controller"
"knative.dev/pkg/injection/clients/dynamicclient"

eventingclient "knative.dev/eventing/pkg/client/injection/client"
"knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable"
"knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy"
channelinformer "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/channel"
Expand All @@ -49,6 +50,7 @@ func NewController(
dynamicClientSet: dynamicclient.Get(ctx),
channelLister: channelInformer.Lister(),
eventPolicyLister: eventPolicyInformer.Lister(),
eventingClientSet: eventingclient.Get(ctx),
}

var globalResync func()
Expand Down
3 changes: 2 additions & 1 deletion pkg/reconciler/channel/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ limitations under the License.
package channel

import (
"testing"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"knative.dev/eventing/pkg/apis/feature"
"testing"

"knative.dev/pkg/configmap"

Expand Down
Loading

0 comments on commit 1373264

Please sign in to comment.