From a4e8aca85c917413ad406fff1281bd1578c7ebc9 Mon Sep 17 00:00:00 2001 From: Tim Ramlot <42113979+inteon@users.noreply.github.com> Date: Sun, 11 Feb 2024 10:59:00 +0100 Subject: [PATCH] move Source definitions to a single folder without breaking the API Signed-off-by: Tim Ramlot <42113979+inteon@users.noreply.github.com> --- pkg/builder/controller.go | 11 +- pkg/source/internal/channel.go | 150 +++++++++++++++ .../internal}/event_handler.go | 0 pkg/source/internal/func.go | 39 ++++ pkg/source/internal/informer.go | 53 ++++++ .../internal}/internal_suite_test.go | 0 .../internal}/internal_test.go | 2 +- .../source => source/internal}/kind.go | 33 ++++ pkg/source/source.go | 173 +----------------- 9 files changed, 292 insertions(+), 169 deletions(-) create mode 100644 pkg/source/internal/channel.go rename pkg/{internal/source => source/internal}/event_handler.go (100%) create mode 100644 pkg/source/internal/func.go create mode 100644 pkg/source/internal/informer.go rename pkg/{internal/source => source/internal}/internal_suite_test.go (100%) rename pkg/{internal/source => source/internal}/internal_test.go (99%) rename pkg/{internal/source => source/internal}/kind.go (77%) diff --git a/pkg/builder/controller.go b/pkg/builder/controller.go index 1a115f2f7b..a649f47071 100644 --- a/pkg/builder/controller.go +++ b/pkg/builder/controller.go @@ -30,7 +30,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client/apiutil" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/handler" - internalsource "sigs.k8s.io/controller-runtime/pkg/internal/source" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -313,12 +312,14 @@ func (blder *Builder) doWatch() error { } for _, w := range blder.watchesInput { // If the source of this watch is of type Kind, project it. - if srcKind, ok := w.src.(*internalsource.Kind); ok { - typeForSrc, err := blder.project(srcKind.Type, w.objectProjection) - if err != nil { + if srcKind, ok := w.src.(interface { + ProjectObject(func(client.Object) (client.Object, error)) error + }); ok { + if err := srcKind.ProjectObject(func(o client.Object) (client.Object, error) { + return blder.project(o, w.objectProjection) + }); err != nil { return err } - srcKind.Type = typeForSrc } allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) allPredicates = append(allPredicates, w.predicates...) diff --git a/pkg/source/internal/channel.go b/pkg/source/internal/channel.go new file mode 100644 index 0000000000..9471be24e4 --- /dev/null +++ b/pkg/source/internal/channel.go @@ -0,0 +1,150 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package internal + +import ( + "context" + "fmt" + "sync" + + "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" +) + +const ( + // defaultBufferSize is the default number of event notifications that can be buffered. + defaultBufferSize = 1024 +) + +// Channel is used to provide a source of events originating outside the cluster +// (e.g. GitHub Webhook callback). Channel requires the user to wire the external +// source (eh.g. http handler) to write GenericEvents to the underlying channel. +type Channel struct { + // once ensures the event distribution goroutine will be performed only once + once sync.Once + + // Source is the source channel to fetch GenericEvents + Source <-chan event.GenericEvent + + // dest is the destination channels of the added event handlers + dest []chan event.GenericEvent + + // DestBufferSize is the specified buffer size of dest channels. + // Default to 1024 if not specified. + DestBufferSize int + + // destLock is to ensure the destination channels are safely added/removed + destLock sync.Mutex +} + +func (cs *Channel) String() string { + return fmt.Sprintf("channel source: %p", cs) +} + +// Start implements Source and should only be called by the Controller. +func (cs *Channel) Start( + ctx context.Context, + handler handler.EventHandler, + queue workqueue.RateLimitingInterface, + prct ...predicate.Predicate) error { + // Source should have been specified by the user. + if cs.Source == nil { + return fmt.Errorf("must specify Channel.Source") + } + + // use default value if DestBufferSize not specified + if cs.DestBufferSize == 0 { + cs.DestBufferSize = defaultBufferSize + } + + dst := make(chan event.GenericEvent, cs.DestBufferSize) + + cs.destLock.Lock() + cs.dest = append(cs.dest, dst) + cs.destLock.Unlock() + + cs.once.Do(func() { + // Distribute GenericEvents to all EventHandler / Queue pairs Watching this source + go cs.syncLoop(ctx) + }) + + go func() { + for evt := range dst { + shouldHandle := true + for _, p := range prct { + if !p.Generic(evt) { + shouldHandle = false + break + } + } + + if shouldHandle { + func() { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + handler.Generic(ctx, evt, queue) + }() + } + } + }() + + return nil +} + +func (cs *Channel) doStop() { + cs.destLock.Lock() + defer cs.destLock.Unlock() + + for _, dst := range cs.dest { + close(dst) + } +} + +func (cs *Channel) distribute(evt event.GenericEvent) { + cs.destLock.Lock() + defer cs.destLock.Unlock() + + for _, dst := range cs.dest { + // We cannot make it under goroutine here, or we'll meet the + // race condition of writing message to closed channels. + // To avoid blocking, the dest channels are expected to be of + // proper buffer size. If we still see it blocked, then + // the controller is thought to be in an abnormal state. + dst <- evt + } +} + +func (cs *Channel) syncLoop(ctx context.Context) { + for { + select { + case <-ctx.Done(): + // Close destination channels + cs.doStop() + return + case evt, stillOpen := <-cs.Source: + if !stillOpen { + // if the source channel is closed, we're never gonna get + // anything more on it, so stop & bail + cs.doStop() + return + } + cs.distribute(evt) + } + } +} diff --git a/pkg/internal/source/event_handler.go b/pkg/source/internal/event_handler.go similarity index 100% rename from pkg/internal/source/event_handler.go rename to pkg/source/internal/event_handler.go diff --git a/pkg/source/internal/func.go b/pkg/source/internal/func.go new file mode 100644 index 0000000000..9a0e43bb09 --- /dev/null +++ b/pkg/source/internal/func.go @@ -0,0 +1,39 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package internal + +import ( + "context" + "fmt" + + "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" +) + +// Func is a function that implements Source. +type Func func(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error + +// Start implements Source. +func (f Func) Start(ctx context.Context, evt handler.EventHandler, queue workqueue.RateLimitingInterface, + pr ...predicate.Predicate) error { + return f(ctx, evt, queue, pr...) +} + +func (f Func) String() string { + return fmt.Sprintf("func source: %p", f) +} diff --git a/pkg/source/internal/informer.go b/pkg/source/internal/informer.go new file mode 100644 index 0000000000..63023c583e --- /dev/null +++ b/pkg/source/internal/informer.go @@ -0,0 +1,53 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package internal + +import ( + "context" + "fmt" + + "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" +) + +// Informer is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create). +type Informer struct { + // Informer is the controller-runtime Informer + Informer cache.Informer +} + +// Start is internal and should be called only by the Controller to register an EventHandler with the Informer +// to enqueue reconcile.Requests. +func (is *Informer) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface, + prct ...predicate.Predicate) error { + // Informer should have been specified by the user. + if is.Informer == nil { + return fmt.Errorf("must specify Informer.Informer") + } + + _, err := is.Informer.AddEventHandler(NewEventHandler(ctx, queue, handler, prct).HandlerFuncs()) + if err != nil { + return err + } + return nil +} + +func (is *Informer) String() string { + return fmt.Sprintf("informer source: %p", is.Informer) +} diff --git a/pkg/internal/source/internal_suite_test.go b/pkg/source/internal/internal_suite_test.go similarity index 100% rename from pkg/internal/source/internal_suite_test.go rename to pkg/source/internal/internal_suite_test.go diff --git a/pkg/internal/source/internal_test.go b/pkg/source/internal/internal_test.go similarity index 99% rename from pkg/internal/source/internal_test.go rename to pkg/source/internal/internal_test.go index 0574f7180e..c7b7aa4847 100644 --- a/pkg/internal/source/internal_test.go +++ b/pkg/source/internal/internal_test.go @@ -25,7 +25,7 @@ import ( "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" - internal "sigs.k8s.io/controller-runtime/pkg/internal/source" + internal "sigs.k8s.io/controller-runtime/pkg/source/internal" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" diff --git a/pkg/internal/source/kind.go b/pkg/source/internal/kind.go similarity index 77% rename from pkg/internal/source/kind.go rename to pkg/source/internal/kind.go index b3a8227125..14483dcd08 100644 --- a/pkg/internal/source/kind.go +++ b/pkg/source/internal/kind.go @@ -1,3 +1,19 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package internal import ( @@ -94,6 +110,23 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w return nil } +// ProjectObject sets the Kind's object to the given object. +// This function should only be called by the Controller builder. +// NOTE: make sure to update pkg/builder/controller.go if you change this function. +func (ks *Kind) ProjectObject(fn func(client.Object) (client.Object, error)) error { + if ks.startCancel != nil { + return fmt.Errorf("cannot project object after Start has been called") + } + + newType, err := fn(ks.Type) + if err != nil { + return err + } + + ks.Type = newType + return nil +} + func (ks *Kind) String() string { if ks.Type != nil { return fmt.Sprintf("kind source: %T", ks.Type) diff --git a/pkg/source/source.go b/pkg/source/source.go index 099c8d68fa..cf359d355c 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -18,24 +18,16 @@ package source import ( "context" - "fmt" - "sync" "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" - internal "sigs.k8s.io/controller-runtime/pkg/internal/source" + internal "sigs.k8s.io/controller-runtime/pkg/source/internal" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/predicate" ) -const ( - // defaultBufferSize is the default number of event notifications that can be buffered. - defaultBufferSize = 1024 -) - // Source is a source of events (eh.g. Create, Update, Delete operations on Kubernetes Objects, Webhook callbacks, etc) // which should be processed by event.EventHandlers to enqueue reconcile.Requests. // @@ -62,164 +54,19 @@ func Kind(cache cache.Cache, object client.Object) SyncingSource { return &internal.Kind{Type: object, Cache: cache} } -var _ Source = &Channel{} +// Informer is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create). +type Informer = internal.Informer // Channel is used to provide a source of events originating outside the cluster // (e.g. GitHub Webhook callback). Channel requires the user to wire the external // source (eh.g. http handler) to write GenericEvents to the underlying channel. -type Channel struct { - // once ensures the event distribution goroutine will be performed only once - once sync.Once - - // Source is the source channel to fetch GenericEvents - Source <-chan event.GenericEvent - - // dest is the destination channels of the added event handlers - dest []chan event.GenericEvent - - // DestBufferSize is the specified buffer size of dest channels. - // Default to 1024 if not specified. - DestBufferSize int - - // destLock is to ensure the destination channels are safely added/removed - destLock sync.Mutex -} - -func (cs *Channel) String() string { - return fmt.Sprintf("channel source: %p", cs) -} - -// Start implements Source and should only be called by the Controller. -func (cs *Channel) Start( - ctx context.Context, - handler handler.EventHandler, - queue workqueue.RateLimitingInterface, - prct ...predicate.Predicate) error { - // Source should have been specified by the user. - if cs.Source == nil { - return fmt.Errorf("must specify Channel.Source") - } - - // use default value if DestBufferSize not specified - if cs.DestBufferSize == 0 { - cs.DestBufferSize = defaultBufferSize - } - - dst := make(chan event.GenericEvent, cs.DestBufferSize) - - cs.destLock.Lock() - cs.dest = append(cs.dest, dst) - cs.destLock.Unlock() - - cs.once.Do(func() { - // Distribute GenericEvents to all EventHandler / Queue pairs Watching this source - go cs.syncLoop(ctx) - }) - - go func() { - for evt := range dst { - shouldHandle := true - for _, p := range prct { - if !p.Generic(evt) { - shouldHandle = false - break - } - } - - if shouldHandle { - func() { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - handler.Generic(ctx, evt, queue) - }() - } - } - }() - - return nil -} - -func (cs *Channel) doStop() { - cs.destLock.Lock() - defer cs.destLock.Unlock() - - for _, dst := range cs.dest { - close(dst) - } -} - -func (cs *Channel) distribute(evt event.GenericEvent) { - cs.destLock.Lock() - defer cs.destLock.Unlock() - - for _, dst := range cs.dest { - // We cannot make it under goroutine here, or we'll meet the - // race condition of writing message to closed channels. - // To avoid blocking, the dest channels are expected to be of - // proper buffer size. If we still see it blocked, then - // the controller is thought to be in an abnormal state. - dst <- evt - } -} - -func (cs *Channel) syncLoop(ctx context.Context) { - for { - select { - case <-ctx.Done(): - // Close destination channels - cs.doStop() - return - case evt, stillOpen := <-cs.Source: - if !stillOpen { - // if the source channel is closed, we're never gonna get - // anything more on it, so stop & bail - cs.doStop() - return - } - cs.distribute(evt) - } - } -} - -// Informer is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create). -type Informer struct { - // Informer is the controller-runtime Informer - Informer cache.Informer -} - -var _ Source = &Informer{} - -// Start is internal and should be called only by the Controller to register an EventHandler with the Informer -// to enqueue reconcile.Requests. -func (is *Informer) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface, - prct ...predicate.Predicate) error { - // Informer should have been specified by the user. - if is.Informer == nil { - return fmt.Errorf("must specify Informer.Informer") - } - - _, err := is.Informer.AddEventHandler(internal.NewEventHandler(ctx, queue, handler, prct).HandlerFuncs()) - if err != nil { - return err - } - return nil -} - -func (is *Informer) String() string { - return fmt.Sprintf("informer source: %p", is.Informer) -} - -var _ Source = Func(nil) +type Channel = internal.Channel // Func is a function that implements Source. -type Func func(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error +type Func = internal.Func -// Start implements Source. -func (f Func) Start(ctx context.Context, evt handler.EventHandler, queue workqueue.RateLimitingInterface, - pr ...predicate.Predicate) error { - return f(ctx, evt, queue, pr...) -} - -func (f Func) String() string { - return fmt.Sprintf("func source: %p", f) -} +var _ Source = &internal.Kind{} +var _ SyncingSource = &internal.Kind{} +var _ Source = &internal.Informer{} +var _ Source = &internal.Channel{} +var _ Source = internal.Func(nil)