Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Emit events when node eligibility is set #4291

Merged
merged 2 commits into from
May 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ func (n *nomadFSM) applyNodeEligibilityUpdate(buf []byte, index uint64) interfac
return err
}

if err := n.state.UpdateNodeEligibility(index, req.NodeID, req.Eligibility); err != nil {
if err := n.state.UpdateNodeEligibility(index, req.NodeID, req.Eligibility, req.NodeEvent); err != nil {
n.logger.Printf("[ERR] nomad.fsm: UpdateNodeEligibility failed: %v", err)
return err
}
Expand Down
9 changes: 9 additions & 0 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,10 +451,17 @@ func TestFSM_UpdateNodeEligibility(t *testing.T) {
resp := fsm.Apply(makeLog(buf))
require.Nil(resp)

event := &structs.NodeEvent{
Message: "Node marked as ineligible",
Subsystem: structs.NodeEventSubsystemCluster,
Timestamp: time.Now(),
}

// Set the eligibility
req2 := structs.NodeUpdateEligibilityRequest{
NodeID: node.ID,
Eligibility: structs.NodeSchedulingIneligible,
NodeEvent: event,
}
buf, err = structs.Encode(structs.NodeUpdateEligibilityRequestType, req2)
require.Nil(err)
Expand All @@ -466,6 +473,8 @@ func TestFSM_UpdateNodeEligibility(t *testing.T) {
node, err = fsm.State().NodeByID(nil, req.Node.ID)
require.Nil(err)
require.Equal(node.SchedulingEligibility, structs.NodeSchedulingIneligible)
require.Len(node.Events, 2)
require.Equal(event.Message, node.Events[1].Message)

// Update the drain
strategy := &structs.DrainStrategy{
Expand Down
21 changes: 21 additions & 0 deletions nomad/node_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ const (
NodeDrainEventDrainSet = "Node drain strategy set"
NodeDrainEventDrainDisabled = "Node drain disabled"
NodeDrainEventDrainUpdated = "Node drain stategy updated"

// NodeEligibilityEventEligible is used when the nodes eligiblity is marked
// eligible
NodeEligibilityEventEligible = "Node marked as eligible for scheduling"

// NodeEligibilityEventIneligible is used when the nodes eligiblity is marked
// ineligible
NodeEligibilityEventIneligible = "Node marked as ineligible for scheduling"
)

// Node endpoint is used for client interactions
Expand Down Expand Up @@ -532,6 +540,9 @@ func (n *Node) UpdateEligibility(args *structs.NodeUpdateEligibilityRequest,
if args.NodeID == "" {
return fmt.Errorf("missing node ID for setting scheduling eligibility")
}
if args.NodeEvent != nil {
return fmt.Errorf("node event must not be set")
}

// Check that only allowed types are set
switch args.Eligibility {
Expand Down Expand Up @@ -563,6 +574,16 @@ func (n *Node) UpdateEligibility(args *structs.NodeUpdateEligibilityRequest,
return fmt.Errorf("invalid scheduling eligibility %q", args.Eligibility)
}

// Construct the node event
args.NodeEvent = structs.NewNodeEvent().SetSubsystem(structs.NodeEventSubsystemCluster)
if node.SchedulingEligibility == args.Eligibility {
return nil // Nothing to do
} else if args.Eligibility == structs.NodeSchedulingEligible {
args.NodeEvent.SetMessage(NodeEligibilityEventEligible)
} else {
args.NodeEvent.SetMessage(NodeEligibilityEventIneligible)
}

// Commit this update via Raft
outErr, index, err := n.srv.raftApply(structs.NodeUpdateEligibilityRequestType, args)
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions nomad/node_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1095,6 +1095,8 @@ func TestClientEndpoint_UpdateEligibility(t *testing.T) {
out, err := state.NodeByID(nil, node.ID)
require.Nil(err)
require.Equal(out.SchedulingEligibility, structs.NodeSchedulingIneligible)
require.Len(out.Events, 2)
require.Equal(NodeEligibilityEventIneligible, out.Events[1].Message)

// Register a system job
job := mock.SystemJob()
Expand All @@ -1107,6 +1109,11 @@ func TestClientEndpoint_UpdateEligibility(t *testing.T) {
require.NotZero(resp3.Index)
require.NotZero(resp3.EvalCreateIndex)
require.Len(resp3.EvalIDs, 1)

out, err = state.NodeByID(nil, node.ID)
require.Nil(err)
require.Len(out.Events, 3)
require.Equal(NodeEligibilityEventEligible, out.Events[2].Message)
}

func TestClientEndpoint_UpdateEligibility_ACL(t *testing.T) {
Expand Down
7 changes: 6 additions & 1 deletion nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ func (s *StateStore) updateNodeDrainImpl(txn *memdb.Txn, index uint64, nodeID st
}

// UpdateNodeEligibility is used to update the scheduling eligibility of a node
func (s *StateStore) UpdateNodeEligibility(index uint64, nodeID string, eligibility string) error {
func (s *StateStore) UpdateNodeEligibility(index uint64, nodeID string, eligibility string, event *structs.NodeEvent) error {

txn := s.db.Txn(true)
defer txn.Abort()
Expand All @@ -706,6 +706,11 @@ func (s *StateStore) UpdateNodeEligibility(index uint64, nodeID string, eligibil
existingNode := existing.(*structs.Node)
copyNode := existingNode.Copy()

// Add the event if given
if event != nil {
appendNodeEvents(index, copyNode, []*structs.NodeEvent{event})
}

// Check if this is a valid action
if copyNode.DrainStrategy != nil && eligibility == structs.NodeSchedulingEligible {
return fmt.Errorf("can not set node's scheduling eligibility to eligible while it is draining")
Expand Down
11 changes: 9 additions & 2 deletions nomad/state/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -952,13 +952,20 @@ func TestStateStore_UpdateNodeEligibility(t *testing.T) {
t.Fatalf("bad: %v", err)
}

require.Nil(state.UpdateNodeEligibility(1001, node.ID, expectedEligibility))
event := &structs.NodeEvent{
Message: "Node marked as ineligible",
Subsystem: structs.NodeEventSubsystemCluster,
Timestamp: time.Now(),
}
require.Nil(state.UpdateNodeEligibility(1001, node.ID, expectedEligibility, event))
require.True(watchFired(ws))

ws = memdb.NewWatchSet()
out, err := state.NodeByID(ws, node.ID)
require.Nil(err)
require.Equal(out.SchedulingEligibility, expectedEligibility)
require.Len(out.Events, 2)
require.Equal(out.Events[1], event)
require.EqualValues(1001, out.ModifyIndex)

index, err := state.Index("nodes")
Expand All @@ -975,7 +982,7 @@ func TestStateStore_UpdateNodeEligibility(t *testing.T) {
require.Nil(state.UpdateNodeDrain(1002, node.ID, expectedDrain, false, nil))

// Try to set the node to eligible
err = state.UpdateNodeEligibility(1003, node.ID, structs.NodeSchedulingEligible)
err = state.UpdateNodeEligibility(1003, node.ID, structs.NodeSchedulingEligible, nil)
require.NotNil(err)
require.Contains(err.Error(), "while it is draining")
}
Expand Down
4 changes: 4 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,10 @@ type DrainUpdate struct {
type NodeUpdateEligibilityRequest struct {
NodeID string
Eligibility string

// NodeEvent is the event added to the node
NodeEvent *NodeEvent

WriteRequest
}

Expand Down