Merge pull request #1 from chuckha/control-plane-scale-serially
🏃 Refactor healthcheck logic into new package
dlipovetsky committed Feb 7, 2020
2 parents 1f4e55d + 26e7b3b commit d7fd9d7
Showing 6 changed files with 456 additions and 127 deletions.
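
The refactor replaces inline machine-listing, label-generation, and etcd health-check code in the controller with calls into the new `internal` package. A rough sketch of the surface the controller now programs against — signatures and bodies are inferred from the call sites in this diff, not taken from the actual `internal` package:

```go
package internal

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/types"
	clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// MachineFilter selects a subset of a cluster's machines.
type MachineFilter func(clusterv1.Machine) bool

// OwnedControlPlaneMachines matches machines controlled by the named
// KubeadmControlPlane (the controller-reference check here is an assumption).
func OwnedControlPlaneMachines(controlPlaneName string) MachineFilter {
	return func(m clusterv1.Machine) bool {
		ref := metav1.GetControllerOf(&m)
		return ref != nil && ref.Kind == "KubeadmControlPlane" && ref.Name == controlPlaneName
	}
}

// ManagementCluster wraps the management cluster client and owns the label
// helpers and health checks that previously lived in the controller.
type ManagementCluster struct {
	Client client.Client
}

// ControlPlaneLabelsForCluster stands in for the deleted
// generateKubeadmControlPlaneLabels helper.
func (m *ManagementCluster) ControlPlaneLabelsForCluster(clusterName string) map[string]string {
	return map[string]string{
		clusterv1.ClusterLabelName:             clusterName,
		clusterv1.MachineControlPlaneLabelName: "",
	}
}

// ControlPlaneSelectorForCluster stands in for the deleted
// generateKubeadmControlPlaneSelector helper.
func (m *ManagementCluster) ControlPlaneSelectorForCluster(clusterName string) labels.Selector {
	return labels.SelectorFromSet(m.ControlPlaneLabelsForCluster(clusterName))
}

// GetMachinesForCluster lists the cluster's machines and applies the given
// filters, returning a flat []clusterv1.Machine rather than the
// *clusterv1.MachineList the old getMachines helper returned.
func (m *ManagementCluster) GetMachinesForCluster(ctx context.Context, cluster types.NamespacedName, filters ...MachineFilter) ([]clusterv1.Machine, error) {
	ml := &clusterv1.MachineList{}
	if err := m.Client.List(ctx, ml,
		client.InNamespace(cluster.Namespace),
		client.MatchingLabels(map[string]string{clusterv1.ClusterLabelName: cluster.Name}),
	); err != nil {
		return nil, err
	}
	machines := []clusterv1.Machine{}
	for _, machine := range ml.Items {
		keep := true
		for _, filter := range filters {
			if !filter(machine) {
				keep = false
				break
			}
		}
		if keep {
			machines = append(machines, machine)
		}
	}
	return machines, nil
}
```

With no filters, `GetMachinesForCluster` returns every machine in the cluster (the `reconcileDelete` call site below depends on that); `OwnedControlPlaneMachines(kcp.Name)` narrows the result to the control plane machines owned by this KubeadmControlPlane. The third piece of the new surface, `TargetClusterEtcdIsHealthy`, is sketched after the `scaleUpControlPlane` hunk below.
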
166 changes: 49 additions & 117 deletions controlplane/kubeadm/controllers/kubeadm_control_plane_controller.go
@@ -34,7 +34,6 @@ import (
"k8s.io/apiserver/pkg/storage/names"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/etcd"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
@@ -48,7 +47,6 @@ import (
"sigs.k8s.io/cluster-api/controllers/external"
"sigs.k8s.io/cluster-api/controllers/remote"
controlplanev1 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1alpha3"
etcdutil "sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/etcd/util"
capierrors "sigs.k8s.io/cluster-api/errors"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/kubeconfig"
@@ -70,25 +68,16 @@ const (

// KubeadmControlPlaneReconciler reconciles a KubeadmControlPlane object

type EtcdClient interface {
Close() error
Members(ctx context.Context) ([]*etcd.Member, error)
MoveLeader(ctx context.Context, newLeaderID uint64) error
RemoveMember(ctx context.Context, id uint64) error
UpdateMemberPeerURLs(ctx context.Context, id uint64, peerURLs []string) ([]*etcd.Member, error)
Alarms(ctx context.Context) ([]etcd.MemberAlarm, error)
}

type EtcdClientGetter func(c client.Client, nodeName string) (EtcdClient, error)
type KubeadmControlPlaneReconciler struct {
Client client.Client
Log logr.Logger
scheme *runtime.Scheme
controller controller.Controller
recorder record.EventRecorder

managementCluster *internal.ManagementCluster

remoteClientGetter remote.ClusterClientGetter
remoteEtcdClientGetter EtcdClientGetter
}

func (r *KubeadmControlPlaneReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error {
@@ -119,6 +108,7 @@ func (r *KubeadmControlPlaneReconciler) SetupWithManager(mgr ctrl.Manager, optio
func (r *KubeadmControlPlaneReconciler) Reconcile(req ctrl.Request) (res ctrl.Result, reterr error) {
logger := r.Log.WithValues("kubeadmControlPlane", req.Name, "namespace", req.Namespace)
ctx := context.Background()
r.managementCluster = &internal.ManagementCluster{Client: r.Client}

// Fetch the KubeadmControlPlane instance.
kcp := &controlplanev1.KubeadmControlPlane{}
@@ -197,12 +187,15 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, cluster *
}

// TODO: handle proper adoption of Machines
allMachines, err := r.getMachines(ctx, types.NamespacedName{Namespace: cluster.Namespace, Name: cluster.Name})
clusterKey := types.NamespacedName{
Namespace: cluster.GetNamespace(),
Name: cluster.GetName(),
}
ownedMachines, err := r.managementCluster.GetMachinesForCluster(ctx, clusterKey, internal.OwnedControlPlaneMachines(kcp.Name))
if err != nil {
logger.Error(err, "Failed to get list of machines")
return ctrl.Result{}, err
}
ownedMachines := r.filterOwnedMachines(kcp, allMachines)

// Generate Cluster Certificates if needed
config := kcp.Spec.KubeadmConfigSpec.DeepCopy()
@@ -249,7 +242,7 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, cluster *

// Currently we are not handling upgrade, so treat all owned machines as one for now.
// Once we start handling upgrade, we'll need to filter this list and act appropriately
numMachines := len(ownedMachines.Items)
numMachines := len(ownedMachines)
desiredReplicas := int(*kcp.Spec.Replicas)

switch {
@@ -269,7 +262,7 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, cluster *
case numMachines < desiredReplicas && numMachines > 0:
// create a new Machine w/ join
logger.Info("Scaling up by one replica", "Desired Replicas", desiredReplicas, "Existing Replicas", numMachines)
if err := r.scaleUpControlPlane(ctx, cluster, kcp, ownedMachines); err != nil {
if err := r.scaleUpControlPlane(ctx, cluster, kcp); err != nil {
logger.Error(err, "Failed to scale up the Control Plane")
r.recorder.Eventf(kcp, corev1.EventTypeWarning, "FailedScaleUp", "Failed to scale up the control plane: %v", err)
return ctrl.Result{}, err
@@ -289,23 +282,21 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, cluster *
}

func (r *KubeadmControlPlaneReconciler) updateStatus(ctx context.Context, kcp *controlplanev1.KubeadmControlPlane, cluster *clusterv1.Cluster) error {
labelSelector := generateKubeadmControlPlaneSelector(cluster.Name)
selector, err := metav1.LabelSelectorAsSelector(labelSelector)
if err != nil {
// Since we are building up the LabelSelector above, this should not fail
return errors.Wrap(err, "failed to parse label selector")
}
selector := r.managementCluster.ControlPlaneSelectorForCluster(cluster.Name)
// Copy label selector to its status counterpart in string format.
// This is necessary for CRDs including scale subresources.
kcp.Status.Selector = selector.String()

allMachines, err := r.getMachines(ctx, types.NamespacedName{Namespace: cluster.Namespace, Name: cluster.Name})
clusterKey := types.NamespacedName{
Namespace: cluster.Namespace,
Name: cluster.Name,
}
ownedMachines, err := r.managementCluster.GetMachinesForCluster(ctx, clusterKey, internal.OwnedControlPlaneMachines(kcp.Name))
if err != nil {
return errors.Wrap(err, "failed to get list of owned machines")
}
ownedMachines := r.filterOwnedMachines(kcp, allMachines)

replicas := int32(len(ownedMachines.Items))
replicas := int32(len(ownedMachines))
// TODO: take into account configuration hash once upgrades are in place
kcp.Status.Replicas = replicas

@@ -315,8 +306,8 @@ func (r *KubeadmControlPlaneReconciler) updateStatus(ctx context.Context, kcp *c
}

readyMachines := int32(0)
for i := range ownedMachines.Items {
m := &ownedMachines.Items[i]
for i := range ownedMachines {
m := &ownedMachines[i]
node, err := getMachineNode(ctx, remoteClient, m)
if err != nil {
return errors.Wrap(err, "failed to get referenced Node")
@@ -339,76 +330,19 @@ func (r *KubeadmControlPlaneReconciler) updateStatus(ctx context.Context, kcp *c
return nil
}

func (r *KubeadmControlPlaneReconciler) scaleUpControlPlane(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane, ownedMachines *clusterv1.MachineList) error {
func (r *KubeadmControlPlaneReconciler) scaleUpControlPlane(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane) error {
// Every control plane replica must be Ready
if int(kcp.Status.UnavailableReplicas) > 0 {
return errors.New(fmt.Sprintf("%d control plane replicas are not yet Ready", int(kcp.Status.UnavailableReplicas)))
}

var errs []error
var knownClusterID uint64
var knownMemberIDSet etcdutil.UInt64

for _, om := range ownedMachines.Items {
// Get the control plane machine Node Name from the NodeRef
if om.Status.NodeRef == nil {
// A Ready control plane machine should have a non-nil NodeRef
errs = append(errs, errors.New(fmt.Sprintf("internal error: control plane machine %s is Ready, but has a nil NodeRef", om.Name)))
continue
}
nodeName := om.Status.NodeRef.Name

// Create the etcd client for the etcd Pod scheduled on the Node
etcdClient, err := r.remoteEtcdClientGetter(r.Client, nodeName)
if err != nil {
errs = append(errs, errors.New(fmt.Sprintf("failed to create etcd client for control plane machine %s", om.Name)))
continue
}

// List etcd members. This checks that the member is healthy, because the request goes through consensus.
members, err := etcdClient.Members(ctx)
if err != nil {
errs = append(errs, errors.New(fmt.Sprintf("failed to list etcd members using etcd client for control plane machine %s", om.Name)))
continue
}
member := etcdutil.MemberForName(members, nodeName)

// Check that the member reports no alarms.
if len(member.Alarms) > 0 {
errs = append(errs, errors.New(fmt.Sprintf("etcd member for control plane machine %s reports alarms: %v", om.Name, member.Alarms)))
continue
}

// Check that the member belongs to the same cluster as all other members.
clusterID := member.ClusterID
if knownClusterID == 0 {
knownClusterID = clusterID
} else if knownClusterID != clusterID {
errs = append(errs, errors.New(fmt.Sprintf("etcd member for control plane machine %s has cluster ID %d, but all previously seen etcd members have cluster ID %d", om.Name, clusterID, knownClusterID)))
continue
}

// Check that the member list is stable.
memberIDSet := etcdutil.MemberIDSet(members)
if knownMemberIDSet.Len() == 0 {
knownMemberIDSet = memberIDSet
} else {
unknownMembers := memberIDSet.Difference(knownMemberIDSet)
if unknownMembers.Len() > 0 {
errs = append(errs, errors.New(fmt.Sprintf("etcd member for control plane machine %s reports members IDs %v, but all previously seen etcd members reported member IDs %v", om.Name, memberIDSet.UnsortedList(), knownMemberIDSet.UnsortedList())))
}
continue
}
clusterKey := types.NamespacedName{
Namespace: cluster.Namespace,
Name: cluster.Name,
}

// Check that there is exactly one etcd member for every control plane machine.
// There should be no etcd members added "out of band."
if len(ownedMachines.Items) != len(knownMemberIDSet) {
errs = append(errs, errors.New(fmt.Sprintf("there are %d control plane machines, but %d etcd members", len(ownedMachines.Items), len(knownMemberIDSet))))
}

if len(errs) > 0 {
return kerrors.NewAggregate(errs)
// TODO(chuckha) Need to skip/rework this for external etcd case and assume it's working.
// This assumes that there is a 1:1 correspondence between control plane nodes and etcd members.
if err := r.managementCluster.TargetClusterEtcdIsHealthy(ctx, clusterKey, kcp.Name); err != nil {
return err
}

// Create the bootstrap configuration
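
The health checks deleted from `scaleUpControlPlane` above now sit behind `r.managementCluster.TargetClusterEtcdIsHealthy`. A condensed sketch of what that check does, reconstructed from the deleted inline code — `targetEtcdIsHealthy` and `clientFor` are illustrative names, `EtcdClient` is the interface removed from this file, and per the TODO this only makes sense for stacked etcd, where members map 1:1 onto control plane nodes:

```go
func targetEtcdIsHealthy(ctx context.Context, machines []clusterv1.Machine,
	clientFor func(nodeName string) (EtcdClient, error)) error {
	var errs []error
	var knownClusterID uint64
	knownMemberIDs := map[uint64]struct{}{}

	for _, m := range machines {
		// A Ready control plane machine must reference a Node.
		if m.Status.NodeRef == nil {
			errs = append(errs, errors.Errorf("machine %s is Ready but has a nil NodeRef", m.Name))
			continue
		}
		nodeName := m.Status.NodeRef.Name

		etcdClient, err := clientFor(nodeName)
		if err != nil {
			errs = append(errs, errors.Wrapf(err, "failed to create etcd client for machine %s", m.Name))
			continue
		}
		// Listing members goes through consensus, so a successful response
		// doubles as a health check for this member.
		members, err := etcdClient.Members(ctx)
		etcdClient.Close()
		if err != nil {
			errs = append(errs, errors.Wrapf(err, "failed to list etcd members via machine %s", m.Name))
			continue
		}

		for _, member := range members {
			if member.Name == nodeName {
				// The member must report no alarms (e.g. NOSPACE).
				if len(member.Alarms) > 0 {
					errs = append(errs, errors.Errorf("etcd member on %s reports alarms: %v", nodeName, member.Alarms))
				}
				// Every member must agree on the cluster ID.
				if knownClusterID == 0 {
					knownClusterID = member.ClusterID
				} else if knownClusterID != member.ClusterID {
					errs = append(errs, errors.Errorf("etcd member on %s has cluster ID %d, but previously seen members have %d", nodeName, member.ClusterID, knownClusterID))
				}
			}
			knownMemberIDs[member.ID] = struct{}{}
		}
		// The member list must be stable: every member reports the same set.
		if len(knownMemberIDs) != len(members) {
			errs = append(errs, errors.Errorf("etcd member list reported via %s differs from previously seen lists", nodeName))
		}
	}

	// Exactly one etcd member per control plane machine; none added out of band.
	if len(machines) != len(knownMemberIDs) {
		errs = append(errs, errors.Errorf("there are %d control plane machines, but %d etcd members", len(machines), len(knownMemberIDs)))
	}
	return kerrors.NewAggregate(errs)
}
```

Because this check runs before each scale-up, an unhealthy or inconsistent etcd cluster blocks creation of the next replica, which is what makes scaling the control plane serially safe.
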
@@ -447,7 +381,7 @@ func (r *KubeadmControlPlaneReconciler) cloneConfigsAndGenerateMachine(ctx conte
Namespace: kcp.Namespace,
OwnerRef: infraCloneOwner,
ClusterName: cluster.Name,
Labels: generateKubeadmControlPlaneLabels(cluster.Name),
Labels: r.managementCluster.ControlPlaneLabelsForCluster(cluster.Name),
})
if err != nil {
// Safe to return early here since no resources have been created yet.
@@ -539,13 +473,16 @@ func (r *KubeadmControlPlaneReconciler) failureDomainForScaleUp(ctx context.Cont
if len(cluster.Status.FailureDomains) == 0 {
return nil, nil
}
machineList, err := r.getMachines(ctx, types.NamespacedName{Namespace: cluster.GetNamespace(), Name: cluster.GetName()})
clusterKey := types.NamespacedName{
Namespace: cluster.Namespace,
Name: cluster.Name,
}
ownedMachines, err := r.managementCluster.GetMachinesForCluster(ctx, clusterKey, internal.OwnedControlPlaneMachines(kcp.Name))
if err != nil {
return nil, err
}
machineList = r.filterOwnedMachines(kcp, machineList)
picker := internal.FailureDomainPicker{Log: r.Log}
failureDomain := picker.PickFewest(cluster.Status.FailureDomains.FilterControlPlane(), machineList.Items)
failureDomain := picker.PickFewest(cluster.Status.FailureDomains.FilterControlPlane(), ownedMachines)
return &failureDomain, nil
}

@@ -559,7 +496,7 @@ func (r *KubeadmControlPlaneReconciler) generateMachine(ctx context.Context, kcp
ObjectMeta: metav1.ObjectMeta{
Name: names.SimpleNameGenerator.GenerateName(kcp.Name + "-"),
Namespace: kcp.Namespace,
Labels: generateKubeadmControlPlaneLabels(cluster.Name),
Labels: r.managementCluster.ControlPlaneLabelsForCluster(cluster.Name),
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(kcp, controlplanev1.GroupVersion.WithKind("KubeadmControlPlane")),
},
@@ -582,49 +519,44 @@ func (r *KubeadmControlPlaneReconciler) generateMachine(ctx context.Context, kcp
return nil
}

func generateKubeadmControlPlaneSelector(clusterName string) *metav1.LabelSelector {
return &metav1.LabelSelector{
MatchLabels: generateKubeadmControlPlaneLabels(clusterName),
}
}

func generateKubeadmControlPlaneLabels(clusterName string) map[string]string {
return map[string]string{
clusterv1.ClusterLabelName: clusterName,
clusterv1.MachineControlPlaneLabelName: "",
}
}

// reconcileDelete handles KubeadmControlPlane deletion.
// The implementation does not take non-control plane workloads into
// consideration. This may or may not change in the future. Please see
// https://github.com/kubernetes-sigs/cluster-api/issues/2064
func (r *KubeadmControlPlaneReconciler) reconcileDelete(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane, logger logr.Logger) (_ ctrl.Result, reterr error) {
// Fetch Machines
allMachines, err := util.GetMachinesForCluster(ctx, r.Client, cluster)
clusterKey := types.NamespacedName{
Namespace: cluster.Namespace,
Name: cluster.Name,
}
allMachines, err := r.managementCluster.GetMachinesForCluster(ctx, clusterKey)
if err != nil {
logger.Error(err, "Failed to get list of machines")
return ctrl.Result{}, err
}
ownedMachines := r.filterOwnedMachines(kcp, allMachines)
controlPlaneMachines, err := r.managementCluster.GetMachinesForCluster(ctx, clusterKey, internal.OwnedControlPlaneMachines(kcp.Name))
if err != nil {
return ctrl.Result{}, err
}

// Verify that only control plane machines remain
if len(allMachines.Items) != len(ownedMachines.Items) {
if len(allMachines) != len(controlPlaneMachines) {
err := errors.New("at least one machine is not owned by the control plane")
logger.Error(err, "Failed to delete the control plane")
return ctrl.Result{}, err
}

// If no control plane machines remain, remove the finalizer
if len(ownedMachines.Items) == 0 {
if len(controlPlaneMachines) == 0 {
controllerutil.RemoveFinalizer(kcp, controlplanev1.KubeadmControlPlaneFinalizer)
return ctrl.Result{}, nil
}

// Delete control plane machines in parallel
var errs []error
for i := range ownedMachines.Items {
m := &ownedMachines.Items[i]
for i := range controlPlaneMachines {
m := &controlPlaneMachines[i]
if err := r.Client.Delete(ctx, m); err != nil && !apierrors.IsNotFound(err) {
errs = append(errs, errors.Wrap(err, "failed to cleanup owned machines"))
}
@@ -666,7 +598,7 @@ func (r *KubeadmControlPlaneReconciler) reconcileKubeconfig(ctx context.Context,
}

func (r *KubeadmControlPlaneReconciler) getMachines(ctx context.Context, clusterName types.NamespacedName) (*clusterv1.MachineList, error) {
selector := generateKubeadmControlPlaneLabels(clusterName.Name)
selector := r.managementCluster.ControlPlaneLabelsForCluster(clusterName.Name)
allMachines := &clusterv1.MachineList{}
if err := r.Client.List(ctx, allMachines, client.InNamespace(clusterName.Namespace), client.MatchingLabels(selector)); err != nil {
return nil, errors.Wrap(err, "failed to list machines")
controlplane/kubeadm/controllers/kubeadm_control_plane_controller_test.go
@@ -32,6 +32,7 @@ import (
"k8s.io/klog/klogr"
"k8s.io/utils/pointer"
utilpointer "k8s.io/utils/pointer"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
@@ -854,6 +855,7 @@ func TestKubeadmControlPlaneReconciler_updateStatusNoMachines(t *testing.T) {
Log: log.Log,
remoteClientGetter: fakeremote.NewClusterClient,
scheme: scheme.Scheme,
managementCluster: &internal.ManagementCluster{Client: fakeClient},
}

g.Expect(r.updateStatus(context.Background(), kcp, cluster)).To(Succeed())
@@ -895,6 +897,13 @@ func createNodeAndNodeRefForMachine(machine *clusterv1.Machine, ready bool) (*cl
return machine, node
}

func generateKubeadmControlPlaneLabels(name string) map[string]string {
return map[string]string{
clusterv1.ClusterLabelName: name,
clusterv1.MachineControlPlaneLabelName: "",
}
}

func createMachineNodePair(name string, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane, ready bool) (*clusterv1.Machine, *corev1.Node) {
machine := &clusterv1.Machine{
ObjectMeta: metav1.ObjectMeta{
@@ -1089,7 +1098,6 @@ func TestReconcileControlPlaneScaleUp(t *testing.T) {
recorder: record.NewFakeRecorder(32),
scheme: scheme.Scheme,
remoteClientGetter: fakeremote.NewClusterClient,
remoteEtcdClientGetter: fakeRemoteEtcdClientGetter(fakeEtcdClient),
}

for i := 1; i <= desiredReplicas; i++ {
@@ -1210,12 +1218,6 @@ func createFakeClient() (client.Client, error) {
return fakeClient, nil
}

func fakeRemoteEtcdClientGetter(fc *fakeetcd.FakeEtcdClient) EtcdClientGetter {
return func(c client.Client, nodeName string) (EtcdClient, error) {
return fc, nil
}
}

func TestScaleUpControlPlaneAddsANewMachine(t *testing.T) {
g := NewWithT(t)

@@ -1237,7 +1239,6 @@ func TestScaleUpControlPlaneAddsANewMachine(t *testing.T) {
recorder: record.NewFakeRecorder(32),
scheme: scheme.Scheme,
remoteClientGetter: fakeremote.NewClusterClient,
remoteEtcdClientGetter: fakeRemoteEtcdClientGetter(fakeEtcdClient),
}

// Create the first control plane replica
@@ -1258,7 +1259,7 @@

// Tell the controller to scale up
g.Expect(fakeClient.List(context.Background(), ownedMachines)).To(Succeed())
g.Expect(r.scaleUpControlPlane(context.Background(), cluster, kcp, ownedMachines)).To(Succeed())
g.Expect(r.scaleUpControlPlane(context.Background(), cluster, kcp)).To(Succeed())

// Verify that controller created a new machine
g.Expect(fakeClient.List(context.Background(), ownedMachines)).To(Succeed())
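
With `remoteEtcdClientGetter` gone, tests no longer inject a fake etcd client into the reconciler; instead they hand it a `ManagementCluster` built over the same fake client, as the `updateStatus` test above now does. A minimal wiring sketch (the test name is hypothetical; the helpers are the ones used elsewhere in this file). Since `Reconcile` re-initializes `r.managementCluster` from `r.Client` on every call, setting the field explicitly matters mainly for tests that drive lower-level methods such as `updateStatus` or `scaleUpControlPlane` directly:

```go
func TestReconcilerWiring(t *testing.T) {
	g := NewWithT(t)

	fakeClient, err := createFakeClient()
	g.Expect(err).NotTo(HaveOccurred())

	r := &KubeadmControlPlaneReconciler{
		Client:             fakeClient,
		Log:                log.Log,
		recorder:           record.NewFakeRecorder(32),
		scheme:             scheme.Scheme,
		remoteClientGetter: fakeremote.NewClusterClient,
		// Built over the same fake client so machine listing and health
		// checks read the objects the test created.
		managementCluster: &internal.ManagementCluster{Client: fakeClient},
	}
	_ = r // the real tests then call r.updateStatus, r.scaleUpControlPlane, etc.
}
```
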
(4 more changed files not shown.)
