Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: allow setting and propagation of eval priority on job de/registration #11532

Merged
merged 9 commits into from
Nov 23, 2021
3 changes: 3 additions & 0 deletions .changelog/11532.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
core: allow setting and propagation of eval priority on job de/registration
```
35 changes: 31 additions & 4 deletions api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ type RegisterOptions struct {
ModifyIndex uint64
PolicyOverride bool
PreserveCounts bool
EvalPriority int
}

// Register is used to register a new job. It returns the ID
Expand All @@ -105,8 +106,8 @@ func (j *Jobs) EnforceRegister(job *Job, modifyIndex uint64, q *WriteOptions) (*
return j.RegisterOpts(job, &opts, q)
}

// Register is used to register a new job. It returns the ID
// of the evaluation, along with any errors encountered.
// RegisterOpts is used to register a new job with the passed RegisterOpts. It
// returns the ID of the evaluation, along with any errors encountered.
func (j *Jobs) RegisterOpts(job *Job, opts *RegisterOptions, q *WriteOptions) (*JobRegisterResponse, *WriteMeta, error) {
// Format the request
req := &JobRegisterRequest{
Expand All @@ -119,6 +120,7 @@ func (j *Jobs) RegisterOpts(job *Job, opts *RegisterOptions, q *WriteOptions) (*
}
req.PolicyOverride = opts.PolicyOverride
req.PreserveCounts = opts.PreserveCounts
req.EvalPriority = opts.EvalPriority
}

var resp JobRegisterResponse
Expand Down Expand Up @@ -290,14 +292,31 @@ type DeregisterOptions struct {
// If Global is set to true, all regions of a multiregion job will be
// stopped.
Global bool

// EvalPriority is an optional priority to use on any evaluation created as
// a result on this job deregistration. This value must be between 1-100
// inclusively, where a larger value corresponds to a higher priority. This
// is useful when an operator wishes to push through a job deregistration
// in busy clusters with a large evaluation backlog.
EvalPriority int
}

// DeregisterOpts is used to remove an existing job. See DeregisterOptions
// for parameters.
func (j *Jobs) DeregisterOpts(jobID string, opts *DeregisterOptions, q *WriteOptions) (string, *WriteMeta, error) {
var resp JobDeregisterResponse
wm, err := j.client.delete(fmt.Sprintf("/v1/job/%v?purge=%t&global=%t",
url.PathEscape(jobID), opts.Purge, opts.Global), &resp, q)

// The base endpoint to add query params to.
endpoint := "/v1/job/" + url.PathEscape(jobID)

// Protect against nil opts. url.Values expects a string, and so using
// fmt.Sprintf is the best way to do this.
if opts != nil {
endpoint += fmt.Sprintf("?purge=%t&global=%t&eval_priority=%v",
opts.Purge, opts.Global, opts.EvalPriority)
}
jrasell marked this conversation as resolved.
Show resolved Hide resolved

wm, err := j.client.delete(endpoint, &resp, q)
if err != nil {
return "", nil, err
}
Expand Down Expand Up @@ -1170,6 +1189,14 @@ type JobRegisterRequest struct {
PolicyOverride bool `json:",omitempty"`
PreserveCounts bool `json:",omitempty"`

// EvalPriority is an optional priority to use on any evaluation created as
// a result on this job registration. This value must be between 1-100
// inclusively, where a larger value corresponds to a higher priority. This
// is useful when an operator wishes to push through a job registration in
// busy clusters with a large evaluation backlog. This avoids needing to
// change the job priority which also impacts preemption.
EvalPriority int `json:",omitempty"`

WriteRequest
}

Expand Down
116 changes: 116 additions & 0 deletions api/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,60 @@ func TestJobs_Register_NoPreserveCounts(t *testing.T) {
require.Equal(3, status.TaskGroups["group3"].Desired) // new => as specified
}

func TestJobs_Register_EvalPriority(t *testing.T) {
t.Parallel()
requireAssert := require.New(t)

c, s := makeClient(t, nil, nil)
defer s.Stop()

// Listing jobs before registering returns nothing
listResp, _, err := c.Jobs().List(nil)
requireAssert.Nil(err)
requireAssert.Len(listResp, 0)

// Create a job and register it with an eval priority.
job := testJob()
registerResp, wm, err := c.Jobs().RegisterOpts(job, &RegisterOptions{EvalPriority: 99}, nil)
requireAssert.Nil(err)
requireAssert.NotNil(registerResp)
requireAssert.NotEmpty(registerResp.EvalID)
assertWriteMeta(t, wm)

// Check the created job evaluation has a priority that matches our desired
// value.
evalInfo, _, err := c.Evaluations().Info(registerResp.EvalID, nil)
requireAssert.NoError(err)
requireAssert.Equal(99, evalInfo.Priority)
}

func TestJobs_Register_NoEvalPriority(t *testing.T) {
t.Parallel()
requireAssert := require.New(t)

c, s := makeClient(t, nil, nil)
defer s.Stop()

// Listing jobs before registering returns nothing
listResp, _, err := c.Jobs().List(nil)
requireAssert.Nil(err)
requireAssert.Len(listResp, 0)

// Create a job and register it with an eval priority.
job := testJob()
registerResp, wm, err := c.Jobs().RegisterOpts(job, nil, nil)
requireAssert.Nil(err)
requireAssert.NotNil(registerResp)
requireAssert.NotEmpty(registerResp.EvalID)
assertWriteMeta(t, wm)

// Check the created job evaluation has a priority that matches the job
// priority.
evalInfo, _, err := c.Evaluations().Info(registerResp.EvalID, nil)
requireAssert.NoError(err)
requireAssert.Equal(*job.Priority, evalInfo.Priority)
}

func TestJobs_Validate(t *testing.T) {
t.Parallel()
c, s := makeClient(t, nil, nil)
Expand Down Expand Up @@ -1628,6 +1682,68 @@ func TestJobs_Deregister(t *testing.T) {
}
}

func TestJobs_Deregister_EvalPriority(t *testing.T) {
t.Parallel()
requireAssert := require.New(t)

c, s := makeClient(t, nil, nil)
defer s.Stop()

// Listing jobs before registering returns nothing
listResp, _, err := c.Jobs().List(nil)
requireAssert.Nil(err)
requireAssert.Len(listResp, 0)

// Create a job and register it.
job := testJob()
registerResp, wm, err := c.Jobs().Register(job, nil)
requireAssert.Nil(err)
requireAssert.NotNil(registerResp)
requireAssert.NotEmpty(registerResp.EvalID)
assertWriteMeta(t, wm)

// Deregister the job with an eval priority.
evalID, _, err := c.Jobs().DeregisterOpts(*job.ID, &DeregisterOptions{EvalPriority: 97}, nil)
requireAssert.NoError(err)
requireAssert.NotEmpty(t, evalID)

// Lookup the eval and check the priority on it.
evalInfo, _, err := c.Evaluations().Info(evalID, nil)
requireAssert.NoError(err)
requireAssert.Equal(97, evalInfo.Priority)
}

func TestJobs_Deregister_NoEvalPriority(t *testing.T) {
t.Parallel()
requireAssert := require.New(t)

c, s := makeClient(t, nil, nil)
defer s.Stop()

// Listing jobs before registering returns nothing
listResp, _, err := c.Jobs().List(nil)
requireAssert.Nil(err)
requireAssert.Len(listResp, 0)

// Create a job and register it.
job := testJob()
registerResp, wm, err := c.Jobs().Register(job, nil)
requireAssert.Nil(err)
requireAssert.NotNil(registerResp)
requireAssert.NotEmpty(registerResp.EvalID)
assertWriteMeta(t, wm)

// Deregister the job with an eval priority.
evalID, _, err := c.Jobs().DeregisterOpts(*job.ID, &DeregisterOptions{}, nil)
requireAssert.NoError(err)
requireAssert.NotEmpty(t, evalID)

// Lookup the eval and check the priority on it.
evalInfo, _, err := c.Evaluations().Info(evalID, nil)
requireAssert.NoError(err)
requireAssert.Equal(*job.Priority, evalInfo.Priority)
}

func TestJobs_ForceEvaluate(t *testing.T) {
t.Parallel()
c, s := makeClient(t, nil, nil)
Expand Down
13 changes: 13 additions & 0 deletions command/agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,19 @@ func parseBool(req *http.Request, field string) (*bool, error) {
return nil, nil
}

// parseInt parses a query parameter to a int or returns (nil, nil) if the
// parameter is not present.
func parseInt(req *http.Request, field string) (*int, error) {
if str := req.URL.Query().Get(field); str != "" {
param, err := strconv.Atoi(str)
if err != nil {
return nil, fmt.Errorf("Failed to parse value of %q (%v) as a int: %v", field, str, err)
}
return &param, nil
}
return nil, nil
}

// parseToken is used to parse the X-Nomad-Token param
func (s *HTTPServer) parseToken(req *http.Request, token *string) {
if other := req.Header.Get("X-Nomad-Token"); other != "" {
Expand Down
47 changes: 47 additions & 0 deletions command/agent/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,53 @@ func TestParseBool(t *testing.T) {
}
}

func Test_parseInt(t *testing.T) {
t.Parallel()

cases := []struct {
Input string
Expected *int
Err bool
}{
{
Input: "",
Expected: nil,
},
{
Input: "13",
Expected: helper.IntToPtr(13),
},
{
Input: "99",
Expected: helper.IntToPtr(99),
},
{
Input: "ten",
Err: true,
},
}

for i := range cases {
tc := cases[i]
t.Run("Input-"+tc.Input, func(t *testing.T) {
testURL, err := url.Parse("http://localhost/foo?eval_priority=" + tc.Input)
require.NoError(t, err)
req := &http.Request{
URL: testURL,
}

result, err := parseInt(req, "eval_priority")
if tc.Err {
require.Error(t, err)
require.Nil(t, result)
} else {
require.NoError(t, err)
require.Equal(t, tc.Expected, result)
}
})
}
}

func TestParsePagination(t *testing.T) {
t.Parallel()
s := makeHTTPServer(t, nil)
Expand Down
44 changes: 40 additions & 4 deletions command/agent/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,13 +390,23 @@ func (s *HTTPServer) jobUpdate(resp http.ResponseWriter, req *http.Request,
}
}

// Validate the evaluation priority if the user supplied a non-default
// value. It's more efficient to do it here, within the agent rather than
// sending a bad request for the server to reject.
if args.EvalPriority != 0 {
if err := validateEvalPriorityOpt(args.EvalPriority); err != nil {
return nil, err
}
}

sJob, writeReq := s.apiJobAndRequestToStructs(args.Job, req, args.WriteRequest)
regReq := structs.JobRegisterRequest{
Job: sJob,
EnforceIndex: args.EnforceIndex,
JobModifyIndex: args.JobModifyIndex,
PolicyOverride: args.PolicyOverride,
PreserveCounts: args.PreserveCounts,
EvalPriority: args.EvalPriority,
WriteRequest: *writeReq,
}

Expand All @@ -411,6 +421,9 @@ func (s *HTTPServer) jobUpdate(resp http.ResponseWriter, req *http.Request,
func (s *HTTPServer) jobDelete(resp http.ResponseWriter, req *http.Request,
jobName string) (interface{}, error) {

args := structs.JobDeregisterRequest{JobID: jobName}

// Identify the purge query param and parse.
purgeStr := req.URL.Query().Get("purge")
var purgeBool bool
if purgeStr != "" {
Expand All @@ -420,7 +433,9 @@ func (s *HTTPServer) jobDelete(resp http.ResponseWriter, req *http.Request,
return nil, fmt.Errorf("Failed to parse value of %q (%v) as a bool: %v", "purge", purgeStr, err)
}
}
args.Purge = purgeBool

// Identify the global query param and parse.
globalStr := req.URL.Query().Get("global")
var globalBool bool
if globalStr != "" {
Expand All @@ -430,12 +445,24 @@ func (s *HTTPServer) jobDelete(resp http.ResponseWriter, req *http.Request,
return nil, fmt.Errorf("Failed to parse value of %q (%v) as a bool: %v", "global", globalStr, err)
}
}
args.Global = globalBool

args := structs.JobDeregisterRequest{
JobID: jobName,
Purge: purgeBool,
Global: globalBool,
// Parse the eval priority from the request URL query if present.
evalPriority, err := parseInt(req, "eval_priority")
if err != nil {
return nil, err
}

// Validate the evaluation priority if the user supplied a non-default
// value. It's more efficient to do it here, within the agent rather than
// sending a bad request for the server to reject.
if evalPriority != nil && *evalPriority > 0 {
if err := validateEvalPriorityOpt(*evalPriority); err != nil {
return nil, err
}
args.EvalPriority = *evalPriority
}

s.parseWriteRequest(req, &args.WriteRequest)

var out structs.JobDeregisterResponse
Expand Down Expand Up @@ -1661,3 +1688,12 @@ func ApiSpreadToStructs(a1 *api.Spread) *structs.Spread {
}
return ret
}

// validateEvalPriorityOpt ensures the supplied evaluation priority override
// value is within acceptable bounds.
func validateEvalPriorityOpt(priority int) HTTPCodedError {
if priority < 1 || priority > 100 {
return CodedError(400, "Eval priority must be between 1 and 100 inclusively")
}
return nil
}
Loading