From 86ced6fe0d42e5f2ea80dab5fc4e3047c782656c Mon Sep 17 00:00:00 2001 From: Danil Grigorev Date: Wed, 17 Apr 2024 23:29:16 +0200 Subject: [PATCH] Refactor on Alvaro's PR Signed-off-by: Danil Grigorev --- .../controllers/kubeadmconfig_controller.go | 7 +- controllers/external/tracker.go | 6 +- controllers/remote/cluster_cache_tracker.go | 4 +- .../remote/cluster_cache_tracker_test.go | 3 +- .../internal/controllers/controller.go | 3 +- .../clusterresourceset_controller.go | 3 +- .../clusterresourcesetbinding_controller.go | 3 +- .../controllers/machinepool_controller.go | 15 ++-- .../controllers/extensionconfig_controller.go | 1 + exp/util/util.go | 2 +- go.mod | 2 +- go.sum | 4 +- .../controllers/cluster/cluster_controller.go | 3 +- .../clusterclass/clusterclass_controller.go | 5 +- .../controllers/machine/machine_controller.go | 22 +++--- .../machine_controller_noderef_test.go | 3 +- .../machinedeployment_controller.go | 5 +- .../machinehealthcheck_controller.go | 16 +++-- .../machineset/machineset_controller.go | 5 +- .../topology/cluster/cluster_controller.go | 10 +-- .../machinedeployment_controller.go | 2 +- .../machineset/machineset_controller.go | 2 +- test/go.mod | 2 +- test/go.sum | 4 +- .../dockermachinepool_controller.go | 7 +- .../controllers/dockercluster_controller.go | 3 +- .../controllers/dockermachine_controller.go | 7 +- .../controllers/inmemorycluster_controller.go | 3 +- .../controllers/inmemorymachine_controller.go | 7 +- util/predicates/cluster_predicates.go | 69 ++++++++++--------- util/predicates/cluster_predicates_test.go | 6 +- util/predicates/generic_predicates.go | 16 ++--- util/util.go | 10 +-- 33 files changed, 146 insertions(+), 114 deletions(-) diff --git a/bootstrap/kubeadm/internal/controllers/kubeadmconfig_controller.go b/bootstrap/kubeadm/internal/controllers/kubeadmconfig_controller.go index f6bc296a91c5..7cfaa04dc89b 100644 --- a/bootstrap/kubeadm/internal/controllers/kubeadmconfig_controller.go +++ b/bootstrap/kubeadm/internal/controllers/kubeadmconfig_controller.go @@ -108,6 +108,7 @@ func (r *KubeadmConfigReconciler) SetupWithManager(ctx context.Context, mgr ctrl } b := ctrl.NewControllerManagedBy(mgr). + Named("kubeadmConfig"). Add(builder.For(mgr, &bootstrapv1.KubeadmConfig{}, predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &bootstrapv1.KubeadmConfig{}), @@ -115,21 +116,21 @@ func (r *KubeadmConfigReconciler) SetupWithManager(ctx context.Context, mgr ctrl WithOptions(options). Add(builder.Watches(mgr, &clusterv1.Machine{}, - handler.EnqueueRequestsFromObjectMap(r.MachineToBootstrapMapFunc), + handler.EnqueueRequestsFromTypedMapFunc(r.MachineToBootstrapMapFunc), predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.Machine{}), )) if feature.Gates.Enabled(feature.MachinePool) { b = b.Add(builder.Watches(mgr, &expv1.MachinePool{}, - handler.EnqueueRequestsFromObjectMap(r.MachinePoolToBootstrapMapFunc), + handler.EnqueueRequestsFromTypedMapFunc(r.MachinePoolToBootstrapMapFunc), predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &expv1.MachinePool{}), )) } b = b.Add(builder.Watches(mgr, &clusterv1.Cluster{}, - handler.EnqueueRequestsFromObjectMap(r.ClusterToKubeadmConfigs), + handler.EnqueueRequestsFromTypedMapFunc(r.ClusterToKubeadmConfigs), predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.Cluster{}), predicates.ClusterUnpausedAndInfrastructureReady(ctrl.LoggerFrom(ctx)), )) diff --git a/controllers/external/tracker.go b/controllers/external/tracker.go index 850b877fae61..3d23f2b2b875 100644 --- a/controllers/external/tracker.go +++ b/controllers/external/tracker.go @@ -22,7 +22,6 @@ import ( "github.com/go-logr/logr" "github.com/pkg/errors" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" @@ -54,12 +53,9 @@ func (o *ObjectTracker) Watch(log logr.Logger, obj client.Object, handler handle return nil } - u := &unstructured.Unstructured{} - u.SetGroupVersionKind(gvk) - log.Info(fmt.Sprintf("Adding watch on external object %q", gvk.String())) err := o.Controller.Watch( - source.Kind(o.Cache, u).Prepare(handler, append(p, predicates.ResourceNotPaused(log, obj))...), + source.Kind(o.Cache, obj, handler, append(p, predicates.ResourceNotPaused(log, obj))...), ) if err != nil { o.m.Delete(key) diff --git a/controllers/remote/cluster_cache_tracker.go b/controllers/remote/cluster_cache_tracker.go index 5060729ce1a4..4b0d889d3ce9 100644 --- a/controllers/remote/cluster_cache_tracker.go +++ b/controllers/remote/cluster_cache_tracker.go @@ -536,7 +536,7 @@ func (t *ClusterCacheTracker) deleteAccessor(_ context.Context, cluster client.O // Watcher is a scoped-down interface from Controller that only knows how to watch. type Watcher interface { // Watch watches src for changes, sending events to eventHandler if they pass predicates. - Watch(src source.Source, eventHandler handler.EventHandler, predicates ...predicate.Predicate) error + Watch(src source.Source) error } // WatchInput specifies the parameters used to establish a new watch for a remote cluster. @@ -585,7 +585,7 @@ func (t *ClusterCacheTracker) Watch(ctx context.Context, input WatchInput) error } // Need to create the watch - if err := input.Watcher.Watch(source.Kind(accessor.cache, input.Kind), input.EventHandler, input.Predicates...); err != nil { + if err := input.Watcher.Watch(source.Kind(accessor.cache, input.Kind, input.EventHandler, input.Predicates...)); err != nil { return errors.Wrapf(err, "failed to add %s watch on cluster %s: failed to create watch", input.Kind, klog.KRef(input.Cluster.Namespace, input.Cluster.Name)) } diff --git a/controllers/remote/cluster_cache_tracker_test.go b/controllers/remote/cluster_cache_tracker_test.go index 77eb0baf2937..3920f957b536 100644 --- a/controllers/remote/cluster_cache_tracker_test.go +++ b/controllers/remote/cluster_cache_tracker_test.go @@ -30,7 +30,6 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" - "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" @@ -88,7 +87,7 @@ func TestClusterCacheTracker(t *testing.T) { watch, err := ctrl.NewControllerManagedBy(mgr).For(&clusterv1.MachineDeployment{}).Build(c) g.Expect(err).ToNot(HaveOccurred()) - w = &controller.ControllerAdapter{Controller: watch} + w = watch mgrContext, mgrCancel = context.WithCancel(ctx) t.Log("Starting the manager") diff --git a/controlplane/kubeadm/internal/controllers/controller.go b/controlplane/kubeadm/internal/controllers/controller.go index 5b66c49ccb3f..b87229d79c7e 100644 --- a/controlplane/kubeadm/internal/controllers/controller.go +++ b/controlplane/kubeadm/internal/controllers/controller.go @@ -91,6 +91,7 @@ type KubeadmControlPlaneReconciler struct { func (r *KubeadmControlPlaneReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error { c, err := ctrl.NewControllerManagedBy(mgr). + Named("kubeadmControlPlane"). Add(builder.For(mgr, &controlplanev1.KubeadmControlPlane{}, predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &controlplanev1.KubeadmControlPlane{}), @@ -103,7 +104,7 @@ func (r *KubeadmControlPlaneReconciler) SetupWithManager(ctx context.Context, mg WithOptions(options). Add(builder.Watches(mgr, &clusterv1.Cluster{}, - handler.EnqueueRequestsFromObjectMap(r.ClusterToKubeadmControlPlane), + handler.EnqueueRequestsFromTypedMapFunc(r.ClusterToKubeadmControlPlane), predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.Cluster{}), predicates.ClusterUnpausedAndInfrastructureReady(ctrl.LoggerFrom(ctx)), )).Build(r) diff --git a/exp/addons/internal/controllers/clusterresourceset_controller.go b/exp/addons/internal/controllers/clusterresourceset_controller.go index 7e944fbd655b..27e7fdfdc12a 100644 --- a/exp/addons/internal/controllers/clusterresourceset_controller.go +++ b/exp/addons/internal/controllers/clusterresourceset_controller.go @@ -66,12 +66,13 @@ type ClusterResourceSetReconciler struct { func (r *ClusterResourceSetReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error { err := ctrl.NewControllerManagedBy(mgr). + Named("clusterResourceSet"). Add(builder.For(mgr, &addonsv1.ClusterResourceSet{}, predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &addonsv1.ClusterResourceSet{}), )). Add(builder.Watches(mgr, &clusterv1.Cluster{}, - handler.EnqueueRequestsFromObjectMap(r.clusterToClusterResourceSet), + handler.EnqueueRequestsFromTypedMapFunc(r.clusterToClusterResourceSet), predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.Cluster{}), )). WatchesMetadata( diff --git a/exp/addons/internal/controllers/clusterresourcesetbinding_controller.go b/exp/addons/internal/controllers/clusterresourcesetbinding_controller.go index 3bbfde969657..ea57478e427e 100644 --- a/exp/addons/internal/controllers/clusterresourcesetbinding_controller.go +++ b/exp/addons/internal/controllers/clusterresourcesetbinding_controller.go @@ -50,10 +50,11 @@ type ClusterResourceSetBindingReconciler struct { func (r *ClusterResourceSetBindingReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error { err := ctrl.NewControllerManagedBy(mgr). + Named("clusterResourceSetBinding"). For(&addonsv1.ClusterResourceSetBinding{}). Add(builder.Watches(mgr, &clusterv1.Cluster{}, - handler.EnqueueRequestsFromObjectMap(r.clusterToClusterResourceSetBinding), + handler.EnqueueRequestsFromTypedMapFunc(r.clusterToClusterResourceSetBinding), predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.Cluster{}), )). WithOptions(options). diff --git a/exp/internal/controllers/machinepool_controller.go b/exp/internal/controllers/machinepool_controller.go index 9db7630a89d3..404f42d0ac81 100644 --- a/exp/internal/controllers/machinepool_controller.go +++ b/exp/internal/controllers/machinepool_controller.go @@ -18,6 +18,7 @@ package controllers import ( "context" + "fmt" "time" "github.com/pkg/errors" @@ -87,13 +88,14 @@ func (r *MachinePoolReconciler) SetupWithManager(ctx context.Context, mgr ctrl.M } c, err := ctrl.NewControllerManagedBy(mgr). + Named("machinepool"). Add(builder.For(mgr, &expv1.MachinePool{}, predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &expv1.MachinePool{}))). WithOptions(options). Add(builder.Watches(mgr, &clusterv1.Cluster{}, - handler.EnqueueRequestsFromObjectMap(clusterToMachinePools), + handler.EnqueueRequestsFromTypedMapFunc(clusterToMachinePools), // TODO: should this wait for Cluster.Status.InfrastructureReady similar to Infra Machine resources? predicates.ClusterUnpaused(ctrl.LoggerFrom(ctx)), predicates.ResourceHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.Cluster{}), @@ -331,13 +333,18 @@ func (r *MachinePoolReconciler) watchClusterNodes(ctx context.Context, cluster * return r.Tracker.Watch(ctx, remote.WatchInput{ Name: "machinepool-watchNodes", Cluster: util.ObjectKey(cluster), - Watcher: &controller.ControllerAdapter{Controller: r.controller}, + Watcher: r.controller, Kind: &corev1.Node{}, - EventHandler: handler.EnqueueRequestsFromObjectMapFunc(r.nodeToMachinePool), + EventHandler: handler.EnqueueRequestsFromMapFunc(r.nodeToMachinePool), }) } -func (r *MachinePoolReconciler) nodeToMachinePool(ctx context.Context, node *corev1.Node) []reconcile.Request { +func (r *MachinePoolReconciler) nodeToMachinePool(ctx context.Context, o client.Object) []reconcile.Request { + node, ok := o.(*corev1.Node) + if !ok { + panic(fmt.Sprintf("Expected a Node but got a %T", o)) + } + var filters []client.ListOption // Match by clusterName when the node has the annotation. if clusterName, ok := node.GetAnnotations()[clusterv1.ClusterNameAnnotation]; ok { diff --git a/exp/runtime/internal/controllers/extensionconfig_controller.go b/exp/runtime/internal/controllers/extensionconfig_controller.go index f29ffe6e7dc2..5a5e74ea8458 100644 --- a/exp/runtime/internal/controllers/extensionconfig_controller.go +++ b/exp/runtime/internal/controllers/extensionconfig_controller.go @@ -62,6 +62,7 @@ type Reconciler struct { func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error { err := ctrl.NewControllerManagedBy(mgr). + Named("extensionconfig"). Add(builder.For(mgr, &runtimev1.ExtensionConfig{}, predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &runtimev1.ExtensionConfig{}), diff --git a/exp/util/util.go b/exp/util/util.go index 0f82a919163c..a7d451c87a90 100644 --- a/exp/util/util.go +++ b/exp/util/util.go @@ -89,7 +89,7 @@ func GetMachinePoolByLabels(ctx context.Context, c client.Client, namespace stri // MachinePoolToInfrastructureMapFunc returns a handler.MapFunc that watches for // MachinePool events and returns reconciliation requests for an infrastructure provider object. -func MachinePoolToInfrastructureMapFunc(gvk schema.GroupVersionKind, log logr.Logger) handler.ObjectMapFunc[*expv1.MachinePool] { +func MachinePoolToInfrastructureMapFunc(gvk schema.GroupVersionKind, log logr.Logger) handler.TypedMapFunc[*expv1.MachinePool] { log = log.WithValues("machine-pool-to-infra-map-func", gvk.String()) return func(_ context.Context, m *expv1.MachinePool) []reconcile.Request { log := log.WithValues("MachinePool", klog.KObj(m)) diff --git a/go.mod b/go.mod index d433675bd120..9c661c74bb28 100644 --- a/go.mod +++ b/go.mod @@ -2,7 +2,7 @@ module sigs.k8s.io/cluster-api go 1.22.0 -replace sigs.k8s.io/controller-runtime => github.com/Danil-Grigorev/controller-runtime v0.6.1-0.20240417125124-8984b3049571 +replace sigs.k8s.io/controller-runtime => github.com/Danil-Grigorev/controller-runtime v0.6.1-0.20240419082425-50710c08e9d2 require ( github.com/MakeNowJust/heredoc v1.0.0 diff --git a/go.sum b/go.sum index ff206e0a630b..00851c4468ee 100644 --- a/go.sum +++ b/go.sum @@ -20,8 +20,8 @@ github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOEl github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= -github.com/Danil-Grigorev/controller-runtime v0.6.1-0.20240417125124-8984b3049571 h1:a1Oaf+Zk1mbhUP0wVULBOLZ+b4MXLW6g/2kadPQg5yw= -github.com/Danil-Grigorev/controller-runtime v0.6.1-0.20240417125124-8984b3049571/go.mod h1:TLM3OvUJgcqHVBLVRlNylmfbOlOukMLFHtc6jo3EtIQ= +github.com/Danil-Grigorev/controller-runtime v0.6.1-0.20240417201603-e18909c2932e h1:hEGMiTp7lLNM666lIVpXzlyjkTpAMV+iYcnofBNKAYk= +github.com/Danil-Grigorev/controller-runtime v0.6.1-0.20240417201603-e18909c2932e/go.mod h1:TLM3OvUJgcqHVBLVRlNylmfbOlOukMLFHtc6jo3EtIQ= github.com/MakeNowJust/heredoc v1.0.0 h1:cXCdzVdstXyiTqTvfqk9SDHpKNjxuom+DOlyEeQ4pzQ= github.com/MakeNowJust/heredoc v1.0.0/go.mod h1:mG5amYoWBHf8vpLOuehzbGGw0EHxpZZ6lCpQ4fNJ8LE= github.com/Masterminds/goutils v1.1.1 h1:5nUrii3FMTL5diU80unEVvNevw1nH4+ZV4DSLVJLSYI= diff --git a/internal/controllers/cluster/cluster_controller.go b/internal/controllers/cluster/cluster_controller.go index f756e0a8b66e..9a4c38c681df 100644 --- a/internal/controllers/cluster/cluster_controller.go +++ b/internal/controllers/cluster/cluster_controller.go @@ -78,10 +78,11 @@ type Reconciler struct { func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error { c, err := ctrl.NewControllerManagedBy(mgr). + Named("cluster"). Add(builder.For(mgr, &clusterv1.Cluster{}, predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.Cluster{}))). Add(builder.Watches(mgr, &clusterv1.Machine{}, - handler.EnqueueRequestsFromObjectMap(r.controlPlaneMachineToCluster), + handler.EnqueueRequestsFromTypedMapFunc(r.controlPlaneMachineToCluster), predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.Machine{}), )). WithOptions(options). diff --git a/internal/controllers/clusterclass/clusterclass_controller.go b/internal/controllers/clusterclass/clusterclass_controller.go index a62f72ba04d4..96e070771b78 100644 --- a/internal/controllers/clusterclass/clusterclass_controller.go +++ b/internal/controllers/clusterclass/clusterclass_controller.go @@ -73,11 +73,12 @@ type Reconciler struct { } func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error { - err := ctrl.NewControllerManagedBy(mgr).Add(builder.For(mgr, &clusterv1.ClusterClass{}, predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.ClusterClass{}))). + err := ctrl.NewControllerManagedBy(mgr). Named("clusterclass"). + Add(builder.For(mgr, &clusterv1.ClusterClass{}, predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.ClusterClass{}))). WithOptions(options). Add(builder.Watches(mgr, &runtimev1.ExtensionConfig{}, - handler.EnqueueRequestsFromObjectMap(r.extensionConfigToClusterClass), + handler.EnqueueRequestsFromTypedMapFunc(r.extensionConfigToClusterClass), predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &runtimev1.ExtensionConfig{}))). Complete(r) diff --git a/internal/controllers/machine/machine_controller.go b/internal/controllers/machine/machine_controller.go index 032695071354..5f499818670c 100644 --- a/internal/controllers/machine/machine_controller.go +++ b/internal/controllers/machine/machine_controller.go @@ -115,14 +115,15 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt } c, err := ctrl.NewControllerManagedBy(mgr). + Named("machine"). Add(builder.For(mgr, &clusterv1.Machine{}, predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.Machine{}))). WithOptions(options). Add(builder.Watches(mgr, &clusterv1.Cluster{}, - handler.EnqueueRequestsFromObjectMap(clusterToMachines), + handler.EnqueueRequestsFromTypedMapFunc(clusterToMachines), // TODO: should this wait for Cluster.Status.InfrastructureReady similar to Infra Machine resources? - predicate.Any( - predicate.Any( + predicate.Or( + predicate.Or( predicates.ClusterUnpaused(ctrl.LoggerFrom(ctx)), predicates.ClusterControlPlaneInitialized(ctrl.LoggerFrom(ctx)), ), @@ -132,12 +133,12 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt )). Add(builder.Watches(mgr, &clusterv1.MachineSet{}, - handler.EnqueueRequestsFromObjectMap(msToMachines), + handler.EnqueueRequestsFromTypedMapFunc(msToMachines), predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.MachineSet{}), )). Add(builder.Watches(mgr, &clusterv1.MachineDeployment{}, - handler.EnqueueRequestsFromObjectMap(mdToMachines), + handler.EnqueueRequestsFromTypedMapFunc(mdToMachines), predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.MachineDeployment{}), )). Build(r) @@ -859,13 +860,18 @@ func (r *Reconciler) watchClusterNodes(ctx context.Context, cluster *clusterv1.C return r.Tracker.Watch(ctx, remote.WatchInput{ Name: "machine-watchNodes", Cluster: util.ObjectKey(cluster), - Watcher: &controller.ControllerAdapter{Controller: r.controller}, + Watcher: r.controller, Kind: &corev1.Node{}, - EventHandler: handler.EnqueueRequestsFromObjectMapFunc(r.nodeToMachine), + EventHandler: handler.EnqueueRequestsFromTypedMapFunc(r.nodeToMachine), }) } -func (r *Reconciler) nodeToMachine(ctx context.Context, node *corev1.Node) []reconcile.Request { +func (r *Reconciler) nodeToMachine(ctx context.Context, o client.Object) []reconcile.Request { + node, ok := o.(*corev1.Node) + if !ok { + panic(fmt.Sprintf("Expected a Node but got a %T", o)) + } + var filters []client.ListOption // Match by clusterName when the node has the annotation. if clusterName, ok := node.GetAnnotations()[clusterv1.ClusterNameAnnotation]; ok { diff --git a/internal/controllers/machine/machine_controller_noderef_test.go b/internal/controllers/machine/machine_controller_noderef_test.go index b2470356e49f..48ebcabf3b02 100644 --- a/internal/controllers/machine/machine_controller_noderef_test.go +++ b/internal/controllers/machine/machine_controller_noderef_test.go @@ -29,7 +29,6 @@ import ( "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -144,7 +143,7 @@ func TestGetNode(t *testing.T) { g.Expect(tracker.Watch(ctx, remote.WatchInput{ Name: "TestGetNode", Cluster: util.ObjectKey(testCluster), - Watcher: &controller.ControllerAdapter{Controller: w}, + Watcher: w, Kind: &corev1.Node{}, EventHandler: handler.EnqueueRequestsFromMapFunc(func(context.Context, client.Object) []reconcile.Request { return nil diff --git a/internal/controllers/machinedeployment/machinedeployment_controller.go b/internal/controllers/machinedeployment/machinedeployment_controller.go index 1c95f5c9ef11..4ef32ebc2d20 100644 --- a/internal/controllers/machinedeployment/machinedeployment_controller.go +++ b/internal/controllers/machinedeployment/machinedeployment_controller.go @@ -80,6 +80,7 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt } err = ctrl.NewControllerManagedBy(mgr). + Named("machineDeployment"). Add(builder.For(mgr, &clusterv1.MachineDeployment{}, predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.MachineDeployment{}))). @@ -88,13 +89,13 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt // Watches enqueues MachineDeployment for corresponding MachineSet resources, if no managed controller reference (owner) exists. Add(builder.Watches(mgr, &clusterv1.MachineSet{}, - handler.EnqueueRequestsFromObjectMap(r.MachineSetToDeployments), + handler.EnqueueRequestsFromTypedMapFunc(r.MachineSetToDeployments), predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.MachineSet{})), ). WithOptions(options). Add(builder.Watches(mgr, &clusterv1.Cluster{}, - handler.EnqueueRequestsFromObjectMap(clusterToMachineDeployments), + handler.EnqueueRequestsFromTypedMapFunc(clusterToMachineDeployments), // TODO: should this wait for Cluster.Status.InfrastructureReady similar to Infra Machine resources? predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.Cluster{}), )).Complete(r) diff --git a/internal/controllers/machinehealthcheck/machinehealthcheck_controller.go b/internal/controllers/machinehealthcheck/machinehealthcheck_controller.go index 95767a506fd2..ce88113bb3da 100644 --- a/internal/controllers/machinehealthcheck/machinehealthcheck_controller.go +++ b/internal/controllers/machinehealthcheck/machinehealthcheck_controller.go @@ -86,18 +86,19 @@ type Reconciler struct { func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error { c, err := ctrl.NewControllerManagedBy(mgr). + Named("machineHealthCheck"). Add(builder.For(mgr, &clusterv1.MachineHealthCheck{}, predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.MachineHealthCheck{}))). Add(builder.Watches(mgr, &clusterv1.Machine{}, - handler.EnqueueRequestsFromObjectMap(r.machineToMachineHealthCheck), + handler.EnqueueRequestsFromTypedMapFunc(r.machineToMachineHealthCheck), predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.Machine{}), )). WithOptions(options). Add(builder.Watches(mgr, &clusterv1.Cluster{}, - handler.EnqueueRequestsFromObjectMap(r.clusterToMachineHealthCheck), + handler.EnqueueRequestsFromTypedMapFunc(r.clusterToMachineHealthCheck), // TODO: should this wait for Cluster.Status.InfrastructureReady similar to Infra Machine resources? predicates.ClusterUnpaused(ctrl.LoggerFrom(ctx)), predicates.ResourceHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.Cluster{}), @@ -497,7 +498,12 @@ func (r *Reconciler) machineToMachineHealthCheck(ctx context.Context, m *cluster return requests } -func (r *Reconciler) nodeToMachineHealthCheck(ctx context.Context, node *corev1.Node) []reconcile.Request { +func (r *Reconciler) nodeToMachineHealthCheck(ctx context.Context, o client.Object) []reconcile.Request { + node, ok := o.(*corev1.Node) + if !ok { + panic(fmt.Sprintf("Expected a corev1.Node, got %T", o)) + } + machine, err := getMachineFromNode(ctx, r.Client, node.Name) if machine == nil || err != nil { return nil @@ -515,9 +521,9 @@ func (r *Reconciler) watchClusterNodes(ctx context.Context, cluster *clusterv1.C return r.Tracker.Watch(ctx, remote.WatchInput{ Name: "machinehealthcheck-watchClusterNodes", Cluster: util.ObjectKey(cluster), - Watcher: &controller.ControllerAdapter{Controller: r.controller}, + Watcher: r.controller, Kind: &corev1.Node{}, - EventHandler: handler.EnqueueRequestsFromObjectMapFunc(r.nodeToMachineHealthCheck), + EventHandler: handler.EnqueueRequestsFromTypedMapFunc(r.nodeToMachineHealthCheck), }) } diff --git a/internal/controllers/machineset/machineset_controller.go b/internal/controllers/machineset/machineset_controller.go index ca2dd9c8bff4..ea212c8d5d4c 100644 --- a/internal/controllers/machineset/machineset_controller.go +++ b/internal/controllers/machineset/machineset_controller.go @@ -98,18 +98,19 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt } err = ctrl.NewControllerManagedBy(mgr). + Named("machineset"). Add(builder.For(mgr, &clusterv1.MachineSet{}, predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.MachineSet{}))). Add(builder.Owns(mgr, &clusterv1.MachineSet{}, &clusterv1.Machine{}, predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.Machine{}))). // Watches enqueues MachineSet for corresponding Machine resources, if no managed controller reference (owner) exists. Add(builder.Watches(mgr, &clusterv1.Machine{}, - handler.EnqueueRequestsFromObjectMap(r.MachineToMachineSets), + handler.EnqueueRequestsFromTypedMapFunc(r.MachineToMachineSets), predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.Machine{}), )). WithOptions(options). Add(builder.Watches(mgr, &clusterv1.Cluster{}, - handler.EnqueueRequestsFromObjectMap(clusterToMachineSets), + handler.EnqueueRequestsFromTypedMapFunc(clusterToMachineSets), // TODO: should this wait for Cluster.Status.InfrastructureReady similar to Infra Machine resources? predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.Cluster{}), )).Complete(r) diff --git a/internal/controllers/topology/cluster/cluster_controller.go b/internal/controllers/topology/cluster/cluster_controller.go index a006239266e4..a9e37aae44cc 100644 --- a/internal/controllers/topology/cluster/cluster_controller.go +++ b/internal/controllers/topology/cluster/cluster_controller.go @@ -99,19 +99,19 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt Named("topology/cluster"). Add(builder.Watches(mgr, &clusterv1.ClusterClass{}, - handler.EnqueueRequestsFromObjectMap(r.clusterClassToCluster), + handler.EnqueueRequestsFromTypedMapFunc(r.clusterClassToCluster), predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.ClusterClass{}), )). Add(builder.Watches(mgr, &clusterv1.MachineDeployment{}, - handler.EnqueueRequestsFromObjectMap(r.machineDeploymentToCluster), + handler.EnqueueRequestsFromTypedMapFunc(r.machineDeploymentToCluster), // Only trigger Cluster reconciliation if the MachineDeployment is topology owned. predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.MachineDeployment{}), predicates.ResourceIsTopologyOwned(ctrl.LoggerFrom(ctx), &clusterv1.MachineDeployment{}), )). Add(builder.Watches(mgr, &expv1.MachinePool{}, - handler.EnqueueRequestsFromObjectMap(r.machinePoolToCluster), + handler.EnqueueRequestsFromTypedMapFunc(r.machinePoolToCluster), // Only trigger Cluster reconciliation if the MachinePool is topology owned. predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &expv1.MachinePool{}), predicates.ResourceIsTopologyOwned(ctrl.LoggerFrom(ctx), &expv1.MachinePool{}), @@ -299,7 +299,7 @@ func (r *Reconciler) setupDynamicWatches(ctx context.Context, s *scope.Scope) er if err := r.externalTracker.Watch(ctrl.LoggerFrom(ctx), s.Current.InfrastructureCluster, handler.EnqueueRequestForOwner(r.Client.Scheme(), r.Client.RESTMapper(), &clusterv1.Cluster{}), // Only trigger Cluster reconciliation if the InfrastructureCluster is topology owned. - predicates.ResourceIsTopologyOwned(ctrl.LoggerFrom(ctx), &clusterv1.Cluster{})); err != nil { + predicates.ResourceIsTopologyOwned[client.Object](ctrl.LoggerFrom(ctx), &clusterv1.Cluster{})); err != nil { return errors.Wrap(err, "error watching Infrastructure CR") } } @@ -307,7 +307,7 @@ func (r *Reconciler) setupDynamicWatches(ctx context.Context, s *scope.Scope) er if err := r.externalTracker.Watch(ctrl.LoggerFrom(ctx), s.Current.ControlPlane.Object, handler.EnqueueRequestForOwner(r.Client.Scheme(), r.Client.RESTMapper(), &clusterv1.Cluster{}), // Only trigger Cluster reconciliation if the ControlPlane is topology owned. - predicates.ResourceIsTopologyOwned(ctrl.LoggerFrom(ctx), &clusterv1.Cluster{})); err != nil { + predicates.ResourceIsTopologyOwned[client.Object](ctrl.LoggerFrom(ctx), &clusterv1.Cluster{})); err != nil { return errors.Wrap(err, "error watching ControlPlane CR") } } diff --git a/internal/controllers/topology/machinedeployment/machinedeployment_controller.go b/internal/controllers/topology/machinedeployment/machinedeployment_controller.go index 2f8a9372742e..3762f48b1b9c 100644 --- a/internal/controllers/topology/machinedeployment/machinedeployment_controller.go +++ b/internal/controllers/topology/machinedeployment/machinedeployment_controller.go @@ -71,7 +71,7 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt WithOptions(options). Add(builder.Watches(mgr, &clusterv1.Cluster{}, - handler.EnqueueRequestsFromObjectMap(clusterToMachineDeployments), + handler.EnqueueRequestsFromTypedMapFunc(clusterToMachineDeployments), predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.Cluster{}), predicates.ClusterHasTopology(ctrl.LoggerFrom(ctx)), )). diff --git a/internal/controllers/topology/machineset/machineset_controller.go b/internal/controllers/topology/machineset/machineset_controller.go index ff5a6630760b..d9ce06aaf3f3 100644 --- a/internal/controllers/topology/machineset/machineset_controller.go +++ b/internal/controllers/topology/machineset/machineset_controller.go @@ -73,7 +73,7 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt WithOptions(options). Add(builder.Watches(mgr, &clusterv1.Cluster{}, - handler.EnqueueRequestsFromObjectMap(clusterToMachineSets), + handler.EnqueueRequestsFromTypedMapFunc(clusterToMachineSets), predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.Cluster{}), predicates.ClusterUnpaused(ctrl.LoggerFrom(ctx)), predicates.ClusterHasTopology(ctrl.LoggerFrom(ctx)), diff --git a/test/go.mod b/test/go.mod index fd6665cd0aa1..16ce880dc7f7 100644 --- a/test/go.mod +++ b/test/go.mod @@ -4,7 +4,7 @@ go 1.22.0 replace sigs.k8s.io/cluster-api => ../ -replace sigs.k8s.io/controller-runtime => github.com/Danil-Grigorev/controller-runtime v0.6.1-0.20240417125124-8984b3049571 +replace sigs.k8s.io/controller-runtime => github.com/Danil-Grigorev/controller-runtime v0.6.1-0.20240417201603-e18909c2932e require ( github.com/blang/semver/v4 v4.0.0 diff --git a/test/go.sum b/test/go.sum index 547ad02476cd..25d853432d75 100644 --- a/test/go.sum +++ b/test/go.sum @@ -9,8 +9,8 @@ github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg6 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v1.0.0 h1:dtDWrepsVPfW9H/4y7dDgFc2MBUSeJhlaDtK13CxFlU= github.com/BurntSushi/toml v1.0.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= -github.com/Danil-Grigorev/controller-runtime v0.6.1-0.20240417125124-8984b3049571 h1:a1Oaf+Zk1mbhUP0wVULBOLZ+b4MXLW6g/2kadPQg5yw= -github.com/Danil-Grigorev/controller-runtime v0.6.1-0.20240417125124-8984b3049571/go.mod h1:TLM3OvUJgcqHVBLVRlNylmfbOlOukMLFHtc6jo3EtIQ= +github.com/Danil-Grigorev/controller-runtime v0.6.1-0.20240417201603-e18909c2932e h1:hEGMiTp7lLNM666lIVpXzlyjkTpAMV+iYcnofBNKAYk= +github.com/Danil-Grigorev/controller-runtime v0.6.1-0.20240417201603-e18909c2932e/go.mod h1:TLM3OvUJgcqHVBLVRlNylmfbOlOukMLFHtc6jo3EtIQ= github.com/MakeNowJust/heredoc v1.0.0 h1:cXCdzVdstXyiTqTvfqk9SDHpKNjxuom+DOlyEeQ4pzQ= github.com/MakeNowJust/heredoc v1.0.0/go.mod h1:mG5amYoWBHf8vpLOuehzbGGw0EHxpZZ6lCpQ4fNJ8LE= github.com/Masterminds/goutils v1.1.1 h1:5nUrii3FMTL5diU80unEVvNevw1nH4+ZV4DSLVJLSYI= diff --git a/test/infrastructure/docker/exp/internal/controllers/dockermachinepool_controller.go b/test/infrastructure/docker/exp/internal/controllers/dockermachinepool_controller.go index 2f351563f043..c6ebf91c19d8 100644 --- a/test/infrastructure/docker/exp/internal/controllers/dockermachinepool_controller.go +++ b/test/infrastructure/docker/exp/internal/controllers/dockermachinepool_controller.go @@ -172,6 +172,7 @@ func (r *DockerMachinePoolReconciler) SetupWithManager(ctx context.Context, mgr } c, err := ctrl.NewControllerManagedBy(mgr). + Named("dockerMachinePool"). Add(builder.For(mgr, &infraexpv1.DockerMachinePool{}, predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &infraexpv1.DockerMachinePool{}), @@ -179,18 +180,18 @@ func (r *DockerMachinePoolReconciler) SetupWithManager(ctx context.Context, mgr WithOptions(options). Add(builder.Watches(mgr, &expv1.MachinePool{}, - handler.EnqueueRequestsFromObjectMap(utilexp.MachinePoolToInfrastructureMapFunc( + handler.EnqueueRequestsFromTypedMapFunc(utilexp.MachinePoolToInfrastructureMapFunc( infraexpv1.GroupVersion.WithKind("DockerMachinePool"), ctrl.LoggerFrom(ctx))), predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &expv1.MachinePool{}), )). Add(builder.Watches(mgr, &infrav1.DockerMachine{}, - handler.EnqueueRequestsFromObjectMap(dockerMachineToDockerMachinePool), + handler.EnqueueRequestsFromTypedMapFunc(dockerMachineToDockerMachinePool), predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &infrav1.DockerMachine{}), )). Add(builder.Watches(mgr, &clusterv1.Cluster{}, - handler.EnqueueRequestsFromObjectMap(clusterToDockerMachinePools), + handler.EnqueueRequestsFromTypedMapFunc(clusterToDockerMachinePools), predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.Cluster{}), predicates.ClusterUnpausedAndInfrastructureReady(ctrl.LoggerFrom(ctx)), )).Build(r) diff --git a/test/infrastructure/docker/internal/controllers/dockercluster_controller.go b/test/infrastructure/docker/internal/controllers/dockercluster_controller.go index 21b5251c2861..e4ae07663c58 100644 --- a/test/infrastructure/docker/internal/controllers/dockercluster_controller.go +++ b/test/infrastructure/docker/internal/controllers/dockercluster_controller.go @@ -199,6 +199,7 @@ func (r *DockerClusterReconciler) reconcileDelete(ctx context.Context, dockerClu // SetupWithManager will add watches for this controller. func (r *DockerClusterReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error { err := ctrl.NewControllerManagedBy(mgr). + Named("dockerCluster"). Add(builder.For(mgr, &infrav1.DockerCluster{}, predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &infrav1.DockerCluster{}), @@ -206,7 +207,7 @@ func (r *DockerClusterReconciler) SetupWithManager(ctx context.Context, mgr ctrl WithOptions(options). Add(builder.Watches(mgr, &clusterv1.Cluster{}, - handler.EnqueueRequestsFromObjectMap(util.ClusterToInfrastructureMapFunc(ctx, infrav1.GroupVersion.WithKind("DockerCluster"), mgr.GetClient(), &infrav1.DockerCluster{})), + handler.EnqueueRequestsFromTypedMapFunc(util.ClusterToInfrastructureMapFunc(ctx, infrav1.GroupVersion.WithKind("DockerCluster"), mgr.GetClient(), &infrav1.DockerCluster{})), predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.Cluster{}), predicates.ClusterUnpaused(ctrl.LoggerFrom(ctx)), )).Complete(r) diff --git a/test/infrastructure/docker/internal/controllers/dockermachine_controller.go b/test/infrastructure/docker/internal/controllers/dockermachine_controller.go index 18fe38ca3754..b7aefcb5897b 100644 --- a/test/infrastructure/docker/internal/controllers/dockermachine_controller.go +++ b/test/infrastructure/docker/internal/controllers/dockermachine_controller.go @@ -485,23 +485,24 @@ func (r *DockerMachineReconciler) SetupWithManager(ctx context.Context, mgr ctrl } err = ctrl.NewControllerManagedBy(mgr). + Named("dockerMachine"). Add(builder.For(mgr, &infrav1.DockerMachine{}, predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &infrav1.DockerMachine{}))). WithOptions(options). Add(builder.Watches(mgr, &clusterv1.Machine{}, - handler.EnqueueRequestsFromObjectMap(util.MachineToInfrastructureMapFunc(infrav1.GroupVersion.WithKind("DockerMachine"))), + handler.EnqueueRequestsFromTypedMapFunc(util.MachineToInfrastructureMapFunc(infrav1.GroupVersion.WithKind("DockerMachine"))), predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.Machine{}), )). Add(builder.Watches(mgr, &infrav1.DockerCluster{}, - handler.EnqueueRequestsFromObjectMap(r.DockerClusterToDockerMachines), + handler.EnqueueRequestsFromTypedMapFunc(r.DockerClusterToDockerMachines), predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &infrav1.DockerCluster{}), )). Add(builder.Watches(mgr, &clusterv1.Cluster{}, - handler.EnqueueRequestsFromObjectMap(clusterToDockerMachines), + handler.EnqueueRequestsFromTypedMapFunc(clusterToDockerMachines), predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.Cluster{}), predicates.ClusterUnpausedAndInfrastructureReady(ctrl.LoggerFrom(ctx)), predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.Cluster{}), diff --git a/test/infrastructure/inmemory/internal/controllers/inmemorycluster_controller.go b/test/infrastructure/inmemory/internal/controllers/inmemorycluster_controller.go index 307c3fb5e52e..0b5d77d44c24 100644 --- a/test/infrastructure/inmemory/internal/controllers/inmemorycluster_controller.go +++ b/test/infrastructure/inmemory/internal/controllers/inmemorycluster_controller.go @@ -209,13 +209,14 @@ func (r *InMemoryClusterReconciler) reconcileDelete(_ context.Context, cluster * // SetupWithManager will add watches for this controller. func (r *InMemoryClusterReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error { err := ctrl.NewControllerManagedBy(mgr). + Named("inMemoryCluster"). Add(builder.For(mgr, &infrav1.InMemoryCluster{}, predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &infrav1.InMemoryCluster{}))). WithOptions(options). Add(builder.Watches(mgr, &clusterv1.Cluster{}, - handler.EnqueueRequestsFromObjectMap(util.ClusterToInfrastructureMapFunc(ctx, infrav1.GroupVersion.WithKind("InMemoryCluster"), mgr.GetClient(), &infrav1.InMemoryCluster{})), + handler.EnqueueRequestsFromTypedMapFunc(util.ClusterToInfrastructureMapFunc(ctx, infrav1.GroupVersion.WithKind("InMemoryCluster"), mgr.GetClient(), &infrav1.InMemoryCluster{})), predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.Cluster{}), predicates.ClusterUnpaused(ctrl.LoggerFrom(ctx)), )).Complete(r) diff --git a/test/infrastructure/inmemory/internal/controllers/inmemorymachine_controller.go b/test/infrastructure/inmemory/internal/controllers/inmemorymachine_controller.go index 491734c9bb20..08d87b5cdfa1 100644 --- a/test/infrastructure/inmemory/internal/controllers/inmemorymachine_controller.go +++ b/test/infrastructure/inmemory/internal/controllers/inmemorymachine_controller.go @@ -1144,6 +1144,7 @@ func (r *InMemoryMachineReconciler) SetupWithManager(ctx context.Context, mgr ct } err = ctrl.NewControllerManagedBy(mgr). + Named("inMemoryMachine"). Add(builder.For(mgr, &infrav1.InMemoryMachine{}, predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &infrav1.InMemoryMachine{}), @@ -1151,17 +1152,17 @@ func (r *InMemoryMachineReconciler) SetupWithManager(ctx context.Context, mgr ct WithOptions(options). Add(builder.Watches(mgr, &clusterv1.Machine{}, - handler.EnqueueRequestsFromObjectMap(util.MachineToInfrastructureMapFunc(infrav1.GroupVersion.WithKind("InMemoryMachine"))), + handler.EnqueueRequestsFromTypedMapFunc(util.MachineToInfrastructureMapFunc(infrav1.GroupVersion.WithKind("InMemoryMachine"))), predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.Machine{}), )). Add(builder.Watches(mgr, &infrav1.InMemoryCluster{}, - handler.EnqueueRequestsFromObjectMap(r.InMemoryClusterToInMemoryMachines), + handler.EnqueueRequestsFromTypedMapFunc(r.InMemoryClusterToInMemoryMachines), predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &infrav1.InMemoryCluster{}), )). Add(builder.Watches(mgr, &clusterv1.Cluster{}, - handler.EnqueueRequestsFromObjectMap(clusterToInMemoryMachines), + handler.EnqueueRequestsFromTypedMapFunc(clusterToInMemoryMachines), predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.Cluster{}), predicates.ClusterUnpausedAndInfrastructureReady(ctrl.LoggerFrom(ctx)), )).Complete(r) diff --git a/util/predicates/cluster_predicates.go b/util/predicates/cluster_predicates.go index 06fd4fa7329a..06bfdc3481df 100644 --- a/util/predicates/cluster_predicates.go +++ b/util/predicates/cluster_predicates.go @@ -20,6 +20,7 @@ package predicates import ( "github.com/go-logr/logr" "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/predicate" clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" @@ -28,14 +29,14 @@ import ( // ClusterCreateInfraReady returns a predicate that returns true for a create event when a cluster has Status.InfrastructureReady set as true // it also returns true if the resource provided is not a Cluster to allow for use with controller-runtime NewControllerManagedBy. -func ClusterCreateInfraReady(logger logr.Logger) predicate.ObjectFuncs[*clusterv1.Cluster] { - return predicate.ObjectFuncs[*clusterv1.Cluster]{ - CreateFunc: func(c *clusterv1.Cluster) bool { +func ClusterCreateInfraReady(logger logr.Logger) predicate.TypedFuncs[*clusterv1.Cluster] { + return predicate.TypedFuncs[*clusterv1.Cluster]{ + CreateFunc: func(c event.TypedCreateEvent[*clusterv1.Cluster]) bool { log := logger.WithValues("predicate", "ClusterCreateInfraReady", "eventType", "create") - log = log.WithValues("Cluster", klog.KObj(c)) + log = log.WithValues("Cluster", klog.KObj(c.Object)) // Only need to trigger a reconcile if the Cluster.Status.InfrastructureReady is true - if c.Status.InfrastructureReady { + if c.Object.Status.InfrastructureReady { log.V(6).Info("Cluster infrastructure is ready, allowing further processing") return true } @@ -48,14 +49,14 @@ func ClusterCreateInfraReady(logger logr.Logger) predicate.ObjectFuncs[*clusterv // ClusterCreateNotPaused returns a predicate that returns true for a create event when a cluster has Spec.Paused set as false // it also returns true if the resource provided is not a Cluster to allow for use with controller-runtime NewControllerManagedBy. -func ClusterCreateNotPaused(logger logr.Logger) predicate.ObjectFuncs[*clusterv1.Cluster] { - return predicate.ObjectFuncs[*clusterv1.Cluster]{ - CreateFunc: func(c *clusterv1.Cluster) bool { +func ClusterCreateNotPaused(logger logr.Logger) predicate.TypedFuncs[*clusterv1.Cluster] { + return predicate.TypedFuncs[*clusterv1.Cluster]{ + CreateFunc: func(c event.TypedCreateEvent[*clusterv1.Cluster]) bool { log := logger.WithValues("predicate", "ClusterCreateNotPaused", "eventType", "create") - log = log.WithValues("Cluster", klog.KObj(c)) + log = log.WithValues("Cluster", klog.KObj(c.Object)) // Only need to trigger a reconcile if the Cluster.Spec.Paused is false - if !c.Spec.Paused { + if !c.Object.Spec.Paused { log.V(6).Info("Cluster is not paused, allowing further processing") return true } @@ -68,13 +69,13 @@ func ClusterCreateNotPaused(logger logr.Logger) predicate.ObjectFuncs[*clusterv1 // ClusterUpdateInfraReady returns a predicate that returns true for an update event when a cluster has Status.InfrastructureReady changed from false to true // it also returns true if the resource provided is not a Cluster to allow for use with controller-runtime NewControllerManagedBy. -func ClusterUpdateInfraReady(logger logr.Logger) predicate.ObjectFuncs[*clusterv1.Cluster] { - return predicate.ObjectFuncs[*clusterv1.Cluster]{ - UpdateFunc: func(oldCluster, newCluster *clusterv1.Cluster) bool { +func ClusterUpdateInfraReady(logger logr.Logger) predicate.TypedFuncs[*clusterv1.Cluster] { + return predicate.TypedFuncs[*clusterv1.Cluster]{ + UpdateFunc: func(u event.TypedUpdateEvent[*clusterv1.Cluster]) bool { log := logger.WithValues("predicate", "ClusterUpdateInfraReady", "eventType", "update") - log = log.WithValues("Cluster", klog.KObj(oldCluster)) + log = log.WithValues("Cluster", klog.KObj(u.ObjectOld)) - if !oldCluster.Status.InfrastructureReady && newCluster.Status.InfrastructureReady { + if !u.ObjectOld.Status.InfrastructureReady && u.ObjectNew.Status.InfrastructureReady { log.V(6).Info("Cluster infrastructure became ready, allowing further processing") return true } @@ -87,13 +88,13 @@ func ClusterUpdateInfraReady(logger logr.Logger) predicate.ObjectFuncs[*clusterv // ClusterUpdateUnpaused returns a predicate that returns true for an update event when a cluster has Spec.Paused changed from true to false // it also returns true if the resource provided is not a Cluster to allow for use with controller-runtime NewControllerManagedBy. -func ClusterUpdateUnpaused(logger logr.Logger) predicate.ObjectFuncs[*clusterv1.Cluster] { - return predicate.ObjectFuncs[*clusterv1.Cluster]{ - UpdateFunc: func(oldCluster, newCluster *clusterv1.Cluster) bool { +func ClusterUpdateUnpaused(logger logr.Logger) predicate.TypedFuncs[*clusterv1.Cluster] { + return predicate.TypedFuncs[*clusterv1.Cluster]{ + UpdateFunc: func(u event.TypedUpdateEvent[*clusterv1.Cluster]) bool { log := logger.WithValues("predicate", "ClusterUpdateUnpaused", "eventType", "update") - log = log.WithValues("Cluster", klog.KObj(oldCluster)) + log = log.WithValues("Cluster", klog.KObj(u.ObjectOld)) - if oldCluster.Spec.Paused && !newCluster.Spec.Paused { + if u.ObjectOld.Spec.Paused && !u.ObjectNew.Spec.Paused { log.V(4).Info("Cluster was unpaused, allowing further processing") return true } @@ -117,11 +118,11 @@ func ClusterUpdateUnpaused(logger logr.Logger) predicate.ObjectFuncs[*clusterv1. // handler.EnqueueRequestsFromMapFunc(clusterToMachines) // predicates.ClusterUnpaused(r.Log), // ) -func ClusterUnpaused(logger logr.Logger) predicate.ObjectPredicate[*clusterv1.Cluster] { +func ClusterUnpaused(logger logr.Logger) predicate.TypedPredicate[*clusterv1.Cluster] { log := logger.WithValues("predicate", "ClusterUnpaused") // Use any to ensure we process either create or update events we care about - return predicate.Any(ClusterCreateNotPaused(log), ClusterUpdateUnpaused(log)) + return predicate.Or(ClusterCreateNotPaused(log), ClusterUpdateUnpaused(log)) } // ClusterControlPlaneInitialized returns a Predicate that returns true on Update events @@ -133,14 +134,14 @@ func ClusterUnpaused(logger logr.Logger) predicate.ObjectPredicate[*clusterv1.Cl // handler.EnqueueRequestsFromMapFunc(clusterToMachines) // predicates.ClusterControlPlaneInitialized(r.Log), // ) -func ClusterControlPlaneInitialized(logger logr.Logger) predicate.ObjectPredicate[*clusterv1.Cluster] { - return predicate.ObjectFuncs[*clusterv1.Cluster]{ - UpdateFunc: func(oldCluster, newCluster *clusterv1.Cluster) bool { +func ClusterControlPlaneInitialized(logger logr.Logger) predicate.TypedPredicate[*clusterv1.Cluster] { + return predicate.TypedFuncs[*clusterv1.Cluster]{ + UpdateFunc: func(u event.TypedUpdateEvent[*clusterv1.Cluster]) bool { log := logger.WithValues("predicate", "ClusterControlPlaneInitialized", "eventType", "update") - log = log.WithValues("Cluster", klog.KObj(oldCluster)) + log = log.WithValues("Cluster", klog.KObj(u.ObjectOld)) - if !conditions.IsTrue(oldCluster, clusterv1.ControlPlaneInitializedCondition) && - conditions.IsTrue(newCluster, clusterv1.ControlPlaneInitializedCondition) { + if !conditions.IsTrue(u.ObjectOld, clusterv1.ControlPlaneInitializedCondition) && + conditions.IsTrue(u.ObjectNew, clusterv1.ControlPlaneInitializedCondition) { log.V(6).Info("Cluster ControlPlaneInitialized was set, allow further processing") return true } @@ -163,23 +164,23 @@ func ClusterControlPlaneInitialized(logger logr.Logger) predicate.ObjectPredicat // handler.EnqueueRequestsFromMapFunc(clusterToMachines) // predicates.ClusterUnpausedAndInfrastructureReady(r.Log), // ) -func ClusterUnpausedAndInfrastructureReady(logger logr.Logger) predicate.ObjectPredicate[*clusterv1.Cluster] { +func ClusterUnpausedAndInfrastructureReady(logger logr.Logger) predicate.TypedPredicate[*clusterv1.Cluster] { log := logger.WithValues("predicate", "ClusterUnpausedAndInfrastructureReady") // Only continue processing create events if both not paused and infrastructure is ready - createPredicates := predicate.All(ClusterCreateNotPaused(log), ClusterCreateInfraReady(log)) + createPredicates := predicate.And(ClusterCreateNotPaused(log), ClusterCreateInfraReady(log)) // Process update events if either Cluster is unpaused or infrastructure becomes ready - updatePredicates := predicate.Any(ClusterUpdateUnpaused(log), ClusterUpdateInfraReady(log)) + updatePredicates := predicate.And(ClusterUpdateUnpaused(log), ClusterUpdateInfraReady(log)) // Use any to ensure we process either create or update events we care about - return predicate.Any(createPredicates, updatePredicates) + return predicate.Or(createPredicates, updatePredicates) } // ClusterHasTopology returns a Predicate that returns true when cluster.Spec.Topology // is NOT nil and false otherwise. -func ClusterHasTopology(logger logr.Logger) predicate.ObjectPredicate[*clusterv1.Cluster] { - return predicate.NewObjectPredicateFuncs(processIfTopologyManaged(logger.WithValues("predicate", "ClusterHasTopology"))) +func ClusterHasTopology(logger logr.Logger) predicate.TypedPredicate[*clusterv1.Cluster] { + return predicate.NewTypedPredicateFuncs(processIfTopologyManaged(logger.WithValues("predicate", "ClusterHasTopology"))) } func processIfTopologyManaged(logger logr.Logger) func(*clusterv1.Cluster) bool { diff --git a/util/predicates/cluster_predicates_test.go b/util/predicates/cluster_predicates_test.go index e0919c20d2fb..be8111fc151e 100644 --- a/util/predicates/cluster_predicates_test.go +++ b/util/predicates/cluster_predicates_test.go @@ -21,6 +21,7 @@ import ( "github.com/go-logr/logr" . "github.com/onsi/gomega" + "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/log" clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" @@ -87,7 +88,10 @@ func TestClusterControlplaneInitializedPredicate(t *testing.T) { for i := range testcases { tc := testcases[i] t.Run(tc.name, func(*testing.T) { - g.Expect(predicate.OnUpdate(&tc.oldCluster, &tc.newCluster)).To(Equal(tc.expected)) + g.Expect(predicate.Update(event.TypedUpdateEvent[*clusterv1.Cluster]{ + ObjectOld: &tc.oldCluster, + ObjectNew: &tc.newCluster, + })).To(Equal(tc.expected)) }) } } diff --git a/util/predicates/generic_predicates.go b/util/predicates/generic_predicates.go index 121db8112940..c25a641279ad 100644 --- a/util/predicates/generic_predicates.go +++ b/util/predicates/generic_predicates.go @@ -130,8 +130,8 @@ func Any(logger logr.Logger, predicates ...predicate.Funcs) predicate.Funcs { // ResourceHasFilterLabel returns a predicate that returns true only if the provided resource contains // a label with the WatchLabel key and the configured label value exactly. -func ResourceHasFilterLabel[T client.Object](logger logr.Logger, labelValue string, _ T) predicate.ObjectFuncs[T] { - return predicate.NewObjectPredicateFuncs(processIfLabelMatch[T](logger.WithValues("predicate", "ResourceHasFilterLabel"), labelValue)) +func ResourceHasFilterLabel[T client.Object](logger logr.Logger, labelValue string, _ T) predicate.TypedPredicate[T] { + return predicate.NewTypedPredicateFuncs(processIfLabelMatch[T](logger.WithValues("predicate", "ResourceHasFilterLabel"), labelValue)) } // ResourceNotPaused returns a Predicate that returns true only if the provided resource does not contain the @@ -148,14 +148,14 @@ func ResourceHasFilterLabel[T client.Object](logger logr.Logger, labelValue stri // Build(r) // return err // } -func ResourceNotPaused[T client.Object](logger logr.Logger, _ T) predicate.ObjectFuncs[T] { - return predicate.NewObjectPredicateFuncs(processIfNotPaused[T](logger.WithValues("predicate", "ResourceNotPaused"))) +func ResourceNotPaused[T client.Object](logger logr.Logger, _ T) predicate.TypedPredicate[T] { + return predicate.NewTypedPredicateFuncs(processIfNotPaused[T](logger.WithValues("predicate", "ResourceNotPaused"))) } // ResourceNotPausedAndHasFilterLabel returns a predicate that returns true only if the // ResourceNotPaused and ResourceHasFilterLabel predicates return true. -func ResourceNotPausedAndHasFilterLabel[T client.Object](logger logr.Logger, labelValue string, o T) predicate.ObjectPredicate[T] { - return predicate.All(ResourceNotPaused(logger, o), ResourceHasFilterLabel(logger, labelValue, o)) +func ResourceNotPausedAndHasFilterLabel[T client.Object](logger logr.Logger, labelValue string, o T) predicate.TypedPredicate[T] { + return predicate.And(ResourceNotPaused(logger, o), ResourceHasFilterLabel(logger, labelValue, o)) } func processIfNotPaused[T client.Object](logger logr.Logger) func(T) bool { @@ -223,8 +223,8 @@ func processIfNotExternallyManaged(logger logr.Logger, obj client.Object) bool { // ResourceIsTopologyOwned returns a predicate that returns true only if the resource has // the `topology.cluster.x-k8s.io/owned` label. -func ResourceIsTopologyOwned[T client.Object](logger logr.Logger, _ T) predicate.ObjectFuncs[T] { - return predicate.NewObjectPredicateFuncs(processIfTopologyOwned[T](logger.WithValues("predicate", "ResourceIsTopologyOwned"))) +func ResourceIsTopologyOwned[T client.Object](logger logr.Logger, _ T) predicate.TypedPredicate[T] { + return predicate.NewTypedPredicateFuncs(processIfTopologyOwned[T](logger.WithValues("predicate", "ResourceIsTopologyOwned"))) } func processIfTopologyOwned[T client.Object](logger logr.Logger) func(T) bool { diff --git a/util/util.go b/util/util.go index a274e1fd36d1..38af938086be 100644 --- a/util/util.go +++ b/util/util.go @@ -197,7 +197,7 @@ func ObjectKey(object metav1.Object) client.ObjectKey { // ClusterToInfrastructureMapFunc returns a handler.ToRequestsFunc that watches for // Cluster events and returns reconciliation requests for an infrastructure provider object. -func ClusterToInfrastructureMapFunc(ctx context.Context, gvk schema.GroupVersionKind, c client.Client, providerCluster client.Object) handler.ObjectMapFunc[*clusterv1.Cluster] { +func ClusterToInfrastructureMapFunc(ctx context.Context, gvk schema.GroupVersionKind, c client.Client, providerCluster client.Object) handler.TypedMapFunc[*clusterv1.Cluster] { log := ctrl.LoggerFrom(ctx) return func(ctx context.Context, cluster *clusterv1.Cluster) []reconcile.Request { // Return early if the InfrastructureRef is nil. @@ -260,7 +260,7 @@ func GetMachineByName(ctx context.Context, c client.Client, namespace, name stri // MachineToInfrastructureMapFunc returns a handler.ToRequestsFunc that watches for // Machine events and returns reconciliation requests for an infrastructure provider object. -func MachineToInfrastructureMapFunc(gvk schema.GroupVersionKind) handler.ObjectMapFunc[*clusterv1.Machine] { +func MachineToInfrastructureMapFunc(gvk schema.GroupVersionKind) handler.TypedMapFunc[*clusterv1.Machine] { return func(_ context.Context, m *clusterv1.Machine) []reconcile.Request { gk := gvk.GroupKind() // Return early if the GroupKind doesn't match what we expect. @@ -463,7 +463,7 @@ func (k KubeAwareAPIVersions) Less(i, j int) bool { // Note: This function uses the passed in typed ObjectList and thus with the default client configuration all list calls // will be cached. // NB: The objects are required to have `clusterv1.ClusterNameLabel` applied. -func ClusterToTypedObjectsMapper(c client.Client, ro client.ObjectList, scheme *runtime.Scheme) (handler.ObjectMapFunc[*clusterv1.Cluster], error) { +func ClusterToTypedObjectsMapper(c client.Client, ro client.ObjectList, scheme *runtime.Scheme) (handler.TypedMapFunc[*clusterv1.Cluster], error) { gvk, err := apiutil.GVKForObject(ro, scheme) if err != nil { return nil, err @@ -522,7 +522,7 @@ func ClusterToTypedObjectsMapper(c client.Client, ro client.ObjectList, scheme * // MachineDeploymentToObjectsMapper returns a mapper function that gets a machinedeployment // and lists all objects for the object passed in and returns a list of requests. // NB: The objects are required to have `clusterv1.MachineDeploymentNameLabel` applied. -func MachineDeploymentToObjectsMapper(c client.Client, ro client.ObjectList, scheme *runtime.Scheme) (handler.ObjectMapFunc[*clusterv1.MachineDeployment], error) { +func MachineDeploymentToObjectsMapper(c client.Client, ro client.ObjectList, scheme *runtime.Scheme) (handler.TypedMapFunc[*clusterv1.MachineDeployment], error) { gvk, err := apiutil.GVKForObject(ro, scheme) if err != nil { return nil, err @@ -581,7 +581,7 @@ func MachineDeploymentToObjectsMapper(c client.Client, ro client.ObjectList, sch // MachineSetToObjectsMapper returns a mapper function that gets a machineset // and lists all objects for the object passed in and returns a list of requests. // NB: The objects are required to have `clusterv1.MachineSetNameLabel` applied. -func MachineSetToObjectsMapper(c client.Client, ro client.ObjectList, scheme *runtime.Scheme) (handler.ObjectMapFunc[*clusterv1.MachineSet], error) { +func MachineSetToObjectsMapper(c client.Client, ro client.ObjectList, scheme *runtime.Scheme) (handler.TypedMapFunc[*clusterv1.MachineSet], error) { gvk, err := apiutil.GVKForObject(ro, scheme) if err != nil { return nil, err