node-controller: Support an annotation to hold updates
Today the MCO arbitrarily chooses a node to update from the candidates.
We want to allow admins to exclude specific nodes entirely.

(Aside: This replaces the defunct etcd-specific ordering code that
 we didn't end up using)

Add an annotation `machineconfiguration.openshift.io/hold` that an external
controller (and/or a human) can set on a node to hold it back from updates.
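
A minimal sketch (not part of this change) of how an external controller
might set or clear the hold via client-go; the package name, the `setHold`
helper, and the clientset wiring are assumptions for illustration only:

package holdexample

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
)

// setHold is a hypothetical helper that adds or clears the hold annotation
// on a node. The node controller only checks for the key's presence, so any
// value works; a strategic merge patch with a null value removes it again.
func setHold(ctx context.Context, client kubernetes.Interface, nodeName string, hold bool) error {
	value := `"true"`
	if !hold {
		value = "null"
	}
	patch := []byte(fmt.Sprintf(
		`{"metadata":{"annotations":{"machineconfiguration.openshift.io/hold":%s}}}`, value))
	_, err := client.CoreV1().Nodes().Patch(ctx, nodeName, types.StrategicMergePatchType, patch, metav1.PatchOptions{})
	return err
}

A human could presumably do the same with
`oc annotate node/<name> machineconfiguration.openshift.io/hold=true`
(and `machineconfiguration.openshift.io/hold-` to release the node).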

Related: openshift#2059
cgwalters committed Oct 15, 2020
1 parent 86c8766 commit 9bd2397
Showing 4 changed files with 86 additions and 75 deletions.
102 changes: 41 additions & 61 deletions pkg/controller/node/node_controller.go
@@ -467,6 +467,7 @@ func (ctrl *Controller) updateNode(old, cur interface{}) {
daemonconsts.CurrentMachineConfigAnnotationKey,
daemonconsts.DesiredMachineConfigAnnotationKey,
daemonconsts.MachineConfigDaemonStateAnnotationKey,
daemonconsts.MachineUpdateHoldAnnotationKey,
}
for _, anno := range annos {
newValue := curNode.Annotations[anno]
@@ -758,10 +759,10 @@ func (ctrl *Controller) syncMachineConfigPool(key string) error {
return err
}

candidates, capacity := getAllCandidateMachines(pool, nodes, maxunavail)
if len(candidates) > 0 {
ctrl.logPool(pool, "%d candidate nodes for update, capacity: %d", len(candidates), capacity)
if err := ctrl.updateCandidateMachines(pool, candidates, capacity); err != nil {
state := getAllCandidateMachines(pool, nodes, maxunavail)
if len(state.candidates) > 0 {
ctrl.logPool(pool, "%d candidate nodes for update, capacity: %d", len(state.candidates), state.capacity)
if err := ctrl.updateCandidateMachines(pool, state.candidates, state.capacity); err != nil {
if syncErr := ctrl.syncStatusOnly(pool); syncErr != nil {
return goerrs.Wrapf(err, "error setting desired machine config annotation for pool %q, sync error: %v", pool.Name, syncErr)
}
@@ -833,20 +834,21 @@ func (ctrl *Controller) setDesiredMachineConfigAnnotation(nodeName, currentConfi
})
}

// updateCandidateState describes the nodes eligible for an update and the remaining pool capacity.
type updateCandidateState struct {
candidates []*corev1.Node // nodes not yet targeting the desired config and not held
unavail uint // nodes currently unavailable in the pool
held uint // nodes skipped because of the hold annotation
capacity uint // how many more nodes may begin updating now
}

// getAllCandidateMachines returns all possible nodes which can be updated to the target config, along with a maximum
// capacity. It is the responsibility of the caller to choose a subset of the nodes given the capacity.
func getAllCandidateMachines(pool *mcfgv1.MachineConfigPool, nodesInPool []*corev1.Node, maxUnavailable int) ([]*corev1.Node, uint) {
func getAllCandidateMachines(pool *mcfgv1.MachineConfigPool, nodesInPool []*corev1.Node, maxUnavailable int) updateCandidateState {
targetConfig := pool.Spec.Configuration.Name

unavail := getUnavailableMachines(nodesInPool)
// If we're at capacity, there's nothing to do.
if len(unavail) >= maxUnavailable {
return nil, 0
}
capacity := maxUnavailable - len(unavail)
failingThisConfig := 0
var failingThisConfig uint
ret := updateCandidateState{}
// We only look at nodes which aren't already targeting our desired config
var nodes []*corev1.Node
var candidates []*corev1.Node
for _, node := range nodesInPool {
if node.Annotations[daemonconsts.DesiredMachineConfigAnnotationKey] == targetConfig {
if isNodeMCDFailing(node) {
@@ -855,69 +857,47 @@ func getAllCandidateMachines(pool *mcfgv1.MachineConfigPool, nodesInPool []*core
continue
}

nodes = append(nodes, node)
if _, ok := node.Annotations[daemonconsts.MachineUpdateHoldAnnotationKey]; ok {
ret.held++
continue
}

candidates = append(candidates, node)
}

ret.unavail = uint(len(getUnavailableMachines(nodesInPool)))
// If we're at capacity, there's nothing to do.
if ret.unavail >= uint(maxUnavailable) {
ret.capacity = 0
return ret
}
ret.capacity = uint(maxUnavailable) - ret.unavail

// Nodes which are failing to target this config also count against
// availability - it might be a transient issue, and if the issue
// clears we don't want multiple to update at once.
if failingThisConfig >= capacity {
return nil, 0
if failingThisConfig >= ret.capacity {
ret.capacity = 0
return ret
}
capacity -= failingThisConfig
ret.capacity -= failingThisConfig
ret.candidates = candidates

return nodes, uint(capacity)
return ret
}

// getCandidateMachines returns the maximum subset of nodes which can be updated to the target config given availability constraints.
func getCandidateMachines(pool *mcfgv1.MachineConfigPool, nodesInPool []*corev1.Node, maxUnavailable int) []*corev1.Node {
nodes, capacity := getAllCandidateMachines(pool, nodesInPool, maxUnavailable)
if uint(len(nodes)) < capacity {
return nodes
state := getAllCandidateMachines(pool, nodesInPool, maxUnavailable)
if uint(len(state.candidates)) < state.capacity {
return state.candidates
}
return nodes[:capacity]
}

// getCurrentEtcdLeader is not yet implemented
func (ctrl *Controller) getCurrentEtcdLeader(candidates []*corev1.Node) (*corev1.Node, error) {
return nil, nil
}

// filterControlPlaneCandidateNodes adjusts the candidates and capacity specifically
// for the control plane, e.g. based on which node is the etcd leader at the time.
// nolint:unparam
func (ctrl *Controller) filterControlPlaneCandidateNodes(pool *mcfgv1.MachineConfigPool, candidates []*corev1.Node, capacity uint) ([]*corev1.Node, uint, error) {
if len(candidates) <= 1 {
return candidates, capacity, nil
}
etcdLeader, err := ctrl.getCurrentEtcdLeader(candidates)
if err != nil {
glog.Warningf("Failed to find current etcd leader (continuing anyways): %v", err)
}
var newCandidates []*corev1.Node
for _, node := range candidates {
if node == etcdLeader {
// For now make this an event so we know it's working, even though it's more of a non-event
ctrl.eventRecorder.Eventf(pool, corev1.EventTypeNormal, "DeferringEtcdLeaderUpdate", "Deferring update of etcd leader %s", node.Name)
glog.Infof("Deferring update of etcd leader: %s", node.Name)
continue
}
newCandidates = append(newCandidates, node)
}
return newCandidates, capacity, nil
return state.candidates[:state.capacity]
}

// updateCandidateMachines sets the desiredConfig annotation on the candidate machines
func (ctrl *Controller) updateCandidateMachines(pool *mcfgv1.MachineConfigPool, candidates []*corev1.Node, capacity uint) error {
if pool.Name == masterPoolName {
var err error
candidates, capacity, err = ctrl.filterControlPlaneCandidateNodes(pool, candidates, capacity)
if err != nil {
return err
}
// In practice right now these counts will be 1 but let's stay general to support 5 etcd nodes in the future
ctrl.logPool(pool, "filtered to %d candidate nodes for update, capacity: %d", len(candidates), capacity)
}
if capacity < uint(len(candidates)) {
// Arbitrarily pick the first N candidates; no attempt at sorting.
// Perhaps later we allow admins to weight somehow, or do something more intelligent.
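
For intuition on the capacity accounting in getAllCandidateMachines above,
here is a minimal standalone sketch of the same arithmetic (a hypothetical
`remainingCapacity` helper, not the MCO code itself):

package capacityexample

// remainingCapacity mirrors the arithmetic in getAllCandidateMachines: start
// from maxUnavailable, subtract nodes that are already unavailable, then
// subtract nodes failing to apply the target config, clamping at zero.
func remainingCapacity(maxUnavailable, unavailable, failingThisConfig uint) uint {
	if unavailable >= maxUnavailable {
		return 0
	}
	capacity := maxUnavailable - unavailable
	if failingThisConfig >= capacity {
		return 0
	}
	return capacity - failingThisConfig
}

For example, with maxUnavailable=3, one node already unavailable, and one
node failing on the target config, only one more node may start updating.
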
47 changes: 33 additions & 14 deletions pkg/controller/node/node_controller_test.go
@@ -503,6 +503,7 @@ func TestMaxUnavailable(t *testing.T) {

func TestGetCandidateMachines(t *testing.T) {
tests := []struct {
desc string
nodes []*corev1.Node
progress int

@@ -512,7 +513,7 @@ func TestGetCandidateMachines(t *testing.T) {
// capacity is the maximum number of nodes we could update
capacity uint
}{{
//no progress
desc: "no progress",
progress: 1,
nodes: []*corev1.Node{
newNodeWithReady("node-0", "v1", "v1", corev1.ConditionTrue),
@@ -523,7 +524,7 @@ func TestGetCandidateMachines(t *testing.T) {
otherCandidates: nil,
capacity: 1,
}, {
//no progress
desc: "no progress 2",
progress: 1,
nodes: []*corev1.Node{
newNodeWithReady("node-0", "v1", "v1", corev1.ConditionTrue),
@@ -534,7 +535,7 @@ func TestGetCandidateMachines(t *testing.T) {
otherCandidates: nil,
capacity: 0,
}, {
//no progress because we have an unavailable node
desc: "no progress because we have an unavailable node",
progress: 1,
nodes: []*corev1.Node{
newNodeWithReady("node-0", "v1", "v1", corev1.ConditionTrue),
@@ -545,7 +546,7 @@ func TestGetCandidateMachines(t *testing.T) {
otherCandidates: nil,
capacity: 0,
}, {
// node-2 is going to change config, so we can only progress one more
desc: "node-2 is going to change config, so we can only progress one more",
progress: 3,
nodes: []*corev1.Node{
newNodeWithReady("node-0", "v1", "v1", corev1.ConditionTrue),
@@ -558,7 +559,7 @@ func TestGetCandidateMachines(t *testing.T) {
otherCandidates: []string{"node-4"},
capacity: 1,
}, {
// We have a node working, don't start anything else
desc: "We have a node working, don't start anything else",
progress: 1,
nodes: []*corev1.Node{
newNodeWithReady("node-0", "v1", "v1", corev1.ConditionTrue),
@@ -571,7 +572,7 @@ func TestGetCandidateMachines(t *testing.T) {
otherCandidates: nil,
capacity: 0,
}, {
//progress on old stuck node
desc: "progress on old stuck node",
progress: 1,
nodes: []*corev1.Node{
newNodeWithReady("node-0", "v1", "v1", corev1.ConditionTrue),
@@ -582,7 +583,7 @@ func TestGetCandidateMachines(t *testing.T) {
otherCandidates: []string{"node-2"},
capacity: 1,
}, {
// Don't change a degraded node to same config, but also don't start another
desc: "Don't change a degraded node to same config, but also don't start another",
progress: 1,
nodes: []*corev1.Node{
newNodeWithReady("node-0", "v1", "v1", corev1.ConditionTrue),
@@ -593,7 +594,7 @@ func TestGetCandidateMachines(t *testing.T) {
otherCandidates: nil,
capacity: 0,
}, {
// Must be able to roll back
desc: "Must be able to roll back",
progress: 1,
nodes: []*corev1.Node{
newNodeWithReady("node-0", "v1", "v1", corev1.ConditionTrue),
@@ -605,7 +606,7 @@ func TestGetCandidateMachines(t *testing.T) {
otherCandidates: nil,
capacity: 1,
}, {
// Validate we also don't affect nodes which haven't started work
desc: "Validate we also don't affect nodes which haven't started work",
progress: 1,
nodes: []*corev1.Node{
newNodeWithReady("node-0", "v1", "v1", corev1.ConditionTrue),
@@ -616,7 +617,7 @@ func TestGetCandidateMachines(t *testing.T) {
otherCandidates: nil,
capacity: 0,
}, {
// A test with more nodes in mixed order
desc: "A test with more nodes in mixed order",
progress: 4,
nodes: []*corev1.Node{
newNodeWithReady("node-0", "v1", "v1", corev1.ConditionTrue),
@@ -632,10 +633,28 @@ func TestGetCandidateMachines(t *testing.T) {
expected: []string{"node-3", "node-4"},
otherCandidates: []string{"node-5", "node-6"},
capacity: 2,
}, {
desc: "A test with more nodes in mixed order",
progress: 4,
nodes: []*corev1.Node{
newNodeWithReady("node-0", "v1", "v1", corev1.ConditionTrue),
newNodeWithReady("node-1", "v1", "v1", corev1.ConditionFalse),
newNodeWithReady("node-2", "v0", "v1", corev1.ConditionTrue),
newNodeWithReadyButHeld("node-3", "v0", "v0", corev1.ConditionTrue),
newNodeWithReady("node-4", "v0", "v0", corev1.ConditionTrue),
newNodeWithReadyButHeld("node-5", "v0", "v0", corev1.ConditionTrue),
newNodeWithReady("node-6", "v0", "v0", corev1.ConditionTrue),
newNodeWithReady("node-7", "v1", "v1", corev1.ConditionTrue),
newNodeWithReady("node-8", "v1", "v1", corev1.ConditionTrue),
newNodeWithReady("node-9", "v0", "v0", corev1.ConditionTrue),
},
expected: []string{"node-4", "node-6"},
otherCandidates: []string{"node-9"},
capacity: 2,
}}

for idx, test := range tests {
t.Run(fmt.Sprintf("case#%d", idx), func(t *testing.T) {
t.Run(fmt.Sprintf("case#%d: %s", idx, test.desc), func(t *testing.T) {
pool := &mcfgv1.MachineConfigPool{
Spec: mcfgv1.MachineConfigPoolSpec{
Configuration: mcfgv1.MachineConfigPoolStatusConfiguration{ObjectReference: corev1.ObjectReference{Name: "v1"}},
@@ -649,10 +668,10 @@ func TestGetCandidateMachines(t *testing.T) {
}
assert.Equal(t, test.expected, nodeNames)

allCandidates, capacity := getAllCandidateMachines(pool, test.nodes, test.progress)
assert.Equal(t, test.capacity, capacity)
state := getAllCandidateMachines(pool, test.nodes, test.progress)
assert.Equal(t, test.capacity, state.capacity)
var otherCandidates []string
for i, node := range allCandidates {
for i, node := range state.candidates {
if i < len(nodeNames) {
assert.Equal(t, node.Name, nodeNames[i])
} else {
10 changes: 10 additions & 0 deletions pkg/controller/node/status_test.go
@@ -108,6 +108,16 @@ func newNodeWithReady(name string, currentConfig, desiredConfig string, status c
return node
}

// newNodeWithReadyButHeld is like newNodeWithReady, but also sets the hold annotation on the node.
func newNodeWithReadyButHeld(name string, currentConfig, desiredConfig string, status corev1.ConditionStatus) *corev1.Node {
node := newNode(name, currentConfig, desiredConfig)
node.Status = corev1.NodeStatus{Conditions: []corev1.NodeCondition{{Type: corev1.NodeReady, Status: status}}}
if node.Annotations == nil {
node.Annotations = map[string]string{}
}
node.Annotations[daemonconsts.MachineUpdateHoldAnnotationKey] = "true"
return node
}

func newNodeWithReadyAndDaemonState(name string, currentConfig, desiredConfig string, status corev1.ConditionStatus, dstate string) *corev1.Node {
node := newNode(name, currentConfig, desiredConfig)
node.Status = corev1.NodeStatus{Conditions: []corev1.NodeCondition{{Type: corev1.NodeReady, Status: status}}}
2 changes: 2 additions & 0 deletions pkg/daemon/constants/constants.go
@@ -14,6 +14,8 @@ const (
DesiredMachineConfigAnnotationKey = "machineconfiguration.openshift.io/desiredConfig"
// MachineConfigDaemonStateAnnotationKey is used to fetch the state of the daemon on the machine.
MachineConfigDaemonStateAnnotationKey = "machineconfiguration.openshift.io/state"
// MachineUpdateHoldAnnotationKey, when present on a node, causes it to be skipped when choosing update candidates
MachineUpdateHoldAnnotationKey = "machineconfiguration.openshift.io/hold"
// OpenShiftOperatorManagedLabel is used to filter out kube objects that don't need to be synced by the MCO
OpenShiftOperatorManagedLabel = "openshift.io/operator-managed"
// MachineConfigDaemonStateWorking is set by daemon when it is applying an update.
