diff --git a/pkg/cmd/rollout/rollout_status.go b/pkg/cmd/rollout/rollout_status.go index b6d4e38..33f035c 100644 --- a/pkg/cmd/rollout/rollout_status.go +++ b/pkg/cmd/rollout/rollout_status.go @@ -35,6 +35,7 @@ import ( "k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/cli-runtime/pkg/resource" "k8s.io/client-go/dynamic" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" watchtools "k8s.io/client-go/tools/watch" cmdutil "k8s.io/kubectl/pkg/cmd/util" @@ -77,10 +78,12 @@ type RolloutStatusOptions struct { Watch bool Revision int64 Timeout time.Duration + Detail bool StatusViewerFn func(*meta.RESTMapping) (internalpolymorphichelpers.StatusViewer, error) Builder func() *resource.Builder DynamicClient dynamic.Interface + ClientSet kubernetes.Interface FilenameOptions *resource.FilenameOptions genericclioptions.IOStreams @@ -94,6 +97,7 @@ func NewRolloutStatusOptions(streams genericclioptions.IOStreams) *RolloutStatus IOStreams: streams, Watch: true, Timeout: 0, + Detail: false, } } @@ -122,6 +126,7 @@ func NewCmdRolloutStatus(f cmdutil.Factory, streams genericclioptions.IOStreams) cmd.Flags().BoolVarP(&o.Watch, "watch", "w", o.Watch, "Watch the status of the rollout until it's done.") cmd.Flags().Int64Var(&o.Revision, "revision", o.Revision, "Pin to a specific revision for showing its status. Defaults to 0 (last revision).") cmd.Flags().DurationVar(&o.Timeout, "timeout", o.Timeout, "The length of time to wait before ending watch, zero means never. Any other values should contain a corresponding time unit (e.g. 1s, 2m, 3h).") + cmd.Flags().BoolVarP(&o.Detail, "detail", "d", o.Detail, "Show the detail status of the rollout.") return cmd } @@ -149,6 +154,11 @@ func (o *RolloutStatusOptions) Complete(f cmdutil.Factory, args []string) error return err } + o.ClientSet, err = kubernetes.NewForConfig(clientConfig) + if err != nil { + return err + } + return nil } @@ -224,17 +234,23 @@ func (o *RolloutStatusOptions) Run() error { // if the rollout isn't done yet, keep watching deployment status ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), o.Timeout) intr := interrupt.New(nil, cancel) + var status string + var consideredDone bool return intr.Run(func() error { _, err = watchtools.UntilWithSync(ctx, lw, &unstructured.Unstructured{}, preconditionFunc, func(e watch.Event) (bool, error) { switch t := e.Type; t { case watch.Added, watch.Modified: - status, done, err := statusViewer.Status(e.Object.(runtime.Unstructured), o.Revision) + if o.Detail { + status, consideredDone, err = statusViewer.DetailStatus(o.ClientSet, e.Object.(runtime.Unstructured), o.Detail, o.Revision) + } else { + status, consideredDone, err = statusViewer.Status(o.ClientSet, e.Object.(runtime.Unstructured), o.Revision) + } if err != nil { return false, err } fmt.Fprintf(o.Out, "%s", status) // Quit waiting if the rollout is done - if done { + if consideredDone { return true, nil } diff --git a/pkg/internal/polymorphichelpers/rollout_status.go b/pkg/internal/polymorphichelpers/rollout_status.go index f93d51e..eabbc1b 100644 --- a/pkg/internal/polymorphichelpers/rollout_status.go +++ b/pkg/internal/polymorphichelpers/rollout_status.go @@ -27,12 +27,14 @@ import ( extensionsv1beta1 "k8s.io/api/extensions/v1beta1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/kubernetes" deploymentutil "k8s.io/kubectl/pkg/util/deployment" ) // StatusViewer provides an interface for resources that have rollout status. type StatusViewer interface { - Status(obj runtime.Unstructured, revision int64) (string, bool, error) + Status(c kubernetes.Interface, obj runtime.Unstructured, revision int64) (string, bool, error) + DetailStatus(c kubernetes.Interface, obj runtime.Unstructured, detail bool, revision int64) (string, bool, error) } // StatusViewerFor returns a StatusViewer for the resource specified by kind. @@ -71,7 +73,43 @@ type CloneSetStatusViewer struct{} type AdvancedStatefulSetStatusViewer struct{} // Status returns a message describing deployment status, and a bool value indicating if the status is considered done. -func (s *DeploymentStatusViewer) Status(obj runtime.Unstructured, revision int64) (string, bool, error) { +func (s *DeploymentStatusViewer) Status(c kubernetes.Interface, obj runtime.Unstructured, revision int64) (string, bool, error) { + deployment := &appsv1.Deployment{} + err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), deployment) + if err != nil { + return "", false, fmt.Errorf("failed to convert %T to %T: %v", obj, deployment, err) + } + + if revision > 0 { + deploymentRev, err := deploymentutil.Revision(deployment) + if err != nil { + return "", false, fmt.Errorf("cannot get the revision of deployment %q: %v", deployment.Name, err) + } + if revision != deploymentRev { + return "", false, fmt.Errorf("desired revision (%d) is different from the running revision (%d)", revision, deploymentRev) + } + } + if deployment.Generation <= deployment.Status.ObservedGeneration { + cond := deploymentutil.GetDeploymentCondition(deployment.Status, appsv1.DeploymentProgressing) + if cond != nil && cond.Reason == deploymentutil.TimedOutReason { + return "", false, fmt.Errorf("deployment %q exceeded its progress deadline", deployment.Name) + } + if deployment.Spec.Replicas != nil && deployment.Status.UpdatedReplicas < *deployment.Spec.Replicas { + return fmt.Sprintf("Waiting for deployment %q rollout to finish: %d out of %d new replicas have been updated...\n", deployment.Name, deployment.Status.UpdatedReplicas, *deployment.Spec.Replicas), false, nil + } + if deployment.Status.Replicas > deployment.Status.UpdatedReplicas { + return fmt.Sprintf("Waiting for deployment %q rollout to finish: %d old replicas are pending termination...\n", deployment.Name, deployment.Status.Replicas-deployment.Status.UpdatedReplicas), false, nil + } + if deployment.Status.AvailableReplicas < deployment.Status.UpdatedReplicas { + return fmt.Sprintf("Waiting for deployment %q rollout to finish: %d of %d updated replicas are available...\n", deployment.Name, deployment.Status.AvailableReplicas, deployment.Status.UpdatedReplicas), false, nil + } + return fmt.Sprintf("deployment %q successfully rolled out\n", deployment.Name), true, nil + } + return fmt.Sprintf("Waiting for deployment spec update to be observed...\n"), false, nil +} + +// DetailStatus returns a message describing deployment detail status, and a bool value indicating if the status is considered done. +func (s *DeploymentStatusViewer) DetailStatus(c kubernetes.Interface, obj runtime.Unstructured, detail bool, revision int64) (string, bool, error) { deployment := &appsv1.Deployment{} err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), deployment) if err != nil { @@ -107,7 +145,32 @@ func (s *DeploymentStatusViewer) Status(obj runtime.Unstructured, revision int64 } // Status returns a message describing daemon set status, and a bool value indicating if the status is considered done. -func (s *DaemonSetStatusViewer) Status(obj runtime.Unstructured, revision int64) (string, bool, error) { +func (s *DaemonSetStatusViewer) Status(c kubernetes.Interface, obj runtime.Unstructured, revision int64) (string, bool, error) { + //ignoring revision as DaemonSets does not have history yet + + daemon := &appsv1.DaemonSet{} + err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), daemon) + if err != nil { + return "", false, fmt.Errorf("failed to convert %T to %T: %v", obj, daemon, err) + } + + if daemon.Spec.UpdateStrategy.Type != appsv1.RollingUpdateDaemonSetStrategyType { + return "", true, fmt.Errorf("rollout status is only available for %s strategy type", appsv1.RollingUpdateStatefulSetStrategyType) + } + if daemon.Generation <= daemon.Status.ObservedGeneration { + if daemon.Status.UpdatedNumberScheduled < daemon.Status.DesiredNumberScheduled { + return fmt.Sprintf("Waiting for daemon set %q rollout to finish: %d out of %d new pods have been updated...\n", daemon.Name, daemon.Status.UpdatedNumberScheduled, daemon.Status.DesiredNumberScheduled), false, nil + } + if daemon.Status.NumberAvailable < daemon.Status.DesiredNumberScheduled { + return fmt.Sprintf("Waiting for daemon set %q rollout to finish: %d of %d updated pods are available...\n", daemon.Name, daemon.Status.NumberAvailable, daemon.Status.DesiredNumberScheduled), false, nil + } + return fmt.Sprintf("daemon set %q successfully rolled out\n", daemon.Name), true, nil + } + return fmt.Sprintf("Waiting for daemon set spec update to be observed...\n"), false, nil +} + +// DetailStatus returns a message describing daemon set status, and a bool value indicating if the status is considered done. +func (s *DaemonSetStatusViewer) DetailStatus(c kubernetes.Interface, obj runtime.Unstructured, detail bool, revision int64) (string, bool, error) { //ignoring revision as DaemonSets does not have history yet daemon := &appsv1.DaemonSet{} @@ -132,7 +195,7 @@ func (s *DaemonSetStatusViewer) Status(obj runtime.Unstructured, revision int64) } // Status returns a message describing statefulset status, and a bool value indicating if the status is considered done. -func (s *StatefulSetStatusViewer) Status(obj runtime.Unstructured, revision int64) (string, bool, error) { +func (s *StatefulSetStatusViewer) Status(c kubernetes.Interface, obj runtime.Unstructured, revision int64) (string, bool, error) { sts := &appsv1.StatefulSet{} err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), sts) if err != nil { @@ -163,11 +226,43 @@ func (s *StatefulSetStatusViewer) Status(obj runtime.Unstructured, revision int6 sts.Status.UpdatedReplicas, sts.Status.UpdateRevision), false, nil } return fmt.Sprintf("statefulset rolling update complete %d pods at revision %s...\n", sts.Status.CurrentReplicas, sts.Status.CurrentRevision), true, nil +} +func (s *StatefulSetStatusViewer) DetailStatus(c kubernetes.Interface, obj runtime.Unstructured, detail bool, revision int64) (string, bool, error) { + sts := &appsv1.StatefulSet{} + err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), sts) + if err != nil { + return "", false, fmt.Errorf("failed to convert %T to %T: %v", obj, sts, err) + } + + if sts.Spec.UpdateStrategy.Type != appsv1.RollingUpdateStatefulSetStrategyType { + return "", true, fmt.Errorf("rollout status is only available for %s strategy type", appsv1.RollingUpdateStatefulSetStrategyType) + } + if sts.Status.ObservedGeneration == 0 || sts.Generation > sts.Status.ObservedGeneration { + return "Waiting for statefulset spec update to be observed...\n", false, nil + } + if sts.Spec.Replicas != nil && sts.Status.ReadyReplicas < *sts.Spec.Replicas { + return fmt.Sprintf("Waiting for %d pods to be ready...\n", *sts.Spec.Replicas-sts.Status.ReadyReplicas), false, nil + } + if sts.Spec.UpdateStrategy.Type == appsv1.RollingUpdateStatefulSetStrategyType && sts.Spec.UpdateStrategy.RollingUpdate != nil { + if sts.Spec.Replicas != nil && sts.Spec.UpdateStrategy.RollingUpdate.Partition != nil { + if sts.Status.UpdatedReplicas < (*sts.Spec.Replicas - *sts.Spec.UpdateStrategy.RollingUpdate.Partition) { + return fmt.Sprintf("Waiting for partitioned roll out to finish: %d out of %d new pods have been updated...\n", + sts.Status.UpdatedReplicas, *sts.Spec.Replicas-*sts.Spec.UpdateStrategy.RollingUpdate.Partition), false, nil + } + } + return fmt.Sprintf("partitioned roll out complete: %d new pods have been updated...\n", + sts.Status.UpdatedReplicas), true, nil + } + if sts.Status.UpdateRevision != sts.Status.CurrentRevision { + return fmt.Sprintf("waiting for statefulset rolling update to complete %d pods at revision %s...\n", + sts.Status.UpdatedReplicas, sts.Status.UpdateRevision), false, nil + } + return fmt.Sprintf("statefulset rolling update complete %d pods at revision %s...\n", sts.Status.CurrentReplicas, sts.Status.CurrentRevision), true, nil } // Status returns a message describing cloneset status, and a bool value indicating if the status is considered done. -func (s *CloneSetStatusViewer) Status(obj runtime.Unstructured, revision int64) (string, bool, error) { +func (s *CloneSetStatusViewer) Status(c kubernetes.Interface, obj runtime.Unstructured, revision int64) (string, bool, error) { cs := &kruiseappsv1alpha1.CloneSet{} err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), cs) if err != nil { @@ -178,11 +273,8 @@ func (s *CloneSetStatusViewer) Status(obj runtime.Unstructured, revision int64) if cs.Spec.UpdateStrategy.Type == kruiseappsv1alpha1.InPlaceOnlyCloneSetUpdateStrategyType || cs.Spec.UpdateStrategy.Type == kruiseappsv1alpha1.InPlaceIfPossibleCloneSetUpdateStrategyType { if cs.Spec.Replicas != nil && cs.Spec.UpdateStrategy.Partition != nil { - if cs.Status.UpdatedReplicas < (*cs.Spec.Replicas - cs.Spec.UpdateStrategy.Partition.IntVal) { - return fmt.Sprintf("Waiting for partitioned roll out to finish: %d out of %d new pods have been updated...\n", - cs.Status.UpdatedReplicas, *cs.Spec.Replicas-cs.Spec.UpdateStrategy.Partition.IntVal), false, nil - - } + return fmt.Sprintf("Waiting for partitioned roll out to finish: %d out of %d new pods have been updated...\n", + cs.Status.UpdatedReplicas, *cs.Spec.Replicas-cs.Spec.UpdateStrategy.Partition.IntVal), false, nil } } @@ -196,8 +288,39 @@ func (s *CloneSetStatusViewer) Status(obj runtime.Unstructured, revision int64) return fmt.Sprintf("CloneSet rolling update complete %d pods at revision %s...\n", cs.Status.AvailableReplicas, cs.Status.UpdateRevision), true, nil } +// DetailStatus returns a message describing cloneset detail status, and a bool value indicating if the status is considered done. +func (s *CloneSetStatusViewer) DetailStatus(c kubernetes.Interface, obj runtime.Unstructured, detail bool, revision int64) (string, bool, error) { + cs := &kruiseappsv1alpha1.CloneSet{} + err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), cs) + if err != nil { + return "", false, fmt.Errorf("failed to convert %T to %T: %v", obj, cs, err) + } + + // check InPlaceOnly and InPlacePossible UpdateStrategy + if cs.Spec.UpdateStrategy.Type == kruiseappsv1alpha1.InPlaceOnlyCloneSetUpdateStrategyType || + cs.Spec.UpdateStrategy.Type == kruiseappsv1alpha1.InPlaceIfPossibleCloneSetUpdateStrategyType { + if cs.Spec.Replicas != nil && cs.Spec.UpdateStrategy.Partition != nil { + if cs.Status.UpdatedReplicas < (*cs.Spec.Replicas - cs.Spec.UpdateStrategy.Partition.IntVal) { + return fmt.Sprintf("CloneSet %s Waiting for partitioned roll out to finish: %d out of %d new pods have been updated...\n%s", + cs.Name, cs.Status.UpdatedReplicas, *cs.Spec.Replicas-cs.Spec.UpdateStrategy.Partition.IntVal, generatePodsInfoForCloneSet(c, cs)), false, nil + } + } + } + + if cs.Status.ObservedGeneration == 0 || cs.Generation > cs.Status.ObservedGeneration { + return fmt.Sprintf("Waiting for CloneSet %s spec update to be observed...\n", cs.Name), false, nil + } + if cs.Spec.Replicas != nil && cs.Status.ReadyReplicas < *cs.Spec.Replicas { + return fmt.Sprintf("Waiting for %d pods to be ready...\n%s", *cs.Spec.Replicas-cs.Status.ReadyReplicas, + generatePodsInfoForCloneSet(c, cs)), false, nil + } + + return fmt.Sprintf("CloneSet %s rolling update complete %d pods at revision %s...\n%s", + cs.Name, cs.Status.AvailableReplicas, cs.Status.UpdateRevision, generatePodsInfoForCloneSet(c, cs)), true, nil +} + // Status returns a message describing advanced statefulset status, and a bool value indicating if the status is considered done. -func (s *AdvancedStatefulSetStatusViewer) Status(obj runtime.Unstructured, revision int64) (string, bool, error) { +func (s *AdvancedStatefulSetStatusViewer) Status(c kubernetes.Interface, obj runtime.Unstructured, revision int64) (string, bool, error) { asts := &kruiseappsv1beta1.StatefulSet{} err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), asts) if err != nil { @@ -222,5 +345,32 @@ func (s *AdvancedStatefulSetStatusViewer) Status(obj runtime.Unstructured, revis return fmt.Sprintf("Waiting for %d pods to be ready...\n", *asts.Spec.Replicas-asts.Status.ReadyReplicas), false, nil } return fmt.Sprintf("Advanced StatefulSet rolling update complete %d pods at revision %s...\n", asts.Status.AvailableReplicas, asts.Status.UpdateRevision), true, nil +} + +// DetailStatus returns a message describing advanced statefulset status, and a bool value indicating if the status is considered done. +func (s *AdvancedStatefulSetStatusViewer) DetailStatus(c kubernetes.Interface, obj runtime.Unstructured, detail bool, revision int64) (string, bool, error) { + asts := &kruiseappsv1beta1.StatefulSet{} + err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), asts) + if err != nil { + return "", false, fmt.Errorf("failed to convert %T to %T: %v", obj, asts, err) + } + + // check InPlaceOnly and InPlacePossible UpdateStrategy + if asts.Spec.UpdateStrategy.Type == appsv1.RollingUpdateStatefulSetStrategyType { + if asts.Spec.Replicas != nil && asts.Spec.UpdateStrategy.RollingUpdate.Partition != nil { + if asts.Status.UpdatedReplicas < (*asts.Spec.Replicas - *asts.Spec.UpdateStrategy.RollingUpdate.Partition) { + return fmt.Sprintf("Waiting for partitioned roll out to finish:%d out of %d new pods has been updated...\n", + asts.Status.UpdatedReplicas, *asts.Spec.Replicas-*asts.Spec.UpdateStrategy.RollingUpdate.Partition), false, nil + } + } + } + if asts.Status.ObservedGeneration == 0 || asts.Generation > asts.Status.ObservedGeneration { + return "Waiting for Advanced StatefulSet spec update to be observed...\n", false, nil + } + + if asts.Spec.Replicas != nil && asts.Status.ReadyReplicas < *asts.Spec.Replicas { + return fmt.Sprintf("Waiting for %d pods to be ready...\n", *asts.Spec.Replicas-asts.Status.ReadyReplicas), false, nil + } + return fmt.Sprintf("Advanced StatefulSet rolling update complete %d pods at revision %s...\n", asts.Status.AvailableReplicas, asts.Status.UpdateRevision), true, nil } diff --git a/pkg/internal/polymorphichelpers/rollout_utils.go b/pkg/internal/polymorphichelpers/rollout_utils.go new file mode 100644 index 0000000..ea5b1ff --- /dev/null +++ b/pkg/internal/polymorphichelpers/rollout_utils.go @@ -0,0 +1,74 @@ +package polymorphichelpers + +import ( + "context" + "fmt" + + kruiseappsv1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/kubernetes" +) + +func getPodsByLabelSelector(client kubernetes.Interface, ns string, labelSelector *metav1.LabelSelector) ([]*corev1.Pod, error) { + var podsList []*corev1.Pod + pods, err := client.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{LabelSelector: labels.Set(labelSelector.MatchLabels).String()}) + if err != nil { + return nil, err + } + + for i := range pods.Items { + podsList = append(podsList, &pods.Items[i]) + } + + return podsList, nil +} + +func filterOldNewReadyPodsFromCloneSet(client kubernetes.Interface, clone *kruiseappsv1alpha1.CloneSet) (oldPods []*corev1.Pod, + newNotReadyPods []*corev1.Pod, updatedReadyPods []*corev1.Pod, err error) { + pods, err := getPodsByLabelSelector(client, clone.Namespace, clone.Spec.Selector) + if err != nil { + return + } + + for i := range pods { + if podRevision, ok := pods[i].GetLabels()["controller-revision-hash"]; ok { + if podRevision == clone.Status.UpdateRevision { + if podReady(pods[i]) { + updatedReadyPods = append(updatedReadyPods, pods[i]) + } else { + newNotReadyPods = append(newNotReadyPods, pods[i]) + } + } else { + oldPods = append(oldPods, pods[i]) + } + } + } + return +} +func podReady(p *corev1.Pod) bool { + cs := p.Status.Conditions + for _, c := range cs { + if c.Type == corev1.PodReady { + return c.Status == corev1.ConditionTrue + } + } + return false +} + +func generatePodsInfoForCloneSet(client kubernetes.Interface, clone *kruiseappsv1alpha1.CloneSet) string { + var notReadyPodsSlice, ReadyPodsSlice []string + _, notReadyNewPods, readyNewPods, err := filterOldNewReadyPodsFromCloneSet(client, clone) + if err != nil { + return "" + } + for i := range notReadyNewPods { + notReadyPodsSlice = append(notReadyPodsSlice, notReadyNewPods[i].Name) + } + for j := range readyNewPods { + ReadyPodsSlice = append(ReadyPodsSlice, readyNewPods[j].Name) + } + + return fmt.Sprintf("Updated ready pods: %v\nUpdated not ready pods: %v\n", ReadyPodsSlice, notReadyPodsSlice) +}