From 93d5187e7be4ed9b8f3e1dcb3104d0cab9cbb7cb Mon Sep 17 00:00:00 2001 From: Chris Baker <1675087+cgbaker@users.noreply.github.com> Date: Thu, 11 Feb 2021 15:40:59 +0000 Subject: [PATCH 01/15] removed deprecated fields from Drain structs and API node drain: use msgtype on txn so that events are emitted wip: encoding extension to add Node.Drain field back to API responses new approach for hiding Node.SecretID in the API, using `json` tag documented this approach in the contributing guide refactored the JSON handlers with extensions modified event stream encoding to use the go-msgpack encoders with the extensions --- api/nodes_test.go | 22 +++----- command/agent/http.go | 5 +- command/agent/node_endpoint.go | 30 +--------- command/agent/node_endpoint_test.go | 7 +-- contributing/checklist-jobspec.md | 4 +- nomad/fsm.go | 14 ----- nomad/fsm_test.go | 86 ++--------------------------- nomad/mock/mock.go | 8 +++ nomad/node_endpoint.go | 14 +---- nomad/node_endpoint_test.go | 3 +- nomad/plan_apply.go | 2 +- nomad/plan_apply_test.go | 7 ++- nomad/state/events.go | 16 ++---- nomad/state/events_test.go | 10 ++-- nomad/state/state_store.go | 8 +-- nomad/state/state_store_test.go | 3 - nomad/stream/ndjson.go | 10 +++- nomad/structs/json_encoding.go | 36 ++++++++++++ nomad/structs/structs.go | 51 +++++++---------- nomad/structs/structs_test.go | 6 +- scheduler/generic_sched_test.go | 23 +++----- scheduler/reconcile_test.go | 16 ++---- scheduler/reconcile_util_test.go | 5 +- scheduler/system_sched_test.go | 9 +-- scheduler/util.go | 4 +- scheduler/util_test.go | 12 ++-- 26 files changed, 147 insertions(+), 264 deletions(-) create mode 100644 nomad/structs/json_encoding.go diff --git a/api/nodes_test.go b/api/nodes_test.go index a1d41cda2d11..1b3835e662e9 100644 --- a/api/nodes_test.go +++ b/api/nodes_test.go @@ -206,9 +206,7 @@ func TestNodes_ToggleDrain(t *testing.T) { // Check for drain mode out, _, err := nodes.Info(nodeID, nil) require.Nil(err) - if out.Drain { - t.Fatalf("drain mode should be off") - } + require.False(out.Drain) // Toggle it on spec := &DrainSpec{ @@ -221,9 +219,9 @@ func TestNodes_ToggleDrain(t *testing.T) { // Check again out, _, err = nodes.Info(nodeID, nil) require.Nil(err) - if out.SchedulingEligibility != NodeSchedulingIneligible { - t.Fatalf("bad eligibility: %v vs %v", out.SchedulingEligibility, NodeSchedulingIneligible) - } + // NOTE: this is potentially flaky; drain may have already completed; if problems occur, switch to event stream + require.True(out.Drain) + require.Equal(NodeSchedulingIneligible, out.SchedulingEligibility) // Toggle off again drainOut, err = nodes.UpdateDrain(nodeID, nil, true, nil) @@ -233,15 +231,9 @@ func TestNodes_ToggleDrain(t *testing.T) { // Check again out, _, err = nodes.Info(nodeID, nil) require.Nil(err) - if out.Drain { - t.Fatalf("drain mode should be off") - } - if out.DrainStrategy != nil { - t.Fatalf("drain strategy should be unset") - } - if out.SchedulingEligibility != NodeSchedulingEligible { - t.Fatalf("should be eligible") - } + require.False(out.Drain) + require.Nil(out.DrainStrategy) + require.Equal(NodeSchedulingEligible, out.SchedulingEligibility) } func TestNodes_ToggleEligibility(t *testing.T) { diff --git a/command/agent/http.go b/command/agent/http.go index 8853a0e814e8..320a4518a233 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -20,10 +20,11 @@ import ( "github.com/hashicorp/go-connlimit" log "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-msgpack/codec" + "github.com/rs/cors" + "github.com/hashicorp/nomad/helper/noxssrw" "github.com/hashicorp/nomad/helper/tlsutil" "github.com/hashicorp/nomad/nomad/structs" - "github.com/rs/cors" ) const ( @@ -500,7 +501,7 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque buf.Write([]byte("\n")) } } else { - enc := codec.NewEncoder(&buf, structs.JsonHandle) + enc := codec.NewEncoder(&buf, structs.JsonHandleWithExtensions) err = enc.Encode(obj) } if err != nil { diff --git a/command/agent/node_endpoint.go b/command/agent/node_endpoint.go index 6498151a9422..c5ff81d8156c 100644 --- a/command/agent/node_endpoint.go +++ b/command/agent/node_endpoint.go @@ -2,9 +2,7 @@ package agent import ( "net/http" - "strconv" "strings" - "time" "github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/nomad/structs" @@ -119,31 +117,9 @@ func (s *HTTPServer) nodeToggleDrain(resp http.ResponseWriter, req *http.Request var drainRequest api.NodeUpdateDrainRequest - // COMPAT: Remove in 0.10. Allow the old style enable query param. - // Get the enable parameter - enableRaw := req.URL.Query().Get("enable") - var enable bool - if enableRaw != "" { - var err error - enable, err = strconv.ParseBool(enableRaw) - if err != nil { - return nil, CodedError(400, "invalid enable value") - } - - // Use the force drain to have it keep the same behavior as old clients. - if enable { - drainRequest.DrainSpec = &api.DrainSpec{ - Deadline: -1 * time.Second, - } - } else { - // If drain is disabled on an old client, mark the node as eligible for backwards compatibility - drainRequest.MarkEligible = true - } - } else { - err := decodeBody(req, &drainRequest) - if err != nil { - return nil, CodedError(400, err.Error()) - } + err := decodeBody(req, &drainRequest) + if err != nil { + return nil, CodedError(400, err.Error()) } args := structs.NodeUpdateDrainRequest{ diff --git a/command/agent/node_endpoint_test.go b/command/agent/node_endpoint_test.go index ecdd74048acd..bfb52e81c065 100644 --- a/command/agent/node_endpoint_test.go +++ b/command/agent/node_endpoint_test.go @@ -284,11 +284,9 @@ func TestHTTP_NodeDrain(t *testing.T) { out, err := state.NodeByID(nil, node.ID) require.Nil(err) - // the node must either be in drain mode or in elligible + // the node must either be in drain mode or ineligible // once the node is recognize as not having any running allocs - if out.Drain { - require.True(out.Drain) - require.NotNil(out.DrainStrategy) + if out.DrainStrategy != nil { require.Equal(10*time.Second, out.DrainStrategy.Deadline) } else { require.Equal(structs.NodeSchedulingIneligible, out.SchedulingEligibility) @@ -307,7 +305,6 @@ func TestHTTP_NodeDrain(t *testing.T) { out, err = state.NodeByID(nil, node.ID) require.Nil(err) - require.False(out.Drain) require.Nil(out.DrainStrategy) }) } diff --git a/contributing/checklist-jobspec.md b/contributing/checklist-jobspec.md index 146afdb396c0..75c951cf4695 100644 --- a/contributing/checklist-jobspec.md +++ b/contributing/checklist-jobspec.md @@ -15,10 +15,12 @@ * Implement and test other logical methods * [ ] Add conversion between `api/` and `nomad/structs` in `command/agent/job_endpoint.go` * Add test for conversion + * msgpack [encoding](http://ugorji.net/blog/go-codec-primer#drop-in-replacement-for-encoding-json-json-key-in-struct-tag-supported) only uses the [`codec` tag](https://github.com/hashicorp/nomad/blob/v1.0.0/nomad/structs/structs.go#L10557-L10558); + the `json` tag is available for customizing API output when encoding `structs` objects * [ ] Implement diff logic for new structs/fields in `nomad/structs/diff.go` * Note that fields must be listed in alphabetical order in `FieldDiff` slices in `nomad/structs/diff_test.go` * Add test for diff of new structs/fields -* [ ] Add change detection for new structs/feilds in `scheduler/util.go/tasksUpdated` +* [ ] Add change detection for new structs/fields in `scheduler/util.go/tasksUpdated` * Might be covered by `.Equals` but might not be, check. * Should return true if the task must be replaced as a result of the change. diff --git a/nomad/fsm.go b/nomad/fsm.go index 17c427a01a04..e3f6474cf2db 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -429,20 +429,6 @@ func (n *nomadFSM) applyDrainUpdate(reqType structs.MessageType, buf []byte, ind panic(fmt.Errorf("failed to decode request: %v", err)) } - // COMPAT Remove in version 0.10 - // As part of Nomad 0.8 we have deprecated the drain boolean in favor of a - // drain strategy but we need to handle the upgrade path where the Raft log - // contains drain updates with just the drain boolean being manipulated. - if req.Drain && req.DrainStrategy == nil { - // Mark the drain strategy as a force to imitate the old style drain - // functionality. - req.DrainStrategy = &structs.DrainStrategy{ - DrainSpec: structs.DrainSpec{ - Deadline: -1 * time.Second, - }, - } - } - if err := n.state.UpdateNodeDrain(reqType, index, req.NodeID, req.DrainStrategy, req.MarkEligible, req.UpdatedAt, req.NodeEvent); err != nil { n.logger.Error("UpdateNodeDrain failed", "error", err) return err diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index b4c73d5c0f1e..f4a1949dd55e 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -180,35 +180,6 @@ func TestFSM_UpsertNode(t *testing.T) { } -func TestFSM_UpsertNode_Canonicalize(t *testing.T) { - t.Parallel() - require := require.New(t) - - fsm := testFSM(t) - fsm.blockedEvals.SetEnabled(true) - - // Setup a node without eligibility - node := mock.Node() - node.SchedulingEligibility = "" - - req := structs.NodeRegisterRequest{ - Node: node, - } - buf, err := structs.Encode(structs.NodeRegisterRequestType, req) - require.Nil(err) - - resp := fsm.Apply(makeLog(buf)) - require.Nil(resp) - - // Verify we are registered - ws := memdb.NewWatchSet() - n, err := fsm.State().NodeByID(ws, req.Node.ID) - require.Nil(err) - require.NotNil(n) - require.EqualValues(1, n.CreateIndex) - require.Equal(structs.NodeSchedulingEligible, n.SchedulingEligibility) -} - func TestFSM_DeregisterNode(t *testing.T) { t.Parallel() fsm := testFSM(t) @@ -353,7 +324,6 @@ func TestFSM_BatchUpdateNodeDrain(t *testing.T) { ws := memdb.NewWatchSet() node, err = fsm.State().NodeByID(ws, req.Node.ID) require.Nil(err) - require.True(node.Drain) require.Equal(node.DrainStrategy, strategy) require.Len(node.Events, 2) } @@ -397,46 +367,10 @@ func TestFSM_UpdateNodeDrain(t *testing.T) { ws := memdb.NewWatchSet() node, err = fsm.State().NodeByID(ws, req.Node.ID) require.Nil(err) - require.True(node.Drain) require.Equal(node.DrainStrategy, strategy) require.Len(node.Events, 2) } -func TestFSM_UpdateNodeDrain_Pre08_Compatibility(t *testing.T) { - t.Parallel() - require := require.New(t) - fsm := testFSM(t) - - // Force a node into the state store without eligiblity - node := mock.Node() - node.SchedulingEligibility = "" - require.Nil(fsm.State().UpsertNode(structs.MsgTypeTestSetup, 1, node)) - - // Do an old style drain - req := structs.NodeUpdateDrainRequest{ - NodeID: node.ID, - Drain: true, - } - buf, err := structs.Encode(structs.NodeUpdateDrainRequestType, req) - require.Nil(err) - - resp := fsm.Apply(makeLog(buf)) - require.Nil(resp) - - // Verify we have upgraded to a force drain - ws := memdb.NewWatchSet() - node, err = fsm.State().NodeByID(ws, req.NodeID) - require.Nil(err) - require.True(node.Drain) - - expected := &structs.DrainStrategy{ - DrainSpec: structs.DrainSpec{ - Deadline: -1 * time.Second, - }, - } - require.Equal(expected, node.DrainStrategy) -} - func TestFSM_UpdateNodeEligibility(t *testing.T) { t.Parallel() require := require.New(t) @@ -2495,25 +2429,15 @@ func TestFSM_SnapshotRestore_Nodes(t *testing.T) { // Add some state fsm := testFSM(t) state := fsm.State() - node1 := mock.Node() - state.UpsertNode(structs.MsgTypeTestSetup, 1000, node1) - - // Upgrade this node - node2 := mock.Node() - node2.SchedulingEligibility = "" - state.UpsertNode(structs.MsgTypeTestSetup, 1001, node2) + node := mock.Node() + state.UpsertNode(structs.MsgTypeTestSetup, 1000, node) // Verify the contents fsm2 := testSnapshotRestore(t, fsm) state2 := fsm2.State() - out1, _ := state2.NodeByID(nil, node1.ID) - out2, _ := state2.NodeByID(nil, node2.ID) - node2.SchedulingEligibility = structs.NodeSchedulingEligible - if !reflect.DeepEqual(node1, out1) { - t.Fatalf("bad: \n%#v\n%#v", out1, node1) - } - if !reflect.DeepEqual(node2, out2) { - t.Fatalf("bad: \n%#v\n%#v", out2, node2) + out, _ := state2.NodeByID(nil, node.ID) + if !reflect.DeepEqual(node, out) { + t.Fatalf("bad: \n%#v\n%#v", out, node) } } diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index a3fe090542cc..53cb91241433 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -118,6 +118,14 @@ func Node() *structs.Node { return node } +func DrainNode() *structs.Node { + node := Node() + node.DrainStrategy = &structs.DrainStrategy{ + DrainSpec: structs.DrainSpec{}, + } + return node +} + // NvidiaNode returns a node with two instances of an Nvidia GPU func NvidiaNode() *structs.Node { n := Node() diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 36a18a26f640..08907a307ebf 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -548,16 +548,6 @@ func (n *Node) UpdateDrain(args *structs.NodeUpdateDrainRequest, // Update the timestamp of when the node status was updated args.UpdatedAt = now.Unix() - // COMPAT: Remove in 0.9. Attempt to upgrade the request if it is of the old - // format. - if args.Drain && args.DrainStrategy == nil { - args.DrainStrategy = &structs.DrainStrategy{ - DrainSpec: structs.DrainSpec{ - Deadline: -1 * time.Second, // Force drain - }, - } - } - // Setup drain strategy if args.DrainStrategy != nil { // Mark start time for the drain @@ -811,9 +801,7 @@ func (n *Node) GetNode(args *structs.NodeSpecificRequest, // Setup the output if out != nil { - // Clear the secret ID - reply.Node = out.Copy() - reply.Node.SecretID = "" + reply.Node = out reply.Index = out.ModifyIndex } else { // Use the last index that affected the nodes table diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index fbc82d8b7a0b..ebcacf98ea99 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -914,7 +914,7 @@ func TestClientEndpoint_UpdateDrain(t *testing.T) { ws := memdb.NewWatchSet() out, err := state.NodeByID(ws, node.ID) require.Nil(err) - require.True(out.Drain) + require.NotNil(out.DrainStrategy) require.Equal(strategy.Deadline, out.DrainStrategy.Deadline) require.Len(out.Events, 2) require.Equal(NodeDrainEventDrainSet, out.Events[1].Message) @@ -1314,7 +1314,6 @@ func TestClientEndpoint_GetNode(t *testing.T) { // Update the status updated at value node.StatusUpdatedAt = resp2.Node.StatusUpdatedAt - node.SecretID = "" node.Events = resp2.Node.Events if !reflect.DeepEqual(node, resp2.Node) { t.Fatalf("bad: %#v \n %#v", node, resp2.Node) diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go index f7a3f19fe741..d8c93b2be487 100644 --- a/nomad/plan_apply.go +++ b/nomad/plan_apply.go @@ -648,7 +648,7 @@ func evaluateNodePlan(snap *state.StateSnapshot, plan *structs.Plan, nodeID stri return false, "node is not ready for placements", nil } else if node.SchedulingEligibility == structs.NodeSchedulingIneligible { return false, "node is not eligible for draining", nil - } else if node.Drain { + } else if node.DrainStrategy != nil { // Deprecate in favor of scheduling eligibility and remove post-0.8 return false, "node is draining", nil } diff --git a/nomad/plan_apply_test.go b/nomad/plan_apply_test.go index 30036237be8b..7c69d150bd17 100644 --- a/nomad/plan_apply_test.go +++ b/nomad/plan_apply_test.go @@ -715,7 +715,12 @@ func TestPlanApply_EvalNodePlan_NodeDrain(t *testing.T) { t.Parallel() state := testStateStore(t) node := mock.Node() - node.Drain = true + node.DrainStrategy = &structs.DrainStrategy{ + DrainSpec: structs.DrainSpec{ + Deadline: 0, + IgnoreSystemJobs: false, + }, + } state.UpsertNode(structs.MsgTypeTestSetup, 1000, node) snap, _ := state.Snapshot() diff --git a/nomad/state/events.go b/nomad/state/events.go index ab4a086f598a..626bc4f40aea 100644 --- a/nomad/state/events.go +++ b/nomad/state/events.go @@ -80,15 +80,11 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { return structs.Event{}, false } - // Node secret ID should not be included - node := before.Copy() - node.SecretID = "" - return structs.Event{ Topic: structs.TopicNode, - Key: node.ID, + Key: before.ID, Payload: &structs.NodeStreamEvent{ - Node: node, + Node: before, }, }, true } @@ -179,15 +175,11 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { return structs.Event{}, false } - // Node secret ID should not be included - node := after.Copy() - node.SecretID = "" - return structs.Event{ Topic: structs.TopicNode, - Key: node.ID, + Key: after.ID, Payload: &structs.NodeStreamEvent{ - Node: node, + Node: after, }, }, true case "deployment": diff --git a/nomad/state/events_test.go b/nomad/state/events_test.go index 3eba439d9017..13271257980c 100644 --- a/nomad/state/events_test.go +++ b/nomad/state/events_test.go @@ -120,9 +120,10 @@ func TestEventFromChange_NodeSecretID(t *testing.T) { out := eventsFromChanges(s.db.ReadTxn(), changes) require.Len(t, out.Events, 1) - nodeEvent, ok := out.Events[0].Payload.(*structs.NodeStreamEvent) + _, ok := out.Events[0].Payload.(*structs.NodeStreamEvent) require.True(t, ok) - require.Empty(t, nodeEvent.Node.SecretID) + // TODO: cgbaker: do we really want to remove this check? + // require.Empty(t, nodeEvent.Node.SecretID) // Delete changes = Changes{ @@ -140,9 +141,10 @@ func TestEventFromChange_NodeSecretID(t *testing.T) { out2 := eventsFromChanges(s.db.ReadTxn(), changes) require.Len(t, out2.Events, 1) - nodeEvent2, ok := out2.Events[0].Payload.(*structs.NodeStreamEvent) + _, ok = out2.Events[0].Payload.(*structs.NodeStreamEvent) require.True(t, ok) - require.Empty(t, nodeEvent2.Node.SecretID) + // TODO: cgbaker: do we really want to remove this check? + // require.Empty(t, nodeEvent2.Node.SecretID) } func TestEventsFromChanges_DeploymentUpdate(t *testing.T) { diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index d85467edd61f..3132673ce974 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -832,7 +832,6 @@ func upsertNodeTxn(txn *txn, index uint64, node *structs.Node) error { SetTimestamp(time.Unix(node.StatusUpdatedAt, 0))}) } - node.Drain = exist.Drain // Retain the drain mode node.SchedulingEligibility = exist.SchedulingEligibility // Retain the eligibility node.DrainStrategy = exist.DrainStrategy // Retain the drain strategy } else { @@ -951,7 +950,8 @@ func (s *StateStore) updateNodeStatusTxn(txn *txn, nodeID, status string, update return nil } -// BatchUpdateNodeDrain is used to update the drain of a node set of nodes +// BatchUpdateNodeDrain is used to update the drain of a node set of nodes. +// This is only called when node drain is completed by the drainer. func (s *StateStore) BatchUpdateNodeDrain(msgType structs.MessageType, index uint64, updatedAt int64, updates map[string]*structs.DrainUpdate, events map[string]*structs.NodeEvent) error { txn := s.db.WriteTxnMsgT(msgType, index) defer txn.Abort() @@ -966,9 +966,10 @@ func (s *StateStore) BatchUpdateNodeDrain(msgType structs.MessageType, index uin // UpdateNodeDrain is used to update the drain of a node func (s *StateStore) UpdateNodeDrain(msgType structs.MessageType, index uint64, nodeID string, drain *structs.DrainStrategy, markEligible bool, updatedAt int64, event *structs.NodeEvent) error { - txn := s.db.WriteTxn(index) + txn := s.db.WriteTxnMsgT(msgType, index) defer txn.Abort() if err := s.updateNodeDrainImpl(txn, index, nodeID, drain, markEligible, updatedAt, event); err != nil { + return err } return txn.Commit() @@ -997,7 +998,6 @@ func (s *StateStore) updateNodeDrainImpl(txn *txn, index uint64, nodeID string, } // Update the drain in the copy - copyNode.Drain = drain != nil // COMPAT: Remove in Nomad 0.10 copyNode.DrainStrategy = drain if drain != nil { copyNode.SchedulingEligibility = structs.NodeSchedulingIneligible diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index fecd90ff2b09..f645a2788245 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -963,7 +963,6 @@ func TestStateStore_BatchUpdateNodeDrain(t *testing.T) { for _, id := range []string{n1.ID, n2.ID} { out, err := state.NodeByID(ws, id) require.Nil(err) - require.True(out.Drain) require.NotNil(out.DrainStrategy) require.Equal(out.DrainStrategy, expectedDrain) require.Len(out.Events, 2) @@ -1008,7 +1007,6 @@ func TestStateStore_UpdateNodeDrain_Node(t *testing.T) { ws = memdb.NewWatchSet() out, err := state.NodeByID(ws, node.ID) require.Nil(err) - require.True(out.Drain) require.NotNil(out.DrainStrategy) require.Equal(out.DrainStrategy, expectedDrain) require.Len(out.Events, 2) @@ -1152,7 +1150,6 @@ func TestStateStore_UpdateNodeDrain_ResetEligiblity(t *testing.T) { ws = memdb.NewWatchSet() out, err := state.NodeByID(ws, node.ID) require.Nil(err) - require.False(out.Drain) require.Nil(out.DrainStrategy) require.Equal(out.SchedulingEligibility, structs.NodeSchedulingEligible) require.Len(out.Events, 3) diff --git a/nomad/stream/ndjson.go b/nomad/stream/ndjson.go index 7e7ad0928104..ec69a6c1cb99 100644 --- a/nomad/stream/ndjson.go +++ b/nomad/stream/ndjson.go @@ -1,11 +1,13 @@ package stream import ( + "bytes" "context" - "encoding/json" "fmt" "time" + "github.com/hashicorp/go-msgpack/codec" + "github.com/hashicorp/nomad/nomad/structs" ) @@ -71,7 +73,9 @@ func (n *JsonStream) Send(v interface{}) error { return n.ctx.Err() } - buf, err := json.Marshal(v) + var buf bytes.Buffer + enc := codec.NewEncoder(&buf, structs.JsonHandleWithExtensions) + err := enc.Encode(v) if err != nil { return fmt.Errorf("error marshaling json for stream: %w", err) } @@ -79,7 +83,7 @@ func (n *JsonStream) Send(v interface{}) error { select { case <-n.ctx.Done(): return fmt.Errorf("error stream is no longer running: %w", err) - case n.outCh <- &structs.EventJson{Data: buf}: + case n.outCh <- &structs.EventJson{Data: buf.Bytes()}: } return nil diff --git a/nomad/structs/json_encoding.go b/nomad/structs/json_encoding.go new file mode 100644 index 000000000000..3f1a6e2b0674 --- /dev/null +++ b/nomad/structs/json_encoding.go @@ -0,0 +1,36 @@ +package structs + +import ( + "reflect" + + "github.com/hashicorp/go-msgpack/codec" +) + +// Special encoding for structs.Node, to perform the following: +// 1. provide backwards compatibility for the following fields: +// * Node.Drain +type nodeExt struct{} + +// ConvertExt converts a structs.Node to a struct with the extra field, Drain +func (n nodeExt) ConvertExt(v interface{}) interface{} { + node := v.(*Node) + if node == nil { + return nil + } + type NodeAlias Node + return &struct { + *NodeAlias + Drain bool + }{ + NodeAlias: (*NodeAlias)(node), + Drain: node.DrainStrategy != nil, + } +} + +// UpdateExt is not used +func (n nodeExt) UpdateExt(_ interface{}, _ interface{}) {} + +func RegisterJSONEncodingExtensions(h *codec.JsonHandle) *codec.JsonHandle { + h.SetInterfaceExt(reflect.TypeOf(Node{}), 1, nodeExt{}) + return h +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index b1e1a1961782..02d6a2648baf 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -508,12 +508,6 @@ type NodeUpdateDrainRequest struct { NodeID string DrainStrategy *DrainStrategy - // COMPAT Remove in version 0.10 - // As part of Nomad 0.8 we have deprecated the drain boolean in favor of a - // drain strategy but we need to handle the upgrade path where the Raft log - // contains drain updates with just the drain boolean being manipulated. - Drain bool - // MarkEligible marks the node as eligible if removing the drain strategy. MarkEligible bool @@ -1817,7 +1811,7 @@ type Node struct { // SecretID is an ID that is only known by the Node and the set of Servers. // It is not accessible via the API and is used to authenticate nodes // conducting privileged activities. - SecretID string + SecretID string `json:"-"` // Datacenter for this node Datacenter string @@ -1875,15 +1869,7 @@ type Node struct { // attributes and capabilities. ComputedClass string - // COMPAT: Remove in Nomad 0.9 - // Drain is controlled by the servers, and not the client. - // If true, no jobs will be scheduled to this node, and existing - // allocations will be drained. Superseded by DrainStrategy in Nomad - // 0.8 but kept for backward compat. - Drain bool - - // DrainStrategy determines the node's draining behavior. Will be nil - // when Drain=false. + // DrainStrategy determines the node's draining behavior. DrainStrategy *DrainStrategy // SchedulingEligibility determines whether this node will receive new @@ -1922,8 +1908,7 @@ type Node struct { // Ready returns true if the node is ready for running allocations func (n *Node) Ready() bool { - // Drain is checked directly to support pre-0.8 Node data - return n.Status == NodeStatusReady && !n.Drain && n.SchedulingEligibility == NodeSchedulingEligible + return n.Status == NodeStatusReady && n.DrainStrategy == nil && n.SchedulingEligibility == NodeSchedulingEligible } func (n *Node) Canonicalize() { @@ -1931,17 +1916,6 @@ func (n *Node) Canonicalize() { return } - // COMPAT Remove in 0.10 - // In v0.8.0 we introduced scheduling eligibility, so we need to set it for - // upgrading nodes - if n.SchedulingEligibility == "" { - if n.Drain { - n.SchedulingEligibility = NodeSchedulingIneligible - } else { - n.SchedulingEligibility = NodeSchedulingEligible - } - } - // COMPAT remove in 1.0 // In v0.12.0 we introduced a separate node specific network resource struct // so we need to covert any pre 0.12 clients to the correct struct @@ -1965,6 +1939,14 @@ func (n *Node) Canonicalize() { } } } + + if n.SchedulingEligibility == "" { + if n.DrainStrategy != nil { + n.SchedulingEligibility = NodeSchedulingIneligible + } else { + n.SchedulingEligibility = NodeSchedulingEligible + } + } } func (n *Node) Copy() *Node { @@ -2128,7 +2110,7 @@ func (n *Node) Stub(fields *NodeStubFields) *NodeListStub { Name: n.Name, NodeClass: n.NodeClass, Version: n.Attributes["nomad.version"], - Drain: n.Drain, + Drain: n.DrainStrategy != nil, SchedulingEligibility: n.SchedulingEligibility, Status: n.Status, StatusDescription: n.StatusDescription, @@ -10602,13 +10584,18 @@ var MsgpackHandle = func() *codec.MsgpackHandle { var ( // JsonHandle and JsonHandlePretty are the codec handles to JSON encode // structs. The pretty handle will add indents for easier human consumption. + // JsonHandleWithExtensions and JsonHandlePretty include extensions for + // encoding structs objects with API-specific fields JsonHandle = &codec.JsonHandle{ HTMLCharsAsIs: true, } - JsonHandlePretty = &codec.JsonHandle{ + JsonHandleWithExtensions = RegisterJSONEncodingExtensions(&codec.JsonHandle{ + HTMLCharsAsIs: true, + }) + JsonHandlePretty = RegisterJSONEncodingExtensions(&codec.JsonHandle{ HTMLCharsAsIs: true, Indent: 4, - } + }) ) // Decode is used to decode a MsgPack encoded object diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 09a411f24614..cd247236f42d 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -5483,7 +5483,11 @@ func TestNode_Canonicalize(t *testing.T) { require.Equal(NodeSchedulingEligible, node.SchedulingEligibility) node = &Node{ - Drain: true, + DrainStrategy: &DrainStrategy{ + DrainSpec: DrainSpec{ + Deadline: 30000, + }, + }, } node.Canonicalize() require.Equal(NodeSchedulingIneligible, node.SchedulingEligibility) diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index 7c5f7cd7a526..2035b88df919 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -2996,8 +2996,7 @@ func TestServiceSched_NodeDrain(t *testing.T) { h := NewHarness(t) // Register a draining node - node := mock.Node() - node.Drain = true + node := mock.DrainNode() require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) // Create some nodes @@ -3078,8 +3077,7 @@ func TestServiceSched_NodeDrain_Down(t *testing.T) { h := NewHarness(t) // Register a draining node - node := mock.Node() - node.Drain = true + node := mock.DrainNode() node.Status = structs.NodeStatusDown require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) @@ -3211,7 +3209,7 @@ func TestServiceSched_NodeDrain_Queued_Allocations(t *testing.T) { } require.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), allocs)) - node.Drain = true + node.DrainStrategy = mock.DrainNode().DrainStrategy require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) // Create a mock evaluation to deal with drain @@ -4064,8 +4062,7 @@ func TestBatchSched_Run_LostAlloc(t *testing.T) { func TestBatchSched_Run_FailedAllocQueuedAllocations(t *testing.T) { h := NewHarness(t) - node := mock.Node() - node.Drain = true + node := mock.DrainNode() require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) // Create a job @@ -4119,8 +4116,7 @@ func TestBatchSched_ReRun_SuccessfullyFinishedAlloc(t *testing.T) { // Create two nodes, one that is drained and has a successfully finished // alloc and a fresh undrained one - node := mock.Node() - node.Drain = true + node := mock.DrainNode() node2 := mock.Node() require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node2)) @@ -4329,8 +4325,7 @@ func TestBatchSched_NodeDrain_Running_OldJob(t *testing.T) { // Create two nodes, one that is drained and has a successfully finished // alloc and a fresh undrained one - node := mock.Node() - node.Drain = true + node := mock.DrainNode() node2 := mock.Node() require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node2)) @@ -4401,8 +4396,7 @@ func TestBatchSched_NodeDrain_Complete(t *testing.T) { // Create two nodes, one that is drained and has a successfully finished // alloc and a fresh undrained one - node := mock.Node() - node.Drain = true + node := mock.DrainNode() node2 := mock.Node() require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node2)) @@ -4754,8 +4748,7 @@ func TestServiceSched_NodeDrain_Sticky(t *testing.T) { h := NewHarness(t) // Register a draining node - node := mock.Node() - node.Drain = true + node := mock.DrainNode() require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) // Create an alloc on the draining node diff --git a/scheduler/reconcile_test.go b/scheduler/reconcile_test.go index 0366ebb539cf..b0698b0bf9b4 100644 --- a/scheduler/reconcile_test.go +++ b/scheduler/reconcile_test.go @@ -885,10 +885,9 @@ func TestReconciler_DrainNode(t *testing.T) { // Build a map of tainted nodes tainted := make(map[string]*structs.Node, 2) for i := 0; i < 2; i++ { - n := mock.Node() + n := mock.DrainNode() n.ID = allocs[i].NodeID allocs[i].DesiredTransition.Migrate = helper.BoolToPtr(true) - n.Drain = true tainted[n.ID] = n } @@ -938,10 +937,9 @@ func TestReconciler_DrainNode_ScaleUp(t *testing.T) { // Build a map of tainted nodes tainted := make(map[string]*structs.Node, 2) for i := 0; i < 2; i++ { - n := mock.Node() + n := mock.DrainNode() n.ID = allocs[i].NodeID allocs[i].DesiredTransition.Migrate = helper.BoolToPtr(true) - n.Drain = true tainted[n.ID] = n } @@ -992,10 +990,9 @@ func TestReconciler_DrainNode_ScaleDown(t *testing.T) { // Build a map of tainted nodes tainted := make(map[string]*structs.Node, 3) for i := 0; i < 3; i++ { - n := mock.Node() + n := mock.DrainNode() n.ID = allocs[i].NodeID allocs[i].DesiredTransition.Migrate = helper.BoolToPtr(true) - n.Drain = true tainted[n.ID] = n } @@ -2994,10 +2991,9 @@ func TestReconciler_DrainNode_Canary(t *testing.T) { // Build a map of tainted nodes that contains the last canary tainted := make(map[string]*structs.Node, 1) - n := mock.Node() + n := mock.DrainNode() n.ID = allocs[11].NodeID allocs[11].DesiredTransition.Migrate = helper.BoolToPtr(true) - n.Drain = true tainted[n.ID] = n mockUpdateFn := allocUpdateFnMock(handled, allocUpdateFnDestructive) @@ -3785,7 +3781,7 @@ func TestReconciler_TaintedNode_RollingUpgrade(t *testing.T) { if i == 0 { n.Status = structs.NodeStatusDown } else { - n.Drain = true + n.DrainStrategy = mock.DrainNode().DrainStrategy allocs[2+i].DesiredTransition.Migrate = helper.BoolToPtr(true) } tainted[n.ID] = n @@ -3870,7 +3866,7 @@ func TestReconciler_FailedDeployment_TaintedNodes(t *testing.T) { if i == 0 { n.Status = structs.NodeStatusDown } else { - n.Drain = true + n.DrainStrategy = mock.DrainNode().DrainStrategy allocs[6+i].DesiredTransition.Migrate = helper.BoolToPtr(true) } tainted[n.ID] = n diff --git a/scheduler/reconcile_util_test.go b/scheduler/reconcile_util_test.go index 6fb1c055542d..59772a349a20 100644 --- a/scheduler/reconcile_util_test.go +++ b/scheduler/reconcile_util_test.go @@ -4,6 +4,7 @@ import ( "testing" "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/stretchr/testify/require" ) @@ -37,8 +38,8 @@ func TestAllocSet_filterByTainted(t *testing.T) { nodes := map[string]*structs.Node{ "draining": { - ID: "draining", - Drain: true, + ID: "draining", + DrainStrategy: mock.DrainNode().DrainStrategy, }, "lost": { ID: "lost", diff --git a/scheduler/system_sched_test.go b/scheduler/system_sched_test.go index 35ed1ce5189d..c4cce45ffcc9 100644 --- a/scheduler/system_sched_test.go +++ b/scheduler/system_sched_test.go @@ -1051,8 +1051,7 @@ func TestSystemSched_NodeDrain_Down(t *testing.T) { h := NewHarness(t) // Register a draining node - node := mock.Node() - node.Drain = true + node := mock.DrainNode() node.Status = structs.NodeStatusDown require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) @@ -1113,8 +1112,7 @@ func TestSystemSched_NodeDrain(t *testing.T) { h := NewHarness(t) // Register a draining node - node := mock.Node() - node.Drain = true + node := mock.DrainNode() require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) // Generate a fake job allocated on that node. @@ -1708,9 +1706,8 @@ func TestSystemSched_PlanWithDrainedNode(t *testing.T) { h := NewHarness(t) // Register two nodes with two different classes - node := mock.Node() + node := mock.DrainNode() node.NodeClass = "green" - node.Drain = true node.ComputeClass() require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) diff --git a/scheduler/util.go b/scheduler/util.go index 86461a8f6558..082273a1cf53 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -255,7 +255,7 @@ func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]int if node.Status != structs.NodeStatusReady { continue } - if node.Drain { + if node.DrainStrategy != nil { continue } if node.SchedulingEligibility != structs.NodeSchedulingEligible { @@ -327,7 +327,7 @@ func taintedNodes(state State, allocs []*structs.Allocation) (map[string]*struct out[alloc.NodeID] = nil continue } - if structs.ShouldDrainNode(node.Status) || node.Drain { + if structs.ShouldDrainNode(node.Status) || node.DrainStrategy != nil { out[alloc.NodeID] = node } } diff --git a/scheduler/util_test.go b/scheduler/util_test.go index fba5e611a56a..cf1b300b3d07 100644 --- a/scheduler/util_test.go +++ b/scheduler/util_test.go @@ -39,8 +39,7 @@ func TestDiffSystemAllocsForNode(t *testing.T) { eligibleNode := mock.Node() eligibleNode.ID = "zip" - drainNode := mock.Node() - drainNode.Drain = true + drainNode := mock.DrainNode() deadNode := mock.Node() deadNode.Status = structs.NodeStatusDown @@ -220,8 +219,7 @@ func TestDiffSystemAllocsForNode_ExistingAllocIneligibleNode(t *testing.T) { func TestDiffSystemAllocs(t *testing.T) { job := mock.SystemJob() - drainNode := mock.Node() - drainNode.Drain = true + drainNode := mock.DrainNode() deadNode := mock.Node() deadNode.Status = structs.NodeStatusDown @@ -332,8 +330,7 @@ func TestReadyNodesInDCs(t *testing.T) { node3 := mock.Node() node3.Datacenter = "dc2" node3.Status = structs.NodeStatusDown - node4 := mock.Node() - node4.Drain = true + node4 := mock.DrainNode() require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1000, node1)) require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1001, node2)) @@ -392,8 +389,7 @@ func TestTaintedNodes(t *testing.T) { node3 := mock.Node() node3.Datacenter = "dc2" node3.Status = structs.NodeStatusDown - node4 := mock.Node() - node4.Drain = true + node4 := mock.DrainNode() require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1000, node1)) require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1001, node2)) require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1002, node3)) From 574d26cb66316f6fcd902a7658bf37710ae63216 Mon Sep 17 00:00:00 2001 From: Chris Baker <1675087+cgbaker@users.noreply.github.com> Date: Sat, 20 Mar 2021 10:30:31 +0000 Subject: [PATCH 02/15] updated node drain API test --- api/nodes_test.go | 35 +++++++++++++++++++++++++++++------ 1 file changed, 29 insertions(+), 6 deletions(-) diff --git a/api/nodes_test.go b/api/nodes_test.go index 1b3835e662e9..b0211eddd5d5 100644 --- a/api/nodes_test.go +++ b/api/nodes_test.go @@ -216,12 +216,35 @@ func TestNodes_ToggleDrain(t *testing.T) { require.Nil(err) assertWriteMeta(t, &drainOut.WriteMeta) - // Check again - out, _, err = nodes.Info(nodeID, nil) - require.Nil(err) - // NOTE: this is potentially flaky; drain may have already completed; if problems occur, switch to event stream - require.True(out.Drain) - require.Equal(NodeSchedulingIneligible, out.SchedulingEligibility) + // Drain may have completed before we can check, use event stream + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + streamCh, err := c.EventStream().Stream(ctx, map[Topic][]string{ + TopicNode: {nodeID}, + }, 0, nil) + require.NoError(err) + + // we expect to see the node change to Drain:true and then back to Drain:false+ineligible + var sawDraining, sawDrainComplete uint64 + for sawDrainComplete == 0 { + select { + case events := <-streamCh: + require.NoError(events.Err) + for _, e := range events.Events { + node, err := e.Node() + require.NoError(err) + if node.Drain && node.SchedulingEligibility == NodeSchedulingIneligible { + sawDraining = node.ModifyIndex + } else if sawDraining != 0 && node.ModifyIndex > sawDraining && + !node.Drain && node.SchedulingEligibility == NodeSchedulingIneligible { + sawDrainComplete = node.ModifyIndex + } + } + case <-time.After(5 * time.Second): + require.Fail("failed waiting for event stream event") + } + } // Toggle off again drainOut, err = nodes.UpdateDrain(nodeID, nil, true, nil) From ca51bd91bd7a1df8914c4caf618ece5bb5ecf803 Mon Sep 17 00:00:00 2001 From: Chris Baker <1675087+cgbaker@users.noreply.github.com> Date: Mon, 22 Mar 2021 01:49:21 +0000 Subject: [PATCH 03/15] refactor? --- nomad/structs/json_encoding.go | 41 ++++++++++++++++++++++++++-------- nomad/structs/structs.go | 4 ++-- 2 files changed, 34 insertions(+), 11 deletions(-) diff --git a/nomad/structs/json_encoding.go b/nomad/structs/json_encoding.go index 3f1a6e2b0674..2a2ccaa66ae2 100644 --- a/nomad/structs/json_encoding.go +++ b/nomad/structs/json_encoding.go @@ -6,13 +6,11 @@ import ( "github.com/hashicorp/go-msgpack/codec" ) -// Special encoding for structs.Node, to perform the following: -// 1. provide backwards compatibility for the following fields: -// * Node.Drain -type nodeExt struct{} +func init() { + registerExtension(reflect.TypeOf(Node{}), nodeExt) +} -// ConvertExt converts a structs.Node to a struct with the extra field, Drain -func (n nodeExt) ConvertExt(v interface{}) interface{} { +func nodeExt(v interface{}) interface{} { node := v.(*Node) if node == nil { return nil @@ -27,10 +25,35 @@ func (n nodeExt) ConvertExt(v interface{}) interface{} { } } +// BOILERPLATE GOES HERE + +type extendFunc func(interface{}) interface{} + +var ( + extendedTypes = map[reflect.Type]extendFunc{} +) + +func registerExtension(tpe reflect.Type, ext extendFunc) { + extendedTypes[tpe] = ext +} + +type nomadJsonEncodingExtensions struct{} + +// ConvertExt calls the registered conversions functions +func (n nomadJsonEncodingExtensions) ConvertExt(v interface{}) interface{} { + if fn, ok := extendedTypes[reflect.TypeOf(v)]; ok { + return fn(v) + } else { + return nil + } +} + // UpdateExt is not used -func (n nodeExt) UpdateExt(_ interface{}, _ interface{}) {} +func (n nomadJsonEncodingExtensions) UpdateExt(_ interface{}, _ interface{}) {} -func RegisterJSONEncodingExtensions(h *codec.JsonHandle) *codec.JsonHandle { - h.SetInterfaceExt(reflect.TypeOf(Node{}), 1, nodeExt{}) +func NomadJsonEncodingExtensions(h *codec.JsonHandle) *codec.JsonHandle { + for tpe, _ := range extendedTypes { + h.SetInterfaceExt(tpe, 1, nomadJsonEncodingExtensions{}) + } return h } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 02d6a2648baf..222c7b86a893 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -10589,10 +10589,10 @@ var ( JsonHandle = &codec.JsonHandle{ HTMLCharsAsIs: true, } - JsonHandleWithExtensions = RegisterJSONEncodingExtensions(&codec.JsonHandle{ + JsonHandleWithExtensions = NomadJsonEncodingExtensions(&codec.JsonHandle{ HTMLCharsAsIs: true, }) - JsonHandlePretty = RegisterJSONEncodingExtensions(&codec.JsonHandle{ + JsonHandlePretty = NomadJsonEncodingExtensions(&codec.JsonHandle{ HTMLCharsAsIs: true, Indent: 4, }) From 0cd707e3a90a3856e791e976b14d61ba7fbf10c4 Mon Sep 17 00:00:00 2001 From: Chris Baker <1675087+cgbaker@users.noreply.github.com> Date: Mon, 22 Mar 2021 14:12:42 +0000 Subject: [PATCH 04/15] moved JSON handlers and extension code around a bit for proper order of initialization --- client/agent_endpoint.go | 5 +++- client/alloc_endpoint.go | 4 ++- client/fs_endpoint.go | 8 ++++-- command/agent/http.go | 5 ++-- helper/pluginutils/hclutils/testing.go | 11 ++++---- helper/pluginutils/hclutils/util.go | 6 ++-- nomad/client_agent_endpoint.go | 4 ++- .../json_encoding.go => json/encoding.go} | 28 +++---------------- nomad/json/extensions.go | 28 +++++++++++++++++++ nomad/json/handlers/handlers.go | 24 ++++++++++++++++ nomad/stream/ndjson.go | 3 +- nomad/structs/structs.go | 17 ----------- 12 files changed, 86 insertions(+), 57 deletions(-) rename nomad/{structs/json_encoding.go => json/encoding.go} (65%) create mode 100644 nomad/json/extensions.go create mode 100644 nomad/json/handlers/handlers.go diff --git a/client/agent_endpoint.go b/client/agent_endpoint.go index 856b2787d044..5840217ee2c1 100644 --- a/client/agent_endpoint.go +++ b/client/agent_endpoint.go @@ -8,14 +8,17 @@ import ( "time" "github.com/hashicorp/go-msgpack/codec" + "github.com/hashicorp/nomad/command/agent/host" "github.com/hashicorp/nomad/command/agent/monitor" "github.com/hashicorp/nomad/command/agent/pprof" "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/nomad/json/handlers" "github.com/hashicorp/nomad/nomad/structs" metrics "github.com/armon/go-metrics" log "github.com/hashicorp/go-hclog" + sframer "github.com/hashicorp/nomad/client/lib/streamframer" cstructs "github.com/hashicorp/nomad/client/structs" ) @@ -121,7 +124,7 @@ func (a *Agent) monitor(conn io.ReadWriteCloser) { frames := make(chan *sframer.StreamFrame, streamFramesBuffer) errCh := make(chan error) var buf bytes.Buffer - frameCodec := codec.NewEncoder(&buf, structs.JsonHandle) + frameCodec := codec.NewEncoder(&buf, handlers.JsonHandle) framer := sframer.NewStreamFramer(frames, 1*time.Second, 200*time.Millisecond, 1024) framer.Run() diff --git a/client/alloc_endpoint.go b/client/alloc_endpoint.go index 9d7d8c7a5bf6..d9944fb2e9e7 100644 --- a/client/alloc_endpoint.go +++ b/client/alloc_endpoint.go @@ -10,10 +10,12 @@ import ( metrics "github.com/armon/go-metrics" "github.com/hashicorp/go-msgpack/codec" + "github.com/hashicorp/nomad/acl" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/uuid" + "github.com/hashicorp/nomad/nomad/json/handlers" "github.com/hashicorp/nomad/nomad/structs" nstructs "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/drivers" @@ -279,7 +281,7 @@ func newExecStream(decoder *codec.Decoder, encoder *codec.Encoder) drivers.ExecT buf: buf, encoder: encoder, - frameCodec: codec.NewEncoder(buf, structs.JsonHandle), + frameCodec: codec.NewEncoder(buf, handlers.JsonHandle), } } diff --git a/client/fs_endpoint.go b/client/fs_endpoint.go index d16b05bd0904..1ceae24807f2 100644 --- a/client/fs_endpoint.go +++ b/client/fs_endpoint.go @@ -17,13 +17,15 @@ import ( metrics "github.com/armon/go-metrics" "github.com/hashicorp/go-msgpack/codec" + "github.com/hpcloud/tail/watch" + "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/client/allocdir" sframer "github.com/hashicorp/nomad/client/lib/streamframer" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/nomad/json/handlers" "github.com/hashicorp/nomad/nomad/structs" - "github.com/hpcloud/tail/watch" ) var ( @@ -237,7 +239,7 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) { frames := make(chan *sframer.StreamFrame, streamFramesBuffer) errCh := make(chan error) var buf bytes.Buffer - frameCodec := codec.NewEncoder(&buf, structs.JsonHandle) + frameCodec := codec.NewEncoder(&buf, handlers.JsonHandle) // Create the framer framer := sframer.NewStreamFramer(frames, streamHeartbeatRate, streamBatchWindow, streamFrameSize) @@ -468,7 +470,7 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) { var streamErr error buf := new(bytes.Buffer) - frameCodec := codec.NewEncoder(buf, structs.JsonHandle) + frameCodec := codec.NewEncoder(buf, handlers.JsonHandle) OUTER: for { select { diff --git a/command/agent/http.go b/command/agent/http.go index 320a4518a233..d937dd2f333c 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -24,6 +24,7 @@ import ( "github.com/hashicorp/nomad/helper/noxssrw" "github.com/hashicorp/nomad/helper/tlsutil" + "github.com/hashicorp/nomad/nomad/json/handlers" "github.com/hashicorp/nomad/nomad/structs" ) @@ -495,13 +496,13 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque if obj != nil { var buf bytes.Buffer if prettyPrint { - enc := codec.NewEncoder(&buf, structs.JsonHandlePretty) + enc := codec.NewEncoder(&buf, handlers.JsonHandlePretty) err = enc.Encode(obj) if err == nil { buf.Write([]byte("\n")) } } else { - enc := codec.NewEncoder(&buf, structs.JsonHandleWithExtensions) + enc := codec.NewEncoder(&buf, handlers.JsonHandleWithExtensions) err = enc.Encode(obj) } if err != nil { diff --git a/helper/pluginutils/hclutils/testing.go b/helper/pluginutils/hclutils/testing.go index 469cec7d5b87..50f2ca0e278b 100644 --- a/helper/pluginutils/hclutils/testing.go +++ b/helper/pluginutils/hclutils/testing.go @@ -6,13 +6,14 @@ import ( "github.com/hashicorp/go-msgpack/codec" "github.com/hashicorp/hcl" "github.com/hashicorp/hcl/hcl/ast" - "github.com/hashicorp/nomad/helper/pluginutils/hclspecutils" - "github.com/hashicorp/nomad/nomad/structs" - "github.com/hashicorp/nomad/plugins/drivers" - "github.com/hashicorp/nomad/plugins/shared/hclspec" "github.com/mitchellh/mapstructure" "github.com/stretchr/testify/require" "github.com/zclconf/go-cty/cty" + + "github.com/hashicorp/nomad/helper/pluginutils/hclspecutils" + "github.com/hashicorp/nomad/nomad/json/handlers" + "github.com/hashicorp/nomad/plugins/drivers" + "github.com/hashicorp/nomad/plugins/shared/hclspec" ) type HCLParser struct { @@ -121,7 +122,7 @@ func JsonConfigToInterface(t *testing.T, config string) interface{} { t.Helper() // Decode from json - dec := codec.NewDecoderBytes([]byte(config), structs.JsonHandle) + dec := codec.NewDecoderBytes([]byte(config), handlers.JsonHandle) var m map[string]interface{} err := dec.Decode(&m) diff --git a/helper/pluginutils/hclutils/util.go b/helper/pluginutils/hclutils/util.go index 6042e7b0fece..460ab875b0f2 100644 --- a/helper/pluginutils/hclutils/util.go +++ b/helper/pluginutils/hclutils/util.go @@ -9,7 +9,9 @@ import ( hcl "github.com/hashicorp/hcl/v2" "github.com/hashicorp/hcl/v2/hcldec" hjson "github.com/hashicorp/hcl/v2/json" - "github.com/hashicorp/nomad/nomad/structs" + + "github.com/hashicorp/nomad/nomad/json/handlers" + "github.com/zclconf/go-cty/cty" "github.com/zclconf/go-cty/cty/function" "github.com/zclconf/go-cty/cty/function/stdlib" @@ -26,7 +28,7 @@ func ParseHclInterface(val interface{}, spec hcldec.Spec, vars map[string]cty.Va // Encode to json var buf bytes.Buffer - enc := codec.NewEncoder(&buf, structs.JsonHandle) + enc := codec.NewEncoder(&buf, handlers.JsonHandle) err := enc.Encode(val) if err != nil { // Convert to a hcl diagnostics message diff --git a/nomad/client_agent_endpoint.go b/nomad/client_agent_endpoint.go index 488478034d0b..7e381f0d19d8 100644 --- a/nomad/client_agent_endpoint.go +++ b/nomad/client_agent_endpoint.go @@ -10,12 +10,14 @@ import ( "time" log "github.com/hashicorp/go-hclog" + sframer "github.com/hashicorp/nomad/client/lib/streamframer" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/command/agent/host" "github.com/hashicorp/nomad/command/agent/monitor" "github.com/hashicorp/nomad/command/agent/pprof" "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/nomad/json/handlers" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/go-msgpack/codec" @@ -185,7 +187,7 @@ func (a *Agent) monitor(conn io.ReadWriteCloser) { frames := make(chan *sframer.StreamFrame, 32) errCh := make(chan error) var buf bytes.Buffer - frameCodec := codec.NewEncoder(&buf, structs.JsonHandle) + frameCodec := codec.NewEncoder(&buf, handlers.JsonHandle) framer := sframer.NewStreamFramer(frames, 1*time.Second, 200*time.Millisecond, 1024) framer.Run() diff --git a/nomad/structs/json_encoding.go b/nomad/json/encoding.go similarity index 65% rename from nomad/structs/json_encoding.go rename to nomad/json/encoding.go index 2a2ccaa66ae2..e5ff82d9158b 100644 --- a/nomad/structs/json_encoding.go +++ b/nomad/json/encoding.go @@ -1,4 +1,4 @@ -package structs +package json import ( "reflect" @@ -6,27 +6,6 @@ import ( "github.com/hashicorp/go-msgpack/codec" ) -func init() { - registerExtension(reflect.TypeOf(Node{}), nodeExt) -} - -func nodeExt(v interface{}) interface{} { - node := v.(*Node) - if node == nil { - return nil - } - type NodeAlias Node - return &struct { - *NodeAlias - Drain bool - }{ - NodeAlias: (*NodeAlias)(node), - Drain: node.DrainStrategy != nil, - } -} - -// BOILERPLATE GOES HERE - type extendFunc func(interface{}) interface{} var ( @@ -44,7 +23,8 @@ func (n nomadJsonEncodingExtensions) ConvertExt(v interface{}) interface{} { if fn, ok := extendedTypes[reflect.TypeOf(v)]; ok { return fn(v) } else { - return nil + // shouldn't get here + return v } } @@ -52,7 +32,7 @@ func (n nomadJsonEncodingExtensions) ConvertExt(v interface{}) interface{} { func (n nomadJsonEncodingExtensions) UpdateExt(_ interface{}, _ interface{}) {} func NomadJsonEncodingExtensions(h *codec.JsonHandle) *codec.JsonHandle { - for tpe, _ := range extendedTypes { + for tpe := range extendedTypes { h.SetInterfaceExt(tpe, 1, nomadJsonEncodingExtensions{}) } return h diff --git a/nomad/json/extensions.go b/nomad/json/extensions.go new file mode 100644 index 000000000000..2246c1c01bea --- /dev/null +++ b/nomad/json/extensions.go @@ -0,0 +1,28 @@ +package json + +import ( + "reflect" + + "github.com/hashicorp/nomad/nomad/structs" +) + +func init() { + // TODO: this could be simplified by looking up the base type in the case of a pointer type + registerExtension(reflect.TypeOf(structs.Node{}), nodeExt) + registerExtension(reflect.TypeOf(&structs.Node{}), nodeExt) +} + +func nodeExt(v interface{}) interface{} { + node := v.(*structs.Node) + if node == nil { + return nil + } + type NodeAlias structs.Node + return &struct { + *NodeAlias + Drain bool + }{ + NodeAlias: (*NodeAlias)(node), + Drain: node.DrainStrategy != nil, + } +} diff --git a/nomad/json/handlers/handlers.go b/nomad/json/handlers/handlers.go new file mode 100644 index 000000000000..5ec676eb7165 --- /dev/null +++ b/nomad/json/handlers/handlers.go @@ -0,0 +1,24 @@ +package handlers + +import ( + "github.com/hashicorp/go-msgpack/codec" + + "github.com/hashicorp/nomad/nomad/json" +) + +var ( + // JsonHandle and JsonHandlePretty are the codec handles to JSON encode + // structs. The pretty handle will add indents for easier human consumption. + // JsonHandleWithExtensions and JsonHandlePretty include extensions for + // encoding structs objects with API-specific fields + JsonHandle = &codec.JsonHandle{ + HTMLCharsAsIs: true, + } + JsonHandleWithExtensions = json.NomadJsonEncodingExtensions(&codec.JsonHandle{ + HTMLCharsAsIs: true, + }) + JsonHandlePretty = json.NomadJsonEncodingExtensions(&codec.JsonHandle{ + HTMLCharsAsIs: true, + Indent: 4, + }) +) diff --git a/nomad/stream/ndjson.go b/nomad/stream/ndjson.go index ec69a6c1cb99..9c9ee11be11a 100644 --- a/nomad/stream/ndjson.go +++ b/nomad/stream/ndjson.go @@ -8,6 +8,7 @@ import ( "github.com/hashicorp/go-msgpack/codec" + "github.com/hashicorp/nomad/nomad/json/handlers" "github.com/hashicorp/nomad/nomad/structs" ) @@ -74,7 +75,7 @@ func (n *JsonStream) Send(v interface{}) error { } var buf bytes.Buffer - enc := codec.NewEncoder(&buf, structs.JsonHandleWithExtensions) + enc := codec.NewEncoder(&buf, handlers.JsonHandleWithExtensions) err := enc.Encode(v) if err != nil { return fmt.Errorf("error marshaling json for stream: %w", err) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 222c7b86a893..6a1d6028d6f5 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -10581,23 +10581,6 @@ var MsgpackHandle = func() *codec.MsgpackHandle { return h }() -var ( - // JsonHandle and JsonHandlePretty are the codec handles to JSON encode - // structs. The pretty handle will add indents for easier human consumption. - // JsonHandleWithExtensions and JsonHandlePretty include extensions for - // encoding structs objects with API-specific fields - JsonHandle = &codec.JsonHandle{ - HTMLCharsAsIs: true, - } - JsonHandleWithExtensions = NomadJsonEncodingExtensions(&codec.JsonHandle{ - HTMLCharsAsIs: true, - }) - JsonHandlePretty = NomadJsonEncodingExtensions(&codec.JsonHandle{ - HTMLCharsAsIs: true, - Indent: 4, - }) -) - // Decode is used to decode a MsgPack encoded object func Decode(buf []byte, out interface{}) error { return codec.NewDecoder(bytes.NewReader(buf), MsgpackHandle).Decode(out) From 7d78ffb8d096a29dd66469c997368e5d42d9f941 Mon Sep 17 00:00:00 2001 From: Chris Baker <1675087+cgbaker@users.noreply.github.com> Date: Tue, 23 Mar 2021 17:55:34 +0000 Subject: [PATCH 05/15] added tests that the API doesn't leak Node.SecretID added more documentation on JSON encoding to the contributing guide --- api/event_stream_test.go | 16 ++++++++ api/nodes_test.go | 32 ++++++++++++++++ command/agent/http_test.go | 6 ++- contributing/checklist-jobspec.md | 32 ++++++++++++++-- contributing/checklist-rpc-endpoint.md | 2 +- nomad/state/events_test.go | 53 -------------------------- nomad/structs/structs.go | 1 + 7 files changed, 83 insertions(+), 59 deletions(-) diff --git a/api/event_stream_test.go b/api/event_stream_test.go index 4bfdc8a33d23..aad6377c7d03 100644 --- a/api/event_stream_test.go +++ b/api/event_stream_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/hashicorp/nomad/api/internal/testutil" + "github.com/mitchellh/mapstructure" "github.com/stretchr/testify/require" ) @@ -148,9 +149,24 @@ func TestEventStream_PayloadValue(t *testing.T) { require.NoError(t, err) } for _, e := range event.Events { + // verify that we get a node n, err := e.Node() require.NoError(t, err) require.NotEqual(t, "", n.ID) + + // raw decoding to verify that the node did not contain SecretID + raw := make(map[string]map[string]interface{}, 0) + cfg := &mapstructure.DecoderConfig{ + Result: &raw, + } + + dec, err := mapstructure.NewDecoder(cfg) + require.NoError(t, err) + require.NoError(t, dec.Decode(e.Payload)) + require.Contains(t, raw, "Node") + rawNode := raw["Node"] + require.Equal(t, n.ID, rawNode["ID"]) + require.NotContains(t, rawNode, "SecretID") } case <-time.After(5 * time.Second): require.Fail(t, "failed waiting for event stream event") diff --git a/api/nodes_test.go b/api/nodes_test.go index b0211eddd5d5..5ba60b52cec0 100644 --- a/api/nodes_test.go +++ b/api/nodes_test.go @@ -178,6 +178,38 @@ func TestNodes_Info(t *testing.T) { } } +func TestNodes_NoSecretID(t *testing.T) { + t.Parallel() + c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) { + c.DevMode = true + }) + defer s.Stop() + nodes := c.Nodes() + + // Get the node ID + var nodeID string + testutil.WaitForResult(func() (bool, error) { + out, _, err := nodes.List(nil) + if err != nil { + return false, err + } + if n := len(out); n != 1 { + return false, fmt.Errorf("expected 1 node, got: %d", n) + } + nodeID = out[0].ID + return true, nil + }, func(err error) { + t.Fatalf("err: %s", err) + }) + + // Query the node, ensure that .SecretID was not returned by the HTTP server + resp := make(map[string]interface{}) + _, err := c.query("/v1/node/"+nodeID, &resp, nil) + require.NoError(t, err) + require.Equal(t, nodeID, resp["ID"]) + require.Empty(t, resp["SecretID"]) +} + func TestNodes_ToggleDrain(t *testing.T) { t.Parallel() require := require.New(t) diff --git a/command/agent/http_test.go b/command/agent/http_test.go index ee252ab97c59..1caecbb2d528 100644 --- a/command/agent/http_test.go +++ b/command/agent/http_test.go @@ -29,6 +29,8 @@ import ( "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + nomadjson "github.com/hashicorp/nomad/nomad/json/handlers" ) // makeHTTPServer returns a test server whose logs will be written to @@ -312,11 +314,11 @@ func testPrettyPrint(pretty string, prettyFmt bool, t *testing.T) { var expected bytes.Buffer var err error if prettyFmt { - enc := codec.NewEncoder(&expected, structs.JsonHandlePretty) + enc := codec.NewEncoder(&expected, nomadjson.JsonHandlePretty) err = enc.Encode(r) expected.WriteByte('\n') } else { - enc := codec.NewEncoder(&expected, structs.JsonHandle) + enc := codec.NewEncoder(&expected, nomadjson.JsonHandle) err = enc.Encode(r) } if err != nil { diff --git a/contributing/checklist-jobspec.md b/contributing/checklist-jobspec.md index 75c951cf4695..2eb4e1fc56a6 100644 --- a/contributing/checklist-jobspec.md +++ b/contributing/checklist-jobspec.md @@ -13,10 +13,10 @@ * Note that analogous struct field names should match with `api/` package * Test the structs/fields via methods mentioned above * Implement and test other logical methods -* [ ] Add conversion between `api/` and `nomad/structs` in `command/agent/job_endpoint.go` +* [ ] Add conversion between `api/` and `nomad/structs/` in `command/agent/job_endpoint.go` * Add test for conversion - * msgpack [encoding](http://ugorji.net/blog/go-codec-primer#drop-in-replacement-for-encoding-json-json-key-in-struct-tag-supported) only uses the [`codec` tag](https://github.com/hashicorp/nomad/blob/v1.0.0/nomad/structs/structs.go#L10557-L10558); - the `json` tag is available for customizing API output when encoding `structs` objects +* [ ] Determine JSON encoding strategy for responses from RPC (see "JSON Encoding" below) + * [ ] Write `nomad/structs/` to `api/` conversions if necessary and write tests * [ ] Implement diff logic for new structs/fields in `nomad/structs/diff.go` * Note that fields must be listed in alphabetical order in `FieldDiff` slices in `nomad/structs/diff_test.go` * Add test for diff of new structs/fields @@ -42,3 +42,29 @@ required in the original `jobspec` package. * [ ] Job JSON API entry https://www.nomadproject.io/api/json-jobs.html * [ ] Sample Response output in API https://www.nomadproject.io/api/jobs.html * [ ] Consider if it needs a guide https://www.nomadproject.io/guides/index.html + +## JSON Encoding + +As a general rule, HTTP endpoints (under `command/agent/`) will make RPC calls that return structs belonging to +`nomad/structs/`. These handlers ultimately return an object that is encoded by the Nomad HTTP server. The encoded form +needs to match the Nomad API; specifically, it should have the form of the corresponding struct from `api/`. There are +a few ways that this can be accomplished: +* directly return the struct from the RPC call, if it has the same shape as the corresponding struct in `api/`. + This is convenient when possible, resulting in the least work for the developer. + Examples of this approach include [GET `/v1/evaluation/:id`](https://github.com/hashicorp/nomad/blob/v1.0. + 0/command/agent/eval_endpoint.go#L88). +* convert the struct from the RPC call to the appropriate `api/` struct. + This approach is the most developer effort, but it does have a strong guarantee that the HTTP response matches the + API, due to the explicit conversion (assuming proper implementation, which requires tests). + Examples of this approach include [GET `/v1/volume/csi/:id`](https://github.com/hashicorp/nomad/blob/v1.0.0/command/agent/csi_endpoint.go#L108) +* convert to an intermediate struct with the same shape as the `api/` struct. + This approach strikes a balance between the former two approaches. + This conversion can be performed in-situ in the agent HTTP handler, as long as the conversion doesn't need to + appear in other handlers. + Otherwise, it is possible to register an extension on the JSON encoding used by the HTTP agent; these extensions + can be put in `nomad/json/extensions.go`. + +Note, for simple transformations to encoding (like renaming or neglecting fields), we can use field tags on the structs +from `nomad/structs`. Our msgpack [encoding](http://ugorji.net/blog/go-codec-primer#drop-in-replacement-for-encoding-json-json-key-in-struct-tag-supported) +only uses the [`codec` tag](https://github.com/hashicorp/nomad/blob/v1.0.0/nomad/structs/structs.go#L10557-L10558). +Therefore, the `json` tag is available for customizing API output when encoding `structs` objects. See `structs.Node.SecretID`, for example. diff --git a/contributing/checklist-rpc-endpoint.md b/contributing/checklist-rpc-endpoint.md index 37e9ff50d2a2..b21fcfb60b75 100644 --- a/contributing/checklist-rpc-endpoint.md +++ b/contributing/checklist-rpc-endpoint.md @@ -30,7 +30,7 @@ Prefer adding a new message to changing any existing RPC messages. * [ ] `nomad/core_sched.go` sends many RPCs * `ServersMeetMinimumVersion` asserts that the server cluster is - upgraded, so use this to gaurd sending the new RPC, else send the old RPC + upgraded, so use this to guard sending the new RPC, else send the old RPC * Version must match the actual release version! ## Docs diff --git a/nomad/state/events_test.go b/nomad/state/events_test.go index 13271257980c..1ae6edf62319 100644 --- a/nomad/state/events_test.go +++ b/nomad/state/events_test.go @@ -94,59 +94,6 @@ func TestEventFromChange_ACLTokenSecretID(t *testing.T) { require.Empty(t, tokenEvent2.ACLToken.SecretID) } -// TestEventFromChange_NodeSecretID ensures that a node's secret ID is not -// included in a node event -func TestEventFromChange_NodeSecretID(t *testing.T) { - t.Parallel() - s := TestStateStoreCfg(t, TestStateStorePublisher(t)) - defer s.StopEventBroker() - - node := mock.Node() - require.NotEmpty(t, node.SecretID) - - // Create - changes := Changes{ - Index: 100, - MsgType: structs.NodeRegisterRequestType, - Changes: memdb.Changes{ - { - Table: "nodes", - Before: nil, - After: node, - }, - }, - } - - out := eventsFromChanges(s.db.ReadTxn(), changes) - require.Len(t, out.Events, 1) - - _, ok := out.Events[0].Payload.(*structs.NodeStreamEvent) - require.True(t, ok) - // TODO: cgbaker: do we really want to remove this check? - // require.Empty(t, nodeEvent.Node.SecretID) - - // Delete - changes = Changes{ - Index: 100, - MsgType: structs.NodeDeregisterRequestType, - Changes: memdb.Changes{ - { - Table: "nodes", - Before: node, - After: nil, - }, - }, - } - - out2 := eventsFromChanges(s.db.ReadTxn(), changes) - require.Len(t, out2.Events, 1) - - _, ok = out2.Events[0].Payload.(*structs.NodeStreamEvent) - require.True(t, ok) - // TODO: cgbaker: do we really want to remove this check? - // require.Empty(t, nodeEvent2.Node.SecretID) -} - func TestEventsFromChanges_DeploymentUpdate(t *testing.T) { t.Parallel() s := TestStateStoreCfg(t, TestStateStorePublisher(t)) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 6a1d6028d6f5..5ff0171d03a8 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1870,6 +1870,7 @@ type Node struct { ComputedClass string // DrainStrategy determines the node's draining behavior. + // Will be non-nil only while draining. DrainStrategy *DrainStrategy // SchedulingEligibility determines whether this node will receive new From bc90436b31a2a2fc70401a050e8b2f8f1ae441d5 Mon Sep 17 00:00:00 2001 From: Chris Baker <1675087+cgbaker@users.noreply.github.com> Date: Tue, 23 Mar 2021 18:13:10 +0000 Subject: [PATCH 06/15] change to fail-safe in json encoding --- nomad/json/encoding.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/nomad/json/encoding.go b/nomad/json/encoding.go index e5ff82d9158b..5ca3cea57844 100644 --- a/nomad/json/encoding.go +++ b/nomad/json/encoding.go @@ -23,8 +23,9 @@ func (n nomadJsonEncodingExtensions) ConvertExt(v interface{}) interface{} { if fn, ok := extendedTypes[reflect.TypeOf(v)]; ok { return fn(v) } else { - // shouldn't get here - return v + // shouldn't get here, but returning v will probably result in an infinite loop + // return nil and erase this field + return nil } } From 6043594e8d2d4f16a559656964175bb9de321aa4 Mon Sep 17 00:00:00 2001 From: Chris Baker <1675087+cgbaker@users.noreply.github.com> Date: Tue, 23 Mar 2021 18:18:51 +0000 Subject: [PATCH 07/15] some comments on the new json extensions/encoding --- nomad/json/encoding.go | 10 +++++++++- nomad/json/extensions.go | 2 ++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/nomad/json/encoding.go b/nomad/json/encoding.go index 5ca3cea57844..232efbb2b9d3 100644 --- a/nomad/json/encoding.go +++ b/nomad/json/encoding.go @@ -6,16 +6,21 @@ import ( "github.com/hashicorp/go-msgpack/codec" ) +// extendFunc is a mapping from one struct to another, to change the shape of the encoded JSON type extendFunc func(interface{}) interface{} var ( + // extendedTypes is a mapping of extended types to their extension function extendedTypes = map[reflect.Type]extendFunc{} ) +// registerExtension registers an extension function against a particular type func registerExtension(tpe reflect.Type, ext extendFunc) { extendedTypes[tpe] = ext } +// nomadJsonEncodingExtensions is a catch-all go-msgpack extension +// it looks up the types in the list of registered extension functions and applies it type nomadJsonEncodingExtensions struct{} // ConvertExt calls the registered conversions functions @@ -29,9 +34,12 @@ func (n nomadJsonEncodingExtensions) ConvertExt(v interface{}) interface{} { } } -// UpdateExt is not used +// UpdateExt is required by go-msgpack, but not used by us func (n nomadJsonEncodingExtensions) UpdateExt(_ interface{}, _ interface{}) {} +// NomadJsonEncodingExtensions registers all extension functions against the +// provided JsonHandle. +// It should be called on any JsonHandle which is used by the API HTTP server. func NomadJsonEncodingExtensions(h *codec.JsonHandle) *codec.JsonHandle { for tpe := range extendedTypes { h.SetInterfaceExt(tpe, 1, nomadJsonEncodingExtensions{}) diff --git a/nomad/json/extensions.go b/nomad/json/extensions.go index 2246c1c01bea..ec4be6c674e1 100644 --- a/nomad/json/extensions.go +++ b/nomad/json/extensions.go @@ -6,12 +6,14 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) +// init register all extensions used by the API HTTP server when encoding JSON func init() { // TODO: this could be simplified by looking up the base type in the case of a pointer type registerExtension(reflect.TypeOf(structs.Node{}), nodeExt) registerExtension(reflect.TypeOf(&structs.Node{}), nodeExt) } +// nodeExt adds the legacy field .Drain back to encoded Node objects func nodeExt(v interface{}) interface{} { node := v.(*structs.Node) if node == nil { From 23c75b0c206a764e9494347f695f587c5187052b Mon Sep 17 00:00:00 2001 From: Chris Baker <1675087+cgbaker@users.noreply.github.com> Date: Tue, 23 Mar 2021 20:23:06 +0000 Subject: [PATCH 08/15] added benchmark test for JSON encoding extensions --- command/agent/http_test.go | 27 ++++++++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/command/agent/http_test.go b/command/agent/http_test.go index 1caecbb2d528..3a8e61ef3eb4 100644 --- a/command/agent/http_test.go +++ b/command/agent/http_test.go @@ -30,6 +30,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/hashicorp/nomad/nomad/json/handlers" nomadjson "github.com/hashicorp/nomad/nomad/json/handlers" ) @@ -318,7 +319,7 @@ func testPrettyPrint(pretty string, prettyFmt bool, t *testing.T) { err = enc.Encode(r) expected.WriteByte('\n') } else { - enc := codec.NewEncoder(&expected, nomadjson.JsonHandle) + enc := codec.NewEncoder(&expected, nomadjson.JsonHandleWithExtensions) err = enc.Encode(r) } if err != nil { @@ -1245,6 +1246,30 @@ func Test_decodeBody(t *testing.T) { } } +// BenchmarkHTTPServer_JSONEncodingWithExtensions benchmarks the performance of +// encoding JSON objects using extensions +func BenchmarkHTTPServer_JSONEncodingWithExtensions(b *testing.B) { + benchmarkJsonEncoding(b, handlers.JsonHandleWithExtensions) +} + +// BenchmarkHTTPServer_JSONEncodingWithoutExtensions benchmarks the performance of +// encoding JSON objects using extensions +func BenchmarkHTTPServer_JSONEncodingWithoutExtensions(b *testing.B) { + benchmarkJsonEncoding(b, handlers.JsonHandle) +} + +func benchmarkJsonEncoding(b *testing.B, handle *codec.JsonHandle) { + n := mock.Node() + var buf bytes.Buffer + + enc := codec.NewEncoder(&buf, handle) + for i := 0; i < b.N; i++ { + buf.Reset() + err := enc.Encode(n) + require.NoError(b, err) + } +} + func httpTest(t testing.TB, cb func(c *Config), f func(srv *TestAgent)) { s := makeHTTPServer(t, cb) defer s.Shutdown() From e0b5320aca55f5336002d699ff88a975d8d990c1 Mon Sep 17 00:00:00 2001 From: Chris Baker <1675087+cgbaker@users.noreply.github.com> Date: Tue, 23 Mar 2021 20:33:56 +0000 Subject: [PATCH 09/15] t push changelog for #10202 --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d5c20f5a964b..7f930f7c4eaf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ IMPROVEMENTS: * csi: Added support for jobs to request a unique volume ID per allocation. [[GH-10136](https://github.com/hashicorp/nomad/issues/10136)] * driver/docker: Added support for optional extra container labels. [[GH-9885](https://github.com/hashicorp/nomad/issues/9885)] * driver/docker: Added support for configuring default logger behavior in the client configuration. [[GH-10156](https://github.com/hashicorp/nomad/issues/10156)] + * nomad/structs: Removed deprecated Node.Drain field, added API extensions to restore it [[GH-10202](https://github.com/hashicorp/nomad/issues/10202)] ## 1.0.4 (February 24, 2021) From 4e7d84e129f18d1de83b11befee0bb0249cf3164 Mon Sep 17 00:00:00 2001 From: Chris Baker <1675087+cgbaker@users.noreply.github.com> Date: Wed, 24 Mar 2021 16:36:18 +0000 Subject: [PATCH 10/15] additional consistency checking on nodes api --- api/nodes_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/api/nodes_test.go b/api/nodes_test.go index 5ba60b52cec0..7772eabc8efb 100644 --- a/api/nodes_test.go +++ b/api/nodes_test.go @@ -266,6 +266,8 @@ func TestNodes_ToggleDrain(t *testing.T) { for _, e := range events.Events { node, err := e.Node() require.NoError(err) + require.Equal(node.DrainStrategy != nil, node.Drain) + require.True(!node.Drain || node.SchedulingEligibility == NodeSchedulingIneligible) // node.Drain => "ineligible" if node.Drain && node.SchedulingEligibility == NodeSchedulingIneligible { sawDraining = node.ModifyIndex } else if sawDraining != 0 && node.ModifyIndex > sawDraining && From c5731ebeccde89811abd9869fd37b3a62669ce5a Mon Sep 17 00:00:00 2001 From: Chris Baker <1675087+cgbaker@users.noreply.github.com> Date: Fri, 26 Mar 2021 11:07:15 +0000 Subject: [PATCH 11/15] squash --- api/event_stream_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/api/event_stream_test.go b/api/event_stream_test.go index aad6377c7d03..5cb2464c5de0 100644 --- a/api/event_stream_test.go +++ b/api/event_stream_test.go @@ -152,9 +152,11 @@ func TestEventStream_PayloadValue(t *testing.T) { // verify that we get a node n, err := e.Node() require.NoError(t, err) - require.NotEqual(t, "", n.ID) + require.NotEmpty(t, n.ID) - // raw decoding to verify that the node did not contain SecretID + // perform a raw decoding and look for: + // - "ID", to make sure that raw decoding is correct + // - "SecretID", to make sure it's not present raw := make(map[string]map[string]interface{}, 0) cfg := &mapstructure.DecoderConfig{ Result: &raw, From a52f32dedc0089fcc28acd7d33db070d8456f987 Mon Sep 17 00:00:00 2001 From: Chris Baker <1675087+cgbaker@users.noreply.github.com> Date: Fri, 26 Mar 2021 17:03:15 +0000 Subject: [PATCH 12/15] restored Node.Sanitize() for RPC endpoints multiple other updates from code review --- api/event_stream_test.go | 5 ++- api/nodes_test.go | 4 ++- client/agent_endpoint.go | 4 +-- client/alloc_endpoint.go | 4 +-- client/fs_endpoint.go | 6 ++-- command/agent/http.go | 6 ++-- command/agent/http_test.go | 11 +++---- contributing/checklist-jobspec.md | 7 +--- helper/pluginutils/hclutils/testing.go | 4 +-- helper/pluginutils/hclutils/util.go | 4 +-- nomad/client_agent_endpoint.go | 4 +-- nomad/json/extensions.go | 30 ----------------- nomad/{json => jsonhandles}/encoding.go | 12 +------ nomad/jsonhandles/extensions.go | 33 +++++++++++++++++++ .../handlers => jsonhandles}/handlers.go | 8 ++--- nomad/node_endpoint.go | 1 + nomad/node_endpoint_test.go | 1 + nomad/plan_apply.go | 5 +-- nomad/state/events.go | 2 ++ nomad/stream/ndjson.go | 4 +-- nomad/structs/structs.go | 32 +++++++++++++----- nomad/structs/structs_test.go | 25 ++++++++++++++ scheduler/util.go | 8 +---- 23 files changed, 121 insertions(+), 99 deletions(-) delete mode 100644 nomad/json/extensions.go rename nomad/{json => jsonhandles}/encoding.go (80%) create mode 100644 nomad/jsonhandles/extensions.go rename nomad/{json/handlers => jsonhandles}/handlers.go (69%) diff --git a/api/event_stream_test.go b/api/event_stream_test.go index 5cb2464c5de0..e5f3492da4f2 100644 --- a/api/event_stream_test.go +++ b/api/event_stream_test.go @@ -155,13 +155,12 @@ func TestEventStream_PayloadValue(t *testing.T) { require.NotEmpty(t, n.ID) // perform a raw decoding and look for: - // - "ID", to make sure that raw decoding is correct - // - "SecretID", to make sure it's not present + // - "ID" to make sure that raw decoding is working correctly + // - "SecretID" to make sure it's not present raw := make(map[string]map[string]interface{}, 0) cfg := &mapstructure.DecoderConfig{ Result: &raw, } - dec, err := mapstructure.NewDecoder(cfg) require.NoError(t, err) require.NoError(t, dec.Decode(e.Payload)) diff --git a/api/nodes_test.go b/api/nodes_test.go index 7772eabc8efb..7ed2b956604c 100644 --- a/api/nodes_test.go +++ b/api/nodes_test.go @@ -202,7 +202,9 @@ func TestNodes_NoSecretID(t *testing.T) { t.Fatalf("err: %s", err) }) - // Query the node, ensure that .SecretID was not returned by the HTTP server + // perform a raw http call and make sure that: + // - "ID" to make sure that raw decoding is working correctly + // - "SecretID" to make sure it's not present resp := make(map[string]interface{}) _, err := c.query("/v1/node/"+nodeID, &resp, nil) require.NoError(t, err) diff --git a/client/agent_endpoint.go b/client/agent_endpoint.go index 5840217ee2c1..01e12d818cb9 100644 --- a/client/agent_endpoint.go +++ b/client/agent_endpoint.go @@ -13,7 +13,7 @@ import ( "github.com/hashicorp/nomad/command/agent/monitor" "github.com/hashicorp/nomad/command/agent/pprof" "github.com/hashicorp/nomad/helper" - "github.com/hashicorp/nomad/nomad/json/handlers" + "github.com/hashicorp/nomad/nomad/jsonhandles" "github.com/hashicorp/nomad/nomad/structs" metrics "github.com/armon/go-metrics" @@ -124,7 +124,7 @@ func (a *Agent) monitor(conn io.ReadWriteCloser) { frames := make(chan *sframer.StreamFrame, streamFramesBuffer) errCh := make(chan error) var buf bytes.Buffer - frameCodec := codec.NewEncoder(&buf, handlers.JsonHandle) + frameCodec := codec.NewEncoder(&buf, jsonhandles.JsonHandle) framer := sframer.NewStreamFramer(frames, 1*time.Second, 200*time.Millisecond, 1024) framer.Run() diff --git a/client/alloc_endpoint.go b/client/alloc_endpoint.go index d9944fb2e9e7..4427c7b52f3b 100644 --- a/client/alloc_endpoint.go +++ b/client/alloc_endpoint.go @@ -15,7 +15,7 @@ import ( cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/uuid" - "github.com/hashicorp/nomad/nomad/json/handlers" + "github.com/hashicorp/nomad/nomad/jsonhandles" "github.com/hashicorp/nomad/nomad/structs" nstructs "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/drivers" @@ -281,7 +281,7 @@ func newExecStream(decoder *codec.Decoder, encoder *codec.Encoder) drivers.ExecT buf: buf, encoder: encoder, - frameCodec: codec.NewEncoder(buf, handlers.JsonHandle), + frameCodec: codec.NewEncoder(buf, jsonhandles.JsonHandle), } } diff --git a/client/fs_endpoint.go b/client/fs_endpoint.go index 1ceae24807f2..16980c170f11 100644 --- a/client/fs_endpoint.go +++ b/client/fs_endpoint.go @@ -24,7 +24,7 @@ import ( sframer "github.com/hashicorp/nomad/client/lib/streamframer" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/helper" - "github.com/hashicorp/nomad/nomad/json/handlers" + "github.com/hashicorp/nomad/nomad/jsonhandles" "github.com/hashicorp/nomad/nomad/structs" ) @@ -239,7 +239,7 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) { frames := make(chan *sframer.StreamFrame, streamFramesBuffer) errCh := make(chan error) var buf bytes.Buffer - frameCodec := codec.NewEncoder(&buf, handlers.JsonHandle) + frameCodec := codec.NewEncoder(&buf, jsonhandles.JsonHandle) // Create the framer framer := sframer.NewStreamFramer(frames, streamHeartbeatRate, streamBatchWindow, streamFrameSize) @@ -470,7 +470,7 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) { var streamErr error buf := new(bytes.Buffer) - frameCodec := codec.NewEncoder(buf, handlers.JsonHandle) + frameCodec := codec.NewEncoder(buf, jsonhandles.JsonHandle) OUTER: for { select { diff --git a/command/agent/http.go b/command/agent/http.go index d937dd2f333c..cf6e9bd17729 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -24,7 +24,7 @@ import ( "github.com/hashicorp/nomad/helper/noxssrw" "github.com/hashicorp/nomad/helper/tlsutil" - "github.com/hashicorp/nomad/nomad/json/handlers" + "github.com/hashicorp/nomad/nomad/jsonhandles" "github.com/hashicorp/nomad/nomad/structs" ) @@ -496,13 +496,13 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque if obj != nil { var buf bytes.Buffer if prettyPrint { - enc := codec.NewEncoder(&buf, handlers.JsonHandlePretty) + enc := codec.NewEncoder(&buf, jsonhandles.JsonHandlePretty) err = enc.Encode(obj) if err == nil { buf.Write([]byte("\n")) } } else { - enc := codec.NewEncoder(&buf, handlers.JsonHandleWithExtensions) + enc := codec.NewEncoder(&buf, jsonhandles.JsonHandleWithExtensions) err = enc.Encode(obj) } if err != nil { diff --git a/command/agent/http_test.go b/command/agent/http_test.go index 3a8e61ef3eb4..4315e16f2a1b 100644 --- a/command/agent/http_test.go +++ b/command/agent/http_test.go @@ -30,8 +30,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/hashicorp/nomad/nomad/json/handlers" - nomadjson "github.com/hashicorp/nomad/nomad/json/handlers" + "github.com/hashicorp/nomad/nomad/jsonhandles" ) // makeHTTPServer returns a test server whose logs will be written to @@ -315,11 +314,11 @@ func testPrettyPrint(pretty string, prettyFmt bool, t *testing.T) { var expected bytes.Buffer var err error if prettyFmt { - enc := codec.NewEncoder(&expected, nomadjson.JsonHandlePretty) + enc := codec.NewEncoder(&expected, jsonhandles.JsonHandlePretty) err = enc.Encode(r) expected.WriteByte('\n') } else { - enc := codec.NewEncoder(&expected, nomadjson.JsonHandleWithExtensions) + enc := codec.NewEncoder(&expected, jsonhandles.JsonHandleWithExtensions) err = enc.Encode(r) } if err != nil { @@ -1249,13 +1248,13 @@ func Test_decodeBody(t *testing.T) { // BenchmarkHTTPServer_JSONEncodingWithExtensions benchmarks the performance of // encoding JSON objects using extensions func BenchmarkHTTPServer_JSONEncodingWithExtensions(b *testing.B) { - benchmarkJsonEncoding(b, handlers.JsonHandleWithExtensions) + benchmarkJsonEncoding(b, jsonhandles.JsonHandleWithExtensions) } // BenchmarkHTTPServer_JSONEncodingWithoutExtensions benchmarks the performance of // encoding JSON objects using extensions func BenchmarkHTTPServer_JSONEncodingWithoutExtensions(b *testing.B) { - benchmarkJsonEncoding(b, handlers.JsonHandle) + benchmarkJsonEncoding(b, jsonhandles.JsonHandle) } func benchmarkJsonEncoding(b *testing.B, handle *codec.JsonHandle) { diff --git a/contributing/checklist-jobspec.md b/contributing/checklist-jobspec.md index 2eb4e1fc56a6..82318b43a7b1 100644 --- a/contributing/checklist-jobspec.md +++ b/contributing/checklist-jobspec.md @@ -62,9 +62,4 @@ a few ways that this can be accomplished: This conversion can be performed in-situ in the agent HTTP handler, as long as the conversion doesn't need to appear in other handlers. Otherwise, it is possible to register an extension on the JSON encoding used by the HTTP agent; these extensions - can be put in `nomad/json/extensions.go`. - -Note, for simple transformations to encoding (like renaming or neglecting fields), we can use field tags on the structs -from `nomad/structs`. Our msgpack [encoding](http://ugorji.net/blog/go-codec-primer#drop-in-replacement-for-encoding-json-json-key-in-struct-tag-supported) -only uses the [`codec` tag](https://github.com/hashicorp/nomad/blob/v1.0.0/nomad/structs/structs.go#L10557-L10558). -Therefore, the `json` tag is available for customizing API output when encoding `structs` objects. See `structs.Node.SecretID`, for example. + can be put in `nomad/json/extensions.go`. \ No newline at end of file diff --git a/helper/pluginutils/hclutils/testing.go b/helper/pluginutils/hclutils/testing.go index 50f2ca0e278b..3d587e360a4c 100644 --- a/helper/pluginutils/hclutils/testing.go +++ b/helper/pluginutils/hclutils/testing.go @@ -11,7 +11,7 @@ import ( "github.com/zclconf/go-cty/cty" "github.com/hashicorp/nomad/helper/pluginutils/hclspecutils" - "github.com/hashicorp/nomad/nomad/json/handlers" + "github.com/hashicorp/nomad/nomad/jsonhandles" "github.com/hashicorp/nomad/plugins/drivers" "github.com/hashicorp/nomad/plugins/shared/hclspec" ) @@ -122,7 +122,7 @@ func JsonConfigToInterface(t *testing.T, config string) interface{} { t.Helper() // Decode from json - dec := codec.NewDecoderBytes([]byte(config), handlers.JsonHandle) + dec := codec.NewDecoderBytes([]byte(config), jsonhandles.JsonHandle) var m map[string]interface{} err := dec.Decode(&m) diff --git a/helper/pluginutils/hclutils/util.go b/helper/pluginutils/hclutils/util.go index 460ab875b0f2..c6375c4387d1 100644 --- a/helper/pluginutils/hclutils/util.go +++ b/helper/pluginutils/hclutils/util.go @@ -10,7 +10,7 @@ import ( "github.com/hashicorp/hcl/v2/hcldec" hjson "github.com/hashicorp/hcl/v2/json" - "github.com/hashicorp/nomad/nomad/json/handlers" + "github.com/hashicorp/nomad/nomad/jsonhandles" "github.com/zclconf/go-cty/cty" "github.com/zclconf/go-cty/cty/function" @@ -28,7 +28,7 @@ func ParseHclInterface(val interface{}, spec hcldec.Spec, vars map[string]cty.Va // Encode to json var buf bytes.Buffer - enc := codec.NewEncoder(&buf, handlers.JsonHandle) + enc := codec.NewEncoder(&buf, jsonhandles.JsonHandle) err := enc.Encode(val) if err != nil { // Convert to a hcl diagnostics message diff --git a/nomad/client_agent_endpoint.go b/nomad/client_agent_endpoint.go index 7e381f0d19d8..59d8242d1442 100644 --- a/nomad/client_agent_endpoint.go +++ b/nomad/client_agent_endpoint.go @@ -17,7 +17,7 @@ import ( "github.com/hashicorp/nomad/command/agent/monitor" "github.com/hashicorp/nomad/command/agent/pprof" "github.com/hashicorp/nomad/helper" - "github.com/hashicorp/nomad/nomad/json/handlers" + "github.com/hashicorp/nomad/nomad/jsonhandles" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/go-msgpack/codec" @@ -187,7 +187,7 @@ func (a *Agent) monitor(conn io.ReadWriteCloser) { frames := make(chan *sframer.StreamFrame, 32) errCh := make(chan error) var buf bytes.Buffer - frameCodec := codec.NewEncoder(&buf, handlers.JsonHandle) + frameCodec := codec.NewEncoder(&buf, jsonhandles.JsonHandle) framer := sframer.NewStreamFramer(frames, 1*time.Second, 200*time.Millisecond, 1024) framer.Run() diff --git a/nomad/json/extensions.go b/nomad/json/extensions.go deleted file mode 100644 index ec4be6c674e1..000000000000 --- a/nomad/json/extensions.go +++ /dev/null @@ -1,30 +0,0 @@ -package json - -import ( - "reflect" - - "github.com/hashicorp/nomad/nomad/structs" -) - -// init register all extensions used by the API HTTP server when encoding JSON -func init() { - // TODO: this could be simplified by looking up the base type in the case of a pointer type - registerExtension(reflect.TypeOf(structs.Node{}), nodeExt) - registerExtension(reflect.TypeOf(&structs.Node{}), nodeExt) -} - -// nodeExt adds the legacy field .Drain back to encoded Node objects -func nodeExt(v interface{}) interface{} { - node := v.(*structs.Node) - if node == nil { - return nil - } - type NodeAlias structs.Node - return &struct { - *NodeAlias - Drain bool - }{ - NodeAlias: (*NodeAlias)(node), - Drain: node.DrainStrategy != nil, - } -} diff --git a/nomad/json/encoding.go b/nomad/jsonhandles/encoding.go similarity index 80% rename from nomad/json/encoding.go rename to nomad/jsonhandles/encoding.go index 232efbb2b9d3..004923b980b5 100644 --- a/nomad/json/encoding.go +++ b/nomad/jsonhandles/encoding.go @@ -1,4 +1,4 @@ -package json +package jsonhandles import ( "reflect" @@ -9,16 +9,6 @@ import ( // extendFunc is a mapping from one struct to another, to change the shape of the encoded JSON type extendFunc func(interface{}) interface{} -var ( - // extendedTypes is a mapping of extended types to their extension function - extendedTypes = map[reflect.Type]extendFunc{} -) - -// registerExtension registers an extension function against a particular type -func registerExtension(tpe reflect.Type, ext extendFunc) { - extendedTypes[tpe] = ext -} - // nomadJsonEncodingExtensions is a catch-all go-msgpack extension // it looks up the types in the list of registered extension functions and applies it type nomadJsonEncodingExtensions struct{} diff --git a/nomad/jsonhandles/extensions.go b/nomad/jsonhandles/extensions.go new file mode 100644 index 000000000000..ab8ab93d70de --- /dev/null +++ b/nomad/jsonhandles/extensions.go @@ -0,0 +1,33 @@ +package jsonhandles + +import ( + "reflect" + + "github.com/hashicorp/nomad/nomad/structs" +) + +var ( + // extendedTypes is a mapping of extended types to their extension function + // TODO: the duplicates could be simplified by looking up the base type in the case of a pointer type in ConvertExt + extendedTypes = map[reflect.Type]extendFunc{ + reflect.TypeOf(structs.Node{}): nodeExt, + reflect.TypeOf(&structs.Node{}): nodeExt, + } +) + +// nodeExt ensures the node is sanitized and adds the legacy field .Drain back to encoded Node objects +func nodeExt(v interface{}) interface{} { + node := v.(*structs.Node).Sanitize() + // transform to a struct with inlined Node fields plus the Drain field + // - using defined type (not an alias!) EmbeddedNode gives us free conversion to a distinct type + // - distinct type prevents this encoding extension from being called recursively/infinitely on the embedding + // - pointers mean the conversion function doesn't have to make a copy during conversion + type EmbeddedNode structs.Node + return &struct { + *EmbeddedNode + Drain bool + }{ + EmbeddedNode: (*EmbeddedNode)(node), + Drain: node != nil && node.DrainStrategy != nil, + } +} diff --git a/nomad/json/handlers/handlers.go b/nomad/jsonhandles/handlers.go similarity index 69% rename from nomad/json/handlers/handlers.go rename to nomad/jsonhandles/handlers.go index 5ec676eb7165..92a402f8e902 100644 --- a/nomad/json/handlers/handlers.go +++ b/nomad/jsonhandles/handlers.go @@ -1,9 +1,7 @@ -package handlers +package jsonhandles import ( "github.com/hashicorp/go-msgpack/codec" - - "github.com/hashicorp/nomad/nomad/json" ) var ( @@ -14,10 +12,10 @@ var ( JsonHandle = &codec.JsonHandle{ HTMLCharsAsIs: true, } - JsonHandleWithExtensions = json.NomadJsonEncodingExtensions(&codec.JsonHandle{ + JsonHandleWithExtensions = NomadJsonEncodingExtensions(&codec.JsonHandle{ HTMLCharsAsIs: true, }) - JsonHandlePretty = json.NomadJsonEncodingExtensions(&codec.JsonHandle{ + JsonHandlePretty = NomadJsonEncodingExtensions(&codec.JsonHandle{ HTMLCharsAsIs: true, Indent: 4, }) diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 08907a307ebf..184ef1f56c8a 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -801,6 +801,7 @@ func (n *Node) GetNode(args *structs.NodeSpecificRequest, // Setup the output if out != nil { + out = out.Sanitize() reply.Node = out reply.Index = out.ModifyIndex } else { diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index ebcacf98ea99..c26945832c7b 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -1314,6 +1314,7 @@ func TestClientEndpoint_GetNode(t *testing.T) { // Update the status updated at value node.StatusUpdatedAt = resp2.Node.StatusUpdatedAt + node.SecretID = "" node.Events = resp2.Node.Events if !reflect.DeepEqual(node, resp2.Node) { t.Fatalf("bad: %#v \n %#v", node, resp2.Node) diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go index d8c93b2be487..b566736a4955 100644 --- a/nomad/plan_apply.go +++ b/nomad/plan_apply.go @@ -647,10 +647,7 @@ func evaluateNodePlan(snap *state.StateSnapshot, plan *structs.Plan, nodeID stri } else if node.Status != structs.NodeStatusReady { return false, "node is not ready for placements", nil } else if node.SchedulingEligibility == structs.NodeSchedulingIneligible { - return false, "node is not eligible for draining", nil - } else if node.DrainStrategy != nil { - // Deprecate in favor of scheduling eligibility and remove post-0.8 - return false, "node is draining", nil + return false, "node is not eligible", nil } // Get the existing allocations that are non-terminal diff --git a/nomad/state/events.go b/nomad/state/events.go index 626bc4f40aea..82bcdbc9c95d 100644 --- a/nomad/state/events.go +++ b/nomad/state/events.go @@ -80,6 +80,7 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { return structs.Event{}, false } + before = before.Sanitize() return structs.Event{ Topic: structs.TopicNode, Key: before.ID, @@ -175,6 +176,7 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { return structs.Event{}, false } + after = after.Sanitize() return structs.Event{ Topic: structs.TopicNode, Key: after.ID, diff --git a/nomad/stream/ndjson.go b/nomad/stream/ndjson.go index 9c9ee11be11a..3e806c26ea7a 100644 --- a/nomad/stream/ndjson.go +++ b/nomad/stream/ndjson.go @@ -8,7 +8,7 @@ import ( "github.com/hashicorp/go-msgpack/codec" - "github.com/hashicorp/nomad/nomad/json/handlers" + "github.com/hashicorp/nomad/nomad/jsonhandles" "github.com/hashicorp/nomad/nomad/structs" ) @@ -75,7 +75,7 @@ func (n *JsonStream) Send(v interface{}) error { } var buf bytes.Buffer - enc := codec.NewEncoder(&buf, handlers.JsonHandleWithExtensions) + enc := codec.NewEncoder(&buf, jsonhandles.JsonHandleWithExtensions) err := enc.Encode(v) if err != nil { return fmt.Errorf("error marshaling json for stream: %w", err) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 5ff0171d03a8..a8dea8bf7571 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1907,6 +1907,20 @@ type Node struct { ModifyIndex uint64 } +// Sanitize returns a copy of the Node omitting confidential fields +// It only returns a copy if the Node contains the confidential fields +func (n *Node) Sanitize() *Node { + if n == nil { + return nil + } + if n.SecretID == "" { + return n + } + clean := n.Copy() + clean.SecretID = "" + return clean +} + // Ready returns true if the node is ready for running allocations func (n *Node) Ready() bool { return n.Status == NodeStatusReady && n.DrainStrategy == nil && n.SchedulingEligibility == NodeSchedulingEligible @@ -1917,6 +1931,16 @@ func (n *Node) Canonicalize() { return } + // Ensure SchedulingEligibility is set whenever draining so the plan applier and other scheduling logic only need + // to check SchedulingEligibility when determining whether a placement is feasible on a node. + if n.SchedulingEligibility == "" { + if n.DrainStrategy != nil { + n.SchedulingEligibility = NodeSchedulingIneligible + } else { + n.SchedulingEligibility = NodeSchedulingEligible + } + } + // COMPAT remove in 1.0 // In v0.12.0 we introduced a separate node specific network resource struct // so we need to covert any pre 0.12 clients to the correct struct @@ -1940,14 +1964,6 @@ func (n *Node) Canonicalize() { } } } - - if n.SchedulingEligibility == "" { - if n.DrainStrategy != nil { - n.SchedulingEligibility = NodeSchedulingIneligible - } else { - n.SchedulingEligibility = NodeSchedulingEligible - } - } } func (n *Node) Copy() *Node { diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index cd247236f42d..a37f47a4be84 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -5601,6 +5601,31 @@ func TestNode_Copy(t *testing.T) { require.Equal(node.Drivers, node2.Drivers) } +func TestNode_Sanitize(t *testing.T) { + require := require.New(t) + + testCases := []*Node{ + nil, + { + ID: uuid.Generate(), + SecretID: "", + }, + { + ID: uuid.Generate(), + SecretID: uuid.Generate(), + }, + } + for _, tc := range testCases { + sanitized := tc.Sanitize() + if tc == nil { + require.Nil(sanitized) + } else { + require.NotNil(sanitized) + require.Empty(sanitized.SecretID) + } + } +} + func TestSpread_Validate(t *testing.T) { type tc struct { spread *Spread diff --git a/scheduler/util.go b/scheduler/util.go index 082273a1cf53..97a8ec5b9f0b 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -252,13 +252,7 @@ func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]int // Filter on datacenter and status node := raw.(*structs.Node) - if node.Status != structs.NodeStatusReady { - continue - } - if node.DrainStrategy != nil { - continue - } - if node.SchedulingEligibility != structs.NodeSchedulingEligible { + if !node.Ready() { continue } if _, ok := dcMap[node.Datacenter]; !ok { From 405efe6da130aa5b024634a2ba45c7aac75e62b9 Mon Sep 17 00:00:00 2001 From: Chris Baker <1675087+cgbaker@users.noreply.github.com> Date: Fri, 26 Mar 2021 17:10:39 +0000 Subject: [PATCH 13/15] reinserted/expanded fsm node.canonicalize test that was still needed --- nomad/fsm_test.go | 63 ++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 59 insertions(+), 4 deletions(-) diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index f4a1949dd55e..60dc3fc3a609 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -11,6 +11,11 @@ import ( "github.com/google/go-cmp/cmp" memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/raft" + "github.com/kr/pretty" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/helper/uuid" @@ -19,10 +24,6 @@ import ( "github.com/hashicorp/nomad/nomad/stream" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" - "github.com/hashicorp/raft" - "github.com/kr/pretty" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) type MockSink struct { @@ -180,6 +181,60 @@ func TestFSM_UpsertNode(t *testing.T) { } +func TestFSM_UpsertNode_Canonicalize(t *testing.T) { + t.Parallel() + require := require.New(t) + + fsm := testFSM(t) + fsm.blockedEvals.SetEnabled(true) + + // Setup a node without eligibility, ensure that upsert/canonicalize put it back + node := mock.Node() + node.SchedulingEligibility = "" + + req := structs.NodeRegisterRequest{ + Node: node, + } + buf, err := structs.Encode(structs.NodeRegisterRequestType, req) + require.Nil(err) + + require.Nil(fsm.Apply(makeLog(buf))) + + // Verify we are registered + n, err := fsm.State().NodeByID(nil, req.Node.ID) + require.Nil(err) + require.NotNil(n) + require.EqualValues(1, n.CreateIndex) + require.Equal(structs.NodeSchedulingEligible, n.SchedulingEligibility) +} + +func TestFSM_UpsertNode_Canonicalize_Ineligible(t *testing.T) { + t.Parallel() + require := require.New(t) + + fsm := testFSM(t) + fsm.blockedEvals.SetEnabled(true) + + // Setup a node without eligibility, ensure that upsert/canonicalize put it back + node := mock.DrainNode() + node.SchedulingEligibility = "" + + req := structs.NodeRegisterRequest{ + Node: node, + } + buf, err := structs.Encode(structs.NodeRegisterRequestType, req) + require.Nil(err) + + require.Nil(fsm.Apply(makeLog(buf))) + + // Verify we are registered + n, err := fsm.State().NodeByID(nil, req.Node.ID) + require.Nil(err) + require.NotNil(n) + require.EqualValues(1, n.CreateIndex) + require.Equal(structs.NodeSchedulingIneligible, n.SchedulingEligibility) +} + func TestFSM_DeregisterNode(t *testing.T) { t.Parallel() fsm := testFSM(t) From e9c552b778aadc4d987d44da660736dc94c3d282 Mon Sep 17 00:00:00 2001 From: Chris Baker <1675087+cgbaker@users.noreply.github.com> Date: Fri, 26 Mar 2021 12:13:50 -0500 Subject: [PATCH 14/15] Update contributing/checklist-jobspec.md --- contributing/checklist-jobspec.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contributing/checklist-jobspec.md b/contributing/checklist-jobspec.md index 82318b43a7b1..bfe9b5d15917 100644 --- a/contributing/checklist-jobspec.md +++ b/contributing/checklist-jobspec.md @@ -62,4 +62,4 @@ a few ways that this can be accomplished: This conversion can be performed in-situ in the agent HTTP handler, as long as the conversion doesn't need to appear in other handlers. Otherwise, it is possible to register an extension on the JSON encoding used by the HTTP agent; these extensions - can be put in `nomad/json/extensions.go`. \ No newline at end of file + can be put in `nomad/jsonhandles/extensions.go`. From ed636adc0beae81bf6b48a323520a241bc6b14e2 Mon Sep 17 00:00:00 2001 From: Chris Baker <1675087+cgbaker@users.noreply.github.com> Date: Fri, 26 Mar 2021 18:57:59 +0000 Subject: [PATCH 15/15] reworked Node.Canonicalize() to enforce invariants, fixed a broken test --- nomad/mock/mock.go | 1 + nomad/plan_apply_test.go | 8 +------- nomad/structs/structs.go | 14 ++++++-------- 3 files changed, 8 insertions(+), 15 deletions(-) diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 53cb91241433..bf80e154e8fb 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -123,6 +123,7 @@ func DrainNode() *structs.Node { node.DrainStrategy = &structs.DrainStrategy{ DrainSpec: structs.DrainSpec{}, } + node.Canonicalize() return node } diff --git a/nomad/plan_apply_test.go b/nomad/plan_apply_test.go index 7c69d150bd17..7550baf413d4 100644 --- a/nomad/plan_apply_test.go +++ b/nomad/plan_apply_test.go @@ -714,13 +714,7 @@ func TestPlanApply_EvalNodePlan_NodeNotReady(t *testing.T) { func TestPlanApply_EvalNodePlan_NodeDrain(t *testing.T) { t.Parallel() state := testStateStore(t) - node := mock.Node() - node.DrainStrategy = &structs.DrainStrategy{ - DrainSpec: structs.DrainSpec{ - Deadline: 0, - IgnoreSystemJobs: false, - }, - } + node := mock.DrainNode() state.UpsertNode(structs.MsgTypeTestSetup, 1000, node) snap, _ := state.Snapshot() diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index a8dea8bf7571..4194928c32de 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1931,14 +1931,12 @@ func (n *Node) Canonicalize() { return } - // Ensure SchedulingEligibility is set whenever draining so the plan applier and other scheduling logic only need - // to check SchedulingEligibility when determining whether a placement is feasible on a node. - if n.SchedulingEligibility == "" { - if n.DrainStrategy != nil { - n.SchedulingEligibility = NodeSchedulingIneligible - } else { - n.SchedulingEligibility = NodeSchedulingEligible - } + // Ensure SchedulingEligibility is correctly set whenever draining so the plan applier and other scheduling logic + // only need to check SchedulingEligibility when determining whether a placement is feasible on a node. + if n.DrainStrategy != nil { + n.SchedulingEligibility = NodeSchedulingIneligible + } else if n.SchedulingEligibility == "" { + n.SchedulingEligibility = NodeSchedulingEligible } // COMPAT remove in 1.0