diff --git a/client/client.go b/client/client.go index dd1baa851de3..04851e29a254 100644 --- a/client/client.go +++ b/client/client.go @@ -1100,7 +1100,9 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) { // watchNodeUpdates periodically checks for changes to the node attributes or meta map func (c *Client) watchNodeUpdates() { c.logger.Printf("[DEBUG] client: periodically checking for node changes at duration %v", nodeUpdateRetryIntv) - var attrHash, metaHash uint64 + + // Initialize the hashes + _, attrHash, metaHash := c.hasNodeChanged(0, 0) var changed bool for { select { diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 51e041a7550f..67d3d5c29a6c 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -74,6 +74,16 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp return fmt.Errorf("failed to computed node class: %v", err) } + // Look for the node so we can detect a state transistion + snap, err := n.srv.fsm.State().Snapshot() + if err != nil { + return err + } + originalNode, err := snap.NodeByID(args.Node.ID) + if err != nil { + return err + } + // Commit this update via Raft _, index, err := n.srv.raftApply(structs.NodeRegisterRequestType, args) if err != nil { @@ -83,7 +93,12 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp reply.NodeModifyIndex = index // Check if we should trigger evaluations - if structs.ShouldDrainNode(args.Node.Status) { + originalStatus := structs.NodeStatusInit + if originalNode != nil { + originalStatus = originalNode.Status + } + transitionToReady := transitionedToReady(args.Node.Status, originalStatus) + if structs.ShouldDrainNode(args.Node.Status) || transitionToReady { evalIDs, evalIndex, err := n.createNodeEvals(args.Node.ID, index) if err != nil { n.srv.logger.Printf("[ERR] nomad.client: eval creation failed: %v", err) @@ -105,7 +120,7 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp // Set the reply index reply.Index = index - snap, err := n.srv.fsm.State().Snapshot() + snap, err = n.srv.fsm.State().Snapshot() if err != nil { return err } @@ -236,9 +251,7 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct } // Check if we should trigger evaluations - initToReady := node.Status == structs.NodeStatusInit && args.Status == structs.NodeStatusReady - terminalToReady := node.Status == structs.NodeStatusDown && args.Status == structs.NodeStatusReady - transitionToReady := initToReady || terminalToReady + transitionToReady := transitionedToReady(args.Status, node.Status) if structs.ShouldDrainNode(args.Status) || transitionToReady { evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, index) if err != nil { @@ -271,6 +284,14 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct return nil } +// transitionedToReady is a helper that takes a nodes new and old status and +// returns whether it has transistioned to ready. +func transitionedToReady(newStatus, oldStatus string) bool { + initToReady := oldStatus == structs.NodeStatusInit && newStatus == structs.NodeStatusReady + terminalToReady := oldStatus == structs.NodeStatusDown && newStatus == structs.NodeStatusReady + return initToReady || terminalToReady +} + // UpdateDrain is used to update the drain mode of a client node func (n *Node) UpdateDrain(args *structs.NodeUpdateDrainRequest, reply *structs.NodeDrainUpdateResponse) error { diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index e796b194c4c7..e8ad81a6aa83 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -153,6 +153,98 @@ func TestClientEndpoint_UpdateStatus(t *testing.T) { } } +func TestClientEndpoint_Register_GetEvals(t *testing.T) { + s1 := testServer(t, nil) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Register a system job. + job := mock.SystemJob() + state := s1.fsm.State() + if err := state.UpsertJob(1, job); err != nil { + t.Fatalf("err: %v", err) + } + + // Create the register request going directly to ready + node := mock.Node() + node.Status = structs.NodeStatusReady + reg := &structs.NodeRegisterRequest{ + Node: node, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + var resp structs.NodeUpdateResponse + if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil { + t.Fatalf("err: %v", err) + } + + // Check for heartbeat interval + ttl := resp.HeartbeatTTL + if ttl < s1.config.MinHeartbeatTTL || ttl > 2*s1.config.MinHeartbeatTTL { + t.Fatalf("bad: %#v", ttl) + } + + // Check for an eval caused by the system job. + if len(resp.EvalIDs) != 1 { + t.Fatalf("expected one eval; got %#v", resp.EvalIDs) + } + + evalID := resp.EvalIDs[0] + eval, err := state.EvalByID(evalID) + if err != nil { + t.Fatalf("could not get eval %v", evalID) + } + + if eval.Type != "system" { + t.Fatalf("unexpected eval type; got %v; want %q", eval.Type, "system") + } + + // Check for the node in the FSM + out, err := state.NodeByID(node.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if out == nil { + t.Fatalf("expected node") + } + if out.ModifyIndex != resp.Index { + t.Fatalf("index mis-match") + } + + // Transistion it to down and then ready + node.Status = structs.NodeStatusDown + reg = &structs.NodeRegisterRequest{ + Node: node, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil { + t.Fatalf("err: %v", err) + } + + if len(resp.EvalIDs) != 1 { + t.Fatalf("expected one eval; got %#v", resp.EvalIDs) + } + + node.Status = structs.NodeStatusReady + reg = &structs.NodeRegisterRequest{ + Node: node, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil { + t.Fatalf("err: %v", err) + } + + if len(resp.EvalIDs) != 1 { + t.Fatalf("expected one eval; got %#v", resp.EvalIDs) + } +} + func TestClientEndpoint_UpdateStatus_GetEvals(t *testing.T) { s1 := testServer(t, nil) defer s1.Shutdown()