Skip to content

Commit

Permalink
Merge pull request #652 from Huang-Wei/coscheduling-migrate-controlle…
Browse files Browse the repository at this point in the history
…r-runtime

refactor coscheduling to use controller-runtime client
  • Loading branch information
k8s-ci-robot authored Oct 31, 2023
2 parents 5eb736d + c0e0cbf commit 40c0fee
Show file tree
Hide file tree
Showing 8 changed files with 794 additions and 489 deletions.
44 changes: 14 additions & 30 deletions pkg/coscheduling/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,9 @@ import (
listerv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
"sigs.k8s.io/controller-runtime/pkg/client"

"sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
pgclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned"
pginformer "sigs.k8s.io/scheduler-plugins/pkg/generated/informers/externalversions/scheduling/v1alpha1"
pglister "sigs.k8s.io/scheduler-plugins/pkg/generated/listers/scheduling/v1alpha1"
"sigs.k8s.io/scheduler-plugins/pkg/util"
)

Expand All @@ -56,7 +54,7 @@ const (
type Manager interface {
PreFilter(context.Context, *corev1.Pod) error
Permit(context.Context, *corev1.Pod) Status
GetPodGroup(*corev1.Pod) (string, *v1alpha1.PodGroup)
GetPodGroup(context.Context, *corev1.Pod) (string, *v1alpha1.PodGroup)
GetCreationTimestamp(*corev1.Pod, time.Time) time.Time
DeletePermittedPodGroup(string)
CalculateAssignedPods(string, string) int
Expand All @@ -66,8 +64,8 @@ type Manager interface {

// PodGroupManager defines the scheduling operation called
type PodGroupManager struct {
// pgClient is a podGroup client
pgClient pgclientset.Interface
// client is a generic controller-runtime client to manipulate both core resources and PodGroups.
client client.Client
// snapshotSharedLister is pod shared list
snapshotSharedLister framework.SharedLister
// scheduleTimeout is the default timeout for podgroup scheduling.
Expand All @@ -77,21 +75,17 @@ type PodGroupManager struct {
permittedPG *gochache.Cache
// backedOffPG stores the podgorup name which failed scheudling recently.
backedOffPG *gochache.Cache
// pgLister is podgroup lister
pgLister pglister.PodGroupLister
// podLister is pod lister
podLister listerv1.PodLister
sync.RWMutex
}

// NewPodGroupManager creates a new operation object.
func NewPodGroupManager(pgClient pgclientset.Interface, snapshotSharedLister framework.SharedLister, scheduleTimeout *time.Duration,
pgInformer pginformer.PodGroupInformer, podInformer informerv1.PodInformer) *PodGroupManager {
func NewPodGroupManager(client client.Client, snapshotSharedLister framework.SharedLister, scheduleTimeout *time.Duration, podInformer informerv1.PodInformer) *PodGroupManager {
pgMgr := &PodGroupManager{
pgClient: pgClient,
client: client,
snapshotSharedLister: snapshotSharedLister,
scheduleTimeout: scheduleTimeout,
pgLister: pgInformer.Lister(),
podLister: podInformer.Lister(),
permittedPG: gochache.New(3*time.Second, 3*time.Second),
backedOffPG: gochache.New(10*time.Second, 10*time.Second),
Expand Down Expand Up @@ -149,7 +143,7 @@ func (pgMgr *PodGroupManager) ActivateSiblings(pod *corev1.Pod, state *framework
// that is required to be scheduled.
func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) error {
klog.V(5).InfoS("Pre-filter", "pod", klog.KObj(pod))
pgFullName, pg := pgMgr.GetPodGroup(pod)
pgFullName, pg := pgMgr.GetPodGroup(ctx, pod)
if pg == nil {
return nil
}
Expand Down Expand Up @@ -200,7 +194,7 @@ func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) er

// Permit permits a pod to run, if the minMember match, it would send a signal to chan.
func (pgMgr *PodGroupManager) Permit(ctx context.Context, pod *corev1.Pod) Status {
pgFullName, pg := pgMgr.GetPodGroup(pod)
pgFullName, pg := pgMgr.GetPodGroup(ctx, pod)
if pgFullName == "" {
return PodGroupNotSpecified
}
Expand All @@ -224,8 +218,8 @@ func (pgMgr *PodGroupManager) GetCreationTimestamp(pod *corev1.Pod, ts time.Time
if len(pgName) == 0 {
return ts
}
pg, err := pgMgr.pgLister.PodGroups(pod.Namespace).Get(pgName)
if err != nil {
var pg v1alpha1.PodGroup
if err := pgMgr.client.Get(context.TODO(), types.NamespacedName{Namespace: pod.Namespace, Name: pgName}, &pg); err != nil {
return ts
}
return pg.CreationTimestamp.Time
Expand All @@ -236,27 +230,17 @@ func (pgMgr *PodGroupManager) DeletePermittedPodGroup(pgFullName string) {
pgMgr.permittedPG.Delete(pgFullName)
}

// PatchPodGroup patches a podGroup.
func (pgMgr *PodGroupManager) PatchPodGroup(pgName string, namespace string, patch []byte) error {
if len(patch) == 0 {
return nil
}
_, err := pgMgr.pgClient.SchedulingV1alpha1().PodGroups(namespace).Patch(context.TODO(), pgName,
types.MergePatchType, patch, metav1.PatchOptions{})
return err
}

// GetPodGroup returns the PodGroup that a Pod belongs to in cache.
func (pgMgr *PodGroupManager) GetPodGroup(pod *corev1.Pod) (string, *v1alpha1.PodGroup) {
func (pgMgr *PodGroupManager) GetPodGroup(ctx context.Context, pod *corev1.Pod) (string, *v1alpha1.PodGroup) {
pgName := util.GetPodGroupLabel(pod)
if len(pgName) == 0 {
return "", nil
}
pg, err := pgMgr.pgLister.PodGroups(pod.Namespace).Get(pgName)
if err != nil {
var pg v1alpha1.PodGroup
if err := pgMgr.client.Get(ctx, types.NamespacedName{Namespace: pod.Namespace, Name: pgName}, &pg); err != nil {
return fmt.Sprintf("%v/%v", pod.Namespace, pgName), nil
}
return fmt.Sprintf("%v/%v", pod.Namespace, pgName), pg
return fmt.Sprintf("%v/%v", pod.Namespace, pgName), &pg
}

// CalculateAssignedPods returns the number of pods that has been assigned nodes: assumed or bound.
Expand Down
Loading

0 comments on commit 40c0fee

Please sign in to comment.