Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
KunWuLuan committed Apr 3, 2024
1 parent 9bfafc9 commit d7b2a45
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 15 deletions.
30 changes: 30 additions & 0 deletions pkg/coscheduling/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,29 @@ type PodGroupManager struct {
sync.RWMutex
}

func AddPodFactory(pgMgr *PodGroupManager) func(obj interface{}) {
return func(obj interface{}) {
p, ok := obj.(*corev1.Pod)
if !ok {
return
}
if p.Spec.NodeName == "" {
return
}
pgFullName, _ := pgMgr.GetPodGroup(context.Background(), p)
if pgFullName == "" {
return
}
pgMgr.RWMutex.Lock()
defer pgMgr.RWMutex.Unlock()
if assigned, exist := pgMgr.assignedPodsByPG[pgFullName]; exist {
assigned.Insert(p.Name)
} else {
pgMgr.assignedPodsByPG[pgFullName] = sets.New(p.Name)
}
}
}

// NewPodGroupManager creates a new operation object.
func NewPodGroupManager(client client.Client, snapshotSharedLister framework.SharedLister, scheduleTimeout *time.Duration, podInformer informerv1.PodInformer) *PodGroupManager {
pgMgr := &PodGroupManager{
Expand All @@ -97,17 +120,24 @@ func NewPodGroupManager(client client.Client, snapshotSharedLister framework.Sha
assignedPodsByPG: map[string]sets.Set[string]{},
}
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: AddPodFactory(pgMgr),
DeleteFunc: func(obj interface{}) {
switch t := obj.(type) {
case *corev1.Pod:
pod := t
if pod.Spec.NodeName == "" {
return
}
pgMgr.Unreserve(context.Background(), pod)
return
case cache.DeletedFinalStateUnknown:
pod, ok := t.Obj.(*corev1.Pod)
if !ok {
return
}
if pod.Spec.NodeName == "" {
return
}
pgMgr.Unreserve(context.Background(), pod)
return
default:
Expand Down
12 changes: 6 additions & 6 deletions pkg/coscheduling/core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
clientsetfake "k8s.io/client-go/kubernetes/fake"
clicache "k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -169,6 +170,7 @@ func TestPreFilter(t *testing.T) {
scheduleTimeout: &scheduleTimeout,
permittedPG: newCache(),
backedOffPG: newCache(),
assignedPodsByPG: make(map[string]sets.Set[string]),
}

informerFactory.Start(ctx.Done())
Expand Down Expand Up @@ -263,19 +265,17 @@ func TestPermit(t *testing.T) {
informerFactory := informers.NewSharedInformerFactory(cs, 0)
podInformer := informerFactory.Core().V1().Pods()

pgMgr := &PodGroupManager{
client: client,
snapshotSharedLister: tu.NewFakeSharedLister(tt.existingPods, nodes),
podLister: podInformer.Lister(),
scheduleTimeout: &scheduleTimeout,
}
pgMgr := NewPodGroupManager(client, tu.NewFakeSharedLister(tt.existingPods, nodes), &scheduleTimeout, podInformer)

informerFactory.Start(ctx.Done())
if !clicache.WaitForCacheSync(ctx.Done(), podInformer.Informer().HasSynced) {
t.Fatal("WaitForCacheSync failed")
}
addFunc := AddPodFactory(pgMgr)
for _, p := range tt.existingPods {
podInformer.Informer().GetStore().Add(p)
// we call add func here because we can not ensure existing pods are added before premit are called
addFunc(p)
}

if got := pgMgr.Permit(ctx, tt.pod); got != tt.want {
Expand Down
2 changes: 1 addition & 1 deletion pkg/coscheduling/coscheduling.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (cs *Coscheduling) PostFilter(ctx context.Context, state *framework.CycleSt

// This indicates there are already enough Pods satisfying the PodGroup,
// so don't bother to reject the whole PodGroup.
assigned := cs.pgMgr.GetAssignedPodCount(pg.Name)
assigned := cs.pgMgr.GetAssignedPodCount(pgName)
if assigned >= int(pg.Spec.MinMember) {
klog.V(4).InfoS("Assigned pods", "podGroup", klog.KObj(pg), "assigned", assigned)
return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable)
Expand Down
19 changes: 11 additions & 8 deletions pkg/coscheduling/coscheduling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,24 +605,27 @@ func TestPostFilter(t *testing.T) {
cs := clientsetfake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(cs, 0)
podInformer := informerFactory.Core().V1().Pods()

pgMgr := core.NewPodGroupManager(
client,
tu.NewFakeSharedLister(tt.existingPods, nodes),
&scheduleTimeout,
podInformer,
)
pl := &Coscheduling{
frameworkHandler: f,
pgMgr: core.NewPodGroupManager(
client,
tu.NewFakeSharedLister(tt.existingPods, nodes),
&scheduleTimeout,
podInformer,
),
scheduleTimeout: &scheduleTimeout,
pgMgr: pgMgr,
scheduleTimeout: &scheduleTimeout,
}

informerFactory.Start(ctx.Done())
if !clicache.WaitForCacheSync(ctx.Done(), podInformer.Informer().HasSynced) {
t.Fatal("WaitForCacheSync failed")
}
addFunc := core.AddPodFactory(pgMgr)
for _, p := range tt.existingPods {
podInformer.Informer().GetStore().Add(p)
// we call add func here because we can not ensure existing pods are added before premit are called
addFunc(p)
}

_, got := pl.PostFilter(ctx, framework.NewCycleState(), tt.pod, nodeStatusMap)
Expand Down

0 comments on commit d7b2a45

Please sign in to comment.