Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduler: config option to reject job registration #11610

Merged
merged 2 commits into from
Dec 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/11610.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
scheduler: Added a `RejectJobRegistration` field to the scheduler configuration API that enabled a setting to reject job register, dispatch, and scale requests without a management ACL token
```
4 changes: 4 additions & 0 deletions api/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ type SchedulerConfiguration struct {
// MemoryOversubscriptionEnabled specifies whether memory oversubscription is enabled
MemoryOversubscriptionEnabled bool

// RejectJobRegistration disables new job registrations except with a
// management ACL token
RejectJobRegistration bool

// CreateIndex/ModifyIndex store the create/modify indexes of this configuration.
CreateIndex uint64
ModifyIndex uint64
Expand Down
6 changes: 6 additions & 0 deletions command/agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,9 @@ func errCodeFromHandler(err error) (int, string) {
} else if strings.HasSuffix(errMsg, structs.ErrTokenNotFound.Error()) {
errMsg = structs.ErrTokenNotFound.Error()
code = 403
} else if strings.HasSuffix(errMsg, structs.ErrJobRegistrationDisabled.Error()) {
errMsg = structs.ErrJobRegistrationDisabled.Error()
code = 403
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do you feel about 409 here instead of 403? We're trying to get more intentional and consistent with status codes and error messages when the opportunity arrises. 409 is documented as

409 Conflict
The request could not be completed due to a conflict with the current state of the resource

This seems more in line, and the error message should let clients know what the conflict is.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The intent of 409 is to avoid write skew and the direction in the standard is to adjust the request and submit it again. Which is exactly what we don't want here.

Copy link
Member Author

@tgross tgross Dec 6, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm open to the idea of finding a more specific error code but I feel like it needs to be one that doesn't tell any automated system "keep retrying!" (ex. 420 or 429). The 403 tells them "you can't submit because the permissions to submit have been changed"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I was afraid this might be the answer. I'll keep looking for a possible alternative code.

}
}

Expand Down Expand Up @@ -487,6 +490,9 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque
} else if strings.HasSuffix(errMsg, structs.ErrTokenNotFound.Error()) {
errMsg = structs.ErrTokenNotFound.Error()
code = 403
} else if strings.HasSuffix(errMsg, structs.ErrJobRegistrationDisabled.Error()) {
errMsg = structs.ErrJobRegistrationDisabled.Error()
code = 403
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question here.

}
}

Expand Down
1 change: 1 addition & 0 deletions command/agent/operator_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ func (s *HTTPServer) schedulerUpdateConfig(resp http.ResponseWriter, req *http.R
args.Config = structs.SchedulerConfiguration{
SchedulerAlgorithm: structs.SchedulerAlgorithm(conf.SchedulerAlgorithm),
MemoryOversubscriptionEnabled: conf.MemoryOversubscriptionEnabled,
RejectJobRegistration: conf.RejectJobRegistration,
PreemptionConfig: structs.PreemptionConfig{
SystemSchedulerEnabled: conf.PreemptionConfig.SystemSchedulerEnabled,
SysBatchSchedulerEnabled: conf.PreemptionConfig.SysBatchSchedulerEnabled,
Expand Down
38 changes: 36 additions & 2 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
reply.Warnings = structs.MergeMultierrorWarnings(warnings...)

// Check job submission permissions
if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil {
var aclObj *acl.ACL
if aclObj, err = j.srv.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil {
if !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilitySubmitJob) {
Expand Down Expand Up @@ -175,6 +176,11 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
}
}

if ok, err := registrationsAreAllowed(aclObj, j.srv.State()); !ok || err != nil {
j.logger.Warn("job registration is currently disabled for non-management ACL")
return structs.ErrJobRegistrationDisabled
}

// Lookup the job
snap, err := j.srv.State().Snapshot()
if err != nil {
Expand Down Expand Up @@ -1022,6 +1028,11 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes
}
}

if ok, err := registrationsAreAllowed(aclObj, j.srv.State()); !ok || err != nil {
j.logger.Warn("job scaling is currently disabled for non-management ACL")
return structs.ErrJobRegistrationDisabled
}

// Validate args
err = args.Validate()
if err != nil {
Expand Down Expand Up @@ -1304,6 +1315,22 @@ func allowedNSes(aclObj *acl.ACL, state *state.StateStore, allow func(ns string)
return r, nil
}

// registrationsAreAllowed checks that the scheduler is not in
// RejectJobRegistration mode for load-shedding.
func registrationsAreAllowed(aclObj *acl.ACL, state *state.StateStore) (bool, error) {
_, cfg, err := state.SchedulerConfig()
if err != nil {
return false, err
}
if cfg != nil && !cfg.RejectJobRegistration {
return true, nil
}
if aclObj != nil && aclObj.IsManagement() {
return true, nil
}
return false, nil
}

// List is used to list the jobs registered in the system
func (j *Job) List(args *structs.JobListRequest, reply *structs.JobListResponse) error {
if done, err := j.srv.forward("Job.List", args, args, reply); done {
Expand Down Expand Up @@ -1852,12 +1879,19 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
defer metrics.MeasureSince([]string{"nomad", "job", "dispatch"}, time.Now())

// Check for submit-job permissions
if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil {
var aclObj *acl.ACL
var err error
if aclObj, err = j.srv.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityDispatchJob) {
return structs.ErrPermissionDenied
}

if ok, err := registrationsAreAllowed(aclObj, j.srv.State()); !ok || err != nil {
j.logger.Warn("job dispatch is currently disabled for non-management ACL")
return structs.ErrJobRegistrationDisabled
}

// Lookup the parameterized job
if args.JobID == "" {
return fmt.Errorf("missing parameterized job ID")
Expand Down
Loading