Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: cleanup deprovisioning types #289

Merged
merged 18 commits into from
May 4, 2023
12 changes: 5 additions & 7 deletions pkg/controllers/deprovisioning/consolidation.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...
if err != nil {
// if a candidate node is now deleting, just retry
if errors.Is(err, errCandidateDeleting) {
return Command{action: actionDoNothing}, nil
return Command{}, nil
}
return Command{}, err
}
Expand All @@ -121,14 +121,13 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...
if len(candidates) == 1 {
c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, candidates[0].Machine, results.PodSchedulingErrors())...)
}
return Command{action: actionDoNothing}, nil
return Command{}, nil
}

// were we able to schedule all the pods on the inflight candidates?
if len(results.NewMachines) == 0 {
return Command{
candidates: candidates,
action: actionDelete,
}, nil
}

Expand All @@ -137,7 +136,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...
if len(candidates) == 1 {
c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, candidates[0].Machine, fmt.Sprintf("can't remove without creating %d candidates", len(results.NewMachines)))...)
}
return Command{action: actionDoNothing}, nil
return Command{}, nil
}

// get the current node price based on the offering
Expand All @@ -152,7 +151,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...
c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, candidates[0].Machine, "can't replace with a cheaper node")...)
}
// no instance types remain after filtering by price
return Command{action: actionDoNothing}, nil
return Command{}, nil
}

// If the existing candidates are all spot and the replacement is spot, we don't consolidate. We don't have a reliable
Expand All @@ -170,7 +169,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...
if len(candidates) == 1 {
c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, candidates[0].Machine, "can't replace a spot node with a spot node")...)
}
return Command{action: actionDoNothing}, nil
return Command{}, nil
}

// We are consolidating a node from OD -> [OD,Spot] but have filtered the instance types by cost based on the
Expand All @@ -184,7 +183,6 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...

return Command{
candidates: candidates,
action: actionReplace,
replacements: results.NewMachines,
}, nil
}
Expand Down
11 changes: 6 additions & 5 deletions pkg/controllers/deprovisioning/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (c *Controller) deprovision(ctx context.Context, deprovisioner Deprovisione
if err != nil {
return false, fmt.Errorf("computing deprovisioning decision, %w", err)
}
if cmd.action == actionDoNothing {
if cmd.Action() == noOpAction {
return false, nil
}

Expand All @@ -171,11 +171,12 @@ func (c *Controller) deprovision(ctx context.Context, deprovisioner Deprovisione
}

func (c *Controller) executeCommand(ctx context.Context, d Deprovisioner, command Command) error {
deprovisioningActionsPerformedCounter.With(prometheus.Labels{"action": fmt.Sprintf("%s/%s", d, command.action)}).Add(1)
logging.FromContext(ctx).Infof("deprovisioning via %s %s", d, command)
action := lo.Ternary(len(command.replacements) > 0, replaceAction, deleteAction)
deprovisioningActionsPerformedCounter.With(prometheus.Labels{"action": fmt.Sprintf("%s/%s", d, action)}).Inc()
logging.FromContext(ctx).Infof("deprovisioning via %s %s", d, action)

reason := fmt.Sprintf("%s/%s", d, command.action)
if command.action == actionReplace {
reason := fmt.Sprintf("%s/%s", d, action)
if command.Action() == replaceAction {
if err := c.launchReplacementMachines(ctx, command, reason); err != nil {
// If we failed to launch the replacement, don't deprovision. If this is some permanent failure,
// we don't want to disrupt workloads with no way to provision new nodes for them.
Expand Down
5 changes: 1 addition & 4 deletions pkg/controllers/deprovisioning/drift.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ func (d *Drift) ComputeCommand(ctx context.Context, nodes ...*Candidate) (Comman
}); len(empty) > 0 {
return Command{
candidates: empty,
action: actionDelete,
njtran marked this conversation as resolved.
Show resolved Hide resolved
}, nil
}

Expand All @@ -92,16 +91,14 @@ func (d *Drift) ComputeCommand(ctx context.Context, nodes ...*Candidate) (Comman
if len(results.NewMachines) == 0 {
return Command{
candidates: []*Candidate{candidate},
action: actionDelete,
}, nil
}
return Command{
candidates: []*Candidate{candidate},
action: actionReplace,
replacements: results.NewMachines,
}, nil
}
return Command{action: actionDoNothing}, nil
return Command{}, nil
}

// String is the string representation of the deprovisioner
Expand Down
4 changes: 0 additions & 4 deletions pkg/controllers/deprovisioning/emptiness.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,8 @@ func (e *Emptiness) ComputeCommand(_ context.Context, candidates ...*Candidate)
return cn.Machine.DeletionTimestamp.IsZero() && len(cn.pods) == 0
})

if len(emptyCandidates) == 0 {
return Command{action: actionDoNothing}, nil
}
return Command{
candidates: emptyCandidates,
action: actionDelete,
}, nil
}

Expand Down
5 changes: 2 additions & 3 deletions pkg/controllers/deprovisioning/emptymachineconsolidation.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func NewEmptyMachineConsolidation(clk clock.Clock, cluster *state.Cluster, kubeC
// ComputeCommand generates a deprovisioning command given deprovisionable machines
func (c *EmptyMachineConsolidation) ComputeCommand(ctx context.Context, candidates ...*Candidate) (Command, error) {
if c.cluster.Consolidated() {
return Command{action: actionDoNothing}, nil
return Command{}, nil
}
candidates, err := c.sortAndFilterCandidates(ctx, candidates)
if err != nil {
Expand All @@ -53,12 +53,11 @@ func (c *EmptyMachineConsolidation) ComputeCommand(ctx context.Context, candidat
// select the entirely empty nodes
emptyCandidates := lo.Filter(candidates, func(n *Candidate, _ int) bool { return len(n.pods) == 0 })
if len(emptyCandidates) == 0 {
return Command{action: actionDoNothing}, nil
return Command{}, nil
}

cmd := Command{
candidates: emptyCandidates,
action: actionDelete,
}

// empty machine consolidation doesn't use Validation as we get to take advantage of cluster.IsNodeNominated. This
Expand Down
4 changes: 1 addition & 3 deletions pkg/controllers/deprovisioning/expiration.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ func (e *Expiration) ComputeCommand(ctx context.Context, nodes ...*Candidate) (C
}); len(empty) > 0 {
return Command{
candidates: empty,
action: actionDelete,
}, nil
}

Expand All @@ -115,11 +114,10 @@ func (e *Expiration) ComputeCommand(ctx context.Context, nodes ...*Candidate) (C
With("delay", time.Since(node.GetExpirationTime(candidates[0].Node, candidates[0].provisioner))).Infof("triggering termination for expired node after TTL")
return Command{
candidates: []*Candidate{candidate},
action: actionReplace,
replacements: results.NewMachines,
}, nil
}
return Command{action: actionDoNothing}, nil
return Command{}, nil
}

// String is the string representation of the deprovisioner
Expand Down
17 changes: 8 additions & 9 deletions pkg/controllers/deprovisioning/multimachineconsolidation.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func NewMultiMachineConsolidation(clk clock.Clock, cluster *state.Cluster, kubeC

func (m *MultiMachineConsolidation) ComputeCommand(ctx context.Context, candidates ...*Candidate) (Command, error) {
if m.cluster.Consolidated() {
return Command{action: actionDoNothing}, nil
return Command{}, nil
}
candidates, err := m.sortAndFilterCandidates(ctx, candidates)
if err != nil {
Expand All @@ -54,7 +54,7 @@ func (m *MultiMachineConsolidation) ComputeCommand(ctx context.Context, candidat
if err != nil {
return Command{}, err
}
if cmd.action == actionDoNothing {
if cmd.Action() == noOpAction {
return cmd, nil
}

Expand All @@ -75,14 +75,14 @@ func (m *MultiMachineConsolidation) ComputeCommand(ctx context.Context, candidat
func (m *MultiMachineConsolidation) firstNMachineConsolidationOption(ctx context.Context, candidates []*Candidate, max int) (Command, error) {
// we always operate on at least two machines at once; for single machines, standard consolidation will find all solutions
if len(candidates) < 2 {
return Command{action: actionDoNothing}, nil
return Command{}, nil
}
min := 1
if len(candidates) <= max {
max = len(candidates) - 1
}

lastSavedCommand := Command{action: actionDoNothing}
lastSavedCommand := Command{}
// binary search to find the maximum number of machines we can terminate
for min <= max {
mid := (min + max) / 2
Expand All @@ -96,14 +96,13 @@ func (m *MultiMachineConsolidation) firstNMachineConsolidationOption(ctx context

// ensure that the action is sensible for replacements; see the explanation on filterOutSameType for why this is
// required
if action.action == actionReplace {
instanceTypeFiltered := false
njtran marked this conversation as resolved.
Show resolved Hide resolved
if action.Action() == replaceAction {
action.replacements[0].InstanceTypeOptions = filterOutSameType(action.replacements[0], candidatesToConsolidate)
if len(action.replacements[0].InstanceTypeOptions) == 0 {
action.action = actionDoNothing
}
instanceTypeFiltered = len(action.replacements[0].InstanceTypeOptions) == 0
}

if action.action == actionReplace || action.action == actionDelete {
if !instanceTypeFiltered {
// we can consolidate machines [0,mid]
lastSavedCommand = action
min = mid + 1
Expand Down
8 changes: 4 additions & 4 deletions pkg/controllers/deprovisioning/singlemachineconsolidation.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func NewSingleMachineConsolidation(clk clock.Clock, cluster *state.Cluster, kube
// nolint:gocyclo
func (c *SingleMachineConsolidation) ComputeCommand(ctx context.Context, candidates ...*Candidate) (Command, error) {
if c.cluster.Consolidated() {
return Command{action: actionDoNothing}, nil
return Command{}, nil
}
candidates, err := c.sortAndFilterCandidates(ctx, candidates)
if err != nil {
Expand All @@ -57,7 +57,7 @@ func (c *SingleMachineConsolidation) ComputeCommand(ctx context.Context, candida
logging.FromContext(ctx).Errorf("computing consolidation %s", err)
continue
}
if cmd.action == actionDoNothing {
if cmd.Action() == noOpAction {
continue
}

Expand All @@ -70,9 +70,9 @@ func (c *SingleMachineConsolidation) ComputeCommand(ctx context.Context, candida
return Command{}, fmt.Errorf("command is no longer valid, %s", cmd)
}

if cmd.action == actionReplace || cmd.action == actionDelete {
if cmd.Action() != noOpAction {
return cmd, nil
}
}
return Command{action: actionDoNothing}, nil
return Command{}, nil
}
44 changes: 18 additions & 26 deletions pkg/controllers/deprovisioning/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,6 @@ type Candidate struct {
func NewCandidate(ctx context.Context, kubeClient client.Client, recorder events.Recorder, clk clock.Clock, node *state.StateNode,
provisionerMap map[string]*v1alpha5.Provisioner, provisionerToInstanceTypes map[string]map[string]*cloudprovider.InstanceType) (*Candidate, error) {

if node.Node == nil || node.Machine == nil {
return nil, fmt.Errorf("state node doesn't contain both a node and a machine")
}
// check whether the node has all the labels we need
for _, label := range []string{
v1alpha5.LabelCapacityType,
Expand Down Expand Up @@ -104,6 +101,9 @@ func NewCandidate(ctx context.Context, kubeClient client.Client, recorder events
recorder.Publish(deprovisioningevents.Blocked(node.Node, node.Machine, "machine is nominated")...)
return nil, fmt.Errorf("state node is nominated")
}
if node.Node == nil || node.Machine == nil {
return nil, fmt.Errorf("state node doesn't contain both a node and a machine")
}

pods, err := node.Pods(ctx, kubeClient)
if err != nil {
Expand Down Expand Up @@ -136,38 +136,30 @@ func (c *Candidate) lifetimeRemaining(clock clock.Clock) float64 {
return remaining
}

type action byte
njtran marked this conversation as resolved.
Show resolved Hide resolved
// Command describes the outcome of a deprovisioning decision. The action it represents is not stored on the
// struct; Action() derives it from which of the fields below are populated.
type Command struct {
// candidates are the machines this command terminates; when the slice is empty, Action() reports a no-op.
candidates []*Candidate
// replacements are the machines to launch in place of the candidates; when candidates is non-empty and this
// slice is empty, Action() reports a plain delete rather than a replace.
replacements []*scheduling.Machine
}

const (
actionDelete action = iota
actionReplace
actionDoNothing
noOpAction = "no-op"
njtran marked this conversation as resolved.
Show resolved Hide resolved
njtran marked this conversation as resolved.
Show resolved Hide resolved
replaceAction = "replace"
deleteAction = "delete"
)

func (a action) String() string {
switch a {
// Deprovisioning action with no replacement machines
case actionDelete:
return "delete"
// Deprovisioning action with replacement machines
case actionReplace:
return "replace"
case actionDoNothing:
return "do nothing"
default:
return fmt.Sprintf("unknown (%d)", a)
func (o Command) Action() string {
if len(o.candidates) == 0 {
njtran marked this conversation as resolved.
Show resolved Hide resolved
return noOpAction
}
}

type Command struct {
candidates []*Candidate
action action
replacements []*scheduling.Machine
if len(o.replacements) > 0 {
return replaceAction
}
return deleteAction
}

func (o Command) String() string {
var buf bytes.Buffer
fmt.Fprintf(&buf, "%s, terminating %d machines ", o.action, len(o.candidates))
fmt.Fprintf(&buf, "%s, terminating %d machines ", o.Action(), len(o.candidates))
for i, old := range o.candidates {
if i != 0 {
fmt.Fprint(&buf, ", ")
Expand Down