diff --git a/nomad/fsm.go b/nomad/fsm.go index dda83f49b4d1..83053a15da9c 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -1,6 +1,7 @@ package nomad import ( + "context" "fmt" "io" "reflect" @@ -195,9 +196,9 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} { switch msgType { case structs.NodeRegisterRequestType: - return n.applyUpsertNode(buf[1:], log.Index) + return n.applyUpsertNode(msgType, buf[1:], log.Index) case structs.NodeDeregisterRequestType: - return n.applyDeregisterNode(buf[1:], log.Index) + return n.applyDeregisterNode(msgType, buf[1:], log.Index) case structs.NodeUpdateStatusRequestType: return n.applyStatusUpdate(buf[1:], log.Index) case structs.NodeUpdateDrainRequestType: @@ -310,17 +311,19 @@ func (n *nomadFSM) applyClusterMetadata(buf []byte, index uint64) interface{} { return nil } -func (n *nomadFSM) applyUpsertNode(buf []byte, index uint64) interface{} { +func (n *nomadFSM) applyUpsertNode(reqType structs.MessageType, buf []byte, index uint64) interface{} { defer metrics.MeasureSince([]string{"nomad", "fsm", "register_node"}, time.Now()) var req structs.NodeRegisterRequest if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) } + ctx := context.WithValue(context.Background(), state.CtxMsgType, reqType) + // Handle upgrade paths req.Node.Canonicalize() - if err := n.state.UpsertNode(index, req.Node); err != nil { + if err := n.state.UpsertNodeCtx(ctx, index, req.Node); err != nil { n.logger.Error("UpsertNode failed", "error", err) return err } @@ -334,14 +337,16 @@ func (n *nomadFSM) applyUpsertNode(buf []byte, index uint64) interface{} { return nil } -func (n *nomadFSM) applyDeregisterNode(buf []byte, index uint64) interface{} { +func (n *nomadFSM) applyDeregisterNode(reqType structs.MessageType, buf []byte, index uint64) interface{} { defer metrics.MeasureSince([]string{"nomad", "fsm", "deregister_node"}, time.Now()) var req structs.NodeDeregisterRequest if err := structs.Decode(buf, 
&req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) } - if err := n.state.DeleteNode(index, []string{req.NodeID}); err != nil { + ctx := context.WithValue(context.Background(), state.CtxMsgType, reqType) + + if err := n.state.DeleteNodeCtx(ctx, index, []string{req.NodeID}); err != nil { n.logger.Error("DeleteNode failed", "error", err) return err } diff --git a/nomad/state/node_events.go b/nomad/state/node_events.go new file mode 100644 index 000000000000..b2889e3a3076 --- /dev/null +++ b/nomad/state/node_events.go @@ -0,0 +1,82 @@ +package state + +import ( + "fmt" + + "github.com/hashicorp/nomad/nomad/stream" + "github.com/hashicorp/nomad/nomad/structs" +) + +const ( + TopicNodeRegistration = "NodeRegistration" + TopicNodeDeregistration = "NodeDeregistration" +) + +type NodeRegistrationEvent struct { + Event *structs.NodeEvent + NodeStatus string +} + +type NodeDeregistrationEvent struct { + NodeID string +} + +// NodeRegisterEventFromChanges generates a NodeRegistrationEvent from a set +// of transaction changes. +func NodeRegisterEventFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { + var events []stream.Event + for _, change := range changes.Changes { + switch change.Table { + case "nodes": + after, ok := change.After.(*structs.Node) + if !ok { + return nil, fmt.Errorf("transaction change was not a Node") + } + + // Events may be empty; guard the index so a node without events + // does not cause an out-of-range panic. Event is nil in that case. + var latest *structs.NodeEvent + if n := len(after.Events); n > 0 { + latest = after.Events[n-1] + } + + event := stream.Event{ + Topic: TopicNodeRegistration, + Index: changes.Index, + Key: after.ID, + Payload: &NodeRegistrationEvent{ + Event: latest, + NodeStatus: after.Status, + }, + } + events = append(events, event) + } + } + return events, nil +} + +// NodeDeregisterEventFromChanges generates a NodeDeregistrationEvent from a set +// of transaction changes. 
+func NodeDeregisterEventFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { + var events []stream.Event + for _, change := range changes.Changes { + switch change.Table { + case "nodes": + before, ok := change.Before.(*structs.Node) + if !ok { + return nil, fmt.Errorf("transaction change was not a Node") + } + + event := stream.Event{ + Topic: TopicNodeDeregistration, + Index: changes.Index, + Key: before.ID, + Payload: &NodeDeregistrationEvent{ + NodeID: before.ID, + }, + } + events = append(events, event) + } + } + return events, nil +} diff --git a/nomad/state/node_events_test.go b/nomad/state/node_events_test.go new file mode 100644 index 000000000000..10b9458a635a --- /dev/null +++ b/nomad/state/node_events_test.go @@ -0,0 +1,211 @@ +package state + +import ( + "testing" + + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/stream" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/require" +) + +func TestNodeRegisterEventFromChanges(t *testing.T) { + cases := []struct { + Name string + MsgType structs.MessageType + Setup func(s *StateStore, tx *txn) error + Mutate func(s *StateStore, tx *txn) error + WantEvents []stream.Event + WantErr bool + WantTopic string + }{ + { + MsgType: structs.NodeRegisterRequestType, + WantTopic: TopicNodeRegistration, + Name: "node registered", + Mutate: func(s *StateStore, tx *txn) error { + return upsertNodeTxn(tx, tx.Index, testNode()) + }, + WantEvents: []stream.Event{{ + Topic: TopicNodeRegistration, + Key: testNodeID(), + Index: 100, + Payload: &NodeRegistrationEvent{ + Event: &structs.NodeEvent{ + Message: "Node registered", + Subsystem: "Cluster", + }, + NodeStatus: structs.NodeStatusReady, + }, + }}, + WantErr: false, + }, + { + MsgType: structs.NodeRegisterRequestType, + WantTopic: TopicNodeRegistration, + Name: "node registered initializing", + Mutate: func(s *StateStore, tx *txn) error { + return upsertNodeTxn(tx, tx.Index, testNode(nodeNotReady)) + }, 
+ WantEvents: []stream.Event{{ + Topic: TopicNodeRegistration, + Key: testNodeID(), + Index: 100, + Payload: &NodeRegistrationEvent{ + Event: &structs.NodeEvent{ + Message: "Node registered", + Subsystem: "Cluster", + }, + NodeStatus: structs.NodeStatusInit, + }, + }}, + WantErr: false, + }, + { + MsgType: structs.NodeDeregisterRequestType, + WantTopic: TopicNodeDeregistration, + Name: "node deregistered", + Setup: func(s *StateStore, tx *txn) error { + return upsertNodeTxn(tx, tx.Index, testNode()) + }, + Mutate: func(s *StateStore, tx *txn) error { + return deleteNodeTxn(tx, tx.Index, []string{testNodeID()}) + }, + WantEvents: []stream.Event{{ + Topic: TopicNodeDeregistration, + Key: testNodeID(), + Index: 100, + Payload: &NodeDeregistrationEvent{ + NodeID: testNodeID(), + }, + }}, + WantErr: false, + }, + { + MsgType: structs.NodeDeregisterRequestType, + WantTopic: TopicNodeDeregistration, + Name: "batch node deregistered", + Setup: func(s *StateStore, tx *txn) error { + require.NoError(t, upsertNodeTxn(tx, tx.Index, testNode())) + return upsertNodeTxn(tx, tx.Index, testNode(nodeIDTwo)) + }, + Mutate: func(s *StateStore, tx *txn) error { + return deleteNodeTxn(tx, tx.Index, []string{testNodeID(), testNodeIDTwo()}) + }, + WantEvents: []stream.Event{ + { + Topic: TopicNodeDeregistration, + Key: testNodeID(), + Index: 100, + Payload: &NodeDeregistrationEvent{ + NodeID: testNodeID(), + }, + }, + { + Topic: TopicNodeDeregistration, + Key: testNodeIDTwo(), + Index: 100, + Payload: &NodeDeregistrationEvent{ + NodeID: testNodeIDTwo(), + }, + }, + }, + WantErr: false, + }, + } + + for _, tc := range cases { + t.Run(tc.Name, func(t *testing.T) { + s := TestStateStoreCfg(t, TestStateStorePublisher(t)) + defer s.StopEventPublisher() + + if tc.Setup != nil { + // Bypass publish mechanism for setup + setupTx := s.db.WriteTxn(10) + require.NoError(t, tc.Setup(s, setupTx)) + setupTx.Txn.Commit() + } + + tx := s.db.WriteTxn(100) + require.NoError(t, tc.Mutate(s, tx)) + + changes 
:= Changes{Changes: tx.Changes(), Index: 100, MsgType: tc.MsgType} + got, err := processDBChanges(tx, changes) + + if tc.WantErr { + require.Error(t, err) + return + } + require.NoError(t, err) + + require.Equal(t, len(tc.WantEvents), len(got)) + for idx, g := range got { + switch tc.MsgType { + case structs.NodeRegisterRequestType: + requireNodeRegistrationEventEqual(t, tc.WantEvents[idx], g) + case structs.NodeDeregisterRequestType: + requireNodeDeregistrationEventEqual(t, tc.WantEvents[idx], g) + } + } + }) + } +} + +func requireNodeRegistrationEventEqual(t *testing.T, want, got stream.Event) { + t.Helper() + + require.Equal(t, want.Index, got.Index) + require.Equal(t, want.Key, got.Key) + require.Equal(t, want.Topic, got.Topic) + + wantPayload := want.Payload.(*NodeRegistrationEvent) + gotPayload := got.Payload.(*NodeRegistrationEvent) + + // Check payload equality for the fields that we can easily control + require.Equal(t, wantPayload.NodeStatus, gotPayload.NodeStatus) + require.Equal(t, wantPayload.Event.Message, gotPayload.Event.Message) + require.Equal(t, wantPayload.Event.Subsystem, gotPayload.Event.Subsystem) +} + +func requireNodeDeregistrationEventEqual(t *testing.T, want, got stream.Event) { + t.Helper() + + require.Equal(t, want.Index, got.Index) + require.Equal(t, want.Key, got.Key) + require.Equal(t, want.Topic, got.Topic) + + wantPayload := want.Payload.(*NodeDeregistrationEvent) + gotPayload := got.Payload.(*NodeDeregistrationEvent) + + require.Equal(t, wantPayload, gotPayload) +} + +type nodeOpts func(n *structs.Node) + +func nodeNotReady(n *structs.Node) { + n.Status = structs.NodeStatusInit +} + +func nodeIDTwo(n *structs.Node) { + n.ID = testNodeIDTwo() +} + +func testNode(opts ...nodeOpts) *structs.Node { + n := mock.Node() + n.ID = testNodeID() + + n.SecretID = "ab9812d3-6a21-40d3-973d-d9d2174a23ee" + + for _, opt := range opts { + opt(n) + } + return n +} + +func testNodeID() string { + return "9d5741c1-3899-498a-98dd-eb3c05665863" +} + 
+func testNodeIDTwo() string { + return "694ff31d-8c59-4030-ac83-e15692560c8d" +} diff --git a/nomad/state/state_changes.go b/nomad/state/state_changes.go index a71056e0f9d0..f105f49b5f35 100644 --- a/nomad/state/state_changes.go +++ b/nomad/state/state_changes.go @@ -1,10 +1,16 @@ package state import ( + "context" "fmt" "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/nomad/stream" + "github.com/hashicorp/nomad/nomad/structs" +) + +const ( + CtxMsgType = "type" ) // ReadTxn is implemented by memdb.Txn to perform read operations. @@ -21,6 +27,7 @@ type Changes struct { // Index is the latest index at the time these changes were committed. Index uint64 Changes memdb.Changes + MsgType structs.MessageType } // changeTrackerDB is a thin wrapper around memdb.DB which enables TrackChanges on @@ -81,6 +88,18 @@ func (c *changeTrackerDB) WriteTxn(idx uint64) *txn { return t } +// WriteTxnCtx is identical to WriteTxn but takes a ctx used for event sourcing +func (c *changeTrackerDB) WriteTxnCtx(ctx context.Context, idx uint64) *txn { + t := &txn{ + ctx: ctx, + Txn: c.db.Txn(true), + Index: idx, + publish: c.publish, + } + t.Txn.TrackChanges() + return t +} + func (c *changeTrackerDB) publish(changes Changes) error { readOnlyTx := c.db.Txn(false) defer readOnlyTx.Abort() @@ -113,6 +132,9 @@ func (c *changeTrackerDB) WriteTxnRestore() *txn { // error. Any errors from the callback would be lost, which would result in a // missing change event, even though the state store had changed. type txn struct { + // ctx is used to hold message type information from an FSM request + ctx context.Context + *memdb.Txn // Index in raft where the write is occurring. The value is zero for a // read-only, or WriteTxnRestore transaction. 
@@ -136,6 +158,7 @@ func (tx *txn) Commit() error { changes := Changes{ Index: tx.Index, Changes: tx.Txn.Changes(), + MsgType: tx.MsgType(), } if err := tx.publish(changes); err != nil { return err @@ -146,7 +169,35 @@ func (tx *txn) Commit() error { return nil } +// MsgType returns a MessageType from the txn's context. +// If the context is empty or the value isn't set IgnoreUnknownTypeFlag will +// be returned to signal that the MsgType is unknown. +func (tx *txn) MsgType() structs.MessageType { + if tx.ctx == nil { + return structs.IgnoreUnknownTypeFlag + } + + raw := tx.ctx.Value(CtxMsgType) + if raw == nil { + return structs.IgnoreUnknownTypeFlag + } + + msgType, ok := raw.(structs.MessageType) + if !ok { + return structs.IgnoreUnknownTypeFlag + } + return msgType +} + func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { - // TODO: add handlers here. + switch changes.MsgType { + case structs.IgnoreUnknownTypeFlag: + // unknown event type + return []stream.Event{}, nil + case structs.NodeRegisterRequestType: + return NodeRegisterEventFromChanges(tx, changes) + case structs.NodeDeregisterRequestType: + return NodeDeregisterEventFromChanges(tx, changes) + } return []stream.Event{}, nil } diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 32bf37396cd5..7616951974a5 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -206,10 +206,16 @@ func (s *StateStore) AbandonCh() <-chan struct{} { // Abandon is used to signal that the given state store has been abandoned. // Calling this more than one time will panic. func (s *StateStore) Abandon() { - s.stopEventPublisher() + s.StopEventPublisher() close(s.abandonCh) } +// StopEventPublisher calls the cancel func for the state store's event +// publisher. It should be called during server shutdown. 
+func (s *StateStore) StopEventPublisher() { + s.stopEventPublisher() +} + // QueryFn is the definition of a function that can be used to implement a basic // blocking query against the state store. type QueryFn func(memdb.WatchSet, *StateStore) (resp interface{}, index uint64, err error) @@ -740,6 +746,21 @@ func (s *StateStore) ScalingEventsByJob(ws memdb.WatchSet, namespace, jobID stri return nil, 0, nil } +// UpsertNodeCtx is used to register a node or update a node definition; ctx +// carries the FSM message type used for event sourcing. This is assumed to be +// triggered by the client, so we retain the drain/eligibility set by the scheduler. +func (s *StateStore) UpsertNodeCtx(ctx context.Context, index uint64, node *structs.Node) error { + txn := s.db.WriteTxnCtx(ctx, index) + defer txn.Abort() + + err := upsertNodeTxn(txn, index, node) + if err != nil { + return err + } + // Commit can fail while publishing events; report that to the caller. + return txn.Commit() +} + // UpsertNode is used to register a node or update a node definition // This is assumed to be triggered by the client, so we retain the value // of drain/eligibility which is set by the scheduler. 
@@ -747,6 +768,15 @@ func (s *StateStore) UpsertNode(index uint64, node *structs.Node) error { txn := s.db.WriteTxn(index) defer txn.Abort() + err := upsertNodeTxn(txn, index, node) + if err != nil { + return err + } + // Commit can fail while publishing events; report that to the caller. + return txn.Commit() +} + +func upsertNodeTxn(txn *txn, index uint64, node *structs.Node) error { // Check if the node already exists existing, err := txn.First("nodes", "id", node.ID) if err != nil { @@ -795,19 +825,40 @@ func (s *StateStore) UpsertNode(index uint64, node *structs.Node) error { return fmt.Errorf("csi plugin update failed: %v", err) } + return nil +} + +// DeleteNodeCtx deregisters a batch of nodes; ctx carries the FSM message type +func (s *StateStore) DeleteNodeCtx(ctx context.Context, index uint64, nodes []string) error { + txn := s.db.WriteTxnCtx(ctx, index) + defer txn.Abort() + + err := deleteNodeTxn(txn, index, nodes) + if err != nil { + return err + } - txn.Commit() - return nil + // Commit can fail while publishing events; report that to the caller. + return txn.Commit() } // DeleteNode deregisters a batch of nodes func (s *StateStore) DeleteNode(index uint64, nodes []string) error { + txn := s.db.WriteTxn(index) + defer txn.Abort() + + err := deleteNodeTxn(txn, index, nodes) + if err != nil { + return err + } + // Commit can fail while publishing events; report that to the caller. + return txn.Commit() +} + +func deleteNodeTxn(txn *txn, index uint64, nodes []string) error { if len(nodes) == 0 { return fmt.Errorf("node ids missing") } - txn := s.db.WriteTxn(index) - defer txn.Abort() - for _, nodeID := range nodes { existing, err := txn.First("nodes", "id", nodeID) if err != nil { @@ -832,7 +883,6 @@ func (s *StateStore) DeleteNode(index uint64, nodes []string) error { return fmt.Errorf("index update failed: %v", err) } - txn.Commit() return nil } diff --git a/nomad/state/testing.go b/nomad/state/testing.go index de069e4098b4..6aa9039f4d79 100644 --- a/nomad/state/testing.go +++ b/nomad/state/testing.go @@ -24,6 +24,25 @@ func TestStateStore(t testing.T) *StateStore { return state } +func TestStateStorePublisher(t testing.T) *StateStoreConfig { + return &StateStoreConfig{ + Logger: testlog.HCLogger(t), + Region: 
"global", + EnablePublisher: true, + } +} +func TestStateStoreCfg(t testing.T, cfg *StateStoreConfig) *StateStore { + state, err := NewStateStore(cfg) + if err != nil { + t.Fatalf("err: %v", err) + } + + if state == nil { + t.Fatalf("missing state") + } + return state +} + // CreateTestCSIPlugin is a helper that generates the node + fingerprint results necessary // to create a CSIPlugin by directly inserting into the state store. The plugin requires a // controller.