Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement blocking queries for /v1/job/evaluations #1892

Merged
merged 1 commit into from
Oct 31, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 28 additions & 18 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,26 +570,36 @@ func (j *Job) Evaluations(args *structs.JobSpecificRequest,
}
defer metrics.MeasureSince([]string{"nomad", "job", "evaluations"}, time.Now())

// Capture the evaluations
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
reply.Evaluations, err = snap.EvalsByJob(args.JobID)
if err != nil {
return err
}
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
watch: watch.NewItems(watch.Item{EvalJob: args.JobID}),
run: func() error {
// Capture the evals
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}

// Use the last index that affected the evals table
index, err := snap.Index("evals")
if err != nil {
return err
}
reply.Index = index
reply.Evaluations, err = snap.EvalsByJob(args.JobID)
if err != nil {
return err
}

// Set the query response
j.srv.setQueryMeta(&reply.QueryMeta)
return nil
// Use the last index that affected the evals table
index, err := snap.Index("evals")
if err != nil {
return err
}
reply.Index = index

// Set the query response
j.srv.setQueryMeta(&reply.QueryMeta)
return nil
}}

return j.srv.blockingRPC(&opts)
}

// Plan is used to cause a dry-run evaluation of the Job and return the results
Expand Down
53 changes: 53 additions & 0 deletions nomad/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1421,6 +1421,59 @@ func TestJobEndpoint_Evaluations(t *testing.T) {
}
}

func TestJobEndpoint_Evaluations_Blocking(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Create the register request
eval1 := mock.Eval()
eval2 := mock.Eval()
eval2.JobID = "job1"
state := s1.fsm.State()

// First upsert an unrelated eval
time.AfterFunc(100*time.Millisecond, func() {
err := state.UpsertEvals(100, []*structs.Evaluation{eval1})
if err != nil {
t.Fatalf("err: %v", err)
}
})

// Upsert an eval for the job we are interested in later
time.AfterFunc(200*time.Millisecond, func() {
err := state.UpsertEvals(200, []*structs.Evaluation{eval2})
if err != nil {
t.Fatalf("err: %v", err)
}
})

// Lookup the jobs
get := &structs.JobSpecificRequest{
JobID: "job1",
QueryOptions: structs.QueryOptions{
Region: "global",
MinQueryIndex: 50,
},
}
var resp structs.JobEvaluationsResponse
start := time.Now()
if err := msgpackrpc.CallWithCodec(codec, "Job.Evaluations", get, &resp); err != nil {
t.Fatalf("err: %v", err)
}

if elapsed := time.Since(start); elapsed < 200*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
}
if resp.Index != 200 {
t.Fatalf("Bad index: %d %d", resp.Index, 200)
}
if len(resp.Evaluations) != 1 || resp.Evaluations[0].JobID != "job1" {
t.Fatalf("bad: %#v", resp.Evaluations)
}
}

func TestJobEndpoint_Plan_WithDiff(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
Expand Down
6 changes: 5 additions & 1 deletion nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,7 @@ func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) erro
jobs := make(map[string]string, len(evals))
for _, eval := range evals {
watcher.Add(watch.Item{Eval: eval.ID})
watcher.Add(watch.Item{EvalJob: eval.JobID})
if err := s.nestedUpsertEval(txn, index, eval); err != nil {
return err
}
Expand Down Expand Up @@ -734,8 +735,10 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e
if err := txn.Delete("evals", existing); err != nil {
return fmt.Errorf("eval delete failed: %v", err)
}
jobID := existing.(*structs.Evaluation).JobID
watcher.Add(watch.Item{Eval: eval})
jobs[existing.(*structs.Evaluation).JobID] = ""
watcher.Add(watch.Item{EvalJob: jobID})
jobs[jobID] = ""
}

for _, alloc := range allocs {
Expand Down Expand Up @@ -1729,6 +1732,7 @@ func (r *StateRestore) JobRestore(job *structs.Job) error {
func (r *StateRestore) EvalRestore(eval *structs.Evaluation) error {
r.items.Add(watch.Item{Table: "evals"})
r.items.Add(watch.Item{Eval: eval.ID})
r.items.Add(watch.Item{EvalJob: eval.JobID})
if err := r.txn.Insert("evals", eval); err != nil {
return fmt.Errorf("eval insert failed: %v", err)
}
Expand Down
9 changes: 7 additions & 2 deletions nomad/state/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1227,7 +1227,8 @@ func TestStateStore_UpsertEvals_Eval(t *testing.T) {
notify := setupNotifyTest(
state,
watch.Item{Table: "evals"},
watch.Item{Eval: eval.ID})
watch.Item{Eval: eval.ID},
watch.Item{EvalJob: eval.JobID})

err := state.UpsertEvals(1000, []*structs.Evaluation{eval})
if err != nil {
Expand Down Expand Up @@ -1266,10 +1267,12 @@ func TestStateStore_Update_UpsertEvals_Eval(t *testing.T) {
notify := setupNotifyTest(
state,
watch.Item{Table: "evals"},
watch.Item{Eval: eval.ID})
watch.Item{Eval: eval.ID},
watch.Item{EvalJob: eval.JobID})

eval2 := mock.Eval()
eval2.ID = eval.ID
eval2.JobID = eval.JobID
err = state.UpsertEvals(1001, []*structs.Evaluation{eval2})
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -1315,6 +1318,8 @@ func TestStateStore_DeleteEval_Eval(t *testing.T) {
watch.Item{Table: "allocs"},
watch.Item{Eval: eval1.ID},
watch.Item{Eval: eval2.ID},
watch.Item{EvalJob: eval1.JobID},
watch.Item{EvalJob: eval2.JobID},
watch.Item{Alloc: alloc1.ID},
watch.Item{Alloc: alloc2.ID},
watch.Item{AllocEval: alloc1.EvalID},
Expand Down
1 change: 1 addition & 0 deletions nomad/watch/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type Item struct {
AllocJob string
AllocNode string
Eval string
EvalJob string
Job string
JobSummary string
Node string
Expand Down