diff --git a/bootstrap/kubeadm/internal/controllers/kubeadmconfig_controller.go b/bootstrap/kubeadm/internal/controllers/kubeadmconfig_controller.go index f44295c25160..c2cb05411fd9 100644 --- a/bootstrap/kubeadm/internal/controllers/kubeadmconfig_controller.go +++ b/bootstrap/kubeadm/internal/controllers/kubeadmconfig_controller.go @@ -122,12 +122,14 @@ func (r *KubeadmConfigReconciler) SetupWithManager(ctx context.Context, mgr ctrl Watches( &clusterv1.Machine{}, handler.EnqueueRequestsFromMapFunc(r.MachineToBootstrapMapFunc), + builder.WithPredicates(predicates.ResourceIsChanged(mgr.GetScheme(), predicateLog)), ).WithEventFilter(predicates.ResourceHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue)) if feature.Gates.Enabled(feature.MachinePool) { b = b.Watches( &expv1.MachinePool{}, handler.EnqueueRequestsFromMapFunc(r.MachinePoolToBootstrapMapFunc), + builder.WithPredicates(predicates.ResourceIsChanged(mgr.GetScheme(), predicateLog)), ) } @@ -136,6 +138,7 @@ func (r *KubeadmConfigReconciler) SetupWithManager(ctx context.Context, mgr ctrl handler.EnqueueRequestsFromMapFunc(r.ClusterToKubeadmConfigs), builder.WithPredicates( predicates.All(mgr.GetScheme(), predicateLog, + predicates.ResourceIsChanged(mgr.GetScheme(), predicateLog), predicates.ClusterPausedTransitionsOrInfrastructureReady(mgr.GetScheme(), predicateLog), predicates.ResourceHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue), ), diff --git a/controlplane/kubeadm/internal/controllers/controller.go b/controlplane/kubeadm/internal/controllers/controller.go index 6fd32914525b..622023264805 100644 --- a/controlplane/kubeadm/internal/controllers/controller.go +++ b/controlplane/kubeadm/internal/controllers/controller.go @@ -115,7 +115,7 @@ func (r *KubeadmControlPlaneReconciler) SetupWithManager(ctx context.Context, mg predicateLog := ctrl.LoggerFrom(ctx).WithValues("controller", "kubeadmcontrolplane") c, err := ctrl.NewControllerManagedBy(mgr). For(&controlplanev1.KubeadmControlPlane{}). - Owns(&clusterv1.Machine{}). + Owns(&clusterv1.Machine{}, builder.WithPredicates(predicates.ResourceIsChanged(mgr.GetScheme(), predicateLog))). WithOptions(options). WithEventFilter(predicates.ResourceHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue)). Watches( @@ -123,6 +123,7 @@ func (r *KubeadmControlPlaneReconciler) SetupWithManager(ctx context.Context, mg handler.EnqueueRequestsFromMapFunc(r.ClusterToKubeadmControlPlane), builder.WithPredicates( predicates.All(mgr.GetScheme(), predicateLog, + predicates.ResourceIsChanged(mgr.GetScheme(), predicateLog), predicates.ResourceHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue), predicates.ClusterPausedTransitionsOrInfrastructureReady(mgr.GetScheme(), predicateLog), ), diff --git a/exp/addons/internal/controllers/clusterresourceset_controller.go b/exp/addons/internal/controllers/clusterresourceset_controller.go index fb9c6bb54072..6ca5bf3c7eaa 100644 --- a/exp/addons/internal/controllers/clusterresourceset_controller.go +++ b/exp/addons/internal/controllers/clusterresourceset_controller.go @@ -81,6 +81,7 @@ func (r *ClusterResourceSetReconciler) SetupWithManager(ctx context.Context, mgr Watches( &clusterv1.Cluster{}, handler.EnqueueRequestsFromMapFunc(r.clusterToClusterResourceSet), + builder.WithPredicates(predicates.ResourceIsChanged(mgr.GetScheme(), predicateLog)), ). WatchesRawSource(r.ClusterCache.GetClusterSource("clusterresourceset", r.clusterToClusterResourceSet)). WatchesMetadata( @@ -88,7 +89,12 @@ func (r *ClusterResourceSetReconciler) SetupWithManager(ctx context.Context, mgr handler.EnqueueRequestsFromMapFunc( resourceToClusterResourceSetFunc[client.Object](r.Client), ), - builder.WithPredicates(resourcepredicates.TypedResourceCreateOrUpdate[client.Object](predicateLog)), + builder.WithPredicates( + predicates.All(mgr.GetScheme(), predicateLog, + predicates.ResourceIsChanged(mgr.GetScheme(), predicateLog), + resourcepredicates.TypedResourceCreateOrUpdate[client.Object](predicateLog), + ), + ), ). WatchesRawSource(source.Kind( partialSecretCache, @@ -101,7 +107,10 @@ func (r *ClusterResourceSetReconciler) SetupWithManager(ctx context.Context, mgr handler.TypedEnqueueRequestsFromMapFunc( resourceToClusterResourceSetFunc[*metav1.PartialObjectMetadata](r.Client), ), - resourcepredicates.TypedResourceCreateOrUpdate[*metav1.PartialObjectMetadata](predicateLog), + predicates.TypedAll(mgr.GetScheme(), predicateLog, + predicates.TypedResourceIsChanged[*metav1.PartialObjectMetadata](mgr.GetScheme(), predicateLog), + resourcepredicates.TypedResourceCreateOrUpdate[*metav1.PartialObjectMetadata](predicateLog), + ), )). WithOptions(options). WithEventFilter(predicates.ResourceHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue)). diff --git a/exp/addons/internal/controllers/clusterresourcesetbinding_controller.go b/exp/addons/internal/controllers/clusterresourcesetbinding_controller.go index 79b46d516264..bac1ab864436 100644 --- a/exp/addons/internal/controllers/clusterresourcesetbinding_controller.go +++ b/exp/addons/internal/controllers/clusterresourcesetbinding_controller.go @@ -23,6 +23,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/handler" @@ -58,6 +59,7 @@ func (r *ClusterResourceSetBindingReconciler) SetupWithManager(ctx context.Conte Watches( &clusterv1.Cluster{}, handler.EnqueueRequestsFromMapFunc(r.clusterToClusterResourceSetBinding), + builder.WithPredicates(predicates.ResourceIsChanged(mgr.GetScheme(), predicateLog)), ). WithOptions(options). WithEventFilter(predicates.ResourceNotPausedAndHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue)). diff --git a/exp/internal/controllers/machinepool_controller.go b/exp/internal/controllers/machinepool_controller.go index 482e2b6a569f..39be4b39524e 100644 --- a/exp/internal/controllers/machinepool_controller.go +++ b/exp/internal/controllers/machinepool_controller.go @@ -21,6 +21,7 @@ import ( "fmt" "time" + "github.com/go-logr/logr" "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -29,12 +30,14 @@ import ( kerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" + "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" @@ -82,6 +85,8 @@ type MachinePoolReconciler struct { ssaCache ssa.Cache recorder record.EventRecorder externalTracker external.ObjectTracker + + predicateLog *logr.Logger } // scope holds the different objects that are read and used during the reconcile. @@ -104,7 +109,7 @@ func (r *MachinePoolReconciler) SetupWithManager(ctx context.Context, mgr ctrl.M return errors.New("Client, APIReader and ClusterCache must not be nil") } - predicateLog := ctrl.LoggerFrom(ctx).WithValues("controller", "machinepool") + r.predicateLog = ptr.To(ctrl.LoggerFrom(ctx).WithValues("controller", "machinepool")) clusterToMachinePools, err := util.ClusterToTypedObjectsMapper(mgr.GetClient(), &expv1.MachinePoolList{}, mgr.GetScheme()) if err != nil { return err @@ -113,15 +118,16 @@ func (r *MachinePoolReconciler) SetupWithManager(ctx context.Context, mgr ctrl.M c, err := ctrl.NewControllerManagedBy(mgr). For(&expv1.MachinePool{}). WithOptions(options). - WithEventFilter(predicates.ResourceHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue)). + WithEventFilter(predicates.ResourceHasFilterLabel(mgr.GetScheme(), *r.predicateLog, r.WatchFilterValue)). Watches( &clusterv1.Cluster{}, handler.EnqueueRequestsFromMapFunc(clusterToMachinePools), // TODO: should this wait for Cluster.Status.InfrastructureReady similar to Infra Machine resources? builder.WithPredicates( - predicates.All(mgr.GetScheme(), predicateLog, - predicates.ClusterPausedTransitions(mgr.GetScheme(), predicateLog), - predicates.ResourceHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue), + predicates.All(mgr.GetScheme(), *r.predicateLog, + predicates.ResourceIsChanged(mgr.GetScheme(), *r.predicateLog), + predicates.ClusterPausedTransitions(mgr.GetScheme(), *r.predicateLog), + predicates.ResourceHasFilterLabel(mgr.GetScheme(), *r.predicateLog, r.WatchFilterValue), ), ), ). @@ -137,7 +143,7 @@ func (r *MachinePoolReconciler) SetupWithManager(ctx context.Context, mgr ctrl.M Controller: c, Cache: mgr.GetCache(), Scheme: mgr.GetScheme(), - PredicateLogger: &predicateLog, + PredicateLogger: r.predicateLog, } r.ssaCache = ssa.NewCache() @@ -377,6 +383,7 @@ func (r *MachinePoolReconciler) watchClusterNodes(ctx context.Context, cluster * Watcher: r.controller, Kind: &corev1.Node{}, EventHandler: handler.EnqueueRequestsFromMapFunc(r.nodeToMachinePool), + Predicates: []predicate.TypedPredicate[client.Object]{predicates.TypedResourceIsChanged[client.Object](r.Client.Scheme(), *r.predicateLog)}, })) } diff --git a/exp/internal/controllers/machinepool_controller_phases.go b/exp/internal/controllers/machinepool_controller_phases.go index 8cdc238499d8..c8bb00928327 100644 --- a/exp/internal/controllers/machinepool_controller_phases.go +++ b/exp/internal/controllers/machinepool_controller_phases.go @@ -49,6 +49,7 @@ import ( "sigs.k8s.io/cluster-api/util/labels" "sigs.k8s.io/cluster-api/util/labels/format" "sigs.k8s.io/cluster-api/util/patch" + "sigs.k8s.io/cluster-api/util/predicates" ) func (r *MachinePoolReconciler) reconcilePhase(mp *expv1.MachinePool) { @@ -123,7 +124,7 @@ func (r *MachinePoolReconciler) reconcileExternal(ctx context.Context, m *expv1. } // Ensure we add a watch to the external object, if there isn't one already. - if err := r.externalTracker.Watch(log, obj, handler.EnqueueRequestForOwner(r.Client.Scheme(), r.Client.RESTMapper(), &expv1.MachinePool{})); err != nil { + if err := r.externalTracker.Watch(log, obj, handler.EnqueueRequestForOwner(r.Client.Scheme(), r.Client.RESTMapper(), &expv1.MachinePool{}), predicates.ResourceIsChanged(r.Client.Scheme(), *r.externalTracker.PredicateLogger)); err != nil { return external.ReconcileOutput{}, err } @@ -364,7 +365,7 @@ func (r *MachinePoolReconciler) reconcileMachines(ctx context.Context, s *scope, sampleInfraMachine.SetKind(infraMachineKind) // Add watcher for infraMachine, if there isn't one already. - if err := r.externalTracker.Watch(log, sampleInfraMachine, handler.EnqueueRequestsFromMapFunc(r.infraMachineToMachinePoolMapper)); err != nil { + if err := r.externalTracker.Watch(log, sampleInfraMachine, handler.EnqueueRequestsFromMapFunc(r.infraMachineToMachinePoolMapper), predicates.ResourceIsChanged(r.Client.Scheme(), *r.externalTracker.PredicateLogger)); err != nil { return err } diff --git a/exp/runtime/internal/controllers/extensionconfig_controller.go b/exp/runtime/internal/controllers/extensionconfig_controller.go index cbd344028c0f..63c762f1b267 100644 --- a/exp/runtime/internal/controllers/extensionconfig_controller.go +++ b/exp/runtime/internal/controllers/extensionconfig_controller.go @@ -81,6 +81,7 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt handler.TypedEnqueueRequestsFromMapFunc( r.secretToExtensionConfig, ), + predicates.TypedResourceIsChanged[*metav1.PartialObjectMetadata](mgr.GetScheme(), predicateLog), )). WithOptions(options). WithEventFilter(predicates.ResourceNotPausedAndHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue)). diff --git a/internal/controllers/cluster/cluster_controller.go b/internal/controllers/cluster/cluster_controller.go index 21de8193b298..a0a9d55426f1 100644 --- a/internal/controllers/cluster/cluster_controller.go +++ b/internal/controllers/cluster/cluster_controller.go @@ -35,6 +35,7 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" "sigs.k8s.io/controller-runtime/pkg/controller" @@ -103,14 +104,17 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt Watches( &clusterv1.Machine{}, handler.EnqueueRequestsFromMapFunc(r.controlPlaneMachineToCluster), + builder.WithPredicates(predicates.ResourceIsChanged(mgr.GetScheme(), predicateLog)), ). Watches( &clusterv1.MachineDeployment{}, handler.EnqueueRequestsFromMapFunc(r.machineDeploymentToCluster), + builder.WithPredicates(predicates.ResourceIsChanged(mgr.GetScheme(), predicateLog)), ). Watches( &expv1.MachinePool{}, handler.EnqueueRequestsFromMapFunc(r.machinePoolToCluster), + builder.WithPredicates(predicates.ResourceIsChanged(mgr.GetScheme(), predicateLog)), ). WithOptions(options). WithEventFilter(predicates.ResourceHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue)). diff --git a/internal/controllers/cluster/cluster_controller_phases.go b/internal/controllers/cluster/cluster_controller_phases.go index 11fa91eaab16..525e9621b340 100644 --- a/internal/controllers/cluster/cluster_controller_phases.go +++ b/internal/controllers/cluster/cluster_controller_phases.go @@ -39,6 +39,7 @@ import ( utilconversion "sigs.k8s.io/cluster-api/util/conversion" "sigs.k8s.io/cluster-api/util/kubeconfig" "sigs.k8s.io/cluster-api/util/patch" + "sigs.k8s.io/cluster-api/util/predicates" "sigs.k8s.io/cluster-api/util/secret" ) @@ -96,7 +97,7 @@ func (r *Reconciler) reconcileExternal(ctx context.Context, cluster *clusterv1.C } // Ensure we add a watcher to the external object. - if err := r.externalTracker.Watch(log, obj, handler.EnqueueRequestForOwner(r.Client.Scheme(), r.Client.RESTMapper(), &clusterv1.Cluster{})); err != nil { + if err := r.externalTracker.Watch(log, obj, handler.EnqueueRequestForOwner(r.Client.Scheme(), r.Client.RESTMapper(), &clusterv1.Cluster{}), predicates.ResourceIsChanged(r.Client.Scheme(), *r.externalTracker.PredicateLogger)); err != nil { return nil, err } diff --git a/internal/controllers/clusterclass/clusterclass_controller.go b/internal/controllers/clusterclass/clusterclass_controller.go index 64d0d5884c29..e54be4e6b262 100644 --- a/internal/controllers/clusterclass/clusterclass_controller.go +++ b/internal/controllers/clusterclass/clusterclass_controller.go @@ -34,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -90,6 +91,7 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt Watches( &runtimev1.ExtensionConfig{}, handler.EnqueueRequestsFromMapFunc(r.extensionConfigToClusterClass), + builder.WithPredicates(predicates.ResourceIsChanged(mgr.GetScheme(), predicateLog)), ). WithEventFilter(predicates.ResourceHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue)). Complete(r) diff --git a/internal/controllers/machine/machine_controller.go b/internal/controllers/machine/machine_controller.go index fe98ab3436dd..249eef45b259 100644 --- a/internal/controllers/machine/machine_controller.go +++ b/internal/controllers/machine/machine_controller.go @@ -23,6 +23,7 @@ import ( "strings" "time" + "github.com/go-logr/logr" "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" @@ -42,6 +43,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" @@ -111,6 +113,8 @@ type Reconciler struct { // specific time for a specific Request. This is used to implement rate-limiting to avoid // e.g. spamming workload clusters with eviction requests during Node drain. reconcileDeleteCache cache.Cache[cache.ReconcileEntry] + + predicateLog *logr.Logger } func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error { @@ -122,7 +126,7 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt return errors.New("Client, APIReader and ClusterCache must not be nil and RemoteConditionsGracePeriod must not be < 2m") } - predicateLog := ctrl.LoggerFrom(ctx).WithValues("controller", "machine") + r.predicateLog = ptr.To(ctrl.LoggerFrom(ctx).WithValues("controller", "machine")) clusterToMachines, err := util.ClusterToTypedObjectsMapper(mgr.GetClient(), &clusterv1.MachineList{}, mgr.GetScheme()) if err != nil { return err @@ -139,29 +143,31 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt if r.nodeDeletionRetryTimeout.Nanoseconds() == 0 { r.nodeDeletionRetryTimeout = 10 * time.Second } - c, err := ctrl.NewControllerManagedBy(mgr). For(&clusterv1.Machine{}). WithOptions(options). - WithEventFilter(predicates.ResourceHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue)). + WithEventFilter(predicates.ResourceHasFilterLabel(mgr.GetScheme(), *r.predicateLog, r.WatchFilterValue)). Watches( &clusterv1.Cluster{}, handler.EnqueueRequestsFromMapFunc(clusterToMachines), builder.WithPredicates( // TODO: should this wait for Cluster.Status.InfrastructureReady similar to Infra Machine resources? - predicates.All(mgr.GetScheme(), predicateLog, - predicates.ClusterControlPlaneInitialized(mgr.GetScheme(), predicateLog), - predicates.ResourceHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue), + predicates.All(mgr.GetScheme(), *r.predicateLog, + predicates.ResourceIsChanged(mgr.GetScheme(), *r.predicateLog), + predicates.ClusterControlPlaneInitialized(mgr.GetScheme(), *r.predicateLog), + predicates.ResourceHasFilterLabel(mgr.GetScheme(), *r.predicateLog, r.WatchFilterValue), ), )). WatchesRawSource(r.ClusterCache.GetClusterSource("machine", clusterToMachines, clustercache.WatchForProbeFailure(r.RemoteConditionsGracePeriod))). Watches( &clusterv1.MachineSet{}, handler.EnqueueRequestsFromMapFunc(msToMachines), + builder.WithPredicates(predicates.ResourceIsChanged(mgr.GetScheme(), *r.predicateLog)), ). Watches( &clusterv1.MachineDeployment{}, handler.EnqueueRequestsFromMapFunc(mdToMachines), + builder.WithPredicates(predicates.ResourceIsChanged(mgr.GetScheme(), *r.predicateLog)), ). Build(r) if err != nil { @@ -174,7 +180,7 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt Controller: c, Cache: mgr.GetCache(), Scheme: mgr.GetScheme(), - PredicateLogger: &predicateLog, + PredicateLogger: r.predicateLog, } r.ssaCache = ssa.NewCache() r.reconcileDeleteCache = cache.New[cache.ReconcileEntry]() @@ -1081,6 +1087,7 @@ func (r *Reconciler) watchClusterNodes(ctx context.Context, cluster *clusterv1.C Watcher: r.controller, Kind: &corev1.Node{}, EventHandler: handler.EnqueueRequestsFromMapFunc(r.nodeToMachine), + Predicates: []predicate.TypedPredicate[client.Object]{predicates.TypedResourceIsChanged[client.Object](r.Client.Scheme(), *r.predicateLog)}, })) } diff --git a/internal/controllers/machine/machine_controller_phases.go b/internal/controllers/machine/machine_controller_phases.go index 8289c766d141..c9f71f7769b0 100644 --- a/internal/controllers/machine/machine_controller_phases.go +++ b/internal/controllers/machine/machine_controller_phases.go @@ -40,6 +40,7 @@ import ( "sigs.k8s.io/cluster-api/util/conditions" utilconversion "sigs.k8s.io/cluster-api/util/conversion" "sigs.k8s.io/cluster-api/util/patch" + "sigs.k8s.io/cluster-api/util/predicates" ) var externalReadyWait = 30 * time.Second @@ -89,7 +90,7 @@ func (r *Reconciler) ensureExternalOwnershipAndWatch(ctx context.Context, cluste } // Ensure we add a watch to the external object, if there isn't one already. - if err := r.externalTracker.Watch(log, obj, handler.EnqueueRequestForOwner(r.Client.Scheme(), r.Client.RESTMapper(), &clusterv1.Machine{})); err != nil { + if err := r.externalTracker.Watch(log, obj, handler.EnqueueRequestForOwner(r.Client.Scheme(), r.Client.RESTMapper(), &clusterv1.Machine{}), predicates.ResourceIsChanged(r.Client.Scheme(), *r.externalTracker.PredicateLogger)); err != nil { return nil, err } diff --git a/internal/controllers/machinedeployment/machinedeployment_controller.go b/internal/controllers/machinedeployment/machinedeployment_controller.go index 062c6d1e8213..ec92f246761a 100644 --- a/internal/controllers/machinedeployment/machinedeployment_controller.go +++ b/internal/controllers/machinedeployment/machinedeployment_controller.go @@ -91,20 +91,22 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt err = ctrl.NewControllerManagedBy(mgr). For(&clusterv1.MachineDeployment{}). - Owns(&clusterv1.MachineSet{}). + Owns(&clusterv1.MachineSet{}, builder.WithPredicates(predicates.ResourceIsChanged(mgr.GetScheme(), predicateLog))). // Watches enqueues MachineDeployment for corresponding MachineSet resources, if no managed controller reference (owner) exists. Watches( &clusterv1.MachineSet{}, handler.EnqueueRequestsFromMapFunc(r.MachineSetToDeployments), + builder.WithPredicates(predicates.ResourceIsChanged(mgr.GetScheme(), predicateLog)), ). WithOptions(options). WithEventFilter(predicates.ResourceHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue)). Watches( &clusterv1.Cluster{}, handler.EnqueueRequestsFromMapFunc(clusterToMachineDeployments), - builder.WithPredicates( + builder.WithPredicates(predicates.All(mgr.GetScheme(), predicateLog, + predicates.ResourceIsChanged(mgr.GetScheme(), predicateLog), predicates.ClusterPausedTransitions(mgr.GetScheme(), predicateLog), - ), + )), // TODO: should this wait for Cluster.Status.InfrastructureReady similar to Infra Machine resources? ).Complete(r) if err != nil { diff --git a/internal/controllers/machinehealthcheck/machinehealthcheck_controller.go b/internal/controllers/machinehealthcheck/machinehealthcheck_controller.go index 35a64d74e7e1..fefdeb86860d 100644 --- a/internal/controllers/machinehealthcheck/machinehealthcheck_controller.go +++ b/internal/controllers/machinehealthcheck/machinehealthcheck_controller.go @@ -35,11 +35,13 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" + "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" @@ -84,6 +86,8 @@ type Reconciler struct { controller controller.Controller recorder record.EventRecorder + + predicateLog *logr.Logger } func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error { @@ -91,23 +95,25 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt return errors.New("Client and ClusterCache must not be nil") } - predicateLog := ctrl.LoggerFrom(ctx).WithValues("controller", "machinehealthcheck") + r.predicateLog = ptr.To(ctrl.LoggerFrom(ctx).WithValues("controller", "machinehealthcheck")) c, err := ctrl.NewControllerManagedBy(mgr). For(&clusterv1.MachineHealthCheck{}). Watches( &clusterv1.Machine{}, handler.EnqueueRequestsFromMapFunc(r.machineToMachineHealthCheck), + builder.WithPredicates(predicates.ResourceIsChanged(mgr.GetScheme(), *r.predicateLog)), ). WithOptions(options). - WithEventFilter(predicates.ResourceHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue)). + WithEventFilter(predicates.ResourceHasFilterLabel(mgr.GetScheme(), *r.predicateLog, r.WatchFilterValue)). Watches( &clusterv1.Cluster{}, handler.EnqueueRequestsFromMapFunc(r.clusterToMachineHealthCheck), builder.WithPredicates( // TODO: should this wait for Cluster.Status.InfrastructureReady similar to Infra Machine resources? - predicates.All(mgr.GetScheme(), predicateLog, - predicates.ClusterPausedTransitions(mgr.GetScheme(), predicateLog), - predicates.ResourceHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue), + predicates.All(mgr.GetScheme(), *r.predicateLog, + predicates.ResourceIsChanged(mgr.GetScheme(), *r.predicateLog), + predicates.ClusterPausedTransitions(mgr.GetScheme(), *r.predicateLog), + predicates.ResourceHasFilterLabel(mgr.GetScheme(), *r.predicateLog, r.WatchFilterValue), ), ), ). @@ -617,6 +623,7 @@ func (r *Reconciler) watchClusterNodes(ctx context.Context, cluster *clusterv1.C Watcher: r.controller, Kind: &corev1.Node{}, EventHandler: handler.EnqueueRequestsFromMapFunc(r.nodeToMachineHealthCheck), + Predicates: []predicate.TypedPredicate[client.Object]{predicates.TypedResourceIsChanged[client.Object](r.Client.Scheme(), *r.predicateLog)}, })) } diff --git a/internal/controllers/machineset/machineset_controller.go b/internal/controllers/machineset/machineset_controller.go index e46833eb77d5..4ee0f88af938 100644 --- a/internal/controllers/machineset/machineset_controller.go +++ b/internal/controllers/machineset/machineset_controller.go @@ -121,15 +121,17 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt err = ctrl.NewControllerManagedBy(mgr). For(&clusterv1.MachineSet{}). - Owns(&clusterv1.Machine{}). + Owns(&clusterv1.Machine{}, builder.WithPredicates(predicates.ResourceIsChanged(mgr.GetScheme(), predicateLog))). // Watches enqueues MachineSet for corresponding Machine resources, if no managed controller reference (owner) exists. Watches( &clusterv1.Machine{}, handler.EnqueueRequestsFromMapFunc(r.MachineToMachineSets), + builder.WithPredicates(predicates.ResourceIsChanged(mgr.GetScheme(), predicateLog)), ). Watches( &clusterv1.MachineDeployment{}, handler.EnqueueRequestsFromMapFunc(mdToMachineSets), + builder.WithPredicates(predicates.ResourceIsChanged(mgr.GetScheme(), predicateLog)), ). WithOptions(options). WithEventFilter(predicates.ResourceHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue)). @@ -139,6 +141,7 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt builder.WithPredicates( // TODO: should this wait for Cluster.Status.InfrastructureReady similar to Infra Machine resources? predicates.All(mgr.GetScheme(), predicateLog, + predicates.ResourceIsChanged(mgr.GetScheme(), predicateLog), predicates.ClusterPausedTransitions(mgr.GetScheme(), predicateLog), predicates.ResourceHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue), ), diff --git a/internal/controllers/topology/cluster/cluster_controller.go b/internal/controllers/topology/cluster/cluster_controller.go index ef8faedfc244..0bc91eb609c0 100644 --- a/internal/controllers/topology/cluster/cluster_controller.go +++ b/internal/controllers/topology/cluster/cluster_controller.go @@ -89,8 +89,6 @@ type Reconciler struct { desiredStateGenerator desiredstate.Generator patchHelperFactory structuredmerge.PatchHelperFactoryFunc - - predicateLog logr.Logger } func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error { @@ -102,11 +100,11 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt return errors.New("RuntimeClient must not be nil") } - r.predicateLog = ctrl.LoggerFrom(ctx).WithValues("controller", "topology/cluster") + predicateLog := ctrl.LoggerFrom(ctx).WithValues("controller", "topology/cluster") c, err := ctrl.NewControllerManagedBy(mgr). For(&clusterv1.Cluster{}, builder.WithPredicates( // Only reconcile Cluster with topology. - predicates.ClusterHasTopology(mgr.GetScheme(), r.predicateLog), + predicates.ClusterHasTopology(mgr.GetScheme(), predicateLog), )). Named("topology/cluster"). WatchesRawSource(r.ClusterCache.GetClusterSource("topology/cluster", func(_ context.Context, o client.Object) []ctrl.Request { @@ -115,21 +113,28 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt Watches( &clusterv1.ClusterClass{}, handler.EnqueueRequestsFromMapFunc(r.clusterClassToCluster), + builder.WithPredicates(predicates.ResourceIsChanged(mgr.GetScheme(), predicateLog)), ). Watches( &clusterv1.MachineDeployment{}, handler.EnqueueRequestsFromMapFunc(r.machineDeploymentToCluster), // Only trigger Cluster reconciliation if the MachineDeployment is topology owned. - builder.WithPredicates(predicates.ResourceIsTopologyOwned(mgr.GetScheme(), r.predicateLog)), + builder.WithPredicates(predicates.All(mgr.GetScheme(), predicateLog, + predicates.ResourceIsChanged(mgr.GetScheme(), predicateLog), + predicates.ResourceIsTopologyOwned(mgr.GetScheme(), predicateLog), + )), ). Watches( &expv1.MachinePool{}, handler.EnqueueRequestsFromMapFunc(r.machinePoolToCluster), // Only trigger Cluster reconciliation if the MachinePool is topology owned. - builder.WithPredicates(predicates.ResourceIsTopologyOwned(mgr.GetScheme(), r.predicateLog)), + builder.WithPredicates(predicates.All(mgr.GetScheme(), predicateLog, + predicates.ResourceIsChanged(mgr.GetScheme(), predicateLog), + predicates.ResourceIsTopologyOwned(mgr.GetScheme(), predicateLog), + )), ). WithOptions(options). - WithEventFilter(predicates.ResourceHasFilterLabel(mgr.GetScheme(), r.predicateLog, r.WatchFilterValue)). + WithEventFilter(predicates.ResourceHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue)). Build(r) if err != nil { @@ -140,7 +145,7 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt Controller: c, Cache: mgr.GetCache(), Scheme: mgr.GetScheme(), - PredicateLogger: &r.predicateLog, + PredicateLogger: &predicateLog, } r.desiredStateGenerator = desiredstate.NewGenerator(r.Client, r.ClusterCache, r.RuntimeClient) r.recorder = mgr.GetEventRecorderFor("topology/cluster-controller") @@ -324,7 +329,10 @@ func (r *Reconciler) setupDynamicWatches(ctx context.Context, s *scope.Scope) er if err := r.externalTracker.Watch(ctrl.LoggerFrom(ctx), s.Current.InfrastructureCluster, handler.EnqueueRequestForOwner(scheme, r.Client.RESTMapper(), &clusterv1.Cluster{}), // Only trigger Cluster reconciliation if the InfrastructureCluster is topology owned. - predicates.ResourceIsTopologyOwned(scheme, r.predicateLog)); err != nil { + predicates.All(scheme, *r.externalTracker.PredicateLogger, + predicates.ResourceIsChanged(scheme, *r.externalTracker.PredicateLogger), + predicates.ResourceIsTopologyOwned(scheme, *r.externalTracker.PredicateLogger), + )); err != nil { return errors.Wrap(err, "error watching Infrastructure CR") } } @@ -332,7 +340,10 @@ func (r *Reconciler) setupDynamicWatches(ctx context.Context, s *scope.Scope) er if err := r.externalTracker.Watch(ctrl.LoggerFrom(ctx), s.Current.ControlPlane.Object, handler.EnqueueRequestForOwner(scheme, r.Client.RESTMapper(), &clusterv1.Cluster{}), // Only trigger Cluster reconciliation if the ControlPlane is topology owned. - predicates.ResourceIsTopologyOwned(scheme, r.predicateLog)); err != nil { + predicates.All(scheme, *r.externalTracker.PredicateLogger, + predicates.ResourceIsChanged(scheme, *r.externalTracker.PredicateLogger), + predicates.ResourceIsTopologyOwned(scheme, *r.externalTracker.PredicateLogger), + )); err != nil { return errors.Wrap(err, "error watching ControlPlane CR") } } diff --git a/internal/controllers/topology/machinedeployment/machinedeployment_controller.go b/internal/controllers/topology/machinedeployment/machinedeployment_controller.go index b1a8e27ed564..4c355efc9faf 100644 --- a/internal/controllers/topology/machinedeployment/machinedeployment_controller.go +++ b/internal/controllers/topology/machinedeployment/machinedeployment_controller.go @@ -85,6 +85,7 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt handler.EnqueueRequestsFromMapFunc(clusterToMachineDeployments), builder.WithPredicates( predicates.All(mgr.GetScheme(), predicateLog, + predicates.ResourceIsChanged(mgr.GetScheme(), predicateLog), predicates.ClusterUnpaused(mgr.GetScheme(), predicateLog), predicates.ClusterHasTopology(mgr.GetScheme(), predicateLog), ), diff --git a/internal/controllers/topology/machineset/machineset_controller.go b/internal/controllers/topology/machineset/machineset_controller.go index e2b119477ec0..aa2b52132aac 100644 --- a/internal/controllers/topology/machineset/machineset_controller.go +++ b/internal/controllers/topology/machineset/machineset_controller.go @@ -87,6 +87,7 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt handler.EnqueueRequestsFromMapFunc(clusterToMachineSets), builder.WithPredicates( predicates.All(mgr.GetScheme(), predicateLog, + predicates.ResourceIsChanged(mgr.GetScheme(), predicateLog), predicates.ClusterUnpaused(mgr.GetScheme(), predicateLog), predicates.ClusterHasTopology(mgr.GetScheme(), predicateLog), ), diff --git a/test/infrastructure/docker/exp/internal/controllers/dockermachinepool_controller.go b/test/infrastructure/docker/exp/internal/controllers/dockermachinepool_controller.go index f019c91d767c..0d27d953cf24 100644 --- a/test/infrastructure/docker/exp/internal/controllers/dockermachinepool_controller.go +++ b/test/infrastructure/docker/exp/internal/controllers/dockermachinepool_controller.go @@ -173,18 +173,21 @@ func (r *DockerMachinePoolReconciler) SetupWithManager(ctx context.Context, mgr &expv1.MachinePool{}, handler.EnqueueRequestsFromMapFunc(utilexp.MachinePoolToInfrastructureMapFunc(ctx, infraexpv1.GroupVersion.WithKind("DockerMachinePool"))), + builder.WithPredicates(predicates.ResourceIsChanged(mgr.GetScheme(), predicateLog)), ). Watches( &infrav1.DockerMachine{}, handler.EnqueueRequestsFromMapFunc(dockerMachineToDockerMachinePool), + builder.WithPredicates(predicates.ResourceIsChanged(mgr.GetScheme(), predicateLog)), ). Watches( &clusterv1.Cluster{}, handler.EnqueueRequestsFromMapFunc(clusterToDockerMachinePools), - builder.WithPredicates( + builder.WithPredicates(predicates.All(mgr.GetScheme(), predicateLog, + predicates.ResourceIsChanged(mgr.GetScheme(), predicateLog), //nolint:staticcheck // This usage will be removed when adding v1beta2 status and implementing the Paused condition. predicates.ClusterUnpausedAndInfrastructureReady(mgr.GetScheme(), predicateLog), - ), + )), ).Build(r) if err != nil { return errors.Wrap(err, "failed setting up with a controller manager") diff --git a/test/infrastructure/docker/internal/controllers/dockercluster_controller.go b/test/infrastructure/docker/internal/controllers/dockercluster_controller.go index 6eba1c266cb1..7a2a816da346 100644 --- a/test/infrastructure/docker/internal/controllers/dockercluster_controller.go +++ b/test/infrastructure/docker/internal/controllers/dockercluster_controller.go @@ -213,9 +213,10 @@ func (r *DockerClusterReconciler) SetupWithManager(ctx context.Context, mgr ctrl Watches( &clusterv1.Cluster{}, handler.EnqueueRequestsFromMapFunc(util.ClusterToInfrastructureMapFunc(ctx, infrav1.GroupVersion.WithKind("DockerCluster"), mgr.GetClient(), &infrav1.DockerCluster{})), - builder.WithPredicates( + builder.WithPredicates(predicates.All(mgr.GetScheme(), predicateLog, + predicates.ResourceIsChanged(mgr.GetScheme(), predicateLog), predicates.ClusterPausedTransitions(mgr.GetScheme(), predicateLog), - ), + )), ).Complete(r) if err != nil { return errors.Wrap(err, "failed setting up with a controller manager") diff --git a/test/infrastructure/docker/internal/controllers/dockermachine_controller.go b/test/infrastructure/docker/internal/controllers/dockermachine_controller.go index 50e420c420c3..03de44628a2a 100644 --- a/test/infrastructure/docker/internal/controllers/dockermachine_controller.go +++ b/test/infrastructure/docker/internal/controllers/dockermachine_controller.go @@ -527,17 +527,20 @@ func (r *DockerMachineReconciler) SetupWithManager(ctx context.Context, mgr ctrl Watches( &clusterv1.Machine{}, handler.EnqueueRequestsFromMapFunc(util.MachineToInfrastructureMapFunc(infrav1.GroupVersion.WithKind("DockerMachine"))), + builder.WithPredicates(predicates.ResourceIsChanged(mgr.GetScheme(), predicateLog)), ). Watches( &infrav1.DockerCluster{}, handler.EnqueueRequestsFromMapFunc(r.DockerClusterToDockerMachines), + builder.WithPredicates(predicates.ResourceIsChanged(mgr.GetScheme(), predicateLog)), ). Watches( &clusterv1.Cluster{}, handler.EnqueueRequestsFromMapFunc(clusterToDockerMachines), - builder.WithPredicates( + builder.WithPredicates(predicates.All(mgr.GetScheme(), predicateLog, + predicates.ResourceIsChanged(mgr.GetScheme(), predicateLog), predicates.ClusterPausedTransitionsOrInfrastructureReady(mgr.GetScheme(), predicateLog), - ), + )), ). WatchesRawSource(r.ClusterCache.GetClusterSource("dockermachine", clusterToDockerMachines)). Complete(r) diff --git a/test/infrastructure/inmemory/internal/controllers/inmemorycluster_controller.go b/test/infrastructure/inmemory/internal/controllers/inmemorycluster_controller.go index 045c3bbd69c0..ea5eb85f672d 100644 --- a/test/infrastructure/inmemory/internal/controllers/inmemorycluster_controller.go +++ b/test/infrastructure/inmemory/internal/controllers/inmemorycluster_controller.go @@ -250,9 +250,10 @@ func (r *InMemoryClusterReconciler) SetupWithManager(ctx context.Context, mgr ct Watches( &clusterv1.Cluster{}, handler.EnqueueRequestsFromMapFunc(util.ClusterToInfrastructureMapFunc(ctx, infrav1.GroupVersion.WithKind("InMemoryCluster"), mgr.GetClient(), &infrav1.InMemoryCluster{})), - builder.WithPredicates( + builder.WithPredicates(predicates.All(mgr.GetScheme(), predicateLog, + predicates.ResourceIsChanged(mgr.GetScheme(), predicateLog), predicates.ClusterPausedTransitions(mgr.GetScheme(), predicateLog), - ), + )), ).Complete(r) if err != nil { return errors.Wrap(err, "failed setting up with a controller manager") diff --git a/test/infrastructure/inmemory/internal/controllers/inmemorymachine_controller.go b/test/infrastructure/inmemory/internal/controllers/inmemorymachine_controller.go index e737b9d0d1ca..4063df788dc5 100644 --- a/test/infrastructure/inmemory/internal/controllers/inmemorymachine_controller.go +++ b/test/infrastructure/inmemory/internal/controllers/inmemorymachine_controller.go @@ -1172,17 +1172,20 @@ func (r *InMemoryMachineReconciler) SetupWithManager(ctx context.Context, mgr ct Watches( &clusterv1.Machine{}, handler.EnqueueRequestsFromMapFunc(util.MachineToInfrastructureMapFunc(infrav1.GroupVersion.WithKind("InMemoryMachine"))), + builder.WithPredicates(predicates.ResourceIsChanged(mgr.GetScheme(), predicateLog)), ). Watches( &infrav1.InMemoryCluster{}, handler.EnqueueRequestsFromMapFunc(r.InMemoryClusterToInMemoryMachines), + builder.WithPredicates(predicates.ResourceIsChanged(mgr.GetScheme(), predicateLog)), ). Watches( &clusterv1.Cluster{}, handler.EnqueueRequestsFromMapFunc(clusterToInMemoryMachines), - builder.WithPredicates( + builder.WithPredicates(predicates.All(mgr.GetScheme(), predicateLog, + predicates.ResourceIsChanged(mgr.GetScheme(), predicateLog), predicates.ClusterPausedTransitionsOrInfrastructureReady(mgr.GetScheme(), predicateLog), - ), + )), ).Complete(r) if err != nil { return errors.Wrap(err, "failed setting up with a controller manager") diff --git a/util/predicates/generic_predicates.go b/util/predicates/generic_predicates.go index fe170f846f74..84d8071ebe85 100644 --- a/util/predicates/generic_predicates.go +++ b/util/predicates/generic_predicates.go @@ -31,8 +31,13 @@ import ( // All returns a predicate that returns true only if all given predicates return true. func All(scheme *runtime.Scheme, logger logr.Logger, predicates ...predicate.Funcs) predicate.Funcs { - return predicate.Funcs{ - UpdateFunc: func(e event.UpdateEvent) bool { + return TypedAll(scheme, logger, predicates...) +} + +// TypedAll returns a predicate that returns true only if all given predicates return true. +func TypedAll[T client.Object](scheme *runtime.Scheme, logger logr.Logger, predicates ...predicate.TypedFuncs[T]) predicate.TypedFuncs[T] { + return predicate.TypedFuncs[T]{ + UpdateFunc: func(e event.TypedUpdateEvent[T]) bool { log := logger.WithValues("predicateAggregation", "All") if gvk, err := apiutil.GVKForObject(e.ObjectNew, scheme); err == nil { log = log.WithValues(gvk.Kind, klog.KObj(e.ObjectNew)) @@ -46,7 +51,7 @@ func All(scheme *runtime.Scheme, logger logr.Logger, predicates ...predicate.Fun log.V(6).Info("All provided predicates returned true, allowing further processing") return true }, - CreateFunc: func(e event.CreateEvent) bool { + CreateFunc: func(e event.TypedCreateEvent[T]) bool { log := logger.WithValues("predicateAggregation", "All") if gvk, err := apiutil.GVKForObject(e.Object, scheme); err == nil { log = log.WithValues(gvk.Kind, klog.KObj(e.Object)) @@ -60,7 +65,7 @@ func All(scheme *runtime.Scheme, logger logr.Logger, predicates ...predicate.Fun log.V(6).Info("All provided predicates returned true, allowing further processing") return true }, - DeleteFunc: func(e event.DeleteEvent) bool { + DeleteFunc: func(e event.TypedDeleteEvent[T]) bool { log := logger.WithValues("predicateAggregation", "All") if gvk, err := apiutil.GVKForObject(e.Object, scheme); err == nil { log = log.WithValues(gvk.Kind, klog.KObj(e.Object)) @@ -74,7 +79,7 @@ func All(scheme *runtime.Scheme, logger logr.Logger, predicates ...predicate.Fun log.V(6).Info("All provided predicates returned true, allowing further processing") return true }, - GenericFunc: func(e event.GenericEvent) bool { + GenericFunc: func(e event.TypedGenericEvent[T]) bool { log := logger.WithValues("predicateAggregation", "All") if gvk, err := apiutil.GVKForObject(e.Object, scheme); err == nil { log = log.WithValues(gvk.Kind, klog.KObj(e.Object)) @@ -303,3 +308,33 @@ func processIfTopologyOwned(scheme *runtime.Scheme, logger logr.Logger, obj clie logger.V(6).Info("Resource is not topology owned, will not attempt to map resource") return false } + +// ResourceIsChanged returns a predicate that returns true only if the resource +// has changed. This predicate allows to drop resync events on additionally watched objects. +func ResourceIsChanged(scheme *runtime.Scheme, logger logr.Logger) predicate.Funcs { + return TypedResourceIsChanged[client.Object](scheme, logger) +} + +// TypedResourceIsChanged returns a predicate that returns true only if the resource +// has changed. This predicate allows to drop resync events on additionally watched objects. +func TypedResourceIsChanged[T client.Object](scheme *runtime.Scheme, logger logr.Logger) predicate.TypedFuncs[T] { + log := logger.WithValues("predicate", "ResourceIsChanged") + return predicate.TypedFuncs[T]{ + UpdateFunc: func(e event.TypedUpdateEvent[T]) bool { + // Ensure we don't modify log from above. + log := log + if gvk, err := apiutil.GVKForObject(e.ObjectNew, scheme); err == nil { + log = log.WithValues(gvk.Kind, klog.KObj(e.ObjectNew)) + } + if e.ObjectOld.GetResourceVersion() == e.ObjectNew.GetResourceVersion() { + log.WithValues("eventType", "update").V(6).Info("Resource is not changed, will not attempt to map resource") + return false + } + log.WithValues("eventType", "update").V(6).Info("Resource is changed, will attempt to map resource") + return true + }, + CreateFunc: func(event.TypedCreateEvent[T]) bool { return true }, + DeleteFunc: func(event.TypedDeleteEvent[T]) bool { return true }, + GenericFunc: func(event.TypedGenericEvent[T]) bool { return true }, + } +}