diff --git a/apis/apps/defaults/v1alpha1.go b/apis/apps/defaults/v1alpha1.go
index ea0991c1d8..8673b62556 100644
--- a/apis/apps/defaults/v1alpha1.go
+++ b/apis/apps/defaults/v1alpha1.go
@@ -225,6 +225,25 @@ func SetDefaultsUnitedDeployment(obj *v1alpha1.UnitedDeployment, injectTemplateD
 			}
 		}
 	}
+
+	hasReplicasSettings := false
+	hasCapacitySettings := false
+	for _, subset := range obj.Spec.Topology.Subsets {
+		if subset.Replicas != nil {
+			hasReplicasSettings = true
+		}
+		if subset.MinReplicas != nil || subset.MaxReplicas != nil {
+			hasCapacitySettings = true
+		}
+	}
+	if hasCapacitySettings && !hasReplicasSettings {
+		for i := range obj.Spec.Topology.Subsets {
+			subset := &obj.Spec.Topology.Subsets[i]
+			if subset.MinReplicas == nil {
+				subset.MinReplicas = &intstr.IntOrString{Type: intstr.Int, IntVal: 0}
+			}
+		}
+	}
 }
 
 // SetDefaults_CloneSet set default values for CloneSet.
diff --git a/apis/apps/v1alpha1/uniteddeployment_types.go b/apis/apps/v1alpha1/uniteddeployment_types.go
index c441f3b38e..1e4c18c947 100644
--- a/apis/apps/v1alpha1/uniteddeployment_types.go
+++ b/apis/apps/v1alpha1/uniteddeployment_types.go
@@ -189,8 +189,26 @@ type Subset struct {
 	// percentage like '10%', which means 10% of UnitedDeployment replicas of pods will be distributed
 	// under this subset. If nil, the number of replicas in this subset is determined by controller.
 	// Controller will try to keep all the subsets with nil replicas have average pods.
+	// Replicas and MinReplicas/MaxReplicas are mutually exclusive in a UnitedDeployment.
 	// +optional
 	Replicas *intstr.IntOrString `json:"replicas,omitempty"`
+
+	// Indicates the lower bound of replicas of this subset.
+	// MinReplicas must be greater than or equal to 0 if it is set.
+	// The controller will prioritize satisfying minReplicas for each subset
+	// in the order of Topology.Subsets.
+	// Defaults to 0.
+	// +optional
+	MinReplicas *intstr.IntOrString `json:"minReplicas,omitempty"`
+
+	// Indicates the upper bound of replicas of this subset.
+	// MaxReplicas must be greater than or equal to MinReplicas.
+	// MaxReplicas == nil means no limit.
+	// Please ensure that at least one subset has an empty MaxReplicas (no limit) to avoid stuck scaling.
+	// Defaults to nil.
+	// +optional
+	MaxReplicas *intstr.IntOrString `json:"maxReplicas,omitempty"`
+
 	// Patch indicates patching to the templateSpec.
 	// Patch takes precedence over other fields
 	// If the Patch also modifies the Replicas, NodeSelectorTerm or Tolerations, use value in the Patch
diff --git a/apis/apps/v1alpha1/zz_generated.deepcopy.go b/apis/apps/v1alpha1/zz_generated.deepcopy.go
index 2652f045df..55d1a2cfba 100644
--- a/apis/apps/v1alpha1/zz_generated.deepcopy.go
+++ b/apis/apps/v1alpha1/zz_generated.deepcopy.go
@@ -3170,6 +3170,16 @@ func (in *Subset) DeepCopyInto(out *Subset) {
 		*out = new(intstr.IntOrString)
 		**out = **in
 	}
+	if in.MinReplicas != nil {
+		in, out := &in.MinReplicas, &out.MinReplicas
+		*out = new(intstr.IntOrString)
+		**out = **in
+	}
+	if in.MaxReplicas != nil {
+		in, out := &in.MaxReplicas, &out.MaxReplicas
+		*out = new(intstr.IntOrString)
+		**out = **in
+	}
 	in.Patch.DeepCopyInto(&out.Patch)
 }
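Taken together, the hunks above add the new fields, default them, and deep-copy them. A minimal sketch of the defaulting rule in action — assuming the `defaults` package name matches its directory above, and passing `false` for the second argument of `SetDefaultsUnitedDeployment` (its name is truncated in the hunk header) to skip template defaulting:

```go
package main

import (
	"fmt"

	"github.com/openkruise/kruise/apis/apps/defaults"
	"github.com/openkruise/kruise/apis/apps/v1alpha1"
	"k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
	max := intstr.FromInt(5)
	ud := &v1alpha1.UnitedDeployment{}
	ud.Spec.Topology.Subsets = []v1alpha1.Subset{
		{Name: "subset-a", MaxReplicas: &max}, // capacity settings only, no Replicas
		{Name: "subset-b"},
	}

	// At least one subset uses MinReplicas/MaxReplicas and none uses Replicas,
	// so every nil MinReplicas is backfilled with 0 by the defaulting above.
	defaults.SetDefaultsUnitedDeployment(ud, false)

	for _, s := range ud.Spec.Topology.Subsets {
		fmt.Println(s.Name, s.MinReplicas.String()) // subset-a 0, subset-b 0
	}
}
```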
diff --git a/config/crd/bases/apps.kruise.io_uniteddeployments.yaml b/config/crd/bases/apps.kruise.io_uniteddeployments.yaml
index 1361511840..d47ea85582 100644
--- a/config/crd/bases/apps.kruise.io_uniteddeployments.yaml
+++ b/config/crd/bases/apps.kruise.io_uniteddeployments.yaml
@@ -975,6 +975,26 @@ spec:
                     items:
                       description: Subset defines the detail of a subset.
                       properties:
+                        maxReplicas:
+                          anyOf:
+                          - type: integer
+                          - type: string
+                          description: Indicates the upper bound of replicas of the
+                            subset. MaxReplicas must be greater than or equal to MinReplicas.
+                            MaxReplicas == nil means no limit. Please ensure that
+                            at least one subset has an empty MaxReplicas (no limit)
+                            to avoid stuck scaling. Defaults to nil.
+                          x-kubernetes-int-or-string: true
+                        minReplicas:
+                          anyOf:
+                          - type: integer
+                          - type: string
+                          description: Indicates the lower bound of replicas of the
+                            subset. MinReplicas must be greater than or equal to 0
+                            if it is set. The controller will prioritize satisfying
+                            minReplicas for each subset in the order of Topology.Subsets.
+                            Defaults to 0.
+                          x-kubernetes-int-or-string: true
                         name:
                           description: Indicates subset name as a DNS_LABEL, which
                             will be used to generate subset workload name prefix in
@@ -1072,7 +1092,8 @@ spec:
                             pods will be distributed under this subset. If nil, the
                             number of replicas in this subset is determined by controller.
                             Controller will try to keep all the subsets with nil replicas
-                            have average pods.
+                            have average pods. Replicas and MinReplicas/MaxReplicas
+                            are mutually exclusive in a UnitedDeployment.
                           x-kubernetes-int-or-string: true
                         tolerations:
                           description: Indicates the tolerations the pods under this
diff --git a/pkg/controller/uniteddeployment/allocator.go b/pkg/controller/uniteddeployment/allocator.go
index 0c0abea2d9..177da918f3 100644
--- a/pkg/controller/uniteddeployment/allocator.go
+++ b/pkg/controller/uniteddeployment/allocator.go
@@ -22,6 +22,7 @@ import (
 	"strings"
 
 	"k8s.io/klog/v2"
+	"k8s.io/utils/integer"
 
 	appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
 )
@@ -54,33 +55,43 @@ func (n subsetInfos) Swap(i, j int) {
 	n[i], n[j] = n[j], n[i]
 }
 
-// GetAllocatedReplicas returns a mapping from subset to next replicas.
-// Next replicas is allocated by replicasAllocator, which will consider the current replicas of each subset and
-// new replicas indicated from UnitedDeployment.Spec.Topology.Subsets.
-func GetAllocatedReplicas(nameToSubset *map[string]*Subset, ud *appsv1alpha1.UnitedDeployment) (*map[string]int32, error) {
-	subsetInfos := getSubsetInfos(nameToSubset, ud)
+type ReplicaAllocator interface {
+	Alloc(nameToSubset *map[string]*Subset) (*map[string]int32, error)
+}
 
-	var expectedReplicas int32 = -1
-	if ud.Spec.Replicas != nil {
-		expectedReplicas = *ud.Spec.Replicas
+func NewReplicaAllocator(ud *appsv1alpha1.UnitedDeployment) ReplicaAllocator {
+	for _, subset := range ud.Spec.Topology.Subsets {
+		if subset.MinReplicas != nil || subset.MaxReplicas != nil {
+			return &elasticAllocator{ud}
+		}
 	}
-
-	specifiedReplicas := getSpecifiedSubsetReplicas(expectedReplicas, ud)
-	klog.V(4).Infof("UnitedDeployment %s/%s specifiedReplicas: %v", ud.Namespace, ud.Name, specifiedReplicas)
-	// call SortToAllocator to sort all subset by subset.Replicas in order of increment
-	return subsetInfos.SortToAllocator().AllocateReplicas(expectedReplicas, specifiedReplicas)
+	return &specificAllocator{UnitedDeployment: ud}
 }
 
-func (n subsetInfos) SortToAllocator() *replicasAllocator {
-	sort.Sort(n)
-	return &replicasAllocator{subsets: &n}
+type specificAllocator struct {
+	*appsv1alpha1.UnitedDeployment
+	subsets *subsetInfos
 }
 
-type replicasAllocator struct {
-	subsets *subsetInfos
+// Alloc returns a mapping from subset to next replicas.
+// The next replicas are allocated by specificAllocator, which considers the current replicas of each subset and
+// the new replicas indicated by UnitedDeployment.Spec.Topology.Subsets.
+func (s *specificAllocator) Alloc(nameToSubset *map[string]*Subset) (*map[string]int32, error) {
+	// sort all subsets by subset.Replicas in ascending order
+	s.subsets = getSubsetInfos(nameToSubset, s.UnitedDeployment)
+	sort.Sort(s.subsets)
+
+	var expectedReplicas int32 = -1
+	if s.Spec.Replicas != nil {
+		expectedReplicas = *s.Spec.Replicas
+	}
+
+	specifiedReplicas := getSpecifiedSubsetReplicas(expectedReplicas, s.UnitedDeployment)
+	klog.V(4).Infof("UnitedDeployment %s/%s specifiedReplicas: %v", s.Namespace, s.Name, specifiedReplicas)
+	return s.AllocateReplicas(expectedReplicas, specifiedReplicas)
 }
 
-func (s *replicasAllocator) validateReplicas(replicas int32, subsetReplicasLimits *map[string]int32) error {
+func (s *specificAllocator) validateReplicas(replicas int32, subsetReplicasLimits *map[string]int32) error {
 	if subsetReplicasLimits == nil {
 		return nil
 	}
@@ -150,7 +161,7 @@ func getSubsetInfos(nameToSubset *map[string]*Subset, ud *appsv1alpha1.UnitedDep
 // AllocateReplicas will first try to check the specifiedSubsetReplicas is valid or not.
 // If valid , normalAllocate will be called. It will apply these specified replicas, then average the rest replicas to left unspecified subsets.
 // If not, it will return error
-func (s *replicasAllocator) AllocateReplicas(replicas int32, specifiedSubsetReplicas *map[string]int32) (
+func (s *specificAllocator) AllocateReplicas(replicas int32, specifiedSubsetReplicas *map[string]int32) (
 	*map[string]int32, error) {
 	if err := s.validateReplicas(replicas, specifiedSubsetReplicas); err != nil {
 		return nil, err
@@ -159,7 +170,7 @@ func (s *replicasAllocator) AllocateReplicas(replicas int32, specifiedSubsetRepl
 	return s.normalAllocate(replicas, specifiedSubsetReplicas), nil
 }
 
-func (s *replicasAllocator) normalAllocate(expectedReplicas int32, specifiedSubsetReplicas *map[string]int32) *map[string]int32 {
+func (s *specificAllocator) normalAllocate(expectedReplicas int32, specifiedSubsetReplicas *map[string]int32) *map[string]int32 {
 	var specifiedReplicas int32
 	specifiedSubsetCount := 0
 	// Step 1: apply replicas to specified subsets, and mark them as specified = true.
@@ -203,7 +214,7 @@ func (s *replicasAllocator) normalAllocate(expectedReplicas int32, specifiedSubs
 	return s.toSubsetReplicaMap()
 }
 
-func (s *replicasAllocator) toSubsetReplicaMap() *map[string]int32 {
+func (s *specificAllocator) toSubsetReplicaMap() *map[string]int32 {
 	allocatedReplicas := map[string]int32{}
 	for _, subset := range *s.subsets {
 		allocatedReplicas[subset.SubsetName] = subset.Replicas
@@ -212,7 +223,7 @@ func (s *replicasAllocator) toSubsetReplicaMap() *map[string]int32 {
 	return &allocatedReplicas
 }
 
-func (s *replicasAllocator) String() string {
+func (s *specificAllocator) String() string {
 	result := ""
 	sort.Sort(s.subsets)
 	for _, subset := range *s.subsets {
@@ -221,3 +232,88 @@ func (s *replicasAllocator) String() string {
 	return result
 }
+
+type elasticAllocator struct {
+	*appsv1alpha1.UnitedDeployment
+}
+
+// Alloc returns a mapping from subset to next replicas.
+// The next replicas are allocated by elasticAllocator, which considers the current minReplicas and maxReplicas
+// of each subset and spec.replicas of the UnitedDeployment.
+// For example:
+// spec.replicas: 5
+// subsets:
+//   - name: subset-a
+//     minReplicas: 2   # will be satisfied with 1st priority
+//     maxReplicas: 4   # will be satisfied with 3rd priority
+//   - name: subset-b
+//     minReplicas: 2   # will be satisfied with 2nd priority
+//     maxReplicas: nil # will be satisfied with 4th priority
+//
+// the resulting map will be: {"subset-a": 3, "subset-b": 2}
+func (ac *elasticAllocator) Alloc(_ *map[string]*Subset) (*map[string]int32, error) {
+	replicas := int32(1)
+	if ac.Spec.Replicas != nil {
+		replicas = *ac.Spec.Replicas
+	}
+
+	minReplicasMap, maxReplicasMap, err := ac.validateAndCalculateMinMaxMap(replicas)
+	if err != nil {
+		return nil, err
+	}
+	return ac.alloc(replicas, minReplicasMap, maxReplicasMap), nil
+}
+
+func (ac *elasticAllocator) validateAndCalculateMinMaxMap(replicas int32) (map[string]int32, map[string]int32, error) {
+	totalMin, totalMax := int64(0), int64(0)
+	numSubset := len(ac.Spec.Topology.Subsets)
+	minReplicasMap := make(map[string]int32, numSubset)
+	maxReplicasMap := make(map[string]int32, numSubset)
+	for index, subset := range ac.Spec.Topology.Subsets {
+		minReplicas := int32(0)
+		if subset.MinReplicas != nil {
+			// invalid values have already been rejected by the validating webhook
+			minReplicas, _ = ParseSubsetReplicas(replicas, *subset.MinReplicas)
+		}
+		totalMin += int64(minReplicas)
+		minReplicasMap[subset.Name] = minReplicas
+
+		maxReplicas := int32(1000000) // sentinel for "no limit" when MaxReplicas == nil
+		if subset.MaxReplicas != nil {
+			maxReplicas, _ = ParseSubsetReplicas(replicas, *subset.MaxReplicas)
+		}
+		totalMax += int64(maxReplicas)
+		maxReplicasMap[subset.Name] = maxReplicas
+
+		if minReplicas > maxReplicas {
+			return nil, nil, fmt.Errorf("subset[%d].maxReplicas must be greater than or equal to minReplicas", index)
+		}
+	}
+	return minReplicasMap, maxReplicasMap, nil
+}
+
+func (ac *elasticAllocator) alloc(replicas int32, minReplicasMap, maxReplicasMap map[string]int32) *map[string]int32 {
+	allocated := int32(0)
+	// Step 1: satisfy the minimum replicas of each subset first.
+	subsetReplicas := make(map[string]int32, len(ac.Spec.Topology.Subsets))
+	for _, subset := range ac.Spec.Topology.Subsets {
+		minReplicas := minReplicasMap[subset.Name]
+		addReplicas := integer.Int32Min(minReplicas, replicas-allocated)
+		addReplicas = integer.Int32Max(addReplicas, 0)
+		subsetReplicas[subset.Name] = addReplicas
+		allocated += addReplicas
+	}
+
+	if allocated >= replicas { // no quota left to allocate.
+		return &subsetReplicas
+	}
+
+	// Step 2: satisfy the maximum replicas of each subset.
+	for _, subset := range ac.Spec.Topology.Subsets {
+		maxReplicas := maxReplicasMap[subset.Name]
+		minReplicas := minReplicasMap[subset.Name]
+		addReplicas := integer.Int32Min(maxReplicas-minReplicas, replicas-allocated)
+		addReplicas = integer.Int32Max(addReplicas, 0)
+		subsetReplicas[subset.Name] += addReplicas
+		allocated += addReplicas
+	}
+	return &subsetReplicas
+}
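The two-pass scheme above is easy to check by hand. A self-contained sketch with plain slices (no kruise types; slice order stands in for the Topology.Subsets priority order), reproducing the `sum_all_min_replicas < replicas` case from the tests below:

```go
package main

import "fmt"

// allocElastic mirrors the two-pass logic of elasticAllocator.alloc:
// pass 1 satisfies each subset's minimum in order; pass 2 tops subsets
// up toward their maximum, again in order, until the budget is spent.
func allocElastic(replicas int32, mins, maxs []int32) []int32 {
	out := make([]int32, len(mins))
	var allocated int32
	for i := range mins { // pass 1: minimums, in subset order
		add := min32(mins[i], replicas-allocated)
		if add < 0 {
			add = 0
		}
		out[i], allocated = add, allocated+add
	}
	for i := range mins { // pass 2: fill toward maximums
		if allocated >= replicas {
			break
		}
		add := min32(maxs[i]-mins[i], replicas-allocated)
		if add < 0 {
			add = 0
		}
		out[i], allocated = out[i]+add, allocated+add
	}
	return out
}

func min32(a, b int32) int32 {
	if a < b {
		return a
	}
	return b
}

func main() {
	const unbounded = int32(1000000) // same "no limit" sentinel as the controller
	fmt.Println(allocElastic(14,
		[]int32{2, 2, 2, 2, 2},
		[]int32{5, 5, 5, 5, unbounded})) // [5 3 2 2 2]
}
```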
diff --git a/pkg/controller/uniteddeployment/allocator_test.go b/pkg/controller/uniteddeployment/allocator_test.go
index d1c2ba0abd..cc50ab411b 100644
--- a/pkg/controller/uniteddeployment/allocator_test.go
+++ b/pkg/controller/uniteddeployment/allocator_test.go
@@ -17,9 +17,20 @@ limitations under the License.
 package uniteddeployment
 
 import (
+	"fmt"
+	"sort"
 	"testing"
+
+	appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
+	"k8s.io/apimachinery/pkg/util/intstr"
+	"k8s.io/utils/pointer"
 )
 
+func sortToAllocator(infos subsetInfos) *specificAllocator {
+	sort.Sort(infos)
+	return &specificAllocator{subsets: &infos}
+}
+
 func TestScaleReplicas(t *testing.T) {
 	infos := subsetInfos{
 		createSubset("t1", 1),
@@ -27,7 +38,7 @@ func TestScaleReplicas(t *testing.T) {
 		createSubset("t3", 2),
 		createSubset("t4", 2),
 	}
-	allocator := infos.SortToAllocator()
+	allocator := sortToAllocator(infos)
 	allocator.AllocateReplicas(5, &map[string]int32{})
 	if " t1 -> 1; t3 -> 1; t4 -> 1; t2 -> 2;" != allocator.String() {
 		t.Fatalf("unexpected %s", allocator)
@@ -36,7 +47,7 @@ func TestScaleReplicas(t *testing.T) {
 	infos = subsetInfos{
 		createSubset("t1", 2),
 	}
-	allocator = infos.SortToAllocator()
+	allocator = sortToAllocator(infos)
 	allocator.AllocateReplicas(0, &map[string]int32{})
 	if " t1 -> 0;" != allocator.String() {
 		t.Fatalf("unexpected %s", allocator)
@@ -48,7 +59,7 @@ func TestScaleReplicas(t *testing.T) {
 		createSubset("t3", 2),
 		createSubset("t4", 2),
 	}
-	allocator = infos.SortToAllocator()
+	allocator = sortToAllocator(infos)
 	allocator.AllocateReplicas(17, &map[string]int32{})
 	if " t1 -> 4; t3 -> 4; t4 -> 4; t2 -> 5;" != allocator.String() {
 		t.Fatalf("unexpected %s", allocator)
@@ -60,7 +71,7 @@ func TestScaleReplicas(t *testing.T) {
 		createSubset("t3", 2),
 		createSubset("t4", 2),
 	}
-	allocator = infos.SortToAllocator()
+	allocator = sortToAllocator(infos)
 	allocator.AllocateReplicas(9, &map[string]int32{})
 	if " t1 -> 2; t3 -> 2; t4 -> 2; t2 -> 3;" != allocator.String() {
 		t.Fatalf("unexpected %s", allocator)
@@ -70,7 +81,7 @@ func TestScaleReplicas(t *testing.T) {
 		createSubset("t1", 0),
 		createSubset("t2", 10),
 	}
-	allocator = infos.SortToAllocator()
+	allocator = sortToAllocator(infos)
 	allocator.AllocateReplicas(19, &map[string]int32{})
 	if " t1 -> 9; t2 -> 10;" != allocator.String() {
 		t.Fatalf("unexpected %s", allocator)
@@ -80,7 +91,7 @@ func TestScaleReplicas(t *testing.T) {
 		createSubset("t1", 0),
 		createSubset("t2", 10),
 	}
-	allocator = infos.SortToAllocator()
+	allocator = sortToAllocator(infos)
 	allocator.AllocateReplicas(21, &map[string]int32{})
 	if " t1 -> 10; t2 -> 11;" != allocator.String() {
 		t.Fatalf("unexpected %s", allocator)
@@ -94,7 +105,7 @@ func TestSpecifyValidReplicas(t *testing.T) {
 		createSubset("t3", 2),
 		createSubset("t4", 2),
 	}
-	allocator := infos.SortToAllocator()
+	allocator := sortToAllocator(infos)
 	allocator.AllocateReplicas(27, &map[string]int32{
 		"t1": 4,
 		"t3": 4,
@@ -109,7 +120,7 @@ func TestSpecifyValidReplicas(t *testing.T) {
 		createSubset("t3", 2),
 		createSubset("t4", 2),
 	}
-	allocator = infos.SortToAllocator()
+	allocator = sortToAllocator(infos)
 	allocator.AllocateReplicas(8, &map[string]int32{
 		"t1": 4,
 		"t3": 4,
@@ -124,7 +135,7 @@ func TestSpecifyValidReplicas(t *testing.T) {
 		createSubset("t3", 2),
 		createSubset("t4", 2),
 	}
-	allocator = infos.SortToAllocator()
+	allocator = sortToAllocator(infos)
 	allocator.AllocateReplicas(16, &map[string]int32{
 		"t1": 4,
 		"t2": 4,
@@ -141,7 +152,7 @@ func TestSpecifyValidReplicas(t *testing.T) {
 		createSubset("t3", 2),
 		createSubset("t4", 2),
 	}
-	allocator = infos.SortToAllocator()
+	allocator = sortToAllocator(infos)
 	allocator.AllocateReplicas(10, &map[string]int32{
 		"t1": 1,
 		"t2": 2,
@@ -157,7 +168,7 @@ func TestSpecifyValidReplicas(t *testing.T) {
 		createSubset("t3", 2),
 		createSubset("t4", 2),
 	}
-	allocator = infos.SortToAllocator()
+	allocator = sortToAllocator(infos)
 	allocator.AllocateReplicas(10, &map[string]int32{
 		"t1": 1,
 		"t2": 2,
@@ -174,7 +185,7 @@ func TestSpecifyValidReplicas(t *testing.T) {
 		createSubset("t3", 2),
 		createSubset("t4", 2),
 	}
-	allocator = infos.SortToAllocator()
+	allocator = sortToAllocator(infos)
 	allocator.AllocateReplicas(-1, &map[string]int32{
 		"t1": 1,
 		"t2": 2,
@@ -191,7 +202,7 @@ func TestSpecifyInvalidReplicas(t *testing.T) {
 		createSubset("t1", 10),
 		createSubset("t2", 4),
 	}
-	allocator := infos.SortToAllocator()
+	allocator := sortToAllocator(infos)
 	allocator.AllocateReplicas(14, &map[string]int32{
 		"t1": 6,
 		"t2": 6,
@@ -204,7 +215,7 @@ func TestSpecifyInvalidReplicas(t *testing.T) {
 		createSubset("t1", 10),
 		createSubset("t2", 4),
 	}
-	allocator = infos.SortToAllocator()
+	allocator = sortToAllocator(infos)
 	allocator.AllocateReplicas(14, &map[string]int32{
 		"t1": 10,
 		"t2": 11,
@@ -214,6 +225,88 @@ func TestSpecifyInvalidReplicas(t *testing.T) {
 	}
 }
 
+func TestCapacityAllocator(t *testing.T) {
+	cases := []struct {
+		name            string
+		replicas        int32
+		minReplicas     []int32
+		maxReplicas     []int32
+		desiredReplicas []int32
+	}{
+		{
+			name:     "sum_all_min_replicas == replicas",
+			replicas: 10,
+			minReplicas: []int32{
+				2, 2, 2, 2, 2,
+			},
+			maxReplicas: []int32{
+				5, 5, 5, 5, -1,
+			},
+			desiredReplicas: []int32{
+				2, 2, 2, 2, 2,
+			},
+		},
+		{
+			name:     "sum_all_min_replicas < replicas",
+			replicas: 14,
+			minReplicas: []int32{
+				2, 2, 2, 2, 2,
+			},
+			maxReplicas: []int32{
+				5, 5, 5, 5, -1,
+			},
+			desiredReplicas: []int32{
+				5, 3, 2, 2, 2,
+			},
+		},
+		{
+			name:     "sum_all_min_replicas > replicas",
+			replicas: 5,
+			minReplicas: []int32{
+				2, 2, 2, 2, 2,
+			},
+			maxReplicas: []int32{
+				5, 5, 5, 5, -1,
+			},
+			desiredReplicas: []int32{
+				2, 2, 1, 0, 0,
+			},
+		},
+	}
+
+	for _, cs := range cases {
+		t.Run(cs.name, func(t *testing.T) {
+			ud := appsv1alpha1.UnitedDeployment{}
+			ud.Spec.Replicas = pointer.Int32(cs.replicas)
+			ud.Spec.Topology.Subsets = []appsv1alpha1.Subset{}
+			for index := range cs.minReplicas {
+				min := intstr.FromInt(int(cs.minReplicas[index]))
+				var max *intstr.IntOrString
+				if cs.maxReplicas[index] != -1 {
+					m := intstr.FromInt(int(cs.maxReplicas[index]))
+					max = &m
+				}
+				ud.Spec.Topology.Subsets = append(ud.Spec.Topology.Subsets, appsv1alpha1.Subset{
+					Name:        fmt.Sprintf("subset-%d", index),
+					MinReplicas: &min,
+					MaxReplicas: max,
+				})
+			}
+
+			ca := elasticAllocator{&ud}
+			result, err := ca.Alloc(nil)
+			if err != nil {
+				t.Fatalf("unexpected error %v", err)
+			}
+			for index := range cs.desiredReplicas {
+				if (*result)[fmt.Sprintf("subset-%d", index)] != cs.desiredReplicas[index] {
+					t.Fatalf("unexpected result %v", result)
+				}
+			}
+		})
+	}
+}
+
 func createSubset(name string, replicas int32) *nameToReplicas {
 	return &nameToReplicas{
 		Replicas: replicas,
diff --git a/pkg/controller/uniteddeployment/uniteddeployment_controller.go b/pkg/controller/uniteddeployment/uniteddeployment_controller.go
index d785450263..b5334c4606 100644
--- a/pkg/controller/uniteddeployment/uniteddeployment_controller.go
+++ b/pkg/controller/uniteddeployment/uniteddeployment_controller.go
@@ -213,7 +213,7 @@ func (r *ReconcileUnitedDeployment) Reconcile(_ context.Context, request reconci
 		return reconcile.Result{}, err
 	}
 
-	nextReplicas, err := GetAllocatedReplicas(nameToSubset, instance)
+	nextReplicas, err := NewReplicaAllocator(instance).Alloc(nameToSubset)
 	klog.V(4).Infof("Get UnitedDeployment %s/%s next replicas %v", instance.Namespace, instance.Name, nextReplicas)
 	if err != nil {
 		klog.Errorf("UnitedDeployment %s/%s Specified subset replicas is ineffective: %s",
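On the controller side, the only change is the dispatch above: any subset carrying minReplicas/maxReplicas routes the object to the elastic allocator. A caller-side sketch (assuming the controller package is importable from a scratch program; `Alloc(nil)` is safe here because the elastic path ignores the current subsets, exactly as in TestCapacityAllocator):

```go
package main

import (
	"fmt"

	appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
	"github.com/openkruise/kruise/pkg/controller/uniteddeployment"
	"k8s.io/apimachinery/pkg/util/intstr"
	"k8s.io/utils/pointer"
)

func main() {
	min := intstr.FromInt(1)
	ud := &appsv1alpha1.UnitedDeployment{}
	ud.Spec.Replicas = pointer.Int32(5)
	ud.Spec.Topology.Subsets = []appsv1alpha1.Subset{
		{Name: "a", MinReplicas: &min}, // min/max present -> elasticAllocator
		{Name: "b"},                    // with no min/max anywhere, specificAllocator would run instead
	}

	next, err := uniteddeployment.NewReplicaAllocator(ud).Alloc(nil)
	fmt.Println(*next, err) // earlier subsets fill first: map[a:5 b:0] <nil>
}
```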
replicas is ineffective: %s", diff --git a/pkg/webhook/uniteddeployment/validating/uniteddeployment_validation.go b/pkg/webhook/uniteddeployment/validating/uniteddeployment_validation.go index 2de34fd4a0..e09976573e 100644 --- a/pkg/webhook/uniteddeployment/validating/uniteddeployment_validation.go +++ b/pkg/webhook/uniteddeployment/validating/uniteddeployment_validation.go @@ -32,6 +32,7 @@ import ( "k8s.io/kubernetes/pkg/apis/core" corev1 "k8s.io/kubernetes/pkg/apis/core/v1" apivalidation "k8s.io/kubernetes/pkg/apis/core/validation" + "k8s.io/utils/pointer" appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" udctrl "github.com/openkruise/kruise/pkg/controller/uniteddeployment" @@ -62,14 +63,9 @@ func validateUnitedDeploymentSpec(spec *appsv1alpha1.UnitedDeploymentSpec, fldPa allErrs = append(allErrs, validateSubsetTemplate(&spec.Template, selector, fldPath.Child("template"))...) } - var sumReplicas int32 - var expectedReplicas int32 = -1 - if spec.Replicas != nil { - expectedReplicas = *spec.Replicas - } - count := 0 - subSetNames := sets.String{} + allErrs = append(allErrs, validateSubsetReplicas(spec.Replicas, spec.Topology.Subsets, fldPath.Child("topology", "subsets"))...) + subSetNames := sets.String{} for i, subset := range spec.Topology.Subsets { if len(subset.Name) == 0 { allErrs = append(allErrs, field.Required(fldPath.Child("topology", "subsets").Index(i).Child("name"), "")) @@ -108,39 +104,108 @@ func validateUnitedDeploymentSpec(spec *appsv1alpha1.UnitedDeploymentSpec, fldPa if subset.Replicas == nil { continue } + } - replicas, err := udctrl.ParseSubsetReplicas(expectedReplicas, *subset.Replicas) - if err != nil { - allErrs = append(allErrs, field.Invalid(fldPath.Child("topology", "subsets").Index(i).Child("replicas"), subset.Replicas, fmt.Sprintf("invalid replicas %s", subset.Replicas.String()))) - } else { - sumReplicas += replicas - count++ + if spec.UpdateStrategy.ManualUpdate != nil { + for subset := range spec.UpdateStrategy.ManualUpdate.Partitions { + if !subSetNames.Has(subset) { + allErrs = append(allErrs, field.Invalid(fldPath.Child("updateStrategy", "partitions"), spec.UpdateStrategy.ManualUpdate.Partitions, fmt.Sprintf("subset %s does not exist", subset))) + } } } - if expectedReplicas != -1 { - // sum of subset replicas may be less than uniteddployment replicas - if sumReplicas > expectedReplicas { - allErrs = append(allErrs, field.Invalid(fldPath.Child("topology", "subsets"), sumReplicas, fmt.Sprintf("sum of indicated subset replicas %d should not be greater than UnitedDeployment replicas %d", sumReplicas, expectedReplicas))) + return allErrs +} + +func validateSubsetReplicas(expectedReplicas *int32, subsets []appsv1alpha1.Subset, fldPath *field.Path) field.ErrorList { + var ( + sumReplicas = int64(0) + sumMinReplicas = int64(0) + sumMaxReplicas = int64(0) + + countReplicas = 0 + countMaxReplicas = 0 + + hasReplicasSettings = false + hasCapacitySettings = false + + err error + errList field.ErrorList + ) + + if expectedReplicas == nil { + expectedReplicas = pointer.Int32(-1) + } + + for i, subset := range subsets { + replicas := int32(0) + if subset.Replicas != nil { + countReplicas++ + hasReplicasSettings = true + replicas, err = udctrl.ParseSubsetReplicas(*expectedReplicas, *subset.Replicas) + if err != nil { + errList = append(errList, field.Invalid(fldPath.Index(i).Child("replicas"), subset.Replicas, err.Error())) + } + } + sumReplicas += int64(replicas) + + minReplicas := int32(0) + if subset.MinReplicas != nil { + hasCapacitySettings = true + 
+			minReplicas, err = udctrl.ParseSubsetReplicas(*expectedReplicas, *subset.MinReplicas)
+			if err != nil {
+				errList = append(errList, field.Invalid(fldPath.Index(i).Child("minReplicas"), subset.MinReplicas, err.Error()))
+			}
+		}
+		sumMinReplicas += int64(minReplicas)
+
+		maxReplicas := int32(1000000) // sentinel for "no limit" when maxReplicas is empty
+		if subset.MaxReplicas != nil {
+			countMaxReplicas++
+			hasCapacitySettings = true
+			maxReplicas, err = udctrl.ParseSubsetReplicas(*expectedReplicas, *subset.MaxReplicas)
+			if err != nil {
+				errList = append(errList, field.Invalid(fldPath.Index(i).Child("maxReplicas"), subset.MaxReplicas, err.Error()))
+			}
+		}
+		sumMaxReplicas += int64(maxReplicas)
-		if count > 0 && count == len(spec.Topology.Subsets) && sumReplicas != expectedReplicas {
-			allErrs = append(allErrs, field.Invalid(fldPath.Child("topology", "subsets"), sumReplicas, fmt.Sprintf("if replicas of all subsets are provided, the sum of indicated subset replicas %d should equal UnitedDeployment replicas %d", sumReplicas, expectedReplicas)))
+
+		if minReplicas > maxReplicas {
+			errList = append(errList, field.Invalid(fldPath.Index(i).Child("minReplicas"), subset.MinReplicas,
+				fmt.Sprintf("subset[%d].minReplicas must be less than or equal to maxReplicas", i)))
 		}
-	} else if count != len(spec.Topology.Subsets) {
-		// validate all of subsets replicas are not nil
-		allErrs = append(allErrs, field.Invalid(fldPath.Child("topology", "subsets"), sumReplicas, "if UnitedDeployment replicas is not provided, replicas of all subsets should be provided"))
 	}
 
-	if spec.UpdateStrategy.ManualUpdate != nil {
-		for subset := range spec.UpdateStrategy.ManualUpdate.Partitions {
-			if !subSetNames.Has(subset) {
-				allErrs = append(allErrs, field.Invalid(fldPath.Child("updateStrategy", "partitions"), spec.UpdateStrategy.ManualUpdate.Partitions, fmt.Sprintf("subset %s does not exist", subset)))
+	if hasReplicasSettings && hasCapacitySettings {
+		errList = append(errList, field.Invalid(fldPath, subsets, "subset.Replicas and subset.MinReplicas/subset.MaxReplicas are mutually exclusive in a UnitedDeployment"))
+		return errList
+	}
+
+	if hasCapacitySettings {
+		if *expectedReplicas == -1 {
+			errList = append(errList, field.Invalid(fldPath, expectedReplicas, "spec.replicas must not be empty if subset.minReplicas/maxReplicas is set"))
+		}
+		if countMaxReplicas >= len(subsets) {
+			errList = append(errList, field.Invalid(fldPath, countMaxReplicas, "at least one subset.maxReplicas must be empty"))
+		}
+		if sumMinReplicas > sumMaxReplicas {
+			errList = append(errList, field.Invalid(fldPath, sumMinReplicas, "sum of indicated subset.minReplicas should not be greater than sum of indicated subset.maxReplicas"))
+		}
+	} else {
+		if *expectedReplicas != -1 {
+			// the sum of subset replicas may be less than the UnitedDeployment replicas
+			if sumReplicas > int64(*expectedReplicas) {
+				errList = append(errList, field.Invalid(fldPath, sumReplicas, fmt.Sprintf("sum of indicated subset replicas %d should not be greater than UnitedDeployment replicas %d", sumReplicas, *expectedReplicas)))
+			}
+			if countReplicas > 0 && countReplicas == len(subsets) && sumReplicas != int64(*expectedReplicas) {
+				errList = append(errList, field.Invalid(fldPath, sumReplicas, fmt.Sprintf("if replicas of all subsets are provided, the sum of indicated subset replicas %d should equal UnitedDeployment replicas %d", sumReplicas, *expectedReplicas)))
+			}
+		} else if countReplicas != len(subsets) {
+			// validate that replicas of all subsets are provided
+			errList = append(errList, field.Invalid(fldPath, sumReplicas, "if UnitedDeployment replicas is not provided, replicas of all subsets should be provided"))
+		}
+	}
-
-	return allErrs
+	return errList
 }
 
 // validateUnitedDeployment validates a UnitedDeployment.
diff --git a/pkg/webhook/uniteddeployment/validating/uniteddeployment_validation_test.go b/pkg/webhook/uniteddeployment/validating/uniteddeployment_validation_test.go
index d93e4c8d1f..08216a9003 100644
--- a/pkg/webhook/uniteddeployment/validating/uniteddeployment_validation_test.go
+++ b/pkg/webhook/uniteddeployment/validating/uniteddeployment_validation_test.go
@@ -17,6 +17,7 @@ limitations under the License.
 package validating
 
 import (
+	"fmt"
 	"strconv"
 	"strings"
 	"testing"
@@ -25,6 +26,8 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/intstr"
+	"k8s.io/apimachinery/pkg/util/validation/field"
+	"k8s.io/utils/pointer"
 
 	appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
 	appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1"
@@ -1030,6 +1033,99 @@ func TestValidateUnitedDeploymentUpdate(t *testing.T) {
 	}
 }
 
+func TestValidateSubsetReplicas(t *testing.T) {
+	cases := []struct {
+		name        string
+		replicas    int32
+		minReplicas []int32
+		maxReplicas []int32
+		errorHappen bool
+	}{
+		{
+			name:     "sum_all_min_replicas == replicas",
+			replicas: 10,
+			minReplicas: []int32{
+				2, 2, 2, 2, 2,
+			},
+			maxReplicas: []int32{
+				5, 5, 5, 5, -1,
+			},
+			errorHappen: false,
+		},
+		{
+			name:     "sum_all_min_replicas < replicas",
+			replicas: 14,
+			minReplicas: []int32{
+				2, 2, 2, 2, 2,
+			},
+			maxReplicas: []int32{
+				5, 5, 5, 5, -1,
+			},
+			errorHappen: false,
+		},
+		{
+			name:     "sum_all_min_replicas > replicas",
+			replicas: 5,
+			minReplicas: []int32{
+				2, 2, 2, 2, 2,
+			},
+			maxReplicas: []int32{
+				5, 5, 5, 5, -1,
+			},
+			errorHappen: false,
+		},
+		{
+			name:     "min_replicas > max_replicas",
+			replicas: 5,
+			minReplicas: []int32{
+				2, 2, 2, 6, 2,
+			},
+			maxReplicas: []int32{
+				5, 5, 5, 5, -1,
+			},
+			errorHappen: true,
+		},
+		{
+			name:     "all_max_replicas != nil",
+			replicas: 5,
+			minReplicas: []int32{
+				2, 2, 2, 2, 2,
+			},
+			maxReplicas: []int32{
+				5, 5, 5, 5, 5,
+			},
+			errorHappen: true,
+		},
+	}
+
+	for _, cs := range cases {
+		t.Run(cs.name, func(t *testing.T) {
+			ud := appsv1alpha1.UnitedDeployment{}
+			ud.Spec.Replicas = pointer.Int32(cs.replicas)
+			ud.Spec.Topology.Subsets = []appsv1alpha1.Subset{}
+			for index := range cs.minReplicas {
+				min := intstr.FromInt(int(cs.minReplicas[index]))
+				var max *intstr.IntOrString
+				if cs.maxReplicas[index] != -1 {
+					m := intstr.FromInt(int(cs.maxReplicas[index]))
+					max = &m
+				}
+				ud.Spec.Topology.Subsets = append(ud.Spec.Topology.Subsets, appsv1alpha1.Subset{
+					Name:        fmt.Sprintf("subset-%d", index),
+					MinReplicas: &min,
+					MaxReplicas: max,
+				})
+			}
+			errList := validateSubsetReplicas(&cs.replicas, ud.Spec.Topology.Subsets, field.NewPath("subset"))
+			if len(errList) > 0 && !cs.errorHappen {
+				t.Errorf("expected success, but got error: %v", errList)
+			} else if len(errList) == 0 && cs.errorHappen {
+				t.Errorf("expected error, but got success")
+			}
+		})
+	}
+}
+
 func setTestDefault(obj *appsv1alpha1.UnitedDeployment) {
 	if obj.Spec.RevisionHistoryLimit == nil {
 		obj.Spec.RevisionHistoryLimit = new(int32)
diff --git a/test/e2e/apps/uniteddeployment.go b/test/e2e/apps/uniteddeployment.go
new file mode 100644
index 0000000000..9d41cd9d01
--- /dev/null
+++ b/test/e2e/apps/uniteddeployment.go
@@ -0,0 +1,80 @@
+package apps
+
+import (
+	"fmt"
+
+	"github.com/onsi/ginkgo"
+	kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned"
"github.com/openkruise/kruise/test/e2e/framework" + "k8s.io/apimachinery/pkg/util/intstr" + clientset "k8s.io/client-go/kubernetes" +) + +var _ = SIGDescribe("uniteddeployment", func() { + f := framework.NewDefaultFramework("uniteddeployment") + var ns string + var c clientset.Interface + var kc kruiseclientset.Interface + var tester *framework.UnitedDeploymentTester + + ginkgo.BeforeEach(func() { + c = f.ClientSet + kc = f.KruiseClientSet + ns = f.Namespace.Name + tester = framework.NewUnitedDeploymentTester(c, kc, ns) + }) + + ginkgo.It("united deployment with elastic allocator", func() { + replicas := func(r int) *intstr.IntOrString { p := intstr.FromInt(r); return &p } + udManager := tester.NewUnitedDeploymentManager("ud-elastic-test") + udManager.AddSubset("subset-0", nil, replicas(1), replicas(2)) + udManager.AddSubset("subset-1", nil, replicas(1), replicas(2)) + udManager.AddSubset("subset-2", nil, replicas(1), nil) + + replicasMap := func(replicas []int32) map[string]int32 { + replicaMap := make(map[string]int32) + for i, r := range replicas { + replicaMap[fmt.Sprintf("subset-%d", i)] = r + } + return replicaMap + } + ginkgo.By("test replicas equals to sum of min replicas") + udManager.Create(3) + udManager.CheckSubsets(replicasMap([]int32{1, 1, 1})) + + ginkgo.By("test replicas more than sum of min replicas") + udManager.Scale(7) + udManager.CheckSubsets(replicasMap([]int32{2, 2, 3})) + + ginkgo.By("test replicas less than sum of min replicas") + udManager.Scale(1) + udManager.CheckSubsets(replicasMap([]int32{1, 0, 0})) + }) + + ginkgo.It("united deployment with specific allocator", func() { + replicas := func(p string) *intstr.IntOrString { x := intstr.FromString(p); return &x } + udManager := tester.NewUnitedDeploymentManager("ud-specific-test") + udManager.AddSubset("subset-0", replicas("25%"), nil, nil) + udManager.AddSubset("subset-1", replicas("25%"), nil, nil) + udManager.AddSubset("subset-2", nil, nil, nil) + + replicasMap := func(replicas []int32) map[string]int32 { + replicaMap := make(map[string]int32) + for i, r := range replicas { + replicaMap[fmt.Sprintf("subset-%d", i)] = r + } + return replicaMap + } + ginkgo.By("create ud") + udManager.Create(3) + udManager.CheckSubsets(replicasMap([]int32{1, 1, 1})) + + ginkgo.By("scale up") + udManager.Scale(4) + udManager.CheckSubsets(replicasMap([]int32{1, 1, 2})) + + ginkgo.By("scale down") + udManager.Scale(1) + udManager.CheckSubsets(replicasMap([]int32{0, 0, 1})) + }) +}) diff --git a/test/e2e/framework/uniteddeployment.go b/test/e2e/framework/uniteddeployment.go new file mode 100644 index 0000000000..02bab7b662 --- /dev/null +++ b/test/e2e/framework/uniteddeployment.go @@ -0,0 +1,133 @@ +package framework + +import ( + "context" + "fmt" + "github.com/onsi/gomega" + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned" + appsv1 "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/utils/pointer" + "reflect" + "time" +) + +type UnitedDeploymentTester struct { + c clientset.Interface + kc kruiseclientset.Interface + ns string +} + +func NewUnitedDeploymentTester(c clientset.Interface, kc kruiseclientset.Interface, ns string) *UnitedDeploymentTester { + return &UnitedDeploymentTester{ + c: c, + kc: kc, + ns: ns, + } +} + +func (t *UnitedDeploymentTester) 
+	return &UnitedDeploymentManager{
+		UnitedDeployment: &appsv1alpha1.UnitedDeployment{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      name,
+				Namespace: t.ns,
+			},
+			Spec: appsv1alpha1.UnitedDeploymentSpec{
+				Selector: &metav1.LabelSelector{
+					MatchLabels: map[string]string{
+						"app": name,
+					},
+				},
+				Template: appsv1alpha1.SubsetTemplate{
+					DeploymentTemplate: &appsv1alpha1.DeploymentTemplateSpec{
+						ObjectMeta: metav1.ObjectMeta{
+							Labels: map[string]string{
+								"app": name,
+							},
+						},
+						Spec: appsv1.DeploymentSpec{
+							Selector: &metav1.LabelSelector{
+								MatchLabels: map[string]string{
+									"app": name,
+								},
+							},
+							Template: v1.PodTemplateSpec{
+								ObjectMeta: metav1.ObjectMeta{
+									Labels: map[string]string{
+										"app": name,
+									},
+								},
+								Spec: v1.PodSpec{
+									Containers: []v1.Container{
+										{
+											Name:  "busybox",
+											Image: "busybox:1.32",
+											Command: []string{
+												"/bin/sh", "-c", "sleep 100d",
+											},
+										},
+									},
+								},
+							},
+						},
+					},
+				},
+			},
+		},
+		kc: t.kc,
+	}
+}
+
+type UnitedDeploymentManager struct {
+	*appsv1alpha1.UnitedDeployment
+	kc kruiseclientset.Interface
+}
+
+func (m *UnitedDeploymentManager) AddSubset(name string, replicas, minReplicas, maxReplicas *intstr.IntOrString) {
+	m.Spec.Topology.Subsets = append(m.Spec.Topology.Subsets, appsv1alpha1.Subset{
+		Name:        name,
+		Replicas:    replicas,
+		MinReplicas: minReplicas,
+		MaxReplicas: maxReplicas,
+	})
+}
+
+func (m *UnitedDeploymentManager) Scale(replicas int32) {
+	m.Spec.Replicas = pointer.Int32(replicas)
+	_, err := m.kc.AppsV1alpha1().UnitedDeployments(m.Namespace).Patch(context.TODO(), m.Name, types.MergePatchType,
+		[]byte(fmt.Sprintf(`{"spec":{"replicas":%d}}`, replicas)), metav1.PatchOptions{})
+	gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+	gomega.Eventually(func() bool {
+		ud, err := m.kc.AppsV1alpha1().UnitedDeployments(m.Namespace).Get(context.TODO(), m.Name, metav1.GetOptions{})
+		gomega.Expect(err).NotTo(gomega.HaveOccurred())
+		return ud.Status.Replicas == replicas && ud.Generation == ud.Status.ObservedGeneration
+	}, time.Minute, time.Second).Should(gomega.BeTrue())
+}
+
+func (m *UnitedDeploymentManager) Create(replicas int32) {
+	m.Spec.Replicas = pointer.Int32(replicas)
+	_, err := m.kc.AppsV1alpha1().UnitedDeployments(m.Namespace).Create(context.TODO(), m.UnitedDeployment, metav1.CreateOptions{})
+	gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+	time.Sleep(3 * time.Second)
+	gomega.Eventually(func() bool {
+		ud, err := m.kc.AppsV1alpha1().UnitedDeployments(m.Namespace).Get(context.TODO(), m.Name, metav1.GetOptions{})
+		gomega.Expect(err).NotTo(gomega.HaveOccurred())
+		return ud.Status.Replicas == replicas && ud.Generation == ud.Status.ObservedGeneration
+	}, time.Minute, time.Second).Should(gomega.BeTrue())
+}
+
+func (m *UnitedDeploymentManager) CheckSubsets(replicas map[string]int32) {
+	gomega.Eventually(func() bool {
+		ud, err := m.kc.AppsV1alpha1().UnitedDeployments(m.Namespace).Get(context.TODO(), m.Name, metav1.GetOptions{})
+		gomega.Expect(err).NotTo(gomega.HaveOccurred())
+		return ud.GetGeneration() == ud.Status.ObservedGeneration && *ud.Spec.Replicas == ud.Status.Replicas && reflect.DeepEqual(replicas, ud.Status.SubsetReplicas)
+	}, time.Minute, time.Second).Should(gomega.BeTrue())
+}
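A closing note on the IntOrString fields: minReplicas/maxReplicas accept both absolute values and percentages, the latter resolved against spec.replicas via ParseSubsetReplicas. A sketch of that scaling using the apimachinery helper (the controller's exact rounding behavior lives in ParseSubsetReplicas, so the roundUp choice below is illustrative only):

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
	replicas := 10 // UnitedDeployment spec.replicas
	min := intstr.FromString("20%")
	max := intstr.FromInt(6)

	// Percentages are resolved against spec.replicas; plain integers pass through.
	lo, _ := intstr.GetScaledValueFromIntOrPercent(&min, replicas, false)
	hi, _ := intstr.GetScaledValueFromIntOrPercent(&max, replicas, false)
	fmt.Println(lo, hi) // 2 6
}
```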