From 891c4a7de25b3935a440b83d93889c5c6da083f5 Mon Sep 17 00:00:00 2001
From: Amanuel Engeda
Date: Thu, 11 Jul 2024 16:20:22 -0700
Subject: [PATCH] Add kubelet for both nodepool and ec2nodeclass

---
 pkg/apis/v1/ec2nodeclass.go                  |  4 +--
 pkg/apis/v1/labels.go                        |  1 +
 pkg/cloudprovider/cloudprovider.go           |  9 ++++++
 pkg/cloudprovider/drift.go                   | 27 ++++++++++++++++-
 pkg/controllers/nodeclass/hash/controller.go | 11 +++++++
 pkg/utils/utils.go                           | 32 ++++++++++++++++++++
 6 files changed, 81 insertions(+), 3 deletions(-)

diff --git a/pkg/apis/v1/ec2nodeclass.go b/pkg/apis/v1/ec2nodeclass.go
index 4dac7e882417..48be5a41b6ee 100644
--- a/pkg/apis/v1/ec2nodeclass.go
+++ b/pkg/apis/v1/ec2nodeclass.go
@@ -92,7 +92,7 @@ type EC2NodeClassSpec struct {
 	// +kubebuilder:validation:XValidation:message="evictionSoft OwnerKey does not have a matching evictionSoftGracePeriod",rule="has(self.evictionSoft) ? self.evictionSoft.all(e, (e in self.evictionSoftGracePeriod)):true"
 	// +kubebuilder:validation:XValidation:message="evictionSoftGracePeriod OwnerKey does not have a matching evictionSoft",rule="has(self.evictionSoftGracePeriod) ? self.evictionSoftGracePeriod.all(e, (e in self.evictionSoft)):true"
 	// +optional
-	Kubelet *KubeletConfiguration `json:"kubelet,omitempty"`
+	Kubelet *KubeletConfiguration `json:"kubelet,omitempty" hash:"ignore"`
 	// BlockDeviceMappings to be applied to provisioned nodes.
 	// +kubebuilder:validation:XValidation:message="must have only one blockDeviceMappings with rootVolume",rule="self.filter(x, has(x.rootVolume)?x.rootVolume==true:false).size() <= 1"
 	// +kubebuilder:validation:MaxItems:=50
@@ -416,7 +416,7 @@ type EC2NodeClass struct {
 // 1. A field changes its default value for an existing field that is already hashed
 // 2. A field is added to the hash calculation with an already-set value
 // 3. A field is removed from the hash calculations
-const EC2NodeClassHashVersion = "v2"
+const EC2NodeClassHashVersion = "v3"
 
 func (in *EC2NodeClass) Hash() string {
 	return fmt.Sprint(lo.Must(hashstructure.Hash(in.Spec, hashstructure.FormatV2, &hashstructure.HashOptions{
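A note on the `hash:"ignore"` tag: hashstructure drops tagged fields from the computed hash entirely, so moving Kubelet out of the static hash is case 3 above (a field removed from the hash calculation), which is what forces the bump to v3. A standalone sketch of the tag's behavior, using an illustrative struct rather than the real EC2NodeClassSpec:

// Sketch: hashstructure skips any field tagged `hash:"ignore"`, so two
// specs that differ only in that field hash identically.
package main

import (
	"fmt"

	"github.com/mitchellh/hashstructure/v2"
)

type spec struct {
	AMIFamily string
	Kubelet   map[string]string `hash:"ignore"` // excluded, like EC2NodeClassSpec.Kubelet above
}

func main() {
	a, _ := hashstructure.Hash(spec{AMIFamily: "AL2"}, hashstructure.FormatV2, nil)
	b, _ := hashstructure.Hash(spec{AMIFamily: "AL2", Kubelet: map[string]string{"maxPods": "110"}}, hashstructure.FormatV2, nil)
	fmt.Println(a == b) // true: the ignored field no longer affects the hash
}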
diff --git a/pkg/apis/v1/labels.go b/pkg/apis/v1/labels.go
index adda9ce6f820..cfcbf1d06932 100644
--- a/pkg/apis/v1/labels.go
+++ b/pkg/apis/v1/labels.go
@@ -119,6 +119,7 @@ var (
 	LabelInstanceAcceleratorManufacturer = apis.Group + "/instance-accelerator-manufacturer"
 	LabelInstanceAcceleratorCount        = apis.Group + "/instance-accelerator-count"
 	AnnotationEC2NodeClassHash           = apis.Group + "/ec2nodeclass-hash"
+	AnnotationKubeletHash                = apis.Group + "/kubelet-drift-hash"
 	AnnotationEC2NodeClassHashVersion    = apis.Group + "/ec2nodeclass-hash-version"
 	AnnotationInstanceTagged             = apis.Group + "/tagged"

diff --git a/pkg/cloudprovider/cloudprovider.go b/pkg/cloudprovider/cloudprovider.go
index 438f033f34b8..897dbc500ef5 100644
--- a/pkg/cloudprovider/cloudprovider.go
+++ b/pkg/cloudprovider/cloudprovider.go
@@ -86,6 +86,14 @@ func (c *CloudProvider) Create(ctx context.Context, nodeClaim *corev1.NodeClaim)
 		// We treat a failure to resolve the NodeClass as an ICE since this means there is no capacity possibilities for this NodeClaim
 		return nil, cloudprovider.NewInsufficientCapacityError(fmt.Errorf("resolving node class, %w", err))
 	}
+	nodePool, err := utils.ResolveNodePoolFromNodeClaim(ctx, c.kubeClient, nodeClaim)
+	if err != nil {
+		return nil, err
+	}
+	kubeletHash, err := utils.GetHashKubelet(nodePool.Annotations[corev1.KubeletCompatabilityAnnotationKey], nodeClass)
+	if err != nil {
+		return nil, err
+	}
 	nodeClassReady := nodeClass.StatusConditions().Get(status.ConditionReady)
 	if !nodeClassReady.IsTrue() {
 		return nil, fmt.Errorf("resolving ec2nodeclass, %s", nodeClassReady.Message)
@@ -106,6 +114,7 @@ func (c *CloudProvider) Create(ctx context.Context, nodeClaim *corev1.NodeClaim)
 	})
 	nc := c.instanceToNodeClaim(instance, instanceType, nodeClass)
 	nc.Annotations = lo.Assign(nodeClass.Annotations, map[string]string{
+		providerv1.AnnotationKubeletHash:             kubeletHash,
 		providerv1.AnnotationEC2NodeClassHash:        nodeClass.Hash(),
 		providerv1.AnnotationEC2NodeClassHashVersion: providerv1.EC2NodeClassHashVersion,
 	})
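The annotation stamping above relies on lo.Assign, which copies maps left to right so the explicitly built map wins on key collisions; the new kubelet hash is therefore stamped alongside the existing static hash. A runnable sketch of that merge semantics, with placeholder hash values (the keys follow the apis.Group prefix, karpenter.k8s.aws):

// Sketch: lo.Assign merges maps left to right; later entries override.
package main

import (
	"fmt"

	"github.com/samber/lo"
)

func main() {
	nodeClassAnnotations := map[string]string{
		"karpenter.k8s.aws/ec2nodeclass-hash": "stale-hash", // overridden below
		"example.com/team":                    "platform",   // carried through unchanged
	}
	merged := lo.Assign(nodeClassAnnotations, map[string]string{
		"karpenter.k8s.aws/kubelet-drift-hash":        "kubelet-hash",
		"karpenter.k8s.aws/ec2nodeclass-hash":         "fresh-hash",
		"karpenter.k8s.aws/ec2nodeclass-hash-version": "v3",
	})
	fmt.Println(merged["karpenter.k8s.aws/ec2nodeclass-hash"]) // fresh-hash
}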
diff --git a/pkg/cloudprovider/drift.go b/pkg/cloudprovider/drift.go
index fa11ffc12225..76a6e326e1ef 100644
--- a/pkg/cloudprovider/drift.go
+++ b/pkg/cloudprovider/drift.go
@@ -43,6 +43,10 @@ func (c *CloudProvider) isNodeClassDrifted(ctx context.Context, nodeClaim *corev
 	if drifted := c.areStaticFieldsDrifted(nodeClaim, nodeClass); drifted != "" {
 		return drifted, nil
 	}
+	kubeletDrifted, err := c.isKubeletConfigurationDrifted(nodeClaim, nodeClass, nodePool)
+	if err != nil {
+		return "", err
+	}
 	instance, err := c.getInstance(ctx, nodeClaim.Status.ProviderID)
 	if err != nil {
 		return "", err
 	}
@@ -59,7 +63,7 @@
 	if err != nil {
 		return "", fmt.Errorf("calculating subnet drift, %w", err)
 	}
-	drifted := lo.FindOrElse([]cloudprovider.DriftReason{amiDrifted, securitygroupDrifted, subnetDrifted}, "", func(i cloudprovider.DriftReason) bool {
+	drifted := lo.FindOrElse([]cloudprovider.DriftReason{amiDrifted, securitygroupDrifted, subnetDrifted, kubeletDrifted}, "", func(i cloudprovider.DriftReason) bool {
 		return string(i) != ""
 	})
 	return drifted, nil
@@ -135,6 +139,27 @@ func (c *CloudProvider) areStaticFieldsDrifted(nodeClaim *corev1.NodeClaim, node
 	return lo.Ternary(nodeClassHash != nodeClaimHash, NodeClassDrift, "")
 }
 
+// Remove once v1beta1 is dropped
+func (c *CloudProvider) isKubeletConfigurationDrifted(nodeClaim *corev1.NodeClaim, nodeClass *providerv1.EC2NodeClass, nodePool *corev1.NodePool) (cloudprovider.DriftReason, error) {
+	kubeletHash, err := utils.GetHashKubelet(nodePool.Annotations[corev1.KubeletCompatabilityAnnotationKey], nodeClass)
+	if err != nil {
+		return "", err
+	}
+	nodeClaimKubeletHash, foundNodeClaimKubeletHash := nodeClaim.Annotations[providerv1.AnnotationKubeletHash]
+	nodeClassHashVersion, foundNodeClassHashVersion := nodeClass.Annotations[providerv1.AnnotationEC2NodeClassHashVersion]
+	nodeClaimHashVersion, foundNodeClaimHashVersion := nodeClaim.Annotations[providerv1.AnnotationEC2NodeClassHashVersion]
+
+	if !foundNodeClaimKubeletHash || !foundNodeClaimHashVersion || !foundNodeClassHashVersion {
+		return "", nil
+	}
+
+	// validate that the hash version of the EC2NodeClass matches the NodeClaim's before evaluating kubelet drift
+	if nodeClassHashVersion != nodeClaimHashVersion {
+		return "", nil
+	}
+	return lo.Ternary(kubeletHash != nodeClaimKubeletHash, cloudprovider.DriftReason("KubeletDrifted"), ""), nil
+}
+
 func (c *CloudProvider) getInstance(ctx context.Context, providerID string) (*instance.Instance, error) {
 	// Get InstanceID to fetch from EC2
 	instanceID, err := utils.ParseInstanceID(providerID)

diff --git a/pkg/controllers/nodeclass/hash/controller.go b/pkg/controllers/nodeclass/hash/controller.go
index 351dac8fda47..24ea2e0248d3 100644
--- a/pkg/controllers/nodeclass/hash/controller.go
+++ b/pkg/controllers/nodeclass/hash/controller.go
@@ -31,6 +31,7 @@ import (
 	corev1 "sigs.k8s.io/karpenter/pkg/apis/v1"
 
 	providerv1 "github.com/aws/karpenter-provider-aws/pkg/apis/v1"
+	"github.com/aws/karpenter-provider-aws/pkg/utils"
 )
 
 type Controller struct {
@@ -93,6 +94,15 @@ func (c *Controller) updateNodeClaimHash(ctx context.Context, nodeClass *provide
 		nc := ncList.Items[i]
 		stored := nc.DeepCopy()
 
+		nodePool, err := utils.ResolveNodePoolFromNodeClaim(ctx, c.kubeClient, &nc)
+		if err != nil {
+			return err
+		}
+		kubeletHash, err := utils.GetHashKubelet(nodePool.Annotations[corev1.KubeletCompatabilityAnnotationKey], nodeClass)
+		if err != nil {
+			return err
+		}
+
 		if nc.Annotations[providerv1.AnnotationEC2NodeClassHashVersion] != providerv1.EC2NodeClassHashVersion {
 			nc.Annotations = lo.Assign(nc.Annotations, map[string]string{
 				providerv1.AnnotationEC2NodeClassHashVersion: providerv1.EC2NodeClassHashVersion,
 			})
@@ -103,6 +113,7 @@
 			if nc.StatusConditions().Get(corev1.ConditionTypeDrifted) == nil {
 				nc.Annotations = lo.Assign(nc.Annotations, map[string]string{
 					providerv1.AnnotationEC2NodeClassHash: nodeClass.Hash(),
+					providerv1.AnnotationKubeletHash:      kubeletHash,
 				})
 			}
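The short-circuit order in isKubeletConfigurationDrifted matters: missing annotations and hash-version mismatches deliberately report no drift rather than an error or a false positive. A dependency-free sketch of that gating, with illustrative function and variable names:

// Sketch of the decision order used above, extracted for illustration.
package main

import "fmt"

func kubeletDriftReason(currentHash, claimHash, classVersion, claimVersion string) string {
	// Annotations not stamped yet (e.g. a NodeClaim created before this change): skip.
	if claimHash == "" || classVersion == "" || claimVersion == "" {
		return ""
	}
	// Hash algorithm changed between versions: comparing hashes would be meaningless.
	if classVersion != claimVersion {
		return ""
	}
	if currentHash != claimHash {
		return "KubeletDrifted"
	}
	return ""
}

func main() {
	fmt.Println(kubeletDriftReason("h2", "h1", "v3", "v3")) // KubeletDrifted
	fmt.Println(kubeletDriftReason("h2", "h1", "v3", "v2")) // "" (version gate)
}

Reporting "no drift" on a version mismatch is safe because the hash controller re-stamps NodeClaims with the new version and hash, after which the comparison becomes meaningful again.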
diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go
index 14e272a54017..1e7655637125 100644
--- a/pkg/utils/utils.go
+++ b/pkg/utils/utils.go
@@ -15,13 +15,22 @@ limitations under the License.
 package utils
 
 import (
+	"context"
 	"encoding/json"
 	"fmt"
 	"regexp"
 	"strings"
 
+	"k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/types"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	coreapis "sigs.k8s.io/karpenter/pkg/apis"
+	corev1 "sigs.k8s.io/karpenter/pkg/apis/v1"
 	"sigs.k8s.io/karpenter/pkg/apis/v1beta1"
 
+	"github.com/mitchellh/hashstructure/v2"
+
 	v1 "github.com/aws/karpenter-provider-aws/pkg/apis/v1"
 
 	"github.com/aws/aws-sdk-go/aws"
@@ -89,6 +98,29 @@ func GetKubelet(kubeletAnnotation string, enc *v1.EC2NodeClass) (*v1.KubeletConf
 	return enc.Spec.Kubelet, nil
 }
 
+func GetHashKubelet(kubeletAnnotation string, enc *v1.EC2NodeClass) (string, error) {
+	kubelet, err := GetKubelet(kubeletAnnotation, enc)
+	if err != nil {
+		return "", err
+	}
+	return fmt.Sprint(lo.Must(hashstructure.Hash(kubelet, hashstructure.FormatV2, &hashstructure.HashOptions{
+		SlicesAsSets:    true,
+		IgnoreZeroValue: true,
+		ZeroNil:         true,
+	}))), nil
+}
+
+func ResolveNodePoolFromNodeClaim(ctx context.Context, kubeClient client.Client, nodeClaim *corev1.NodeClaim) (*corev1.NodePool, error) {
+	if nodePoolName, ok := nodeClaim.Labels[corev1.NodePoolLabelKey]; ok {
+		nodePool := &corev1.NodePool{}
+		if err := kubeClient.Get(ctx, types.NamespacedName{Name: nodePoolName}, nodePool); err != nil {
+			return nil, err
+		}
+		return nodePool, nil
+	}
+	return nil, errors.NewNotFound(schema.GroupResource{Group: coreapis.Group, Resource: "nodepools"}, "")
+}
+
 func updateKubeletType(kubelet *v1beta1.KubeletConfiguration) *v1.KubeletConfiguration {
 	resultKubelet := &v1.KubeletConfiguration{}
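GetHashKubelet hashes with ZeroNil and IgnoreZeroValue so that a nil kubelet configuration and an empty one hash identically, which keeps NodeClaims without kubelet overrides from being reported as drifted. A standalone sketch of those options; kubeletConfig here is a stand-in for the real type, not the patch's code:

// Sketch: with ZeroNil, a nil pointer hashes like the pointed-to zero
// value, and IgnoreZeroValue drops unset fields from the hash.
package main

import (
	"fmt"

	"github.com/mitchellh/hashstructure/v2"
)

type kubeletConfig struct {
	MaxPods    *int32
	ClusterDNS []string
}

func hashKubelet(k *kubeletConfig) uint64 {
	h, err := hashstructure.Hash(k, hashstructure.FormatV2, &hashstructure.HashOptions{
		SlicesAsSets:    true, // ClusterDNS entry order does not matter
		IgnoreZeroValue: true, // unset fields do not contribute
		ZeroNil:         true, // nil hashes like the zero value
	})
	if err != nil {
		panic(err)
	}
	return h
}

func main() {
	fmt.Println(hashKubelet(nil) == hashKubelet(&kubeletConfig{})) // true
}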