Fix DaemonSet surging with minReadySeconds #1014

Merged
4 changes: 1 addition & 3 deletions apis/apps/defaults/v1alpha1.go
@@ -283,11 +283,9 @@ func SetDefaultsDaemonSet(obj *v1alpha1.DaemonSet) {
obj.Spec.UpdateStrategy.RollingUpdate.Type = v1alpha1.StandardRollingUpdateType
}

if obj.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable == nil {
if obj.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable == nil && obj.Spec.UpdateStrategy.RollingUpdate.MaxSurge == nil {
maxUnavailable := intstr.FromInt(1)
obj.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable = &maxUnavailable
}
if obj.Spec.UpdateStrategy.RollingUpdate.MaxSurge == nil {
MaxSurge := intstr.FromInt(0)
obj.Spec.UpdateStrategy.RollingUpdate.MaxSurge = &MaxSurge
}
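The net effect of this defaulting change: MaxUnavailable only falls back to 1 when the user sets neither field, and MaxSurge is no longer forced to 0, so a surge-only strategy is not defaulted into a mixed one. A minimal standalone sketch of the new behavior (the `rollingUpdate` struct below is a simplified stand-in, not the Kruise API type):

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/intstr"
)

// rollingUpdate is a simplified stand-in for the RollingUpdateDaemonSet fields.
type rollingUpdate struct {
	MaxUnavailable *intstr.IntOrString
	MaxSurge       *intstr.IntOrString
}

// setDefaults mirrors the updated logic above: default MaxUnavailable to 1
// only when the user set neither MaxUnavailable nor MaxSurge.
func setDefaults(ru *rollingUpdate) {
	if ru.MaxUnavailable == nil && ru.MaxSurge == nil {
		maxUnavailable := intstr.FromInt(1)
		ru.MaxUnavailable = &maxUnavailable
	}
}

func main() {
	surge := intstr.FromString("100%")
	surgeOnly := &rollingUpdate{MaxSurge: &surge}
	setDefaults(surgeOnly)
	fmt.Println(surgeOnly.MaxUnavailable == nil) // true: a surge-only strategy is left alone

	empty := &rollingUpdate{}
	setDefaults(empty)
	fmt.Println(empty.MaxUnavailable.IntValue()) // 1: the classic default still applies
}
```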
8 changes: 5 additions & 3 deletions apis/apps/v1alpha1/daemonset_types.go
@@ -228,9 +228,11 @@ type DaemonSetStatus struct {
// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:resource:shortName=daemon;ads
// +kubebuilder:printcolumn:name="DesiredNumber",type="integer",JSONPath=".status.desiredNumberScheduled",description="The desired number of pods."
// +kubebuilder:printcolumn:name="CurrentNumber",type="integer",JSONPath=".status.currentNumberScheduled",description="The current number of pods."
// +kubebuilder:printcolumn:name="UpdatedNumberScheduled",type="integer",JSONPath=".status.updatedNumberScheduled",description="The updated number of pods."
// +kubebuilder:printcolumn:name="DESIRED",type="integer",JSONPath=".status.desiredNumberScheduled",description="The desired number of pods."
// +kubebuilder:printcolumn:name="CURRENT",type="integer",JSONPath=".status.currentNumberScheduled",description="The current number of pods."
// +kubebuilder:printcolumn:name="READY",type="integer",JSONPath=".status.numberReady",description="The ready number of pods."
// +kubebuilder:printcolumn:name="UP-TO-DATE",type="integer",JSONPath=".status.updatedNumberScheduled",description="The updated number of pods."
// +kubebuilder:printcolumn:name="AVAILABLE",type="integer",JSONPath=".status.numberAvailable",description="The updated number of pods."
// +kubebuilder:printcolumn:name="AGE",type="date",JSONPath=".metadata.creationTimestamp",description="CreationTimestamp is a timestamp representing the server time when this object was created. It is not guaranteed to be set in happens-before order across separate operations. Clients may not set this value. It is represented in RFC3339 form and is in UTC."
// +kubebuilder:printcolumn:name="CONTAINERS",type="string",priority=1,JSONPath=".spec.template.spec.containers[*].name",description="The containers of currently daemonset."
// +kubebuilder:printcolumn:name="IMAGES",type="string",priority=1,JSONPath=".spec.template.spec.containers[*].image",description="The images of currently advanced daemonset."
14 changes: 11 additions & 3 deletions config/crd/bases/apps.kruise.io_daemonsets.yaml
@@ -22,15 +22,23 @@ spec:
- additionalPrinterColumns:
- description: The desired number of pods.
jsonPath: .status.desiredNumberScheduled
name: DesiredNumber
name: DESIRED
type: integer
- description: The current number of pods.
jsonPath: .status.currentNumberScheduled
name: CurrentNumber
name: CURRENT
type: integer
- description: The ready number of pods.
jsonPath: .status.numberReady
name: READY
type: integer
- description: The updated number of pods.
jsonPath: .status.updatedNumberScheduled
name: UpdatedNumberScheduled
name: UP-TO-DATE
type: integer
- description: The available number of pods.
jsonPath: .status.numberAvailable
name: AVAILABLE
type: integer
- description: CreationTimestamp is a timestamp representing the server time when
this object was created. It is not guaranteed to be set in happens-before
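With these printer columns, `kubectl get daemonsets.apps.kruise.io` lines up with the built-in DaemonSet columns; CONTAINERS and IMAGES only show up with `-o wide` because they are priority-1 columns. Illustrative listing only (hypothetical name and values):

```
NAME         DESIRED   CURRENT   READY   UP-TO-DATE   AVAILABLE   AGE
daemon-test  3         3         3       3            3           5m
```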
19 changes: 16 additions & 3 deletions pkg/controller/daemonset/daemonset_controller.go
@@ -290,13 +290,23 @@ type ReconcileDaemonSet struct {

// Reconcile reads that state of the cluster for a DaemonSet object and makes changes based on the state read
// and what is in the DaemonSet.Spec
func (dsc *ReconcileDaemonSet) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
func (dsc *ReconcileDaemonSet) Reconcile(ctx context.Context, request reconcile.Request) (res reconcile.Result, retErr error) {
onceBackoffGC.Do(func() {
go wait.Until(dsc.failedPodsBackoff.GC, BackoffGCInterval, ctx.Done())
})
startTime := dsc.failedPodsBackoff.Clock.Now()
startTime := time.Now()
defer func() {
klog.V(4).Infof("Finished syncing DaemonSet %q (%v)", request.String(), dsc.failedPodsBackoff.Clock.Now().Sub(startTime))
if retErr == nil {
if res.Requeue || res.RequeueAfter > 0 {
klog.Infof("Finished syncing DaemonSet %s, cost %v, result: %v", request, time.Since(startTime), res)
} else {
klog.Infof("Finished syncing DaemonSet %s, cost %v", request, time.Since(startTime))
}
} else {
klog.Errorf("Failed syncing DaemonSet %s: %v", request, retErr)
}
// clean the duration store
_ = durationStore.Pop(request.String())
}()

err := dsc.syncDaemonSet(request)
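The switch to named return values (`res`, `retErr`) is what lets the deferred closure report the outcome of the sync. A hedged sketch of that pattern in isolation (generic helper, not Kruise code):

```go
package example

import (
	"time"

	"k8s.io/klog/v2"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// syncWithLogging shows the named-return + defer pattern: the deferred closure
// observes the final values of res and retErr just before they are returned.
func syncWithLogging(key string, sync func() (reconcile.Result, error)) (res reconcile.Result, retErr error) {
	startTime := time.Now()
	defer func() {
		if retErr != nil {
			klog.Errorf("Failed syncing %s: %v", key, retErr)
			return
		}
		if res.Requeue || res.RequeueAfter > 0 {
			klog.Infof("Finished syncing %s, cost %v, result: %v", key, time.Since(startTime), res)
		} else {
			klog.Infof("Finished syncing %s, cost %v", key, time.Since(startTime))
		}
	}()
	res, retErr = sync()
	return res, retErr
}
```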
@@ -561,6 +571,7 @@ func (dsc *ReconcileDaemonSet) storeDaemonSetStatus(ds *appsv1alpha1.DaemonSet,
toUpdate.Status.DaemonSetHash = hash

if _, updateErr = dsClient.UpdateStatus(context.TODO(), toUpdate, metav1.UpdateOptions{}); updateErr == nil {
klog.Infof("Updated DaemonSet %s/%s status to %v", ds.Namespace, ds.Name, kruiseutil.DumpJSON(toUpdate.Status))
return nil
}

@@ -888,6 +899,8 @@ func (dsc *ReconcileDaemonSet) podsShouldBeOnNode(
case podutil.IsPodAvailable(oldestNewPod, ds.Spec.MinReadySeconds, metav1.Time{Time: dsc.failedPodsBackoff.Clock.Now()}):
klog.V(5).Infof("Pod %s/%s from daemonset %s is now ready and will replace older pod %s", oldestNewPod.Namespace, oldestNewPod.Name, ds.Name, oldestOldPod.Name)
podsToDelete = append(podsToDelete, oldestOldPod.Name)
case podutil.IsPodReady(oldestNewPod) && ds.Spec.MinReadySeconds > 0:
durationStore.Push(keyFunc(ds), podAvailableWaitingTime(oldestNewPod, ds.Spec.MinReadySeconds, dsc.failedPodsBackoff.Clock.Now()))
}
}

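The new `podutil.IsPodReady` case covers the window where the replacement pod is Ready but minReadySeconds has not yet elapsed: instead of leaving the old pod in place until some unrelated event triggers a resync, the remaining wait is pushed into durationStore so the DaemonSet is reconciled again right when the pod becomes available. A hedged sketch of how such a wait typically feeds a delayed requeue (the durationStore plumbing itself is not reproduced here):

```go
package example

import (
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// requeueWhenAvailable returns an empty result when the pod is already
// available, and otherwise asks for another reconcile once the pod has been
// Ready for minReadySeconds.
func requeueWhenAvailable(pod *corev1.Pod, minReadySeconds int32, now time.Time) reconcile.Result {
	if podutil.IsPodAvailable(pod, minReadySeconds, metav1.Time{Time: now}) {
		return reconcile.Result{}
	}
	wait := time.Duration(minReadySeconds) * time.Second
	if c := podutil.GetPodReadyCondition(pod.Status); c != nil && c.Status == corev1.ConditionTrue && !c.LastTransitionTime.IsZero() {
		// Subtract the time the pod has already spent Ready.
		wait -= now.Sub(c.LastTransitionTime.Time)
	}
	return reconcile.Result{RequeueAfter: wait}
}
```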
10 changes: 10 additions & 0 deletions pkg/controller/daemonset/daemonset_util.go
@@ -20,6 +20,7 @@ import (
"fmt"
"sort"
"sync"
"time"

appspub "github.com/openkruise/kruise/apis/apps/pub"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
@@ -357,3 +358,12 @@ func isPodPreDeleting(pod *corev1.Pod) bool {
func isPodNilOrPreDeleting(pod *corev1.Pod) bool {
return pod == nil || isPodPreDeleting(pod)
}

func podAvailableWaitingTime(pod *corev1.Pod, minReadySeconds int32, now time.Time) time.Duration {
c := podutil.GetPodReadyCondition(pod.Status)
minReadySecondsDuration := time.Duration(minReadySeconds) * time.Second
if c == nil || c.LastTransitionTime.IsZero() {
return minReadySecondsDuration
}
return minReadySecondsDuration - now.Sub(c.LastTransitionTime.Time)
}
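A hedged usage sketch for the helper (hypothetical test, assumed to live in the same `daemonset` package, with a fixed `now`): a pod that turned Ready 4s ago under minReadySeconds=10 still has 6s to wait.

```go
package daemonset

import (
	"testing"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestPodAvailableWaitingTime(t *testing.T) {
	now := time.Now()
	pod := &corev1.Pod{
		Status: corev1.PodStatus{
			Conditions: []corev1.PodCondition{{
				Type:               corev1.PodReady,
				Status:             corev1.ConditionTrue,
				LastTransitionTime: metav1.Time{Time: now.Add(-4 * time.Second)},
			}},
		},
	}
	// 10s of required readiness minus the 4s already elapsed leaves 6s.
	if got, want := podAvailableWaitingTime(pod, 10, now), 6*time.Second; got != want {
		t.Fatalf("podAvailableWaitingTime() = %v, want %v", got, want)
	}
}
```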
91 changes: 91 additions & 0 deletions test/e2e/apps/daemonset.go
@@ -18,8 +18,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
daemonutil "k8s.io/kubernetes/pkg/controller/daemon/util"
)

@@ -406,5 +408,94 @@ var _ = SIGDescribe("DaemonSet", func() {
}, time.Second*60, time.Second).Should(gomega.Equal(1))

})

framework.ConformanceIt("should successfully surging update daemonset with minReadySeconds", func() {
label := map[string]string{framework.DaemonSetNameLabel: dsName}

ginkgo.By(fmt.Sprintf("Creating DaemonSet %q", dsName))
maxSurge := intstr.FromString("100%")
maxUnavailable := intstr.FromInt(0)
ds := tester.NewDaemonSet(dsName, label, WebserverImage, appsv1alpha1.DaemonSetUpdateStrategy{
Type: appsv1alpha1.RollingUpdateDaemonSetStrategyType,
RollingUpdate: &appsv1alpha1.RollingUpdateDaemonSet{
MaxSurge: &maxSurge,
MaxUnavailable: &maxUnavailable,
},
})
ds.Spec.MinReadySeconds = 10
ds, err := tester.CreateDaemonSet(ds)
gomega.Expect(err).NotTo(gomega.HaveOccurred())

ginkgo.By("Check that daemon pods launch on every node of the cluster.")
err = wait.PollImmediate(framework.DaemonSetRetryPeriod, framework.DaemonSetRetryTimeout, tester.CheckRunningOnAllNodes(ds))

gomega.Expect(err).NotTo(gomega.HaveOccurred(), "error waiting for daemon pod to start")
err = tester.CheckDaemonStatus(dsName)
gomega.Expect(err).NotTo(gomega.HaveOccurred())

ds, err = tester.GetDaemonSet(dsName)
gomega.Expect(err).NotTo(gomega.HaveOccurred())

ginkgo.By("Get all old daemon pods on nodes")
oldNodeToDaemonPods, err := tester.GetNodesToDaemonPods(label)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
gomega.Expect(oldNodeToDaemonPods).To(gomega.HaveLen(int(ds.Status.DesiredNumberScheduled)))

nodeNameList := sets.NewString()
for nodeName, pods := range oldNodeToDaemonPods {
nodeNameList.Insert(nodeName)
gomega.Expect(pods).To(gomega.HaveLen(1))
gomega.Expect(podutil.IsPodReady(pods[0])).To(gomega.BeTrue())
}

// Change the pod container image to trigger a surging rollout.
err = tester.UpdateDaemonSet(ds.Name, func(ds *appsv1alpha1.DaemonSet) {
ds.Spec.Template.Spec.Containers[0].Image = NewWebserverImage
})
gomega.Expect(err).NotTo(gomega.HaveOccurred(), "error to update daemon")

ginkgo.By("Check all surging Pods created")
err = wait.PollImmediate(time.Second, time.Minute, func() (done bool, err error) {
nodeToDaemonPods, err := tester.GetNodesToDaemonPods(label)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
gomega.Expect(nodeToDaemonPods).To(gomega.HaveLen(int(ds.Status.DesiredNumberScheduled)))

for _, nodeName := range nodeNameList.List() {
pods := nodeToDaemonPods[nodeName]
if len(pods) < 2 {
continue
}

for _, pod := range pods {
if pod.Name == oldNodeToDaemonPods[nodeName][0].Name {
gomega.Expect(pod.DeletionTimestamp).To(gomega.BeNil())
}
}
nodeNameList.Delete(nodeName)
}
return nodeNameList.Len() == 0, nil
})
gomega.Expect(err).NotTo(gomega.HaveOccurred(), "error waiting for all surging pods to be created")

ginkgo.By("Check all old Pods deleted")
err = wait.PollImmediate(time.Second, time.Minute, func() (done bool, err error) {
nodeToDaemonPods, err := tester.GetNodesToDaemonPods(label)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
gomega.Expect(nodeToDaemonPods).To(gomega.HaveLen(int(ds.Status.DesiredNumberScheduled)))

finished := true
for nodeName, pods := range nodeToDaemonPods {
if len(pods) != 1 {
finished = false
continue
}

gomega.Expect(pods[0].Name).NotTo(gomega.Equal(oldNodeToDaemonPods[nodeName][0].Name))
gomega.Expect(podutil.IsPodReady(pods[0])).To(gomega.BeTrue())
c := podutil.GetPodReadyCondition(pods[0].Status)
gomega.Expect(int32(time.Since(c.LastTransitionTime.Time) / time.Second)).To(gomega.BeNumerically(">", ds.Spec.MinReadySeconds))
}
return finished, nil
})
gomega.Expect(err).NotTo(gomega.HaveOccurred(), "error waiting for all old pods to be deleted")
})
})
})
21 changes: 21 additions & 0 deletions test/e2e/framework/daemonset_util.go
@@ -3,6 +3,7 @@ package framework
import (
"context"
"fmt"

"reflect"
"strings"
"time"
@@ -22,6 +23,7 @@ import (
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/retry"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
daemonutil "k8s.io/kubernetes/pkg/controller/daemon/util"
utilpointer "k8s.io/utils/pointer"
)

@@ -419,3 +421,22 @@ func (t *DaemonSetTester) SortPodNames(podList *v1.PodList) []string {
}
return names.List()
}

func (t *DaemonSetTester) GetNodesToDaemonPods(label map[string]string) (map[string][]*v1.Pod, error) {
podList, err := t.ListDaemonPods(label)
if err != nil {
return nil, err
}
// Group Pods by Node name.
nodeToDaemonPods := make(map[string][]*v1.Pod)
for i := range podList.Items {
pod := &podList.Items[i]
nodeName, err := daemonutil.GetTargetNodeName(pod)
// Skip pods that do not yet have a target node (e.g. not scheduled).
if err != nil {
continue
}
nodeToDaemonPods[nodeName] = append(nodeToDaemonPods[nodeName], pod)
}

return nodeToDaemonPods, nil
}
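A hedged usage sketch (hypothetical helper in the same framework package, not part of this PR; `fmt` and `podutil` are already imported above): checking that every node ends up with exactly one ready daemon pod once a rollout has settled.

```go
// expectOnePodPerNode is a hypothetical convenience wrapper around
// GetNodesToDaemonPods for rollouts that have settled.
func (t *DaemonSetTester) expectOnePodPerNode(label map[string]string) error {
	nodeToDaemonPods, err := t.GetNodesToDaemonPods(label)
	if err != nil {
		return err
	}
	for nodeName, pods := range nodeToDaemonPods {
		if len(pods) != 1 {
			return fmt.Errorf("node %s has %d daemon pods, want exactly 1", nodeName, len(pods))
		}
		if !podutil.IsPodReady(pods[0]) {
			return fmt.Errorf("daemon pod %s on node %s is not ready", pods[0].Name, nodeName)
		}
	}
	return nil
}
```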