Skip to content

Commit

Permalink
Node Drain events and Node Events
Browse files Browse the repository at this point in the history
  • Loading branch information
drewbailey committed Sep 29, 2020
1 parent 7f6f1ad commit bbae0ec
Show file tree
Hide file tree
Showing 6 changed files with 272 additions and 18 deletions.
16 changes: 10 additions & 6 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
case structs.NodeUpdateStatusRequestType:
return n.applyStatusUpdate(buf[1:], log.Index)
case structs.NodeUpdateDrainRequestType:
return n.applyDrainUpdate(buf[1:], log.Index)
return n.applyDrainUpdate(msgType, buf[1:], log.Index)
case structs.JobRegisterRequestType:
return n.applyUpsertJob(buf[1:], log.Index)
case structs.JobDeregisterRequestType:
Expand Down Expand Up @@ -250,7 +250,7 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
case structs.AutopilotRequestType:
return n.applyAutopilotUpdate(buf[1:], log.Index)
case structs.UpsertNodeEventsType:
return n.applyUpsertNodeEvent(buf[1:], log.Index)
return n.applyUpsertNodeEvent(msgType, buf[1:], log.Index)
case structs.JobBatchDeregisterRequestType:
return n.applyBatchDeregisterJob(buf[1:], log.Index)
case structs.AllocUpdateDesiredTransitionRequestType:
Expand Down Expand Up @@ -402,13 +402,15 @@ func (n *nomadFSM) applyStatusUpdate(buf []byte, index uint64) interface{} {
return nil
}

func (n *nomadFSM) applyDrainUpdate(buf []byte, index uint64) interface{} {
func (n *nomadFSM) applyDrainUpdate(reqType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "node_drain_update"}, time.Now())
var req structs.NodeUpdateDrainRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}

ctx := context.WithValue(context.Background(), state.CtxMsgType, reqType)

// COMPAT Remove in version 0.10
// As part of Nomad 0.8 we have deprecated the drain boolean in favor of a
// drain strategy but we need to handle the upgrade path where the Raft log
Expand All @@ -423,7 +425,7 @@ func (n *nomadFSM) applyDrainUpdate(buf []byte, index uint64) interface{} {
}
}

if err := n.state.UpdateNodeDrain(index, req.NodeID, req.DrainStrategy, req.MarkEligible, req.UpdatedAt, req.NodeEvent); err != nil {
if err := n.state.UpdateNodeDrainCtx(ctx, index, req.NodeID, req.DrainStrategy, req.MarkEligible, req.UpdatedAt, req.NodeEvent); err != nil {
n.logger.Error("UpdateNodeDrain failed", "error", err)
return err
}
Expand Down Expand Up @@ -874,14 +876,16 @@ func (n *nomadFSM) applyReconcileSummaries(buf []byte, index uint64) interface{}
}

// applyUpsertNodeEvent tracks the given node events.
func (n *nomadFSM) applyUpsertNodeEvent(buf []byte, index uint64) interface{} {
func (n *nomadFSM) applyUpsertNodeEvent(msgType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "upsert_node_events"}, time.Now())
var req structs.EmitNodeEventsRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode EmitNodeEventsRequest: %v", err))
}

if err := n.state.UpsertNodeEvents(index, req.NodeEvents); err != nil {
ctx := context.WithValue(context.Background(), state.CtxMsgType, msgType)

if err := n.state.UpsertNodeEventsCtx(ctx, index, req.NodeEvents); err != nil {
n.logger.Error("failed to add node events", "error", err)
return err
}
Expand Down
101 changes: 99 additions & 2 deletions nomad/state/node_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (
)

const (
TopicNodeRegistration = "NodeRegistration"
TopicNodeDeregistration = "NodeDeregistration"
TopicNodeRegistration stream.Topic = "NodeRegistration"
TopicNodeDeregistration stream.Topic = "NodeDeregistration"
TopicNodeDrain stream.Topic = "NodeDrain"
TopicNodeEvent stream.Topic = "NodeEvent"
)

type NodeRegistrationEvent struct {
Expand All @@ -21,6 +23,28 @@ type NodeDeregistrationEvent struct {
NodeID string
}

// NodeEvent is the Payload carried by stream events published on
// TopicNodeEvent.
type NodeEvent struct {
// Node is the node taken from the After side of the "nodes" table change
// that produced the event.
Node *structs.Node
}

// NodeDrainEvent is the Payload for a NodeDrain event. It contains
// information related to the Node being drained as well as high level
// information about the current allocations on the Node.
type NodeDrainEvent struct {
// Node is the node taken from the After side of the "nodes" table change.
Node *structs.Node
// JobAllocs groups details about the allocations on the node, keyed by
// the Name of the job each allocation belongs to.
JobAllocs map[string]*JobDrainDetails
}

// NodeDrainAllocDetails describes a single allocation that is on a node
// referenced by a NodeDrainEvent.
type NodeDrainAllocDetails struct {
// ID is the allocation's ID.
ID string
// Migrate is the value returned by the allocation's MigrateStrategy method.
Migrate *structs.MigrateStrategy
}

// JobDrainDetails summarizes the allocations of one job that are on a node
// referenced by a NodeDrainEvent.
type JobDrainDetails struct {
// Type is the Type of the job the allocations belong to.
Type string
// AllocDetails holds the per-allocation details, keyed by allocation ID.
AllocDetails map[string]NodeDrainAllocDetails
}

// NodeRegisterEventFromChanges generates a NodeRegistrationEvent from a set
// of transaction changes.
func NodeRegisterEventFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
Expand Down Expand Up @@ -73,3 +97,76 @@ func NodeDeregisterEventFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
}
return events, nil
}

// NodeEventFromChanges generates one NodeEvent stream event for every change
// to the "nodes" table found in a set of transaction changes.
//
// Changes to any other table are ignored. An error is returned as soon as a
// "nodes" change is found whose After value is not a *structs.Node.
func NodeEventFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
	var events []stream.Event
	for _, change := range changes.Changes {
		// Only node table changes produce node events.
		if change.Table != "nodes" {
			continue
		}

		node, isNode := change.After.(*structs.Node)
		if !isNode {
			return nil, fmt.Errorf("transaction change was not a Node")
		}

		events = append(events, stream.Event{
			Topic:   TopicNodeEvent,
			Index:   changes.Index,
			Key:     node.ID,
			Payload: &NodeEvent{Node: node},
		})
	}
	return events, nil
}

// NodeDrainEventFromChanges generates one NodeDrain stream event for every
// change to the "nodes" table found in a set of transaction changes.
//
// The payload of each event is a NodeDrainEvent holding the changed node and
// the details of the allocations looked up for that node, grouped by job
// name. Changes to any other table are ignored. An error is returned when a
// "nodes" change does not carry a *structs.Node as its After value, or when
// the allocation lookup fails.
func NodeDrainEventFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
	var events []stream.Event
	for _, change := range changes.Changes {
		// Only node table changes produce drain events.
		if change.Table != "nodes" {
			continue
		}

		node, isNode := change.After.(*structs.Node)
		if !isNode {
			return nil, fmt.Errorf("transaction change was not a Node")
		}

		// Look up the allocations for the node within the same transaction.
		nodeAllocs, err := allocsByNodeTxn(tx, nil, node.ID)
		if err != nil {
			return nil, fmt.Errorf("retrieving allocations for node drain event: %w", err)
		}

		// Group the per-allocation details under the name of the owning job,
		// creating the job entry the first time the job is seen.
		byJob := make(map[string]*JobDrainDetails)
		for _, alloc := range nodeAllocs {
			details, seen := byJob[alloc.Job.Name]
			if !seen {
				details = &JobDrainDetails{
					AllocDetails: make(map[string]NodeDrainAllocDetails),
					Type:         alloc.Job.Type,
				}
				byJob[alloc.Job.Name] = details
			}

			details.AllocDetails[alloc.ID] = NodeDrainAllocDetails{
				Migrate: alloc.MigrateStrategy(),
				ID:      alloc.ID,
			}
		}

		events = append(events, stream.Event{
			Topic: TopicNodeDrain,
			Index: changes.Index,
			Key:   node.ID,
			Payload: &NodeDrainEvent{
				Node:      node,
				JobAllocs: byJob,
			},
		})
	}
	return events, nil
}
129 changes: 119 additions & 10 deletions nomad/state/node_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,23 @@ package state

import (
"testing"
"time"

"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)

func TestNodeRegisterEventFromChanges(t *testing.T) {
func TestNodeEventsFromChanges(t *testing.T) {
cases := []struct {
Name string
MsgType structs.MessageType
Setup func(s *StateStore, tx *txn) error
Mutate func(s *StateStore, tx *txn) error
WantEvents []stream.Event
WantErr bool
WantTopic string
WantTopic stream.Topic
}{
{
MsgType: structs.NodeRegisterRequestType,
Expand Down Expand Up @@ -112,6 +113,56 @@ func TestNodeRegisterEventFromChanges(t *testing.T) {
},
WantErr: false,
},
{
MsgType: structs.UpsertNodeEventsType,
WantTopic: TopicNodeEvent,
Name: "batch node events upserted",
Setup: func(s *StateStore, tx *txn) error {
require.NoError(t, upsertNodeTxn(tx, tx.Index, testNode()))
return upsertNodeTxn(tx, tx.Index, testNode(nodeIDTwo))
},
Mutate: func(s *StateStore, tx *txn) error {
eventFn := func(id string) []*structs.NodeEvent {
return []*structs.NodeEvent{
{
Message: "test event one",
Subsystem: "Cluster",
Details: map[string]string{
"NodeID": id,
},
},
{
Message: "test event two",
Subsystem: "Cluster",
Details: map[string]string{
"NodeID": id,
},
},
}
}
require.NoError(t, s.upsertNodeEvents(tx.Index, testNodeID(), eventFn(testNodeID()), tx))
return s.upsertNodeEvents(tx.Index, testNodeIDTwo(), eventFn(testNodeIDTwo()), tx)
},
WantEvents: []stream.Event{
{
Topic: TopicNodeEvent,
Key: testNodeID(),
Index: 100,
Payload: &NodeEvent{
Node: testNode(),
},
},
{
Topic: TopicNodeEvent,
Key: testNodeIDTwo(),
Index: 100,
Payload: &NodeEvent{
Node: testNode(nodeIDTwo),
},
},
},
WantErr: false,
},
}

for _, tc := range cases {
Expand Down Expand Up @@ -140,24 +191,80 @@ func TestNodeRegisterEventFromChanges(t *testing.T) {

require.Equal(t, len(tc.WantEvents), len(got))
for idx, g := range got {
// assert equality of shared fields

want := tc.WantEvents[idx]
require.Equal(t, want.Index, g.Index)
require.Equal(t, want.Key, g.Key)
require.Equal(t, want.Topic, g.Topic)

switch tc.MsgType {
case structs.NodeRegisterRequestType:
requireNodeRegistrationEventEqual(t, tc.WantEvents[idx], g)
case structs.NodeDeregisterRequestType:
requireNodeDeregistrationEventEqual(t, tc.WantEvents[idx], g)
case structs.UpsertNodeEventsType:
requireNodeEventEqual(t, tc.WantEvents[idx], g)
default:
require.Fail(t, "unhandled message type")
}
}
})
}
}

// TestNodeDrainEventFromChanges checks that applying a drain strategy to a
// node yields exactly one NodeDrain stream event, and that the node in the
// event payload carries the new strategy and is marked ineligible for
// scheduling.
func TestNodeDrainEventFromChanges(t *testing.T) {
	t.Parallel()

	store := TestStateStoreCfg(t, TestStateStorePublisher(t))
	defer store.StopEventPublisher()

	// Seed the store with a node and two allocations assigned to it.
	seedTxn := store.db.WriteTxn(10)

	node := mock.Node()
	allocs := []*structs.Allocation{mock.Alloc(), mock.Alloc()}
	for _, alloc := range allocs {
		alloc.NodeID = node.ID
	}

	require.NoError(t, upsertNodeTxn(seedTxn, 10, node))
	require.NoError(t, store.upsertAllocsImpl(100, allocs, seedTxn))
	seedTxn.Txn.Commit()

	// Apply the drain update in a second write transaction and turn the
	// changes it recorded into stream events.
	drainTxn := store.db.WriteTxn(100)

	strategy := &structs.DrainStrategy{
		DrainSpec: structs.DrainSpec{
			Deadline:         10 * time.Minute,
			IgnoreSystemJobs: false,
		},
		StartedAt: time.Now(),
	}

	require.NoError(t, store.updateNodeDrainImpl(drainTxn, 100, node.ID, strategy, false, time.Now().UnixNano(), &structs.NodeEvent{}))

	events, err := processDBChanges(drainTxn, Changes{
		Changes: drainTxn.Changes(),
		Index:   100,
		MsgType: structs.NodeUpdateDrainRequestType,
	})
	require.NoError(t, err)

	require.Len(t, events, 1)

	require.Equal(t, TopicNodeDrain, events[0].Topic)
	require.Equal(t, uint64(100), events[0].Index)

	payload, isDrainEvent := events[0].Payload.(*NodeDrainEvent)
	require.True(t, isDrainEvent)

	require.Equal(t, structs.NodeSchedulingIneligible, payload.Node.SchedulingEligibility)
	require.Equal(t, strategy, payload.Node.DrainStrategy)
}

func requireNodeRegistrationEventEqual(t *testing.T, want, got stream.Event) {
t.Helper()

require.Equal(t, want.Index, got.Index)
require.Equal(t, want.Key, got.Key)
require.Equal(t, want.Topic, got.Topic)

wantPayload := want.Payload.(*NodeRegistrationEvent)
gotPayload := got.Payload.(*NodeRegistrationEvent)

Expand All @@ -170,16 +277,18 @@ func requireNodeRegistrationEventEqual(t *testing.T, want, got stream.Event) {
// requireNodeDeregistrationEventEqual asserts that got matches want: the
// Index, Key and Topic fields must be equal, and both payloads, asserted to
// be *NodeDeregistrationEvent, must be equal.
func requireNodeDeregistrationEventEqual(t *testing.T, want, got stream.Event) {
t.Helper()

require.Equal(t, want.Index, got.Index)
require.Equal(t, want.Key, got.Key)
require.Equal(t, want.Topic, got.Topic)

// Unchecked type assertions: a payload of any other type panics here.
wantPayload := want.Payload.(*NodeDeregistrationEvent)
gotPayload := got.Payload.(*NodeDeregistrationEvent)

require.Equal(t, wantPayload, gotPayload)
}

// requireNodeEventEqual asserts that got carries a *NodeEvent payload whose
// node has exactly three events recorded on it.
//
// want is not inspected; it is accepted so the helper has the same signature
// as the other require*EventEqual helpers.
//
// NOTE(review): the test case using this helper upserts two events per node;
// the third is presumably an event recorded when the node is first upserted.
// Confirm against upsertNodeTxn.
func requireNodeEventEqual(t *testing.T, want, got stream.Event) {
	// Report failures at the caller's line, like the sibling helpers do.
	t.Helper()

	// Use the checked form so an unexpected payload type fails the test
	// instead of panicking.
	gotPayload, ok := got.Payload.(*NodeEvent)
	require.True(t, ok, "event payload is not a *NodeEvent")

	require.Len(t, gotPayload.Node.Events, 3)
}

type nodeOpts func(n *structs.Node)

func nodeNotReady(n *structs.Node) {
Expand Down
4 changes: 4 additions & 0 deletions nomad/state/state_changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,10 @@ func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
return NodeRegisterEventFromChanges(tx, changes)
case structs.NodeDeregisterRequestType:
return NodeDeregisterEventFromChanges(tx, changes)
case structs.NodeUpdateDrainRequestType:
return NodeDrainEventFromChanges(tx, changes)
case structs.UpsertNodeEventsType:
return NodeEventFromChanges(tx, changes)
}
return []stream.Event{}, nil
}
Loading

0 comments on commit bbae0ec

Please sign in to comment.