From 3a2829b4fc24f02e2435c0126b56a799e592c2d7 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Fri, 11 May 2018 14:32:34 -0700 Subject: [PATCH 1/2] Emit events based on eligibility --- nomad/fsm.go | 2 +- nomad/fsm_test.go | 9 +++++++++ nomad/node_endpoint.go | 21 +++++++++++++++++++++ nomad/node_endpoint_test.go | 7 +++++++ nomad/state/state_store.go | 7 ++++++- nomad/state/state_store_test.go | 11 +++++++++-- nomad/structs/structs.go | 4 ++++ 7 files changed, 57 insertions(+), 4 deletions(-) diff --git a/nomad/fsm.go b/nomad/fsm.go index 9453a6d5a56d..abf858af2791 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -384,7 +384,7 @@ func (n *nomadFSM) applyNodeEligibilityUpdate(buf []byte, index uint64) interfac return err } - if err := n.state.UpdateNodeEligibility(index, req.NodeID, req.Eligibility); err != nil { + if err := n.state.UpdateNodeEligibility(index, req.NodeID, req.Eligibility, req.NodeEvent); err != nil { n.logger.Printf("[ERR] nomad.fsm: UpdateNodeEligibility failed: %v", err) return err } diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index d2b65b6b020c..2440b50075a5 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -451,10 +451,17 @@ func TestFSM_UpdateNodeEligibility(t *testing.T) { resp := fsm.Apply(makeLog(buf)) require.Nil(resp) + event := &structs.NodeEvent{ + Message: "Node marked as ineligible", + Subsystem: structs.NodeEventSubsystemCluster, + Timestamp: time.Now(), + } + // Set the eligibility req2 := structs.NodeUpdateEligibilityRequest{ NodeID: node.ID, Eligibility: structs.NodeSchedulingIneligible, + NodeEvent: event, } buf, err = structs.Encode(structs.NodeUpdateEligibilityRequestType, req2) require.Nil(err) @@ -466,6 +473,8 @@ func TestFSM_UpdateNodeEligibility(t *testing.T) { node, err = fsm.State().NodeByID(nil, req.Node.ID) require.Nil(err) require.Equal(node.SchedulingEligibility, structs.NodeSchedulingIneligible) + require.Len(node.Events, 2) + require.Equal(event.Message, node.Events[1].Message) // Update the drain 
strategy := &structs.DrainStrategy{ diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 675b7116e616..4822b4a76400 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -32,6 +32,14 @@ const ( NodeDrainEventDrainSet = "Node drain strategy set" NodeDrainEventDrainDisabled = "Node drain disabled" NodeDrainEventDrainUpdated = "Node drain stategy updated" + + // NodeEligibilityEventEligible is used when the node's eligibility is marked + // eligible + NodeEligibilityEventEligible = "Node marked as eligible for scheduling" + + // NodeEligibilityEventIneligible is used when the node's eligibility is marked + // ineligible + NodeEligibilityEventIneligible = "Node marked as ineligible for scheduling" ) // Node endpoint is used for client interactions @@ -532,6 +540,9 @@ func (n *Node) UpdateEligibility(args *structs.NodeUpdateEligibilityRequest, if args.NodeID == "" { return fmt.Errorf("missing node ID for setting scheduling eligibility") } + if args.NodeEvent != nil { + return fmt.Errorf("node event may not be set") + } // Check that only allowed types are set switch args.Eligibility { @@ -563,6 +574,16 @@ func (n *Node) UpdateEligibility(args *structs.NodeUpdateEligibilityRequest, return fmt.Errorf("invalid scheduling eligibility %q", args.Eligibility) } + // Construct the node event + args.NodeEvent = structs.NewNodeEvent().SetSubsystem(structs.NodeEventSubsystemCluster) + if node.SchedulingEligibility == args.Eligibility { + return nil // Nothing to do + } else if args.Eligibility == structs.NodeSchedulingEligible { + args.NodeEvent.SetMessage(NodeEligibilityEventEligible) + } else { + args.NodeEvent.SetMessage(NodeEligibilityEventIneligible) + } + // Commit this update via Raft outErr, index, err := n.srv.raftApply(structs.NodeUpdateEligibilityRequestType, args) if err != nil { diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 073110f7ca22..7d2616a502e9 100644 --- a/nomad/node_endpoint_test.go +++ 
b/nomad/node_endpoint_test.go @@ -1095,6 +1095,8 @@ func TestClientEndpoint_UpdateEligibility(t *testing.T) { out, err := state.NodeByID(nil, node.ID) require.Nil(err) require.Equal(out.SchedulingEligibility, structs.NodeSchedulingIneligible) + require.Len(out.Events, 2) + require.Equal(NodeEligibilityEventIneligible, out.Events[1].Message) // Register a system job job := mock.SystemJob() @@ -1107,6 +1109,11 @@ func TestClientEndpoint_UpdateEligibility(t *testing.T) { require.NotZero(resp3.Index) require.NotZero(resp3.EvalCreateIndex) require.Len(resp3.EvalIDs, 1) + + out, err = state.NodeByID(nil, node.ID) + require.Nil(err) + require.Len(out.Events, 3) + require.Equal(NodeEligibilityEventEligible, out.Events[2].Message) } func TestClientEndpoint_UpdateEligibility_ACL(t *testing.T) { diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index cb257b56e7f8..142b63b9ac3e 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -688,7 +688,7 @@ func (s *StateStore) updateNodeDrainImpl(txn *memdb.Txn, index uint64, nodeID st } // UpdateNodeEligibility is used to update the scheduling eligibility of a node -func (s *StateStore) UpdateNodeEligibility(index uint64, nodeID string, eligibility string) error { +func (s *StateStore) UpdateNodeEligibility(index uint64, nodeID string, eligibility string, event *structs.NodeEvent) error { txn := s.db.Txn(true) defer txn.Abort() @@ -706,6 +706,11 @@ func (s *StateStore) UpdateNodeEligibility(index uint64, nodeID string, eligibil existingNode := existing.(*structs.Node) copyNode := existingNode.Copy() + // Add the event if given + if event != nil { + appendNodeEvents(index, copyNode, []*structs.NodeEvent{event}) + } + // Check if this is a valid action if copyNode.DrainStrategy != nil && eligibility == structs.NodeSchedulingEligible { return fmt.Errorf("can not set node's scheduling eligibility to eligible while it is draining") diff --git a/nomad/state/state_store_test.go 
b/nomad/state/state_store_test.go index 1a6c66de90b6..ec9e76076b85 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -952,13 +952,20 @@ func TestStateStore_UpdateNodeEligibility(t *testing.T) { t.Fatalf("bad: %v", err) } - require.Nil(state.UpdateNodeEligibility(1001, node.ID, expectedEligibility)) + event := &structs.NodeEvent{ + Message: "Node marked as ineligible", + Subsystem: structs.NodeEventSubsystemCluster, + Timestamp: time.Now(), + } + require.Nil(state.UpdateNodeEligibility(1001, node.ID, expectedEligibility, event)) require.True(watchFired(ws)) ws = memdb.NewWatchSet() out, err := state.NodeByID(ws, node.ID) require.Nil(err) require.Equal(out.SchedulingEligibility, expectedEligibility) + require.Len(out.Events, 2) + require.Equal(out.Events[1], event) require.EqualValues(1001, out.ModifyIndex) index, err := state.Index("nodes") @@ -975,7 +982,7 @@ func TestStateStore_UpdateNodeEligibility(t *testing.T) { require.Nil(state.UpdateNodeDrain(1002, node.ID, expectedDrain, false, nil)) // Try to set the node to eligible - err = state.UpdateNodeEligibility(1003, node.ID, structs.NodeSchedulingEligible) + err = state.UpdateNodeEligibility(1003, node.ID, structs.NodeSchedulingEligible, nil) require.NotNil(err) require.Contains(err.Error(), "while it is draining") } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 77610cd0b551..dee23e42ba4c 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -376,6 +376,10 @@ type DrainUpdate struct { type NodeUpdateEligibilityRequest struct { NodeID string Eligibility string + + // NodeEvent is the event added to the node + NodeEvent *NodeEvent + WriteRequest } From adb58086c55bd5ce6a2fa0240976042ab48dc4ee Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 22 May 2018 14:02:44 -0700 Subject: [PATCH 2/2] update error message --- nomad/node_endpoint.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nomad/node_endpoint.go 
b/nomad/node_endpoint.go index 4822b4a76400..65b9374392f1 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -541,7 +541,7 @@ func (n *Node) UpdateEligibility(args *structs.NodeUpdateEligibilityRequest, return fmt.Errorf("missing node ID for setting scheduling eligibility") } if args.NodeEvent != nil { - return fmt.Errorf("node event may not be set") + return fmt.Errorf("node event must not be set") } // Check that only allowed types are set