Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduling controls on queues and labels #3751

Closed
wants to merge 8 commits into from
1 change: 1 addition & 0 deletions cmd/armadactl/cmd/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func getCmd() *cobra.Command {
}
cmd.AddCommand(
queueGetCmd(),
queuesGetCmd(),
getSchedulingReportCmd(armadactl.New()),
getQueueSchedulingReportCmd(armadactl.New()),
getJobSchedulingReportCmd(armadactl.New()),
Expand Down
2 changes: 1 addition & 1 deletion cmd/armadactl/cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func useContextCmd(a *armadactl.App) *cobra.Command {
Short: "Sets the default context for future armadactl calls",
Long: `This command allows you to set the default context value for future armadactl calls.

It stores this state within the specified, or otherwise default ($HOME/.armadactl) configuration file.`,
It stores this state within the specified, or otherwise default ($HOME/.armadactl.yaml) configuration file.`,
Args: cobra.ExactArgs(1),
PreRunE: func(cmd *cobra.Command, args []string) error {
return initParams(cmd, a.Params)
Expand Down
1 change: 1 addition & 0 deletions cmd/armadactl/cmd/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func initParams(cmd *cobra.Command, params *armadactl.Params) error {
// Setup the armadactl to use pkg/client as its backend for queue-related commands
params.QueueAPI.Create = cq.Create(client.ExtractCommandlineArmadaApiConnectionDetails)
params.QueueAPI.Delete = cq.Delete(client.ExtractCommandlineArmadaApiConnectionDetails)
params.QueueAPI.GetAll = cq.GetAll(client.ExtractCommandlineArmadaApiConnectionDetails)
params.QueueAPI.Get = cq.Get(client.ExtractCommandlineArmadaApiConnectionDetails)
params.QueueAPI.Update = cq.Update(client.ExtractCommandlineArmadaApiConnectionDetails)

Expand Down
94 changes: 81 additions & 13 deletions cmd/armadactl/cmd/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func queueCreateCmdWithApp(a *armadactl.App) *cobra.Command {
Short: "Create new queue",
Long: `Every job submitted to armada needs to be associated with queue.

Job priority is evaluated inside queue, queue has its own priority.`,
Job priority is evaluated inside queue, queue has its own priority. Any labels on the queue must have a Kubernetes-like key-value structure, for example: armadaproject.io/submitter=airflow.`,
Args: cobra.ExactArgs(1),
PreRunE: func(cmd *cobra.Command, args []string) error {
return initParams(cmd, a.Params)
Expand All @@ -46,22 +46,36 @@ Job priority is evaluated inside queue, queue has its own priority.`,
return fmt.Errorf("error reading group-owners: %s", err)
}

queue, err := queue.NewQueue(&api.Queue{
Name: name,
PriorityFactor: priorityFactor,
UserOwners: owners,
GroupOwners: groups,
schedulingPaused, err := cmd.Flags().GetBool("pause-scheduling")
if err != nil {
return fmt.Errorf("error reading pause-scheduling: %s", err)
}

labels, err := cmd.Flags().GetStringSlice("labels")
if err != nil {
return fmt.Errorf("error reading queue labels: %s", err)
}

newQueue, err := queue.NewQueue(&api.Queue{
Name: name,
PriorityFactor: priorityFactor,
UserOwners: owners,
GroupOwners: groups,
SchedulingPaused: schedulingPaused,
Labels: labels,
})
if err != nil {
return fmt.Errorf("invalid queue data: %s", err)
}

return a.CreateQueue(queue)
return a.CreateQueue(newQueue)
},
}
cmd.Flags().Float64("priority-factor", 1, "Set queue priority factor - lower number makes queue more important, must be > 0.")
cmd.Flags().StringSlice("owners", []string{}, "Comma separated list of queue owners, defaults to current user.")
cmd.Flags().StringSlice("group-owners", []string{}, "Comma separated list of queue group owners, defaults to empty list.")
cmd.Flags().Bool("pause-scheduling", false, "Used to pause scheduling on specified queue. Defaults to false.")
cmd.Flags().StringSliceP("labels", "l", []string{}, "Comma separated list of key-value queue labels, for example: armadaproject.io/submitter=airflow. Defaults to empty list.")
return cmd
}

Expand Down Expand Up @@ -109,6 +123,46 @@ func queueGetCmdWithApp(a *armadactl.App) *cobra.Command {
return cmd
}

func queuesGetCmd() *cobra.Command {
return queuesGetCmdWithApp(armadactl.New())
}

// Takes a caller-supplied app struct; useful for testing.
func queuesGetCmdWithApp(a *armadactl.App) *cobra.Command {
cmd := &cobra.Command{
Use: "queues",
Short: "Gets information from multiple queues.",
Long: "Gets information from multiple queues, filtering by queue name and/or matching labels. Defaults to retrieving all queues.",
Args: cobra.ExactArgs(0),
PreRunE: func(cmd *cobra.Command, args []string) error {
return initParams(cmd, a.Params)
},
RunE: func(cmd *cobra.Command, args []string) error {
inverse, err := cmd.Flags().GetBool("inverse")
if err != nil {
return fmt.Errorf("error reading inverse flag: %s", err)
}

labels, err := cmd.Flags().GetStringSlice("match-labels")
if err != nil {
return fmt.Errorf("error reading queue labels: %s", err)
}

queues, err := cmd.Flags().GetStringSlice("match-queues")
if err != nil {
return fmt.Errorf("error reading queue labels: %s", err)
}

return a.GetAllQueues(queues, labels, inverse)
},
}
cmd.Flags().StringSliceP("match-queues", "q", []string{}, "Select queues matching provided queue names.")
cmd.Flags().StringSliceP("match-labels", "l", []string{}, "Select queues by label.")
cmd.Flags().Bool("inverse", false, "Inverts result to get all queues which don't contain provided set of labels. Defaults to false.")

return cmd
}

func queueUpdateCmd() *cobra.Command {
return queueUpdateCmdWithApp(armadactl.New())
}
Expand Down Expand Up @@ -141,22 +195,36 @@ func queueUpdateCmdWithApp(a *armadactl.App) *cobra.Command {
return fmt.Errorf("error reading group-owners: %s", err)
}

queue, err := queue.NewQueue(&api.Queue{
Name: name,
PriorityFactor: priorityFactor,
UserOwners: owners,
GroupOwners: groups,
schedulingPaused, err := cmd.Flags().GetBool("pause-scheduling")
if err != nil {
return fmt.Errorf("error reading pause-scheduling: %s", err)
}

labels, err := cmd.Flags().GetStringSlice("labels")
if err != nil {
return fmt.Errorf("error reading queue labels: %s", err)
}

newQueue, err := queue.NewQueue(&api.Queue{
Name: name,
PriorityFactor: priorityFactor,
UserOwners: owners,
GroupOwners: groups,
SchedulingPaused: schedulingPaused,
Labels: labels,
})
if err != nil {
return fmt.Errorf("invalid queue data: %s", err)
}

return a.UpdateQueue(queue)
return a.UpdateQueue(newQueue)
},
}
// TODO this will overwrite existing values with default values if not all flags are provided
cmd.Flags().Float64("priority-factor", 1, "Set queue priority factor - lower number makes queue more important, must be > 0.")
cmd.Flags().StringSlice("owners", []string{}, "Comma separated list of queue owners, defaults to current user.")
cmd.Flags().StringSlice("group-owners", []string{}, "Comma separated list of queue group owners, defaults to empty list.")
cmd.Flags().Bool("pause-scheduling", false, "Used to pause scheduling on specified queue. Defaults to false.")
cmd.Flags().StringSliceP("labels", "l", []string{}, "Comma separated list of key-value queue labels, for example: armadaproject.io/submitter=airflow. Defaults to empty list.")
return cmd
}
2 changes: 2 additions & 0 deletions cmd/armadactl/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ func RootCmd() *cobra.Command {
configCmd(armadactl.New()),
preemptCmd(),
docsCmd(),
cordon(),
uncordon(),
)

return cmd
Expand Down
116 changes: 116 additions & 0 deletions cmd/armadactl/cmd/scheduling_control.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package cmd

import (
"fmt"

Check failure on line 4 in cmd/armadactl/cmd/scheduling_control.go

View workflow job for this annotation

GitHub Actions / lint / Lint Go

File is not `gofumpt`-ed (gofumpt)
"github.com/spf13/cobra"

"github.com/armadaproject/armada/internal/armadactl"
)

func cordon() *cobra.Command {
a := armadactl.New()
cmd := &cobra.Command{
Use: "cordon",
Short: "Pause scheduling by resource",
Long: "Pause scheduling by resource. Supported: queue",
}
cmd.AddCommand(cordonQueue(a))
return cmd
}

func uncordon() *cobra.Command {
a := armadactl.New()
cmd := &cobra.Command{
Use: "uncordon",
Short: "Resume scheduling by resource",
Long: "Resume scheduling by resource. Supported: queue",
}
cmd.AddCommand(uncordonQueue(a))
return cmd
}

func cordonQueue(a *armadactl.App) *cobra.Command {
cmd := &cobra.Command{
Use: "queue",
Short: "Pause scheduling for select queues",
Long: "Pause scheduling for select queues. This can be achieved by queue name or by labels.",
PreRunE: func(cmd *cobra.Command, args []string) error {
return initParams(cmd, a.Params)
},
RunE: func(cmd *cobra.Command, args []string) error {
matchQueues, err := cmd.Flags().GetStringSlice("match-queues")
if err != nil {
return fmt.Errorf("error reading queue selection: %s", err)
}

matchLabels, err := cmd.Flags().GetStringSlice("match-labels")
if err != nil {
return fmt.Errorf("error reading label selection: %s", err)
}

inverse, err := cmd.Flags().GetBool("inverse")
if err != nil {
return fmt.Errorf("error reading inverse flag: %s", err)
}

dryRun, err := cmd.Flags().GetBool("dry-run")
if err != nil {
return fmt.Errorf("error reading dry-run flag: %s", err)
}

if len(matchQueues) == 0 && len(matchLabels) == 0 {
return fmt.Errorf("either match-queues or match-labels must be set to narrow down queues to cordon")
}

return a.CordonQueues(matchQueues, matchLabels, dryRun, inverse)
},
}
cmd.Flags().StringSliceP("match-queues", "q", []string{}, "Provide a comma separated list of queues you'd like to pause scheduling for. Defaults to empty.")
cmd.Flags().StringSliceP("match-labels", "l", []string{}, "Provide a comma separated list of labels. Queues matching all provided labels will have scheduling paused. Defaults to empty.")
cmd.Flags().Bool("inverse", false, "Select all queues which do not match the provided parameters")
cmd.Flags().Bool("dry-run", false, "Show selection of queues that will be modified in this operation")
return cmd
}

func uncordonQueue(a *armadactl.App) *cobra.Command {
cmd := &cobra.Command{
Use: "queue",
Short: "Resume scheduling for select queues",
Long: "Resume scheduling for select queues. This can be achieved by queue name or by labels.",
PreRunE: func(cmd *cobra.Command, args []string) error {
return initParams(cmd, a.Params)
},
RunE: func(cmd *cobra.Command, args []string) error {
matchQueues, err := cmd.Flags().GetStringSlice("match-queues")
if err != nil {
return fmt.Errorf("error reading queue selection: %s", err)
}

matchLabels, err := cmd.Flags().GetStringSlice("match-labels")
if err != nil {
return fmt.Errorf("error reading label selection: %s", err)
}

inverse, err := cmd.Flags().GetBool("inverse")
if err != nil {
return fmt.Errorf("error reading inverse flag: %s", err)
}

dryRun, err := cmd.Flags().GetBool("dry-run")
if err != nil {
return fmt.Errorf("error reading dry-run flag: %s", err)
}

if len(matchQueues) == 0 && len(matchLabels) == 0 {
return fmt.Errorf("either match-queues or match-labels must be set to narrow down queues to uncordon")
}

return a.UncordonQueues(matchQueues, matchLabels, dryRun, inverse)
},
}
cmd.Flags().StringSliceP("match-queues", "q", []string{}, "Provide a comma separated list of queues you'd like to resume scheduling for. Defaults to empty.")
cmd.Flags().StringSliceP("match-labels", "l", []string{}, "Provide a comma separated list of labels. Queues matching all provided labels will have scheduling resumed. Defaults to empty.")
cmd.Flags().Bool("inverse", false, "Select all queues which do not match the provided parameters")
cmd.Flags().Bool("dry-run", false, "Show selection of queues that will be modified in this operation")
return cmd
}
1 change: 1 addition & 0 deletions internal/armadactl/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type QueueAPI struct {
Create queue.CreateAPI
Delete queue.DeleteAPI
Get queue.GetAPI
GetAll queue.GetAllAPI
Update queue.UpdateAPI
}

Expand Down
42 changes: 42 additions & 0 deletions internal/armadactl/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ package armadactl

import (
"fmt"
"strings"

"github.com/pkg/errors"
"sigs.k8s.io/yaml"

"github.com/armadaproject/armada/internal/common/slices"
"github.com/armadaproject/armada/pkg/api"

"github.com/armadaproject/armada/pkg/client"
"github.com/armadaproject/armada/pkg/client/queue"
"github.com/armadaproject/armada/pkg/client/util"
Expand Down Expand Up @@ -68,6 +72,44 @@ func (a *App) GetQueue(name string) error {
return nil
}

func (a *App) getAllQueuesAsAPIQueue(queueNames []string, labels []string, inverse bool) ([]*api.Queue, error) {
queueContainsAllLabelsAndInProvidedNames := func(q *api.Queue) bool {
containsAllLabels := slices.AllFunc(labels, func(label string) bool {
// If the label is a key, map the labels slice to only keys
labelsToCompare := q.Labels
if len(strings.Split(label, "=")) == 1 {
labelsToCompare = slices.Map(q.Labels, func(queueLabel string) string { return strings.Split(queueLabel, "=")[0] })
}

return slices.Contains(labelsToCompare, label)
})
inQueues := len(queueNames) == 0 || slices.Contains(queueNames, q.Name)
return inverse != (containsAllLabels && inQueues)
}
queuesToReturn, err := a.Params.QueueAPI.GetAll()
if err != nil {
return nil, errors.Errorf("[armadactl.getAllQueuesAsAPIQueue] error getting all queues: %s", err)
}

// Filter function seems expensive enough to justify parallel filter
return slices.ParallelFilter(queuesToReturn, queueContainsAllLabelsAndInProvidedNames), nil
}

// GetQueue calls app.QueueAPI.GetAll with the provided parameters.
func (a *App) GetAllQueues(queueNames []string, labels []string, inverse bool) error {
queues, err := a.getAllQueuesAsAPIQueue(queueNames, labels, inverse)
if err != nil {
return errors.Errorf("[armadactl.GetAllQueues] error getting all queues: %s", err)
}

b, err := yaml.Marshal(queues)
if err != nil {
return errors.Errorf("[armadactl.GetAllQueues] error unmarshalling queues: %s", err)
}
fmt.Fprintf(a.Out, headerYaml()+string(b))
return nil
}

func headerYaml() string {
b, err := yaml.Marshal(client.Resource{
Version: client.APIVersionV1,
Expand Down
Loading
Loading