diff --git a/apis/apps/v1alpha1/workloadspread_types.go b/apis/apps/v1alpha1/workloadspread_types.go index 34e7e4ab52..5b8784dc01 100644 --- a/apis/apps/v1alpha1/workloadspread_types.go +++ b/apis/apps/v1alpha1/workloadspread_types.go @@ -130,6 +130,11 @@ type WorkloadSpreadStatus struct { // Contains the status of each subset. Each element in this array represents one subset // +optional SubsetStatuses []WorkloadSpreadSubsetStatus `json:"subsetStatuses,omitempty"` + + // VersionedSubsetStatuses is to solve rolling-update problems, where the creation of new-version pod + // may be earlier than deletion of old-version pod. We have to calculate the pod subset distribution for + // each version. + VersionedSubsetStatuses map[string][]WorkloadSpreadSubsetStatus `json:"versionedSubsetStatuses,omitempty"` } type WorkloadSpreadSubsetConditionType string diff --git a/apis/apps/v1alpha1/zz_generated.deepcopy.go b/apis/apps/v1alpha1/zz_generated.deepcopy.go index 55d1a2cfba..176ebd1539 100644 --- a/apis/apps/v1alpha1/zz_generated.deepcopy.go +++ b/apis/apps/v1alpha1/zz_generated.deepcopy.go @@ -3666,6 +3666,23 @@ func (in *WorkloadSpreadStatus) DeepCopyInto(out *WorkloadSpreadStatus) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.VersionedSubsetStatuses != nil { + in, out := &in.VersionedSubsetStatuses, &out.VersionedSubsetStatuses + *out = make(map[string][]WorkloadSpreadSubsetStatus, len(*in)) + for key, val := range *in { + var outVal []WorkloadSpreadSubsetStatus + if val == nil { + (*out)[key] = nil + } else { + in, out := &val, &outVal + *out = make([]WorkloadSpreadSubsetStatus, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + (*out)[key] = outVal + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WorkloadSpreadStatus. diff --git a/config/crd/bases/apps.kruise.io_workloadspreads.yaml b/config/crd/bases/apps.kruise.io_workloadspreads.yaml index de26e1122b..20972d1453 100644 --- a/config/crd/bases/apps.kruise.io_workloadspreads.yaml +++ b/config/crd/bases/apps.kruise.io_workloadspreads.yaml @@ -432,6 +432,96 @@ spec: - replicas type: object type: array + versionedSubsetStatuses: + additionalProperties: + items: + description: WorkloadSpreadSubsetStatus defines the observed state + of subset + properties: + conditions: + description: Conditions is an array of current observed subset + conditions. + items: + properties: + lastTransitionTime: + description: Last time the condition transitioned from + one status to another. + format: date-time + type: string + message: + description: A human readable message indicating details + about the transition. + type: string + reason: + description: The reason for the condition's last transition. + type: string + status: + description: Status of the condition, one of True, False, + Unknown. + type: string + type: + description: Type of in place set condition. + type: string + required: + - status + - type + type: object + type: array + creatingPods: + additionalProperties: + format: date-time + type: string + description: CreatingPods contains information about pods + whose creation was processed by the webhook handler but + not yet been observed by the WorkloadSpread controller. + A pod will be in this map from the time when the webhook + handler processed the creation request to the time when + the pod is seen by controller. The key in the map is the + name of the pod and the value is the time when the webhook + handler process the creation request. 
If the real creation + didn't happen and a pod is still in this map, it will be + removed from the list automatically by WorkloadSpread controller + after some time. If everything goes smooth this map should + be empty for the most of the time. Large number of entries + in the map may indicate problems with pod creations. + type: object + deletingPods: + additionalProperties: + format: date-time + type: string + description: DeletingPods is similar with CreatingPods and + it contains information about pod deletion. + type: object + missingReplicas: + description: MissingReplicas is the number of active replicas + belong to this subset not be found. MissingReplicas > 0 + indicates the subset is still missing MissingReplicas pods + to create MissingReplicas = 0 indicates the subset already + has enough pods, there is no need to create MissingReplicas + = -1 indicates the subset's MaxReplicas not set, then there + is no limit for pods number + format: int32 + type: integer + name: + description: Name should be unique between all of the subsets + under one WorkloadSpread. + type: string + replicas: + description: Replicas is the most recently observed number + of active replicas for subset. + format: int32 + type: integer + required: + - missingReplicas + - name + - replicas + type: object + type: array + description: VersionedSubsetStatuses is to solve rolling-update problems, + where the creation of new-version pod may be earlier than deletion + of old-version pod. We have to calculate the pod subset distribution + for each version. + type: object type: object type: object served: true diff --git a/pkg/controller/workloadspread/update_pod_deletion_cost.go b/pkg/controller/workloadspread/update_pod_deletion_cost.go index a8c00db040..48ee2a4b20 100644 --- a/pkg/controller/workloadspread/update_pod_deletion_cost.go +++ b/pkg/controller/workloadspread/update_pod_deletion_cost.go @@ -23,6 +23,7 @@ import ( "strconv" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/klog/v2" @@ -33,16 +34,66 @@ import ( wsutil "github.com/openkruise/kruise/pkg/util/workloadspread" ) +const ( + // RevisionAnnotation is the revision annotation of a deployment's replica sets which records its rollout sequence + RevisionAnnotation = "deployment.kubernetes.io/revision" +) + +func (r *ReconcileWorkloadSpread) getWorkloadLatestVersion(ws *appsv1alpha1.WorkloadSpread) (string, error) { + targetRef := ws.Spec.TargetReference + if targetRef == nil { + return "", nil + } + + gvk := schema.FromAPIVersionAndKind(targetRef.APIVersion, targetRef.Kind) + key := types.NamespacedName{Namespace: ws.Namespace, Name: targetRef.Name} + + object := wsutil.GenerateEmptyWorkloadObject(gvk, key) + if err := r.Get(context.TODO(), key, object); err != nil { + return "", client.IgnoreNotFound(err) + } + + return wsutil.GetWorkloadVersion(r.Client, object) +} + func (r *ReconcileWorkloadSpread) updateDeletionCost(ws *appsv1alpha1.WorkloadSpread, - podMap map[string][]*corev1.Pod, + versionedPodMap map[string]map[string][]*corev1.Pod, workloadReplicas int32) error { targetRef := ws.Spec.TargetReference if targetRef == nil || !isEffectiveKindForDeletionCost(targetRef) { return nil } + + latestVersion, err := r.getWorkloadLatestVersion(ws) + if err != nil { + klog.Errorf("Failed to get the latest version for workload in workloadSpread %v, err: %v", klog.KObj(ws), err) + return err + } + + // To try our best to keep the distribution of workload 
pods described by the WorkloadSpread during a rolling update: + // - for the latest version, we prefer to scale down the last subset; + // - for other old versions, we prefer to scale down the first subset; + for version, podMap := range versionedPodMap { + err = r.updateDeletionCostBySubset(ws, podMap, workloadReplicas, version != latestVersion) + if err != nil { + return err + } + } + return nil +} + +func (r *ReconcileWorkloadSpread) updateDeletionCostBySubset(ws *appsv1alpha1.WorkloadSpread, + podMap map[string][]*corev1.Pod, workloadReplicas int32, reverseOrder bool) error { + subsetNum := len(ws.Spec.Subsets) + subsetIndex := func(index int) int { + if reverseOrder { + return subsetNum - index - 1 + } + return index + } // update Pod's deletion-cost annotation in each subset for idx, subset := range ws.Spec.Subsets { - if err := r.syncSubsetPodDeletionCost(ws, &subset, idx, podMap[subset.Name], workloadReplicas); err != nil { + if err := r.syncSubsetPodDeletionCost(ws, &subset, subsetIndex(idx), podMap[subset.Name], workloadReplicas); err != nil { return err } } diff --git a/pkg/controller/workloadspread/workloadspread_controller.go b/pkg/controller/workloadspread/workloadspread_controller.go index 8d01c6748a..37375eef73 100644 --- a/pkg/controller/workloadspread/workloadspread_controller.go +++ b/pkg/controller/workloadspread/workloadspread_controller.go @@ -319,20 +319,20 @@ func (r *ReconcileWorkloadSpread) syncWorkloadSpread(ws *appsv1alpha1.WorkloadSp klog.Warningf("WorkloadSpread (%s/%s) has no matched pods, target workload's replicas[%d]", ws.Namespace, ws.Name, workloadReplicas) } - // group Pods by subset - podMap, err := r.groupPod(ws, pods, workloadReplicas) + // group Pods by pod-revision and subset + versionedPodMap, subsetPodMap, err := r.groupVersionedPods(ws, pods, workloadReplicas) if err != nil { return err } // update deletion-cost for each subset - err = r.updateDeletionCost(ws, podMap, workloadReplicas) + err = r.updateDeletionCost(ws, versionedPodMap, workloadReplicas) if err != nil { return err } // calculate status and reschedule - status, scheduleFailedPodMap := r.calculateWorkloadSpreadStatus(ws, podMap, workloadReplicas) + status, scheduleFailedPodMap := r.calculateWorkloadSpreadStatus(ws, versionedPodMap, subsetPodMap, workloadReplicas) if status == nil { return nil } @@ -362,8 +362,33 @@ func getInjectWorkloadSpreadFromPod(pod *corev1.Pod) *wsutil.InjectWorkloadSprea return injectWS } -// groupPod returns a map, the key is the name of subset and the value represents the Pods of the corresponding subset.
-func (r *ReconcileWorkloadSpread) groupPod(ws *appsv1alpha1.WorkloadSpread, pods []*corev1.Pod, replicas int32) (map[string][]*corev1.Pod, error) { +// groupVersionedPods will group pods by pod version and subset +func (r *ReconcileWorkloadSpread) groupVersionedPods(ws *appsv1alpha1.WorkloadSpread, allPods []*corev1.Pod, replicas int32) (map[string]map[string][]*corev1.Pod, map[string][]*corev1.Pod, error) { + versionedPods := map[string][]*corev1.Pod{} + for _, pod := range allPods { + version := wsutil.GetPodVersion(pod) + versionedPods[version] = append(versionedPods[version], pod) + } + + subsetPodMap := map[string][]*corev1.Pod{} + versionedPodMap := map[string]map[string][]*corev1.Pod{} + // group pods by version + for version, pods := range versionedPods { + // group pods by subset + podMap, err := r.groupPodBySubset(ws, pods, replicas) + if err != nil { + return nil, nil, err + } + for subset, ps := range podMap { + subsetPodMap[subset] = append(subsetPodMap[subset], ps...) + } + versionedPodMap[version] = podMap + } + return versionedPodMap, subsetPodMap, nil +} + +// groupPodBySubset returns a map, the key is the name of subset and the value represents the Pods of the corresponding subset. +func (r *ReconcileWorkloadSpread) groupPodBySubset(ws *appsv1alpha1.WorkloadSpread, pods []*corev1.Pod, replicas int32) (map[string][]*corev1.Pod, error) { podMap := make(map[string][]*corev1.Pod, len(ws.Spec.Subsets)+1) podMap[FakeSubsetName] = []*corev1.Pod{} subsetMissingReplicas := make(map[string]int) @@ -507,18 +532,58 @@ func (r *ReconcileWorkloadSpread) patchFavoriteSubsetMetadataToPod(pod *corev1.P // 1. current WorkloadSpreadStatus // 2. a map, the key is the subsetName, the value is the schedule failed Pods belongs to the subset. func (r *ReconcileWorkloadSpread) calculateWorkloadSpreadStatus(ws *appsv1alpha1.WorkloadSpread, - podMap map[string][]*corev1.Pod, workloadReplicas int32) (*appsv1alpha1.WorkloadSpreadStatus, map[string][]*corev1.Pod) { - // set the generation in the returned status + versionedPodMap map[string]map[string][]*corev1.Pod, subsetPodMap map[string][]*corev1.Pod, + workloadReplicas int32) (*appsv1alpha1.WorkloadSpreadStatus, map[string][]*corev1.Pod) { status := appsv1alpha1.WorkloadSpreadStatus{} + // set the generation in the returned status status.ObservedGeneration = ws.Generation // status.ObservedWorkloadReplicas = workloadReplicas - status.SubsetStatuses = make([]appsv1alpha1.WorkloadSpreadSubsetStatus, len(ws.Spec.Subsets)) + status.VersionedSubsetStatuses = make(map[string][]appsv1alpha1.WorkloadSpreadSubsetStatus, len(versionedPodMap)) + + // overall subset statuses + var scheduleFailedPodMap map[string][]*corev1.Pod + status.SubsetStatuses, scheduleFailedPodMap = r.calculateWorkloadSpreadSubsetStatuses(ws, ws.Status.SubsetStatuses, subsetPodMap, workloadReplicas) + + // versioned subset statuses calculated by observed pods + for version, podMap := range versionedPodMap { + status.VersionedSubsetStatuses[version], _ = r.calculateWorkloadSpreadSubsetStatuses(ws, ws.Status.VersionedSubsetStatuses[version], podMap, workloadReplicas) + } + + // Consider this case: + // A Pod has been created and processed by webhook, but the Pod is not cached by controller. + // We have to keep the subsetStatus for this version even though there is no Pod belonging to it. 
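+ // Such a stale version is dropped from the status once its recalculated subset statuses hold no replicas and no creating/deleting records.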
+ for version := range ws.Status.VersionedSubsetStatuses { + if _, exist := versionedPodMap[version]; exist { + continue + } + versionSubsetStatuses, _ := r.calculateWorkloadSpreadSubsetStatuses(ws, ws.Status.VersionedSubsetStatuses[version], nil, workloadReplicas) + if !isEmptySubsetStatuses(versionSubsetStatuses) { + status.VersionedSubsetStatuses[version] = versionSubsetStatuses + } + } + + return &status, scheduleFailedPodMap +} + +func isEmptySubsetStatuses(statuses []appsv1alpha1.WorkloadSpreadSubsetStatus) bool { + replicas, creating, deleting := 0, 0, 0 + for _, subset := range statuses { + replicas += int(subset.Replicas) + creating += len(subset.CreatingPods) + deleting += len(subset.DeletingPods) + } + return replicas+creating+deleting == 0 +} + +func (r *ReconcileWorkloadSpread) calculateWorkloadSpreadSubsetStatuses(ws *appsv1alpha1.WorkloadSpread, + oldSubsetStatuses []appsv1alpha1.WorkloadSpreadSubsetStatus, podMap map[string][]*corev1.Pod, workloadReplicas int32, +) ([]appsv1alpha1.WorkloadSpreadSubsetStatus, map[string][]*corev1.Pod) { + subsetStatuses := make([]appsv1alpha1.WorkloadSpreadSubsetStatus, len(ws.Spec.Subsets)) scheduleFailedPodMap := make(map[string][]*corev1.Pod) // Using a map to restore name and old status of subset, because user could adjust the spec's subset sequence // to change priority of subset. We guarantee that operation and use subset name to distinguish which subset // from old status. - oldSubsetStatuses := ws.Status.SubsetStatuses oldSubsetStatusMap := make(map[string]*appsv1alpha1.WorkloadSpreadSubsetStatus, len(oldSubsetStatuses)) for i := range oldSubsetStatuses { oldSubsetStatusMap[oldSubsetStatuses[i].Name] = &oldSubsetStatuses[i] } @@ -557,10 +622,10 @@ func (r *ReconcileWorkloadSpread) calculateWorkloadSpreadStatus(ws *appsv1alpha1 removeWorkloadSpreadSubsetCondition(subsetStatus, appsv1alpha1.SubsetSchedulable) } - status.SubsetStatuses[i] = *subsetStatus + subsetStatuses[i] = *subsetStatus } - return &status, scheduleFailedPodMap + return subsetStatuses, scheduleFailedPodMap } // calculateWorkloadSpreadSubsetStatus returns the current subsetStatus for subset.
@@ -678,9 +743,7 @@ func (r *ReconcileWorkloadSpread) calculateWorkloadSpreadSubsetStatus(ws *appsv1 func (r *ReconcileWorkloadSpread) UpdateWorkloadSpreadStatus(ws *appsv1alpha1.WorkloadSpread, status *appsv1alpha1.WorkloadSpreadStatus) error { - if status.ObservedGeneration == ws.Status.ObservedGeneration && - // status.ObservedWorkloadReplicas == ws.Status.ObservedWorkloadReplicas && - apiequality.Semantic.DeepEqual(status.SubsetStatuses, ws.Status.SubsetStatuses) { + if apiequality.Semantic.DeepEqual(status, ws.Status) { return nil } diff --git a/pkg/controller/workloadspread/workloadspread_controller_test.go b/pkg/controller/workloadspread/workloadspread_controller_test.go index ae723b7dc5..abed601ab4 100644 --- a/pkg/controller/workloadspread/workloadspread_controller_test.go +++ b/pkg/controller/workloadspread/workloadspread_controller_test.go @@ -25,14 +25,14 @@ import ( "testing" "time" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - + apps "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/tools/record" utilpointer "k8s.io/utils/pointer" "sigs.k8s.io/controller-runtime/pkg/client" @@ -106,6 +106,10 @@ var ( }, }, }, + Status: appsv1alpha1.CloneSetStatus{ + ObservedGeneration: 10, + UpdateRevision: wsutil.VersionIgnored, + }, } workloadSpreadDemo = &appsv1alpha1.WorkloadSpread{ @@ -1338,6 +1342,132 @@ func TestWorkloadSpreadReconcile(t *testing.T) { return workloadSpread }, }, + { + name: "multiVersion pods", + getPods: func() []*corev1.Pod { + pods := make([]*corev1.Pod, 10) + for i := 0; i < 6; i++ { + pods[i] = podDemo.DeepCopy() + pods[i].Name = fmt.Sprintf("test-pod-%d", i) + pods[i].Annotations = map[string]string{ + wsutil.MatchedWorkloadSpreadSubsetAnnotations: `{"Name":"test-workloadSpread","Subset":"subset-a"}`, + } + } + for i := 6; i < 10; i++ { + pods[i] = podDemo.DeepCopy() + pods[i].Name = fmt.Sprintf("test-pod-%d", i) + pods[i].Annotations = map[string]string{ + wsutil.MatchedWorkloadSpreadSubsetAnnotations: `{"Name":"test-workloadSpread","Subset":"subset-b"}`, + } + } + pods[0].Labels[apps.DefaultDeploymentUniqueLabelKey] = "oldVersion" + pods[2].Labels[apps.DefaultDeploymentUniqueLabelKey] = "oldVersion" + pods[4].Labels[apps.DefaultDeploymentUniqueLabelKey] = "oldVersion" + pods[6].Labels[apps.DefaultDeploymentUniqueLabelKey] = "oldVersion" + pods[8].Labels[apps.DefaultDeploymentUniqueLabelKey] = "oldVersion" + pods[1].Labels[apps.DefaultDeploymentUniqueLabelKey] = "newVersion" + pods[3].Labels[apps.DefaultDeploymentUniqueLabelKey] = "newVersion" + pods[5].Labels[apps.DefaultDeploymentUniqueLabelKey] = "newVersion" + pods[7].Labels[apps.DefaultDeploymentUniqueLabelKey] = "newVersion" + pods[9].Labels[apps.DefaultDeploymentUniqueLabelKey] = "newVersion" + return pods + }, + getWorkloadSpread: func() *appsv1alpha1.WorkloadSpread { + workloadSpread := workloadSpreadDemo.DeepCopy() + workloadSpread.Spec.Subsets = []appsv1alpha1.WorkloadSpreadSubset{ + { + Name: "subset-a", + MaxReplicas: &intstr.IntOrString{Type: intstr.Int, IntVal: 3}, + }, + { + Name: "subset-b", + }, + } + return workloadSpread + }, + getCloneSet: func() *appsv1alpha1.CloneSet { + clone := cloneSetDemo.DeepCopy() + clone.Status.UpdateRevision = "newVersion" + return clone + }, + expectPods: func() []*corev1.Pod { + pods := 
make([]*corev1.Pod, 10) + for i := 0; i < 6; i++ { + pods[i] = podDemo.DeepCopy() + pods[i].Name = fmt.Sprintf("test-pod-%d", i) + pods[i].Annotations = map[string]string{ + wsutil.MatchedWorkloadSpreadSubsetAnnotations: `{"Name":"test-workloadSpread","Subset":"subset-a"}`, + } + } + for i := 6; i < 10; i++ { + pods[i] = podDemo.DeepCopy() + pods[i].Name = fmt.Sprintf("test-pod-%d", i) + pods[i].Annotations = map[string]string{ + wsutil.MatchedWorkloadSpreadSubsetAnnotations: `{"Name":"test-workloadSpread","Subset":"subset-b"}`, + } + } + pods[0].Annotations[PodDeletionCostAnnotation] = "100" + pods[0].Labels[apps.DefaultDeploymentUniqueLabelKey] = "oldVersion" + pods[2].Annotations[PodDeletionCostAnnotation] = "100" + pods[2].Labels[apps.DefaultDeploymentUniqueLabelKey] = "oldVersion" + pods[4].Annotations[PodDeletionCostAnnotation] = "100" + pods[4].Labels[apps.DefaultDeploymentUniqueLabelKey] = "oldVersion" + pods[6].Annotations[PodDeletionCostAnnotation] = "200" + pods[6].Labels[apps.DefaultDeploymentUniqueLabelKey] = "oldVersion" + pods[8].Annotations[PodDeletionCostAnnotation] = "200" + pods[8].Labels[apps.DefaultDeploymentUniqueLabelKey] = "oldVersion" + + pods[1].Annotations[PodDeletionCostAnnotation] = "200" + pods[1].Labels[apps.DefaultDeploymentUniqueLabelKey] = "newVersion" + pods[3].Annotations[PodDeletionCostAnnotation] = "200" + pods[3].Labels[apps.DefaultDeploymentUniqueLabelKey] = "newVersion" + pods[5].Annotations[PodDeletionCostAnnotation] = "200" + pods[5].Labels[apps.DefaultDeploymentUniqueLabelKey] = "newVersion" + pods[7].Annotations[PodDeletionCostAnnotation] = "100" + pods[7].Labels[apps.DefaultDeploymentUniqueLabelKey] = "newVersion" + pods[9].Annotations[PodDeletionCostAnnotation] = "100" + pods[9].Labels[apps.DefaultDeploymentUniqueLabelKey] = "newVersion" + return pods + }, + expectWorkloadSpread: func() *appsv1alpha1.WorkloadSpread { + workloadSpread := workloadSpreadDemo.DeepCopy() + workloadSpread.Status.SubsetStatuses = make([]appsv1alpha1.WorkloadSpreadSubsetStatus, 2) + workloadSpread.Status.SubsetStatuses[0].Name = "subset-a" + workloadSpread.Status.SubsetStatuses[0].MissingReplicas = 0 + workloadSpread.Status.SubsetStatuses[0].Replicas = 6 + workloadSpread.Status.SubsetStatuses[1].Name = "subset-b" + workloadSpread.Status.SubsetStatuses[1].MissingReplicas = -1 + workloadSpread.Status.SubsetStatuses[1].Replicas = 4 + + workloadSpread.Status.VersionedSubsetStatuses = map[string][]appsv1alpha1.WorkloadSpreadSubsetStatus{ + "oldVersion": { + { + Name: "subset-a", + MissingReplicas: 0, + Replicas: 3, + }, + { + Name: "subset-b", + MissingReplicas: -1, + Replicas: 2, + }, + }, + "newVersion": { + { + Name: "subset-a", + MissingReplicas: 0, + Replicas: 3, + }, + { + Name: "subset-b", + MissingReplicas: -1, + Replicas: 2, + }, + }, + } + return workloadSpread + }, + }, { name: "create five pods, all succeed, maxReplicas = 3, missingReplicas = 0, pod-0, pod-1 deletion-cost = -100", getPods: func() []*corev1.Pod { @@ -1389,6 +1519,12 @@ func TestWorkloadSpreadReconcile(t *testing.T) { }, }, } + + if !wsutil.EnabledWorkloadSetForVersionedStatus.Has("cloneset") { + wsutil.EnabledWorkloadSetForVersionedStatus.Insert("cloneset") + defer wsutil.EnabledWorkloadSetForVersionedStatus.Delete("cloneset") + } + for _, cs := range cases { t.Run(cs.name, func(t *testing.T) { currentTime = time.Now() @@ -1437,17 +1573,31 @@ func TestWorkloadSpreadReconcile(t *testing.T) { t.Fatalf("getLatestWorkloadSpread failed: %s", err.Error()) } - latestStatus := 
latestWorkloadSpread.Status + latestStatus := latestWorkloadSpread.Status.SubsetStatuses by, _ := json.Marshal(latestStatus) fmt.Println(string(by)) - exceptStatus := cs.expectWorkloadSpread().Status + exceptStatus := cs.expectWorkloadSpread().Status.SubsetStatuses by, _ = json.Marshal(exceptStatus) fmt.Println(string(by)) if !apiequality.Semantic.DeepEqual(latestStatus, exceptStatus) { t.Fatalf("workloadSpread status DeepEqual failed") } + + if len(latestWorkloadSpread.Status.VersionedSubsetStatuses) > 1 { + latestVersionedStatus := latestWorkloadSpread.Status.VersionedSubsetStatuses + by, _ := json.Marshal(latestVersionedStatus) + fmt.Println(string(by)) + + exceptVersionedStatus := cs.expectWorkloadSpread().Status.VersionedSubsetStatuses + by, _ = json.Marshal(exceptVersionedStatus) + fmt.Println(string(by)) + + if !apiequality.Semantic.DeepEqual(latestVersionedStatus, exceptVersionedStatus) { + t.Fatalf("workloadSpread versioned status DeepEqual failed") + } + } }) } } @@ -1485,11 +1635,11 @@ func TestUpdateSubsetSequence(t *testing.T) { } r := ReconcileWorkloadSpread{} - subsetsPods, err := r.groupPod(workloadSpread, pods, 5) + versionedPodMap, subsetsPods, err := r.groupVersionedPods(workloadSpread, pods, 5) if err != nil { t.Fatalf("error group pods") } - status, _ := r.calculateWorkloadSpreadStatus(workloadSpread, subsetsPods, 5) + status, _ := r.calculateWorkloadSpreadStatus(workloadSpread, versionedPodMap, subsetsPods, 5) if status == nil { t.Fatalf("error get WorkloadSpread status") } else { diff --git a/pkg/controller/workloadspread/workloadspread_event_handler.go b/pkg/controller/workloadspread/workloadspread_event_handler.go index 7c6193d111..f8b5573803 100644 --- a/pkg/controller/workloadspread/workloadspread_event_handler.go +++ b/pkg/controller/workloadspread/workloadspread_event_handler.go @@ -20,11 +20,11 @@ import ( "context" "encoding/json" "reflect" - "strings" appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" @@ -39,16 +39,16 @@ import ( appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1" - "github.com/openkruise/kruise/pkg/util/configuration" wsutil "github.com/openkruise/kruise/pkg/util/workloadspread" ) type EventAction string const ( - CreateEventAction EventAction = "Create" - UpdateEventAction EventAction = "Update" - DeleteEventAction EventAction = "Delete" + CreateEventAction EventAction = "Create" + UpdateEventAction EventAction = "Update" + DeleteEventAction EventAction = "Delete" + DeploymentRevisionAnnotation = "deployment.kubernetes.io/revision" ) var _ handler.EventHandler = &podEventHandler{} @@ -63,7 +63,7 @@ func (p *podEventHandler) Update(evt event.UpdateEvent, q workqueue.RateLimiting oldPod := evt.ObjectOld.(*corev1.Pod) newPod := evt.ObjectNew.(*corev1.Pod) - if kubecontroller.IsPodActive(oldPod) && !kubecontroller.IsPodActive(newPod) { + if kubecontroller.IsPodActive(oldPod) && !kubecontroller.IsPodActive(newPod) || wsutil.GetPodVersion(oldPod) != wsutil.GetPodVersion(newPod) { p.handlePod(q, newPod, UpdateEventAction) } } @@ -103,15 +103,22 @@ func (w workloadEventHandler) Update(evt event.UpdateEvent, q workqueue.RateLimi var gvk schema.GroupVersionKind var oldReplicas int32 var newReplicas int32 + var otherChanges bool switch evt.ObjectNew.(type) { case 
*appsv1alpha1.CloneSet: - oldReplicas = *evt.ObjectOld.(*appsv1alpha1.CloneSet).Spec.Replicas - newReplicas = *evt.ObjectNew.(*appsv1alpha1.CloneSet).Spec.Replicas + oldObject := evt.ObjectOld.(*appsv1alpha1.CloneSet) + newObject := evt.ObjectNew.(*appsv1alpha1.CloneSet) + oldReplicas = *oldObject.Spec.Replicas + newReplicas = *newObject.Spec.Replicas + otherChanges = newObject.Status.UpdateRevision != oldObject.Status.CurrentRevision gvk = controllerKruiseKindCS case *appsv1.Deployment: - oldReplicas = *evt.ObjectOld.(*appsv1.Deployment).Spec.Replicas - newReplicas = *evt.ObjectNew.(*appsv1.Deployment).Spec.Replicas + oldObject := evt.ObjectOld.(*appsv1.Deployment) + newObject := evt.ObjectNew.(*appsv1.Deployment) + oldReplicas = *oldObject.Spec.Replicas + newReplicas = *newObject.Spec.Replicas + otherChanges = newObject.Annotations[DeploymentRevisionAnnotation] != oldObject.Annotations[DeploymentRevisionAnnotation] gvk = controllerKindDep case *appsv1.ReplicaSet: oldReplicas = *evt.ObjectOld.(*appsv1.ReplicaSet).Spec.Replicas @@ -130,20 +137,21 @@ func (w workloadEventHandler) Update(evt event.UpdateEvent, q workqueue.RateLimi newReplicas = *evt.ObjectNew.(*appsv1beta1.StatefulSet).Spec.Replicas gvk = controllerKruiseKindSts case *unstructured.Unstructured: - oldReplicas = w.getReplicasFromUnstructured(evt.ObjectOld.(*unstructured.Unstructured)) - newReplicas = w.getReplicasFromUnstructured(evt.ObjectNew.(*unstructured.Unstructured)) + oldReplicas = wsutil.GetReplicasFromCustomWorkload(w.Reader, evt.ObjectOld.(*unstructured.Unstructured)) + newReplicas = wsutil.GetReplicasFromCustomWorkload(w.Reader, evt.ObjectNew.(*unstructured.Unstructured)) gvk = evt.ObjectNew.(*unstructured.Unstructured).GroupVersionKind() default: return } // workload replicas changed, and reconcile corresponding WorkloadSpread - if oldReplicas != newReplicas { + if oldReplicas != newReplicas || otherChanges { workloadNsn := types.NamespacedName{ Namespace: evt.ObjectNew.GetNamespace(), Name: evt.ObjectNew.GetName(), } - ws, err := w.getWorkloadSpreadForWorkload(workloadNsn, gvk) + owner := metav1.GetControllerOfNoCopy(evt.ObjectNew) + ws, err := w.getWorkloadSpreadForWorkload(workloadNsn, gvk, owner) if err != nil { klog.Errorf("unable to get WorkloadSpread related with %s (%s/%s), err: %v", gvk.Kind, workloadNsn.Namespace, workloadNsn.Name, err) @@ -158,40 +166,6 @@ func (w workloadEventHandler) Update(evt event.UpdateEvent, q workqueue.RateLimi } } -func (w *workloadEventHandler) getReplicasFromUnstructured(object *unstructured.Unstructured) int32 { - if object == nil || reflect.ValueOf(object).IsNil() { - return 0 - } - whiteList, err := configuration.GetWSWatchCustomWorkloadWhiteList(w.Reader) - if err != nil { - klog.Errorf("Failed to get workloadSpread custom workload white list from kruise config map") - return 0 - } - - gvk := object.GroupVersionKind() - for _, workload := range whiteList.Workloads { - if workload.GroupVersionKind.GroupKind() != gvk.GroupKind() { - continue - } - var exists bool - var replicas int64 - path := strings.Split(workload.ReplicasPath, ".") - if len(path) > 0 { - replicas, exists, err = unstructured.NestedInt64(object.Object, path...) 
- if err != nil || !exists { - klog.Errorf("Failed to get replicas from %v, replicas path %s", gvk, workload.ReplicasPath) - } - } else { - replicas, exists, err = unstructured.NestedInt64(object.Object, "spec", "replicas") - if err != nil || !exists { - klog.Errorf("Failed to get replicas from %v, replicas path %s", gvk, workload.ReplicasPath) - } - } - return int32(replicas) - } - return 0 -} - func (w workloadEventHandler) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) { w.handleWorkload(q, evt.Object, DeleteEventAction) } @@ -223,7 +197,8 @@ func (w *workloadEventHandler) handleWorkload(q workqueue.RateLimitingInterface, Namespace: obj.GetNamespace(), Name: obj.GetName(), } - ws, err := w.getWorkloadSpreadForWorkload(workloadNsn, gvk) + owner := metav1.GetControllerOfNoCopy(obj) + ws, err := w.getWorkloadSpreadForWorkload(workloadNsn, gvk, owner) if err != nil { klog.Errorf("unable to get WorkloadSpread related with %s (%s/%s), err: %v", gvk.Kind, workloadNsn.Namespace, workloadNsn.Name, err) @@ -239,7 +214,7 @@ func (w *workloadEventHandler) handleWorkload(q workqueue.RateLimitingInterface, func (w *workloadEventHandler) getWorkloadSpreadForWorkload( workloadNamespaceName types.NamespacedName, - gvk schema.GroupVersionKind) (*appsv1alpha1.WorkloadSpread, error) { + gvk schema.GroupVersionKind, ownerRef *metav1.OwnerReference) (*appsv1alpha1.WorkloadSpread, error) { wsList := &appsv1alpha1.WorkloadSpreadList{} listOptions := &client.ListOptions{Namespace: workloadNamespaceName.Namespace} if err := w.List(context.TODO(), wsList, listOptions); err != nil { @@ -247,6 +222,17 @@ func (w *workloadEventHandler) getWorkloadSpreadForWorkload( return nil, err } + // In case of ReplicaSet owned by Deployment, we should consider if the + // Deployment is referred by workloadSpread. 
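+ // Resolving the controller owner here lets a ReplicaSet event enqueue the WorkloadSpread that targets its parent Deployment.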
+ var ownerKey *types.NamespacedName + var ownerGvk schema.GroupVersionKind + if ownerRef != nil && reflect.DeepEqual(gvk, controllerKindRS) { + ownerGvk = schema.FromAPIVersionAndKind(ownerRef.APIVersion, ownerRef.Kind) + if reflect.DeepEqual(ownerGvk, controllerKindDep) { + ownerKey = &types.NamespacedName{Namespace: workloadNamespaceName.Namespace, Name: ownerRef.Name} + } + } + for _, ws := range wsList.Items { if ws.DeletionTimestamp != nil { continue @@ -257,13 +243,12 @@ func (w *workloadEventHandler) getWorkloadSpreadForWorkload( continue } - targetGV, err := schema.ParseGroupVersion(targetRef.APIVersion) - if err != nil { - klog.Errorf("failed to parse targetRef's group version: %s", targetRef.APIVersion) - continue + // Ignore version + targetGk := schema.FromAPIVersionAndKind(targetRef.APIVersion, targetRef.Kind).GroupKind() + if reflect.DeepEqual(targetGk, gvk.GroupKind()) && targetRef.Name == workloadNamespaceName.Name { + return &ws, nil } - - if targetRef.Kind == gvk.Kind && targetGV.Group == gvk.Group && targetRef.Name == workloadNamespaceName.Name { + if ownerKey != nil && reflect.DeepEqual(targetGk, ownerGvk.GroupKind()) && targetRef.Name == ownerKey.Name { return &ws, nil } } diff --git a/pkg/controller/workloadspread/workloadspread_event_handler_test.go b/pkg/controller/workloadspread/workloadspread_event_handler_test.go index 66433ff651..7f722de760 100644 --- a/pkg/controller/workloadspread/workloadspread_event_handler_test.go +++ b/pkg/controller/workloadspread/workloadspread_event_handler_test.go @@ -380,7 +380,7 @@ func TestGetWorkloadSpreadForCloneSet(t *testing.T) { Name: cs.getCloneSet().Name, } handler := workloadEventHandler{Reader: fakeClient} - workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKruiseKindCS) + workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKruiseKindCS, nil) expectTopology := cs.expectWorkloadSpread() if expectTopology == nil { @@ -506,7 +506,7 @@ func TestGetWorkloadSpreadForDeployment(t *testing.T) { Name: cs.getDeployment().Name, } handler := workloadEventHandler{Reader: fakeClient} - workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKindDep) + workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKindDep, nil) expectTopology := cs.expectWorkloadSpread() if expectTopology == nil { @@ -608,7 +608,7 @@ func TestGetWorkloadSpreadForJob(t *testing.T) { Name: cs.getJob().Name, } handler := workloadEventHandler{Reader: fakeClient} - workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKindJob) + workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKindJob, nil) expectTopology := cs.expectWorkloadSpread() if expectTopology == nil { @@ -734,7 +734,7 @@ func TestGetWorkloadSpreadForReplicaSet(t *testing.T) { Name: cs.getReplicaset().Name, } handler := workloadEventHandler{Reader: fakeClient} - workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKindRS) + workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKindRS, nil) expectTopology := cs.expectWorkloadSpread() if expectTopology == nil { @@ -860,7 +860,7 @@ func TestGetWorkloadSpreadForStatefulSet(t *testing.T) { Name: cs.getStatefulSet().Name, } handler := workloadEventHandler{Reader: fakeClient} - workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKindSts) + workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKindSts, nil) expectTopology := cs.expectWorkloadSpread() if expectTopology 
== nil { @@ -986,7 +986,7 @@ func TestGetWorkloadSpreadForAdvancedStatefulSet(t *testing.T) { Name: cs.getStatefulSet().Name, } handler := workloadEventHandler{Reader: fakeClient} - workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKruiseKindSts) + workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKruiseKindSts, nil) expectTopology := cs.expectWorkloadSpread() if expectTopology == nil { diff --git a/pkg/util/tools.go b/pkg/util/tools.go index 01b1b5ef9a..84b4388d4f 100644 --- a/pkg/util/tools.go +++ b/pkg/util/tools.go @@ -26,6 +26,9 @@ import ( "github.com/docker/distribution/reference" appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + apiequality "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/runtime/schema" intstrutil "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/klog/v2" @@ -223,3 +226,12 @@ func GetScaledValueFromIntOrPercent(intOrPercent *intstrutil.IntOrString, total } return 0, fmt.Errorf("invalid type: neither int nor percentage") } + +func EqualIgnoreHash(template1, template2 *corev1.PodTemplateSpec) bool { + t1Copy := template1.DeepCopy() + t2Copy := template2.DeepCopy() + // Remove hash labels from template.Labels before comparing + delete(t1Copy.Labels, appsv1.DefaultDeploymentUniqueLabelKey) + delete(t2Copy.Labels, appsv1.DefaultDeploymentUniqueLabelKey) + return apiequality.Semantic.DeepEqual(t1Copy, t2Copy) +} diff --git a/pkg/util/workloadspread/workloadspread.go b/pkg/util/workloadspread/workloadspread.go index 432b803677..57fa68063c 100644 --- a/pkg/util/workloadspread/workloadspread.go +++ b/pkg/util/workloadspread/workloadspread.go @@ -19,9 +19,12 @@ package workloadspread import ( "context" "encoding/json" + "flag" + "fmt" "math" "regexp" "strconv" + "strings" "time" appsv1 "k8s.io/api/apps/v1" @@ -32,6 +35,8 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + intstrutil "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/client-go/util/retry" @@ -42,7 +47,9 @@ import ( appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1" kubeClient "github.com/openkruise/kruise/pkg/client" + "github.com/openkruise/kruise/pkg/controller/cloneset/utils" "github.com/openkruise/kruise/pkg/util" + utilclient "github.com/openkruise/kruise/pkg/util/client" "github.com/openkruise/kruise/pkg/util/configuration" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" ) @@ -55,6 +62,10 @@ const ( PodDeletionCostPositive = 100 PodDeletionCostNegative = -100 + + // VersionIgnored means all Pods should be regard as a universal version, + // i.e., ignore the Pod/Workload version. 
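+ // When versioned status is not enabled for the workload kind, all Pods fall under this single version key.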
+ VersionIgnored = "versionIgnored" ) var ( @@ -65,8 +76,25 @@ var ( controllerKindRS = appsv1.SchemeGroupVersion.WithKind("ReplicaSet") controllerKindDep = appsv1.SchemeGroupVersion.WithKind("Deployment") controllerKindSts = appsv1.SchemeGroupVersion.WithKind("StatefulSet") + + enabledWorkloadStrForVersionedStatus = "deployment,replicaset" + EnabledWorkloadSetForVersionedStatus sets.String + + GenerateNotFoundError = func(object client.Object, msg string) error { + objectGroupKind := object.GetObjectKind().GroupVersionKind().GroupKind() + return errors.NewNotFound(schema.GroupResource{Group: objectGroupKind.Group, Resource: objectGroupKind.Kind}, msg) + } ) +func init() { + flag.StringVar(&enabledWorkloadStrForVersionedStatus, "ws-enabled-versioned-status", enabledWorkloadStrForVersionedStatus, "Enabled workload that uses versioned subset status of WorkloadSpread.") + enabledWorkloadStrForVersionedStatus = strings.ToLower(enabledWorkloadStrForVersionedStatus) + EnabledWorkloadSetForVersionedStatus = sets.NewString(strings.Split(enabledWorkloadStrForVersionedStatus, ",")...) + if EnabledWorkloadSetForVersionedStatus.Has("deployment") { + EnabledWorkloadSetForVersionedStatus.Insert("replicaset") + } +} + type Operation string const ( @@ -362,9 +390,9 @@ func (h *Handler) acquireSuitableSubset(matchedWS *appsv1alpha1.WorkloadSpread, // check whether WorkloadSpread has suitable subset for the pod // 1. changed indicates whether workloadSpread status changed // 2. suitableSubset is matched subset for the pod - changed, suitableSubset, generatedUID = h.updateSubsetForPod(wsClone, pod, injectWS, operation) - if !changed { - return nil + changed, suitableSubset, generatedUID, err = h.updateSubsetForPod(wsClone, pod, injectWS, operation) + if !changed || err != nil { + return err } // update WorkloadSpread status @@ -448,28 +476,43 @@ func (h *Handler) tryToGetTheLatestMatchedWS(matchedWS *appsv1alpha1.WorkloadSpr // 3. generatedUID(types.UID) indicates which workloadSpread generate a UID for identifying Pod without a full name. func (h *Handler) updateSubsetForPod(ws *appsv1alpha1.WorkloadSpread, pod *corev1.Pod, injectWS *InjectWorkloadSpread, operation Operation) ( - bool, *appsv1alpha1.WorkloadSpreadSubsetStatus, string) { + bool, *appsv1alpha1.WorkloadSpreadSubsetStatus, string, error) { var suitableSubset *appsv1alpha1.WorkloadSpreadSubsetStatus var generatedUID string + // We only care about the corresponding versioned subset status. 
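+ // If this version has no subset statuses yet (e.g. the first Pod of a new revision), initialize them from the subsets' maxReplicas and the workload's current replicas.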
+ var err error + version := GetPodVersion(pod) + subsetStatuses := ws.Status.VersionedSubsetStatuses[version] + if len(subsetStatuses) == 0 { + subsetStatuses, err = h.initializedSubsetStatuses(ws) + if err != nil { + return false, nil, "", err + } + if ws.Status.VersionedSubsetStatuses == nil { + ws.Status.VersionedSubsetStatuses = map[string][]appsv1alpha1.WorkloadSpreadSubsetStatus{} + } + ws.Status.VersionedSubsetStatuses[version] = subsetStatuses + } + switch operation { case CreateOperation: if pod.Name != "" { // pod is already in CreatingPods/DeletingPods List, then return - if isRecord, subset := isPodRecordedInSubset(ws, pod.Name); isRecord { - return false, subset, "" + if isRecord, subset := isPodRecordedInSubset(subsetStatuses, pod.Name); isRecord { + return false, subset, "", nil } } - suitableSubset = h.getSuitableSubset(ws) + suitableSubset = h.getSuitableSubset(subsetStatuses) if suitableSubset == nil { - klog.V(5).Infof("WorkloadSpread (%s/%s) don't have a suitable subset for Pod (%s)", + klog.Warningf("WorkloadSpread (%s/%s) don't have a suitable subset for Pod (%s) when creating", ws.Namespace, ws.Name, pod.Name) - return false, nil, "" + return false, nil, "", nil } // no need to update WorkloadSpread status if MaxReplicas == nil if suitableSubset.MissingReplicas == -1 { - return false, suitableSubset, "" + return false, suitableSubset, "", nil } if suitableSubset.CreatingPods == nil { suitableSubset.CreatingPods = map[string]metav1.Time{} @@ -487,17 +530,18 @@ func (h *Handler) updateSubsetForPod(ws *appsv1alpha1.WorkloadSpread, } case DeleteOperation, EvictionOperation: // pod is already in DeletingPods/CreatingPods List, then return - if isRecord, _ := isPodRecordedInSubset(ws, pod.Name); isRecord { - return false, nil, "" + if isRecord, _ := isPodRecordedInSubset(subsetStatuses, pod.Name); isRecord { + return false, nil, "", nil } - suitableSubset = getSpecificSubset(ws, injectWS.Subset) + suitableSubset = getSpecificSubset(subsetStatuses, injectWS.Subset) if suitableSubset == nil { - klog.V(5).Infof("Pod (%s/%s) matched WorkloadSpread (%s) not found Subset(%s)", ws.Namespace, pod.Name, ws.Name, injectWS.Subset) - return false, nil, "" + klog.V(5).Infof("Pod (%s/%s) matched WorkloadSpread (%s) not found Subset(%s) when deleting", + ws.Namespace, pod.Name, ws.Name, injectWS.Subset) + return false, nil, "", nil } if suitableSubset.MissingReplicas == -1 { - return false, suitableSubset, "" + return false, suitableSubset, "", nil } if suitableSubset.DeletingPods == nil { suitableSubset.DeletingPods = map[string]metav1.Time{} @@ -507,24 +551,24 @@ func (h *Handler) updateSubsetForPod(ws *appsv1alpha1.WorkloadSpread, suitableSubset.MissingReplicas++ } default: - return false, nil, "" + return false, nil, "", nil } // update subset status - for i := range ws.Status.SubsetStatuses { - if ws.Status.SubsetStatuses[i].Name == suitableSubset.Name { - ws.Status.SubsetStatuses[i] = *suitableSubset + for i := range ws.Status.VersionedSubsetStatuses[version] { + if ws.Status.VersionedSubsetStatuses[version][i].Name == suitableSubset.Name { + ws.Status.VersionedSubsetStatuses[version][i] = *suitableSubset break } } - return true, suitableSubset, generatedUID + return true, suitableSubset, generatedUID, nil } // return two parameters // 1. isRecord(bool) 2. 
SubsetStatus -func isPodRecordedInSubset(ws *appsv1alpha1.WorkloadSpread, podName string) (bool, *appsv1alpha1.WorkloadSpreadSubsetStatus) { - for _, subset := range ws.Status.SubsetStatuses { +func isPodRecordedInSubset(subsetStatuses []appsv1alpha1.WorkloadSpreadSubsetStatus, podName string) (bool, *appsv1alpha1.WorkloadSpreadSubsetStatus) { + for _, subset := range subsetStatuses { if _, ok := subset.CreatingPods[podName]; ok { return true, &subset } @@ -609,8 +653,8 @@ func injectWorkloadSpreadIntoPod(ws *appsv1alpha1.WorkloadSpread, pod *corev1.Po return true, nil } -func getSpecificSubset(ws *appsv1alpha1.WorkloadSpread, specifySubset string) *appsv1alpha1.WorkloadSpreadSubsetStatus { - for _, subset := range ws.Status.SubsetStatuses { +func getSpecificSubset(subsetStatuses []appsv1alpha1.WorkloadSpreadSubsetStatus, specifySubset string) *appsv1alpha1.WorkloadSpreadSubsetStatus { + for _, subset := range subsetStatuses { if specifySubset == subset.Name { return &subset } @@ -618,9 +662,9 @@ func getSpecificSubset(ws *appsv1alpha1.WorkloadSpread, specifySubset string) *a return nil } -func (h *Handler) getSuitableSubset(ws *appsv1alpha1.WorkloadSpread) *appsv1alpha1.WorkloadSpreadSubsetStatus { - for i := range ws.Status.SubsetStatuses { - subset := &ws.Status.SubsetStatuses[i] +func (h *Handler) getSuitableSubset(subsetStatuses []appsv1alpha1.WorkloadSpreadSubsetStatus) *appsv1alpha1.WorkloadSpreadSubsetStatus { + for i := range subsetStatuses { + subset := &subsetStatuses[i] canSchedule := true for _, condition := range subset.Conditions { if condition.Type == appsv1alpha1.SubsetSchedulable && condition.Status == corev1.ConditionFalse { @@ -643,7 +687,7 @@ func (h *Handler) getSuitableSubset(ws *appsv1alpha1.WorkloadSpread) *appsv1alph return nil } -func (h Handler) isReferenceEqual(target *appsv1alpha1.TargetReference, owner *metav1.OwnerReference, namespace string) bool { +func (h *Handler) isReferenceEqual(target *appsv1alpha1.TargetReference, owner *metav1.OwnerReference, namespace string) bool { if owner == nil { return false } @@ -712,26 +756,10 @@ func getSubsetCondition(ws *appsv1alpha1.WorkloadSpread, subsetName string, cond return nil } -func (h Handler) getObjectOf(owner *metav1.OwnerReference, namespace string) (client.Object, error) { - var object client.Object +func (h *Handler) getObjectOf(owner *metav1.OwnerReference, namespace string) (client.Object, error) { objectKey := types.NamespacedName{Namespace: namespace, Name: owner.Name} objectGvk := schema.FromAPIVersionAndKind(owner.APIVersion, owner.Kind) - switch objectGvk { - case controllerKindRS: - object = &appsv1.ReplicaSet{} - case controllerKindDep: - object = &appsv1.Deployment{} - case controllerKindSts: - object = &appsv1.StatefulSet{} - case controllerKruiseKindBetaSts, controllerKruiseKindAlphaSts: - object = &appsv1beta1.StatefulSet{} - case controllerKruiseKindCS: - object = &appsv1alpha1.CloneSet{} - default: - o := unstructured.Unstructured{} - o.SetGroupVersionKind(objectGvk) - object = &o - } + object := GenerateEmptyWorkloadObject(objectGvk, objectKey) if err := h.Get(context.TODO(), objectKey, object); err != nil { return nil, err } @@ -760,3 +788,179 @@ func initializeWorkloadsInWhiteList(c client.Client) { } workloadsInWhiteListInitialized = true } + +func (h *Handler) initializedSubsetStatuses(ws *appsv1alpha1.WorkloadSpread) ([]appsv1alpha1.WorkloadSpreadSubsetStatus, error) { + replicas, err := h.getWorkloadReplicas(ws) + if err != nil { + return nil, err + } + var subsetStatuses 
[]appsv1alpha1.WorkloadSpreadSubsetStatus + for i := range ws.Spec.Subsets { + subset := ws.Spec.Subsets[i] + subsetStatus := appsv1alpha1.WorkloadSpreadSubsetStatus{Name: subset.Name} + if subset.MaxReplicas == nil { + subsetStatus.MissingReplicas = -1 + } else { + missingReplicas, _ := intstrutil.GetScaledValueFromIntOrPercent(subset.MaxReplicas, int(replicas), true) + subsetStatus.MissingReplicas = int32(missingReplicas) + } + subsetStatuses = append(subsetStatuses, subsetStatus) + } + return subsetStatuses, nil +} + +func (h *Handler) getWorkloadReplicas(ws *appsv1alpha1.WorkloadSpread) (int32, error) { + if ws.Spec.TargetReference == nil { + return 0, nil + } + gvk := schema.FromAPIVersionAndKind(ws.Spec.TargetReference.APIVersion, ws.Spec.TargetReference.Kind) + key := types.NamespacedName{Namespace: ws.Namespace, Name: ws.Spec.TargetReference.Name} + object := GenerateEmptyWorkloadObject(gvk, key) + + // TODO: fetch workload from API Server directly to avoid latency of informer if using Percentage settings of subset[x].maxReplicas. + err := h.Get(context.TODO(), key, object) + if err != nil { + return 0, client.IgnoreNotFound(err) + } + + switch o := object.(type) { + case *appsv1.Deployment: + return *o.Spec.Replicas, nil + case *appsv1.ReplicaSet: + return *o.Spec.Replicas, nil + case *appsv1.StatefulSet: + return *o.Spec.Replicas, nil + case *batchv1.Job: + return *o.Spec.Parallelism, nil + case *appsv1alpha1.CloneSet: + return *o.Spec.Replicas, nil + case *appsv1beta1.StatefulSet: + return *o.Spec.Replicas, nil + case *unstructured.Unstructured: + return GetReplicasFromCustomWorkload(h.Client, o), nil + } + return 0, fmt.Errorf("got unexpected workload type for workloadspread %s/%s", ws.Namespace, ws.Name) +} + +func GenerateEmptyWorkloadObject(gvk schema.GroupVersionKind, key types.NamespacedName) (object client.Object) { + switch gvk { + case controllerKindRS: + object = &appsv1.ReplicaSet{} + case controllerKindDep: + object = &appsv1.Deployment{} + case controllerKindSts: + object = &appsv1.StatefulSet{} + case controllerKindJob: + object = &batchv1.Job{} + case controllerKruiseKindCS: + object = &appsv1alpha1.CloneSet{} + case controllerKruiseKindAlphaSts, controllerKruiseKindBetaSts: + object = &appsv1beta1.StatefulSet{} + default: + unstructuredObject := &unstructured.Unstructured{} + unstructuredObject.SetGroupVersionKind(gvk) + object = unstructuredObject + } + object.SetName(key.Name) + object.SetNamespace(key.Namespace) + return +} + +func GetReplicasFromCustomWorkload(reader client.Reader, object *unstructured.Unstructured) int32 { + if object == nil { + return 0 + } + whiteList, err := configuration.GetWSWatchCustomWorkloadWhiteList(reader) + if err != nil { + klog.Errorf("Failed to get workloadSpread custom workload white list from kruise config map") + return 0 + } + + gvk := object.GroupVersionKind() + for _, wl := range whiteList.Workloads { + if wl.GroupVersionKind.GroupKind() != gvk.GroupKind() { + continue + } + var exists bool + var replicas int64 + path := strings.Split(wl.ReplicasPath, ".") + if len(path) > 0 { + replicas, exists, err = unstructured.NestedInt64(object.Object, path...) 
+ if err != nil || !exists { + klog.Errorf("Failed to get replicas from %v, replicas path %s", gvk, wl.ReplicasPath) + } + } else { + replicas, exists, err = unstructured.NestedInt64(object.Object, "spec", "replicas") + if err != nil || !exists { + klog.Errorf("Failed to get replicas from %v, replicas path %s", gvk, wl.ReplicasPath) + } + } + return int32(replicas) + } + return 0 +} + +func GetPodVersion(pod *corev1.Pod) string { + if !enableVersionedStatus(pod) { + return VersionIgnored + } + + if version, exists := pod.Labels[appsv1.DefaultDeploymentUniqueLabelKey]; exists { + return version + } + if version, exists := pod.Labels[appsv1.ControllerRevisionHashLabelKey]; exists { + return utils.GetShortHash(version) + } + return VersionIgnored +} + +func GetWorkloadVersion(reader client.Reader, object client.Object) (string, error) { + if !enableVersionedStatus(object) { + return VersionIgnored, nil + } + + switch o := object.(type) { + case *appsv1.ReplicaSet: + return o.Labels[appsv1.DefaultDeploymentUniqueLabelKey], nil + + case *appsv1alpha1.CloneSet: + if o.Generation > o.Status.ObservedGeneration { + return "", GenerateNotFoundError(o, fmt.Sprintf("%s latest version", klog.KObj(o))) + } + return utils.GetShortHash(o.Status.UpdateRevision), nil + + case *appsv1.Deployment: + rsLister := &appsv1.ReplicaSetList{} + selector, _ := metav1.LabelSelectorAsSelector(o.Spec.Selector) + err := reader.List(context.TODO(), rsLister, &client.ListOptions{LabelSelector: selector, Namespace: o.Namespace}, utilclient.DisableDeepCopy) + if err != nil { + return "", err + } + + for i := range rsLister.Items { + rs := &rsLister.Items[i] + owner := metav1.GetControllerOfNoCopy(rs) + if owner == nil || owner.UID != o.UID || !rs.DeletionTimestamp.IsZero() { + continue + } + if util.EqualIgnoreHash(&o.Spec.Template, &rs.Spec.Template) { + return rs.Labels[appsv1.DefaultDeploymentUniqueLabelKey], nil + } + } + return "", GenerateNotFoundError(o, fmt.Sprintf("%s latest version", klog.KObj(o))) + } + return VersionIgnored, nil +} + +func enableVersionedStatus(object client.Object) bool { + objectKind := object.GetObjectKind().GroupVersionKind().Kind + if EnabledWorkloadSetForVersionedStatus.Has(strings.ToLower(objectKind)) { + return true + } + + owner := metav1.GetControllerOfNoCopy(object) + if owner != nil && EnabledWorkloadSetForVersionedStatus.Has(strings.ToLower(owner.Kind)) { + return true + } + return false +} diff --git a/pkg/util/workloadspread/workloadspread_test.go b/pkg/util/workloadspread/workloadspread_test.go index dc79ae2ed5..6d624b15e9 100644 --- a/pkg/util/workloadspread/workloadspread_test.go +++ b/pkg/util/workloadspread/workloadspread_test.go @@ -21,11 +21,10 @@ import ( "encoding/json" "fmt" "reflect" + "strconv" "testing" "time" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -35,6 +34,8 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/util/uuid" utilpointer "k8s.io/utils/pointer" @@ -78,6 +79,32 @@ var ( }, } + podDemo2 = &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: "default", + Annotations: map[string]string{}, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Kind: "Deployment", + Name: 
"workload-xyz", + Controller: utilpointer.BoolPtr(true), + UID: types.UID("a03eb001-27eb-4713-b634-7c46f6861758"), + BlockOwnerDeletion: utilpointer.BoolPtr(true), + }, + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "nginx", + Image: "nginx:1.15.1", + }, + }, + }, + } + workloadSpreadDemo = &appsv1alpha1.WorkloadSpread{ TypeMeta: metav1.TypeMeta{ APIVersion: "apps.kruise.io/v1alpha1", @@ -155,6 +182,63 @@ var ( DeletingPods: map[string]metav1.Time{}, }, }, + VersionedSubsetStatuses: map[string][]appsv1alpha1.WorkloadSpreadSubsetStatus{ + VersionIgnored: { + { + Name: "subset-a", + MissingReplicas: 5, + CreatingPods: map[string]metav1.Time{}, + DeletingPods: map[string]metav1.Time{}, + }, + }, + }, + }, + } + + workloadSpreadDemo2 = &appsv1alpha1.WorkloadSpread{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "apps.kruise.io/v1alpha1", + Kind: "WorkloadSpread", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-ws", + Namespace: "default", + }, + Spec: appsv1alpha1.WorkloadSpreadSpec{ + TargetReference: &appsv1alpha1.TargetReference{ + APIVersion: "apps.kruise.io/v1alpha1", + Kind: "CloneSet", + Name: "workload", + }, + Subsets: []appsv1alpha1.WorkloadSpreadSubset{ + { + Name: "subset-a", + MaxReplicas: &intstr.IntOrString{Type: intstr.Int, IntVal: 2}, + }, + { + Name: "subset-b", + MaxReplicas: &intstr.IntOrString{Type: intstr.Int, IntVal: 3}, + }, + { + Name: "subset-c", + }, + }, + }, + Status: appsv1alpha1.WorkloadSpreadStatus{ + SubsetStatuses: []appsv1alpha1.WorkloadSpreadSubsetStatus{ + { + Name: "subset-a", + MissingReplicas: 2, + }, + { + Name: "subset-b", + MissingReplicas: 3, + }, + { + Name: "subset-c", + MissingReplicas: -1, + }, + }, }, } @@ -374,9 +458,11 @@ func TestWorkloadSpreadCreatePodWithoutFullName(t *testing.T) { DeletingPods: map[string]metav1.Time{}, } ws.Status.SubsetStatuses = append(ws.Status.SubsetStatuses, status) + ws.Status.VersionedSubsetStatuses = map[string][]appsv1alpha1.WorkloadSpreadSubsetStatus{} + ws.Status.VersionedSubsetStatuses[VersionIgnored] = ws.Status.SubsetStatuses pod := podDemo.DeepCopy() pod.Name = "" - _, suitableSubset, generatedUID := handler.updateSubsetForPod(ws, pod, nil, CreateOperation) + _, suitableSubset, generatedUID, _ := handler.updateSubsetForPod(ws, pod, nil, CreateOperation) if generatedUID == "" { t.Fatalf("generate id failed") } @@ -472,6 +558,9 @@ func TestWorkloadSpreadMutatingPod(t *testing.T) { workloadSpread.ResourceVersion = "1" workloadSpread.Status.SubsetStatuses[0].MissingReplicas = 4 workloadSpread.Status.SubsetStatuses[0].CreatingPods[podDemo.Name] = metav1.Time{Time: defaultTime} + workloadSpread.Status.VersionedSubsetStatuses = map[string][]appsv1alpha1.WorkloadSpreadSubsetStatus{ + VersionIgnored: workloadSpread.Status.SubsetStatuses, + } return workloadSpread }, }, @@ -503,6 +592,9 @@ func TestWorkloadSpreadMutatingPod(t *testing.T) { DeletingPods: map[string]metav1.Time{}, } demo.Status.SubsetStatuses = append(demo.Status.SubsetStatuses, status) + demo.Status.VersionedSubsetStatuses = map[string][]appsv1alpha1.WorkloadSpreadSubsetStatus{ + VersionIgnored: demo.Status.SubsetStatuses, + } return demo }, getOperation: func() Operation { @@ -556,6 +648,9 @@ func TestWorkloadSpreadMutatingPod(t *testing.T) { DeletingPods: map[string]metav1.Time{}, } demo.Status.SubsetStatuses = append(demo.Status.SubsetStatuses, status) + demo.Status.VersionedSubsetStatuses = map[string][]appsv1alpha1.WorkloadSpreadSubsetStatus{ + VersionIgnored: demo.Status.SubsetStatuses, + } demo.ResourceVersion = 
"1" return demo }, @@ -568,6 +663,9 @@ func TestWorkloadSpreadMutatingPod(t *testing.T) { getWorkloadSpread: func() *appsv1alpha1.WorkloadSpread { demo := workloadSpreadDemo.DeepCopy() demo.Status.SubsetStatuses[0].MissingReplicas = 0 + demo.Status.VersionedSubsetStatuses = map[string][]appsv1alpha1.WorkloadSpreadSubsetStatus{ + VersionIgnored: demo.Status.SubsetStatuses, + } return demo }, getOperation: func() Operation { @@ -580,6 +678,9 @@ func TestWorkloadSpreadMutatingPod(t *testing.T) { expectWorkloadSpread: func() *appsv1alpha1.WorkloadSpread { workloadSpread := workloadSpreadDemo.DeepCopy() workloadSpread.Status.SubsetStatuses[0].MissingReplicas = 0 + workloadSpread.Status.VersionedSubsetStatuses = map[string][]appsv1alpha1.WorkloadSpreadSubsetStatus{ + VersionIgnored: workloadSpread.Status.SubsetStatuses, + } return workloadSpread }, }, @@ -613,6 +714,9 @@ func TestWorkloadSpreadMutatingPod(t *testing.T) { getWorkloadSpread: func() *appsv1alpha1.WorkloadSpread { demo := workloadSpreadDemo.DeepCopy() demo.Status.SubsetStatuses[0].MissingReplicas = -1 + demo.Status.VersionedSubsetStatuses = map[string][]appsv1alpha1.WorkloadSpreadSubsetStatus{ + VersionIgnored: demo.Status.SubsetStatuses, + } return demo }, getOperation: func() Operation { @@ -684,6 +788,9 @@ func TestWorkloadSpreadMutatingPod(t *testing.T) { workloadSpread := workloadSpreadDemo.DeepCopy() workloadSpread.ResourceVersion = "1" workloadSpread.Status.SubsetStatuses[0].MissingReplicas = -1 + workloadSpread.Status.VersionedSubsetStatuses = map[string][]appsv1alpha1.WorkloadSpreadSubsetStatus{ + VersionIgnored: workloadSpread.Status.SubsetStatuses, + } return workloadSpread }, }, @@ -696,6 +803,9 @@ func TestWorkloadSpreadMutatingPod(t *testing.T) { demo := workloadSpreadDemo.DeepCopy() demo.Status.SubsetStatuses[0].MissingReplicas = 4 demo.Status.SubsetStatuses[0].CreatingPods[podDemo.Name] = metav1.Time{Time: defaultTime} + demo.Status.VersionedSubsetStatuses = map[string][]appsv1alpha1.WorkloadSpreadSubsetStatus{ + VersionIgnored: demo.Status.SubsetStatuses, + } return demo }, getOperation: func() Operation { @@ -768,6 +878,9 @@ func TestWorkloadSpreadMutatingPod(t *testing.T) { workloadSpread.ResourceVersion = "1" workloadSpread.Status.SubsetStatuses[0].MissingReplicas = 4 workloadSpread.Status.SubsetStatuses[0].CreatingPods[podDemo.Name] = metav1.Time{Time: defaultTime} + workloadSpread.Status.VersionedSubsetStatuses = map[string][]appsv1alpha1.WorkloadSpreadSubsetStatus{ + VersionIgnored: workloadSpread.Status.SubsetStatuses, + } return workloadSpread }, }, @@ -781,6 +894,9 @@ func TestWorkloadSpreadMutatingPod(t *testing.T) { getWorkloadSpread: func() *appsv1alpha1.WorkloadSpread { demo := workloadSpreadDemo.DeepCopy() demo.Status.SubsetStatuses[0].MissingReplicas = 0 + demo.Status.VersionedSubsetStatuses = map[string][]appsv1alpha1.WorkloadSpreadSubsetStatus{ + VersionIgnored: demo.Status.SubsetStatuses, + } return demo }, getOperation: func() Operation { @@ -795,6 +911,9 @@ func TestWorkloadSpreadMutatingPod(t *testing.T) { workloadSpread := workloadSpreadDemo.DeepCopy() workloadSpread.Status.SubsetStatuses[0].MissingReplicas = 1 workloadSpread.Status.SubsetStatuses[0].DeletingPods[podDemo.Name] = metav1.Time{Time: defaultTime} + workloadSpread.Status.VersionedSubsetStatuses = map[string][]appsv1alpha1.WorkloadSpreadSubsetStatus{ + VersionIgnored: workloadSpread.Status.SubsetStatuses, + } return workloadSpread }, }, @@ -808,6 +927,9 @@ func TestWorkloadSpreadMutatingPod(t *testing.T) { getWorkloadSpread: func() 
*appsv1alpha1.WorkloadSpread { demo := workloadSpreadDemo.DeepCopy() demo.Status.SubsetStatuses[0].MissingReplicas = 0 + demo.Status.VersionedSubsetStatuses = map[string][]appsv1alpha1.WorkloadSpreadSubsetStatus{ + VersionIgnored: demo.Status.SubsetStatuses, + } return demo }, getOperation: func() Operation { @@ -822,6 +944,9 @@ func TestWorkloadSpreadMutatingPod(t *testing.T) { workloadSpread := workloadSpreadDemo.DeepCopy() workloadSpread.Status.SubsetStatuses[0].MissingReplicas = 1 workloadSpread.Status.SubsetStatuses[0].DeletingPods[podDemo.Name] = metav1.Time{Time: defaultTime} + workloadSpread.Status.VersionedSubsetStatuses = map[string][]appsv1alpha1.WorkloadSpreadSubsetStatus{ + VersionIgnored: workloadSpread.Status.SubsetStatuses, + } return workloadSpread }, }, @@ -835,6 +960,9 @@ func TestWorkloadSpreadMutatingPod(t *testing.T) { getWorkloadSpread: func() *appsv1alpha1.WorkloadSpread { demo := workloadSpreadDemo.DeepCopy() demo.Status.SubsetStatuses[0].MissingReplicas = 0 + demo.Status.VersionedSubsetStatuses = map[string][]appsv1alpha1.WorkloadSpreadSubsetStatus{ + VersionIgnored: demo.Status.SubsetStatuses, + } return demo }, getOperation: func() Operation { @@ -848,6 +976,9 @@ func TestWorkloadSpreadMutatingPod(t *testing.T) { expectWorkloadSpread: func() *appsv1alpha1.WorkloadSpread { workloadSpread := workloadSpreadDemo.DeepCopy() workloadSpread.Status.SubsetStatuses[0].MissingReplicas = 0 + workloadSpread.Status.VersionedSubsetStatuses = map[string][]appsv1alpha1.WorkloadSpreadSubsetStatus{ + VersionIgnored: workloadSpread.Status.SubsetStatuses, + } return workloadSpread }, }, @@ -861,6 +992,9 @@ func TestWorkloadSpreadMutatingPod(t *testing.T) { getWorkloadSpread: func() *appsv1alpha1.WorkloadSpread { demo := workloadSpreadDemo.DeepCopy() demo.Status.SubsetStatuses[0].MissingReplicas = 0 + demo.Status.VersionedSubsetStatuses = map[string][]appsv1alpha1.WorkloadSpreadSubsetStatus{ + VersionIgnored: demo.Status.SubsetStatuses, + } return demo }, getOperation: func() Operation { @@ -874,6 +1008,9 @@ func TestWorkloadSpreadMutatingPod(t *testing.T) { expectWorkloadSpread: func() *appsv1alpha1.WorkloadSpread { workloadSpread := workloadSpreadDemo.DeepCopy() workloadSpread.Status.SubsetStatuses[0].MissingReplicas = 0 + workloadSpread.Status.VersionedSubsetStatuses = map[string][]appsv1alpha1.WorkloadSpreadSubsetStatus{ + VersionIgnored: workloadSpread.Status.SubsetStatuses, + } return workloadSpread }, }, @@ -888,6 +1025,9 @@ func TestWorkloadSpreadMutatingPod(t *testing.T) { demo := workloadSpreadDemo.DeepCopy() demo.Status.SubsetStatuses[0].MissingReplicas = 1 demo.Status.SubsetStatuses[0].DeletingPods[podDemo.Name] = metav1.Time{Time: defaultTime} + demo.Status.VersionedSubsetStatuses = map[string][]appsv1alpha1.WorkloadSpreadSubsetStatus{ + VersionIgnored: demo.Status.SubsetStatuses, + } return demo }, getOperation: func() Operation { @@ -902,6 +1042,9 @@ func TestWorkloadSpreadMutatingPod(t *testing.T) { workloadSpread := workloadSpreadDemo.DeepCopy() workloadSpread.Status.SubsetStatuses[0].MissingReplicas = 1 workloadSpread.Status.SubsetStatuses[0].DeletingPods[podDemo.Name] = metav1.Time{Time: defaultTime} + workloadSpread.Status.VersionedSubsetStatuses = map[string][]appsv1alpha1.WorkloadSpreadSubsetStatus{ + VersionIgnored: workloadSpread.Status.SubsetStatuses, + } return workloadSpread }, }, @@ -915,6 +1058,9 @@ func TestWorkloadSpreadMutatingPod(t *testing.T) { getWorkloadSpread: func() *appsv1alpha1.WorkloadSpread { demo := workloadSpreadDemo.DeepCopy() 
demo.Status.SubsetStatuses[0].MissingReplicas = -1 + demo.Status.VersionedSubsetStatuses = map[string][]appsv1alpha1.WorkloadSpreadSubsetStatus{ + VersionIgnored: demo.Status.SubsetStatuses, + } return demo }, getOperation: func() Operation { @@ -928,6 +1074,9 @@ func TestWorkloadSpreadMutatingPod(t *testing.T) { expectWorkloadSpread: func() *appsv1alpha1.WorkloadSpread { workloadSpread := workloadSpreadDemo.DeepCopy() workloadSpread.Status.SubsetStatuses[0].MissingReplicas = -1 + workloadSpread.Status.VersionedSubsetStatuses = map[string][]appsv1alpha1.WorkloadSpreadSubsetStatus{ + VersionIgnored: workloadSpread.Status.SubsetStatuses, + } return workloadSpread }, }, @@ -964,8 +1113,8 @@ func TestWorkloadSpreadMutatingPod(t *testing.T) { t.Fatalf("getLatestWorkloadSpread failed: %s", err.Error()) } setWorkloadSpreadSubset(latestWS) - statusby1, _ := json.Marshal(latestWS.Status) - statusby2, _ := json.Marshal(cs.expectWorkloadSpread().Status) + statusby1, _ := json.Marshal(latestWS.Status.VersionedSubsetStatuses) + statusby2, _ := json.Marshal(cs.expectWorkloadSpread().Status.VersionedSubsetStatuses) if !reflect.DeepEqual(statusby1, statusby2) { fmt.Println(latestWS.Status) fmt.Println(cs.expectWorkloadSpread().Status) @@ -1334,6 +1483,442 @@ func TestGetParentNameAndOrdinal(t *testing.T) { } } +func TestInitializedSubsetStatuses(t *testing.T) { + cases := []struct { + name string + spread func() *appsv1alpha1.WorkloadSpread + workload func() client.Object + }{ + { + name: "workload not found", + spread: func() *appsv1alpha1.WorkloadSpread { + return workloadSpreadDemo2.DeepCopy() + }, + workload: func() client.Object { + return nil + }, + }, + { + name: "cloneSet with absolute number settings", + spread: func() *appsv1alpha1.WorkloadSpread { + return workloadSpreadDemo2.DeepCopy() + }, + workload: func() client.Object { + clone := cloneset.DeepCopy() + clone.Name = "workload" + clone.Spec.Replicas = utilpointer.Int32(5) + return clone + }, + }, + { + name: "cloneSet with percentage settings", + spread: func() *appsv1alpha1.WorkloadSpread { + spread := workloadSpreadDemo2.DeepCopy() + spread.Spec.Subsets[0].MaxReplicas = &intstr.IntOrString{Type: intstr.String, StrVal: "20%"} + spread.Spec.Subsets[1].MaxReplicas = &intstr.IntOrString{Type: intstr.String, StrVal: "30%"} + spread.Spec.Subsets[2].MaxReplicas = &intstr.IntOrString{Type: intstr.String, StrVal: "50%"} + spread.Status.SubsetStatuses[0].MissingReplicas = 2 + spread.Status.SubsetStatuses[1].MissingReplicas = 3 + spread.Status.SubsetStatuses[2].MissingReplicas = 5 + return spread + }, + workload: func() client.Object { + clone := cloneset.DeepCopy() + clone.Name = "workload" + clone.Spec.Replicas = utilpointer.Int32(10) + return clone + }, + }, + { + name: "deployment with percentage settings", + spread: func() *appsv1alpha1.WorkloadSpread { + spread := workloadSpreadDemo2.DeepCopy() + spread.Spec.TargetReference.Kind = "Deployment" + spread.Spec.TargetReference.APIVersion = "apps/v1" + spread.Spec.Subsets[0].MaxReplicas = &intstr.IntOrString{Type: intstr.String, StrVal: "20%"} + spread.Spec.Subsets[1].MaxReplicas = &intstr.IntOrString{Type: intstr.String, StrVal: "30%"} + spread.Spec.Subsets[2].MaxReplicas = &intstr.IntOrString{Type: intstr.String, StrVal: "50%"} + spread.Status.SubsetStatuses[0].MissingReplicas = 2 + spread.Status.SubsetStatuses[1].MissingReplicas = 3 + spread.Status.SubsetStatuses[2].MissingReplicas = 5 + return spread + }, + workload: func() client.Object { + clone := deployment.DeepCopy() + clone.Name = 
"workload" + clone.Spec.Replicas = utilpointer.Int32(10) + return clone + }, + }, + { + name: "Other CRD with percentage settings", + spread: func() *appsv1alpha1.WorkloadSpread { + spread := workloadSpreadDemo2.DeepCopy() + spread.Spec.TargetReference.Kind = "GameServerSet" + spread.Spec.TargetReference.APIVersion = "mock.kruise.io/v1" + spread.Spec.Subsets[0].MaxReplicas = &intstr.IntOrString{Type: intstr.String, StrVal: "20%"} + spread.Spec.Subsets[1].MaxReplicas = &intstr.IntOrString{Type: intstr.String, StrVal: "30%"} + spread.Spec.Subsets[2].MaxReplicas = &intstr.IntOrString{Type: intstr.String, StrVal: "50%"} + spread.Status.SubsetStatuses[0].MissingReplicas = 2 + spread.Status.SubsetStatuses[1].MissingReplicas = 3 + spread.Status.SubsetStatuses[2].MissingReplicas = 5 + return spread + }, + workload: func() client.Object { + clone := cloneset.DeepCopy() + clone.Name = "workload" + clone.Kind = "GameServerSet" + clone.APIVersion = "mock.kruise.io/v1" + clone.Spec.Replicas = utilpointer.Int32(10) + unstructuredMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(clone) + if err != nil { + panic("err when convert to unstructured object") + } + return &unstructured.Unstructured{Object: unstructuredMap} + }, + }, + } + + kruiseConfig := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: webhookutil.GetNamespace(), + Name: "kruise-configuration", + }, + Data: map[string]string{ + "WorkloadSpread_Watch_Custom_Workload_WhiteList": ` + { + "workloads": [ + { + "Group": "mock.kruise.io", + "Version": "v1", + "Kind": "GameServerSet", + "replicasPath": "spec.replicas", + "subResources": [ + { + "Group": "mock.kruise.io", + "Version": "v1", + "Kind": "GameServer" + } + ] + } + ] + }`, + }, + } + + for _, cs := range cases { + t.Run(cs.name, func(t *testing.T) { + builder := fake.NewClientBuilder().WithScheme(scheme).WithObjects(kruiseConfig.DeepCopy()) + if cs.workload() != nil { + builder = builder.WithObjects(cs.workload()) + } + spread := cs.spread() + handler := &Handler{builder.Build()} + result, err := handler.initializedSubsetStatuses(spread) + if err != nil { + t.Fatal(err.Error()) + } + if !reflect.DeepEqual(result, spread.Status.SubsetStatuses) { + t.Fatalf("expect %v, but got %v", spread.Status.SubsetStatuses, result) + } + }) + } +} + +func TestGetPodVersion(t *testing.T) { + cases := []struct { + name string + pod func() *corev1.Pod + version string + }{ + { + name: "short hash only", + pod: func() *corev1.Pod { + pod := podDemo2.DeepCopy() + pod.Labels = map[string]string{ + appsv1.ControllerRevisionHashLabelKey: "5474f59575", + } + return pod + }, + version: "5474f59575", + }, + { + name: "long hash only", + pod: func() *corev1.Pod { + pod := podDemo2.DeepCopy() + pod.Labels = map[string]string{ + appsv1.ControllerRevisionHashLabelKey: "workload-xyz-5474f59575", + } + return pod + }, + version: "5474f59575", + }, + { + name: "template hash only", + pod: func() *corev1.Pod { + pod := podDemo2.DeepCopy() + pod.Labels = map[string]string{ + appsv1.DefaultDeploymentUniqueLabelKey: "5474f59575", + } + return pod + }, + version: "5474f59575", + }, + { + name: "template hash and long hash", + pod: func() *corev1.Pod { + pod := podDemo2.DeepCopy() + pod.Labels = map[string]string{ + appsv1.ControllerRevisionHashLabelKey: "workload-xyz-5474f59575", + appsv1.DefaultDeploymentUniqueLabelKey: "5474f59575", + } + return pod + }, + version: "5474f59575", + }, + { + name: "ignored pod", + pod: func() *corev1.Pod { + pod := podDemo.DeepCopy() + pod.Labels = 
map[string]string{ + appsv1.ControllerRevisionHashLabelKey: "workload-xyz-5474f59575", + appsv1.DefaultDeploymentUniqueLabelKey: "version-1", + } + return pod + }, + version: VersionIgnored, + }, + } + + for _, cs := range cases { + t.Run(cs.name, func(t *testing.T) { + if version := GetPodVersion(cs.pod()); version != cs.version { + t.Fatalf("expect %v, but got %v", cs.version, version) + } + }) + } +} + +func TestGetWorkloadVersion(t *testing.T) { + restored := EnabledWorkloadSetForVersionedStatus + EnabledWorkloadSetForVersionedStatus = sets.NewString("deployment", "replicaset", "cloneset") + defer func() { + EnabledWorkloadSetForVersionedStatus = restored + }() + + cases := []struct { + name string + workload func() client.Object + subWorkloads func() []client.Object + version string + }{ + { + name: "replicaset", + workload: func() client.Object { + return &appsv1.ReplicaSet{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "apps/v1", + Kind: "ReplicaSet", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "workload", + Labels: map[string]string{ + appsv1.DefaultDeploymentUniqueLabelKey: "5474f59575", + }, + }, + } + }, + subWorkloads: func() []client.Object { + return []client.Object{} + }, + version: "5474f59575", + }, + { + name: "cloneset with consistent generation", + workload: func() client.Object { + return &appsv1alpha1.CloneSet{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "apps.kruise.io/v1alpha1", + Kind: "CloneSet", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "workload", + Generation: 5, + }, + Status: appsv1alpha1.CloneSetStatus{ + ObservedGeneration: 5, + UpdateRevision: "workload-5474f59575", + }, + } + }, + subWorkloads: func() []client.Object { + return []client.Object{} + }, + version: "5474f59575", + }, + { + name: "cloneset with inconsistent generation", + workload: func() client.Object { + return &appsv1alpha1.CloneSet{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "apps.kruise.io/v1alpha1", + Kind: "CloneSet", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "workload", + Generation: 6, + }, + Status: appsv1alpha1.CloneSetStatus{ + ObservedGeneration: 5, + UpdateRevision: "workload-5474f59575", + }, + } + }, + subWorkloads: func() []client.Object { + return []client.Object{} + }, + version: "", + }, + { + name: "deployment with latest rs", + workload: func() client.Object { + latestVersion := template.DeepCopy() + latestVersion.Labels["version"] = "v5" + return &appsv1.Deployment{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "apps/v1", + Kind: "Deployment", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "workload", + Namespace: "test", + UID: "workload-uid", + }, + Spec: appsv1.DeploymentSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "test", + }, + }, + Template: *latestVersion.DeepCopy(), + }, + } + }, + subWorkloads: func() []client.Object { + var objects []client.Object + for i := 1; i <= 5; i++ { + version := template.DeepCopy() + version.Labels["version"] = "v" + strconv.Itoa(i) + objects = append(objects, &appsv1.ReplicaSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "workload" + strconv.Itoa(i), + Namespace: "test", + Labels: map[string]string{ + "app": "test", + appsv1.DefaultDeploymentUniqueLabelKey: "version-" + strconv.Itoa(i), + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Kind: "Deployment", + Name: "workload", + UID: "workload-uid", + Controller: utilpointer.Bool(true), + }, + }, + }, + Spec: appsv1.ReplicaSetSpec{ + Template: *version.DeepCopy(), + }, + }) + } + return objects + }, + version: 
"version-5", + }, + { + name: "deployment without latest rs", + workload: func() client.Object { + latestVersion := template.DeepCopy() + latestVersion.Labels["version"] = "v5" + return &appsv1.Deployment{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "apps/v1", + Kind: "Deployment", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "workload", + Namespace: "test", + UID: "workload-uid", + }, + Spec: appsv1.DeploymentSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "test", + }, + }, + Template: *latestVersion.DeepCopy(), + }, + } + }, + subWorkloads: func() []client.Object { + var objects []client.Object + for i := 1; i <= 4; i++ { + version := template.DeepCopy() + version.Labels["version"] = "v" + strconv.Itoa(i) + objects = append(objects, &appsv1.ReplicaSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "workload" + strconv.Itoa(i), + Namespace: "test", + Labels: map[string]string{ + "app": "test", + appsv1.DefaultDeploymentUniqueLabelKey: "version-" + strconv.Itoa(i), + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Kind: "Deployment", + Name: "workload", + UID: "workload-uid", + Controller: utilpointer.Bool(true), + }, + }, + }, + Spec: appsv1.ReplicaSetSpec{ + Template: *version.DeepCopy(), + }, + }) + } + return objects + }, + version: "", + }, + { + name: "un-support workload", + workload: func() client.Object { + return advancedStatefulSet.DeepCopy() + }, + subWorkloads: func() []client.Object { + return []client.Object{} + }, + version: VersionIgnored, + }, + } + + for _, cs := range cases { + t.Run(cs.name, func(t *testing.T) { + builder := fake.NewClientBuilder().WithScheme(scheme) + for _, object := range cs.subWorkloads() { + builder.WithObjects(object) + } + fc := builder.Build() + if version, _ := GetWorkloadVersion(fc, cs.workload()); version != cs.version { + t.Fatalf("expect %v, but got %v", cs.version, version) + } + }) + } +} + func setWorkloadSpreadSubset(workloadSpread *appsv1alpha1.WorkloadSpread) { for i := range workloadSpread.Status.SubsetStatuses { subset := &workloadSpread.Status.SubsetStatuses[i] diff --git a/test/e2e/apps/workloadspread.go b/test/e2e/apps/workloadspread.go index 6c9bd59eb1..f9f514f7f2 100644 --- a/test/e2e/apps/workloadspread.go +++ b/test/e2e/apps/workloadspread.go @@ -25,6 +25,7 @@ import ( "github.com/onsi/ginkgo" "github.com/onsi/gomega" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -45,7 +46,7 @@ const WorkloadSpreadFakeZoneKey = "e2e.kruise.io/workloadspread-fake-zone" var ( KruiseKindCloneSet = appsv1alpha1.SchemeGroupVersion.WithKind("CloneSet") KruiseKindStatefulSet = appsv1alpha1.SchemeGroupVersion.WithKind("StatefulSet") - //controllerKindDep = appsv1.SchemeGroupVersion.WithKind("Deployment") + controllerKindDep = appsv1.SchemeGroupVersion.WithKind("Deployment") //controllerKindJob = batchv1.SchemeGroupVersion.WithKind("Job") ) @@ -57,6 +58,15 @@ var _ = SIGDescribe("workloadspread", func() { var ns string var tester *framework.WorkloadSpreadTester + IsKubernetesVersionLessThan122 := func() bool { + if v, err := c.Discovery().ServerVersion(); err != nil { + framework.Logf("Failed to discovery server version: %v", err) + } else if minor, err := strconv.Atoi(v.Minor); err != nil || minor < 22 { + return true + } + return false + } + ginkgo.BeforeEach(func() { ns = f.Namespace.Name c = f.ClientSet @@ -614,6 +624,7 @@ var _ = SIGDescribe("workloadspread", func() { workloadSpread, err = 
kc.AppsV1alpha1().WorkloadSpreads(workloadSpread.Namespace).Get(context.TODO(), workloadSpread.Name, metav1.GetOptions{}) gomega.Expect(err).NotTo(gomega.HaveOccurred()) + time.Sleep(2 * time.Second) gomega.Expect(workloadSpread.Status.SubsetStatuses[0].Name).To(gomega.Equal(workloadSpread.Spec.Subsets[0].Name)) gomega.Expect(workloadSpread.Status.SubsetStatuses[0].MissingReplicas).To(gomega.Equal(int32(0))) gomega.Expect(workloadSpread.Status.SubsetStatuses[0].Replicas).To(gomega.Equal(int32(2))) @@ -1714,208 +1725,210 @@ var _ = SIGDescribe("workloadspread", func() { // ginkgo.By("deploy in two zone, maxReplicas=50%, done") //}) - // test k8s cluster version >= 1.21 - //ginkgo.It("elastic deploy for deployment, zone-a=2, zone-b=nil", func() { - // deployment := tester.NewBaseDeployment(ns) - // // create workloadSpread - // targetRef := appsv1alpha1.TargetReference{ - // APIVersion: controllerKindDep.GroupVersion().String(), - // Kind: controllerKindDep.Kind, - // Name: deployment.Name, - // } - // subset1 := appsv1alpha1.WorkloadSpreadSubset{ - // Name: "zone-a", - // RequiredNodeSelectorTerm: &corev1.NodeSelectorTerm{ - // MatchExpressions: []corev1.NodeSelectorRequirement{ - // { - // Key: WorkloadSpreadFakeZoneKey, - // Operator: corev1.NodeSelectorOpIn, - // Values: []string{"zone-a"}, - // }, - // }, - // }, - // MaxReplicas: &intstr.IntOrString{Type: intstr.Int, IntVal: 2}, - // Patch: runtime.RawExtension{ - // Raw: []byte(`{"metadata":{"annotations":{"subset":"zone-a"}}}`), - // }, - // } - // subset2 := appsv1alpha1.WorkloadSpreadSubset{ - // Name: "zone-b", - // RequiredNodeSelectorTerm: &corev1.NodeSelectorTerm{ - // MatchExpressions: []corev1.NodeSelectorRequirement{ - // { - // Key: WorkloadSpreadFakeZoneKey, - // Operator: corev1.NodeSelectorOpIn, - // Values: []string{"zone-b"}, - // }, - // }, - // }, - // MaxReplicas: nil, - // Patch: runtime.RawExtension{ - // Raw: []byte(`{"metadata":{"annotations":{"subset":"zone-b"}}}`), - // }, - // } - // workloadSpread := tester.NewWorkloadSpread(ns, workloadSpreadName, &targetRef, []appsv1alpha1.WorkloadSpreadSubset{subset1, subset2}) - // workloadSpread = tester.CreateWorkloadSpread(workloadSpread) - // - // // create deployment, replicas = 2 - // deployment = tester.CreateDeployment(deployment) - // tester.WaitForDeploymentRunning(deployment) - // - // // get pods, and check workloadSpread - // ginkgo.By(fmt.Sprintf("get deployment(%s/%s) pods, and check workloadSpread(%s/%s) status", deployment.Namespace, deployment.Name, workloadSpread.Namespace, workloadSpread.Name)) - // pods, err := tester.GetSelectorPods(deployment.Namespace, deployment.Spec.Selector) - // gomega.Expect(err).NotTo(gomega.HaveOccurred()) - // gomega.Expect(pods).To(gomega.HaveLen(2)) - // subset1Pods := 0 - // subset2Pods := 0 - // for _, pod := range pods { - // if str, ok := pod.Annotations[workloadspread.MatchedWorkloadSpreadSubsetAnnotations]; ok { - // var injectWorkloadSpread *workloadspread.InjectWorkloadSpread - // err := json.Unmarshal([]byte(str), &injectWorkloadSpread) - // gomega.Expect(err).NotTo(gomega.HaveOccurred()) - // if injectWorkloadSpread.Subset == subset1.Name { - // subset1Pods++ - // gomega.Expect(injectWorkloadSpread.Name).To(gomega.Equal(workloadSpread.Name)) - // gomega.Expect(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions).To(gomega.Equal(subset1.RequiredNodeSelectorTerm.MatchExpressions)) - // 
gomega.Expect(pod.Annotations[workloadspread.PodDeletionCostAnnotation]).To(gomega.Equal("200")) - // } else if injectWorkloadSpread.Subset == subset2.Name { - // subset2Pods++ - // gomega.Expect(injectWorkloadSpread.Name).To(gomega.Equal(workloadSpread.Name)) - // gomega.Expect(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions).To(gomega.Equal(subset2.RequiredNodeSelectorTerm.MatchExpressions)) - // gomega.Expect(pod.Annotations[workloadspread.PodDeletionCostAnnotation]).To(gomega.Equal("100")) - // } - // } else { - // // others PodDeletionCostAnnotation not set - // gomega.Expect(pod.Annotations[workloadspread.PodDeletionCostAnnotation]).To(gomega.Equal("")) - // } - // } - // gomega.Expect(subset1Pods).To(gomega.Equal(2)) - // gomega.Expect(subset2Pods).To(gomega.Equal(0)) - // - // //scale up deployment.replicas = 6 - // ginkgo.By(fmt.Sprintf("scale up deployment(%s/%s) replicas=6", deployment.Namespace, deployment.Name)) - // deployment.Spec.Replicas = pointer.Int32Ptr(6) - // tester.UpdateDeployment(deployment) - // tester.WaitForDeploymentRunning(deployment) - // - // // get pods, and check workloadSpread - // ginkgo.By(fmt.Sprintf("get deployment(%s/%s) pods, and check workloadSpread(%s/%s) status", deployment.Namespace, deployment.Name, workloadSpread.Namespace, workloadSpread.Name)) - // pods, err = tester.GetSelectorPods(deployment.Namespace, deployment.Spec.Selector) - // gomega.Expect(err).NotTo(gomega.HaveOccurred()) - // gomega.Expect(pods).To(gomega.HaveLen(6)) - // subset1Pods = 0 - // subset2Pods = 0 - // for _, pod := range pods { - // if str, ok := pod.Annotations[workloadspread.MatchedWorkloadSpreadSubsetAnnotations]; ok { - // var injectWorkloadSpread *workloadspread.InjectWorkloadSpread - // err := json.Unmarshal([]byte(str), &injectWorkloadSpread) - // gomega.Expect(err).NotTo(gomega.HaveOccurred()) - // if injectWorkloadSpread.Subset == subset1.Name { - // subset1Pods++ - // gomega.Expect(injectWorkloadSpread.Name).To(gomega.Equal(workloadSpread.Name)) - // gomega.Expect(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions).To(gomega.Equal(subset1.RequiredNodeSelectorTerm.MatchExpressions)) - // gomega.Expect(pod.Annotations[workloadspread.PodDeletionCostAnnotation]).To(gomega.Equal("200")) - // } else if injectWorkloadSpread.Subset == subset2.Name { - // subset2Pods++ - // gomega.Expect(injectWorkloadSpread.Name).To(gomega.Equal(workloadSpread.Name)) - // gomega.Expect(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions).To(gomega.Equal(subset2.RequiredNodeSelectorTerm.MatchExpressions)) - // gomega.Expect(pod.Annotations[workloadspread.PodDeletionCostAnnotation]).To(gomega.Equal("100")) - // } - // } else { - // // others PodDeletionCostAnnotation not set - // gomega.Expect(pod.Annotations[workloadspread.PodDeletionCostAnnotation]).To(gomega.Equal("")) - // } - // } - // gomega.Expect(subset1Pods).To(gomega.Equal(2)) - // gomega.Expect(subset2Pods).To(gomega.Equal(4)) - // - // workloadSpread, err = kc.AppsV1alpha1().WorkloadSpreads(workloadSpread.Namespace).Get(workloadSpread.Name, metav1.GetOptions{}) - // gomega.Expect(err).NotTo(gomega.HaveOccurred()) - // - // gomega.Expect(workloadSpread.Status.SubsetStatuses[0].Name).To(gomega.Equal(workloadSpread.Spec.Subsets[0].Name)) - // 
gomega.Expect(workloadSpread.Status.SubsetStatuses[0].MissingReplicas).To(gomega.Equal(int32(0))) - // gomega.Expect(len(workloadSpread.Status.SubsetStatuses[0].CreatingPods)).To(gomega.Equal(0)) - // gomega.Expect(len(workloadSpread.Status.SubsetStatuses[0].DeletingPods)).To(gomega.Equal(0)) - // - // // update deployment image - // ginkgo.By(fmt.Sprintf("update deployment(%s/%s) image=%s", deployment.Namespace, deployment.Name, NewWebserverImage)) - // deployment.Spec.Template.Spec.Containers[0].Image = NewWebserverImage - // tester.UpdateDeployment(deployment) - // tester.WaitForDeploymentRunning(deployment) - // - // // get pods, and check workloadSpread - // ginkgo.By(fmt.Sprintf("get deployment(%s/%s) pods, and check workloadSpread(%s/%s) status", deployment.Namespace, deployment.Name, workloadSpread.Namespace, workloadSpread.Name)) - // pods, err = tester.GetSelectorPods(deployment.Namespace, deployment.Spec.Selector) - // gomega.Expect(err).NotTo(gomega.HaveOccurred()) - // gomega.Expect(pods).To(gomega.HaveLen(6)) - // subset1Pods = 0 - // subset2Pods = 0 - // for _, pod := range pods { - // if str, ok := pod.Annotations[workloadspread.MatchedWorkloadSpreadSubsetAnnotations]; ok { - // var injectWorkloadSpread *workloadspread.InjectWorkloadSpread - // err := json.Unmarshal([]byte(str), &injectWorkloadSpread) - // gomega.Expect(err).NotTo(gomega.HaveOccurred()) - // if injectWorkloadSpread.Subset == subset1.Name { - // subset1Pods++ - // gomega.Expect(injectWorkloadSpread.Name).To(gomega.Equal(workloadSpread.Name)) - // gomega.Expect(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions).To(gomega.Equal(subset1.RequiredNodeSelectorTerm.MatchExpressions)) - // gomega.Expect(pod.Annotations[workloadspread.PodDeletionCostAnnotation]).To(gomega.Equal("200")) - // } else if injectWorkloadSpread.Subset == subset2.Name { - // subset2Pods++ - // gomega.Expect(injectWorkloadSpread.Name).To(gomega.Equal(workloadSpread.Name)) - // gomega.Expect(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions).To(gomega.Equal(subset2.RequiredNodeSelectorTerm.MatchExpressions)) - // gomega.Expect(pod.Annotations[workloadspread.PodDeletionCostAnnotation]).To(gomega.Equal("100")) - // } - // } else { - // // others PodDeletionCostAnnotation not set - // gomega.Expect(pod.Annotations[workloadspread.PodDeletionCostAnnotation]).To(gomega.Equal("")) - // } - // } - // gomega.Expect(subset1Pods).To(gomega.Equal(2)) - // gomega.Expect(subset2Pods).To(gomega.Equal(4)) - // - // //scale down deployment.replicas = 2 - // ginkgo.By(fmt.Sprintf("scale down deployment(%s/%s) replicas=2", deployment.Namespace, deployment.Name)) - // deployment.Spec.Replicas = pointer.Int32Ptr(2) - // tester.UpdateDeployment(deployment) - // tester.WaitForDeploymentRunning(deployment) - // - // time.Sleep(10 * time.Minute) - // - // // get pods, and check workloadSpread - // ginkgo.By(fmt.Sprintf("get deployment(%s/%s) pods, and check workloadSpread(%s/%s) status", deployment.Namespace, deployment.Name, workloadSpread.Namespace, workloadSpread.Name)) - // pods, err = tester.GetSelectorPods(deployment.Namespace, deployment.Spec.Selector) - // gomega.Expect(err).NotTo(gomega.HaveOccurred()) - // gomega.Expect(pods).To(gomega.HaveLen(2)) - // subset1Pods = 0 - // subset2Pods = 0 - // for _, pod := range pods { - // if str, ok := pod.Annotations[workloadspread.MatchedWorkloadSpreadSubsetAnnotations]; ok { - // var 
injectWorkloadSpread *workloadspread.InjectWorkloadSpread - // err := json.Unmarshal([]byte(str), &injectWorkloadSpread) - // gomega.Expect(err).NotTo(gomega.HaveOccurred()) - // if injectWorkloadSpread.Subset == subset1.Name { - // subset1Pods++ - // gomega.Expect(injectWorkloadSpread.Name).To(gomega.Equal(workloadSpread.Name)) - // gomega.Expect(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions).To(gomega.Equal(subset1.RequiredNodeSelectorTerm.MatchExpressions)) - // gomega.Expect(pod.Annotations[workloadspread.PodDeletionCostAnnotation]).To(gomega.Equal("200")) - // } else if injectWorkloadSpread.Subset == subset2.Name { - // subset2Pods++ - // gomega.Expect(injectWorkloadSpread.Name).To(gomega.Equal(workloadSpread.Name)) - // gomega.Expect(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions).To(gomega.Equal(subset2.RequiredNodeSelectorTerm.MatchExpressions)) - // gomega.Expect(pod.Annotations[workloadspread.PodDeletionCostAnnotation]).To(gomega.Equal("100")) - // } - // } else { - // // others PodDeletionCostAnnotation not set - // gomega.Expect(pod.Annotations[workloadspread.PodDeletionCostAnnotation]).To(gomega.Equal("")) - // } - // } - // gomega.Expect(subset1Pods).To(gomega.Equal(2)) - // gomega.Expect(subset2Pods).To(gomega.Equal(0)) - // - // ginkgo.By("elastic deploy for deployment, zone-a=2, zone-b=nil, done") - //}) + // test k8s cluster version >= 1.22 + ginkgo.It("elastic deploy for deployment, zone-a=2, zone-b=nil", func() { + if IsKubernetesVersionLessThan122() { + ginkgo.Skip("skip this e2e case, it can only run on K8s >= 1.22") + } + deployment := tester.NewBaseDeployment(ns) + // create workloadSpread + targetRef := appsv1alpha1.TargetReference{ + APIVersion: controllerKindDep.GroupVersion().String(), + Kind: controllerKindDep.Kind, + Name: deployment.Name, + } + subset1 := appsv1alpha1.WorkloadSpreadSubset{ + Name: "zone-a", + RequiredNodeSelectorTerm: &corev1.NodeSelectorTerm{ + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: WorkloadSpreadFakeZoneKey, + Operator: corev1.NodeSelectorOpIn, + Values: []string{"zone-a"}, + }, + }, + }, + MaxReplicas: &intstr.IntOrString{Type: intstr.Int, IntVal: 2}, + Patch: runtime.RawExtension{ + Raw: []byte(`{"metadata":{"annotations":{"subset":"zone-a"}}}`), + }, + } + subset2 := appsv1alpha1.WorkloadSpreadSubset{ + Name: "zone-b", + RequiredNodeSelectorTerm: &corev1.NodeSelectorTerm{ + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: WorkloadSpreadFakeZoneKey, + Operator: corev1.NodeSelectorOpIn, + Values: []string{"zone-b"}, + }, + }, + }, + MaxReplicas: nil, + Patch: runtime.RawExtension{ + Raw: []byte(`{"metadata":{"annotations":{"subset":"zone-b"}}}`), + }, + } + workloadSpread := tester.NewWorkloadSpread(ns, workloadSpreadName, &targetRef, []appsv1alpha1.WorkloadSpreadSubset{subset1, subset2}) + workloadSpread = tester.CreateWorkloadSpread(workloadSpread) + tester.WaitForWorkloadSpreadRunning(workloadSpread) + + // create deployment, replicas = 2 + deployment = tester.CreateDeployment(deployment) + tester.WaitForDeploymentRunning(deployment) + + // get pods, and check workloadSpread + ginkgo.By(fmt.Sprintf("get deployment(%s/%s) pods, and check workloadSpread(%s/%s) status", deployment.Namespace, deployment.Name, workloadSpread.Namespace, workloadSpread.Name)) + pods, err := tester.GetSelectorPods(deployment.Namespace, deployment.Spec.Selector) + 
gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(pods).To(gomega.HaveLen(2)) + subset1Pods := 0 + subset2Pods := 0 + for _, pod := range pods { + if str, ok := pod.Annotations[workloadspread.MatchedWorkloadSpreadSubsetAnnotations]; ok { + var injectWorkloadSpread *workloadspread.InjectWorkloadSpread + err := json.Unmarshal([]byte(str), &injectWorkloadSpread) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + if injectWorkloadSpread.Subset == subset1.Name { + subset1Pods++ + gomega.Expect(injectWorkloadSpread.Name).To(gomega.Equal(workloadSpread.Name)) + gomega.Expect(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions).To(gomega.Equal(subset1.RequiredNodeSelectorTerm.MatchExpressions)) + gomega.Expect(pod.Annotations[workloadspread.PodDeletionCostAnnotation]).To(gomega.Equal("200")) + } else if injectWorkloadSpread.Subset == subset2.Name { + subset2Pods++ + gomega.Expect(injectWorkloadSpread.Name).To(gomega.Equal(workloadSpread.Name)) + gomega.Expect(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions).To(gomega.Equal(subset2.RequiredNodeSelectorTerm.MatchExpressions)) + gomega.Expect(pod.Annotations[workloadspread.PodDeletionCostAnnotation]).To(gomega.Equal("100")) + } + } else { + // others PodDeletionCostAnnotation not set + gomega.Expect(pod.Annotations[workloadspread.PodDeletionCostAnnotation]).To(gomega.Equal("")) + } + } + gomega.Expect(subset1Pods).To(gomega.Equal(2)) + gomega.Expect(subset2Pods).To(gomega.Equal(0)) + + //scale up deployment.replicas = 6 + ginkgo.By(fmt.Sprintf("scale up deployment(%s/%s) replicas=6", deployment.Namespace, deployment.Name)) + deployment.Spec.Replicas = pointer.Int32Ptr(6) + tester.UpdateDeployment(deployment) + tester.WaitForDeploymentRunning(deployment) + + // get pods, and check workloadSpread + ginkgo.By(fmt.Sprintf("get deployment(%s/%s) pods, and check workloadSpread(%s/%s) status", deployment.Namespace, deployment.Name, workloadSpread.Namespace, workloadSpread.Name)) + pods, err = tester.GetSelectorPods(deployment.Namespace, deployment.Spec.Selector) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(pods).To(gomega.HaveLen(6)) + subset1Pods = 0 + subset2Pods = 0 + for _, pod := range pods { + if str, ok := pod.Annotations[workloadspread.MatchedWorkloadSpreadSubsetAnnotations]; ok { + var injectWorkloadSpread *workloadspread.InjectWorkloadSpread + err := json.Unmarshal([]byte(str), &injectWorkloadSpread) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + if injectWorkloadSpread.Subset == subset1.Name { + subset1Pods++ + gomega.Expect(injectWorkloadSpread.Name).To(gomega.Equal(workloadSpread.Name)) + gomega.Expect(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions).To(gomega.Equal(subset1.RequiredNodeSelectorTerm.MatchExpressions)) + gomega.Expect(pod.Annotations[workloadspread.PodDeletionCostAnnotation]).To(gomega.Equal("200")) + } else if injectWorkloadSpread.Subset == subset2.Name { + subset2Pods++ + gomega.Expect(injectWorkloadSpread.Name).To(gomega.Equal(workloadSpread.Name)) + gomega.Expect(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions).To(gomega.Equal(subset2.RequiredNodeSelectorTerm.MatchExpressions)) + gomega.Expect(pod.Annotations[workloadspread.PodDeletionCostAnnotation]).To(gomega.Equal("100")) + } + } else { + // others 
PodDeletionCostAnnotation not set + gomega.Expect(pod.Annotations[workloadspread.PodDeletionCostAnnotation]).To(gomega.Equal("")) + } + } + gomega.Expect(subset1Pods).To(gomega.Equal(2)) + gomega.Expect(subset2Pods).To(gomega.Equal(4)) + + workloadSpread, err = kc.AppsV1alpha1().WorkloadSpreads(workloadSpread.Namespace).Get(context.TODO(), workloadSpread.Name, metav1.GetOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + gomega.Expect(workloadSpread.Status.SubsetStatuses[0].Name).To(gomega.Equal(workloadSpread.Spec.Subsets[0].Name)) + gomega.Expect(workloadSpread.Status.SubsetStatuses[0].MissingReplicas).To(gomega.Equal(int32(0))) + gomega.Expect(len(workloadSpread.Status.SubsetStatuses[0].CreatingPods)).To(gomega.Equal(0)) + gomega.Expect(len(workloadSpread.Status.SubsetStatuses[0].DeletingPods)).To(gomega.Equal(0)) + + // update deployment image + ginkgo.By(fmt.Sprintf("update deployment(%s/%s) image=%s", deployment.Namespace, deployment.Name, NewWebserverImage)) + deployment.Spec.Template.Spec.Containers[0].Image = NewWebserverImage + tester.UpdateDeployment(deployment) + tester.WaitForDeploymentRunning(deployment) + + // get pods, and check workloadSpread + ginkgo.By(fmt.Sprintf("get deployment(%s/%s) pods, and check workloadSpread(%s/%s) status", deployment.Namespace, deployment.Name, workloadSpread.Namespace, workloadSpread.Name)) + pods, err = tester.GetSelectorPods(deployment.Namespace, deployment.Spec.Selector) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(pods).To(gomega.HaveLen(6)) + subset1Pods = 0 + subset2Pods = 0 + for _, pod := range pods { + if str, ok := pod.Annotations[workloadspread.MatchedWorkloadSpreadSubsetAnnotations]; ok { + var injectWorkloadSpread *workloadspread.InjectWorkloadSpread + err := json.Unmarshal([]byte(str), &injectWorkloadSpread) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + if injectWorkloadSpread.Subset == subset1.Name { + subset1Pods++ + gomega.Expect(injectWorkloadSpread.Name).To(gomega.Equal(workloadSpread.Name)) + gomega.Expect(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions).To(gomega.Equal(subset1.RequiredNodeSelectorTerm.MatchExpressions)) + gomega.Expect(pod.Annotations[workloadspread.PodDeletionCostAnnotation]).To(gomega.Equal("200")) + } else if injectWorkloadSpread.Subset == subset2.Name { + subset2Pods++ + gomega.Expect(injectWorkloadSpread.Name).To(gomega.Equal(workloadSpread.Name)) + gomega.Expect(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions).To(gomega.Equal(subset2.RequiredNodeSelectorTerm.MatchExpressions)) + gomega.Expect(pod.Annotations[workloadspread.PodDeletionCostAnnotation]).To(gomega.Equal("100")) + } + } else { + // others PodDeletionCostAnnotation not set + gomega.Expect(pod.Annotations[workloadspread.PodDeletionCostAnnotation]).To(gomega.Equal("")) + } + } + gomega.Expect(subset1Pods).To(gomega.Equal(2)) + gomega.Expect(subset2Pods).To(gomega.Equal(4)) + + //scale down deployment.replicas = 2 + ginkgo.By(fmt.Sprintf("scale down deployment(%s/%s) replicas=2", deployment.Namespace, deployment.Name)) + deployment.Spec.Replicas = pointer.Int32Ptr(2) + tester.UpdateDeployment(deployment) + tester.WaitForDeploymentRunning(deployment) + + // get pods, and check workloadSpread + ginkgo.By(fmt.Sprintf("get deployment(%s/%s) pods, and check workloadSpread(%s/%s) status", deployment.Namespace, deployment.Name, workloadSpread.Namespace, workloadSpread.Name)) + 
pods, err = tester.GetSelectorPods(deployment.Namespace, deployment.Spec.Selector) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(pods).To(gomega.HaveLen(2)) + subset1Pods = 0 + subset2Pods = 0 + for _, pod := range pods { + if str, ok := pod.Annotations[workloadspread.MatchedWorkloadSpreadSubsetAnnotations]; ok { + var injectWorkloadSpread *workloadspread.InjectWorkloadSpread + err := json.Unmarshal([]byte(str), &injectWorkloadSpread) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + if injectWorkloadSpread.Subset == subset1.Name { + subset1Pods++ + gomega.Expect(injectWorkloadSpread.Name).To(gomega.Equal(workloadSpread.Name)) + gomega.Expect(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions).To(gomega.Equal(subset1.RequiredNodeSelectorTerm.MatchExpressions)) + gomega.Expect(pod.Annotations[workloadspread.PodDeletionCostAnnotation]).To(gomega.Equal("200")) + } else if injectWorkloadSpread.Subset == subset2.Name { + subset2Pods++ + gomega.Expect(injectWorkloadSpread.Name).To(gomega.Equal(workloadSpread.Name)) + gomega.Expect(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions).To(gomega.Equal(subset2.RequiredNodeSelectorTerm.MatchExpressions)) + gomega.Expect(pod.Annotations[workloadspread.PodDeletionCostAnnotation]).To(gomega.Equal("100")) + } + } else { + // others PodDeletionCostAnnotation not set + gomega.Expect(pod.Annotations[workloadspread.PodDeletionCostAnnotation]).To(gomega.Equal("")) + } + } + gomega.Expect(subset1Pods).To(gomega.Equal(2)) + gomega.Expect(subset2Pods).To(gomega.Equal(0)) + + ginkgo.By("elastic deploy for deployment, zone-a=2, zone-b=nil, done") + }) //ginkgo.It("deploy for job, zone-a=1, zone-b=nil", func() { // job := tester.NewBaseJob(ns) @@ -2024,6 +2037,5 @@ var _ = SIGDescribe("workloadspread", func() { // // ginkgo.By("workloadSpread for job, done") //}) - }) })
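For readers skimming the tests above, a compact sketch of the data shape they all populate may help: subset statuses keyed per workload revision, with a single fallback bucket when the version is not tracked. This is an illustrative example only, not part of the patch; it assumes the exported VersionIgnored constant from pkg/util/workloadspread (used unqualified throughout the tests) and borrows the 5474f59575 revision hash from TestGetPodVersion.

package main

import (
	"encoding/json"
	"fmt"

	appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
	wsutil "github.com/openkruise/kruise/pkg/util/workloadspread"
)

func main() {
	// Subset statuses recorded per workload revision: one entry keyed by the
	// revision hash of the current template, plus the VersionIgnored bucket
	// that un-versioned workloads fall back to.
	status := appsv1alpha1.WorkloadSpreadStatus{
		VersionedSubsetStatuses: map[string][]appsv1alpha1.WorkloadSpreadSubsetStatus{
			"5474f59575": {
				{Name: "subset-a", MissingReplicas: 2, Replicas: 3},
			},
			wsutil.VersionIgnored: {
				{Name: "subset-a", MissingReplicas: 5, Replicas: 0},
			},
		},
	}
	out, _ := json.MarshalIndent(status, "", "  ")
	fmt.Println(string(out))
}

When the target workload exposes no usable revision (the ignored-pod and unsupported-workload cases), everything stays under the single VersionIgnored key and mirrors the existing SubsetStatuses list, as the expectations in TestWorkloadSpreadMutatingPod show.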