diff --git a/nomad/fsm.go b/nomad/fsm.go index e807a049f619..1dfb4cbbecda 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -206,7 +206,7 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} { case structs.NodeUpdateStatusRequestType: return n.applyStatusUpdate(buf[1:], log.Index) case structs.NodeUpdateDrainRequestType: - return n.applyDrainUpdate(buf[1:], log.Index) + return n.applyDrainUpdate(msgType, buf[1:], log.Index) case structs.JobRegisterRequestType: return n.applyUpsertJob(buf[1:], log.Index) case structs.JobDeregisterRequestType: @@ -250,7 +250,7 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} { case structs.AutopilotRequestType: return n.applyAutopilotUpdate(buf[1:], log.Index) case structs.UpsertNodeEventsType: - return n.applyUpsertNodeEvent(buf[1:], log.Index) + return n.applyUpsertNodeEvent(msgType, buf[1:], log.Index) case structs.JobBatchDeregisterRequestType: return n.applyBatchDeregisterJob(buf[1:], log.Index) case structs.AllocUpdateDesiredTransitionRequestType: @@ -402,13 +402,15 @@ func (n *nomadFSM) applyStatusUpdate(buf []byte, index uint64) interface{} { return nil } -func (n *nomadFSM) applyDrainUpdate(buf []byte, index uint64) interface{} { +func (n *nomadFSM) applyDrainUpdate(reqType structs.MessageType, buf []byte, index uint64) interface{} { defer metrics.MeasureSince([]string{"nomad", "fsm", "node_drain_update"}, time.Now()) var req structs.NodeUpdateDrainRequest if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) } + ctx := context.WithValue(context.Background(), state.CtxMsgType, reqType) + // COMPAT Remove in version 0.10 // As part of Nomad 0.8 we have deprecated the drain boolean in favor of a // drain strategy but we need to handle the upgrade path where the Raft log @@ -423,7 +425,7 @@ func (n *nomadFSM) applyDrainUpdate(buf []byte, index uint64) interface{} { } } - if err := n.state.UpdateNodeDrain(index, req.NodeID, req.DrainStrategy, req.MarkEligible, 
req.UpdatedAt, req.NodeEvent); err != nil { + if err := n.state.UpdateNodeDrainCtx(ctx, index, req.NodeID, req.DrainStrategy, req.MarkEligible, req.UpdatedAt, req.NodeEvent); err != nil { n.logger.Error("UpdateNodeDrain failed", "error", err) return err } @@ -874,14 +876,16 @@ func (n *nomadFSM) applyReconcileSummaries(buf []byte, index uint64) interface{} } // applyUpsertNodeEvent tracks the given node events. -func (n *nomadFSM) applyUpsertNodeEvent(buf []byte, index uint64) interface{} { +func (n *nomadFSM) applyUpsertNodeEvent(msgType structs.MessageType, buf []byte, index uint64) interface{} { defer metrics.MeasureSince([]string{"nomad", "fsm", "upsert_node_events"}, time.Now()) var req structs.EmitNodeEventsRequest if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode EmitNodeEventsRequest: %v", err)) } - if err := n.state.UpsertNodeEvents(index, req.NodeEvents); err != nil { + ctx := context.WithValue(context.Background(), state.CtxMsgType, msgType) + + if err := n.state.UpsertNodeEventsCtx(ctx, index, req.NodeEvents); err != nil { n.logger.Error("failed to add node events", "error", err) return err } diff --git a/nomad/state/node_events.go b/nomad/state/node_events.go index b2889e3a3076..f6b43858dcdd 100644 --- a/nomad/state/node_events.go +++ b/nomad/state/node_events.go @@ -8,8 +8,10 @@ import ( ) const ( - TopicNodeRegistration = "NodeRegistration" - TopicNodeDeregistration = "NodeDeregistration" + TopicNodeRegistration stream.Topic = "NodeRegistration" + TopicNodeDeregistration stream.Topic = "NodeDeregistration" + TopicNodeDrain stream.Topic = "NodeDrain" + TopicNodeEvent stream.Topic = "NodeEvent" ) type NodeRegistrationEvent struct { @@ -21,6 +23,28 @@ type NodeDeregistrationEvent struct { NodeID string } +type NodeEvent struct { + Node *structs.Node +} + +// NodeDrainEvent is the Payload for a NodeDrain event. 
It contains +// information related to the Node being drained as well as high level +// information about the current allocations on the Node +type NodeDrainEvent struct { + Node *structs.Node + JobAllocs map[string]*JobDrainDetails +} + +type NodeDrainAllocDetails struct { + ID string + Migrate *structs.MigrateStrategy +} + +type JobDrainDetails struct { + Type string + AllocDetails map[string]NodeDrainAllocDetails +} + // NodeRegisterEventFromChanges generates a NodeRegistrationEvent from a set // of transaction changes. func NodeRegisterEventFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { @@ -73,3 +97,76 @@ func NodeDeregisterEventFromChanges(tx ReadTxn, changes Changes) ([]stream.Event } return events, nil } + +// NodeEventFromChanges generates a NodeEvent from a set +// of transaction changes. +func NodeEventFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { + var events []stream.Event + for _, change := range changes.Changes { + switch change.Table { + case "nodes": + after, ok := change.After.(*structs.Node) + if !ok { + return nil, fmt.Errorf("transaction change was not a Node") + } + + event := stream.Event{ + Topic: TopicNodeEvent, + Index: changes.Index, + Key: after.ID, + Payload: &NodeEvent{ + Node: after, + }, + } + events = append(events, event) + } + } + return events, nil +} + +func NodeDrainEventFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { + var events []stream.Event + for _, change := range changes.Changes { + switch change.Table { + case "nodes": + after, ok := change.After.(*structs.Node) + if !ok { + return nil, fmt.Errorf("transaction change was not a Node") + } + + // retrieve allocations currently on node + allocs, err := allocsByNodeTxn(tx, nil, after.ID) + if err != nil { + return nil, fmt.Errorf("retrieving allocations for node drain event: %w", err) + } + + // build job/alloc details for node drain + jobAllocs := make(map[string]*JobDrainDetails) + for _, a := range 
allocs { + if _, ok := jobAllocs[a.Job.Name]; !ok { + jobAllocs[a.Job.Name] = &JobDrainDetails{ + AllocDetails: make(map[string]NodeDrainAllocDetails), + Type: a.Job.Type, + } + } + + jobAllocs[a.Job.Name].AllocDetails[a.ID] = NodeDrainAllocDetails{ + Migrate: a.MigrateStrategy(), + ID: a.ID, + } + } + + event := stream.Event{ + Topic: TopicNodeDrain, + Index: changes.Index, + Key: after.ID, + Payload: &NodeDrainEvent{ + Node: after, + JobAllocs: jobAllocs, + }, + } + events = append(events, event) + } + } + return events, nil +} diff --git a/nomad/state/node_events_test.go b/nomad/state/node_events_test.go index 10b9458a635a..2bc768640aae 100644 --- a/nomad/state/node_events_test.go +++ b/nomad/state/node_events_test.go @@ -2,6 +2,7 @@ package state import ( "testing" + "time" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/stream" @@ -9,7 +10,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestNodeRegisterEventFromChanges(t *testing.T) { +func TestNodeEventsFromChanges(t *testing.T) { cases := []struct { Name string MsgType structs.MessageType @@ -17,7 +18,7 @@ func TestNodeRegisterEventFromChanges(t *testing.T) { Mutate func(s *StateStore, tx *txn) error WantEvents []stream.Event WantErr bool - WantTopic string + WantTopic stream.Topic }{ { MsgType: structs.NodeRegisterRequestType, @@ -112,6 +113,56 @@ func TestNodeRegisterEventFromChanges(t *testing.T) { }, WantErr: false, }, + { + MsgType: structs.UpsertNodeEventsType, + WantTopic: TopicNodeEvent, + Name: "batch node events upserted", + Setup: func(s *StateStore, tx *txn) error { + require.NoError(t, upsertNodeTxn(tx, tx.Index, testNode())) + return upsertNodeTxn(tx, tx.Index, testNode(nodeIDTwo)) + }, + Mutate: func(s *StateStore, tx *txn) error { + eventFn := func(id string) []*structs.NodeEvent { + return []*structs.NodeEvent{ + { + Message: "test event one", + Subsystem: "Cluster", + Details: map[string]string{ + "NodeID": id, + }, + }, + { + Message: "test event two", 
+ Subsystem: "Cluster", + Details: map[string]string{ + "NodeID": id, + }, + }, + } + } + require.NoError(t, s.upsertNodeEvents(tx.Index, testNodeID(), eventFn(testNodeID()), tx)) + return s.upsertNodeEvents(tx.Index, testNodeIDTwo(), eventFn(testNodeIDTwo()), tx) + }, + WantEvents: []stream.Event{ + { + Topic: TopicNodeEvent, + Key: testNodeID(), + Index: 100, + Payload: &NodeEvent{ + Node: testNode(), + }, + }, + { + Topic: TopicNodeEvent, + Key: testNodeIDTwo(), + Index: 100, + Payload: &NodeEvent{ + Node: testNode(nodeIDTwo), + }, + }, + }, + WantErr: false, + }, } for _, tc := range cases { @@ -140,24 +191,80 @@ func TestNodeRegisterEventFromChanges(t *testing.T) { require.Equal(t, len(tc.WantEvents), len(got)) for idx, g := range got { + // assert equality of shared fields + + want := tc.WantEvents[idx] + require.Equal(t, want.Index, g.Index) + require.Equal(t, want.Key, g.Key) + require.Equal(t, want.Topic, g.Topic) + switch tc.MsgType { case structs.NodeRegisterRequestType: requireNodeRegistrationEventEqual(t, tc.WantEvents[idx], g) case structs.NodeDeregisterRequestType: requireNodeDeregistrationEventEqual(t, tc.WantEvents[idx], g) + case structs.UpsertNodeEventsType: + requireNodeEventEqual(t, tc.WantEvents[idx], g) + default: + require.Fail(t, "unhandled message type") } } }) } } +func TestNodeDrainEventFromChanges(t *testing.T) { + t.Parallel() + s := TestStateStoreCfg(t, TestStateStorePublisher(t)) + defer s.StopEventPublisher() + + // setup + setupTx := s.db.WriteTxn(10) + + node := mock.Node() + alloc1 := mock.Alloc() + alloc2 := mock.Alloc() + alloc1.NodeID = node.ID + alloc2.NodeID = node.ID + + require.NoError(t, upsertNodeTxn(setupTx, 10, node)) + require.NoError(t, s.upsertAllocsImpl(100, []*structs.Allocation{alloc1, alloc2}, setupTx)) + setupTx.Txn.Commit() + + // changes + tx := s.db.WriteTxn(100) + + strat := &structs.DrainStrategy{ + DrainSpec: structs.DrainSpec{ + Deadline: 10 * time.Minute, + IgnoreSystemJobs: false, + }, + StartedAt: 
time.Now(), + } + markEligible := false + updatedAt := time.Now() + event := &structs.NodeEvent{} + + require.NoError(t, s.updateNodeDrainImpl(tx, 100, node.ID, strat, markEligible, updatedAt.UnixNano(), event)) + changes := Changes{Changes: tx.Changes(), Index: 100, MsgType: structs.NodeUpdateDrainRequestType} + got, err := processDBChanges(tx, changes) + require.NoError(t, err) + + require.Len(t, got, 1) + + require.Equal(t, TopicNodeDrain, got[0].Topic) + require.Equal(t, uint64(100), got[0].Index) + + nodeEvent, ok := got[0].Payload.(*NodeDrainEvent) + require.True(t, ok) + + require.Equal(t, structs.NodeSchedulingIneligible, nodeEvent.Node.SchedulingEligibility) + require.Equal(t, strat, nodeEvent.Node.DrainStrategy) +} + func requireNodeRegistrationEventEqual(t *testing.T, want, got stream.Event) { t.Helper() - require.Equal(t, want.Index, got.Index) - require.Equal(t, want.Key, got.Key) - require.Equal(t, want.Topic, got.Topic) - wantPayload := want.Payload.(*NodeRegistrationEvent) gotPayload := got.Payload.(*NodeRegistrationEvent) @@ -170,16 +277,18 @@ func requireNodeRegistrationEventEqual(t *testing.T, want, got stream.Event) { func requireNodeDeregistrationEventEqual(t *testing.T, want, got stream.Event) { t.Helper() - require.Equal(t, want.Index, got.Index) - require.Equal(t, want.Key, got.Key) - require.Equal(t, want.Topic, got.Topic) - wantPayload := want.Payload.(*NodeDeregistrationEvent) gotPayload := got.Payload.(*NodeDeregistrationEvent) require.Equal(t, wantPayload, gotPayload) } +func requireNodeEventEqual(t *testing.T, want, got stream.Event) { + gotPayload := got.Payload.(*NodeEvent) + + require.Len(t, gotPayload.Node.Events, 3) +} + type nodeOpts func(n *structs.Node) func nodeNotReady(n *structs.Node) { diff --git a/nomad/state/state_changes.go b/nomad/state/state_changes.go index be80c53c6bb9..e9a102264984 100644 --- a/nomad/state/state_changes.go +++ b/nomad/state/state_changes.go @@ -190,6 +190,10 @@ func processDBChanges(tx ReadTxn, 
changes Changes) ([]stream.Event, error) { return NodeRegisterEventFromChanges(tx, changes) case structs.NodeDeregisterRequestType: return NodeDeregisterEventFromChanges(tx, changes) + case structs.NodeUpdateDrainRequestType: + return NodeDrainEventFromChanges(tx, changes) + case structs.UpsertNodeEventsType: + return NodeEventFromChanges(tx, changes) } return []stream.Event{}, nil } diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index bd9529d8ccd4..c0f8f5139773 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -956,6 +956,19 @@ func (s *StateStore) BatchUpdateNodeDrain(index uint64, updatedAt int64, updates return nil } +// UpdateNodeDrainCtx is used to update the drain of a node; ctx is passed to the write transaction +func (s *StateStore) UpdateNodeDrainCtx(ctx context.Context, index uint64, nodeID string, + drain *structs.DrainStrategy, markEligible bool, updatedAt int64, event *structs.NodeEvent) error { + + txn := s.db.WriteTxnCtx(ctx, index) + defer txn.Abort() + if err := s.updateNodeDrainImpl(txn, index, nodeID, drain, markEligible, updatedAt, event); err != nil { + return err + } + txn.Commit() + return nil +} + // UpdateNodeDrain is used to update the drain of a node func (s *StateStore) UpdateNodeDrain(index uint64, nodeID string, drain *structs.DrainStrategy, markEligible bool, updatedAt int64, event *structs.NodeEvent) error { @@ -1059,6 +1072,20 @@ func (s *StateStore) UpdateNodeEligibility(index uint64, nodeID string, eligibil return nil } +func (s *StateStore) UpsertNodeEventsCtx(ctx context.Context, index uint64, nodeEvents map[string][]*structs.NodeEvent) error { + txn := s.db.WriteTxnCtx(ctx, index) + defer txn.Abort() + + for nodeID, events := range nodeEvents { + if err := s.upsertNodeEvents(index, nodeID, events, txn); err != nil { + return err + } + } + + txn.Commit() + return nil +} + // UpsertNodeEvents adds the node events to the nodes, rotating events as // necessary. 
func (s *StateStore) UpsertNodeEvents(index uint64, nodeEvents map[string][]*structs.NodeEvent) error { @@ -3375,6 +3402,10 @@ func (s *StateStore) AllocsByIDPrefixInNSes(ws memdb.WatchSet, namespaces map[st func (s *StateStore) AllocsByNode(ws memdb.WatchSet, node string) ([]*structs.Allocation, error) { txn := s.db.ReadTxn() + return allocsByNodeTxn(txn, ws, node) +} + +func allocsByNodeTxn(txn ReadTxn, ws memdb.WatchSet, node string) ([]*structs.Allocation, error) { // Get an iterator over the node allocations, using only the // node prefix which ignores the terminal status iter, err := txn.Get("allocs", "node_prefix", node) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index a416b3d46348..667ec724b74f 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -8836,6 +8836,15 @@ func (a *Allocation) ReschedulePolicy() *ReschedulePolicy { return tg.ReschedulePolicy } +// MigrateStrategy returns the migrate strategy of the allocation's task group, or nil if the task group lookup returns nil +func (a *Allocation) MigrateStrategy() *MigrateStrategy { + tg := a.Job.LookupTaskGroup(a.TaskGroup) + if tg == nil { + return nil + } + return tg.Migrate +} + // NextRescheduleTime returns a time on or after which the allocation is eligible to be rescheduled, // and whether the next reschedule time is within policy's interval if the policy doesn't allow unlimited reschedules func (a *Allocation) NextRescheduleTime() (time.Time, bool) {