Skip to content

Commit

Permalink
removed deprecated fields from Drain structs and API
Browse files Browse the repository at this point in the history
node drain: use msgtype on txn so that events are emitted
wip: encoding extension to add Node.Drain field back to API responses
  • Loading branch information
cgbaker committed Mar 19, 2021
1 parent 7c75696 commit 3a0e029
Show file tree
Hide file tree
Showing 17 changed files with 93 additions and 212 deletions.
22 changes: 7 additions & 15 deletions api/nodes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,7 @@ func TestNodes_ToggleDrain(t *testing.T) {
// Check for drain mode
out, _, err := nodes.Info(nodeID, nil)
require.Nil(err)
if out.Drain {
t.Fatalf("drain mode should be off")
}
require.False(out.Drain)

// Toggle it on
spec := &DrainSpec{
Expand All @@ -221,9 +219,9 @@ func TestNodes_ToggleDrain(t *testing.T) {
// Check again
out, _, err = nodes.Info(nodeID, nil)
require.Nil(err)
if out.SchedulingEligibility != NodeSchedulingIneligible {
t.Fatalf("bad eligibility: %v vs %v", out.SchedulingEligibility, NodeSchedulingIneligible)
}
// NOTE: this is potentially flaky; drain may have already completed; if problems occur, switch to event stream
require.True(out.Drain)
require.Equal(NodeSchedulingIneligible, out.SchedulingEligibility)

// Toggle off again
drainOut, err = nodes.UpdateDrain(nodeID, nil, true, nil)
Expand All @@ -233,15 +231,9 @@ func TestNodes_ToggleDrain(t *testing.T) {
// Check again
out, _, err = nodes.Info(nodeID, nil)
require.Nil(err)
if out.Drain {
t.Fatalf("drain mode should be off")
}
if out.DrainStrategy != nil {
t.Fatalf("drain strategy should be unset")
}
if out.SchedulingEligibility != NodeSchedulingEligible {
t.Fatalf("should be eligible")
}
require.False(out.Drain)
require.Nil(out.DrainStrategy)
require.Equal(NodeSchedulingEligible, out.SchedulingEligibility)
}

func TestNodes_ToggleEligibility(t *testing.T) {
Expand Down
7 changes: 4 additions & 3 deletions command/agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ import (
"github.com/hashicorp/go-connlimit"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-msgpack/codec"
"github.com/rs/cors"

"github.com/hashicorp/nomad/helper/noxssrw"
"github.com/hashicorp/nomad/helper/tlsutil"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/rs/cors"
)

const (
Expand Down Expand Up @@ -494,13 +495,13 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque
if obj != nil {
var buf bytes.Buffer
if prettyPrint {
enc := codec.NewEncoder(&buf, structs.JsonHandlePretty)
enc := codec.NewEncoder(&buf, registerExtensions(structs.JsonHandlePretty))
err = enc.Encode(obj)
if err == nil {
buf.Write([]byte("\n"))
}
} else {
enc := codec.NewEncoder(&buf, structs.JsonHandle)
enc := codec.NewEncoder(&buf, registerExtensions(structs.JsonHandle))
err = enc.Encode(obj)
}
if err != nil {
Expand Down
37 changes: 37 additions & 0 deletions command/agent/json_encoding.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package agent

import (
"reflect"

"github.com/hashicorp/go-msgpack/codec"

"github.com/hashicorp/nomad/nomad/structs"
)

// Special encoding for structs.Node, to perform the following:
// 1. ensure that Node.SecretID is zeroed out
// 2. provide backwards compatibility for the following fields:
// * Node.Drain
type nodesExt struct{}

// ConvertExt converts a structs.Node to an anonymous struct with the extra field, Drain
func (n nodesExt) ConvertExt(v interface{}) interface{} {
node := v.(*structs.Node)
copy := node.Copy()
copy.SecretID = ""
return struct {
*structs.Node
Drain bool
}{
Node: copy,
Drain: node.DrainStrategy != nil,
}
}

// UpdateExt is not used
func (n nodesExt) UpdateExt(_ interface{}, _ interface{}) {}

func registerExtensions(h *codec.JsonHandle) *codec.JsonHandle {
h.SetInterfaceExt(reflect.TypeOf(structs.Node{}), 1, nodesExt{})
return h
}
30 changes: 3 additions & 27 deletions command/agent/node_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ package agent

import (
"net/http"
"strconv"
"strings"
"time"

"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -119,31 +117,9 @@ func (s *HTTPServer) nodeToggleDrain(resp http.ResponseWriter, req *http.Request

var drainRequest api.NodeUpdateDrainRequest

// COMPAT: Remove in 0.10. Allow the old style enable query param.
// Get the enable parameter
enableRaw := req.URL.Query().Get("enable")
var enable bool
if enableRaw != "" {
var err error
enable, err = strconv.ParseBool(enableRaw)
if err != nil {
return nil, CodedError(400, "invalid enable value")
}

// Use the force drain to have it keep the same behavior as old clients.
if enable {
drainRequest.DrainSpec = &api.DrainSpec{
Deadline: -1 * time.Second,
}
} else {
// If drain is disabled on an old client, mark the node as eligible for backwards compatibility
drainRequest.MarkEligible = true
}
} else {
err := decodeBody(req, &drainRequest)
if err != nil {
return nil, CodedError(400, err.Error())
}
err := decodeBody(req, &drainRequest)
if err != nil {
return nil, CodedError(400, err.Error())
}

args := structs.NodeUpdateDrainRequest{
Expand Down
14 changes: 0 additions & 14 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,20 +429,6 @@ func (n *nomadFSM) applyDrainUpdate(reqType structs.MessageType, buf []byte, ind
panic(fmt.Errorf("failed to decode request: %v", err))
}

// COMPAT Remove in version 0.10
// As part of Nomad 0.8 we have deprecated the drain boolean in favor of a
// drain strategy but we need to handle the upgrade path where the Raft log
// contains drain updates with just the drain boolean being manipulated.
if req.Drain && req.DrainStrategy == nil {
// Mark the drain strategy as a force to imitate the old style drain
// functionality.
req.DrainStrategy = &structs.DrainStrategy{
DrainSpec: structs.DrainSpec{
Deadline: -1 * time.Second,
},
}
}

if err := n.state.UpdateNodeDrain(reqType, index, req.NodeID, req.DrainStrategy, req.MarkEligible, req.UpdatedAt, req.NodeEvent); err != nil {
n.logger.Error("UpdateNodeDrain failed", "error", err)
return err
Expand Down
86 changes: 5 additions & 81 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,35 +180,6 @@ func TestFSM_UpsertNode(t *testing.T) {

}

func TestFSM_UpsertNode_Canonicalize(t *testing.T) {
t.Parallel()
require := require.New(t)

fsm := testFSM(t)
fsm.blockedEvals.SetEnabled(true)

// Setup a node without eligibility
node := mock.Node()
node.SchedulingEligibility = ""

req := structs.NodeRegisterRequest{
Node: node,
}
buf, err := structs.Encode(structs.NodeRegisterRequestType, req)
require.Nil(err)

resp := fsm.Apply(makeLog(buf))
require.Nil(resp)

// Verify we are registered
ws := memdb.NewWatchSet()
n, err := fsm.State().NodeByID(ws, req.Node.ID)
require.Nil(err)
require.NotNil(n)
require.EqualValues(1, n.CreateIndex)
require.Equal(structs.NodeSchedulingEligible, n.SchedulingEligibility)
}

func TestFSM_DeregisterNode(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
Expand Down Expand Up @@ -353,7 +324,6 @@ func TestFSM_BatchUpdateNodeDrain(t *testing.T) {
ws := memdb.NewWatchSet()
node, err = fsm.State().NodeByID(ws, req.Node.ID)
require.Nil(err)
require.True(node.Drain)
require.Equal(node.DrainStrategy, strategy)
require.Len(node.Events, 2)
}
Expand Down Expand Up @@ -397,46 +367,10 @@ func TestFSM_UpdateNodeDrain(t *testing.T) {
ws := memdb.NewWatchSet()
node, err = fsm.State().NodeByID(ws, req.Node.ID)
require.Nil(err)
require.True(node.Drain)
require.Equal(node.DrainStrategy, strategy)
require.Len(node.Events, 2)
}

func TestFSM_UpdateNodeDrain_Pre08_Compatibility(t *testing.T) {
t.Parallel()
require := require.New(t)
fsm := testFSM(t)

// Force a node into the state store without eligiblity
node := mock.Node()
node.SchedulingEligibility = ""
require.Nil(fsm.State().UpsertNode(structs.MsgTypeTestSetup, 1, node))

// Do an old style drain
req := structs.NodeUpdateDrainRequest{
NodeID: node.ID,
Drain: true,
}
buf, err := structs.Encode(structs.NodeUpdateDrainRequestType, req)
require.Nil(err)

resp := fsm.Apply(makeLog(buf))
require.Nil(resp)

// Verify we have upgraded to a force drain
ws := memdb.NewWatchSet()
node, err = fsm.State().NodeByID(ws, req.NodeID)
require.Nil(err)
require.True(node.Drain)

expected := &structs.DrainStrategy{
DrainSpec: structs.DrainSpec{
Deadline: -1 * time.Second,
},
}
require.Equal(expected, node.DrainStrategy)
}

func TestFSM_UpdateNodeEligibility(t *testing.T) {
t.Parallel()
require := require.New(t)
Expand Down Expand Up @@ -2495,25 +2429,15 @@ func TestFSM_SnapshotRestore_Nodes(t *testing.T) {
// Add some state
fsm := testFSM(t)
state := fsm.State()
node1 := mock.Node()
state.UpsertNode(structs.MsgTypeTestSetup, 1000, node1)

// Upgrade this node
node2 := mock.Node()
node2.SchedulingEligibility = ""
state.UpsertNode(structs.MsgTypeTestSetup, 1001, node2)
node := mock.Node()
state.UpsertNode(structs.MsgTypeTestSetup, 1000, node)

// Verify the contents
fsm2 := testSnapshotRestore(t, fsm)
state2 := fsm2.State()
out1, _ := state2.NodeByID(nil, node1.ID)
out2, _ := state2.NodeByID(nil, node2.ID)
node2.SchedulingEligibility = structs.NodeSchedulingEligible
if !reflect.DeepEqual(node1, out1) {
t.Fatalf("bad: \n%#v\n%#v", out1, node1)
}
if !reflect.DeepEqual(node2, out2) {
t.Fatalf("bad: \n%#v\n%#v", out2, node2)
out, _ := state2.NodeByID(nil, node.ID)
if !reflect.DeepEqual(node, out) {
t.Fatalf("bad: \n%#v\n%#v", out, node)
}
}

Expand Down
14 changes: 1 addition & 13 deletions nomad/node_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,16 +548,6 @@ func (n *Node) UpdateDrain(args *structs.NodeUpdateDrainRequest,
// Update the timestamp of when the node status was updated
args.UpdatedAt = now.Unix()

// COMPAT: Remove in 0.9. Attempt to upgrade the request if it is of the old
// format.
if args.Drain && args.DrainStrategy == nil {
args.DrainStrategy = &structs.DrainStrategy{
DrainSpec: structs.DrainSpec{
Deadline: -1 * time.Second, // Force drain
},
}
}

// Setup drain strategy
if args.DrainStrategy != nil {
// Mark start time for the drain
Expand Down Expand Up @@ -811,9 +801,7 @@ func (n *Node) GetNode(args *structs.NodeSpecificRequest,

// Setup the output
if out != nil {
// Clear the secret ID
reply.Node = out.Copy()
reply.Node.SecretID = ""
reply.Node = out
reply.Index = out.ModifyIndex
} else {
// Use the last index that affected the nodes table
Expand Down
10 changes: 7 additions & 3 deletions nomad/node_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -903,7 +903,10 @@ func TestClientEndpoint_UpdateDrain(t *testing.T) {
dereg := &structs.NodeUpdateDrainRequest{
NodeID: node.ID,
DrainStrategy: strategy,
WriteRequest: structs.WriteRequest{Region: "global"},
Meta: map[string]string{
"message": "this node looks funny",
},
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp2 structs.NodeDrainUpdateResponse
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", dereg, &resp2))
Expand All @@ -914,7 +917,7 @@ func TestClientEndpoint_UpdateDrain(t *testing.T) {
ws := memdb.NewWatchSet()
out, err := state.NodeByID(ws, node.ID)
require.Nil(err)
require.True(out.Drain)
require.NotNil(out.DrainStrategy)
require.Equal(strategy.Deadline, out.DrainStrategy.Deadline)
require.Len(out.Events, 2)
require.Equal(NodeDrainEventDrainSet, out.Events[1].Message)
Expand Down Expand Up @@ -1006,6 +1009,8 @@ func TestClientEndpoint_UpdateDrain_ACL(t *testing.T) {
{
var resp structs.NodeDrainUpdateResponse
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", dereg, &resp), "RPC")
_, err := state.NodeByID(nil, node.ID)
require.NoError(err)
}

// Try with a invalid token
Expand Down Expand Up @@ -1314,7 +1319,6 @@ func TestClientEndpoint_GetNode(t *testing.T) {

// Update the status updated at value
node.StatusUpdatedAt = resp2.Node.StatusUpdatedAt
node.SecretID = ""
node.Events = resp2.Node.Events
if !reflect.DeepEqual(node, resp2.Node) {
t.Fatalf("bad: %#v \n %#v", node, resp2.Node)
Expand Down
2 changes: 1 addition & 1 deletion nomad/plan_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ func evaluateNodePlan(snap *state.StateSnapshot, plan *structs.Plan, nodeID stri
return false, "node is not ready for placements", nil
} else if node.SchedulingEligibility == structs.NodeSchedulingIneligible {
return false, "node is not eligible for draining", nil
} else if node.Drain {
} else if node.DrainStrategy != nil {
// Deprecate in favor of scheduling eligibility and remove post-0.8
return false, "node is draining", nil
}
Expand Down
7 changes: 6 additions & 1 deletion nomad/plan_apply_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,12 @@ func TestPlanApply_EvalNodePlan_NodeDrain(t *testing.T) {
t.Parallel()
state := testStateStore(t)
node := mock.Node()
node.Drain = true
node.DrainStrategy = &structs.DrainStrategy{
DrainSpec: structs.DrainSpec{
Deadline: 0,
IgnoreSystemJobs: false,
},
}
state.UpsertNode(structs.MsgTypeTestSetup, 1000, node)
snap, _ := state.Snapshot()

Expand Down
Loading

0 comments on commit 3a0e029

Please sign in to comment.