Skip to content

Commit

Permalink
Merge pull request #4274 from hashicorp/f-force-rescheduling
Browse files Browse the repository at this point in the history
Add CLI and API support for forcing rescheduling of failed allocs
  • Loading branch information
preetapan committed May 21, 2018
2 parents 8715013 + a5ca379 commit 447527a
Show file tree
Hide file tree
Showing 15 changed files with 672 additions and 11 deletions.
28 changes: 28 additions & 0 deletions api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,22 @@ func (j *Jobs) ForceEvaluate(jobID string, q *WriteOptions) (string, *WriteMeta,
return resp.EvalID, wm, nil
}

// EvaluateWithOpts is used to force-evaluate an existing job and takes additional options
// for whether to force reschedule failed allocations
func (j *Jobs) EvaluateWithOpts(jobID string, opts EvalOptions, q *WriteOptions) (string, *WriteMeta, error) {
req := &JobEvaluateRequest{
JobID: jobID,
EvalOptions: opts,
}

var resp JobRegisterResponse
wm, err := j.client.write("/v1/job/"+jobID+"/evaluate", req, &resp, q)
if err != nil {
return "", nil, err
}
return resp.EvalID, wm, nil
}

// PeriodicForce spawns a new instance of the periodic job and returns the eval ID
func (j *Jobs) PeriodicForce(jobID string, q *WriteOptions) (string, *WriteMeta, error) {
var resp periodicForceResponse
Expand Down Expand Up @@ -1032,3 +1048,15 @@ type JobStabilityResponse struct {
JobModifyIndex uint64
WriteMeta
}

// JobEvaluateRequest is used when we just need to re-evaluate a target job
type JobEvaluateRequest struct {
JobID string
EvalOptions EvalOptions
WriteRequest
}

// EvalOptions is used to encapsulate options when forcing a job evaluation
type EvalOptions struct {
ForceReschedule bool
}
21 changes: 19 additions & 2 deletions command/agent/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,25 @@ func (s *HTTPServer) jobForceEvaluate(resp http.ResponseWriter, req *http.Reques
if req.Method != "PUT" && req.Method != "POST" {
return nil, CodedError(405, ErrInvalidMethod)
}
args := structs.JobEvaluateRequest{
JobID: jobName,
var args structs.JobEvaluateRequest

// TODO(preetha): remove in 0.9
// COMPAT: For backwards compatibility allow using this endpoint without a payload
if req.ContentLength == 0 {
args = structs.JobEvaluateRequest{
JobID: jobName,
}
} else {
if err := decodeBody(req, &args); err != nil {
return nil, CodedError(400, err.Error())
}
if args.JobID == "" {
return nil, CodedError(400, "Job ID must be specified")
}

if jobName != "" && args.JobID != jobName {
return nil, CodedError(400, "JobID not same as job name")
}
}
s.parseWriteRequest(req, &args.WriteRequest)

Expand Down
51 changes: 51 additions & 0 deletions command/agent/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,57 @@ func TestHTTP_JobForceEvaluate(t *testing.T) {
})
}

func TestHTTP_JobEvaluate_ForceReschedule(t *testing.T) {
t.Parallel()
httpTest(t, nil, func(s *TestAgent) {
// Create the job
job := mock.Job()
args := structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: structs.DefaultNamespace,
},
}
var resp structs.JobRegisterResponse
if err := s.Agent.RPC("Job.Register", &args, &resp); err != nil {
t.Fatalf("err: %v", err)
}
jobEvalReq := api.JobEvaluateRequest{
JobID: job.ID,
EvalOptions: api.EvalOptions{
ForceReschedule: true,
},
}

buf := encodeReq(jobEvalReq)

// Make the HTTP request
req, err := http.NewRequest("POST", "/v1/job/"+job.ID+"/evaluate", buf)
if err != nil {
t.Fatalf("err: %v", err)
}
respW := httptest.NewRecorder()

// Make the request
obj, err := s.Server.JobSpecificRequest(respW, req)
if err != nil {
t.Fatalf("err: %v", err)
}

// Check the response
reg := obj.(structs.JobRegisterResponse)
if reg.EvalID == "" {
t.Fatalf("bad: %v", reg)
}

// Check for the index
if respW.HeaderMap.Get("X-Nomad-Index") == "" {
t.Fatalf("missing index")
}
})
}

func TestHTTP_JobEvaluations(t *testing.T) {
t.Parallel()
httpTest(t, nil, func(s *TestAgent) {
Expand Down
5 changes: 5 additions & 0 deletions command/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,11 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory {
Meta: meta,
}, nil
},
"job eval": func() (cli.Command, error) {
return &JobEvalCommand{
Meta: meta,
}, nil
},
"job history": func() (cli.Command, error) {
return &JobHistoryCommand{
Meta: meta,
Expand Down
128 changes: 128 additions & 0 deletions command/job_eval.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package command

import (
"fmt"
"strings"

"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/api/contexts"
"github.com/posener/complete"
)

type JobEvalCommand struct {
Meta
forceRescheduling bool
}

func (c *JobEvalCommand) Help() string {
helpText := `
Usage: nomad job eval [options] <job_id>
Force an evaluation of the provided job ID. Forcing an evaluation will trigger the scheduler
to re-evaluate the job. The force flags allow operators to force the scheduler to create
new allocations under certain scenarios.
General Options:
` + generalOptionsUsage() + `
Eval Options:
-force-reschedule
Force reschedule failed allocations even if they are not currently
eligible for rescheduling.
-detach
Return immediately instead of entering monitor mode. The ID
of the evaluation created will be printed to the screen, which can be
used to examine the evaluation using the eval-status command.
-verbose
Display full information.
`
return strings.TrimSpace(helpText)
}

func (c *JobEvalCommand) Synopsis() string {
return "Force an evaluation for the job"
}

func (c *JobEvalCommand) AutocompleteFlags() complete.Flags {
return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient),
complete.Flags{
"-force-reschedule": complete.PredictNothing,
"-detach": complete.PredictNothing,
"-verbose": complete.PredictNothing,
})
}

func (c *JobEvalCommand) AutocompleteArgs() complete.Predictor {
return complete.PredictFunc(func(a complete.Args) []string {
client, err := c.Meta.Client()
if err != nil {
return nil
}

resp, _, err := client.Search().PrefixSearch(a.Last, contexts.Jobs, nil)
if err != nil {
return []string{}
}
return resp.Matches[contexts.Jobs]
})
}

func (c *JobEvalCommand) Name() string { return "job eval" }

func (c *JobEvalCommand) Run(args []string) int {
var detach, verbose bool

flags := c.Meta.FlagSet(c.Name(), FlagSetClient)
flags.Usage = func() { c.Ui.Output(c.Help()) }
flags.BoolVar(&c.forceRescheduling, "force-reschedule", false, "")
flags.BoolVar(&detach, "detach", false, "")
flags.BoolVar(&verbose, "verbose", false, "")

if err := flags.Parse(args); err != nil {
return 1
}

// Check that we either got no jobs or exactly one.
args = flags.Args()
if len(args) != 1 {
c.Ui.Error("This command takes one argument: <job>")
c.Ui.Error(commandErrorText(c))
return 1
}

// Get the HTTP client
client, err := c.Meta.Client()
if err != nil {
c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err))
return 1
}

// Truncate the id unless full length is requested
length := shortId
if verbose {
length = fullId
}
// Call eval endpoint
jobID := args[0]

opts := api.EvalOptions{
ForceReschedule: c.forceRescheduling,
}
evalId, _, err := client.Jobs().EvaluateWithOpts(jobID, opts, nil)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error evaluating job: %s", err))
return 1
}

if detach {
c.Ui.Output(fmt.Sprintf("Created eval ID: %q ", limit(evalId, length)))
return 0
}

mon := newMonitor(c.Ui, client, length)
return mon.monitor(evalId, false)
}
127 changes: 127 additions & 0 deletions command/job_eval_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package command

import (
"strings"
"testing"

"fmt"

"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/mitchellh/cli"
"github.com/posener/complete"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestJobEvalCommand_Implements(t *testing.T) {
t.Parallel()
var _ cli.Command = &JobEvalCommand{}
}

func TestJobEvalCommand_Fails(t *testing.T) {
t.Parallel()
ui := new(cli.MockUi)
cmd := &JobEvalCommand{Meta: Meta{Ui: ui}}

// Fails on misuse
if code := cmd.Run([]string{"some", "bad", "args"}); code != 1 {
t.Fatalf("expected exit code 1, got: %d", code)
}
if out := ui.ErrorWriter.String(); !strings.Contains(out, commandErrorText(cmd)) {
t.Fatalf("expected help output, got: %s", out)
}
ui.ErrorWriter.Reset()

// Fails when job ID is not specified
if code := cmd.Run([]string{}); code != 1 {
t.Fatalf("expect exit 1, got: %d", code)
}
if out := ui.ErrorWriter.String(); !strings.Contains(out, "This command takes one argument") {
t.Fatalf("unexpected error: %v", out)
}
ui.ErrorWriter.Reset()

}

func TestJobEvalCommand_Run(t *testing.T) {
t.Parallel()
srv, client, url := testServer(t, true, nil)
defer srv.Shutdown()

// Wait for a node to be ready
testutil.WaitForResult(func() (bool, error) {
nodes, _, err := client.Nodes().List(nil)
if err != nil {
return false, err
}
for _, node := range nodes {
if node.Status == structs.NodeStatusReady {
return true, nil
}
}
return false, fmt.Errorf("no ready nodes")
}, func(err error) {
t.Fatalf("err: %v", err)
})

ui := new(cli.MockUi)
cmd := &JobEvalCommand{Meta: Meta{Ui: ui}}
require := require.New(t)

state := srv.Agent.Server().State()

// Create a job
job := mock.Job()
err := state.UpsertJob(11, job)
require.Nil(err)

job, err = state.JobByID(nil, structs.DefaultNamespace, job.ID)
require.Nil(err)

// Create a failed alloc for the job
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.TaskGroup = job.TaskGroups[0].Name
alloc.Namespace = job.Namespace
alloc.ClientStatus = structs.AllocClientStatusFailed
err = state.UpsertAllocs(12, []*structs.Allocation{alloc})
require.Nil(err)

if code := cmd.Run([]string{"-address=" + url, "-force-reschedule", "-detach", job.ID}); code != 0 {
t.Fatalf("expected exit 0, got: %d", code)
}

// Lookup alloc again
alloc, err = state.AllocByID(nil, alloc.ID)
require.NotNil(alloc)
require.Nil(err)
require.True(*alloc.DesiredTransition.ForceReschedule)

}

func TestJobEvalCommand_AutocompleteArgs(t *testing.T) {
assert := assert.New(t)
t.Parallel()

srv, _, url := testServer(t, true, nil)
defer srv.Shutdown()

ui := new(cli.MockUi)
cmd := &JobEvalCommand{Meta: Meta{Ui: ui, flagAddress: url}}

// Create a fake job
state := srv.Agent.Server().State()
j := mock.Job()
assert.Nil(state.UpsertJob(1000, j))

prefix := j.ID[:len(j.ID)-5]
args := complete.Args{Last: prefix}
predictor := cmd.AutocompleteArgs()

res := predictor.Predict(args)
assert.Equal(1, len(res))
assert.Equal(j.ID, res[0])
}
Loading

0 comments on commit 447527a

Please sign in to comment.