Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Making the status command return the allocs of currently registered job #2032

Merged
merged 2 commits into from
Dec 20, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package api

import (
"fmt"
"net/url"
"sort"
"strconv"
"time"
)

Expand Down Expand Up @@ -89,9 +91,18 @@ func (j *Jobs) Info(jobID string, q *QueryOptions) (*Job, *QueryMeta, error) {
}

// Allocations is used to return the allocs for a given job ID.
func (j *Jobs) Allocations(jobID string, q *QueryOptions) ([]*AllocationListStub, *QueryMeta, error) {
func (j *Jobs) Allocations(jobID string, allAllocs bool, q *QueryOptions) ([]*AllocationListStub, *QueryMeta, error) {
var resp []*AllocationListStub
qm, err := j.client.query("/v1/job/"+jobID+"/allocations", &resp, q)
u, err := url.Parse("/v1/job/" + jobID + "/allocations")
if err != nil {
return nil, nil, err
}

v := u.Query()
v.Add("all", strconv.FormatBool(allAllocs))
u.RawQuery = v.Encode()

qm, err := j.client.query(u.String(), &resp, q)
if err != nil {
return nil, nil, err
}
Expand Down
6 changes: 5 additions & 1 deletion command/agent/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package agent

import (
"net/http"
"strconv"
"strings"

"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -130,8 +131,11 @@ func (s *HTTPServer) jobAllocations(resp http.ResponseWriter, req *http.Request,
if req.Method != "GET" {
return nil, CodedError(405, ErrInvalidMethod)
}
allAllocs, _ := strconv.ParseBool(req.URL.Query().Get("all"))

args := structs.JobSpecificRequest{
JobID: jobName,
JobID: jobName,
AllAllocs: allAllocs,
}
if s.parse(resp, req, &args.Region, &args.QueryOptions) {
return nil, nil
Expand Down
8 changes: 3 additions & 5 deletions command/agent/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,9 +396,9 @@ func TestHTTP_JobEvaluations(t *testing.T) {
func TestHTTP_JobAllocations(t *testing.T) {
httpTest(t, nil, func(s *TestServer) {
// Create the job
job := mock.Job()
alloc1 := mock.Alloc()
args := structs.JobRegisterRequest{
Job: job,
Job: alloc1.Job,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.JobRegisterResponse
Expand All @@ -408,15 +408,13 @@ func TestHTTP_JobAllocations(t *testing.T) {

// Directly manipulate the state
state := s.Agent.server.State()
alloc1 := mock.Alloc()
alloc1.JobID = job.ID
err := state.UpsertAllocs(1000, []*structs.Allocation{alloc1})
if err != nil {
t.Fatalf("err: %v", err)
}

// Make the HTTP request
req, err := http.NewRequest("GET", "/v1/job/"+job.ID+"/allocations", nil)
req, err := http.NewRequest("GET", "/v1/job/"+alloc1.Job.ID+"/allocations?all=true", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion command/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ func (f *FSCommand) followFile(client *api.Client, alloc *api.Allocation,
// but use a dead allocation if no running allocations are found
func getRandomJobAlloc(client *api.Client, jobID string) (string, error) {
var runningAllocs []*api.AllocationListStub
allocs, _, err := client.Jobs().Allocations(jobID, nil)
allocs, _, err := client.Jobs().Allocations(jobID, false, nil)

// Check that the job actually has allocations
if len(allocs) == 0 {
Expand Down
14 changes: 10 additions & 4 deletions command/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ const (

type StatusCommand struct {
Meta
length int
evals bool
verbose bool
length int
evals bool
allAllocs bool
verbose bool
}

func (c *StatusCommand) Help() string {
Expand All @@ -45,6 +46,10 @@ Status Options:
-evals
Display the evaluations associated with the job.
-all-allocs
Display all allocations matching the job ID, including those from an older
instance of the job.
-verbose
Display full information.
`
Expand All @@ -62,6 +67,7 @@ func (c *StatusCommand) Run(args []string) int {
flags.Usage = func() { c.Ui.Output(c.Help()) }
flags.BoolVar(&short, "short", false, "")
flags.BoolVar(&c.evals, "evals", false, "")
flags.BoolVar(&c.allAllocs, "all-allocs", false, "")
flags.BoolVar(&c.verbose, "verbose", false, "")

if err := flags.Parse(args); err != nil {
Expand Down Expand Up @@ -218,7 +224,7 @@ func (c *StatusCommand) outputJobInfo(client *api.Client, job *api.Job) error {
var evals, allocs []string

// Query the allocations
jobAllocs, _, err := client.Jobs().Allocations(job.ID, nil)
jobAllocs, _, err := client.Jobs().Allocations(job.ID, c.allAllocs, nil)
if err != nil {
return fmt.Errorf("Error querying job allocations: %s", err)
}
Expand Down
2 changes: 1 addition & 1 deletion nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ func (j *Job) Allocations(args *structs.JobSpecificRequest,
if err != nil {
return err
}
allocs, err := snap.AllocsByJob(args.JobID)
allocs, err := snap.AllocsByJob(args.JobID, args.AllAllocs)
if err != nil {
return err
}
Expand Down
20 changes: 19 additions & 1 deletion nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1073,9 +1073,19 @@ func (s *StateStore) AllocsByNodeTerminal(node string, terminal bool) ([]*struct
}

// AllocsByJob returns all the allocations by job id
func (s *StateStore) AllocsByJob(jobID string) ([]*structs.Allocation, error) {
func (s *StateStore) AllocsByJob(jobID string, all bool) ([]*structs.Allocation, error) {
txn := s.db.Txn(false)

// Get the job
var job *structs.Job
rawJob, err := txn.First("jobs", "id", jobID)
if err != nil {
return nil, err
}
if rawJob != nil {
job = rawJob.(*structs.Job)
}

// Get an iterator over the node allocations
iter, err := txn.Get("allocs", "job", jobID)
if err != nil {
Expand All @@ -1088,6 +1098,14 @@ func (s *StateStore) AllocsByJob(jobID string) ([]*structs.Allocation, error) {
if raw == nil {
break
}

alloc := raw.(*structs.Allocation)
// If the allocation belongs to a job with the same ID but a different
// create index and we are not getting all the allocations whose Jobs
// matches the same Job ID then we skip it
if !all && job != nil && alloc.Job.CreateIndex != job.CreateIndex {
continue
}
out = append(out, raw.(*structs.Allocation))
}
return out, nil
Expand Down
57 changes: 56 additions & 1 deletion nomad/state/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2362,7 +2362,7 @@ func TestStateStore_AllocsByJob(t *testing.T) {
t.Fatalf("err: %v", err)
}

out, err := state.AllocsByJob("foo")
out, err := state.AllocsByJob("foo", false)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand All @@ -2375,6 +2375,61 @@ func TestStateStore_AllocsByJob(t *testing.T) {
}
}

func TestStateStore_AllocsForRegisteredJob(t *testing.T) {
state := testStateStore(t)
var allocs []*structs.Allocation
var allocs1 []*structs.Allocation

job := mock.Job()
job.ID = "foo"
state.UpsertJob(100, job)
for i := 0; i < 3; i++ {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
allocs = append(allocs, alloc)
}
if err := state.UpsertAllocs(200, allocs); err != nil {
t.Fatalf("err: %v", err)
}

if err := state.DeleteJob(250, job.ID); err != nil {
t.Fatalf("err: %v", err)
}

job1 := mock.Job()
job1.ID = "foo"
job1.CreateIndex = 50
state.UpsertJob(300, job1)
for i := 0; i < 4; i++ {
alloc := mock.Alloc()
alloc.Job = job1
alloc.JobID = job1.ID
allocs1 = append(allocs1, alloc)
}

if err := state.UpsertAllocs(1000, allocs1); err != nil {
t.Fatalf("err: %v", err)
}

out, err := state.AllocsByJob(job1.ID, true)
if err != nil {
t.Fatalf("err: %v", err)
}

expected := len(allocs) + len(allocs1)
if len(out) != expected {
t.Fatalf("expected: %v, actual: %v", expected, len(out))
}

out1, err := state.AllocsByJob(job1.ID, false)
expected = len(allocs1)
if len(out1) != expected {
t.Fatalf("expected: %v, actual: %v", expected, len(out1))
}

}

func TestStateStore_AllocsByIDPrefix(t *testing.T) {
state := testStateStore(t)
var allocs []*structs.Allocation
Expand Down
3 changes: 2 additions & 1 deletion nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,8 @@ type JobEvaluateRequest struct {

// JobSpecificRequest is used when we just need to specify a target job
type JobSpecificRequest struct {
JobID string
JobID string
AllAllocs bool
QueryOptions
}

Expand Down
2 changes: 1 addition & 1 deletion scheduler/generic_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ func (s *GenericScheduler) computeJobAllocs() error {
}

// Lookup the allocations by JobID
allocs, err := s.state.AllocsByJob(s.eval.JobID)
allocs, err := s.state.AllocsByJob(s.eval.JobID, true)
if err != nil {
return fmt.Errorf("failed to get allocs for job '%s': %v",
s.eval.JobID, err)
Expand Down
2 changes: 1 addition & 1 deletion scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type State interface {
Nodes() (memdb.ResultIterator, error)

// AllocsByJob returns the allocations by JobID
AllocsByJob(jobID string) ([]*structs.Allocation, error)
AllocsByJob(jobID string, all bool) ([]*structs.Allocation, error)

// AllocsByNode returns all the allocations by node
AllocsByNode(node string) ([]*structs.Allocation, error)
Expand Down
2 changes: 1 addition & 1 deletion scheduler/system_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (s *SystemScheduler) process() (bool, error) {
// existing allocations and node status to update the allocations.
func (s *SystemScheduler) computeJobAllocs() error {
// Lookup the allocations by JobID
allocs, err := s.state.AllocsByJob(s.eval.JobID)
allocs, err := s.state.AllocsByJob(s.eval.JobID, true)
if err != nil {
return fmt.Errorf("failed to get allocs for job '%s': %v",
s.eval.JobID, err)
Expand Down