Skip to content

Commit

Permalink
Merge pull request #1456 from hashicorp/b-system-job
Browse files Browse the repository at this point in the history
Node Register handles transistioning to ready and creating evals
  • Loading branch information
dadgar committed Jul 25, 2016
2 parents e8356bc + 82ee68a commit cb0ff81
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 6 deletions.
4 changes: 3 additions & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1100,7 +1100,9 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) {
// watchNodeUpdates periodically checks for changes to the node attributes or meta map
func (c *Client) watchNodeUpdates() {
c.logger.Printf("[DEBUG] client: periodically checking for node changes at duration %v", nodeUpdateRetryIntv)
var attrHash, metaHash uint64

// Initialize the hashes
_, attrHash, metaHash := c.hasNodeChanged(0, 0)
var changed bool
for {
select {
Expand Down
31 changes: 26 additions & 5 deletions nomad/node_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,16 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp
return fmt.Errorf("failed to computed node class: %v", err)
}

// Look for the node so we can detect a state transistion
snap, err := n.srv.fsm.State().Snapshot()
if err != nil {
return err
}
originalNode, err := snap.NodeByID(args.Node.ID)
if err != nil {
return err
}

// Commit this update via Raft
_, index, err := n.srv.raftApply(structs.NodeRegisterRequestType, args)
if err != nil {
Expand All @@ -83,7 +93,12 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp
reply.NodeModifyIndex = index

// Check if we should trigger evaluations
if structs.ShouldDrainNode(args.Node.Status) {
originalStatus := structs.NodeStatusInit
if originalNode != nil {
originalStatus = originalNode.Status
}
transitionToReady := transitionedToReady(args.Node.Status, originalStatus)
if structs.ShouldDrainNode(args.Node.Status) || transitionToReady {
evalIDs, evalIndex, err := n.createNodeEvals(args.Node.ID, index)
if err != nil {
n.srv.logger.Printf("[ERR] nomad.client: eval creation failed: %v", err)
Expand All @@ -105,7 +120,7 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp

// Set the reply index
reply.Index = index
snap, err := n.srv.fsm.State().Snapshot()
snap, err = n.srv.fsm.State().Snapshot()
if err != nil {
return err
}
Expand Down Expand Up @@ -236,9 +251,7 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
}

// Check if we should trigger evaluations
initToReady := node.Status == structs.NodeStatusInit && args.Status == structs.NodeStatusReady
terminalToReady := node.Status == structs.NodeStatusDown && args.Status == structs.NodeStatusReady
transitionToReady := initToReady || terminalToReady
transitionToReady := transitionedToReady(args.Status, node.Status)
if structs.ShouldDrainNode(args.Status) || transitionToReady {
evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, index)
if err != nil {
Expand Down Expand Up @@ -271,6 +284,14 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
return nil
}

// transitionedToReady is a helper that takes a nodes new and old status and
// returns whether it has transistioned to ready.
func transitionedToReady(newStatus, oldStatus string) bool {
initToReady := oldStatus == structs.NodeStatusInit && newStatus == structs.NodeStatusReady
terminalToReady := oldStatus == structs.NodeStatusDown && newStatus == structs.NodeStatusReady
return initToReady || terminalToReady
}

// UpdateDrain is used to update the drain mode of a client node
func (n *Node) UpdateDrain(args *structs.NodeUpdateDrainRequest,
reply *structs.NodeDrainUpdateResponse) error {
Expand Down
92 changes: 92 additions & 0 deletions nomad/node_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,98 @@ func TestClientEndpoint_UpdateStatus(t *testing.T) {
}
}

func TestClientEndpoint_Register_GetEvals(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Register a system job.
job := mock.SystemJob()
state := s1.fsm.State()
if err := state.UpsertJob(1, job); err != nil {
t.Fatalf("err: %v", err)
}

// Create the register request going directly to ready
node := mock.Node()
node.Status = structs.NodeStatusReady
reg := &structs.NodeRegisterRequest{
Node: node,
WriteRequest: structs.WriteRequest{Region: "global"},
}

// Fetch the response
var resp structs.NodeUpdateResponse
if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
t.Fatalf("err: %v", err)
}

// Check for heartbeat interval
ttl := resp.HeartbeatTTL
if ttl < s1.config.MinHeartbeatTTL || ttl > 2*s1.config.MinHeartbeatTTL {
t.Fatalf("bad: %#v", ttl)
}

// Check for an eval caused by the system job.
if len(resp.EvalIDs) != 1 {
t.Fatalf("expected one eval; got %#v", resp.EvalIDs)
}

evalID := resp.EvalIDs[0]
eval, err := state.EvalByID(evalID)
if err != nil {
t.Fatalf("could not get eval %v", evalID)
}

if eval.Type != "system" {
t.Fatalf("unexpected eval type; got %v; want %q", eval.Type, "system")
}

// Check for the node in the FSM
out, err := state.NodeByID(node.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("expected node")
}
if out.ModifyIndex != resp.Index {
t.Fatalf("index mis-match")
}

// Transistion it to down and then ready
node.Status = structs.NodeStatusDown
reg = &structs.NodeRegisterRequest{
Node: node,
WriteRequest: structs.WriteRequest{Region: "global"},
}

// Fetch the response
if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
t.Fatalf("err: %v", err)
}

if len(resp.EvalIDs) != 1 {
t.Fatalf("expected one eval; got %#v", resp.EvalIDs)
}

node.Status = structs.NodeStatusReady
reg = &structs.NodeRegisterRequest{
Node: node,
WriteRequest: structs.WriteRequest{Region: "global"},
}

// Fetch the response
if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
t.Fatalf("err: %v", err)
}

if len(resp.EvalIDs) != 1 {
t.Fatalf("expected one eval; got %#v", resp.EvalIDs)
}
}

func TestClientEndpoint_UpdateStatus_GetEvals(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
Expand Down

0 comments on commit cb0ff81

Please sign in to comment.