Skip to content

Commit

Permalink
cache assigned pod count
Browse files Browse the repository at this point in the history
Signed-off-by: KunWuLuan <kunwuluan@gmail.com>
  • Loading branch information
KunWuLuan committed Mar 28, 2024
1 parent cd3e4fb commit 07beae1
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 24 deletions.
81 changes: 58 additions & 23 deletions pkg/coscheduling/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
informerv1 "k8s.io/client-go/informers/core/v1"
listerv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -54,10 +56,11 @@ const (
type Manager interface {
PreFilter(context.Context, *corev1.Pod) error
Permit(context.Context, *corev1.Pod) Status
Unreserve(context.Context, *corev1.Pod)
GetPodGroup(context.Context, *corev1.Pod) (string, *v1alpha1.PodGroup)
GetAssignedPodCount(string) int
GetCreationTimestamp(*corev1.Pod, time.Time) time.Time
DeletePermittedPodGroup(string)
CalculateAssignedPods(string, string) int
ActivateSiblings(pod *corev1.Pod, state *framework.CycleState)
BackoffPodGroup(string, time.Duration)
}
Expand All @@ -77,9 +80,15 @@ type PodGroupManager struct {
backedOffPG *gochache.Cache
// podLister is pod lister
podLister listerv1.PodLister
//
podgroups map[string]PodGroupInfo
sync.RWMutex
}

type PodGroupInfo struct {
assigned sets.Set[string]
}

// NewPodGroupManager creates a new operation object.
func NewPodGroupManager(client client.Client, snapshotSharedLister framework.SharedLister, scheduleTimeout *time.Duration, podInformer informerv1.PodInformer) *PodGroupManager {
pgMgr := &PodGroupManager{
Expand All @@ -89,10 +98,37 @@ func NewPodGroupManager(client client.Client, snapshotSharedLister framework.Sha
podLister: podInformer.Lister(),
permittedPG: gochache.New(3*time.Second, 3*time.Second),
backedOffPG: gochache.New(10*time.Second, 10*time.Second),
podgroups: map[string]PodGroupInfo{},
}
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: func(obj interface{}) {
switch t := obj.(type) {
case *corev1.Pod:
pod := t
pgMgr.Unreserve(context.Background(), pod)
return
case cache.DeletedFinalStateUnknown:
pod, ok := t.Obj.(*corev1.Pod)
if !ok {
return
}
pgMgr.Unreserve(context.Background(), pod)
return
default:
return
}
},
})
return pgMgr
}

func (pgMgr *PodGroupManager) GetAssignedPodCount(pgName string) int {
if pgInfo, exist := pgMgr.podgroups[pgName]; exist {
return len(pgInfo.assigned)
}
return 0
}

func (pgMgr *PodGroupManager) BackoffPodGroup(pgName string, backoff time.Duration) {
if backoff == time.Duration(0) {
return
Expand Down Expand Up @@ -203,15 +239,34 @@ func (pgMgr *PodGroupManager) Permit(ctx context.Context, pod *corev1.Pod) Statu
return PodGroupNotFound
}

assigned := pgMgr.CalculateAssignedPods(pg.Name, pg.Namespace)
pgInfo, exist := pgMgr.podgroups[pgFullName]
if !exist {
pgInfo = PodGroupInfo{assigned: sets.Set[string]{}}
}
pgInfo.assigned.Insert(pod.Name)
// The number of pods that have been assigned nodes is calculated from the snapshot.
// The current pod in not included in the snapshot during the current scheduling cycle.
if int32(assigned)+1 >= pg.Spec.MinMember {
if len(pgInfo.assigned) >= int(pg.Spec.MinMember) {
return Success
}
return Wait
}

func (pgMgr *PodGroupManager) Unreserve(ctx context.Context, pod *corev1.Pod) {
pgFullName, _ := pgMgr.GetPodGroup(ctx, pod)
if pgFullName == "" {
return
}

pgInfo, exist := pgMgr.podgroups[pgFullName]
if exist {
pgInfo.assigned.Delete(pod.Name)
if len(pgInfo.assigned) == 0 {
delete(pgInfo.assigned, pgFullName)
}
}
}

// GetCreationTimestamp returns the creation time of a podGroup or a pod.
func (pgMgr *PodGroupManager) GetCreationTimestamp(pod *corev1.Pod, ts time.Time) time.Time {
pgName := util.GetPodGroupLabel(pod)
Expand Down Expand Up @@ -243,26 +298,6 @@ func (pgMgr *PodGroupManager) GetPodGroup(ctx context.Context, pod *corev1.Pod)
return fmt.Sprintf("%v/%v", pod.Namespace, pgName), &pg
}

// CalculateAssignedPods returns the number of pods that has been assigned nodes: assumed or bound.
func (pgMgr *PodGroupManager) CalculateAssignedPods(podGroupName, namespace string) int {
nodeInfos, err := pgMgr.snapshotSharedLister.NodeInfos().List()
if err != nil {
klog.ErrorS(err, "Cannot get nodeInfos from frameworkHandle")
return 0
}
var count int
for _, nodeInfo := range nodeInfos {
for _, podInfo := range nodeInfo.Pods {
pod := podInfo.Pod
if util.GetPodGroupLabel(pod) == podGroupName && pod.Namespace == namespace && pod.Spec.NodeName != "" {
count++
}
}
}

return count
}

// CheckClusterResource checks if resource capacity of the cluster can satisfy <resourceRequest>.
// It returns an error detailing the resource gap if not satisfied; otherwise returns nil.
func CheckClusterResource(nodeList []*framework.NodeInfo, resourceRequest corev1.ResourceList, desiredPodGroupName string) error {
Expand Down
3 changes: 2 additions & 1 deletion pkg/coscheduling/coscheduling.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (cs *Coscheduling) PostFilter(ctx context.Context, state *framework.CycleSt

// This indicates there are already enough Pods satisfying the PodGroup,
// so don't bother to reject the whole PodGroup.
assigned := cs.pgMgr.CalculateAssignedPods(pg.Name, pod.Namespace)
assigned := cs.pgMgr.GetAssignedPodCount(pg.Name)
if assigned >= int(pg.Spec.MinMember) {
klog.V(4).InfoS("Assigned pods", "podGroup", klog.KObj(pg), "assigned", assigned)
return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable)
Expand Down Expand Up @@ -247,6 +247,7 @@ func (cs *Coscheduling) Unreserve(ctx context.Context, state *framework.CycleSta
if pg == nil {
return
}
cs.pgMgr.Unreserve(ctx, pod)
cs.frameworkHandler.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) {
if waitingPod.GetPod().Namespace == pod.Namespace && util.GetPodGroupLabel(waitingPod.GetPod()) == pg.Name {
klog.V(3).InfoS("Unreserve rejects", "pod", klog.KObj(waitingPod.GetPod()), "podGroup", klog.KObj(pg))
Expand Down

0 comments on commit 07beae1

Please sign in to comment.