Skip to content

Commit

Permalink
scheduler: config option to reject job registration
Browse files Browse the repository at this point in the history
During incident response, operators may find that automated processes
elsewhere in the organization can be generating new workloads on Nomad
clusters that are unable to handle the workload. This changeset adds a
field to the `SchedulerConfiguration` API that causes all job
registration calls to be rejected unless the request has a management
ACL token.
  • Loading branch information
tgross committed Dec 3, 2021
1 parent 1391c37 commit bce7d6c
Show file tree
Hide file tree
Showing 10 changed files with 344 additions and 2 deletions.
3 changes: 3 additions & 0 deletions .changelog/11610.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
scheduler: Added a `RejectJobRegistration` field to the scheduler configuration API that enabled a setting to reject job register, dispatch, and scale requests without a management ACL token
```
4 changes: 4 additions & 0 deletions api/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ type SchedulerConfiguration struct {
// MemoryOversubscriptionEnabled specifies whether memory oversubscription is enabled
MemoryOversubscriptionEnabled bool

// RejectJobRegistration disables new job registrations except with a
// management ACL token
RejectJobRegistration bool

// CreateIndex/ModifyIndex store the create/modify indexes of this configuration.
CreateIndex uint64
ModifyIndex uint64
Expand Down
6 changes: 6 additions & 0 deletions command/agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,9 @@ func errCodeFromHandler(err error) (int, string) {
} else if strings.HasSuffix(errMsg, structs.ErrTokenNotFound.Error()) {
errMsg = structs.ErrTokenNotFound.Error()
code = 403
} else if strings.HasSuffix(errMsg, structs.ErrJobRegistrationDisabled.Error()) {
errMsg = structs.ErrJobRegistrationDisabled.Error()
code = 403
}
}

Expand Down Expand Up @@ -487,6 +490,9 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque
} else if strings.HasSuffix(errMsg, structs.ErrTokenNotFound.Error()) {
errMsg = structs.ErrTokenNotFound.Error()
code = 403
} else if strings.HasSuffix(errMsg, structs.ErrJobRegistrationDisabled.Error()) {
errMsg = structs.ErrJobRegistrationDisabled.Error()
code = 403
}
}

Expand Down
1 change: 1 addition & 0 deletions command/agent/operator_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ func (s *HTTPServer) schedulerUpdateConfig(resp http.ResponseWriter, req *http.R
args.Config = structs.SchedulerConfiguration{
SchedulerAlgorithm: structs.SchedulerAlgorithm(conf.SchedulerAlgorithm),
MemoryOversubscriptionEnabled: conf.MemoryOversubscriptionEnabled,
RejectJobRegistration: conf.RejectJobRegistration,
PreemptionConfig: structs.PreemptionConfig{
SystemSchedulerEnabled: conf.PreemptionConfig.SystemSchedulerEnabled,
SysBatchSchedulerEnabled: conf.PreemptionConfig.SysBatchSchedulerEnabled,
Expand Down
39 changes: 37 additions & 2 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
reply.Warnings = structs.MergeMultierrorWarnings(warnings...)

// Check job submission permissions
if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil {
var aclObj *acl.ACL
if aclObj, err = j.srv.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil {
if !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilitySubmitJob) {
Expand Down Expand Up @@ -175,11 +176,17 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
}
}

if ok, err := allowedRegistration(aclObj, j.srv.State()); !ok || err != nil {
j.logger.Warn("job registration for non-management ACL rejected")
return structs.ErrJobRegistrationDisabled
}

// Lookup the job
snap, err := j.srv.State().Snapshot()
if err != nil {
return err
}

ws := memdb.NewWatchSet()
existingJob, err := snap.JobByID(ws, args.RequestNamespace(), args.Job.ID)
if err != nil {
Expand Down Expand Up @@ -1022,6 +1029,11 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes
}
}

if ok, err := allowedRegistration(aclObj, j.srv.State()); !ok || err != nil {
j.logger.Warn("job scaling for non-management ACL rejected")
return structs.ErrJobRegistrationDisabled
}

// Validate args
err = args.Validate()
if err != nil {
Expand Down Expand Up @@ -1304,6 +1316,22 @@ func allowedNSes(aclObj *acl.ACL, state *state.StateStore, allow func(ns string)
return r, nil
}

// allowedRegistration checks that the scheduler is not in
// RejectJobRegistration mode for load-shedding.
func allowedRegistration(aclObj *acl.ACL, state *state.StateStore) (bool, error) {
_, cfg, err := state.SchedulerConfig()
if err != nil {
return false, err
}
if cfg != nil && !cfg.RejectJobRegistration {
return true, nil
}
if aclObj != nil && aclObj.IsManagement() {
return true, nil
}
return false, nil
}

// List is used to list the jobs registered in the system
func (j *Job) List(args *structs.JobListRequest, reply *structs.JobListResponse) error {
if done, err := j.srv.forward("Job.List", args, args, reply); done {
Expand Down Expand Up @@ -1852,12 +1880,19 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
defer metrics.MeasureSince([]string{"nomad", "job", "dispatch"}, time.Now())

// Check for submit-job permissions
if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil {
var aclObj *acl.ACL
var err error
if aclObj, err = j.srv.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityDispatchJob) {
return structs.ErrPermissionDenied
}

if ok, err := allowedRegistration(aclObj, j.srv.State()); !ok || err != nil {
j.logger.Warn("job dispatch for non-management ACL rejected")
return structs.ErrJobRegistrationDisabled
}

// Lookup the parameterized job
if args.JobID == "" {
return fmt.Errorf("missing parameterized job ID")
Expand Down
Loading

0 comments on commit bce7d6c

Please sign in to comment.