Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix dispatch of periodic job #2489

Merged
merged 1 commit into from
Mar 28, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions command/job_dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,20 @@ func (c *JobDispatchCommand) Run(args []string) int {
return 1
}

// See if an evaluation was created. If the job is periodic there will be no
// eval.
evalCreated := resp.EvalID != ""

basic := []string{
fmt.Sprintf("Dispatched Job ID|%s", resp.DispatchedJobID),
fmt.Sprintf("Evaluation ID|%s", limit(resp.EvalID, length)),
}
if evalCreated {
basic = append(basic, fmt.Sprintf("Evaluation ID|%s", limit(resp.EvalID, length)))
}
c.Ui.Output(formatKV(basic))

if detach {
// Nothing to do
if detach || !evalCreated {
return 0
}

Expand Down
2 changes: 1 addition & 1 deletion nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} {
return nil
}

if parent.IsPeriodic() {
if parent.IsPeriodic() && !parent.IsParameterized() {
t, err := n.periodicDispatcher.LaunchTime(req.Job.ID)
if err != nil {
n.logger.Printf("[ERR] nomad.fsm: LaunchTime(%v) failed: %v", req.Job.ID, err)
Expand Down
56 changes: 31 additions & 25 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -842,34 +842,40 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
return err
}

// Create a new evaluation
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: dispatchJob.Priority,
Type: dispatchJob.Type,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: dispatchJob.ID,
JobModifyIndex: jobCreateIndex,
Status: structs.EvalStatusPending,
}
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
WriteRequest: structs.WriteRequest{Region: args.Region},
}
reply.JobCreateIndex = jobCreateIndex
reply.DispatchedJobID = dispatchJob.ID
reply.Index = jobCreateIndex

// If the job is periodic, we don't create an eval.
if !dispatchJob.IsPeriodic() {
// Create a new evaluation
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: dispatchJob.Priority,
Type: dispatchJob.Type,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: dispatchJob.ID,
JobModifyIndex: jobCreateIndex,
Status: structs.EvalStatusPending,
}
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
WriteRequest: structs.WriteRequest{Region: args.Region},
}

// Commit this evaluation via Raft
_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
j.srv.logger.Printf("[ERR] nomad.job: Eval create failed: %v", err)
return err
// Commit this evaluation via Raft
_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
j.srv.logger.Printf("[ERR] nomad.job: Eval create failed: %v", err)
return err
}

// Setup the reply
reply.EvalID = eval.ID
reply.EvalCreateIndex = evalIndex
reply.Index = evalIndex
}

// Setup the reply
reply.EvalID = eval.ID
reply.EvalCreateIndex = evalIndex
reply.JobCreateIndex = jobCreateIndex
reply.DispatchedJobID = dispatchJob.ID
reply.Index = evalIndex
return nil
}

Expand Down
29 changes: 28 additions & 1 deletion nomad/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1938,6 +1938,10 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
MetaOptional: []string{"foo", "bar"},
}

// Periodic dispatch job
d6 := mock.PeriodicJob()
d6.ParameterizedJob = &structs.ParameterizedJobConfig{}

reqNoInputNoMeta := &structs.JobDispatchRequest{}
reqInputDataNoMeta := &structs.JobDispatchRequest{
Payload: []byte("hello world"),
Expand Down Expand Up @@ -1971,6 +1975,7 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
name string
parameterizedJob *structs.Job
dispatchReq *structs.JobDispatchRequest
noEval bool
err bool
errStr string
}
Expand Down Expand Up @@ -2052,6 +2057,12 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
err: true,
errStr: "Payload exceeds maximum size",
},
{
name: "periodic job dispatched, ensure no eval",
parameterizedJob: d6,
dispatchReq: reqNoInputNoMeta,
noEval: true,
},
}

for _, tc := range cases {
Expand Down Expand Up @@ -2088,7 +2099,18 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
}

// Check that we got an eval and job id back
if dispatchResp.EvalID == "" || dispatchResp.DispatchedJobID == "" {
switch dispatchResp.EvalID {
case "":
if !tc.noEval {
t.Fatalf("Bad response")
}
default:
if tc.noEval {
t.Fatalf("Got eval %q", dispatchResp.EvalID)
}
}

if dispatchResp.DispatchedJobID == "" {
t.Fatalf("Bad response")
}

Expand All @@ -2108,11 +2130,16 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
t.Fatalf("bad parent ID")
}

if tc.noEval {
return
}

// Lookup the evaluation
eval, err := state.EvalByID(ws, dispatchResp.EvalID)
if err != nil {
t.Fatalf("err: %v", err)
}

if eval == nil {
t.Fatalf("expected eval")
}
Expand Down