Skip to content

Commit

Permalink
add podGroup backoff time for coscheduling
Browse files Browse the repository at this point in the history
  • Loading branch information
KunWuLuan committed Jul 12, 2023
1 parent 5af1783 commit dbad1a1
Show file tree
Hide file tree
Showing 16 changed files with 357 additions and 6 deletions.
14 changes: 13 additions & 1 deletion apis/config/scheme/scheme_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/apis/config/testing/defaults"

"sigs.k8s.io/scheduler-plugins/apis/config"
"sigs.k8s.io/scheduler-plugins/apis/config/v1"
v1 "sigs.k8s.io/scheduler-plugins/apis/config/v1"
"sigs.k8s.io/scheduler-plugins/apis/config/v1beta2"
"sigs.k8s.io/scheduler-plugins/apis/config/v1beta3"
"sigs.k8s.io/scheduler-plugins/pkg/coscheduling"
Expand Down Expand Up @@ -66,6 +66,7 @@ profiles:
- name: Coscheduling
args:
permitWaitingTimeSeconds: 10
podGroupBackoffSeconds: 0
deniedPGExpirationTimeSeconds: 3
- name: NodeResourcesAllocatable
args:
Expand Down Expand Up @@ -472,6 +473,7 @@ kind: KubeSchedulerConfiguration
profiles:
- schedulerName: scheduler-plugins
pluginConfig:
- name: Coscheduling # Test argument defaulting logic
- name: TopologicalSort
args:
namespaces:
Expand All @@ -488,6 +490,12 @@ profiles:
SchedulerName: "scheduler-plugins",
Plugins: defaults.PluginsV1,
PluginConfig: []schedconfig.PluginConfig{
{
Name: coscheduling.Name,
Args: &config.CoschedulingArgs{
PermitWaitingTimeSeconds: 60,
},
},
{
Name: topologicalsort.Name,
Args: &config.TopologicalSortArgs{
Expand Down Expand Up @@ -728,6 +736,7 @@ profiles:
apiVersion: kubescheduler.config.k8s.io/v1beta2
kind: CoschedulingArgs
permitWaitingTimeSeconds: 10
podGroupBackoffSeconds: 0
name: Coscheduling
- args:
apiVersion: kubescheduler.config.k8s.io/v1beta2
Expand Down Expand Up @@ -781,6 +790,7 @@ profiles:
Name: coscheduling.Name,
Args: &config.CoschedulingArgs{
PermitWaitingTimeSeconds: 10,
PodGroupBackoffSeconds: 20,
},
},
{
Expand Down Expand Up @@ -868,6 +878,7 @@ profiles:
apiVersion: kubescheduler.config.k8s.io/v1beta3
kind: CoschedulingArgs
permitWaitingTimeSeconds: 10
podGroupBackoffSeconds: 20
name: Coscheduling
- args:
apiVersion: kubescheduler.config.k8s.io/v1beta3
Expand Down Expand Up @@ -1022,6 +1033,7 @@ profiles:
apiVersion: kubescheduler.config.k8s.io/v1
kind: CoschedulingArgs
permitWaitingTimeSeconds: 10
podGroupBackoffSeconds: 0
name: Coscheduling
- args:
apiVersion: kubescheduler.config.k8s.io/v1
Expand Down
2 changes: 2 additions & 0 deletions apis/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type CoschedulingArgs struct {

// PermitWaitingTimeSeconds is the waiting timeout in seconds.
PermitWaitingTimeSeconds int64
// PodGroupBackoffSeconds is the backoff time in seconds before a pod group can be scheduled again.
PodGroupBackoffSeconds int64
}

// ModeType is a "string" type.
Expand Down
4 changes: 4 additions & 0 deletions apis/config/v1/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

var (
defaultPermitWaitingTimeSeconds int64 = 60
defaultPodGroupBackoffSeconds int64 = 0

defaultNodeResourcesAllocatableMode = Least

Expand Down Expand Up @@ -83,6 +84,9 @@ func SetDefaults_CoschedulingArgs(obj *CoschedulingArgs) {
if obj.PermitWaitingTimeSeconds == nil {
obj.PermitWaitingTimeSeconds = &defaultPermitWaitingTimeSeconds
}
if obj.PodGroupBackoffSeconds == nil {
obj.PodGroupBackoffSeconds = &defaultPodGroupBackoffSeconds
}
}

// SetDefaults_NodeResourcesAllocatableArgs sets the defaults parameters for NodeResourceAllocatable.
Expand Down
3 changes: 3 additions & 0 deletions apis/config/v1/defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,18 @@ func TestSchedulingDefaults(t *testing.T) {
config: &CoschedulingArgs{},
expect: &CoschedulingArgs{
PermitWaitingTimeSeconds: pointer.Int64Ptr(60),
PodGroupBackoffSeconds: pointer.Int64Ptr(0),
},
},
{
name: "set non default CoschedulingArgs",
config: &CoschedulingArgs{
PermitWaitingTimeSeconds: pointer.Int64Ptr(60),
PodGroupBackoffSeconds: pointer.Int64Ptr(20),
},
expect: &CoschedulingArgs{
PermitWaitingTimeSeconds: pointer.Int64Ptr(60),
PodGroupBackoffSeconds: pointer.Int64Ptr(20),
},
},
{
Expand Down
2 changes: 2 additions & 0 deletions apis/config/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type CoschedulingArgs struct {

// PermitWaitingTimeSeconds is the waiting timeout in seconds.
PermitWaitingTimeSeconds *int64 `json:"permitWaitingTimeSeconds,omitempty"`
// PodGroupBackoffSeconds is the backoff time in seconds before a pod group can be scheduled again.
PodGroupBackoffSeconds *int64 `json:"podGroupBackoffSeconds,omitempty"`
}

// ModeType is a type "string".
Expand Down
4 changes: 4 additions & 0 deletions apis/config/v1beta2/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

var (
defaultPermitWaitingTimeSeconds int64 = 60
defaultPodGroupBackoffSeconds int64 = 0
defaultDeniedPGExpirationTimeSeconds int64 = 20

defaultNodeResourcesAllocatableMode = Least
Expand Down Expand Up @@ -81,6 +82,9 @@ func SetDefaults_CoschedulingArgs(obj *CoschedulingArgs) {
if obj.DeniedPGExpirationTimeSeconds == nil {
obj.DeniedPGExpirationTimeSeconds = &defaultDeniedPGExpirationTimeSeconds
}
if obj.PodGroupBackoffSeconds == nil {
obj.PodGroupBackoffSeconds = &defaultPodGroupBackoffSeconds
}
}

// SetDefaults_NodeResourcesAllocatableArgs sets the defaults parameters for NodeResourceAllocatable.
Expand Down
3 changes: 3 additions & 0 deletions apis/config/v1beta2/defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,20 @@ func TestSchedulingDefaults(t *testing.T) {
expect: &CoschedulingArgs{
PermitWaitingTimeSeconds: pointer.Int64Ptr(60),
DeniedPGExpirationTimeSeconds: pointer.Int64Ptr(20),
PodGroupBackoffSeconds: pointer.Int64Ptr(0),
},
},
{
name: "set non default CoschedulingArgs",
config: &CoschedulingArgs{
PermitWaitingTimeSeconds: pointer.Int64Ptr(60),
DeniedPGExpirationTimeSeconds: pointer.Int64Ptr(10),
PodGroupBackoffSeconds: pointer.Int64Ptr(20),
},
expect: &CoschedulingArgs{
PermitWaitingTimeSeconds: pointer.Int64Ptr(60),
DeniedPGExpirationTimeSeconds: pointer.Int64Ptr(10),
PodGroupBackoffSeconds: pointer.Int64Ptr(20),
},
},
{
Expand Down
2 changes: 2 additions & 0 deletions apis/config/v1beta2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type CoschedulingArgs struct {

// PermitWaitingTimeSeconds is the waiting timeout in seconds.
PermitWaitingTimeSeconds *int64 `json:"permitWaitingTimeSeconds,omitempty"`
// PodGroupBackoffSeconds is the backoff time in seconds before a pod group can be scheduled again.
PodGroupBackoffSeconds *int64 `json:"podGroupBackoffSeconds,omitempty"`
// DeniedPGExpirationTimeSeconds is the expiration time of the denied podgroup store.
DeniedPGExpirationTimeSeconds *int64 `json:"deniedPGExpirationTimeSeconds,omitempty"`
}
Expand Down
12 changes: 8 additions & 4 deletions apis/config/v1beta3/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,20 @@ limitations under the License.
package v1beta3

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"strconv"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
schedulerconfigv1beta3 "k8s.io/kube-scheduler/config/v1beta3"
k8sschedulerconfigv1beta3 "k8s.io/kubernetes/pkg/scheduler/apis/config/v1beta3"
)

var (
defaultPermitWaitingTimeSeconds int64 = 60

defaultNodeResourcesAllocatableMode = Least
defaultPermitWaitingTimeSeconds int64 = 60
defaultPodGroupBackoffSeconds int64 = 0
defaultNodeResourcesAllocatableMode = Least

// defaultResourcesToWeightMap is used to set the default resourceToWeight map for CPU and memory
// used by the NodeResourcesAllocatable scoring plugin.
Expand Down Expand Up @@ -83,6 +84,9 @@ func SetDefaults_CoschedulingArgs(obj *CoschedulingArgs) {
if obj.PermitWaitingTimeSeconds == nil {
obj.PermitWaitingTimeSeconds = &defaultPermitWaitingTimeSeconds
}
if obj.PodGroupBackoffSeconds == nil {
obj.PodGroupBackoffSeconds = &defaultPodGroupBackoffSeconds
}
}

// SetDefaults_NodeResourcesAllocatableArgs sets the defaults parameters for NodeResourceAllocatable.
Expand Down
3 changes: 3 additions & 0 deletions apis/config/v1beta3/defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,18 @@ func TestSchedulingDefaults(t *testing.T) {
config: &CoschedulingArgs{},
expect: &CoschedulingArgs{
PermitWaitingTimeSeconds: pointer.Int64Ptr(60),
PodGroupBackoffSeconds: pointer.Int64Ptr(0),
},
},
{
name: "set non default CoschedulingArgs",
config: &CoschedulingArgs{
PermitWaitingTimeSeconds: pointer.Int64Ptr(60),
PodGroupBackoffSeconds: pointer.Int64Ptr(20),
},
expect: &CoschedulingArgs{
PermitWaitingTimeSeconds: pointer.Int64Ptr(60),
PodGroupBackoffSeconds: pointer.Int64Ptr(20),
},
},
{
Expand Down
2 changes: 2 additions & 0 deletions apis/config/v1beta3/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type CoschedulingArgs struct {

// PermitWaitingTimeSeconds is the waiting timeout in seconds.
PermitWaitingTimeSeconds *int64 `json:"permitWaitingTimeSeconds,omitempty"`
// PodGroupBackoffSeconds is the backoff time in seconds before a pod group can be scheduled again.
PodGroupBackoffSeconds *int64 `json:"podGroupBackoffSeconds,omitempty"`
}

// ModeType is a type "string".
Expand Down
15 changes: 15 additions & 0 deletions pkg/coscheduling/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type Manager interface {
DeletePermittedPodGroup(string)
CalculateAssignedPods(string, string) int
ActivateSiblings(pod *corev1.Pod, state *framework.CycleState)
BackoffPodGroup(string, time.Duration)
}

// PodGroupManager defines the scheduling operation called
Expand All @@ -74,6 +75,8 @@ type PodGroupManager struct {
scheduleTimeout *time.Duration
// permittedPG stores the podgroup name which has passed the pre resource check.
permittedPG *gochache.Cache
// backedOffPG stores the podgroup name which failed scheduling recently.
backedOffPG *gochache.Cache
// pgLister is podgroup lister
pgLister pglister.PodGroupLister
// podLister is pod lister
Expand All @@ -91,10 +94,18 @@ func NewPodGroupManager(pgClient pgclientset.Interface, snapshotSharedLister fra
pgLister: pgInformer.Lister(),
podLister: podInformer.Lister(),
permittedPG: gochache.New(3*time.Second, 3*time.Second),
backedOffPG: gochache.New(10*time.Second, 10*time.Second),
}
return pgMgr
}

// BackoffPodGroup marks the pod group identified by pgName as backed off for
// the given duration by inserting it into the backedOffPG cache. While the
// entry is present, PreFilter rejects pods of that group with a
// "failed recently" error.
//
// A non-positive backoff is a no-op: zero disables the feature, and negative
// values are rejected here because the underlying cache (go-cache) interprets
// a zero duration as "use the cache default" and any negative duration as
// "never expire", which would block the pod group permanently.
func (pgMgr *PodGroupManager) BackoffPodGroup(pgName string, backoff time.Duration) {
	if backoff <= 0 {
		return
	}
	// Add only inserts when no unexpired entry exists and returns an error
	// otherwise. That error is deliberately discarded: an already backed-off
	// pod group keeps its current expiry instead of having it extended.
	_ = pgMgr.backedOffPG.Add(pgName, nil, backoff)
}

// ActivateSiblings stashes the pods belonging to the same PodGroup of the given pod
// in the given state, with a reserved key "kubernetes.io/pods-to-activate".
func (pgMgr *PodGroupManager) ActivateSiblings(pod *corev1.Pod, state *framework.CycleState) {
Expand Down Expand Up @@ -143,6 +154,10 @@ func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) er
return nil
}

if _, exist := pgMgr.backedOffPG.Get(pgFullName); exist {
return fmt.Errorf("podGroup %v failed recently", pgFullName)
}

pods, err := pgMgr.podLister.Pods(pod.Namespace).List(
labels.SelectorFromSet(labels.Set{v1alpha1.PodGroupLabel: util.GetPodGroupLabel(pod)}),
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/coscheduling/core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func TestPreFilter(t *testing.T) {
existingPods, allNodes := testutil.MakeNodesAndPods(map[string]string{"test": "a"}, 60, 30)
snapshot := testutil.NewFakeSharedLister(existingPods, allNodes)
pgMgr := &PodGroupManager{pgLister: pgLister, permittedPG: newCache(),
snapshotSharedLister: snapshot, podLister: podInformer.Lister(), scheduleTimeout: &scheduleTimeout}
snapshotSharedLister: snapshot, podLister: podInformer.Lister(), scheduleTimeout: &scheduleTimeout, backedOffPG: gochache.New(10*time.Second, 10*time.Second)}
informerFactory.Start(ctx.Done())
if !clicache.WaitForCacheSync(ctx.Done(), podInformer.Informer().HasSynced) {
t.Fatal("WaitForCacheSync failed")
Expand Down
21 changes: 21 additions & 0 deletions pkg/coscheduling/coscheduling.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/cache"
corev1helpers "k8s.io/component-helpers/scheduling/corev1"
Expand All @@ -30,6 +31,7 @@ import (

"sigs.k8s.io/scheduler-plugins/apis/config"
"sigs.k8s.io/scheduler-plugins/apis/scheduling"
"sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
"sigs.k8s.io/scheduler-plugins/pkg/coscheduling/core"
pgclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned"
pgformers "sigs.k8s.io/scheduler-plugins/pkg/generated/informers/externalversions"
Expand All @@ -41,6 +43,7 @@ type Coscheduling struct {
frameworkHandler framework.Handle
pgMgr core.Manager
scheduleTimeout *time.Duration
pgBackoff *time.Duration
}

var _ framework.QueueSortPlugin = &Coscheduling{}
Expand Down Expand Up @@ -78,6 +81,14 @@ func New(obj runtime.Object, handle framework.Handle) (framework.Plugin, error)
pgMgr: pgMgr,
scheduleTimeout: &scheduleTimeDuration,
}
if args.PodGroupBackoffSeconds < 0 {
err := fmt.Errorf("Parse Arguments Failed")
klog.ErrorS(err, "PodGroupBackoffSeconds cannot be negative")
return nil, err
} else if args.PodGroupBackoffSeconds > 0 {
pgBackoff := time.Duration(args.PodGroupBackoffSeconds) * time.Second
plugin.pgBackoff = &pgBackoff
}
pgInformerFactory.Start(ctx.Done())
if !cache.WaitForCacheSync(ctx.Done(), pgInformer.Informer().HasSynced) {
err := fmt.Errorf("WaitForCacheSync failed")
Expand Down Expand Up @@ -166,6 +177,16 @@ func (cs *Coscheduling) PostFilter(ctx context.Context, state *framework.CycleSt
waitingPod.Reject(cs.Name(), "optimistic rejection in PostFilter")
}
})

if cs.pgBackoff != nil {
pods, err := cs.frameworkHandler.SharedInformerFactory().Core().V1().Pods().Lister().Pods(pod.Namespace).List(
labels.SelectorFromSet(labels.Set{v1alpha1.PodGroupLabel: util.GetPodGroupLabel(pod)}),
)
if err == nil && len(pods) >= int(pg.Spec.MinMember) {
cs.pgMgr.BackoffPodGroup(pgName, *cs.pgBackoff)
}
}

cs.pgMgr.DeletePermittedPodGroup(pgName)
return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable,
fmt.Sprintf("PodGroup %v gets rejected due to Pod %v is unschedulable even after PostFilter", pgName, pod.Name))
Expand Down
Loading

0 comments on commit dbad1a1

Please sign in to comment.