Skip to content

Commit

Permalink
code review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
chelseakomlo committed Mar 12, 2018
1 parent d68d90e commit afe6637
Show file tree
Hide file tree
Showing 10 changed files with 95 additions and 87 deletions.
42 changes: 17 additions & 25 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1097,14 +1097,6 @@ func (c *Client) registerAndHeartbeat() {

// if heartbeating fails, trigger Consul discovery
c.triggerDiscovery()

// trigger a node event to register that the heartbeat failed
nodeEvent := &structs.NodeEvent{
Message: fmt.Sprintf("Client heartbeat failed at %s", intv),
Subsystem: "Heartbeat",
Timestamp: time.Now().Unix(),
}
c.triggerNodeEvent(nodeEvent)
}
} else {
c.heartbeatLock.Lock()
Expand Down Expand Up @@ -1154,28 +1146,26 @@ func (c *Client) run() {
// submitNodeEvents sends a batch of client-side node events to the server in
// a single RPC, keyed by this client's node ID. Examples of these kinds of
// events include when a driver moves from healthy to unhealthy (and vice
// versa). It returns a non-nil error when the RPC fails.
func (c *Client) submitNodeEvents(e []*structs.NodeEvent) error {
node := c.Node()
func (c *Client) submitNodeEvents(events []*structs.NodeEvent) error {
nodeID := c.Node().ID
nodeEvents := map[string][]*structs.NodeEvent{
node.ID: e,
nodeID: events,
}
req := structs.EmitNodeEventRequest{
req := structs.EmitNodeEventsRequest{
NodeEvents: nodeEvents,
WriteRequest: structs.WriteRequest{Region: c.Region()},
}
var resp structs.EmitNodeEventResponse
if err := c.RPC("Node.EmitEvent", &req, &resp); err != nil {
c.logger.Printf("[ERR] client: emitting node events failed %v", err)
return err
var resp structs.EmitNodeEventsResponse
if err := c.RPC("Node.EmitEvents", &req, &resp); err != nil {
return fmt.Errorf("Emitting node event failed: %v", err)
}
c.logger.Printf("[INFO] client: emit node events complete")
return nil
}

// emitEvent is a handler which receives node events and on a interval and
// watchEmitEvents is a handler which receives node events and, on an interval,
// submits them in batch format to the server
func (c *Client) watchEmitEvents() {
batchEventsLock := sync.Mutex{}
batchEvents := make([]*structs.NodeEvent, 0)

timer := time.NewTimer(c.retryIntv(nodeEventsEmitIntv))
Expand All @@ -1184,23 +1174,25 @@ func (c *Client) watchEmitEvents() {
for {
select {
case event := <-c.triggerEmitNodeEvent:
batchEventsLock.Lock()
batchEvents = append(batchEvents, event)
batchEventsLock.Unlock()

case <-timer.C:
timer.Reset(c.retryIntv(nodeUpdateRetryIntv))

batchEventsLock.Lock()
if len(batchEvents) == 0 {
// if we haven't received any events to emit, continue until the next
// time interval
batchEventsLock.Unlock()
continue
}

c.submitNodeEvents(batchEvents)
batchEventsLock.Unlock()
err := c.submitNodeEvents(batchEvents)
if err != nil {
batchEvents = make([]*structs.NodeEvent, 0)
c.logger.Printf("[ERR] client: Failure in thie process of trying to submit node events: %v", err)
} else if len(batchEvents) >= structs.MaxRetainedNodeEvents {
// Truncate list to under 10
batchEvents = make([]*structs.NodeEvent, 0)
}

case <-c.shutdownCh:
return
Expand All @@ -1209,7 +1201,7 @@ func (c *Client) watchEmitEvents() {
}
}

// emitEvent triggers a emit node event
// triggerNodeEvent triggers an emit node event
func (c *Client) triggerNodeEvent(nodeEvent *structs.NodeEvent) {
select {
case c.triggerEmitNodeEvent <- nodeEvent:
Expand Down
2 changes: 1 addition & 1 deletion command/agent/node_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ func TestHTTP_NodeQuery(t *testing.T) {
if n.ID != node.ID {
t.Fatalf("bad: %#v", n)
}
if len(n.NodeEvents) != 1 {
if len(n.NodeEvents) < 1 {
t.Fatalf("Expected node registration event to be populated: %#v", n)
}
if n.NodeEvents[0].Message != "Node Registered" {
Expand Down
6 changes: 5 additions & 1 deletion command/node_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,14 +389,18 @@ func (c *NodeStatusCommand) formatNode(client *api.Client, node *api.Node) int {
}

// outputNodeStatusEvents writes the "Node Events" heading and then hands the
// node's events to outputNodeEvent. Nothing is written unless the command is
// running in verbose mode.
func (c *NodeStatusCommand) outputNodeStatusEvents(node *api.Node) {
	if c.verbose {
		heading := c.Colorize().Color("\n[bold]Node Events ")
		c.Ui.Output(heading)
		c.outputNodeEvent(node.NodeEvents)
	}
}

func (c *NodeStatusCommand) outputNodeEvent(events []*api.NodeEvent) {
size := len(events)
nodeEvents := make([]string, size+1)
nodeEvents[0] = "Timestamp|Subsystem|Message|Details"
nodeEvents[0] = "Time|Subsystem|Message|Details"

for i, event := range events {
timestamp := formatUnixNanoTime(event.Timestamp)
Expand Down
25 changes: 6 additions & 19 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
return n.applyACLTokenBootstrap(buf[1:], log.Index)
case structs.AutopilotRequestType:
return n.applyAutopilotUpdate(buf[1:], log.Index)
case structs.AddNodeEventType:
case structs.AddNodeEventsType:
return n.applyAddNodeEventType(buf[1:], log.Index)
}

Expand Down Expand Up @@ -633,28 +633,15 @@ func (n *nomadFSM) applyReconcileSummaries(buf []byte, index uint64) interface{}
// applyAddNodeEventType applies a node event to the set of currently-available
// events.
func (n *nomadFSM) applyAddNodeEventType(buf []byte, index uint64) interface{} {
var req structs.EmitNodeEventRequest
var req structs.EmitNodeEventsRequest
if err := structs.Decode(buf, &req); err != nil {
n.logger.Printf("[ERR] nomad.fsm: failed to decode EmitNodeEventREquest: %v", err)
n.logger.Printf("[ERR] nomad.fsm: failed to decode EmitNodeEventRequest: %v", err)
return err
}

for nodeID, nodeEvents := range req.NodeEvents {
ws := memdb.NewWatchSet()
node, err := n.state.NodeByID(ws, nodeID)

if err != nil {
return fmt.Errorf("encountered error when looking up nodes by id to insert node event: %v", err)
}

if node == nil {
return fmt.Errorf("unable to look up node by id %s to insert node event", nodeID)
}

if err := n.state.AddNodeEvent(index, node, nodeEvents); err != nil {
n.logger.Printf("[ERR] nomad.fsm: EmitNodeEventRequest failed to add node event: %v", err)
return err
}
if err := n.state.AddNodeEvent(index, req.NodeEvents); err != nil {
n.logger.Printf("[ERR] nomad.fsm: EmitNodeEventRequest failed to add node event: %v", err)
return err
}

return nil
Expand Down
4 changes: 2 additions & 2 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,11 @@ func TestFSM_ApplyNodeEvent(t *testing.T) {
nodeEvents := []*structs.NodeEvent{nodeEvent}
allEvents := map[string][]*structs.NodeEvent{node.ID: nodeEvents}

req := structs.EmitNodeEventRequest{
req := structs.EmitNodeEventsRequest{
NodeEvents: allEvents,
WriteRequest: structs.WriteRequest{Region: "global"},
}
buf, err := structs.Encode(structs.AddNodeEventType, req)
buf, err := structs.Encode(structs.AddNodeEventsType, req)
require.Nil(err)

// the response in this case will be an error
Expand Down
8 changes: 4 additions & 4 deletions nomad/node_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,16 @@ type Node struct {
updatesLock sync.Mutex
}

func (n *Node) EmitEvent(args *structs.EmitNodeEventRequest, reply *structs.EmitNodeEventResponse) error {
if done, err := n.srv.forward("Node.EmitEvent", args, args, reply); done {
func (n *Node) EmitEvents(args *structs.EmitNodeEventsRequest, reply *structs.EmitNodeEventsResponse) error {
if done, err := n.srv.forward("Node.EmitEvents", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "client", "emit_event"}, time.Now())

_, index, err := n.srv.raftApply(structs.AddNodeEventType, args)
_, index, err := n.srv.raftApply(structs.AddNodeEventsType, args)

if err != nil {
n.srv.logger.Printf("[ERR] nomad.node AddNodeEventType failed: %v", err)
n.srv.logger.Printf("[ERR] nomad.node AddNodeEventsType failed: %v", err)
return err
}

Expand Down
6 changes: 3 additions & 3 deletions nomad/node_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func TestClientEndpoint_Register(t *testing.T) {
})
}

func TestClientEndpoint_EmitEvent(t *testing.T) {
func TestClientEndpoint_EmitEvents(t *testing.T) {
t.Parallel()
require := require.New(t)

Expand All @@ -100,13 +100,13 @@ func TestClientEndpoint_EmitEvent(t *testing.T) {
}

nodeEvents := map[string][]*structs.NodeEvent{node.ID: {nodeEvent}}
req := structs.EmitNodeEventRequest{
req := structs.EmitNodeEventsRequest{
NodeEvents: nodeEvents,
WriteRequest: structs.WriteRequest{Region: "global"},
}

var resp structs.GenericResponse
err = msgpackrpc.CallWithCodec(codec, "Node.EmitEvent", &req, &resp)
err = msgpackrpc.CallWithCodec(codec, "Node.EmitEvents", &req, &resp)
require.Nil(err)
require.NotEqual(0, resp.Index)

Expand Down
61 changes: 37 additions & 24 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ func (s *StateStore) UpsertNode(index uint64, node *structs.Node) error {
Timestamp: node.StatusUpdatedAt,
}

node.NodeEvents = make([]*structs.NodeEvent, 0)
node.NodeEvents = make([]*structs.NodeEvent, 0, 1)
node.NodeEvents = append(node.NodeEvents, nodeEvent)

node.CreateIndex = index
Expand Down Expand Up @@ -3695,45 +3695,58 @@ func (r *StateRestore) addEphemeralDiskToTaskGroups(job *structs.Job) {
}

// addNodeEvent is a function which wraps upsertNodeEvent
func (s *StateStore) AddNodeEvent(index uint64, node *structs.Node, events []*structs.NodeEvent) error {
func (s *StateStore) AddNodeEvent(index uint64, events map[string][]*structs.NodeEvent) error {
txn := s.db.Txn(true)
defer txn.Abort()

err := s.upsertNodeEvents(index, node, events, txn)
err := s.upsertNodeEvents(index, events, txn)
txn.Commit()
return err
}

// upsertNodeEvents upserts node events for each respective node. It also maintains
// that only 10 node events are ever stored simultaneously, deleting older
// events once this bound has been reached.
func (s *StateStore) upsertNodeEvents(index uint64, node *structs.Node, events []*structs.NodeEvent, txn *memdb.Txn) error {
func (s *StateStore) upsertNodeEvents(index uint64, nodeEvents map[string][]*structs.NodeEvent, txn *memdb.Txn) error {

// Copy the existing node
copyNode := new(structs.Node)
*copyNode = *node
for nodeID, events := range nodeEvents {
ws := memdb.NewWatchSet()
node, err := s.NodeByID(ws, nodeID)

if err != nil {
return fmt.Errorf("encountered error when looking up nodes by id to insert node event: %v", err)
}

nodeEvents := node.NodeEvents
if node == nil {
return fmt.Errorf("unable to look up node by id %s to insert node event", nodeID)
}

// Copy the existing node
copyNode := new(structs.Node)
*copyNode = *node

nodeEvents := node.NodeEvents

for _, e := range events {
e.CreateIndex = index
e.ModifyIndex = index
for _, e := range events {
e.CreateIndex = index
e.ModifyIndex = index

// keep node events pruned to below 10 simultaneously
if len(nodeEvents) >= 10 {
delta := len(nodeEvents) - 10
nodeEvents = nodeEvents[delta+1:]
// keep node events pruned to below 10 simultaneously
if len(nodeEvents) >= structs.MaxRetainedNodeEvents {
delta := len(nodeEvents) - 10
nodeEvents = nodeEvents[delta+1:]
}
nodeEvents = append(nodeEvents, e)
copyNode.NodeEvents = nodeEvents
}
nodeEvents = append(nodeEvents, e)
copyNode.NodeEvents = nodeEvents
}

// Insert the node
if err := txn.Insert("nodes", copyNode); err != nil {
return fmt.Errorf("node update failed: %v", err)
}
if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
// Insert the node
if err := txn.Insert("nodes", copyNode); err != nil {
return fmt.Errorf("node update failed: %v", err)
}
if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
}

return nil
Expand Down
11 changes: 9 additions & 2 deletions nomad/state/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6493,7 +6493,10 @@ func TestStateStore_AddSingleNodeEvent(t *testing.T) {
Subsystem: "Driver",
Timestamp: time.Now().Unix(),
}
err = state.AddNodeEvent(1001, node, []*structs.NodeEvent{nodeEvent})
nodeEvents := map[string][]*structs.NodeEvent{
node.ID: []*structs.NodeEvent{nodeEvent},
}
err = state.AddNodeEvent(uint64(1001), nodeEvents)
require.Nil(err)

require.True(watchFired(ws))
Expand Down Expand Up @@ -6533,7 +6536,11 @@ func TestStateStore_NodeEvents_RetentionWindow(t *testing.T) {
Subsystem: "Driver",
Timestamp: time.Now().Unix(),
}
err := state.AddNodeEvent(uint64(i), out, []*structs.NodeEvent{nodeEvent})

nodeEvents := map[string][]*structs.NodeEvent{
out.ID: []*structs.NodeEvent{nodeEvent},
}
err := state.AddNodeEvent(uint64(i), nodeEvents)
require.Nil(err)

require.True(watchFired(ws))
Expand Down
17 changes: 11 additions & 6 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ const (
ACLTokenDeleteRequestType
ACLTokenBootstrapRequestType
AutopilotRequestType
AddNodeEventType
AddNodeEventsType
)

const (
Expand Down Expand Up @@ -123,6 +123,10 @@ const (
// the fraction. So 16 == 6.25% limit of jitter. This jitter is also
// applied to RPCHoldTimeout.
JitterFraction = 16

// MaxRetainedNodeEvents is the maximum number of node events that will be
// retained for a single node
MaxRetainedNodeEvents = 10
)

// Context defines the scope in which a search for Nomad object operates, and
Expand Down Expand Up @@ -1154,7 +1158,8 @@ type Node struct {
// updated
StatusUpdatedAt int64

// NodeEvents is the most recent set of events generated for the node
// NodeEvents is the most recent set of events generated for the node,
// retaining only MaxRetainedNodeEvents number at a time
NodeEvents []*NodeEvent

// Raft Indexes
Expand Down Expand Up @@ -1183,19 +1188,19 @@ type NodeEvent struct {
ModifyIndex uint64
}

// EmitNodeEventRequest is a request to update the node events source
// EmitNodeEventsRequest is a request to update the node events source
// with new client-side events. A single request may carry events for
// several nodes.
type EmitNodeEventRequest struct {
type EmitNodeEventsRequest struct {
// NodeEvents are a map where the key is a node id, and value is a list of
// events for that node
NodeEvents map[string][]*NodeEvent

WriteRequest
}

// EmitNodeEventResponse is a response to the client about the status of
// EmitNodeEventsResponse is a response to the client about the status of
// the node event source update.
type EmitNodeEventResponse struct {
type EmitNodeEventsResponse struct {
// NOTE(review): presumably the Raft index at which the events were
// applied — confirm against the Node.EmitEvents endpoint.
Index uint64
WriteMeta
}
Expand Down

0 comments on commit afe6637

Please sign in to comment.