diff --git a/pkg/controllers/deprovisioning/controller.go b/pkg/controllers/deprovisioning/controller.go
index 966af4cf36..f78a8a3ed2 100644
--- a/pkg/controllers/deprovisioning/controller.go
+++ b/pkg/controllers/deprovisioning/controller.go
@@ -110,7 +110,7 @@ func (c *Controller) Builder(_ context.Context, m manager.Manager) controller.Bu
 }
 
 func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.Result, error) {
-	// this won't catch if the reconcile loop hangs forever but it will catch other issues
+	// this won't catch if the reconcile loop hangs forever, but it will catch other issues
 	c.logAbnormalRuns(ctx)
 	defer c.logAbnormalRuns(ctx)
 	c.recordRun("deprovisioning-loop")
@@ -144,7 +144,7 @@ func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconc
 }
 
 func (c *Controller) deprovision(ctx context.Context, deprovisioner Deprovisioner) (bool, error) {
-	candidates, err := GetCandidates(ctx, c.cluster, c.kubeClient, c.clock, c.cloudProvider, deprovisioner.ShouldDeprovision)
+	candidates, err := GetCandidates(ctx, c.cluster, c.kubeClient, c.recorder, c.clock, c.cloudProvider, deprovisioner.ShouldDeprovision)
 	if err != nil {
 		return false, fmt.Errorf("determining candidates, %w", err)
 	}
diff --git a/pkg/controllers/deprovisioning/emptymachineconsolidation.go b/pkg/controllers/deprovisioning/emptymachineconsolidation.go
index a414858e62..9a49c69945 100644
--- a/pkg/controllers/deprovisioning/emptymachineconsolidation.go
+++ b/pkg/controllers/deprovisioning/emptymachineconsolidation.go
@@ -69,7 +69,7 @@ func (c *EmptyMachineConsolidation) ComputeCommand(ctx context.Context, candidat
 		return Command{}, errors.New("interrupted")
 	case <-c.clock.After(consolidationTTL):
 	}
-	validationCandidates, err := GetCandidates(ctx, c.cluster, c.kubeClient, c.clock, c.cloudProvider, c.ShouldDeprovision)
+	validationCandidates, err := GetCandidates(ctx, c.cluster, c.kubeClient, c.recorder, c.clock, c.cloudProvider, c.ShouldDeprovision)
 	if err != nil {
 		logging.FromContext(ctx).Errorf("computing validation candidates %s", err)
 		return Command{}, err
diff --git a/pkg/controllers/deprovisioning/helpers.go b/pkg/controllers/deprovisioning/helpers.go
index eee0eb5d4d..1978271a39 100644
--- a/pkg/controllers/deprovisioning/helpers.go
+++ b/pkg/controllers/deprovisioning/helpers.go
@@ -177,13 +177,13 @@ func disruptionCost(ctx context.Context, pods []*v1.Pod) float64 {
 }
 
 // GetCandidates returns nodes that appear to be currently deprovisionable based off of their provisioner
-func GetCandidates(ctx context.Context, cluster *state.Cluster, kubeClient client.Client, clk clock.Clock, cloudProvider cloudprovider.CloudProvider, shouldDeprovision CandidateFilter) ([]*Candidate, error) {
+func GetCandidates(ctx context.Context, cluster *state.Cluster, kubeClient client.Client, recorder events.Recorder, clk clock.Clock, cloudProvider cloudprovider.CloudProvider, shouldDeprovision CandidateFilter) ([]*Candidate, error) {
 	provisionerMap, provisionerToInstanceTypes, err := buildProvisionerMap(ctx, kubeClient, cloudProvider)
 	if err != nil {
 		return nil, err
 	}
 	candidates := lo.FilterMap(cluster.Nodes(), func(n *state.StateNode, _ int) (*Candidate, bool) {
-		cn, e := NewCandidate(ctx, kubeClient, clk, n, provisionerMap, provisionerToInstanceTypes)
+		cn, e := NewCandidate(ctx, kubeClient, recorder, clk, n, provisionerMap, provisionerToInstanceTypes)
 		return cn, e == nil
 	})
 	// Filter only the valid candidates that we should deprovision
diff --git a/pkg/controllers/deprovisioning/suite_test.go b/pkg/controllers/deprovisioning/suite_test.go
index ea0c71f187..671e8901d2 100644
--- a/pkg/controllers/deprovisioning/suite_test.go
+++ b/pkg/controllers/deprovisioning/suite_test.go
@@ -19,6 +19,7 @@ import (
 	"fmt"
 	"math"
 	"sort"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"testing"
@@ -49,6 +50,7 @@ import (
 	"github.com/aws/karpenter-core/pkg/controllers/provisioning"
 	"github.com/aws/karpenter-core/pkg/controllers/state"
 	"github.com/aws/karpenter-core/pkg/controllers/state/informer"
+	"github.com/aws/karpenter-core/pkg/events"
 	"github.com/aws/karpenter-core/pkg/operator/controller"
 	"github.com/aws/karpenter-core/pkg/operator/scheme"
 	"github.com/aws/karpenter-core/pkg/scheduling"
@@ -1135,10 +1137,14 @@ var _ = Describe("Delete Node", func() {
 
 			// Expect Unconsolidatable events to be fired
 			evts := recorder.Events()
-			Expect(evts).To(HaveLen(2))
-
-			Expect(evts[0].Message).To(ContainSubstring("not all pods would schedule"))
-			Expect(evts[1].Message).To(ContainSubstring("would schedule against a non-initialized node"))
+			_, ok := lo.Find(evts, func(e events.Event) bool {
+				return strings.Contains(e.Message, "not all pods would schedule")
+			})
+			Expect(ok).To(BeTrue())
+			_, ok = lo.Find(evts, func(e events.Event) bool {
+				return strings.Contains(e.Message, "would schedule against a non-initialized node")
+			})
+			Expect(ok).To(BeTrue())
 		})
 	})
 
diff --git a/pkg/controllers/deprovisioning/types.go b/pkg/controllers/deprovisioning/types.go
index 368f889c15..43ecbb0fd1 100644
--- a/pkg/controllers/deprovisioning/types.go
+++ b/pkg/controllers/deprovisioning/types.go
@@ -26,8 +26,10 @@ import (
 
 	"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
 	"github.com/aws/karpenter-core/pkg/cloudprovider"
+	deprovisioningevents "github.com/aws/karpenter-core/pkg/controllers/deprovisioning/events"
 	"github.com/aws/karpenter-core/pkg/controllers/provisioning/scheduling"
 	"github.com/aws/karpenter-core/pkg/controllers/state"
+	"github.com/aws/karpenter-core/pkg/events"
 )
 
 type Deprovisioner interface {
@@ -51,9 +53,12 @@ type Candidate struct {
 }
 
 //nolint:gocyclo
-func NewCandidate(ctx context.Context, kubeClient client.Client, clk clock.Clock, node *state.StateNode,
+func NewCandidate(ctx context.Context, kubeClient client.Client, recorder events.Recorder, clk clock.Clock, node *state.StateNode,
 	provisionerMap map[string]*v1alpha5.Provisioner, provisionerToInstanceTypes map[string]map[string]*cloudprovider.InstanceType) (*Candidate, error) {
+	if node.Node == nil || node.Machine == nil {
+		return nil, fmt.Errorf("state node doesn't contain both a node and a machine")
+	}
 	// check whether the node has all the labels we need
 	for _, label := range []string{
 		v1alpha5.LabelCapacityType,
@@ -61,6 +66,10 @@ func NewCandidate(ctx context.Context, kubeClient client.Client, clk clock.Clock
 		v1alpha5.ProvisionerNameLabelKey,
 	} {
 		if _, ok := node.Labels()[label]; !ok {
+			// If the missing label is the provisioner name label, we don't own the candidate, so don't fire an event for it
+			if label != v1alpha5.ProvisionerNameLabelKey {
+				recorder.Publish(deprovisioningevents.Blocked(node.Node, node.Machine, fmt.Sprintf("required label %q doesn't exist", label))...)
+			}
 			return nil, fmt.Errorf("state node doesn't have required label '%s'", label)
 		}
 	}
@@ -69,30 +78,32 @@ func NewCandidate(ctx context.Context, kubeClient client.Client, clk clock.Clock
 	provisioner := provisionerMap[node.Labels()[v1alpha5.ProvisionerNameLabelKey]]
 	instanceTypeMap := provisionerToInstanceTypes[node.Labels()[v1alpha5.ProvisionerNameLabelKey]]
 	// skip any nodes where we can't determine the provisioner
 	if provisioner == nil || instanceTypeMap == nil {
+		recorder.Publish(deprovisioningevents.Blocked(node.Node, node.Machine, fmt.Sprintf("owning provisioner %q not found", node.Labels()[v1alpha5.ProvisionerNameLabelKey]))...)
 		return nil, fmt.Errorf("provisioner '%s' can't be resolved for state node", node.Labels()[v1alpha5.ProvisionerNameLabelKey])
 	}
 	instanceType := instanceTypeMap[node.Labels()[v1.LabelInstanceTypeStable]]
 	// skip any nodes that we can't determine the instance of
 	if instanceType == nil {
+		recorder.Publish(deprovisioningevents.Blocked(node.Node, node.Machine, fmt.Sprintf("instance type %q not found", node.Labels()[v1.LabelInstanceTypeStable]))...)
 		return nil, fmt.Errorf("instance type '%s' can't be resolved", node.Labels()[v1.LabelInstanceTypeStable])
 	}
 	// skip any nodes that are already marked for deletion and being handled
 	if node.MarkedForDeletion() {
+		recorder.Publish(deprovisioningevents.Blocked(node.Node, node.Machine, "machine is marked for deletion")...)
 		return nil, fmt.Errorf("state node is marked for deletion")
 	}
 	// skip nodes that aren't initialized
 	// This also means that the real Node doesn't exist for it
 	if !node.Initialized() {
+		recorder.Publish(deprovisioningevents.Blocked(node.Node, node.Machine, "machine is not initialized")...)
 		return nil, fmt.Errorf("state node isn't initialized")
 	}
 	// skip the node if it is nominated by a recent provisioning pass to be the target of a pending pod.
 	if node.Nominated() {
+		recorder.Publish(deprovisioningevents.Blocked(node.Node, node.Machine, "machine is nominated")...)
 		return nil, fmt.Errorf("state node is nominated")
 	}
-	if node.Node == nil || node.Machine == nil {
-		return nil, fmt.Errorf("state node doesn't contain both a node and a machine")
-	}
 
 	pods, err := node.Pods(ctx, kubeClient)
 	if err != nil {
diff --git a/pkg/controllers/deprovisioning/validation.go b/pkg/controllers/deprovisioning/validation.go
index bd23c4bcd6..edc90eaf94 100644
--- a/pkg/controllers/deprovisioning/validation.go
+++ b/pkg/controllers/deprovisioning/validation.go
@@ -77,7 +77,7 @@ func (v *Validation) IsValid(ctx context.Context, cmd Command) (bool, error) {
 		}
 	}
 	if len(v.validationCandidates) == 0 {
-		v.validationCandidates, err = GetCandidates(ctx, v.cluster, v.kubeClient, v.clock, v.cloudProvider, v.ShouldDeprovision)
+		v.validationCandidates, err = GetCandidates(ctx, v.cluster, v.kubeClient, v.recorder, v.clock, v.cloudProvider, v.ShouldDeprovision)
 		if err != nil {
 			return false, fmt.Errorf("constructing validation candidates, %w", err)
 		}
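
Reviewer note: the hunks above call a new deprovisioningevents.Blocked helper from pkg/controllers/deprovisioning/events, which is not shown in this excerpt. Below is a minimal sketch of the shape such a helper could take, assuming the events.Event struct exposes InvolvedObject, Type, Reason, and Message fields; the reason string and event messages are illustrative, not the PR's actual implementation.

// Sketch only, not part of the diff: an assumed shape for the Blocked helper.
package events

import (
	"fmt"

	v1 "k8s.io/api/core/v1"

	"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
	"github.com/aws/karpenter-core/pkg/events"
)

// Blocked returns paired events, one for the node and one for the machine,
// explaining why the candidate cannot currently be deprovisioned.
func Blocked(node *v1.Node, machine *v1alpha5.Machine, reason string) []events.Event {
	return []events.Event{
		{
			InvolvedObject: node,
			Type:           v1.EventTypeNormal,
			Reason:         "DeprovisioningBlocked", // assumed reason string
			Message:        fmt.Sprintf("Cannot deprovision node due to %s", reason),
		},
		{
			InvolvedObject: machine,
			Type:           v1.EventTypeNormal,
			Reason:         "DeprovisioningBlocked", // assumed reason string
			Message:        fmt.Sprintf("Cannot deprovision machine due to %s", reason),
		},
	}
}

This matches the call sites above, which pass node.Node, node.Machine, and a human-readable reason, and spread the returned slice into recorder.Publish.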