From 44c4ae7bdd9ea92826c0401c4bbd2f1b3050bdaa Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Thu, 2 Dec 2021 15:01:32 -0500 Subject: [PATCH] scheduler: config option to reject job registration During incident response, operators may find that automated processes elsewhere in the organization can be generating new workloads on Nomad clusters that are unable to handle the workload. This changeset adds a field to the `SchedulerConfiguration` API that causes all job registration calls to be rejected unless the request has a management ACL token. --- .changelog/11610.txt | 3 + api/operator.go | 4 + command/agent/http.go | 6 + command/agent/operator_endpoint.go | 1 + nomad/job_endpoint.go | 39 ++- nomad/job_endpoint_test.go | 274 ++++++++++++++++++ nomad/structs/errors.go | 2 + nomad/structs/operator.go | 5 + .../content/api-docs/operator/scheduler.mdx | 4 + website/content/docs/configuration/server.mdx | 2 + 10 files changed, 338 insertions(+), 2 deletions(-) create mode 100644 .changelog/11610.txt diff --git a/.changelog/11610.txt b/.changelog/11610.txt new file mode 100644 index 000000000000..1cf42c7212a3 --- /dev/null +++ b/.changelog/11610.txt @@ -0,0 +1,3 @@ +```release-note:improvement +scheduler: Added a `RejectJobRegistration` field to the scheduler configuration API that enabled a setting to reject job register, dispatch, and scale requests without a management ACL token +``` diff --git a/api/operator.go b/api/operator.go index 7adf79f1cbf2..439fabe052e2 100644 --- a/api/operator.go +++ b/api/operator.go @@ -129,6 +129,10 @@ type SchedulerConfiguration struct { // MemoryOversubscriptionEnabled specifies whether memory oversubscription is enabled MemoryOversubscriptionEnabled bool + // RejectJobRegistration disables new job registrations except with a + // management ACL token + RejectJobRegistration bool + // CreateIndex/ModifyIndex store the create/modify indexes of this configuration. CreateIndex uint64 ModifyIndex uint64 diff --git a/command/agent/http.go b/command/agent/http.go index 6371873d0e60..f56215f72303 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -451,6 +451,9 @@ func errCodeFromHandler(err error) (int, string) { } else if strings.HasSuffix(errMsg, structs.ErrTokenNotFound.Error()) { errMsg = structs.ErrTokenNotFound.Error() code = 403 + } else if strings.HasSuffix(errMsg, structs.ErrJobRegistrationDisabled.Error()) { + errMsg = structs.ErrJobRegistrationDisabled.Error() + code = 403 } } @@ -487,6 +490,9 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque } else if strings.HasSuffix(errMsg, structs.ErrTokenNotFound.Error()) { errMsg = structs.ErrTokenNotFound.Error() code = 403 + } else if strings.HasSuffix(errMsg, structs.ErrJobRegistrationDisabled.Error()) { + errMsg = structs.ErrJobRegistrationDisabled.Error() + code = 403 } } diff --git a/command/agent/operator_endpoint.go b/command/agent/operator_endpoint.go index 78da1fd2c220..a0f7575f4c6d 100644 --- a/command/agent/operator_endpoint.go +++ b/command/agent/operator_endpoint.go @@ -261,6 +261,7 @@ func (s *HTTPServer) schedulerUpdateConfig(resp http.ResponseWriter, req *http.R args.Config = structs.SchedulerConfiguration{ SchedulerAlgorithm: structs.SchedulerAlgorithm(conf.SchedulerAlgorithm), MemoryOversubscriptionEnabled: conf.MemoryOversubscriptionEnabled, + RejectJobRegistration: conf.RejectJobRegistration, PreemptionConfig: structs.PreemptionConfig{ SystemSchedulerEnabled: conf.PreemptionConfig.SystemSchedulerEnabled, SysBatchSchedulerEnabled: conf.PreemptionConfig.SysBatchSchedulerEnabled, diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 59b5dd106f8b..e584b4f5a6d7 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -114,7 +114,8 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis reply.Warnings = structs.MergeMultierrorWarnings(warnings...) // Check job submission permissions - if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil { + var aclObj *acl.ACL + if aclObj, err = j.srv.ResolveToken(args.AuthToken); err != nil { return err } else if aclObj != nil { if !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilitySubmitJob) { @@ -175,11 +176,17 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis } } + if ok, err := allowedRegistration(aclObj, j.srv.State()); !ok || err != nil { + j.logger.Warn("job registration for non-management ACL rejected") + return structs.ErrJobRegistrationDisabled + } + // Lookup the job snap, err := j.srv.State().Snapshot() if err != nil { return err } + ws := memdb.NewWatchSet() existingJob, err := snap.JobByID(ws, args.RequestNamespace(), args.Job.ID) if err != nil { @@ -1022,6 +1029,11 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes } } + if ok, err := allowedRegistration(aclObj, j.srv.State()); !ok || err != nil { + j.logger.Warn("job scaling for non-management ACL rejected") + return structs.ErrJobRegistrationDisabled + } + // Validate args err = args.Validate() if err != nil { @@ -1304,6 +1316,22 @@ func allowedNSes(aclObj *acl.ACL, state *state.StateStore, allow func(ns string) return r, nil } +// allowedRegistration checks that the scheduler is not in +// RejectJobRegistration mode for load-shedding. +func allowedRegistration(aclObj *acl.ACL, state *state.StateStore) (bool, error) { + _, cfg, err := state.SchedulerConfig() + if err != nil { + return false, err + } + if cfg != nil && !cfg.RejectJobRegistration { + return true, nil + } + if aclObj != nil && aclObj.IsManagement() { + return true, nil + } + return false, nil +} + // List is used to list the jobs registered in the system func (j *Job) List(args *structs.JobListRequest, reply *structs.JobListResponse) error { if done, err := j.srv.forward("Job.List", args, args, reply); done { @@ -1852,12 +1880,19 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa defer metrics.MeasureSince([]string{"nomad", "job", "dispatch"}, time.Now()) // Check for submit-job permissions - if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil { + var aclObj *acl.ACL + var err error + if aclObj, err = j.srv.ResolveToken(args.AuthToken); err != nil { return err } else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityDispatchJob) { return structs.ErrPermissionDenied } + if ok, err := allowedRegistration(aclObj, j.srv.State()); !ok || err != nil { + j.logger.Warn("job dispatch for non-management ACL rejected") + return structs.ErrJobRegistrationDisabled + } + // Lookup the parameterized job if args.JobID == "" { return fmt.Errorf("missing parameterized job ID") diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index a3e12332f311..9fce33ca7e2a 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -2248,6 +2248,100 @@ func TestJobEndpoint_Register_ACL_Namespace(t *testing.T) { assert.NotNil(out, "expected job") } +func TestJobRegister_ACL_RejectedBySchedulerConfig(t *testing.T) { + t.Parallel() + s1, root, cleanupS1 := TestACLServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + submitJobToken := mock.CreatePolicyAndToken(t, s1.State(), 1001, "test-valid-write", + mock.NamespacePolicy(structs.DefaultNamespace, "write", nil)). + SecretID + + cases := []struct { + name string + token string + rejectEnabled bool + errExpected string + }{ + { + name: "reject disabled, with a submit token", + token: submitJobToken, + rejectEnabled: false, + }, + { + name: "reject enabled, with a submit token", + token: submitJobToken, + rejectEnabled: true, + errExpected: structs.ErrJobRegistrationDisabled.Error(), + }, + { + name: "reject enabled, without a token", + token: "", + rejectEnabled: true, + errExpected: structs.ErrPermissionDenied.Error(), + }, + { + name: "reject enabled, with a management token", + token: root.SecretID, + rejectEnabled: true, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + job := mock.Job() + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + req.AuthToken = tc.token + + cfgReq := &structs.SchedulerSetConfigRequest{ + Config: structs.SchedulerConfiguration{ + RejectJobRegistration: tc.rejectEnabled, + }, + WriteRequest: structs.WriteRequest{ + Region: "global", + }, + } + cfgReq.AuthToken = root.SecretID + err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerSetConfiguration", + cfgReq, &structs.SchedulerSetConfigurationResponse{}, + ) + require.NoError(t, err) + + var resp structs.JobRegisterResponse + err = msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) + + if tc.errExpected != "" { + require.Error(t, err, "expected error") + require.EqualError(t, err, tc.errExpected) + state := s1.fsm.State() + ws := memdb.NewWatchSet() + out, err := state.JobByID(ws, job.Namespace, job.ID) + require.NoError(t, err) + require.Nil(t, out) + } else { + require.NoError(t, err, "unexpected error") + require.NotEqual(t, 0, resp.Index) + state := s1.fsm.State() + ws := memdb.NewWatchSet() + out, err := state.JobByID(ws, job.Namespace, job.ID) + require.NoError(t, err) + require.NotNil(t, out) + require.Equal(t, job.TaskGroups, out.TaskGroups) + } + }) + } +} + func TestJobEndpoint_Revert(t *testing.T) { t.Parallel() @@ -6646,6 +6740,95 @@ func TestJobEndpoint_Dispatch_JobChildrenSummary(t *testing.T) { require.Equal(t, structs.JobStatusDead, dispatchedStatus()) } +func TestJobEndpoint_Dispatch_ACL_RejectedBySchedulerConfig(t *testing.T) { + t.Parallel() + s1, root, cleanupS1 := TestACLServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + state := s1.fsm.State() + + job := mock.BatchJob() + job.ParameterizedJob = &structs.ParameterizedJobConfig{} + + err := state.UpsertJob(structs.MsgTypeTestSetup, 1000, job) + require.NoError(t, err) + + dispatch := &structs.JobDispatchRequest{ + JobID: job.ID, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + + submitJobToken := mock.CreatePolicyAndToken(t, state, 1001, "test-valid-write", + mock.NamespacePolicy(structs.DefaultNamespace, "write", nil)). + SecretID + + cases := []struct { + name string + token string + rejectEnabled bool + errExpected string + }{ + { + name: "reject disabled, with a submit token", + token: submitJobToken, + rejectEnabled: false, + }, + { + name: "reject enabled, with a submit token", + token: submitJobToken, + rejectEnabled: true, + errExpected: structs.ErrJobRegistrationDisabled.Error(), + }, + { + name: "reject enabled, without a token", + token: "", + rejectEnabled: true, + errExpected: structs.ErrPermissionDenied.Error(), + }, + { + name: "reject enabled, with a management token", + token: root.SecretID, + rejectEnabled: true, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + + cfgReq := &structs.SchedulerSetConfigRequest{ + Config: structs.SchedulerConfiguration{ + RejectJobRegistration: tc.rejectEnabled, + }, + WriteRequest: structs.WriteRequest{ + Region: "global", + }, + } + cfgReq.AuthToken = root.SecretID + err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerSetConfiguration", + cfgReq, &structs.SchedulerSetConfigurationResponse{}, + ) + require.NoError(t, err) + + dispatch.AuthToken = tc.token + var resp structs.JobDispatchResponse + err = msgpackrpc.CallWithCodec(codec, "Job.Dispatch", dispatch, &resp) + + if tc.errExpected != "" { + require.Error(t, err, "expected error") + require.EqualError(t, err, tc.errExpected) + } else { + require.NoError(t, err, "unexpected error") + require.NotEqual(t, 0, resp.Index) + } + }) + } + +} + func TestJobEndpoint_Scale(t *testing.T) { t.Parallel() require := require.New(t) @@ -6934,6 +7117,97 @@ func TestJobEndpoint_Scale_ACL(t *testing.T) { } +func TestJobEndpoint_Scale_ACL_RejectedBySchedulerConfig(t *testing.T) { + t.Parallel() + s1, root, cleanupS1 := TestACLServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + state := s1.fsm.State() + + job := mock.Job() + err := state.UpsertJob(structs.MsgTypeTestSetup, 1000, job) + require.NoError(t, err) + + scale := &structs.JobScaleRequest{ + JobID: job.ID, + Target: map[string]string{ + structs.ScalingTargetGroup: job.TaskGroups[0].Name, + }, + Message: "because of the load", + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + + submitJobToken := mock.CreatePolicyAndToken(t, state, 1001, "test-valid-write", + mock.NamespacePolicy(structs.DefaultNamespace, "write", nil)). + SecretID + + cases := []struct { + name string + token string + rejectEnabled bool + errExpected string + }{ + { + name: "reject disabled, with a submit token", + token: submitJobToken, + rejectEnabled: false, + }, + { + name: "reject enabled, with a submit token", + token: submitJobToken, + rejectEnabled: true, + errExpected: structs.ErrJobRegistrationDisabled.Error(), + }, + { + name: "reject enabled, without a token", + token: "", + rejectEnabled: true, + errExpected: structs.ErrPermissionDenied.Error(), + }, + { + name: "reject enabled, with a management token", + token: root.SecretID, + rejectEnabled: true, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + + cfgReq := &structs.SchedulerSetConfigRequest{ + Config: structs.SchedulerConfiguration{ + RejectJobRegistration: tc.rejectEnabled, + }, + WriteRequest: structs.WriteRequest{ + Region: "global", + }, + } + cfgReq.AuthToken = root.SecretID + err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerSetConfiguration", + cfgReq, &structs.SchedulerSetConfigurationResponse{}, + ) + require.NoError(t, err) + + var resp structs.JobRegisterResponse + scale.AuthToken = tc.token + err = msgpackrpc.CallWithCodec(codec, "Job.Scale", scale, &resp) + + if tc.errExpected != "" { + require.Error(t, err, "expected error") + require.EqualError(t, err, tc.errExpected) + } else { + require.NoError(t, err, "unexpected error") + require.NotEqual(t, 0, resp.Index) + } + }) + } + +} + func TestJobEndpoint_Scale_Invalid(t *testing.T) { t.Parallel() require := require.New(t) diff --git a/nomad/structs/errors.go b/nomad/structs/errors.go index 7a71708f909b..10f8fac1cd22 100644 --- a/nomad/structs/errors.go +++ b/nomad/structs/errors.go @@ -13,6 +13,7 @@ const ( errNoRegionPath = "No path to region" errTokenNotFound = "ACL token not found" errPermissionDenied = "Permission denied" + errJobRegistrationDisabled = "Job registration, dispatch, and scale are disabled by the scheduler configuration" errNoNodeConn = "No path to node" errUnknownMethod = "Unknown rpc method" errUnknownNomadVersion = "Unable to determine Nomad version" @@ -46,6 +47,7 @@ var ( ErrNoRegionPath = errors.New(errNoRegionPath) ErrTokenNotFound = errors.New(errTokenNotFound) ErrPermissionDenied = errors.New(errPermissionDenied) + ErrJobRegistrationDisabled = errors.New(errJobRegistrationDisabled) ErrNoNodeConn = errors.New(errNoNodeConn) ErrUnknownMethod = errors.New(errUnknownMethod) ErrUnknownNomadVersion = errors.New(errUnknownNomadVersion) diff --git a/nomad/structs/operator.go b/nomad/structs/operator.go index d39b1a31ec3f..633afa6c33eb 100644 --- a/nomad/structs/operator.go +++ b/nomad/structs/operator.go @@ -149,8 +149,13 @@ type SchedulerConfiguration struct { // priority jobs to place higher priority jobs. PreemptionConfig PreemptionConfig `hcl:"preemption_config"` + // MemoryOversubscriptionEnabled specifies whether memory oversubscription is enabled MemoryOversubscriptionEnabled bool `hcl:"memory_oversubscription_enabled"` + // RejectJobRegistration disables new job registrations except with a + // management ACL token + RejectJobRegistration bool `hcl:"reject_job_registration"` + // CreateIndex/ModifyIndex store the create/modify indexes of this configuration. CreateIndex uint64 ModifyIndex uint64 diff --git a/website/content/api-docs/operator/scheduler.mdx b/website/content/api-docs/operator/scheduler.mdx index cb44b2d0f3d2..899bcadc004f 100644 --- a/website/content/api-docs/operator/scheduler.mdx +++ b/website/content/api-docs/operator/scheduler.mdx @@ -45,6 +45,7 @@ $ curl \ "ModifyIndex": 5, "SchedulerAlgorithm": "spread", "MemoryOversubscriptionEnabled": true, + "RejectJobRegistration": false, "PreemptionConfig": { "SystemSchedulerEnabled": true, "BatchSchedulerEnabled": false, @@ -114,6 +115,7 @@ server state is authoritative. { "SchedulerAlgorithm": "spread", "MemoryOversubscriptionEnabled": false, + "RejectJobRegistration": false, "PreemptionConfig": { "SystemSchedulerEnabled": true, "BatchSchedulerEnabled": false, @@ -128,6 +130,8 @@ server state is authoritative. - `MemoryOversubscriptionEnabled` `(bool: false)` 1.1 Beta - When `true`, tasks may exceed their reserved memory limit, if the client has excess memory capacity. Tasks must specify [`memory_max`](/docs/job-specification/resources#memory_max) to take advantage of memory oversubscription. +- `RejectJobRegistration` `(bool: false)` - When `true`, the server will return permission denied errors for job registration, job dispatch, and job scale APIs, unless the ACL token for the request is a management token. If ACLs are disabled, no user will be able to register jobs. This allows operators to shed load from automated proceses during incident response. + - `PreemptionConfig` `(PreemptionConfig)` - Options to enable preemption for various schedulers. diff --git a/website/content/docs/configuration/server.mdx b/website/content/docs/configuration/server.mdx index 8e6eeaa43c57..dc7163b7c261 100644 --- a/website/content/docs/configuration/server.mdx +++ b/website/content/docs/configuration/server.mdx @@ -312,6 +312,8 @@ server { memory_oversubscription_enabled = true + reject_job_registration = false + preemption_config { batch_scheduler_enabled = true system_scheduler_enabled = true