diff --git a/installer/helm/chart/volcano/config/volcano-scheduler-ci.conf b/installer/helm/chart/volcano/config/volcano-scheduler-ci.conf index 31f8f1a687..796cb21a67 100644 --- a/installer/helm/chart/volcano/config/volcano-scheduler-ci.conf +++ b/installer/helm/chart/volcano/config/volcano-scheduler-ci.conf @@ -10,3 +10,4 @@ tiers: - name: proportion - name: nodeorder - name: binpack + - name: overcommit diff --git a/pkg/scheduler/actions/enqueue/enqueue.go b/pkg/scheduler/actions/enqueue/enqueue.go index be051b6bdb..d68b99003c 100644 --- a/pkg/scheduler/actions/enqueue/enqueue.go +++ b/pkg/scheduler/actions/enqueue/enqueue.go @@ -28,19 +28,6 @@ import ( "volcano.sh/volcano/pkg/scheduler/util" ) -const ( - // overCommitFactor is resource overCommit factor for enqueue action - // It determines the number of `pending` pods that the scheduler will tolerate - // when the resources of the cluster is insufficient - overCommitFactor = "overcommit-factor" -) - -var ( - // defaultOverCommitFactor defines the default overCommit resource factor for enqueue action - defaultOverCommitFactor = 1.2 - targetJob = util.Reservation.TargetJob -) - type Action struct{} func New() *Action { @@ -90,31 +77,11 @@ func (enqueue *Action) Execute(ssn *framework.Session) { klog.V(3).Infof("Try to enqueue PodGroup to %d Queues", len(jobsMap)) - total := api.EmptyResource() - used := api.EmptyResource() - lockedNodesIdle := api.EmptyResource() - if targetJob != nil && len(util.Reservation.LockedNodes) != 0 { - for _, node := range util.Reservation.LockedNodes { - lockedNodesIdle.Add(node.Idle) - klog.V(4).Infof("locked node: %s", node.Name) - } - } - for _, node := range ssn.Nodes { - total.Add(node.Allocatable) - used.Add(node.Used) - } - idle := total.Clone().Multi(enqueue.getOverCommitFactor(ssn)).Sub(used).Sub(lockedNodesIdle) - for { if queues.Empty() { break } - if idle.IsEmpty() { - klog.V(3).Infof("Node idle resource is overused, ignore it.") - break - } - queue := queues.Pop().(*api.QueueInfo) // Found "high" priority job @@ -123,24 +90,8 @@ func (enqueue *Action) Execute(ssn *framework.Session) { continue } job := jobs.Pop().(*api.JobInfo) - if targetJob != nil && job.UID == targetJob.UID { - klog.V(3).Infof("Target Job name: %s", targetJob.Name) - continue - } - - inqueue := false - - if job.PodGroup.Spec.MinResources == nil { - inqueue = true - } else { - minReq := api.NewResource(*job.PodGroup.Spec.MinResources) - if ssn.JobEnqueueable(job) && minReq.LessEqual(idle) { - idle.Sub(minReq) - inqueue = true - } - } - if inqueue { + if job.PodGroup.Spec.MinResources == nil || ssn.JobEnqueueable(job) { job.PodGroup.Status.Phase = scheduling.PodGroupInqueue ssn.Jobs[job.UID] = job } @@ -148,27 +99,6 @@ func (enqueue *Action) Execute(ssn *framework.Session) { // Added Queue back until no job in Queue. queues.Push(queue) } - // if target job exists, judge whether it can be inqueue or not - if targetJob != nil && targetJob.PodGroup.Status.Phase == scheduling.PodGroupPending && len(util.Reservation.LockedNodes) != 0 { - klog.V(4).Infof("Start to deal with Target Job") - minReq := api.NewResource(*targetJob.PodGroup.Spec.MinResources) - idle = idle.Add(lockedNodesIdle) - if ssn.JobEnqueueable(targetJob) && minReq.LessEqual(idle) { - klog.V(3).Infof("Turn Target Job phase to Inqueue") - targetJob.PodGroup.Status.Phase = scheduling.PodGroupInqueue - ssn.Jobs[targetJob.UID] = targetJob - } - } } func (enqueue *Action) UnInitialize() {} - -func (enqueue *Action) getOverCommitFactor(ssn *framework.Session) float64 { - factor := defaultOverCommitFactor - arg := framework.GetArgOfActionFromConf(ssn.Configurations, enqueue.Name()) - if arg != nil { - arg.GetFloat64(&factor, overCommitFactor) - } - - return factor -} diff --git a/pkg/scheduler/actions/enqueue/enqueue_test.go b/pkg/scheduler/actions/enqueue/enqueue_test.go deleted file mode 100644 index 5c3787e192..0000000000 --- a/pkg/scheduler/actions/enqueue/enqueue_test.go +++ /dev/null @@ -1,53 +0,0 @@ -package enqueue - -import ( - "testing" - - "volcano.sh/volcano/pkg/scheduler/conf" - "volcano.sh/volcano/pkg/scheduler/framework" -) - -func TestGetOverCommitFactor(t *testing.T) { - cases := []struct { - name string - ssn *framework.Session - expectedValue float64 - }{ - { - name: "arguments of action not exist", - ssn: &framework.Session{ - Configurations: []conf.Configuration{ - { - Name: "allocate", - Arguments: map[string]string{ - "placeholder": "placeholder", - }, - }, - }, - }, - expectedValue: 1.2, - }, - { - name: "arguments of action exist", - ssn: &framework.Session{ - Configurations: []conf.Configuration{ - { - Name: "enqueue", - Arguments: map[string]string{ - overCommitFactor: "2", - }, - }, - }, - }, - expectedValue: 2, - }, - } - - enqueue := New() - for index, c := range cases { - factor := enqueue.getOverCommitFactor(c.ssn) - if factor != c.expectedValue { - t.Errorf("index %d, case %s, expected %v, but got %v", index, c.name, c.expectedValue, factor) - } - } -} diff --git a/pkg/scheduler/conf/scheduler_conf.go b/pkg/scheduler/conf/scheduler_conf.go index 0ff80e6103..0b67433387 100644 --- a/pkg/scheduler/conf/scheduler_conf.go +++ b/pkg/scheduler/conf/scheduler_conf.go @@ -71,6 +71,8 @@ type PluginOption struct { EnabledTargetJob *bool `yaml:"enableTargetJob"` // EnabledReservedNodes defines whether reservedNodesFn is enabled EnabledReservedNodes *bool `yaml:"enableReservedNodes"` + // EnabledJobEnqueued defines whether jobEnqueuedFn is enabled + EnabledJobEnqueued *bool `yaml:"enableJobEnqueued"` // EnabledVictim defines whether victimsFn is enabled EnabledVictim *bool `yaml:"enabledVictim"` // EnabledJobStarving defines whether jobStarvingFn is enabled diff --git a/pkg/scheduler/framework/session_plugins.go b/pkg/scheduler/framework/session_plugins.go index a194da89e8..110272bf55 100644 --- a/pkg/scheduler/framework/session_plugins.go +++ b/pkg/scheduler/framework/session_plugins.go @@ -269,7 +269,7 @@ func (ssn *Session) JobPipelined(obj interface{}) bool { return false } } - // this tier registed function + // this tier registered function if hasFound { return true } @@ -297,7 +297,7 @@ func (ssn *Session) JobStarving(obj interface{}) bool { return false } } - // this tier registed function + // this tier registered function if hasFound { return true } @@ -328,18 +328,25 @@ func (ssn *Session) JobValid(obj interface{}) *api.ValidateResult { // JobEnqueueable invoke jobEnqueueableFns function of the plugins func (ssn *Session) JobEnqueueable(obj interface{}) bool { for _, tier := range ssn.Tiers { + var hasFound bool for _, plugin := range tier.Plugins { + if !isEnabled(plugin.EnabledJobEnqueued) { + continue + } fn, found := ssn.jobEnqueueableFns[plugin.Name] if !found { continue } - + hasFound = true if res := fn(obj); !res { return res } } + // this tier registered function + if hasFound { + return true + } } - return true } diff --git a/pkg/scheduler/plugins/defaults.go b/pkg/scheduler/plugins/defaults.go index 168f83aa3a..fdc8040546 100644 --- a/pkg/scheduler/plugins/defaults.go +++ b/pkg/scheduler/plugins/defaults.go @@ -34,6 +34,9 @@ func ApplyPluginConfDefaults(option *conf.PluginOption) { if option.EnabledJobPipelined == nil { option.EnabledJobPipelined = &t } + if option.EnabledJobEnqueued == nil { + option.EnabledJobEnqueued = &t + } if option.EnabledTaskOrder == nil { option.EnabledTaskOrder = &t } diff --git a/pkg/scheduler/plugins/factory.go b/pkg/scheduler/plugins/factory.go index 9ddeaf2d7e..8994140d9a 100644 --- a/pkg/scheduler/plugins/factory.go +++ b/pkg/scheduler/plugins/factory.go @@ -23,6 +23,7 @@ import ( "volcano.sh/volcano/pkg/scheduler/plugins/drf" "volcano.sh/volcano/pkg/scheduler/plugins/gang" "volcano.sh/volcano/pkg/scheduler/plugins/nodeorder" + "volcano.sh/volcano/pkg/scheduler/plugins/overcommit" "volcano.sh/volcano/pkg/scheduler/plugins/predicates" "volcano.sh/volcano/pkg/scheduler/plugins/priority" "volcano.sh/volcano/pkg/scheduler/plugins/proportion" @@ -41,6 +42,7 @@ func init() { framework.RegisterPluginBuilder(binpack.PluginName, binpack.New) framework.RegisterPluginBuilder(reservation.PluginName, reservation.New) framework.RegisterPluginBuilder(tdm.PluginName, tdm.New) + framework.RegisterPluginBuilder(overcommit.PluginName, overcommit.New) // Plugins for Queues framework.RegisterPluginBuilder(proportion.PluginName, proportion.New) diff --git a/pkg/scheduler/plugins/overcommit/overcommit.go b/pkg/scheduler/plugins/overcommit/overcommit.go new file mode 100644 index 0000000000..b2e3600058 --- /dev/null +++ b/pkg/scheduler/plugins/overcommit/overcommit.go @@ -0,0 +1,123 @@ +/* +Copyright 2021 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package overcommit + +import ( + "k8s.io/klog" + + "volcano.sh/volcano/pkg/apis/scheduling" + "volcano.sh/volcano/pkg/scheduler/api" + "volcano.sh/volcano/pkg/scheduler/framework" +) + +const ( + // PluginName is name of plugin + PluginName = "overcommit" + // overCommitFactor is resource overCommit factor for enqueue action + // It determines the number of `pending` pods that the scheduler will tolerate + // when the resources of the cluster is insufficient + overCommitFactor = "overcommit-factor" + // defaultOverCommitFactor defines the default overCommit resource factor for enqueue action + defaultOverCommitFactor = 1.2 +) + +type overcommitPlugin struct { + // Arguments given for the plugin + pluginArguments framework.Arguments + idleResource *api.Resource + inqueueResource *api.Resource + overCommitFactor float64 +} + +// New function returns overcommit plugin object +func New(arguments framework.Arguments) framework.Plugin { + return &overcommitPlugin{ + pluginArguments: arguments, + idleResource: api.EmptyResource(), + overCommitFactor: defaultOverCommitFactor, + } +} + +func (op *overcommitPlugin) Name() string { + return PluginName +} + +/* + User should give overcommit-factor through overcommit plugin arguments as format below: + + actions: "enqueue, allocate, backfill" + tiers: + - plugins: + - name: overcommit + arguments: + overcommit-factor: 1.0 +*/ +func (op *overcommitPlugin) OnSessionOpen(ssn *framework.Session) { + op.pluginArguments.GetFloat64(&op.overCommitFactor, overCommitFactor) + if op.overCommitFactor < 1.0 { + klog.Warningf("invalid input %f for overcommit-factor, reason: overcommit-factor cannot be less than 1,"+ + " using default value: %f", op.overCommitFactor, defaultOverCommitFactor) + op.overCommitFactor = defaultOverCommitFactor + } + klog.V(4).Infof("Enter overcommit plugin ...") + defer klog.V(4).Infof("Leaving overcommit plugin.") + + // calculate idle resources of total cluster, overcommit resources included + total := api.EmptyResource() + used := api.EmptyResource() + for _, node := range ssn.Nodes { + total.Add(node.Allocatable) + used.Add(node.Used) + } + op.idleResource = total.Clone().Multi(op.overCommitFactor).Sub(used) + + // calculate inqueue job resources + inqueue := api.EmptyResource() + for _, job := range ssn.Jobs { + if job.PodGroup.Status.Phase == scheduling.PodGroupInqueue { + inqueue.Add(api.NewResource(*job.PodGroup.Spec.MinResources)) + } + } + op.inqueueResource = inqueue.Clone() + + ssn.AddJobEnqueueableFn(op.Name(), func(obj interface{}) bool { + job := obj.(*api.JobInfo) + idle := op.idleResource + inqueue := api.EmptyResource() + inqueue.Add(op.inqueueResource) + if job.PodGroup.Spec.MinResources == nil { + klog.V(4).Infof("job <%s/%s> is bestEffort, allow it be inqueue", job.Namespace, job.Name) + return true + } + + //TODO: if allow 1 more job to be inqueue beyond overcommit-factor, large job may be inqueue and create pods + jobMinReq := api.NewResource(*job.PodGroup.Spec.MinResources) + if inqueue.Add(jobMinReq).LessEqual(idle) { + klog.V(4).Infof("sufficient resources, allow job <%s/%s> be inqueue", job.Namespace, job.Name) + op.inqueueResource.Add(jobMinReq) + return true + } + klog.V(4).Infof("resource in cluster is overused, not allow job <%s/%s> be inqueue", + job.Namespace, job.Name) + return false + }) +} + +func (op *overcommitPlugin) OnSessionClose(ssn *framework.Session) { + op.idleResource = nil + op.inqueueResource = nil +} diff --git a/pkg/scheduler/util_test.go b/pkg/scheduler/util_test.go index 79ea5af58c..dd08624801 100644 --- a/pkg/scheduler/util_test.go +++ b/pkg/scheduler/util_test.go @@ -27,10 +27,6 @@ import ( func TestLoadSchedulerConf(t *testing.T) { configuration := ` actions: "enqueue, allocate, backfill" -configurations: -- name: enqueue - arguments: - "overcommit-factor": 1.5 tiers: - plugins: - name: priority @@ -62,6 +58,7 @@ tiers: EnabledNodeOrder: &trueValue, EnabledTargetJob: &trueValue, EnabledReservedNodes: &trueValue, + EnabledJobEnqueued: &trueValue, EnabledVictim: &trueValue, EnabledJobStarving: &trueValue, }, @@ -80,6 +77,7 @@ tiers: EnabledNodeOrder: &trueValue, EnabledTargetJob: &trueValue, EnabledReservedNodes: &trueValue, + EnabledJobEnqueued: &trueValue, EnabledVictim: &trueValue, EnabledJobStarving: &trueValue, }, @@ -98,6 +96,7 @@ tiers: EnabledNodeOrder: &trueValue, EnabledTargetJob: &trueValue, EnabledReservedNodes: &trueValue, + EnabledJobEnqueued: &trueValue, EnabledVictim: &trueValue, EnabledJobStarving: &trueValue, }, @@ -120,6 +119,7 @@ tiers: EnabledNodeOrder: &trueValue, EnabledTargetJob: &trueValue, EnabledReservedNodes: &trueValue, + EnabledJobEnqueued: &trueValue, EnabledVictim: &trueValue, EnabledJobStarving: &trueValue, }, @@ -138,6 +138,7 @@ tiers: EnabledNodeOrder: &trueValue, EnabledTargetJob: &trueValue, EnabledReservedNodes: &trueValue, + EnabledJobEnqueued: &trueValue, EnabledVictim: &trueValue, EnabledJobStarving: &trueValue, }, @@ -156,6 +157,7 @@ tiers: EnabledNodeOrder: &trueValue, EnabledTargetJob: &trueValue, EnabledReservedNodes: &trueValue, + EnabledJobEnqueued: &trueValue, EnabledVictim: &trueValue, EnabledJobStarving: &trueValue, }, @@ -174,6 +176,7 @@ tiers: EnabledNodeOrder: &trueValue, EnabledTargetJob: &trueValue, EnabledReservedNodes: &trueValue, + EnabledJobEnqueued: &trueValue, EnabledVictim: &trueValue, EnabledJobStarving: &trueValue, }, @@ -181,14 +184,7 @@ tiers: }, } - expectedConfigurations := []conf.Configuration{ - { - Name: "enqueue", - Arguments: map[string]string{ - "overcommit-factor": "1.5", - }, - }, - } + var expectedConfigurations []conf.Configuration _, tiers, configurations, err := unmarshalSchedulerConf(configuration) if err != nil {