diff --git a/client/client.go b/client/client.go index f414ed5c7255..57d70c69f0aa 100644 --- a/client/client.go +++ b/client/client.go @@ -1097,14 +1097,6 @@ func (c *Client) registerAndHeartbeat() { // if heartbeating fails, trigger Consul discovery c.triggerDiscovery() - - // trigger a node event to register that the heartbeat failed - nodeEvent := &structs.NodeEvent{ - Message: fmt.Sprintf("Client heartbeat failed at %s", intv), - Subsystem: "Heartbeat", - Timestamp: time.Now().Unix(), - } - c.triggerNodeEvent(nodeEvent) } } else { c.heartbeatLock.Lock() @@ -1154,28 +1146,26 @@ func (c *Client) run() { // submitNodeEvents is used to submit a client-side node event. Examples of // these kinds of events include when a driver moves from healthy to unhealthy // (and vice versa) -func (c *Client) submitNodeEvents(e []*structs.NodeEvent) error { - node := c.Node() +func (c *Client) submitNodeEvents(events []*structs.NodeEvent) error { + nodeID := c.Node().ID nodeEvents := map[string][]*structs.NodeEvent{ - node.ID: e, + nodeID: events, } - req := structs.EmitNodeEventRequest{ + req := structs.EmitNodeEventsRequest{ NodeEvents: nodeEvents, WriteRequest: structs.WriteRequest{Region: c.Region()}, } - var resp structs.EmitNodeEventResponse - if err := c.RPC("Node.EmitEvent", &req, &resp); err != nil { - c.logger.Printf("[ERR] client: emitting node events failed %v", err) - return err + var resp structs.EmitNodeEventsResponse + if err := c.RPC("Node.EmitEvents", &req, &resp); err != nil { + return fmt.Errorf("Emitting node event failed: %v", err) } c.logger.Printf("[INFO] client: emit node events complete") return nil } -// emitEvent is a handler which receives node events and on a interval and +// watchEmitEvents is a handler which receives node events and on a interval and // submits them in batch format to the server func (c *Client) watchEmitEvents() { - batchEventsLock := sync.Mutex{} batchEvents := make([]*structs.NodeEvent, 0) timer := time.NewTimer(c.retryIntv(nodeEventsEmitIntv)) @@ -1184,23 +1174,25 @@ func (c *Client) watchEmitEvents() { for { select { case event := <-c.triggerEmitNodeEvent: - batchEventsLock.Lock() batchEvents = append(batchEvents, event) - batchEventsLock.Unlock() case <-timer.C: timer.Reset(c.retryIntv(nodeUpdateRetryIntv)) - batchEventsLock.Lock() if len(batchEvents) == 0 { // if we haven't received any events to emit, continue until the next // time interval - batchEventsLock.Unlock() continue } - c.submitNodeEvents(batchEvents) - batchEventsLock.Unlock() + err := c.submitNodeEvents(batchEvents) + if err != nil { + batchEvents = make([]*structs.NodeEvent, 0) + c.logger.Printf("[ERR] client: Failure in thie process of trying to submit node events: %v", err) + } else if len(batchEvents) >= structs.MaxRetainedNodeEvents { + // Truncate list to under 10 + batchEvents = make([]*structs.NodeEvent, 0) + } case <-c.shutdownCh: return @@ -1209,7 +1201,7 @@ func (c *Client) watchEmitEvents() { } } -// emitEvent triggers a emit node event +// triggerNodeEvent triggers a emit node event func (c *Client) triggerNodeEvent(nodeEvent *structs.NodeEvent) { select { case c.triggerEmitNodeEvent <- nodeEvent: diff --git a/command/agent/node_endpoint_test.go b/command/agent/node_endpoint_test.go index 113fa5de8774..16ba008de6fc 100644 --- a/command/agent/node_endpoint_test.go +++ b/command/agent/node_endpoint_test.go @@ -396,7 +396,7 @@ func TestHTTP_NodeQuery(t *testing.T) { if n.ID != node.ID { t.Fatalf("bad: %#v", n) } - if len(n.NodeEvents) != 1 { + if len(n.NodeEvents) < 1 { t.Fatalf("Expected node registration event to be populated: %#v", n) } if n.NodeEvents[0].Message != "Node Registered" { diff --git a/command/node_status.go b/command/node_status.go index 054b8ed98d99..720fc4b9700a 100644 --- a/command/node_status.go +++ b/command/node_status.go @@ -389,6 +389,10 @@ func (c *NodeStatusCommand) formatNode(client *api.Client, node *api.Node) int { } func (c *NodeStatusCommand) outputNodeStatusEvents(node *api.Node) { + if !c.verbose { + return + } + c.Ui.Output(c.Colorize().Color("\n[bold]Node Events ")) c.outputNodeEvent(node.NodeEvents) } @@ -396,7 +400,7 @@ func (c *NodeStatusCommand) outputNodeStatusEvents(node *api.Node) { func (c *NodeStatusCommand) outputNodeEvent(events []*api.NodeEvent) { size := len(events) nodeEvents := make([]string, size+1) - nodeEvents[0] = "Timestamp|Subsystem|Message|Details" + nodeEvents[0] = "Time|Subsystem|Message|Details" for i, event := range events { timestamp := formatUnixNanoTime(event.Timestamp) diff --git a/nomad/fsm.go b/nomad/fsm.go index c2f1ef68297e..b8e440a500a2 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -236,7 +236,7 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} { return n.applyACLTokenBootstrap(buf[1:], log.Index) case structs.AutopilotRequestType: return n.applyAutopilotUpdate(buf[1:], log.Index) - case structs.AddNodeEventType: + case structs.AddNodeEventsType: return n.applyAddNodeEventType(buf[1:], log.Index) } @@ -633,28 +633,15 @@ func (n *nomadFSM) applyReconcileSummaries(buf []byte, index uint64) interface{} // applyAddNodeEventType applies a node event to the set of currently-available // events. func (n *nomadFSM) applyAddNodeEventType(buf []byte, index uint64) interface{} { - var req structs.EmitNodeEventRequest + var req structs.EmitNodeEventsRequest if err := structs.Decode(buf, &req); err != nil { - n.logger.Printf("[ERR] nomad.fsm: failed to decode EmitNodeEventREquest: %v", err) + n.logger.Printf("[ERR] nomad.fsm: failed to decode EmitNodeEventRequest: %v", err) return err } - for nodeID, nodeEvents := range req.NodeEvents { - ws := memdb.NewWatchSet() - node, err := n.state.NodeByID(ws, nodeID) - - if err != nil { - return fmt.Errorf("encountered error when looking up nodes by id to insert node event: %v", err) - } - - if node == nil { - return fmt.Errorf("unable to look up node by id %s to insert node event", nodeID) - } - - if err := n.state.AddNodeEvent(index, node, nodeEvents); err != nil { - n.logger.Printf("[ERR] nomad.fsm: EmitNodeEventRequest failed to add node event: %v", err) - return err - } + if err := n.state.AddNodeEvent(index, req.NodeEvents); err != nil { + n.logger.Printf("[ERR] nomad.fsm: EmitNodeEventRequest failed to add node event: %v", err) + return err } return nil diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 738a84429436..687a85fc969c 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -96,11 +96,11 @@ func TestFSM_ApplyNodeEvent(t *testing.T) { nodeEvents := []*structs.NodeEvent{nodeEvent} allEvents := map[string][]*structs.NodeEvent{node.ID: nodeEvents} - req := structs.EmitNodeEventRequest{ + req := structs.EmitNodeEventsRequest{ NodeEvents: allEvents, WriteRequest: structs.WriteRequest{Region: "global"}, } - buf, err := structs.Encode(structs.AddNodeEventType, req) + buf, err := structs.Encode(structs.AddNodeEventsType, req) require.Nil(err) // the response in this case will be an error diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 40e6443f97bf..96e5e1d71c7e 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -52,16 +52,16 @@ type Node struct { updatesLock sync.Mutex } -func (n *Node) EmitEvent(args *structs.EmitNodeEventRequest, reply *structs.EmitNodeEventResponse) error { - if done, err := n.srv.forward("Node.EmitEvent", args, args, reply); done { +func (n *Node) EmitEvents(args *structs.EmitNodeEventsRequest, reply *structs.EmitNodeEventsResponse) error { + if done, err := n.srv.forward("Node.EmitEvents", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"nomad", "client", "emit_event"}, time.Now()) - _, index, err := n.srv.raftApply(structs.AddNodeEventType, args) + _, index, err := n.srv.raftApply(structs.AddNodeEventsType, args) if err != nil { - n.srv.logger.Printf("[ERR] nomad.node AddNodeEventType failed: %v", err) + n.srv.logger.Printf("[ERR] nomad.node AddNodeEventsType failed: %v", err) return err } diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index c140b0eb696e..70c27d1c1c35 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -78,7 +78,7 @@ func TestClientEndpoint_Register(t *testing.T) { }) } -func TestClientEndpoint_EmitEvent(t *testing.T) { +func TestClientEndpoint_EmitEvents(t *testing.T) { t.Parallel() require := require.New(t) @@ -100,13 +100,13 @@ func TestClientEndpoint_EmitEvent(t *testing.T) { } nodeEvents := map[string][]*structs.NodeEvent{node.ID: {nodeEvent}} - req := structs.EmitNodeEventRequest{ + req := structs.EmitNodeEventsRequest{ NodeEvents: nodeEvents, WriteRequest: structs.WriteRequest{Region: "global"}, } var resp structs.GenericResponse - err = msgpackrpc.CallWithCodec(codec, "Node.EmitEvent", &req, &resp) + err = msgpackrpc.CallWithCodec(codec, "Node.EmitEvents", &req, &resp) require.Nil(err) require.NotEqual(0, resp.Index) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index f4b0c6f2a742..b3ca77765ad3 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -538,7 +538,7 @@ func (s *StateStore) UpsertNode(index uint64, node *structs.Node) error { Timestamp: node.StatusUpdatedAt, } - node.NodeEvents = make([]*structs.NodeEvent, 0) + node.NodeEvents = make([]*structs.NodeEvent, 0, 1) node.NodeEvents = append(node.NodeEvents, nodeEvent) node.CreateIndex = index @@ -3695,11 +3695,11 @@ func (r *StateRestore) addEphemeralDiskToTaskGroups(job *structs.Job) { } // addNodeEvent is a function which wraps upsertNodeEvent -func (s *StateStore) AddNodeEvent(index uint64, node *structs.Node, events []*structs.NodeEvent) error { +func (s *StateStore) AddNodeEvent(index uint64, events map[string][]*structs.NodeEvent) error { txn := s.db.Txn(true) defer txn.Abort() - err := s.upsertNodeEvents(index, node, events, txn) + err := s.upsertNodeEvents(index, events, txn) txn.Commit() return err } @@ -3707,33 +3707,46 @@ func (s *StateStore) AddNodeEvent(index uint64, node *structs.Node, events []*st // upsertNodeEvent upserts a node event for a respective node. It also maintains // that only 10 node events are ever stored simultaneously, deleting older // events once this bound has been reached. -func (s *StateStore) upsertNodeEvents(index uint64, node *structs.Node, events []*structs.NodeEvent, txn *memdb.Txn) error { +func (s *StateStore) upsertNodeEvents(index uint64, nodeEvents map[string][]*structs.NodeEvent, txn *memdb.Txn) error { - // Copy the existing node - copyNode := new(structs.Node) - *copyNode = *node + for nodeID, events := range nodeEvents { + ws := memdb.NewWatchSet() + node, err := s.NodeByID(ws, nodeID) + + if err != nil { + return fmt.Errorf("encountered error when looking up nodes by id to insert node event: %v", err) + } - nodeEvents := node.NodeEvents + if node == nil { + return fmt.Errorf("unable to look up node by id %s to insert node event", nodeID) + } + + // Copy the existing node + copyNode := new(structs.Node) + *copyNode = *node + + nodeEvents := node.NodeEvents - for _, e := range events { - e.CreateIndex = index - e.ModifyIndex = index + for _, e := range events { + e.CreateIndex = index + e.ModifyIndex = index - // keep node events pruned to below 10 simultaneously - if len(nodeEvents) >= 10 { - delta := len(nodeEvents) - 10 - nodeEvents = nodeEvents[delta+1:] + // keep node events pruned to below 10 simultaneously + if len(nodeEvents) >= structs.MaxRetainedNodeEvents { + delta := len(nodeEvents) - 10 + nodeEvents = nodeEvents[delta+1:] + } + nodeEvents = append(nodeEvents, e) + copyNode.NodeEvents = nodeEvents } - nodeEvents = append(nodeEvents, e) - copyNode.NodeEvents = nodeEvents - } - // Insert the node - if err := txn.Insert("nodes", copyNode); err != nil { - return fmt.Errorf("node update failed: %v", err) - } - if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil { - return fmt.Errorf("index update failed: %v", err) + // Insert the node + if err := txn.Insert("nodes", copyNode); err != nil { + return fmt.Errorf("node update failed: %v", err) + } + if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } } return nil diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index def5b45facff..99e399ffdcbd 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -6493,7 +6493,10 @@ func TestStateStore_AddSingleNodeEvent(t *testing.T) { Subsystem: "Driver", Timestamp: time.Now().Unix(), } - err = state.AddNodeEvent(1001, node, []*structs.NodeEvent{nodeEvent}) + nodeEvents := map[string][]*structs.NodeEvent{ + node.ID: []*structs.NodeEvent{nodeEvent}, + } + err = state.AddNodeEvent(uint64(1001), nodeEvents) require.Nil(err) require.True(watchFired(ws)) @@ -6533,7 +6536,11 @@ func TestStateStore_NodeEvents_RetentionWindow(t *testing.T) { Subsystem: "Driver", Timestamp: time.Now().Unix(), } - err := state.AddNodeEvent(uint64(i), out, []*structs.NodeEvent{nodeEvent}) + + nodeEvents := map[string][]*structs.NodeEvent{ + out.ID: []*structs.NodeEvent{nodeEvent}, + } + err := state.AddNodeEvent(uint64(i), nodeEvents) require.Nil(err) require.True(watchFired(ws)) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 390ac15a24d5..5b727bb2aa6a 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -74,7 +74,7 @@ const ( ACLTokenDeleteRequestType ACLTokenBootstrapRequestType AutopilotRequestType - AddNodeEventType + AddNodeEventsType ) const ( @@ -123,6 +123,10 @@ const ( // the fraction. So 16 == 6.25% limit of jitter. This jitter is also // applied to RPCHoldTimeout. JitterFraction = 16 + + // MaxRetainedNodeEvents is the maximum number of node events that will be + // retained for a single node + MaxRetainedNodeEvents = 10 ) // Context defines the scope in which a search for Nomad object operates, and @@ -1154,7 +1158,8 @@ type Node struct { // updated StatusUpdatedAt int64 - // NodeEvents is the most recent set of events generated for the node + // NodeEvents is the most recent set of events generated for the node, + // retaining only MaxRetainedNodeEvents number at a time NodeEvents []*NodeEvent // Raft Indexes @@ -1183,9 +1188,9 @@ type NodeEvent struct { ModifyIndex uint64 } -// EmitNodeEventRequest is a request to update the node events source +// EmitNodeEventsRequest is a request to update the node events source // with a new client-side event -type EmitNodeEventRequest struct { +type EmitNodeEventsRequest struct { // NodeEvents are a map where the key is a node id, and value is a list of // events for that node NodeEvents map[string][]*NodeEvent @@ -1193,9 +1198,9 @@ type EmitNodeEventRequest struct { WriteRequest } -// EmitNodeEventResponse is a response to the client about the status of +// EmitNodeEventsResponse is a response to the client about the status of // the node event source update. -type EmitNodeEventResponse struct { +type EmitNodeEventsResponse struct { Index uint64 WriteMeta }