diff --git a/.changelog/11610.txt b/.changelog/11610.txt new file mode 100644 index 000000000000..1cf42c7212a3 --- /dev/null +++ b/.changelog/11610.txt @@ -0,0 +1,3 @@ +```release-note:improvement +scheduler: Added a `RejectJobRegistration` field to the scheduler configuration API that enabled a setting to reject job register, dispatch, and scale requests without a management ACL token +``` diff --git a/api/operator.go b/api/operator.go index 7adf79f1cbf2..439fabe052e2 100644 --- a/api/operator.go +++ b/api/operator.go @@ -129,6 +129,10 @@ type SchedulerConfiguration struct { // MemoryOversubscriptionEnabled specifies whether memory oversubscription is enabled MemoryOversubscriptionEnabled bool + // RejectJobRegistration disables new job registrations except with a + // management ACL token + RejectJobRegistration bool + // CreateIndex/ModifyIndex store the create/modify indexes of this configuration. CreateIndex uint64 ModifyIndex uint64 diff --git a/command/agent/http.go b/command/agent/http.go index 6371873d0e60..f56215f72303 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -451,6 +451,9 @@ func errCodeFromHandler(err error) (int, string) { } else if strings.HasSuffix(errMsg, structs.ErrTokenNotFound.Error()) { errMsg = structs.ErrTokenNotFound.Error() code = 403 + } else if strings.HasSuffix(errMsg, structs.ErrJobRegistrationDisabled.Error()) { + errMsg = structs.ErrJobRegistrationDisabled.Error() + code = 403 } } @@ -487,6 +490,9 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque } else if strings.HasSuffix(errMsg, structs.ErrTokenNotFound.Error()) { errMsg = structs.ErrTokenNotFound.Error() code = 403 + } else if strings.HasSuffix(errMsg, structs.ErrJobRegistrationDisabled.Error()) { + errMsg = structs.ErrJobRegistrationDisabled.Error() + code = 403 } } diff --git a/command/agent/operator_endpoint.go b/command/agent/operator_endpoint.go index 78da1fd2c220..a0f7575f4c6d 100644 --- a/command/agent/operator_endpoint.go +++ b/command/agent/operator_endpoint.go @@ -261,6 +261,7 @@ func (s *HTTPServer) schedulerUpdateConfig(resp http.ResponseWriter, req *http.R args.Config = structs.SchedulerConfiguration{ SchedulerAlgorithm: structs.SchedulerAlgorithm(conf.SchedulerAlgorithm), MemoryOversubscriptionEnabled: conf.MemoryOversubscriptionEnabled, + RejectJobRegistration: conf.RejectJobRegistration, PreemptionConfig: structs.PreemptionConfig{ SystemSchedulerEnabled: conf.PreemptionConfig.SystemSchedulerEnabled, SysBatchSchedulerEnabled: conf.PreemptionConfig.SysBatchSchedulerEnabled, diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 59b5dd106f8b..8d9119800e7a 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -114,7 +114,8 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis reply.Warnings = structs.MergeMultierrorWarnings(warnings...) // Check job submission permissions - if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil { + var aclObj *acl.ACL + if aclObj, err = j.srv.ResolveToken(args.AuthToken); err != nil { return err } else if aclObj != nil { if !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilitySubmitJob) { @@ -175,6 +176,11 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis } } + if ok, err := registrationsAreAllowed(aclObj, j.srv.State()); !ok || err != nil { + j.logger.Warn("job registration is currently disabled for non-management ACL") + return structs.ErrJobRegistrationDisabled + } + // Lookup the job snap, err := j.srv.State().Snapshot() if err != nil { @@ -1022,6 +1028,11 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes } } + if ok, err := registrationsAreAllowed(aclObj, j.srv.State()); !ok || err != nil { + j.logger.Warn("job scaling is currently disabled for non-management ACL") + return structs.ErrJobRegistrationDisabled + } + // Validate args err = args.Validate() if err != nil { @@ -1304,6 +1315,22 @@ func allowedNSes(aclObj *acl.ACL, state *state.StateStore, allow func(ns string) return r, nil } +// registrationsAreAllowed checks that the scheduler is not in +// RejectJobRegistration mode for load-shedding. +func registrationsAreAllowed(aclObj *acl.ACL, state *state.StateStore) (bool, error) { + _, cfg, err := state.SchedulerConfig() + if err != nil { + return false, err + } + if cfg != nil && !cfg.RejectJobRegistration { + return true, nil + } + if aclObj != nil && aclObj.IsManagement() { + return true, nil + } + return false, nil +} + // List is used to list the jobs registered in the system func (j *Job) List(args *structs.JobListRequest, reply *structs.JobListResponse) error { if done, err := j.srv.forward("Job.List", args, args, reply); done { @@ -1852,12 +1879,19 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa defer metrics.MeasureSince([]string{"nomad", "job", "dispatch"}, time.Now()) // Check for submit-job permissions - if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil { + var aclObj *acl.ACL + var err error + if aclObj, err = j.srv.ResolveToken(args.AuthToken); err != nil { return err } else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityDispatchJob) { return structs.ErrPermissionDenied } + if ok, err := registrationsAreAllowed(aclObj, j.srv.State()); !ok || err != nil { + j.logger.Warn("job dispatch is currently disabled for non-management ACL") + return structs.ErrJobRegistrationDisabled + } + // Lookup the parameterized job if args.JobID == "" { return fmt.Errorf("missing parameterized job ID") diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index a3e12332f311..9fce33ca7e2a 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -2248,6 +2248,100 @@ func TestJobEndpoint_Register_ACL_Namespace(t *testing.T) { assert.NotNil(out, "expected job") } +func TestJobRegister_ACL_RejectedBySchedulerConfig(t *testing.T) { + t.Parallel() + s1, root, cleanupS1 := TestACLServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + submitJobToken := mock.CreatePolicyAndToken(t, s1.State(), 1001, "test-valid-write", + mock.NamespacePolicy(structs.DefaultNamespace, "write", nil)). + SecretID + + cases := []struct { + name string + token string + rejectEnabled bool + errExpected string + }{ + { + name: "reject disabled, with a submit token", + token: submitJobToken, + rejectEnabled: false, + }, + { + name: "reject enabled, with a submit token", + token: submitJobToken, + rejectEnabled: true, + errExpected: structs.ErrJobRegistrationDisabled.Error(), + }, + { + name: "reject enabled, without a token", + token: "", + rejectEnabled: true, + errExpected: structs.ErrPermissionDenied.Error(), + }, + { + name: "reject enabled, with a management token", + token: root.SecretID, + rejectEnabled: true, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + job := mock.Job() + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + req.AuthToken = tc.token + + cfgReq := &structs.SchedulerSetConfigRequest{ + Config: structs.SchedulerConfiguration{ + RejectJobRegistration: tc.rejectEnabled, + }, + WriteRequest: structs.WriteRequest{ + Region: "global", + }, + } + cfgReq.AuthToken = root.SecretID + err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerSetConfiguration", + cfgReq, &structs.SchedulerSetConfigurationResponse{}, + ) + require.NoError(t, err) + + var resp structs.JobRegisterResponse + err = msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) + + if tc.errExpected != "" { + require.Error(t, err, "expected error") + require.EqualError(t, err, tc.errExpected) + state := s1.fsm.State() + ws := memdb.NewWatchSet() + out, err := state.JobByID(ws, job.Namespace, job.ID) + require.NoError(t, err) + require.Nil(t, out) + } else { + require.NoError(t, err, "unexpected error") + require.NotEqual(t, 0, resp.Index) + state := s1.fsm.State() + ws := memdb.NewWatchSet() + out, err := state.JobByID(ws, job.Namespace, job.ID) + require.NoError(t, err) + require.NotNil(t, out) + require.Equal(t, job.TaskGroups, out.TaskGroups) + } + }) + } +} + func TestJobEndpoint_Revert(t *testing.T) { t.Parallel() @@ -6646,6 +6740,95 @@ func TestJobEndpoint_Dispatch_JobChildrenSummary(t *testing.T) { require.Equal(t, structs.JobStatusDead, dispatchedStatus()) } +func TestJobEndpoint_Dispatch_ACL_RejectedBySchedulerConfig(t *testing.T) { + t.Parallel() + s1, root, cleanupS1 := TestACLServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + state := s1.fsm.State() + + job := mock.BatchJob() + job.ParameterizedJob = &structs.ParameterizedJobConfig{} + + err := state.UpsertJob(structs.MsgTypeTestSetup, 1000, job) + require.NoError(t, err) + + dispatch := &structs.JobDispatchRequest{ + JobID: job.ID, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + + submitJobToken := mock.CreatePolicyAndToken(t, state, 1001, "test-valid-write", + mock.NamespacePolicy(structs.DefaultNamespace, "write", nil)). + SecretID + + cases := []struct { + name string + token string + rejectEnabled bool + errExpected string + }{ + { + name: "reject disabled, with a submit token", + token: submitJobToken, + rejectEnabled: false, + }, + { + name: "reject enabled, with a submit token", + token: submitJobToken, + rejectEnabled: true, + errExpected: structs.ErrJobRegistrationDisabled.Error(), + }, + { + name: "reject enabled, without a token", + token: "", + rejectEnabled: true, + errExpected: structs.ErrPermissionDenied.Error(), + }, + { + name: "reject enabled, with a management token", + token: root.SecretID, + rejectEnabled: true, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + + cfgReq := &structs.SchedulerSetConfigRequest{ + Config: structs.SchedulerConfiguration{ + RejectJobRegistration: tc.rejectEnabled, + }, + WriteRequest: structs.WriteRequest{ + Region: "global", + }, + } + cfgReq.AuthToken = root.SecretID + err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerSetConfiguration", + cfgReq, &structs.SchedulerSetConfigurationResponse{}, + ) + require.NoError(t, err) + + dispatch.AuthToken = tc.token + var resp structs.JobDispatchResponse + err = msgpackrpc.CallWithCodec(codec, "Job.Dispatch", dispatch, &resp) + + if tc.errExpected != "" { + require.Error(t, err, "expected error") + require.EqualError(t, err, tc.errExpected) + } else { + require.NoError(t, err, "unexpected error") + require.NotEqual(t, 0, resp.Index) + } + }) + } + +} + func TestJobEndpoint_Scale(t *testing.T) { t.Parallel() require := require.New(t) @@ -6934,6 +7117,97 @@ func TestJobEndpoint_Scale_ACL(t *testing.T) { } +func TestJobEndpoint_Scale_ACL_RejectedBySchedulerConfig(t *testing.T) { + t.Parallel() + s1, root, cleanupS1 := TestACLServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + state := s1.fsm.State() + + job := mock.Job() + err := state.UpsertJob(structs.MsgTypeTestSetup, 1000, job) + require.NoError(t, err) + + scale := &structs.JobScaleRequest{ + JobID: job.ID, + Target: map[string]string{ + structs.ScalingTargetGroup: job.TaskGroups[0].Name, + }, + Message: "because of the load", + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + + submitJobToken := mock.CreatePolicyAndToken(t, state, 1001, "test-valid-write", + mock.NamespacePolicy(structs.DefaultNamespace, "write", nil)). + SecretID + + cases := []struct { + name string + token string + rejectEnabled bool + errExpected string + }{ + { + name: "reject disabled, with a submit token", + token: submitJobToken, + rejectEnabled: false, + }, + { + name: "reject enabled, with a submit token", + token: submitJobToken, + rejectEnabled: true, + errExpected: structs.ErrJobRegistrationDisabled.Error(), + }, + { + name: "reject enabled, without a token", + token: "", + rejectEnabled: true, + errExpected: structs.ErrPermissionDenied.Error(), + }, + { + name: "reject enabled, with a management token", + token: root.SecretID, + rejectEnabled: true, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + + cfgReq := &structs.SchedulerSetConfigRequest{ + Config: structs.SchedulerConfiguration{ + RejectJobRegistration: tc.rejectEnabled, + }, + WriteRequest: structs.WriteRequest{ + Region: "global", + }, + } + cfgReq.AuthToken = root.SecretID + err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerSetConfiguration", + cfgReq, &structs.SchedulerSetConfigurationResponse{}, + ) + require.NoError(t, err) + + var resp structs.JobRegisterResponse + scale.AuthToken = tc.token + err = msgpackrpc.CallWithCodec(codec, "Job.Scale", scale, &resp) + + if tc.errExpected != "" { + require.Error(t, err, "expected error") + require.EqualError(t, err, tc.errExpected) + } else { + require.NoError(t, err, "unexpected error") + require.NotEqual(t, 0, resp.Index) + } + }) + } + +} + func TestJobEndpoint_Scale_Invalid(t *testing.T) { t.Parallel() require := require.New(t) diff --git a/nomad/structs/errors.go b/nomad/structs/errors.go index 7a71708f909b..10f8fac1cd22 100644 --- a/nomad/structs/errors.go +++ b/nomad/structs/errors.go @@ -13,6 +13,7 @@ const ( errNoRegionPath = "No path to region" errTokenNotFound = "ACL token not found" errPermissionDenied = "Permission denied" + errJobRegistrationDisabled = "Job registration, dispatch, and scale are disabled by the scheduler configuration" errNoNodeConn = "No path to node" errUnknownMethod = "Unknown rpc method" errUnknownNomadVersion = "Unable to determine Nomad version" @@ -46,6 +47,7 @@ var ( ErrNoRegionPath = errors.New(errNoRegionPath) ErrTokenNotFound = errors.New(errTokenNotFound) ErrPermissionDenied = errors.New(errPermissionDenied) + ErrJobRegistrationDisabled = errors.New(errJobRegistrationDisabled) ErrNoNodeConn = errors.New(errNoNodeConn) ErrUnknownMethod = errors.New(errUnknownMethod) ErrUnknownNomadVersion = errors.New(errUnknownNomadVersion) diff --git a/nomad/structs/operator.go b/nomad/structs/operator.go index d39b1a31ec3f..633afa6c33eb 100644 --- a/nomad/structs/operator.go +++ b/nomad/structs/operator.go @@ -149,8 +149,13 @@ type SchedulerConfiguration struct { // priority jobs to place higher priority jobs. PreemptionConfig PreemptionConfig `hcl:"preemption_config"` + // MemoryOversubscriptionEnabled specifies whether memory oversubscription is enabled MemoryOversubscriptionEnabled bool `hcl:"memory_oversubscription_enabled"` + // RejectJobRegistration disables new job registrations except with a + // management ACL token + RejectJobRegistration bool `hcl:"reject_job_registration"` + // CreateIndex/ModifyIndex store the create/modify indexes of this configuration. CreateIndex uint64 ModifyIndex uint64 diff --git a/website/content/api-docs/operator/scheduler.mdx b/website/content/api-docs/operator/scheduler.mdx index cb44b2d0f3d2..899bcadc004f 100644 --- a/website/content/api-docs/operator/scheduler.mdx +++ b/website/content/api-docs/operator/scheduler.mdx @@ -45,6 +45,7 @@ $ curl \ "ModifyIndex": 5, "SchedulerAlgorithm": "spread", "MemoryOversubscriptionEnabled": true, + "RejectJobRegistration": false, "PreemptionConfig": { "SystemSchedulerEnabled": true, "BatchSchedulerEnabled": false, @@ -114,6 +115,7 @@ server state is authoritative. { "SchedulerAlgorithm": "spread", "MemoryOversubscriptionEnabled": false, + "RejectJobRegistration": false, "PreemptionConfig": { "SystemSchedulerEnabled": true, "BatchSchedulerEnabled": false, @@ -128,6 +130,8 @@ server state is authoritative. - `MemoryOversubscriptionEnabled` `(bool: false)` 1.1 Beta - When `true`, tasks may exceed their reserved memory limit, if the client has excess memory capacity. Tasks must specify [`memory_max`](/docs/job-specification/resources#memory_max) to take advantage of memory oversubscription. +- `RejectJobRegistration` `(bool: false)` - When `true`, the server will return permission denied errors for job registration, job dispatch, and job scale APIs, unless the ACL token for the request is a management token. If ACLs are disabled, no user will be able to register jobs. This allows operators to shed load from automated proceses during incident response. + - `PreemptionConfig` `(PreemptionConfig)` - Options to enable preemption for various schedulers. diff --git a/website/content/docs/configuration/server.mdx b/website/content/docs/configuration/server.mdx index 8e6eeaa43c57..dc7163b7c261 100644 --- a/website/content/docs/configuration/server.mdx +++ b/website/content/docs/configuration/server.mdx @@ -312,6 +312,8 @@ server { memory_oversubscription_enabled = true + reject_job_registration = false + preemption_config { batch_scheduler_enabled = true system_scheduler_enabled = true