From da88b42acc3a33319b8df4d615dacbd9a8ec3b32 Mon Sep 17 00:00:00 2001 From: fabriziopandini Date: Fri, 20 Dec 2024 11:43:41 +0100 Subject: [PATCH 1/3] Reconcile topology only when necessary --- .../topology/cluster/cluster_controller.go | 94 ++++++++++++++++++- 1 file changed, 90 insertions(+), 4 deletions(-) diff --git a/internal/controllers/topology/cluster/cluster_controller.go b/internal/controllers/topology/cluster/cluster_controller.go index 0bc91eb609c0..3f601fb40866 100644 --- a/internal/controllers/topology/cluster/cluster_controller.go +++ b/internal/controllers/topology/cluster/cluster_controller.go @@ -19,22 +19,28 @@ package cluster import ( "context" "fmt" + "reflect" "time" "github.com/go-logr/logr" "github.com/pkg/errors" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" kerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/cache/informertest" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" @@ -103,8 +109,11 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt predicateLog := ctrl.LoggerFrom(ctx).WithValues("controller", "topology/cluster") c, err := ctrl.NewControllerManagedBy(mgr). For(&clusterv1.Cluster{}, builder.WithPredicates( - // Only reconcile Cluster with topology. - predicates.ClusterHasTopology(mgr.GetScheme(), predicateLog), + // Only reconcile Cluster with topology and with changes relevant for this controller. + predicates.All(mgr.GetScheme(), predicateLog, + predicates.ClusterHasTopology(mgr.GetScheme(), predicateLog), + clusterChangeIsRelevant(mgr.GetScheme(), predicateLog), + ), )). Named("topology/cluster"). WatchesRawSource(r.ClusterCache.GetClusterSource("topology/cluster", func(_ context.Context, o client.Object) []ctrl.Request { @@ -118,16 +127,17 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt Watches( &clusterv1.MachineDeployment{}, handler.EnqueueRequestsFromMapFunc(r.machineDeploymentToCluster), - // Only trigger Cluster reconciliation if the MachineDeployment is topology owned. + // Only trigger Cluster reconciliation if the MachineDeployment is topology owned, the resource is changed, and the change is relevant. builder.WithPredicates(predicates.All(mgr.GetScheme(), predicateLog, predicates.ResourceIsChanged(mgr.GetScheme(), predicateLog), predicates.ResourceIsTopologyOwned(mgr.GetScheme(), predicateLog), + machineDeploymentChangeIsRelevant(mgr.GetScheme(), predicateLog), )), ). Watches( &expv1.MachinePool{}, handler.EnqueueRequestsFromMapFunc(r.machinePoolToCluster), - // Only trigger Cluster reconciliation if the MachinePool is topology owned. + // Only trigger Cluster reconciliation if the MachinePool is topology owned, the resource is changed. builder.WithPredicates(predicates.All(mgr.GetScheme(), predicateLog, predicates.ResourceIsChanged(mgr.GetScheme(), predicateLog), predicates.ResourceIsTopologyOwned(mgr.GetScheme(), predicateLog), @@ -155,6 +165,82 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt return nil } +func clusterChangeIsRelevant(scheme *runtime.Scheme, logger logr.Logger) predicate.Funcs { + dropNotRelevant := func(cluster *clusterv1.Cluster) *clusterv1.Cluster { + c := cluster.DeepCopy() + // Drop changes on v1beta2 conditions; when v1beta2 conditions will be moved top level, we will review this + // selectively drop changes not relevant for this controller. + c.Status.V1Beta2 = nil + return c + } + + return predicate.Funcs{ + UpdateFunc: func(e event.UpdateEvent) bool { + log := logger.WithValues("predicate", "ClusterUpdateUnpaused", "eventType", "update") + if gvk, err := apiutil.GVKForObject(e.ObjectOld, scheme); err == nil { + log = log.WithValues(gvk.Kind, klog.KObj(e.ObjectOld)) + } + + oldObj, ok := e.ObjectOld.(*clusterv1.Cluster) + if !ok { + log.V(4).Info("Expected Cluster", "type", fmt.Sprintf("%T", e.ObjectOld)) + return false + } + oldObj = dropNotRelevant(oldObj) + + newObj := e.ObjectNew.(*clusterv1.Cluster) + if !ok { + log.V(4).Info("Expected Cluster", "type", fmt.Sprintf("%T", e.ObjectNew)) + return false + } + newObj = dropNotRelevant(newObj) + + return reflect.DeepEqual(oldObj, newObj) + }, + CreateFunc: func(event.CreateEvent) bool { return true }, + DeleteFunc: func(event.DeleteEvent) bool { return true }, + GenericFunc: func(event.GenericEvent) bool { return false }, + } +} + +func machineDeploymentChangeIsRelevant(scheme *runtime.Scheme, logger logr.Logger) predicate.Funcs { + dropNotRelevant := func(cluster *clusterv1.MachineDeployment) *clusterv1.MachineDeployment { + md := cluster.DeepCopy() + // Drop changes on v1beta2 conditions; when v1beta2 conditions will be moved top level, we will review this + // selectively drop changes not relevant for this controller. + md.Status.V1Beta2 = nil + return md + } + + return predicate.Funcs{ + UpdateFunc: func(e event.UpdateEvent) bool { + log := logger.WithValues("predicate", "ClusterUpdateUnpaused", "eventType", "update") + if gvk, err := apiutil.GVKForObject(e.ObjectOld, scheme); err == nil { + log = log.WithValues(gvk.Kind, klog.KObj(e.ObjectOld)) + } + + oldObj, ok := e.ObjectOld.(*clusterv1.MachineDeployment) + if !ok { + log.V(4).Info("Expected Cluster", "type", fmt.Sprintf("%T", e.ObjectOld)) + return false + } + oldObj = dropNotRelevant(oldObj) + + newObj := e.ObjectNew.(*clusterv1.MachineDeployment) + if !ok { + log.V(4).Info("Expected Cluster", "type", fmt.Sprintf("%T", e.ObjectNew)) + return false + } + newObj = dropNotRelevant(newObj) + + return reflect.DeepEqual(oldObj, newObj) + }, + CreateFunc: func(event.CreateEvent) bool { return true }, + DeleteFunc: func(event.DeleteEvent) bool { return true }, + GenericFunc: func(event.GenericEvent) bool { return false }, + } +} + // SetupForDryRun prepares the Reconciler for a dry run execution. func (r *Reconciler) SetupForDryRun(recorder record.EventRecorder) { r.desiredStateGenerator = desiredstate.NewGenerator(r.Client, r.ClusterCache, r.RuntimeClient) From d0962fa98278332e17ec4b4fc8263aa38292f459 Mon Sep 17 00:00:00 2001 From: fabriziopandini Date: Fri, 20 Dec 2024 12:47:56 +0100 Subject: [PATCH 2/3] Address comments --- .../topology/cluster/cluster_controller.go | 38 ++++++++++++++----- 1 file changed, 28 insertions(+), 10 deletions(-) diff --git a/internal/controllers/topology/cluster/cluster_controller.go b/internal/controllers/topology/cluster/cluster_controller.go index 3f601fb40866..3e31c0d34722 100644 --- a/internal/controllers/topology/cluster/cluster_controller.go +++ b/internal/controllers/topology/cluster/cluster_controller.go @@ -168,6 +168,10 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt func clusterChangeIsRelevant(scheme *runtime.Scheme, logger logr.Logger) predicate.Funcs { dropNotRelevant := func(cluster *clusterv1.Cluster) *clusterv1.Cluster { c := cluster.DeepCopy() + // Drop metadata fields which are impacted by not relevant changes. + c.ObjectMeta.ManagedFields = nil + c.ObjectMeta.ResourceVersion = "" + // Drop changes on v1beta2 conditions; when v1beta2 conditions will be moved top level, we will review this // selectively drop changes not relevant for this controller. c.Status.V1Beta2 = nil @@ -176,7 +180,7 @@ func clusterChangeIsRelevant(scheme *runtime.Scheme, logger logr.Logger) predica return predicate.Funcs{ UpdateFunc: func(e event.UpdateEvent) bool { - log := logger.WithValues("predicate", "ClusterUpdateUnpaused", "eventType", "update") + log := logger.WithValues("predicate", "ClusterChangeIsRelevant", "eventType", "update") if gvk, err := apiutil.GVKForObject(e.ObjectOld, scheme); err == nil { log = log.WithValues(gvk.Kind, klog.KObj(e.ObjectOld)) } @@ -195,17 +199,26 @@ func clusterChangeIsRelevant(scheme *runtime.Scheme, logger logr.Logger) predica } newObj = dropNotRelevant(newObj) - return reflect.DeepEqual(oldObj, newObj) + if reflect.DeepEqual(oldObj, newObj) { + logger.V(6).Info("Cluster does not have relevant changes, blocking further processing") + return false + } + logger.V(6).Info("Cluster has relevant changes, allowing further processing") + return true }, CreateFunc: func(event.CreateEvent) bool { return true }, DeleteFunc: func(event.DeleteEvent) bool { return true }, - GenericFunc: func(event.GenericEvent) bool { return false }, + GenericFunc: func(event.GenericEvent) bool { return true }, } } func machineDeploymentChangeIsRelevant(scheme *runtime.Scheme, logger logr.Logger) predicate.Funcs { - dropNotRelevant := func(cluster *clusterv1.MachineDeployment) *clusterv1.MachineDeployment { - md := cluster.DeepCopy() + dropNotRelevant := func(machineDeployment *clusterv1.MachineDeployment) *clusterv1.MachineDeployment { + md := machineDeployment.DeepCopy() + // Drop metadata fields which are impacted by not relevant changes. + md.ObjectMeta.ManagedFields = nil + md.ObjectMeta.ResourceVersion = "" + // Drop changes on v1beta2 conditions; when v1beta2 conditions will be moved top level, we will review this // selectively drop changes not relevant for this controller. md.Status.V1Beta2 = nil @@ -214,30 +227,35 @@ func machineDeploymentChangeIsRelevant(scheme *runtime.Scheme, logger logr.Logge return predicate.Funcs{ UpdateFunc: func(e event.UpdateEvent) bool { - log := logger.WithValues("predicate", "ClusterUpdateUnpaused", "eventType", "update") + log := logger.WithValues("predicate", "MachineDeploymentChangeIsRelevant", "eventType", "update") if gvk, err := apiutil.GVKForObject(e.ObjectOld, scheme); err == nil { log = log.WithValues(gvk.Kind, klog.KObj(e.ObjectOld)) } oldObj, ok := e.ObjectOld.(*clusterv1.MachineDeployment) if !ok { - log.V(4).Info("Expected Cluster", "type", fmt.Sprintf("%T", e.ObjectOld)) + log.V(4).Info("Expected MachineDeployment", "type", fmt.Sprintf("%T", e.ObjectOld)) return false } oldObj = dropNotRelevant(oldObj) newObj := e.ObjectNew.(*clusterv1.MachineDeployment) if !ok { - log.V(4).Info("Expected Cluster", "type", fmt.Sprintf("%T", e.ObjectNew)) + log.V(4).Info("Expected MachineDeployment", "type", fmt.Sprintf("%T", e.ObjectNew)) return false } newObj = dropNotRelevant(newObj) - return reflect.DeepEqual(oldObj, newObj) + if reflect.DeepEqual(oldObj, newObj) { + logger.V(6).Info("MachineDeployment does not have relevant changes, blocking further processing") + return false + } + logger.V(6).Info("MachineDeployment has relevant changes, allowing further processing") + return true }, CreateFunc: func(event.CreateEvent) bool { return true }, DeleteFunc: func(event.DeleteEvent) bool { return true }, - GenericFunc: func(event.GenericEvent) bool { return false }, + GenericFunc: func(event.GenericEvent) bool { return true }, } } From 70ecd4ac46713b33f4232abcf661242f5cc08f7c Mon Sep 17 00:00:00 2001 From: fabriziopandini Date: Fri, 20 Dec 2024 15:40:26 +0100 Subject: [PATCH 3/3] Allow resync for the cluster object --- .../topology/cluster/cluster_controller.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/internal/controllers/topology/cluster/cluster_controller.go b/internal/controllers/topology/cluster/cluster_controller.go index 3e31c0d34722..a5cd93bf1b06 100644 --- a/internal/controllers/topology/cluster/cluster_controller.go +++ b/internal/controllers/topology/cluster/cluster_controller.go @@ -185,6 +185,11 @@ func clusterChangeIsRelevant(scheme *runtime.Scheme, logger logr.Logger) predica log = log.WithValues(gvk.Kind, klog.KObj(e.ObjectOld)) } + if e.ObjectOld.GetResourceVersion() == e.ObjectNew.GetResourceVersion() { + log.V(6).Info("Cluster resync event, allowing further processing") + return true + } + oldObj, ok := e.ObjectOld.(*clusterv1.Cluster) if !ok { log.V(4).Info("Expected Cluster", "type", fmt.Sprintf("%T", e.ObjectOld)) @@ -200,10 +205,10 @@ func clusterChangeIsRelevant(scheme *runtime.Scheme, logger logr.Logger) predica newObj = dropNotRelevant(newObj) if reflect.DeepEqual(oldObj, newObj) { - logger.V(6).Info("Cluster does not have relevant changes, blocking further processing") + log.V(6).Info("Cluster does not have relevant changes, blocking further processing") return false } - logger.V(6).Info("Cluster has relevant changes, allowing further processing") + log.V(6).Info("Cluster has relevant changes, allowing further processing") return true }, CreateFunc: func(event.CreateEvent) bool { return true }, @@ -247,10 +252,10 @@ func machineDeploymentChangeIsRelevant(scheme *runtime.Scheme, logger logr.Logge newObj = dropNotRelevant(newObj) if reflect.DeepEqual(oldObj, newObj) { - logger.V(6).Info("MachineDeployment does not have relevant changes, blocking further processing") + log.V(6).Info("MachineDeployment does not have relevant changes, blocking further processing") return false } - logger.V(6).Info("MachineDeployment has relevant changes, allowing further processing") + log.V(6).Info("MachineDeployment has relevant changes, allowing further processing") return true }, CreateFunc: func(event.CreateEvent) bool { return true },