Skip to content

Commit

Permalink
Merge branch 'master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
asifdxtreme authored Jul 11, 2019
2 parents 8459b4c + c98a6eb commit acbef03
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 31 deletions.
2 changes: 1 addition & 1 deletion pkg/cli/job/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func InitRunFlags(cmd *cobra.Command) {
cmd.Flags().IntVarP(&launchJobFlags.Replicas, "replicas", "r", 1, "the total tasks of job")
cmd.Flags().StringVarP(&launchJobFlags.Requests, "requests", "R", "cpu=1000m,memory=100Mi", "the resource request of the task")
cmd.Flags().StringVarP(&launchJobFlags.Limits, "limits", "L", "cpu=1000m,memory=100Mi", "the resource limit of the task")
cmd.Flags().StringVarP(&launchJobFlags.SchedulerName, "scheduler", "S", "volcano-scheduler", "the scheduler for this job")
cmd.Flags().StringVarP(&launchJobFlags.SchedulerName, "scheduler", "S", "volcano", "the scheduler for this job")
cmd.Flags().StringVarP(&launchJobFlags.FileName, "filename", "f", "", "the yaml file of job")
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/cli/queue/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type commonFlags struct {
}

func initFlags(cmd *cobra.Command, cf *commonFlags) {
cmd.Flags().StringVarP(&cf.SchedulerName, "scheduler", "", "kube-batch", "the scheduler for this job")
cmd.Flags().StringVarP(&cf.SchedulerName, "scheduler", "", "volcano", "the scheduler for this job")
cmd.Flags().StringVarP(&cf.Master, "master", "s", "", "the address of apiserver")

if home := homeDir(); home != "" {
Expand Down
12 changes: 0 additions & 12 deletions pkg/controllers/job/job_controller_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,11 @@ func (cc *Controller) addPod(obj interface{}) {

jobName, found := pod.Annotations[vkbatchv1.JobNameKey]
if !found {
glog.Infof("Failed to find jobName of Pod <%s/%s>, skipping",
pod.Namespace, pod.Name)
return
}

version, found := pod.Annotations[vkbatchv1.JobVersion]
if !found {
glog.Infof("Failed to find jobVersion of Pod <%s/%s>, skipping",
pod.Namespace, pod.Name)
return
}

Expand Down Expand Up @@ -188,8 +184,6 @@ func (cc *Controller) updatePod(oldObj, newObj interface{}) {

taskName, found := newPod.Annotations[vkbatchv1.TaskSpecKey]
if !found {
glog.Infof("Failed to find taskName of Pod <%s/%s>, skipping",
newPod.Namespace, newPod.Name)
return
}

Expand All @@ -202,8 +196,6 @@ func (cc *Controller) updatePod(oldObj, newObj interface{}) {

version, found := newPod.Annotations[vkbatchv1.JobVersion]
if !found {
glog.Infof("Failed to find jobVersion of Pod <%s/%s>, skipping",
newPod.Namespace, newPod.Name)
return
}

Expand Down Expand Up @@ -283,15 +275,11 @@ func (cc *Controller) deletePod(obj interface{}) {

jobName, found := pod.Annotations[vkbatchv1.JobNameKey]
if !found {
glog.Infof("Failed to find jobName of Pod <%s/%s>, skipping",
pod.Namespace, pod.Name)
return
}

version, found := pod.Annotations[vkbatchv1.JobVersion]
if !found {
glog.Infof("Failed to find jobVersion of Pod <%s/%s>, skipping",
pod.Namespace, pod.Name)
return
}

Expand Down
39 changes: 22 additions & 17 deletions pkg/scheduler/plugins/drf/drf.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ import (
"volcano.sh/volcano/pkg/scheduler/framework"
)

// PluginName indicates name of volcano scheduler plugin.
const PluginName = "drf"

var shareDelta = 0.000001

type drfAttr struct {
Expand All @@ -38,7 +41,7 @@ type drfPlugin struct {
totalResource *api.Resource

// Key is Job ID
jobOpts map[api.JobID]*drfAttr
jobAttrs map[api.JobID]*drfAttr

// Arguments given for the plugin
pluginArguments framework.Arguments
Expand All @@ -48,13 +51,13 @@ type drfPlugin struct {
func New(arguments framework.Arguments) framework.Plugin {
return &drfPlugin{
totalResource: api.EmptyResource(),
jobOpts: map[api.JobID]*drfAttr{},
jobAttrs: map[api.JobID]*drfAttr{},
pluginArguments: arguments,
}
}

func (drf *drfPlugin) Name() string {
return "drf"
return PluginName
}

func (drf *drfPlugin) OnSessionOpen(ssn *framework.Session) {
Expand All @@ -79,25 +82,25 @@ func (drf *drfPlugin) OnSessionOpen(ssn *framework.Session) {
// Calculate the init share of Job
drf.updateShare(attr)

drf.jobOpts[job.UID] = attr
drf.jobAttrs[job.UID] = attr
}

preemptableFn := func(preemptor *api.TaskInfo, preemptees []*api.TaskInfo) []*api.TaskInfo {
var victims []*api.TaskInfo

latt := drf.jobOpts[preemptor.Job]
latt := drf.jobAttrs[preemptor.Job]
lalloc := latt.allocated.Clone().Add(preemptor.Resreq)
ls := drf.calculateShare(lalloc, drf.totalResource)
_, ls := drf.calculateShare(lalloc, drf.totalResource)

allocations := map[api.JobID]*api.Resource{}

for _, preemptee := range preemptees {
if _, found := allocations[preemptee.Job]; !found {
ratt := drf.jobOpts[preemptee.Job]
ratt := drf.jobAttrs[preemptee.Job]
allocations[preemptee.Job] = ratt.allocated.Clone()
}
ralloc := allocations[preemptee.Job].Sub(preemptee.Resreq)
rs := drf.calculateShare(ralloc, drf.totalResource)
_, rs := drf.calculateShare(ralloc, drf.totalResource)

if ls < rs || math.Abs(ls-rs) <= shareDelta {
victims = append(victims, preemptee)
Expand All @@ -116,13 +119,13 @@ func (drf *drfPlugin) OnSessionOpen(ssn *framework.Session) {
rv := r.(*api.JobInfo)

glog.V(4).Infof("DRF JobOrderFn: <%v/%v> share state: %v, <%v/%v> share state: %v",
lv.Namespace, lv.Name, drf.jobOpts[lv.UID].share, rv.Namespace, rv.Name, drf.jobOpts[rv.UID].share)
lv.Namespace, lv.Name, drf.jobAttrs[lv.UID].share, rv.Namespace, rv.Name, drf.jobAttrs[rv.UID].share)

if drf.jobOpts[lv.UID].share == drf.jobOpts[rv.UID].share {
if drf.jobAttrs[lv.UID].share == drf.jobAttrs[rv.UID].share {
return 0
}

if drf.jobOpts[lv.UID].share < drf.jobOpts[rv.UID].share {
if drf.jobAttrs[lv.UID].share < drf.jobAttrs[rv.UID].share {
return -1
}

Expand All @@ -134,7 +137,7 @@ func (drf *drfPlugin) OnSessionOpen(ssn *framework.Session) {
// Register event handlers.
ssn.AddEventHandler(&framework.EventHandler{
AllocateFunc: func(event *framework.Event) {
attr := drf.jobOpts[event.Task.Job]
attr := drf.jobAttrs[event.Task.Job]
attr.allocated.Add(event.Task.Resreq)

drf.updateShare(attr)
Expand All @@ -143,7 +146,7 @@ func (drf *drfPlugin) OnSessionOpen(ssn *framework.Session) {
event.Task.Namespace, event.Task.Name, event.Task.Resreq, attr.share)
},
DeallocateFunc: func(event *framework.Event) {
attr := drf.jobOpts[event.Task.Job]
attr := drf.jobAttrs[event.Task.Job]
attr.allocated.Sub(event.Task.Resreq)

drf.updateShare(attr)
Expand All @@ -155,23 +158,25 @@ func (drf *drfPlugin) OnSessionOpen(ssn *framework.Session) {
}

func (drf *drfPlugin) updateShare(attr *drfAttr) {
attr.share = drf.calculateShare(attr.allocated, drf.totalResource)
attr.dominantResource, attr.share = drf.calculateShare(attr.allocated, drf.totalResource)
}

func (drf *drfPlugin) calculateShare(allocated, totalResource *api.Resource) float64 {
func (drf *drfPlugin) calculateShare(allocated, totalResource *api.Resource) (string, float64) {
res := float64(0)
dominantResource := ""
for _, rn := range totalResource.ResourceNames() {
share := helpers.Share(allocated.Get(rn), totalResource.Get(rn))
if share > res {
res = share
dominantResource = string(rn)
}
}

return res
return dominantResource, res
}

func (drf *drfPlugin) OnSessionClose(session *framework.Session) {
// Clean schedule data.
drf.totalResource = api.EmptyResource()
drf.jobOpts = map[api.JobID]*drfAttr{}
drf.jobAttrs = map[api.JobID]*drfAttr{}
}

0 comments on commit acbef03

Please sign in to comment.