Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add carbon-efficient option #6322

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions charts/karpenter/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -208,3 +208,7 @@ settings:
# -- spotToSpotConsolidation is ALPHA and is disabled by default.
# Setting this to true will enable spot replacement consolidation for both single and multi-node consolidation.
spotToSpotConsolidation: false
# -- carbonEfficient is ALPHA and is disabled by default.
# Setting this to true will enable carbon-efficient provisioning, which will attempt to provision instances in the most
# carbon-efficient way possible.
carbonEfficient: false
23 changes: 17 additions & 6 deletions pkg/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
awscache "github.com/aws/karpenter-provider-aws/pkg/cache"
"github.com/aws/karpenter-provider-aws/pkg/operator/options"
"github.com/aws/karpenter-provider-aws/pkg/providers/amifamily"
"github.com/aws/karpenter-provider-aws/pkg/providers/carbon"
"github.com/aws/karpenter-provider-aws/pkg/providers/instance"
"github.com/aws/karpenter-provider-aws/pkg/providers/instanceprofile"
"github.com/aws/karpenter-provider-aws/pkg/providers/instancetype"
Expand Down Expand Up @@ -137,12 +138,22 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont
subnetProvider := subnet.NewDefaultProvider(ec2api, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval), cache.New(awscache.AvailableIPAddressTTL, awscache.DefaultCleanupInterval), cache.New(awscache.AssociatePublicIPAddressTTL, awscache.DefaultCleanupInterval))
securityGroupProvider := securitygroup.NewDefaultProvider(ec2api, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval))
instanceProfileProvider := instanceprofile.NewDefaultProvider(*sess.Config.Region, iam.New(sess), cache.New(awscache.InstanceProfileTTL, awscache.DefaultCleanupInterval))
pricingProvider := pricing.NewDefaultProvider(
ctx,
pricing.NewAPI(sess, *sess.Config.Region),
ec2api,
*sess.Config.Region,
)
var pricingProvider pricing.Provider
if options.FromContext(ctx).CarbonEfficient {
pricingProvider = carbon.NewProvider(
ctx,
pricing.NewAPI(sess, *sess.Config.Region),
ec2api,
*sess.Config.Region,
)
} else {
pricingProvider = pricing.NewDefaultProvider(
ctx,
pricing.NewAPI(sess, *sess.Config.Region),
ec2api,
*sess.Config.Region,
)
}
versionProvider := version.NewDefaultProvider(operator.KubernetesInterface, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval))
amiProvider := amifamily.NewDefaultProvider(versionProvider, ssm.New(sess), ec2api, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval))
amiResolver := amifamily.NewResolver(amiProvider)
Expand Down
2 changes: 2 additions & 0 deletions pkg/operator/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Options struct {
VMMemoryOverheadPercent float64
InterruptionQueue string
ReservedENIs int
CarbonEfficient bool
}

func (o *Options) AddFlags(fs *coreoptions.FlagSet) {
Expand All @@ -54,6 +55,7 @@ func (o *Options) AddFlags(fs *coreoptions.FlagSet) {
fs.Float64Var(&o.VMMemoryOverheadPercent, "vm-memory-overhead-percent", env.WithDefaultFloat64("VM_MEMORY_OVERHEAD_PERCENT", 0.075), "The VM memory overhead as a percent that will be subtracted from the total memory for all instance types.")
fs.StringVar(&o.InterruptionQueue, "interruption-queue", env.WithDefaultString("INTERRUPTION_QUEUE", ""), "Interruption queue is the name of the SQS queue used for processing interruption events from EC2. Interruption handling is disabled if not specified. Enabling interruption handling may require additional permissions on the controller service account. Additional permissions are outlined in the docs.")
fs.IntVar(&o.ReservedENIs, "reserved-enis", env.WithDefaultInt("RESERVED_ENIS", 0), "Reserved ENIs are not included in the calculations for max-pods or kube-reserved. This is most often used in the VPC CNI custom networking setup https://docs.aws.amazon.com/eks/latest/userguide/cni-custom-network.html.")
fs.BoolVarWithEnv(&o.CarbonEfficient, "carbon-efficient", "CARBON_EFFICIENT", false, "If true, optimize for carbon efficiency by selecting instance types with the lowest carbon footprint. This will result in a more environmentally sustainable cluster.")
}

func (o *Options) Parse(fs *coreoptions.FlagSet, args ...string) error {
Expand Down
177 changes: 177 additions & 0 deletions pkg/providers/carbon/carbon.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package carbon

import (
"context"
"fmt"
"net/http"
"strings"
"sync"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/aws/aws-sdk-go/service/pricing"
"github.com/aws/aws-sdk-go/service/pricing/pricingiface"
"github.com/samber/lo"

"sigs.k8s.io/karpenter/pkg/utils/pretty"
)

// CarbonProvider serves per-instance-type values through a pricing-provider style method set (InstanceTypes,
// OnDemandPrice, SpotPrice, UpdateOnDemandPricing, UpdateSpotPricing, LivenessProbe and Reset).
// Its data is loaded by Reset from the static carbonImpacts table for the configured region. In this file the
// two Update methods are no-ops, so nothing other than Reset writes the maps below.
// NOTE(review): the stored values are presumably carbon-impact figures reported in place of dollar prices;
// confirm the units against the carbonImpacts table.
type CarbonProvider struct {
// ec2, pricing and cm are stored by NewProvider; no method in this file reads them.
ec2 ec2iface.EC2API
pricing pricingiface.PricingAPI
// region selects the carbonImpacts entry that Reset loads.
region string
cm *pretty.ChangeMonitor

// mu is taken (RLock) by the read accessors and (Lock) by LivenessProbe.
mu sync.RWMutex
// onDemandUpdateTime and spotUpdateTime are set to initialPriceUpdate by Reset.
onDemandUpdateTime time.Time
// onDemandPrices maps instance type to its value.
onDemandPrices map[string]float64
spotUpdateTime time.Time
// spotPrices maps instance type to its default value plus any per-zone values.
spotPrices map[string]zonal
}

// zonal holds the values known for a single instance type: a default value plus a map of
// per-zone values. SpotPrice returns defaultPrice while the provider's spotUpdateTime still
// equals initialPriceUpdate, and otherwise looks the zone up in prices.
type zonal struct {
defaultPrice float64 // Returned by SpotPrice until spotUpdateTime moves past initialPriceUpdate
prices map[string]float64 // Keyed by zone name; created empty by newZonalPricing
}

// Err wraps an error value by embedding it, so an Err exposes the embedded error's Error method.
// NOTE(review): nothing in the visible part of this file references Err; confirm whether it is still needed.
type Err struct {
error
}

// newZonalPricing returns a zonal whose default value is defaultPrice and whose
// per-zone map is allocated but empty.
func newZonalPricing(defaultPrice float64) zonal {
	return zonal{
		defaultPrice: defaultPrice,
		prices:       map[string]float64{},
	}
}

// NewAPI builds a Pricing API client bound to the pricing endpoint chosen for the given region.
// It returns nil when sess is nil.
func NewAPI(sess *session.Session, region string) pricingiface.PricingAPI {
	if sess == nil {
		return nil
	}
	// The pricing API doesn't have an endpoint in all regions, so the endpoint is
	// picked by region prefix, falling back to us-east-1.
	var endpointRegion string
	switch {
	case strings.HasPrefix(region, "ap-"):
		endpointRegion = "ap-south-1"
	case strings.HasPrefix(region, "cn-"):
		endpointRegion = "cn-northwest-1"
	case strings.HasPrefix(region, "eu-"):
		endpointRegion = "eu-central-1"
	default:
		endpointRegion = "us-east-1"
	}
	return pricing.New(sess, &aws.Config{Region: aws.String(endpointRegion)})
}

// NewProvider constructs a CarbonProvider for region, keeping the supplied pricing and EC2
// clients, and loads its initial data by calling Reset. The context argument is unused.
func NewProvider(_ context.Context, pricing pricingiface.PricingAPI, ec2Api ec2iface.EC2API, region string) *CarbonProvider {
	provider := new(CarbonProvider)
	provider.region = region
	provider.ec2 = ec2Api
	provider.pricing = pricing
	provider.cm = pretty.NewChangeMonitor()

	// Populate the maps from the static table before handing the provider out.
	provider.Reset()
	return provider
}

// InstanceTypes lists every instance type that has an entry in either the on-demand map
// or the spot map, with duplicates removed.
func (p *CarbonProvider) InstanceTypes() []string {
	p.mu.RLock()
	defer p.mu.RUnlock()

	onDemandTypes := lo.Keys(p.onDemandPrices)
	spotTypes := lo.Keys(p.spotPrices)
	return lo.Union(onDemandTypes, spotTypes)
}

// OnDemandPrice looks up the stored on-demand value for instanceType. The boolean result is
// false (and the value 0) when the instance type has no entry.
func (p *CarbonProvider) OnDemandPrice(instanceType string) (float64, bool) {
	p.mu.RLock()
	defer p.mu.RUnlock()

	// A missing key yields the zero value and false, which is exactly the not-found result.
	value, found := p.onDemandPrices[instanceType]
	return value, found
}

// SpotPrice looks up the stored spot value for instanceType in zone. The boolean result is
// false (and the value 0) when the instance type is unknown, or when per-zone data is in use
// and the zone has no entry.
func (p *CarbonProvider) SpotPrice(instanceType string, zone string) (float64, bool) {
	p.mu.RLock()
	defer p.mu.RUnlock()

	entry, known := p.spotPrices[instanceType]
	if !known {
		return 0.0, false
	}
	// While the spot data still carries the initial timestamp, the default value
	// applies to every zone.
	if p.spotUpdateTime.Equal(initialPriceUpdate) {
		return entry.defaultPrice, true
	}
	// A missing zone yields the zero value and false.
	zonePrice, found := entry.prices[zone]
	return zonePrice, found
}

// UpdateOnDemandPricing is a no-op: it ignores ctx, leaves the provider's data untouched and
// always returns nil.
func (p *CarbonProvider) UpdateOnDemandPricing(ctx context.Context) error {
return nil
}

// UpdateSpotPricing is a no-op: it ignores ctx, leaves the provider's data untouched and
// always returns nil.
func (p *CarbonProvider) UpdateSpotPricing(ctx context.Context) error {
return nil
}

// LivenessProbe acquires and releases the provider's write lock and then reports nil. The
// call blocks for as long as another holder keeps the lock, so a stuck lock shows up as a
// probe that never returns. The request argument is unused.
func (p *CarbonProvider) LivenessProbe(_ *http.Request) error {
	// Taking the lock is the whole check; there is deliberately no work under it.
	p.mu.Lock()
	defer p.mu.Unlock()
	return nil
}

// populateInitialSpotPricing converts a map of instance type to value into a map of instance
// type to zonal, where each zonal carries that value as its default and has no per-zone data.
func populateInitialSpotPricing(pricing map[string]float64) map[string]zonal {
	byInstanceType := make(map[string]zonal, len(pricing))
	for instanceType, value := range pricing {
		byInstanceType[instanceType] = newZonalPricing(value)
	}
	return byInstanceType
}

// Reset reloads the provider's data from the static carbonImpacts table. It uses the entry
// for the provider's region, or the eu-west-1 entry (with a message printed to stdout) when
// the region has none. Both the on-demand and spot maps are rebuilt from that entry and both
// update timestamps are set back to initialPriceUpdate.
//
// If neither the region nor the fallback has a usable entry, the provider is left with empty
// maps instead of panicking on a nil dereference. The fields are written under the write lock
// so a Reset cannot race with the read accessors.
func (p *CarbonProvider) Reset() {
	staticPricing, ok := carbonImpacts[p.region]
	if !ok {
		fallbackRegion := "eu-west-1"
		fmt.Printf("Carbon Pricing Provider: No carbon impacts for %s, using fallback region: %s\n", p.region, fallbackRegion)
		staticPricing = carbonImpacts[fallbackRegion]
	}

	// A missing fallback entry (or a nil entry) yields a nil pointer; fall back to an
	// empty table rather than dereferencing it.
	onDemand := map[string]float64{}
	if staticPricing != nil {
		onDemand = *staticPricing
	}

	p.mu.Lock()
	defer p.mu.Unlock()
	p.onDemandPrices = onDemand
	// Spot values start out equal to the on-demand values, stored as each type's default.
	p.spotPrices = populateInitialSpotPricing(onDemand)
	p.onDemandUpdateTime = initialPriceUpdate
	p.spotUpdateTime = initialPriceUpdate
}
Loading