Skip to content

Commit

Permalink
✨ KubeadmControlPlane scale up serially
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Lipovetsky <dlipovetsky@d2iq.com>
  • Loading branch information
dlipovetsky committed Feb 5, 2020
1 parent 5ac3f42 commit 5486644
Show file tree
Hide file tree
Showing 5 changed files with 723 additions and 311 deletions.
108 changes: 96 additions & 12 deletions controlplane/kubeadm/controllers/kubeadm_control_plane_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"k8s.io/apiserver/pkg/storage/names"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/etcd"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
Expand All @@ -47,6 +48,7 @@ import (
"sigs.k8s.io/cluster-api/controllers/external"
"sigs.k8s.io/cluster-api/controllers/remote"
controlplanev1 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1alpha3"
etcdutil "sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/etcd/util"
capierrors "sigs.k8s.io/cluster-api/errors"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/kubeconfig"
Expand All @@ -67,14 +69,26 @@ const (
// +kubebuilder:rbac:groups=cluster.x-k8s.io,resources=machines;machines/status,verbs=get;list;watch;create;update;patch;delete

// KubeadmControlPlaneReconciler reconciles a KubeadmControlPlane object

type EtcdClient interface {
Close() error
Members(ctx context.Context) ([]*etcd.Member, error)
MoveLeader(ctx context.Context, newLeaderID uint64) error
RemoveMember(ctx context.Context, id uint64) error
UpdateMemberPeerURLs(ctx context.Context, id uint64, peerURLs []string) ([]*etcd.Member, error)
Alarms(ctx context.Context) ([]etcd.MemberAlarm, error)
}

type EtcdClientGetter func(c client.Client, nodeName string) (EtcdClient, error)
type KubeadmControlPlaneReconciler struct {
Client client.Client
Log logr.Logger
scheme *runtime.Scheme
controller controller.Controller
recorder record.EventRecorder

remoteClientGetter remote.ClusterClientGetter
remoteClientGetter remote.ClusterClientGetter
remoteEtcdClientGetter EtcdClientGetter
}

func (r *KubeadmControlPlaneReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error {
Expand Down Expand Up @@ -248,16 +262,21 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, cluster *
r.recorder.Eventf(kcp, corev1.EventTypeWarning, "FailedInitialization", "Failed to initialize the control plane: %v", err)
return ctrl.Result{}, err
}
if numMachines+1 < desiredReplicas {
return ctrl.Result{Requeue: true}, nil
}
// scaling up
case numMachines < desiredReplicas && numMachines > 0:
// create a new Machine w/ join
logger.Info("Scaling up", "Desired Replicas", desiredReplicas, "Existing Replicas", numMachines)
wantMachines := desiredReplicas - numMachines
if err := r.scaleUpControlPlane(ctx, cluster, kcp, wantMachines); err != nil {
logger.Info("Scaling up by one replica", "Desired Replicas", desiredReplicas, "Existing Replicas", numMachines)
if err := r.scaleUpControlPlane(ctx, cluster, kcp, ownedMachines); err != nil {
logger.Error(err, "Failed to scale up the Control Plane")
r.recorder.Eventf(kcp, corev1.EventTypeWarning, "FailedScaleUp", "Failed to scale up the control plane: %v", err)
return ctrl.Result{}, err
}
if numMachines+1 < desiredReplicas {
return ctrl.Result{Requeue: true}, nil
}
// scaling down
case numMachines > desiredReplicas:
logger.Info("Scaling down", "Desired Replicas", desiredReplicas, "Existing Replicas", numMachines)
Expand Down Expand Up @@ -320,22 +339,87 @@ func (r *KubeadmControlPlaneReconciler) updateStatus(ctx context.Context, kcp *c
return nil
}

func (r *KubeadmControlPlaneReconciler) scaleUpControlPlane(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane, numMachines int) error {
func (r *KubeadmControlPlaneReconciler) scaleUpControlPlane(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane, ownedMachines *clusterv1.MachineList) error {
// Every control plane replica must be Ready
if int(kcp.Status.UnavailableReplicas) > 0 {
return errors.New(fmt.Sprintf("%d control plane replicas are not yet Ready", int(kcp.Status.UnavailableReplicas)))
}

var errs []error
var knownClusterID uint64
var knownMemberIDSet etcdutil.UInt64

for _, om := range ownedMachines.Items {
// Get the control plane machine Node Name from the NodeRef
if om.Status.NodeRef == nil {
// A Ready control plane machine should have a non-nil NodeRef
errs = append(errs, errors.New(fmt.Sprintf("internal error: control plane machine %s is Ready, but has a nil NodeRef", om.Name)))
continue
}
nodeName := om.Status.NodeRef.Name

// Create the etcd client for the etcd Pod scheduled on the Node
etcdClient, err := r.remoteEtcdClientGetter(r.Client, nodeName)
if err != nil {
errs = append(errs, errors.New(fmt.Sprintf("failed to create etcd client for control plane machine %s", om.Name)))
continue
}

// List etcd members. This checks that the member is healthy, because the request goes through consensus.
members, err := etcdClient.Members(ctx)
if err != nil {
errs = append(errs, errors.New(fmt.Sprintf("failed to list etcd members using etcd client for control plane machine %s", om.Name)))
continue
}
member := etcdutil.MemberForName(members, nodeName)

// Check that the member reports no alarms.
if len(member.Alarms) > 0 {
errs = append(errs, errors.New(fmt.Sprintf("etcd member for control plane machine %s reports alarms: %v", om.Name, member.Alarms)))
continue
}

// Check that the member belongs to the same cluster as all other members.
clusterID := member.ClusterID
if knownClusterID == 0 {
knownClusterID = clusterID
} else if knownClusterID != clusterID {
errs = append(errs, errors.New(fmt.Sprintf("etcd member for control plane machine %s has cluster ID %d, but all previously seen etcd members have cluster ID %d", om.Name, clusterID, knownClusterID)))
continue
}

// Check that the member list is stable.
memberIDSet := etcdutil.MemberIDSet(members)
if knownMemberIDSet.Len() == 0 {
knownMemberIDSet = memberIDSet
} else {
unknownMembers := memberIDSet.Difference(knownMemberIDSet)
if unknownMembers.Len() > 0 {
errs = append(errs, errors.New(fmt.Sprintf("etcd member for control plane machine %s reports members IDs %v, but all previously seen etcd members reported member IDs %v", om.Name, memberIDSet.UnsortedList(), knownMemberIDSet.UnsortedList())))
}
continue
}
}

// Check that there is exactly one etcd member for every control plane machine.
// There should be no etcd members added "out of band.""
if len(ownedMachines.Items) != len(knownMemberIDSet) {
errs = append(errs, errors.New(fmt.Sprintf("there are %d control plane machines, but %d etcd members", len(ownedMachines.Items), len(knownMemberIDSet))))
}

if len(errs) > 0 {
return kerrors.NewAggregate(errs)
}

// Create the bootstrap configuration
bootstrapSpec := kcp.Spec.KubeadmConfigSpec.DeepCopy()
bootstrapSpec.InitConfiguration = nil
bootstrapSpec.ClusterConfiguration = nil

for i := 0; i < numMachines; i++ {
err := r.cloneConfigsAndGenerateMachine(ctx, cluster, kcp, bootstrapSpec)
if err != nil {
errs = append(errs, errors.Wrap(err, "failed to clone and create an additional control plane Machine"))
}
if err := r.cloneConfigsAndGenerateMachine(ctx, cluster, kcp, bootstrapSpec); err != nil {
return errors.Wrap(err, "failed to clone and create an additional control plane Machine")
}

return kerrors.NewAggregate(errs)
return nil
}

func (r *KubeadmControlPlaneReconciler) initializeControlPlane(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane) error {
Expand Down
Loading

0 comments on commit 5486644

Please sign in to comment.