Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add more info for rollout status cloneset #65

Merged
merged 2 commits into from
Jul 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions pkg/cmd/rollout/rollout_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
watchtools "k8s.io/client-go/tools/watch"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
Expand Down Expand Up @@ -77,10 +78,12 @@ type RolloutStatusOptions struct {
Watch bool
Revision int64
Timeout time.Duration
Detail bool

StatusViewerFn func(*meta.RESTMapping) (internalpolymorphichelpers.StatusViewer, error)
Builder func() *resource.Builder
DynamicClient dynamic.Interface
ClientSet kubernetes.Interface

FilenameOptions *resource.FilenameOptions
genericclioptions.IOStreams
Expand All @@ -94,6 +97,7 @@ func NewRolloutStatusOptions(streams genericclioptions.IOStreams) *RolloutStatus
IOStreams: streams,
Watch: true,
Timeout: 0,
Detail: false,
}
}

Expand Down Expand Up @@ -122,6 +126,7 @@ func NewCmdRolloutStatus(f cmdutil.Factory, streams genericclioptions.IOStreams)
cmd.Flags().BoolVarP(&o.Watch, "watch", "w", o.Watch, "Watch the status of the rollout until it's done.")
cmd.Flags().Int64Var(&o.Revision, "revision", o.Revision, "Pin to a specific revision for showing its status. Defaults to 0 (last revision).")
cmd.Flags().DurationVar(&o.Timeout, "timeout", o.Timeout, "The length of time to wait before ending watch, zero means never. Any other values should contain a corresponding time unit (e.g. 1s, 2m, 3h).")
cmd.Flags().BoolVarP(&o.Detail, "detail", "d", o.Detail, "Show the detail status of the rollout.")

return cmd
}
Expand Down Expand Up @@ -149,6 +154,11 @@ func (o *RolloutStatusOptions) Complete(f cmdutil.Factory, args []string) error
return err
}

o.ClientSet, err = kubernetes.NewForConfig(clientConfig)
if err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -224,17 +234,23 @@ func (o *RolloutStatusOptions) Run() error {
// if the rollout isn't done yet, keep watching deployment status
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), o.Timeout)
intr := interrupt.New(nil, cancel)
var status string
var consideredDone bool
return intr.Run(func() error {
_, err = watchtools.UntilWithSync(ctx, lw, &unstructured.Unstructured{}, preconditionFunc, func(e watch.Event) (bool, error) {
switch t := e.Type; t {
case watch.Added, watch.Modified:
status, done, err := statusViewer.Status(e.Object.(runtime.Unstructured), o.Revision)
if o.Detail {
status, consideredDone, err = statusViewer.DetailStatus(o.ClientSet, e.Object.(runtime.Unstructured), o.Detail, o.Revision)
} else {
status, consideredDone, err = statusViewer.Status(o.ClientSet, e.Object.(runtime.Unstructured), o.Revision)
}
if err != nil {
return false, err
}
fmt.Fprintf(o.Out, "%s", status)
// Quit waiting if the rollout is done
if done {
if consideredDone {
return true, nil
}

Expand Down
172 changes: 161 additions & 11 deletions pkg/internal/polymorphichelpers/rollout_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ import (
extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes"
deploymentutil "k8s.io/kubectl/pkg/util/deployment"
)

// StatusViewer provides an interface for resources that have rollout status.
type StatusViewer interface {
Status(obj runtime.Unstructured, revision int64) (string, bool, error)
Status(c kubernetes.Interface, obj runtime.Unstructured, revision int64) (string, bool, error)
DetailStatus(c kubernetes.Interface, obj runtime.Unstructured, detail bool, revision int64) (string, bool, error)
}

// StatusViewerFor returns a StatusViewer for the resource specified by kind.
Expand Down Expand Up @@ -71,7 +73,43 @@ type CloneSetStatusViewer struct{}
type AdvancedStatefulSetStatusViewer struct{}

// Status returns a message describing deployment status, and a bool value indicating if the status is considered done.
func (s *DeploymentStatusViewer) Status(obj runtime.Unstructured, revision int64) (string, bool, error) {
func (s *DeploymentStatusViewer) Status(c kubernetes.Interface, obj runtime.Unstructured, revision int64) (string, bool, error) {
deployment := &appsv1.Deployment{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), deployment)
if err != nil {
return "", false, fmt.Errorf("failed to convert %T to %T: %v", obj, deployment, err)
}

if revision > 0 {
deploymentRev, err := deploymentutil.Revision(deployment)
if err != nil {
return "", false, fmt.Errorf("cannot get the revision of deployment %q: %v", deployment.Name, err)
}
if revision != deploymentRev {
return "", false, fmt.Errorf("desired revision (%d) is different from the running revision (%d)", revision, deploymentRev)
}
}
if deployment.Generation <= deployment.Status.ObservedGeneration {
cond := deploymentutil.GetDeploymentCondition(deployment.Status, appsv1.DeploymentProgressing)
if cond != nil && cond.Reason == deploymentutil.TimedOutReason {
return "", false, fmt.Errorf("deployment %q exceeded its progress deadline", deployment.Name)
}
if deployment.Spec.Replicas != nil && deployment.Status.UpdatedReplicas < *deployment.Spec.Replicas {
return fmt.Sprintf("Waiting for deployment %q rollout to finish: %d out of %d new replicas have been updated...\n", deployment.Name, deployment.Status.UpdatedReplicas, *deployment.Spec.Replicas), false, nil
}
if deployment.Status.Replicas > deployment.Status.UpdatedReplicas {
return fmt.Sprintf("Waiting for deployment %q rollout to finish: %d old replicas are pending termination...\n", deployment.Name, deployment.Status.Replicas-deployment.Status.UpdatedReplicas), false, nil
}
if deployment.Status.AvailableReplicas < deployment.Status.UpdatedReplicas {
return fmt.Sprintf("Waiting for deployment %q rollout to finish: %d of %d updated replicas are available...\n", deployment.Name, deployment.Status.AvailableReplicas, deployment.Status.UpdatedReplicas), false, nil
}
return fmt.Sprintf("deployment %q successfully rolled out\n", deployment.Name), true, nil
}
return fmt.Sprintf("Waiting for deployment spec update to be observed...\n"), false, nil
}

// DetailStatus returns a message describing deployment detail status, and a bool value indicating if the status is considered done.
func (s *DeploymentStatusViewer) DetailStatus(c kubernetes.Interface, obj runtime.Unstructured, detail bool, revision int64) (string, bool, error) {
deployment := &appsv1.Deployment{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), deployment)
if err != nil {
Expand Down Expand Up @@ -107,7 +145,32 @@ func (s *DeploymentStatusViewer) Status(obj runtime.Unstructured, revision int64
}

// Status returns a message describing daemon set status, and a bool value indicating if the status is considered done.
func (s *DaemonSetStatusViewer) Status(obj runtime.Unstructured, revision int64) (string, bool, error) {
func (s *DaemonSetStatusViewer) Status(c kubernetes.Interface, obj runtime.Unstructured, revision int64) (string, bool, error) {
//ignoring revision as DaemonSets does not have history yet

daemon := &appsv1.DaemonSet{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), daemon)
if err != nil {
return "", false, fmt.Errorf("failed to convert %T to %T: %v", obj, daemon, err)
}

if daemon.Spec.UpdateStrategy.Type != appsv1.RollingUpdateDaemonSetStrategyType {
return "", true, fmt.Errorf("rollout status is only available for %s strategy type", appsv1.RollingUpdateStatefulSetStrategyType)
}
if daemon.Generation <= daemon.Status.ObservedGeneration {
if daemon.Status.UpdatedNumberScheduled < daemon.Status.DesiredNumberScheduled {
return fmt.Sprintf("Waiting for daemon set %q rollout to finish: %d out of %d new pods have been updated...\n", daemon.Name, daemon.Status.UpdatedNumberScheduled, daemon.Status.DesiredNumberScheduled), false, nil
}
if daemon.Status.NumberAvailable < daemon.Status.DesiredNumberScheduled {
return fmt.Sprintf("Waiting for daemon set %q rollout to finish: %d of %d updated pods are available...\n", daemon.Name, daemon.Status.NumberAvailable, daemon.Status.DesiredNumberScheduled), false, nil
}
return fmt.Sprintf("daemon set %q successfully rolled out\n", daemon.Name), true, nil
}
return fmt.Sprintf("Waiting for daemon set spec update to be observed...\n"), false, nil
}

// DetailStatus returns a message describing daemon set status, and a bool value indicating if the status is considered done.
func (s *DaemonSetStatusViewer) DetailStatus(c kubernetes.Interface, obj runtime.Unstructured, detail bool, revision int64) (string, bool, error) {
//ignoring revision as DaemonSets does not have history yet

daemon := &appsv1.DaemonSet{}
Expand All @@ -132,7 +195,7 @@ func (s *DaemonSetStatusViewer) Status(obj runtime.Unstructured, revision int64)
}

// Status returns a message describing statefulset status, and a bool value indicating if the status is considered done.
func (s *StatefulSetStatusViewer) Status(obj runtime.Unstructured, revision int64) (string, bool, error) {
func (s *StatefulSetStatusViewer) Status(c kubernetes.Interface, obj runtime.Unstructured, revision int64) (string, bool, error) {
sts := &appsv1.StatefulSet{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), sts)
if err != nil {
Expand Down Expand Up @@ -163,11 +226,43 @@ func (s *StatefulSetStatusViewer) Status(obj runtime.Unstructured, revision int6
sts.Status.UpdatedReplicas, sts.Status.UpdateRevision), false, nil
}
return fmt.Sprintf("statefulset rolling update complete %d pods at revision %s...\n", sts.Status.CurrentReplicas, sts.Status.CurrentRevision), true, nil
}

func (s *StatefulSetStatusViewer) DetailStatus(c kubernetes.Interface, obj runtime.Unstructured, detail bool, revision int64) (string, bool, error) {
sts := &appsv1.StatefulSet{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), sts)
if err != nil {
return "", false, fmt.Errorf("failed to convert %T to %T: %v", obj, sts, err)
}

if sts.Spec.UpdateStrategy.Type != appsv1.RollingUpdateStatefulSetStrategyType {
return "", true, fmt.Errorf("rollout status is only available for %s strategy type", appsv1.RollingUpdateStatefulSetStrategyType)
}
if sts.Status.ObservedGeneration == 0 || sts.Generation > sts.Status.ObservedGeneration {
return "Waiting for statefulset spec update to be observed...\n", false, nil
}
if sts.Spec.Replicas != nil && sts.Status.ReadyReplicas < *sts.Spec.Replicas {
return fmt.Sprintf("Waiting for %d pods to be ready...\n", *sts.Spec.Replicas-sts.Status.ReadyReplicas), false, nil
}
if sts.Spec.UpdateStrategy.Type == appsv1.RollingUpdateStatefulSetStrategyType && sts.Spec.UpdateStrategy.RollingUpdate != nil {
if sts.Spec.Replicas != nil && sts.Spec.UpdateStrategy.RollingUpdate.Partition != nil {
if sts.Status.UpdatedReplicas < (*sts.Spec.Replicas - *sts.Spec.UpdateStrategy.RollingUpdate.Partition) {
return fmt.Sprintf("Waiting for partitioned roll out to finish: %d out of %d new pods have been updated...\n",
sts.Status.UpdatedReplicas, *sts.Spec.Replicas-*sts.Spec.UpdateStrategy.RollingUpdate.Partition), false, nil
}
}
return fmt.Sprintf("partitioned roll out complete: %d new pods have been updated...\n",
sts.Status.UpdatedReplicas), true, nil
}
if sts.Status.UpdateRevision != sts.Status.CurrentRevision {
return fmt.Sprintf("waiting for statefulset rolling update to complete %d pods at revision %s...\n",
sts.Status.UpdatedReplicas, sts.Status.UpdateRevision), false, nil
}
return fmt.Sprintf("statefulset rolling update complete %d pods at revision %s...\n", sts.Status.CurrentReplicas, sts.Status.CurrentRevision), true, nil
}

// Status returns a message describing cloneset status, and a bool value indicating if the status is considered done.
func (s *CloneSetStatusViewer) Status(obj runtime.Unstructured, revision int64) (string, bool, error) {
func (s *CloneSetStatusViewer) Status(c kubernetes.Interface, obj runtime.Unstructured, revision int64) (string, bool, error) {
cs := &kruiseappsv1alpha1.CloneSet{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), cs)
if err != nil {
Expand All @@ -178,11 +273,8 @@ func (s *CloneSetStatusViewer) Status(obj runtime.Unstructured, revision int64)
if cs.Spec.UpdateStrategy.Type == kruiseappsv1alpha1.InPlaceOnlyCloneSetUpdateStrategyType ||
cs.Spec.UpdateStrategy.Type == kruiseappsv1alpha1.InPlaceIfPossibleCloneSetUpdateStrategyType {
if cs.Spec.Replicas != nil && cs.Spec.UpdateStrategy.Partition != nil {
if cs.Status.UpdatedReplicas < (*cs.Spec.Replicas - cs.Spec.UpdateStrategy.Partition.IntVal) {
return fmt.Sprintf("Waiting for partitioned roll out to finish: %d out of %d new pods have been updated...\n",
cs.Status.UpdatedReplicas, *cs.Spec.Replicas-cs.Spec.UpdateStrategy.Partition.IntVal), false, nil

}
return fmt.Sprintf("Waiting for partitioned roll out to finish: %d out of %d new pods have been updated...\n",
cs.Status.UpdatedReplicas, *cs.Spec.Replicas-cs.Spec.UpdateStrategy.Partition.IntVal), false, nil
}
}

Expand All @@ -196,8 +288,39 @@ func (s *CloneSetStatusViewer) Status(obj runtime.Unstructured, revision int64)
return fmt.Sprintf("CloneSet rolling update complete %d pods at revision %s...\n", cs.Status.AvailableReplicas, cs.Status.UpdateRevision), true, nil
}

// DetailStatus returns a message describing cloneset detail status, and a bool value indicating if the status is considered done.
func (s *CloneSetStatusViewer) DetailStatus(c kubernetes.Interface, obj runtime.Unstructured, detail bool, revision int64) (string, bool, error) {
cs := &kruiseappsv1alpha1.CloneSet{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), cs)
if err != nil {
return "", false, fmt.Errorf("failed to convert %T to %T: %v", obj, cs, err)
}

// check InPlaceOnly and InPlacePossible UpdateStrategy
if cs.Spec.UpdateStrategy.Type == kruiseappsv1alpha1.InPlaceOnlyCloneSetUpdateStrategyType ||
cs.Spec.UpdateStrategy.Type == kruiseappsv1alpha1.InPlaceIfPossibleCloneSetUpdateStrategyType {
if cs.Spec.Replicas != nil && cs.Spec.UpdateStrategy.Partition != nil {
if cs.Status.UpdatedReplicas < (*cs.Spec.Replicas - cs.Spec.UpdateStrategy.Partition.IntVal) {
return fmt.Sprintf("CloneSet %s Waiting for partitioned roll out to finish: %d out of %d new pods have been updated...\n%s",
cs.Name, cs.Status.UpdatedReplicas, *cs.Spec.Replicas-cs.Spec.UpdateStrategy.Partition.IntVal, generatePodsInfoForCloneSet(c, cs)), false, nil
}
}
}

if cs.Status.ObservedGeneration == 0 || cs.Generation > cs.Status.ObservedGeneration {
return fmt.Sprintf("Waiting for CloneSet %s spec update to be observed...\n", cs.Name), false, nil
}
if cs.Spec.Replicas != nil && cs.Status.ReadyReplicas < *cs.Spec.Replicas {
return fmt.Sprintf("Waiting for %d pods to be ready...\n%s", *cs.Spec.Replicas-cs.Status.ReadyReplicas,
generatePodsInfoForCloneSet(c, cs)), false, nil
}

return fmt.Sprintf("CloneSet %s rolling update complete %d pods at revision %s...\n%s",
cs.Name, cs.Status.AvailableReplicas, cs.Status.UpdateRevision, generatePodsInfoForCloneSet(c, cs)), true, nil
}

// Status returns a message describing advanced statefulset status, and a bool value indicating if the status is considered done.
func (s *AdvancedStatefulSetStatusViewer) Status(obj runtime.Unstructured, revision int64) (string, bool, error) {
func (s *AdvancedStatefulSetStatusViewer) Status(c kubernetes.Interface, obj runtime.Unstructured, revision int64) (string, bool, error) {
asts := &kruiseappsv1beta1.StatefulSet{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), asts)
if err != nil {
Expand All @@ -222,5 +345,32 @@ func (s *AdvancedStatefulSetStatusViewer) Status(obj runtime.Unstructured, revis
return fmt.Sprintf("Waiting for %d pods to be ready...\n", *asts.Spec.Replicas-asts.Status.ReadyReplicas), false, nil
}
return fmt.Sprintf("Advanced StatefulSet rolling update complete %d pods at revision %s...\n", asts.Status.AvailableReplicas, asts.Status.UpdateRevision), true, nil
}

// DetailStatus returns a message describing advanced statefulset status, and a bool value indicating if the status is considered done.
func (s *AdvancedStatefulSetStatusViewer) DetailStatus(c kubernetes.Interface, obj runtime.Unstructured, detail bool, revision int64) (string, bool, error) {
asts := &kruiseappsv1beta1.StatefulSet{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), asts)
if err != nil {
return "", false, fmt.Errorf("failed to convert %T to %T: %v", obj, asts, err)
}

// check InPlaceOnly and InPlacePossible UpdateStrategy
if asts.Spec.UpdateStrategy.Type == appsv1.RollingUpdateStatefulSetStrategyType {
if asts.Spec.Replicas != nil && asts.Spec.UpdateStrategy.RollingUpdate.Partition != nil {
if asts.Status.UpdatedReplicas < (*asts.Spec.Replicas - *asts.Spec.UpdateStrategy.RollingUpdate.Partition) {
return fmt.Sprintf("Waiting for partitioned roll out to finish:%d out of %d new pods has been updated...\n",
asts.Status.UpdatedReplicas, *asts.Spec.Replicas-*asts.Spec.UpdateStrategy.RollingUpdate.Partition), false, nil
}
}
}

if asts.Status.ObservedGeneration == 0 || asts.Generation > asts.Status.ObservedGeneration {
return "Waiting for Advanced StatefulSet spec update to be observed...\n", false, nil
}

if asts.Spec.Replicas != nil && asts.Status.ReadyReplicas < *asts.Spec.Replicas {
return fmt.Sprintf("Waiting for %d pods to be ready...\n", *asts.Spec.Replicas-asts.Status.ReadyReplicas), false, nil
}
return fmt.Sprintf("Advanced StatefulSet rolling update complete %d pods at revision %s...\n", asts.Status.AvailableReplicas, asts.Status.UpdateRevision), true, nil
}
74 changes: 74 additions & 0 deletions pkg/internal/polymorphichelpers/rollout_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package polymorphichelpers

import (
"context"
"fmt"

kruiseappsv1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
)

func getPodsByLabelSelector(client kubernetes.Interface, ns string, labelSelector *metav1.LabelSelector) ([]*corev1.Pod, error) {
var podsList []*corev1.Pod
pods, err := client.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{LabelSelector: labels.Set(labelSelector.MatchLabels).String()})
if err != nil {
return nil, err
}

for i := range pods.Items {
podsList = append(podsList, &pods.Items[i])
}

return podsList, nil
}

func filterOldNewReadyPodsFromCloneSet(client kubernetes.Interface, clone *kruiseappsv1alpha1.CloneSet) (oldPods []*corev1.Pod,
newNotReadyPods []*corev1.Pod, updatedReadyPods []*corev1.Pod, err error) {
pods, err := getPodsByLabelSelector(client, clone.Namespace, clone.Spec.Selector)
if err != nil {
return
}

for i := range pods {
if podRevision, ok := pods[i].GetLabels()["controller-revision-hash"]; ok {
if podRevision == clone.Status.UpdateRevision {
if podReady(pods[i]) {
updatedReadyPods = append(updatedReadyPods, pods[i])
} else {
newNotReadyPods = append(newNotReadyPods, pods[i])
}
} else {
oldPods = append(oldPods, pods[i])
}
}
}
return
}
func podReady(p *corev1.Pod) bool {
cs := p.Status.Conditions
for _, c := range cs {
if c.Type == corev1.PodReady {
return c.Status == corev1.ConditionTrue
}
}
return false
}

func generatePodsInfoForCloneSet(client kubernetes.Interface, clone *kruiseappsv1alpha1.CloneSet) string {
var notReadyPodsSlice, ReadyPodsSlice []string
_, notReadyNewPods, readyNewPods, err := filterOldNewReadyPodsFromCloneSet(client, clone)
if err != nil {
return ""
}
for i := range notReadyNewPods {
notReadyPodsSlice = append(notReadyPodsSlice, notReadyNewPods[i].Name)
}
for j := range readyNewPods {
ReadyPodsSlice = append(ReadyPodsSlice, readyNewPods[j].Name)
}

return fmt.Sprintf("Updated ready pods: %v\nUpdated not ready pods: %v\n", ReadyPodsSlice, notReadyPodsSlice)
}