Skip to content

Commit

Permalink
Source: Move handler and predicate out from Start
Browse files Browse the repository at this point in the history
  • Loading branch information
alvaroaleman committed Apr 14, 2024
1 parent 45e166d commit 9931919
Show file tree
Hide file tree
Showing 15 changed files with 339 additions and 380 deletions.
6 changes: 3 additions & 3 deletions examples/builtins/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ func main() {
}

// Watch ReplicaSets and enqueue ReplicaSet object key
if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}), &handler.EnqueueRequestForObject{}); err != nil {
if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}, &handler.EnqueueRequestForObject{})); err != nil {
entryLog.Error(err, "unable to watch ReplicaSets")
os.Exit(1)
}

// Watch Pods and enqueue owning ReplicaSet key
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}),
handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.ReplicaSet{}, handler.OnlyControllerOwner())); err != nil {
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{},
handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.ReplicaSet{}, handler.OnlyControllerOwner()))); err != nil {
entryLog.Error(err, "unable to watch Pods")
os.Exit(1)
}
Expand Down
29 changes: 17 additions & 12 deletions pkg/builder/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ func (blder *Builder) Owns(object client.Object, opts ...OwnsOption) *Builder {
// WatchesInput represents the information set by Watches method.
type WatchesInput struct {
src source.Source
eventHandler handler.EventHandler
predicates []predicate.Predicate
objectProjection objectProjection
}
Expand All @@ -135,8 +134,13 @@ type WatchesInput struct {
// This is the equivalent of calling
// WatchesRawSource(source.Kind(cache, object), eventHandler, opts...).
func (blder *Builder) Watches(object client.Object, eventHandler handler.EventHandler, opts ...WatchesOption) *Builder {
src := source.Kind(blder.mgr.GetCache(), object)
return blder.WatchesRawSource(src, eventHandler, opts...)
input := WatchesInput{}
for _, opt := range opts {
opt.ApplyToWatches(&input)
}
src := source.Kind(blder.mgr.GetCache(), object, eventHandler, input.predicates...)

return blder.WatchesRawSource(src, opts...)
}

// WatchesMetadata is the same as Watches, but forces the internal cache to only watch PartialObjectMetadata.
Expand Down Expand Up @@ -176,8 +180,8 @@ func (blder *Builder) WatchesMetadata(object client.Object, eventHandler handler
//
// STOP! Consider using For(...), Owns(...), Watches(...), WatchesMetadata(...) instead.
// This method is only exposed for more advanced use cases, most users should use one of the higher level functions.
func (blder *Builder) WatchesRawSource(src source.Source, eventHandler handler.EventHandler, opts ...WatchesOption) *Builder {
input := WatchesInput{src: src, eventHandler: eventHandler}
func (blder *Builder) WatchesRawSource(src source.Source, opts ...WatchesOption) *Builder {
input := WatchesInput{src: src}
for _, opt := range opts {
opt.ApplyToWatches(&input)
}
Expand Down Expand Up @@ -272,11 +276,11 @@ func (blder *Builder) doWatch() error {
if err != nil {
return err
}
src := source.Kind(blder.mgr.GetCache(), obj)
hdler := &handler.EnqueueRequestForObject{}
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, blder.forInput.predicates...)
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
src := source.Kind(blder.mgr.GetCache(), obj, hdler, allPredicates...)
if err := blder.ctrl.Watch(src); err != nil {
return err
}
}
Expand All @@ -290,7 +294,6 @@ func (blder *Builder) doWatch() error {
if err != nil {
return err
}
src := source.Kind(blder.mgr.GetCache(), obj)
opts := []handler.OwnerOption{}
if !own.matchEveryOwner {
opts = append(opts, handler.OnlyControllerOwner())
Expand All @@ -302,7 +305,8 @@ func (blder *Builder) doWatch() error {
)
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, own.predicates...)
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
src := source.Kind(blder.mgr.GetCache(), obj, hdler, allPredicates...)
if err := blder.ctrl.Watch(src); err != nil {
return err
}
}
Expand All @@ -311,18 +315,19 @@ func (blder *Builder) doWatch() error {
if len(blder.watchesInput) == 0 && blder.forInput.object == nil {
return errors.New("there are no watches configured, controller will never get triggered. Use For(), Owns() or Watches() to set them up")
}
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
for _, w := range blder.watchesInput {
// If the source of this watch is of type Kind, project it.
if srcKind, ok := w.src.(*internalsource.Kind); ok {
allPredicates := append(allPredicates, w.predicates...)
typeForSrc, err := blder.project(srcKind.Type, w.objectProjection)
if err != nil {
return err
}
srcKind.Type = typeForSrc
srcKind.Predicates = append(srcKind.Predicates, allPredicates...)
}
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, w.predicates...)
if err := blder.ctrl.Watch(w.src, w.eventHandler, allPredicates...); err != nil {
if err := blder.ctrl.Watch(w.src); err != nil {
return err
}
}
Expand Down
11 changes: 2 additions & 9 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,8 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/internal/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/ratelimiter"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
Expand Down Expand Up @@ -84,13 +82,8 @@ type Controller interface {
// Reconciler is called to reconcile an object by Namespace/Name
reconcile.Reconciler

// Watch takes events provided by a Source and uses the EventHandler to
// enqueue reconcile.Requests in response to the events.
//
// Watch may be provided one or more Predicates to filter events before
// they are given to the EventHandler. Events will be passed to the
// EventHandler if all provided Predicates evaluate to true.
Watch(src source.Source, eventhandler handler.EventHandler, predicates ...predicate.Predicate) error
// Watch watches the provided Source.
Watch(src source.Source) error

// Start starts the controller. Start blocks until the context is closed or a
// controller has an error starting.
Expand Down
7 changes: 4 additions & 3 deletions pkg/controller/controller_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,13 @@ var _ = Describe("controller", func() {

By("Watching Resources")
err = instance.Watch(
source.Kind(cm.GetCache(), &appsv1.ReplicaSet{}),
handler.EnqueueRequestForOwner(cm.GetScheme(), cm.GetRESTMapper(), &appsv1.Deployment{}),
source.Kind(cm.GetCache(), &appsv1.ReplicaSet{},
handler.EnqueueRequestForOwner(cm.GetScheme(), cm.GetRESTMapper(), &appsv1.Deployment{}),
),
)
Expect(err).NotTo(HaveOccurred())

err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.Deployment{}), &handler.EnqueueRequestForObject{})
err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.Deployment{}, &handler.EnqueueRequestForObject{}))
Expect(err).NotTo(HaveOccurred())

err = cm.GetClient().Get(ctx, types.NamespacedName{Name: "foo"}, &corev1.Namespace{})
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ var _ = Describe("controller.Controller", func() {

ctx, cancel := context.WithCancel(context.Background())
watchChan := make(chan event.GenericEvent, 1)
watch := &source.Channel{Source: watchChan}
watch := &source.Channel{Source: watchChan, Handler: &handler.EnqueueRequestForObject{}}
watchChan <- event.GenericEvent{Object: &corev1.Pod{}}

reconcileStarted := make(chan struct{})
Expand All @@ -101,7 +101,7 @@ var _ = Describe("controller.Controller", func() {
Expect(err).NotTo(HaveOccurred())

c, err := controller.New("new-controller", m, controller.Options{Reconciler: rec})
Expect(c.Watch(watch, &handler.EnqueueRequestForObject{})).To(Succeed())
Expect(c.Watch(watch)).To(Succeed())
Expect(err).NotTo(HaveOccurred())

go func() {
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func ExampleController() {
}

// Watch for Pod create / update / delete events and call Reconcile
err = c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}), &handler.EnqueueRequestForObject{})
err = c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, &handler.EnqueueRequestForObject{}))
if err != nil {
log.Error(err, "unable to watch pods")
os.Exit(1)
Expand Down Expand Up @@ -108,7 +108,7 @@ func ExampleController_unstructured() {
Version: "v1",
})
// Watch for Pod create / update / delete events and call Reconcile
err = c.Watch(source.Kind(mgr.GetCache(), u), &handler.EnqueueRequestForObject{})
err = c.Watch(source.Kind(mgr.GetCache(), u, &handler.EnqueueRequestForObject{}))
if err != nil {
log.Error(err, "unable to watch pods")
os.Exit(1)
Expand Down Expand Up @@ -139,7 +139,7 @@ func ExampleNewUnmanaged() {
os.Exit(1)
}

if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}), &handler.EnqueueRequestForObject{}); err != nil {
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, &handler.EnqueueRequestForObject{})); err != nil {
log.Error(err, "unable to watch pods")
os.Exit(1)
}
Expand Down
88 changes: 45 additions & 43 deletions pkg/handler/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ var (
func ExampleEnqueueRequestForObject() {
// controller is a controller.controller
err := c.Watch(
source.Kind(mgr.GetCache(), &corev1.Pod{}),
&handler.EnqueueRequestForObject{},
source.Kind(mgr.GetCache(), &corev1.Pod{}, &handler.EnqueueRequestForObject{}),
)
if err != nil {
// handle it
Expand All @@ -55,8 +54,9 @@ func ExampleEnqueueRequestForObject() {
func ExampleEnqueueRequestForOwner() {
// controller is a controller.controller
err := c.Watch(
source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}),
handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.Deployment{}, handler.OnlyControllerOwner()),
source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{},
handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.Deployment{}, handler.OnlyControllerOwner()),
),
)
if err != nil {
// handle it
Expand All @@ -68,19 +68,20 @@ func ExampleEnqueueRequestForOwner() {
func ExampleEnqueueRequestsFromMapFunc() {
// controller is a controller.controller
err := c.Watch(
source.Kind(mgr.GetCache(), &appsv1.Deployment{}),
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a client.Object) []reconcile.Request {
return []reconcile.Request{
{NamespacedName: types.NamespacedName{
Name: a.GetName() + "-1",
Namespace: a.GetNamespace(),
}},
{NamespacedName: types.NamespacedName{
Name: a.GetName() + "-2",
Namespace: a.GetNamespace(),
}},
}
}),
source.Kind(mgr.GetCache(), &appsv1.Deployment{},
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a client.Object) []reconcile.Request {
return []reconcile.Request{
{NamespacedName: types.NamespacedName{
Name: a.GetName() + "-1",
Namespace: a.GetNamespace(),
}},
{NamespacedName: types.NamespacedName{
Name: a.GetName() + "-2",
Namespace: a.GetNamespace(),
}},
}
}),
),
)
if err != nil {
// handle it
Expand All @@ -91,33 +92,34 @@ func ExampleEnqueueRequestsFromMapFunc() {
func ExampleFuncs() {
// controller is a controller.controller
err := c.Watch(
source.Kind(mgr.GetCache(), &corev1.Pod{}),
handler.Funcs{
CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) {
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: e.Object.GetName(),
Namespace: e.Object.GetNamespace(),
}})
source.Kind(mgr.GetCache(), &corev1.Pod{},
handler.Funcs{
CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) {
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: e.Object.GetName(),
Namespace: e.Object.GetNamespace(),
}})
},
UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) {
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: e.ObjectNew.GetName(),
Namespace: e.ObjectNew.GetNamespace(),
}})
},
DeleteFunc: func(ctx context.Context, e event.DeleteEvent, q workqueue.RateLimitingInterface) {
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: e.Object.GetName(),
Namespace: e.Object.GetNamespace(),
}})
},
GenericFunc: func(ctx context.Context, e event.GenericEvent, q workqueue.RateLimitingInterface) {
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: e.Object.GetName(),
Namespace: e.Object.GetNamespace(),
}})
},
},
UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) {
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: e.ObjectNew.GetName(),
Namespace: e.ObjectNew.GetNamespace(),
}})
},
DeleteFunc: func(ctx context.Context, e event.DeleteEvent, q workqueue.RateLimitingInterface) {
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: e.Object.GetName(),
Namespace: e.Object.GetNamespace(),
}})
},
GenericFunc: func(ctx context.Context, e event.GenericEvent, q workqueue.RateLimitingInterface) {
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: e.Object.GetName(),
Namespace: e.Object.GetNamespace(),
}})
},
},
),
)
if err != nil {
// handle it
Expand Down
14 changes: 5 additions & 9 deletions pkg/internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,8 @@ import (
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/util/workqueue"

"sigs.k8s.io/controller-runtime/pkg/handler"
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/ratelimiter"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
Expand Down Expand Up @@ -98,9 +96,7 @@ type Controller struct {

// watchDescription contains all the information necessary to start a watch.
type watchDescription struct {
src source.Source
handler handler.EventHandler
predicates []predicate.Predicate
src source.Source
}

// Reconcile implements reconcile.Reconciler.
Expand All @@ -124,20 +120,20 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (_ re
}

// Watch implements controller.Controller.
func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
func (c *Controller) Watch(src source.Source) error {
c.mu.Lock()
defer c.mu.Unlock()

// Controller hasn't started yet, store the watches locally and return.
//
// These watches are going to be held on the controller struct until the manager or user calls Start(...).
if !c.Started {
c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct})
c.startWatches = append(c.startWatches, watchDescription{src: src})
return nil
}

c.LogConstructor(nil).Info("Starting EventSource", "source", src)
return src.Start(c.ctx, evthdler, c.Queue, prct...)
return src.Start(c.ctx, c.Queue)
}

// NeedLeaderElection implements the manager.LeaderElectionRunnable interface.
Expand Down Expand Up @@ -181,7 +177,7 @@ func (c *Controller) Start(ctx context.Context) error {
for _, watch := range c.startWatches {
c.LogConstructor(nil).Info("Starting EventSource", "source", fmt.Sprintf("%s", watch.src))

if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
if err := watch.src.Start(ctx, c.Queue); err != nil {
return err
}
}
Expand Down
Loading

0 comments on commit 9931919

Please sign in to comment.