Add DMP phases
Jont828 committed Aug 8, 2023
1 parent 759e5dd commit d09e5ad
Showing 3 changed files with 293 additions and 290 deletions.
3 changes: 3 additions & 0 deletions exp/internal/controllers/machinepool_controller_phases.go
@@ -445,6 +445,7 @@ func (r *MachinePoolReconciler) ensureInfraMachineOnwerRefs(ctx context.Context,
if !ok {
return errors.Errorf("failed to patch ownerRef for infraMachine %q because no Machine has an infraRef pointing to it", infraMachine.GetName())
}
log.Info("Machine GVK is", "gvk", machine.GroupVersionKind())
machineRef := metav1.NewControllerRef(&machine, machine.GroupVersionKind())
if !util.HasOwnerRef(ownerRefs, *machineRef) {
log.V(2).Info("Setting ownerRef on infraMachine", "infraMachine", infraMachine.GetName(), "namespace", infraMachine.GetNamespace(), "machine", machine.GetName())
@@ -460,6 +461,8 @@ func (r *MachinePoolReconciler) ensureInfraMachineOnwerRefs(ctx context.Context,
if err := patchHelper.Patch(ctx, infraMachine); err != nil {
return errors.Wrapf(err, "failed to patch %s", klog.KObj(infraMachine))
}

log.V(4).Info("Successfully set ownerRef on infraMachine", "infraMachine", infraMachine.GetName(), "namespace", infraMachine.GetNamespace(), "machine", machine.GetName())
}
}

@@ -20,15 +20,10 @@ package controllers
import (
"context"
"fmt"
"math/rand"
"sort"
"strings"
"time"

"github.com/blang/semver"
"github.com/pkg/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
Expand All @@ -39,7 +34,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/kind/pkg/cluster/constants"

clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/controllers/external"
Expand All @@ -50,10 +44,8 @@ import (
infrav1 "sigs.k8s.io/cluster-api/test/infrastructure/docker/api/v1beta1"
infraexpv1 "sigs.k8s.io/cluster-api/test/infrastructure/docker/exp/api/v1beta1"
"sigs.k8s.io/cluster-api/test/infrastructure/docker/internal/docker"
"sigs.k8s.io/cluster-api/test/infrastructure/kind"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/conditions"
"sigs.k8s.io/cluster-api/util/labels/format"
"sigs.k8s.io/cluster-api/util/patch"
"sigs.k8s.io/cluster-api/util/predicates"
)
@@ -294,7 +286,6 @@ func (r *DockerMachinePoolReconciler) reconcileNormal(ctx context.Context, clust
conditions.MarkFalse(dockerMachinePool, expv1.ReplicasReadyCondition, expv1.WaitingForReplicasReadyReason, clusterv1.ConditionSeverityInfo, "")

return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
// return res, nil
}

func getDockerMachines(ctx context.Context, c client.Client, cluster clusterv1.Cluster, machinePool expv1.MachinePool, dockerMachinePool infraexpv1.DockerMachinePool) (*infrav1.DockerMachineList, error) {
Expand All @@ -310,287 +301,6 @@ func getDockerMachines(ctx context.Context, c client.Client, cluster clusterv1.C
return dockerMachineList, nil
}

// DeleteExtraDockerMachines deletes any DockerMachines owned by the DockerMachinePool that reference an invalid providerID, i.e. not in the latest copy of the node pool instances.
func (r *DockerMachinePoolReconciler) DeleteExtraDockerMachines(ctx context.Context, cluster *clusterv1.Cluster, machinePool *expv1.MachinePool, dockerMachinePool *infraexpv1.DockerMachinePool) error {
log := ctrl.LoggerFrom(ctx)
// log.V(2).Info("Deleting orphaned machines", "dockerMachinePool", dockerMachinePool.Name, "namespace", dockerMachinePool.Namespace, "nodePoolMachineStatuses", nodePoolMachineStatuses)
dockerMachineList, err := getDockerMachines(ctx, r.Client, *cluster, *machinePool, *dockerMachinePool)
if err != nil {
return err
}

log.V(2).Info("DockerMachineList kind is", "kind", dockerMachineList.GetObjectKind())

dockerMachinesToDelete, err := getDockerMachinesToDelete(ctx, dockerMachineList.Items, cluster, machinePool, dockerMachinePool)
if err != nil {
return err
}

for _, dockerMachine := range dockerMachinesToDelete {
log.V(2).Info("Deleting DockerMachine", "dockerMachine", dockerMachine.Name, "namespace", dockerMachine.Namespace)
if err := r.deleteMachinePoolMachine(ctx, dockerMachine, *machinePool); err != nil {
return err
}
}

return nil
}

func getDockerMachinesToDelete(ctx context.Context, dockerMachines []infrav1.DockerMachine, cluster *clusterv1.Cluster, machinePool *expv1.MachinePool, dockerMachinePool *infraexpv1.DockerMachinePool) ([]infrav1.DockerMachine, error) {
log := ctrl.LoggerFrom(ctx)

// TODO: sort these by delete priority
dockerMachinesToDelete := []infrav1.DockerMachine{}
numToDelete := len(dockerMachines) - int(*machinePool.Spec.Replicas)
// TODO: sort list of dockerMachines
labelFilters := map[string]string{dockerMachinePoolLabel: dockerMachinePool.Name}

// Sort priority delete to the front of the list
sort.Slice(dockerMachines, func(i, j int) bool {
_, iHasAnnotation := dockerMachines[i].Annotations[clusterv1.DeleteMachineAnnotation]
_, jHasAnnotation := dockerMachines[j].Annotations[clusterv1.DeleteMachineAnnotation]

if iHasAnnotation && jHasAnnotation {
return dockerMachines[i].Name < dockerMachines[j].Name
}

return iHasAnnotation
})

for _, dockerMachine := range dockerMachines {
// externalMachine, err := docker.NewMachine(ctx, cluster, dockerMachine.Name, labelFilters)
if numToDelete > 0 {
dockerMachinesToDelete = append(dockerMachinesToDelete, dockerMachine)
numToDelete--
} else {
externalMachine, err := docker.NewMachine(ctx, cluster, dockerMachine.Name, labelFilters)
if err != nil {
// TODO: should we delete anyways
return nil, err
}
if !isMachineMatchingInfrastructureSpec(ctx, externalMachine, machinePool, dockerMachinePool) {
dockerMachinesToDelete = append(dockerMachinesToDelete, dockerMachine)
}
}

log.V(2).Info("Keeping DockerMachine, nothing to do", "dockerMachine", dockerMachine.Name, "namespace", dockerMachine.Namespace)
}

return dockerMachinesToDelete, nil
}
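
For illustration, the delete-priority ordering used in getDockerMachinesToDelete can be exercised on its own; a minimal standalone sketch with hypothetical names, assuming clusterv1.DeleteMachineAnnotation resolves to "cluster.x-k8s.io/delete-machine":

package main

import (
	"fmt"
	"sort"
)

// Assumption: mirrors the value of clusterv1.DeleteMachineAnnotation.
const deleteMachineAnnotation = "cluster.x-k8s.io/delete-machine"

type fakeMachine struct {
	name        string
	annotations map[string]string
}

func main() {
	machines := []fakeMachine{
		{name: "worker-c"},
		{name: "worker-b", annotations: map[string]string{deleteMachineAnnotation: ""}},
		{name: "worker-a", annotations: map[string]string{deleteMachineAnnotation: ""}},
	}
	// Annotated machines sort to the front and tie-break by name, so they are
	// the first candidates when scaling down.
	sort.Slice(machines, func(i, j int) bool {
		_, iHas := machines[i].annotations[deleteMachineAnnotation]
		_, jHas := machines[j].annotations[deleteMachineAnnotation]
		if iHas && jHas {
			return machines[i].name < machines[j].name
		}
		return iHas
	})
	for _, m := range machines {
		fmt.Println(m.name) // worker-a, worker-b, worker-c
	}
}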

func (r *DockerMachinePoolReconciler) deleteMachinePoolMachine(ctx context.Context, dockerMachine infrav1.DockerMachine, machinePool expv1.MachinePool) error {
log := ctrl.LoggerFrom(ctx)
machine, err := util.GetOwnerMachine(ctx, r.Client, dockerMachine.ObjectMeta)
// Not found doesn't return an error, so we need to check for nil.
if err != nil {
return errors.Wrapf(err, "error getting owner Machine for DockerMachine %s/%s", dockerMachine.Namespace, dockerMachine.Name)
}
if machine == nil {
// If the MachinePool is deleted and the owner Machine doesn't already exist, it will never be created, so the orphaned DockerMachine can be deleted.
if mpDeleted := isMachinePoolDeleted(ctx, r.Client, &machinePool); mpDeleted {
log.Info("DockerMachine is orphaned and MachinePool is deleted, deleting DockerMachine", "dockerMachine", dockerMachine.Name, "namespace", dockerMachine.Namespace)
if err := r.Client.Delete(ctx, &dockerMachine); err != nil {
return errors.Wrapf(err, "failed to delete orphaned DockerMachine %s/%s", dockerMachine.Namespace, dockerMachine.Name)
}
} else { // If the MachinePool still exists, then the Machine will be created, so we need to wait for that to happen.
return errors.Errorf("DockerMachine %s/%s has no parent Machine, will reattempt deletion once parent Machine is present", dockerMachine.Namespace, dockerMachine.Name)
}
} else {
if err := r.Client.Delete(ctx, machine); err != nil {
return errors.Wrapf(err, "failed to delete Machine %s/%s", machine.Namespace, machine.Name)
}
}

return nil
}

// CreateNewReplicas creates a DockerMachine for each instance returned by the node pool if it doesn't exist.
func (r *DockerMachinePoolReconciler) CreateNewReplicas(ctx context.Context, cluster *clusterv1.Cluster, machinePool *expv1.MachinePool, dockerMachinePool *infraexpv1.DockerMachinePool) error {
log := ctrl.LoggerFrom(ctx)

labelFilters := map[string]string{dockerMachinePoolLabel: dockerMachinePool.Name}

machines, err := docker.ListMachinesByCluster(ctx, cluster, labelFilters)
if err != nil {
return errors.Wrapf(err, "failed to list all machines in the cluster")
}

matchingMachineCount := len(machinesMatchingInfrastructureSpec(ctx, machines, machinePool, dockerMachinePool))
log.Info("MatchingMachineCount", "count", matchingMachineCount)
numToCreate := int(*machinePool.Spec.Replicas) - matchingMachineCount
for i := 0; i < numToCreate; i++ {
createdMachine, err := createReplica(ctx, cluster, machinePool, dockerMachinePool)
if err != nil {
return errors.Wrap(err, "failed to create a new docker machine")
}

if err := r.createDockerMachine(ctx, createdMachine.Name(), cluster, machinePool, dockerMachinePool); err != nil {
return errors.Wrap(err, "failed to create a new docker machine")
}
}

return nil
}

// CreateNewDockerMachines creates a DockerMachine for each instance returned by the node pool if it doesn't exist.
func (r *DockerMachinePoolReconciler) CreateNewDockerMachines(ctx context.Context, cluster *clusterv1.Cluster, machinePool *expv1.MachinePool, dockerMachinePool *infraexpv1.DockerMachinePool) error {
_ = ctrl.LoggerFrom(ctx)

labelFilters := map[string]string{dockerMachinePoolLabel: dockerMachinePool.Name}

machines, err := docker.ListMachinesByCluster(ctx, cluster, labelFilters)
if err != nil {
return errors.Wrapf(err, "failed to list all machines in the cluster")
}

matchingMachines := machinesMatchingInfrastructureSpec(ctx, machines, machinePool, dockerMachinePool)
for _, machine := range matchingMachines {
if err := r.Client.Get(ctx, client.ObjectKey{Namespace: machinePool.Namespace, Name: machine.Name()}, &infrav1.DockerMachine{}); err != nil {
if apierrors.IsNotFound(err) {
if err := r.createDockerMachine(ctx, machine.Name(), cluster, machinePool, dockerMachinePool); err != nil {
return errors.Wrap(err, "failed to create a new docker machine")
}
} else {
return errors.Wrap(err, "failed to get docker machine")
}
}
}

return nil
}

func createReplica(ctx context.Context, cluster *clusterv1.Cluster, machinePool *expv1.MachinePool, dockerMachinePool *infraexpv1.DockerMachinePool) (*docker.Machine, error) {
name := fmt.Sprintf("worker-%s", util.RandomString(6))
labelFilters := map[string]string{dockerMachinePoolLabel: dockerMachinePool.Name}
externalMachine, err := docker.NewMachine(ctx, cluster, name, labelFilters)
if err != nil {
return nil, errors.Wrapf(err, "failed to create helper for managing the externalMachine named %s", name)
}

// NOTE: FailureDomains don't mean much in CAPD since it's all local, but we are setting a label on
// each container, so we can check placement.
labels := map[string]string{}
for k, v := range labelFilters {
labels[k] = v
}

if len(machinePool.Spec.FailureDomains) > 0 {
// For MachinePools placement is expected to be managed by the underlying infrastructure primitive, but
// given that there is no such a thing in CAPD, we are picking a random failure domain.
randomIndex := rand.Intn(len(machinePool.Spec.FailureDomains)) //nolint:gosec
for k, v := range docker.FailureDomainLabel(&machinePool.Spec.FailureDomains[randomIndex]) {
labels[k] = v
}
}

if err := externalMachine.Create(ctx, dockerMachinePool.Spec.Template.CustomImage, constants.WorkerNodeRoleValue, machinePool.Spec.Template.Spec.Version, labels, dockerMachinePool.Spec.Template.ExtraMounts); err != nil {
return nil, errors.Wrapf(err, "failed to create docker machine with name %s", name)
}
return externalMachine, nil
}
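
The failure-domain handling in createReplica only records a random choice as a container label; a minimal sketch of that selection, assuming failure domains are plain strings as in MachinePoolSpec.FailureDomains:

package main

import (
	"fmt"
	"math/rand"
)

// pickFailureDomain returns a random failure domain, mimicking the random
// placement above; CAPD has no real placement, so the pick is only recorded
// as a label on the container.
func pickFailureDomain(failureDomains []string) (string, bool) {
	if len(failureDomains) == 0 {
		return "", false
	}
	return failureDomains[rand.Intn(len(failureDomains))], true
}

func main() {
	if fd, ok := pickFailureDomain([]string{"fd1", "fd2", "fd3"}); ok {
		fmt.Println("placing replica in failure domain", fd)
	}
}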

func (r *DockerMachinePoolReconciler) createDockerMachine(ctx context.Context, name string, cluster *clusterv1.Cluster, machinePool *expv1.MachinePool, dockerMachinePool *infraexpv1.DockerMachinePool) error {
labels := map[string]string{
clusterv1.ClusterNameLabel: cluster.Name,
clusterv1.MachinePoolNameLabel: format.MustFormatValue(machinePool.Name),
}
dockerMachine := &infrav1.DockerMachine{
ObjectMeta: metav1.ObjectMeta{
Namespace: dockerMachinePool.Namespace,
Name: name,
Labels: labels,
Annotations: make(map[string]string),
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: dockerMachinePool.APIVersion,
Kind: dockerMachinePool.Kind,
Name: dockerMachinePool.Name,
UID: dockerMachinePool.UID,
},
// Note: Since the MachinePool controller has not created its parent Machine yet, we want to set the DockerMachinePool as the owner so it's not orphaned.
},
},
Spec: infrav1.DockerMachineSpec{
CustomImage: dockerMachinePool.Spec.Template.CustomImage,
PreLoadImages: dockerMachinePool.Spec.Template.PreLoadImages,
},
}

// log.V(2).Info("Instance name for dockerMachine is", "instanceName", nodePoolMachineStatus.Name, "dockerMachine", dockerMachine.GetName())

if err := r.Client.Create(ctx, dockerMachine); err != nil {
return errors.Wrap(err, "failed to create dockerMachine")
}

return nil
}

func isMachineMatchingInfrastructureSpec(ctx context.Context, machine *docker.Machine, machinePool *expv1.MachinePool, dockerMachinePool *infraexpv1.DockerMachinePool) bool {
// NOTE: With the current implementation we are checking if the machine is using a kindest/node image for the expected version,
// but not checking if the machine has the expected extra.mounts or pre.loaded images.

semVer, err := semver.Parse(strings.TrimPrefix(*machinePool.Spec.Template.Spec.Version, "v"))
if err != nil {
// TODO: consider if to return an error
panic(errors.Wrap(err, "failed to parse DockerMachine version").Error())
}

kindMapping := kind.GetMapping(semVer, dockerMachinePool.Spec.Template.CustomImage)

return machine.ContainerImage() == kindMapping.Image
}
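
isMachineMatchingInfrastructureSpec reduces to "does the container image match the image kind would use for this Kubernetes version"; a simplified sketch of that comparison, assuming the default mapping is "kindest/node:v<version>" when no custom image is set (the real kind.GetMapping may pin specific images or digests):

package main

import (
	"fmt"
	"strings"

	"github.com/blang/semver"
)

// expectedImage returns the node image a machine is expected to run for a
// given MachinePool version, preferring a custom image when one is set.
func expectedImage(version, customImage string) (string, error) {
	v, err := semver.Parse(strings.TrimPrefix(version, "v"))
	if err != nil {
		return "", err
	}
	if customImage != "" {
		return customImage, nil
	}
	// Assumption: default to the kindest/node image for the parsed version.
	return fmt.Sprintf("kindest/node:v%s", v.String()), nil
}

func main() {
	want, _ := expectedImage("v1.27.3", "")
	fmt.Println("matches spec:", "kindest/node:v1.27.3" == want)
}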

func machinesMatchingInfrastructureSpec(ctx context.Context, machines []*docker.Machine, machinePool *expv1.MachinePool, dockerMachinePool *infraexpv1.DockerMachinePool) []*docker.Machine {
var matchingMachines []*docker.Machine
for _, machine := range machines {
if isMachineMatchingInfrastructureSpec(ctx, machine, machinePool, dockerMachinePool) {
matchingMachines = append(matchingMachines, machine)
}
}

return matchingMachines
}

// func (r *DockerMachinePoolReconciler) initNodePoolMachineStatusList(ctx context.Context, dockerMachines []infrav1.DockerMachine, dockerMachinePool *infraexpv1.DockerMachinePool) ([]dockerexp.NodePoolMachineStatus, error) {
// log := ctrl.LoggerFrom(ctx)

// nodePoolInstances := make([]dockerexp.NodePoolMachineStatus, len(dockerMachines))
// for i := range dockerMachines {
// // Needed to avoid implicit memory aliasing of the loop variable.
// dockerMachine := dockerMachines[i]

// // Try to get the owner machine to see if it has a delete annotation. If it doesn't exist, we'll requeue until it does.
// machine, err := util.GetOwnerMachine(ctx, r.Client, dockerMachine.ObjectMeta)
// if err != nil {
// return nil, errors.Wrapf(err, "failed to get owner Machine for DockerMachine %s/%s", dockerMachine.Namespace, dockerMachine.Name)
// }

// hasDeleteAnnotation := false
// if machine != nil {
// if machine.Annotations != nil {
// _, hasDeleteAnnotation = machine.Annotations[clusterv1.DeleteMachineAnnotation]
// }
// } else {
// sampleMachine := &clusterv1.Machine{}
// sampleMachine.APIVersion = clusterv1.GroupVersion.String()
// sampleMachine.Kind = "Machine"
// log.Info("DockerMachine has no parent Machine, will set up a watch and initNodePoolMachineStatusList", "dockerMachine", dockerMachine.Name, "namespace", dockerMachine.Namespace)
// // If machine == nil, then no Machine was found in the ownerRefs at all. Don't block nodepool reconciliation, but set up a Watch() instead.
// if err := r.externalTracker.Watch(log, sampleMachine, handler.EnqueueRequestsFromMapFunc(r.machineToDockerMachinePoolMapper(ctx, &dockerMachine, dockerMachinePool))); err != nil {
// return nil, errors.Wrapf(err, "failed to set watch for Machines %s/%s", dockerMachine.Namespace, dockerMachine.Name)
// }
// }

// nodePoolInstances[i] = dockerexp.NodePoolMachineStatus{
// Name: dockerMachine.Name,
// PrioritizeDelete: hasDeleteAnnotation,
// }
// }

// return nodePoolInstances, nil
// }

func isMachinePoolDeleted(ctx context.Context, c client.Client, machinePool *expv1.MachinePool) bool {
mp := &expv1.MachinePool{}
key := client.ObjectKey{Name: machinePool.Name, Namespace: machinePool.Namespace}