chore: Add eventing for blocked consolidation on nodes or machines #303

Merged

Changes from all commits
4 changes: 2 additions & 2 deletions pkg/controllers/deprovisioning/controller.go
@@ -110,7 +110,7 @@ func (c *Controller) Builder(_ context.Context, m manager.Manager) controller.Bu
 }
 
 func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.Result, error) {
-	// this won't catch if the reconcile loop hangs forever but it will catch other issues
+	// this won't catch if the reconcile loop hangs forever, but it will catch other issues
 	c.logAbnormalRuns(ctx)
 	defer c.logAbnormalRuns(ctx)
 	c.recordRun("deprovisioning-loop")
@@ -144,7 +144,7 @@ func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconc
 }
 
 func (c *Controller) deprovision(ctx context.Context, deprovisioner Deprovisioner) (bool, error) {
-	candidates, err := GetCandidates(ctx, c.cluster, c.kubeClient, c.clock, c.cloudProvider, deprovisioner.ShouldDeprovision)
+	candidates, err := GetCandidates(ctx, c.cluster, c.kubeClient, c.recorder, c.clock, c.cloudProvider, deprovisioner.ShouldDeprovision)
 	if err != nil {
 		return false, fmt.Errorf("determining candidates, %w", err)
 	}
@@ -69,7 +69,7 @@ func (c *EmptyMachineConsolidation) ComputeCommand(ctx context.Context, candidat
 		return Command{}, errors.New("interrupted")
 	case <-c.clock.After(consolidationTTL):
 	}
-	validationCandidates, err := GetCandidates(ctx, c.cluster, c.kubeClient, c.clock, c.cloudProvider, c.ShouldDeprovision)
+	validationCandidates, err := GetCandidates(ctx, c.cluster, c.kubeClient, c.recorder, c.clock, c.cloudProvider, c.ShouldDeprovision)
 	if err != nil {
 		logging.FromContext(ctx).Errorf("computing validation candidates %s", err)
 		return Command{}, err
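Both call sites above now pass `c.recorder` into `GetCandidates`, but the recorder field itself is declared outside this diff. As a rough sketch of the wiring (the struct layout and field names below are assumptions, not taken from this PR), the deprovisioning controller holds an `events.Recorder` that gets threaded down through `GetCandidates` to `NewCandidate`:

```go
// Sketch only: how an events.Recorder is typically held on the deprovisioning
// controller so it can be handed to GetCandidates/NewCandidate. The exact struct
// layout here is an assumption; only the recorder usage comes from this diff.
package deprovisioning

import (
	"k8s.io/utils/clock"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"github.com/aws/karpenter-core/pkg/cloudprovider"
	"github.com/aws/karpenter-core/pkg/controllers/state"
	"github.com/aws/karpenter-core/pkg/events"
)

type Controller struct {
	kubeClient    client.Client
	cluster       *state.Cluster
	clock         clock.Clock
	cloudProvider cloudprovider.CloudProvider
	recorder      events.Recorder // published to when a candidate is blocked
}
```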
4 changes: 2 additions & 2 deletions pkg/controllers/deprovisioning/helpers.go
@@ -177,13 +177,13 @@ func disruptionCost(ctx context.Context, pods []*v1.Pod) float64 {
 }
 
 // GetCandidates returns nodes that appear to be currently deprovisionable based off of their provisioner
-func GetCandidates(ctx context.Context, cluster *state.Cluster, kubeClient client.Client, clk clock.Clock, cloudProvider cloudprovider.CloudProvider, shouldDeprovision CandidateFilter) ([]*Candidate, error) {
+func GetCandidates(ctx context.Context, cluster *state.Cluster, kubeClient client.Client, recorder events.Recorder, clk clock.Clock, cloudProvider cloudprovider.CloudProvider, shouldDeprovision CandidateFilter) ([]*Candidate, error) {
 	provisionerMap, provisionerToInstanceTypes, err := buildProvisionerMap(ctx, kubeClient, cloudProvider)
 	if err != nil {
 		return nil, err
 	}
 	candidates := lo.FilterMap(cluster.Nodes(), func(n *state.StateNode, _ int) (*Candidate, bool) {
-		cn, e := NewCandidate(ctx, kubeClient, clk, n, provisionerMap, provisionerToInstanceTypes)
+		cn, e := NewCandidate(ctx, kubeClient, recorder, clk, n, provisionerMap, provisionerToInstanceTypes)
		return cn, e == nil
 	})
 	// Filter only the valid candidates that we should deprovision
14 changes: 10 additions & 4 deletions pkg/controllers/deprovisioning/suite_test.go
@@ -19,6 +19,7 @@ import (
 	"fmt"
 	"math"
 	"sort"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"testing"
@@ -49,6 +50,7 @@ import (
 	"github.com/aws/karpenter-core/pkg/controllers/provisioning"
 	"github.com/aws/karpenter-core/pkg/controllers/state"
 	"github.com/aws/karpenter-core/pkg/controllers/state/informer"
+	"github.com/aws/karpenter-core/pkg/events"
 	"github.com/aws/karpenter-core/pkg/operator/controller"
 	"github.com/aws/karpenter-core/pkg/operator/scheme"
 	"github.com/aws/karpenter-core/pkg/scheduling"
@@ -1135,10 +1137,14 @@ var _ = Describe("Delete Node", func() {
 
 		// Expect Unconsolidatable events to be fired
 		evts := recorder.Events()
-		Expect(evts).To(HaveLen(2))
-
-		Expect(evts[0].Message).To(ContainSubstring("not all pods would schedule"))
-		Expect(evts[1].Message).To(ContainSubstring("would schedule against a non-initialized node"))
+		_, ok := lo.Find(evts, func(e events.Event) bool {
+			return strings.Contains(e.Message, "not all pods would schedule")
+		})
+		Expect(ok).To(BeTrue())
+		_, ok = lo.Find(evts, func(e events.Event) bool {
+			return strings.Contains(e.Message, "would schedule against a non-initialized node")
+		})
+		Expect(ok).To(BeTrue())
 	})
 })
 
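The reworked assertions above search `recorder.Events()` with `lo.Find` instead of depending on event ordering. That `recorder` is the suite's fake event recorder, which is not part of this diff; a minimal sketch of such an event-buffering recorder, assuming only that an `events.Recorder` can be satisfied by a variadic `Publish(...events.Event)` method (the project's real test helper may differ), looks like:

```go
// Minimal sketch of an event-buffering recorder for tests. It assumes the
// events.Recorder interface needs only Publish, which may not match the real
// interface; it only illustrates how Events() can back the assertions above.
package test

import (
	"sync"

	"github.com/aws/karpenter-core/pkg/events"
)

type EventCapture struct {
	mu   sync.Mutex
	evts []events.Event
}

// Publish buffers every published event so tests can inspect them later.
func (c *EventCapture) Publish(evts ...events.Event) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.evts = append(c.evts, evts...)
}

// Events returns a copy of all events published so far.
func (c *EventCapture) Events() []events.Event {
	c.mu.Lock()
	defer c.mu.Unlock()
	return append([]events.Event(nil), c.evts...)
}
```

Searching the buffer rather than indexing into it keeps the test stable if additional events are published or the publish order changes.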
19 changes: 15 additions & 4 deletions pkg/controllers/deprovisioning/types.go
@@ -26,8 +26,10 @@ import (
 
 	"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
 	"github.com/aws/karpenter-core/pkg/cloudprovider"
+	deprovisioningevents "github.com/aws/karpenter-core/pkg/controllers/deprovisioning/events"
 	"github.com/aws/karpenter-core/pkg/controllers/provisioning/scheduling"
 	"github.com/aws/karpenter-core/pkg/controllers/state"
+	"github.com/aws/karpenter-core/pkg/events"
 )
 
 type Deprovisioner interface {
@@ -51,16 +53,23 @@ type Candidate struct {
 }
 
 //nolint:gocyclo
-func NewCandidate(ctx context.Context, kubeClient client.Client, clk clock.Clock, node *state.StateNode,
+func NewCandidate(ctx context.Context, kubeClient client.Client, recorder events.Recorder, clk clock.Clock, node *state.StateNode,
 	provisionerMap map[string]*v1alpha5.Provisioner, provisionerToInstanceTypes map[string]map[string]*cloudprovider.InstanceType) (*Candidate, error) {
 
+	if node.Node == nil || node.Machine == nil {
+		return nil, fmt.Errorf("state node doesn't contain both a node and a machine")
+	}
 	// check whether the node has all the labels we need
 	for _, label := range []string{
 		v1alpha5.LabelCapacityType,
 		v1.LabelTopologyZone,
 		v1alpha5.ProvisionerNameLabelKey,
 	} {
 		if _, ok := node.Labels()[label]; !ok {
+			// This means that we don't own the candidate which means we shouldn't fire an event for it
+			if label != v1alpha5.ProvisionerNameLabelKey {
+				recorder.Publish(deprovisioningevents.Blocked(node.Node, node.Machine, fmt.Sprintf("required label %q doesn't exist", label))...)
+			}
 			return nil, fmt.Errorf("state node doesn't have required label '%s'", label)
 		}
 	}
@@ -69,30 +78,32 @@ func NewCandidate(ctx context.Context, kubeClient client.Client, clk clock.Clock
 	instanceTypeMap := provisionerToInstanceTypes[node.Labels()[v1alpha5.ProvisionerNameLabelKey]]
 	// skip any nodes where we can't determine the provisioner
 	if provisioner == nil || instanceTypeMap == nil {
+		recorder.Publish(deprovisioningevents.Blocked(node.Node, node.Machine, fmt.Sprintf("owning provisioner %q not found", provisioner.Name))...)
 		return nil, fmt.Errorf("provisioner '%s' can't be resolved for state node", node.Labels()[v1alpha5.ProvisionerNameLabelKey])
 	}
 	instanceType := instanceTypeMap[node.Labels()[v1.LabelInstanceTypeStable]]
 	// skip any nodes that we can't determine the instance of
 	if instanceType == nil {
+		recorder.Publish(deprovisioningevents.Blocked(node.Node, node.Machine, fmt.Sprintf("instance type %q not found", node.Labels()[v1.LabelInstanceTypeStable]))...)
 		return nil, fmt.Errorf("instance type '%s' can't be resolved", node.Labels()[v1.LabelInstanceTypeStable])
 	}
 
 	// skip any nodes that are already marked for deletion and being handled
 	if node.MarkedForDeletion() {
+		recorder.Publish(deprovisioningevents.Blocked(node.Node, node.Machine, "machine is marked for deletion")...)
 		return nil, fmt.Errorf("state node is marked for deletion")
 	}
 	// skip nodes that aren't initialized
 	// This also means that the real Node doesn't exist for it
 	if !node.Initialized() {
+		recorder.Publish(deprovisioningevents.Blocked(node.Node, node.Machine, "machine is not initialized")...)
 		return nil, fmt.Errorf("state node isn't initialized")
 	}
 	// skip the node if it is nominated by a recent provisioning pass to be the target of a pending pod.
 	if node.Nominated() {
+		recorder.Publish(deprovisioningevents.Blocked(node.Node, node.Machine, "machine is nominated")...)
 		return nil, fmt.Errorf("state node is nominated")
 	}
-	if node.Node == nil || node.Machine == nil {
-		return nil, fmt.Errorf("state node doesn't contain both a node and a machine")
-	}
 
 	pods, err := node.Pods(ctx, kubeClient)
 	if err != nil {
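The `deprovisioningevents.Blocked(node, machine, reason)` helper used throughout `NewCandidate` lives in `pkg/controllers/deprovisioning/events` and is not shown in this diff. Judging only from its call sites — it takes the Node, the Machine, and a reason string, and returns a slice that is spread into `Publish` — it presumably emits one event per involved object, roughly like the sketch below (the `events.Event` field names, the reason constant, and the message wording are assumptions, not taken from this PR):

```go
// Sketch of what the Blocked helper might look like, inferred from its call sites
// in types.go. Field names, the Reason constant, and message wording are guesses.
package events

import (
	v1 "k8s.io/api/core/v1"

	"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
	coreevents "github.com/aws/karpenter-core/pkg/events"
)

// Blocked returns one event for the Node and one for the Machine explaining why
// the deprovisioner could not consider them as a consolidation candidate.
func Blocked(node *v1.Node, machine *v1alpha5.Machine, reason string) []coreevents.Event {
	return []coreevents.Event{
		{
			InvolvedObject: node,
			Type:           v1.EventTypeNormal,
			Reason:         "DeprovisioningBlocked",
			Message:        "Cannot deprovision node due to " + reason,
		},
		{
			InvolvedObject: machine,
			Type:           v1.EventTypeNormal,
			Reason:         "DeprovisioningBlocked",
			Message:        "Cannot deprovision machine due to " + reason,
		},
	}
}
```

If the event shape is anything like this, the published events should surface in `kubectl describe` output for both the node and the machine, which is the user-visible payoff of threading the recorder through `GetCandidates`.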
2 changes: 1 addition & 1 deletion pkg/controllers/deprovisioning/validation.go
@@ -77,7 +77,7 @@ func (v *Validation) IsValid(ctx context.Context, cmd Command) (bool, error) {
 		}
 	}
 	if len(v.validationCandidates) == 0 {
-		v.validationCandidates, err = GetCandidates(ctx, v.cluster, v.kubeClient, v.clock, v.cloudProvider, v.ShouldDeprovision)
+		v.validationCandidates, err = GetCandidates(ctx, v.cluster, v.kubeClient, v.recorder, v.clock, v.cloudProvider, v.ShouldDeprovision)
 		if err != nil {
 			return false, fmt.Errorf("constructing validation candidates, %w", err)
 		}