Skip to content

Commit

Permalink
code review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
chelseakomlo committed Mar 13, 2018
1 parent cf0d091 commit 2295446
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 47 deletions.
56 changes: 25 additions & 31 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,6 @@ const (
// allocSyncRetryIntv is the interval on which we retry updating
// the status of the allocation
allocSyncRetryIntv = 5 * time.Second

// nodeEventsEmitIntv is the interval at which node events are synced with
// the server
nodeEventsEmitIntv = 3 * time.Second
)

// ClientStatsReporter exposes all the APIs related to resource usage of a Nomad
Expand Down Expand Up @@ -1062,7 +1058,7 @@ func (c *Client) registerAndHeartbeat() {
go c.watchNodeUpdates()

// Start watching for emitting node events
go c.watchEmitEvents()
go c.watchNodeEvents()

// Setup the heartbeat timer, for the initial registration
// we want to do this quickly. We want to do it extra quickly
Expand Down Expand Up @@ -1147,7 +1143,7 @@ func (c *Client) run() {
// these kinds of events include when a driver moves from healthy to unhealthy
// (and vice versa)
func (c *Client) submitNodeEvents(events []*structs.NodeEvent) error {
nodeID := c.Node().ID
nodeID := c.NodeID()
nodeEvents := map[string][]*structs.NodeEvent{
nodeID: events,
}
Expand All @@ -1159,44 +1155,42 @@ func (c *Client) submitNodeEvents(events []*structs.NodeEvent) error {
if err := c.RPC("Node.EmitEvents", &req, &resp); err != nil {
return fmt.Errorf("Emitting node event failed: %v", err)
}
c.logger.Printf("[INFO] client: emit node events complete")
return nil
}

// watchEmitEvents is a handler which receives node events and on a interval and
// submits them in batch format to the server
func (c *Client) watchEmitEvents() {
batchEvents := make([]*structs.NodeEvent, 0)
// watchNodeEvents is a handler which receives node events and on a interval
// and submits them in batch format to the server
func (c *Client) watchNodeEvents() {
// batchEvents stores events that have yet to be published
var batchEvents []*structs.NodeEvent

timer := time.NewTimer(c.retryIntv(nodeEventsEmitIntv))
// Create and drain the timer
timer := time.NewTimer(0)
timer.Stop()
select {
case <-timer.C:
default:
}
defer timer.Stop()

for {
select {
case event := <-c.triggerEmitNodeEvent:
batchEvents = append(batchEvents, event)

case <-timer.C:
timer.Reset(c.retryIntv(nodeUpdateRetryIntv))

if len(batchEvents) == 0 {
// if we haven't received any events to emit, continue until the next
// time interval
continue
if l := len(batchEvents); l <= structs.MaxRetainedNodeEvents {
batchEvents = append(batchEvents, event)
} else {
// Drop the oldest event
c.logger.Printf("[WARN] client: dropping node event: %v", batchEvents[0])
batchEvents = append(batchEvents[1:], event)
}

err := c.submitNodeEvents(batchEvents)
if err != nil {
batchEvents = make([]*structs.NodeEvent, 0)
c.logger.Printf("[ERR] client: Failure in thie process of trying to submit node events: %v", err)
} else if len(batchEvents) >= structs.MaxRetainedNodeEvents {
// Truncate list to under 10
batchEvents = make([]*structs.NodeEvent, 0)
timer.Reset(c.retryIntv(nodeUpdateRetryIntv))
case <-timer.C:
if err := c.submitNodeEvents(batchEvents); err != nil {
c.logger.Printf("[ERR] client: submitting node events failed: %v", err)
timer.Reset(c.retryIntv(nodeUpdateRetryIntv))
}

case <-c.shutdownCh:
return
default:
}
}
}
Expand Down
17 changes: 11 additions & 6 deletions command/node_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,9 +389,6 @@ func (c *NodeStatusCommand) formatNode(client *api.Client, node *api.Node) int {
}

func (c *NodeStatusCommand) outputNodeStatusEvents(node *api.Node) {
if !c.verbose {
return
}

c.Ui.Output(c.Colorize().Color("\n[bold]Node Events "))
c.outputNodeEvent(node.NodeEvents)
Expand All @@ -400,14 +397,22 @@ func (c *NodeStatusCommand) outputNodeStatusEvents(node *api.Node) {
func (c *NodeStatusCommand) outputNodeEvent(events []*api.NodeEvent) {
size := len(events)
nodeEvents := make([]string, size+1)
nodeEvents[0] = "Time|Subsystem|Message|Details"
if c.verbose {
nodeEvents[0] = "Time|Subsystem|Message|Details"
} else {
nodeEvents[0] = "Time|Subsystem|Message"
}

for i, event := range events {
timestamp := formatUnixNanoTime(event.Timestamp)
subsystem := event.Subsystem
msg := event.Message
details := formatEventDetails(event.Details)
nodeEvents[size-i] = fmt.Sprintf("%s|%s|%s|%s", timestamp, subsystem, msg, details)
if c.verbose {
details := formatEventDetails(event.Details)
nodeEvents[size-i] = fmt.Sprintf("%s|%s|%s|%s", timestamp, subsystem, msg, details)
} else {
nodeEvents[size-i] = fmt.Sprintf("%s|%s|%s", timestamp, subsystem, msg)
}
}
c.Ui.Output(formatList(nodeEvents))
}
Expand Down
6 changes: 6 additions & 0 deletions nomad/node_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ func (n *Node) EmitEvents(args *structs.EmitNodeEventsRequest, reply *structs.Em
}
defer metrics.MeasureSince([]string{"nomad", "client", "emit_event"}, time.Now())

if args.NodeEvents == nil {
err := fmt.Errorf("No event to add; node event map is nil")
n.srv.logger.Printf("[ERR] nomad.node AddNodeEventsType failed: %v", err)
return err
}

_, index, err := n.srv.raftApply(structs.AddNodeEventsType, args)

if err != nil {
Expand Down
5 changes: 2 additions & 3 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3722,8 +3722,7 @@ func (s *StateStore) upsertNodeEvents(index uint64, nodeEvents map[string][]*str
}

// Copy the existing node
copyNode := new(structs.Node)
*copyNode = *node
copyNode := node.Copy()

nodeEvents := node.NodeEvents

Expand All @@ -3733,7 +3732,7 @@ func (s *StateStore) upsertNodeEvents(index uint64, nodeEvents map[string][]*str

// keep node events pruned to below 10 simultaneously
if len(nodeEvents) >= structs.MaxRetainedNodeEvents {
delta := len(nodeEvents) - 10
delta := len(nodeEvents) - structs.MaxRetainedNodeEvents
nodeEvents = nodeEvents[delta+1:]
}
nodeEvents = append(nodeEvents, e)
Expand Down
21 changes: 14 additions & 7 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1188,6 +1188,15 @@ type NodeEvent struct {
ModifyIndex uint64
}

func (ne *NodeEvent) String() string {
var details string
for k, v := range ne.Details {
details = fmt.Sprintf("%s: %s", k, v)
}

return fmt.Sprintf("Message: %s, Subsystem: %s, Details: %s, Timestamp: %s", ne.Message, string(ne.Subsystem), details, ne.Timestamp)
}

// EmitNodeEventsRequest is a request to update the node events source
// with a new client-side event
type EmitNodeEventsRequest struct {
Expand Down Expand Up @@ -1221,16 +1230,14 @@ func (n *Node) Copy() *Node {
nn.Reserved = nn.Reserved.Copy()
nn.Links = helper.CopyMapStringString(nn.Links)
nn.Meta = helper.CopyMapStringString(nn.Meta)
nn.NodeEvents = copyNodeEvents(n)
nn.NodeEvents = copyNodeEvents(n.NodeEvents)
return nn
}

func copyNodeEvents(first *Node) []*NodeEvent {
nodeEvents := make([]*NodeEvent, 0)
for _, e := range first.NodeEvents {
nodeEvents = append(nodeEvents, e)
}
return nodeEvents
func copyNodeEvents(first []*NodeEvent) []*NodeEvent {
second := make([]*NodeEvent, len(first))
copy(second, first)
return second
}

// TerminalStatus returns if the current status is terminal and
Expand Down

0 comments on commit 2295446

Please sign in to comment.