Skip to content

Commit

Permalink
Use filtered informer to watch OIDC service accounts (#3719)
Browse files Browse the repository at this point in the history
* Changes in pkg/reconciler/trigger based on #7527

* codegen updated

* fixing imported auth path

* made fixes

* bug fixes

* fixed setup fake context lines

* added label selector

* linting

* comitting changes to pass tests in
/trigger_finalizer_test.go

* Reconcile trigger on OIDC service account changes only, if SA references a trigger for correct broker class

* Run goimport and gofmt

* Namespaced broker: Reconcile trigger on OIDC service account changes only, if SA references a trigger for correct broker class

* Remove unneeded comments

* Remove one more of unneeded comments

* Use correct BrokerClass for NamespacedBroker SA filter

Co-authored-by: Calum Murray <cmurray@redhat.com>

---------

Co-authored-by: yijie-04 <yijiewang0806@gmail.com>
Co-authored-by: Christoph Stäbler <cstabler@redhat.com>
Co-authored-by: Calum Murray <cmurray@redhat.com>
  • Loading branch information
4 people committed May 7, 2024
1 parent 75fee8a commit 6c4665c
Show file tree
Hide file tree
Showing 6 changed files with 262 additions and 20 deletions.
43 changes: 38 additions & 5 deletions control-plane/pkg/reconciler/trigger/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@ package trigger
import (
"context"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"go.uber.org/zap"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
"knative.dev/eventing/pkg/auth"
v1 "knative.dev/eventing/pkg/client/informers/externalversions/eventing/v1"
kubeclient "knative.dev/pkg/client/injection/kube/client"
configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap"
Expand All @@ -44,7 +48,8 @@ import (
triggerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger"
triggerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/trigger"
eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1"
serviceaccountinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount"

serviceaccountinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/filtered"

"knative.dev/eventing-kafka-broker/control-plane/pkg/config"
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka"
Expand All @@ -65,7 +70,7 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
brokerInformer := brokerinformer.Get(ctx)
triggerInformer := triggerinformer.Get(ctx)
triggerLister := triggerInformer.Lister()
serviceaccountInformer := serviceaccountinformer.Get(ctx)
oidcServiceaccountInformer := serviceaccountinformer.Get(ctx, auth.OIDCLabelSelector)

clientPool := clientpool.Get(ctx)

Expand Down Expand Up @@ -95,7 +100,7 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
GetKafkaClient: clientPool.GetClient,
GetKafkaClusterAdmin: clientPool.GetClusterAdmin,
InitOffsetsFunc: offset.InitOffsets,
ServiceAccountLister: serviceaccountInformer.Lister(),
ServiceAccountLister: oidcServiceaccountInformer.Lister(),
}

impl := triggerreconciler.NewImpl(ctx, reconciler, func(impl *controller.Impl) controller.Options {
Expand Down Expand Up @@ -153,8 +158,8 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
secretinformer.Get(ctx).Informer().AddEventHandler(controller.HandleAll(reconciler.Tracker.OnChanged))

// Reconciler Trigger when the OIDC service account changes
serviceaccountInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterController(&eventing.Trigger{}),
oidcServiceaccountInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: filterOIDCServiceAccounts(triggerInformer.Lister(), brokerInformer.Lister(), kafka.BrokerClass, FinalizerName),
Handler: controller.HandleAll(impl.EnqueueControllerOf),
})

Expand Down Expand Up @@ -182,6 +187,34 @@ func filterTriggers(lister eventinglisters.BrokerLister, brokerClass string, fin
}
}

// filterOIDCServiceAccounts returns a function that returns true if the resource passed
// is a service account, which is owned by a trigger pointing to a the given broker class.
func filterOIDCServiceAccounts(triggerLister eventinglisters.TriggerLister, brokerLister eventinglisters.BrokerLister, brokerClass string, finalizer string) func(interface{}) bool {
return func(obj interface{}) bool {
controlledByTrigger := controller.FilterController(&eventing.Trigger{})(obj)
if !controlledByTrigger {
return false
}

sa, ok := obj.(*corev1.ServiceAccount)
if !ok {
return false
}

owner := metav1.GetControllerOf(sa)
if owner == nil {
return false
}

trigger, err := triggerLister.Triggers(sa.Namespace).Get(owner.Name)
if err != nil {
return false
}

return filterTriggers(brokerLister, brokerClass, finalizer)(trigger)
}
}

func hasKafkaBrokerTriggerFinalizer(finalizers []string, finalizerName string) bool {
for _, f := range finalizers {
if f == finalizerName {
Expand Down
183 changes: 179 additions & 4 deletions control-plane/pkg/reconciler/trigger/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,15 @@
package trigger

import (
"context"
"testing"

triggerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger"
"knative.dev/pkg/ptr"

"knative.dev/eventing/pkg/auth"
filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered"

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -27,7 +34,8 @@ import (
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/fake"
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/pod/fake"
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/secret/fake"
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/fake"
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/filtered/fake"
_ "knative.dev/pkg/client/injection/kube/informers/factory/filtered/fake"
"knative.dev/pkg/configmap"
reconcilertesting "knative.dev/pkg/reconciler/testing"

Expand All @@ -42,8 +50,7 @@ import (
)

func TestNewController(t *testing.T) {
ctx, _ := reconcilertesting.SetupFakeContext(t)

ctx, _ := reconcilertesting.SetupFakeContext(t, SetUpInformerSelector)
ctx = clientpool.WithKafkaClientPool(ctx)

controller := NewController(ctx, configmap.NewStaticWatcher(&corev1.ConfigMap{
Expand All @@ -60,8 +67,13 @@ func TestNewController(t *testing.T) {
}
}

func SetUpInformerSelector(ctx context.Context) context.Context {
ctx = filteredFactory.WithSelectors(ctx, auth.OIDCLabelSelector)
return ctx
}

func TestFilterTriggers(t *testing.T) {
ctx, _ := reconcilertesting.SetupFakeContext(t)
ctx, _ := reconcilertesting.SetupFakeContext(t, SetUpInformerSelector)

tt := []struct {
name string
Expand Down Expand Up @@ -184,3 +196,166 @@ func TestFilterTriggers(t *testing.T) {
})
}
}

func TestFilterOIDCServiceAccounts(t *testing.T) {
ctx, _ := reconcilertesting.SetupFakeContext(t, SetUpInformerSelector)

tt := []struct {
name string
sa *corev1.ServiceAccount
trigger *eventing.Trigger
brokers []*eventing.Broker
pass bool
}{{
name: "matching owner reference",
sa: &corev1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "sa",
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: eventing.SchemeGroupVersion.String(),
Kind: "Trigger",
Name: "tr",
Controller: ptr.Bool(true),
},
},
},
},
trigger: &eventing.Trigger{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "tr",
Finalizers: []string{FinalizerName},
},
Spec: eventing.TriggerSpec{
Broker: "br",
},
},
brokers: []*eventing.Broker{{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "br",
Annotations: map[string]string{
eventing.BrokerClassAnnotationKey: kafka.BrokerClass,
},
},
}},
pass: true,
}, {
name: "references trigger for wrong broker class",
sa: &corev1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "sa",
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: eventing.SchemeGroupVersion.String(),
Kind: "Trigger",
Name: "tr",
Controller: ptr.Bool(true),
},
},
},
},
trigger: &eventing.Trigger{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "tr",
},
Spec: eventing.TriggerSpec{
Broker: "br",
},
},
brokers: []*eventing.Broker{{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "br",
Annotations: map[string]string{
eventing.BrokerClassAnnotationKey: "another-broker-class",
},
},
}},
pass: false,
}, {
name: "references trigger with correct finalizer",
sa: &corev1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "sa",
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: eventing.SchemeGroupVersion.String(),
Kind: "Trigger",
Name: "tr",
Controller: ptr.Bool(true),
},
},
},
},
trigger: &eventing.Trigger{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "tr",
Finalizers: []string{FinalizerName},
},
Spec: eventing.TriggerSpec{
Broker: "br",
},
},
brokers: []*eventing.Broker{{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "br",
},
}},
pass: true,
}, {
name: "no owner reference",
sa: &corev1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "sa",
},
},
trigger: &eventing.Trigger{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "tr",
Finalizers: []string{FinalizerName},
},
Spec: eventing.TriggerSpec{
Broker: "br",
},
},
brokers: []*eventing.Broker{{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "br",
Annotations: map[string]string{
eventing.BrokerClassAnnotationKey: kafka.BrokerClass,
},
},
}},
pass: false,
}}

for _, tc := range tt {
tc := tc
t.Run(tc.name, func(t *testing.T) {
brokerInformer := brokerinformer.Get(ctx)
for _, obj := range tc.brokers {
err := brokerInformer.Informer().GetStore().Add(obj)
assert.NoError(t, err)
}

triggerInformer := triggerinformer.Get(ctx)
err := triggerInformer.Informer().GetStore().Add(tc.trigger)
assert.NoError(t, err)

filter := filterOIDCServiceAccounts(triggerInformer.Lister(), brokerInformer.Lister(), kafka.BrokerClass, FinalizerName)
pass := filter(tc.sa)
assert.Equal(t, tc.pass, pass)
})
}
}
14 changes: 8 additions & 6 deletions control-plane/pkg/reconciler/trigger/namespaced_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,19 @@ import (
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/offset"

"k8s.io/client-go/tools/cache"
"knative.dev/eventing/pkg/auth"
kubeclient "knative.dev/pkg/client/injection/kube/client"
configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap"
podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod"
secretinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/secret"
serviceaccountinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount"

serviceaccountinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/filtered"

"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
"knative.dev/pkg/resolver"

eventing "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/eventing/pkg/apis/feature"
eventingclient "knative.dev/eventing/pkg/client/injection/client"
brokerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker"
Expand All @@ -60,7 +62,7 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, con
brokerInformer := brokerinformer.Get(ctx)
triggerInformer := triggerinformer.Get(ctx)
triggerLister := triggerInformer.Lister()
serviceaccountInformer := serviceaccountinformer.Get(ctx)
oidcServiceaccountInformer := serviceaccountinformer.Get(ctx, auth.OIDCLabelSelector)

clientPool := clientpool.Get(ctx)

Expand All @@ -82,7 +84,7 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, con
},
BrokerLister: brokerInformer.Lister(),
ConfigMapLister: configmapInformer.Lister(),
ServiceAccountLister: serviceaccountInformer.Lister(),
ServiceAccountLister: oidcServiceaccountInformer.Lister(),
EventingClient: eventingclient.Get(ctx),
Env: configs,
GetKafkaClient: clientPool.GetClient,
Expand Down Expand Up @@ -150,8 +152,8 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, con
secretinformer.Get(ctx).Informer().AddEventHandler(controller.HandleAll(reconciler.Tracker.OnChanged))

// Reconciler Trigger when the OIDC service account changes
serviceaccountInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterController(&eventing.Trigger{}),
oidcServiceaccountInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: filterOIDCServiceAccounts(triggerInformer.Lister(), brokerInformer.Lister(), kafka.NamespacedBrokerClass, FinalizerName),
Handler: controller.HandleAll(impl.EnqueueControllerOf),
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ import (
"knative.dev/pkg/configmap"
reconcilertesting "knative.dev/pkg/reconciler/testing"

_ "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/filtered/fake"
_ "knative.dev/pkg/client/injection/kube/informers/factory/filtered/fake"

_ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker/fake"
_ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger/fake"

Expand All @@ -37,7 +40,7 @@ import (
)

func TestNewNamespacedController(t *testing.T) {
ctx, _ := reconcilertesting.SetupFakeContext(t)
ctx, _ := reconcilertesting.SetupFakeContext(t, SetUpInformerSelector)

ctx = clientpool.WithKafkaClientPool(ctx)

Expand Down
Loading

0 comments on commit 6c4665c

Please sign in to comment.