Skip to content

Commit

Permalink
Reconciling the queued allocations during restore
Browse files Browse the repository at this point in the history
  • Loading branch information
diptanu committed Jul 26, 2016
1 parent a9c995b commit 3d4c185
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 10 deletions.
67 changes: 65 additions & 2 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/armon/go-metrics"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/scheduler"
"github.com/hashicorp/raft"
"github.com/ugorji/go/codec"
)
Expand Down Expand Up @@ -579,11 +580,73 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {

// Create Job Summaries
// COMPAT 0.4 -> 0.4.1
if err := restore.CreateJobSummaries(); err != nil {
jobs, err := restore.JobsWithoutSummary()
if err != nil {
fmt.Errorf("error retreiving jobs during restore: %v", err)
}
if err := restore.CreateJobSummaries(jobs); err != nil {
return fmt.Errorf("error creating job summaries: %v", err)
}

// Commit the state restore
restore.Commit()
return n.reconcileSummaries(jobs)
}

// reconcileSummaries re-calculates the queued allocations for every job that we
// created a Job Summary during the snap shot restore
func (n *nomadFSM) reconcileSummaries(jobs []*structs.Job) error {
// Start the state restore
restore, err := n.state.Restore()
if err != nil {
return err
}
defer restore.Abort()
snap, err := n.state.Snapshot()
if err != nil {
return fmt.Errorf("unable to create snapshot: %v", err)
}
for _, job := range jobs {
planner := &scheduler.Harness{
State: &snap.StateStore,
}
// Create an eval and mark it as requiring annotations and insert that as well
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: job.Priority,
Type: job.Type,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
JobModifyIndex: job.JobModifyIndex + 1,
Status: structs.EvalStatusPending,
AnnotatePlan: true,
}

// Create the scheduler and run it
sched, err := scheduler.NewScheduler(eval.Type, n.logger, snap, planner)
if err != nil {
return err
}

if err := sched.Process(eval); err != nil {
return err
}
summary, err := snap.JobSummaryByID(job.ID)
if err != nil {
return err
}
for tg, queued := range planner.Evals[0].QueuedAllocations {
tgSummary, ok := summary.Summary[tg]
if !ok {
return fmt.Errorf("task group %q not found while updating queued count", tg)
}
tgSummary.Queued = queued
summary.Summary[tg] = tgSummary
}

if err := restore.JobSummaryRestore(summary); err != nil {
return err
}
}
restore.Commit()
return nil
}
Expand Down
26 changes: 26 additions & 0 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -972,3 +972,29 @@ func TestFSM_SnapshotRestore_JobSummary(t *testing.T) {
t.Fatalf("bad: \n%#v\n%#v", js2, out2)
}
}

func TestFSM_SnapshotRestore_AddMissingSummary(t *testing.T) {
// Add some state
fsm := testFSM(t)
state := fsm.State()

job1 := mock.Job()
state.UpsertJob(1000, job1)
state.DeleteJobSummary(1010, job1.ID)

fsm2 := testSnapshotRestore(t, fsm)
state2 := fsm2.State()
out1, _ := state2.JobSummaryByID(job1.ID)
expected := structs.JobSummary{
JobID: job1.ID,
Summary: map[string]structs.TaskGroupSummary{
"web": structs.TaskGroupSummary{
Queued: 10,
},
},
}

if !reflect.DeepEqual(&expected, out1) {
t.Fatalf("expected: %#v, actual: %#v", expected, out1)
}
}
35 changes: 28 additions & 7 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,20 @@ func (s *StateStore) UpsertJobSummary(index uint64, jobSummary *structs.JobSumma
return nil
}

// DeleteJobSummary deletes the job summary with the given ID. This is for
// testing purposes only.
func (s *StateStore) DeleteJobSummary(index uint64, id string) error {
txn := s.db.Txn(true)
defer txn.Abort()

// Delete the job summary
if _, err := txn.DeleteAll("job_summary", "id", id); err != nil {
return fmt.Errorf("deleting job summary failed: %v", err)
}
txn.Commit()
return nil
}

// UpsertNode is used to register a node or update a node definition
// This is assumed to be triggered by the client, so we retain the value
// of drain which is set by the scheduler.
Expand Down Expand Up @@ -1501,13 +1515,13 @@ func (r *StateRestore) JobSummaryRestore(jobSummary *structs.JobSummary) error {
return nil
}

// CreateJobSummaries computes the job summaries for all the jobs
func (r *StateRestore) CreateJobSummaries() error {
// JobsWithoutSummary returns the list of jobs which don't have any summary
func (r *StateRestore) JobsWithoutSummary() ([]*structs.Job, error) {
// Get all the jobs
var jobs []*structs.Job
iter, err := r.txn.Get("jobs", "id")
if err != nil {
return fmt.Errorf("couldn't retrieve jobs: %v", err)
return nil, fmt.Errorf("couldn't retrieve jobs: %v", err)
}
for {
raw := iter.Next()
Expand All @@ -1517,21 +1531,24 @@ func (r *StateRestore) CreateJobSummaries() error {

// Filter the jobs which have summaries
job := raw.(*structs.Job)
jobSummary, err := r.txn.Get("job_summary", "id", job.ID)
jobSummary, err := r.txn.First("job_summary", "id", job.ID)
if err != nil {
return fmt.Errorf("unable to get job summary: %v", err)
return nil, fmt.Errorf("unable to get job summary: %v", err)
}
if jobSummary != nil {
continue
}

jobs = append(jobs, job)
}
return jobs, nil
}

// CreateJobSummaries computes the job summaries for all the jobs
func (r *StateRestore) CreateJobSummaries(jobs []*structs.Job) error {
for _, job := range jobs {

// Get all the allocations for the job
iter, err = r.txn.Get("allocs", "job", job.ID)
iter, err := r.txn.Get("allocs", "job", job.ID)
if err != nil {
return fmt.Errorf("couldn't retrieve allocations for job %v: %v", job.ID, err)
}
Expand All @@ -1549,6 +1566,9 @@ func (r *StateRestore) CreateJobSummaries() error {
JobID: job.ID,
Summary: make(map[string]structs.TaskGroupSummary),
}
for _, tg := range job.TaskGroups {
summary.Summary[tg.Name] = structs.TaskGroupSummary{}
}
// Calculate the summary for the job
for _, alloc := range allocs {
if _, ok := summary.Summary[alloc.TaskGroup]; !ok {
Expand All @@ -1570,6 +1590,7 @@ func (r *StateRestore) CreateJobSummaries() error {
summary.Summary[alloc.TaskGroup] = tg
}
// Insert the job summary

if err := r.txn.Insert("job_summary", summary); err != nil {
return fmt.Errorf("error inserting job summary: %v", err)
}
Expand Down
26 changes: 25 additions & 1 deletion nomad/state/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1132,7 +1132,7 @@ func TestStateStore_CreateJobSummaries(t *testing.T) {
}

// Create the job summaries
if err := restore.CreateJobSummaries(); err != nil {
if err := restore.CreateJobSummaries([]*structs.Job{job}); err != nil {
t.Fatalf("err: %v", err)
}
restore.Commit()
Expand All @@ -1155,6 +1155,30 @@ func TestStateStore_CreateJobSummaries(t *testing.T) {
}
}

func TestStateRestore_JobsWithoutSummaries(t *testing.T) {
state := testStateStore(t)
restore, err := state.Restore()
if err != nil {
t.Fatalf("err: %v", err)
}
// Restore a job
job := mock.Job()
if err := restore.JobRestore(job); err != nil {
t.Fatalf("err: %v", err)
}

jobs, err := restore.JobsWithoutSummary()
if err != nil {
t.Fatalf("err: %v", err)
}
if len(jobs) != 1 {
t.Fatalf("expected: %v, actual: %v", 1, len(jobs))
}
if !reflect.DeepEqual(job, jobs[0]) {
t.Fatalf("Bad: %#v %#v", job, jobs[0])
}
}

func TestStateStore_Indexes(t *testing.T) {
state := testStateStore(t)
node := mock.Node()
Expand Down

0 comments on commit 3d4c185

Please sign in to comment.