Skip to content

Commit

Permalink
add node ready check for ota strategy
Browse files Browse the repository at this point in the history
Signed-off-by: hxcGit <houxc_mail@163.com>
Signed-off-by: Xuecheng <houxc_mail@163.com>
  • Loading branch information
xavier-hou committed Sep 14, 2022
1 parent 16979d8 commit 6055449
Show file tree
Hide file tree
Showing 7 changed files with 194 additions and 236 deletions.
3 changes: 3 additions & 0 deletions cmd/yurt-controller-manager/controller-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@ import (
"time"

"k8s.io/component-base/logs"

// for JSON log format registration
_ "k8s.io/component-base/logs/json/register"

// load all the prometheus client-go plugin
_ "k8s.io/component-base/metrics/prometheus/clientgo"

// for version metric registration
_ "k8s.io/component-base/metrics/prometheus/version"

Expand Down
137 changes: 71 additions & 66 deletions pkg/controller/daemonpodupdater/daemon_pod_updater_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func NewController(kc client.Interface, daemonsetInformer appsinformers.DaemonSe
return
}

if newDS.ResourceVersion == oldDS.ResourceVersion || oldDS.Status.CurrentNumberScheduled != newDS.Status.DesiredNumberScheduled {
if newDS.ResourceVersion == oldDS.ResourceVersion {
return
}

Expand Down Expand Up @@ -158,7 +158,6 @@ func (c *Controller) deletePod(obj interface{}) {
// the deleted key/value. Note that this value might be stale. If the pod
// changed labels the new daemonset will not be woken up till the periodic
// resync.

if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
Expand Down Expand Up @@ -196,75 +195,54 @@ func (c *Controller) deletePod(obj interface{}) {
c.expectations.DeletionObserved(dsKey)
}

// resolveControllerRef returns the controller referenced by a ControllerRef,
// or nil if the ControllerRef could not be resolved to a matching controller
// of the correct Kind.
func (c *Controller) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *appsv1.DaemonSet {
// We can't look up by UID, so look up by Name and then verify UID.
// Don't even try to look up by Name if it's the wrong Kind.
if controllerRef.Kind != controllerKind.Kind {
return nil
}
ds, err := c.daemonsetLister.DaemonSets(namespace).Get(controllerRef.Name)
if err != nil {
return nil
}
if ds.UID != controllerRef.UID {
// The controller we found with this Name is not the same one that the
// ControllerRef points to.
return nil
}
return ds
}

func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()

klog.Info("Starting daemonPodUpdater controller")
defer klog.Info("Shutting down daemonPodUpdater controller")
defer c.daemonsetWorkqueue.ShutDown()

//synchronize the cache before starting to process events
// Synchronize the cache before starting to process events
if !cache.WaitForCacheSync(stopCh, c.daemonsetSynced, c.nodeSynced,
c.podSynced) {
klog.Error("sync daemonPodUpdater controller timeout")
}

for i := 0; i < threadiness; i++ {
go wait.Until(c.runDaemonsetWorker, time.Second, stopCh)
go wait.Until(c.runWorker, time.Second, stopCh)
}

<-stopCh
}

func (c *Controller) runDaemonsetWorker() {
func (c *Controller) runWorker() {
for {
obj, shutdown := c.daemonsetWorkqueue.Get()
if shutdown {
return
}

if err := c.syncDaemonsetHandler(obj.(string)); err != nil {
// TODO(hxc): should requeue failed ds
if err := c.syncHandler(obj.(string)); err != nil {
utilruntime.HandleError(err)
}
c.daemonsetWorkqueue.Done(obj)
}
}

func (c *Controller) syncDaemonsetHandler(key string) error {
func (c *Controller) syncHandler(key string) error {
defer func() {
klog.V(4).Infof("Finish syncing daemonPodUpdater request %q", key)
}()

klog.V(4).Infof("Start handler daemonPodUpdater request %q", key)
klog.V(4).Infof("Start handling daemonPodUpdater request %q", key)

namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
return nil
return fmt.Errorf("invalid resource key: %s", key)
}

// daemonset that need to be synced
// Daemonset that need to be synced
ds, err := c.daemonsetLister.DaemonSets(namespace).Get(name)
if err != nil {
if apierrors.IsNotFound(err) {
Expand All @@ -284,25 +262,25 @@ func (c *Controller) syncDaemonsetHandler(key string) error {
return nil
}

pods, err := GetDaemonsetPods(c.podLister, ds)
nodeToDaemonPods, err := c.getNodesToDaemonPods(ds)
if err != nil {
return err
return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
}

// recheck required annotation
// Recheck required annotation
v, ok := ds.Annotations[UpdateAnnotation]
if !ok {
return fmt.Errorf("won't sync daemonset %q without annotation 'apps.openyurt.io/update-strategy'", ds.Name)
}

switch v {
case OTAUpdate:
if err := c.checkOTAUpdate(ds, pods); err != nil {
if err := c.checkOTAUpdate(ds, nodeToDaemonPods); err != nil {
return err
}

case AutoUpdate:
if err := c.autoUpdate(ds); err != nil {
if err := c.autoUpdate(ds, nodeToDaemonPods); err != nil {
return err
}
default:
Expand All @@ -313,25 +291,30 @@ func (c *Controller) syncDaemonsetHandler(key string) error {
}

// checkOTAUpdate compare every pod to its owner daemonset to check if pod is updatable
// If pod is in line with the latest daemonset version, set annotation "apps.openyurt.io/pod-updatable" to "true"
// If pod is in line with the latest daemonset spec, set annotation "apps.openyurt.io/pod-updatable" to "true"
// while not, set annotation "apps.openyurt.io/pod-updatable" to "false"
func (c *Controller) checkOTAUpdate(ds *appsv1.DaemonSet, pods []*corev1.Pod) error {
for _, pod := range pods {
if err := SetPodUpdateAnnotation(c.kubeclientset, ds, pod); err != nil {
return err
func (c *Controller) checkOTAUpdate(ds *appsv1.DaemonSet, nodeToDaemonPods map[string][]*corev1.Pod) error {
for nodeName, pods := range nodeToDaemonPods {
// Check if node is ready, ignore not-ready node
ready, err := NodeReadyByName(c.nodeLister, nodeName)
if err != nil {
return fmt.Errorf("couldn't check node %q ready status, %v", nodeName, err)
}
if !ready {
continue
}
for _, pod := range pods {
if err := SetPodUpdateAnnotation(c.kubeclientset, ds, pod); err != nil {
return err
}
}
}
return nil
}

// autoUpdate identifies the set of old pods to delete
func (c *Controller) autoUpdate(ds *appsv1.DaemonSet) error {
nodeToDaemonPods, err := c.getNodesToDaemonPods(ds)
if err != nil {
return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
}

// TODO(hxc): calculate maxUnavailable specified by user, default is 1
func (c *Controller) autoUpdate(ds *appsv1.DaemonSet, nodeToDaemonPods map[string][]*corev1.Pod) error {
// Calculate maxUnavailable specified by user, default is 1
maxUnavailable, err := c.maxUnavailableCounts(ds, nodeToDaemonPods)
if err != nil {
return fmt.Errorf("couldn't get maxUnavailable number for daemon set %q: %v", ds.Name, err)
Expand All @@ -342,7 +325,8 @@ func (c *Controller) autoUpdate(ds *appsv1.DaemonSet) error {
var candidatePodsToDelete []string

for nodeName, pods := range nodeToDaemonPods {
// check if node is ready, ignore not-ready node
// Check if node is ready, ignore not-ready node
// this is a significant difference from the native daemonset controller
ready, err := NodeReadyByName(c.nodeLister, nodeName)
if err != nil {
return fmt.Errorf("couldn't check node %q ready status, %v", nodeName, err)
Expand All @@ -353,46 +337,46 @@ func (c *Controller) autoUpdate(ds *appsv1.DaemonSet) error {

newPod, oldPod, ok := findUpdatedPodsOnNode(ds, pods)
if !ok {
// let the manage loop clean up this node, and treat it as an unavailable node
// Let the manage loop clean up this node, and treat it as an unavailable node
klog.V(3).Infof("DaemonSet %s/%s has excess pods on node %s, skipping to allow the core loop to process", ds.Namespace, ds.Name, nodeName)
numUnavailable++
continue
}
switch {
case oldPod == nil && newPod == nil, oldPod != nil && newPod != nil:
// the manage loop will handle creating or deleting the appropriate pod, consider this unavailable
// The manage loop will handle creating or deleting the appropriate pod, consider this unavailable
numUnavailable++
case newPod != nil:
// this pod is up-to-date, check its availability
// This pod is up-to-date, check its availability
if !k8sutil.IsPodAvailable(newPod, ds.Spec.MinReadySeconds, metav1.Time{Time: time.Now()}) {
// an unavailable new pod is counted against maxUnavailable
// An unavailable new pod is counted against maxUnavailable
numUnavailable++
}
default:
// this pod is old, it is an update candidate
// This pod is old, it is an update candidate
switch {
case !k8sutil.IsPodAvailable(oldPod, ds.Spec.MinReadySeconds, metav1.Time{Time: time.Now()}):
// the old pod isn't available, so it needs to be replaced
// The old pod isn't available, so it needs to be replaced
klog.V(5).Infof("DaemonSet %s/%s pod %s on node %s is out of date and not available, allowing replacement", ds.Namespace, ds.Name, oldPod.Name, nodeName)
// record the replacement
// Record the replacement
if allowedReplacementPods == nil {
allowedReplacementPods = make([]string, 0, len(nodeToDaemonPods))
}
allowedReplacementPods = append(allowedReplacementPods, oldPod.Name)
case numUnavailable >= maxUnavailable:
// no point considering any other candidates
// No point considering any other candidates
continue
default:
klog.V(5).Infof("DaemonSet %s/%s pod %s on node %s is out of date, this is a candidate to replace", ds.Namespace, ds.Name, oldPod.Name, nodeName)
// record the candidate
// Record the candidate
if candidatePodsToDelete == nil {
candidatePodsToDelete = make([]string, 0, maxUnavailable)
}
candidatePodsToDelete = append(candidatePodsToDelete, oldPod.Name)
}
}
}
// use any of the candidates we can, including the allowedReplacemnntPods
// Use any of the candidates we can, including the allowedReplacemnntPods
klog.V(5).Infof("DaemonSet %s/%s allowing %d replacements, up to %d unavailable, %d are unavailable, %d candidates", ds.Namespace, ds.Name, len(allowedReplacementPods), maxUnavailable, numUnavailable, len(candidatePodsToDelete))
remainingUnavailable := maxUnavailable - numUnavailable
if remainingUnavailable < 0 {
Expand All @@ -407,7 +391,7 @@ func (c *Controller) autoUpdate(ds *appsv1.DaemonSet) error {
}

func (c *Controller) getNodesToDaemonPods(ds *appsv1.DaemonSet) (map[string][]*corev1.Pod, error) {
// TODO(hxc): ignore adopt/orphan pod, just deal with pods in podLister
// Ignore adopt/orphan pod, just deal with pods in podLister
pods, err := GetDaemonsetPods(c.podLister, ds)
if err != nil {
return nil, err
Expand Down Expand Up @@ -446,10 +430,10 @@ func (c *Controller) syncPodsOnNodes(ds *appsv1.DaemonSet, podsToDelete []string

c.expectations.SetExpectations(dsKey, 0, deleteDiff)

// error channel to communicate back failures, make the buffer big enough to avoid any blocking
// Error channel to communicate back failures, make the buffer big enough to avoid any blocking
errCh := make(chan error, deleteDiff)

// delete pods process
// Delete pods process
klog.V(4).Infof("Pods to delete for daemon set %s: %+v, deleting %d", ds.Name, podsToDelete, deleteDiff)
deleteWait := sync.WaitGroup{}
deleteWait.Add(deleteDiff)
Expand All @@ -469,7 +453,7 @@ func (c *Controller) syncPodsOnNodes(ds *appsv1.DaemonSet, podsToDelete []string
}
deleteWait.Wait()

// collect errors if any for proper reporting/retry logic in the controller
// Collect errors if any for proper reporting/retry logic in the controller
errors := []error{}
close(errCh)
for err := range errCh {
Expand All @@ -480,7 +464,7 @@ func (c *Controller) syncPodsOnNodes(ds *appsv1.DaemonSet, podsToDelete []string

// maxUnavailableCounts calculates the true number of allowed unavailable
func (c *Controller) maxUnavailableCounts(ds *appsv1.DaemonSet, nodeToDaemonPods map[string][]*corev1.Pod) (int, error) {
// if annotation is not set, use default value one
// If annotation is not set, use default value one
v, ok := ds.Annotations[MaxUnavailableAnnotation]
if !ok {
return 1, nil
Expand All @@ -492,11 +476,32 @@ func (c *Controller) maxUnavailableCounts(ds *appsv1.DaemonSet, nodeToDaemonPods
return -1, fmt.Errorf("invalid value for MaxUnavailable: %v", err)
}

// if the daemonset returned with an impossible configuration, obey the default of unavailable=1
// If the daemonset returned with an impossible configuration, obey the default of unavailable=1
if maxUnavailable == 0 {
klog.Warningf("DaemonSet %s/%s is not configured for unavailability, defaulting to accepting unavailability", ds.Namespace, ds.Name)
maxUnavailable = 1
}
klog.V(5).Infof("DaemonSet %s/%s, maxUnavailable: %d", ds.Namespace, ds.Name, maxUnavailable)
return maxUnavailable, nil
}

// resolveControllerRef returns the controller referenced by a ControllerRef,
// or nil if the ControllerRef could not be resolved to a matching controller
// of the correct Kind.
func (c *Controller) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *appsv1.DaemonSet {
// We can't look up by UID, so look up by Name and then verify UID.
// Don't even try to look up by Name if it's the wrong Kind.
if controllerRef.Kind != controllerKind.Kind {
return nil
}
ds, err := c.daemonsetLister.DaemonSets(namespace).Get(controllerRef.Name)
if err != nil {
return nil
}
if ds.UID != controllerRef.UID {
// The controller we found with this Name is not the same one that the
// ControllerRef points to.
return nil
}
return ds
}
Loading

0 comments on commit 6055449

Please sign in to comment.