Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduler: add Reservation restricted options to control allocatable resources #1894

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions apis/extension/reservation.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ const (

// AnnotationReservationAffinity represents the constraints of Pod selection Reservation
AnnotationReservationAffinity = SchedulingDomainPrefix + "/reservation-affinity"

// AnnotationReservationRestrictedOptions represent the Reservation Restricted options
AnnotationReservationRestrictedOptions = SchedulingDomainPrefix + "/reservation-restricted-options"
)

type ReservationAllocated struct {
Expand Down Expand Up @@ -67,6 +70,14 @@ type ReservationAffinitySelector struct {
ReservationSelectorTerms []corev1.NodeSelectorTerm `json:"reservationSelectorTerms,omitempty"`
}

type ReservationRestrictedOptions struct {
// Resources means that when the Pod intersects with these resources,
// it can only allocate the reserved amount at most.
// If the Reservation's AllocatePolicy is Restricted, and no resources configured,
// by default the resources equal all reserved resources by the Reservation.
Resources []corev1.ResourceName `json:"resources,omitempty"`
}
eahydra marked this conversation as resolved.
Show resolved Hide resolved

func GetReservationAllocated(pod *corev1.Pod) (*ReservationAllocated, error) {
if pod.Annotations == nil {
return nil, nil
Expand Down Expand Up @@ -122,3 +133,27 @@ func SetReservationAffinity(obj metav1.Object, affinity *ReservationAffinity) er
obj.SetAnnotations(annotations)
return nil
}

func GetReservationRestrictedOptions(annotations map[string]string) (*ReservationRestrictedOptions, error) {
var options ReservationRestrictedOptions
if s, ok := annotations[AnnotationReservationRestrictedOptions]; ok && s != "" {
if err := json.Unmarshal([]byte(s), &options); err != nil {
return nil, err
}
}
return &options, nil
}

func SetReservationRestrictedOptions(obj metav1.Object, options *ReservationRestrictedOptions) error {
data, err := json.Marshal(options)
if err != nil {
return err
}
annotations := obj.GetAnnotations()
if annotations == nil {
annotations = map[string]string{}
}
annotations[AnnotationReservationRestrictedOptions] = string(data)
obj.SetAnnotations(annotations)
return nil
}
34 changes: 32 additions & 2 deletions pkg/scheduler/frameworkext/reservation_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
quotav1 "k8s.io/apiserver/pkg/quota/v1"
corev1helpers "k8s.io/component-helpers/scheduling/corev1"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -77,15 +78,30 @@ func (p *PodRequirement) Clone() *PodRequirement {
}

func NewReservationInfo(r *schedulingv1alpha1.Reservation) *ReservationInfo {
var parseErrors []error
allocatable := reservationutil.ReservationRequests(r)
resourceNames := quotav1.ResourceNames(allocatable)
if r.Spec.AllocatePolicy == schedulingv1alpha1.ReservationAllocatePolicyRestricted {
options, err := apiext.GetReservationRestrictedOptions(r.Annotations)
if err == nil {
resourceNames = reservationutil.GetReservationRestrictedResources(resourceNames, options)
} else {
parseErrors = append(parseErrors, err)
}
}
reservedPod := reservationutil.NewReservePod(r)

ownerMatchers, err := reservationutil.ParseReservationOwnerMatchers(r.Spec.Owners)
if err != nil {
parseErrors = append(parseErrors, err)
klog.ErrorS(err, "Failed to parse reservation owner matchers", "reservation", klog.KObj(r))
}

var parseError error
if len(parseErrors) > 0 {
parseError = utilerrors.NewAggregate(parseErrors)
}

return &ReservationInfo{
Reservation: r,
Pod: reservedPod,
Expand All @@ -94,13 +110,21 @@ func NewReservationInfo(r *schedulingv1alpha1.Reservation) *ReservationInfo {
AllocatablePorts: util.RequestedHostPorts(reservedPod),
AssignedPods: map[types.UID]*PodRequirement{},
OwnerMatchers: ownerMatchers,
ParseError: err,
ParseError: parseError,
}
}

func NewReservationInfoFromPod(pod *corev1.Pod) *ReservationInfo {
var parseErrors []error

allocatable, _ := resource.PodRequestsAndLimits(pod)
resourceNames := quotav1.ResourceNames(allocatable)
options, err := apiext.GetReservationRestrictedOptions(pod.Annotations)
if err == nil {
resourceNames = reservationutil.GetReservationRestrictedResources(resourceNames, options)
} else {
parseErrors = append(parseErrors, err)
}

owners, err := apiext.GetReservationOwners(pod.Annotations)
if err != nil {
Expand All @@ -110,18 +134,24 @@ func NewReservationInfoFromPod(pod *corev1.Pod) *ReservationInfo {
if owners != nil {
ownerMatchers, err = reservationutil.ParseReservationOwnerMatchers(owners)
if err != nil {
parseErrors = append(parseErrors, err)
klog.ErrorS(err, "Failed to parse reservation owner matchers of pod", "pod", klog.KObj(pod))
}
}

var parseError error
if len(parseErrors) > 0 {
parseError = utilerrors.NewAggregate(parseErrors)
}

return &ReservationInfo{
Pod: pod,
ResourceNames: resourceNames,
Allocatable: allocatable,
AllocatablePorts: util.RequestedHostPorts(pod),
AssignedPods: map[types.UID]*PodRequirement{},
OwnerMatchers: ownerMatchers,
ParseError: err,
ParseError: parseError,
}
}

Expand Down
19 changes: 19 additions & 0 deletions pkg/util/reservation/reservation.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,3 +528,22 @@ func UpdateReservationResizeAllocatable(obj metav1.Object, resources corev1.Reso
}
return SetReservationResizeAllocatable(obj, resizeAllocatable)
}

func GetReservationRestrictedResources(allocatableResources []corev1.ResourceName, options *extension.ReservationRestrictedOptions) []corev1.ResourceName {
if options == nil {
return allocatableResources
}
result := make([]corev1.ResourceName, 0, len(allocatableResources))
for _, resourceName := range allocatableResources {
for _, v := range options.Resources {
if resourceName == v {
result = append(result, resourceName)
break
}
}
}
if len(result) == 0 {
result = allocatableResources
}
return result
}
47 changes: 47 additions & 0 deletions pkg/util/reservation/reservation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/utils/pointer"

apiext "github.com/koordinator-sh/koordinator/apis/extension"
schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
)

Expand Down Expand Up @@ -747,3 +748,49 @@ func TestReservationRequests(t *testing.T) {
})
}
}

func TestGetReservationRestrictedResources(t *testing.T) {
tests := []struct {
name string
resourceNames []corev1.ResourceName
options *apiext.ReservationRestrictedOptions
want []corev1.ResourceName
}{
{
name: "no options, got all allocatable resources",
resourceNames: []corev1.ResourceName{"cpu", "memory"},
options: nil,
want: []corev1.ResourceName{"cpu", "memory"},
},
{
name: "has options and same as resourceNames",
resourceNames: []corev1.ResourceName{"cpu", "memory"},
options: &apiext.ReservationRestrictedOptions{
Resources: []corev1.ResourceName{"cpu", "memory"},
},
want: []corev1.ResourceName{"cpu", "memory"},
},
{
name: "has options but different resourceNames",
resourceNames: []corev1.ResourceName{"cpu", "memory"},
options: &apiext.ReservationRestrictedOptions{
Resources: []corev1.ResourceName{"cpu"},
},
want: []corev1.ResourceName{"cpu"},
},
{
name: "has options but no resourceNames",
resourceNames: []corev1.ResourceName{"cpu", "memory"},
options: &apiext.ReservationRestrictedOptions{
Resources: []corev1.ResourceName{},
},
want: []corev1.ResourceName{"cpu", "memory"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := GetReservationRestrictedResources(tt.resourceNames, tt.options)
assert.Equal(t, tt.want, got)
})
}
}
Loading