Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Job Summary - Part 2 #1455

Merged
merged 31 commits into from
Jul 26, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
bac783a
Setting the number of queued allocations per task group
diptanu Jul 18, 2016
38b1ba7
Added the job summary related endpoints
diptanu Jul 18, 2016
6e9b955
Updating the job summary table only if an evaluation has any Queued A…
diptanu Jul 19, 2016
7497403
Fixed the logic for decrementing the count of queued based on plan re…
diptanu Jul 19, 2016
5b2f2ee
Not setting the desired and client status of an allocation during in-…
diptanu Jul 19, 2016
36abb97
Applying changes to job updates via FSM
diptanu Jul 19, 2016
0347f60
Updating the job summary while mutating jobs and allocation objects
diptanu Jul 20, 2016
7628283
Moved the job endpoint around
diptanu Jul 21, 2016
0a7489d
Adding the summary to the Job Stub
diptanu Jul 21, 2016
ba18de7
Added support for retreiving job summary in api
diptanu Jul 21, 2016
4095e33
Displaying the job summary in nomad status command
diptanu Jul 21, 2016
8f45222
Fixed some error messages and conditions
diptanu Jul 21, 2016
03c6692
Fixed some bugs
diptanu Jul 21, 2016
5e86e9a
Review comments
diptanu Jul 22, 2016
c79784d
Added code to create missing job summaries
diptanu Jul 22, 2016
e9d8e10
Added some more tests
diptanu Jul 22, 2016
46907a1
Added a test to ensure system scheduler records the correct number of…
diptanu Jul 22, 2016
fb3b109
Added a test to ensure failed batch allocations are being added to th…
diptanu Jul 22, 2016
7928f29
Added test for blocking query of job summary endpoint
diptanu Jul 22, 2016
230a59c
Fixed some more tests
diptanu Jul 22, 2016
73c6fbb
Added a test to ensure we record the queued allocations correctly whe…
diptanu Jul 22, 2016
ddf10e1
Initializing the queued allocations late
diptanu Jul 22, 2016
c18e431
Fixed more tests
diptanu Jul 25, 2016
c91f477
Renamed Job.GetJobSummary to Job.Summary
diptanu Jul 25, 2016
a9c995b
Added a test for adjustQueuedAllocations
diptanu Jul 25, 2016
3d4c185
Reconciling the queued allocations during restore
diptanu Jul 26, 2016
53a2cc9
Setting the right indexes while creating Job Summary
diptanu Jul 26, 2016
64a944c
Making the queued allocations bind late
diptanu Jul 26, 2016
c8870af
Fixed a test
diptanu Jul 26, 2016
d1da78a
Running the tests in verbose mode
diptanu Jul 26, 2016
2892224
Updated some tests
diptanu Jul 26, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,15 @@ func (j *Jobs) Plan(job *Job, diff bool, q *WriteOptions) (*JobPlanResponse, *Wr
return &resp, wm, nil
}

func (j *Jobs) Summary(jobID string, q *QueryOptions) (*JobSummary, *QueryMeta, error) {
var resp JobSummary
qm, err := j.client.query("/v1/job/"+jobID+"/summary", &resp, q)
if err != nil {
return nil, nil, err
}
return &resp, qm, nil
}

// periodicForceResponse is used to deserialize a force response
type periodicForceResponse struct {
EvalID string
Expand Down Expand Up @@ -199,6 +208,27 @@ type Job struct {
JobModifyIndex uint64
}

// JobSummary summarizes the state of the allocations of a job
type JobSummary struct {
JobID string
Summary map[string]TaskGroupSummary

// Raft Indexes
CreateIndex uint64
ModifyIndex uint64
}

// TaskGroup summarizes the state of all the allocations of a particular
// TaskGroup
type TaskGroupSummary struct {
Queued int
Complete int
Failed int
Running int
Starting int
Lost int
}

// JobListStub is used to return a subset of information about
// jobs during list operations.
type JobListStub struct {
Expand All @@ -209,6 +239,7 @@ type JobListStub struct {
Priority int
Status string
StatusDescription string
JobSummary *JobSummary
CreateIndex uint64
ModifyIndex uint64
JobModifyIndex uint64
Expand Down
42 changes: 42 additions & 0 deletions api/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,48 @@ func TestJobs_Plan(t *testing.T) {
}
}

func TestJobs_JobSummary(t *testing.T) {
c, s := makeClient(t, nil, nil)
defer s.Stop()
jobs := c.Jobs()

// Trying to retrieve a job summary before the job exists
// returns an error
_, _, err := jobs.Summary("job1", nil)
if err == nil || !strings.Contains(err.Error(), "not found") {
t.Fatalf("expected not found error, got: %#v", err)
}

// Register the job
job := testJob()
_, wm, err := jobs.Register(job, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
assertWriteMeta(t, wm)

// Query the job summary again and ensure it exists
result, qm, err := jobs.Summary("job1", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
assertQueryMeta(t, qm)

expectedJobSummary := JobSummary{
JobID: job.ID,
Summary: map[string]TaskGroupSummary{
job.TaskGroups[0].Name: {},
},
CreateIndex: result.CreateIndex,
ModifyIndex: result.ModifyIndex,
}

// Check that the result is what we expect
if !reflect.DeepEqual(&expectedJobSummary, result) {
t.Fatalf("expect: %#v, got: %#v", expectedJobSummary, result)
}
}

func TestJobs_NewBatchJob(t *testing.T) {
job := NewBatchJob("job1", "myjob", "region1", 5)
expect := &Job{
Expand Down
18 changes: 14 additions & 4 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,9 @@ func TestClient_UpdateAllocStatus(t *testing.T) {
alloc.ClientStatus = originalStatus

state := s1.State()
if err := state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID)); err != nil {
t.Fatal(err)
}
state.UpsertAllocs(100, []*structs.Allocation{alloc})

testutil.WaitForResult(func() (bool, error) {
Expand Down Expand Up @@ -394,6 +397,12 @@ func TestClient_WatchAllocs(t *testing.T) {
alloc2.NodeID = c1.Node().ID

state := s1.State()
if err := state.UpsertJobSummary(998, mock.JobSummary(alloc1.JobID)); err != nil {
t.Fatal(err)
}
if err := state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID)); err != nil {
t.Fatal(err)
}
err := state.UpsertAllocs(100,
[]*structs.Allocation{alloc1, alloc2})
if err != nil {
Expand Down Expand Up @@ -469,8 +478,10 @@ func TestClient_SaveRestoreState(t *testing.T) {
task.Config["args"] = []string{"10"}

state := s1.State()
err := state.UpsertAllocs(100, []*structs.Allocation{alloc1})
if err != nil {
if err := state.UpsertJobSummary(99, mock.JobSummary(alloc1.JobID)); err != nil {
t.Fatal(err)
}
if err := state.UpsertAllocs(100, []*structs.Allocation{alloc1}); err != nil {
t.Fatalf("err: %v", err)
}

Expand All @@ -485,8 +496,7 @@ func TestClient_SaveRestoreState(t *testing.T) {
})

// Shutdown the client, saves state
err = c1.Shutdown()
if err != nil {
if err := c1.Shutdown(); err != nil {
t.Fatalf("err: %v", err)
}

Expand Down
19 changes: 16 additions & 3 deletions command/agent/alloc_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ func TestHTTP_AllocsList(t *testing.T) {
state := s.Agent.server.State()
alloc1 := mock.Alloc()
alloc2 := mock.Alloc()
state.UpsertJobSummary(998, mock.JobSummary(alloc1.JobID))
state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID))
err := state.UpsertAllocs(1000,
[]*structs.Allocation{alloc1, alloc2})
if err != nil {
Expand Down Expand Up @@ -58,13 +60,21 @@ func TestHTTP_AllocsPrefixList(t *testing.T) {
httpTest(t, nil, func(s *TestServer) {
// Directly manipulate the state
state := s.Agent.server.State()

alloc1 := mock.Alloc()
alloc1.ID = "aaaaaaaa-e8f7-fd38-c855-ab94ceb89706"
alloc2 := mock.Alloc()
alloc2.ID = "aaabbbbb-e8f7-fd38-c855-ab94ceb89706"
err := state.UpsertAllocs(1000,
[]*structs.Allocation{alloc1, alloc2})
if err != nil {
summary1 := mock.JobSummary(alloc1.JobID)
summary2 := mock.JobSummary(alloc2.JobID)
if err := state.UpsertJobSummary(998, summary1); err != nil {
t.Fatal(err)
}
if err := state.UpsertJobSummary(999, summary2); err != nil {
t.Fatal(err)
}
if err := state.UpsertAllocs(1000,
[]*structs.Allocation{alloc1, alloc2}); err != nil {
t.Fatalf("err: %v", err)
}

Expand Down Expand Up @@ -110,6 +120,9 @@ func TestHTTP_AllocQuery(t *testing.T) {
// Directly manipulate the state
state := s.Agent.server.State()
alloc := mock.Alloc()
if err := state.UpsertJobSummary(999, mock.JobSummary(alloc.JobID)); err != nil {
t.Fatal(err)
}
err := state.UpsertAllocs(1000,
[]*structs.Allocation{alloc})
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions command/agent/eval_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ func TestHTTP_EvalAllocations(t *testing.T) {
alloc1 := mock.Alloc()
alloc2 := mock.Alloc()
alloc2.EvalID = alloc1.EvalID
state.UpsertJobSummary(998, mock.JobSummary(alloc1.JobID))
state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID))
err := state.UpsertAllocs(1000,
[]*structs.Allocation{alloc1, alloc2})
if err != nil {
Expand Down
24 changes: 24 additions & 0 deletions command/agent/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ func (s *HTTPServer) JobSpecificRequest(resp http.ResponseWriter, req *http.Requ
case strings.HasSuffix(path, "/plan"):
jobName := strings.TrimSuffix(path, "/plan")
return s.jobPlan(resp, req, jobName)
case strings.HasSuffix(path, "/summary"):
jobName := strings.TrimSuffix(path, "/summary")
return s.jobSummaryRequest(resp, req, jobName)
default:
return s.jobCRUD(resp, req, path)
}
Expand Down Expand Up @@ -241,3 +244,24 @@ func (s *HTTPServer) jobDelete(resp http.ResponseWriter, req *http.Request,
setIndex(resp, out.Index)
return out, nil
}

func (s *HTTPServer) jobSummaryRequest(resp http.ResponseWriter, req *http.Request, name string) (interface{}, error) {
args := structs.JobSummaryRequest{
JobID: name,
}
if s.parse(resp, req, &args.Region, &args.QueryOptions) {
return nil, nil
}

var out structs.JobSummaryResponse
if err := s.agent.RPC("Job.Summary", &args, &out); err != nil {
return nil, err
}

setMeta(resp, &out.QueryMeta)
if out.JobSummary == nil {
return nil, CodedError(404, "job not found")
}
setIndex(resp, out.Index)
return out.JobSummary, nil
}
9 changes: 9 additions & 0 deletions command/agent/node_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ func TestHTTP_NodeForceEval(t *testing.T) {
state := s.Agent.server.State()
alloc1 := mock.Alloc()
alloc1.NodeID = node.ID
if err := state.UpsertJobSummary(999, mock.JobSummary(alloc1.JobID)); err != nil {
t.Fatal(err)
}
err := state.UpsertAllocs(1000, []*structs.Allocation{alloc1})
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -177,6 +180,9 @@ func TestHTTP_NodeAllocations(t *testing.T) {
state := s.Agent.server.State()
alloc1 := mock.Alloc()
alloc1.NodeID = node.ID
if err := state.UpsertJobSummary(999, mock.JobSummary(alloc1.JobID)); err != nil {
t.Fatal(err)
}
err := state.UpsertAllocs(1000, []*structs.Allocation{alloc1})
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -231,6 +237,9 @@ func TestHTTP_NodeDrain(t *testing.T) {
state := s.Agent.server.State()
alloc1 := mock.Alloc()
alloc1.NodeID = node.ID
if err := state.UpsertJobSummary(999, mock.JobSummary(alloc1.JobID)); err != nil {
t.Fatal(err)
}
err := state.UpsertAllocs(1000, []*structs.Allocation{alloc1})
if err != nil {
t.Fatalf("err: %v", err)
Expand Down
23 changes: 23 additions & 0 deletions command/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,29 @@ func (c *StatusCommand) outputJobInfo(client *api.Client, job *api.Job) error {
return fmt.Errorf("Error querying job evaluations: %s", err)
}

// Query the summary
summary, _, err := client.Jobs().Summary(job.ID, nil)
if err != nil {
return fmt.Errorf("Error querying job summary: %s", err)
}

// Format the summary
c.Ui.Output(c.Colorize().Color("\n[bold]Summary[reset]"))
if summary != nil {
summaries := make([]string, len(summary.Summary)+1)
summaries[0] = "Task Group|Queued|Starting|Running|Failed|Complete|Lost"
idx := 1
for tg, tgs := range summary.Summary {
summaries[idx] = fmt.Sprintf("%s|%d|%d|%d|%d|%d|%d",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Output this as well for the nomad status case

tg, tgs.Queued, tgs.Starting,
tgs.Running, tgs.Failed,
tgs.Complete, tgs.Lost,
)
idx += 1
}
c.Ui.Output(formatList(summaries))
}

// Determine latest evaluation with failures whose follow up hasn't
// completed, this is done while formatting
var latestFailedPlacement *api.Evaluation
Expand Down
3 changes: 3 additions & 0 deletions command/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ func TestStatusCommand_Run(t *testing.T) {
if !strings.Contains(out, "Allocations") {
t.Fatalf("should dump allocations")
}
if !strings.Contains(out, "Summary") {
t.Fatalf("should dump summary")
}
ui.OutputWriter.Reset()

// Query a single job showing evals
Expand Down
26 changes: 20 additions & 6 deletions nomad/alloc_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@ func TestAllocEndpoint_List(t *testing.T) {

// Create the register request
alloc := mock.Alloc()
summary := mock.JobSummary(alloc.JobID)
state := s1.fsm.State()
err := state.UpsertAllocs(1000, []*structs.Allocation{alloc})
if err != nil {

if err := state.UpsertJobSummary(999, summary); err != nil {
t.Fatalf("err: %v", err)
}
if err := state.UpsertAllocs(1000, []*structs.Allocation{alloc}); err != nil {
t.Fatalf("err: %v", err)
}

Expand Down Expand Up @@ -75,6 +79,10 @@ func TestAllocEndpoint_List_Blocking(t *testing.T) {
// Create the alloc
alloc := mock.Alloc()

summary := mock.JobSummary(alloc.JobID)
if err := state.UpsertJobSummary(1, summary); err != nil {
t.Fatalf("err: %v", err)
}
// Upsert alloc triggers watches
time.AfterFunc(100*time.Millisecond, func() {
if err := state.UpsertAllocs(2, []*structs.Allocation{alloc}); err != nil {
Expand Down Expand Up @@ -109,12 +117,13 @@ func TestAllocEndpoint_List_Blocking(t *testing.T) {
alloc2.ID = alloc.ID
alloc2.ClientStatus = structs.AllocClientStatusRunning
time.AfterFunc(100*time.Millisecond, func() {
if err := state.UpdateAllocsFromClient(3, []*structs.Allocation{alloc2}); err != nil {
state.UpsertJobSummary(3, mock.JobSummary(alloc2.JobID))
if err := state.UpdateAllocsFromClient(4, []*structs.Allocation{alloc2}); err != nil {
t.Fatalf("err: %v", err)
}
})

req.MinQueryIndex = 2
req.MinQueryIndex = 3
start = time.Now()
var resp2 structs.AllocListResponse
if err := msgpackrpc.CallWithCodec(codec, "Alloc.List", req, &resp2); err != nil {
Expand All @@ -124,8 +133,8 @@ func TestAllocEndpoint_List_Blocking(t *testing.T) {
if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp2)
}
if resp2.Index != 3 {
t.Fatalf("Bad index: %d %d", resp2.Index, 3)
if resp2.Index != 4 {
t.Fatalf("Bad index: %d %d", resp2.Index, 4)
}
if len(resp2.Allocations) != 1 || resp.Allocations[0].ID != alloc.ID ||
resp2.Allocations[0].ClientStatus != structs.AllocClientStatusRunning {
Expand All @@ -142,6 +151,7 @@ func TestAllocEndpoint_GetAlloc(t *testing.T) {
// Create the register request
alloc := mock.Alloc()
state := s1.fsm.State()
state.UpsertJobSummary(999, mock.JobSummary(alloc.JobID))
err := state.UpsertAllocs(1000, []*structs.Allocation{alloc})
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -178,6 +188,7 @@ func TestAllocEndpoint_GetAlloc_Blocking(t *testing.T) {

// First create an unrelated alloc
time.AfterFunc(100*time.Millisecond, func() {
state.UpsertJobSummary(99, mock.JobSummary(alloc1.JobID))
err := state.UpsertAllocs(100, []*structs.Allocation{alloc1})
if err != nil {
t.Fatalf("err: %v", err)
Expand All @@ -186,6 +197,7 @@ func TestAllocEndpoint_GetAlloc_Blocking(t *testing.T) {

// Create the alloc we are watching later
time.AfterFunc(200*time.Millisecond, func() {
state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID))
err := state.UpsertAllocs(200, []*structs.Allocation{alloc2})
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -227,6 +239,8 @@ func TestAllocEndpoint_GetAllocs(t *testing.T) {
alloc := mock.Alloc()
alloc2 := mock.Alloc()
state := s1.fsm.State()
state.UpsertJobSummary(998, mock.JobSummary(alloc.JobID))
state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID))
err := state.UpsertAllocs(1000, []*structs.Allocation{alloc, alloc2})
if err != nil {
t.Fatalf("err: %v", err)
Expand Down
Loading