SSA: Implement request caching
sbueringer committed Mar 2, 2023
1 parent 82946b3 commit 539760a
Showing 22 changed files with 570 additions and 132 deletions.
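The recurring change across the files below is the same: direct `client.Apply` patches are replaced by a shared `ssa.Patch` helper, and update paths that re-apply an unchanged desired state on every reconcile pass an `ssa.WithCachingProxy` option so requests known to be no-ops never hit the API server. The `internal/util/ssa` package itself is not among the rendered hunks; the following is a minimal sketch of the shape implied by the call sites — only the exported names (`Cache`, `NewCache`, `Patch`, `WithCachingProxy`) come from this commit, while the key format, backing store, and TTL are assumptions:

```go
package ssa

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/cache"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// Cache remembers identifiers of apply requests that turned out to be
// no-ops, so an identical follow-up request can be skipped entirely.
type Cache interface {
	Has(key string) bool
	Add(key string)
}

// NewCache returns a Cache with expiring entries, so objects still get
// re-applied occasionally (store size and TTL here are assumptions).
func NewCache() Cache {
	return &ssaCache{store: cache.NewLRUExpireCache(1024)}
}

type ssaCache struct{ store *cache.LRUExpireCache }

func (c *ssaCache) Has(key string) bool { _, ok := c.store.Get(key); return ok }
func (c *ssaCache) Add(key string)      { c.store.Add(key, true, 10*time.Minute) }

// WithCachingProxy routes a Patch call through the cache. Original is the
// object the desired state was computed from.
type WithCachingProxy struct {
	Cache    Cache
	Original client.Object
}

// Patch applies obj via Server-Side-Apply with fieldManager as field owner.
// The real request identifier presumably also hashes the desired state;
// only the identifying parts are kept here for brevity.
func Patch(ctx context.Context, c client.Client, fieldManager string, obj client.Object, opts ...WithCachingProxy) error {
	var key string
	if len(opts) > 0 {
		key = fmt.Sprintf("%s/%s/%s/%s", fieldManager,
			obj.GetObjectKind().GroupVersionKind(),
			client.ObjectKeyFromObject(obj),
			opts[0].Original.GetResourceVersion())
		if opts[0].Cache.Has(key) {
			return nil // identical request already proved to be a no-op
		}
	}
	if err := c.Patch(ctx, obj, client.Apply, client.ForceOwnership, client.FieldOwner(fieldManager)); err != nil {
		return err
	}
	// If the apply did not bump the resourceVersion, nothing changed:
	// remember the request so the next identical one is short-circuited.
	if key != "" && obj.GetResourceVersion() == opts[0].Original.GetResourceVersion() {
		opts[0].Cache.Add(key)
	}
	return nil
}
```

Keying on the original object's resourceVersion means any external change to the object invalidates the cached entry automatically, while the TTL bounds how long even a genuinely identical request can be skipped.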
2 changes: 2 additions & 0 deletions controlplane/kubeadm/internal/controllers/controller.go
@@ -86,6 +86,7 @@ type KubeadmControlPlaneReconciler struct {
// support SSA. This flag should be dropped after all affected tests are migrated
// to envtest.
disableInPlacePropagation bool
+ ssaCache ssa.Cache
}

func (r *KubeadmControlPlaneReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {
@@ -113,6 +114,7 @@ func (r *KubeadmControlPlaneReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {

r.controller = c
r.recorder = mgr.GetEventRecorderFor("kubeadm-control-plane-controller")
+ r.ssaCache = ssa.NewCache()

if r.managementCluster == nil {
if r.Tracker == nil {
controlplane/kubeadm/internal/controllers/controller_test.go
@@ -1574,7 +1574,7 @@ func TestKubeadmControlPlaneReconciler_syncMachines(t *testing.T) {

// Run syncMachines to clean up managed fields and have proper field ownership
// for Machines, InfrastructureMachines and KubeadmConfigs.
- reconciler := &KubeadmControlPlaneReconciler{Client: env}
+ reconciler := &KubeadmControlPlaneReconciler{Client: env, ssaCache: ssa.NewCache()}
g.Expect(reconciler.syncMachines(ctx, controlPlane)).To(Succeed())

// The inPlaceMutatingMachine should have cleaned up managed fields.
27 changes: 9 additions & 18 deletions controlplane/kubeadm/internal/controllers/helpers.go
@@ -38,6 +38,7 @@ import (
"sigs.k8s.io/cluster-api/controllers/external"
controlplanev1 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1beta1"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal"
"sigs.k8s.io/cluster-api/internal/util/ssa"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/certs"
"sigs.k8s.io/cluster-api/util/conditions"
@@ -305,12 +306,8 @@ func (r *KubeadmControlPlaneReconciler) updateExternalObject(ctx context.Context
// Update annotations
updatedObject.SetAnnotations(kcp.Spec.MachineTemplate.ObjectMeta.Annotations)

- patchOptions := []client.PatchOption{
- client.ForceOwnership,
- client.FieldOwner(kcpManagerName),
- }
- if err := r.Client.Patch(ctx, updatedObject, client.Apply, patchOptions...); err != nil {
- return errors.Wrapf(err, "failed to update %s", klog.KObj(obj))
+ if err := ssa.Patch(ctx, r.Client, kcpManagerName, updatedObject); err != nil {
+ return errors.Wrapf(err, "failed to update %s", obj.GetObjectKind().GroupVersionKind().Kind)
}
return nil
}
@@ -320,12 +317,8 @@ func (r *KubeadmControlPlaneReconciler) createMachine(ctx context.Context, kcp *
if err != nil {
return errors.Wrap(err, "failed to create Machine: failed to compute desired Machine")
}
- patchOptions := []client.PatchOption{
- client.ForceOwnership,
- client.FieldOwner(kcpManagerName),
- }
- if err := r.Client.Patch(ctx, machine, client.Apply, patchOptions...); err != nil {
- return errors.Wrap(err, "failed to create Machine: apply failed")
+ if err := ssa.Patch(ctx, r.Client, kcpManagerName, machine); err != nil {
+ return errors.Wrap(err, "failed to create Machine")
}
// Remove the annotation tracking that a remediation is in progress (the remediation completed when
// the replacement machine has been created above).
@@ -342,12 +335,10 @@ func (r *KubeadmControlPlaneReconciler) updateMachine(ctx context.Context, machi
if err != nil {
return nil, errors.Wrap(err, "failed to update Machine: failed to compute desired Machine")
}
- patchOptions := []client.PatchOption{
- client.ForceOwnership,
- client.FieldOwner(kcpManagerName),
- }
- if err := r.Client.Patch(ctx, updatedMachine, client.Apply, patchOptions...); err != nil {
- return nil, errors.Wrap(err, "failed to update Machine: apply failed")
+
+ err = ssa.Patch(ctx, r.Client, kcpManagerName, updatedMachine, ssa.WithCachingProxy{Cache: r.ssaCache, Original: machine})
+ if err != nil {
+ return nil, errors.Wrap(err, "failed to update Machine")
}
return updatedMachine, nil
}
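One detail worth noting in helpers.go above: only the update path opts into the cache. A create can never be a no-op, so `createMachine` (and `updateExternalObject`) apply directly, while `updateMachine` passes the Machine it computed the desired state from as `Original`. Side by side, the two call sites from this file:

```go
// Create path: the Machine does not exist yet, nothing to short-circuit.
if err := ssa.Patch(ctx, r.Client, kcpManagerName, machine); err != nil {
	return errors.Wrap(err, "failed to create Machine")
}

// Update path: the desired Machine is recomputed every reconcile and is
// usually identical to the last request, so route it through the cache.
err = ssa.Patch(ctx, r.Client, kcpManagerName, updatedMachine,
	ssa.WithCachingProxy{Cache: r.ssaCache, Original: machine})
```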
7 changes: 7 additions & 0 deletions internal/controllers/machine/machine_controller.go
@@ -46,6 +46,7 @@ import (
"sigs.k8s.io/cluster-api/controllers/external"
"sigs.k8s.io/cluster-api/controllers/noderefutil"
"sigs.k8s.io/cluster-api/controllers/remote"
"sigs.k8s.io/cluster-api/internal/util/ssa"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/annotations"
"sigs.k8s.io/cluster-api/util/collections"
@@ -58,6 +59,10 @@ import (
const (
// controllerName defines the controller used when creating clients.
controllerName = "machine-controller"

+ // machineManagerName is the manager name used for Server-Side-Apply (SSA) operations
+ // in the Machine controller.
+ machineManagerName = "capi-machine"
)

var (
@@ -91,6 +96,7 @@ type Reconciler struct {
// nodeDeletionRetryTimeout determines how long the controller will retry deleting a node
// during a single reconciliation.
nodeDeletionRetryTimeout time.Duration
+ ssaCache ssa.Cache
}

func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {
@@ -134,6 +140,7 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {
r.externalTracker = external.ObjectTracker{
Controller: controller,
}
+ r.ssaCache = ssa.NewCache()
return nil
}

12 changes: 5 additions & 7 deletions internal/controllers/machine/machine_controller_noderef.go
@@ -33,6 +33,7 @@ import (
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/api/v1beta1/index"
"sigs.k8s.io/cluster-api/controllers/noderefutil"
"sigs.k8s.io/cluster-api/internal/util/ssa"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/annotations"
"sigs.k8s.io/cluster-api/util/conditions"
@@ -124,13 +125,10 @@ func (r *Reconciler) reconcileNode(ctx context.Context, cluster *clusterv1.Clust
}
}

- options := []client.PatchOption{
- client.FieldOwner("capi-machine"),
- client.ForceOwnership,
- }
- nodePatch := unstructuredNode(node.Name, node.UID, getManagedLabels(machine.Labels))
- if err := remoteClient.Patch(ctx, nodePatch, client.Apply, options...); err != nil {
- return ctrl.Result{}, errors.Wrap(err, "failed to apply patch label to the node")
+ updatedNode := unstructuredNode(node.Name, node.UID, getManagedLabels(machine.Labels))
+ err = ssa.Patch(ctx, remoteClient, machineManagerName, updatedNode, ssa.WithCachingProxy{Cache: r.ssaCache, Original: node})
+ if err != nil {
+ return ctrl.Result{}, errors.Wrap(err, "failed to apply labels to Node")
}

// Do the remaining node health checks, then set the node health to true if all checks pass.
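`unstructuredNode` is not shown in this view. The shape of the applied object matters for SSA: applying a fully populated Node would claim ownership of every field on it, so the helper presumably builds a minimal apply configuration carrying only the fields this field manager should own — roughly:

```go
import (
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/types"
)

// unstructuredNode builds a minimal Node apply configuration: name and UID
// identify the object, and the managed labels are the only mutable fields
// the machine controller should own on a workload cluster Node.
// (Sketch; the real helper may differ in detail.)
func unstructuredNode(name string, uid types.UID, labels map[string]string) *unstructured.Unstructured {
	obj := &unstructured.Unstructured{}
	obj.SetAPIVersion("v1")
	obj.SetKind("Node")
	obj.SetName(name)
	obj.SetUID(uid)
	obj.SetLabels(labels)
	return obj
}
```

Including the UID should make the apply fail if the Node was deleted and recreated under the same name in the meantime, rather than silently labeling the wrong object.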
11 changes: 7 additions & 4 deletions internal/controllers/machine/machine_controller_test.go
@@ -42,6 +42,7 @@ import (
"sigs.k8s.io/cluster-api/api/v1beta1/index"
"sigs.k8s.io/cluster-api/controllers/remote"
"sigs.k8s.io/cluster-api/internal/test/builder"
"sigs.k8s.io/cluster-api/internal/util/ssa"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/conditions"
"sigs.k8s.io/cluster-api/util/patch"
@@ -726,8 +727,9 @@ func TestReconcileRequest(t *testing.T) {
).WithIndex(&corev1.Node{}, index.NodeProviderIDField, index.NodeByProviderID).Build()

r := &Reconciler{
- Client: clientFake,
- Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), clientFake, scheme.Scheme, client.ObjectKey{Name: testCluster.Name, Namespace: testCluster.Namespace}),
+ Client: clientFake,
+ Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), clientFake, scheme.Scheme, client.ObjectKey{Name: testCluster.Name, Namespace: testCluster.Namespace}),
+ ssaCache: ssa.NewCache(),
}

result, err := r.Reconcile(ctx, reconcile.Request{NamespacedName: util.ObjectKey(&tc.machine)})
@@ -972,8 +974,9 @@ func TestMachineConditions(t *testing.T) {
Build()

r := &Reconciler{
- Client: clientFake,
- Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), clientFake, scheme.Scheme, client.ObjectKey{Name: testCluster.Name, Namespace: testCluster.Namespace}),
+ Client: clientFake,
+ Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), clientFake, scheme.Scheme, client.ObjectKey{Name: testCluster.Name, Namespace: testCluster.Namespace}),
+ ssaCache: ssa.NewCache(),
}

_, err := r.Reconcile(ctx, reconcile.Request{NamespacedName: util.ObjectKey(&machine)})
internal/controllers/machinedeployment/machinedeployment_controller.go
@@ -70,6 +70,7 @@ type Reconciler struct {
WatchFilterValue string

recorder record.EventRecorder
+ ssaCache ssa.Cache
}

func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {
@@ -106,6 +107,7 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {
}

r.recorder = mgr.GetEventRecorderFor("machinedeployment-controller")
+ r.ssaCache = ssa.NewCache()
return nil
}

17 changes: 6 additions & 11 deletions internal/controllers/machinedeployment/machinedeployment_sync.go
@@ -40,6 +40,8 @@ import (

clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/internal/controllers/machinedeployment/mdutil"
"sigs.k8s.io/cluster-api/internal/util/hash"
"sigs.k8s.io/cluster-api/internal/util/ssa"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/conditions"
"sigs.k8s.io/cluster-api/util/patch"
@@ -153,11 +155,8 @@ func (r *Reconciler) updateMachineSet(ctx context.Context, deployment *clusterv1
}

// Update the MachineSet to propagate in-place mutable fields from the MachineDeployment.
- patchOptions := []client.PatchOption{
- client.ForceOwnership,
- client.FieldOwner(machineDeploymentManagerName),
- }
- if err := r.Client.Patch(ctx, updatedMS, client.Apply, patchOptions...); err != nil {
+ err = ssa.Patch(ctx, r.Client, machineDeploymentManagerName, updatedMS, ssa.WithCachingProxy{Cache: r.ssaCache, Original: ms})
+ if err != nil {
r.recorder.Eventf(deployment, corev1.EventTypeWarning, "FailedUpdate", "Failed to update MachineSet %s: %v", klog.KObj(updatedMS), err)
return nil, errors.Wrapf(err, "failed to update MachineSet %s", klog.KObj(updatedMS))
}
@@ -178,11 +177,7 @@ func (r *Reconciler) createMachineSetAndWait(ctx context.Context, deployment *cl
}

// Create the MachineSet.
- patchOptions := []client.PatchOption{
- client.ForceOwnership,
- client.FieldOwner(machineDeploymentManagerName),
- }
- if err = r.Client.Patch(ctx, newMS, client.Apply, patchOptions...); err != nil {
+ if err := ssa.Patch(ctx, r.Client, machineDeploymentManagerName, newMS); err != nil {
r.recorder.Eventf(deployment, corev1.EventTypeWarning, "FailedCreate", "Failed to create MachineSet %s: %v", klog.KObj(newMS), err)
return nil, errors.Wrapf(err, "failed to create new MachineSet %s", klog.KObj(newMS))
}
@@ -237,7 +232,7 @@ func (r *Reconciler) computeDesiredMachineSet(deployment *clusterv1.MachineDeplo
// As a result, we use the hash of the machine template while ignoring all in-place mutable fields, i.e. the
// machine template with only fields that could trigger a rollout for the machine-template-hash, making it
// independent of the changes to any in-place mutable fields.
- templateHash, err := mdutil.ComputeSpewHash(mdutil.MachineTemplateDeepCopyRolloutFields(&deployment.Spec.Template))
+ templateHash, err := hash.Compute(mdutil.MachineTemplateDeepCopyRolloutFields(&deployment.Spec.Template))
if err != nil {
return nil, errors.Wrap(err, "failed to compute desired MachineSet: failed to compute machine template hash")
}
30 changes: 0 additions & 30 deletions internal/controllers/machinedeployment/mdutil/util.go
@@ -19,13 +19,10 @@ package mdutil

import (
"fmt"
"hash"
"hash/fnv"
"sort"
"strconv"
"strings"

"github.com/davecgh/go-spew/spew"
"github.com/go-logr/logr"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
@@ -688,33 +685,6 @@ func CloneSelectorAndAddLabel(selector *metav1.LabelSelector, labelKey, labelVal
return newSelector
}

- // SpewHashObject writes specified object to hash using the spew library
- // which follows pointers and prints actual values of the nested objects
- // ensuring the hash does not change when a pointer changes.
- func SpewHashObject(hasher hash.Hash, objectToWrite interface{}) error {
- hasher.Reset()
- printer := spew.ConfigState{
- Indent: " ",
- SortKeys: true,
- DisableMethods: true,
- SpewKeys: true,
- }
-
- if _, err := printer.Fprintf(hasher, "%#v", objectToWrite); err != nil {
- return fmt.Errorf("failed to write object to hasher")
- }
- return nil
- }
-
- // ComputeSpewHash computes the hash of a MachineTemplateSpec using the spew library.
- func ComputeSpewHash(objectToWrite interface{}) (uint32, error) {
- machineTemplateSpecHasher := fnv.New32a()
- if err := SpewHashObject(machineTemplateSpecHasher, objectToWrite); err != nil {
- return 0, err
- }
- return machineTemplateSpecHasher.Sum32(), nil
- }

// GetDeletingMachineCount gets the number of machines that are in the process of being deleted
// in a machineList.
func GetDeletingMachineCount(machineList *clusterv1.MachineList) int32 {
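The spew-based hashing removed here moves to a shared `internal/util/hash` package, which `computeDesiredMachineSet` above now calls as `hash.Compute`. That package is not among the rendered hunks; judging from the deleted code, it is presumably the same FNV-32a-over-spew-output construction, roughly:

```go
package hash

import (
	"fmt"
	"hash/fnv"

	"github.com/davecgh/go-spew/spew"
)

// Compute hashes an object with FNV-32a over spew output. Following
// pointers (and disabling String methods) makes the hash depend on nested
// values rather than pointer addresses, so semantically equal machine
// templates hash equally across reconciles.
func Compute(obj interface{}) (uint32, error) {
	hasher := fnv.New32a()
	printer := spew.ConfigState{
		Indent:         " ",
		SortKeys:       true,
		DisableMethods: true,
		SpewKeys:       true,
	}
	if _, err := printer.Fprintf(hasher, "%#v", obj); err != nil {
		return 0, fmt.Errorf("failed to write object to hasher: %w", err)
	}
	return hasher.Sum32(), nil
}
```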
internal/controllers/machinehealthcheck/machinehealthcheck_controller_test.go
@@ -2245,7 +2245,7 @@ func TestGetMaxUnhealthy(t *testing.T) {
func ownerReferenceForCluster(ctx context.Context, g *WithT, c *clusterv1.Cluster) metav1.OwnerReference {
// Fetch the cluster to populate the UID
cc := &clusterv1.Cluster{}
- g.Expect(env.GetClient().Get(ctx, util.ObjectKey(c), cc)).To(Succeed())
+ g.Expect(env.Get(ctx, util.ObjectKey(c), cc)).To(Succeed())

return metav1.OwnerReference{
APIVersion: clusterv1.GroupVersion.String(),
21 changes: 6 additions & 15 deletions internal/controllers/machineset/machineset_controller.go
@@ -85,6 +85,7 @@ type Reconciler struct {
// WatchFilterValue is the label value used to filter events prior to reconciliation.
WatchFilterValue string

+ ssaCache ssa.Cache
recorder record.EventRecorder
}

@@ -122,6 +123,7 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {
}

r.recorder = mgr.GetEventRecorderFor("machineset-controller")
+ r.ssaCache = ssa.NewCache()
return nil
}

@@ -393,11 +395,8 @@ func (r *Reconciler) syncMachines(ctx context.Context, machineSet *clusterv1.Mac

// Update Machine to propagate in-place mutable fields from the MachineSet.
updatedMachine := r.computeDesiredMachine(machineSet, m)
- patchOptions := []client.PatchOption{
- client.ForceOwnership,
- client.FieldOwner(machineSetManagerName),
- }
- if err := r.Client.Patch(ctx, updatedMachine, client.Apply, patchOptions...); err != nil {
+ err := ssa.Patch(ctx, r.Client, machineSetManagerName, updatedMachine, ssa.WithCachingProxy{Cache: r.ssaCache, Original: m})
+ if err != nil {
log.Error(err, "failed to update Machine", "Machine", klog.KObj(updatedMachine))
return errors.Wrapf(err, "failed to update Machine %q", klog.KObj(updatedMachine))
}
@@ -529,11 +528,7 @@ func (r *Reconciler) syncReplicas(ctx context.Context, ms *clusterv1.MachineSet,
machine.Spec.InfrastructureRef = *infraRef

// Create the Machine.
- patchOptions := []client.PatchOption{
- client.ForceOwnership,
- client.FieldOwner(machineSetManagerName),
- }
- if err := r.Client.Patch(ctx, machine, client.Apply, patchOptions...); err != nil {
+ if err := ssa.Patch(ctx, r.Client, machineSetManagerName, machine); err != nil {
log.Error(err, "Error while creating a machine")
r.recorder.Eventf(ms, corev1.EventTypeWarning, "FailedCreate", "Failed to create machine: %v", err)
errs = append(errs, err)
@@ -676,11 +671,7 @@ func (r *Reconciler) updateExternalObject(ctx context.Context, obj client.Object
updatedObject.SetLabels(machineLabelsFromMachineSet(machineSet))
updatedObject.SetAnnotations(machineAnnotationsFromMachineSet(machineSet))

- patchOptions := []client.PatchOption{
- client.ForceOwnership,
- client.FieldOwner(machineSetManagerName),
- }
- if err := r.Client.Patch(ctx, updatedObject, client.Apply, patchOptions...); err != nil {
+ if err := ssa.Patch(ctx, r.Client, machineSetManagerName, updatedObject, ssa.WithCachingProxy{Cache: r.ssaCache, Original: obj}); err != nil {
return errors.Wrapf(err, "failed to update %s", klog.KObj(obj))
}
return nil
internal/controllers/machineset/machineset_controller_test.go
@@ -1175,7 +1175,7 @@ func TestMachineSetReconciler_syncMachines(t *testing.T) {

// Run syncMachines to clean up managed fields and have proper field ownership
// for Machines, InfrastructureMachines and BootstrapConfigs.
- reconciler := &Reconciler{Client: env}
+ reconciler := &Reconciler{Client: env, ssaCache: ssa.NewCache()}
g.Expect(reconciler.syncMachines(ctx, ms, machines)).To(Succeed())

// The inPlaceMutatingMachine should have cleaned up managed fields.
7 changes: 4 additions & 3 deletions internal/controllers/topology/cluster/cluster_controller.go
@@ -47,6 +47,7 @@ import (
"sigs.k8s.io/cluster-api/internal/hooks"
tlog "sigs.k8s.io/cluster-api/internal/log"
runtimeclient "sigs.k8s.io/cluster-api/internal/runtime/client"
"sigs.k8s.io/cluster-api/internal/util/ssa"
"sigs.k8s.io/cluster-api/internal/webhooks"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/annotations"
@@ -118,7 +119,7 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {
r.patchEngine = patches.NewEngine(r.RuntimeClient)
r.recorder = mgr.GetEventRecorderFor("topology/cluster")
if r.patchHelperFactory == nil {
- r.patchHelperFactory = serverSideApplyPatchHelperFactory(r.Client)
+ r.patchHelperFactory = serverSideApplyPatchHelperFactory(r.Client, ssa.NewCache())
}
return nil
}
@@ -394,9 +395,9 @@ func (r *Reconciler) reconcileDelete(ctx context.Context, cluster *clusterv1.Clu
}

// serverSideApplyPatchHelperFactory makes use of managed fields provided by server side apply and is used by the controller.
- func serverSideApplyPatchHelperFactory(c client.Client) structuredmerge.PatchHelperFactoryFunc {
+ func serverSideApplyPatchHelperFactory(c client.Client, ssaCache ssa.Cache) structuredmerge.PatchHelperFactoryFunc {
return func(ctx context.Context, original, modified client.Object, opts ...structuredmerge.HelperOption) (structuredmerge.PatchHelper, error) {
- return structuredmerge.NewServerSidePatchHelper(ctx, original, modified, c, opts...)
+ return structuredmerge.NewServerSidePatchHelper(ctx, original, modified, c, ssaCache, opts...)
}
}

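With the extra factory parameter, every patch helper created during a topology reconcile shares the one cache owned by the reconciler instead of getting none. A hedged usage sketch — `HasChanges`/`Patch` follow the `structuredmerge.PatchHelper` interface used in this file, while the surrounding wiring is assumed:

```go
// One cache per reconciler, created once in SetupWithManager; every helper
// produced by the factory shares it across reconciles.
factory := serverSideApplyPatchHelperFactory(r.Client, ssa.NewCache())

helper, err := factory(ctx, original, modified)
if err != nil {
	return err
}
if helper.HasChanges() {
	// Only issue the SSA request when the computed patch is non-empty;
	// the shared cache additionally suppresses repeated no-op requests.
	if err := helper.Patch(ctx); err != nil {
		return err
	}
}
```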
