Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add hierarchical queues for capacity plugin #3743

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion installer/helm/chart/volcano/templates/scheduler.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ rules:
verbs: ["get", "list", "watch"]
- apiGroups: ["scheduling.incubator.k8s.io", "scheduling.volcano.sh"]
resources: ["queues"]
verbs: ["get", "list", "watch", "create", "delete"]
verbs: ["get", "list", "watch", "create", "delete", "update"]
- apiGroups: ["scheduling.incubator.k8s.io", "scheduling.volcano.sh"]
resources: ["queues/status"]
verbs: ["update"]
Expand Down
2 changes: 1 addition & 1 deletion installer/volcano-development.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4445,7 +4445,7 @@ rules:
verbs: ["get", "list", "watch"]
- apiGroups: ["scheduling.incubator.k8s.io", "scheduling.volcano.sh"]
resources: ["queues"]
verbs: ["get", "list", "watch", "create", "delete"]
verbs: ["get", "list", "watch", "create", "delete", "update"]
- apiGroups: ["scheduling.incubator.k8s.io", "scheduling.volcano.sh"]
resources: ["queues/status"]
verbs: ["update"]
Expand Down
71 changes: 71 additions & 0 deletions pkg/controllers/queue/queue_controller_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,19 @@ package queue
import (
"context"
"fmt"
"strings"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

"volcano.sh/apis/pkg/apis/bus/v1alpha1"
busv1alpha1 "volcano.sh/apis/pkg/apis/bus/v1alpha1"
schedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
"volcano.sh/volcano/pkg/controllers/apis"
"volcano.sh/volcano/pkg/controllers/queue/state"
)

Expand Down Expand Up @@ -92,6 +96,13 @@ func (c *queuecontroller) syncQueue(queue *schedulingv1beta1.Queue, updateStateF
func (c *queuecontroller) openQueue(queue *schedulingv1beta1.Queue, updateStateFn state.UpdateQueueStatusFn) error {
klog.V(4).Infof("Begin to open queue %s.", queue.Name)

if queue.Status.State != schedulingv1beta1.QueueStateOpen {
continued, err := c.openHierarchicalQueue(queue)
if !continued {
return err
}
}

newQueue := queue.DeepCopy()
newQueue.Status.State = schedulingv1beta1.QueueStateOpen

Expand Down Expand Up @@ -134,6 +145,13 @@ func (c *queuecontroller) openQueue(queue *schedulingv1beta1.Queue, updateStateF
func (c *queuecontroller) closeQueue(queue *schedulingv1beta1.Queue, updateStateFn state.UpdateQueueStatusFn) error {
klog.V(4).Infof("Begin to close queue %s.", queue.Name)

if queue.Status.State != schedulingv1beta1.QueueStateClosed && queue.Status.State != schedulingv1beta1.QueueStateClosing {
continued, err := c.closeHierarchicalQueue(queue)
if !continued {
return err
}
}

newQueue := queue.DeepCopy()
newQueue.Status.State = schedulingv1beta1.QueueStateClosed

Expand Down Expand Up @@ -173,3 +191,56 @@ func (c *queuecontroller) closeQueue(queue *schedulingv1beta1.Queue, updateState

return nil
}

func (c *queuecontroller) openHierarchicalQueue(queue *schedulingv1beta1.Queue) (bool, error) {
Rui-Gan marked this conversation as resolved.
Show resolved Hide resolved
if queue.Spec.Parent == "" || queue.Spec.Parent == "root" {
return true, nil
}

parentQueue, err := c.queueLister.Get(queue.Spec.Parent)
if err != nil {
return false, fmt.Errorf("Failed to get parent queue <%s> of queue <%s>: %v", queue.Spec.Parent, queue.Name, err)
}

if parentQueue.Status.State == schedulingv1beta1.QueueStateClosing || parentQueue.Status.State == schedulingv1beta1.QueueStateClosed {
klog.Errorf("Failed to open queue %s because its parent queue %s is closing or closed. Open the parent queue first.", queue.Name, queue.Spec.Parent)
return false, nil
}

return true, nil
}

// closeHierarchicalQueue checks whether queue may be closed given the state of
// its child queues.
//
// The queue named "root" is never closed: the attempt is logged and
// (false, nil) is returned. Otherwise every queue returned by the lister whose
// Spec.Parent equals queue.Name is inspected; each child that is neither
// closed nor closing gets a CloseQueueAction request enqueued. If any such
// child was found, (false, err) is returned with an error naming them. When
// listing fails, the lister error is returned as-is. (true, nil) means the
// caller may go on closing the queue.
func (c *queuecontroller) closeHierarchicalQueue(queue *schedulingv1beta1.Queue) (bool, error) {
	if queue.Name == "root" {
		klog.Errorf("Root queue cannot be closed")
		return false, nil
	}

	allQueues, err := c.queueLister.List(labels.Everything())
	if err != nil {
		return false, err
	}

	// Names of direct children that still have to be closed.
	var stillOpen []string
	for _, child := range allQueues {
		if child.Spec.Parent != queue.Name {
			continue
		}

		switch child.Status.State {
		case schedulingv1beta1.QueueStateClosed, schedulingv1beta1.QueueStateClosing:
			// Already closed or on its way; nothing to request.
			continue
		}

		c.enqueue(&apis.Request{
			QueueName: child.Name,
			Action:    busv1alpha1.CloseQueueAction,
		})
		stillOpen = append(stillOpen, child.Name)
		klog.V(3).Infof("Closing child queue <%s> because its parent queue %s is closing or closed.", child.Name, queue.Name)
	}

	if len(stillOpen) == 0 {
		return true, nil
	}

	return false, fmt.Errorf("failed to close queue %s because its child queues %v are still open", queue.Name, strings.Join(stillOpen, ","))
}
2 changes: 1 addition & 1 deletion pkg/scheduler/actions/preempt/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func (pmpt *Action) preempt(
continue
}

victimsQueue := ssn.BuildVictimsPriorityQueue(victims)
victimsQueue := ssn.BuildVictimsPriorityQueue(victims, preemptor)
// Preempt victims for tasks, pick lowest priority task first.
preempted := api.EmptyResource()

Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/actions/reclaim/reclaim.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (ra *Action) Execute(ssn *framework.Session) {
continue
}

victimsQueue := ssn.BuildVictimsPriorityQueue(victims)
victimsQueue := ssn.BuildVictimsPriorityQueue(victims, task)

resreq := task.InitResreq.Clone()
reclaimed := api.EmptyResource()
Expand Down
3 changes: 3 additions & 0 deletions pkg/scheduler/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ type LessFn func(interface{}, interface{}) bool
// CompareFn is the func declaration used by sort or priority queue.
type CompareFn func(interface{}, interface{}) int

// VictimCompareFn is the func declaration used to order victims when sorting
// them or building a victim priority queue. It takes three arguments and
// returns an int; presumably the first two are the candidates being compared
// and the third is the preempting task that provides context — TODO confirm
// against the plugins that register it.
type VictimCompareFn func(interface{}, interface{}, interface{}) int

// ValidateFn is the func declaration used to check object's status.
type ValidateFn func(interface{}) bool

Expand Down
38 changes: 38 additions & 0 deletions pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,35 @@ func newDefaultQueue(vcClient vcclient.Interface, defaultQueue string) {
}
}

// newRootQueue init root queue
func newRootQueue(vcClient vcclient.Interface) {
Rui-Gan marked this conversation as resolved.
Show resolved Hide resolved
reclaimable := false
rootQueue := vcv1beta1.Queue{
ObjectMeta: metav1.ObjectMeta{
Name: "root",
},
Spec: vcv1beta1.QueueSpec{
Reclaimable: &reclaimable,
Weight: 1,
},
}

err := retry.OnError(wait.Backoff{
Steps: 60,
Duration: time.Second,
Factor: 1,
Jitter: 0.1,
}, func(err error) bool {
return !apierrors.IsAlreadyExists(err)
}, func() error {
_, err := vcClient.SchedulingV1beta1().Queues().Create(context.TODO(), &rootQueue, metav1.CreateOptions{})
return err
})
if err != nil && !apierrors.IsAlreadyExists(err) {
panic(fmt.Errorf("failed init root queue, with err: %v", err))
}
}

func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueue string, nodeSelectors []string, nodeWorkers uint32, ignoredProvisioners []string) *SchedulerCache {
kubeClient, err := kubernetes.NewForConfig(config)
if err != nil {
Expand All @@ -532,6 +561,10 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu
newDefaultQueue(vcClient, defaultQueue)
klog.Infof("Create init queue named default")

// create root queue
newRootQueue(vcClient)
klog.Infof("Create init queue named root")

errTaskRateLimiter := workqueue.NewMaxOfRateLimiter(
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(100), 1000)},
Expand Down Expand Up @@ -941,6 +974,11 @@ func (sc *SchedulerCache) Client() kubernetes.Interface {
return sc.kubeClient
}

// VCClient returns the Volcano clientset held by the scheduler cache.
func (sc *SchedulerCache) VCClient() vcclient.Interface {
return sc.vcClient
}

// ClientConfig returns the rest config
func (sc *SchedulerCache) ClientConfig() *rest.Config {
return sc.restConfig
Expand Down
4 changes: 4 additions & 0 deletions pkg/scheduler/cache/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"

vcclient "volcano.sh/apis/pkg/client/clientset/versioned"
"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/capabilities/volumebinding"
)
Expand Down Expand Up @@ -74,6 +75,9 @@ type Cache interface {
// Client returns the kubernetes clientSet, which can be used by plugins
Client() kubernetes.Interface

// VCClient returns the volcano clientSet, which can be used by plugins
VCClient() vcclient.Interface

// ClientConfig returns the rest config
ClientConfig() *rest.Config

Expand Down
94 changes: 52 additions & 42 deletions pkg/scheduler/framework/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"volcano.sh/apis/pkg/apis/scheduling"
schedulingscheme "volcano.sh/apis/pkg/apis/scheduling/scheme"
vcv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
vcclient "volcano.sh/apis/pkg/client/clientset/versioned"
"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/cache"
"volcano.sh/volcano/pkg/scheduler/conf"
Expand All @@ -46,6 +47,7 @@ type Session struct {
UID types.UID

kubeClient kubernetes.Interface
vcClient vcclient.Interface
recorder record.EventRecorder
cache cache.Cache
restConfig *rest.Config
Expand All @@ -72,22 +74,23 @@ type Session struct {
Configurations []conf.Configuration
NodeList []*api.NodeInfo

plugins map[string]Plugin
eventHandlers []*EventHandler
jobOrderFns map[string]api.CompareFn
queueOrderFns map[string]api.CompareFn
taskOrderFns map[string]api.CompareFn
clusterOrderFns map[string]api.CompareFn
predicateFns map[string]api.PredicateFn
prePredicateFns map[string]api.PrePredicateFn
bestNodeFns map[string]api.BestNodeFn
nodeOrderFns map[string]api.NodeOrderFn
batchNodeOrderFns map[string]api.BatchNodeOrderFn
nodeMapFns map[string]api.NodeMapFn
nodeReduceFns map[string]api.NodeReduceFn
preemptableFns map[string]api.EvictableFn
reclaimableFns map[string]api.EvictableFn
overusedFns map[string]api.ValidateFn
plugins map[string]Plugin
eventHandlers []*EventHandler
jobOrderFns map[string]api.CompareFn
queueOrderFns map[string]api.CompareFn
victimQueueOrderFns map[string]api.VictimCompareFn
taskOrderFns map[string]api.CompareFn
clusterOrderFns map[string]api.CompareFn
predicateFns map[string]api.PredicateFn
prePredicateFns map[string]api.PrePredicateFn
bestNodeFns map[string]api.BestNodeFn
nodeOrderFns map[string]api.NodeOrderFn
batchNodeOrderFns map[string]api.BatchNodeOrderFn
nodeMapFns map[string]api.NodeMapFn
nodeReduceFns map[string]api.NodeReduceFn
preemptableFns map[string]api.EvictableFn
reclaimableFns map[string]api.EvictableFn
overusedFns map[string]api.ValidateFn
// preemptiveFns means whether current queue can reclaim from other queue,
// while reclaimableFns means whether current queue's resources can be reclaimed.
preemptiveFns map[string]api.ValidateWithCandidateFn
Expand All @@ -107,6 +110,7 @@ func openSession(cache cache.Cache) *Session {
ssn := &Session{
UID: uuid.NewUUID(),
kubeClient: cache.Client(),
vcClient: cache.VCClient(),
restConfig: cache.ClientConfig(),
recorder: cache.EventRecorder(),
cache: cache,
Expand All @@ -121,32 +125,33 @@ func openSession(cache cache.Cache) *Session {
RevocableNodes: map[string]*api.NodeInfo{},
Queues: map[api.QueueID]*api.QueueInfo{},

plugins: map[string]Plugin{},
jobOrderFns: map[string]api.CompareFn{},
queueOrderFns: map[string]api.CompareFn{},
taskOrderFns: map[string]api.CompareFn{},
clusterOrderFns: map[string]api.CompareFn{},
predicateFns: map[string]api.PredicateFn{},
prePredicateFns: map[string]api.PrePredicateFn{},
bestNodeFns: map[string]api.BestNodeFn{},
nodeOrderFns: map[string]api.NodeOrderFn{},
batchNodeOrderFns: map[string]api.BatchNodeOrderFn{},
nodeMapFns: map[string]api.NodeMapFn{},
nodeReduceFns: map[string]api.NodeReduceFn{},
preemptableFns: map[string]api.EvictableFn{},
reclaimableFns: map[string]api.EvictableFn{},
overusedFns: map[string]api.ValidateFn{},
preemptiveFns: map[string]api.ValidateWithCandidateFn{},
allocatableFns: map[string]api.AllocatableFn{},
jobReadyFns: map[string]api.ValidateFn{},
jobPipelinedFns: map[string]api.VoteFn{},
jobValidFns: map[string]api.ValidateExFn{},
jobEnqueueableFns: map[string]api.VoteFn{},
jobEnqueuedFns: map[string]api.JobEnqueuedFn{},
targetJobFns: map[string]api.TargetJobFn{},
reservedNodesFns: map[string]api.ReservedNodesFn{},
victimTasksFns: map[string][]api.VictimTasksFn{},
jobStarvingFns: map[string]api.ValidateFn{},
plugins: map[string]Plugin{},
jobOrderFns: map[string]api.CompareFn{},
queueOrderFns: map[string]api.CompareFn{},
victimQueueOrderFns: map[string]api.VictimCompareFn{},
taskOrderFns: map[string]api.CompareFn{},
clusterOrderFns: map[string]api.CompareFn{},
predicateFns: map[string]api.PredicateFn{},
prePredicateFns: map[string]api.PrePredicateFn{},
bestNodeFns: map[string]api.BestNodeFn{},
nodeOrderFns: map[string]api.NodeOrderFn{},
batchNodeOrderFns: map[string]api.BatchNodeOrderFn{},
nodeMapFns: map[string]api.NodeMapFn{},
nodeReduceFns: map[string]api.NodeReduceFn{},
preemptableFns: map[string]api.EvictableFn{},
reclaimableFns: map[string]api.EvictableFn{},
overusedFns: map[string]api.ValidateFn{},
preemptiveFns: map[string]api.ValidateWithCandidateFn{},
allocatableFns: map[string]api.AllocatableFn{},
jobReadyFns: map[string]api.ValidateFn{},
jobPipelinedFns: map[string]api.VoteFn{},
jobValidFns: map[string]api.ValidateExFn{},
jobEnqueueableFns: map[string]api.VoteFn{},
jobEnqueuedFns: map[string]api.JobEnqueuedFn{},
targetJobFns: map[string]api.TargetJobFn{},
reservedNodesFns: map[string]api.ReservedNodesFn{},
victimTasksFns: map[string][]api.VictimTasksFn{},
jobStarvingFns: map[string]api.ValidateFn{},
}

snapshot := cache.Snapshot()
Expand Down Expand Up @@ -590,6 +595,11 @@ func (ssn Session) KubeClient() kubernetes.Interface {
return ssn.kubeClient
}

// VCClient returns the Volcano clientset stored on the session.
func (ssn Session) VCClient() vcclient.Interface {
return ssn.vcClient
}

// ClientConfig returns the rest config
func (ssn Session) ClientConfig() *rest.Config {
return ssn.restConfig
Expand Down
Loading
Loading