Skip to content

Commit

Permalink
Add kubelet configuration support for both NodePool and EC2NodeClass
Browse files Browse the repository at this point in the history
  • Loading branch information
engedaam committed Jul 11, 2024
1 parent 32482ac commit 891c4a7
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 3 deletions.
4 changes: 2 additions & 2 deletions pkg/apis/v1/ec2nodeclass.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ type EC2NodeClassSpec struct {
// +kubebuilder:validation:XValidation:message="evictionSoft OwnerKey does not have a matching evictionSoftGracePeriod",rule="has(self.evictionSoft) ? self.evictionSoft.all(e, (e in self.evictionSoftGracePeriod)):true"
// +kubebuilder:validation:XValidation:message="evictionSoftGracePeriod OwnerKey does not have a matching evictionSoft",rule="has(self.evictionSoftGracePeriod) ? self.evictionSoftGracePeriod.all(e, (e in self.evictionSoft)):true"
// +optional
Kubelet *KubeletConfiguration `json:"kubelet,omitempty"`
Kubelet *KubeletConfiguration `json:"kubelet,omitempty" hash:"ignore"`
// BlockDeviceMappings to be applied to provisioned nodes.
// +kubebuilder:validation:XValidation:message="must have only one blockDeviceMappings with rootVolume",rule="self.filter(x, has(x.rootVolume)?x.rootVolume==true:false).size() <= 1"
// +kubebuilder:validation:MaxItems:=50
Expand Down Expand Up @@ -416,7 +416,7 @@ type EC2NodeClass struct {
// 1. A field changes its default value for an existing field that is already hashed
// 2. A field is added to the hash calculation with an already-set value
// 3. A field is removed from the hash calculations
const EC2NodeClassHashVersion = "v2"
const EC2NodeClassHashVersion = "v3"

func (in *EC2NodeClass) Hash() string {
return fmt.Sprint(lo.Must(hashstructure.Hash(in.Spec, hashstructure.FormatV2, &hashstructure.HashOptions{
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/v1/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ var (
LabelInstanceAcceleratorManufacturer = apis.Group + "/instance-accelerator-manufacturer"
LabelInstanceAcceleratorCount = apis.Group + "/instance-accelerator-count"
AnnotationEC2NodeClassHash = apis.Group + "/ec2nodeclass-hash"
AnnotationKubeletHash = apis.Group + "/kubelet-drift-hash"
AnnotationEC2NodeClassHashVersion = apis.Group + "/ec2nodeclass-hash-version"
AnnotationInstanceTagged = apis.Group + "/tagged"

Expand Down
9 changes: 9 additions & 0 deletions pkg/cloudprovider/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ func (c *CloudProvider) Create(ctx context.Context, nodeClaim *corev1.NodeClaim)
// We treat a failure to resolve the NodeClass as an ICE since this means there is no capacity possibilities for this NodeClaim
return nil, cloudprovider.NewInsufficientCapacityError(fmt.Errorf("resolving node class, %w", err))
}
nodePool, err := utils.ResolveNodePoolFromNodeClaim(ctx, c.kubeClient, nodeClaim)
if err != nil {
return nil, err
}
kubeletHash, err := utils.GetHashKubelet(nodePool.Annotations[corev1.KubeletCompatabilityAnnotationKey], nodeClass)
if err != nil {
return nil, err
}
nodeClassReady := nodeClass.StatusConditions().Get(status.ConditionReady)
if !nodeClassReady.IsTrue() {
return nil, fmt.Errorf("resolving ec2nodeclass, %s", nodeClassReady.Message)
Expand All @@ -106,6 +114,7 @@ func (c *CloudProvider) Create(ctx context.Context, nodeClaim *corev1.NodeClaim)
})
nc := c.instanceToNodeClaim(instance, instanceType, nodeClass)
nc.Annotations = lo.Assign(nodeClass.Annotations, map[string]string{
providerv1.AnnotationKubeletHash: kubeletHash,
providerv1.AnnotationEC2NodeClassHash: nodeClass.Hash(),
providerv1.AnnotationEC2NodeClassHashVersion: providerv1.EC2NodeClassHashVersion,
})
Expand Down
27 changes: 26 additions & 1 deletion pkg/cloudprovider/drift.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ func (c *CloudProvider) isNodeClassDrifted(ctx context.Context, nodeClaim *corev
if drifted := c.areStaticFieldsDrifted(nodeClaim, nodeClass); drifted != "" {
return drifted, nil
}
kubeletDrifted, err := c.isKubeletConfigurationDrifted(nodeClaim, nodeClass, nodePool)
if err != nil {
return "", err
}
instance, err := c.getInstance(ctx, nodeClaim.Status.ProviderID)
if err != nil {
return "", err
Expand All @@ -59,7 +63,7 @@ func (c *CloudProvider) isNodeClassDrifted(ctx context.Context, nodeClaim *corev
if err != nil {
return "", fmt.Errorf("calculating subnet drift, %w", err)
}
drifted := lo.FindOrElse([]cloudprovider.DriftReason{amiDrifted, securitygroupDrifted, subnetDrifted}, "", func(i cloudprovider.DriftReason) bool {
drifted := lo.FindOrElse([]cloudprovider.DriftReason{amiDrifted, securitygroupDrifted, subnetDrifted, kubeletDrifted}, "", func(i cloudprovider.DriftReason) bool {
return string(i) != ""
})
return drifted, nil
Expand Down Expand Up @@ -135,6 +139,27 @@ func (c *CloudProvider) areStaticFieldsDrifted(nodeClaim *corev1.NodeClaim, node
return lo.Ternary(nodeClassHash != nodeClaimHash, NodeClassDrift, "")
}

// isKubeletConfigurationDrifted reports drift between the kubelet-configuration hash stamped on the
// NodeClaim at launch (AnnotationKubeletHash) and the hash computed from the current
// NodePool/EC2NodeClass kubelet settings. The NodePool compatibility annotation carries the legacy
// v1beta1 kubelet configuration; remove this function once v1beta1 is dropped.
func (c *CloudProvider) isKubeletConfigurationDrifted(nodeClaim *corev1.NodeClaim, nodeClass *providerv1.EC2NodeClass, nodePool *corev1.NodePool) (cloudprovider.DriftReason, error) {
	nodeClaimKubeletHash, foundNodeClaimKubeletHash := nodeClaim.Annotations[providerv1.AnnotationKubeletHash]
	nodeClassHashVersion, foundNodeClassHashVersion := nodeClass.Annotations[providerv1.AnnotationEC2NodeClassHashVersion]
	nodeClaimHashVersion, foundNodeClaimHashVersion := nodeClaim.Annotations[providerv1.AnnotationEC2NodeClassHashVersion]

	// Without a launch-time kubelet hash and hash-version annotations on both sides there is
	// nothing meaningful to compare, so the NodeClaim cannot be considered drifted.
	if !foundNodeClaimKubeletHash || !foundNodeClaimHashVersion || !foundNodeClassHashVersion {
		return "", nil
	}
	// Validate that the hash version for the EC2NodeClass matches the NodeClaim's before
	// comparing hashes; hashes produced under different versions are not comparable and
	// would report spurious drift.
	if nodeClassHashVersion != nodeClaimHashVersion {
		return "", nil
	}
	// Compute the current kubelet hash only once we know a comparison is valid — this avoids
	// both the hashing work and a spurious error return when the guards above short-circuit.
	kubeletHash, err := utils.GetHashKubelet(nodePool.Annotations[corev1.KubeletCompatabilityAnnotationKey], nodeClass)
	if err != nil {
		return "", err
	}
	return lo.Ternary(kubeletHash != nodeClaimKubeletHash, cloudprovider.DriftReason("KubeletDrifted"), ""), nil
}

func (c *CloudProvider) getInstance(ctx context.Context, providerID string) (*instance.Instance, error) {
// Get InstanceID to fetch from EC2
instanceID, err := utils.ParseInstanceID(providerID)
Expand Down
11 changes: 11 additions & 0 deletions pkg/controllers/nodeclass/hash/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
corev1 "sigs.k8s.io/karpenter/pkg/apis/v1"

providerv1 "github.com/aws/karpenter-provider-aws/pkg/apis/v1"
"github.com/aws/karpenter-provider-aws/pkg/utils"
)

type Controller struct {
Expand Down Expand Up @@ -93,6 +94,15 @@ func (c *Controller) updateNodeClaimHash(ctx context.Context, nodeClass *provide
nc := ncList.Items[i]
stored := nc.DeepCopy()

nodePool, err := utils.ResolveNodePoolFromNodeClaim(ctx, c.kubeClient, &nc)
if err != nil {
return err
}
kubeletHash, err := utils.GetHashKubelet(nodePool.Annotations[corev1.KubeletCompatabilityAnnotationKey], nodeClass)
if err != nil {
return err
}

if nc.Annotations[providerv1.AnnotationEC2NodeClassHashVersion] != providerv1.EC2NodeClassHashVersion {
nc.Annotations = lo.Assign(nc.Annotations, map[string]string{
providerv1.AnnotationEC2NodeClassHashVersion: providerv1.EC2NodeClassHashVersion,
Expand All @@ -103,6 +113,7 @@ func (c *Controller) updateNodeClaimHash(ctx context.Context, nodeClass *provide
if nc.StatusConditions().Get(corev1.ConditionTypeDrifted) == nil {
nc.Annotations = lo.Assign(nc.Annotations, map[string]string{
providerv1.AnnotationEC2NodeClassHash: nodeClass.Hash(),
providerv1.AnnotationKubeletHash: kubeletHash,
})
}

Expand Down
32 changes: 32 additions & 0 deletions pkg/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,22 @@ limitations under the License.
package utils

import (
"context"
"encoding/json"
"fmt"
"regexp"
"strings"

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
coreapis "sigs.k8s.io/karpenter/pkg/apis"
corev1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/apis/v1beta1"

"github.com/mitchellh/hashstructure/v2"

v1 "github.com/aws/karpenter-provider-aws/pkg/apis/v1"

"github.com/aws/aws-sdk-go/aws"
Expand Down Expand Up @@ -89,6 +98,29 @@ func GetKubelet(kubeletAnnotation string, enc *v1.EC2NodeClass) (*v1.KubeletConf
return enc.Spec.Kubelet, nil
}

// GetHashKubelet returns a stable hash of the kubelet configuration resolved from either the
// v1beta1 compatibility annotation (when set) or the EC2NodeClass spec. The hash options mirror
// the ones used for the EC2NodeClass static hash so zero/nil fields do not affect the result.
func GetHashKubelet(kubeletAnnotation string, enc *v1.EC2NodeClass) (string, error) {
	kubelet, err := GetKubelet(kubeletAnnotation, enc)
	if err != nil {
		return "", err
	}
	// Propagate hashing failures instead of panicking via lo.Must — this function already
	// returns an error, and callers (drift detection, hash controller) surface it upstream.
	hash, err := hashstructure.Hash(kubelet, hashstructure.FormatV2, &hashstructure.HashOptions{
		SlicesAsSets:    true,
		IgnoreZeroValue: true,
		ZeroNil:         true,
	})
	if err != nil {
		return "", fmt.Errorf("hashing kubelet configuration, %w", err)
	}
	return fmt.Sprint(hash), nil
}

// ResolveNodePoolFromNodeClaim fetches the NodePool that owns the given NodeClaim, identified by
// the NodePool label on the claim. A NotFound error is returned when the label is absent; Get
// errors (including NotFound for a deleted NodePool) are returned as-is.
func ResolveNodePoolFromNodeClaim(ctx context.Context, kubeClient client.Client, nodeClaim *corev1.NodeClaim) (*corev1.NodePool, error) {
	nodePoolName, ok := nodeClaim.Labels[corev1.NodePoolLabelKey]
	if !ok {
		// No owning-NodePool label — surface the same NotFound shape the API server would.
		return nil, errors.NewNotFound(schema.GroupResource{Group: coreapis.Group, Resource: "nodepools"}, "")
	}
	nodePool := &corev1.NodePool{}
	if err := kubeClient.Get(ctx, types.NamespacedName{Name: nodePoolName}, nodePool); err != nil {
		return nil, err
	}
	return nodePool, nil
}

func updateKubeletType(kubelet *v1beta1.KubeletConfiguration) *v1.KubeletConfiguration {
resultKubelet := &v1.KubeletConfiguration{}

Expand Down

0 comments on commit 891c4a7

Please sign in to comment.