Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Node Register handles transistioning to ready and creating evals #1456

Merged
merged 2 commits into from
Jul 25, 2016
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1100,7 +1100,9 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) {
// watchNodeUpdates periodically checks for changes to the node attributes or meta map
func (c *Client) watchNodeUpdates() {
c.logger.Printf("[DEBUG] client: periodically checking for node changes at duration %v", nodeUpdateRetryIntv)
var attrHash, metaHash uint64

// Initialize the hashes
_, attrHash, metaHash := c.hasNodeChanged(0, 0)
var changed bool
for {
select {
Expand Down
31 changes: 26 additions & 5 deletions nomad/node_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,16 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp
return fmt.Errorf("failed to computed node class: %v", err)
}

// Look for the node so we can detect a state transistion
snap, err := n.srv.fsm.State().Snapshot()
if err != nil {
return err
}
originalNode, err := snap.NodeByID(args.Node.ID)
if err != nil {
return err
}

// Commit this update via Raft
_, index, err := n.srv.raftApply(structs.NodeRegisterRequestType, args)
if err != nil {
Expand All @@ -83,7 +93,12 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp
reply.NodeModifyIndex = index

// Check if we should trigger evaluations
if structs.ShouldDrainNode(args.Node.Status) {
originalStatus := structs.NodeStatusInit
if originalNode != nil {
originalStatus = originalNode.Status
}
transitionToReady := transitionedToReady(args.Node.Status, originalStatus)
if structs.ShouldDrainNode(args.Node.Status) || transitionToReady {
evalIDs, evalIndex, err := n.createNodeEvals(args.Node.ID, index)
if err != nil {
n.srv.logger.Printf("[ERR] nomad.client: eval creation failed: %v", err)
Expand All @@ -105,7 +120,7 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp

// Set the reply index
reply.Index = index
snap, err := n.srv.fsm.State().Snapshot()
snap, err = n.srv.fsm.State().Snapshot()
if err != nil {
return err
}
Expand Down Expand Up @@ -236,9 +251,7 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
}

// Check if we should trigger evaluations
initToReady := node.Status == structs.NodeStatusInit && args.Status == structs.NodeStatusReady
terminalToReady := node.Status == structs.NodeStatusDown && args.Status == structs.NodeStatusReady
transitionToReady := initToReady || terminalToReady
transitionToReady := transitionedToReady(args.Status, node.Status)
if structs.ShouldDrainNode(args.Status) || transitionToReady {
evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, index)
if err != nil {
Expand Down Expand Up @@ -271,6 +284,14 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
return nil
}

// transitionedToReady is a helper that takes a nodes new and old status and
// returns whether it has transistioned to ready.
func transitionedToReady(newStatus, oldStatus string) bool {
initToReady := oldStatus == structs.NodeStatusInit && newStatus == structs.NodeStatusReady
terminalToReady := oldStatus == structs.NodeStatusDown && newStatus == structs.NodeStatusReady
return initToReady || terminalToReady
}

// UpdateDrain is used to update the drain mode of a client node
func (n *Node) UpdateDrain(args *structs.NodeUpdateDrainRequest,
reply *structs.NodeDrainUpdateResponse) error {
Expand Down
61 changes: 61 additions & 0 deletions nomad/node_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,67 @@ func TestClientEndpoint_UpdateStatus(t *testing.T) {
}
}

func TestClientEndpoint_Register_GetEvals(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Register a system job.
job := mock.SystemJob()
state := s1.fsm.State()
if err := state.UpsertJob(1, job); err != nil {
t.Fatalf("err: %v", err)
}

// Create the register request going directly to ready
node := mock.Node()
node.Status = structs.NodeStatusReady
reg := &structs.NodeRegisterRequest{
Node: node,
WriteRequest: structs.WriteRequest{Region: "global"},
}

// Fetch the response
var resp structs.NodeUpdateResponse
if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
t.Fatalf("err: %v", err)
}

// Check for heartbeat interval
ttl := resp.HeartbeatTTL
if ttl < s1.config.MinHeartbeatTTL || ttl > 2*s1.config.MinHeartbeatTTL {
t.Fatalf("bad: %#v", ttl)
}

// Check for an eval caused by the system job.
if len(resp.EvalIDs) != 1 {
t.Fatalf("expected one eval; got %#v", resp.EvalIDs)
}

evalID := resp.EvalIDs[0]
eval, err := state.EvalByID(evalID)
if err != nil {
t.Fatalf("could not get eval %v", evalID)
}

if eval.Type != "system" {
t.Fatalf("unexpected eval type; got %v; want %q", eval.Type, "system")
}

// Check for the node in the FSM
out, err := state.NodeByID(node.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("expected node")
}
if out.ModifyIndex != resp.Index {
t.Fatalf("index mis-match")
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on the logic of transitionedToReady we should be triggering evals if the node transitions from down -> ready and init -> read.

Can we transition the node to down and then move it to Ready again and ensure that an eval is created?


func TestClientEndpoint_UpdateStatus_GetEvals(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
Expand Down