Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cordon queue (#183) #3851

Merged
merged 2 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions internal/armada/mocks/mock_repository.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions internal/armada/permissions/permissions.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ const (
WatchAllEvents = "watch_all_events"
CreateQueue = "create_queue"
DeleteQueue = "delete_queue"
CordonQueue = "cordon_queue"
CordonNodes = "cordon_nodes"
)
20 changes: 20 additions & 0 deletions internal/armada/queue/queue_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type QueueRepository interface {
CreateQueue(*armadacontext.Context, queue.Queue) error
UpdateQueue(*armadacontext.Context, queue.Queue) error
DeleteQueue(ctx *armadacontext.Context, name string) error
CordonQueue(ctx *armadacontext.Context, name string) error
UncordonQueue(ctx *armadacontext.Context, name string) error
}

type ReadOnlyQueueRepository interface {
Expand Down Expand Up @@ -116,6 +118,24 @@ func (r *PostgresQueueRepository) DeleteQueue(ctx *armadacontext.Context, name s
return nil
}

func (r *PostgresQueueRepository) CordonQueue(ctx *armadacontext.Context, name string) error {
queueToCordon, err := r.GetQueue(ctx, name)
if err != nil {
return err
}
queueToCordon.Cordoned = true
return r.upsertQueue(ctx, queueToCordon)
}

func (r *PostgresQueueRepository) UncordonQueue(ctx *armadacontext.Context, name string) error {
queueToUncordon, err := r.GetQueue(ctx, name)
if err != nil {
return err
}
queueToUncordon.Cordoned = false
return r.upsertQueue(ctx, queueToUncordon)
}

func (r *PostgresQueueRepository) upsertQueue(ctx *armadacontext.Context, queue queue.Queue) error {
data, err := proto.Marshal(queue.ToAPI())
if err != nil {
Expand Down
69 changes: 54 additions & 15 deletions internal/armada/queue/queue_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package queue

import (
"context"
"fmt"
"math"

"github.com/gogo/protobuf/types"
Expand Down Expand Up @@ -37,9 +38,9 @@ func (s *Server) CreateQueue(grpcCtx context.Context, req *api.Queue) (*types.Em
err := s.authorizer.AuthorizeAction(ctx, permissions.CreateQueue)
var ep *armadaerrors.ErrUnauthorized
if errors.As(err, &ep) {
return nil, status.Errorf(codes.PermissionDenied, "[CreateQueue] error creating queue %s: %s", req.Name, ep)
return nil, status.Errorf(codes.PermissionDenied, "error creating queue %s: %s", req.Name, ep)
} else if err != nil {
return nil, status.Errorf(codes.Unavailable, "[CreateQueue] error checking permissions: %s", err)
return nil, status.Errorf(codes.Unavailable, "error checking permissions: %s", err)
}

if len(req.UserOwners) == 0 {
Expand All @@ -49,15 +50,15 @@ func (s *Server) CreateQueue(grpcCtx context.Context, req *api.Queue) (*types.Em

queue, err := queue.NewQueue(req)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "[CreateQueue] error validating queue: %s", err)
return nil, status.Errorf(codes.InvalidArgument, "error validating queue: %s", err)
}

err = s.queueRepository.CreateQueue(ctx, queue)
var eq *ErrQueueAlreadyExists
if errors.As(err, &eq) {
return nil, status.Errorf(codes.AlreadyExists, "[CreateQueue] error creating queue: %s", err)
return nil, status.Errorf(codes.AlreadyExists, "error creating queue: %s", err)
} else if err != nil {
return nil, status.Errorf(codes.Unavailable, "[CreateQueue] error creating queue: %s", err)
return nil, status.Errorf(codes.Unavailable, "error creating queue: %s", err)
}

return &types.Empty{}, nil
Expand Down Expand Up @@ -87,22 +88,22 @@ func (s *Server) UpdateQueue(grpcCtx context.Context, req *api.Queue) (*types.Em
err := s.authorizer.AuthorizeAction(ctx, permissions.CreateQueue)
var ep *armadaerrors.ErrUnauthorized
if errors.As(err, &ep) {
return nil, status.Errorf(codes.PermissionDenied, "[UpdateQueue] error updating queue %s: %s", req.Name, ep)
return nil, status.Errorf(codes.PermissionDenied, "error updating queue %s: %s", req.Name, ep)
} else if err != nil {
return nil, status.Errorf(codes.Unavailable, "[UpdateQueue] error checking permissions: %s", err)
return nil, status.Errorf(codes.Unavailable, "error checking permissions: %s", err)
}

queue, err := queue.NewQueue(req)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "[UpdateQueue] error: %s", err)
return nil, status.Errorf(codes.InvalidArgument, "error: %s", err)
}

err = s.queueRepository.UpdateQueue(ctx, queue)
var e *ErrQueueNotFound
if errors.As(err, &e) {
return nil, status.Errorf(codes.NotFound, "[UpdateQueue] error: %s", err)
return nil, status.Errorf(codes.NotFound, "error: %s", err)
} else if err != nil {
return nil, status.Errorf(codes.Unavailable, "[UpdateQueue] error getting queue %q: %s", queue.Name, err)
return nil, status.Errorf(codes.Unavailable, "error getting queue %q: %s", queue.Name, err)
}

return &types.Empty{}, nil
Expand Down Expand Up @@ -133,13 +134,13 @@ func (s *Server) DeleteQueue(grpcCtx context.Context, req *api.QueueDeleteReques
err := s.authorizer.AuthorizeAction(ctx, permissions.DeleteQueue)
var ep *armadaerrors.ErrUnauthorized
if errors.As(err, &ep) {
return nil, status.Errorf(codes.PermissionDenied, "[DeleteQueue] error deleting queue %s: %s", req.Name, ep)
return nil, status.Errorf(codes.PermissionDenied, "error deleting queue %s: %s", req.Name, ep)
} else if err != nil {
return nil, status.Errorf(codes.Unavailable, "[DeleteQueue] error checking permissions: %s", err)
return nil, status.Errorf(codes.Unavailable, "error checking permissions: %s", err)
}
err = s.queueRepository.DeleteQueue(ctx, req.Name)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "[DeleteQueue] error deleting queue %s: %s", req.Name, err)
return nil, status.Errorf(codes.InvalidArgument, "error deleting queue %s: %s", req.Name, err)
}
return &types.Empty{}, nil
}
Expand All @@ -149,9 +150,9 @@ func (s *Server) GetQueue(grpcCtx context.Context, req *api.QueueGetRequest) (*a
queue, err := s.queueRepository.GetQueue(ctx, req.Name)
var e *ErrQueueNotFound
if errors.As(err, &e) {
return nil, status.Errorf(codes.NotFound, "[GetQueue] error: %s", err)
return nil, status.Errorf(codes.NotFound, "error: %s", err)
} else if err != nil {
return nil, status.Errorf(codes.Unavailable, "[GetQueue] error getting queue %q: %s", req.Name, err)
return nil, status.Errorf(codes.Unavailable, "error getting queue %q: %s", req.Name, err)
}
return queue.ToAPI(), nil
}
Expand Down Expand Up @@ -189,3 +190,41 @@ func (s *Server) GetQueues(req *api.StreamingQueueGetRequest, stream api.QueueSe
}
return nil
}

func (s *Server) CordonQueue(grpcCtx context.Context, req *api.QueueCordonRequest) (*types.Empty, error) {
ctx := armadacontext.FromGrpcCtx(grpcCtx)

err := s.authorizer.AuthorizeAction(ctx, permissions.CordonQueue)
var ep *armadaerrors.ErrUnauthorized
if errors.As(err, &ep) {
return nil, status.Errorf(codes.PermissionDenied, "error cordoning queue %s: %s", req.Name, ep)
} else if err != nil {
return nil, status.Errorf(codes.Unavailable, "error checking permissions: %s", err)
}

queueName := req.Name
if queueName == "" {
return nil, fmt.Errorf("cannot cordon queue with empty name")
}

return &types.Empty{}, s.queueRepository.CordonQueue(ctx, queueName)
}

func (s *Server) UncordonQueue(grpcCtx context.Context, req *api.QueueUncordonRequest) (*types.Empty, error) {
ctx := armadacontext.FromGrpcCtx(grpcCtx)

err := s.authorizer.AuthorizeAction(ctx, permissions.CordonQueue)
var ep *armadaerrors.ErrUnauthorized
if errors.As(err, &ep) {
return nil, status.Errorf(codes.PermissionDenied, "error uncordoning queue %s: %s", req.Name, ep)
} else if err != nil {
return nil, status.Errorf(codes.Unavailable, "error checking permissions: %s", err)
}

queueName := req.Name
if queueName == "" {
return nil, fmt.Errorf("cannot uncordon queue with empty name")
}

return &types.Empty{}, s.queueRepository.UncordonQueue(ctx, queueName)
}
11 changes: 7 additions & 4 deletions internal/armadactl/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,13 @@ type Params struct {
// However, they are user-replaceable to facilitate testing.
// TODO Consider replacing with an interface
type QueueAPI struct {
Create queue.CreateAPI
Delete queue.DeleteAPI
Get queue.GetAPI
Update queue.UpdateAPI
Create queue.CreateAPI
Delete queue.DeleteAPI
Get queue.GetAPI
GetAll queue.GetAllAPI
Update queue.UpdateAPI
Cordon queue.CordonAPI
Uncordon queue.UncordonAPI
}

// New instantiates an App with default parameters, including standard output
Expand Down
22 changes: 11 additions & 11 deletions internal/scheduler/constraints/constraints.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
// Indicates that the scheduling rate limit has been exceeded.
GlobalRateLimitExceededUnschedulableReason = "global scheduling rate limit exceeded"
QueueRateLimitExceededUnschedulableReason = "queue scheduling rate limit exceeded"
SchedulingPausedOnQueueUnschedulableReason = "scheduling paused on queue"

// Indicates that scheduling a gang would exceed the rate limit.
GlobalRateLimitExceededByGangUnschedulableReason = "gang would exceed global scheduling rate limit"
Expand Down Expand Up @@ -54,7 +55,7 @@ func IsTerminalUnschedulableReason(reason string) bool {
// IsTerminalQueueUnschedulableReason returns true if reason indicates
// it's not possible to schedule any more jobs from this queue in this round.
func IsTerminalQueueUnschedulableReason(reason string) bool {
return reason == QueueRateLimitExceededUnschedulableReason
return reason == QueueRateLimitExceededUnschedulableReason || reason == SchedulingPausedOnQueueUnschedulableReason
}

// SchedulingConstraints contains scheduling constraints, e.g., per-queue resource limits.
Expand All @@ -75,6 +76,8 @@ type SchedulingConstraints struct {
type queueSchedulingConstraints struct {
// Scheduling constraints by priority class.
PriorityClassSchedulingConstraintsByPriorityClassName map[string]priorityClassSchedulingConstraints
// Determines whether scheduling has been paused for this queue
Cordoned bool
}

// priorityClassSchedulingConstraints contains scheduling constraints that apply to jobs of a specific priority class.
Expand All @@ -84,12 +87,7 @@ type priorityClassSchedulingConstraints struct {
MaximumResourcesPerQueue map[string]resource.Quantity
}

func NewSchedulingConstraints(
pool string,
totalResources schedulerobjects.ResourceList,
config configuration.SchedulingConfig,
queues []*api.Queue,
) SchedulingConstraints {
func NewSchedulingConstraints(pool string, totalResources schedulerobjects.ResourceList, config configuration.SchedulingConfig, queues []*api.Queue, cordonStatusByQueue map[string]bool) SchedulingConstraints {
priorityClassSchedulingConstraintsByPriorityClassName := make(map[string]priorityClassSchedulingConstraints, len(config.PriorityClasses))
for name, priorityClass := range config.PriorityClasses {
maximumResourceFractionPerQueue := priorityClass.MaximumResourceFractionPerQueue
Expand Down Expand Up @@ -117,10 +115,9 @@ func NewSchedulingConstraints(
MaximumResourcesPerQueue: absoluteFromRelativeLimits(totalResources.Resources, maximumResourceFraction),
}
}
if len(priorityClassSchedulingConstraintsByPriorityClassNameForQueue) > 0 {
queueSchedulingConstraintsByQueueName[queue.Name] = queueSchedulingConstraints{
PriorityClassSchedulingConstraintsByPriorityClassName: priorityClassSchedulingConstraintsByPriorityClassNameForQueue,
}
queueSchedulingConstraintsByQueueName[queue.Name] = queueSchedulingConstraints{
PriorityClassSchedulingConstraintsByPriorityClassName: priorityClassSchedulingConstraintsByPriorityClassNameForQueue,
Cordoned: cordonStatusByQueue[queue.Name],
}
}

Expand Down Expand Up @@ -182,6 +179,9 @@ func (constraints *SchedulingConstraints) CheckConstraints(
return false, GlobalRateLimitExceededByGangUnschedulableReason, nil
}

if queueConstraints, ok := constraints.queueSchedulingConstraintsByQueueName[qctx.Queue]; ok && queueConstraints.Cordoned {
return false, SchedulingPausedOnQueueUnschedulableReason, nil
}
// Per-queue rate limiter check.
tokens = qctx.Limiter.TokensAt(sctx.Started)
if tokens <= 0 {
Expand Down
Loading