Skip to content

Commit

Permalink
Merge pull request #310 from hashicorp/f-node-up-eval
Browse files Browse the repository at this point in the history
Create evals for system jobs on node registration
  • Loading branch information
dadgar committed Oct 22, 2015
2 parents 9370661 + d234a84 commit 1e9416a
Show file tree
Hide file tree
Showing 5 changed files with 267 additions and 36 deletions.
44 changes: 40 additions & 4 deletions nomad/node_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,10 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
}

// Check if we should trigger evaluations
if structs.ShouldDrainNode(args.Status) {
initToReady := node.Status == structs.NodeStatusInit && args.Status == structs.NodeStatusReady
terminalToReady := node.Status == structs.NodeStatusDown && args.Status == structs.NodeStatusReady
transitionToReady := initToReady || terminalToReady
if structs.ShouldDrainNode(args.Status) || transitionToReady {
evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, index)
if err != nil {
n.srv.logger.Printf("[ERR] nomad.client: eval creation failed: %v", err)
Expand Down Expand Up @@ -271,7 +274,7 @@ func (n *Node) Evaluate(args *structs.NodeEvaluateRequest, reply *structs.NodeUp
return nil
}

// GetNode is used to request information about a specific ndoe
// GetNode is used to request information about a specific node
func (n *Node) GetNode(args *structs.NodeSpecificRequest,
reply *structs.SingleNodeResponse) error {
if done, err := n.srv.forward("Node.GetNode", args, args, reply); done {
Expand Down Expand Up @@ -312,7 +315,7 @@ func (n *Node) GetNode(args *structs.NodeSpecificRequest,
return nil
}

// GetAllocs is used to request allocations for a specific ndoe
// GetAllocs is used to request allocations for a specific node
func (n *Node) GetAllocs(args *structs.NodeSpecificRequest,
reply *structs.NodeAllocsResponse) error {
if done, err := n.srv.forward("Node.GetAllocs", args, args, reply); done {
Expand Down Expand Up @@ -447,8 +450,18 @@ func (n *Node) createNodeEvals(nodeID string, nodeIndex uint64) ([]string, uint6
return nil, 0, fmt.Errorf("failed to find allocs for '%s': %v", nodeID, err)
}

sysJobsIter, err := snap.JobsByScheduler("system")
if err != nil {
return nil, 0, fmt.Errorf("failed to find system jobs for '%s': %v", nodeID, err)
}

var sysJobs []*structs.Job
for job := sysJobsIter.Next(); job != nil; job = sysJobsIter.Next() {
sysJobs = append(sysJobs, job.(*structs.Job))
}

// Fast-path if nothing to do
if len(allocs) == 0 {
if len(allocs) == 0 && len(sysJobs) == 0 {
return nil, 0, nil
}

Expand Down Expand Up @@ -479,6 +492,29 @@ func (n *Node) createNodeEvals(nodeID string, nodeIndex uint64) ([]string, uint6
evalIDs = append(evalIDs, eval.ID)
}

// Create an evaluation for each system job.
for _, job := range sysJobs {
// Still dedup on JobID as the node may already have the system job.
if _, ok := jobIDs[job.ID]; ok {
continue
}
jobIDs[job.ID] = struct{}{}

// Create a new eval
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: job.Priority,
Type: job.Type,
TriggeredBy: structs.EvalTriggerNodeUpdate,
JobID: job.ID,
NodeID: nodeID,
NodeModifyIndex: nodeIndex,
Status: structs.EvalStatusPending,
}
evals = append(evals, eval)
evalIDs = append(evalIDs, eval.ID)
}

// Create the Raft transaction
update := &structs.EvalUpdateRequest{
Evals: evals,
Expand Down
170 changes: 138 additions & 32 deletions nomad/node_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,87 @@ func TestClientEndpoint_UpdateStatus(t *testing.T) {
}
}

func TestClientEndpoint_UpdateStatus_GetEvals(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Register a system job.
job := mock.SystemJob()
state := s1.fsm.State()
if err := state.UpsertJob(1, job); err != nil {
t.Fatalf("err: %v", err)
}

// Create the register request
node := mock.Node()
node.Status = structs.NodeStatusInit
reg := &structs.NodeRegisterRequest{
Node: node,
WriteRequest: structs.WriteRequest{Region: "global"},
}

// Fetch the response
var resp structs.NodeUpdateResponse
if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
t.Fatalf("err: %v", err)
}

// Check for heartbeat interval
ttl := resp.HeartbeatTTL
if ttl < s1.config.MinHeartbeatTTL || ttl > 2*s1.config.MinHeartbeatTTL {
t.Fatalf("bad: %#v", ttl)
}

// Update the status
update := &structs.NodeUpdateStatusRequest{
NodeID: node.ID,
Status: structs.NodeStatusReady,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp2 structs.NodeUpdateResponse
if err := msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", update, &resp2); err != nil {
t.Fatalf("err: %v", err)
}
if resp2.Index == 0 {
t.Fatalf("bad index: %d", resp2.Index)
}

// Check for an eval caused by the system job.
if len(resp2.EvalIDs) != 1 {
t.Fatalf("expected one eval; got %#v", resp2.EvalIDs)
}

evalID := resp2.EvalIDs[0]
eval, err := state.EvalByID(evalID)
if err != nil {
t.Fatalf("could not get eval %v", evalID)
}

if eval.Type != "system" {
t.Fatalf("unexpected eval type; got %v; want %q", eval.Type, "system")
}

// Check for heartbeat interval
ttl = resp2.HeartbeatTTL
if ttl < s1.config.MinHeartbeatTTL || ttl > 2*s1.config.MinHeartbeatTTL {
t.Fatalf("bad: %#v", ttl)
}

// Check for the node in the FSM
out, err := state.NodeByID(node.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("expected node")
}
if out.ModifyIndex != resp2.Index {
t.Fatalf("index mis-match")
}
}

func TestClientEndpoint_UpdateStatus_HeartbeatOnly(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
Expand Down Expand Up @@ -476,8 +557,13 @@ func TestClientEndpoint_CreateNodeEvals(t *testing.T) {
// Inject fake evaluations
alloc := mock.Alloc()
state := s1.fsm.State()
err := state.UpsertAllocs(1, []*structs.Allocation{alloc})
if err != nil {
if err := state.UpsertAllocs(1, []*structs.Allocation{alloc}); err != nil {
t.Fatalf("err: %v", err)
}

// Inject a fake system job.
job := mock.SystemJob()
if err := state.UpsertJob(1, job); err != nil {
t.Fatalf("err: %v", err)
}

Expand All @@ -489,42 +575,62 @@ func TestClientEndpoint_CreateNodeEvals(t *testing.T) {
if index == 0 {
t.Fatalf("bad: %d", index)
}
if len(ids) != 1 {
if len(ids) != 2 {
t.Fatalf("bad: %s", ids)
}

// Lookup the evaluation
eval, err := state.EvalByID(ids[0])
if err != nil {
t.Fatalf("err: %v", err)
}
if eval == nil {
t.Fatalf("expected eval")
}
if eval.CreateIndex != index {
t.Fatalf("index mis-match")
}
// Lookup the evaluations
evalByType := make(map[string]*structs.Evaluation, 2)
for _, id := range ids {
eval, err := state.EvalByID(id)
if err != nil {
t.Fatalf("err: %v", err)
}
if eval == nil {
t.Fatalf("expected eval")
}

if eval.Priority != alloc.Job.Priority {
t.Fatalf("bad: %#v", eval)
}
if eval.Type != alloc.Job.Type {
t.Fatalf("bad: %#v", eval)
}
if eval.TriggeredBy != structs.EvalTriggerNodeUpdate {
t.Fatalf("bad: %#v", eval)
}
if eval.JobID != alloc.JobID {
t.Fatalf("bad: %#v", eval)
}
if eval.NodeID != alloc.NodeID {
t.Fatalf("bad: %#v", eval)
if old, ok := evalByType[eval.Type]; ok {
t.Fatalf("multiple evals of the same type: %v and %v", old, eval)
}

evalByType[eval.Type] = eval
}
if eval.NodeModifyIndex != 1 {
t.Fatalf("bad: %#v", eval)

if len(evalByType) != 2 {
t.Fatalf("Expected a service and system job; got %#v", evalByType)
}
if eval.Status != structs.EvalStatusPending {
t.Fatalf("bad: %#v", eval)

// Ensure the evals are correct.
for schedType, eval := range evalByType {
expPriority := alloc.Job.Priority
expJobID := alloc.JobID
if schedType == "system" {
expPriority = job.Priority
expJobID = job.ID
}

if eval.CreateIndex != index {
t.Fatalf("CreateIndex mis-match on type %v: %#v", schedType, eval)
}
if eval.TriggeredBy != structs.EvalTriggerNodeUpdate {
t.Fatalf("TriggeredBy incorrect on type %v: %#v", schedType, eval)
}
if eval.NodeID != alloc.NodeID {
t.Fatalf("NodeID incorrect on type %v: %#v", schedType, eval)
}
if eval.NodeModifyIndex != 1 {
t.Fatalf("NodeModifyIndex incorrect on type %v: %#v", schedType, eval)
}
if eval.Status != structs.EvalStatusPending {
t.Fatalf("Status incorrect on type %v: %#v", schedType, eval)
}
if eval.Priority != expPriority {
t.Fatalf("Priority incorrect on type %v: %#v", schedType, eval)
}
if eval.JobID != expJobID {
t.Fatalf("JobID incorrect on type %v: %#v", schedType, eval)
}
}
}

Expand Down
9 changes: 9 additions & 0 deletions nomad/state/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,15 @@ func jobTableSchema() *memdb.TableSchema {
Lowercase: true,
},
},
"type": &memdb.IndexSchema{
Name: "type",
AllowMissing: false,
Unique: false,
Indexer: &memdb.StringFieldIndex{
Field: "Type",
Lowercase: false,
},
},
},
}
}
Expand Down
13 changes: 13 additions & 0 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,19 @@ func (s *StateStore) Jobs() (memdb.ResultIterator, error) {
return iter, nil
}

// JobsByScheduler returns an iterator over all the jobs with the specific
// scheduler type.
func (s *StateStore) JobsByScheduler(schedulerType string) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)

// Return an iterator for jobs with the specific type.
iter, err := txn.Get("jobs", "type", schedulerType)
if err != nil {
return nil, err
}
return iter, nil
}

// UpsertEvaluation is used to upsert an evaluation
func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) error {
txn := s.db.Txn(true)
Expand Down
67 changes: 67 additions & 0 deletions nomad/state/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,73 @@ func TestStateStore_Jobs(t *testing.T) {
}
}

func TestStateStore_JobsByScheduler(t *testing.T) {
state := testStateStore(t)
var serviceJobs []*structs.Job
var sysJobs []*structs.Job

for i := 0; i < 10; i++ {
job := mock.Job()
serviceJobs = append(serviceJobs, job)

err := state.UpsertJob(1000+uint64(i), job)
if err != nil {
t.Fatalf("err: %v", err)
}
}

for i := 0; i < 10; i++ {
job := mock.SystemJob()
sysJobs = append(sysJobs, job)

err := state.UpsertJob(2000+uint64(i), job)
if err != nil {
t.Fatalf("err: %v", err)
}
}

iter, err := state.JobsByScheduler("service")
if err != nil {
t.Fatalf("err: %v", err)
}

var outService []*structs.Job
for {
raw := iter.Next()
if raw == nil {
break
}
outService = append(outService, raw.(*structs.Job))
}

iter, err = state.JobsByScheduler("system")
if err != nil {
t.Fatalf("err: %v", err)
}

var outSystem []*structs.Job
for {
raw := iter.Next()
if raw == nil {
break
}
outSystem = append(outSystem, raw.(*structs.Job))
}

sort.Sort(JobIDSort(serviceJobs))
sort.Sort(JobIDSort(sysJobs))
sort.Sort(JobIDSort(outService))
sort.Sort(JobIDSort(outSystem))

if !reflect.DeepEqual(serviceJobs, outService) {
t.Fatalf("bad: %#v %#v", serviceJobs, outService)
}

if !reflect.DeepEqual(sysJobs, outSystem) {
t.Fatalf("bad: %#v %#v", sysJobs, outSystem)
}
}

func TestStateStore_RestoreJob(t *testing.T) {
state := testStateStore(t)

Expand Down

0 comments on commit 1e9416a

Please sign in to comment.