Skip to content

Commit

Permalink
RPC, FSM, state store for Node.EmitEvent
Browse files Browse the repository at this point in the history
add node event when registering a node for the first time
  • Loading branch information
chelseakomlo committed Mar 6, 2018
1 parent c9bcf56 commit cb27331
Show file tree
Hide file tree
Showing 8 changed files with 252 additions and 0 deletions.
18 changes: 18 additions & 0 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
return n.applyACLTokenBootstrap(buf[1:], log.Index)
case structs.AutopilotRequestType:
return n.applyAutopilotUpdate(buf[1:], log.Index)
case structs.AddNodeEventType:
return n.applyAddNodeEventType(buf[1:], log.Index)
}

// Check enterprise only message types.
Expand Down Expand Up @@ -628,6 +630,22 @@ func (n *nomadFSM) applyReconcileSummaries(buf []byte, index uint64) interface{}
return n.reconcileQueuedAllocations(index)
}

// applyAddNodeEventType applies a node event to the set of currently-available
// events.
func (n *nomadFSM) applyAddNodeEventType(buf []byte, index uint64) interface{} {
var req structs.EmitNodeEventRequest
if err := structs.Decode(buf, &req); err != nil {
n.logger.Printf("[ERR] nomad.fsm: failed to decode EmitNodeEventREquest: %v", err)
return err
}

if err := n.state.AddNodeEvent(index, req.NodeID, req.NodeEvent); err != nil {
n.logger.Printf("[ERR] nomad.fsm: EmitNodeEventRequest failed to add node event: %v", err)
return err
}
return nil
}

// applyUpsertVaultAccessor stores the Vault accessors for a given allocation
// and task
func (n *nomadFSM) applyUpsertVaultAccessor(buf []byte, index uint64) interface{} {
Expand Down
41 changes: 41 additions & 0 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,47 @@ func makeLog(buf []byte) *raft.Log {
}
}

func TestFSM_ApplyNodeEvent(t *testing.T) {
t.Parallel()
require := require.New(t)
fsm := testFSM(t)
state := fsm.State()

node := mock.Node()

err := state.UpsertNode(1000, node)
if err != nil {
t.Fatalf("err: %v", err)
}

nodeEvent := &structs.NodeEvent{
Message: "Registration failed",
Subsystem: "Server",
Timestamp: time.Now().Unix(),
}

req := structs.EmitNodeEventRequest{
NodeID: node.ID,
NodeEvent: nodeEvent,
}
buf, err := structs.Encode(structs.AddNodeEventType, req)
require.Nil(err)

// the response in this case will be an error
resp := fsm.Apply(makeLog(buf))
require.Nil(resp)

ws := memdb.NewWatchSet()
actualNode, err := state.NodeByID(ws, node.ID)
require.Nil(err)

require.Equal(1, len(actualNode.NodeEvents))

first := actualNode.NodeEvents[0]
require.Equal(uint64(1), first.CreateIndex)
require.Equal("Registration failed", first.Message)
}

func TestFSM_UpsertNode(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
Expand Down
27 changes: 27 additions & 0 deletions nomad/node_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,23 @@ type Node struct {
updatesLock sync.Mutex
}

func (n *Node) EmitEvent(args *structs.EmitNodeEventRequest, reply *structs.EmitNodeEventResponse) error {
if done, err := n.srv.forward("Node.EmitEvent", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "client", "emit_event"}, time.Now())

_, index, err := n.srv.raftApply(structs.AddNodeEventType, args)

if err != nil {
n.srv.logger.Printf("[ERR] nomad.node AddNodeEventType failed: %+v", err)
return err
}

reply.Index = index
return nil
}

// Register is used to upsert a client that is available for scheduling
func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUpdateResponse) error {
if done, err := n.srv.forward("Node.Register", args, args, reply); done {
Expand Down Expand Up @@ -112,6 +129,16 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp
if args.Node.SecretID != originalNode.SecretID && originalNode.SecretID != "" {
return fmt.Errorf("node secret ID does not match. Not registering node.")
}
} else {
// Because this is the first time the node is being registered, we should
// also create a node registration event
nodeEvent := &structs.NodeEvent{
Message: "Node Registered",
Subsystem: "Server",
Timestamp: time.Now().Unix(),
}
args.Node.NodeEvents = make([]*structs.NodeEvent, 0)
args.Node.NodeEvents = append(args.Node.NodeEvents, nodeEvent)
}

// We have a valid node connection, so add the mapping to cache the
Expand Down
27 changes: 27 additions & 0 deletions nomad/node_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,32 @@ func TestClientEndpoint_Register(t *testing.T) {
})
}

func TestClientEndpoint_EmitEvent(t *testing.T) {
t.Parallel()
require := require.New(t)

s1 := TestServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

nodeEvent := &structs.NodeEvent{
Message: "Registration failed",
Subsystem: "Server",
Timestamp: time.Now().Unix(),
}

req := structs.EmitNodeEventRequest{
NodeEvent: nodeEvent,
WriteRequest: structs.WriteRequest{Region: "global"},
}

var resp structs.GenericResponse
err := msgpackrpc.CallWithCodec(codec, "Node.EmitEvent", &req, &resp)
require.Nil(err)
require.NotEqual(0, resp.Index)
}

func TestClientEndpoint_Register_SecretMismatch(t *testing.T) {
t.Parallel()
s1 := TestServer(t, nil)
Expand Down Expand Up @@ -947,6 +973,7 @@ func TestClientEndpoint_GetNode(t *testing.T) {
// Update the status updated at value
node.StatusUpdatedAt = resp2.Node.StatusUpdatedAt
node.SecretID = ""
node.NodeEvents = resp2.Node.NodeEvents
if !reflect.DeepEqual(node, resp2.Node) {
t.Fatalf("bad: %#v \n %#v", node, resp2.Node)
}
Expand Down
47 changes: 47 additions & 0 deletions nomad/state/events_state_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package state

import (
"fmt"

memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/structs"
)

// addNodeEvent is a function which wraps upsertNodeEvent
func (s *StateStore) AddNodeEvent(index uint64, nodeID string, event *structs.NodeEvent) error {
txn := s.db.Txn(true)
defer txn.Abort()

return s.upsertNodeEvent(index, nodeID, event, txn)
}

// upsertNodeEvent upserts a node event for a respective node. It also maintains
// that only 10 node events are ever stored simultaneously, deleting older
// events once this bound has been reached.
func (s *StateStore) upsertNodeEvent(index uint64, nodeID string, event *structs.NodeEvent, txn *memdb.Txn) error {

ws := memdb.NewWatchSet()
node, err := s.NodeByID(ws, nodeID)

if err != nil {
return fmt.Errorf("unable to look up nodes by id %+v", err)
}

if node == nil {
return fmt.Errorf("unable to look up nodes by id %s", nodeID)
}

event.CreateIndex = index

nodeEvents := node.NodeEvents

if len(nodeEvents) >= 10 {
delta := len(nodeEvents) - 10
nodeEvents = nodeEvents[delta+1:]
}
nodeEvents = append(nodeEvents, event)
node.NodeEvents = nodeEvents

txn.Commit()
return nil
}
69 changes: 69 additions & 0 deletions nomad/state/events_state_store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package state

import (
"testing"
"time"

memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)

func TestStateStore_AddSingleNodeEvent(t *testing.T) {
require := require.New(t)
state := testStateStore(t)

node := mock.Node()

err := state.UpsertNode(1000, node)
if err != nil {
t.Fatalf("err: %v", err)
}

nodeEvent := &structs.NodeEvent{
Message: "failed",
Subsystem: "Driver",
Timestamp: time.Now().Unix(),
}
err = state.AddNodeEvent(1001, node.ID, nodeEvent)
require.Nil(err)

ws := memdb.NewWatchSet()
actualNode, err := state.NodeByID(ws, node.ID)
require.Nil(err)
require.Equal(1, len(actualNode.NodeEvents))
require.Equal(nodeEvent, actualNode.NodeEvents[0])
}

// To prevent stale node events from accumulating, we limit the number of
// stored node events to 10.
func TestStateStore_NodeEvents_RetentionWindow(t *testing.T) {
require := require.New(t)
state := testStateStore(t)

node := mock.Node()

err := state.UpsertNode(1000, node)
if err != nil {
t.Fatalf("err: %v", err)
}

for i := 1; i <= 20; i++ {
nodeEvent := &structs.NodeEvent{
Message: "failed",
Subsystem: "Driver",
Timestamp: time.Now().Unix(),
}
err := state.AddNodeEvent(uint64(i), node.ID, nodeEvent)
require.Nil(err)
}

ws := memdb.NewWatchSet()
actualNode, err := state.NodeByID(ws, node.ID)
require.Nil(err)

require.Equal(10, len(actualNode.NodeEvents))
require.Equal(uint64(11), actualNode.NodeEvents[0].CreateIndex)
require.Equal(uint64(20), actualNode.NodeEvents[len(actualNode.NodeEvents)-1].CreateIndex)
}
19 changes: 19 additions & 0 deletions nomad/structs/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,31 @@ const (
Drain Subsystem = "Drain"
Driver Subsystem = "Driver"
Heartbeating Subsystem = "Heartbeating"
Server Subsystem = "Server"
)

// NodeEvent is a single unit representing a node’s state change
type NodeEvent struct {
NodeID string
Message string
Subsystem
Details map[string]string
Timestamp int64

CreateIndex uint64
}

// EmitNodeEventRequest is a client request to update the node events source
// with a new client-side event
type EmitNodeEventRequest struct {
NodeID string
NodeEvent *NodeEvent
WriteRequest
}

// EmitNodeEventResponse is a server response to the client about the status of
// the node event source update.
type EmitNodeEventResponse struct {
Index uint64
WriteRequest
}
4 changes: 4 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ const (
ACLTokenDeleteRequestType
ACLTokenBootstrapRequestType
AutopilotRequestType
AddNodeEventType
)

const (
Expand Down Expand Up @@ -1153,6 +1154,9 @@ type Node struct {
// updated
StatusUpdatedAt int64

// NodeEvents is a list of the last 10 or lest events for this node
NodeEvents []*NodeEvent

// Raft Indexes
CreateIndex uint64
ModifyIndex uint64
Expand Down

0 comments on commit cb27331

Please sign in to comment.