Skip to content

Commit

Permalink
wip: more work on node drain metadata, needs testing
Browse files Browse the repository at this point in the history
  • Loading branch information
cgbaker committed Mar 29, 2021
1 parent b68671e commit b86fd05
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 25 deletions.
2 changes: 1 addition & 1 deletion api/event_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func TestEventStream_PayloadValue(t *testing.T) {
require.Contains(t, raw, "Node")
rawNode := raw["Node"]
require.Equal(t, n.ID, rawNode["ID"])
require.NotContains(t, rawNode, "SecretID")
require.Empty(t, rawNode["SecretID"])
}
case <-time.After(5 * time.Second):
require.Fail(t, "failed waiting for event stream event")
Expand Down
14 changes: 14 additions & 0 deletions command/agent/node_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,13 @@ func TestHTTP_NodeDrain(t *testing.T) {
DrainSpec: &api.DrainSpec{
Deadline: 10 * time.Second,
},
Meta: map[string]string{
"reason": "drain",
},
}

beforeDrain := time.Now().Add(-1 * time.Second) // handle roundoff

// Make the HTTP request
buf := encodeReq(drainReq)
req, err := http.NewRequest("POST", "/v1/node/"+node.ID+"/drain", buf)
Expand Down Expand Up @@ -292,6 +297,11 @@ func TestHTTP_NodeDrain(t *testing.T) {
require.Equal(structs.NodeSchedulingIneligible, out.SchedulingEligibility)
}

require.NotNil(out.LastDrain)
require.Equal(map[string]string{
"reason": "drain",
}, out.LastDrain.Meta)

// Make the HTTP request to unset drain
drainReq.DrainSpec = nil
buf = encodeReq(drainReq)
Expand All @@ -306,6 +316,10 @@ func TestHTTP_NodeDrain(t *testing.T) {
out, err = state.NodeByID(nil, node.ID)
require.Nil(err)
require.Nil(out.DrainStrategy)
require.NotNil(out.LastDrain)
require.False(out.LastDrain.StartedAt.Before(beforeDrain))
require.False(out.LastDrain.UpdatedAt.Before(out.LastDrain.StartedAt))
require.Equal(structs.DrainStatusCompleted, out.LastDrain.Status)
})
}

Expand Down
4 changes: 2 additions & 2 deletions nomad/drainer/watch_nodes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestNodeDrainWatcher_Remove(t *testing.T) {
require.Equal(n, tracked[n.ID])

// Change the node to be not draining and wait for it to be untracked
require.Nil(state.UpdateNodeDrain(structs.MsgTypeTestSetup, 101, n.ID, nil, false, 0, nil))
require.Nil(state.UpdateNodeDrain(structs.MsgTypeTestSetup, 101, n.ID, nil, false, 0, nil, nil, ""))
testutil.WaitForResult(func() (bool, error) {
return len(m.events()) == 2, nil
}, func(err error) {
Expand Down Expand Up @@ -166,7 +166,7 @@ func TestNodeDrainWatcher_Update(t *testing.T) {
// Change the node to have a new spec
s2 := n.DrainStrategy.Copy()
s2.Deadline += time.Hour
require.Nil(state.UpdateNodeDrain(structs.MsgTypeTestSetup, 101, n.ID, s2, false, 0, nil))
require.Nil(state.UpdateNodeDrain(structs.MsgTypeTestSetup, 101, n.ID, s2, false, 0, nil, nil, ""))

// Wait for it to be updated
testutil.WaitForResult(func() (bool, error) {
Expand Down
3 changes: 3 additions & 0 deletions nomad/node_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,8 @@ func (n *Node) UpdateDrain(args *structs.NodeUpdateDrainRequest,
}
defer metrics.MeasureSince([]string{"nomad", "client", "update_drain"}, time.Now())

n.logger.Warn("Node.UpdateDrain info", "meta", args.Meta)

// Check node write permissions
if aclObj, err := n.srv.ResolveToken(args.AuthToken); err != nil {
return err
Expand Down Expand Up @@ -801,6 +803,7 @@ func (n *Node) GetNode(args *structs.NodeSpecificRequest,

// Setup the output
if out != nil {
n.logger.Warn("Node.GetNode()", "LastDrain", out.LastDrain)
out = out.Sanitize()
reply.Node = out
reply.Index = out.ModifyIndex
Expand Down
44 changes: 22 additions & 22 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -997,70 +997,70 @@ func (s *StateStore) updateNodeDrainImpl(txn *txn, index uint64, nodeID string,

// Copy the existing node
existingNode := existing.(*structs.Node)
copyNode := existingNode.Copy()
copyNode.StatusUpdatedAt = updatedAt
updatedNode := existingNode.Copy()
updatedNode.StatusUpdatedAt = updatedAt

// Add the event if given
if event != nil {
appendNodeEvents(index, copyNode, []*structs.NodeEvent{event})
appendNodeEvents(index, updatedNode, []*structs.NodeEvent{event})
}

// Update the drain in the copy
copyNode.DrainStrategy = drain
updatedNode.DrainStrategy = drain
if drain != nil {
copyNode.SchedulingEligibility = structs.NodeSchedulingIneligible
updatedNode.SchedulingEligibility = structs.NodeSchedulingIneligible
} else if markEligible {
copyNode.SchedulingEligibility = structs.NodeSchedulingEligible
updatedNode.SchedulingEligibility = structs.NodeSchedulingEligible
}

// Update LastDrain
updateTime := time.Unix(updatedAt, 0)
// when done with this method, copyNode.LastDrain should be set
// when done with this method, updatedNode.LastDrain should be set
// this is either a new LastDrain struct or an update of the existing one
//
// if starting a new drain operation, create a new LastDrain. otherwise, update the existing one.
// if already draining and LastDrain doesn't exist, we'll need to create a new one.
// this might happen if we upgrade/transition to 1.1 during a drain operation
if existingNode.DrainStrategy == nil && copyNode.DrainStrategy != nil ||
copyNode.LastDrain == nil {
if existingNode.DrainStrategy == nil && updatedNode.DrainStrategy != nil ||
updatedNode.LastDrain == nil {

copyNode.LastDrain = &structs.DrainMetadata{
updatedNode.LastDrain = &structs.DrainMetadata{
StartedAt: updateTime,
UpdatedAt: updateTime,
AccessorID: accessorId,
Meta: drainMeta,
}
switch {
case drain != nil:
copyNode.LastDrain.Status = structs.DrainStatusDraining
case updatedNode.DrainStrategy != nil:
updatedNode.LastDrain.Status = structs.DrainStatusDraining
case drainCompleted:
copyNode.LastDrain.Status = structs.DrainStatusCompleted
updatedNode.LastDrain.Status = structs.DrainStatusCompleted
default:
copyNode.LastDrain.Status = structs.DrainStatusCancelled
updatedNode.LastDrain.Status = structs.DrainStatusCancelled
}
} else {
copyNode.LastDrain.UpdatedAt = updateTime
updatedNode.LastDrain.UpdatedAt = updateTime
if accessorId != "" {
// we won't have an accessor ID for drain complete; don't overwrite the existing one
copyNode.LastDrain.AccessorID = accessorId
updatedNode.LastDrain.AccessorID = accessorId
}
if drainMeta != nil {
// similarly, won't have metadata for drain complete; keep the existing operator-provided metadata
copyNode.LastDrain.Meta = drainMeta
updatedNode.LastDrain.Meta = drainMeta
}
if drain == nil {
if updatedNode.DrainStrategy == nil {
if drainCompleted {
copyNode.LastDrain.Status = structs.DrainStatusCompleted
updatedNode.LastDrain.Status = structs.DrainStatusCompleted
} else {
copyNode.LastDrain.Status = structs.DrainStatusCancelled
updatedNode.LastDrain.Status = structs.DrainStatusCancelled
}
}
}

copyNode.ModifyIndex = index
updatedNode.ModifyIndex = index

// Insert the node
if err := txn.Insert("nodes", copyNode); err != nil {
if err := txn.Insert("nodes", updatedNode); err != nil {
return fmt.Errorf("node update failed: %v", err)
}
if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil {
Expand Down

0 comments on commit b86fd05

Please sign in to comment.