Add eventing for blocked consolidation on nodes or machines
jonathan-innis committed May 2, 2023
1 parent 4890c1c commit d6a0e2e
Showing 6 changed files with 31 additions and 14 deletions.
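Every code path below that rejects a candidate now publishes deprovisioningevents.Blocked(node, machine, reason) through an events.Recorder. The Blocked helper itself is not part of this diff; the following is only a sketch of what it presumably looks like, assuming the repository's events.Event struct exposes InvolvedObject, Type, Reason, Message, and DedupeValues fields and assuming a "DeprovisioningBlocked" reason string — neither is confirmed by this commit.

package events // pkg/controllers/deprovisioning/events

import (
	"fmt"

	v1 "k8s.io/api/core/v1"

	"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
	"github.com/aws/karpenter-core/pkg/events"
)

// Blocked returns one event for the Node and one for the Machine explaining why the
// candidate cannot be deprovisioned. Field names and the reason string are assumptions.
func Blocked(node *v1.Node, machine *v1alpha5.Machine, reason string) []events.Event {
	return []events.Event{
		{
			InvolvedObject: node,
			Type:           v1.EventTypeNormal,
			Reason:         "DeprovisioningBlocked",
			Message:        fmt.Sprintf("Cannot deprovision node due to %s", reason),
			DedupeValues:   []string{node.Name, reason},
		},
		{
			InvolvedObject: machine,
			Type:           v1.EventTypeNormal,
			Reason:         "DeprovisioningBlocked",
			Message:        fmt.Sprintf("Cannot deprovision machine due to %s", reason),
			DedupeValues:   []string{machine.Name, reason},
		},
	}
}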
4 changes: 2 additions & 2 deletions pkg/controllers/deprovisioning/controller.go
@@ -110,7 +110,7 @@ func (c *Controller) Builder(_ context.Context, m manager.Manager) controller.Bu
}

func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.Result, error) {
-// this won't catch if the reconcile loop hangs forever but it will catch other issues
+// this won't catch if the reconcile loop hangs forever, but it will catch other issues
c.logAbnormalRuns(ctx)
defer c.logAbnormalRuns(ctx)
c.recordRun("deprovisioning-loop")
@@ -144,7 +144,7 @@ func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.Result, error) {
}

func (c *Controller) deprovision(ctx context.Context, deprovisioner Deprovisioner) (bool, error) {
-candidates, err := GetCandidates(ctx, c.cluster, c.kubeClient, c.clock, c.cloudProvider, deprovisioner.ShouldDeprovision)
+candidates, err := GetCandidates(ctx, c.cluster, c.kubeClient, c.recorder, c.clock, c.cloudProvider, deprovisioner.ShouldDeprovision)
if err != nil {
return false, fmt.Errorf("determining candidates, %w", err)
}
@@ -69,7 +69,7 @@ func (c *EmptyMachineConsolidation) ComputeCommand(ctx context.Context, candidat
return Command{}, errors.New("interrupted")
case <-c.clock.After(consolidationTTL):
}
-validationCandidates, err := GetCandidates(ctx, c.cluster, c.kubeClient, c.clock, c.cloudProvider, c.ShouldDeprovision)
+validationCandidates, err := GetCandidates(ctx, c.cluster, c.kubeClient, c.recorder, c.clock, c.cloudProvider, c.ShouldDeprovision)
if err != nil {
logging.FromContext(ctx).Errorf("computing validation candidates %s", err)
return Command{}, err
4 changes: 2 additions & 2 deletions pkg/controllers/deprovisioning/helpers.go
@@ -177,13 +177,13 @@ func disruptionCost(ctx context.Context, pods []*v1.Pod) float64 {
}

// GetCandidates returns nodes that appear to be currently deprovisionable based off of their provisioner
-func GetCandidates(ctx context.Context, cluster *state.Cluster, kubeClient client.Client, clk clock.Clock, cloudProvider cloudprovider.CloudProvider, shouldDeprovision CandidateFilter) ([]*Candidate, error) {
+func GetCandidates(ctx context.Context, cluster *state.Cluster, kubeClient client.Client, recorder events.Recorder, clk clock.Clock, cloudProvider cloudprovider.CloudProvider, shouldDeprovision CandidateFilter) ([]*Candidate, error) {
provisionerMap, provisionerToInstanceTypes, err := buildProvisionerMap(ctx, kubeClient, cloudProvider)
if err != nil {
return nil, err
}
candidates := lo.FilterMap(cluster.Nodes(), func(n *state.StateNode, _ int) (*Candidate, bool) {
-cn, e := NewCandidate(ctx, kubeClient, clk, n, provisionerMap, provisionerToInstanceTypes)
+cn, e := NewCandidate(ctx, kubeClient, recorder, clk, n, provisionerMap, provisionerToInstanceTypes)
return cn, e == nil
})
// Filter only the valid candidates that we should deprovision
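GetCandidates and NewCandidate now take an events.Recorder so each rejected candidate can be reported. The diff only threads the parameter through; judging from the recorder.Publish(deprovisioningevents.Blocked(...)...) call sites in types.go below, the plumbing in pkg/events is presumably shaped roughly like this sketch (assumed, not shown in this commit):

package events // sketch of github.com/aws/karpenter-core/pkg/events

import "k8s.io/apimachinery/pkg/runtime"

// Event is the payload a deprovisioner publishes for a blocked candidate. The exact
// field set is an assumption inferred from how Blocked(...) results are consumed.
type Event struct {
	InvolvedObject runtime.Object // the Node or Machine the event is recorded against
	Type           string         // e.g. corev1.EventTypeNormal
	Reason         string
	Message        string
	DedupeValues   []string // collapses repeated events for the same object and reason
}

// Recorder is what the new recorder parameter is assumed to satisfy; the variadic
// Publish matches recorder.Publish(deprovisioningevents.Blocked(...)...) exactly.
type Recorder interface {
	Publish(...Event)
}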
14 changes: 10 additions & 4 deletions pkg/controllers/deprovisioning/suite_test.go
@@ -19,6 +19,7 @@ import (
"fmt"
"math"
"sort"
"strings"
"sync"
"sync/atomic"
"testing"
@@ -49,6 +50,7 @@ import (
"github.com/aws/karpenter-core/pkg/controllers/provisioning"
"github.com/aws/karpenter-core/pkg/controllers/state"
"github.com/aws/karpenter-core/pkg/controllers/state/informer"
+"github.com/aws/karpenter-core/pkg/events"
"github.com/aws/karpenter-core/pkg/operator/controller"
"github.com/aws/karpenter-core/pkg/operator/scheme"
"github.com/aws/karpenter-core/pkg/scheduling"
@@ -1135,10 +1137,14 @@ var _ = Describe("Delete Node", func() {

// Expect Unconsolidatable events to be fired
evts := recorder.Events()
-Expect(evts).To(HaveLen(2))
-
-Expect(evts[0].Message).To(ContainSubstring("not all pods would schedule"))
-Expect(evts[1].Message).To(ContainSubstring("would schedule against a non-initialized node"))
+_, ok := lo.Find(evts, func(e events.Event) bool {
+return strings.Contains(e.Message, "not all pods would schedule")
+})
+Expect(ok).To(BeTrue())
+_, ok = lo.Find(evts, func(e events.Event) bool {
+return strings.Contains(e.Message, "would schedule against a non-initialized node")
+})
+Expect(ok).To(BeTrue())
})
})
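The assertions above now search recorder.Events() for the expected messages instead of relying on a fixed count and ordering, since the new Blocked events can interleave with the existing ones. For reference, a minimal in-memory recorder that would support this pattern could look like the sketch below (hypothetical; the repository's test recorder may differ, and the sync and events imports are already present in this file):

// fakeRecorder is a hypothetical stand-in for the recorder used in these tests: it
// satisfies the assumed Publish interface and lets assertions inspect everything published.
type fakeRecorder struct {
	mu   sync.Mutex
	evts []events.Event
}

func (r *fakeRecorder) Publish(evts ...events.Event) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.evts = append(r.evts, evts...)
}

// Events returns a copy so callers can range over it while other goroutines publish.
func (r *fakeRecorder) Events() []events.Event {
	r.mu.Lock()
	defer r.mu.Unlock()
	return append([]events.Event(nil), r.evts...)
}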

19 changes: 15 additions & 4 deletions pkg/controllers/deprovisioning/types.go
@@ -26,8 +26,10 @@ import (

"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
"github.com/aws/karpenter-core/pkg/cloudprovider"
+deprovisioningevents "github.com/aws/karpenter-core/pkg/controllers/deprovisioning/events"
"github.com/aws/karpenter-core/pkg/controllers/provisioning/scheduling"
"github.com/aws/karpenter-core/pkg/controllers/state"
+"github.com/aws/karpenter-core/pkg/events"
)

type Deprovisioner interface {
@@ -51,16 +53,23 @@ type Candidate struct {
}

//nolint:gocyclo
-func NewCandidate(ctx context.Context, kubeClient client.Client, clk clock.Clock, node *state.StateNode,
+func NewCandidate(ctx context.Context, kubeClient client.Client, recorder events.Recorder, clk clock.Clock, node *state.StateNode,
provisionerMap map[string]*v1alpha5.Provisioner, provisionerToInstanceTypes map[string]map[string]*cloudprovider.InstanceType) (*Candidate, error) {

+if node.Node == nil || node.Machine == nil {
+return nil, fmt.Errorf("state node doesn't contain both a node and a machine")
+}
// check whether the node has all the labels we need
for _, label := range []string{
v1alpha5.LabelCapacityType,
v1.LabelTopologyZone,
v1alpha5.ProvisionerNameLabelKey,
} {
if _, ok := node.Labels()[label]; !ok {
+// The provisioner name label marks ownership: if it is missing, we don't own this candidate, so we shouldn't fire an event for it
+if label != v1alpha5.ProvisionerNameLabelKey {
+recorder.Publish(deprovisioningevents.Blocked(node.Node, node.Machine, fmt.Sprintf("required label %q doesn't exist", label))...)
+}
return nil, fmt.Errorf("state node doesn't have required label '%s'", label)
}
}
@@ -69,30 +78,32 @@ func NewCandidate(ctx context.Context, kubeClient client.Client, clk clock.Clock
instanceTypeMap := provisionerToInstanceTypes[node.Labels()[v1alpha5.ProvisionerNameLabelKey]]
// skip any nodes where we can't determine the provisioner
if provisioner == nil || instanceTypeMap == nil {
recorder.Publish(deprovisioningevents.Blocked(node.Node, node.Machine, fmt.Sprintf("owning provisioner %q not found", provisioner.Name))...)
return nil, fmt.Errorf("provisioner '%s' can't be resolved for state node", node.Labels()[v1alpha5.ProvisionerNameLabelKey])
}
instanceType := instanceTypeMap[node.Labels()[v1.LabelInstanceTypeStable]]
// skip any nodes that we can't determine the instance of
if instanceType == nil {
recorder.Publish(deprovisioningevents.Blocked(node.Node, node.Machine, fmt.Sprintf("instance type %q not found", node.Labels()[v1.LabelInstanceTypeStable]))...)
return nil, fmt.Errorf("instance type '%s' can't be resolved", node.Labels()[v1.LabelInstanceTypeStable])
}

// skip any nodes that are already marked for deletion and being handled
if node.MarkedForDeletion() {
recorder.Publish(deprovisioningevents.Blocked(node.Node, node.Machine, "machine is marked for deletion")...)
return nil, fmt.Errorf("state node is marked for deletion")
}
// skip nodes that aren't initialized
// This also means that the real Node doesn't exist for it
if !node.Initialized() {
recorder.Publish(deprovisioningevents.Blocked(node.Node, node.Machine, "machine is not initialized")...)
return nil, fmt.Errorf("state node isn't initialized")
}
// skip the node if it is nominated by a recent provisioning pass to be the target of a pending pod.
if node.Nominated() {
recorder.Publish(deprovisioningevents.Blocked(node.Node, node.Machine, "machine is nominated")...)
return nil, fmt.Errorf("state node is nominated")
}
-if node.Node == nil || node.Machine == nil {
-return nil, fmt.Errorf("state node doesn't contain both a node and a machine")
-}

pods, err := node.Pods(ctx, kubeClient)
if err != nil {
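Taken together, NewCandidate now publishes a Blocked event pair (one for the Node, one for the Machine) on every early return except two: when the state node lacks a Node or a Machine, and when the missing label is the provisioner name label, i.e. the node isn't owned by Karpenter. Collected purely for illustration, the reasons emitted by this diff are:

// Messages passed to deprovisioningevents.Blocked in types.go above. The first three are
// fmt.Sprintf format strings; the rest are fixed strings. Illustrative summary only.
var blockedReasons = []string{
	`required label %q doesn't exist`, // skipped when the missing label is the provisioner name label
	`owning provisioner %q not found`,
	`instance type %q not found`,
	"machine is marked for deletion",
	"machine is not initialized",
	"machine is nominated",
}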
2 changes: 1 addition & 1 deletion pkg/controllers/deprovisioning/validation.go
@@ -77,7 +77,7 @@ func (v *Validation) IsValid(ctx context.Context, cmd Command) (bool, error) {
}
}
if len(v.validationCandidates) == 0 {
-v.validationCandidates, err = GetCandidates(ctx, v.cluster, v.kubeClient, v.clock, v.cloudProvider, v.ShouldDeprovision)
+v.validationCandidates, err = GetCandidates(ctx, v.cluster, v.kubeClient, v.recorder, v.clock, v.cloudProvider, v.ShouldDeprovision)
if err != nil {
return false, fmt.Errorf("constructing validation candidates, %w", err)
}
