Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Emit Node Events for draining #4284

Merged
merged 4 commits into from
May 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions nomad/drainer/drainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,21 @@ const (
// NodeDeadlineCoalesceWindow is the duration in which deadlining nodes will
// be coalesced together
NodeDeadlineCoalesceWindow = 5 * time.Second

// NodeDrainEventComplete is used to indicate that the node drain is
// finished.
NodeDrainEventComplete = "Node drain complete"

// NodeDrainEventDetailDeadlined is the key to use when the drain is
// complete because a deadline. The acceptable values are "true" and "false"
NodeDrainEventDetailDeadlined = "deadline_reached"
)

// RaftApplier contains methods for applying the raft requests required by the
// NodeDrainer.
type RaftApplier interface {
AllocUpdateDesiredTransition(allocs map[string]*structs.DesiredTransition, evals []*structs.Evaluation) (uint64, error)
NodesDrainComplete(nodes []string) (uint64, error)
NodesDrainComplete(nodes []string, event *structs.NodeEvent) (uint64, error)
}

// NodeTracker is the interface to notify an object that is tracking draining
Expand Down Expand Up @@ -254,10 +262,16 @@ func (n *NodeDrainer) handleDeadlinedNodes(nodes []string) {
n.l.RUnlock()
n.batchDrainAllocs(forceStop)

// Create the node event
event := structs.NewNodeEvent().
SetSubsystem(structs.NodeEventSubsystemDrain).
SetMessage(NodeDrainEventComplete).
AddDetail(NodeDrainEventDetailDeadlined, "true")

// Submit the node transitions in a sharded form to ensure a reasonable
// Raft transaction size.
for _, nodes := range partitionIds(defaultMaxIdsPerTxn, nodes) {
if _, err := n.raft.NodesDrainComplete(nodes); err != nil {
if _, err := n.raft.NodesDrainComplete(nodes, event); err != nil {
n.logger.Printf("[ERR] nomad.drain: failed to unset drain for nodes: %v", err)
}
}
Expand Down Expand Up @@ -324,10 +338,15 @@ func (n *NodeDrainer) handleMigratedAllocs(allocs []*structs.Allocation) {
}
}

// Create the node event
event := structs.NewNodeEvent().
SetSubsystem(structs.NodeEventSubsystemDrain).
SetMessage(NodeDrainEventComplete)

// Submit the node transitions in a sharded form to ensure a reasonable
// Raft transaction size.
for _, nodes := range partitionIds(defaultMaxIdsPerTxn, done) {
if _, err := n.raft.NodesDrainComplete(nodes); err != nil {
if _, err := n.raft.NodesDrainComplete(nodes, event); err != nil {
n.logger.Printf("[ERR] nomad.drain: failed to unset drain for nodes: %v", err)
}
}
Expand Down
7 changes: 6 additions & 1 deletion nomad/drainer/watch_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,12 @@ func (n *NodeDrainer) Update(node *structs.Node) {
}
}

index, err := n.raft.NodesDrainComplete([]string{node.ID})
// Create the node event
event := structs.NewNodeEvent().
SetSubsystem(structs.NodeEventSubsystemDrain).
SetMessage(NodeDrainEventComplete)

index, err := n.raft.NodesDrainComplete([]string{node.ID}, event)
if err != nil {
n.logger.Printf("[ERR] nomad.drain: failed to unset drain for node %q: %v", node.ID, err)
} else {
Expand Down
4 changes: 2 additions & 2 deletions nomad/drainer/watch_nodes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func TestNodeDrainWatcher_Remove(t *testing.T) {
require.Equal(n, tracked[n.ID])

// Change the node to be not draining and wait for it to be untracked
require.Nil(state.UpdateNodeDrain(101, n.ID, nil, false))
require.Nil(state.UpdateNodeDrain(101, n.ID, nil, false, nil))
testutil.WaitForResult(func() (bool, error) {
return len(m.Events) == 2, nil
}, func(err error) {
Expand Down Expand Up @@ -175,7 +175,7 @@ func TestNodeDrainWatcher_Update(t *testing.T) {
// Change the node to have a new spec
s2 := n.DrainStrategy.Copy()
s2.Deadline += time.Hour
require.Nil(state.UpdateNodeDrain(101, n.ID, s2, false))
require.Nil(state.UpdateNodeDrain(101, n.ID, s2, false, nil))

// Wait for it to be updated
testutil.WaitForResult(func() (bool, error) {
Expand Down
47 changes: 47 additions & 0 deletions nomad/drainer_int_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/drainer"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -212,6 +213,12 @@ func TestDrainer_Simple_ServiceOnly(t *testing.T) {
}, func(err error) {
t.Fatalf("err: %v", err)
})

// Check we got the right events
node, err := state.NodeByID(nil, n1.ID)
require.NoError(err)
require.Len(node.Events, 3)
require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what are the other two events expected and should they also be asserted here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be Register and Drain Enabled. I am not asserting it here since those aren't set by the node drainer system which this is testing.

}

func TestDrainer_Simple_ServiceOnly_Deadline(t *testing.T) {
Expand Down Expand Up @@ -300,6 +307,13 @@ func TestDrainer_Simple_ServiceOnly_Deadline(t *testing.T) {
}, func(err error) {
t.Fatalf("err: %v", err)
})

// Check we got the right events
node, err := state.NodeByID(nil, n1.ID)
require.NoError(err)
require.Len(node.Events, 3)
require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message)
require.Contains(node.Events[2].Details, drainer.NodeDrainEventDetailDeadlined)
}

func TestDrainer_DrainEmptyNode(t *testing.T) {
Expand Down Expand Up @@ -343,6 +357,12 @@ func TestDrainer_DrainEmptyNode(t *testing.T) {
}, func(err error) {
t.Fatalf("err: %v", err)
})

// Check we got the right events
node, err := state.NodeByID(nil, n1.ID)
require.NoError(err)
require.Len(node.Events, 3)
require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message)
}

func TestDrainer_AllTypes_Deadline(t *testing.T) {
Expand Down Expand Up @@ -500,6 +520,13 @@ func TestDrainer_AllTypes_Deadline(t *testing.T) {
}
}
require.True(serviceMax < batchMax)

// Check we got the right events
node, err := state.NodeByID(nil, n1.ID)
require.NoError(err)
require.Len(node.Events, 3)
require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message)
require.Contains(node.Events[2].Details, drainer.NodeDrainEventDetailDeadlined)
}

// Test that drain is unset when batch jobs naturally finish
Expand Down Expand Up @@ -659,6 +686,12 @@ func TestDrainer_AllTypes_NoDeadline(t *testing.T) {
}, func(err error) {
t.Fatalf("err: %v", err)
})

// Check we got the right events
node, err := state.NodeByID(nil, n1.ID)
require.NoError(err)
require.Len(node.Events, 3)
require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message)
}

func TestDrainer_AllTypes_Deadline_GarbageCollectedNode(t *testing.T) {
Expand Down Expand Up @@ -824,6 +857,13 @@ func TestDrainer_AllTypes_Deadline_GarbageCollectedNode(t *testing.T) {
}, func(err error) {
t.Fatalf("err: %v", err)
})

// Check we got the right events
node, err := state.NodeByID(nil, n1.ID)
require.NoError(err)
require.Len(node.Events, 3)
require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message)
require.Contains(node.Events[2].Details, drainer.NodeDrainEventDetailDeadlined)
}

// Test that transitions to force drain work.
Expand Down Expand Up @@ -962,6 +1002,13 @@ func TestDrainer_Batch_TransitionToForce(t *testing.T) {
}, func(err error) {
t.Fatalf("err: %v", err)
})

// Check we got the right events
node, err := state.NodeByID(nil, n1.ID)
require.NoError(err)
require.Len(node.Events, 4)
require.Equal(drainer.NodeDrainEventComplete, node.Events[3].Message)
require.Contains(node.Events[3].Details, drainer.NodeDrainEventDetailDeadlined)
})
}
}
6 changes: 5 additions & 1 deletion nomad/drainer_shims.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,19 @@ type drainerShim struct {
s *Server
}

func (d drainerShim) NodesDrainComplete(nodes []string) (uint64, error) {
func (d drainerShim) NodesDrainComplete(nodes []string, event *structs.NodeEvent) (uint64, error) {
args := &structs.BatchNodeUpdateDrainRequest{
Updates: make(map[string]*structs.DrainUpdate, len(nodes)),
NodeEvents: make(map[string]*structs.NodeEvent, len(nodes)),
WriteRequest: structs.WriteRequest{Region: d.s.config.Region},
}

update := &structs.DrainUpdate{}
for _, node := range nodes {
args.Updates[node] = update
if event != nil {
args.NodeEvents[node] = event
}
}

resp, index, err := d.s.raftApply(structs.BatchNodeUpdateDrainRequestType, args)
Expand Down
4 changes: 2 additions & 2 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ func (n *nomadFSM) applyDrainUpdate(buf []byte, index uint64) interface{} {
}
}

if err := n.state.UpdateNodeDrain(index, req.NodeID, req.DrainStrategy, req.MarkEligible); err != nil {
if err := n.state.UpdateNodeDrain(index, req.NodeID, req.DrainStrategy, req.MarkEligible, req.NodeEvent); err != nil {
n.logger.Printf("[ERR] nomad.fsm: UpdateNodeDrain failed: %v", err)
return err
}
Expand All @@ -363,7 +363,7 @@ func (n *nomadFSM) applyBatchDrainUpdate(buf []byte, index uint64) interface{} {
panic(fmt.Errorf("failed to decode request: %v", err))
}

if err := n.state.BatchUpdateNodeDrain(index, req.Updates); err != nil {
if err := n.state.BatchUpdateNodeDrain(index, req.Updates, req.NodeEvents); err != nil {
n.logger.Printf("[ERR] nomad.fsm: BatchUpdateNodeDrain failed: %v", err)
return err
}
Expand Down
15 changes: 15 additions & 0 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,12 +327,20 @@ func TestFSM_BatchUpdateNodeDrain(t *testing.T) {
Deadline: 10 * time.Second,
},
}
event := &structs.NodeEvent{
Message: "Drain strategy enabled",
Subsystem: structs.NodeEventSubsystemDrain,
Timestamp: time.Now(),
}
req2 := structs.BatchNodeUpdateDrainRequest{
Updates: map[string]*structs.DrainUpdate{
node.ID: {
DrainStrategy: strategy,
},
},
NodeEvents: map[string]*structs.NodeEvent{
node.ID: event,
},
}
buf, err = structs.Encode(structs.BatchNodeUpdateDrainRequestType, req2)
require.Nil(err)
Expand All @@ -346,6 +354,7 @@ func TestFSM_BatchUpdateNodeDrain(t *testing.T) {
require.Nil(err)
require.True(node.Drain)
require.Equal(node.DrainStrategy, strategy)
require.Len(node.Events, 2)
}

func TestFSM_UpdateNodeDrain(t *testing.T) {
Expand All @@ -371,6 +380,11 @@ func TestFSM_UpdateNodeDrain(t *testing.T) {
req2 := structs.NodeUpdateDrainRequest{
NodeID: node.ID,
DrainStrategy: strategy,
NodeEvent: &structs.NodeEvent{
Message: "Drain strategy enabled",
Subsystem: structs.NodeEventSubsystemDrain,
Timestamp: time.Now(),
},
}
buf, err = structs.Encode(structs.NodeUpdateDrainRequestType, req2)
require.Nil(err)
Expand All @@ -384,6 +398,7 @@ func TestFSM_UpdateNodeDrain(t *testing.T) {
require.Nil(err)
require.True(node.Drain)
require.Equal(node.DrainStrategy, strategy)
require.Len(node.Events, 2)
}

func TestFSM_UpdateNodeDrain_Pre08_Compatibility(t *testing.T) {
Expand Down
20 changes: 20 additions & 0 deletions nomad/node_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ const (
// maxParallelRequestsPerDerive is the maximum number of parallel Vault
// create token requests that may be outstanding per derive request
maxParallelRequestsPerDerive = 16

// NodeDrainEvents are the various drain messages
NodeDrainEventDrainSet = "Node drain strategy set"
NodeDrainEventDrainDisabled = "Node drain disabled"
NodeDrainEventDrainUpdated = "Node drain stategy updated"
)

// Node endpoint is used for client interactions
Expand Down Expand Up @@ -439,6 +444,9 @@ func (n *Node) UpdateDrain(args *structs.NodeUpdateDrainRequest,
if args.NodeID == "" {
return fmt.Errorf("missing node ID for drain update")
}
if args.NodeEvent != nil {
return fmt.Errorf("node event must not be set")
}

// Look for the node
snap, err := n.srv.fsm.State().Snapshot()
Expand Down Expand Up @@ -468,6 +476,18 @@ func (n *Node) UpdateDrain(args *structs.NodeUpdateDrainRequest,
args.DrainStrategy.ForceDeadline = time.Now().Add(args.DrainStrategy.Deadline)
}

// Construct the node event
args.NodeEvent = structs.NewNodeEvent().SetSubsystem(structs.NodeEventSubsystemDrain)
if node.DrainStrategy == nil && args.DrainStrategy == nil {
return nil // Nothing to do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this go to line 492 where you apply the node update raft transaction? If there is no node drain event, seems like it should still apply the node update without the event.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I made this skip the raft transaction since it is a no-op. It is essentially saying, please update the node to have no drain strategy even when the node already doesn't have one.

} else if node.DrainStrategy == nil && args.DrainStrategy != nil {
args.NodeEvent.SetMessage(NodeDrainEventDrainSet)
} else if node.DrainStrategy != nil && args.DrainStrategy != nil {
args.NodeEvent.SetMessage(NodeDrainEventDrainUpdated)
} else if node.DrainStrategy != nil && args.DrainStrategy == nil {
args.NodeEvent.SetMessage(NodeDrainEventDrainDisabled)
}

// Commit this update via Raft
_, index, err := n.srv.raftApply(structs.NodeUpdateDrainRequestType, args)
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion nomad/node_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,8 @@ func TestClientEndpoint_UpdateDrain(t *testing.T) {
require.Nil(err)
require.True(out.Drain)
require.Equal(strategy.Deadline, out.DrainStrategy.Deadline)
require.Len(out.Events, 2)
require.Equal(NodeDrainEventDrainSet, out.Events[1].Message)

// before+deadline should be before the forced deadline
require.True(beforeUpdate.Add(strategy.Deadline).Before(out.DrainStrategy.ForceDeadline))
Expand Down Expand Up @@ -2587,7 +2589,7 @@ func TestClientEndpoint_ListNodes_Blocking(t *testing.T) {
Deadline: 10 * time.Second,
},
}
errCh <- state.UpdateNodeDrain(3, node.ID, s, false)
errCh <- state.UpdateNodeDrain(3, node.ID, s, false, nil)
})

req.MinQueryIndex = 2
Expand Down
Loading