Skip to content

Commit

Permalink
Merge pull request #1517 from hashicorp/f-reconcile-summaries
Browse files Browse the repository at this point in the history
Endpoint for reconciling job summaries
  • Loading branch information
diptanu authored Aug 4, 2016
2 parents 789a6b2 + 3bdaf91 commit ca857d9
Show file tree
Hide file tree
Showing 10 changed files with 471 additions and 195 deletions.
1 change: 1 addition & 0 deletions command/agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/v1/status/peers", s.wrap(s.StatusPeersRequest))

s.mux.HandleFunc("/v1/system/gc", s.wrap(s.GarbageCollectRequest))
s.mux.HandleFunc("/v1/system/reconcile/summaries", s.wrap(s.ReconcileJobSummaries))

if enableDebug {
s.mux.HandleFunc("/debug/pprof/", pprof.Index)
Expand Down
17 changes: 17 additions & 0 deletions command/agent/system_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,20 @@ func (s *HTTPServer) GarbageCollectRequest(resp http.ResponseWriter, req *http.R
}
return nil, nil
}

// ReconcileJobSummaries asks the Nomad servers to rebuild the summaries of
// all registered jobs. Only the PUT verb is accepted; any other method is
// rejected with a 405.
func (s *HTTPServer) ReconcileJobSummaries(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
	// Reconciliation mutates server state, so restrict the verb.
	if req.Method != "PUT" {
		return nil, CodedError(405, ErrInvalidMethod)
	}

	// Decode region/query options; parse writes the error response itself
	// and reports true when it has already handled the request.
	var args structs.GenericRequest
	if s.parse(resp, req, &args.Region, &args.QueryOptions) {
		return nil, nil
	}

	// Forward to the server; the generic reply carries no payload we surface.
	var reply structs.GenericResponse
	err := s.agent.RPC("System.ReconcileJobSummaries", &args, &reply)
	if err != nil {
		return nil, err
	}
	return nil, nil
}
20 changes: 20 additions & 0 deletions command/agent/system_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,23 @@ func TestHTTP_SystemGarbageCollect(t *testing.T) {
}
})
}

// TestHTTP_ReconcileJobSummaries exercises the summary-reconciliation
// endpoint end to end against a test agent.
func TestHTTP_ReconcileJobSummaries(t *testing.T) {
	httpTest(t, nil, func(s *TestServer) {
		// Build the PUT request for the reconcile endpoint.
		req, err := http.NewRequest("PUT", "/v1/system/reconcile/summaries", nil)
		if err != nil {
			t.Fatalf("err: %v", err)
		}

		// Invoke the handler directly, recording the response.
		respW := httptest.NewRecorder()
		if _, err := s.Server.ReconcileJobSummaries(respW, req); err != nil {
			t.Fatalf("err: %v", err)
		}

		// A successful reconcile reports HTTP 200.
		if respW.Code != 200 {
			t.Fatalf("expected: %v, actual: %v", 200, respW.Code)
		}
	})
}
92 changes: 72 additions & 20 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
return n.applyAllocUpdate(buf[1:], log.Index)
case structs.AllocClientUpdateRequestType:
return n.applyAllocClientUpdate(buf[1:], log.Index)
case structs.ReconcileJobSummariesRequestType:
return n.applyReconcileSummaries(buf[1:], log.Index)
default:
if ignoreUnknown {
n.logger.Printf("[WARN] nomad.fsm: ignoring unknown message type (%d), upgrade to newer version", msgType)
Expand Down Expand Up @@ -444,6 +446,14 @@ func (n *nomadFSM) applyAllocClientUpdate(buf []byte, index uint64) interface{}
return nil
}

// applyReconcileSummaries rebuilds the job summaries in the state store at
// the given raft index and then recomputes the queued allocation counts for
// every job.
func (n *nomadFSM) applyReconcileSummaries(buf []byte, index uint64) interface{} {
	err := n.state.ReconcileJobSummaries(index)
	if err != nil {
		return err
	}
	// Summaries are consistent again; refresh the queued counters too.
	return n.reconcileQueuedAllocations(index)
}

func (n *nomadFSM) Snapshot() (raft.FSMSnapshot, error) {
// Create a new snapshot
snap, err := n.state.Snapshot()
Expand Down Expand Up @@ -578,39 +588,60 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
}
}

restore.Commit()

// Create Job Summaries
// The entire snapshot has to be restored first before we create the missing
// job summaries so that the indexes are updated and we know the highest
// index
// COMPAT 0.4 -> 0.4.1
jobs, err := restore.JobsWithoutSummary()
// We can remove this in 0.5. This exists so that the server creates job
// summaries if they were not present previously. When users upgrade to 0.5
// from 0.4.1, the snapshot will contain job summaries so it will be safe to
// remove this block.
index, err := n.state.Index("job_summary")
if err != nil {
fmt.Errorf("error retreiving jobs during restore: %v", err)
}
if err := restore.CreateJobSummaries(jobs); err != nil {
return fmt.Errorf("error creating job summaries: %v", err)
return fmt.Errorf("couldn't fetch index of job summary table: %v", err)
}

restore.Commit()
// If the index is 0 that means there is no job summary in the snapshot so
// we will have to create them
if index == 0 {
// query the latest index
latestIndex, err := n.state.LatestIndex()
if err != nil {
return fmt.Errorf("unable to query latest index: %v", index)
}
if err := n.state.ReconcileJobSummaries(latestIndex); err != nil {
return fmt.Errorf("error reconciling summaries: %v", err)
}
if err := n.reconcileQueuedAllocations(latestIndex); err != nil {
return fmt.Errorf("error re-computing the number of queued allocations:; %v", err)
}
}

// Reconciling the queued allocations
return n.reconcileSummaries(jobs)
return nil
}

// reconcileSummaries re-calculates the queued allocations for every job that we
// created a Job Summary during the snap shot restore
func (n *nomadFSM) reconcileSummaries(jobs []*structs.Job) error {
// Start the state restore
restore, err := n.state.Restore()
func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error {
// Get all the jobs
iter, err := n.state.Jobs()
if err != nil {
return err
}
defer restore.Abort()

snap, err := n.state.Snapshot()
if err != nil {
return fmt.Errorf("unable to create snapshot: %v", err)
}
for _, job := range jobs {

// Invoking the scheduler for every job so that we can populate the number
// of queued allocations for every job
for {
rawJob := iter.Next()
if rawJob == nil {
break
}
job := rawJob.(*structs.Job)
planner := &scheduler.Harness{
State: &snap.StateStore,
}
Expand All @@ -635,10 +666,32 @@ func (n *nomadFSM) reconcileSummaries(jobs []*structs.Job) error {
if err := sched.Process(eval); err != nil {
return err
}
summary, err := snap.JobSummaryByID(job.ID)

// Get the job summary from the fsm state store
summary, err := n.state.JobSummaryByID(job.ID)
if err != nil {
return err
}

// Add the allocations scheduler has made to queued since these
// allocations are never getting placed until the scheduler is invoked
// with a real planner
if l := len(planner.Plans); l != 1 {
return fmt.Errorf("unexpected number of plans during restore %d. Please file an issue including the logs", l)
}
for _, allocations := range planner.Plans[0].NodeAllocation {
for _, allocation := range allocations {
tgSummary, ok := summary.Summary[allocation.TaskGroup]
if !ok {
return fmt.Errorf("task group %q not found while updating queued count", allocation.TaskGroup)
}
tgSummary.Queued += 1
summary.Summary[allocation.TaskGroup] = tgSummary
}
}

// Add the queued allocations attached to the evaluation to the queued
// counter of the job summary
if l := len(planner.Evals); l != 1 {
return fmt.Errorf("unexpected number of evals during restore %d. Please file an issue including the logs", l)
}
Expand All @@ -647,15 +700,14 @@ func (n *nomadFSM) reconcileSummaries(jobs []*structs.Job) error {
if !ok {
return fmt.Errorf("task group %q not found while updating queued count", tg)
}
tgSummary.Queued = queued
tgSummary.Queued += queued
summary.Summary[tg] = tgSummary
}

if err := restore.JobSummaryRestore(summary); err != nil {
if err := n.state.UpsertJobSummary(index, summary); err != nil {
return err
}
}
restore.Commit()
return nil
}

Expand Down
43 changes: 41 additions & 2 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -981,13 +981,34 @@ func TestFSM_SnapshotRestore_AddMissingSummary(t *testing.T) {
fsm := testFSM(t)
state := fsm.State()

// Add a node
node := mock.Node()
state.UpsertNode(800, node)

// Make a job so that none of the tasks can be placed
job1 := mock.Job()
job1.TaskGroups[0].Tasks[0].Resources.CPU = 5000
state.UpsertJob(1000, job1)
state.DeleteJobSummary(1010, job1.ID)

// make a job which can make partial progress
alloc := mock.Alloc()
alloc.NodeID = node.ID
state.UpsertJob(1010, alloc.Job)
state.UpsertAllocs(1011, []*structs.Allocation{alloc})

// Delete the summaries
state.DeleteJobSummary(1030, job1.ID)
state.DeleteJobSummary(1040, alloc.Job.ID)

// Delete the index
if err := state.RemoveIndex("job_summary"); err != nil {
t.Fatalf("err: %v", err)
}

fsm2 := testSnapshotRestore(t, fsm)
state2 := fsm2.State()
latestIndex, _ := state.LatestIndex()

out1, _ := state2.JobSummaryByID(job1.ID)
expected := structs.JobSummary{
JobID: job1.ID,
Expand All @@ -999,8 +1020,26 @@ func TestFSM_SnapshotRestore_AddMissingSummary(t *testing.T) {
CreateIndex: latestIndex,
ModifyIndex: latestIndex,
}

if !reflect.DeepEqual(&expected, out1) {
t.Fatalf("expected: %#v, actual: %#v", &expected, out1)
}

// This exercises the code path which adds the allocations made by the
// planner and the number of unplaced allocations in the reconcile summaries
// codepath
out2, _ := state2.JobSummaryByID(alloc.Job.ID)
expected = structs.JobSummary{
JobID: alloc.Job.ID,
Summary: map[string]structs.TaskGroupSummary{
"web": structs.TaskGroupSummary{
Queued: 10,
Starting: 1,
},
},
CreateIndex: latestIndex,
ModifyIndex: latestIndex,
}
if !reflect.DeepEqual(&expected, out2) {
t.Fatalf("expected: %#v, actual: %#v", &expected, out2)
}
}
Loading

0 comments on commit ca857d9

Please sign in to comment.