Skip to content

Commit

Permalink
set node.StatusUpdatedAt in raft
Browse files Browse the repository at this point in the history
Fix a case where `node.StatusUpdatedAt` was manipulated directly in
memory.

This ensures that StatusUpdatedAt is set in raft layer, and ensures that
the field is updated when node drain/eligibility is updated too.
  • Loading branch information
Mahmood Ali committed May 21, 2019
1 parent 121927f commit 0a4fc19
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 13 deletions.
7 changes: 6 additions & 1 deletion nomad/drainer_shims.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package nomad

import "github.com/hashicorp/nomad/nomad/structs"
import (
"time"

"github.com/hashicorp/nomad/nomad/structs"
)

// drainerShim implements the drainer.RaftApplier interface required by the
// NodeDrainer.
Expand All @@ -13,6 +17,7 @@ func (d drainerShim) NodesDrainComplete(nodes []string, event *structs.NodeEvent
Updates: make(map[string]*structs.DrainUpdate, len(nodes)),
NodeEvents: make(map[string]*structs.NodeEvent, len(nodes)),
WriteRequest: structs.WriteRequest{Region: d.s.config.Region},
UpdatedAt: time.Now().Unix(),
}

update := &structs.DrainUpdate{}
Expand Down
8 changes: 4 additions & 4 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func (n *nomadFSM) applyStatusUpdate(buf []byte, index uint64) interface{} {
panic(fmt.Errorf("failed to decode request: %v", err))
}

if err := n.state.UpdateNodeStatus(index, req.NodeID, req.Status, req.NodeEvent); err != nil {
if err := n.state.UpdateNodeStatus(index, req.NodeID, req.Status, req.UpdatedAt, req.NodeEvent); err != nil {
n.logger.Error("UpdateNodeStatus failed", "error", err)
return err
}
Expand Down Expand Up @@ -352,7 +352,7 @@ func (n *nomadFSM) applyDrainUpdate(buf []byte, index uint64) interface{} {
}
}

if err := n.state.UpdateNodeDrain(index, req.NodeID, req.DrainStrategy, req.MarkEligible, req.NodeEvent); err != nil {
if err := n.state.UpdateNodeDrain(index, req.NodeID, req.DrainStrategy, req.MarkEligible, req.UpdatedAt, req.NodeEvent); err != nil {
n.logger.Error("UpdateNodeDrain failed", "error", err)
return err
}
Expand All @@ -366,7 +366,7 @@ func (n *nomadFSM) applyBatchDrainUpdate(buf []byte, index uint64) interface{} {
panic(fmt.Errorf("failed to decode request: %v", err))
}

if err := n.state.BatchUpdateNodeDrain(index, req.Updates, req.NodeEvents); err != nil {
if err := n.state.BatchUpdateNodeDrain(index, req.UpdatedAt, req.Updates, req.NodeEvents); err != nil {
n.logger.Error("BatchUpdateNodeDrain failed", "error", err)
return err
}
Expand All @@ -387,7 +387,7 @@ func (n *nomadFSM) applyNodeEligibilityUpdate(buf []byte, index uint64) interfac
return err
}

if err := n.state.UpdateNodeEligibility(index, req.NodeID, req.Eligibility, req.NodeEvent); err != nil {
if err := n.state.UpdateNodeEligibility(index, req.NodeID, req.Eligibility, req.UpdatedAt, req.NodeEvent); err != nil {
n.logger.Error("UpdateNodeEligibility failed", "error", err)
return err
}
Expand Down
8 changes: 7 additions & 1 deletion nomad/node_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
// to track SecretIDs.

// Update the timestamp of when the node status was updated
node.StatusUpdatedAt = time.Now().Unix()
args.UpdatedAt = time.Now().Unix()

// Commit this update via Raft
var index uint64
Expand Down Expand Up @@ -484,6 +484,9 @@ func (n *Node) UpdateDrain(args *structs.NodeUpdateDrainRequest,
return fmt.Errorf("node not found")
}

// Update the timestamp of when the node status was updated
args.UpdatedAt = time.Now().Unix()

// COMPAT: Remove in 0.9. Attempt to upgrade the request if it is of the old
// format.
if args.Drain && args.DrainStrategy == nil {
Expand Down Expand Up @@ -589,6 +592,9 @@ func (n *Node) UpdateEligibility(args *structs.NodeUpdateEligibilityRequest,
return fmt.Errorf("invalid scheduling eligibility %q", args.Eligibility)
}

// Update the timestamp of when the node status was updated
args.UpdatedAt = time.Now().Unix()

// Construct the node event
args.NodeEvent = structs.NewNodeEvent().SetSubsystem(structs.NodeEventSubsystemCluster)
if node.SchedulingEligibility == args.Eligibility {
Expand Down
17 changes: 10 additions & 7 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,7 @@ func (s *StateStore) DeleteNode(index uint64, nodeID string) error {
}

// UpdateNodeStatus is used to update the status of a node
func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string, event *structs.NodeEvent) error {
func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string, updatedAt int64, event *structs.NodeEvent) error {
txn := s.db.Txn(true)
defer txn.Abort()

Expand All @@ -748,6 +748,7 @@ func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string, event
// Copy the existing node
existingNode := existing.(*structs.Node)
copyNode := existingNode.Copy()
copyNode.StatusUpdatedAt = updatedAt

// Add the event if given
if event != nil {
Expand All @@ -771,11 +772,11 @@ func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string, event
}

// BatchUpdateNodeDrain is used to update the drain of a node set of nodes
func (s *StateStore) BatchUpdateNodeDrain(index uint64, updates map[string]*structs.DrainUpdate, events map[string]*structs.NodeEvent) error {
func (s *StateStore) BatchUpdateNodeDrain(index uint64, updatedAt int64, updates map[string]*structs.DrainUpdate, events map[string]*structs.NodeEvent) error {
txn := s.db.Txn(true)
defer txn.Abort()
for node, update := range updates {
if err := s.updateNodeDrainImpl(txn, index, node, update.DrainStrategy, update.MarkEligible, events[node]); err != nil {
if err := s.updateNodeDrainImpl(txn, index, node, update.DrainStrategy, update.MarkEligible, updatedAt, events[node]); err != nil {
return err
}
}
Expand All @@ -785,19 +786,19 @@ func (s *StateStore) BatchUpdateNodeDrain(index uint64, updates map[string]*stru

// UpdateNodeDrain is used to update the drain of a node
func (s *StateStore) UpdateNodeDrain(index uint64, nodeID string,
drain *structs.DrainStrategy, markEligible bool, event *structs.NodeEvent) error {
drain *structs.DrainStrategy, markEligible bool, updatedAt int64, event *structs.NodeEvent) error {

txn := s.db.Txn(true)
defer txn.Abort()
if err := s.updateNodeDrainImpl(txn, index, nodeID, drain, markEligible, event); err != nil {
if err := s.updateNodeDrainImpl(txn, index, nodeID, drain, markEligible, updatedAt, event); err != nil {
return err
}
txn.Commit()
return nil
}

func (s *StateStore) updateNodeDrainImpl(txn *memdb.Txn, index uint64, nodeID string,
drain *structs.DrainStrategy, markEligible bool, event *structs.NodeEvent) error {
drain *structs.DrainStrategy, markEligible bool, updatedAt int64, event *structs.NodeEvent) error {

// Lookup the node
existing, err := txn.First("nodes", "id", nodeID)
Expand All @@ -811,6 +812,7 @@ func (s *StateStore) updateNodeDrainImpl(txn *memdb.Txn, index uint64, nodeID st
// Copy the existing node
existingNode := existing.(*structs.Node)
copyNode := existingNode.Copy()
copyNode.StatusUpdatedAt = updatedAt

// Add the event if given
if event != nil {
Expand Down Expand Up @@ -840,7 +842,7 @@ func (s *StateStore) updateNodeDrainImpl(txn *memdb.Txn, index uint64, nodeID st
}

// UpdateNodeEligibility is used to update the scheduling eligibility of a node
func (s *StateStore) UpdateNodeEligibility(index uint64, nodeID string, eligibility string, event *structs.NodeEvent) error {
func (s *StateStore) UpdateNodeEligibility(index uint64, nodeID string, eligibility string, updatedAt int64, event *structs.NodeEvent) error {

txn := s.db.Txn(true)
defer txn.Abort()
Expand All @@ -857,6 +859,7 @@ func (s *StateStore) UpdateNodeEligibility(index uint64, nodeID string, eligibil
// Copy the existing node
existingNode := existing.(*structs.Node)
copyNode := existingNode.Copy()
copyNode.StatusUpdatedAt = updatedAt

// Add the event if given
if event != nil {
Expand Down
10 changes: 10 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ type NodeUpdateStatusRequest struct {
NodeID string
Status string
NodeEvent *NodeEvent
UpdatedAt int64
WriteRequest
}

Expand All @@ -367,6 +368,9 @@ type NodeUpdateDrainRequest struct {
// NodeEvent is the event added to the node
NodeEvent *NodeEvent

// UpdatedAt represents server time of receiving request
UpdatedAt int64

WriteRequest
}

Expand All @@ -379,6 +383,9 @@ type BatchNodeUpdateDrainRequest struct {
// NodeEvents is a mapping of the node to the event to add to the node
NodeEvents map[string]*NodeEvent

// UpdatedAt represents server time of receiving request
UpdatedAt int64

WriteRequest
}

Expand All @@ -399,6 +406,9 @@ type NodeUpdateEligibilityRequest struct {
// NodeEvent is the event added to the node
NodeEvent *NodeEvent

// UpdatedAt represents server time of receiving request
UpdatedAt int64

WriteRequest
}

Expand Down

0 comments on commit 0a4fc19

Please sign in to comment.