Skip to content

Commit

Permalink
Move EventPolicyEventHandler to auth package
Browse files Browse the repository at this point in the history
  • Loading branch information
creydr committed Jun 20, 2024
1 parent 999f4fc commit ed231fd
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 116 deletions.
123 changes: 123 additions & 0 deletions pkg/auth/event_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package auth

import (
"fmt"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
"strings"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -86,6 +88,52 @@ func GetEventPoliciesForResource(lister listerseventingv1alpha1.EventPolicyListe
return relevantPolicies, nil
}

// GetApplyingResourcesOfEventPolicyForGK returns all applying resource names of GK of the given event policy.
// It returns only the names, as the resources are part of the same namespace as the event policy.
//
// This function is kind of the "inverse" of GetEventPoliciesForResource.
func GetApplyingResourcesOfEventPolicyForGK(eventPolicy *v1alpha1.EventPolicy, gk schema.GroupKind, gkIndexer cache.Indexer) ([]string, error) {
applyingResources := []string{}
for _, to := range eventPolicy.Spec.To {
if to.Ref != nil {
toGV, err := schema.ParseGroupVersion(to.Ref.APIVersion)
if err != nil {
return nil, fmt.Errorf("could not parse group version of %q: %w", to.Ref.APIVersion, err)
}

if strings.EqualFold(toGV.Group, gk.Group) &&
strings.EqualFold(to.Ref.Kind, gk.Kind) {

applyingResources = append(applyingResources, to.Ref.Name)
}
}

if to.Selector != nil {
selectorGV, err := schema.ParseGroupVersion(to.Selector.APIVersion)
if err != nil {
return nil, fmt.Errorf("could not parse group version of %q: %w", to.Selector.APIVersion, err)
}

if strings.EqualFold(selectorGV.Group, gk.Group) &&
strings.EqualFold(to.Selector.Kind, gk.Kind) {

selector, err := metav1.LabelSelectorAsSelector(to.Selector.LabelSelector)
if err != nil {
return nil, fmt.Errorf("could not parse label selector %v: %w", to.Selector.LabelSelector, err)
}

err = cache.ListAllByNamespace(gkIndexer, eventPolicy.Namespace, selector, func(i interface{}) {
applyingResources = append(applyingResources, i.(metav1.Object).GetName())
})
if err != nil {
return nil, fmt.Errorf("could not list resources of GK in %q namespace for selector %v: %w", eventPolicy.Namespace, selector, err)
}
}
}
}
return applyingResources, nil
}

// ResolveSubjects returns the OIDC service accounts names for the objects referenced in the EventPolicySpecFrom.
func ResolveSubjects(resolver *resolver.AuthenticatableResolver, eventPolicy *v1alpha1.EventPolicy) ([]string, error) {
allSAs := []string{}
Expand Down Expand Up @@ -145,3 +193,78 @@ func SubjectContained(sub string, allowedSubs []string) bool {

return false
}

func handleApplyingResourcesOfEventPolicy(eventPolicy *v1alpha1.EventPolicy, gk schema.GroupKind, indexer cache.Indexer, handlerFn func(key types.NamespacedName) error) error {
applyingResources, err := GetApplyingResourcesOfEventPolicyForGK(eventPolicy, gk, indexer)
if err != nil {
return fmt.Errorf("could not get applying resources of eventpolicy: %w", err)
}

for _, resourceName := range applyingResources {
err := handlerFn(types.NamespacedName{
Namespace: eventPolicy.Namespace,
Name: resourceName,
})

if err != nil {
return fmt.Errorf("could not handle resource %q: %w", resourceName, err)
}
}

return nil
}

// EventPolicyEventHandler returns an ResourceEventHandler, which passes the referencing resources of the EventPolicy
// to the enqueueFn if the EventPolicy was referencing or got updated and now is referencing the resource of the given GVK.
func EventPolicyEventHandler(indexer cache.Indexer, gk schema.GroupKind, enqueueFn func(key types.NamespacedName)) cache.ResourceEventHandler {

return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
eventPolicy, ok := obj.(*v1alpha1.EventPolicy)
if !ok {
return
}

handleApplyingResourcesOfEventPolicy(eventPolicy, gk, indexer, func(key types.NamespacedName) error {
enqueueFn(key)
return nil
})
},
UpdateFunc: func(oldObj, newObj interface{}) {
// Here we need to check if the old or the new EventPolicy was referencing the given GVK
oldEventPolicy, ok := oldObj.(*v1alpha1.EventPolicy)
if !ok {
return
}
newEventPolicy, ok := newObj.(*v1alpha1.EventPolicy)
if !ok {
return
}

// make sure, we handle the keys only once
toHandle := map[types.NamespacedName]struct{}{}
addToHandleList := func(key types.NamespacedName) error {
toHandle[key] = struct{}{}
return nil
}

handleApplyingResourcesOfEventPolicy(oldEventPolicy, gk, indexer, addToHandleList)
handleApplyingResourcesOfEventPolicy(newEventPolicy, gk, indexer, addToHandleList)

for k := range toHandle {
enqueueFn(k)
}
},
DeleteFunc: func(obj interface{}) {
eventPolicy, ok := obj.(*v1alpha1.EventPolicy)
if !ok {
return
}

handleApplyingResourcesOfEventPolicy(eventPolicy, gk, indexer, func(key types.NamespacedName) error {
enqueueFn(key)
return nil
})
},
}
}
113 changes: 0 additions & 113 deletions pkg/reconciler/eventpolicy.go

This file was deleted.

6 changes: 3 additions & 3 deletions pkg/reconciler/inmemorychannel/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ package controller

import (
"context"
"knative.dev/eventing/pkg/auth"

"github.com/kelseyhightower/envconfig"
"k8s.io/client-go/tools/cache"
messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
"knative.dev/eventing/pkg/reconciler"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
Expand Down Expand Up @@ -144,11 +144,11 @@ func NewController(
Handler: controller.HandleAll(globalResync),
})

imcGVK := messagingv1.SchemeGroupVersion.WithKind("InMemoryChannel")
imcGK := messagingv1.SchemeGroupVersion.WithKind("InMemoryChannel").GroupKind()

// Enqueue the InMemoryChannel, if we have an EventPolicy which was referencing
// or got updated and now is referencing the InMemoryChannel
eventPolicyInformer.Informer().AddEventHandler(reconciler.EventPolicyEventHandler(inmemorychannelInformer.Informer().GetIndexer(), imcGVK, impl.EnqueueKey))
eventPolicyInformer.Informer().AddEventHandler(auth.EventPolicyEventHandler(inmemorychannelInformer.Informer().GetIndexer(), imcGK, impl.EnqueueKey))

// Setup the watch on the config map of dispatcher config
configStore := config.NewEventDispatcherConfigStore(logging.FromContext(ctx))
Expand Down

0 comments on commit ed231fd

Please sign in to comment.