From c1e9b5d30961c2212198a3e5cd1d8943b8d7c5fb Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Thu, 2 Dec 2021 15:01:32 -0500 Subject: [PATCH] scheduler: config option to reject job registration During incident response, operators may find that automated processes elsewhere in the organization can be generating new workloads on Nomad clusters that are unable to handle the workload. This changeset adds a field to the `SchedulerConfiguration` API that causes all job registration calls to be rejected unless the request has a management ACL token. --- api/operator.go | 4 + command/agent/operator_endpoint.go | 1 + nomad/job_endpoint.go | 39 +++++- nomad/job_endpoint_test.go | 189 +++++++++++++++++++++++++++++ nomad/structs/operator.go | 5 + 5 files changed, 236 insertions(+), 2 deletions(-) diff --git a/api/operator.go b/api/operator.go index 7adf79f1cbf2..439fabe052e2 100644 --- a/api/operator.go +++ b/api/operator.go @@ -129,6 +129,10 @@ type SchedulerConfiguration struct { // MemoryOversubscriptionEnabled specifies whether memory oversubscription is enabled MemoryOversubscriptionEnabled bool + // RejectJobRegistration disables new job registrations except with a + // management ACL token + RejectJobRegistration bool + // CreateIndex/ModifyIndex store the create/modify indexes of this configuration. CreateIndex uint64 ModifyIndex uint64 diff --git a/command/agent/operator_endpoint.go b/command/agent/operator_endpoint.go index 78da1fd2c220..a0f7575f4c6d 100644 --- a/command/agent/operator_endpoint.go +++ b/command/agent/operator_endpoint.go @@ -261,6 +261,7 @@ func (s *HTTPServer) schedulerUpdateConfig(resp http.ResponseWriter, req *http.R args.Config = structs.SchedulerConfiguration{ SchedulerAlgorithm: structs.SchedulerAlgorithm(conf.SchedulerAlgorithm), MemoryOversubscriptionEnabled: conf.MemoryOversubscriptionEnabled, + RejectJobRegistration: conf.RejectJobRegistration, PreemptionConfig: structs.PreemptionConfig{ SystemSchedulerEnabled: conf.PreemptionConfig.SystemSchedulerEnabled, SysBatchSchedulerEnabled: conf.PreemptionConfig.SysBatchSchedulerEnabled, diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 59b5dd106f8b..9613f136682a 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -114,7 +114,8 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis reply.Warnings = structs.MergeMultierrorWarnings(warnings...) // Check job submission permissions - if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil { + var aclObj *acl.ACL + if aclObj, err = j.srv.ResolveToken(args.AuthToken); err != nil { return err } else if aclObj != nil { if !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilitySubmitJob) { @@ -175,11 +176,17 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis } } + if ok, err := allowedRegistration(aclObj, j.srv.State()); !ok || err != nil { + j.logger.Warn("job registration for non-management ACL rejected") + return structs.ErrPermissionDenied + } + // Lookup the job snap, err := j.srv.State().Snapshot() if err != nil { return err } + ws := memdb.NewWatchSet() existingJob, err := snap.JobByID(ws, args.RequestNamespace(), args.Job.ID) if err != nil { @@ -1022,6 +1029,11 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes } } + if ok, err := allowedRegistration(aclObj, j.srv.State()); !ok || err != nil { + j.logger.Warn("job scaling for non-management ACL rejected") + return structs.ErrPermissionDenied + } + // Validate args err = args.Validate() if err != nil { @@ -1304,6 +1316,22 @@ func allowedNSes(aclObj *acl.ACL, state *state.StateStore, allow func(ns string) return r, nil } +// allowedRegistration checks that the scheduler is not in +// RejectJobRegistration mode for load-shedding. +func allowedRegistration(aclObj *acl.ACL, state *state.StateStore) (bool, error) { + _, cfg, err := state.SchedulerConfig() + if err != nil { + return false, err + } + if cfg != nil && !cfg.RejectJobRegistration { + return true, nil + } + if aclObj != nil && aclObj.IsManagement() { + return true, nil + } + return false, nil +} + // List is used to list the jobs registered in the system func (j *Job) List(args *structs.JobListRequest, reply *structs.JobListResponse) error { if done, err := j.srv.forward("Job.List", args, args, reply); done { @@ -1852,12 +1880,19 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa defer metrics.MeasureSince([]string{"nomad", "job", "dispatch"}, time.Now()) // Check for submit-job permissions - if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil { + var aclObj *acl.ACL + var err error + if aclObj, err = j.srv.ResolveToken(args.AuthToken); err != nil { return err } else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityDispatchJob) { return structs.ErrPermissionDenied } + if ok, err := allowedRegistration(aclObj, j.srv.State()); !ok || err != nil { + j.logger.Warn("job dispatch for non-management ACL rejected") + return structs.ErrPermissionDenied + } + // Lookup the parameterized job if args.JobID == "" { return fmt.Errorf("missing parameterized job ID") diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index a3e12332f311..3b4aa48fabed 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -2248,6 +2248,102 @@ func TestJobEndpoint_Register_ACL_Namespace(t *testing.T) { assert.NotNil(out, "expected job") } +func TestJobRegister_ACL_RejectedBySchedulerConfig(t *testing.T) { + t.Parallel() + s1, root, cleanupS1 := TestACLServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + submitJobToken := mock.CreatePolicyAndToken(t, s1.State(), 1001, "test-valid-write", + mock.NamespacePolicy(structs.DefaultNamespace, "write", nil)). + SecretID + + cases := []struct { + name string + token string + rejectEnabled bool + errExpected bool + }{ + { + name: "reject disabled, with a submit token", + token: submitJobToken, + rejectEnabled: false, + errExpected: false, + }, + { + name: "reject enabled, with a submit token", + token: submitJobToken, + rejectEnabled: true, + errExpected: true, + }, + { + name: "reject enabled, without a token", + token: "", + rejectEnabled: true, + errExpected: true, + }, + { + name: "reject enabled, with a management token", + token: root.SecretID, + rejectEnabled: true, + errExpected: false, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + job := mock.Job() + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + req.AuthToken = tc.token + + cfgReq := &structs.SchedulerSetConfigRequest{ + Config: structs.SchedulerConfiguration{ + RejectJobRegistration: tc.rejectEnabled, + }, + WriteRequest: structs.WriteRequest{ + Region: "global", + }, + } + cfgReq.AuthToken = root.SecretID + err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerSetConfiguration", + cfgReq, &structs.SchedulerSetConfigurationResponse{}, + ) + require.NoError(t, err) + + var resp structs.JobRegisterResponse + err = msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) + + if tc.errExpected { + require.Error(t, err, "expected error") + require.EqualError(t, err, structs.ErrPermissionDenied.Error()) + state := s1.fsm.State() + ws := memdb.NewWatchSet() + out, err := state.JobByID(ws, job.Namespace, job.ID) + require.NoError(t, err) + require.Nil(t, out) + } else { + require.NoError(t, err, "unexpected error") + require.NotEqual(t, 0, resp.Index) + state := s1.fsm.State() + ws := memdb.NewWatchSet() + out, err := state.JobByID(ws, job.Namespace, job.ID) + require.NoError(t, err) + require.NotNil(t, out) + require.Equal(t, job.TaskGroups, out.TaskGroups) + } + }) + } +} + func TestJobEndpoint_Revert(t *testing.T) { t.Parallel() @@ -6934,6 +7030,99 @@ func TestJobEndpoint_Scale_ACL(t *testing.T) { } +func TestJobEndpoint_Scale_ACL_RejectedBySchedulerConfig(t *testing.T) { + t.Parallel() + s1, root, cleanupS1 := TestACLServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + state := s1.fsm.State() + + job := mock.Job() + err := state.UpsertJob(structs.MsgTypeTestSetup, 1000, job) + require.NoError(t, err) + + scale := &structs.JobScaleRequest{ + JobID: job.ID, + Target: map[string]string{ + structs.ScalingTargetGroup: job.TaskGroups[0].Name, + }, + Message: "because of the load", + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + + submitJobToken := mock.CreatePolicyAndToken(t, state, 1001, "test-valid-write", + mock.NamespacePolicy(structs.DefaultNamespace, "write", nil)). + SecretID + + cases := []struct { + name string + token string + rejectEnabled bool + errExpected bool + }{ + { + name: "reject disabled, with a submit token", + token: submitJobToken, + rejectEnabled: false, + errExpected: false, + }, + { + name: "reject enabled, with a submit token", + token: submitJobToken, + rejectEnabled: true, + errExpected: true, + }, + { + name: "reject enabled, without a token", + token: "", + rejectEnabled: true, + errExpected: true, + }, + { + name: "reject enabled, with a management token", + token: root.SecretID, + rejectEnabled: true, + errExpected: false, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + + cfgReq := &structs.SchedulerSetConfigRequest{ + Config: structs.SchedulerConfiguration{ + RejectJobRegistration: tc.rejectEnabled, + }, + WriteRequest: structs.WriteRequest{ + Region: "global", + }, + } + cfgReq.AuthToken = root.SecretID + err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerSetConfiguration", + cfgReq, &structs.SchedulerSetConfigurationResponse{}, + ) + require.NoError(t, err) + + var resp structs.JobRegisterResponse + scale.AuthToken = tc.token + err = msgpackrpc.CallWithCodec(codec, "Job.Scale", scale, &resp) + + if tc.errExpected { + require.Error(t, err, "expected error") + require.EqualError(t, err, structs.ErrPermissionDenied.Error()) + } else { + require.NoError(t, err, "unexpected error") + require.NotEqual(t, 0, resp.Index) + } + }) + } + +} + func TestJobEndpoint_Scale_Invalid(t *testing.T) { t.Parallel() require := require.New(t) diff --git a/nomad/structs/operator.go b/nomad/structs/operator.go index d39b1a31ec3f..633afa6c33eb 100644 --- a/nomad/structs/operator.go +++ b/nomad/structs/operator.go @@ -149,8 +149,13 @@ type SchedulerConfiguration struct { // priority jobs to place higher priority jobs. PreemptionConfig PreemptionConfig `hcl:"preemption_config"` + // MemoryOversubscriptionEnabled specifies whether memory oversubscription is enabled MemoryOversubscriptionEnabled bool `hcl:"memory_oversubscription_enabled"` + // RejectJobRegistration disables new job registrations except with a + // management ACL token + RejectJobRegistration bool `hcl:"reject_job_registration"` + // CreateIndex/ModifyIndex store the create/modify indexes of this configuration. CreateIndex uint64 ModifyIndex uint64