Skip to content

Commit

Permalink
add more info for rollout status cloneset (#65)
Browse files Browse the repository at this point in the history
* add more info for rollout status cloneset

Signed-off-by: hantmac <hantmac@outlook.com>

* add  flag for rollout status

Signed-off-by: hantmac <hantmac@outlook.com>
  • Loading branch information
hantmac authored Jul 22, 2022
1 parent 8012780 commit e1b55f7
Show file tree
Hide file tree
Showing 3 changed files with 253 additions and 13 deletions.
20 changes: 18 additions & 2 deletions pkg/cmd/rollout/rollout_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
watchtools "k8s.io/client-go/tools/watch"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
Expand Down Expand Up @@ -77,10 +78,12 @@ type RolloutStatusOptions struct {
Watch bool
Revision int64
Timeout time.Duration
Detail bool

StatusViewerFn func(*meta.RESTMapping) (internalpolymorphichelpers.StatusViewer, error)
Builder func() *resource.Builder
DynamicClient dynamic.Interface
ClientSet kubernetes.Interface

FilenameOptions *resource.FilenameOptions
genericclioptions.IOStreams
Expand All @@ -94,6 +97,7 @@ func NewRolloutStatusOptions(streams genericclioptions.IOStreams) *RolloutStatus
IOStreams: streams,
Watch: true,
Timeout: 0,
Detail: false,
}
}

Expand Down Expand Up @@ -122,6 +126,7 @@ func NewCmdRolloutStatus(f cmdutil.Factory, streams genericclioptions.IOStreams)
cmd.Flags().BoolVarP(&o.Watch, "watch", "w", o.Watch, "Watch the status of the rollout until it's done.")
cmd.Flags().Int64Var(&o.Revision, "revision", o.Revision, "Pin to a specific revision for showing its status. Defaults to 0 (last revision).")
cmd.Flags().DurationVar(&o.Timeout, "timeout", o.Timeout, "The length of time to wait before ending watch, zero means never. Any other values should contain a corresponding time unit (e.g. 1s, 2m, 3h).")
cmd.Flags().BoolVarP(&o.Detail, "detail", "d", o.Detail, "Show the detail status of the rollout.")

return cmd
}
Expand Down Expand Up @@ -149,6 +154,11 @@ func (o *RolloutStatusOptions) Complete(f cmdutil.Factory, args []string) error
return err
}

o.ClientSet, err = kubernetes.NewForConfig(clientConfig)
if err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -224,17 +234,23 @@ func (o *RolloutStatusOptions) Run() error {
// if the rollout isn't done yet, keep watching deployment status
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), o.Timeout)
intr := interrupt.New(nil, cancel)
var status string
var consideredDone bool
return intr.Run(func() error {
_, err = watchtools.UntilWithSync(ctx, lw, &unstructured.Unstructured{}, preconditionFunc, func(e watch.Event) (bool, error) {
switch t := e.Type; t {
case watch.Added, watch.Modified:
status, done, err := statusViewer.Status(e.Object.(runtime.Unstructured), o.Revision)
if o.Detail {
status, consideredDone, err = statusViewer.DetailStatus(o.ClientSet, e.Object.(runtime.Unstructured), o.Detail, o.Revision)
} else {
status, consideredDone, err = statusViewer.Status(o.ClientSet, e.Object.(runtime.Unstructured), o.Revision)
}
if err != nil {
return false, err
}
fmt.Fprintf(o.Out, "%s", status)
// Quit waiting if the rollout is done
if done {
if consideredDone {
return true, nil
}

Expand Down
172 changes: 161 additions & 11 deletions pkg/internal/polymorphichelpers/rollout_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ import (
extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes"
deploymentutil "k8s.io/kubectl/pkg/util/deployment"
)

// StatusViewer provides an interface for resources that have rollout status.
type StatusViewer interface {
Status(obj runtime.Unstructured, revision int64) (string, bool, error)
Status(c kubernetes.Interface, obj runtime.Unstructured, revision int64) (string, bool, error)
DetailStatus(c kubernetes.Interface, obj runtime.Unstructured, detail bool, revision int64) (string, bool, error)
}

// StatusViewerFor returns a StatusViewer for the resource specified by kind.
Expand Down Expand Up @@ -71,7 +73,43 @@ type CloneSetStatusViewer struct{}
type AdvancedStatefulSetStatusViewer struct{}

// Status returns a message describing deployment status, and a bool value indicating if the status is considered done.
func (s *DeploymentStatusViewer) Status(obj runtime.Unstructured, revision int64) (string, bool, error) {
func (s *DeploymentStatusViewer) Status(c kubernetes.Interface, obj runtime.Unstructured, revision int64) (string, bool, error) {
deployment := &appsv1.Deployment{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), deployment)
if err != nil {
return "", false, fmt.Errorf("failed to convert %T to %T: %v", obj, deployment, err)
}

if revision > 0 {
deploymentRev, err := deploymentutil.Revision(deployment)
if err != nil {
return "", false, fmt.Errorf("cannot get the revision of deployment %q: %v", deployment.Name, err)
}
if revision != deploymentRev {
return "", false, fmt.Errorf("desired revision (%d) is different from the running revision (%d)", revision, deploymentRev)
}
}
if deployment.Generation <= deployment.Status.ObservedGeneration {
cond := deploymentutil.GetDeploymentCondition(deployment.Status, appsv1.DeploymentProgressing)
if cond != nil && cond.Reason == deploymentutil.TimedOutReason {
return "", false, fmt.Errorf("deployment %q exceeded its progress deadline", deployment.Name)
}
if deployment.Spec.Replicas != nil && deployment.Status.UpdatedReplicas < *deployment.Spec.Replicas {
return fmt.Sprintf("Waiting for deployment %q rollout to finish: %d out of %d new replicas have been updated...\n", deployment.Name, deployment.Status.UpdatedReplicas, *deployment.Spec.Replicas), false, nil
}
if deployment.Status.Replicas > deployment.Status.UpdatedReplicas {
return fmt.Sprintf("Waiting for deployment %q rollout to finish: %d old replicas are pending termination...\n", deployment.Name, deployment.Status.Replicas-deployment.Status.UpdatedReplicas), false, nil
}
if deployment.Status.AvailableReplicas < deployment.Status.UpdatedReplicas {
return fmt.Sprintf("Waiting for deployment %q rollout to finish: %d of %d updated replicas are available...\n", deployment.Name, deployment.Status.AvailableReplicas, deployment.Status.UpdatedReplicas), false, nil
}
return fmt.Sprintf("deployment %q successfully rolled out\n", deployment.Name), true, nil
}
return fmt.Sprintf("Waiting for deployment spec update to be observed...\n"), false, nil
}

// DetailStatus returns a message describing deployment detail status, and a bool value indicating if the status is considered done.
func (s *DeploymentStatusViewer) DetailStatus(c kubernetes.Interface, obj runtime.Unstructured, detail bool, revision int64) (string, bool, error) {
deployment := &appsv1.Deployment{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), deployment)
if err != nil {
Expand Down Expand Up @@ -107,7 +145,32 @@ func (s *DeploymentStatusViewer) Status(obj runtime.Unstructured, revision int64
}

// Status returns a message describing daemon set status, and a bool value indicating if the status is considered done.
func (s *DaemonSetStatusViewer) Status(obj runtime.Unstructured, revision int64) (string, bool, error) {
func (s *DaemonSetStatusViewer) Status(c kubernetes.Interface, obj runtime.Unstructured, revision int64) (string, bool, error) {
//ignoring revision as DaemonSets does not have history yet

daemon := &appsv1.DaemonSet{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), daemon)
if err != nil {
return "", false, fmt.Errorf("failed to convert %T to %T: %v", obj, daemon, err)
}

if daemon.Spec.UpdateStrategy.Type != appsv1.RollingUpdateDaemonSetStrategyType {
return "", true, fmt.Errorf("rollout status is only available for %s strategy type", appsv1.RollingUpdateStatefulSetStrategyType)
}
if daemon.Generation <= daemon.Status.ObservedGeneration {
if daemon.Status.UpdatedNumberScheduled < daemon.Status.DesiredNumberScheduled {
return fmt.Sprintf("Waiting for daemon set %q rollout to finish: %d out of %d new pods have been updated...\n", daemon.Name, daemon.Status.UpdatedNumberScheduled, daemon.Status.DesiredNumberScheduled), false, nil
}
if daemon.Status.NumberAvailable < daemon.Status.DesiredNumberScheduled {
return fmt.Sprintf("Waiting for daemon set %q rollout to finish: %d of %d updated pods are available...\n", daemon.Name, daemon.Status.NumberAvailable, daemon.Status.DesiredNumberScheduled), false, nil
}
return fmt.Sprintf("daemon set %q successfully rolled out\n", daemon.Name), true, nil
}
return fmt.Sprintf("Waiting for daemon set spec update to be observed...\n"), false, nil
}

// DetailStatus returns a message describing daemon set status, and a bool value indicating if the status is considered done.
func (s *DaemonSetStatusViewer) DetailStatus(c kubernetes.Interface, obj runtime.Unstructured, detail bool, revision int64) (string, bool, error) {
//ignoring revision as DaemonSets does not have history yet

daemon := &appsv1.DaemonSet{}
Expand All @@ -132,7 +195,7 @@ func (s *DaemonSetStatusViewer) Status(obj runtime.Unstructured, revision int64)
}

// Status returns a message describing statefulset status, and a bool value indicating if the status is considered done.
func (s *StatefulSetStatusViewer) Status(obj runtime.Unstructured, revision int64) (string, bool, error) {
func (s *StatefulSetStatusViewer) Status(c kubernetes.Interface, obj runtime.Unstructured, revision int64) (string, bool, error) {
sts := &appsv1.StatefulSet{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), sts)
if err != nil {
Expand Down Expand Up @@ -163,11 +226,43 @@ func (s *StatefulSetStatusViewer) Status(obj runtime.Unstructured, revision int6
sts.Status.UpdatedReplicas, sts.Status.UpdateRevision), false, nil
}
return fmt.Sprintf("statefulset rolling update complete %d pods at revision %s...\n", sts.Status.CurrentReplicas, sts.Status.CurrentRevision), true, nil
}

func (s *StatefulSetStatusViewer) DetailStatus(c kubernetes.Interface, obj runtime.Unstructured, detail bool, revision int64) (string, bool, error) {
sts := &appsv1.StatefulSet{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), sts)
if err != nil {
return "", false, fmt.Errorf("failed to convert %T to %T: %v", obj, sts, err)
}

if sts.Spec.UpdateStrategy.Type != appsv1.RollingUpdateStatefulSetStrategyType {
return "", true, fmt.Errorf("rollout status is only available for %s strategy type", appsv1.RollingUpdateStatefulSetStrategyType)
}
if sts.Status.ObservedGeneration == 0 || sts.Generation > sts.Status.ObservedGeneration {
return "Waiting for statefulset spec update to be observed...\n", false, nil
}
if sts.Spec.Replicas != nil && sts.Status.ReadyReplicas < *sts.Spec.Replicas {
return fmt.Sprintf("Waiting for %d pods to be ready...\n", *sts.Spec.Replicas-sts.Status.ReadyReplicas), false, nil
}
if sts.Spec.UpdateStrategy.Type == appsv1.RollingUpdateStatefulSetStrategyType && sts.Spec.UpdateStrategy.RollingUpdate != nil {
if sts.Spec.Replicas != nil && sts.Spec.UpdateStrategy.RollingUpdate.Partition != nil {
if sts.Status.UpdatedReplicas < (*sts.Spec.Replicas - *sts.Spec.UpdateStrategy.RollingUpdate.Partition) {
return fmt.Sprintf("Waiting for partitioned roll out to finish: %d out of %d new pods have been updated...\n",
sts.Status.UpdatedReplicas, *sts.Spec.Replicas-*sts.Spec.UpdateStrategy.RollingUpdate.Partition), false, nil
}
}
return fmt.Sprintf("partitioned roll out complete: %d new pods have been updated...\n",
sts.Status.UpdatedReplicas), true, nil
}
if sts.Status.UpdateRevision != sts.Status.CurrentRevision {
return fmt.Sprintf("waiting for statefulset rolling update to complete %d pods at revision %s...\n",
sts.Status.UpdatedReplicas, sts.Status.UpdateRevision), false, nil
}
return fmt.Sprintf("statefulset rolling update complete %d pods at revision %s...\n", sts.Status.CurrentReplicas, sts.Status.CurrentRevision), true, nil
}

// Status returns a message describing cloneset status, and a bool value indicating if the status is considered done.
func (s *CloneSetStatusViewer) Status(obj runtime.Unstructured, revision int64) (string, bool, error) {
func (s *CloneSetStatusViewer) Status(c kubernetes.Interface, obj runtime.Unstructured, revision int64) (string, bool, error) {
cs := &kruiseappsv1alpha1.CloneSet{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), cs)
if err != nil {
Expand All @@ -178,11 +273,8 @@ func (s *CloneSetStatusViewer) Status(obj runtime.Unstructured, revision int64)
if cs.Spec.UpdateStrategy.Type == kruiseappsv1alpha1.InPlaceOnlyCloneSetUpdateStrategyType ||
cs.Spec.UpdateStrategy.Type == kruiseappsv1alpha1.InPlaceIfPossibleCloneSetUpdateStrategyType {
if cs.Spec.Replicas != nil && cs.Spec.UpdateStrategy.Partition != nil {
if cs.Status.UpdatedReplicas < (*cs.Spec.Replicas - cs.Spec.UpdateStrategy.Partition.IntVal) {
return fmt.Sprintf("Waiting for partitioned roll out to finish: %d out of %d new pods have been updated...\n",
cs.Status.UpdatedReplicas, *cs.Spec.Replicas-cs.Spec.UpdateStrategy.Partition.IntVal), false, nil

}
return fmt.Sprintf("Waiting for partitioned roll out to finish: %d out of %d new pods have been updated...\n",
cs.Status.UpdatedReplicas, *cs.Spec.Replicas-cs.Spec.UpdateStrategy.Partition.IntVal), false, nil
}
}

Expand All @@ -196,8 +288,39 @@ func (s *CloneSetStatusViewer) Status(obj runtime.Unstructured, revision int64)
return fmt.Sprintf("CloneSet rolling update complete %d pods at revision %s...\n", cs.Status.AvailableReplicas, cs.Status.UpdateRevision), true, nil
}

// DetailStatus returns a message describing cloneset detail status, and a bool value indicating if the status is considered done.
func (s *CloneSetStatusViewer) DetailStatus(c kubernetes.Interface, obj runtime.Unstructured, detail bool, revision int64) (string, bool, error) {
cs := &kruiseappsv1alpha1.CloneSet{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), cs)
if err != nil {
return "", false, fmt.Errorf("failed to convert %T to %T: %v", obj, cs, err)
}

// check InPlaceOnly and InPlacePossible UpdateStrategy
if cs.Spec.UpdateStrategy.Type == kruiseappsv1alpha1.InPlaceOnlyCloneSetUpdateStrategyType ||
cs.Spec.UpdateStrategy.Type == kruiseappsv1alpha1.InPlaceIfPossibleCloneSetUpdateStrategyType {
if cs.Spec.Replicas != nil && cs.Spec.UpdateStrategy.Partition != nil {
if cs.Status.UpdatedReplicas < (*cs.Spec.Replicas - cs.Spec.UpdateStrategy.Partition.IntVal) {
return fmt.Sprintf("CloneSet %s Waiting for partitioned roll out to finish: %d out of %d new pods have been updated...\n%s",
cs.Name, cs.Status.UpdatedReplicas, *cs.Spec.Replicas-cs.Spec.UpdateStrategy.Partition.IntVal, generatePodsInfoForCloneSet(c, cs)), false, nil
}
}
}

if cs.Status.ObservedGeneration == 0 || cs.Generation > cs.Status.ObservedGeneration {
return fmt.Sprintf("Waiting for CloneSet %s spec update to be observed...\n", cs.Name), false, nil
}
if cs.Spec.Replicas != nil && cs.Status.ReadyReplicas < *cs.Spec.Replicas {
return fmt.Sprintf("Waiting for %d pods to be ready...\n%s", *cs.Spec.Replicas-cs.Status.ReadyReplicas,
generatePodsInfoForCloneSet(c, cs)), false, nil
}

return fmt.Sprintf("CloneSet %s rolling update complete %d pods at revision %s...\n%s",
cs.Name, cs.Status.AvailableReplicas, cs.Status.UpdateRevision, generatePodsInfoForCloneSet(c, cs)), true, nil
}

// Status returns a message describing advanced statefulset status, and a bool value indicating if the status is considered done.
func (s *AdvancedStatefulSetStatusViewer) Status(obj runtime.Unstructured, revision int64) (string, bool, error) {
func (s *AdvancedStatefulSetStatusViewer) Status(c kubernetes.Interface, obj runtime.Unstructured, revision int64) (string, bool, error) {
asts := &kruiseappsv1beta1.StatefulSet{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), asts)
if err != nil {
Expand All @@ -222,5 +345,32 @@ func (s *AdvancedStatefulSetStatusViewer) Status(obj runtime.Unstructured, revis
return fmt.Sprintf("Waiting for %d pods to be ready...\n", *asts.Spec.Replicas-asts.Status.ReadyReplicas), false, nil
}
return fmt.Sprintf("Advanced StatefulSet rolling update complete %d pods at revision %s...\n", asts.Status.AvailableReplicas, asts.Status.UpdateRevision), true, nil
}

// DetailStatus returns a message describing advanced statefulset status, and a bool value indicating if the status is considered done.
func (s *AdvancedStatefulSetStatusViewer) DetailStatus(c kubernetes.Interface, obj runtime.Unstructured, detail bool, revision int64) (string, bool, error) {
asts := &kruiseappsv1beta1.StatefulSet{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), asts)
if err != nil {
return "", false, fmt.Errorf("failed to convert %T to %T: %v", obj, asts, err)
}

// check InPlaceOnly and InPlacePossible UpdateStrategy
if asts.Spec.UpdateStrategy.Type == appsv1.RollingUpdateStatefulSetStrategyType {
if asts.Spec.Replicas != nil && asts.Spec.UpdateStrategy.RollingUpdate.Partition != nil {
if asts.Status.UpdatedReplicas < (*asts.Spec.Replicas - *asts.Spec.UpdateStrategy.RollingUpdate.Partition) {
return fmt.Sprintf("Waiting for partitioned roll out to finish:%d out of %d new pods has been updated...\n",
asts.Status.UpdatedReplicas, *asts.Spec.Replicas-*asts.Spec.UpdateStrategy.RollingUpdate.Partition), false, nil
}
}
}

if asts.Status.ObservedGeneration == 0 || asts.Generation > asts.Status.ObservedGeneration {
return "Waiting for Advanced StatefulSet spec update to be observed...\n", false, nil
}

if asts.Spec.Replicas != nil && asts.Status.ReadyReplicas < *asts.Spec.Replicas {
return fmt.Sprintf("Waiting for %d pods to be ready...\n", *asts.Spec.Replicas-asts.Status.ReadyReplicas), false, nil
}
return fmt.Sprintf("Advanced StatefulSet rolling update complete %d pods at revision %s...\n", asts.Status.AvailableReplicas, asts.Status.UpdateRevision), true, nil
}
74 changes: 74 additions & 0 deletions pkg/internal/polymorphichelpers/rollout_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package polymorphichelpers

import (
"context"
"fmt"

kruiseappsv1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
)

func getPodsByLabelSelector(client kubernetes.Interface, ns string, labelSelector *metav1.LabelSelector) ([]*corev1.Pod, error) {
var podsList []*corev1.Pod
pods, err := client.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{LabelSelector: labels.Set(labelSelector.MatchLabels).String()})
if err != nil {
return nil, err
}

for i := range pods.Items {
podsList = append(podsList, &pods.Items[i])
}

return podsList, nil
}

func filterOldNewReadyPodsFromCloneSet(client kubernetes.Interface, clone *kruiseappsv1alpha1.CloneSet) (oldPods []*corev1.Pod,
newNotReadyPods []*corev1.Pod, updatedReadyPods []*corev1.Pod, err error) {
pods, err := getPodsByLabelSelector(client, clone.Namespace, clone.Spec.Selector)
if err != nil {
return
}

for i := range pods {
if podRevision, ok := pods[i].GetLabels()["controller-revision-hash"]; ok {
if podRevision == clone.Status.UpdateRevision {
if podReady(pods[i]) {
updatedReadyPods = append(updatedReadyPods, pods[i])
} else {
newNotReadyPods = append(newNotReadyPods, pods[i])
}
} else {
oldPods = append(oldPods, pods[i])
}
}
}
return
}
func podReady(p *corev1.Pod) bool {
cs := p.Status.Conditions
for _, c := range cs {
if c.Type == corev1.PodReady {
return c.Status == corev1.ConditionTrue
}
}
return false
}

func generatePodsInfoForCloneSet(client kubernetes.Interface, clone *kruiseappsv1alpha1.CloneSet) string {
var notReadyPodsSlice, ReadyPodsSlice []string
_, notReadyNewPods, readyNewPods, err := filterOldNewReadyPodsFromCloneSet(client, clone)
if err != nil {
return ""
}
for i := range notReadyNewPods {
notReadyPodsSlice = append(notReadyPodsSlice, notReadyNewPods[i].Name)
}
for j := range readyNewPods {
ReadyPodsSlice = append(ReadyPodsSlice, readyNewPods[j].Name)
}

return fmt.Sprintf("Updated ready pods: %v\nUpdated not ready pods: %v\n", ReadyPodsSlice, notReadyPodsSlice)
}

0 comments on commit e1b55f7

Please sign in to comment.