✨ KCP adopts existing machines
The KCP controller identifies Machines that belong to the control plane
of an existing cluster and adopts them. It also finds PKI materials that
may be owned by the Machine's bootstrap config and pivots their
ownership to the KCP.
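
Concretely, adoption means taking a controller owner reference on each
Machine. A condensed sketch of the per-Machine step, as implemented by
AdoptMachines in the diff below (the patch-helper plumbing around it is
elided here):

    m.SetOwnerReferences(util.EnsureOwnerRef(m.GetOwnerReferences(), metav1.OwnerReference{
        APIVersion:         controlplanev1.GroupVersion.String(),
        Kind:               "KubeadmControlPlane",
        Name:               kcp.Name,
        UID:                kcp.UID,
        Controller:         pointer.BoolPtr(true),
        BlockOwnerDeletion: pointer.BoolPtr(true),
    }))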

Prior to adopting machines (a step which, if unsuccessful, will block
the KCP from taking any management actions), it runs a number of safety
checks, including:

- Ensuring the KCP has not been deleted (to prevent re-adoption of
  orphans, though this process races with the garbage collector)
- Checking that the Machine's bootstrap provider is KubeadmConfig
- Verifying that the Machine's version is no more than one minor
  version away from the KCP's spec'd version (see the sketch below)
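
The skew check in the last item reduces to small-integer arithmetic on
the parsed semver components; this mirrors the dist helper inside
AdoptMachines in the diff below (error handling elided):

    kcpVersion, _ := semver.ParseTolerant(kcp.Spec.Version)    // e.g. "v1.17.3"
    machineVersion, _ := semver.ParseTolerant(*m.Spec.Version) // e.g. "v1.16.8"
    dist := func(a, b uint64) uint64 {
        if a > b {
            return a - b
        }
        return b - a
    }
    if kcpVersion.Major != machineVersion.Major || dist(kcpVersion.Minor, machineVersion.Minor) > 1 {
        // refuse adoption: emit an AdoptionFailed event and return without
        // raising an error, so the controller doesn't spin on retries
    }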

Additionally, we set a "best guess" value for the
kubeadm.controlplane.cluster.x-k8s.io/hash label on each adopted
Machine, as if it had been generated by a KCP in the past. The intent is
that a KCP will adopt machines matching its spec (to the best of its
ability) without modification, which in practice works well for
adopting machines with the same spec'd version.
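
In the diff below, the "best guess" boils down to hashing a synthetic
KCP spec built from the Machine's own version and this KCP's
infrastructure template, then stamping the result on the Machine:

    newSpec := controlplanev1.KubeadmControlPlaneSpec{
        Version:                machineKubernetesVersion, // m.Spec.Version, or "" if unset
        InfrastructureTemplate: kcp.Spec.InfrastructureTemplate,
    }
    m.Labels["kubeadm.controlplane.cluster.x-k8s.io/hash"] = hash.Compute(&newSpec)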

Co-authored-by: mnguyen <mnguyen@newrelic.com>
sethp-nr and mytunguyen committed Mar 7, 2020
1 parent 4d0192c commit 17da6e3
Showing 16 changed files with 854 additions and 116 deletions.
1 change: 1 addition & 0 deletions controlplane/kubeadm/config/rbac/role.yaml
@@ -61,6 +61,7 @@ rules:
- get
- list
- patch
+ - update
- watch

---
4 changes: 2 additions & 2 deletions controlplane/kubeadm/controllers/fakes_test.go
@@ -45,14 +45,14 @@ func (f *fakeManagementCluster) GetMachinesForCluster(c context.Context, n clien
return f.Machines, nil
}

- func (f *fakeManagementCluster) TargetClusterControlPlaneIsHealthy(_ context.Context, _ client.ObjectKey, _ string) error {
+ func (f *fakeManagementCluster) TargetClusterControlPlaneIsHealthy(_ context.Context, _ client.ObjectKey) error {
if !f.ControlPlaneHealthy {
return errors.New("control plane is not healthy")
}
return nil
}

- func (f *fakeManagementCluster) TargetClusterEtcdIsHealthy(_ context.Context, _ client.ObjectKey, _ string) error {
+ func (f *fakeManagementCluster) TargetClusterEtcdIsHealthy(_ context.Context, _ client.ObjectKey) error {
if !f.EtcdHealthy {
return errors.New("etcd is not healthy")
}
185 changes: 169 additions & 16 deletions controlplane/kubeadm/controllers/kubeadm_control_plane_controller.go
@@ -33,6 +33,7 @@ import (
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apiserver/pkg/storage/names"
"k8s.io/client-go/tools/record"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
@@ -69,7 +70,7 @@
)

// +kubebuilder:rbac:groups=core,resources=events,verbs=get;list;watch;create;patch
- // +kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch;create;patch
+ // +kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch;create;update;patch
// +kubebuilder:rbac:groups=core,resources=configmaps,namespace=kube-system,verbs=get;list;watch;create
// +kubebuilder:rbac:groups=rbac,resources=roles,namespace=kube-system,verbs=get;list;watch;create
// +kubebuilder:rbac:groups=rbac,resources=rolebindings,namespace=kube-system,verbs=get;list;watch;create
@@ -86,6 +87,8 @@ type KubeadmControlPlaneReconciler struct {
recorder record.EventRecorder

managementCluster internal.ManagementCluster

+ uncachedClient client.Reader
}

func (r *KubeadmControlPlaneReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error {
@@ -110,6 +113,7 @@ func (r *KubeadmControlPlaneReconciler) SetupWithManager(mgr ctrl.Manager, optio
if r.managementCluster == nil {
r.managementCluster = &internal.Management{Client: r.Client}
}
+ r.uncachedClient = mgr.GetAPIReader()

return nil
}
@@ -227,13 +231,25 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, cluster *
return ctrl.Result{}, err
}

- // TODO: handle proper adoption of Machines
- ownedMachines, err := r.managementCluster.GetMachinesForCluster(ctx, util.ObjectKey(cluster), internal.OwnedControlPlaneMachines(kcp.Name))
+ controlPlaneMachines, err := r.managementCluster.GetMachinesForCluster(ctx, util.ObjectKey(cluster), internal.ControlPlaneMachines(cluster.Name))
if err != nil {
logger.Error(err, "failed to retrieve control plane machines for cluster")
return ctrl.Result{}, err
}

+ adoptableMachines := controlPlaneMachines.Filter(internal.AdoptableControlPlaneMachines(cluster.Name))
+ if len(adoptableMachines) > 0 {
+ // We adopt the Machines and then wait for the update event for the ownership reference to re-queue them so the cache is up-to-date
+ err = r.AdoptMachines(ctx, kcp, adoptableMachines)
+ return ctrl.Result{}, err
+ }
+
+ ownedMachines := controlPlaneMachines.Filter(internal.OwnedControlPlaneMachines(kcp))
+ if len(ownedMachines) != len(controlPlaneMachines) {
+ logger.Info("Not all control plane machines are owned by this KubeadmControlPlane, refusing to operate in mixed management mode")
+ return ctrl.Result{}, nil
+ }

now := metav1.Now()
var requireUpgrade internal.FilterableMachineCollection
if kcp.Spec.UpgradeAfter != nil && kcp.Spec.UpgradeAfter.Before(&now) {
@@ -276,17 +292,12 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, cluster *
}

func (r *KubeadmControlPlaneReconciler) updateStatus(ctx context.Context, kcp *controlplanev1.KubeadmControlPlane, cluster *clusterv1.Cluster) error {
- labelSelector := internal.ControlPlaneSelectorForCluster(cluster.Name)
- selector, err := metav1.LabelSelectorAsSelector(labelSelector)
- if err != nil {
- // Since we are building up the LabelSelector above, this should not fail
- return errors.Wrap(err, "failed to parse label selector")
- }
+ selector := internal.ControlPlaneSelectorForCluster(cluster.Name)
// Copy label selector to its status counterpart in string format.
// This is necessary for CRDs including scale subresources.
kcp.Status.Selector = selector.String()

- ownedMachines, err := r.managementCluster.GetMachinesForCluster(ctx, util.ObjectKey(cluster), internal.OwnedControlPlaneMachines(kcp.Name))
+ ownedMachines, err := r.managementCluster.GetMachinesForCluster(ctx, util.ObjectKey(cluster), internal.OwnedControlPlaneMachines(kcp))
if err != nil {
return errors.Wrap(err, "failed to get list of owned machines")
}
@@ -444,13 +455,13 @@ func (r *KubeadmControlPlaneReconciler) initializeControlPlane(ctx context.Conte

func (r *KubeadmControlPlaneReconciler) scaleUpControlPlane(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane, machines internal.FilterableMachineCollection) (ctrl.Result, error) {
logger := r.Log.WithValues("namespace", kcp.Namespace, "kubeadmControlPlane", kcp.Name, "cluster", cluster.Name)
- if err := r.managementCluster.TargetClusterControlPlaneIsHealthy(ctx, util.ObjectKey(cluster), kcp.Name); err != nil {
+ if err := r.managementCluster.TargetClusterControlPlaneIsHealthy(ctx, util.ObjectKey(cluster)); err != nil {
logger.Error(err, "waiting for control plane to pass control plane health check before adding an additional control plane machine")
r.recorder.Eventf(kcp, corev1.EventTypeWarning, "ControlPlaneUnhealthy", "Waiting for control plane to pass control plane health check before adding additional control plane machine: %v", err)
return ctrl.Result{}, &capierrors.RequeueAfterError{RequeueAfter: HealthCheckFailedRequeueAfter}
}

- if err := r.managementCluster.TargetClusterEtcdIsHealthy(ctx, util.ObjectKey(cluster), kcp.Name); err != nil {
+ if err := r.managementCluster.TargetClusterEtcdIsHealthy(ctx, util.ObjectKey(cluster)); err != nil {
logger.Error(err, "waiting for control plane to pass etcd health check before adding an additional control plane machine")
r.recorder.Eventf(kcp, corev1.EventTypeWarning, "ControlPlaneUnhealthy", "Waiting for control plane to pass etcd health check before adding additional control plane machine: %v", err)
return ctrl.Result{}, &capierrors.RequeueAfterError{RequeueAfter: HealthCheckFailedRequeueAfter}
@@ -511,7 +522,7 @@ func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane(ctx context.Contex

if !internal.HasAnnotationKey(controlplanev1.ScaleDownEtcdMemberRemovedAnnotation)(machineToDelete) {
// Ensure etcd is healthy prior to attempting to remove the member
- if err := r.managementCluster.TargetClusterEtcdIsHealthy(ctx, util.ObjectKey(cluster), kcp.Name); err != nil {
+ if err := r.managementCluster.TargetClusterEtcdIsHealthy(ctx, util.ObjectKey(cluster)); err != nil {
logger.Error(err, "waiting for control plane to pass etcd health check before removing a control plane machine")
r.recorder.Eventf(kcp, corev1.EventTypeWarning, "ControlPlaneUnhealthy", "Waiting for control plane to pass etcd health check before removing a control plane machine: %v", err)
return ctrl.Result{}, &capierrors.RequeueAfterError{RequeueAfter: HealthCheckFailedRequeueAfter}
@@ -526,7 +537,7 @@ func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane(ctx context.Contex
}

if !internal.HasAnnotationKey(controlplanev1.ScaleDownConfigMapEntryRemovedAnnotation)(machineToDelete) {
- if err := r.managementCluster.TargetClusterControlPlaneIsHealthy(ctx, util.ObjectKey(cluster), kcp.Name); err != nil {
+ if err := r.managementCluster.TargetClusterControlPlaneIsHealthy(ctx, util.ObjectKey(cluster)); err != nil {
logger.Error(err, "waiting for control plane to pass control plane health check before removing a control plane machine")
r.recorder.Eventf(kcp, corev1.EventTypeWarning, "ControlPlaneUnhealthy", "Waiting for control plane to pass control plane health check before removing a control plane machine: %v", err)
return ctrl.Result{}, &capierrors.RequeueAfterError{RequeueAfter: HealthCheckFailedRequeueAfter}
@@ -542,7 +553,7 @@ func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane(ctx context.Contex
}

// Do a final health check of the Control Plane components prior to actually deleting the machine
- if err := r.managementCluster.TargetClusterControlPlaneIsHealthy(ctx, util.ObjectKey(cluster), kcp.Name); err != nil {
+ if err := r.managementCluster.TargetClusterControlPlaneIsHealthy(ctx, util.ObjectKey(cluster)); err != nil {
logger.Error(err, "waiting for control plane to pass control plane health check before removing a control plane machine")
r.recorder.Eventf(kcp, corev1.EventTypeWarning, "ControlPlaneUnhealthy", "Waiting for control plane to pass control plane health check before removing a control plane machine: %v", err)
return ctrl.Result{}, &capierrors.RequeueAfterError{RequeueAfter: HealthCheckFailedRequeueAfter}
@@ -724,7 +735,7 @@ func (r *KubeadmControlPlaneReconciler) reconcileDelete(ctx context.Context, clu
logger.Error(err, "failed to retrieve machines for cluster")
return ctrl.Result{}, err
}
- ownedMachines := allMachines.Filter(internal.OwnedControlPlaneMachines(kcp.Name))
+ ownedMachines := allMachines.Filter(internal.OwnedControlPlaneMachines(kcp))

// If no control plane machines remain, remove the finalizer
if len(ownedMachines) == 0 {
@@ -834,3 +845,145 @@ func (r *KubeadmControlPlaneReconciler) ClusterToKubeadmControlPlane(o handler.M

return nil
}

func (r *KubeadmControlPlaneReconciler) AdoptMachines(ctx context.Context, kcp *controlplanev1.KubeadmControlPlane, machines internal.FilterableMachineCollection) error {
// We do an uncached full quorum read against the KCP to avoid re-adopting Machines the garbage collector just intentionally orphaned
// See https://github.com/kubernetes/kubernetes/issues/42639
uncached := controlplanev1.KubeadmControlPlane{}
err := r.uncachedClient.Get(ctx, client.ObjectKey{Namespace: kcp.Namespace, Name: kcp.Name}, &uncached)
if err != nil {
return fmt.Errorf("can't recheck DeletionTimestamp: %v", err)
}
if !uncached.DeletionTimestamp.IsZero() {
return fmt.Errorf("%v/%v has just been deleted at %v", kcp.GetNamespace(), kcp.GetName(), kcp.GetDeletionTimestamp())
}

kcpVersion, err := semver.ParseTolerant(kcp.Spec.Version)
if err != nil {
return errors.Wrapf(err, "failed to parse kubernetes version %q", kcp.Spec.Version)
}

for _, m := range machines {
ref := m.Spec.Bootstrap.ConfigRef

// TODO instead of returning error here, we should instead Event and add a watch on potentially adoptable Machines
if ref == nil || ref.Kind != "KubeadmConfig" {
return fmt.Errorf("unable to adopt Machine %v/%v: expected a ConfigRef of kind KubeadmConfig but instead found %v", m.Namespace, m.Name, ref)
}

// TODO instead of returning error here, we should instead Event and add a watch on potentially adoptable Machines
if ref.Namespace != "" && ref.Namespace != kcp.Namespace {
return fmt.Errorf("could not adopt resources from KubeadmConfig %v/%v: cannot adopt across namespaces", ref.Namespace, ref.Name)
}

if m.Spec.Version == nil {
// if the machine's version is not immediately apparent, assume the operator knows what they're doing
continue
}

machineVersion, err := semver.ParseTolerant(*m.Spec.Version)
if err != nil {
return errors.Wrapf(err, "failed to parse kubernetes version %q", *m.Spec.Version)
}

dist := func(a, b uint64) uint64 {
if a > b {
return a - b
}
return b - a
}
if kcpVersion.Major != machineVersion.Major || dist(kcpVersion.Minor, machineVersion.Minor) > 1 {
r.recorder.Eventf(kcp, corev1.EventTypeWarning, "AdoptionFailed", "Could not adopt Machine %s/%s: its version (%q) is outside supported +/- one minor version skew from KCP's (%q)", m.Namespace, m.Name, *m.Spec.Version, kcp.Spec.Version)
// avoid returning an error here so we don't cause the KCP controller to spin until the operator clarifies their intent
return nil
}
}

for _, m := range machines {
ref := m.Spec.Bootstrap.ConfigRef
obj := &bootstrapv1.KubeadmConfig{}
err := r.Client.Get(ctx, client.ObjectKey{Name: ref.Name, Namespace: kcp.Namespace}, obj)
if err != nil {
return err
}

err = r.AdoptOwnedSecrets(ctx, kcp, obj)
if err != nil {
return err
}

patchHelper, err := patch.NewHelper(m, r.Client)
if err != nil {
return err
}

m.SetOwnerReferences(util.EnsureOwnerRef(m.GetOwnerReferences(), metav1.OwnerReference{
APIVersion: controlplanev1.GroupVersion.String(),
Kind: "KubeadmControlPlane",
Name: kcp.Name,
UID: kcp.UID,
Controller: pointer.BoolPtr(true),
BlockOwnerDeletion: pointer.BoolPtr(true),
}))

// 0. get machine.Spec.Version - the easy answer
machineKubernetesVersion := ""
if m.Spec.Version != nil {
machineKubernetesVersion = *m.Spec.Version
}

// 1. hash the version (kubernetes version) and kubeadm_controlplane's Spec.infrastructureTemplate
newSpec := controlplanev1.KubeadmControlPlaneSpec{
Version: machineKubernetesVersion,
InfrastructureTemplate: kcp.Spec.InfrastructureTemplate,
}
newConfigurationHash := hash.Compute(&newSpec)
// 2. add kubeadm.controlplane.cluster.x-k8s.io/hash as a label in each machine
m.Labels["kubeadm.controlplane.cluster.x-k8s.io/hash"] = newConfigurationHash

// Note that ValidateOwnerReferences() will reject this patch if another
// OwnerReference exists with controller=true.
if err := patchHelper.Patch(ctx, m); err != nil {
return err
}
}
return nil
}

func (r *KubeadmControlPlaneReconciler) AdoptOwnedSecrets(ctx context.Context, kcp *controlplanev1.KubeadmControlPlane, currentOwner metav1.Object) error {
secrets := corev1.SecretList{}
err := r.Client.List(ctx, &secrets, client.InNamespace(kcp.Namespace))

if err != nil {
return errors.Wrap(err, "error finding secrets for adoption")
}

for _, s := range secrets.Items {
if !util.PointsTo(s.GetOwnerReferences(), currentOwner) {
continue
}
// avoid taking ownership of the bootstrap data secret
if s.Name == currentOwner.GetName() {
continue
}

ss := corev1.Secret{}
s.DeepCopyInto(&ss)

ss.SetOwnerReferences(util.ReconcileOwnerRef(ss.GetOwnerReferences(), metav1.OwnerReference{
APIVersion: controlplanev1.GroupVersion.String(),
Kind: "KubeadmControlPlane",
Name: kcp.Name,
UID: kcp.UID,
Controller: pointer.BoolPtr(true),
BlockOwnerDeletion: pointer.BoolPtr(true),
}, currentOwner))

err := r.Client.Update(ctx, &ss)
if err != nil {
return errors.Wrapf(err, "error changing secret %v ownership from KubeadmConfig/%v to KubeadmControlPlane/%v", s.Name, currentOwner.GetName(), kcp.Name)
}
}

return nil
}