Skip to content

Commit

Permalink
chore: update kubelet hash annotation on nodeclaim
Browse files Browse the repository at this point in the history
  • Loading branch information
jigisha620 committed Jul 13, 2024
1 parent c32ccc4 commit b325ca0
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 3 deletions.
4 changes: 2 additions & 2 deletions pkg/apis/v1/ec2nodeclass.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ type EC2NodeClassSpec struct {
// +kubebuilder:validation:XValidation:message="evictionSoft OwnerKey does not have a matching evictionSoftGracePeriod",rule="has(self.evictionSoft) ? self.evictionSoft.all(e, (e in self.evictionSoftGracePeriod)):true"
// +kubebuilder:validation:XValidation:message="evictionSoftGracePeriod OwnerKey does not have a matching evictionSoft",rule="has(self.evictionSoftGracePeriod) ? self.evictionSoftGracePeriod.all(e, (e in self.evictionSoft)):true"
// +optional
Kubelet *KubeletConfiguration `json:"kubelet,omitempty"`
Kubelet *KubeletConfiguration `json:"kubelet,omitempty" hash:"ignore"`
// BlockDeviceMappings to be applied to provisioned nodes.
// +kubebuilder:validation:XValidation:message="must have only one blockDeviceMappings with rootVolume",rule="self.filter(x, has(x.rootVolume)?x.rootVolume==true:false).size() <= 1"
// +kubebuilder:validation:MaxItems:=50
Expand Down Expand Up @@ -416,7 +416,7 @@ type EC2NodeClass struct {
// 1. A field changes its default value for an existing field that is already hashed
// 2. A field is added to the hash calculation with an already-set value
// 3. A field is removed from the hash calculations
const EC2NodeClassHashVersion = "v2"
const EC2NodeClassHashVersion = "v3"

func (in *EC2NodeClass) Hash() string {
return fmt.Sprint(lo.Must(hashstructure.Hash(in.Spec, hashstructure.FormatV2, &hashstructure.HashOptions{
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/v1/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ var (
LabelInstanceAcceleratorManufacturer = apis.Group + "/instance-accelerator-manufacturer"
LabelInstanceAcceleratorCount = apis.Group + "/instance-accelerator-count"
AnnotationEC2NodeClassHash = apis.Group + "/ec2nodeclass-hash"
AnnotationKubeletHash = apis.Group + "/kubelet-drift-hash"
AnnotationEC2NodeClassHashVersion = apis.Group + "/ec2nodeclass-hash-version"
AnnotationInstanceTagged = apis.Group + "/tagged"

Expand Down
9 changes: 9 additions & 0 deletions pkg/cloudprovider/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ func (c *CloudProvider) Create(ctx context.Context, nodeClaim *karpv1.NodeClaim)
// We treat a failure to resolve the NodeClass as an ICE since this means there is no capacity possibilities for this NodeClaim
return nil, cloudprovider.NewInsufficientCapacityError(fmt.Errorf("resolving node class, %w", err))
}
nodePool, err := utils.ResolveNodePoolFromNodeClaim(ctx, c.kubeClient, nodeClaim)
if err != nil {
return nil, err
}
kubeletHash, err := utils.GetHashKubelet(nodePool, nodeClass)
if err != nil {
return nil, err
}
nodeClassReady := nodeClass.StatusConditions().Get(status.ConditionReady)
if !nodeClassReady.IsTrue() {
return nil, fmt.Errorf("resolving ec2nodeclass, %s", nodeClassReady.Message)
Expand All @@ -106,6 +114,7 @@ func (c *CloudProvider) Create(ctx context.Context, nodeClaim *karpv1.NodeClaim)
})
nc := c.instanceToNodeClaim(instance, instanceType, nodeClass)
nc.Annotations = lo.Assign(nodeClass.Annotations, map[string]string{
v1.AnnotationKubeletHash: kubeletHash,
v1.AnnotationEC2NodeClassHash: nodeClass.Hash(),
v1.AnnotationEC2NodeClassHashVersion: v1.EC2NodeClassHashVersion,
})
Expand Down
27 changes: 26 additions & 1 deletion pkg/cloudprovider/drift.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ func (c *CloudProvider) isNodeClassDrifted(ctx context.Context, nodeClaim *karpv
if drifted := c.areStaticFieldsDrifted(nodeClaim, nodeClass); drifted != "" {
return drifted, nil
}
kubeletDrifted, err := c.isKubeletConfigurationDrifted(nodeClaim, nodeClass, nodePool)
if err != nil {
return "", err
}
instance, err := c.getInstance(ctx, nodeClaim.Status.ProviderID)
if err != nil {
return "", err
Expand All @@ -59,7 +63,7 @@ func (c *CloudProvider) isNodeClassDrifted(ctx context.Context, nodeClaim *karpv
if err != nil {
return "", fmt.Errorf("calculating subnet drift, %w", err)
}
drifted := lo.FindOrElse([]cloudprovider.DriftReason{amiDrifted, securitygroupDrifted, subnetDrifted}, "", func(i cloudprovider.DriftReason) bool {
drifted := lo.FindOrElse([]cloudprovider.DriftReason{amiDrifted, securitygroupDrifted, subnetDrifted, kubeletDrifted}, "", func(i cloudprovider.DriftReason) bool {
return string(i) != ""
})
return drifted, nil
Expand Down Expand Up @@ -135,6 +139,27 @@ func (c *CloudProvider) areStaticFieldsDrifted(nodeClaim *karpv1.NodeClaim, node
return lo.Ternary(nodeClassHash != nodeClaimHash, NodeClassDrift, "")
}

// isKubeletConfigurationDrifted reports whether the kubelet configuration hash that would
// be computed today (from the NodePool and EC2NodeClass) differs from the hash recorded
// on the NodeClaim when it was launched. Drift is only evaluated when all hash annotations
// are present and the hash versions on the class and claim agree; otherwise an empty
// reason is returned so stale or incomparable hashes never trigger drift.
// Remove once v1beta1 is dropped
func (c *CloudProvider) isKubeletConfigurationDrifted(nodeClaim *karpv1.NodeClaim, nodeClass *v1.EC2NodeClass, nodePool *karpv1.NodePool) (cloudprovider.DriftReason, error) {
	currentHash, err := utils.GetHashKubelet(nodePool, nodeClass)
	if err != nil {
		return "", err
	}
	storedHash, hasStoredHash := nodeClaim.Annotations[v1.AnnotationKubeletHash]
	classHashVersion, hasClassHashVersion := nodeClass.Annotations[v1.AnnotationEC2NodeClassHashVersion]
	claimHashVersion, hasClaimHashVersion := nodeClaim.Annotations[v1.AnnotationEC2NodeClassHashVersion]

	// Without all three annotations there is nothing trustworthy to compare against.
	if !hasStoredHash || !hasClaimHashVersion || !hasClassHashVersion {
		return "", nil
	}
	// Hashes produced under different hash versions are not comparable; defer to the
	// hash-version migration rather than reporting drift here.
	if classHashVersion != claimHashVersion {
		return "", nil
	}
	if currentHash != storedHash {
		return "KubeletDrifted", nil
	}
	return "", nil
}

func (c *CloudProvider) getInstance(ctx context.Context, providerID string) (*instance.Instance, error) {
// Get InstanceID to fetch from EC2
instanceID, err := utils.ParseInstanceID(providerID)
Expand Down
12 changes: 12 additions & 0 deletions pkg/controllers/nodeclass/hash/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package hash
import (
"context"

"github.com/aws/karpenter-provider-aws/pkg/utils"

"github.com/samber/lo"
"go.uber.org/multierr"
"k8s.io/apimachinery/pkg/api/equality"
Expand Down Expand Up @@ -93,6 +95,15 @@ func (c *Controller) updateNodeClaimHash(ctx context.Context, nodeClass *v1.EC2N
nc := ncList.Items[i]
stored := nc.DeepCopy()

nodePool, err := utils.ResolveNodePoolFromNodeClaim(ctx, c.kubeClient, &nc)
if err != nil {
return err
}
kubeletHash, err := utils.GetHashKubelet(nodePool, nodeClass)
if err != nil {
return err
}

if nc.Annotations[v1.AnnotationEC2NodeClassHashVersion] != v1.EC2NodeClassHashVersion {
nc.Annotations = lo.Assign(nc.Annotations, map[string]string{
v1.AnnotationEC2NodeClassHashVersion: v1.EC2NodeClassHashVersion,
Expand All @@ -103,6 +114,7 @@ func (c *Controller) updateNodeClaimHash(ctx context.Context, nodeClass *v1.EC2N
if nc.StatusConditions().Get(karpv1.ConditionTypeDrifted) == nil {
nc.Annotations = lo.Assign(nc.Annotations, map[string]string{
v1.AnnotationEC2NodeClassHash: nodeClass.Hash(),
v1.AnnotationKubeletHash: kubeletHash,
})
}

Expand Down
38 changes: 38 additions & 0 deletions pkg/controllers/nodeclass/hash/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,44 @@ var _ = Describe("NodeClass Hash Controller", func() {
Entry("MetadataOptions Drift", &v1.EC2NodeClass{Spec: v1.EC2NodeClassSpec{MetadataOptions: &v1.MetadataOptions{HTTPEndpoint: aws.String("disabled")}}}),
Entry("Context Drift", &v1.EC2NodeClass{Spec: v1.EC2NodeClassSpec{Context: aws.String("context-2")}}),
)
// Verifies that reconciling the EC2NodeClass refreshes the kubelet-drift hash
// annotation on NodeClaims that reference it through a NodePool.
It("should update nodeClaim annotation kubelet hash when kubelet configuration on ec2nodeClass updates", func() {
// NodePool referencing the EC2NodeClass under test; the controller resolves the
// kubelet configuration through this NodePool.
nodePool := coretest.NodePool(karpv1.NodePool{
Spec: karpv1.NodePoolSpec{
Template: karpv1.NodeClaimTemplate{
Spec: karpv1.NodeClaimSpec{
NodeClassRef: &karpv1.NodeClassReference{
Group: object.GVK(nodeClass).Group,
Kind: object.GVK(nodeClass).Kind,
Name: nodeClass.Name,
},
},
},
},
})
// NodeClaim seeded with placeholder hash annotations ("123456"/"test") so the
// reconcile is expected to overwrite the kubelet hash with a freshly computed one.
nodeClaim := coretest.NodeClaim(karpv1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{karpv1.NodePoolLabelKey: nodePool.Name},
Annotations: map[string]string{
v1.AnnotationEC2NodeClassHash: "123456",
v1.AnnotationEC2NodeClassHashVersion: "test",
v1.AnnotationKubeletHash: "123456",
},
},
Spec: karpv1.NodeClaimSpec{
NodeClassRef: &karpv1.NodeClassReference{
Group: object.GVK(nodeClass).Group,
Kind: object.GVK(nodeClass).Kind,
Name: nodeClass.Name,
},
},
})
ExpectApplied(ctx, env.Client, nodeClass, nodeClaim, nodePool)
// Capture the seeded value so the assertion below proves it actually changed.
hashBefore := nodeClaim.Annotations[v1.AnnotationKubeletHash]

ExpectObjectReconciled(ctx, env.Client, hashController, nodeClass)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.Annotations[v1.AnnotationKubeletHash]).ToNot(Equal(hashBefore))
})
It("should not update the drift hash when dynamic field is updated", func() {
ExpectApplied(ctx, env.Client, nodeClass)
ExpectObjectReconciled(ctx, env.Client, hashController, nodeClass)
Expand Down
31 changes: 31 additions & 0 deletions pkg/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,22 @@ limitations under the License.
package utils

import (
"context"
"encoding/json"
"fmt"
"regexp"
"strings"

"github.com/mitchellh/hashstructure/v2"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
coreapis "sigs.k8s.io/karpenter/pkg/apis"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/samber/lo"
"sigs.k8s.io/controller-runtime/pkg/client"
karpv1 "sigs.k8s.io/karpenter/pkg/apis/v1"
karpv1beta1 "sigs.k8s.io/karpenter/pkg/apis/v1beta1"

Expand Down Expand Up @@ -111,3 +119,26 @@ func parseKubeletConfiguration(annotation string) (*v1.KubeletConfiguration, err
CPUCFSQuota: kubelet.CPUCFSQuota,
}, nil
}

// GetHashKubelet returns a stable hash of the kubelet configuration resolved for the
// given NodePool / EC2NodeClass pair, used to detect kubelet-configuration drift on
// NodeClaims. Returns an error when the kubelet configuration cannot be resolved.
func GetHashKubelet(nodePool *karpv1.NodePool, enc *v1.EC2NodeClass) (string, error) {
	kubeletConfig, err := GetKubletConfigurationWithNodePool(nodePool, enc)
	if err != nil {
		return "", err
	}
	opts := &hashstructure.HashOptions{
		SlicesAsSets:    true,
		IgnoreZeroValue: true,
		ZeroNil:         true,
	}
	// lo.Must mirrors the hashing convention used by EC2NodeClass.Hash(): hashstructure
	// should only fail on unhashable inputs, which would be a programming bug here.
	return fmt.Sprint(lo.Must(hashstructure.Hash(kubeletConfig, hashstructure.FormatV2, opts))), nil
}

// ResolveNodePoolFromNodeClaim fetches the NodePool that owns the given NodeClaim,
// identified by the karpv1.NodePoolLabelKey label. A NotFound error is returned when the
// label is absent; client errors from the Get are returned unchanged.
func ResolveNodePoolFromNodeClaim(ctx context.Context, kubeClient client.Client, nodeClaim *karpv1.NodeClaim) (*karpv1.NodePool, error) {
	nodePoolName, ok := nodeClaim.Labels[karpv1.NodePoolLabelKey]
	if !ok {
		// Without the owning-NodePool label there is nothing to look up.
		return nil, errors.NewNotFound(schema.GroupResource{Group: coreapis.Group, Resource: "nodepools"}, "")
	}
	nodePool := &karpv1.NodePool{}
	if err := kubeClient.Get(ctx, types.NamespacedName{Name: nodePoolName}, nodePool); err != nil {
		return nil, err
	}
	return nodePool, nil
}

0 comments on commit b325ca0

Please sign in to comment.