diff --git a/api/operator.go b/api/operator.go index 7adf79f1cbf2..439fabe052e2 100644 --- a/api/operator.go +++ b/api/operator.go @@ -129,6 +129,10 @@ type SchedulerConfiguration struct { // MemoryOversubscriptionEnabled specifies whether memory oversubscription is enabled MemoryOversubscriptionEnabled bool + // RejectJobRegistration disables new job registrations except with a + // management ACL token + RejectJobRegistration bool + // CreateIndex/ModifyIndex store the create/modify indexes of this configuration. CreateIndex uint64 ModifyIndex uint64 diff --git a/command/agent/operator_endpoint.go b/command/agent/operator_endpoint.go index 78da1fd2c220..a0f7575f4c6d 100644 --- a/command/agent/operator_endpoint.go +++ b/command/agent/operator_endpoint.go @@ -261,6 +261,7 @@ func (s *HTTPServer) schedulerUpdateConfig(resp http.ResponseWriter, req *http.R args.Config = structs.SchedulerConfiguration{ SchedulerAlgorithm: structs.SchedulerAlgorithm(conf.SchedulerAlgorithm), MemoryOversubscriptionEnabled: conf.MemoryOversubscriptionEnabled, + RejectJobRegistration: conf.RejectJobRegistration, PreemptionConfig: structs.PreemptionConfig{ SystemSchedulerEnabled: conf.PreemptionConfig.SystemSchedulerEnabled, SysBatchSchedulerEnabled: conf.PreemptionConfig.SysBatchSchedulerEnabled, diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 59b5dd106f8b..ded8093a7fb3 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -114,7 +114,8 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis reply.Warnings = structs.MergeMultierrorWarnings(warnings...) 
// Check job submission permissions - if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil { + var aclObj *acl.ACL + if aclObj, err = j.srv.ResolveToken(args.AuthToken); err != nil { return err } else if aclObj != nil { if !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilitySubmitJob) { @@ -175,11 +176,17 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis } } + if ok, err := allowedRegistration(aclObj, j.srv.State()); !ok || err != nil { + j.logger.Warn("job registration for non-management ACL rejected") + return structs.ErrPermissionDenied + } + // Lookup the job snap, err := j.srv.State().Snapshot() if err != nil { return err } + ws := memdb.NewWatchSet() existingJob, err := snap.JobByID(ws, args.RequestNamespace(), args.Job.ID) if err != nil { @@ -1022,6 +1029,11 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes } } + if ok, err := allowedRegistration(aclObj, j.srv.State()); !ok || err != nil { + j.logger.Warn("job scaling for non-management ACL rejected") + return structs.ErrPermissionDenied + } + // Validate args err = args.Validate() if err != nil { @@ -1304,6 +1316,22 @@ func allowedNSes(aclObj *acl.ACL, state *state.StateStore, allow func(ns string) return r, nil } +// allowedRegistration reports whether a job registration may proceed: always when the stored scheduler configuration is absent or leaves RejectJobRegistration unset, +// and otherwise only for a management ACL token; a failed configuration lookup yields (false, err).
+func allowedRegistration(aclObj *acl.ACL, state *state.StateStore) (bool, error) { + _, cfg, err := state.SchedulerConfig() + if err != nil { + return false, err + } + if cfg == nil || !cfg.RejectJobRegistration { + return true, nil + } + if aclObj != nil && aclObj.IsManagement() { + return true, nil + } + return false, nil +} + // List is used to list the jobs registered in the system func (j *Job) List(args *structs.JobListRequest, reply *structs.JobListResponse) error { if done, err := j.srv.forward("Job.List", args, args, reply); done { @@ -1852,12 +1880,19 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa defer metrics.MeasureSince([]string{"nomad", "job", "dispatch"}, time.Now()) // Check for submit-job permissions - if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil { + var aclObj *acl.ACL + var err error + if aclObj, err = j.srv.ResolveToken(args.AuthToken); err != nil { return err } else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityDispatchJob) { return structs.ErrPermissionDenied } + if ok, err := allowedRegistration(aclObj, j.srv.State()); !ok || err != nil { + j.logger.Warn("job dispatch for non-management ACL rejected") + return structs.ErrPermissionDenied + } + // Lookup the parameterized job if args.JobID == "" { return fmt.Errorf("missing parameterized job ID") @@ -1867,6 +1902,7 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa if err != nil { return err } + ws := memdb.NewWatchSet() parameterizedJob, err := snap.JobByID(ws, args.RequestNamespace(), args.JobID) if err != nil { diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index a3e12332f311..8bf013a0c2b2 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -2248,6 +2248,106 @@ func TestJobEndpoint_Register_ACL_Namespace(t *testing.T) { assert.NotNil(out, "expected job") } +func TestJobRegister_ACL_RejectedBySchedulerConfig(t 
*testing.T) { + + t.Parallel() + + s1, root, cleanupS1 := TestACLServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer cleanupS1() + testutil.WaitForLeader(t, s1.RPC) + + submitJobPolicy := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob, acl.NamespaceCapabilitySubmitJob}) + + submitJobToken := mock.CreatePolicyAndToken(t, s1.State(), 1001, "test-submit-job", submitJobPolicy) + + cases := []struct { + Name string + Token string + RejectEnabled bool + ErrExpected bool + }{ + { + Name: "reject disabled, with a submit token", + Token: submitJobToken.SecretID, + RejectEnabled: false, + ErrExpected: false, + }, + { + Name: "reject enabled, with a submit token", + Token: submitJobToken.SecretID, + RejectEnabled: true, + ErrExpected: true, + }, + { + Name: "reject enabled, without a token", + Token: "", + RejectEnabled: true, + ErrExpected: true, + }, + { + Name: "reject enabled, with a management token", + Token: root.SecretID, + RejectEnabled: true, + ErrExpected: false, + }, + } + + for _, tc := range cases { + t.Run(tc.Name, func(t *testing.T) { + job := mock.Job() + codec := rpcClient(t, s1) + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + req.AuthToken = tc.Token + + cfgReq := &structs.SchedulerSetConfigRequest{ + Config: structs.SchedulerConfiguration{ + RejectJobRegistration: tc.RejectEnabled, + }, + WriteRequest: structs.WriteRequest{ + Region: "global", + }, + } + cfgReq.AuthToken = root.SecretID + err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerSetConfiguration", + cfgReq, &structs.SchedulerSetConfigurationResponse{}, + ) + require.NoError(t, err) + + var resp structs.JobRegisterResponse + err = msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) + + if tc.ErrExpected { + require.Error(t, err, "expected error") + require.EqualError(t, err, 
structs.ErrPermissionDenied.Error()) + // Match on the message text. NOTE(review): the errors.Is / require.ErrorIs variants were left + // disabled here, presumably because the error arrives via RPC without its sentinel identity; confirm. + state := s1.fsm.State() + ws := memdb.NewWatchSet() + out, err := state.JobByID(ws, job.Namespace, job.ID) + require.NoError(t, err) + require.Nil(t, out) + } else { + require.NoError(t, err, "unexpected error") + require.NotZero(t, resp.Index) + state := s1.fsm.State() + ws := memdb.NewWatchSet() + out, err := state.JobByID(ws, job.Namespace, job.ID) + require.NoError(t, err) + require.NotNil(t, out) + require.Equal(t, job.TaskGroups, out.TaskGroups) + } + }) + } +} + func TestJobEndpoint_Revert(t *testing.T) { t.Parallel() diff --git a/nomad/structs/operator.go b/nomad/structs/operator.go index d39b1a31ec3f..633afa6c33eb 100644 --- a/nomad/structs/operator.go +++ b/nomad/structs/operator.go @@ -149,8 +149,13 @@ type SchedulerConfiguration struct { // priority jobs to place higher priority jobs. PreemptionConfig PreemptionConfig `hcl:"preemption_config"` + // MemoryOversubscriptionEnabled specifies whether memory oversubscription is enabled MemoryOversubscriptionEnabled bool `hcl:"memory_oversubscription_enabled"` + // RejectJobRegistration disables new job registrations except with a + // management ACL token + RejectJobRegistration bool `hcl:"reject_job_registration"` + // CreateIndex/ModifyIndex store the create/modify indexes of this configuration. CreateIndex uint64 ModifyIndex uint64