✨ KubeadmControlPlane scale up serially #2193
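For orientation before the diff: this change makes the KubeadmControlPlane controller scale up one replica at a time. scaleUpControlPlane now creates a single Machine per reconcile pass, gated on all existing replicas being Ready and on workload-cluster etcd health, and the reconciler requeues until the desired replica count is reached. A minimal, self-contained sketch of that requeue-driven pattern (simplified types, not the controller's actual code):

```go
package main

import "fmt"

// reconcileOnce models a single reconcile pass under the serial
// scale-up rule: add at most one replica, then report whether the
// controller should requeue to continue scaling.
func reconcileOnce(current, desired int) (int, bool) {
	if current < desired {
		current++ // create exactly one Machine per pass
		fmt.Printf("scaled up to %d of %d replicas\n", current, desired)
	}
	return current, current < desired
}

func main() {
	// Drive the loop the way controller-runtime would on Requeue.
	current, desired := 0, 3
	for requeue := true; requeue; {
		current, requeue = reconcileOnce(current, desired)
	}
}
```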

136 changes: 64 additions & 72 deletions controlplane/kubeadm/controllers/kubeadm_control_plane_controller.go
@@ -74,6 +74,8 @@ type KubeadmControlPlaneReconciler struct {
controller controller.Controller
recorder record.EventRecorder

managementCluster *internal.ManagementCluster

remoteClientGetter remote.ClusterClientGetter
}

@@ -105,6 +107,7 @@ func (r *KubeadmControlPlaneReconciler) SetupWithManager(mgr ctrl.Manager, optio
func (r *KubeadmControlPlaneReconciler) Reconcile(req ctrl.Request) (res ctrl.Result, reterr error) {
logger := r.Log.WithValues("kubeadmControlPlane", req.Name, "namespace", req.Namespace)
ctx := context.Background()
r.managementCluster = &internal.ManagementCluster{Client: r.Client}

// Fetch the KubeadmControlPlane instance.
kcp := &controlplanev1.KubeadmControlPlane{}
@@ -183,12 +186,15 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, cluster *
}

// TODO: handle proper adoption of Machines
allMachines, err := r.getMachines(ctx, types.NamespacedName{Namespace: cluster.Namespace, Name: cluster.Name})
clusterKey := types.NamespacedName{
Namespace: cluster.GetNamespace(),
Name: cluster.GetName(),
}
ownedMachines, err := r.managementCluster.GetMachinesForCluster(ctx, clusterKey, internal.OwnedControlPlaneMachines(kcp.Name))
if err != nil {
logger.Error(err, "Failed to get list of machines")
return ctrl.Result{}, err
}
ownedMachines := r.filterOwnedMachines(kcp, allMachines)

// Generate Cluster Certificates if needed
config := kcp.Spec.KubeadmConfigSpec.DeepCopy()
@@ -235,7 +241,7 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, cluster *

// Currently we are not handling upgrade, so treat all owned machines as one for now.
// Once we start handling upgrade, we'll need to filter this list and act appropriately
numMachines := len(ownedMachines.Items)
numMachines := len(ownedMachines)
desiredReplicas := int(*kcp.Spec.Replicas)

switch {
@@ -248,16 +254,21 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, cluster *
r.recorder.Eventf(kcp, corev1.EventTypeWarning, "FailedInitialization", "Failed to initialize the control plane: %v", err)
return ctrl.Result{}, err
}
if numMachines+1 < desiredReplicas {
return ctrl.Result{Requeue: true}, nil
}
// scaling up
case numMachines < desiredReplicas && numMachines > 0:
// create a new Machine w/ join
logger.Info("Scaling up", "Desired Replicas", desiredReplicas, "Existing Replicas", numMachines)
wantMachines := desiredReplicas - numMachines
if err := r.scaleUpControlPlane(ctx, cluster, kcp, wantMachines); err != nil {
logger.Info("Scaling up by one replica", "Desired Replicas", desiredReplicas, "Existing Replicas", numMachines)
if err := r.scaleUpControlPlane(ctx, cluster, kcp); err != nil {
logger.Error(err, "Failed to scale up the Control Plane")
r.recorder.Eventf(kcp, corev1.EventTypeWarning, "FailedScaleUp", "Failed to scale up the control plane: %v", err)
return ctrl.Result{}, err
}
if numMachines+1 < desiredReplicas {
return ctrl.Result{Requeue: true}, nil
}
// scaling down
case numMachines > desiredReplicas:
logger.Info("Scaling down", "Desired Replicas", desiredReplicas, "Existing Replicas", numMachines)
@@ -270,23 +281,21 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, cluster *
}

func (r *KubeadmControlPlaneReconciler) updateStatus(ctx context.Context, kcp *controlplanev1.KubeadmControlPlane, cluster *clusterv1.Cluster) error {
labelSelector := generateKubeadmControlPlaneSelector(cluster.Name)
selector, err := metav1.LabelSelectorAsSelector(labelSelector)
if err != nil {
// Since we are building up the LabelSelector above, this should not fail
return errors.Wrap(err, "failed to parse label selector")
}
selector := r.managementCluster.ControlPlaneSelectorForCluster(cluster.Name)
// Copy label selector to its status counterpart in string format.
// This is necessary for CRDs including scale subresources.
kcp.Status.Selector = selector.String()

allMachines, err := r.getMachines(ctx, types.NamespacedName{Namespace: cluster.Namespace, Name: cluster.Name})
clusterKey := types.NamespacedName{
Namespace: cluster.Namespace,
Name: cluster.Name,
}
ownedMachines, err := r.managementCluster.GetMachinesForCluster(ctx, clusterKey, internal.OwnedControlPlaneMachines(kcp.Name))
if err != nil {
return errors.Wrap(err, "failed to get list of owned machines")
}
ownedMachines := r.filterOwnedMachines(kcp, allMachines)

replicas := int32(len(ownedMachines.Items))
replicas := int32(len(ownedMachines))
// TODO: take into account configuration hash once upgrades are in place
kcp.Status.Replicas = replicas

@@ -296,8 +305,8 @@ func (r *KubeadmControlPlaneReconciler) updateStatus(ctx context.Context, kcp *c
}

readyMachines := int32(0)
for i := range ownedMachines.Items {
m := &ownedMachines.Items[i]
for i := range ownedMachines {
m := &ownedMachines[i]
node, err := getMachineNode(ctx, remoteClient, m)
if err != nil {
return errors.Wrap(err, "failed to get referenced Node")
@@ -320,22 +329,30 @@ func (r *KubeadmControlPlaneReconciler) updateStatus(ctx context.Context, kcp *c
return nil
}

func (r *KubeadmControlPlaneReconciler) scaleUpControlPlane(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane, numMachines int) error {
var errs []error
func (r *KubeadmControlPlaneReconciler) scaleUpControlPlane(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane) error {
// Every control plane replica must be Ready
if int(kcp.Status.UnavailableReplicas) > 0 {
return errors.New(fmt.Sprintf("%d control plane replicas are not yet Ready", int(kcp.Status.UnavailableReplicas)))
}
clusterKey := types.NamespacedName{
Namespace: cluster.Namespace,
Name: cluster.Name,
}
// TODO(chuckha) Need to skip/rework this for external etcd case and assume it's working.
// This assumes that there is a 1:1 correspondence between control plane nodes and etcd members.
if err := r.managementCluster.TargetClusterEtcdIsHealthy(ctx, clusterKey, kcp.Name); err != nil {
return err
}

// Create the bootstrap configuration
bootstrapSpec := kcp.Spec.KubeadmConfigSpec.DeepCopy()
bootstrapSpec.InitConfiguration = nil
bootstrapSpec.ClusterConfiguration = nil

for i := 0; i < numMachines; i++ {
err := r.cloneConfigsAndGenerateMachine(ctx, cluster, kcp, bootstrapSpec)
if err != nil {
errs = append(errs, errors.Wrap(err, "failed to clone and create an additional control plane Machine"))
}
if err := r.cloneConfigsAndGenerateMachine(ctx, cluster, kcp, bootstrapSpec); err != nil {
return errors.Wrap(err, "failed to clone and create an additional control plane Machine")
}

return kerrors.NewAggregate(errs)
return nil
}

func (r *KubeadmControlPlaneReconciler) initializeControlPlane(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane) error {
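The rewritten scaleUpControlPlane above refuses to add a replica unless every existing replica is Ready and the workload cluster's etcd is healthy, and it creates exactly one Machine before returning. A hedged sketch of that preflight gating, with a stand-in for managementCluster.TargetClusterEtcdIsHealthy (whose implementation is not part of this diff):

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// preflightScaleUp mirrors the two gates in scaleUpControlPlane:
// every replica must be Ready, and etcd must report healthy.
// etcdIsHealthy is a hypothetical stand-in for the controller's
// managementCluster.TargetClusterEtcdIsHealthy call.
func preflightScaleUp(ctx context.Context, unavailableReplicas int, etcdIsHealthy func(context.Context) error) error {
	if unavailableReplicas > 0 {
		return fmt.Errorf("%d control plane replicas are not yet Ready", unavailableReplicas)
	}
	if err := etcdIsHealthy(ctx); err != nil {
		return err
	}
	return nil
}

func main() {
	ctx := context.Background()
	healthy := func(context.Context) error { return nil }
	unhealthy := func(context.Context) error { return errors.New("etcd member missing") }

	fmt.Println(preflightScaleUp(ctx, 0, healthy))   // <nil>: safe to add a replica
	fmt.Println(preflightScaleUp(ctx, 1, healthy))   // blocked: a replica is not Ready
	fmt.Println(preflightScaleUp(ctx, 0, unhealthy)) // blocked: etcd is unhealthy
}
```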
@@ -363,7 +380,7 @@ func (r *KubeadmControlPlaneReconciler) cloneConfigsAndGenerateMachine(ctx conte
Namespace: kcp.Namespace,
OwnerRef: infraCloneOwner,
ClusterName: cluster.Name,
Labels: generateKubeadmControlPlaneLabels(cluster.Name),
Labels: r.managementCluster.ControlPlaneLabelsForCluster(cluster.Name),
})
if err != nil {
// Safe to return early here since no resources have been created yet.
@@ -455,13 +472,16 @@ func (r *KubeadmControlPlaneReconciler) failureDomainForScaleUp(ctx context.Cont
if len(cluster.Status.FailureDomains) == 0 {
return nil, nil
}
machineList, err := r.getMachines(ctx, types.NamespacedName{Namespace: cluster.GetNamespace(), Name: cluster.GetName()})
clusterKey := types.NamespacedName{
Namespace: cluster.Namespace,
Name: cluster.Name,
}
ownedMachines, err := r.managementCluster.GetMachinesForCluster(ctx, clusterKey, internal.OwnedControlPlaneMachines(kcp.Name))
if err != nil {
return nil, err
}
machineList = r.filterOwnedMachines(kcp, machineList)
picker := internal.FailureDomainPicker{Log: r.Log}
failureDomain := picker.PickFewest(cluster.Status.FailureDomains.FilterControlPlane(), machineList.Items)
failureDomain := picker.PickFewest(cluster.Status.FailureDomains.FilterControlPlane(), ownedMachines)
return &failureDomain, nil
}
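failureDomainForScaleUp now feeds the owned machines into FailureDomainPicker.PickFewest. The picker's implementation lives in the internal package and is not shown here; sketched under that assumption, the idea is to place the new replica in the control-plane failure domain with the fewest existing machines:

```go
package main

import "fmt"

type machine struct{ failureDomain string }

// pickFewest models the assumed behavior of FailureDomainPicker.PickFewest:
// count existing machines per failure domain and return the least populated.
func pickFewest(domains []string, machines []machine) string {
	counts := make(map[string]int, len(domains))
	for _, d := range domains {
		counts[d] = 0
	}
	for _, m := range machines {
		if _, ok := counts[m.failureDomain]; ok {
			counts[m.failureDomain]++
		}
	}
	best := domains[0]
	for _, d := range domains[1:] {
		if counts[d] < counts[best] {
			best = d
		}
	}
	return best
}

func main() {
	domains := []string{"us-east-1a", "us-east-1b", "us-east-1c"}
	machines := []machine{{"us-east-1a"}, {"us-east-1a"}, {"us-east-1b"}}
	fmt.Println(pickFewest(domains, machines)) // us-east-1c
}
```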

Expand All @@ -475,7 +495,7 @@ func (r *KubeadmControlPlaneReconciler) generateMachine(ctx context.Context, kcp
ObjectMeta: metav1.ObjectMeta{
Name: names.SimpleNameGenerator.GenerateName(kcp.Name + "-"),
Namespace: kcp.Namespace,
Labels: generateKubeadmControlPlaneLabels(cluster.Name),
Labels: r.managementCluster.ControlPlaneLabelsForCluster(cluster.Name),
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(kcp, controlplanev1.GroupVersion.WithKind("KubeadmControlPlane")),
},
@@ -498,49 +518,43 @@ func (r *KubeadmControlPlaneReconciler) generateMachine(ctx context.Context, kcp
return nil
}

func generateKubeadmControlPlaneSelector(clusterName string) *metav1.LabelSelector {
return &metav1.LabelSelector{
MatchLabels: generateKubeadmControlPlaneLabels(clusterName),
}
}

func generateKubeadmControlPlaneLabels(clusterName string) map[string]string {
return map[string]string{
clusterv1.ClusterLabelName: clusterName,
clusterv1.MachineControlPlaneLabelName: "",
}
}
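The two deleted helpers above move behind the internal ManagementCluster API (ControlPlaneLabelsForCluster and ControlPlaneSelectorForCluster). For reference, a sketch of the equivalent selector construction; the label keys match Cluster API's conventions of this era but should be treated as illustrative:

```go
package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Label keys as used by Cluster API at this point in time (assumed values).
const (
	clusterLabelName             = "cluster.x-k8s.io/cluster-name"
	machineControlPlaneLabelName = "cluster.x-k8s.io/control-plane"
)

// controlPlaneLabels reproduces what generateKubeadmControlPlaneLabels
// returned: the label set identifying a cluster's control plane Machines.
func controlPlaneLabels(clusterName string) map[string]string {
	return map[string]string{
		clusterLabelName:             clusterName,
		machineControlPlaneLabelName: "",
	}
}

func main() {
	// Convert the label set into a selector, as updateStatus does when
	// populating kcp.Status.Selector for the scale subresource.
	selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
		MatchLabels: controlPlaneLabels("my-cluster"),
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(selector.String())
}
```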

// reconcileDelete handles KubeadmControlPlane deletion.
// The implementation does not take non-control plane workloads into
// consideration. This may or may not change in the future. Please see
// https://github.com/kubernetes-sigs/cluster-api/issues/2064
func (r *KubeadmControlPlaneReconciler) reconcileDelete(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane, logger logr.Logger) (_ ctrl.Result, reterr error) {
// Fetch Machines
allMachines, err := util.GetMachinesForCluster(ctx, r.Client, cluster)
clusterKey := types.NamespacedName{
Namespace: cluster.Namespace,
Name: cluster.Name,
}
allMachines, err := r.managementCluster.GetMachinesForCluster(ctx, clusterKey)
if err != nil {
logger.Error(err, "Failed to get list of machines")
return ctrl.Result{}, err
}
ownedMachines := r.filterOwnedMachines(kcp, allMachines)
controlPlaneMachines, err := r.managementCluster.GetMachinesForCluster(ctx, clusterKey, internal.OwnedControlPlaneMachines(kcp.Name))
if err != nil {
return ctrl.Result{}, err
}

// Verify that only control plane machines remain
if len(allMachines.Items) != len(ownedMachines.Items) {
if len(allMachines) != len(controlPlaneMachines) {
err := errors.New("at least one machine is not owned by the control plane")
logger.Error(err, "Failed to delete the control plane")
return ctrl.Result{}, err
}

// If no control plane machines remain, remove the finalizer
if len(ownedMachines.Items) == 0 {
if len(controlPlaneMachines) == 0 {
controllerutil.RemoveFinalizer(kcp, controlplanev1.KubeadmControlPlaneFinalizer)
return ctrl.Result{}, nil
}

// Delete control plane machines in parallel
var errs []error
for i := range ownedMachines.Items {
m := &ownedMachines.Items[i]
for i := range controlPlaneMachines {
m := &controlPlaneMachines[i]
if err := r.Client.Delete(ctx, m); err != nil && !apierrors.IsNotFound(err) {
errs = append(errs, errors.Wrap(err, "failed to cleanup owned machines"))
}
@@ -581,28 +595,6 @@ func (r *KubeadmControlPlaneReconciler) reconcileKubeconfig(ctx context.Context,
return nil
}

func (r *KubeadmControlPlaneReconciler) getMachines(ctx context.Context, clusterName types.NamespacedName) (*clusterv1.MachineList, error) {
selector := generateKubeadmControlPlaneLabels(clusterName.Name)
allMachines := &clusterv1.MachineList{}
if err := r.Client.List(ctx, allMachines, client.InNamespace(clusterName.Namespace), client.MatchingLabels(selector)); err != nil {
return nil, errors.Wrap(err, "failed to list machines")
}
return allMachines, nil
}

func (r *KubeadmControlPlaneReconciler) filterOwnedMachines(kcp *controlplanev1.KubeadmControlPlane, allMachines *clusterv1.MachineList) *clusterv1.MachineList {
ownedMachines := &clusterv1.MachineList{}
for i := range allMachines.Items {
m := allMachines.Items[i]
controllerRef := metav1.GetControllerOf(&m)
if controllerRef != nil && controllerRef.Kind == "KubeadmControlPlane" && controllerRef.Name == kcp.Name {
ownedMachines.Items = append(ownedMachines.Items, m)
}
}

return ownedMachines
}

func (r *KubeadmControlPlaneReconciler) reconcileExternalReference(ctx context.Context, cluster *clusterv1.Cluster, ref corev1.ObjectReference) error {
if !strings.HasSuffix(ref.Kind, external.TemplateSuffix) {
return nil