From c702353863e23cd845a06da791602afc42e3e315 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Tue, 21 May 2019 15:45:00 -0400 Subject: [PATCH 1/2] set node.StatusUpdatedAt in raft Fix a case where `node.StatusUpdatedAt` was manipulated directly in memory. This ensures that StatusUpdatedAt is set in the raft layer, and that the field is also updated when node drain/eligibility is updated. --- nomad/drainer_shims.go | 7 ++++++- nomad/fsm.go | 8 ++++---- nomad/node_endpoint.go | 8 +++++++- nomad/state/state_store.go | 17 ++++++++++------- nomad/state/state_store_test.go | 21 +++++++++++++-------- nomad/structs/structs.go | 10 ++++++++++ 6 files changed, 50 insertions(+), 21 deletions(-) diff --git a/nomad/drainer_shims.go b/nomad/drainer_shims.go index c9795d5ac4ee..1df9b9aa47e6 100644 --- a/nomad/drainer_shims.go +++ b/nomad/drainer_shims.go @@ -1,6 +1,10 @@ package nomad -import "github.com/hashicorp/nomad/nomad/structs" +import ( + "time" + + "github.com/hashicorp/nomad/nomad/structs" +) // drainerShim implements the drainer.RaftApplier interface required by the // NodeDrainer. 
@@ -13,6 +17,7 @@ func (d drainerShim) NodesDrainComplete(nodes []string, event *structs.NodeEvent Updates: make(map[string]*structs.DrainUpdate, len(nodes)), NodeEvents: make(map[string]*structs.NodeEvent, len(nodes)), WriteRequest: structs.WriteRequest{Region: d.s.config.Region}, + UpdatedAt: time.Now().Unix(), } update := &structs.DrainUpdate{} diff --git a/nomad/fsm.go b/nomad/fsm.go index 3c91b8f5c694..e2f47783bfc0 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -310,7 +310,7 @@ func (n *nomadFSM) applyStatusUpdate(buf []byte, index uint64) interface{} { panic(fmt.Errorf("failed to decode request: %v", err)) } - if err := n.state.UpdateNodeStatus(index, req.NodeID, req.Status, req.NodeEvent); err != nil { + if err := n.state.UpdateNodeStatus(index, req.NodeID, req.Status, req.UpdatedAt, req.NodeEvent); err != nil { n.logger.Error("UpdateNodeStatus failed", "error", err) return err } @@ -352,7 +352,7 @@ func (n *nomadFSM) applyDrainUpdate(buf []byte, index uint64) interface{} { } } - if err := n.state.UpdateNodeDrain(index, req.NodeID, req.DrainStrategy, req.MarkEligible, req.NodeEvent); err != nil { + if err := n.state.UpdateNodeDrain(index, req.NodeID, req.DrainStrategy, req.MarkEligible, req.UpdatedAt, req.NodeEvent); err != nil { n.logger.Error("UpdateNodeDrain failed", "error", err) return err } @@ -366,7 +366,7 @@ func (n *nomadFSM) applyBatchDrainUpdate(buf []byte, index uint64) interface{} { panic(fmt.Errorf("failed to decode request: %v", err)) } - if err := n.state.BatchUpdateNodeDrain(index, req.Updates, req.NodeEvents); err != nil { + if err := n.state.BatchUpdateNodeDrain(index, req.UpdatedAt, req.Updates, req.NodeEvents); err != nil { n.logger.Error("BatchUpdateNodeDrain failed", "error", err) return err } @@ -387,7 +387,7 @@ func (n *nomadFSM) applyNodeEligibilityUpdate(buf []byte, index uint64) interfac return err } - if err := n.state.UpdateNodeEligibility(index, req.NodeID, req.Eligibility, req.NodeEvent); err != nil { + if err := 
n.state.UpdateNodeEligibility(index, req.NodeID, req.Eligibility, req.UpdatedAt, req.NodeEvent); err != nil { n.logger.Error("UpdateNodeEligibility failed", "error", err) return err } diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 5e57406f8a0c..1fed74bbc28f 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -369,7 +369,7 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct // to track SecretIDs. // Update the timestamp of when the node status was updated - node.StatusUpdatedAt = time.Now().Unix() + args.UpdatedAt = time.Now().Unix() // Commit this update via Raft var index uint64 @@ -484,6 +484,9 @@ func (n *Node) UpdateDrain(args *structs.NodeUpdateDrainRequest, return fmt.Errorf("node not found") } + // Update the timestamp of when the node status was updated + args.UpdatedAt = time.Now().Unix() + // COMPAT: Remove in 0.9. Attempt to upgrade the request if it is of the old // format. if args.Drain && args.DrainStrategy == nil { @@ -589,6 +592,9 @@ func (n *Node) UpdateEligibility(args *structs.NodeUpdateEligibilityRequest, return fmt.Errorf("invalid scheduling eligibility %q", args.Eligibility) } + // Update the timestamp of when the node status was updated + args.UpdatedAt = time.Now().Unix() + // Construct the node event args.NodeEvent = structs.NewNodeEvent().SetSubsystem(structs.NodeEventSubsystemCluster) if node.SchedulingEligibility == args.Eligibility { diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 8bb3b3352e50..cff0aa2347b6 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -732,7 +732,7 @@ func (s *StateStore) DeleteNode(index uint64, nodeID string) error { } // UpdateNodeStatus is used to update the status of a node -func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string, event *structs.NodeEvent) error { +func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string, updatedAt int64, event 
*structs.NodeEvent) error { txn := s.db.Txn(true) defer txn.Abort() @@ -748,6 +748,7 @@ func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string, event // Copy the existing node existingNode := existing.(*structs.Node) copyNode := existingNode.Copy() + copyNode.StatusUpdatedAt = updatedAt // Add the event if given if event != nil { @@ -771,11 +772,11 @@ func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string, event } // BatchUpdateNodeDrain is used to update the drain of a node set of nodes -func (s *StateStore) BatchUpdateNodeDrain(index uint64, updates map[string]*structs.DrainUpdate, events map[string]*structs.NodeEvent) error { +func (s *StateStore) BatchUpdateNodeDrain(index uint64, updatedAt int64, updates map[string]*structs.DrainUpdate, events map[string]*structs.NodeEvent) error { txn := s.db.Txn(true) defer txn.Abort() for node, update := range updates { - if err := s.updateNodeDrainImpl(txn, index, node, update.DrainStrategy, update.MarkEligible, events[node]); err != nil { + if err := s.updateNodeDrainImpl(txn, index, node, update.DrainStrategy, update.MarkEligible, updatedAt, events[node]); err != nil { return err } } @@ -785,11 +786,11 @@ func (s *StateStore) BatchUpdateNodeDrain(index uint64, updates map[string]*stru // UpdateNodeDrain is used to update the drain of a node func (s *StateStore) UpdateNodeDrain(index uint64, nodeID string, - drain *structs.DrainStrategy, markEligible bool, event *structs.NodeEvent) error { + drain *structs.DrainStrategy, markEligible bool, updatedAt int64, event *structs.NodeEvent) error { txn := s.db.Txn(true) defer txn.Abort() - if err := s.updateNodeDrainImpl(txn, index, nodeID, drain, markEligible, event); err != nil { + if err := s.updateNodeDrainImpl(txn, index, nodeID, drain, markEligible, updatedAt, event); err != nil { return err } txn.Commit() @@ -797,7 +798,7 @@ func (s *StateStore) UpdateNodeDrain(index uint64, nodeID string, } func (s *StateStore) updateNodeDrainImpl(txn 
*memdb.Txn, index uint64, nodeID string, - drain *structs.DrainStrategy, markEligible bool, event *structs.NodeEvent) error { + drain *structs.DrainStrategy, markEligible bool, updatedAt int64, event *structs.NodeEvent) error { // Lookup the node existing, err := txn.First("nodes", "id", nodeID) @@ -811,6 +812,7 @@ func (s *StateStore) updateNodeDrainImpl(txn *memdb.Txn, index uint64, nodeID st // Copy the existing node existingNode := existing.(*structs.Node) copyNode := existingNode.Copy() + copyNode.StatusUpdatedAt = updatedAt // Add the event if given if event != nil { @@ -840,7 +842,7 @@ func (s *StateStore) updateNodeDrainImpl(txn *memdb.Txn, index uint64, nodeID st } // UpdateNodeEligibility is used to update the scheduling eligibility of a node -func (s *StateStore) UpdateNodeEligibility(index uint64, nodeID string, eligibility string, event *structs.NodeEvent) error { +func (s *StateStore) UpdateNodeEligibility(index uint64, nodeID string, eligibility string, updatedAt int64, event *structs.NodeEvent) error { txn := s.db.Txn(true) defer txn.Abort() @@ -857,6 +859,7 @@ func (s *StateStore) UpdateNodeEligibility(index uint64, nodeID string, eligibil // Copy the existing node existingNode := existing.(*structs.Node) copyNode := existingNode.Copy() + copyNode.StatusUpdatedAt = updatedAt // Add the event if given if event != nil { diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 3cc7a23ce020..7db7a2c7ff90 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -857,7 +857,7 @@ func TestStateStore_UpdateNodeStatus_Node(t *testing.T) { Timestamp: time.Now(), } - require.NoError(state.UpdateNodeStatus(801, node.ID, structs.NodeStatusReady, event)) + require.NoError(state.UpdateNodeStatus(801, node.ID, structs.NodeStatusReady, 70, event)) require.True(watchFired(ws)) ws = memdb.NewWatchSet() @@ -865,6 +865,7 @@ func TestStateStore_UpdateNodeStatus_Node(t *testing.T) { require.NoError(err) 
require.Equal(structs.NodeStatusReady, out.Status) require.EqualValues(801, out.ModifyIndex) + require.EqualValues(70, out.StatusUpdatedAt) require.Len(out.Events, 2) require.Equal(event.Message, out.Events[1].Message) @@ -912,7 +913,7 @@ func TestStateStore_BatchUpdateNodeDrain(t *testing.T) { n2.ID: event, } - require.Nil(state.BatchUpdateNodeDrain(1002, update, events)) + require.Nil(state.BatchUpdateNodeDrain(1002, 7, update, events)) require.True(watchFired(ws)) ws = memdb.NewWatchSet() @@ -924,6 +925,7 @@ func TestStateStore_BatchUpdateNodeDrain(t *testing.T) { require.Equal(out.DrainStrategy, expectedDrain) require.Len(out.Events, 2) require.EqualValues(1002, out.ModifyIndex) + require.EqualValues(7, out.StatusUpdatedAt) } index, err := state.Index("nodes") @@ -955,7 +957,7 @@ func TestStateStore_UpdateNodeDrain_Node(t *testing.T) { Subsystem: structs.NodeEventSubsystemDrain, Timestamp: time.Now(), } - require.Nil(state.UpdateNodeDrain(1001, node.ID, expectedDrain, false, event)) + require.Nil(state.UpdateNodeDrain(1001, node.ID, expectedDrain, false, 7, event)) require.True(watchFired(ws)) ws = memdb.NewWatchSet() @@ -966,6 +968,7 @@ func TestStateStore_UpdateNodeDrain_Node(t *testing.T) { require.Equal(out.DrainStrategy, expectedDrain) require.Len(out.Events, 2) require.EqualValues(1001, out.ModifyIndex) + require.EqualValues(7, out.StatusUpdatedAt) index, err := state.Index("nodes") require.Nil(err) @@ -1084,7 +1087,7 @@ func TestStateStore_UpdateNodeDrain_ResetEligiblity(t *testing.T) { Subsystem: structs.NodeEventSubsystemDrain, Timestamp: time.Now(), } - require.Nil(state.UpdateNodeDrain(1001, node.ID, drain, false, event1)) + require.Nil(state.UpdateNodeDrain(1001, node.ID, drain, false, 7, event1)) require.True(watchFired(ws)) // Remove the drain @@ -1093,7 +1096,7 @@ func TestStateStore_UpdateNodeDrain_ResetEligiblity(t *testing.T) { Subsystem: structs.NodeEventSubsystemDrain, Timestamp: time.Now(), } - require.Nil(state.UpdateNodeDrain(1002, 
node.ID, nil, true, event2)) + require.Nil(state.UpdateNodeDrain(1002, node.ID, nil, true, 9, event2)) ws = memdb.NewWatchSet() out, err := state.NodeByID(ws, node.ID) @@ -1103,6 +1106,7 @@ func TestStateStore_UpdateNodeDrain_ResetEligiblity(t *testing.T) { require.Equal(out.SchedulingEligibility, structs.NodeSchedulingEligible) require.Len(out.Events, 3) require.EqualValues(1002, out.ModifyIndex) + require.EqualValues(9, out.StatusUpdatedAt) index, err := state.Index("nodes") require.Nil(err) @@ -1133,7 +1137,7 @@ func TestStateStore_UpdateNodeEligibility(t *testing.T) { Subsystem: structs.NodeEventSubsystemCluster, Timestamp: time.Now(), } - require.Nil(state.UpdateNodeEligibility(1001, node.ID, expectedEligibility, event)) + require.Nil(state.UpdateNodeEligibility(1001, node.ID, expectedEligibility, 7, event)) require.True(watchFired(ws)) ws = memdb.NewWatchSet() @@ -1143,6 +1147,7 @@ func TestStateStore_UpdateNodeEligibility(t *testing.T) { require.Len(out.Events, 2) require.Equal(out.Events[1], event) require.EqualValues(1001, out.ModifyIndex) + require.EqualValues(7, out.StatusUpdatedAt) index, err := state.Index("nodes") require.Nil(err) @@ -1155,10 +1160,10 @@ func TestStateStore_UpdateNodeEligibility(t *testing.T) { Deadline: -1 * time.Second, }, } - require.Nil(state.UpdateNodeDrain(1002, node.ID, expectedDrain, false, nil)) + require.Nil(state.UpdateNodeDrain(1002, node.ID, expectedDrain, false, 7, nil)) // Try to set the node to eligible - err = state.UpdateNodeEligibility(1003, node.ID, structs.NodeSchedulingEligible, nil) + err = state.UpdateNodeEligibility(1003, node.ID, structs.NodeSchedulingEligible, 9, nil) require.NotNil(err) require.Contains(err.Error(), "while it is draining") } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 33c81dd7e24d..b1d259c9ecf9 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -347,6 +347,7 @@ type NodeUpdateStatusRequest struct { NodeID string Status string NodeEvent 
*NodeEvent + UpdatedAt int64 WriteRequest } @@ -367,6 +368,9 @@ type NodeUpdateDrainRequest struct { // NodeEvent is the event added to the node NodeEvent *NodeEvent + // UpdatedAt represents server time of receiving request + UpdatedAt int64 + WriteRequest } @@ -379,6 +383,9 @@ type BatchNodeUpdateDrainRequest struct { // NodeEvents is a mapping of the node to the event to add to the node NodeEvents map[string]*NodeEvent + // UpdatedAt represents server time of receiving request + UpdatedAt int64 + WriteRequest } @@ -399,6 +406,9 @@ type NodeUpdateEligibilityRequest struct { // NodeEvent is the event added to the node NodeEvent *NodeEvent + // UpdatedAt represents server time of receiving request + UpdatedAt int64 + WriteRequest } From 40b0c641636f922c8d73d00e0a0c7b3a43330da2 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Tue, 21 May 2019 21:10:17 -0400 Subject: [PATCH 2/2] update callers in tests --- nomad/drainer/watch_nodes_test.go | 4 ++-- nomad/node_endpoint_test.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/nomad/drainer/watch_nodes_test.go b/nomad/drainer/watch_nodes_test.go index ca64e2dd134d..6155d577f392 100644 --- a/nomad/drainer/watch_nodes_test.go +++ b/nomad/drainer/watch_nodes_test.go @@ -88,7 +88,7 @@ func TestNodeDrainWatcher_Remove(t *testing.T) { require.Equal(n, tracked[n.ID]) // Change the node to be not draining and wait for it to be untracked - require.Nil(state.UpdateNodeDrain(101, n.ID, nil, false, nil)) + require.Nil(state.UpdateNodeDrain(101, n.ID, nil, false, 0, nil)) testutil.WaitForResult(func() (bool, error) { return len(m.Events) == 2, nil }, func(err error) { @@ -166,7 +166,7 @@ func TestNodeDrainWatcher_Update(t *testing.T) { // Change the node to have a new spec s2 := n.DrainStrategy.Copy() s2.Deadline += time.Hour - require.Nil(state.UpdateNodeDrain(101, n.ID, s2, false, nil)) + require.Nil(state.UpdateNodeDrain(101, n.ID, s2, false, 0, nil)) // Wait for it to be updated 
testutil.WaitForResult(func() (bool, error) { diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 70542392bdcb..6b7b90e2f400 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -2662,7 +2662,7 @@ func TestClientEndpoint_ListNodes_Blocking(t *testing.T) { Deadline: 10 * time.Second, }, } - errCh <- state.UpdateNodeDrain(3, node.ID, s, false, nil) + errCh <- state.UpdateNodeDrain(3, node.ID, s, false, 0, nil) }) req.MinQueryIndex = 2 @@ -2688,7 +2688,7 @@ func TestClientEndpoint_ListNodes_Blocking(t *testing.T) { // Node status update triggers watches time.AfterFunc(100*time.Millisecond, func() { - errCh <- state.UpdateNodeStatus(40, node.ID, structs.NodeStatusDown, nil) + errCh <- state.UpdateNodeStatus(40, node.ID, structs.NodeStatusDown, 0, nil) }) req.MinQueryIndex = 38