Skip to content

Commit

Permalink
SSA: Implement request caching
Browse files Browse the repository at this point in the history
  • Loading branch information
sbueringer committed Mar 2, 2023
1 parent 9fe7ff5 commit 888c7d2
Show file tree
Hide file tree
Showing 18 changed files with 558 additions and 112 deletions.
7 changes: 7 additions & 0 deletions internal/controllers/machine/machine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"sigs.k8s.io/cluster-api/controllers/external"
"sigs.k8s.io/cluster-api/controllers/noderefutil"
"sigs.k8s.io/cluster-api/controllers/remote"
"sigs.k8s.io/cluster-api/internal/util/ssa"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/annotations"
"sigs.k8s.io/cluster-api/util/collections"
Expand All @@ -58,6 +59,10 @@ import (
const (
// controllerName defines the controller used when creating clients.
controllerName = "machine-controller"

// machineManagerName is the manager name used for Server-Side-Apply (SSA) operations
// in the Machine controller.
machineManagerName = "capi-machine"
)

var (
Expand Down Expand Up @@ -91,6 +96,7 @@ type Reconciler struct {
// nodeDeletionRetryTimeout determines how long the controller will retry deleting a node
// during a single reconciliation.
nodeDeletionRetryTimeout time.Duration
ssaCache ssa.Cache
}

func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {
Expand Down Expand Up @@ -134,6 +140,7 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt
r.externalTracker = external.ObjectTracker{
Controller: controller,
}
r.ssaCache = ssa.NewCache()
return nil
}

Expand Down
12 changes: 5 additions & 7 deletions internal/controllers/machine/machine_controller_noderef.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/api/v1beta1/index"
"sigs.k8s.io/cluster-api/controllers/noderefutil"
"sigs.k8s.io/cluster-api/internal/util/ssa"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/annotations"
"sigs.k8s.io/cluster-api/util/conditions"
Expand Down Expand Up @@ -124,13 +125,10 @@ func (r *Reconciler) reconcileNode(ctx context.Context, cluster *clusterv1.Clust
}
}

options := []client.PatchOption{
client.FieldOwner("capi-machine"),
client.ForceOwnership,
}
nodePatch := unstructuredNode(node.Name, node.UID, getManagedLabels(machine.Labels))
if err := remoteClient.Patch(ctx, nodePatch, client.Apply, options...); err != nil {
return ctrl.Result{}, errors.Wrap(err, "failed to apply patch label to the node")
updatedNode := unstructuredNode(node.Name, node.UID, getManagedLabels(machine.Labels))
_, err = ssa.Patch(ctx, remoteClient, machineManagerName, updatedNode, ssa.WithCachingProxy{Cache: r.ssaCache, Original: node})
if err != nil {
return ctrl.Result{}, errors.Wrap(err, "failed to apply labels to Node")
}

// Do the remaining node health checks, then set the node health to true if all checks pass.
Expand Down
11 changes: 7 additions & 4 deletions internal/controllers/machine/machine_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"sigs.k8s.io/cluster-api/api/v1beta1/index"
"sigs.k8s.io/cluster-api/controllers/remote"
"sigs.k8s.io/cluster-api/internal/test/builder"
"sigs.k8s.io/cluster-api/internal/util/ssa"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/conditions"
"sigs.k8s.io/cluster-api/util/patch"
Expand Down Expand Up @@ -726,8 +727,9 @@ func TestReconcileRequest(t *testing.T) {
).WithIndex(&corev1.Node{}, index.NodeProviderIDField, index.NodeByProviderID).Build()

r := &Reconciler{
Client: clientFake,
Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), clientFake, scheme.Scheme, client.ObjectKey{Name: testCluster.Name, Namespace: testCluster.Namespace}),
Client: clientFake,
Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), clientFake, scheme.Scheme, client.ObjectKey{Name: testCluster.Name, Namespace: testCluster.Namespace}),
ssaCache: ssa.NewCache(),
}

result, err := r.Reconcile(ctx, reconcile.Request{NamespacedName: util.ObjectKey(&tc.machine)})
Expand Down Expand Up @@ -972,8 +974,9 @@ func TestMachineConditions(t *testing.T) {
Build()

r := &Reconciler{
Client: clientFake,
Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), clientFake, scheme.Scheme, client.ObjectKey{Name: testCluster.Name, Namespace: testCluster.Namespace}),
Client: clientFake,
Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), clientFake, scheme.Scheme, client.ObjectKey{Name: testCluster.Name, Namespace: testCluster.Namespace}),
ssaCache: ssa.NewCache(),
}

_, err := r.Reconcile(ctx, reconcile.Request{NamespacedName: util.ObjectKey(&machine)})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type Reconciler struct {
WatchFilterValue string

recorder record.EventRecorder
ssaCache ssa.Cache
}

func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {
Expand Down Expand Up @@ -106,6 +107,7 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt
}

r.recorder = mgr.GetEventRecorderFor("machinedeployment-controller")
r.ssaCache = ssa.NewCache()
return nil
}

Expand Down
18 changes: 7 additions & 11 deletions internal/controllers/machinedeployment/machinedeployment_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ import (

clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/internal/controllers/machinedeployment/mdutil"
"sigs.k8s.io/cluster-api/internal/util/hash"
"sigs.k8s.io/cluster-api/internal/util/ssa"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/conditions"
"sigs.k8s.io/cluster-api/util/patch"
Expand Down Expand Up @@ -153,14 +155,12 @@ func (r *Reconciler) updateMachineSet(ctx context.Context, deployment *clusterv1
}

// Update the MachineSet to propagate in-place mutable fields from the MachineDeployment.
patchOptions := []client.PatchOption{
client.ForceOwnership,
client.FieldOwner(machineDeploymentManagerName),
}
if err := r.Client.Patch(ctx, updatedMS, client.Apply, patchOptions...); err != nil {
updatedObject, err := ssa.Patch(ctx, r.Client, machineDeploymentManagerName, updatedMS, ssa.WithCachingProxy{Cache: r.ssaCache, Original: ms})
if err != nil {
r.recorder.Eventf(deployment, corev1.EventTypeWarning, "FailedUpdate", "Failed to update MachineSet %s: %v", klog.KObj(updatedMS), err)
return nil, errors.Wrapf(err, "failed to update MachineSet %s", klog.KObj(updatedMS))
}
updatedMS = updatedObject.(*clusterv1.MachineSet)

log.V(4).Info("Updated MachineSet", "MachineSet", klog.KObj(updatedMS))
return updatedMS, nil
Expand All @@ -178,11 +178,7 @@ func (r *Reconciler) createMachineSetAndWait(ctx context.Context, deployment *cl
}

// Create the MachineSet.
patchOptions := []client.PatchOption{
client.ForceOwnership,
client.FieldOwner(machineDeploymentManagerName),
}
if err = r.Client.Patch(ctx, newMS, client.Apply, patchOptions...); err != nil {
if _, err := ssa.Patch(ctx, r.Client, machineDeploymentManagerName, newMS); err != nil {
r.recorder.Eventf(deployment, corev1.EventTypeWarning, "FailedCreate", "Failed to create MachineSet %s: %v", klog.KObj(newMS), err)
return nil, errors.Wrapf(err, "failed to create new MachineSet %s", klog.KObj(newMS))
}
Expand Down Expand Up @@ -237,7 +233,7 @@ func (r *Reconciler) computeDesiredMachineSet(deployment *clusterv1.MachineDeplo
// As a result, we use the hash of the machine template while ignoring all in-place mutable fields, i.e. the
// machine template with only fields that could trigger a rollout for the machine-template-hash, making it
// independent of the changes to any in-place mutable fields.
templateHash, err := mdutil.ComputeSpewHash(mdutil.MachineTemplateDeepCopyRolloutFields(&deployment.Spec.Template))
templateHash, err := hash.Compute(mdutil.MachineTemplateDeepCopyRolloutFields(&deployment.Spec.Template))
if err != nil {
return nil, errors.Wrap(err, "failed to compute desired MachineSet: failed to compute machine template hash")
}
Expand Down
30 changes: 0 additions & 30 deletions internal/controllers/machinedeployment/mdutil/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,10 @@ package mdutil

import (
"fmt"
"hash"
"hash/fnv"
"sort"
"strconv"
"strings"

"github.com/davecgh/go-spew/spew"
"github.com/go-logr/logr"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -688,33 +685,6 @@ func CloneSelectorAndAddLabel(selector *metav1.LabelSelector, labelKey, labelVal
return newSelector
}

// SpewHashObject writes specified object to hash using the spew library
// which follows pointers and prints actual values of the nested objects
// ensuring the hash does not change when a pointer changes.
func SpewHashObject(hasher hash.Hash, objectToWrite interface{}) error {
hasher.Reset()
printer := spew.ConfigState{
Indent: " ",
SortKeys: true,
DisableMethods: true,
SpewKeys: true,
}

if _, err := printer.Fprintf(hasher, "%#v", objectToWrite); err != nil {
return fmt.Errorf("failed to write object to hasher")
}
return nil
}

// ComputeSpewHash computes the hash of a MachineTemplateSpec using the spew library.
func ComputeSpewHash(objectToWrite interface{}) (uint32, error) {
machineTemplateSpecHasher := fnv.New32a()
if err := SpewHashObject(machineTemplateSpecHasher, objectToWrite); err != nil {
return 0, err
}
return machineTemplateSpecHasher.Sum32(), nil
}

// GetDeletingMachineCount gets the number of machines that are in the process of being deleted
// in a machineList.
func GetDeletingMachineCount(machineList *clusterv1.MachineList) int32 {
Expand Down
22 changes: 7 additions & 15 deletions internal/controllers/machineset/machineset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ type Reconciler struct {
// WatchFilterValue is the label value used to filter events prior to reconciliation.
WatchFilterValue string

ssaCache ssa.Cache
recorder record.EventRecorder
}

Expand Down Expand Up @@ -122,6 +123,7 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt
}

r.recorder = mgr.GetEventRecorderFor("machineset-controller")
r.ssaCache = ssa.NewCache()
return nil
}

Expand Down Expand Up @@ -393,14 +395,12 @@ func (r *Reconciler) syncMachines(ctx context.Context, machineSet *clusterv1.Mac

// Update Machine to propagate in-place mutable fields from the MachineSet.
updatedMachine := r.computeDesiredMachine(machineSet, m)
patchOptions := []client.PatchOption{
client.ForceOwnership,
client.FieldOwner(machineSetManagerName),
}
if err := r.Client.Patch(ctx, updatedMachine, client.Apply, patchOptions...); err != nil {
updatedObject, err := ssa.Patch(ctx, r.Client, machineSetManagerName, updatedMachine, ssa.WithCachingProxy{Cache: r.ssaCache, Original: m})
if err != nil {
log.Error(err, "failed to update Machine", "Machine", klog.KObj(updatedMachine))
return errors.Wrapf(err, "failed to update Machine %q", klog.KObj(updatedMachine))
}
updatedMachine = updatedObject.(*clusterv1.Machine)
machines[i] = updatedMachine

infraMachine, err := external.Get(ctx, r.Client, &updatedMachine.Spec.InfrastructureRef, updatedMachine.Namespace)
Expand Down Expand Up @@ -529,11 +529,7 @@ func (r *Reconciler) syncReplicas(ctx context.Context, ms *clusterv1.MachineSet,
machine.Spec.InfrastructureRef = *infraRef

// Create the Machine.
patchOptions := []client.PatchOption{
client.ForceOwnership,
client.FieldOwner(machineSetManagerName),
}
if err := r.Client.Patch(ctx, machine, client.Apply, patchOptions...); err != nil {
if _, err := ssa.Patch(ctx, r.Client, machineSetManagerName, machine); err != nil {
log.Error(err, "Error while creating a machine")
r.recorder.Eventf(ms, corev1.EventTypeWarning, "FailedCreate", "Failed to create machine: %v", err)
errs = append(errs, err)
Expand Down Expand Up @@ -676,11 +672,7 @@ func (r *Reconciler) updateExternalObject(ctx context.Context, obj client.Object
updatedObject.SetLabels(machineLabelsFromMachineSet(machineSet))
updatedObject.SetAnnotations(machineAnnotationsFromMachineSet(machineSet))

patchOptions := []client.PatchOption{
client.ForceOwnership,
client.FieldOwner(machineSetManagerName),
}
if err := r.Client.Patch(ctx, updatedObject, client.Apply, patchOptions...); err != nil {
if _, err := ssa.Patch(ctx, r.Client, machineSetManagerName, updatedObject, ssa.WithCachingProxy{Cache: r.ssaCache, Original: obj}); err != nil {
return errors.Wrapf(err, "failed to update %s", klog.KObj(obj))
}
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1175,7 +1175,7 @@ func TestMachineSetReconciler_syncMachines(t *testing.T) {

// Run syncMachines to clean up managed fields and have proper field ownership
// for Machines, InfrastructureMachines and BootstrapConfigs.
reconciler := &Reconciler{Client: env}
reconciler := &Reconciler{Client: env, ssaCache: ssa.NewCache()}
g.Expect(reconciler.syncMachines(ctx, ms, machines)).To(Succeed())

// The inPlaceMutatingMachine should have cleaned up managed fields.
Expand Down
7 changes: 4 additions & 3 deletions internal/controllers/topology/cluster/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"sigs.k8s.io/cluster-api/internal/hooks"
tlog "sigs.k8s.io/cluster-api/internal/log"
runtimeclient "sigs.k8s.io/cluster-api/internal/runtime/client"
"sigs.k8s.io/cluster-api/internal/util/ssa"
"sigs.k8s.io/cluster-api/internal/webhooks"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/annotations"
Expand Down Expand Up @@ -118,7 +119,7 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt
r.patchEngine = patches.NewEngine(r.RuntimeClient)
r.recorder = mgr.GetEventRecorderFor("topology/cluster")
if r.patchHelperFactory == nil {
r.patchHelperFactory = serverSideApplyPatchHelperFactory(r.Client)
r.patchHelperFactory = serverSideApplyPatchHelperFactory(r.Client, ssa.NewCache())
}
return nil
}
Expand Down Expand Up @@ -394,9 +395,9 @@ func (r *Reconciler) reconcileDelete(ctx context.Context, cluster *clusterv1.Clu
}

// serverSideApplyPatchHelperFactory makes use of managed fields provided by server side apply and is used by the controller.
func serverSideApplyPatchHelperFactory(c client.Client) structuredmerge.PatchHelperFactoryFunc {
func serverSideApplyPatchHelperFactory(c client.Client, ssaCache ssa.Cache) structuredmerge.PatchHelperFactoryFunc {
return func(ctx context.Context, original, modified client.Object, opts ...structuredmerge.HelperOption) (structuredmerge.PatchHelper, error) {
return structuredmerge.NewServerSidePatchHelper(ctx, original, modified, c, opts...)
return structuredmerge.NewServerSidePatchHelper(ctx, original, modified, c, ssaCache, opts...)
}
}

Expand Down
Loading

0 comments on commit 888c7d2

Please sign in to comment.