Skip to content

Commit

Permalink
scheduler: config option to reject job registration (#11610)
Browse files Browse the repository at this point in the history
During incident response, operators may find that automated processes
elsewhere in the organization can be generating new workloads on Nomad
clusters that are unable to handle the workload. This changeset adds a
field to the `SchedulerConfiguration` API that causes all job
registration calls to be rejected unless the request has a management
ACL token.
  • Loading branch information
tgross committed Dec 6, 2021
1 parent aa7aa6b commit 2c3db7e
Show file tree
Hide file tree
Showing 10 changed files with 337 additions and 2 deletions.
3 changes: 3 additions & 0 deletions .changelog/11610.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
scheduler: Added a `RejectJobRegistration` field to the scheduler configuration API that enabled a setting to reject job register, dispatch, and scale requests without a management ACL token
```
4 changes: 4 additions & 0 deletions api/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ type SchedulerConfiguration struct {
// MemoryOversubscriptionEnabled specifies whether memory oversubscription is enabled
MemoryOversubscriptionEnabled bool

// RejectJobRegistration disables new job registrations except with a
// management ACL token
RejectJobRegistration bool

// CreateIndex/ModifyIndex store the create/modify indexes of this configuration.
CreateIndex uint64
ModifyIndex uint64
Expand Down
6 changes: 6 additions & 0 deletions command/agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,9 @@ func errCodeFromHandler(err error) (int, string) {
} else if strings.HasSuffix(errMsg, structs.ErrTokenNotFound.Error()) {
errMsg = structs.ErrTokenNotFound.Error()
code = 403
} else if strings.HasSuffix(errMsg, structs.ErrJobRegistrationDisabled.Error()) {
errMsg = structs.ErrJobRegistrationDisabled.Error()
code = 403
}
}

Expand Down Expand Up @@ -487,6 +490,9 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque
} else if strings.HasSuffix(errMsg, structs.ErrTokenNotFound.Error()) {
errMsg = structs.ErrTokenNotFound.Error()
code = 403
} else if strings.HasSuffix(errMsg, structs.ErrJobRegistrationDisabled.Error()) {
errMsg = structs.ErrJobRegistrationDisabled.Error()
code = 403
}
}

Expand Down
1 change: 1 addition & 0 deletions command/agent/operator_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ func (s *HTTPServer) schedulerUpdateConfig(resp http.ResponseWriter, req *http.R
args.Config = structs.SchedulerConfiguration{
SchedulerAlgorithm: structs.SchedulerAlgorithm(conf.SchedulerAlgorithm),
MemoryOversubscriptionEnabled: conf.MemoryOversubscriptionEnabled,
RejectJobRegistration: conf.RejectJobRegistration,
PreemptionConfig: structs.PreemptionConfig{
SystemSchedulerEnabled: conf.PreemptionConfig.SystemSchedulerEnabled,
SysBatchSchedulerEnabled: conf.PreemptionConfig.SysBatchSchedulerEnabled,
Expand Down
38 changes: 36 additions & 2 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
reply.Warnings = structs.MergeMultierrorWarnings(warnings...)

// Check job submission permissions
if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil {
var aclObj *acl.ACL
if aclObj, err = j.srv.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil {
if !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilitySubmitJob) {
Expand Down Expand Up @@ -175,6 +176,11 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
}
}

if ok, err := registrationsAreAllowed(aclObj, j.srv.State()); !ok || err != nil {
j.logger.Warn("job registration is currently disabled for non-management ACL")
return structs.ErrJobRegistrationDisabled
}

// Lookup the job
snap, err := j.srv.State().Snapshot()
if err != nil {
Expand Down Expand Up @@ -1022,6 +1028,11 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes
}
}

if ok, err := registrationsAreAllowed(aclObj, j.srv.State()); !ok || err != nil {
j.logger.Warn("job scaling is currently disabled for non-management ACL")
return structs.ErrJobRegistrationDisabled
}

// Validate args
err = args.Validate()
if err != nil {
Expand Down Expand Up @@ -1304,6 +1315,22 @@ func allowedNSes(aclObj *acl.ACL, state *state.StateStore, allow func(ns string)
return r, nil
}

// registrationsAreAllowed checks that the scheduler is not in
// RejectJobRegistration mode for load-shedding.
func registrationsAreAllowed(aclObj *acl.ACL, state *state.StateStore) (bool, error) {
_, cfg, err := state.SchedulerConfig()
if err != nil {
return false, err
}
if cfg != nil && !cfg.RejectJobRegistration {
return true, nil
}
if aclObj != nil && aclObj.IsManagement() {
return true, nil
}
return false, nil
}

// List is used to list the jobs registered in the system
func (j *Job) List(args *structs.JobListRequest, reply *structs.JobListResponse) error {
if done, err := j.srv.forward("Job.List", args, args, reply); done {
Expand Down Expand Up @@ -1852,12 +1879,19 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
defer metrics.MeasureSince([]string{"nomad", "job", "dispatch"}, time.Now())

// Check for submit-job permissions
if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil {
var aclObj *acl.ACL
var err error
if aclObj, err = j.srv.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityDispatchJob) {
return structs.ErrPermissionDenied
}

if ok, err := registrationsAreAllowed(aclObj, j.srv.State()); !ok || err != nil {
j.logger.Warn("job dispatch is currently disabled for non-management ACL")
return structs.ErrJobRegistrationDisabled
}

// Lookup the parameterized job
if args.JobID == "" {
return fmt.Errorf("missing parameterized job ID")
Expand Down
Loading

0 comments on commit 2c3db7e

Please sign in to comment.