Skip to content

Commit

Permalink
add subset capacity planning for UnitedDeployment
Browse files Browse the repository at this point in the history
Signed-off-by: mingzhou.swx <mingzhou.swx@alibaba-inc.com>
  • Loading branch information
mingzhou.swx committed Oct 20, 2023
1 parent 024644c commit 4e7bb73
Show file tree
Hide file tree
Showing 10 changed files with 596 additions and 47 deletions.
19 changes: 19 additions & 0 deletions apis/apps/defaults/v1alpha1.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,25 @@ func SetDefaultsUnitedDeployment(obj *v1alpha1.UnitedDeployment, injectTemplateD
}
}
}

hasReplicasSettings := false
hasCapacitySettings := false
for _, subset := range obj.Spec.Topology.Subsets {
if subset.Replicas != nil {
hasReplicasSettings = true
}
if subset.MinReplicas != nil || subset.MaxReplicas != nil {
hasCapacitySettings = true
}
}
if hasCapacitySettings && !hasReplicasSettings {
for i := range obj.Spec.Topology.Subsets {
subset := &obj.Spec.Topology.Subsets[i]
if subset.MinReplicas == nil {
subset.MinReplicas = &intstr.IntOrString{Type: intstr.Int, IntVal: 0}
}
}
}
}

// SetDefaults_CloneSet set default values for CloneSet.
Expand Down
2 changes: 1 addition & 1 deletion apis/apps/v1alpha1/uniteddeployment_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ type Subset struct {
// Indicates the lower bounded replicas of the subset.
// MinReplicas must be more than or equal to 0 if it is set.
// Spec.Replicas must be more than or equal to the sum of MinReplicas of all subsets.
// Defaults to nil.
// Defaults to 0.
// +optional
MinReplicas *intstr.IntOrString `json:"minReplicas,omitempty"`

Expand Down
2 changes: 1 addition & 1 deletion config/crd/bases/apps.kruise.io_uniteddeployments.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -992,7 +992,7 @@ spec:
description: Indicates the lower bounded replicas of the
subset. MinReplicas must be more than or equal to 0 if
it is set. Spec.Replicas must be more than or equal the
sum of MinReplicas of all subsets. Defaults to nil.
sum of MinReplicas of all subsets. Defaults to 0.
x-kubernetes-int-or-string: true
name:
description: Indicates subset name as a DNS_LABEL, which
Expand Down
128 changes: 112 additions & 16 deletions pkg/controller/uniteddeployment/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"strings"

"k8s.io/klog/v2"
"k8s.io/utils/integer"

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
)
Expand Down Expand Up @@ -54,33 +55,50 @@ func (n subsetInfos) Swap(i, j int) {
n[i], n[j] = n[j], n[i]
}

// GetAllocatedReplicas returns a mapping from subset to next replicas.
// Next replicas is allocated by replicasAllocator, which will consider the current replicas of each subset and
type ReplicaAllocator interface {
	// Alloc returns the number of replicas each subset should run next, keyed by
	// subset name, or an error when the allocation cannot be computed.
	// nameToSubset maps subset names to Subset objects (presumably the currently
	// observed subsets — confirm against the caller); an implementation is free
	// to ignore it.
	Alloc(nameToSubset *map[string]*Subset) (*map[string]int32, error)
}

// NewReplicaAllocator selects the allocation strategy for ud.
// As soon as any subset in ud.Spec.Topology.Subsets sets MinReplicas or
// MaxReplicas, a capacityAllocator is returned; when no subset sets either
// bound, a typicalAllocator is returned.
func NewReplicaAllocator(ud *appsv1alpha1.UnitedDeployment) ReplicaAllocator {
	subsets := ud.Spec.Topology.Subsets
	for i := range subsets {
		if subsets[i].MinReplicas == nil && subsets[i].MaxReplicas == nil {
			continue
		}
		// At least one capacity bound is configured.
		return &capacityAllocator{UnitedDeployment: ud}
	}
	return &typicalAllocator{UnitedDeployment: ud}
}

// typicalAllocator is the ReplicaAllocator that NewReplicaAllocator returns
// when no subset sets MinReplicas or MaxReplicas. It embeds the
// UnitedDeployment whose replicas it allocates.
type typicalAllocator struct {
	*appsv1alpha1.UnitedDeployment
}

// Alloc returns a mapping from subset to next replicas.
// Next replicas is allocated by realReplicasAllocator, which will consider the current replicas of each subset and
// new replicas indicated from UnitedDeployment.Spec.Topology.Subsets.
func GetAllocatedReplicas(nameToSubset *map[string]*Subset, ud *appsv1alpha1.UnitedDeployment) (*map[string]int32, error) {
subsetInfos := getSubsetInfos(nameToSubset, ud)
func (ac *typicalAllocator) Alloc(nameToSubset *map[string]*Subset) (*map[string]int32, error) {
subsetInfos := getSubsetInfos(nameToSubset, ac.UnitedDeployment)

var expectedReplicas int32 = -1
if ud.Spec.Replicas != nil {
expectedReplicas = *ud.Spec.Replicas
if ac.Spec.Replicas != nil {
expectedReplicas = *ac.Spec.Replicas
}

specifiedReplicas := getSpecifiedSubsetReplicas(expectedReplicas, ud)
klog.V(4).Infof("UnitedDeployment %s/%s specifiedReplicas: %v", ud.Namespace, ud.Name, specifiedReplicas)
specifiedReplicas := getSpecifiedSubsetReplicas(expectedReplicas, ac.UnitedDeployment)
klog.V(4).Infof("UnitedDeployment %s/%s specifiedReplicas: %v", ac.Namespace, ac.Name, specifiedReplicas)
// call SortToAllocator to sort all subset by subset.Replicas in order of increment
return subsetInfos.SortToAllocator().AllocateReplicas(expectedReplicas, specifiedReplicas)
}

func (n subsetInfos) SortToAllocator() *replicasAllocator {
func (n subsetInfos) SortToAllocator() *realReplicasAllocator {
sort.Sort(n)
return &replicasAllocator{subsets: &n}
return &realReplicasAllocator{subsets: &n}
}

type replicasAllocator struct {
type realReplicasAllocator struct {
subsets *subsetInfos
}

func (s *replicasAllocator) validateReplicas(replicas int32, subsetReplicasLimits *map[string]int32) error {
func (s *realReplicasAllocator) validateReplicas(replicas int32, subsetReplicasLimits *map[string]int32) error {
if subsetReplicasLimits == nil {
return nil
}
Expand Down Expand Up @@ -150,7 +168,7 @@ func getSubsetInfos(nameToSubset *map[string]*Subset, ud *appsv1alpha1.UnitedDep
// AllocateReplicas will first try to check the specifiedSubsetReplicas is valid or not.
// If valid, normalAllocate will be called. It will apply these specified replicas, then average the remaining replicas across the unspecified subsets.
// If not, it will return error
func (s *replicasAllocator) AllocateReplicas(replicas int32, specifiedSubsetReplicas *map[string]int32) (
func (s *realReplicasAllocator) AllocateReplicas(replicas int32, specifiedSubsetReplicas *map[string]int32) (
*map[string]int32, error) {
if err := s.validateReplicas(replicas, specifiedSubsetReplicas); err != nil {
return nil, err
Expand All @@ -159,7 +177,7 @@ func (s *replicasAllocator) AllocateReplicas(replicas int32, specifiedSubsetRepl
return s.normalAllocate(replicas, specifiedSubsetReplicas), nil
}

func (s *replicasAllocator) normalAllocate(expectedReplicas int32, specifiedSubsetReplicas *map[string]int32) *map[string]int32 {
func (s *realReplicasAllocator) normalAllocate(expectedReplicas int32, specifiedSubsetReplicas *map[string]int32) *map[string]int32 {
var specifiedReplicas int32
specifiedSubsetCount := 0
// Step 1: apply replicas to specified subsets, and mark them as specified = true.
Expand Down Expand Up @@ -203,7 +221,7 @@ func (s *replicasAllocator) normalAllocate(expectedReplicas int32, specifiedSubs
return s.toSubsetReplicaMap()
}

func (s *replicasAllocator) toSubsetReplicaMap() *map[string]int32 {
func (s *realReplicasAllocator) toSubsetReplicaMap() *map[string]int32 {
allocatedReplicas := map[string]int32{}
for _, subset := range *s.subsets {
allocatedReplicas[subset.SubsetName] = subset.Replicas
Expand All @@ -212,7 +230,7 @@ func (s *replicasAllocator) toSubsetReplicaMap() *map[string]int32 {
return &allocatedReplicas
}

func (s *replicasAllocator) String() string {
func (s *realReplicasAllocator) String() string {
result := ""
sort.Sort(s.subsets)
for _, subset := range *s.subsets {
Expand All @@ -221,3 +239,81 @@ func (s *replicasAllocator) String() string {

return result
}

// capacityAllocator is the ReplicaAllocator that NewReplicaAllocator returns
// when at least one subset sets MinReplicas or MaxReplicas. It embeds the
// UnitedDeployment whose replicas it allocates.
type capacityAllocator struct {
	*appsv1alpha1.UnitedDeployment
}

// Alloc returns a mapping from subset name to next replicas.
// The result is derived only from Spec.Replicas (1 when it is unset) and the
// MinReplicas/MaxReplicas bounds of the subsets in Spec.Topology.Subsets; the
// subset map passed in is not consulted.
// An error is returned when the bounds of a subset fail validation.
func (ac *capacityAllocator) Alloc(_ *map[string]*Subset) (*map[string]int32, error) {
	var desired int32 = 1
	if specReplicas := ac.Spec.Replicas; specReplicas != nil {
		desired = *specReplicas
	}

	lowerBounds, upperBounds, err := ac.validateAndCalculateMinMaxMap(desired)
	if err != nil {
		return nil, err
	}

	result := ac.alloc(desired, lowerBounds, upperBounds)
	return result, nil
}

// validateAndCalculateMinMaxMap resolves the MinReplicas and MaxReplicas of
// every subset against replicas and returns them as two maps keyed by subset
// name (minimums first, maximums second).
//
// A subset without MinReplicas gets a minimum of 0; a subset without
// MaxReplicas gets a maximum of 1000000. An error is returned as soon as a
// subset's minimum exceeds its maximum. When the minimums add up to more than
// replicas, only a warning is logged and the maps are still returned.
func (ac *capacityAllocator) validateAndCalculateMinMaxMap(replicas int32) (map[string]int32, map[string]int32, error) {
	subsets := ac.Spec.Topology.Subsets
	lowerBounds := make(map[string]int32, len(subsets))
	upperBounds := make(map[string]int32, len(subsets))

	var minSum int64 // int64 so that summing many int32 minimums cannot wrap
	for i := range subsets {
		subset := &subsets[i]

		// The parse errors below are discarded and whatever value accompanies
		// them is used as-is.
		// NOTE(review): presumably malformed values are rejected before they
		// reach the controller — confirm against the validating webhook.
		var lower int32
		if subset.MinReplicas != nil {
			lower, _ = ParseSubsetReplicas(replicas, *subset.MinReplicas)
		}
		upper := int32(1000000)
		if subset.MaxReplicas != nil {
			upper, _ = ParseSubsetReplicas(replicas, *subset.MaxReplicas)
		}

		minSum += int64(lower)
		lowerBounds[subset.Name] = lower
		upperBounds[subset.Name] = upper

		if lower > upper {
			return nil, nil, fmt.Errorf("subset[%d].maxReplicas must be more than or equal to minReplicas", i)
		}
	}

	if minSum > int64(replicas) {
		klog.Warningf("UnitedDeployment %s spec.replicas must be more than or equal to sum of minReplicas of all subsets %d", klog.KObj(ac.UnitedDeployment), minSum)
	}
	return lowerBounds, upperBounds, nil
}

// alloc distributes replicas over the subsets of Spec.Topology.Subsets, walking
// them in declaration order, and returns the result keyed by subset name.
//
// minReplicasMap and maxReplicasMap hold the per-subset bounds keyed by subset
// name. The first pass hands every subset its minimum for as long as quota
// remains. If quota is left afterwards, the second pass tops subsets up towards
// their maximum, again in declaration order. Quota still left after the second
// pass is not assigned to any subset.
func (ac *capacityAllocator) alloc(replicas int32, minReplicasMap, maxReplicasMap map[string]int32) *map[string]int32 {
	subsets := ac.Spec.Topology.Subsets
	result := make(map[string]int32, len(subsets))

	// remaining is the part of replicas that has not been handed out yet.
	remaining := replicas

	// Pass 1: minimums first. Each grant is clamped to [0, remaining].
	for i := range subsets {
		name := subsets[i].Name
		grant := integer.Int32Max(integer.Int32Min(minReplicasMap[name], remaining), 0)
		result[name] = grant
		remaining -= grant
	}

	// Nothing left to hand out once the minimums have consumed the quota.
	if remaining <= 0 {
		return &result
	}

	// Pass 2: every subset now holds exactly its minimum, so the headroom up
	// to its maximum is max-min. Each top-up is clamped to [0, remaining].
	for i := range subsets {
		name := subsets[i].Name
		headroom := maxReplicasMap[name] - minReplicasMap[name]
		extra := integer.Int32Max(integer.Int32Min(headroom, remaining), 0)
		result[name] += extra
		remaining -= extra
	}
	return &result
}
87 changes: 87 additions & 0 deletions pkg/controller/uniteddeployment/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@ limitations under the License.
package uniteddeployment

import (
"fmt"
"testing"

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/utils/pointer"
)

func TestScaleReplicas(t *testing.T) {
Expand Down Expand Up @@ -214,6 +219,88 @@ func TestSpecifyInvalidReplicas(t *testing.T) {
}
}

// TestCapacityAllocator runs capacityAllocator.Alloc against UnitedDeployments
// whose subsets carry MinReplicas/MaxReplicas and compares the allocation of
// each subset with the expected value. The cases cover the sum of minimums
// being equal to, lower than and higher than spec.replicas.
func TestCapacityAllocator(t *testing.T) {
	// unbounded marks a subset whose MaxReplicas is left unset in the fixture.
	const unbounded = -1

	cases := []struct {
		name            string
		replicas        int32
		minReplicas     []int32
		maxReplicas     []int32
		desiredReplicas []int32
	}{
		{
			name:            "sum_all_min_replicas == replicas",
			replicas:        10,
			minReplicas:     []int32{2, 2, 2, 2, 2},
			maxReplicas:     []int32{5, 5, 5, 5, unbounded},
			desiredReplicas: []int32{2, 2, 2, 2, 2},
		},
		{
			name:            "sum_all_min_replicas < replicas",
			replicas:        14,
			minReplicas:     []int32{2, 2, 2, 2, 2},
			maxReplicas:     []int32{5, 5, 5, 5, unbounded},
			desiredReplicas: []int32{5, 3, 2, 2, 2},
		},
		{
			name:            "sum_all_min_replicas > replicas",
			replicas:        5,
			minReplicas:     []int32{2, 2, 2, 2, 2},
			maxReplicas:     []int32{5, 5, 5, 5, unbounded},
			desiredReplicas: []int32{2, 2, 1, 0, 0},
		},
	}

	for _, cs := range cases {
		t.Run(cs.name, func(t *testing.T) {
			// Build one subset per entry of minReplicas, named subset-<index>.
			subsets := make([]appsv1alpha1.Subset, 0, len(cs.minReplicas))
			for i, lower := range cs.minReplicas {
				lowerBound := intstr.FromInt(int(lower))
				subset := appsv1alpha1.Subset{
					Name:        fmt.Sprintf("subset-%d", i),
					MinReplicas: &lowerBound,
				}
				if upper := cs.maxReplicas[i]; upper != unbounded {
					upperBound := intstr.FromInt(int(upper))
					subset.MaxReplicas = &upperBound
				}
				subsets = append(subsets, subset)
			}

			ud := appsv1alpha1.UnitedDeployment{}
			ud.Spec.Replicas = pointer.Int32(cs.replicas)
			ud.Spec.Topology.Subsets = subsets

			allocator := capacityAllocator{&ud}
			result, err := allocator.Alloc(nil)
			if err != nil {
				t.Fatalf("unexpected error %v", err)
			}
			for i, want := range cs.desiredReplicas {
				if got := (*result)[fmt.Sprintf("subset-%d", i)]; got != want {
					t.Fatalf("unexpected result %v", result)
				}
			}
		})
	}
}

func createSubset(name string, replicas int32) *nameToReplicas {
return &nameToReplicas{
Replicas: replicas,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (r *ReconcileUnitedDeployment) Reconcile(_ context.Context, request reconci
return reconcile.Result{}, err
}

nextReplicas, err := GetAllocatedReplicas(nameToSubset, instance)
nextReplicas, err := NewReplicaAllocator(instance).Alloc(nameToSubset)
klog.V(4).Infof("Get UnitedDeployment %s/%s next replicas %v", instance.Namespace, instance.Name, nextReplicas)
if err != nil {
klog.Errorf("UnitedDeployment %s/%s Specified subset replicas is ineffective: %s",
Expand Down
Loading

0 comments on commit 4e7bb73

Please sign in to comment.