Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

⚠️ Refactor source/handler/predicate packages to remove dep injection #2120

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ issues:
- Subprocess launch(ed with variable|ing should be audited)
- (G204|G104|G307)
- "ST1000: at least one file in a package should have a package comment"
- "SA1019: \"sigs.k8s.io/controller-runtime/pkg/runtime/inject\""
- "SA1019: inject.*"
exclude-rules:
- linters:
- gosec
Expand Down
6 changes: 3 additions & 3 deletions examples/builtins/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ func main() {
}

// Watch ReplicaSets and enqueue ReplicaSet object key
if err := c.Watch(&source.Kind{Type: &appsv1.ReplicaSet{}}, &handler.EnqueueRequestForObject{}); err != nil {
if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}), &handler.EnqueueRequestForObject{}); err != nil {
entryLog.Error(err, "unable to watch ReplicaSets")
os.Exit(1)
}

// Watch Pods and enqueue owning ReplicaSet key
if err := c.Watch(&source.Kind{Type: &corev1.Pod{}},
&handler.EnqueueRequestForOwner{OwnerType: &appsv1.ReplicaSet{}, IsController: true}); err != nil {
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}),
handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.ReplicaSet{}, handler.OnlyControllerOwner())); err != nil {
entryLog.Error(err, "unable to watch Pods")
os.Exit(1)
}
Expand Down
2 changes: 1 addition & 1 deletion hack/test-all.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ if [[ -n ${ARTIFACTS:-} ]]; then
fi

result=0
go test -race ${P_FLAG} ${MOD_OPT} ./... ${GINKGO_ARGS} || result=$?
go test -v -race ${P_FLAG} ${MOD_OPT} ./... --ginkgo.fail-fast ${GINKGO_ARGS} || result=$?
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These were actually useful to catch errors right away, and have a bit more verbosity when tests are running, we should probably keep them


if [[ -n ${ARTIFACTS:-} ]]; then
mkdir -p ${ARTIFACTS}
Expand Down
22 changes: 12 additions & 10 deletions pkg/builder/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
internalsource "sigs.k8s.io/controller-runtime/pkg/internal/source"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand Down Expand Up @@ -217,11 +218,11 @@ func (blder *Builder) project(obj client.Object, proj objectProjection) (client.
func (blder *Builder) doWatch() error {
// Reconcile type
if blder.forInput.object != nil {
typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
obj, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
if err != nil {
return err
}
src := &source.Kind{Type: typeForSrc}
src := source.Kind(blder.mgr.GetCache(), obj)
hdler := &handler.EnqueueRequestForObject{}
allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
Expand All @@ -234,15 +235,16 @@ func (blder *Builder) doWatch() error {
return errors.New("Owns() can only be used together with For()")
}
for _, own := range blder.ownsInput {
typeForSrc, err := blder.project(own.object, own.objectProjection)
obj, err := blder.project(own.object, own.objectProjection)
if err != nil {
return err
}
src := &source.Kind{Type: typeForSrc}
hdler := &handler.EnqueueRequestForOwner{
OwnerType: blder.forInput.object,
IsController: true,
}
src := source.Kind(blder.mgr.GetCache(), obj)
hdler := handler.EnqueueRequestForOwner(
blder.mgr.GetScheme(), blder.mgr.GetRESTMapper(),
blder.forInput.object,
handler.OnlyControllerOwner(),
)
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, own.predicates...)
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
Expand All @@ -258,8 +260,8 @@ func (blder *Builder) doWatch() error {
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, w.predicates...)

// If the source of this watch is of type *source.Kind, project it.
if srckind, ok := w.src.(*source.Kind); ok {
// If the source of this watch is of type Kind, project it.
if srckind, ok := w.src.(*internalsource.Kind); ok {
typeForSrc, err := blder.project(srckind.Type, w.objectProjection)
if err != nil {
return err
Expand Down
18 changes: 10 additions & 8 deletions pkg/builder/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ var _ = Describe("application", func() {
Expect(err).NotTo(HaveOccurred())

instance, err := ControllerManagedBy(m).
Watches(&source.Kind{Type: &appsv1.ReplicaSet{}}, &handler.EnqueueRequestForObject{}).
Watches(source.Kind(m.GetCache(), &appsv1.ReplicaSet{}), &handler.EnqueueRequestForObject{}).
Build(noop)
Expect(err).To(MatchError(ContainSubstring("one of For() or Named() must be called")))
Expect(instance).To(BeNil())
Expand Down Expand Up @@ -157,7 +157,7 @@ var _ = Describe("application", func() {

instance, err := ControllerManagedBy(m).
Named("my_controller").
Watches(&source.Kind{Type: &appsv1.ReplicaSet{}}, &handler.EnqueueRequestForObject{}).
Watches(source.Kind(m.GetCache(), &appsv1.ReplicaSet{}), &handler.EnqueueRequestForObject{}).
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now the new experience on how to create a controller in a manager. Over time, the builder has increased the options we're passing in, which is ok, but we've also had to deal with all of the dependency injection stuff which isn't pretty at all.

This UX might be a stopgap, but it's actually nicer to be explicit where a source is getting its cache or any other function. In a follow-up PR we might want to explore easier ways to plow through this information when we're clearly building a controller in a Manager, and make the source aware that it can access whatever it needs through the manager.

For now, we can keep this as a first step breaking change and iterate on it as we see fit. The clarity might come handy in the future when we reason through the codebase.

Build(noop)
Expect(err).NotTo(HaveOccurred())
Expect(instance).NotTo(BeNil())
Expand Down Expand Up @@ -369,8 +369,9 @@ var _ = Describe("application", func() {
bldr := ControllerManagedBy(m).
For(&appsv1.Deployment{}).
Watches( // Equivalent of Owns
&source.Kind{Type: &appsv1.ReplicaSet{}},
&handler.EnqueueRequestForOwner{OwnerType: &appsv1.Deployment{}, IsController: true})
source.Kind(m.GetCache(), &appsv1.ReplicaSet{}),
handler.EnqueueRequestForOwner(m.GetScheme(), m.GetRESTMapper(), &appsv1.Deployment{}, handler.OnlyControllerOwner()),
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -384,10 +385,11 @@ var _ = Describe("application", func() {
bldr := ControllerManagedBy(m).
Named("Deployment").
Watches( // Equivalent of For
&source.Kind{Type: &appsv1.Deployment{}}, &handler.EnqueueRequestForObject{}).
source.Kind(m.GetCache(), &appsv1.Deployment{}), &handler.EnqueueRequestForObject{}).
Watches( // Equivalent of Owns
&source.Kind{Type: &appsv1.ReplicaSet{}},
&handler.EnqueueRequestForOwner{OwnerType: &appsv1.Deployment{}, IsController: true})
source.Kind(m.GetCache(), &appsv1.ReplicaSet{}),
handler.EnqueueRequestForOwner(m.GetScheme(), m.GetRESTMapper(), &appsv1.Deployment{}, handler.OnlyControllerOwner()),
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -481,7 +483,7 @@ var _ = Describe("application", func() {
bldr := ControllerManagedBy(mgr).
For(&appsv1.Deployment{}, OnlyMetadata).
Owns(&appsv1.ReplicaSet{}, OnlyMetadata).
Watches(&source.Kind{Type: &appsv1.StatefulSet{}},
Watches(source.Kind(mgr.GetCache(), &appsv1.StatefulSet{}),
handler.EnqueueRequestsFromMapFunc(func(o client.Object) []reconcile.Request {
defer GinkgoRecover()

Expand Down
11 changes: 3 additions & 8 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,12 @@ import (

// Cluster provides various methods to interact with a cluster.
type Cluster interface {
// SetFields will set any dependencies on an object for which the object has implemented the inject
// interface - e.g. inject.Client.
// Deprecated: use the equivalent Options field to set a field. This method will be removed in v0.10.
SetFields(interface{}) error

// GetConfig returns an initialized Config
GetConfig() *rest.Config

// GetCache returns a cache.Cache
GetCache() cache.Cache

// GetScheme returns an initialized Scheme
GetScheme() *runtime.Scheme

Expand All @@ -57,9 +55,6 @@ type Cluster interface {
// GetFieldIndexer returns a client.FieldIndexer configured with the client
GetFieldIndexer() client.FieldIndexer

// GetCache returns a cache.Cache
GetCache() cache.Cache

// GetEventRecorderFor returns a new EventRecorder for the provided name
GetEventRecorderFor(name string) record.EventRecorder

Expand Down
128 changes: 0 additions & 128 deletions pkg/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,8 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/cache/informertest"
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
)

var _ = Describe("cluster.Cluster", func() {
Expand Down Expand Up @@ -111,78 +108,6 @@ var _ = Describe("cluster.Cluster", func() {
})
})

Describe("SetFields", func() {
It("should inject field values", func() {
c, err := New(cfg, func(o *Options) {
o.NewCache = func(_ *rest.Config, _ cache.Options) (cache.Cache, error) {
return &informertest.FakeInformers{}, nil
}
})
Expect(err).NotTo(HaveOccurred())

By("Injecting the dependencies")
err = c.SetFields(&injectable{
scheme: func(scheme *runtime.Scheme) error {
defer GinkgoRecover()
Expect(scheme).To(Equal(c.GetScheme()))
return nil
},
config: func(config *rest.Config) error {
defer GinkgoRecover()
Expect(config).To(Equal(c.GetConfig()))
return nil
},
client: func(client client.Client) error {
defer GinkgoRecover()
Expect(client).To(Equal(c.GetClient()))
return nil
},
cache: func(cache cache.Cache) error {
defer GinkgoRecover()
Expect(cache).To(Equal(c.GetCache()))
return nil
},
log: func(logger logr.Logger) error {
defer GinkgoRecover()
Expect(logger).To(Equal(logf.RuntimeLog.WithName("cluster")))
return nil
},
})
Expect(err).NotTo(HaveOccurred())

By("Returning an error if dependency injection fails")

expected := fmt.Errorf("expected error")
err = c.SetFields(&injectable{
client: func(client client.Client) error {
return expected
},
})
Expect(err).To(Equal(expected))

err = c.SetFields(&injectable{
scheme: func(scheme *runtime.Scheme) error {
return expected
},
})
Expect(err).To(Equal(expected))

err = c.SetFields(&injectable{
config: func(config *rest.Config) error {
return expected
},
})
Expect(err).To(Equal(expected))

err = c.SetFields(&injectable{
cache: func(c cache.Cache) error {
return expected
},
})
Expect(err).To(Equal(expected))
})
})

It("should not leak goroutines when stopped", func() {
currentGRs := goleak.IgnoreCurrent()

Expand Down Expand Up @@ -242,56 +167,3 @@ var _ = Describe("cluster.Cluster", func() {
Expect(c.GetAPIReader()).NotTo(BeNil())
})
})

var _ inject.Cache = &injectable{}
var _ inject.Client = &injectable{}
var _ inject.Scheme = &injectable{}
var _ inject.Config = &injectable{}
var _ inject.Logger = &injectable{}

type injectable struct {
scheme func(scheme *runtime.Scheme) error
client func(client.Client) error
config func(config *rest.Config) error
cache func(cache.Cache) error
log func(logger logr.Logger) error
}

func (i *injectable) InjectCache(c cache.Cache) error {
if i.cache == nil {
return nil
}
return i.cache(c)
}

func (i *injectable) InjectConfig(config *rest.Config) error {
if i.config == nil {
return nil
}
return i.config(config)
}

func (i *injectable) InjectClient(c client.Client) error {
if i.client == nil {
return nil
}
return i.client(c)
}

func (i *injectable) InjectScheme(scheme *runtime.Scheme) error {
if i.scheme == nil {
return nil
}
return i.scheme(scheme)
}

func (i *injectable) InjectLogger(log logr.Logger) error {
if i.log == nil {
return nil
}
return i.log(log)
}

func (i *injectable) Start(<-chan struct{}) error {
return nil
}
23 changes: 0 additions & 23 deletions pkg/cluster/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
)

type cluster struct {
Expand Down Expand Up @@ -64,28 +63,6 @@ type cluster struct {
logger logr.Logger
}

func (c *cluster) SetFields(i interface{}) error {
if _, err := inject.ConfigInto(c.config, i); err != nil {
return err
}
if _, err := inject.ClientInto(c.client, i); err != nil {
return err
}
if _, err := inject.APIReaderInto(c.apiReader, i); err != nil {
return err
}
if _, err := inject.SchemeInto(c.scheme, i); err != nil {
return err
}
if _, err := inject.CacheInto(c.cache, i); err != nil {
return err
}
if _, err := inject.MapperInto(c.mapper, i); err != nil {
return err
}
return nil
}

func (c *cluster) GetConfig() *rest.Config {
return c.config
}
Expand Down
6 changes: 0 additions & 6 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,6 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller
options.RateLimiter = workqueue.DefaultControllerRateLimiter()
}

// Inject dependencies into Reconciler
if err := mgr.SetFields(options.Reconciler); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pretty sure this will break people, as Kubebuilder at least used to scaffold in a way that this ended up being done, not sure if that is still the case today. You sure you want to just remove this instead of warn for one release if any of the things end up getting injected?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, these fields have been deprecated since 0.10; it's better to not linger the support for dependency injection longer. We can do a couple of things to avoid breaking, including giving a heads up to folks in the mailing list.

return nil, err
}

if options.RecoverPanic == nil {
options.RecoverPanic = mgr.GetControllerOptions().RecoverPanic
}
Expand All @@ -156,7 +151,6 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller
},
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
CacheSyncTimeout: options.CacheSyncTimeout,
SetFields: mgr.SetFields,
Name: name,
LogConstructor: options.LogConstructor,
RecoverPanic: options.RecoverPanic,
Expand Down
9 changes: 5 additions & 4 deletions pkg/controller/controller_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,13 @@ var _ = Describe("controller", func() {
Expect(err).NotTo(HaveOccurred())

By("Watching Resources")
err = instance.Watch(&source.Kind{Type: &appsv1.ReplicaSet{}}, &handler.EnqueueRequestForOwner{
OwnerType: &appsv1.Deployment{},
})
err = instance.Watch(
source.Kind(cm.GetCache(), &appsv1.ReplicaSet{}),
handler.EnqueueRequestForOwner(cm.GetScheme(), cm.GetRESTMapper(), &appsv1.Deployment{}),
)
Expect(err).NotTo(HaveOccurred())

err = instance.Watch(&source.Kind{Type: &appsv1.Deployment{}}, &handler.EnqueueRequestForObject{})
err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.Deployment{}), &handler.EnqueueRequestForObject{})
Expect(err).NotTo(HaveOccurred())

err = cm.GetClient().Get(ctx, types.NamespacedName{Name: "foo"}, &corev1.Namespace{})
Expand Down
Loading