Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Populate Job Status #663

Merged
merged 4 commits into from
Jan 12, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions nomad/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,7 @@ func TestJobEndpoint_GetJob(t *testing.T) {
}
job.CreateIndex = resp.JobModifyIndex
job.ModifyIndex = resp.JobModifyIndex
job.JobModifyIndex = resp.JobModifyIndex

// Lookup the job
get := &structs.JobSpecificRequest{
Expand Down
7 changes: 4 additions & 3 deletions nomad/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,10 @@ func Job() *structs.Job {
Meta: map[string]string{
"owner": "armon",
},
Status: structs.JobStatusPending,
CreateIndex: 42,
ModifyIndex: 99,
Status: structs.JobStatusPending,
CreateIndex: 42,
ModifyIndex: 99,
JobModifyIndex: 99,
}
job.InitFields()
return job
Expand Down
166 changes: 166 additions & 0 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,9 +295,26 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error {
if existing != nil {
job.CreateIndex = existing.(*structs.Job).CreateIndex
job.ModifyIndex = index
job.JobModifyIndex = index

// Compute the job status
var err error
job.Status, err = s.getJobStatus(txn, job, false)
if err != nil {
return fmt.Errorf("setting job status for %q failed: %v", job.ID, err)
}
} else {
job.CreateIndex = index
job.ModifyIndex = index
job.JobModifyIndex = index

// If we are inserting the job for the first time, we don't need to
// calculate the jobs status as it is known.
if job.IsPeriodic() {
job.Status = structs.JobStatusRunning
} else {
job.Status = structs.JobStatusPending
}
}

// Insert the job
Expand Down Expand Up @@ -524,11 +541,19 @@ func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) erro
watcher.Add(watch.Item{Table: "evals"})

// Do a nested upsert
jobs := make(map[string]string, len(evals))
for _, eval := range evals {
watcher.Add(watch.Item{Eval: eval.ID})
if err := s.nestedUpsertEval(txn, index, eval); err != nil {
return err
}

jobs[eval.JobID] = ""
}

// Set the job's status
if err := s.setJobStatuses(index, watcher, txn, jobs, false); err != nil {
return fmt.Errorf("setting job status failed: %v", err)
}

txn.Defer(func() { s.watch.notify(watcher) })
Expand Down Expand Up @@ -571,6 +596,7 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e
watcher.Add(watch.Item{Table: "evals"})
watcher.Add(watch.Item{Table: "allocs"})

jobs := make(map[string]string, len(evals))
for _, eval := range evals {
existing, err := txn.First("evals", "id", eval)
if err != nil {
Expand All @@ -583,6 +609,7 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e
return fmt.Errorf("eval delete failed: %v", err)
}
watcher.Add(watch.Item{Eval: eval})
jobs[existing.(*structs.Evaluation).JobID] = ""
}

for _, alloc := range allocs {
Expand Down Expand Up @@ -611,6 +638,11 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e
return fmt.Errorf("index update failed: %v", err)
}

// Set the job's status
if err := s.setJobStatuses(index, watcher, txn, jobs, true); err != nil {
return fmt.Errorf("setting job status failed: %v", err)
}

txn.Defer(func() { s.watch.notify(watcher) })
txn.Commit()
return nil
Expand Down Expand Up @@ -726,6 +758,16 @@ func (s *StateStore) UpdateAllocFromClient(index uint64, alloc *structs.Allocati
return fmt.Errorf("index update failed: %v", err)
}

// Set the job's status
forceStatus := ""
if !copyAlloc.TerminalStatus() {
forceStatus = structs.JobStatusRunning
}
jobs := map[string]string{alloc.JobID: forceStatus}
if err := s.setJobStatuses(index, watcher, txn, jobs, false); err != nil {
return fmt.Errorf("setting job status failed: %v", err)
}

txn.Defer(func() { s.watch.notify(watcher) })
txn.Commit()
return nil
Expand All @@ -741,6 +783,7 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er
watcher.Add(watch.Item{Table: "allocs"})

// Handle the allocations
jobs := make(map[string]string, 1)
for _, alloc := range allocs {
existing, err := txn.First("allocs", "id", alloc.ID)
if err != nil {
Expand All @@ -761,6 +804,13 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er
return fmt.Errorf("alloc insert failed: %v", err)
}

// If the allocation is running, force the job to running status.
forceStatus := ""
if !alloc.TerminalStatus() {
forceStatus = structs.JobStatusRunning
}
jobs[alloc.JobID] = forceStatus

watcher.Add(watch.Item{Alloc: alloc.ID})
watcher.Add(watch.Item{AllocEval: alloc.EvalID})
watcher.Add(watch.Item{AllocJob: alloc.JobID})
Expand All @@ -772,6 +822,11 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er
return fmt.Errorf("index update failed: %v", err)
}

// Set the job's status
if err := s.setJobStatuses(index, watcher, txn, jobs, false); err != nil {
return fmt.Errorf("setting job status failed: %v", err)
}

txn.Defer(func() { s.watch.notify(watcher) })
txn.Commit()
return nil
Expand Down Expand Up @@ -906,6 +961,117 @@ func (s *StateStore) Indexes() (memdb.ResultIterator, error) {
return iter, nil
}

// setJobStatuses is a helper for calling setJobStatus on multiple jobs by ID.
// It takes a map of job IDs to an optional forceStatus string. It returns an
// error if the job doesn't exist or setJobStatus fails.
func (s *StateStore) setJobStatuses(index uint64, watcher watch.Items, txn *memdb.Txn,
jobs map[string]string, evalDelete bool) error {
for job, forceStatus := range jobs {
existing, err := txn.First("jobs", "id", job)
if err != nil {
return fmt.Errorf("job lookup failed: %v", err)
}

if existing == nil {
continue
}

if err := s.setJobStatus(index, watcher, txn, existing.(*structs.Job), evalDelete, forceStatus); err != nil {
return err
}
}

return nil
}

// setJobStatus sets the status of the job by looking up associated evaluations
// and allocations. evalDelete should be set to true if setJobStatus is being
// called because an evaluation is being deleted (potentially because of garbage
// collection). If forceStatus is non-empty, the job's status will be set to the
// passed status.
func (s *StateStore) setJobStatus(index uint64, watcher watch.Items, txn *memdb.Txn,
job *structs.Job, evalDelete bool, forceStatus string) error {

// Capture the current status so we can check if there is a change
oldStatus := job.Status
newStatus := forceStatus

// If forceStatus is not set, compute the jobs status.
if forceStatus == "" {
var err error
newStatus, err = s.getJobStatus(txn, job, evalDelete)
if err != nil {
return err
}
}

// Fast-path if nothing has changed.
if oldStatus == newStatus {
return nil
}

// The job has changed, so add to watcher.
watcher.Add(watch.Item{Table: "jobs"})
watcher.Add(watch.Item{Job: job.ID})

// Copy and update the existing job
updated := job.Copy()
updated.Status = newStatus
updated.ModifyIndex = index

// Insert the job
if err := txn.Insert("jobs", updated); err != nil {
return fmt.Errorf("job insert failed: %v", err)
}
if err := txn.Insert("index", &IndexEntry{"jobs", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
return nil
}

func (s *StateStore) getJobStatus(txn *memdb.Txn, job *structs.Job, evalDelete bool) (string, error) {
allocs, err := txn.Get("allocs", "job", job.ID)
if err != nil {
return "", err
}

// If there is a non-terminal allocation, the job is running.
hasAlloc := false
for alloc := allocs.Next(); alloc != nil; alloc = allocs.Next() {
hasAlloc = true
if !alloc.(*structs.Allocation).TerminalStatus() {
return structs.JobStatusRunning, nil
}
}

evals, err := txn.Get("evals", "job", job.ID)
if err != nil {
return "", err
}

hasEval := false
for eval := evals.Next(); eval != nil; eval = evals.Next() {
hasEval = true
if !eval.(*structs.Evaluation).TerminalStatus() {
return structs.JobStatusPending, nil
}
}

// The job is dead if all the allocations and evals are terminal or if there
// are no evals because of garbage collection.
if evalDelete || hasEval || hasAlloc {
return structs.JobStatusDead, nil
}

// If there are no allocations or evaluations it is a new job. If the job is
// periodic, we mark it as running as it will never have an
// allocation/evaluation against it.
if job.IsPeriodic() {
return structs.JobStatusRunning, nil
}
return structs.JobStatusPending, nil
}

// StateSnapshot is used to provide a point-in-time snapshot
type StateSnapshot struct {
StateStore
Expand Down
Loading