Skip to content

Commit

Permalink
✨ Adds kubeadm control plane scale up/down
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Lipovetsky <dlipovetsky@d2iq.com>
  • Loading branch information
dlipovetsky committed Feb 14, 2020
1 parent 5aea6d2 commit 386d3bf
Show file tree
Hide file tree
Showing 5 changed files with 387 additions and 383 deletions.
135 changes: 101 additions & 34 deletions controlplane/kubeadm/controllers/kubeadm_control_plane_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package controllers
import (
"context"
"fmt"
"sort"
"strings"
"time"

Expand Down Expand Up @@ -59,8 +60,18 @@ const (
// DeleteRequeueAfter is how long to wait before checking again to see if
// all control plane machines have been deleted.
DeleteRequeueAfter = 30 * time.Second

// HealthCheckFailedRequeueAfter is how long to wait before trying to scale
// up/down if some target cluster health check has failed
HealthCheckFailedRequeueAfter = 20 * time.Second
)

type managementCluster interface {
GetMachinesForCluster(ctx context.Context, cluster types.NamespacedName, filters ...func(machine *clusterv1.Machine) bool) ([]*clusterv1.Machine, error)
TargetClusterControlPlaneIsHealthy(ctx context.Context, clusterKey types.NamespacedName, controlPlaneName string) error
TargetClusterEtcdIsHealthy(ctx context.Context, clusterKey types.NamespacedName, controlPlaneName string) error
}

// +kubebuilder:rbac:groups=core,resources=events,verbs=get;list;watch;create;patch
// +kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch;create;patch
// +kubebuilder:rbac:groups=infrastructure.cluster.x-k8s.io;bootstrap.cluster.x-k8s.io;controlplane.cluster.x-k8s.io,resources=*,verbs=get;list;watch;create;update;patch;delete
Expand All @@ -77,7 +88,7 @@ type KubeadmControlPlaneReconciler struct {

remoteClientGetter remote.ClusterClientGetter

managementCluster *internal.ManagementCluster
managementCluster managementCluster
}

func (r *KubeadmControlPlaneReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error {
Expand Down Expand Up @@ -210,7 +221,6 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, cluster *
if requeueErr, ok := errors.Cause(err).(capierrors.HasRequeueAfterError); ok {
logger.Error(err, "required certificates not found, requeueing")
return ctrl.Result{
Requeue: true,
RequeueAfter: requeueErr.GetRequeueAfter(),
}, nil
}
Expand Down Expand Up @@ -251,29 +261,36 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, cluster *
switch {
// We are creating the first replica
case numMachines < desiredReplicas && numMachines == 0:
// create new Machine w/ init
logger.Info("Scaling to 1", "Desired Replicas", desiredReplicas, "Existing Replicas", numMachines)
if err := r.initializeControlPlane(ctx, cluster, kcp); err != nil {
logger.Error(err, "Failed to initialize the Control Plane")
r.recorder.Eventf(kcp, corev1.EventTypeWarning, "FailedInitialization", "Failed to initialize the control plane: %v", err)
return ctrl.Result{}, err
// Create new Machine w/ init
logger.Info("Initializing control plane", "Desired", desiredReplicas, "Existing", numMachines)
result, err := r.initializeControlPlane(ctx, cluster, kcp)
if err != nil {
logger.Error(err, "Failed to initialize control plane")
r.recorder.Eventf(kcp, corev1.EventTypeWarning, "FailedInitialization", "Failed to initialize cluster %s/%s control plane: %v", cluster.Namespace, cluster.Name, err)
}
// scaling up
// TODO: return the error if it is unexpected and should cause an immediate requeue
return result, nil
// We are scaling up
case numMachines < desiredReplicas && numMachines > 0:
// create a new Machine w/ join
logger.Info("Scaling up", "Desired Replicas", desiredReplicas, "Existing Replicas", numMachines)
wantMachines := desiredReplicas - numMachines
if err := r.scaleUpControlPlane(ctx, cluster, kcp, wantMachines); err != nil {
logger.Error(err, "Failed to scale up the Control Plane")
r.recorder.Eventf(kcp, corev1.EventTypeWarning, "FailedScaleUp", "Failed to scale up the control plane: %v", err)
return ctrl.Result{}, err
// Create a new Machine w/ join
logger.Info("Scaling up control plane", "Desired", desiredReplicas, "Existing", numMachines)
result, err := r.scaleUpControlPlane(ctx, cluster, kcp)
if err != nil {
logger.Error(err, "Failed to scale up control plane")
r.recorder.Eventf(kcp, corev1.EventTypeWarning, "FailedScaleUp", "Failed to scale up cluster %s/%s control plane: %v", cluster.Namespace, cluster.Name, err)
}
// scaling down
// TODO: return the error if it is unexpected and should cause an immediate requeue
return result, nil
// We are scaling down
case numMachines > desiredReplicas:
logger.Info("Scaling down", "Desired Replicas", desiredReplicas, "Existing Replicas", numMachines)
err := errors.New("Not Implemented")
logger.Error(err, "Should delete the appropriate Machine here.")
return ctrl.Result{}, err
logger.Info("Scaling down control plane", "Desired", desiredReplicas, "Existing", numMachines)
result, err := r.scaleDownControlPlane(ctx, cluster, kcp)
if err != nil {
logger.Error(err, "Failed to scale down control plane")
r.recorder.Eventf(kcp, corev1.EventTypeWarning, "FailedScaleDown", "Failed to scale down cluster %s/%s control plane: %v", cluster.Namespace, cluster.Name, err)
}
// TODO: return the error if it is unexpected and should cause an immediate requeue
return result, nil
}

return ctrl.Result{}, nil
Expand Down Expand Up @@ -346,28 +363,70 @@ func (r *KubeadmControlPlaneReconciler) upgradeControlPlane(ctx context.Context,
return nil
}

func (r *KubeadmControlPlaneReconciler) scaleUpControlPlane(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane, numMachines int) error {
var errs []error
func (r *KubeadmControlPlaneReconciler) initializeControlPlane(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane) (ctrl.Result, error) {
bootstrapSpec := kcp.Spec.KubeadmConfigSpec.DeepCopy()
bootstrapSpec.JoinConfiguration = nil

if err := r.cloneConfigsAndGenerateMachine(ctx, cluster, kcp, bootstrapSpec); err != nil {
return ctrl.Result{}, errors.Wrapf(err, "failed to create control plane Machine for cluster %s/%s", cluster.Name, cluster.Namespace)
}

// Requeue the control plane, in case we are going to scale up
return ctrl.Result{Requeue: true}, nil
}

func (r *KubeadmControlPlaneReconciler) scaleUpControlPlane(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane) (ctrl.Result, error) {
if err := r.managementCluster.TargetClusterControlPlaneIsHealthy(ctx, clusterKey(cluster), kcp.Name); err != nil {
return ctrl.Result{RequeueAfter: HealthCheckFailedRequeueAfter}, errors.Wrap(err, "control plane is not healthy")
}

if err := r.managementCluster.TargetClusterEtcdIsHealthy(ctx, clusterKey(cluster), kcp.Name); err != nil {
return ctrl.Result{RequeueAfter: HealthCheckFailedRequeueAfter}, errors.Wrap(err, "etcd cluster is not healthy")
}

// Create the bootstrap configuration
bootstrapSpec := kcp.Spec.KubeadmConfigSpec.DeepCopy()
bootstrapSpec.InitConfiguration = nil
bootstrapSpec.ClusterConfiguration = nil

for i := 0; i < numMachines; i++ {
err := r.cloneConfigsAndGenerateMachine(ctx, cluster, kcp, bootstrapSpec)
if err != nil {
errs = append(errs, errors.Wrap(err, "failed to clone and create an additional control plane Machine"))
}
if err := r.cloneConfigsAndGenerateMachine(ctx, cluster, kcp, bootstrapSpec); err != nil {
return ctrl.Result{}, errors.Wrapf(err, "failed to create control plane Machine for cluster %s/%s", cluster.Name, cluster.Namespace)
}

return kerrors.NewAggregate(errs)
// Requeue the control plane, in case we are not done scaling up
return ctrl.Result{Requeue: true}, nil
}

func (r *KubeadmControlPlaneReconciler) initializeControlPlane(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane) error {
bootstrapSpec := kcp.Spec.KubeadmConfigSpec.DeepCopy()
bootstrapSpec.JoinConfiguration = nil
return r.cloneConfigsAndGenerateMachine(ctx, cluster, kcp, bootstrapSpec)
func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane) (ctrl.Result, error) {
if err := r.managementCluster.TargetClusterControlPlaneIsHealthy(ctx, clusterKey(cluster), kcp.Name); err != nil {
return ctrl.Result{RequeueAfter: HealthCheckFailedRequeueAfter}, errors.Wrap(err, "control plane is not healthy")
}

if err := r.managementCluster.TargetClusterEtcdIsHealthy(ctx, clusterKey(cluster), kcp.Name); err != nil {
return ctrl.Result{RequeueAfter: HealthCheckFailedRequeueAfter}, errors.Wrap(err, "etcd cluster is not healthy")
}

ownedMachines, err := r.managementCluster.GetMachinesForCluster(ctx, clusterKey(cluster), internal.OwnedControlPlaneMachines(kcp.Name))
if err != nil {
return ctrl.Result{}, err
}

// Wait for any delete in progress to complete before deleting another Machine
if len(internal.FilterMachines(ownedMachines, internal.HasDeletionTimestamp())) > 0 {
return ctrl.Result{RequeueAfter: DeleteRequeueAfter}, nil
}

machineToDelete, err := oldestMachine(ownedMachines)
if err != nil {
return ctrl.Result{}, errors.Wrap(err, "failed to pick control plane Machine to delete")
}

if err := r.Client.Delete(ctx, machineToDelete); err != nil && !apierrors.IsNotFound(err) {
return ctrl.Result{}, errors.Wrapf(err, "failed to delete control plane Machine %s/%s", machineToDelete.Namespace, machineToDelete.Name)
}

// Requeue the control plane, in case we are not done scaling down
return ctrl.Result{Requeue: true}, nil
}

func (r *KubeadmControlPlaneReconciler) cloneConfigsAndGenerateMachine(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane, bootstrapSpec *bootstrapv1.KubeadmConfigSpec) error {
Expand Down Expand Up @@ -558,7 +617,7 @@ func (r *KubeadmControlPlaneReconciler) reconcileDelete(ctx context.Context, clu
if errs != nil {
return ctrl.Result{}, kerrors.NewAggregate(errs)
}
return ctrl.Result{Requeue: true, RequeueAfter: DeleteRequeueAfter}, nil
return ctrl.Result{RequeueAfter: DeleteRequeueAfter}, nil
}

func (r *KubeadmControlPlaneReconciler) reconcileKubeconfig(ctx context.Context, clusterName types.NamespacedName, endpoint clusterv1.APIEndpoint, kcp *controlplanev1.KubeadmControlPlane) error {
Expand Down Expand Up @@ -667,3 +726,11 @@ func clusterKey(cluster *clusterv1.Cluster) types.NamespacedName {
Name: cluster.Name,
}
}

func oldestMachine(machines []*clusterv1.Machine) (*clusterv1.Machine, error) {
if len(machines) == 0 {
return &clusterv1.Machine{}, errors.New("no machines given")
}
sort.Sort(util.MachinesByCreationTimestamp(machines))
return machines[0], nil
}
Loading

0 comments on commit 386d3bf

Please sign in to comment.