Skip to content

Commit

Permalink
Merge pull request #663 from hashicorp/f-job-status
Browse files Browse the repository at this point in the history
Populate Job Status
  • Loading branch information
dadgar committed Jan 12, 2016
2 parents c7d06eb + 47fbfd3 commit 1ea6137
Show file tree
Hide file tree
Showing 7 changed files with 396 additions and 15 deletions.
1 change: 1 addition & 0 deletions nomad/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,7 @@ func TestJobEndpoint_GetJob(t *testing.T) {
}
job.CreateIndex = resp.JobModifyIndex
job.ModifyIndex = resp.JobModifyIndex
job.JobModifyIndex = resp.JobModifyIndex

// Lookup the job
get := &structs.JobSpecificRequest{
Expand Down
7 changes: 4 additions & 3 deletions nomad/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,10 @@ func Job() *structs.Job {
Meta: map[string]string{
"owner": "armon",
},
Status: structs.JobStatusPending,
CreateIndex: 42,
ModifyIndex: 99,
Status: structs.JobStatusPending,
CreateIndex: 42,
ModifyIndex: 99,
JobModifyIndex: 99,
}
job.InitFields()
return job
Expand Down
166 changes: 166 additions & 0 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,9 +295,26 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error {
if existing != nil {
job.CreateIndex = existing.(*structs.Job).CreateIndex
job.ModifyIndex = index
job.JobModifyIndex = index

// Compute the job status
var err error
job.Status, err = s.getJobStatus(txn, job, false)
if err != nil {
return fmt.Errorf("setting job status for %q failed: %v", job.ID, err)
}
} else {
job.CreateIndex = index
job.ModifyIndex = index
job.JobModifyIndex = index

// If we are inserting the job for the first time, we don't need to
// calculate the jobs status as it is known.
if job.IsPeriodic() {
job.Status = structs.JobStatusRunning
} else {
job.Status = structs.JobStatusPending
}
}

// Insert the job
Expand Down Expand Up @@ -524,11 +541,19 @@ func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) erro
watcher.Add(watch.Item{Table: "evals"})

// Do a nested upsert
jobs := make(map[string]string, len(evals))
for _, eval := range evals {
watcher.Add(watch.Item{Eval: eval.ID})
if err := s.nestedUpsertEval(txn, index, eval); err != nil {
return err
}

jobs[eval.JobID] = ""
}

// Set the job's status
if err := s.setJobStatuses(index, watcher, txn, jobs, false); err != nil {
return fmt.Errorf("setting job status failed: %v", err)
}

txn.Defer(func() { s.watch.notify(watcher) })
Expand Down Expand Up @@ -571,6 +596,7 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e
watcher.Add(watch.Item{Table: "evals"})
watcher.Add(watch.Item{Table: "allocs"})

jobs := make(map[string]string, len(evals))
for _, eval := range evals {
existing, err := txn.First("evals", "id", eval)
if err != nil {
Expand All @@ -583,6 +609,7 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e
return fmt.Errorf("eval delete failed: %v", err)
}
watcher.Add(watch.Item{Eval: eval})
jobs[existing.(*structs.Evaluation).JobID] = ""
}

for _, alloc := range allocs {
Expand Down Expand Up @@ -611,6 +638,11 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e
return fmt.Errorf("index update failed: %v", err)
}

// Set the job's status
if err := s.setJobStatuses(index, watcher, txn, jobs, true); err != nil {
return fmt.Errorf("setting job status failed: %v", err)
}

txn.Defer(func() { s.watch.notify(watcher) })
txn.Commit()
return nil
Expand Down Expand Up @@ -726,6 +758,16 @@ func (s *StateStore) UpdateAllocFromClient(index uint64, alloc *structs.Allocati
return fmt.Errorf("index update failed: %v", err)
}

// Set the job's status
forceStatus := ""
if !copyAlloc.TerminalStatus() {
forceStatus = structs.JobStatusRunning
}
jobs := map[string]string{alloc.JobID: forceStatus}
if err := s.setJobStatuses(index, watcher, txn, jobs, false); err != nil {
return fmt.Errorf("setting job status failed: %v", err)
}

txn.Defer(func() { s.watch.notify(watcher) })
txn.Commit()
return nil
Expand All @@ -741,6 +783,7 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er
watcher.Add(watch.Item{Table: "allocs"})

// Handle the allocations
jobs := make(map[string]string, 1)
for _, alloc := range allocs {
existing, err := txn.First("allocs", "id", alloc.ID)
if err != nil {
Expand All @@ -761,6 +804,13 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er
return fmt.Errorf("alloc insert failed: %v", err)
}

// If the allocation is running, force the job to running status.
forceStatus := ""
if !alloc.TerminalStatus() {
forceStatus = structs.JobStatusRunning
}
jobs[alloc.JobID] = forceStatus

watcher.Add(watch.Item{Alloc: alloc.ID})
watcher.Add(watch.Item{AllocEval: alloc.EvalID})
watcher.Add(watch.Item{AllocJob: alloc.JobID})
Expand All @@ -772,6 +822,11 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er
return fmt.Errorf("index update failed: %v", err)
}

// Set the job's status
if err := s.setJobStatuses(index, watcher, txn, jobs, false); err != nil {
return fmt.Errorf("setting job status failed: %v", err)
}

txn.Defer(func() { s.watch.notify(watcher) })
txn.Commit()
return nil
Expand Down Expand Up @@ -906,6 +961,117 @@ func (s *StateStore) Indexes() (memdb.ResultIterator, error) {
return iter, nil
}

// setJobStatuses is a helper for calling setJobStatus on multiple jobs by ID.
// It takes a map of job IDs to an optional forceStatus string. It returns an
// error if the job doesn't exist or setJobStatus fails.
func (s *StateStore) setJobStatuses(index uint64, watcher watch.Items, txn *memdb.Txn,
jobs map[string]string, evalDelete bool) error {
for job, forceStatus := range jobs {
existing, err := txn.First("jobs", "id", job)
if err != nil {
return fmt.Errorf("job lookup failed: %v", err)
}

if existing == nil {
continue
}

if err := s.setJobStatus(index, watcher, txn, existing.(*structs.Job), evalDelete, forceStatus); err != nil {
return err
}
}

return nil
}

// setJobStatus sets the status of the job by looking up associated evaluations
// and allocations. evalDelete should be set to true if setJobStatus is being
// called because an evaluation is being deleted (potentially because of garbage
// collection). If forceStatus is non-empty, the job's status will be set to the
// passed status.
func (s *StateStore) setJobStatus(index uint64, watcher watch.Items, txn *memdb.Txn,
job *structs.Job, evalDelete bool, forceStatus string) error {

// Capture the current status so we can check if there is a change
oldStatus := job.Status
newStatus := forceStatus

// If forceStatus is not set, compute the jobs status.
if forceStatus == "" {
var err error
newStatus, err = s.getJobStatus(txn, job, evalDelete)
if err != nil {
return err
}
}

// Fast-path if nothing has changed.
if oldStatus == newStatus {
return nil
}

// The job has changed, so add to watcher.
watcher.Add(watch.Item{Table: "jobs"})
watcher.Add(watch.Item{Job: job.ID})

// Copy and update the existing job
updated := job.Copy()
updated.Status = newStatus
updated.ModifyIndex = index

// Insert the job
if err := txn.Insert("jobs", updated); err != nil {
return fmt.Errorf("job insert failed: %v", err)
}
if err := txn.Insert("index", &IndexEntry{"jobs", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
return nil
}

func (s *StateStore) getJobStatus(txn *memdb.Txn, job *structs.Job, evalDelete bool) (string, error) {
allocs, err := txn.Get("allocs", "job", job.ID)
if err != nil {
return "", err
}

// If there is a non-terminal allocation, the job is running.
hasAlloc := false
for alloc := allocs.Next(); alloc != nil; alloc = allocs.Next() {
hasAlloc = true
if !alloc.(*structs.Allocation).TerminalStatus() {
return structs.JobStatusRunning, nil
}
}

evals, err := txn.Get("evals", "job", job.ID)
if err != nil {
return "", err
}

hasEval := false
for eval := evals.Next(); eval != nil; eval = evals.Next() {
hasEval = true
if !eval.(*structs.Evaluation).TerminalStatus() {
return structs.JobStatusPending, nil
}
}

// The job is dead if all the allocations and evals are terminal or if there
// are no evals because of garbage collection.
if evalDelete || hasEval || hasAlloc {
return structs.JobStatusDead, nil
}

// If there are no allocations or evaluations it is a new job. If the job is
// periodic, we mark it as running as it will never have an
// allocation/evaluation against it.
if job.IsPeriodic() {
return structs.JobStatusRunning, nil
}
return structs.JobStatusPending, nil
}

// StateSnapshot is used to provide a point-in-time snapshot
type StateSnapshot struct {
StateStore
Expand Down
Loading

0 comments on commit 1ea6137

Please sign in to comment.