diff --git a/cmd/controller/main.go b/cmd/controller/main.go index 9d370a42694d..526b8c64f0d9 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -68,6 +68,7 @@ func main() { op.AMIProvider, op.LaunchTemplateProvider, op.InstanceTypesProvider, + op.CapacityReservationProvider, )...). WithWebhooks(ctx, webhooks.NewWebhooks()...). Start(ctx) diff --git a/pkg/apis/crds/karpenter.k8s.aws_ec2nodeclasses.yaml b/pkg/apis/crds/karpenter.k8s.aws_ec2nodeclasses.yaml index 6d4effd1b278..a2a3eb659fbd 100644 --- a/pkg/apis/crds/karpenter.k8s.aws_ec2nodeclasses.yaml +++ b/pkg/apis/crds/karpenter.k8s.aws_ec2nodeclasses.yaml @@ -216,6 +216,59 @@ spec: - message: must have only one blockDeviceMappings with rootVolume rule: self.filter(x, has(x.rootVolume)?x.rootVolume==true:false).size() <= 1 + capacityReservationSelectorTerms: + description: CapacityReservationSelectorTerms is a list of or Capacity + Reservation selector terms. The terms are ORed. + items: + description: |- + CapacityReservationSelectorTerm defines selection logic for a Capacity Reservation used by Karpenter to launch nodes. + If multiple fields are used for selection, the requirements are ANDed. + properties: + availabilityZone: + description: The Availability Zone of the Capacity Reservation + type: string + id: + description: The platform of operating system for which the + Capacity Reservation reserves capacity + type: string + instanceType: + description: The type of operating system for which the Capacity + Reservation reserves capacity + type: string + ownerId: + description: The ID of the Amazon Web Services account that + owns the Capacity Reservation + type: string + tags: + additionalProperties: + type: string + description: |- + Tags is a map of key/value tags used to select subnets + Specifying '*' for a value selects all values for a given tag key. + maxProperties: 20 + type: object + x-kubernetes-validations: + - message: empty tag keys or values aren't supported + rule: self.all(k, k != '' && self[k] != '') + type: + description: |- + Indicates the type of instance launches that the Capacity Reservation accepts. The options include: + - open: + The Capacity Reservation accepts all instances that have + matching attributes (instance type, platform, and Availability + Zone). Instances that have matching attributes launch into the + Capacity Reservation automatically without specifying any + additional parameters. + - targeted: + The Capacity Reservation only accepts instances that + have matching attributes (instance type, platform, and + Availability Zone), and explicitly target the Capacity + Reservation. This ensures that only permitted instances can use + the reserved capacity. + type: string + type: object + maxItems: 30 + type: array context: description: |- Context is a Reserved field in EC2 APIs @@ -510,6 +563,88 @@ spec: - requirements type: object type: array + capacityReservations: + description: |- + CapacityReservations contains the current Capacity Reservations values that are available to the + cluster under the CapacityReservations selectors. + items: + description: CapacityReservation contains resolved Capacity Reservation + selector values utilized for node launch + properties: + availabilityZone: + description: AvailabilityZone of the Capacity Reservation + type: string + availableInstanceCount: + description: Available Instance Count of the Capacity Reservation + type: integer + endDate: + description: |- + The date and time at which the Capacity Reservation expires. When a Capacity + Reservation expires, the reserved capacity is released and you can no longer + launch instances into it. The Capacity Reservation's state changes to expired + when it reaches its end date and time. + type: string + endDateType: + description: |- + Indicates the way in which the Capacity Reservation ends. A Capacity Reservation + can have one of the following end types: + + + * unlimited - The Capacity Reservation remains active until you explicitly + cancel it. + + + * limited - The Capacity Reservation expires automatically at a specified + date and time. + type: string + id: + description: ID of the Capacity Reservation + type: string + instanceMatchCriteria: + description: |- + Indicates the type of instance launches that the Capacity Reservation accepts. The options include: + - open: + The Capacity Reservation accepts all instances that have + matching attributes (instance type, platform, and Availability + Zone). Instances that have matching attributes launch into the + Capacity Reservation automatically without specifying any + additional parameters. + - targeted: + The Capacity Reservation only accepts instances that + have matching attributes (instance type, platform, and + Availability Zone), and explicitly target the Capacity + Reservation. This ensures that only permitted instances can use + the reserved capacity. + type: string + instancePlatform: + description: Instance Platform of the Capacity Reservation + type: string + instanceType: + description: Instance Type of the Capacity Reservation + type: string + ownerId: + description: Owner Id of the Capacity Reservation + type: string + startDate: + description: The date and time at which the Capacity Reservation + was started. + type: string + totalInstanceCount: + description: Total Instance Count of the Capacity Reservation + type: integer + required: + - availabilityZone + - availableInstanceCount + - endDateType + - id + - instanceMatchCriteria + - instancePlatform + - instanceType + - ownerId + - startDate + - totalInstanceCount + type: object + type: array conditions: description: Conditions contains signals for health and readiness items: @@ -819,6 +954,59 @@ spec: - message: must have only one blockDeviceMappings with rootVolume rule: self.filter(x, has(x.rootVolume)?x.rootVolume==true:false).size() <= 1 + capacityReservationSelectorTerms: + description: CapacityReservationSelectorTerms is a list of or Capacity + Reservation selector terms. The terms are ORed. + items: + description: |- + CapacityReservationSelectorTerm defines selection logic for a Capacity Reservation used by Karpenter to launch nodes. + If multiple fields are used for selection, the requirements are ANDed. + properties: + availabilityZone: + description: The Availability Zone of the Capacity Reservation + type: string + id: + description: The platform of operating system for which the + Capacity Reservation reserves capacity + type: string + instanceType: + description: The type of operating system for which the Capacity + Reservation reserves capacity + type: string + ownerId: + description: The ID of the Amazon Web Services account that + owns the Capacity Reservation + type: string + tags: + additionalProperties: + type: string + description: |- + Tags is a map of key/value tags used to select subnets + Specifying '*' for a value selects all values for a given tag key. + maxProperties: 20 + type: object + x-kubernetes-validations: + - message: empty tag keys or values aren't supported + rule: self.all(k, k != '' && self[k] != '') + type: + description: |- + Indicates the type of instance launches that the Capacity Reservation accepts. The options include: + - open: + The Capacity Reservation accepts all instances that have + matching attributes (instance type, platform, and Availability + Zone). Instances that have matching attributes launch into the + Capacity Reservation automatically without specifying any + additional parameters. + - targeted: + The Capacity Reservation only accepts instances that + have matching attributes (instance type, platform, and + Availability Zone), and explicitly target the Capacity + Reservation. This ensures that only permitted instances can use + the reserved capacity. + type: string + type: object + maxItems: 30 + type: array context: description: |- Context is a Reserved field in EC2 APIs @@ -1113,6 +1301,88 @@ spec: - requirements type: object type: array + capacityReservations: + description: |- + CapacityReservations contains the current Capacity Reservations values that are available to the + cluster under the CapacityReservations selectors. + items: + description: CapacityReservation contains resolved Capacity Reservation + selector values utilized for node launch + properties: + availabilityZone: + description: AvailabilityZone of the Capacity Reservation + type: string + availableInstanceCount: + description: Available Instance Count of the Capacity Reservation + type: integer + endDate: + description: |- + The date and time at which the Capacity Reservation expires. When a Capacity + Reservation expires, the reserved capacity is released and you can no longer + launch instances into it. The Capacity Reservation's state changes to expired + when it reaches its end date and time. + type: string + endDateType: + description: |- + Indicates the way in which the Capacity Reservation ends. A Capacity Reservation + can have one of the following end types: + + + * unlimited - The Capacity Reservation remains active until you explicitly + cancel it. + + + * limited - The Capacity Reservation expires automatically at a specified + date and time. + type: string + id: + description: ID of the Capacity Reservation + type: string + instanceMatchCriteria: + description: |- + Indicates the type of instance launches that the Capacity Reservation accepts. The options include: + - open: + The Capacity Reservation accepts all instances that have + matching attributes (instance type, platform, and Availability + Zone). Instances that have matching attributes launch into the + Capacity Reservation automatically without specifying any + additional parameters. + - targeted: + The Capacity Reservation only accepts instances that + have matching attributes (instance type, platform, and + Availability Zone), and explicitly target the Capacity + Reservation. This ensures that only permitted instances can use + the reserved capacity. + type: string + instancePlatform: + description: Instance Platform of the Capacity Reservation + type: string + instanceType: + description: Instance Type of the Capacity Reservation + type: string + ownerId: + description: Owner Id of the Capacity Reservation + type: string + startDate: + description: The date and time at which the Capacity Reservation + was started. + type: string + totalInstanceCount: + description: Total Instance Count of the Capacity Reservation + type: integer + required: + - availabilityZone + - availableInstanceCount + - endDateType + - id + - instanceMatchCriteria + - instancePlatform + - instanceType + - ownerId + - startDate + - totalInstanceCount + type: object + type: array conditions: description: Conditions contains signals for health and readiness items: diff --git a/pkg/apis/v1/ec2nodeclass.go b/pkg/apis/v1/ec2nodeclass.go index 2a114d21e82a..9785019c6781 100644 --- a/pkg/apis/v1/ec2nodeclass.go +++ b/pkg/apis/v1/ec2nodeclass.go @@ -55,6 +55,10 @@ type EC2NodeClassSpec struct { // +kubebuilder:validation:Enum:={AL2,AL2023,Bottlerocket,Ubuntu,Custom,Windows2019,Windows2022} // +required AMIFamily *string `json:"amiFamily"` + // CapacityReservationSelectorTerms is a list of or Capacity Reservation selector terms. The terms are ORed. + // +kubebuilder:validation:MaxItems:=30 + // +optional + CapacityReservationSelectorTerms []CapacityReservationSelectorTerm `json:"capacityReservationSelectorTerms,omitempty" hash:"ignore"` // UserData to be applied to the provisioned nodes. // It must be in the appropriate format based on the AMIFamily in use. Karpenter will merge certain fields into // this UserData to ensure nodes are being provisioned with the correct configuration. @@ -175,6 +179,44 @@ type AMISelectorTerm struct { Owner string `json:"owner,omitempty"` } +// CapacityReservationSelectorTerm defines selection logic for a Capacity Reservation used by Karpenter to launch nodes. +// If multiple fields are used for selection, the requirements are ANDed. +type CapacityReservationSelectorTerm struct { + // The Availability Zone of the Capacity Reservation + // +optional + AvailabilityZone string `json:"availabilityZone,omitempty"` + // The platform of operating system for which the Capacity Reservation reserves capacity + // +optional + ID string `json:"id,omitempty"` + // The type of operating system for which the Capacity Reservation reserves capacity + // +optional + InstanceType string `json:"instanceType,omitempty"` + // Tags is a map of key/value tags used to select subnets + // Specifying '*' for a value selects all values for a given tag key. + // +kubebuilder:validation:XValidation:message="empty tag keys or values aren't supported",rule="self.all(k, k != '' && self[k] != '')" + // +kubebuilder:validation:MaxProperties:=20 + // +optional + Tags map[string]string `json:"tags,omitempty"` + // Indicates the type of instance launches that the Capacity Reservation accepts. The options include: + // - open: + // The Capacity Reservation accepts all instances that have + // matching attributes (instance type, platform, and Availability + // Zone). Instances that have matching attributes launch into the + // Capacity Reservation automatically without specifying any + // additional parameters. + // - targeted: + // The Capacity Reservation only accepts instances that + // have matching attributes (instance type, platform, and + // Availability Zone), and explicitly target the Capacity + // Reservation. This ensures that only permitted instances can use + // the reserved capacity. + // +optional + Type string `json:"type,omitempty"` + // The ID of the Amazon Web Services account that owns the Capacity Reservation + // +optional + OwnerID string `json:"ownerId,omitempty"` +} + // MetadataOptions contains parameters for specifying the exposure of the // Instance Metadata Service to provisioned EC2 nodes. type MetadataOptions struct { diff --git a/pkg/apis/v1/ec2nodeclass_status.go b/pkg/apis/v1/ec2nodeclass_status.go index 6b551971ca08..737bfa3129df 100644 --- a/pkg/apis/v1/ec2nodeclass_status.go +++ b/pkg/apis/v1/ec2nodeclass_status.go @@ -55,8 +55,71 @@ type AMI struct { Requirements []v1.NodeSelectorRequirement `json:"requirements"` } +// CapacityReservation contains resolved Capacity Reservation selector values utilized for node launch +type CapacityReservation struct { + // ID of the Capacity Reservation + // +required + ID string `json:"id"` + // AvailabilityZone of the Capacity Reservation + // +required + AvailabilityZone string `json:"availabilityZone"` + // Available Instance Count of the Capacity Reservation + // +required + AvailableInstanceCount int `json:"availableInstanceCount"` + // The date and time at which the Capacity Reservation expires. When a Capacity + // Reservation expires, the reserved capacity is released and you can no longer + // launch instances into it. The Capacity Reservation's state changes to expired + // when it reaches its end date and time. + // +optional + EndDate *string `json:"endDate,omitempty"` + // Indicates the way in which the Capacity Reservation ends. A Capacity Reservation + // can have one of the following end types: + // + // * unlimited - The Capacity Reservation remains active until you explicitly + // cancel it. + // + // * limited - The Capacity Reservation expires automatically at a specified + // date and time. + // +required + EndDateType string `json:"endDateType"` + // Indicates the type of instance launches that the Capacity Reservation accepts. The options include: + // - open: + // The Capacity Reservation accepts all instances that have + // matching attributes (instance type, platform, and Availability + // Zone). Instances that have matching attributes launch into the + // Capacity Reservation automatically without specifying any + // additional parameters. + // - targeted: + // The Capacity Reservation only accepts instances that + // have matching attributes (instance type, platform, and + // Availability Zone), and explicitly target the Capacity + // Reservation. This ensures that only permitted instances can use + // the reserved capacity. + // +required + InstanceMatchCriteria string `json:"instanceMatchCriteria"` + // Instance Platform of the Capacity Reservation + // +required + InstancePlatform string `json:"instancePlatform"` + // Instance Type of the Capacity Reservation + // +required + InstanceType string `json:"instanceType"` + // Owner Id of the Capacity Reservation + // +required + OwnerID string `json:"ownerId"` + // The date and time at which the Capacity Reservation was started. + // +required + StartDate string `json:"startDate"` + // Total Instance Count of the Capacity Reservation + // +required + TotalInstanceCount int `json:"totalInstanceCount"` +} + // EC2NodeClassStatus contains the resolved state of the EC2NodeClass type EC2NodeClassStatus struct { + // CapacityReservations contains the current Capacity Reservations values that are available to the + // cluster under the CapacityReservations selectors. + // +optional + CapacityReservations []CapacityReservation `json:"capacityReservations,omitempty"` // Subnets contains the current Subnet values that are available to the // cluster under the subnet selectors. // +optional diff --git a/pkg/apis/v1/labels.go b/pkg/apis/v1/labels.go index 9d83b5325b59..d12ca735df55 100644 --- a/pkg/apis/v1/labels.go +++ b/pkg/apis/v1/labels.go @@ -27,6 +27,7 @@ import ( func init() { v1beta1.RestrictedLabelDomains = v1beta1.RestrictedLabelDomains.Insert(RestrictedLabelDomains...) v1beta1.WellKnownLabels = v1beta1.WellKnownLabels.Insert( + LabelCapactiyReservationID, LabelInstanceHypervisor, LabelInstanceEncryptionInTransitSupported, LabelInstanceCategory, @@ -119,6 +120,7 @@ var ( AnnotationEC2NodeClassHash = Group + "/ec2nodeclass-hash" AnnotationEC2NodeClassHashVersion = Group + "/ec2nodeclass-hash-version" AnnotationInstanceTagged = Group + "/tagged" + LabelCapactiyReservationID = Group + "/capacity-reservation-id" TagNodeClaim = v1beta1.Group + "/nodeclaim" TagManagedLaunchTemplate = Group + "/cluster" diff --git a/pkg/apis/v1/zz_generated.deepcopy.go b/pkg/apis/v1/zz_generated.deepcopy.go index 07709df56478..cafb5757edf4 100644 --- a/pkg/apis/v1/zz_generated.deepcopy.go +++ b/pkg/apis/v1/zz_generated.deepcopy.go @@ -148,6 +148,48 @@ func (in *BlockDeviceMapping) DeepCopy() *BlockDeviceMapping { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *CapacityReservation) DeepCopyInto(out *CapacityReservation) { + *out = *in + if in.EndDate != nil { + in, out := &in.EndDate, &out.EndDate + *out = new(string) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CapacityReservation. +func (in *CapacityReservation) DeepCopy() *CapacityReservation { + if in == nil { + return nil + } + out := new(CapacityReservation) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *CapacityReservationSelectorTerm) DeepCopyInto(out *CapacityReservationSelectorTerm) { + *out = *in + if in.Tags != nil { + in, out := &in.Tags, &out.Tags + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CapacityReservationSelectorTerm. +func (in *CapacityReservationSelectorTerm) DeepCopy() *CapacityReservationSelectorTerm { + if in == nil { + return nil + } + out := new(CapacityReservationSelectorTerm) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *EC2NodeClass) DeepCopyInto(out *EC2NodeClass) { *out = *in @@ -241,6 +283,13 @@ func (in *EC2NodeClassSpec) DeepCopyInto(out *EC2NodeClassSpec) { *out = new(string) **out = **in } + if in.CapacityReservationSelectorTerms != nil { + in, out := &in.CapacityReservationSelectorTerms, &out.CapacityReservationSelectorTerms + *out = make([]CapacityReservationSelectorTerm, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } if in.UserData != nil { in, out := &in.UserData, &out.UserData *out = new(string) @@ -304,6 +353,13 @@ func (in *EC2NodeClassSpec) DeepCopy() *EC2NodeClassSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *EC2NodeClassStatus) DeepCopyInto(out *EC2NodeClassStatus) { *out = *in + if in.CapacityReservations != nil { + in, out := &in.CapacityReservations, &out.CapacityReservations + *out = make([]CapacityReservation, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } if in.Subnets != nil { in, out := &in.Subnets, &out.Subnets *out = make([]Subnet, len(*in)) diff --git a/pkg/apis/v1beta1/ec2nodeclass.go b/pkg/apis/v1beta1/ec2nodeclass.go index b3395dd3a60d..b48a575d217f 100644 --- a/pkg/apis/v1beta1/ec2nodeclass.go +++ b/pkg/apis/v1beta1/ec2nodeclass.go @@ -55,6 +55,10 @@ type EC2NodeClassSpec struct { // +kubebuilder:validation:Enum:={AL2,AL2023,Bottlerocket,Ubuntu,Custom,Windows2019,Windows2022} // +required AMIFamily *string `json:"amiFamily"` + // CapacityReservationSelectorTerms is a list of or Capacity Reservation selector terms. The terms are ORed. + // +kubebuilder:validation:MaxItems:=30 + // +optional + CapacityReservationSelectorTerms []CapacityReservationSelectorTerm `json:"capacityReservationSelectorTerms,omitempty" hash:"ignore"` // UserData to be applied to the provisioned nodes. // It must be in the appropriate format based on the AMIFamily in use. Karpenter will merge certain fields into // this UserData to ensure nodes are being provisioned with the correct configuration. @@ -175,6 +179,44 @@ type AMISelectorTerm struct { Owner string `json:"owner,omitempty"` } +// CapacityReservationSelectorTerm defines selection logic for a Capacity Reservation used by Karpenter to launch nodes. +// If multiple fields are used for selection, the requirements are ANDed. +type CapacityReservationSelectorTerm struct { + // The Availability Zone of the Capacity Reservation + // +optional + AvailabilityZone string `json:"availabilityZone,omitempty"` + // The platform of operating system for which the Capacity Reservation reserves capacity + // +optional + ID string `json:"id,omitempty"` + // The type of operating system for which the Capacity Reservation reserves capacity + // +optional + InstanceType string `json:"instanceType,omitempty"` + // Tags is a map of key/value tags used to select subnets + // Specifying '*' for a value selects all values for a given tag key. + // +kubebuilder:validation:XValidation:message="empty tag keys or values aren't supported",rule="self.all(k, k != '' && self[k] != '')" + // +kubebuilder:validation:MaxProperties:=20 + // +optional + Tags map[string]string `json:"tags,omitempty"` + // Indicates the type of instance launches that the Capacity Reservation accepts. The options include: + // - open: + // The Capacity Reservation accepts all instances that have + // matching attributes (instance type, platform, and Availability + // Zone). Instances that have matching attributes launch into the + // Capacity Reservation automatically without specifying any + // additional parameters. + // - targeted: + // The Capacity Reservation only accepts instances that + // have matching attributes (instance type, platform, and + // Availability Zone), and explicitly target the Capacity + // Reservation. This ensures that only permitted instances can use + // the reserved capacity. + // +optional + Type string `json:"type,omitempty"` + // The ID of the Amazon Web Services account that owns the Capacity Reservation + // +optional + OwnerID string `json:"ownerId,omitempty"` +} + // MetadataOptions contains parameters for specifying the exposure of the // Instance Metadata Service to provisioned EC2 nodes. type MetadataOptions struct { diff --git a/pkg/apis/v1beta1/ec2nodeclass_status.go b/pkg/apis/v1beta1/ec2nodeclass_status.go index 9510e5b0567b..664c500f581f 100644 --- a/pkg/apis/v1beta1/ec2nodeclass_status.go +++ b/pkg/apis/v1beta1/ec2nodeclass_status.go @@ -42,6 +42,65 @@ type SecurityGroup struct { Name string `json:"name,omitempty"` } +// CapacityReservation contains resolved Capacity Reservation selector values utilized for node launch +type CapacityReservation struct { + // ID of the Capacity Reservation + // +required + ID string `json:"id"` + // AvailabilityZone of the Capacity Reservation + // +required + AvailabilityZone string `json:"availabilityZone"` + // Available Instance Count of the Capacity Reservation + // +required + AvailableInstanceCount int `json:"availableInstanceCount"` + // The date and time at which the Capacity Reservation expires. When a Capacity + // Reservation expires, the reserved capacity is released and you can no longer + // launch instances into it. The Capacity Reservation's state changes to expired + // when it reaches its end date and time. + // +optional + EndDate *string `json:"endDate,omitempty"` + // Indicates the way in which the Capacity Reservation ends. A Capacity Reservation + // can have one of the following end types: + // + // * unlimited - The Capacity Reservation remains active until you explicitly + // cancel it. + // + // * limited - The Capacity Reservation expires automatically at a specified + // date and time. + // +required + EndDateType string `json:"endDateType"` + // Indicates the type of instance launches that the Capacity Reservation accepts. The options include: + // - open: + // The Capacity Reservation accepts all instances that have + // matching attributes (instance type, platform, and Availability + // Zone). Instances that have matching attributes launch into the + // Capacity Reservation automatically without specifying any + // additional parameters. + // - targeted: + // The Capacity Reservation only accepts instances that + // have matching attributes (instance type, platform, and + // Availability Zone), and explicitly target the Capacity + // Reservation. This ensures that only permitted instances can use + // the reserved capacity. + // +required + InstanceMatchCriteria string `json:"instanceMatchCriteria"` + // Instance Platform of the Capacity Reservation + // +required + InstancePlatform string `json:"instancePlatform"` + // Instance Type of the Capacity Reservation + // +required + InstanceType string `json:"instanceType"` + // Owner Id of the Capacity Reservation + // +required + OwnerID string `json:"ownerId"` + // The date and time at which the Capacity Reservation was started. + // +required + StartDate string `json:"startDate"` + // Total Instance Count of the Capacity Reservation + // +required + TotalInstanceCount int `json:"totalInstanceCount"` +} + // AMI contains resolved AMI selector values utilized for node launch type AMI struct { // ID of the AMI @@ -57,6 +116,10 @@ type AMI struct { // EC2NodeClassStatus contains the resolved state of the EC2NodeClass type EC2NodeClassStatus struct { + // CapacityReservations contains the current Capacity Reservations values that are available to the + // cluster under the CapacityReservations selectors. + // +optional + CapacityReservations []CapacityReservation `json:"capacityReservations,omitempty"` // Subnets contains the current Subnet values that are available to the // cluster under the subnet selectors. // +optional diff --git a/pkg/apis/v1beta1/ec2nodeclass_validation.go b/pkg/apis/v1beta1/ec2nodeclass_validation.go index 4fc18527f00d..6f9c82de1e02 100644 --- a/pkg/apis/v1beta1/ec2nodeclass_validation.go +++ b/pkg/apis/v1beta1/ec2nodeclass_validation.go @@ -26,15 +26,16 @@ import ( ) const ( - subnetSelectorTermsPath = "subnetSelectorTerms" - securityGroupSelectorTermsPath = "securityGroupSelectorTerms" - amiSelectorTermsPath = "amiSelectorTerms" - amiFamilyPath = "amiFamily" - tagsPath = "tags" - metadataOptionsPath = "metadataOptions" - blockDeviceMappingsPath = "blockDeviceMappings" - rolePath = "role" - instanceProfilePath = "instanceProfile" + subnetSelectorTermsPath = "subnetSelectorTerms" + securityGroupSelectorTermsPath = "securityGroupSelectorTerms" + amiSelectorTermsPath = "amiSelectorTerms" + amiFamilyPath = "amiFamily" + tagsPath = "tags" + metadataOptionsPath = "metadataOptions" + blockDeviceMappingsPath = "blockDeviceMappings" + rolePath = "role" + instanceProfilePath = "instanceProfile" + capacityReservationSelectorTermsPath = "capacityReservationSelectorTerms" ) var ( @@ -81,6 +82,7 @@ func (in *EC2NodeClassSpec) validate(_ context.Context) (errs *apis.FieldError) in.validateAMIFamily().ViaField(amiFamilyPath), in.validateBlockDeviceMappings().ViaField(blockDeviceMappingsPath), in.validateTags().ViaField(tagsPath), + in.validateCapacityReservationSelectorTerms().ViaField(capacityReservationSelectorTermsPath), ) } @@ -127,6 +129,27 @@ func (in *SecurityGroupSelectorTerm) validate() (errs *apis.FieldError) { return errs } +func (in *EC2NodeClassSpec) validateCapacityReservationSelectorTerms() (errs *apis.FieldError) { + if len(in.CapacityReservationSelectorTerms) == 0 { + errs = errs.Also(apis.ErrMissingOneOf()) + } + for _, term := range in.CapacityReservationSelectorTerms { + errs = errs.Also(term.validate()) + } + return errs +} + +//nolint:gocyclo +func (in *CapacityReservationSelectorTerm) validate() (errs *apis.FieldError) { + errs = errs.Also(validateTags(in.Tags).ViaField("tags")) + if len(in.Tags) == 0 && in.ID == "" && in.InstanceType == "" && in.Type == "" && in.OwnerID == "" { + errs = errs.Also(apis.ErrGeneric("expect at least one, got none", "tags", "id", "name", "instanceType", "type", "ownerId")) + } else if in.ID != "" && (len(in.Tags) > 0 || in.InstanceType != "" || in.Type != "" || in.OwnerID != "") { + errs = errs.Also(apis.ErrGeneric(`"id" is mutually exclusive, cannot be set with a combination of other fields in`)) + } + return errs +} + func (in *EC2NodeClassSpec) validateAMISelectorTerms() (errs *apis.FieldError) { for _, term := range in.AMISelectorTerms { errs = errs.Also(term.validate()) diff --git a/pkg/apis/v1beta1/labels.go b/pkg/apis/v1beta1/labels.go index ac72ab48b908..78011557039a 100644 --- a/pkg/apis/v1beta1/labels.go +++ b/pkg/apis/v1beta1/labels.go @@ -27,6 +27,7 @@ import ( func init() { v1beta1.RestrictedLabelDomains = v1beta1.RestrictedLabelDomains.Insert(RestrictedLabelDomains...) v1beta1.WellKnownLabels = v1beta1.WellKnownLabels.Insert( + LabelCapactiyReservationID, LabelInstanceHypervisor, LabelInstanceEncryptionInTransitSupported, LabelInstanceCategory, @@ -119,6 +120,7 @@ var ( AnnotationEC2NodeClassHash = Group + "/ec2nodeclass-hash" AnnotationEC2NodeClassHashVersion = Group + "/ec2nodeclass-hash-version" AnnotationInstanceTagged = Group + "/tagged" + LabelCapactiyReservationID = Group + "/capacity-reservation-id" TagNodeClaim = v1beta1.Group + "/nodeclaim" TagManagedLaunchTemplate = Group + "/cluster" diff --git a/pkg/apis/v1beta1/zz_generated.deepcopy.go b/pkg/apis/v1beta1/zz_generated.deepcopy.go index f248b480be5b..bbd2160de3d0 100644 --- a/pkg/apis/v1beta1/zz_generated.deepcopy.go +++ b/pkg/apis/v1beta1/zz_generated.deepcopy.go @@ -148,6 +148,48 @@ func (in *BlockDeviceMapping) DeepCopy() *BlockDeviceMapping { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *CapacityReservation) DeepCopyInto(out *CapacityReservation) { + *out = *in + if in.EndDate != nil { + in, out := &in.EndDate, &out.EndDate + *out = new(string) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CapacityReservation. +func (in *CapacityReservation) DeepCopy() *CapacityReservation { + if in == nil { + return nil + } + out := new(CapacityReservation) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *CapacityReservationSelectorTerm) DeepCopyInto(out *CapacityReservationSelectorTerm) { + *out = *in + if in.Tags != nil { + in, out := &in.Tags, &out.Tags + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CapacityReservationSelectorTerm. +func (in *CapacityReservationSelectorTerm) DeepCopy() *CapacityReservationSelectorTerm { + if in == nil { + return nil + } + out := new(CapacityReservationSelectorTerm) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *EC2NodeClass) DeepCopyInto(out *EC2NodeClass) { *out = *in @@ -241,6 +283,13 @@ func (in *EC2NodeClassSpec) DeepCopyInto(out *EC2NodeClassSpec) { *out = new(string) **out = **in } + if in.CapacityReservationSelectorTerms != nil { + in, out := &in.CapacityReservationSelectorTerms, &out.CapacityReservationSelectorTerms + *out = make([]CapacityReservationSelectorTerm, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } if in.UserData != nil { in, out := &in.UserData, &out.UserData *out = new(string) @@ -304,6 +353,13 @@ func (in *EC2NodeClassSpec) DeepCopy() *EC2NodeClassSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *EC2NodeClassStatus) DeepCopyInto(out *EC2NodeClassStatus) { *out = *in + if in.CapacityReservations != nil { + in, out := &in.CapacityReservations, &out.CapacityReservations + *out = make([]CapacityReservation, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } if in.Subnets != nil { in, out := &in.Subnets, &out.Subnets *out = make([]Subnet, len(*in)) diff --git a/pkg/cloudprovider/cloudprovider.go b/pkg/cloudprovider/cloudprovider.go index a4c0c83fff16..573b39e53b86 100644 --- a/pkg/cloudprovider/cloudprovider.go +++ b/pkg/cloudprovider/cloudprovider.go @@ -77,6 +77,7 @@ func New(instanceTypeProvider instancetype.Provider, instanceProvider instance.P // Create a NodeClaim given the constraints. func (c *CloudProvider) Create(ctx context.Context, nodeClaim *corev1beta1.NodeClaim) (*corev1beta1.NodeClaim, error) { + log.FromContext(ctx).WithValues("nodeClaim", nodeClaim).V(0).Info("Create nodeClaim") nodeClass, err := c.resolveNodeClassFromNodeClaim(ctx, nodeClaim) if err != nil { if errors.IsNotFound(err) { @@ -103,6 +104,7 @@ func (c *CloudProvider) Create(ctx context.Context, nodeClaim *corev1beta1.NodeC instanceType, _ := lo.Find(instanceTypes, func(i *cloudprovider.InstanceType) bool { return i.Name == instance.Type }) + nc := c.instanceToNodeClaim(instance, instanceType, nodeClass) nc.Annotations = lo.Assign(nodeClass.Annotations, map[string]string{ v1beta1.AnnotationEC2NodeClassHash: nodeClass.Hash(), @@ -348,6 +350,11 @@ func (c *CloudProvider) instanceToNodeClaim(i *instance.Instance, instanceType * if v, ok := i.Tags[corev1beta1.ManagedByAnnotationKey]; ok { annotations[corev1beta1.ManagedByAnnotationKey] = v } + + if i.CapacityReservationID != nil { + labels[v1beta1.LabelCapactiyReservationID] = *i.CapacityReservationID + } + nodeClaim.Labels = labels nodeClaim.Annotations = annotations nodeClaim.CreationTimestamp = metav1.Time{Time: i.LaunchTime} @@ -357,6 +364,7 @@ func (c *CloudProvider) instanceToNodeClaim(i *instance.Instance, instanceType * } nodeClaim.Status.ProviderID = fmt.Sprintf("aws:///%s/%s", i.Zone, i.ID) nodeClaim.Status.ImageID = i.ImageID + return nodeClaim } diff --git a/pkg/cloudprovider/drift.go b/pkg/cloudprovider/drift.go index 5c87fdb62da9..7c85fbbee58f 100644 --- a/pkg/cloudprovider/drift.go +++ b/pkg/cloudprovider/drift.go @@ -32,10 +32,11 @@ import ( ) const ( - AMIDrift cloudprovider.DriftReason = "AMIDrift" - SubnetDrift cloudprovider.DriftReason = "SubnetDrift" - SecurityGroupDrift cloudprovider.DriftReason = "SecurityGroupDrift" - NodeClassDrift cloudprovider.DriftReason = "NodeClassDrift" + AMIDrift cloudprovider.DriftReason = "AMIDrift" + SubnetDrift cloudprovider.DriftReason = "SubnetDrift" + SecurityGroupDrift cloudprovider.DriftReason = "SecurityGroupDrift" + NodeClassDrift cloudprovider.DriftReason = "NodeClassDrift" + CapacityReservationDrift cloudprovider.DriftReason = "CapacityReservationDrift" ) func (c *CloudProvider) isNodeClassDrifted(ctx context.Context, nodeClaim *corev1beta1.NodeClaim, nodePool *corev1beta1.NodePool, nodeClass *v1beta1.EC2NodeClass) (cloudprovider.DriftReason, error) { @@ -59,7 +60,11 @@ func (c *CloudProvider) isNodeClassDrifted(ctx context.Context, nodeClaim *corev if err != nil { return "", fmt.Errorf("calculating subnet drift, %w", err) } - drifted := lo.FindOrElse([]cloudprovider.DriftReason{amiDrifted, securitygroupDrifted, subnetDrifted}, "", func(i cloudprovider.DriftReason) bool { + capacityReservationDrifted, err := c.isCapacityReservationDrifted(nodeClaim, instance, nodeClass) + if err != nil { + return "", fmt.Errorf("calculating capacityreservation drift, %w", err) + } + drifted := lo.FindOrElse([]cloudprovider.DriftReason{amiDrifted, securitygroupDrifted, subnetDrifted, capacityReservationDrifted}, "", func(i cloudprovider.DriftReason) bool { return string(i) != "" }) return drifted, nil @@ -105,6 +110,32 @@ func (c *CloudProvider) isSubnetDrifted(instance *instance.Instance, nodeClass * return "", nil } +// Checks if the capacity reservation of nodeclaim is drifted, by ensuring node claim capacity reservation +// matches current ec2nodeclass capacity reservations +func (c *CloudProvider) isCapacityReservationDrifted( + nodeClaim *corev1beta1.NodeClaim, + instance *instance.Instance, + nodeClass *v1beta1.EC2NodeClass, +) (cloudprovider.DriftReason, error) { + nodeClaimCapacityReservationId := nodeClaim.Labels[v1beta1.LabelCapactiyReservationID] + if nodeClaimCapacityReservationId == "" { + return "", nil + } + + if nodeClaimCapacityReservationId != lo.FromPtr(instance.CapacityReservationID) { + // nodeClaim and instance are not synced yet + return "", nil + } + + for _, capacityReservation := range nodeClass.Status.CapacityReservations { + if nodeClaimCapacityReservationId == capacityReservation.ID { + return "", nil + } + } + + return CapacityReservationDrift, nil +} + // Checks if the security groups are drifted, by comparing the security groups returned from the SecurityGroupProvider // to the ec2 instance security groups func (c *CloudProvider) areSecurityGroupsDrifted(ec2Instance *instance.Instance, nodeClass *v1beta1.EC2NodeClass) (cloudprovider.DriftReason, error) { diff --git a/pkg/cloudprovider/suite_test.go b/pkg/cloudprovider/suite_test.go index 9bade4ba4f67..349d2c6857b9 100644 --- a/pkg/cloudprovider/suite_test.go +++ b/pkg/cloudprovider/suite_test.go @@ -1099,7 +1099,7 @@ var _ = Describe("CloudProvider", func() { {SubnetId: aws.String("test-subnet-2"), AvailabilityZone: aws.String("test-zone-1a"), AvailabilityZoneId: aws.String("tstz1-1a"), AvailableIpAddressCount: aws.Int64(100), Tags: []*ec2.Tag{{Key: aws.String("Name"), Value: aws.String("test-subnet-2")}}}, }}) - controller := status.NewController(env.Client, awsEnv.SubnetProvider, awsEnv.SecurityGroupProvider, awsEnv.AMIProvider, awsEnv.InstanceProfileProvider, awsEnv.LaunchTemplateProvider) + controller := status.NewController(env.Client, awsEnv.SubnetProvider, awsEnv.SecurityGroupProvider, awsEnv.AMIProvider, awsEnv.InstanceProfileProvider, awsEnv.LaunchTemplateProvider, awsEnv.CapacityReservationProvider) ExpectApplied(ctx, env.Client, nodePool, nodeClass) ExpectObjectReconciled(ctx, env.Client, controller, nodeClass) pod := coretest.UnschedulablePod(coretest.PodOptions{NodeSelector: map[string]string{v1.LabelTopologyZone: "test-zone-1a"}}) @@ -1116,7 +1116,7 @@ var _ = Describe("CloudProvider", func() { {SubnetId: aws.String("test-subnet-2"), AvailabilityZone: aws.String("test-zone-1a"), AvailabilityZoneId: aws.String("tstz1-1a"), AvailableIpAddressCount: aws.Int64(11), Tags: []*ec2.Tag{{Key: aws.String("Name"), Value: aws.String("test-subnet-2")}}}, }}) - controller := status.NewController(env.Client, awsEnv.SubnetProvider, awsEnv.SecurityGroupProvider, awsEnv.AMIProvider, awsEnv.InstanceProfileProvider, awsEnv.LaunchTemplateProvider) + controller := status.NewController(env.Client, awsEnv.SubnetProvider, awsEnv.SecurityGroupProvider, awsEnv.AMIProvider, awsEnv.InstanceProfileProvider, awsEnv.LaunchTemplateProvider, awsEnv.CapacityReservationProvider) nodePool.Spec.Template.Spec.Kubelet = &corev1beta1.KubeletConfiguration{MaxPods: aws.Int32(1)} ExpectApplied(ctx, env.Client, nodePool, nodeClass) ExpectObjectReconciled(ctx, env.Client, controller, nodeClass) @@ -1154,7 +1154,7 @@ var _ = Describe("CloudProvider", func() { }}) nodeClass.Spec.SubnetSelectorTerms = []v1beta1.SubnetSelectorTerm{{Tags: map[string]string{"Name": "test-subnet-1"}}} ExpectApplied(ctx, env.Client, nodePool, nodeClass) - controller := status.NewController(env.Client, awsEnv.SubnetProvider, awsEnv.SecurityGroupProvider, awsEnv.AMIProvider, awsEnv.InstanceProfileProvider, awsEnv.LaunchTemplateProvider) + controller := status.NewController(env.Client, awsEnv.SubnetProvider, awsEnv.SecurityGroupProvider, awsEnv.AMIProvider, awsEnv.InstanceProfileProvider, awsEnv.LaunchTemplateProvider, awsEnv.CapacityReservationProvider) ExpectObjectReconciled(ctx, env.Client, controller, nodeClass) podSubnet1 := coretest.UnschedulablePod() ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, podSubnet1) diff --git a/pkg/controllers/controllers.go b/pkg/controllers/controllers.go index b270658e95ef..2c88b12df164 100644 --- a/pkg/controllers/controllers.go +++ b/pkg/controllers/controllers.go @@ -25,6 +25,7 @@ import ( nodeclasstermination "github.com/aws/karpenter-provider-aws/pkg/controllers/nodeclass/termination" controllersinstancetype "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/instancetype" controllerspricing "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/pricing" + "github.com/aws/karpenter-provider-aws/pkg/providers/capacityreservation" "github.com/aws/karpenter-provider-aws/pkg/providers/launchtemplate" "github.com/aws/aws-sdk-go/aws/session" @@ -37,6 +38,7 @@ import ( "github.com/aws/karpenter-provider-aws/pkg/cache" "github.com/aws/karpenter-provider-aws/pkg/controllers/interruption" + nodeclaimcapacityreservation "github.com/aws/karpenter-provider-aws/pkg/controllers/nodeclaim/capacityreservation" nodeclaimgarbagecollection "github.com/aws/karpenter-provider-aws/pkg/controllers/nodeclaim/garbagecollection" nodeclaimtagging "github.com/aws/karpenter-provider-aws/pkg/controllers/nodeclaim/tagging" "github.com/aws/karpenter-provider-aws/pkg/operator/options" @@ -50,17 +52,40 @@ import ( "github.com/aws/karpenter-provider-aws/pkg/providers/subnet" ) -func NewControllers(ctx context.Context, sess *session.Session, clk clock.Clock, kubeClient client.Client, recorder events.Recorder, - unavailableOfferings *cache.UnavailableOfferings, cloudProvider cloudprovider.CloudProvider, subnetProvider subnet.Provider, - securityGroupProvider securitygroup.Provider, instanceProfileProvider instanceprofile.Provider, instanceProvider instance.Provider, - pricingProvider pricing.Provider, amiProvider amifamily.Provider, launchTemplateProvider launchtemplate.Provider, instanceTypeProvider instancetype.Provider) []controller.Controller { +func NewControllers( + ctx context.Context, + sess *session.Session, + clk clock.Clock, + kubeClient client.Client, + recorder events.Recorder, + unavailableOfferings *cache.UnavailableOfferings, + cloudProvider cloudprovider.CloudProvider, + subnetProvider subnet.Provider, + securityGroupProvider securitygroup.Provider, + instanceProfileProvider instanceprofile.Provider, + instanceProvider instance.Provider, + pricingProvider pricing.Provider, + amiProvider amifamily.Provider, + launchTemplateProvider launchtemplate.Provider, + instanceTypeProvider instancetype.Provider, + capacityReservationProvider capacityreservation.Provider, +) []controller.Controller { controllers := []controller.Controller{ nodeclasshash.NewController(kubeClient), - nodeclassstatus.NewController(kubeClient, subnetProvider, securityGroupProvider, amiProvider, instanceProfileProvider, launchTemplateProvider), + nodeclassstatus.NewController( + kubeClient, + subnetProvider, + securityGroupProvider, + amiProvider, + instanceProfileProvider, + launchTemplateProvider, + capacityReservationProvider, + ), nodeclasstermination.NewController(kubeClient, recorder, instanceProfileProvider, launchTemplateProvider), nodeclaimgarbagecollection.NewController(kubeClient, cloudProvider), nodeclaimtagging.NewController(kubeClient, instanceProvider), + nodeclaimcapacityreservation.NewController(kubeClient, cloudProvider), controllerspricing.NewController(pricingProvider), controllersinstancetype.NewController(instanceTypeProvider), } diff --git a/pkg/controllers/nodeclaim/capacityreservation/controller.go b/pkg/controllers/nodeclaim/capacityreservation/controller.go new file mode 100644 index 000000000000..519b2915ec49 --- /dev/null +++ b/pkg/controllers/nodeclaim/capacityreservation/controller.go @@ -0,0 +1,104 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package capacityreservation + +import ( + "context" + "fmt" + "time" + + "github.com/aws/karpenter-provider-aws/pkg/apis/v1beta1" + "github.com/awslabs/operatorpkg/singleton" + "github.com/samber/lo" + "k8s.io/apimachinery/pkg/api/equality" + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + corev1beta1 "sigs.k8s.io/karpenter/pkg/apis/v1beta1" + "sigs.k8s.io/karpenter/pkg/cloudprovider" +) + +// Controller is an nodeclaim capacity reservation controller. +type Controller struct { + kubeClient client.Client + cloudProvider cloudprovider.CloudProvider +} + +func NewController( + kubeClient client.Client, + cloudProvider cloudprovider.CloudProvider, +) *Controller { + return &Controller{ + kubeClient: kubeClient, + cloudProvider: cloudProvider, + } +} + +func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) { + cloudProviderNodeClaims, err := c.cloudProvider.List(ctx) + if err != nil { + return reconcile.Result{}, fmt.Errorf("listing instances, %w", err) + } + + providerIDCloudProviderNodeClaimMap := make(map[string]*corev1beta1.NodeClaim, len(cloudProviderNodeClaims)) + for _, cloudProviderNodeClaim := range cloudProviderNodeClaims { + log.FromContext(ctx).WithValues("cloudProviderNodeClaim", cloudProviderNodeClaim).V(0).Info("reconcile nodeClaim adding cloudProviderNodeClaim") + providerIDCloudProviderNodeClaimMap[cloudProviderNodeClaim.Status.ProviderID] = cloudProviderNodeClaim + } + + nodeClaimList := &corev1beta1.NodeClaimList{} + if err := c.kubeClient.List(ctx, nodeClaimList); err != nil { + return reconcile.Result{}, fmt.Errorf("listing nodeclaims, %w", err) + } + + // Find the NodeClaims that don't match + // Then patch the label so that it adds or removes the capacity reservation label + errs := make([]error, len(nodeClaimList.Items)) + for i := range nodeClaimList.Items { + nodeClaim := nodeClaimList.Items[i] + stored := nodeClaim.DeepCopy() + + log.FromContext(ctx).WithValues("nodeClaim", nodeClaim).V(0).Info("reconcile nodeClaim") + + cloudProviderNodeClaim, ok := providerIDCloudProviderNodeClaimMap[nodeClaim.Status.ProviderID] + if !ok { + continue + } + + log.FromContext(ctx).WithValues("nodeClaim", nodeClaim, "cloudProviderNodeClaim", cloudProviderNodeClaim).V(0).Info("reconcile nodeClaim with cloudProviderNodeClaim") + + nodeClaim.Labels = lo.Assign(nodeClaim.Labels, map[string]string{ + v1beta1.LabelCapactiyReservationID: cloudProviderNodeClaim.Labels[v1beta1.LabelCapactiyReservationID], + }) + + if !equality.Semantic.DeepEqual(stored, nodeClaim) { + log.FromContext(ctx).WithValues("nodeClaim", nodeClaim, "stored", stored).V(0).Info("patch nodeClaim") + if err := c.kubeClient.Patch(ctx, &nodeClaim, client.MergeFrom(stored)); err != nil { + errs[i] = client.IgnoreNotFound(err) + } + } + } + + return reconcile.Result{RequeueAfter: time.Minute}, nil +} + +func (c *Controller) Register(_ context.Context, m manager.Manager) error { + return controllerruntime.NewControllerManagedBy(m). + Named("nodeclaim.capacityreservation"). + WatchesRawSource(singleton.Source()). + Complete(singleton.AsReconciler(c)) +} diff --git a/pkg/controllers/nodeclass/status/capacityreservation.go b/pkg/controllers/nodeclass/status/capacityreservation.go new file mode 100644 index 000000000000..84ae8056053e --- /dev/null +++ b/pkg/controllers/nodeclass/status/capacityreservation.go @@ -0,0 +1,68 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package status + +import ( + "context" + "fmt" + "sort" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/ec2" + "github.com/samber/lo" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/aws/karpenter-provider-aws/pkg/apis/v1beta1" + "github.com/aws/karpenter-provider-aws/pkg/providers/capacityreservation" +) + +type CapacityReservation struct { + capacityReservationProvider capacityreservation.Provider +} + +func (sg *CapacityReservation) Reconcile(ctx context.Context, nodeClass *v1beta1.EC2NodeClass) (reconcile.Result, error) { + capacityReservations, err := sg.capacityReservationProvider.List(ctx, nodeClass) + if err != nil { + return reconcile.Result{}, err + } + if len(capacityReservations) == 0 && len(nodeClass.Spec.CapacityReservationSelectorTerms) > 0 { + nodeClass.Status.CapacityReservations = nil + return reconcile.Result{}, fmt.Errorf("no capacity reservations exist given constraints") + } + sort.Slice(capacityReservations, func(i, j int) bool { + return *capacityReservations[i].CapacityReservationId < *capacityReservations[j].CapacityReservationId + }) + nodeClass.Status.CapacityReservations = lo.Map(capacityReservations, func(capacityReservation *ec2.CapacityReservation, _ int) v1beta1.CapacityReservation { + var endDate *string + if capacityReservation.EndDate != nil { + endDate = aws.String(capacityReservation.EndDate.Format(time.RFC3339)) + } + return v1beta1.CapacityReservation{ + ID: *capacityReservation.CapacityReservationId, + AvailabilityZone: *capacityReservation.AvailabilityZone, + AvailableInstanceCount: int(*capacityReservation.AvailableInstanceCount), + EndDate: endDate, + EndDateType: *capacityReservation.EndDateType, + InstanceMatchCriteria: *capacityReservation.InstanceMatchCriteria, + InstancePlatform: *capacityReservation.InstancePlatform, + InstanceType: *capacityReservation.InstanceType, + OwnerID: *capacityReservation.OwnerId, + StartDate: capacityReservation.StartDate.Format(time.RFC3339), + TotalInstanceCount: int(*capacityReservation.TotalInstanceCount), + } + }) + return reconcile.Result{RequeueAfter: 5 * time.Minute}, nil +} diff --git a/pkg/controllers/nodeclass/status/controller.go b/pkg/controllers/nodeclass/status/controller.go index 8c32ff832d14..9459b99b839c 100644 --- a/pkg/controllers/nodeclass/status/controller.go +++ b/pkg/controllers/nodeclass/status/controller.go @@ -33,6 +33,7 @@ import ( "github.com/aws/karpenter-provider-aws/pkg/apis/v1beta1" "github.com/aws/karpenter-provider-aws/pkg/providers/amifamily" + "github.com/aws/karpenter-provider-aws/pkg/providers/capacityreservation" "github.com/aws/karpenter-provider-aws/pkg/providers/instanceprofile" "github.com/aws/karpenter-provider-aws/pkg/providers/launchtemplate" "github.com/aws/karpenter-provider-aws/pkg/providers/securitygroup" @@ -46,23 +47,32 @@ type nodeClassStatusReconciler interface { type Controller struct { kubeClient client.Client - ami *AMI - instanceprofile *InstanceProfile - subnet *Subnet - securitygroup *SecurityGroup - readiness *Readiness //TODO : Remove this when we have sub status conditions + ami *AMI + instanceprofile *InstanceProfile + subnet *Subnet + securitygroup *SecurityGroup + readiness *Readiness //TODO : Remove this when we have sub status conditions + capacityreservation *CapacityReservation } -func NewController(kubeClient client.Client, subnetProvider subnet.Provider, securityGroupProvider securitygroup.Provider, - amiProvider amifamily.Provider, instanceProfileProvider instanceprofile.Provider, launchTemplateProvider launchtemplate.Provider) *Controller { +func NewController( + kubeClient client.Client, + subnetProvider subnet.Provider, + securityGroupProvider securitygroup.Provider, + amiProvider amifamily.Provider, + instanceProfileProvider instanceprofile.Provider, + launchTemplateProvider launchtemplate.Provider, + capacityReservationProvider capacityreservation.Provider, +) *Controller { return &Controller{ kubeClient: kubeClient, - ami: &AMI{amiProvider: amiProvider}, - subnet: &Subnet{subnetProvider: subnetProvider}, - securitygroup: &SecurityGroup{securityGroupProvider: securityGroupProvider}, - instanceprofile: &InstanceProfile{instanceProfileProvider: instanceProfileProvider}, - readiness: &Readiness{launchTemplateProvider: launchTemplateProvider}, + ami: &AMI{amiProvider: amiProvider}, + subnet: &Subnet{subnetProvider: subnetProvider}, + securitygroup: &SecurityGroup{securityGroupProvider: securityGroupProvider}, + instanceprofile: &InstanceProfile{instanceProfileProvider: instanceProfileProvider}, + readiness: &Readiness{launchTemplateProvider: launchTemplateProvider}, + capacityreservation: &CapacityReservation{capacityReservationProvider: capacityReservationProvider}, } } @@ -86,6 +96,7 @@ func (c *Controller) Reconcile(ctx context.Context, nodeClass *v1beta1.EC2NodeCl c.securitygroup, c.instanceprofile, c.readiness, + c.capacityreservation, } { res, err := reconciler.Reconcile(ctx, nodeClass) errs = multierr.Append(errs, err) diff --git a/pkg/controllers/nodeclass/status/suite_test.go b/pkg/controllers/nodeclass/status/suite_test.go index bd3afe8886ab..02b45baacaec 100644 --- a/pkg/controllers/nodeclass/status/suite_test.go +++ b/pkg/controllers/nodeclass/status/suite_test.go @@ -59,6 +59,7 @@ var _ = BeforeSuite(func() { awsEnv.AMIProvider, awsEnv.InstanceProfileProvider, awsEnv.LaunchTemplateProvider, + awsEnv.CapacityReservationProvider, ) }) diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 5267aad4672d..26de0dbb4837 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -48,6 +48,7 @@ var ( "UnfulfillableCapacity", "Unsupported", "InsufficientFreeAddressesInSubnet", + "ReservationCapacityExceeded", ) ) diff --git a/pkg/operator/operator.go b/pkg/operator/operator.go index 3b95d093d869..1cecc1d80f3f 100644 --- a/pkg/operator/operator.go +++ b/pkg/operator/operator.go @@ -54,6 +54,7 @@ import ( awscache "github.com/aws/karpenter-provider-aws/pkg/cache" "github.com/aws/karpenter-provider-aws/pkg/operator/options" "github.com/aws/karpenter-provider-aws/pkg/providers/amifamily" + "github.com/aws/karpenter-provider-aws/pkg/providers/capacityreservation" "github.com/aws/karpenter-provider-aws/pkg/providers/instance" "github.com/aws/karpenter-provider-aws/pkg/providers/instanceprofile" "github.com/aws/karpenter-provider-aws/pkg/providers/instancetype" @@ -73,19 +74,20 @@ func init() { type Operator struct { *operator.Operator - Session *session.Session - UnavailableOfferingsCache *awscache.UnavailableOfferings - EC2API ec2iface.EC2API - SubnetProvider subnet.Provider - SecurityGroupProvider securitygroup.Provider - InstanceProfileProvider instanceprofile.Provider - AMIProvider amifamily.Provider - AMIResolver *amifamily.Resolver - LaunchTemplateProvider launchtemplate.Provider - PricingProvider pricing.Provider - VersionProvider version.Provider - InstanceTypesProvider instancetype.Provider - InstanceProvider instance.Provider + Session *session.Session + UnavailableOfferingsCache *awscache.UnavailableOfferings + EC2API ec2iface.EC2API + SubnetProvider subnet.Provider + SecurityGroupProvider securitygroup.Provider + InstanceProfileProvider instanceprofile.Provider + AMIProvider amifamily.Provider + AMIResolver *amifamily.Resolver + LaunchTemplateProvider launchtemplate.Provider + PricingProvider pricing.Provider + VersionProvider version.Provider + InstanceTypesProvider instancetype.Provider + InstanceProvider instance.Provider + CapacityReservationProvider capacityreservation.Provider } func NewOperator(ctx context.Context, operator *operator.Operator) (context.Context, *Operator) { @@ -149,6 +151,7 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont launchTemplateProvider := launchtemplate.NewDefaultProvider( ctx, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval), + cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval), ec2api, eks.New(sess), amiResolver, @@ -176,22 +179,27 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont subnetProvider, launchTemplateProvider, ) + capacityReservationProvider := capacityreservation.NewDefaultProvider( + ec2api, + cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval), + ) return ctx, &Operator{ - Operator: operator, - Session: sess, - UnavailableOfferingsCache: unavailableOfferingsCache, - EC2API: ec2api, - SubnetProvider: subnetProvider, - SecurityGroupProvider: securityGroupProvider, - InstanceProfileProvider: instanceProfileProvider, - AMIProvider: amiProvider, - AMIResolver: amiResolver, - VersionProvider: versionProvider, - LaunchTemplateProvider: launchTemplateProvider, - PricingProvider: pricingProvider, - InstanceTypesProvider: instanceTypeProvider, - InstanceProvider: instanceProvider, + Operator: operator, + Session: sess, + UnavailableOfferingsCache: unavailableOfferingsCache, + EC2API: ec2api, + SubnetProvider: subnetProvider, + SecurityGroupProvider: securityGroupProvider, + InstanceProfileProvider: instanceProfileProvider, + AMIProvider: amiProvider, + AMIResolver: amiResolver, + VersionProvider: versionProvider, + LaunchTemplateProvider: launchTemplateProvider, + PricingProvider: pricingProvider, + InstanceTypesProvider: instanceTypeProvider, + InstanceProvider: instanceProvider, + CapacityReservationProvider: capacityReservationProvider, } } diff --git a/pkg/providers/amifamily/resolver.go b/pkg/providers/amifamily/resolver.go index c257111900b1..6764d4e21569 100644 --- a/pkg/providers/amifamily/resolver.go +++ b/pkg/providers/amifamily/resolver.go @@ -23,6 +23,7 @@ import ( "github.com/imdario/mergo" "github.com/samber/lo" core "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" corev1beta1 "sigs.k8s.io/karpenter/pkg/apis/v1beta1" @@ -65,14 +66,15 @@ type Options struct { // LaunchTemplate holds the dynamically generated launch template parameters type LaunchTemplate struct { *Options - UserData bootstrap.Bootstrapper - BlockDeviceMappings []*v1beta1.BlockDeviceMapping - MetadataOptions *v1beta1.MetadataOptions - AMIID string - InstanceTypes []*cloudprovider.InstanceType `hash:"ignore"` - DetailedMonitoring bool - EFACount int - CapacityType string + UserData bootstrap.Bootstrapper + BlockDeviceMappings []*v1beta1.BlockDeviceMapping + MetadataOptions *v1beta1.MetadataOptions + AMIID string + InstanceTypes []*cloudprovider.InstanceType `hash:"ignore"` + DetailedMonitoring bool + EFACount int + CapacityType string + CapacityReservationID *string } // AMIFamily can be implemented to override the default logic for generating dynamic launch template parameters @@ -149,8 +151,31 @@ func (r Resolver) Resolve(nodeClass *v1beta1.EC2NodeClass, nodeClaim *corev1beta maxPods: int(instanceType.Capacity.Pods().Value()), } }) + // tvonhacht: figure out if capacity-reservation is available for params, instanceTypes := range paramsToInstanceTypes { - resolved, err := r.resolveLaunchTemplate(nodeClass, nodeClaim, instanceTypes, capacityType, amiFamily, amiID, params.maxPods, params.efaCount, options) + if r.isCapacityReservationLaunch(nodeClaim, instanceTypes) { + requirements := scheduling.NewNodeSelectorRequirementsWithMinValues(nodeClaim.Spec.Requirements...) + for _, instanceType := range instanceTypes { + for _, offering := range instanceType.Offerings { + requirements[corev1beta1.CapacityTypeLabelKey] = scheduling.NewRequirement(corev1beta1.CapacityTypeLabelKey, v1.NodeSelectorOpIn, corev1beta1.CapacityTypeOnDemand) + requirements[v1beta1.LabelCapactiyReservationID] = scheduling.NewRequirement(v1beta1.LabelCapactiyReservationID, v1.NodeSelectorOpExists) + if requirements.Compatible(offering.Requirements, scheduling.AllowUndefinedWellKnownLabels) != nil { + continue + } + + capacityReservationID := offering.Requirements.Get(v1beta1.LabelCapactiyReservationID).Values()[0] + + resolved, err := r.resolveLaunchTemplate(nodeClass, nodeClaim, instanceTypes, capacityType, amiFamily, amiID, params.maxPods, params.efaCount, &capacityReservationID, options) + if err != nil { + return nil, err + } + resolvedTemplates = append(resolvedTemplates, resolved) + } + } + continue + } + + resolved, err := r.resolveLaunchTemplate(nodeClass, nodeClaim, instanceTypes, capacityType, amiFamily, amiID, params.maxPods, params.efaCount, nil, options) if err != nil { return nil, err } @@ -160,6 +185,28 @@ func (r Resolver) Resolve(nodeClass *v1beta1.EC2NodeClass, nodeClaim *corev1beta return resolvedTemplates, nil } +func (r Resolver) isCapacityReservationLaunch(nodeClaim *corev1beta1.NodeClaim, instanceTypes []*cloudprovider.InstanceType) bool { + requirements := scheduling.NewNodeSelectorRequirementsWithMinValues(nodeClaim.Spec.Requirements...) + // requirements must allow on-demand + if !requirements.Get(corev1beta1.CapacityTypeLabelKey).Has(corev1beta1.CapacityTypeOnDemand) { + return false + } + + requirements[corev1beta1.CapacityTypeLabelKey] = scheduling.NewRequirement(corev1beta1.CapacityTypeLabelKey, v1.NodeSelectorOpIn, corev1beta1.CapacityTypeOnDemand) + requirements[v1beta1.LabelCapactiyReservationID] = scheduling.NewRequirement(v1beta1.LabelCapactiyReservationID, v1.NodeSelectorOpExists) + for _, instanceType := range instanceTypes { + for _, offering := range instanceType.Offerings { + if requirements.Compatible(offering.Requirements, scheduling.AllowUndefinedWellKnownLabels) == nil { + if offering.Requirements.Has(v1beta1.LabelCapactiyReservationID) { + return true + } + } + } + } + + return false +} + func GetAMIFamily(amiFamily *string, options *Options) AMIFamily { switch aws.StringValue(amiFamily) { case v1beta1.AMIFamilyBottlerocket: @@ -206,7 +253,7 @@ func (r Resolver) defaultClusterDNS(opts *Options, kubeletConfig *corev1beta1.Ku } func (r Resolver) resolveLaunchTemplate(nodeClass *v1beta1.EC2NodeClass, nodeClaim *corev1beta1.NodeClaim, instanceTypes []*cloudprovider.InstanceType, capacityType string, - amiFamily AMIFamily, amiID string, maxPods int, efaCount int, options *Options) (*LaunchTemplate, error) { + amiFamily AMIFamily, amiID string, maxPods int, efaCount int, capacityReservationID *string, options *Options) (*LaunchTemplate, error) { kubeletConfig := &corev1beta1.KubeletConfiguration{} if nodeClaim.Spec.Kubelet != nil { if err := mergo.Merge(kubeletConfig, nodeClaim.Spec.Kubelet); err != nil { @@ -227,13 +274,14 @@ func (r Resolver) resolveLaunchTemplate(nodeClass *v1beta1.EC2NodeClass, nodeCla nodeClass.Spec.UserData, options.InstanceStorePolicy, ), - BlockDeviceMappings: nodeClass.Spec.BlockDeviceMappings, - MetadataOptions: nodeClass.Spec.MetadataOptions, - DetailedMonitoring: aws.BoolValue(nodeClass.Spec.DetailedMonitoring), - AMIID: amiID, - InstanceTypes: instanceTypes, - EFACount: efaCount, - CapacityType: capacityType, + BlockDeviceMappings: nodeClass.Spec.BlockDeviceMappings, + MetadataOptions: nodeClass.Spec.MetadataOptions, + DetailedMonitoring: aws.BoolValue(nodeClass.Spec.DetailedMonitoring), + AMIID: amiID, + InstanceTypes: instanceTypes, + EFACount: efaCount, + CapacityType: capacityType, + CapacityReservationID: capacityReservationID, } if len(resolved.BlockDeviceMappings) == 0 { resolved.BlockDeviceMappings = amiFamily.DefaultBlockDeviceMappings() diff --git a/pkg/providers/capacityreservation/capacityreservation.go b/pkg/providers/capacityreservation/capacityreservation.go new file mode 100644 index 000000000000..0c2f1e7bcc9a --- /dev/null +++ b/pkg/providers/capacityreservation/capacityreservation.go @@ -0,0 +1,204 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package capacityreservation + +import ( + "context" + "fmt" + "sync" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/ec2" + "github.com/aws/aws-sdk-go/service/ec2/ec2iface" + "github.com/mitchellh/hashstructure/v2" + "github.com/patrickmn/go-cache" + "github.com/samber/lo" + "knative.dev/pkg/logging" + + "sigs.k8s.io/karpenter/pkg/utils/pretty" + + "github.com/aws/karpenter-provider-aws/pkg/apis/v1beta1" +) + +type Provider interface { + List(context.Context, *v1beta1.EC2NodeClass) ([]*ec2.CapacityReservation, error) +} + +type DefaultProvider struct { + sync.Mutex + ec2api ec2iface.EC2API + cache *cache.Cache + cm *pretty.ChangeMonitor +} + +func NewDefaultProvider(ec2api ec2iface.EC2API, cache *cache.Cache) *DefaultProvider { + return &DefaultProvider{ + ec2api: ec2api, + cm: pretty.NewChangeMonitor(), + cache: cache, + } +} + +func (p *DefaultProvider) List(ctx context.Context, nodeClass *v1beta1.EC2NodeClass) ([]*ec2.CapacityReservation, error) { + p.Lock() + defer p.Unlock() + + capacityReservations, err := p.getCapacityReservations(ctx, nodeClass.Spec.CapacityReservationSelectorTerms) + if err != nil { + return nil, fmt.Errorf("get capacity reservations, %w", err) + } + if p.cm.HasChanged(fmt.Sprintf("capacity-reservations/%s", nodeClass.Name), capacityReservations) { + logging.FromContext(ctx). + With("capacity-reservations", lo.Map(capacityReservations, func(s *ec2.CapacityReservation, _ int) string { + return aws.StringValue(s.CapacityReservationId) + })). + Debugf("discovered capacity reservations") + } + return capacityReservations, nil +} + +func (p *DefaultProvider) getCapacityReservations(ctx context.Context, terms []v1beta1.CapacityReservationSelectorTerm) ([]*ec2.CapacityReservation, error) { + hash, err := hashstructure.Hash(terms, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true}) + if err != nil { + return nil, err + } + + if cr, ok := p.cache.Get(fmt.Sprint(hash)); ok { + return cr.([]*ec2.CapacityReservation), nil + } + + capacityReservationsUnfiltered, err := p.describeCapacityReservations(ctx) + if err != nil { + return nil, err + } + + capacityReservationsMap := map[string]*ec2.CapacityReservation{} + for _, term := range terms { + capacityReservations := getCapacityReservations(capacityReservationsUnfiltered, term) + for _, capacityReservation := range capacityReservations { + capacityReservationsMap[lo.FromPtr(capacityReservation.CapacityReservationId)] = capacityReservation + } + } + p.cache.SetDefault(fmt.Sprint(hash), lo.Values(capacityReservationsMap)) + return lo.Values(capacityReservationsMap), nil +} + +func (p *DefaultProvider) describeCapacityReservations(ctx context.Context) ([]*ec2.CapacityReservation, error) { + describeCapacityReservations := []*ec2.CapacityReservation{} + + err := p.ec2api.DescribeCapacityReservationsPagesWithContext( + ctx, + &ec2.DescribeCapacityReservationsInput{ + Filters: []*ec2.Filter{ + { + Name: aws.String("state"), + Values: aws.StringSlice([]string{ec2.CapacityReservationFleetStateActive}), + }, + }, + }, + func(describeCapacityReservationsOutput *ec2.DescribeCapacityReservationsOutput, lastPage bool) bool { + describeCapacityReservations = append( + describeCapacityReservations, + describeCapacityReservationsOutput.CapacityReservations..., + ) + return !lastPage + }, + ) + if err != nil { + return nil, err + } + + return describeCapacityReservations, nil +} + +func getCapacityReservations( + capacityReservationsUnfiltered []*ec2.CapacityReservation, + term v1beta1.CapacityReservationSelectorTerm, +) []*ec2.CapacityReservation { + capacityReservations := []*ec2.CapacityReservation{} + + for _, capacityReservation := range capacityReservationsUnfiltered { + if matches(capacityReservation, term) { + capacityReservations = append( + capacityReservations, + capacityReservation, + ) + } + } + + return capacityReservations +} + +func matches( + capacityReservation *ec2.CapacityReservation, + term v1beta1.CapacityReservationSelectorTerm, +) bool { + if term.ID != "" { + return term.ID == aws.StringValue(capacityReservation.CapacityReservationId) + } + + if term.Type != "" { + if term.Type != aws.StringValue(capacityReservation.InstanceMatchCriteria) { + return false + } + } + + if term.AvailabilityZone != "" { + if term.AvailabilityZone != aws.StringValue(capacityReservation.AvailabilityZone) { + return false + } + } + + if term.InstanceType != "" { + if term.InstanceType != aws.StringValue(capacityReservation.InstanceType) { + return false + } + } + + if term.OwnerID != "" { + if term.OwnerID != aws.StringValue(capacityReservation.OwnerId) { + return false + } + } + + tags := getCapacityReservationTags(capacityReservation) + for key, value := range term.Tags { + if _, ok := tags[key]; !ok { + return false + } + + if value == "*" { + continue + } + + if tags[key] != value { + return false + } + + continue + } + + return true +} + +func getCapacityReservationTags(capacityReservation *ec2.CapacityReservation) map[string]string { + tags := map[string]string{} + + for _, tag := range capacityReservation.Tags { + tags[aws.StringValue(tag.Key)] = aws.StringValue(tag.Value) + } + + return tags +} diff --git a/pkg/providers/instance/instance.go b/pkg/providers/instance/instance.go index 95c17cefb206..8677562ee3c4 100644 --- a/pkg/providers/instance/instance.go +++ b/pkg/providers/instance/instance.go @@ -96,24 +96,25 @@ func (p *DefaultProvider) Create(ctx context.Context, nodeClass *v1beta1.EC2Node schedulingRequirements := scheduling.NewNodeSelectorRequirementsWithMinValues(nodeClaim.Spec.Requirements...) // Only filter the instances if there are no minValues in the requirement. if !schedulingRequirements.HasMinValues() { - instanceTypes = p.filterInstanceTypes(nodeClaim, instanceTypes) + instanceTypes = p.filterInstanceTypes(ctx, nodeClaim, instanceTypes) } instanceTypes, err := cloudprovider.InstanceTypes(instanceTypes).Truncate(schedulingRequirements, maxInstanceTypes) if err != nil { return nil, fmt.Errorf("truncating instance types, %w", err) } tags := getTags(ctx, nodeClass, nodeClaim) - fleetInstance, err := p.launchInstance(ctx, nodeClass, nodeClaim, instanceTypes, tags) + fleetInstance, capacityReservationID, err := p.launchInstance(ctx, nodeClass, nodeClaim, instanceTypes, tags) if awserrors.IsLaunchTemplateNotFound(err) { // retry once if launch template is not found. This allows karpenter to generate a new LT if the // cache was out-of-sync on the first try - fleetInstance, err = p.launchInstance(ctx, nodeClass, nodeClaim, instanceTypes, tags) + fleetInstance, capacityReservationID, err = p.launchInstance(ctx, nodeClass, nodeClaim, instanceTypes, tags) } if err != nil { return nil, err } efaEnabled := lo.Contains(lo.Keys(nodeClaim.Spec.Resources.Requests), v1beta1.ResourceEFA) - return NewInstanceFromFleet(fleetInstance, tags, efaEnabled), nil + + return NewInstanceFromFleet(fleetInstance, tags, efaEnabled, capacityReservationID), nil } func (p *DefaultProvider) Get(ctx context.Context, id string) (*Instance, error) { @@ -200,17 +201,17 @@ func (p *DefaultProvider) CreateTags(ctx context.Context, id string, tags map[st return nil } -func (p *DefaultProvider) launchInstance(ctx context.Context, nodeClass *v1beta1.EC2NodeClass, nodeClaim *corev1beta1.NodeClaim, instanceTypes []*cloudprovider.InstanceType, tags map[string]string) (*ec2.CreateFleetInstance, error) { +func (p *DefaultProvider) launchInstance(ctx context.Context, nodeClass *v1beta1.EC2NodeClass, nodeClaim *corev1beta1.NodeClaim, instanceTypes []*cloudprovider.InstanceType, tags map[string]string) (*ec2.CreateFleetInstance, *string, error) { capacityType := p.getCapacityType(nodeClaim, instanceTypes) zonalSubnets, err := p.subnetProvider.ZonalSubnetsForLaunch(ctx, nodeClass, instanceTypes, capacityType) if err != nil { - return nil, fmt.Errorf("getting subnets, %w", err) + return nil, nil, fmt.Errorf("getting subnets, %w", err) } // Get Launch Template Configs, which may differ due to GPU or Architecture requirements launchTemplateConfigs, err := p.getLaunchTemplateConfigs(ctx, nodeClass, nodeClaim, instanceTypes, zonalSubnets, capacityType, tags) if err != nil { - return nil, fmt.Errorf("getting launch template configs, %w", err) + return nil, nil, fmt.Errorf("getting launch template configs, %w", err) } if err := p.checkODFallback(nodeClaim, instanceTypes, launchTemplateConfigs); err != nil { log.FromContext(ctx).Error(err, "failed while checking on-demand fallback") @@ -236,6 +237,7 @@ func (p *DefaultProvider) launchInstance(ctx context.Context, nodeClass *v1beta1 createFleetInput.OnDemandOptions = &ec2.OnDemandOptionsRequest{AllocationStrategy: aws.String(ec2.FleetOnDemandAllocationStrategyLowestPrice)} } + log.FromContext(ctx).WithValues("createFleetInput", createFleetInput).V(0).Info("CreateFleet") createFleetOutput, err := p.ec2Batcher.CreateFleet(ctx, createFleetInput) p.subnetProvider.UpdateInflightIPs(createFleetInput, createFleetOutput, instanceTypes, lo.Values(zonalSubnets), capacityType) if err != nil { @@ -243,19 +245,24 @@ func (p *DefaultProvider) launchInstance(ctx context.Context, nodeClass *v1beta1 for _, lt := range launchTemplateConfigs { p.launchTemplateProvider.InvalidateCache(ctx, aws.StringValue(lt.LaunchTemplateSpecification.LaunchTemplateName), aws.StringValue(lt.LaunchTemplateSpecification.LaunchTemplateId)) } - return nil, fmt.Errorf("creating fleet %w", err) + return nil, nil, fmt.Errorf("creating fleet %w", err) } var reqFailure awserr.RequestFailure if errors.As(err, &reqFailure) { - return nil, fmt.Errorf("creating fleet %w (%s)", err, reqFailure.RequestID()) + return nil, nil, fmt.Errorf("creating fleet %w (%s)", err, reqFailure.RequestID()) } - return nil, fmt.Errorf("creating fleet %w", err) + return nil, nil, fmt.Errorf("creating fleet %w", err) } + p.updateUnavailableOfferingsCache(ctx, createFleetOutput.Errors, capacityType) + p.updateUnavailableCapacityReservation(ctx, nodeClass, createFleetOutput.Errors) if len(createFleetOutput.Instances) == 0 || len(createFleetOutput.Instances[0].InstanceIds) == 0 { - return nil, combineFleetErrors(createFleetOutput.Errors) + return nil, nil, combineFleetErrors(createFleetOutput.Errors) } - return createFleetOutput.Instances[0], nil + + launchTemplateName := createFleetInput.LaunchTemplateConfigs[0].LaunchTemplateSpecification.LaunchTemplateName + capacityReservationID := p.launchTemplateProvider.GetCapacityReservationID(*launchTemplateName) + return createFleetOutput.Instances[0], capacityReservationID, nil } func getTags(ctx context.Context, nodeClass *v1beta1.EC2NodeClass, nodeClaim *corev1beta1.NodeClaim) map[string]string { @@ -298,11 +305,16 @@ func (p *DefaultProvider) getLaunchTemplateConfigs(ctx context.Context, nodeClas if err != nil { return nil, fmt.Errorf("getting launch templates, %w", err) } - requirements := scheduling.NewNodeSelectorRequirementsWithMinValues(nodeClaim.Spec.Requirements...) - requirements[corev1beta1.CapacityTypeLabelKey] = scheduling.NewRequirement(corev1beta1.CapacityTypeLabelKey, v1.NodeSelectorOpIn, capacityType) + + // Pick 1 NodePool (instanceType + AZ) if mulitple ODCRs match it, because CreateFleet does not allow for _, launchTemplate := range launchTemplates { + requirements := scheduling.NewNodeSelectorRequirementsWithMinValues(nodeClaim.Spec.Requirements...) + requirements[corev1beta1.CapacityTypeLabelKey] = scheduling.NewRequirement(corev1beta1.CapacityTypeLabelKey, v1.NodeSelectorOpIn, capacityType) + if launchTemplate.CapacityReservationID != nil { + requirements[v1beta1.LabelCapactiyReservationID] = scheduling.NewRequirement(v1beta1.LabelCapactiyReservationID, v1.NodeSelectorOpIn, *launchTemplate.CapacityReservationID) + } launchTemplateConfig := &ec2.FleetLaunchTemplateConfigRequest{ - Overrides: p.getOverrides(launchTemplate.InstanceTypes, zonalSubnets, requirements, launchTemplate.ImageID), + Overrides: p.getOverrides(ctx, launchTemplate.InstanceTypes, zonalSubnets, requirements, launchTemplate.ImageID), LaunchTemplateSpecification: &ec2.FleetLaunchTemplateSpecificationRequest{ LaunchTemplateName: aws.String(launchTemplate.Name), Version: aws.String("$Latest"), @@ -312,6 +324,8 @@ func (p *DefaultProvider) getLaunchTemplateConfigs(ctx context.Context, nodeClas launchTemplateConfigs = append(launchTemplateConfigs, launchTemplateConfig) } } + + log.FromContext(ctx).WithValues("launchTemplateConfigs", launchTemplateConfigs).V(0).Info("getLaunchTemplateConfigs") if len(launchTemplateConfigs) == 0 { return nil, fmt.Errorf("no capacity offerings are currently available given the constraints") } @@ -320,7 +334,7 @@ func (p *DefaultProvider) getLaunchTemplateConfigs(ctx context.Context, nodeClas // getOverrides creates and returns launch template overrides for the cross product of InstanceTypes and subnets (with subnets being constrained by // zones and the offerings in InstanceTypes) -func (p *DefaultProvider) getOverrides(instanceTypes []*cloudprovider.InstanceType, zonalSubnets map[string]*subnet.Subnet, reqs scheduling.Requirements, image string) []*ec2.FleetLaunchTemplateOverridesRequest { +func (p *DefaultProvider) getOverrides(ctx context.Context, instanceTypes []*cloudprovider.InstanceType, zonalSubnets map[string]*subnet.Subnet, reqs scheduling.Requirements, image string) []*ec2.FleetLaunchTemplateOverridesRequest { // Unwrap all the offerings to a flat slice that includes a pointer // to the parent instance type name type offeringWithParentName struct { @@ -339,9 +353,12 @@ func (p *DefaultProvider) getOverrides(instanceTypes []*cloudprovider.InstanceTy } var overrides []*ec2.FleetLaunchTemplateOverridesRequest for _, offering := range unwrappedOfferings { + log.FromContext(ctx).WithValues("offering.Requirements", offering.Requirements).V(0).Info("getOverrides") if reqs.Compatible(offering.Requirements, scheduling.AllowUndefinedWellKnownLabels) != nil { + log.FromContext(ctx).WithValues("offering.Requirements", offering.Requirements, "reqs", reqs).V(0).Info("getOverrides not compatible") continue } + log.FromContext(ctx).WithValues("unwrappedOffering", offering, "zones", offering.Requirements.Get(v1.LabelTopologyZone).Values()).V(0).Info("getOverrides compatible") subnet, ok := zonalSubnets[offering.Requirements.Get(v1.LabelTopologyZone).Any()] if !ok { continue @@ -366,6 +383,36 @@ func (p *DefaultProvider) updateUnavailableOfferingsCache(ctx context.Context, e } } +func (p *DefaultProvider) updateUnavailableCapacityReservation(ctx context.Context, nodeClass *v1beta1.EC2NodeClass, errors []*ec2.CreateFleetError) { + // // TODO: tvonhacht maybe not needed because drift detection will add it, or a later refresh? if we do this maybe we should do this withn the Create before returning the object? + // instance, err = c.instanceProvider.Get(ctx, instance.ID) + // if err != nil { + // return nil, fmt.Errorf("getting instance after creating, %w", err) + // } + + // offerings[capacityReservationID] + // nodeClass.Status.CapacityReservations[instance.CapacityReservationID].AvaialbleCount-- + + // 1. GetInstanceTypes() relies on the NodeClass status capacity reservation information + // 2. A failure to launch a capacity reservation causes us to update the NodeClass status avaialble count for that CR ID to 0 + // CreateFleet returns that it can't fulfill the request + // 3. Launching into a capacity reservation pool causes us to decrement the available count on the NodeClass status for that CR ID + // 4. When there is an update on the NodeClass capacity reservation status, that should invalidate the current GetInstanceTypes() cache for that NodePool/NodeClass + // 5. The Capacity reservation provider reconciliation polls to update the capacity reservation status + + // Note: One thing to consider here is that you can't read your writes immediately, so there is a delay between when we do the update and when we see the update + + for _, err := range errors { + if "CapacityReservationExceeded" == *err.ErrorCode { + // "fake-capacity-reservation-id" + // nodeClass.Status.CapacityReservations[0].AvailableInstanceCount = 0 + // p.kubeClient. + // ec2nodeclass.status.capacityreservations[capacityReservationID].availableCount = 0 + + } + } +} + // getCapacityType selects spot if both constraints are flexible and there is an // available offering. The AWS Cloud Provider defaults to [ on-demand ], so spot // must be explicitly included in capacity type requirements. @@ -386,13 +433,21 @@ func (p *DefaultProvider) getCapacityType(nodeClaim *corev1beta1.NodeClaim, inst // filterInstanceTypes is used to provide filtering on the list of potential instance types to further limit it to those // that make the most sense given our specific AWS cloudprovider. -func (p *DefaultProvider) filterInstanceTypes(nodeClaim *corev1beta1.NodeClaim, instanceTypes []*cloudprovider.InstanceType) []*cloudprovider.InstanceType { +func (p *DefaultProvider) filterInstanceTypes(ctx context.Context, nodeClaim *corev1beta1.NodeClaim, instanceTypes []*cloudprovider.InstanceType) []*cloudprovider.InstanceType { instanceTypes = filterExoticInstanceTypes(instanceTypes) // If we could potentially launch either a spot or on-demand node, we want to filter out the spot instance types that // are more expensive than the cheapest on-demand type. if p.isMixedCapacityLaunch(nodeClaim, instanceTypes) { instanceTypes = filterUnwantedSpot(instanceTypes) } + // If we could potentially launch either a spot or on-demand node, we want to filter out the spot instance types that + // are more expensive than the cheapest on-demand type. + log.FromContext(ctx).WithValues("instanceTypes", instanceTypes, "scheduling.allowUndefinedWellKnownLabels", scheduling.AllowUndefinedWellKnownLabels).V(0).Info("filterInstanceTypes capacity reservations") + if p.isCapacityReservationLaunch(nodeClaim, instanceTypes) { + log.FromContext(ctx).V(0).Info("filterInstanceTypes capacity reservations only capacity reservations") + instanceTypes = filterOnlyCapacityReservations(instanceTypes) + } + log.FromContext(ctx).WithValues("instanceTypes", instanceTypes).V(0).Info("filterInstanceTypes capacity reservations after") return instanceTypes } @@ -424,6 +479,30 @@ func (p *DefaultProvider) isMixedCapacityLaunch(nodeClaim *corev1beta1.NodeClaim return hasSpotOfferings && hasODOffering } +// isMixedCapacityLaunch returns true if nodepools and available offerings could potentially allow either a spot or +// and on-demand node to launch +func (p *DefaultProvider) isCapacityReservationLaunch(nodeClaim *corev1beta1.NodeClaim, instanceTypes []*cloudprovider.InstanceType) bool { + requirements := scheduling.NewNodeSelectorRequirementsWithMinValues(nodeClaim.Spec.Requirements...) + // requirements must allow on-demand + if !requirements.Get(corev1beta1.CapacityTypeLabelKey).Has(corev1beta1.CapacityTypeOnDemand) { + return false + } + + requirements[corev1beta1.CapacityTypeLabelKey] = scheduling.NewRequirement(corev1beta1.CapacityTypeLabelKey, v1.NodeSelectorOpIn, corev1beta1.CapacityTypeOnDemand) + requirements[v1beta1.LabelCapactiyReservationID] = scheduling.NewRequirement(v1beta1.LabelCapactiyReservationID, v1.NodeSelectorOpExists) + for _, instanceType := range instanceTypes { + for _, offering := range instanceType.Offerings { + if requirements.Compatible(offering.Requirements, scheduling.AllowUndefinedWellKnownLabels) == nil { + if offering.Requirements.Has(v1beta1.LabelCapactiyReservationID) { + return true + } + } + } + } + + return false +} + // filterUnwantedSpot is used to filter out spot types that are more expensive than the cheapest on-demand type that we // could launch during mixed capacity-type launches func filterUnwantedSpot(instanceTypes []*cloudprovider.InstanceType) []*cloudprovider.InstanceType { @@ -450,6 +529,34 @@ func filterUnwantedSpot(instanceTypes []*cloudprovider.InstanceType) []*cloudpro return instanceTypes } +// filterOnlyCapacityReservations is used to filter only offerings that are capacity reservations +func filterOnlyCapacityReservations(instanceTypes []*cloudprovider.InstanceType) []*cloudprovider.InstanceType { + // Filter out any types where the cheapest offering, which should be spot, is more expensive than the cheapest + // on-demand instance type that would have worked. This prevents us from getting a larger more-expensive spot + // instance type compared to the cheapest sufficiently large on-demand instance type + instanceTypes = lo.FilterMap(instanceTypes, func(item *cloudprovider.InstanceType, index int) (*cloudprovider.InstanceType, bool) { + available := lo.FilterMap(item.Offerings, func(offering cloudprovider.Offering, index int) (cloudprovider.Offering, bool) { + if !offering.Available { + return cloudprovider.Offering{}, false + } + + if !offering.Requirements.Has(v1beta1.LabelCapactiyReservationID) { + return cloudprovider.Offering{}, false + } + + return offering, true + }) + + if len(available) == 0 { + return nil, false + } + item.Offerings = available + + return item, true + }) + return instanceTypes +} + // filterExoticInstanceTypes is used to eliminate less desirable instance types (like GPUs) from the list of possible instance types when // a set of more appropriate instance types would work. If a set of more desirable instance types is not found, then the original slice // of instance types are returned. diff --git a/pkg/providers/instance/types.go b/pkg/providers/instance/types.go index 5f3804f2d004..8d678124b284 100644 --- a/pkg/providers/instance/types.go +++ b/pkg/providers/instance/types.go @@ -27,28 +27,30 @@ import ( // Instance is an internal data representation of either an ec2.Instance or an ec2.FleetInstance // It contains all the common data that is needed to inject into the Machine from either of these responses type Instance struct { - LaunchTime time.Time - State string - ID string - ImageID string - Type string - Zone string - CapacityType string - SecurityGroupIDs []string - SubnetID string - Tags map[string]string - EFAEnabled bool + LaunchTime time.Time + State string + ID string + ImageID string + Type string + Zone string + CapacityType string + CapacityReservationID *string + SecurityGroupIDs []string + SubnetID string + Tags map[string]string + EFAEnabled bool } func NewInstance(out *ec2.Instance) *Instance { return &Instance{ - LaunchTime: aws.TimeValue(out.LaunchTime), - State: aws.StringValue(out.State.Name), - ID: aws.StringValue(out.InstanceId), - ImageID: aws.StringValue(out.ImageId), - Type: aws.StringValue(out.InstanceType), - Zone: aws.StringValue(out.Placement.AvailabilityZone), - CapacityType: lo.Ternary(out.SpotInstanceRequestId != nil, corev1beta1.CapacityTypeSpot, corev1beta1.CapacityTypeOnDemand), + LaunchTime: aws.TimeValue(out.LaunchTime), + State: aws.StringValue(out.State.Name), + ID: aws.StringValue(out.InstanceId), + ImageID: aws.StringValue(out.ImageId), + Type: aws.StringValue(out.InstanceType), + Zone: aws.StringValue(out.Placement.AvailabilityZone), + CapacityType: lo.Ternary(out.SpotInstanceRequestId != nil, corev1beta1.CapacityTypeSpot, corev1beta1.CapacityTypeOnDemand), + CapacityReservationID: out.CapacityReservationId, SecurityGroupIDs: lo.Map(out.SecurityGroups, func(securitygroup *ec2.GroupIdentifier, _ int) string { return aws.StringValue(securitygroup.GroupId) }), @@ -61,17 +63,18 @@ func NewInstance(out *ec2.Instance) *Instance { } -func NewInstanceFromFleet(out *ec2.CreateFleetInstance, tags map[string]string, efaEnabled bool) *Instance { +func NewInstanceFromFleet(out *ec2.CreateFleetInstance, tags map[string]string, efaEnabled bool, capacityReservationID *string) *Instance { return &Instance{ - LaunchTime: time.Now(), // estimate the launch time since we just launched - State: ec2.StatePending, - ID: aws.StringValue(out.InstanceIds[0]), - ImageID: aws.StringValue(out.LaunchTemplateAndOverrides.Overrides.ImageId), - Type: aws.StringValue(out.InstanceType), - Zone: aws.StringValue(out.LaunchTemplateAndOverrides.Overrides.AvailabilityZone), - CapacityType: aws.StringValue(out.Lifecycle), - SubnetID: aws.StringValue(out.LaunchTemplateAndOverrides.Overrides.SubnetId), - Tags: tags, - EFAEnabled: efaEnabled, + LaunchTime: time.Now(), // estimate the launch time since we just launched + State: ec2.StatePending, + ID: aws.StringValue(out.InstanceIds[0]), + ImageID: aws.StringValue(out.LaunchTemplateAndOverrides.Overrides.ImageId), + Type: aws.StringValue(out.InstanceType), + Zone: aws.StringValue(out.LaunchTemplateAndOverrides.Overrides.AvailabilityZone), + CapacityType: aws.StringValue(out.Lifecycle), + CapacityReservationID: capacityReservationID, + SubnetID: aws.StringValue(out.LaunchTemplateAndOverrides.Overrides.SubnetId), + Tags: tags, + EFAEnabled: efaEnabled, } } diff --git a/pkg/providers/instancetype/instancetype.go b/pkg/providers/instancetype/instancetype.go index 82bbcb099faa..976b277e365c 100644 --- a/pkg/providers/instancetype/instancetype.go +++ b/pkg/providers/instancetype/instancetype.go @@ -164,10 +164,27 @@ func (p *DefaultProvider) List(ctx context.Context, kc *corev1beta1.KubeletConfi // Any changes to the values passed into the NewInstanceType method will require making updates to the cache key // so that Karpenter is able to cache the set of InstanceTypes based on values that alter the set of instance types // !!! Important !!! - return NewInstanceType(ctx, i, p.region, - nodeClass.Spec.BlockDeviceMappings, nodeClass.Spec.InstanceStorePolicy, - kc.MaxPods, kc.PodsPerCore, kc.KubeReserved, kc.SystemReserved, kc.EvictionHard, kc.EvictionSoft, - amiFamily, p.createOfferings(ctx, i, allZones, p.instanceTypeOfferings[aws.StringValue(i.InstanceType)], nodeClass.Status.Subnets), + return NewInstanceType( + ctx, + i, + p.region, + nodeClass.Spec.BlockDeviceMappings, + nodeClass.Spec.InstanceStorePolicy, + kc.MaxPods, + kc.PodsPerCore, + kc.KubeReserved, + kc.SystemReserved, + kc.EvictionHard, + kc.EvictionSoft, + amiFamily, + p.createOfferings( + ctx, + i, + allZones, + p.instanceTypeOfferings[aws.StringValue(i.InstanceType)], + nodeClass.Status.Subnets, + nodeClass.Status.CapacityReservations, + ), ) }) p.instanceTypesCache.SetDefault(key, result) @@ -261,7 +278,14 @@ func (p *DefaultProvider) UpdateInstanceTypeOfferings(ctx context.Context) error // offering, you can do the following thanks to this invariant: // // offering.Requirements.Get(v1.TopologyLabelZone).Any() -func (p *DefaultProvider) createOfferings(ctx context.Context, instanceType *ec2.InstanceTypeInfo, zones, instanceTypeZones sets.Set[string], subnets []v1beta1.Subnet) []cloudprovider.Offering { +func (p *DefaultProvider) createOfferings( + ctx context.Context, + instanceType *ec2.InstanceTypeInfo, + zones, + instanceTypeZones sets.Set[string], + subnets []v1beta1.Subnet, + capacityReservations []v1beta1.CapacityReservation, +) []cloudprovider.Offering { var offerings []cloudprovider.Offering for zone := range zones { // while usage classes should be a distinct set, there's no guarantee of that @@ -275,7 +299,7 @@ func (p *DefaultProvider) createOfferings(ctx context.Context, instanceType *ec2 price, ok = p.pricingProvider.SpotPrice(*instanceType.InstanceType, zone) case ec2.UsageClassTypeOnDemand: price, ok = p.pricingProvider.OnDemandPrice(*instanceType.InstanceType) - case "capacity-block": + case ec2.UsageClassTypeCapacityBlock: // ignore since karpenter doesn't support it yet, but do not log an unknown capacity type error continue default: @@ -311,6 +335,52 @@ func (p *DefaultProvider) createOfferings(ctx context.Context, instanceType *ec2 }).Set(price) } } + + log.FromContext(ctx).WithValues("instanceType", *instanceType.InstanceType, "capacityReservations", capacityReservations).V(0).Info("capacity reservations for instanceType") + for _, capacityReservation := range capacityReservations { + log.FromContext(ctx).WithValues("instanceType", *instanceType.InstanceType, "capacityReservation", capacityReservation.ID).V(0).Info("capacity reservation for instanceType") + if capacityReservation.InstanceType != *instanceType.InstanceType { + log.FromContext(ctx).WithValues("instanceType", *instanceType.InstanceType, "capacityReservation", capacityReservation.ID).V(0).Info("capacity reservation not applicatble for instanceType") + continue + } + log.FromContext(ctx).WithValues("instanceType", *instanceType.InstanceType, "capacityReservation", capacityReservation.ID).V(0).Info("capacity reservation applicatble for instanceType") + price := 0.0 + isAvailable := capacityReservation.AvailableInstanceCount > 0 + zone := capacityReservation.AvailabilityZone + capacityType := ec2.UsageClassTypeOnDemand + + subnet, hasSubnet := lo.Find(subnets, func(s v1beta1.Subnet) bool { + return s.Zone == zone + }) + available := isAvailable && instanceTypeZones.Has(zone) && hasSubnet + offering := cloudprovider.Offering{ + Requirements: scheduling.NewRequirements( + scheduling.NewRequirement(corev1beta1.CapacityTypeLabelKey, v1.NodeSelectorOpIn, capacityType), + scheduling.NewRequirement(v1.LabelTopologyZone, v1.NodeSelectorOpIn, zone), + ), + Price: price, + Available: available, + } + if subnet.ZoneID != "" { + offering.Requirements.Add(scheduling.NewRequirement(v1beta1.LabelTopologyZoneID, v1.NodeSelectorOpIn, subnet.ZoneID)) + } + // TODO: tvonhacht + offering.Requirements.Add(scheduling.NewRequirement(v1beta1.LabelCapactiyReservationID, v1.NodeSelectorOpIn, capacityReservation.ID)) + log.FromContext(ctx).WithValues("instanceType", *instanceType.InstanceType, "capacityReservation", capacityReservation.ID, "offering", offering).V(0).Info("offering for capacity reservation and instanceType") + offerings = append(offerings, offering) + instanceTypeOfferingAvailable.With(prometheus.Labels{ + instanceTypeLabel: fmt.Sprintf("%s-%s", *instanceType.InstanceType, "capacity-reservation"), + capacityTypeLabel: capacityType, + zoneLabel: zone, + }).Set(float64(lo.Ternary(available, 1, 0))) + instanceTypeOfferingPriceEstimate.With(prometheus.Labels{ + instanceTypeLabel: fmt.Sprintf("%s-%s", *instanceType.InstanceType, "capacity-reservation"), + capacityTypeLabel: capacityType, + zoneLabel: zone, + }).Set(price) + } + + log.FromContext(ctx).WithValues("offerings", offerings, "capacityReservations", capacityReservations).V(0).Info("createOfferings") return offerings } diff --git a/pkg/providers/instancetype/metrics.go b/pkg/providers/instancetype/metrics.go index 5b505e8f813a..476d57a35418 100644 --- a/pkg/providers/instancetype/metrics.go +++ b/pkg/providers/instancetype/metrics.go @@ -64,6 +64,19 @@ var ( zoneLabel, }, ) + instanceTypeOfferingAvailableCapacityReservation = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: metrics.Namespace, + Subsystem: cloudProviderSubsystem, + Name: "instance_type_offering_available_capacity_reservation", + Help: "Instance type offering availability, based on capacity reservation, instance type, capacity type, and zone", + }, + []string{ + instanceTypeLabel, + capacityTypeLabel, + zoneLabel, + }, + ) instanceTypeOfferingPriceEstimate = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: metrics.Namespace, diff --git a/pkg/providers/launchtemplate/launchtemplate.go b/pkg/providers/launchtemplate/launchtemplate.go index b5a408df0d19..f537b99f954b 100644 --- a/pkg/providers/launchtemplate/launchtemplate.go +++ b/pkg/providers/launchtemplate/launchtemplate.go @@ -58,43 +58,58 @@ type Provider interface { DeleteAll(context.Context, *v1beta1.EC2NodeClass) error InvalidateCache(context.Context, string, string) ResolveClusterCIDR(context.Context) error + GetCapacityReservationID(launchTemplateName string) *string } type LaunchTemplate struct { - Name string - InstanceTypes []*cloudprovider.InstanceType - ImageID string + Name string + InstanceTypes []*cloudprovider.InstanceType + ImageID string + CapacityReservationID *string } type DefaultProvider struct { sync.Mutex - ec2api ec2iface.EC2API - eksapi eksiface.EKSAPI - amiFamily *amifamily.Resolver - securityGroupProvider securitygroup.Provider - subnetProvider subnet.Provider - cache *cache.Cache - cm *pretty.ChangeMonitor - KubeDNSIP net.IP - CABundle *string - ClusterEndpoint string - ClusterCIDR atomic.Pointer[string] + ec2api ec2iface.EC2API + eksapi eksiface.EKSAPI + amiFamily *amifamily.Resolver + securityGroupProvider securitygroup.Provider + subnetProvider subnet.Provider + cache *cache.Cache + capacityReservationIDs *cache.Cache + cm *pretty.ChangeMonitor + KubeDNSIP net.IP + CABundle *string + ClusterEndpoint string + ClusterCIDR atomic.Pointer[string] } -func NewDefaultProvider(ctx context.Context, cache *cache.Cache, ec2api ec2iface.EC2API, eksapi eksiface.EKSAPI, amiFamily *amifamily.Resolver, - securityGroupProvider securitygroup.Provider, subnetProvider subnet.Provider, - caBundle *string, startAsync <-chan struct{}, kubeDNSIP net.IP, clusterEndpoint string) *DefaultProvider { +func NewDefaultProvider( + ctx context.Context, + cache *cache.Cache, + capacityReservationIDs *cache.Cache, + ec2api ec2iface.EC2API, + eksapi eksiface.EKSAPI, + amiFamily *amifamily.Resolver, + securityGroupProvider securitygroup.Provider, + subnetProvider subnet.Provider, + caBundle *string, + startAsync <-chan struct{}, + kubeDNSIP net.IP, + clusterEndpoint string, +) *DefaultProvider { l := &DefaultProvider{ - ec2api: ec2api, - eksapi: eksapi, - amiFamily: amiFamily, - securityGroupProvider: securityGroupProvider, - subnetProvider: subnetProvider, - cache: cache, - CABundle: caBundle, - cm: pretty.NewChangeMonitor(), - KubeDNSIP: kubeDNSIP, - ClusterEndpoint: clusterEndpoint, + ec2api: ec2api, + eksapi: eksapi, + amiFamily: amiFamily, + securityGroupProvider: securityGroupProvider, + subnetProvider: subnetProvider, + cache: cache, + capacityReservationIDs: capacityReservationIDs, + CABundle: caBundle, + cm: pretty.NewChangeMonitor(), + KubeDNSIP: kubeDNSIP, + ClusterEndpoint: clusterEndpoint, } l.cache.OnEvicted(l.cachedEvictedFunc(ctx)) go func() { @@ -119,6 +134,8 @@ func (p *DefaultProvider) EnsureAll(ctx context.Context, nodeClass *v1beta1.EC2N if err != nil { return nil, err } + + log.FromContext(ctx).WithValues("instanceTypes", instanceTypes).V(0).Info("Ensure All") resolvedLaunchTemplates, err := p.amiFamily.Resolve(nodeClass, nodeClaim, instanceTypes, capacityType, options) if err != nil { return nil, err @@ -130,7 +147,15 @@ func (p *DefaultProvider) EnsureAll(ctx context.Context, nodeClass *v1beta1.EC2N if err != nil { return nil, err } - launchTemplates = append(launchTemplates, &LaunchTemplate{Name: *ec2LaunchTemplate.LaunchTemplateName, InstanceTypes: resolvedLaunchTemplate.InstanceTypes, ImageID: resolvedLaunchTemplate.AMIID}) + launchTemplates = append( + launchTemplates, + &LaunchTemplate{ + Name: *ec2LaunchTemplate.LaunchTemplateName, + InstanceTypes: resolvedLaunchTemplate.InstanceTypes, + ImageID: resolvedLaunchTemplate.AMIID, + CapacityReservationID: resolvedLaunchTemplate.CapacityReservationID, + }, + ) } return launchTemplates, nil } @@ -186,6 +211,7 @@ func (p *DefaultProvider) createAMIOptions(ctx context.Context, nodeClass *v1bet KubeDNSIP: p.KubeDNSIP, AssociatePublicIPAddress: nodeClass.Spec.AssociatePublicIPAddress, NodeClassName: nodeClass.Name, + // tvonhacht: might wanna add CapacityReservations here }, nil } @@ -235,6 +261,7 @@ func (p *DefaultProvider) createLaunchTemplate(ctx context.Context, options *ami launchTemplateDataTags = append(launchTemplateDataTags, &ec2.LaunchTemplateTagSpecificationRequest{ResourceType: aws.String(ec2.ResourceTypeSpotInstancesRequest), Tags: utils.MergeTags(options.Tags)}) } networkInterfaces := p.generateNetworkInterfaces(options) + capacityReservationSpecification := p.generateCapacityReservationSpecification(options) output, err := p.ec2api.CreateLaunchTemplateWithContext(ctx, &ec2.CreateLaunchTemplateInput{ LaunchTemplateName: aws.String(LaunchTemplateName(options)), LaunchTemplateData: &ec2.RequestLaunchTemplateData{ @@ -255,8 +282,9 @@ func (p *DefaultProvider) createLaunchTemplate(ctx context.Context, options *ami HttpPutResponseHopLimit: options.MetadataOptions.HTTPPutResponseHopLimit, HttpTokens: options.MetadataOptions.HTTPTokens, }, - NetworkInterfaces: networkInterfaces, - TagSpecifications: launchTemplateDataTags, + NetworkInterfaces: networkInterfaces, + TagSpecifications: launchTemplateDataTags, + CapacityReservationSpecification: capacityReservationSpecification, }, TagSpecifications: []*ec2.TagSpecification{ { @@ -269,6 +297,7 @@ func (p *DefaultProvider) createLaunchTemplate(ctx context.Context, options *ami return nil, err } log.FromContext(ctx).WithValues("id", aws.StringValue(output.LaunchTemplate.LaunchTemplateId)).V(1).Info("created launch template") + p.capacityReservationIDs.SetDefault(lo.FromPtr(output.LaunchTemplate.LaunchTemplateName), lo.FromPtr(options.CapacityReservationID)) return output.LaunchTemplate, nil } @@ -301,6 +330,18 @@ func (p *DefaultProvider) generateNetworkInterfaces(options *amifamily.LaunchTem return nil } +func (p *DefaultProvider) generateCapacityReservationSpecification(options *amifamily.LaunchTemplate) *ec2.LaunchTemplateCapacityReservationSpecificationRequest { + if options.CapacityReservationID == nil { + return nil + } + + return &ec2.LaunchTemplateCapacityReservationSpecificationRequest{ + CapacityReservationTarget: &ec2.CapacityReservationTarget{ + CapacityReservationId: options.CapacityReservationID, + }, + } +} + func (p *DefaultProvider) blockDeviceMappings(blockDeviceMappings []*v1beta1.BlockDeviceMapping) []*ec2.LaunchTemplateBlockDeviceMappingRequest { if len(blockDeviceMappings) == 0 { // The EC2 API fails with empty slices and expects nil. @@ -438,3 +479,12 @@ func (p *DefaultProvider) ResolveClusterCIDR(ctx context.Context) error { } return fmt.Errorf("no CIDR found in DescribeCluster response") } + +func (p *DefaultProvider) GetCapacityReservationID(launchTemplateName string) *string { + capacityReservationID, ok := p.capacityReservationIDs.Get(launchTemplateName) + if !ok { + return nil + } + + return lo.ToPtr(capacityReservationID.(string)) +} diff --git a/pkg/providers/launchtemplate/suite_test.go b/pkg/providers/launchtemplate/suite_test.go index e7c7608bf17d..c7164b66f323 100644 --- a/pkg/providers/launchtemplate/suite_test.go +++ b/pkg/providers/launchtemplate/suite_test.go @@ -1959,7 +1959,7 @@ var _ = Describe("LaunchTemplate Provider", func() { }}) nodeClass.Spec.AMISelectorTerms = []v1beta1.AMISelectorTerm{{Tags: map[string]string{"*": "*"}}} ExpectApplied(ctx, env.Client, nodeClass) - controller := status.NewController(env.Client, awsEnv.SubnetProvider, awsEnv.SecurityGroupProvider, awsEnv.AMIProvider, awsEnv.InstanceProfileProvider, awsEnv.LaunchTemplateProvider) + controller := status.NewController(env.Client, awsEnv.SubnetProvider, awsEnv.SecurityGroupProvider, awsEnv.AMIProvider, awsEnv.InstanceProfileProvider, awsEnv.LaunchTemplateProvider, awsEnv.CapacityReservationProvider) ExpectObjectReconciled(ctx, env.Client, controller, nodeClass) nodePool.Spec.Template.Spec.Requirements = []corev1beta1.NodeSelectorRequirementWithMinValues{ { diff --git a/pkg/test/environment.go b/pkg/test/environment.go index afef49ebfac5..f420c652dea0 100644 --- a/pkg/test/environment.go +++ b/pkg/test/environment.go @@ -29,6 +29,7 @@ import ( awscache "github.com/aws/karpenter-provider-aws/pkg/cache" "github.com/aws/karpenter-provider-aws/pkg/fake" "github.com/aws/karpenter-provider-aws/pkg/providers/amifamily" + "github.com/aws/karpenter-provider-aws/pkg/providers/capacityreservation" "github.com/aws/karpenter-provider-aws/pkg/providers/instance" "github.com/aws/karpenter-provider-aws/pkg/providers/instanceprofile" "github.com/aws/karpenter-provider-aws/pkg/providers/instancetype" @@ -67,18 +68,20 @@ type Environment struct { AssociatePublicIPAddressCache *cache.Cache SecurityGroupCache *cache.Cache InstanceProfileCache *cache.Cache + CapacityReservationCache *cache.Cache // Providers - InstanceTypesProvider *instancetype.DefaultProvider - InstanceProvider *instance.DefaultProvider - SubnetProvider *subnet.DefaultProvider - SecurityGroupProvider *securitygroup.DefaultProvider - InstanceProfileProvider *instanceprofile.DefaultProvider - PricingProvider *pricing.DefaultProvider - AMIProvider *amifamily.DefaultProvider - AMIResolver *amifamily.Resolver - VersionProvider *version.DefaultProvider - LaunchTemplateProvider *launchtemplate.DefaultProvider + InstanceTypesProvider *instancetype.DefaultProvider + InstanceProvider *instance.DefaultProvider + SubnetProvider *subnet.DefaultProvider + SecurityGroupProvider *securitygroup.DefaultProvider + InstanceProfileProvider *instanceprofile.DefaultProvider + PricingProvider *pricing.DefaultProvider + AMIProvider *amifamily.DefaultProvider + AMIResolver *amifamily.Resolver + VersionProvider *version.DefaultProvider + LaunchTemplateProvider *launchtemplate.DefaultProvider + CapacityReservationProvider *capacityreservation.DefaultProvider } func NewEnvironment(ctx context.Context, env *coretest.Environment) *Environment { @@ -94,11 +97,13 @@ func NewEnvironment(ctx context.Context, env *coretest.Environment) *Environment instanceTypeCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval) unavailableOfferingsCache := awscache.NewUnavailableOfferings() launchTemplateCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval) + capacityReservationIDsCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval) subnetCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval) availableIPAdressCache := cache.New(awscache.AvailableIPAddressTTL, awscache.DefaultCleanupInterval) associatePublicIPAddressCache := cache.New(awscache.AssociatePublicIPAddressTTL, awscache.DefaultCleanupInterval) securityGroupCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval) instanceProfileCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval) + capacityReservationCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval) fakePricingAPI := &fake.PricingAPI{} // Providers @@ -114,6 +119,7 @@ func NewEnvironment(ctx context.Context, env *coretest.Environment) *Environment launchtemplate.NewDefaultProvider( ctx, launchTemplateCache, + capacityReservationIDsCache, ec2api, eksapi, amiResolver, @@ -133,6 +139,7 @@ func NewEnvironment(ctx context.Context, env *coretest.Environment) *Environment subnetProvider, launchTemplateProvider, ) + capacityReservationProvider := capacityreservation.NewDefaultProvider(ec2api, capacityReservationCache) return &Environment{ EC2API: ec2api, @@ -150,17 +157,19 @@ func NewEnvironment(ctx context.Context, env *coretest.Environment) *Environment SecurityGroupCache: securityGroupCache, InstanceProfileCache: instanceProfileCache, UnavailableOfferingsCache: unavailableOfferingsCache, - - InstanceTypesProvider: instanceTypesProvider, - InstanceProvider: instanceProvider, - SubnetProvider: subnetProvider, - SecurityGroupProvider: securityGroupProvider, - LaunchTemplateProvider: launchTemplateProvider, - InstanceProfileProvider: instanceProfileProvider, - PricingProvider: pricingProvider, - AMIProvider: amiProvider, - AMIResolver: amiResolver, - VersionProvider: versionProvider, + CapacityReservationCache: capacityReservationCache, + + InstanceTypesProvider: instanceTypesProvider, + InstanceProvider: instanceProvider, + SubnetProvider: subnetProvider, + SecurityGroupProvider: securityGroupProvider, + LaunchTemplateProvider: launchTemplateProvider, + InstanceProfileProvider: instanceProfileProvider, + PricingProvider: pricingProvider, + AMIProvider: amiProvider, + AMIResolver: amiResolver, + VersionProvider: versionProvider, + CapacityReservationProvider: capacityReservationProvider, } } diff --git a/website/content/en/preview/getting-started/migrating-from-cas/scripts/step04-controller-iam.sh b/website/content/en/preview/getting-started/migrating-from-cas/scripts/step04-controller-iam.sh index cc3d7f929986..31af18ff9feb 100644 --- a/website/content/en/preview/getting-started/migrating-from-cas/scripts/step04-controller-iam.sh +++ b/website/content/en/preview/getting-started/migrating-from-cas/scripts/step04-controller-iam.sh @@ -71,6 +71,17 @@ cat << EOF > controller-policy.json "Resource": "arn:${AWS_PARTITION}:eks:${AWS_REGION}:${AWS_ACCOUNT_ID}:cluster/${CLUSTER_NAME}", "Sid": "EKSClusterEndpointLookup" }, + { + "Effect": "Allow", + "Action": "ec2:DescribeCapacityReservations", + "Resource": "*", + "Condition": { + "StringEquals": { + "ec2:Region": "${AWS_REGION}" + } + } + "Sid": "AllowReadCapacityReservations" + }, { "Sid": "AllowScopedInstanceProfileCreationActions", "Effect": "Allow",