Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

set node.StatusUpdatedAt in raft #5746

Merged
merged 2 commits into from
Jun 5, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion nomad/drainer_shims.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package nomad

import "github.com/hashicorp/nomad/nomad/structs"
import (
"time"

"github.com/hashicorp/nomad/nomad/structs"
)

// drainerShim implements the drainer.RaftApplier interface required by the
// NodeDrainer.
Expand All @@ -13,6 +17,7 @@ func (d drainerShim) NodesDrainComplete(nodes []string, event *structs.NodeEvent
Updates: make(map[string]*structs.DrainUpdate, len(nodes)),
NodeEvents: make(map[string]*structs.NodeEvent, len(nodes)),
WriteRequest: structs.WriteRequest{Region: d.s.config.Region},
UpdatedAt: time.Now().Unix(),
}

update := &structs.DrainUpdate{}
Expand Down
8 changes: 4 additions & 4 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func (n *nomadFSM) applyStatusUpdate(buf []byte, index uint64) interface{} {
panic(fmt.Errorf("failed to decode request: %v", err))
}

if err := n.state.UpdateNodeStatus(index, req.NodeID, req.Status, req.NodeEvent); err != nil {
if err := n.state.UpdateNodeStatus(index, req.NodeID, req.Status, req.UpdatedAt, req.NodeEvent); err != nil {
n.logger.Error("UpdateNodeStatus failed", "error", err)
return err
}
Expand Down Expand Up @@ -352,7 +352,7 @@ func (n *nomadFSM) applyDrainUpdate(buf []byte, index uint64) interface{} {
}
}

if err := n.state.UpdateNodeDrain(index, req.NodeID, req.DrainStrategy, req.MarkEligible, req.NodeEvent); err != nil {
if err := n.state.UpdateNodeDrain(index, req.NodeID, req.DrainStrategy, req.MarkEligible, req.UpdatedAt, req.NodeEvent); err != nil {
n.logger.Error("UpdateNodeDrain failed", "error", err)
return err
}
Expand All @@ -366,7 +366,7 @@ func (n *nomadFSM) applyBatchDrainUpdate(buf []byte, index uint64) interface{} {
panic(fmt.Errorf("failed to decode request: %v", err))
}

if err := n.state.BatchUpdateNodeDrain(index, req.Updates, req.NodeEvents); err != nil {
if err := n.state.BatchUpdateNodeDrain(index, req.UpdatedAt, req.Updates, req.NodeEvents); err != nil {
n.logger.Error("BatchUpdateNodeDrain failed", "error", err)
return err
}
Expand All @@ -387,7 +387,7 @@ func (n *nomadFSM) applyNodeEligibilityUpdate(buf []byte, index uint64) interfac
return err
}

if err := n.state.UpdateNodeEligibility(index, req.NodeID, req.Eligibility, req.NodeEvent); err != nil {
if err := n.state.UpdateNodeEligibility(index, req.NodeID, req.Eligibility, req.UpdatedAt, req.NodeEvent); err != nil {
n.logger.Error("UpdateNodeEligibility failed", "error", err)
return err
}
Expand Down
8 changes: 7 additions & 1 deletion nomad/node_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
// to track SecretIDs.

// Update the timestamp of when the node status was updated
node.StatusUpdatedAt = time.Now().Unix()
args.UpdatedAt = time.Now().Unix()

// Commit this update via Raft
var index uint64
Expand Down Expand Up @@ -484,6 +484,9 @@ func (n *Node) UpdateDrain(args *structs.NodeUpdateDrainRequest,
return fmt.Errorf("node not found")
}

// Update the timestamp of when the node status was updated
args.UpdatedAt = time.Now().Unix()

// COMPAT: Remove in 0.9. Attempt to upgrade the request if it is of the old
// format.
if args.Drain && args.DrainStrategy == nil {
Expand Down Expand Up @@ -589,6 +592,9 @@ func (n *Node) UpdateEligibility(args *structs.NodeUpdateEligibilityRequest,
return fmt.Errorf("invalid scheduling eligibility %q", args.Eligibility)
}

// Update the timestamp of when the node status was updated
args.UpdatedAt = time.Now().Unix()

// Construct the node event
args.NodeEvent = structs.NewNodeEvent().SetSubsystem(structs.NodeEventSubsystemCluster)
if node.SchedulingEligibility == args.Eligibility {
Expand Down
17 changes: 10 additions & 7 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,7 @@ func (s *StateStore) DeleteNode(index uint64, nodeID string) error {
}

// UpdateNodeStatus is used to update the status of a node
func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string, event *structs.NodeEvent) error {
func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string, updatedAt int64, event *structs.NodeEvent) error {
txn := s.db.Txn(true)
defer txn.Abort()

Expand All @@ -748,6 +748,7 @@ func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string, event
// Copy the existing node
existingNode := existing.(*structs.Node)
copyNode := existingNode.Copy()
copyNode.StatusUpdatedAt = updatedAt

// Add the event if given
if event != nil {
Expand All @@ -771,11 +772,11 @@ func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string, event
}

// BatchUpdateNodeDrain is used to update the drain of a node set of nodes
func (s *StateStore) BatchUpdateNodeDrain(index uint64, updates map[string]*structs.DrainUpdate, events map[string]*structs.NodeEvent) error {
func (s *StateStore) BatchUpdateNodeDrain(index uint64, updatedAt int64, updates map[string]*structs.DrainUpdate, events map[string]*structs.NodeEvent) error {
txn := s.db.Txn(true)
defer txn.Abort()
for node, update := range updates {
if err := s.updateNodeDrainImpl(txn, index, node, update.DrainStrategy, update.MarkEligible, events[node]); err != nil {
if err := s.updateNodeDrainImpl(txn, index, node, update.DrainStrategy, update.MarkEligible, updatedAt, events[node]); err != nil {
return err
}
}
Expand All @@ -785,19 +786,19 @@ func (s *StateStore) BatchUpdateNodeDrain(index uint64, updates map[string]*stru

// UpdateNodeDrain is used to update the drain of a node
func (s *StateStore) UpdateNodeDrain(index uint64, nodeID string,
drain *structs.DrainStrategy, markEligible bool, event *structs.NodeEvent) error {
drain *structs.DrainStrategy, markEligible bool, updatedAt int64, event *structs.NodeEvent) error {

txn := s.db.Txn(true)
defer txn.Abort()
if err := s.updateNodeDrainImpl(txn, index, nodeID, drain, markEligible, event); err != nil {
if err := s.updateNodeDrainImpl(txn, index, nodeID, drain, markEligible, updatedAt, event); err != nil {
return err
}
txn.Commit()
return nil
}

func (s *StateStore) updateNodeDrainImpl(txn *memdb.Txn, index uint64, nodeID string,
drain *structs.DrainStrategy, markEligible bool, event *structs.NodeEvent) error {
drain *structs.DrainStrategy, markEligible bool, updatedAt int64, event *structs.NodeEvent) error {

// Lookup the node
existing, err := txn.First("nodes", "id", nodeID)
Expand All @@ -811,6 +812,7 @@ func (s *StateStore) updateNodeDrainImpl(txn *memdb.Txn, index uint64, nodeID st
// Copy the existing node
existingNode := existing.(*structs.Node)
copyNode := existingNode.Copy()
copyNode.StatusUpdatedAt = updatedAt

// Add the event if given
if event != nil {
Expand Down Expand Up @@ -840,7 +842,7 @@ func (s *StateStore) updateNodeDrainImpl(txn *memdb.Txn, index uint64, nodeID st
}

// UpdateNodeEligibility is used to update the scheduling eligibility of a node
func (s *StateStore) UpdateNodeEligibility(index uint64, nodeID string, eligibility string, event *structs.NodeEvent) error {
func (s *StateStore) UpdateNodeEligibility(index uint64, nodeID string, eligibility string, updatedAt int64, event *structs.NodeEvent) error {

txn := s.db.Txn(true)
defer txn.Abort()
Expand All @@ -857,6 +859,7 @@ func (s *StateStore) UpdateNodeEligibility(index uint64, nodeID string, eligibil
// Copy the existing node
existingNode := existing.(*structs.Node)
copyNode := existingNode.Copy()
copyNode.StatusUpdatedAt = updatedAt

// Add the event if given
if event != nil {
Expand Down
21 changes: 13 additions & 8 deletions nomad/state/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -857,14 +857,15 @@ func TestStateStore_UpdateNodeStatus_Node(t *testing.T) {
Timestamp: time.Now(),
}

require.NoError(state.UpdateNodeStatus(801, node.ID, structs.NodeStatusReady, event))
require.NoError(state.UpdateNodeStatus(801, node.ID, structs.NodeStatusReady, 70, event))
require.True(watchFired(ws))

ws = memdb.NewWatchSet()
out, err := state.NodeByID(ws, node.ID)
require.NoError(err)
require.Equal(structs.NodeStatusReady, out.Status)
require.EqualValues(801, out.ModifyIndex)
require.EqualValues(70, out.StatusUpdatedAt)
require.Len(out.Events, 2)
require.Equal(event.Message, out.Events[1].Message)

Expand Down Expand Up @@ -912,7 +913,7 @@ func TestStateStore_BatchUpdateNodeDrain(t *testing.T) {
n2.ID: event,
}

require.Nil(state.BatchUpdateNodeDrain(1002, update, events))
require.Nil(state.BatchUpdateNodeDrain(1002, 7, update, events))
require.True(watchFired(ws))

ws = memdb.NewWatchSet()
Expand All @@ -924,6 +925,7 @@ func TestStateStore_BatchUpdateNodeDrain(t *testing.T) {
require.Equal(out.DrainStrategy, expectedDrain)
require.Len(out.Events, 2)
require.EqualValues(1002, out.ModifyIndex)
require.EqualValues(7, out.StatusUpdatedAt)
}

index, err := state.Index("nodes")
Expand Down Expand Up @@ -955,7 +957,7 @@ func TestStateStore_UpdateNodeDrain_Node(t *testing.T) {
Subsystem: structs.NodeEventSubsystemDrain,
Timestamp: time.Now(),
}
require.Nil(state.UpdateNodeDrain(1001, node.ID, expectedDrain, false, event))
require.Nil(state.UpdateNodeDrain(1001, node.ID, expectedDrain, false, 7, event))
require.True(watchFired(ws))

ws = memdb.NewWatchSet()
Expand All @@ -966,6 +968,7 @@ func TestStateStore_UpdateNodeDrain_Node(t *testing.T) {
require.Equal(out.DrainStrategy, expectedDrain)
require.Len(out.Events, 2)
require.EqualValues(1001, out.ModifyIndex)
require.EqualValues(7, out.StatusUpdatedAt)

index, err := state.Index("nodes")
require.Nil(err)
Expand Down Expand Up @@ -1084,7 +1087,7 @@ func TestStateStore_UpdateNodeDrain_ResetEligiblity(t *testing.T) {
Subsystem: structs.NodeEventSubsystemDrain,
Timestamp: time.Now(),
}
require.Nil(state.UpdateNodeDrain(1001, node.ID, drain, false, event1))
require.Nil(state.UpdateNodeDrain(1001, node.ID, drain, false, 7, event1))
require.True(watchFired(ws))

// Remove the drain
Expand All @@ -1093,7 +1096,7 @@ func TestStateStore_UpdateNodeDrain_ResetEligiblity(t *testing.T) {
Subsystem: structs.NodeEventSubsystemDrain,
Timestamp: time.Now(),
}
require.Nil(state.UpdateNodeDrain(1002, node.ID, nil, true, event2))
require.Nil(state.UpdateNodeDrain(1002, node.ID, nil, true, 9, event2))

ws = memdb.NewWatchSet()
out, err := state.NodeByID(ws, node.ID)
Expand All @@ -1103,6 +1106,7 @@ func TestStateStore_UpdateNodeDrain_ResetEligiblity(t *testing.T) {
require.Equal(out.SchedulingEligibility, structs.NodeSchedulingEligible)
require.Len(out.Events, 3)
require.EqualValues(1002, out.ModifyIndex)
require.EqualValues(9, out.StatusUpdatedAt)

index, err := state.Index("nodes")
require.Nil(err)
Expand Down Expand Up @@ -1133,7 +1137,7 @@ func TestStateStore_UpdateNodeEligibility(t *testing.T) {
Subsystem: structs.NodeEventSubsystemCluster,
Timestamp: time.Now(),
}
require.Nil(state.UpdateNodeEligibility(1001, node.ID, expectedEligibility, event))
require.Nil(state.UpdateNodeEligibility(1001, node.ID, expectedEligibility, 7, event))
require.True(watchFired(ws))

ws = memdb.NewWatchSet()
Expand All @@ -1143,6 +1147,7 @@ func TestStateStore_UpdateNodeEligibility(t *testing.T) {
require.Len(out.Events, 2)
require.Equal(out.Events[1], event)
require.EqualValues(1001, out.ModifyIndex)
require.EqualValues(7, out.StatusUpdatedAt)

index, err := state.Index("nodes")
require.Nil(err)
Expand All @@ -1155,10 +1160,10 @@ func TestStateStore_UpdateNodeEligibility(t *testing.T) {
Deadline: -1 * time.Second,
},
}
require.Nil(state.UpdateNodeDrain(1002, node.ID, expectedDrain, false, nil))
require.Nil(state.UpdateNodeDrain(1002, node.ID, expectedDrain, false, 7, nil))

// Try to set the node to eligible
err = state.UpdateNodeEligibility(1003, node.ID, structs.NodeSchedulingEligible, nil)
err = state.UpdateNodeEligibility(1003, node.ID, structs.NodeSchedulingEligible, 9, nil)
require.NotNil(err)
require.Contains(err.Error(), "while it is draining")
}
Expand Down
10 changes: 10 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ type NodeUpdateStatusRequest struct {
NodeID string
Status string
NodeEvent *NodeEvent
UpdatedAt int64
WriteRequest
}

Expand All @@ -367,6 +368,9 @@ type NodeUpdateDrainRequest struct {
// NodeEvent is the event added to the node
NodeEvent *NodeEvent

// UpdatedAt represents server time of receiving request
UpdatedAt int64

WriteRequest
}

Expand All @@ -379,6 +383,9 @@ type BatchNodeUpdateDrainRequest struct {
// NodeEvents is a mapping of the node to the event to add to the node
NodeEvents map[string]*NodeEvent

// UpdatedAt represents server time of receiving request
UpdatedAt int64

WriteRequest
}

Expand All @@ -399,6 +406,9 @@ type NodeUpdateEligibilityRequest struct {
// NodeEvent is the event added to the node
NodeEvent *NodeEvent

// UpdatedAt represents server time of receiving request
UpdatedAt int64

WriteRequest
}

Expand Down