Skip to content

Commit

Permalink
workloadspread support rolling update
Browse files Browse the repository at this point in the history
Signed-off-by: mingzhou.swx <mingzhou.swx@alibaba-inc.com>
  • Loading branch information
mingzhou.swx committed Mar 1, 2023
1 parent 6396116 commit 32e9dbe
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 20 deletions.
46 changes: 32 additions & 14 deletions pkg/controller/workloadspread/workloadspread_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ package workloadspread
import (
"context"
"encoding/json"
"reflect"

appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -42,9 +44,10 @@ import (
type EventAction string

const (
CreateEventAction EventAction = "Create"
UpdateEventAction EventAction = "Update"
DeleteEventAction EventAction = "Delete"
CreateEventAction EventAction = "Create"
UpdateEventAction EventAction = "Update"
DeleteEventAction EventAction = "Delete"
DeploymentRevisionAnnotation = "deployment.kubernetes.io/revision"
)

var _ handler.EventHandler = &podEventHandler{}
Expand Down Expand Up @@ -110,8 +113,11 @@ func (w workloadEventHandler) Update(evt event.UpdateEvent, q workqueue.RateLimi
otherChanges = newObject.Status.UpdateRevision != oldObject.Status.CurrentRevision
gvk = controllerKruiseKindCS
case *appsv1.Deployment:
oldReplicas = *evt.ObjectOld.(*appsv1.Deployment).Spec.Replicas
newReplicas = *evt.ObjectNew.(*appsv1.Deployment).Spec.Replicas
oldObject := evt.ObjectOld.(*appsv1.Deployment)
newObject := evt.ObjectNew.(*appsv1.Deployment)
oldReplicas = *oldObject.Spec.Replicas
newReplicas = *newObject.Spec.Replicas
otherChanges = newObject.Annotations[DeploymentRevisionAnnotation] != oldObject.Annotations[DeploymentRevisionAnnotation]
gvk = controllerKindDep
case *appsv1.ReplicaSet:
oldReplicas = *evt.ObjectOld.(*appsv1.ReplicaSet).Spec.Replicas
Expand Down Expand Up @@ -139,7 +145,8 @@ func (w workloadEventHandler) Update(evt event.UpdateEvent, q workqueue.RateLimi
Namespace: evt.ObjectNew.GetNamespace(),
Name: evt.ObjectNew.GetName(),
}
ws, err := w.getWorkloadSpreadForWorkload(workloadNsn, gvk)
owner := metav1.GetControllerOfNoCopy(evt.ObjectNew)
ws, err := w.getWorkloadSpreadForWorkload(workloadNsn, gvk, owner)
if err != nil {
klog.Errorf("unable to get WorkloadSpread related with %s (%s/%s), err: %v",
gvk.Kind, workloadNsn.Namespace, workloadNsn.Name, err)
Expand Down Expand Up @@ -185,7 +192,8 @@ func (w *workloadEventHandler) handleWorkload(q workqueue.RateLimitingInterface,
Namespace: obj.GetNamespace(),
Name: obj.GetName(),
}
ws, err := w.getWorkloadSpreadForWorkload(workloadNsn, gvk)
owner := metav1.GetControllerOfNoCopy(obj)
ws, err := w.getWorkloadSpreadForWorkload(workloadNsn, gvk, owner)
if err != nil {
klog.Errorf("unable to get WorkloadSpread related with %s (%s/%s), err: %v",
gvk.Kind, workloadNsn.Namespace, workloadNsn.Name, err)
Expand All @@ -201,14 +209,25 @@ func (w *workloadEventHandler) handleWorkload(q workqueue.RateLimitingInterface,

func (w *workloadEventHandler) getWorkloadSpreadForWorkload(
workloadNamespaceName types.NamespacedName,
gvk schema.GroupVersionKind) (*appsv1alpha1.WorkloadSpread, error) {
gvk schema.GroupVersionKind, ownerRef *metav1.OwnerReference) (*appsv1alpha1.WorkloadSpread, error) {
wsList := &appsv1alpha1.WorkloadSpreadList{}
listOptions := &client.ListOptions{Namespace: workloadNamespaceName.Namespace}
if err := w.List(context.TODO(), wsList, listOptions); err != nil {
klog.Errorf("List WorkloadSpread failed: %s", err.Error())
return nil, err
}

// In case of ReplicaSet owned by Deployment, we should consider if the
// Deployment is referred by workloadSpread.
var ownerKey *types.NamespacedName
var ownerGvk schema.GroupVersionKind
if ownerRef != nil && reflect.DeepEqual(gvk, controllerKindRS) {
ownerGvk = schema.FromAPIVersionAndKind(ownerRef.APIVersion, ownerRef.Kind)
if reflect.DeepEqual(ownerGvk, controllerKindDep) {
ownerKey = &types.NamespacedName{Namespace: workloadNamespaceName.Namespace, Name: ownerRef.Name}
}
}

for _, ws := range wsList.Items {
if ws.DeletionTimestamp != nil {
continue
Expand All @@ -219,13 +238,12 @@ func (w *workloadEventHandler) getWorkloadSpreadForWorkload(
continue
}

targetGV, err := schema.ParseGroupVersion(targetRef.APIVersion)
if err != nil {
klog.Errorf("failed to parse targetRef's group version: %s", targetRef.APIVersion)
continue
// Ignore version
targetGk := schema.FromAPIVersionAndKind(targetRef.APIVersion, targetRef.Kind).GroupKind()
if reflect.DeepEqual(targetGk, gvk.GroupKind()) && targetRef.Name == workloadNamespaceName.Name {
return &ws, nil
}

if targetRef.Kind == gvk.Kind && targetGV.Group == gvk.Group && targetRef.Name == workloadNamespaceName.Name {
if ownerKey != nil && reflect.DeepEqual(targetGk, ownerGvk.GroupKind()) && targetRef.Name == ownerKey.Name {
return &ws, nil
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ func TestGetWorkloadSpreadForCloneSet(t *testing.T) {
Name: cs.getCloneSet().Name,
}
handler := workloadEventHandler{Reader: fakeClient}
workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKruiseKindCS)
workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKruiseKindCS, nil)
expectTopology := cs.expectWorkloadSpread()

if expectTopology == nil {
Expand Down Expand Up @@ -506,7 +506,7 @@ func TestGetWorkloadSpreadForDeployment(t *testing.T) {
Name: cs.getDeployment().Name,
}
handler := workloadEventHandler{Reader: fakeClient}
workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKindDep)
workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKindDep, nil)
expectTopology := cs.expectWorkloadSpread()

if expectTopology == nil {
Expand Down Expand Up @@ -608,7 +608,7 @@ func TestGetWorkloadSpreadForJob(t *testing.T) {
Name: cs.getJob().Name,
}
handler := workloadEventHandler{Reader: fakeClient}
workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKindJob)
workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKindJob, nil)
expectTopology := cs.expectWorkloadSpread()

if expectTopology == nil {
Expand Down Expand Up @@ -734,7 +734,7 @@ func TestGetWorkloadSpreadForReplicaSet(t *testing.T) {
Name: cs.getReplicaset().Name,
}
handler := workloadEventHandler{Reader: fakeClient}
workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKindRS)
workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKindRS, nil)
expectTopology := cs.expectWorkloadSpread()

if expectTopology == nil {
Expand Down Expand Up @@ -860,7 +860,7 @@ func TestGetWorkloadSpreadForStatefulSet(t *testing.T) {
Name: cs.getStatefulSet().Name,
}
handler := workloadEventHandler{Reader: fakeClient}
workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKindSts)
workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKindSts, nil)
expectTopology := cs.expectWorkloadSpread()

if expectTopology == nil {
Expand Down Expand Up @@ -986,7 +986,7 @@ func TestGetWorkloadSpreadForAdvancedStatefulSet(t *testing.T) {
Name: cs.getStatefulSet().Name,
}
handler := workloadEventHandler{Reader: fakeClient}
workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKruiseKindSts)
workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKruiseKindSts, nil)
expectTopology := cs.expectWorkloadSpread()

if expectTopology == nil {
Expand Down

0 comments on commit 32e9dbe

Please sign in to comment.