Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
dadgar committed Jan 12, 2016
1 parent 894b3e3 commit 875bf47
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 92 deletions.
94 changes: 60 additions & 34 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,26 +292,29 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error {
}

// Setup the indexes correctly
forceStatus := ""
if existing != nil {
job.CreateIndex = existing.(*structs.Job).CreateIndex
job.ModifyIndex = index

// Compute the job status
var err error
job.Status, err = s.getJobStatus(txn, job, false)
if err != nil {
return fmt.Errorf("setting job status for %q failed: %v", job.ID, err)
}
} else {
job.CreateIndex = index
job.ModifyIndex = index

// If we are inserting the job for the first time, we don't need to
// calculate the jobs status as it is known.
if job.IsPeriodic() {
forceStatus = structs.JobStatusRunning
job.Status = structs.JobStatusRunning
} else {
forceStatus = structs.JobStatusPending
job.Status = structs.JobStatusPending
}
}

// Set the job's status
if err := s.setJobStatus(watcher, txn, job, false, forceStatus); err != nil {
return fmt.Errorf("setting job status failed: %v", err)
}

// Insert the job
if err := txn.Insert("jobs", job); err != nil {
return fmt.Errorf("job insert failed: %v", err)
Expand Down Expand Up @@ -547,7 +550,7 @@ func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) erro
}

// Set the job's status
if err := s.setJobStatuses(watcher, txn, jobs, false); err != nil {
if err := s.setJobStatuses(index, watcher, txn, jobs, false); err != nil {
return fmt.Errorf("setting job status failed: %v", err)
}

Expand Down Expand Up @@ -623,7 +626,6 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e
watcher.Add(watch.Item{AllocEval: realAlloc.EvalID})
watcher.Add(watch.Item{AllocJob: realAlloc.JobID})
watcher.Add(watch.Item{AllocNode: realAlloc.NodeID})
jobs[realAlloc.JobID] = ""
}

// Update the indexes
Expand All @@ -635,7 +637,7 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e
}

// Set the job's status
if err := s.setJobStatuses(watcher, txn, jobs, true); err != nil {
if err := s.setJobStatuses(index, watcher, txn, jobs, true); err != nil {
return fmt.Errorf("setting job status failed: %v", err)
}

Expand Down Expand Up @@ -760,7 +762,7 @@ func (s *StateStore) UpdateAllocFromClient(index uint64, alloc *structs.Allocati
forceStatus = structs.JobStatusRunning
}
jobs := map[string]string{alloc.JobID: forceStatus}
if err := s.setJobStatuses(watcher, txn, jobs, false); err != nil {
if err := s.setJobStatuses(index, watcher, txn, jobs, false); err != nil {
return fmt.Errorf("setting job status failed: %v", err)
}

Expand All @@ -779,7 +781,7 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er
watcher.Add(watch.Item{Table: "allocs"})

// Handle the allocations
jobs := make(map[string]string, len(allocs))
jobs := make(map[string]string, 1)
for _, alloc := range allocs {
existing, err := txn.First("allocs", "id", alloc.ID)
if err != nil {
Expand All @@ -800,6 +802,7 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er
return fmt.Errorf("alloc insert failed: %v", err)
}

// If the allocation is running, force the job to running status.
forceStatus := ""
if !alloc.TerminalStatus() {
forceStatus = structs.JobStatusRunning
Expand All @@ -818,7 +821,7 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er
}

// Set the job's status
if err := s.setJobStatuses(watcher, txn, jobs, false); err != nil {
if err := s.setJobStatuses(index, watcher, txn, jobs, false); err != nil {
return fmt.Errorf("setting job status failed: %v", err)
}

Expand Down Expand Up @@ -959,7 +962,7 @@ func (s *StateStore) Indexes() (memdb.ResultIterator, error) {
// setJobStatuses is a helper for calling setJobStatus on multiple jobs by ID.
// It takes a map of job IDs to an optional forceStatus string. It returns an
// error if the job doesn't exist or setJobStatus fails.
func (s *StateStore) setJobStatuses(watcher watch.Items, txn *memdb.Txn,
func (s *StateStore) setJobStatuses(index uint64, watcher watch.Items, txn *memdb.Txn,
jobs map[string]string, evalDelete bool) error {
for job, forceStatus := range jobs {
existing, err := txn.First("jobs", "id", job)
Expand All @@ -971,7 +974,7 @@ func (s *StateStore) setJobStatuses(watcher watch.Items, txn *memdb.Txn,
continue
}

if err := s.setJobStatus(watcher, txn, existing.(*structs.Job), evalDelete, forceStatus); err != nil {
if err := s.setJobStatus(index, watcher, txn, existing.(*structs.Job), evalDelete, forceStatus); err != nil {
return err
}
}
Expand All @@ -984,64 +987,87 @@ func (s *StateStore) setJobStatuses(watcher watch.Items, txn *memdb.Txn,
// called because an evaluation is being deleted (potentially because of garbage
// collection). If forceStatus is non-empty, the job's status will be set to the
// passed status.
func (s *StateStore) setJobStatus(watcher watch.Items, txn *memdb.Txn,
func (s *StateStore) setJobStatus(index uint64, watcher watch.Items, txn *memdb.Txn,
job *structs.Job, evalDelete bool, forceStatus string) error {

// Capture the current status so we can check if there is a change
oldStatus := job.Status
newStatus := forceStatus

// If forceStatus is not set, compute the jobs status.
if forceStatus == "" {
var err error
newStatus, err = s.getJobStatus(txn, job, evalDelete)
if err != nil {
return err
}
}

// Fast-path if nothing has changed.
if oldStatus == newStatus {
return nil
}

// The job has changed, so add to watcher.
watcher.Add(watch.Item{Table: "jobs"})
watcher.Add(watch.Item{Job: job.ID})

// If forceStatus is set, immediately set the job's status
if forceStatus != "" {
job.Status = forceStatus
return nil
// Copy and update the existing job
updated := job.Copy()
updated.Status = newStatus
updated.ModifyIndex = index

// Insert the job
if err := txn.Insert("jobs", updated); err != nil {
return fmt.Errorf("job insert failed: %v", err)
}
if err := txn.Insert("index", &IndexEntry{"jobs", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
return nil
}

func (s *StateStore) getJobStatus(txn *memdb.Txn, job *structs.Job, evalDelete bool) (string, error) {
allocs, err := txn.Get("allocs", "job", job.ID)
if err != nil {
return err
return "", err
}

// If there is a non-terminal allocation, the job is running.
hasAlloc := false
for alloc := allocs.Next(); alloc != nil; alloc = allocs.Next() {
hasAlloc = true
if !alloc.(*structs.Allocation).TerminalStatus() {
job.Status = structs.JobStatusRunning
return nil
return structs.JobStatusRunning, nil
}
}

evals, err := txn.Get("evals", "job", job.ID)
if err != nil {
return err
return "", err
}

hasEval := false
for eval := evals.Next(); eval != nil; eval = evals.Next() {
hasEval = true
if !eval.(*structs.Evaluation).TerminalStatus() {
job.Status = structs.JobStatusPending
return nil
return structs.JobStatusPending, nil
}
}

// The job is dead if all the allocations and evals are terminal or if there
// are no evals because of garbage collection.
if evalDelete || hasEval || hasAlloc {
job.Status = structs.JobStatusDead
return nil
return structs.JobStatusDead, nil
}

// If there are no allocations or evaluations it is a new job. If the job is
// periodic, we mark it as running as it will never have an
// allocation/evaluation against it.
if job.IsPeriodic() {
job.Status = structs.JobStatusRunning
} else {
job.Status = structs.JobStatusPending
return structs.JobStatusRunning, nil
}

return nil
return structs.JobStatusPending, nil
}

// StateSnapshot is used to provide a point-in-time snapshot
Expand Down
Loading

0 comments on commit 875bf47

Please sign in to comment.