Modify member functions to get allowed disruptions by reason
jonathan-innis authored and engedaam committed Aug 27, 2024
1 parent 52fe9e7 commit b21cb0a
Showing 14 changed files with 116 additions and 154 deletions.
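In short, the commit changes the NodePool budget helpers from returning a per-reason map to returning a single count for one requested reason, removes the synthetic DisruptionReasonAll bucket, and threads the caller's reason through the disruption controller. A minimal sketch of the signature change, taken from the nodepool.go diff below (bodies elided):

// Before: one call computed the allowed disruptions for every reason at once.
func (in *NodePool) MustGetAllowedDisruptions(ctx context.Context, c clock.Clock, numNodes int) map[DisruptionReason]int
func (in *NodePool) GetAllowedDisruptionsByReason(c clock.Clock, numNodes int) (map[DisruptionReason]int, error)

// After: callers name the reason they are evaluating and get back a single count.
func (in *NodePool) MustGetAllowedDisruptions(c clock.Clock, numNodes int, reason DisruptionReason) int
func (in *NodePool) GetAllowedDisruptionsByReason(c clock.Clock, numNodes int, reason DisruptionReason) (int, error)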
35 changes: 8 additions & 27 deletions pkg/apis/v1/nodepool.go
@@ -17,7 +17,6 @@ limitations under the License.
package v1

import (
"context"
"fmt"
"math"
"sort"
@@ -134,7 +133,6 @@ const (
type DisruptionReason string

const (
DisruptionReasonAll DisruptionReason = "All"
DisruptionReasonUnderutilized DisruptionReason = "Underutilized"
DisruptionReasonEmpty DisruptionReason = "Empty"
DisruptionReasonDrifted DisruptionReason = "Drifted"
@@ -311,45 +309,28 @@ func (nl *NodePoolList) OrderByWeight() {
// MustGetAllowedDisruptions wraps GetAllowedDisruptionsByReason and returns an empty result when that call errors. This reduces the
// amount of state that the disruption controller must reconcile, while GetAllowedDisruptionsByReason() remains available
// to bubble up any errors in validation.
func (in *NodePool) MustGetAllowedDisruptions(ctx context.Context, c clock.Clock, numNodes int) map[DisruptionReason]int {
allowedDisruptions, err := in.GetAllowedDisruptionsByReason(c, numNodes)
func (in *NodePool) MustGetAllowedDisruptions(c clock.Clock, numNodes int, reason DisruptionReason) int {
allowedDisruptions, err := in.GetAllowedDisruptionsByReason(c, numNodes, reason)
if err != nil {
return map[DisruptionReason]int{}
return 0
}
return allowedDisruptions
}

// GetAllowedDisruptionsByReason returns the minimum allowed disruptions across all disruption budgets, for all disruption methods for a given nodepool
func (in *NodePool) GetAllowedDisruptionsByReason(c clock.Clock, numNodes int) (map[DisruptionReason]int, error) {
func (in *NodePool) GetAllowedDisruptionsByReason(c clock.Clock, numNodes int, reason DisruptionReason) (int, error) {
allowedNodes := math.MaxInt32
var multiErr error
allowedDisruptions := map[DisruptionReason]int{}
allowedDisruptions[DisruptionReasonAll] = math.MaxInt32

for _, budget := range in.Spec.Disruption.Budgets {
val, err := budget.GetAllowedDisruptions(c, numNodes)
if err != nil {
multiErr = multierr.Append(multiErr, err)
}

if budget.Reasons == nil {
allowedDisruptions[DisruptionReasonAll] = lo.Min([]int{allowedDisruptions[DisruptionReasonAll], val})
continue
}
for _, reason := range budget.Reasons {
if reasonVal, found := allowedDisruptions[reason]; found {
allowedDisruptions[reason] = lo.Min([]int{reasonVal, val})
continue
}
allowedDisruptions[reason] = val
if budget.Reasons == nil || lo.Contains(budget.Reasons, reason) {
allowedNodes = lo.Min([]int{allowedNodes, val})
}
}

// The node count allowed for a specific reason must be less than or equal to the count allowed by the budget shared across all disruption reasons
for _, reason := range lo.Keys(allowedDisruptions) {
allowedDisruptions[reason] = lo.Min([]int{allowedDisruptions[reason], allowedDisruptions[DisruptionReasonAll]})
}

return allowedDisruptions, multiErr
return allowedNodes, multiErr
}

// GetAllowedDisruptions returns an intstr.IntOrString that can be used a comparison
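For reference, a minimal caller-side sketch of the new per-reason API (the helper and import paths are taken from this diff; the wrapping function and variable names are illustrative):

package example

import (
	"k8s.io/utils/clock"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// allowedDriftDisruptions shows how a caller now asks for the budget of a single
// reason instead of indexing into a per-reason map; it returns 0 if the NodePool's
// budgets fail validation.
func allowedDriftDisruptions(np *v1.NodePool, clk clock.Clock, numNodes int) int {
	return np.MustGetAllowedDisruptions(clk, numNodes, v1.DisruptionReasonDrifted)
}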
58 changes: 35 additions & 23 deletions pkg/apis/v1/nodepool_budgets_test.go
@@ -35,6 +35,7 @@ var _ = Describe("Budgets", func() {
var nodePool *NodePool
var budgets []Budget
var fakeClock *clock.FakeClock
var allKnownDisruptionReasons []DisruptionReason

BeforeEach(func() {
// Set the time to the middle of the year of 2000, the best year ever
@@ -92,53 +93,60 @@ var _ = Describe("Budgets", func() {
},
},
}
allKnownDisruptionReasons = append([]DisruptionReason{
DisruptionReasonEmpty,
DisruptionReasonUnderutilized,
DisruptionReasonDrifted},
DisruptionReason("CloudProviderDisruptionReason"),
)
})

Context("GetAllowedDisruptionsByReason", func() {
It("should return 0 for all reasons if a budget is active for all reasons", func() {
budgets[5].Schedule = lo.ToPtr("* * * * *")
budgets[5].Duration = lo.ToPtr(metav1.Duration{Duration: lo.Must(time.ParseDuration("1h"))})

disruptionsByReason, err := nodePool.GetAllowedDisruptionsByReason(fakeClock, 100)
Expect(err).To(BeNil())
Expect(disruptionsByReason[DisruptionReasonUnderutilized]).To(Equal(0))
Expect(disruptionsByReason[DisruptionReasonDrifted]).To(Equal(0))
Expect(disruptionsByReason[DisruptionReasonEmpty]).To(Equal(0))
Expect(disruptionsByReason["CloudProviderDisruptionReason"]).To(Equal(0))
for _, reason := range allKnownDisruptionReasons {
allowedDisruption, err := nodePool.GetAllowedDisruptionsByReason(fakeClock, 100, reason)
Expect(err).To(BeNil())
Expect(allowedDisruption).To(Equal(0))
}
})

It("should return MaxInt32 for all reasons when there are no active budgets", func() {
for i := range budgets {
budgets[i].Schedule = lo.ToPtr("@yearly")
budgets[i].Duration = lo.ToPtr(metav1.Duration{Duration: lo.Must(time.ParseDuration("1h"))})
}
disruptionsByReason, err := nodePool.GetAllowedDisruptionsByReason(fakeClock, 100)
Expect(err).To(BeNil())

// All budgets should have unbounded disruptions when inactive
for _, disruptions := range disruptionsByReason {
Expect(disruptions).To(Equal(math.MaxInt32))
for _, reason := range allKnownDisruptionReasons {
allowedDisruption, err := nodePool.GetAllowedDisruptionsByReason(fakeClock, 100, reason)
Expect(err).To(BeNil())
Expect(allowedDisruption).To(Equal(math.MaxInt32))
}
})

It("should ignore reason-defined budgets when inactive", func() {
budgets[3].Schedule = lo.ToPtr("@yearly")
budgets[4].Schedule = lo.ToPtr("@yearly")
disruptionsByReason, err := nodePool.GetAllowedDisruptionsByReason(fakeClock, 100)
Expect(err).To(BeNil())
for _, disruptions := range disruptionsByReason {
Expect(disruptions).To(Equal(10))

for _, reason := range allKnownDisruptionReasons {
allowedDisruption, err := nodePool.GetAllowedDisruptionsByReason(fakeClock, 100, reason)
Expect(err).To(BeNil())
Expect(allowedDisruption).To(Equal(10))
}
})

It("should return the budget for all disruption reasons when undefined", func() {
nodePool.Spec.Disruption.Budgets = budgets[:1]
Expect(len(nodePool.Spec.Disruption.Budgets)).To(Equal(1))
disruptionsByReason, err := nodePool.GetAllowedDisruptionsByReason(fakeClock, 100)
Expect(err).To(BeNil())
Expect(len(budgets[0].Reasons)).To(Equal(0))
for _, disruptions := range disruptionsByReason {
Expect(disruptions).To(Equal(10))

for _, reason := range allKnownDisruptionReasons {
allowedDisruption, err := nodePool.GetAllowedDisruptionsByReason(fakeClock, 100, reason)
Expect(err).To(BeNil())
Expect(allowedDisruption).To(Equal(10))
}
})

@@ -155,13 +163,17 @@ var _ = Describe("Budgets", func() {
},
},
}...)
disruptionsByReason, err := nodePool.GetAllowedDisruptionsByReason(fakeClock, 100)
Expect(err).To(BeNil())

Expect(disruptionsByReason[DisruptionReasonEmpty]).To(Equal(4))
Expect(disruptionsByReason[DisruptionReasonDrifted]).To(Equal(5))
emptyAllowedDisruption, err := nodePool.GetAllowedDisruptionsByReason(fakeClock, 100, DisruptionReasonEmpty)
Expect(err).To(BeNil())
Expect(emptyAllowedDisruption).To(Equal(4))
driftedAllowedDisruption, err := nodePool.GetAllowedDisruptionsByReason(fakeClock, 100, DisruptionReasonDrifted)
Expect(err).To(BeNil())
Expect(driftedAllowedDisruption).To(Equal(5))
// The budget where reason == nil overrides the budget with a specified reason
Expect(disruptionsByReason[DisruptionReasonUnderutilized]).To(Equal(10))
underutilizedAllowedDisruption, err := nodePool.GetAllowedDisruptionsByReason(fakeClock, 100, DisruptionReasonUnderutilized)
Expect(err).To(BeNil())
Expect(underutilizedAllowedDisruption).To(Equal(10))
})

})
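The tests above exercise budgets that are scoped to specific reasons. For orientation, a hedged sketch of such a budget in Go, mirroring how the test file builds its budgets slice (the Nodes field and the schedule value are assumptions, not taken from this excerpt):

budget := Budget{
	// Nodes caps how many nodes may be disrupted at once; "5" is illustrative.
	Nodes:    "5",
	Schedule: lo.ToPtr("0 9 * * mon-fri"),
	Duration: lo.ToPtr(metav1.Duration{Duration: lo.Must(time.ParseDuration("8h"))}),
	// A nil Reasons slice applies the budget to every disruption reason;
	// listing reasons scopes it to just those.
	Reasons: []DisruptionReason{DisruptionReasonDrifted, DisruptionReasonEmpty},
}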
12 changes: 6 additions & 6 deletions pkg/controllers/disruption/consolidation_test.go
@@ -578,7 +578,7 @@ var _ = Describe("Consolidation", func() {
ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, nodes, nodeClaims)

emptyConsolidation := disruption.NewEmptiness(disruption.MakeConsolidation(fakeClock, cluster, env.Client, prov, cloudProvider, recorder, queue))
budgets, err := disruption.BuildDisruptionBudgets(ctx, cluster, fakeClock, env.Client, recorder)
budgets, err := disruption.BuildDisruptionBudgetMapping(ctx, cluster, fakeClock, env.Client, recorder, emptyConsolidation.Reason())
Expect(err).To(Succeed())

candidates, err := disruption.GetCandidates(ctx, cluster, env.Client, recorder, fakeClock, cloudProvider, emptyConsolidation.ShouldDisrupt, emptyConsolidation.Class(), queue)
@@ -641,7 +641,7 @@ var _ = Describe("Consolidation", func() {
ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, nodes, nodeClaims)

emptyConsolidation := disruption.NewEmptiness(disruption.MakeConsolidation(fakeClock, cluster, env.Client, prov, cloudProvider, recorder, queue))
budgets, err := disruption.BuildDisruptionBudgets(ctx, cluster, fakeClock, env.Client, recorder)
budgets, err := disruption.BuildDisruptionBudgetMapping(ctx, cluster, fakeClock, env.Client, recorder, emptyConsolidation.Reason())
Expect(err).To(Succeed())

candidates, err := disruption.GetCandidates(ctx, cluster, env.Client, recorder, fakeClock, cloudProvider, emptyConsolidation.ShouldDisrupt, emptyConsolidation.Class(), queue)
@@ -665,7 +665,7 @@ var _ = Describe("Consolidation", func() {
ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, nodes, nodeClaims)

multiConsolidation := disruption.NewMultiNodeConsolidation(disruption.MakeConsolidation(fakeClock, cluster, env.Client, prov, cloudProvider, recorder, queue))
budgets, err := disruption.BuildDisruptionBudgets(ctx, cluster, fakeClock, env.Client, recorder)
budgets, err := disruption.BuildDisruptionBudgetMapping(ctx, cluster, fakeClock, env.Client, recorder, multiConsolidation.Reason())
Expect(err).To(Succeed())

candidates, err := disruption.GetCandidates(ctx, cluster, env.Client, recorder, fakeClock, cloudProvider, multiConsolidation.ShouldDisrupt, multiConsolidation.Class(), queue)
@@ -728,7 +728,7 @@ var _ = Describe("Consolidation", func() {
ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, nodes, nodeClaims)

multiConsolidation := disruption.NewMultiNodeConsolidation(disruption.MakeConsolidation(fakeClock, cluster, env.Client, prov, cloudProvider, recorder, queue))
budgets, err := disruption.BuildDisruptionBudgets(ctx, cluster, fakeClock, env.Client, recorder)
budgets, err := disruption.BuildDisruptionBudgetMapping(ctx, cluster, fakeClock, env.Client, recorder, multiConsolidation.Reason())
Expect(err).To(Succeed())

candidates, err := disruption.GetCandidates(ctx, cluster, env.Client, recorder, fakeClock, cloudProvider, multiConsolidation.ShouldDisrupt, multiConsolidation.Class(), queue)
@@ -752,7 +752,7 @@ var _ = Describe("Consolidation", func() {
ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, nodes, nodeClaims)

singleConsolidation := disruption.NewSingleNodeConsolidation(disruption.MakeConsolidation(fakeClock, cluster, env.Client, prov, cloudProvider, recorder, queue))
budgets, err := disruption.BuildDisruptionBudgets(ctx, cluster, fakeClock, env.Client, recorder)
budgets, err := disruption.BuildDisruptionBudgetMapping(ctx, cluster, fakeClock, env.Client, recorder, singleConsolidation.Reason())
Expect(err).To(Succeed())

candidates, err := disruption.GetCandidates(ctx, cluster, env.Client, recorder, fakeClock, cloudProvider, singleConsolidation.ShouldDisrupt, singleConsolidation.Class(), queue)
@@ -815,7 +815,7 @@ var _ = Describe("Consolidation", func() {
ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, nodes, nodeClaims)

singleConsolidation := disruption.NewSingleNodeConsolidation(disruption.MakeConsolidation(fakeClock, cluster, env.Client, prov, cloudProvider, recorder, queue))
budgets, err := disruption.BuildDisruptionBudgets(ctx, cluster, fakeClock, env.Client, recorder)
budgets, err := disruption.BuildDisruptionBudgetMapping(ctx, cluster, fakeClock, env.Client, recorder, singleConsolidation.Reason())
Expect(err).To(Succeed())

candidates, err := disruption.GetCandidates(ctx, cluster, env.Client, recorder, fakeClock, cloudProvider, singleConsolidation.ShouldDisrupt, singleConsolidation.Class(), queue)
31 changes: 9 additions & 22 deletions pkg/controllers/disruption/controller.go
@@ -25,12 +25,9 @@ import (
"sync"
"time"

disruptionevents "sigs.k8s.io/karpenter/pkg/controllers/disruption/events"

"github.com/awslabs/operatorpkg/singleton"
"github.com/samber/lo"
"go.uber.org/multierr"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/utils/clock"
controllerruntime "sigs.k8s.io/controller-runtime"
@@ -75,7 +72,8 @@ func NewController(clk clock.Clock, kubeClient client.Client, provisioner *provi

// Generate eventually disruptable reason based on a combination of drift and cloudprovider disruption reason
eventualDisruptionMethods := []Method{}
for _, reason := range append([]v1.DisruptionReason{v1.DisruptionReasonDrifted}, cp.DisruptionReasons()...) {

for _, reason := range append(cp.DisruptionReasons(), v1.DisruptionReasonDrifted) {
eventualDisruptionMethods = append(eventualDisruptionMethods, NewEventualDisruption(kubeClient, cluster, provisioner, recorder, reason))
}

@@ -118,6 +116,7 @@ func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) {
c.recordRun("disruption-loop")

// Log if there are any budgets that are misconfigured that weren't caught by validation.
// Only validate the first reason, since CEL validation will catch invalid disruption reasons
c.logInvalidBudgets(ctx)

// We need to ensure that our internal cluster state mechanism is synced before we proceed
@@ -170,25 +169,10 @@ func (c *Controller) disrupt(ctx context.Context, disruption Method) (bool, erro
if len(candidates) == 0 {
return false, nil
}
disruptionBudgetMapping, err := BuildDisruptionBudgets(ctx, c.cluster, c.clock, c.kubeClient, c.recorder)
disruptionBudgetMapping, err := BuildDisruptionBudgetMapping(ctx, c.cluster, c.clock, c.kubeClient, c.recorder, disruption.Reason())
if err != nil {
return false, fmt.Errorf("building disruption budgets, %w", err)
}
// Emit metric for every nodepool
for _, nodePoolName := range lo.Keys(disruptionBudgetMapping) {
allowedDisruption, exists := disruptionBudgetMapping[nodePoolName][disruption.Reason()]
NodePoolAllowedDisruptions.With(map[string]string{
metrics.NodePoolLabel: nodePoolName, metrics.ReasonLabel: string(disruption.Reason()),
}).Set(float64(lo.Ternary(exists, allowedDisruption, disruptionBudgetMapping[nodePoolName][v1.DisruptionReasonAll])))
if allowedDisruption == 0 {
np := &v1.NodePool{}
if err := c.kubeClient.Get(ctx, types.NamespacedName{Name: nodePoolName}, np); err != nil {
return false, fmt.Errorf("getting nodepool, %w", err)
}
c.recorder.Publish(disruptionevents.NodePoolBlockedForDisruptionReason(np, disruption.Reason()))
}
}

// Determine the disruption action
cmd, schedulingResults, err := disruption.ComputeCommand(ctx, disruptionBudgetMapping, candidates...)
if err != nil {
@@ -301,8 +285,11 @@ func (c *Controller) logInvalidBudgets(ctx context.Context) {
var buf bytes.Buffer
for _, np := range nodePoolList.Items {
// Use a dummy value of 100 since we only care if this errors.
if _, err := np.GetAllowedDisruptionsByReason(c.clock, 100); err != nil {
fmt.Fprintf(&buf, "invalid disruption budgets in nodepool %s, %s", np.Name, err)
for _, method := range c.methods {
if _, err := np.GetAllowedDisruptionsByReason(c.clock, 100, method.Reason()); err != nil {
fmt.Fprintf(&buf, "invalid disruption budgets in nodepool %s, %s", np.Name, err)
break // Prevent duplicate error message
}
}
}
if buf.Len() > 0 {
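BuildDisruptionBudgetMapping itself is defined in one of the files not shown in this excerpt; inferred purely from the call sites above, its shape is presumably along these lines (the parameter types are assumptions):

// Hypothetical signature reconstructed from call sites in this diff: the result is keyed
// by NodePool name and holds the allowed disruption count for the single reason requested.
func BuildDisruptionBudgetMapping(ctx context.Context, cluster *state.Cluster, clk clock.Clock,
	kubeClient client.Client, recorder events.Recorder, reason v1.DisruptionReason) (map[string]int, error)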
8 changes: 3 additions & 5 deletions pkg/controllers/disruption/emptiness.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (e *Emptiness) ShouldDisrupt(_ context.Context, c *Candidate) bool {
// ComputeCommand generates a disruption command given candidates
//
//nolint:gocyclo
func (e *Emptiness) ComputeCommand(ctx context.Context, disruptionBudgetMapping map[string]map[v1.DisruptionReason]int, candidates ...*Candidate) (Command, scheduling.Results, error) {
func (e *Emptiness) ComputeCommand(ctx context.Context, disruptionBudgetMapping map[string]int, candidates ...*Candidate) (Command, scheduling.Results, error) {
if e.IsConsolidated() {
return Command{}, scheduling.Results{}, nil
}
@@ -65,20 +65,18 @@ func (e *Emptiness) ComputeCommand(ctx context.Context, disruptionBudgetMapping
empty := make([]*Candidate, 0, len(candidates))
constrainedByBudgets := false
for _, candidate := range candidates {
_, found := disruptionBudgetMapping[candidate.nodePool.Name][e.Reason()]
reason := lo.Ternary(found, e.Reason(), v1.DisruptionReasonAll)
if len(candidate.reschedulablePods) > 0 {
continue
}
if disruptionBudgetMapping[candidate.nodePool.Name][reason] == 0 {
if disruptionBudgetMapping[candidate.nodePool.Name] == 0 {
// set constrainedByBudgets to true if any node was a candidate but was constrained by a budget
constrainedByBudgets = true
continue
}
// If there's disruptions allowed for the candidate's nodepool,
// add it to the list of candidates, and decrement the budget.
empty = append(empty, candidate)
disruptionBudgetMapping[candidate.nodePool.Name][reason]--
disruptionBudgetMapping[candidate.nodePool.Name]--
}
// none empty, so do nothing
if len(empty) == 0 {
13 changes: 4 additions & 9 deletions pkg/controllers/disruption/eventual.go
@@ -21,7 +21,6 @@ import (
"errors"
"sort"

"github.com/samber/lo"
"sigs.k8s.io/controller-runtime/pkg/client"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
@@ -57,7 +56,7 @@ func (d *EventualDisruption) ShouldDisrupt(ctx context.Context, c *Candidate) bo
}

// ComputeCommand generates a disruption command given candidates
func (d *EventualDisruption) ComputeCommand(ctx context.Context, disruptionBudgetMapping map[string]map[v1.DisruptionReason]int, candidates ...*Candidate) (Command, scheduling.Results, error) {
func (d *EventualDisruption) ComputeCommand(ctx context.Context, disruptionBudgetMapping map[string]int, candidates ...*Candidate) (Command, scheduling.Results, error) {
sort.Slice(candidates, func(i int, j int) bool {
return candidates[i].NodeClaim.StatusConditions().Get(string(d.Reason())).LastTransitionTime.Time.Before(
candidates[j].NodeClaim.StatusConditions().Get(string(d.Reason())).LastTransitionTime.Time)
@@ -68,16 +67,14 @@ func (d *EventualDisruption) ComputeCommand(ctx context.Context, disruptionBudge
// add it to the existing command.
empty := make([]*Candidate, 0, len(candidates))
for _, candidate := range candidates {
_, found := disruptionBudgetMapping[candidate.nodePool.Name][d.Reason()]
reason := lo.Ternary(found, d.Reason(), v1.DisruptionReasonAll)
if len(candidate.reschedulablePods) > 0 {
continue
}
// If there's disruptions allowed for the candidate's nodepool,
// add it to the list of candidates, and decrement the budget.
if disruptionBudgetMapping[candidate.nodePool.Name][reason] > 0 {
if disruptionBudgetMapping[candidate.nodePool.Name] > 0 {
empty = append(empty, candidate)
disruptionBudgetMapping[candidate.nodePool.Name][reason]--
disruptionBudgetMapping[candidate.nodePool.Name]--
}
}
// Disrupt all empty drifted candidates, as they require no scheduling simulations.
Expand All @@ -88,12 +85,10 @@ func (d *EventualDisruption) ComputeCommand(ctx context.Context, disruptionBudge
}

for _, candidate := range candidates {
_, found := disruptionBudgetMapping[candidate.nodePool.Name][d.Reason()]
reason := lo.Ternary(found, d.Reason(), v1.DisruptionReasonAll)
// If the disruption budget doesn't allow this candidate to be disrupted,
// continue to the next candidate. We don't need to decrement any budget
// counter since drift commands can only have one candidate.
if disruptionBudgetMapping[candidate.nodePool.Name][reason] == 0 {
if disruptionBudgetMapping[candidate.nodePool.Name] == 0 {
continue
}
// Check if we need to create any NodeClaims.
(The remaining 8 changed files in this commit are not shown in this view.)
