Skip to content

Commit

Permalink
feat: update volumebinding codes with 1.31 k8s
Browse files Browse the repository at this point in the history
Signed-off-by: vie-serendipity <2733147505@qq.com>
  • Loading branch information
vie-serendipity committed Oct 21, 2024
1 parent 7931706 commit eb5c068
Show file tree
Hide file tree
Showing 3 changed files with 680 additions and 14 deletions.
17 changes: 8 additions & 9 deletions pkg/scheduler/capabilities/volumebinding/binder.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
storagev1 "k8s.io/api/storage/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
Expand Down Expand Up @@ -1012,12 +1013,6 @@ func (b *volumeBinder) revertAssumedPVCs(claims []*v1.PersistentVolumeClaim) {
// hasEnoughCapacity checks whether the provisioner has enough capacity left for a new volume of the given size
// that is available from the node.
func (b *volumeBinder) hasEnoughCapacity(logger klog.Logger, provisioner string, claim *v1.PersistentVolumeClaim, storageClass *storagev1.StorageClass, node *v1.Node) (bool, error) {
// This is an optional feature. If disabled, we assume that
// there is enough storage.
if !b.capacityCheckEnabled {
return true, nil
}

quantity, ok := claim.Spec.Resources.Requests[v1.ResourceStorage]
if !ok {
// No capacity to check for.
Expand Down Expand Up @@ -1064,12 +1059,16 @@ func (b *volumeBinder) hasEnoughCapacity(logger klog.Logger, provisioner string,
}

func capacitySufficient(capacity *storagev1beta1.CSIStorageCapacity, sizeInBytes int64) bool {
limit := capacity.Capacity
limit := volumeLimit(capacity)
return limit != nil && limit.Value() >= sizeInBytes
}

func volumeLimit(capacity *storagev1beta1.CSIStorageCapacity) *resource.Quantity {
if capacity.MaximumVolumeSize != nil {
// Prefer MaximumVolumeSize if available, it is more precise.
limit = capacity.MaximumVolumeSize
return capacity.MaximumVolumeSize
}
return limit != nil && limit.Value() >= sizeInBytes
return capacity.Capacity
}

func (b *volumeBinder) nodeHasAccess(logger klog.Logger, node *v1.Node, capacity *storagev1beta1.CSIStorageCapacity) bool {
Expand Down
156 changes: 151 additions & 5 deletions pkg/scheduler/capabilities/volumebinding/volume_binding.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import (
"time"

v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
corelisters "k8s.io/client-go/listers/core/v1"
Expand All @@ -35,6 +38,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
"k8s.io/kubernetes/pkg/scheduler/util"

"volcano.sh/volcano/cmd/scheduler/app/options"
)
Expand Down Expand Up @@ -98,14 +102,15 @@ func (pl *VolumeBinding) EventsToRegister(_ context.Context) ([]framework.Cluste
// Pods may fail because of missing or mis-configured storage class
// (e.g., allowedTopologies, volumeBindingMode), and hence may become
// schedulable upon StorageClass Add or Update events.
{Event: framework.ClusterEvent{Resource: framework.StorageClass, ActionType: framework.Add | framework.Update}},
{Event: framework.ClusterEvent{Resource: framework.StorageClass, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterStorageClassChange},

// We bind PVCs with PVs, so any changes may make the pods schedulable.
{Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add | framework.Update}},
{Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterPersistentVolumeClaimChange},
{Event: framework.ClusterEvent{Resource: framework.PersistentVolume, ActionType: framework.Add | framework.Update}},

// Pods may fail to find available PVs because the node labels do not
// match the storage class's allowed topologies or PV's node affinity.
// A new or updated node may make pods schedulable.
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel}},
//
// A note about UpdateNodeTaint event:
// NodeAdd QueueingHint isn't always called because of the internal feature called preCheck.
Expand All @@ -116,16 +121,157 @@ func (pl *VolumeBinding) EventsToRegister(_ context.Context) ([]framework.Cluste
// We can remove UpdateNodeTaint when we remove the preCheck feature.
// See: https://github.com/kubernetes/kubernetes/issues/110175
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel | framework.UpdateNodeTaint}},

// We rely on CSI node to translate in-tree PV to CSI.
{Event: framework.ClusterEvent{Resource: framework.CSINode, ActionType: framework.Add | framework.Update}},
// TODO: kube-schduler will unregister the CSINode events once all the volume plugins has completed their CSI migration.
{Event: framework.ClusterEvent{Resource: framework.CSINode, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterCSINodeChange},

// When CSIStorageCapacity is enabled, pods may become schedulable
// on CSI driver & storage capacity changes.
{Event: framework.ClusterEvent{Resource: framework.CSIDriver, ActionType: framework.Add | framework.Update}},
{Event: framework.ClusterEvent{Resource: framework.CSIStorageCapacity, ActionType: framework.Add | framework.Update}},
{Event: framework.ClusterEvent{Resource: framework.CSIStorageCapacity, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterCSIStorageCapacityChange},
}
return events, nil
}

func (pl *VolumeBinding) isSchedulableAfterCSINodeChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
if oldObj == nil {
logger.V(5).Info("CSINode creation could make the pod schedulable")
return framework.Queue, nil
}
oldCSINode, modifiedCSINode, err := util.As[*storagev1.CSINode](oldObj, newObj)
if err != nil {
return framework.Queue, err
}

logger = klog.LoggerWithValues(
logger,
"Pod", klog.KObj(pod),
"CSINode", klog.KObj(modifiedCSINode),
)

if oldCSINode.ObjectMeta.Annotations[v1.MigratedPluginsAnnotationKey] != modifiedCSINode.ObjectMeta.Annotations[v1.MigratedPluginsAnnotationKey] {
logger.V(5).Info("CSINode's migrated plugins annotation is updated and that may make the pod schedulable")
return framework.Queue, nil
}

logger.V(5).Info("CISNode was created or updated but it doesn't make this pod schedulable")
return framework.QueueSkip, nil
}

func (pl *VolumeBinding) isSchedulableAfterPersistentVolumeClaimChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
_, newPVC, err := util.As[*v1.PersistentVolumeClaim](oldObj, newObj)
if err != nil {
return framework.Queue, err
}

logger = klog.LoggerWithValues(
logger,
"Pod", klog.KObj(pod),
"PersistentVolumeClaim", klog.KObj(newPVC),
)

if pod.Namespace != newPVC.Namespace {
logger.V(5).Info("PersistentVolumeClaim was created or updated, but it doesn't make this pod schedulable because the PVC belongs to a different namespace")
return framework.QueueSkip, nil
}

for _, vol := range pod.Spec.Volumes {
var pvcName string
switch {
case vol.PersistentVolumeClaim != nil:
pvcName = vol.PersistentVolumeClaim.ClaimName
case vol.Ephemeral != nil:
pvcName = ephemeral.VolumeClaimName(pod, &vol)
default:
continue
}

if pvcName == newPVC.Name {
// Return Queue because, in this case,
// all PVC creations and almost all PVC updates could make the Pod schedulable.
logger.V(5).Info("PersistentVolumeClaim the pod requires was created or updated, potentially making the target Pod schedulable")
return framework.Queue, nil
}
}

logger.V(5).Info("PersistentVolumeClaim was created or updated, but it doesn't make this pod schedulable")
return framework.QueueSkip, nil
}

// isSchedulableAfterStorageClassChange checks whether an StorageClass event might make a Pod schedulable or not.
// Any StorageClass addition and a StorageClass update to allowedTopologies
// might make a Pod schedulable.
// Note that an update to volume binding mode is not allowed and we don't have to consider while examining the update event.
func (pl *VolumeBinding) isSchedulableAfterStorageClassChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
oldSC, newSC, err := util.As[*storagev1.StorageClass](oldObj, newObj)
if err != nil {
return framework.Queue, err
}

logger = klog.LoggerWithValues(
logger,
"Pod", klog.KObj(pod),
"StorageClass", klog.KObj(newSC),
)

if oldSC == nil {
// No further filtering can be made for a creation event,
// and we just always return Queue.
logger.V(5).Info("A new StorageClass was created, which could make a Pod schedulable")
return framework.Queue, nil
}

if !apiequality.Semantic.DeepEqual(newSC.AllowedTopologies, oldSC.AllowedTopologies) {
logger.V(5).Info("StorageClass got an update in AllowedTopologies", "AllowedTopologies", newSC.AllowedTopologies)
return framework.Queue, nil
}

logger.V(5).Info("StorageClass was updated, but it doesn't make this pod schedulable")
return framework.QueueSkip, nil
}

// isSchedulableAfterCSIStorageCapacityChange checks whether a CSIStorageCapacity event
// might make a Pod schedulable or not.
// Any CSIStorageCapacity addition and a CSIStorageCapacity update to volume limit
// (calculated based on capacity and maximumVolumeSize) might make a Pod schedulable.
// Note that an update to nodeTopology and storageClassName is not allowed and
// we don't have to consider while examining the update event.
func (pl *VolumeBinding) isSchedulableAfterCSIStorageCapacityChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
oldCap, newCap, err := util.As[*storagev1beta1.CSIStorageCapacity](oldObj, newObj)
if err != nil {
return framework.Queue, err
}

if oldCap == nil {
logger.V(5).Info(
"A new CSIStorageCapacity was created, which could make a Pod schedulable",
"Pod", klog.KObj(pod),
"CSIStorageCapacity", klog.KObj(newCap),
)
return framework.Queue, nil
}

oldLimit := volumeLimit(oldCap)
newLimit := volumeLimit(newCap)

logger = klog.LoggerWithValues(
logger,
"Pod", klog.KObj(pod),
"CSIStorageCapacity", klog.KObj(newCap),
"volumeLimit(new)", newLimit,
"volumeLimit(old)", oldLimit,
)

if newLimit != nil && (oldLimit == nil || newLimit.Value() > oldLimit.Value()) {
logger.V(5).Info("VolumeLimit was increased, which could make a Pod schedulable")
return framework.Queue, nil
}

logger.V(5).Info("CSIStorageCapacity was updated, but it doesn't make this pod schedulable")
return framework.QueueSkip, nil
}

// podHasPVCs returns 2 values:
// - the first one to denote if the given "pod" has any PVC defined.
// - the second one to return any error if the requested PVC is illegal.
Expand Down
Loading

0 comments on commit eb5c068

Please sign in to comment.