🏃 Refactor healthcheck logic into new package #1

Merged
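The refactor moves machine listing, control plane label/selector generation, and the etcd health check behind the new internal.ManagementCluster type, but the new package itself is not part of the diff shown here. The sketch below reconstructs the surface this controller relies on purely from its call sites; the MachineFilter name, the variadic filter parameter, and the exact return types are assumptions rather than the package's actual declarations.

    // Sketch inferred from call sites in the diff below; signatures are assumptions.
    package internal

    import (
        "context"

        "k8s.io/apimachinery/pkg/labels"
        "k8s.io/apimachinery/pkg/types"
        clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3"
        "sigs.k8s.io/controller-runtime/pkg/client"
    )

    // ManagementCluster is constructed by the reconciler as
    // &internal.ManagementCluster{Client: r.Client}.
    type ManagementCluster struct {
        Client client.Client
    }

    // MachineFilter is an assumed name for the predicate returned by
    // OwnedControlPlaneMachines(controlPlaneName) and accepted by
    // GetMachinesForCluster.
    type MachineFilter func(machine clusterv1.Machine) bool

    // managementClusterSurface lists, as an interface, the methods the
    // reconciler calls on ManagementCluster in this PR.
    type managementClusterSurface interface {
        GetMachinesForCluster(ctx context.Context, cluster types.NamespacedName, filters ...MachineFilter) ([]clusterv1.Machine, error)
        ControlPlaneLabelsForCluster(clusterName string) map[string]string
        ControlPlaneSelectorForCluster(clusterName string) labels.Selector
        TargetClusterEtcdIsHealthy(ctx context.Context, cluster types.NamespacedName, controlPlaneName string) error
    }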
166 changes: 49 additions & 117 deletions controlplane/kubeadm/controllers/kubeadm_control_plane_controller.go
@@ -34,7 +34,6 @@ import (
"k8s.io/apiserver/pkg/storage/names"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/etcd"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
@@ -48,7 +47,6 @@ import (
"sigs.k8s.io/cluster-api/controllers/external"
"sigs.k8s.io/cluster-api/controllers/remote"
controlplanev1 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1alpha3"
etcdutil "sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/etcd/util"
capierrors "sigs.k8s.io/cluster-api/errors"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/kubeconfig"
@@ -70,25 +68,16 @@ const (

// KubeadmControlPlaneReconciler reconciles a KubeadmControlPlane object

type EtcdClient interface {
Close() error
Members(ctx context.Context) ([]*etcd.Member, error)
MoveLeader(ctx context.Context, newLeaderID uint64) error
RemoveMember(ctx context.Context, id uint64) error
UpdateMemberPeerURLs(ctx context.Context, id uint64, peerURLs []string) ([]*etcd.Member, error)
Alarms(ctx context.Context) ([]etcd.MemberAlarm, error)
}

type EtcdClientGetter func(c client.Client, nodeName string) (EtcdClient, error)
type KubeadmControlPlaneReconciler struct {
Client client.Client
Log logr.Logger
scheme *runtime.Scheme
controller controller.Controller
recorder record.EventRecorder

managementCluster *internal.ManagementCluster

remoteClientGetter remote.ClusterClientGetter
remoteEtcdClientGetter EtcdClientGetter
}

func (r *KubeadmControlPlaneReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error {
@@ -119,6 +108,7 @@ func (r *KubeadmControlPlaneReconciler) SetupWithManager(mgr ctrl.Manager, optio
func (r *KubeadmControlPlaneReconciler) Reconcile(req ctrl.Request) (res ctrl.Result, reterr error) {
logger := r.Log.WithValues("kubeadmControlPlane", req.Name, "namespace", req.Namespace)
ctx := context.Background()
r.managementCluster = &internal.ManagementCluster{Client: r.Client}

// Fetch the KubeadmControlPlane instance.
kcp := &controlplanev1.KubeadmControlPlane{}
Expand Down Expand Up @@ -197,12 +187,15 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, cluster *
}

// TODO: handle proper adoption of Machines
allMachines, err := r.getMachines(ctx, types.NamespacedName{Namespace: cluster.Namespace, Name: cluster.Name})
clusterKey := types.NamespacedName{
Namespace: cluster.GetNamespace(),
Name: cluster.GetName(),
}
ownedMachines, err := r.managementCluster.GetMachinesForCluster(ctx, clusterKey, internal.OwnedControlPlaneMachines(kcp.Name))
if err != nil {
logger.Error(err, "Failed to get list of machines")
return ctrl.Result{}, err
}
ownedMachines := r.filterOwnedMachines(kcp, allMachines)

// Generate Cluster Certificates if needed
config := kcp.Spec.KubeadmConfigSpec.DeepCopy()
Expand Down Expand Up @@ -249,7 +242,7 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, cluster *

// Currently we are not handling upgrade, so treat all owned machines as one for now.
// Once we start handling upgrade, we'll need to filter this list and act appropriately
numMachines := len(ownedMachines.Items)
numMachines := len(ownedMachines)
desiredReplicas := int(*kcp.Spec.Replicas)

switch {
@@ -269,7 +262,7 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, cluster *
case numMachines < desiredReplicas && numMachines > 0:
// create a new Machine w/ join
logger.Info("Scaling up by one replica", "Desired Replicas", desiredReplicas, "Existing Replicas", numMachines)
if err := r.scaleUpControlPlane(ctx, cluster, kcp, ownedMachines); err != nil {
if err := r.scaleUpControlPlane(ctx, cluster, kcp); err != nil {
logger.Error(err, "Failed to scale up the Control Plane")
r.recorder.Eventf(kcp, corev1.EventTypeWarning, "FailedScaleUp", "Failed to scale up the control plane: %v", err)
return ctrl.Result{}, err
@@ -289,23 +282,21 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, cluster *
}

func (r *KubeadmControlPlaneReconciler) updateStatus(ctx context.Context, kcp *controlplanev1.KubeadmControlPlane, cluster *clusterv1.Cluster) error {
labelSelector := generateKubeadmControlPlaneSelector(cluster.Name)
selector, err := metav1.LabelSelectorAsSelector(labelSelector)
if err != nil {
// Since we are building up the LabelSelector above, this should not fail
return errors.Wrap(err, "failed to parse label selector")
}
selector := r.managementCluster.ControlPlaneSelectorForCluster(cluster.Name)
// Copy label selector to its status counterpart in string format.
// This is necessary for CRDs including scale subresources.
kcp.Status.Selector = selector.String()

allMachines, err := r.getMachines(ctx, types.NamespacedName{Namespace: cluster.Namespace, Name: cluster.Name})
clusterKey := types.NamespacedName{
Namespace: cluster.Namespace,
Name: cluster.Name,
}
ownedMachines, err := r.managementCluster.GetMachinesForCluster(ctx, clusterKey, internal.OwnedControlPlaneMachines(kcp.Name))
if err != nil {
return errors.Wrap(err, "failed to get list of owned machines")
}
ownedMachines := r.filterOwnedMachines(kcp, allMachines)

replicas := int32(len(ownedMachines.Items))
replicas := int32(len(ownedMachines))
// TODO: take into account configuration hash once upgrades are in place
kcp.Status.Replicas = replicas

@@ -315,8 +306,8 @@ func (r *KubeadmControlPlaneReconciler) updateStatus(ctx context.Context, kcp *c
}

readyMachines := int32(0)
for i := range ownedMachines.Items {
m := &ownedMachines.Items[i]
for i := range ownedMachines {
m := &ownedMachines[i]
node, err := getMachineNode(ctx, remoteClient, m)
if err != nil {
return errors.Wrap(err, "failed to get referenced Node")
@@ -339,76 +330,19 @@ func (r *KubeadmControlPlaneReconciler) updateStatus(ctx context.Context, kcp *c
return nil
}

func (r *KubeadmControlPlaneReconciler) scaleUpControlPlane(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane, ownedMachines *clusterv1.MachineList) error {
func (r *KubeadmControlPlaneReconciler) scaleUpControlPlane(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane) error {
// Every control plane replica must be Ready
if int(kcp.Status.UnavailableReplicas) > 0 {
return errors.New(fmt.Sprintf("%d control plane replicas are not yet Ready", int(kcp.Status.UnavailableReplicas)))
}

var errs []error
var knownClusterID uint64
var knownMemberIDSet etcdutil.UInt64

for _, om := range ownedMachines.Items {
// Get the control plane machine Node Name from the NodeRef
if om.Status.NodeRef == nil {
// A Ready control plane machine should have a non-nil NodeRef
errs = append(errs, errors.New(fmt.Sprintf("internal error: control plane machine %s is Ready, but has a nil NodeRef", om.Name)))
continue
}
nodeName := om.Status.NodeRef.Name

// Create the etcd client for the etcd Pod scheduled on the Node
etcdClient, err := r.remoteEtcdClientGetter(r.Client, nodeName)
if err != nil {
errs = append(errs, errors.New(fmt.Sprintf("failed to create etcd client for control plane machine %s", om.Name)))
continue
}

// List etcd members. This checks that the member is healthy, because the request goes through consensus.
members, err := etcdClient.Members(ctx)
if err != nil {
errs = append(errs, errors.New(fmt.Sprintf("failed to list etcd members using etcd client for control plane machine %s", om.Name)))
continue
}
member := etcdutil.MemberForName(members, nodeName)

// Check that the member reports no alarms.
if len(member.Alarms) > 0 {
errs = append(errs, errors.New(fmt.Sprintf("etcd member for control plane machine %s reports alarms: %v", om.Name, member.Alarms)))
continue
}

// Check that the member belongs to the same cluster as all other members.
clusterID := member.ClusterID
if knownClusterID == 0 {
knownClusterID = clusterID
} else if knownClusterID != clusterID {
errs = append(errs, errors.New(fmt.Sprintf("etcd member for control plane machine %s has cluster ID %d, but all previously seen etcd members have cluster ID %d", om.Name, clusterID, knownClusterID)))
continue
}

// Check that the member list is stable.
memberIDSet := etcdutil.MemberIDSet(members)
if knownMemberIDSet.Len() == 0 {
knownMemberIDSet = memberIDSet
} else {
unknownMembers := memberIDSet.Difference(knownMemberIDSet)
if unknownMembers.Len() > 0 {
errs = append(errs, errors.New(fmt.Sprintf("etcd member for control plane machine %s reports members IDs %v, but all previously seen etcd members reported member IDs %v", om.Name, memberIDSet.UnsortedList(), knownMemberIDSet.UnsortedList())))
}
continue
}
clusterKey := types.NamespacedName{
Namespace: cluster.Namespace,
Name: cluster.Name,
}

// Check that there is exactly one etcd member for every control plane machine.
// There should be no etcd members added "out of band."
if len(ownedMachines.Items) != len(knownMemberIDSet) {
errs = append(errs, errors.New(fmt.Sprintf("there are %d control plane machines, but %d etcd members", len(ownedMachines.Items), len(knownMemberIDSet))))
}

if len(errs) > 0 {
return kerrors.NewAggregate(errs)
// TODO(chuckha) Need to skip/rework this for external etcd case and assume it's working.
// This assumes that there is a 1:1 correspondence between control plane nodes and etcd members.
if err := r.managementCluster.TargetClusterEtcdIsHealthy(ctx, clusterKey, kcp.Name); err != nil {
return err
}

// Create the bootstrap configuration
@@ -447,7 +381,7 @@ func (r *KubeadmControlPlaneReconciler) cloneConfigsAndGenerateMachine(ctx conte
Namespace: kcp.Namespace,
OwnerRef: infraCloneOwner,
ClusterName: cluster.Name,
Labels: generateKubeadmControlPlaneLabels(cluster.Name),
Labels: r.managementCluster.ControlPlaneLabelsForCluster(cluster.Name),
})
if err != nil {
// Safe to return early here since no resources have been created yet.
Expand Down Expand Up @@ -539,13 +473,16 @@ func (r *KubeadmControlPlaneReconciler) failureDomainForScaleUp(ctx context.Cont
if len(cluster.Status.FailureDomains) == 0 {
return nil, nil
}
machineList, err := r.getMachines(ctx, types.NamespacedName{Namespace: cluster.GetNamespace(), Name: cluster.GetName()})
clusterKey := types.NamespacedName{
Namespace: cluster.Namespace,
Name: cluster.Name,
}
ownedMachines, err := r.managementCluster.GetMachinesForCluster(ctx, clusterKey, internal.OwnedControlPlaneMachines(kcp.Name))
if err != nil {
return nil, err
}
machineList = r.filterOwnedMachines(kcp, machineList)
picker := internal.FailureDomainPicker{Log: r.Log}
failureDomain := picker.PickFewest(cluster.Status.FailureDomains.FilterControlPlane(), machineList.Items)
failureDomain := picker.PickFewest(cluster.Status.FailureDomains.FilterControlPlane(), ownedMachines)
return &failureDomain, nil
}

Expand All @@ -559,7 +496,7 @@ func (r *KubeadmControlPlaneReconciler) generateMachine(ctx context.Context, kcp
ObjectMeta: metav1.ObjectMeta{
Name: names.SimpleNameGenerator.GenerateName(kcp.Name + "-"),
Namespace: kcp.Namespace,
Labels: generateKubeadmControlPlaneLabels(cluster.Name),
Labels: r.managementCluster.ControlPlaneLabelsForCluster(cluster.Name),
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(kcp, controlplanev1.GroupVersion.WithKind("KubeadmControlPlane")),
},
@@ -582,49 +519,44 @@ func (r *KubeadmControlPlaneReconciler) generateMachine(ctx context.Context, kcp
return nil
}

func generateKubeadmControlPlaneSelector(clusterName string) *metav1.LabelSelector {
return &metav1.LabelSelector{
MatchLabels: generateKubeadmControlPlaneLabels(clusterName),
}
}

func generateKubeadmControlPlaneLabels(clusterName string) map[string]string {
return map[string]string{
clusterv1.ClusterLabelName: clusterName,
clusterv1.MachineControlPlaneLabelName: "",
}
}

// reconcileDelete handles KubeadmControlPlane deletion.
// The implementation does not take non-control plane workloads into
// consideration. This may or may not change in the future. Please see
// https://github.com/kubernetes-sigs/cluster-api/issues/2064
func (r *KubeadmControlPlaneReconciler) reconcileDelete(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane, logger logr.Logger) (_ ctrl.Result, reterr error) {
// Fetch Machines
allMachines, err := util.GetMachinesForCluster(ctx, r.Client, cluster)
clusterKey := types.NamespacedName{
Namespace: cluster.Namespace,
Name: cluster.Name,
}
allMachines, err := r.managementCluster.GetMachinesForCluster(ctx, clusterKey)
if err != nil {
logger.Error(err, "Failed to get list of machines")
return ctrl.Result{}, err
}
ownedMachines := r.filterOwnedMachines(kcp, allMachines)
controlPlaneMachines, err := r.managementCluster.GetMachinesForCluster(ctx, clusterKey, internal.OwnedControlPlaneMachines(kcp.Name))
if err != nil {
return ctrl.Result{}, err
}

// Verify that only control plane machines remain
if len(allMachines.Items) != len(ownedMachines.Items) {
if len(allMachines) != len(controlPlaneMachines) {
err := errors.New("at least one machine is not owned by the control plane")
logger.Error(err, "Failed to delete the control plane")
return ctrl.Result{}, err
}

// If no control plane machines remain, remove the finalizer
if len(ownedMachines.Items) == 0 {
if len(controlPlaneMachines) == 0 {
controllerutil.RemoveFinalizer(kcp, controlplanev1.KubeadmControlPlaneFinalizer)
return ctrl.Result{}, nil
}

// Delete control plane machines in parallel
var errs []error
for i := range ownedMachines.Items {
m := &ownedMachines.Items[i]
for i := range controlPlaneMachines {
m := &controlPlaneMachines[i]
if err := r.Client.Delete(ctx, m); err != nil && !apierrors.IsNotFound(err) {
errs = append(errs, errors.Wrap(err, "failed to cleanup owned machines"))
}
@@ -666,7 +598,7 @@ func (r *KubeadmControlPlaneReconciler) reconcileKubeconfig(ctx context.Context,
}

func (r *KubeadmControlPlaneReconciler) getMachines(ctx context.Context, clusterName types.NamespacedName) (*clusterv1.MachineList, error) {
selector := generateKubeadmControlPlaneLabels(clusterName.Name)
selector := r.managementCluster.ControlPlaneLabelsForCluster(clusterName.Name)
allMachines := &clusterv1.MachineList{}
if err := r.Client.List(ctx, allMachines, client.InNamespace(clusterName.Namespace), client.MatchingLabels(selector)); err != nil {
return nil, errors.Wrap(err, "failed to list machines")
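The block deleted from scaleUpControlPlane above performed the etcd health check inline: list members through the etcd client (a request that itself goes through consensus), reject any member reporting alarms, require a single cluster ID, require a stable member list across nodes, and require exactly one member per control plane machine. Those checks now sit behind ManagementCluster.TargetClusterEtcdIsHealthy, with the new TODO noting they will need rework for external etcd, where the one-member-per-node assumption does not hold. The following is a self-contained illustration of the aggregate invariants using stand-in types, not the internal package's actual code:

    package etcdhealth

    import "fmt"

    // etcdMember is a stand-in for the fields the deleted block read from each member.
    type etcdMember struct {
        ID        uint64
        ClusterID uint64
        Alarms    []string
    }

    // checkEtcdInvariants mirrors the aggregate checks that moved out of the
    // controller: no alarms, one cluster ID, one shared member set, and a
    // one-to-one machine/member count.
    func checkEtcdInvariants(machineCount int, membersByNode map[string][]etcdMember) error {
        var errs []error
        var knownClusterID uint64
        knownMemberIDs := map[uint64]bool{}

        for node, members := range membersByNode {
            for _, m := range members {
                if len(m.Alarms) > 0 {
                    errs = append(errs, fmt.Errorf("etcd member on node %s reports alarms: %v", node, m.Alarms))
                }
                switch {
                case knownClusterID == 0:
                    knownClusterID = m.ClusterID
                case m.ClusterID != knownClusterID:
                    errs = append(errs, fmt.Errorf("etcd member on node %s has cluster ID %d, expected %d", node, m.ClusterID, knownClusterID))
                }
                if len(knownMemberIDs) > 0 && !knownMemberIDs[m.ID] {
                    errs = append(errs, fmt.Errorf("node %s reports etcd member %d not seen from previously checked nodes", node, m.ID))
                }
            }
            // Seed the known member set from the first node inspected.
            if len(knownMemberIDs) == 0 {
                for _, m := range members {
                    knownMemberIDs[m.ID] = true
                }
            }
        }

        if machineCount != len(knownMemberIDs) {
            errs = append(errs, fmt.Errorf("there are %d control plane machines, but %d etcd members", machineCount, len(knownMemberIDs)))
        }
        if len(errs) > 0 {
            return fmt.Errorf("etcd is not healthy: %v", errs)
        }
        return nil
    }

A caller gathers each node's member list with an etcd client, passes it in together with the number of owned machines, and refuses to scale up on a non-nil error; that gate is what the reconciler now delegates to TargetClusterEtcdIsHealthy.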
controlplane/kubeadm/controllers/kubeadm_control_plane_controller_test.go
@@ -32,6 +32,7 @@ import (
"k8s.io/klog/klogr"
"k8s.io/utils/pointer"
utilpointer "k8s.io/utils/pointer"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
@@ -854,6 +855,7 @@ func TestKubeadmControlPlaneReconciler_updateStatusNoMachines(t *testing.T) {
Log: log.Log,
remoteClientGetter: fakeremote.NewClusterClient,
scheme: scheme.Scheme,
managementCluster: &internal.ManagementCluster{Client: fakeClient},
}

g.Expect(r.updateStatus(context.Background(), kcp, cluster)).To(Succeed())
@@ -895,6 +897,13 @@ func createNodeAndNodeRefForMachine(machine *clusterv1.Machine, ready bool) (*cl
return machine, node
}

func generateKubeadmControlPlaneLabels(name string) map[string]string {
return map[string]string{
clusterv1.ClusterLabelName: name,
clusterv1.MachineControlPlaneLabelName: "",
}
}

func createMachineNodePair(name string, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane, ready bool) (*clusterv1.Machine, *corev1.Node) {
machine := &clusterv1.Machine{
ObjectMeta: metav1.ObjectMeta{
@@ -1089,7 +1098,6 @@ func TestReconcileControlPlaneScaleUp(t *testing.T) {
recorder: record.NewFakeRecorder(32),
scheme: scheme.Scheme,
remoteClientGetter: fakeremote.NewClusterClient,
remoteEtcdClientGetter: fakeRemoteEtcdClientGetter(fakeEtcdClient),
}

for i := 1; i <= desiredReplicas; i++ {
@@ -1210,12 +1218,6 @@ func createFakeClient() (client.Client, error) {
return fakeClient, nil
}

func fakeRemoteEtcdClientGetter(fc *fakeetcd.FakeEtcdClient) EtcdClientGetter {
return func(c client.Client, nodeName string) (EtcdClient, error) {
return fc, nil
}
}

func TestScaleUpControlPlaneAddsANewMachine(t *testing.T) {
g := NewWithT(t)

@@ -1237,7 +1239,6 @@ func TestScaleUpControlPlaneAddsANewMachine(t *testing.T) {
recorder: record.NewFakeRecorder(32),
scheme: scheme.Scheme,
remoteClientGetter: fakeremote.NewClusterClient,
remoteEtcdClientGetter: fakeRemoteEtcdClientGetter(fakeEtcdClient),
}

// Create the first control plane replica
Expand All @@ -1258,7 +1259,7 @@ func TestScaleUpControlPlaneAddsANewMachine(t *testing.T) {

// Tell the controller to scale up
g.Expect(fakeClient.List(context.Background(), ownedMachines)).To(Succeed())
g.Expect(r.scaleUpControlPlane(context.Background(), cluster, kcp, ownedMachines)).To(Succeed())
g.Expect(r.scaleUpControlPlane(context.Background(), cluster, kcp)).To(Succeed())

// Verify that controller created a new machine
g.Expect(fakeClient.List(context.Background(), ownedMachines)).To(Succeed())
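A closing note on the updateStatus hunk: the selector string written to kcp.Status.Selector is what the CRD's scale subresource consumes through its labelSelectorPath, so it has to match the labels the controller stamps onto its machines (the generateKubeadmControlPlaneLabels helper kept in the test file shows that label set). A minimal sketch of that pairing, with the label keys written out literally; in the real code the keys come from clusterv1 constants and the selector now comes from ManagementCluster.ControlPlaneSelectorForCluster:

    package labelsketch

    import "k8s.io/apimachinery/pkg/labels"

    // controlPlaneLabels mirrors the label set applied to control plane machines.
    func controlPlaneLabels(clusterName string) map[string]string {
        return map[string]string{
            "cluster.x-k8s.io/cluster-name":  clusterName,
            "cluster.x-k8s.io/control-plane": "",
        }
    }

    // selectorString is roughly the value updateStatus stores in kcp.Status.Selector.
    func selectorString(clusterName string) string {
        return labels.SelectorFromSet(controlPlaneLabels(clusterName)).String()
    }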