✨ KCP adopts existing machines
A mostly-functional prototype for having the KCP controller identify
Machines that belong to the control plane of an existing cluster and
adopt them.
sethp-nr committed Feb 19, 2020
1 parent bc65e0d commit 7931678
Showing 5 changed files with 237 additions and 25 deletions.
@@ -34,6 +34,7 @@ import (
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apiserver/pkg/storage/names"
"k8s.io/client-go/tools/record"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
@@ -89,6 +90,8 @@ type KubeadmControlPlaneReconciler struct {
remoteClientGetter remote.ClusterClientGetter

managementCluster managementCluster

uncachedClient client.Reader
}

func (r *KubeadmControlPlaneReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error {
@@ -112,6 +115,7 @@ func (r *KubeadmControlPlaneReconciler) SetupWithManager(mgr ctrl.Manager, optio
if r.remoteClientGetter == nil {
r.remoteClientGetter = remote.NewClusterClient
}
r.uncachedClient = mgr.GetAPIReader()

return nil
}
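For context, here is a minimal sketch (illustrative only, not part of this diff; the names are made up) of the two read paths wired up above: mgr.GetClient() serves reads from the controller's informer cache and can lag behind the API server, while mgr.GetAPIReader() performs live quorum reads, which is what the adoption logic below relies on.

	package example

	import (
		ctrl "sigs.k8s.io/controller-runtime"
		"sigs.k8s.io/controller-runtime/pkg/client"
	)

	// wireClients returns both read paths side by side.
	func wireClients(mgr ctrl.Manager) (cached client.Client, live client.Reader) {
		cached = mgr.GetClient()  // backed by the shared informer cache; may be stale
		live = mgr.GetAPIReader() // bypasses the cache; reads straight from the API server
		return cached, live
	}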
@@ -228,8 +232,18 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, cluster *
return ctrl.Result{}, err
}

-// TODO: handle proper adoption of Machines
-ownedMachines, err := r.managementCluster.GetMachinesForCluster(ctx, util.ObjectKey(cluster), internal.OwnedControlPlaneMachines(kcp.Name))
+adoptableMachines, err := r.managementCluster.GetMachinesForCluster(ctx, util.ObjectKey(cluster), internal.AdoptableControlPlaneMachines(cluster.Name))
if err != nil {
return ctrl.Result{}, err
}

if len(adoptableMachines) > 0 {
// We adopt the Machines and then wait for the update event for the new ownership reference to re-queue them, by which point the cache will be up-to-date
err = r.AdoptMachines(ctx, kcp, adoptableMachines...)
return ctrl.Result{}, err
}

ownedMachines, err := r.managementCluster.GetMachinesForCluster(ctx, util.ObjectKey(cluster), internal.OwnedControlPlaneMachines(kcp))
if err != nil {
return ctrl.Result{}, err
}
@@ -297,17 +311,12 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, cluster *
}

func (r *KubeadmControlPlaneReconciler) updateStatus(ctx context.Context, kcp *controlplanev1.KubeadmControlPlane, cluster *clusterv1.Cluster) error {
-labelSelector := internal.ControlPlaneSelectorForCluster(cluster.Name)
-selector, err := metav1.LabelSelectorAsSelector(labelSelector)
-if err != nil {
-	// Since we are building up the LabelSelector above, this should not fail
-	return errors.Wrap(err, "failed to parse label selector")
-}
+selector := internal.ControlPlaneSelectorForCluster(cluster.Name)
// Copy label selector to its status counterpart in string format.
// This is necessary for CRDs including scale subresources.
kcp.Status.Selector = selector.String()

-ownedMachines, err := r.managementCluster.GetMachinesForCluster(ctx, util.ObjectKey(cluster), internal.OwnedControlPlaneMachines(kcp.Name))
+ownedMachines, err := r.managementCluster.GetMachinesForCluster(ctx, util.ObjectKey(cluster), internal.OwnedControlPlaneMachines(kcp))
if err != nil {
return errors.Wrap(err, "failed to get list of owned machines")
}
@@ -406,7 +415,7 @@ func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane(ctx context.Contex
return ctrl.Result{RequeueAfter: HealthCheckFailedRequeueAfter}, errors.Wrap(err, "etcd cluster is not healthy")
}

-ownedMachines, err := r.managementCluster.GetMachinesForCluster(ctx, util.ObjectKey(cluster), internal.OwnedControlPlaneMachines(kcp.Name))
+ownedMachines, err := r.managementCluster.GetMachinesForCluster(ctx, util.ObjectKey(cluster), internal.OwnedControlPlaneMachines(kcp))
if err != nil {
return ctrl.Result{}, err
}
@@ -540,7 +549,7 @@ func (r *KubeadmControlPlaneReconciler) failureDomainForScaleUp(ctx context.Cont
if len(cluster.Status.FailureDomains) == 0 {
return nil, nil
}
-ownedMachines, err := r.managementCluster.GetMachinesForCluster(ctx, util.ObjectKey(cluster), internal.OwnedControlPlaneMachines(kcp.Name))
+ownedMachines, err := r.managementCluster.GetMachinesForCluster(ctx, util.ObjectKey(cluster), internal.OwnedControlPlaneMachines(kcp))
if err != nil {
return nil, err
}
@@ -590,7 +599,7 @@ func (r *KubeadmControlPlaneReconciler) reconcileDelete(ctx context.Context, clu
if err != nil {
return ctrl.Result{}, err
}
-ownedMachines, err := r.managementCluster.GetMachinesForCluster(ctx, util.ObjectKey(cluster), internal.OwnedControlPlaneMachines(kcp.Name))
+ownedMachines, err := r.managementCluster.GetMachinesForCluster(ctx, util.ObjectKey(cluster), internal.OwnedControlPlaneMachines(kcp))
if err != nil {
return ctrl.Result{}, err
}
@@ -698,6 +707,42 @@ func (r *KubeadmControlPlaneReconciler) ClusterToKubeadmControlPlane(o handler.M
return nil
}

func (r *KubeadmControlPlaneReconciler) AdoptMachines(ctx context.Context, kcp *controlplanev1.KubeadmControlPlane, machines ...*clusterv1.Machine) error {
// We do an uncached full quorum read against the KCP to avoid re-adopting Machines the garbage collector just intentionally orphaned
// See https://github.com/kubernetes/kubernetes/issues/42639
uncached := controlplanev1.KubeadmControlPlane{}
err := r.uncachedClient.Get(ctx, client.ObjectKey{Namespace: kcp.Namespace, Name: kcp.Name}, &uncached)
if err != nil {
return fmt.Errorf("can't recheck DeletionTimestamp: %v", err)
}
if !uncached.DeletionTimestamp.IsZero() {
return fmt.Errorf("%v/%v has just been deleted at %v", kcp.GetNamespace(), kcp.GetName(), kcp.GetDeletionTimestamp())
}

for _, m := range machines {
patchHelper, err := patch.NewHelper(m, r.Client)
if err != nil {
return err
}

m.SetOwnerReferences(util.EnsureOwnerRef(m.GetOwnerReferences(), metav1.OwnerReference{
APIVersion: controlplanev1.GroupVersion.String(),
Kind: "KubeadmControlPlane",
Name: kcp.Name,
UID: kcp.UID,
Controller: pointer.BoolPtr(true),
BlockOwnerDeletion: pointer.BoolPtr(true),
}))

// Note that ValidateOwnerReferences() will reject this patch if another
// OwnerReference exists with controller=true.
if err := patchHelper.Patch(ctx, m); err != nil {
return err
}
}
return nil
}
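As a quick check of what adoption produces, here is a hedged sketch (an assumption for illustration, not code from this commit) using metav1.GetControllerOf, which returns the ownerReference with controller=true, if any; it mirrors the OwnedControlPlaneMachines filter changed later in this diff.

	// isOwnedByKCP reports whether m's controller reference points at kcp.
	func isOwnedByKCP(m *clusterv1.Machine, kcp *controlplanev1.KubeadmControlPlane) bool {
		ref := metav1.GetControllerOf(m) // nil when no controller=true ownerReference exists
		return ref != nil &&
			ref.Kind == "KubeadmControlPlane" &&
			ref.Name == kcp.Name &&
			ref.UID == kcp.UID
	}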

func getMachineNode(ctx context.Context, crClient client.Client, machine *clusterv1.Machine) (*corev1.Node, error) {
nodeRef := machine.Status.NodeRef
if nodeRef == nil {
@@ -535,6 +535,115 @@ func TestReconcileClusterNoEndpoints(t *testing.T) {
g.Expect(machineList.Items).To(BeEmpty())
}

func TestKubeadmControlPlaneReconciler_adoption(t *testing.T) {
t.Run("adopts existing Machines", func(t *testing.T) {
g := NewWithT(t)

cluster, kcp, tmpl := createClusterWithControlPlane()
cluster.Spec.ControlPlaneEndpoint.Host = "foo"

fmc := &fakeManagementCluster{
Machines: []*clusterv1.Machine{},
ControlPlaneHealthy: true,
EtcdHealthy: true,
}
objs := []runtime.Object{cluster.DeepCopy(), kcp.DeepCopy(), tmpl.DeepCopy()}
for i := 0; i < 3; i++ {
m := &clusterv1.Machine{
ObjectMeta: metav1.ObjectMeta{
Namespace: cluster.Namespace,
Name: fmt.Sprintf("test-%d", i),
Labels: internal.ControlPlaneLabelsForCluster(cluster.Name),
},
}
objs = append(objs, m)
fmc.Machines = append(fmc.Machines, m)
}
fakeClient, err := fakeClient(objs...)
g.Expect(err).ToNot(HaveOccurred())

log.SetLogger(klogr.New())
r := &KubeadmControlPlaneReconciler{
Client: fakeClient,
Log: log.Log,
managementCluster: fmc,

uncachedClient: fakeClient,
}

g.Expect(r.reconcile(context.Background(), cluster, kcp, log.Log)).To(Equal(ctrl.Result{}))

machineList := &clusterv1.MachineList{}
g.Expect(fakeClient.List(context.Background(), machineList, client.InNamespace(cluster.Namespace))).To(Succeed())
g.Expect(machineList.Items).NotTo(BeEmpty())
g.Expect(machineList.Items).To(HaveLen(3))
for _, machine := range machineList.Items {
g.Expect(machine.OwnerReferences).To(HaveLen(1))
g.Expect(machine.OwnerReferences).To(ContainElement(*metav1.NewControllerRef(kcp, controlplanev1.GroupVersion.WithKind("KubeadmControlPlane"))))
}
})

t.Run("Deleted KubeadmControlPlanes don't adopt machines", func(t *testing.T) {
// Usually we won't get into the inner reconcile with a deleted control plane, but it's possible when deleting with "orphanDependents":
// 1. The deletion timestamp is set in the API server, but our cache has not yet updated
// 2. The garbage collector removes our ownership reference from a Machine, triggering a re-reconcile (or we get unlucky with the periodic reconciliation)
// 3. We get into the inner reconcile function and re-adopt the Machine
// 4. The update to our cache for our deletion timestamp arrives
g := NewWithT(t)

cluster, kcp, tmpl := createClusterWithControlPlane()
cluster.Spec.ControlPlaneEndpoint.Host = "foo"

now := metav1.Now()
kcp.DeletionTimestamp = &now

fmc := &fakeManagementCluster{
Machines: []*clusterv1.Machine{},
ControlPlaneHealthy: true,
EtcdHealthy: true,
}
objs := []runtime.Object{cluster.DeepCopy(), kcp.DeepCopy(), tmpl.DeepCopy()}
for i := 0; i < 3; i++ {
m := &clusterv1.Machine{
ObjectMeta: metav1.ObjectMeta{
Namespace: cluster.Namespace,
Name: fmt.Sprintf("test-%d", i),
Labels: internal.ControlPlaneLabelsForCluster(cluster.Name),
},
}
objs = append(objs, m)
fmc.Machines = append(fmc.Machines, m)
}
fakeClient, err := fakeClient(objs...)
g.Expect(err).ToNot(HaveOccurred())

log.SetLogger(klogr.New())
r := &KubeadmControlPlaneReconciler{
Client: fakeClient,
Log: log.Log,
managementCluster: fmc,

uncachedClient: fakeClient,
}

result, err := r.reconcile(context.Background(), cluster, kcp, log.Log)
g.Expect(result).To(Equal(ctrl.Result{}))
g.Expect(err).To(HaveOccurred())

machineList := &clusterv1.MachineList{}
g.Expect(fakeClient.List(context.Background(), machineList, client.InNamespace(cluster.Namespace))).To(Succeed())
g.Expect(machineList.Items).NotTo(BeEmpty())
g.Expect(machineList.Items).To(HaveLen(3))
for _, machine := range machineList.Items {
g.Expect(machine.OwnerReferences).To(BeEmpty())
}
})

t.Run("TODO what should we do re: hashes on adopted Machines?", func(t *testing.T) {
t.Fail()
})
}

func TestReconcileInitializeControlPlane(t *testing.T) {
g := NewWithT(t)

@@ -1202,7 +1311,7 @@ func createClusterWithControlPlane() (*clusterv1.Cluster, *controlplanev1.Kubead
return cluster, kcp, genericMachineTemplate
}

-func fakeClient() (client.Client, error) {
+func fakeClient(initObjs ...runtime.Object) (client.Client, error) {
if err := clusterv1.AddToScheme(scheme.Scheme); err != nil {
return nil, err
}
@@ -1212,7 +1321,7 @@
if err := controlplanev1.AddToScheme(scheme.Scheme); err != nil {
return nil, err
}
-fakeClient := fake.NewFakeClientWithScheme(scheme.Scheme)
+fakeClient := fake.NewFakeClientWithScheme(scheme.Scheme, initObjs...)
return fakeClient, nil
}

@@ -1323,6 +1432,19 @@ type fakeManagementCluster struct {
}

func (f *fakeManagementCluster) GetMachinesForCluster(ctx context.Context, cluster types.NamespacedName, filters ...func(machine *clusterv1.Machine) bool) ([]*clusterv1.Machine, error) {
ret := make([]*clusterv1.Machine, 0, len(f.Machines))
for _, m := range f.Machines {
if func() bool {
for _, f := range filters {
if !f(m) {
return false
}
}
return true
}() {
ret = append(ret, m)
}
}
-return f.Machines, nil
+return ret, nil
}
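The inline closure above just asks whether every filter accepts the Machine; an equivalent named helper (an illustrative refactor, not part of this commit) reads:

	// matchesAll reports whether m satisfies every supplied filter.
	func matchesAll(m *clusterv1.Machine, filters ...func(*clusterv1.Machine) bool) bool {
		for _, filter := range filters {
			if !filter(m) {
				return false
			}
		}
		return true
	}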

45 changes: 41 additions & 4 deletions controlplane/kubeadm/internal/cluster.go
@@ -30,6 +30,7 @@ import (
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/kubernetes/scheme"
@@ -52,8 +53,8 @@ type ManagementCluster struct {
}

// OwnedControlPlaneMachines returns a MachineFilter function to find all owned control plane machines.
-// Usage: managementCluster.GetMachinesForCluster(ctx, cluster, OwnedControlPlaneMachines(controlPlane.Name))
-func OwnedControlPlaneMachines(controlPlaneName string) func(machine *clusterv1.Machine) bool {
+// Usage: managementCluster.GetMachinesForCluster(ctx, cluster, OwnedControlPlaneMachines(controlPlane))
+func OwnedControlPlaneMachines(owner metav1.Object) func(machine *clusterv1.Machine) bool {
return func(machine *clusterv1.Machine) bool {
if machine == nil {
return false
@@ -62,10 +63,46 @@ func OwnedControlPlaneMachines(controlPlaneName string) func(machine *clusterv1.
if controllerRef == nil {
return false
}
-return controllerRef.Kind == "KubeadmControlPlane" && controllerRef.Name == controlPlaneName
+return controllerRef.Kind == "KubeadmControlPlane" && controllerRef.Name == owner.GetName() && controllerRef.UID == owner.GetUID()
}
}

func ControlPlaneMachines(clusterName string) func(machine *clusterv1.Machine) bool {
selector := ControlPlaneSelectorForCluster(clusterName)
return func(machine *clusterv1.Machine) bool {
if machine == nil {
return false
}
return selector.Matches(labels.Set(machine.Labels))
}
}

// AdoptableControlPlaneMachines returns a MachineFilter function to find all un-controlled control plane machines.
// Usage: managementCluster.GetMachinesForCluster(ctx, cluster, AdoptableControlPlaneMachines(cluster.Name))
func AdoptableControlPlaneMachines(clusterName string) func(machine *clusterv1.Machine) bool {
type ftype func(*clusterv1.Machine) bool

// Review thread pinned to the line above:
// @chuckha (Feb 20, 2020): i know this is beside the point, but there are some new functions to help you out here.
// https://github.com/kubernetes-sigs/cluster-api/blob/master/controlplane/kubeadm/internal/machine_filters.go#L29
// @sethp-nr (author, Feb 20, 2020): Indeed! I'm looking forward to refining this code with that before I open the PR. Thanks for taking a look :)

and := func(fs ...ftype) ftype {
return func(machine *clusterv1.Machine) bool {
for _, f := range fs {
if !f(machine) {
return false
}
}
return true
}
}
notNil := func(machine *clusterv1.Machine) bool {
return machine != nil
}
return and(
notNil,
ControlPlaneMachines(clusterName),
func(machine *clusterv1.Machine) bool {
return metav1.GetControllerOf(machine) == nil
},
)
}
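A usage sketch (illustrative; the helper name and surrounding types are assumptions) showing how this filter feeds GetMachinesForCluster — a Machine is returned only if it satisfies every filter supplied:

	// listAdoptable shows the call shape for finding un-controlled control plane machines.
	func listAdoptable(ctx context.Context, mc *ManagementCluster, cluster *clusterv1.Cluster) ([]*clusterv1.Machine, error) {
		return mc.GetMachinesForCluster(ctx, util.ObjectKey(cluster), AdoptableControlPlaneMachines(cluster.Name))
	}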

// HasDeletionTimestamp returns a MachineFilter function to find all machines
// that have a deletion timestamp.
func HasDeletionTimestamp() func(machine *clusterv1.Machine) bool {
@@ -219,7 +256,7 @@ func (m *ManagementCluster) healthCheck(ctx context.Context, check healthCheck,
}

// Make sure Cluster API is aware of all the nodes.
-machines, err := m.GetMachinesForCluster(ctx, clusterKey, OwnedControlPlaneMachines(controlPlaneName))
+machines, err := m.GetMachinesForCluster(ctx, clusterKey, ControlPlaneMachines(clusterKey.Name))
if err != nil {
return err
}
16 changes: 12 additions & 4 deletions controlplane/kubeadm/internal/cluster_labels.go
@@ -17,7 +17,8 @@ limitations under the License.
package internal

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3"
controlplanev1 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1alpha3"
)
@@ -39,8 +40,15 @@ func ControlPlaneLabelsForCluster(clusterName string) map[string]string {
}

// ControlPlaneSelectorForCluster returns the label selector necessary to get control plane machines for a given cluster.
-func ControlPlaneSelectorForCluster(clusterName string) *metav1.LabelSelector {
-return &metav1.LabelSelector{
-MatchLabels: ControlPlaneLabelsForCluster(clusterName),
-}
+func ControlPlaneSelectorForCluster(clusterName string) labels.Selector {
+must := func(r *labels.Requirement, err error) *labels.Requirement {
+if err != nil {
+panic(err)
+}
+return r
+}
+return labels.NewSelector().Add(
+*must(labels.NewRequirement(clusterv1.ClusterLabelName, selection.Equals, []string{clusterName})),
+*must(labels.NewRequirement(clusterv1.MachineControlPlaneLabelName, selection.Exists, []string{})),
+)
}
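A usage sketch (an assumption for illustration, not part of the diff) of the selector built above: it requires the cluster-name label to equal clusterName and the control plane label merely to exist, mirroring ControlPlaneLabelsForCluster.

	func selectorExample() {
		sel := ControlPlaneSelectorForCluster("test-cluster")
		set := labels.Set{
			clusterv1.ClusterLabelName:             "test-cluster",
			clusterv1.MachineControlPlaneLabelName: "",
		}
		fmt.Println(sel.Matches(set)) // true
		fmt.Println(sel.String())     // string form, suitable for kcp.Status.Selector
	}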
