workloadspread support rolling update
Signed-off-by: mingzhou.swx <mingzhou.swx@alibaba-inc.com>
mingzhou.swx committed Mar 21, 2023
1 parent 2be0f21 commit 70a358f
Showing 5 changed files with 64 additions and 25 deletions.
4 changes: 2 additions & 2 deletions pkg/controller/workloadspread/workloadspread_controller.go
@@ -522,10 +522,10 @@ func (r *ReconcileWorkloadSpread) patchFavoriteSubsetMetadataToPod(pod *corev1.P
func (r *ReconcileWorkloadSpread) calculateWorkloadSpreadStatus(ws *appsv1alpha1.WorkloadSpread,
versionedPodMap map[string]map[string][]*corev1.Pod, subsetPodMap map[string][]*corev1.Pod,
workloadReplicas int32) (*appsv1alpha1.WorkloadSpreadStatus, map[string][]*corev1.Pod) {
// set the generation in the returned status
status := appsv1alpha1.WorkloadSpreadStatus{}
// set the generation in the returned status
status.ObservedGeneration = ws.Generation
//status.ObservedWorkloadReplicas = workloadReplicas
// status.ObservedWorkloadReplicas = workloadReplicas
status.VersionedSubsetStatuses = make(map[string][]appsv1alpha1.WorkloadSpreadSubsetStatus, len(versionedPodMap))

var scheduleFailedPodMap map[string][]*corev1.Pod
48 changes: 33 additions & 15 deletions pkg/controller/workloadspread/workloadspread_event_handler.go
@@ -19,10 +19,12 @@ package workloadspread
import (
"context"
"encoding/json"
"reflect"

appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
@@ -42,9 +44,10 @@ import (
type EventAction string

const (
CreateEventAction EventAction = "Create"
UpdateEventAction EventAction = "Update"
DeleteEventAction EventAction = "Delete"
CreateEventAction EventAction = "Create"
UpdateEventAction EventAction = "Update"
DeleteEventAction EventAction = "Delete"
DeploymentRevisionAnnotation = "deployment.kubernetes.io/revision"
)

var _ handler.EventHandler = &podEventHandler{}
@@ -59,7 +62,7 @@ func (p *podEventHandler) Update(evt event.UpdateEvent, q workqueue.RateLimiting
oldPod := evt.ObjectOld.(*corev1.Pod)
newPod := evt.ObjectNew.(*corev1.Pod)

if kubecontroller.IsPodActive(oldPod) && !kubecontroller.IsPodActive(newPod) {
if kubecontroller.IsPodActive(oldPod) && !kubecontroller.IsPodActive(newPod) || wsutil.GetPodVersion(oldPod) != wsutil.GetPodVersion(newPod) {
p.handlePod(q, newPod, UpdateEventAction)
}
}
@@ -110,8 +113,11 @@ func (w workloadEventHandler) Update(evt event.UpdateEvent, q workqueue.RateLimi
otherChanges = newObject.Status.UpdateRevision != oldObject.Status.CurrentRevision
gvk = controllerKruiseKindCS
case *appsv1.Deployment:
oldReplicas = *evt.ObjectOld.(*appsv1.Deployment).Spec.Replicas
newReplicas = *evt.ObjectNew.(*appsv1.Deployment).Spec.Replicas
oldObject := evt.ObjectOld.(*appsv1.Deployment)
newObject := evt.ObjectNew.(*appsv1.Deployment)
oldReplicas = *oldObject.Spec.Replicas
newReplicas = *newObject.Spec.Replicas
otherChanges = newObject.Annotations[DeploymentRevisionAnnotation] != oldObject.Annotations[DeploymentRevisionAnnotation]
gvk = controllerKindDep
case *appsv1.ReplicaSet:
oldReplicas = *evt.ObjectOld.(*appsv1.ReplicaSet).Spec.Replicas
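For Deployments, the handler now also enqueues on a change of the deployment.kubernetes.io/revision annotation, which the Deployment controller bumps whenever a new rollout begins. A minimal sketch of that check, with illustrative annotation values rather than the handler's own code:

```go
// Sketch only: the Deployment controller increments
// "deployment.kubernetes.io/revision" on every rollout, so comparing the old
// and new annotation values detects a rolling update. Values are illustrative.
package main

import "fmt"

const DeploymentRevisionAnnotation = "deployment.kubernetes.io/revision"

func revisionChanged(oldAnn, newAnn map[string]string) bool {
	return oldAnn[DeploymentRevisionAnnotation] != newAnn[DeploymentRevisionAnnotation]
}

func main() {
	oldAnn := map[string]string{DeploymentRevisionAnnotation: "3"}
	newAnn := map[string]string{DeploymentRevisionAnnotation: "4"} // bumped when a new rollout starts
	fmt.Println(revisionChanged(oldAnn, newAnn))                   // true
}
```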
@@ -139,7 +145,8 @@ func (w workloadEventHandler) Update(evt event.UpdateEvent, q workqueue.RateLimi
Namespace: evt.ObjectNew.GetNamespace(),
Name: evt.ObjectNew.GetName(),
}
ws, err := w.getWorkloadSpreadForWorkload(workloadNsn, gvk)
owner := metav1.GetControllerOfNoCopy(evt.ObjectNew)
ws, err := w.getWorkloadSpreadForWorkload(workloadNsn, gvk, owner)
if err != nil {
klog.Errorf("unable to get WorkloadSpread related with %s (%s/%s), err: %v",
gvk.Kind, workloadNsn.Namespace, workloadNsn.Name, err)
@@ -185,7 +192,8 @@ func (w *workloadEventHandler) handleWorkload(q workqueue.RateLimitingInterface,
Namespace: obj.GetNamespace(),
Name: obj.GetName(),
}
ws, err := w.getWorkloadSpreadForWorkload(workloadNsn, gvk)
owner := metav1.GetControllerOfNoCopy(obj)
ws, err := w.getWorkloadSpreadForWorkload(workloadNsn, gvk, owner)
if err != nil {
klog.Errorf("unable to get WorkloadSpread related with %s (%s/%s), err: %v",
gvk.Kind, workloadNsn.Namespace, workloadNsn.Name, err)
@@ -201,14 +209,25 @@

func (w *workloadEventHandler) getWorkloadSpreadForWorkload(
workloadNamespaceName types.NamespacedName,
gvk schema.GroupVersionKind) (*appsv1alpha1.WorkloadSpread, error) {
gvk schema.GroupVersionKind, ownerRef *metav1.OwnerReference) (*appsv1alpha1.WorkloadSpread, error) {
wsList := &appsv1alpha1.WorkloadSpreadList{}
listOptions := &client.ListOptions{Namespace: workloadNamespaceName.Namespace}
if err := w.List(context.TODO(), wsList, listOptions); err != nil {
klog.Errorf("List WorkloadSpread failed: %s", err.Error())
return nil, err
}

// In case of ReplicaSet owned by Deployment, we should consider if the
// Deployment is referred by workloadSpread.
var ownerKey *types.NamespacedName
var ownerGvk schema.GroupVersionKind
if ownerRef != nil && reflect.DeepEqual(gvk, controllerKindRS) {
ownerGvk = schema.FromAPIVersionAndKind(ownerRef.APIVersion, ownerRef.Kind)
if reflect.DeepEqual(ownerGvk, controllerKindDep) {
ownerKey = &types.NamespacedName{Namespace: workloadNamespaceName.Namespace, Name: ownerRef.Name}
}
}

for _, ws := range wsList.Items {
if ws.DeletionTimestamp != nil {
continue
@@ -219,13 +238,12 @@ func (w *workloadEventHandler) getWorkloadSpreadForWorkload(
continue
}

targetGV, err := schema.ParseGroupVersion(targetRef.APIVersion)
if err != nil {
klog.Errorf("failed to parse targetRef's group version: %s", targetRef.APIVersion)
continue
// Ignore version
targetGk := schema.FromAPIVersionAndKind(targetRef.APIVersion, targetRef.Kind).GroupKind()
if reflect.DeepEqual(targetGk, gvk.GroupKind()) && targetRef.Name == workloadNamespaceName.Name {
return &ws, nil
}

if targetRef.Kind == gvk.Kind && targetGV.Group == gvk.Group && targetRef.Name == workloadNamespaceName.Name {
if ownerKey != nil && reflect.DeepEqual(targetGk, ownerGvk.GroupKind()) && targetRef.Name == ownerKey.Name {
return &ws, nil
}
}
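With the new ownerRef parameter, a ReplicaSet event can match a WorkloadSpread whose targetRef points at the owning Deployment, and the group/kind comparison now ignores the API version. A minimal sketch of that matching rule, assuming an illustrative matchesTarget helper rather than the controller's own code:

```go
// Minimal sketch (not the controller code) of the owner-aware matching: the
// ReplicaSet's controller owner is resolved to a GroupVersionKind, and a
// WorkloadSpread targetRef matches when group, kind, and name agree; the
// version is deliberately ignored. matchesTarget and all values are illustrative.
package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

// matchesTarget compares a WorkloadSpread targetRef against a workload identity,
// ignoring the API version: only group, kind, and name must agree.
func matchesTarget(targetAPIVersion, targetKind, targetName string, gvk schema.GroupVersionKind, name string) bool {
	targetGK := schema.FromAPIVersionAndKind(targetAPIVersion, targetKind).GroupKind()
	return targetGK == gvk.GroupKind() && targetName == name
}

func main() {
	// A ReplicaSet event whose controller owner is the Deployment "server-daily".
	owner := metav1.OwnerReference{APIVersion: "apps/v1", Kind: "Deployment", Name: "server-daily"}
	ownerGVK := schema.FromAPIVersionAndKind(owner.APIVersion, owner.Kind)

	// A targetRef written with a different version still matches through the owner.
	fmt.Println(matchesTarget("apps/v1beta1", "Deployment", "server-daily", ownerGVK, owner.Name)) // true
}
```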
@@ -380,7 +380,7 @@ func TestGetWorkloadSpreadForCloneSet(t *testing.T) {
Name: cs.getCloneSet().Name,
}
handler := workloadEventHandler{Reader: fakeClient}
workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKruiseKindCS)
workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKruiseKindCS, nil)
expectTopology := cs.expectWorkloadSpread()

if expectTopology == nil {
@@ -506,7 +506,7 @@ func TestGetWorkloadSpreadForDeployment(t *testing.T) {
Name: cs.getDeployment().Name,
}
handler := workloadEventHandler{Reader: fakeClient}
workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKindDep)
workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKindDep, nil)
expectTopology := cs.expectWorkloadSpread()

if expectTopology == nil {
@@ -608,7 +608,7 @@ func TestGetWorkloadSpreadForJob(t *testing.T) {
Name: cs.getJob().Name,
}
handler := workloadEventHandler{Reader: fakeClient}
workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKindJob)
workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKindJob, nil)
expectTopology := cs.expectWorkloadSpread()

if expectTopology == nil {
@@ -734,7 +734,7 @@ func TestGetWorkloadSpreadForReplicaSet(t *testing.T) {
Name: cs.getReplicaset().Name,
}
handler := workloadEventHandler{Reader: fakeClient}
workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKindRS)
workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKindRS, nil)
expectTopology := cs.expectWorkloadSpread()

if expectTopology == nil {
@@ -860,7 +860,7 @@ func TestGetWorkloadSpreadForStatefulSet(t *testing.T) {
Name: cs.getStatefulSet().Name,
}
handler := workloadEventHandler{Reader: fakeClient}
workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKindSts)
workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKindSts, nil)
expectTopology := cs.expectWorkloadSpread()

if expectTopology == nil {
@@ -986,7 +986,7 @@ func TestGetWorkloadSpreadForAdvancedStatefulSet(t *testing.T) {
Name: cs.getStatefulSet().Name,
}
handler := workloadEventHandler{Reader: fakeClient}
workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKruiseKindSts)
workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKruiseKindSts, nil)
expectTopology := cs.expectWorkloadSpread()

if expectTopology == nil {
12 changes: 11 additions & 1 deletion pkg/util/workloadspread/workloadspread.go
@@ -23,6 +23,7 @@ import (
"math"
"regexp"
"strconv"
"strings"
"time"

appsv1 "k8s.io/api/apps/v1"
@@ -462,7 +463,7 @@ func (h *Handler) updateSubsetForPod(ws *appsv1alpha1.WorkloadSpread,
var err error
version := GetPodVersion(pod)
subsetStatuses := ws.Status.VersionedSubsetStatuses[version]
if subsetStatuses == nil {
if len(subsetStatuses) == 0 {
subsetStatuses, err = h.initializedSubsetStatuses(ws)
if err != nil {
return false, nil, "", err
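Subset statuses are now looked up per pod version (the pod-template-hash read by GetPodVersion), and the guard changed to len(...) == 0 so that a missing map entry (nil slice) and an empty slice both trigger initialization. A small sketch of that lookup, using stand-in types and illustrative hash values:

```go
// Hedged sketch of the versioned status lookup guarded above. SubsetStatus
// stands in for appsv1alpha1.WorkloadSpreadSubsetStatus; hashes are made up.
package main

import "fmt"

type SubsetStatus struct {
	Name     string
	Replicas int32
}

func main() {
	// Keyed by pod version, i.e. the pod-template-hash label value.
	versioned := map[string][]SubsetStatus{
		"7d4b8c9f6": {{Name: "zone-a", Replicas: 2}}, // current revision
	}

	for _, version := range []string{"7d4b8c9f6", "5f6d7e8a9"} {
		statuses := versioned[version]
		if len(statuses) == 0 { // nil (missing entry) or empty: (re)initialize
			fmt.Printf("version %s: initialize subset statuses\n", version)
			continue
		}
		fmt.Printf("version %s: %d subset(s) tracked\n", version, len(statuses))
	}
}
```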
@@ -677,6 +678,9 @@ func (h *Handler) isReferenceEqual(target *appsv1alpha1.TargetReference, owner *
rs := &appsv1.ReplicaSet{}
err = h.Get(context.TODO(), client.ObjectKey{Namespace: namespace, Name: owner.Name}, rs)
if err != nil {
if errors.IsNotFound(err) { // to solve the problem of informer latency.
return replicaSetNameIsMatched(owner.Name, target.Name)
}
return false
}
if rs.UID != owner.UID {
@@ -736,6 +740,12 @@ func getSubsetCondition(ws *appsv1alpha1.WorkloadSpread, subsetName string, cond
return nil
}

// replicaSetNameIsMatched returns whether a ReplicaSet name (example: server-daily-xznclkds)
// matches a Deployment name (example: server-daily).
func replicaSetNameIsMatched(rsName, dName string) bool {
return rsName[:strings.LastIndex(rsName, "-")] == dName
}

func GetPodVersion(pod *corev1.Pod) string {
if version, exists := pod.Labels[appsv1.DefaultDeploymentUniqueLabelKey]; exists {
return version
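The NotFound branch covers informer latency: a just-created ReplicaSet may not be cached yet, but a Deployment-generated ReplicaSet is named with the pattern deployment-name plus a pod-template-hash suffix, so dropping the last dash-separated segment recovers the Deployment name. A minimal sketch of that comparison; the guard for a name without a dash is an assumption added here, not part of the commit:

```go
package main

import (
	"fmt"
	"strings"
)

// replicaSetNameMatchesDeployment strips the trailing "-<pod-template-hash>"
// segment of a generated ReplicaSet name and compares the rest to the
// Deployment name. The missing-dash guard is an addition in this sketch.
func replicaSetNameMatchesDeployment(rsName, dName string) bool {
	i := strings.LastIndex(rsName, "-")
	if i < 0 {
		return false
	}
	return rsName[:i] == dName
}

func main() {
	fmt.Println(replicaSetNameMatchesDeployment("server-daily-xznclkds", "server-daily")) // true
	fmt.Println(replicaSetNameMatchesDeployment("server-daily-xznclkds", "server"))       // false
}
```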
13 changes: 12 additions & 1 deletion test/e2e/apps/workloadspread.go
@@ -58,6 +58,15 @@ var _ = SIGDescribe("workloadspread", func() {
var ns string
var tester *framework.WorkloadSpreadTester

IsKubernetesVersionLessThan122 := func() bool {
if v, err := c.Discovery().ServerVersion(); err != nil {
framework.Logf("Failed to discovery server version: %v", err)
} else if minor, err := strconv.Atoi(v.Minor); err != nil || minor < 22 {
return true
}
return false
}

ginkgo.BeforeEach(func() {
ns = f.Namespace.Name
c = f.ClientSet
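ServerVersion() reports Major and Minor as strings, and some providers append a suffix such as 22+; strconv.Atoi then fails and the gate above also returns true, so the case is skipped. A small sketch of the minor-version comparison with illustrative inputs:

```go
package main

import (
	"fmt"
	"strconv"
)

// lessThan122 mirrors the gate's comparison: a non-numeric minor (for example
// "22+") fails strconv.Atoi and is treated as "less than 1.22".
func lessThan122(minor string) bool {
	m, err := strconv.Atoi(minor)
	return err != nil || m < 22
}

func main() {
	for _, minor := range []string{"21", "22", "25", "22+"} {
		fmt.Printf("minor=%q -> skip=%v\n", minor, lessThan122(minor))
	}
}
```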
@@ -1712,6 +1721,9 @@ var _ = SIGDescribe("workloadspread", func() {

//test k8s cluster version >= 1.21
ginkgo.It("elastic deploy for deployment, zone-a=2, zone-b=nil", func() {
if IsKubernetesVersionLessThan122() {
ginkgo.Skip("kip this e2e case, it can only run on K8s >= 1.22")
}
deployment := tester.NewBaseDeployment(ns)
// create workloadSpread
targetRef := appsv1alpha1.TargetReference{
@@ -2018,6 +2030,5 @@ var _ = SIGDescribe("workloadspread", func() {
//
// ginkgo.By("workloadSpread for job, done")
//})

})
})
