Skip to content

Commit

Permalink
api: remove mapstructure tags fromPort struct (#12916) (#15184)
Browse files Browse the repository at this point in the history
This PR solves a defect in the deserialization of api.Port structs when returning structs from theEventStream.

Previously, the api.Port struct's fields were decorated with both mapstructure and hcl tags to support the network.port stanza's use of the keyword static when posting a static port value. This works fine when posting a job and when retrieving any struct that has an embedded api.Port instance as long as the value is deserialized using JSON decoding. The EventStream, however, uses mapstructure to decode event payloads in the api package. mapstructure expects an underlying field named static which does not exist. The result was that the Port.Value field would always be set to 0.

Upon further inspection, a few things became apparent.

The struct already has hcl tags that support the indirection during job submission.
Serialization/deserialization with both the json and hcl packages produce the desired result.
The use of of the mapstructure tags provided no value as the Port struct contains only fields with primitive types.
This PR:

Removes the mapstructure tags from the api.Port structs
Updates the job parsing logic to use hcl instead of mapstructure when decoding Port instances.
Closes #11044

Co-authored-by: DerekStrickland <dstrickland@hashicorp.com>
Co-authored-by: Piotr Kazmierczak <470696+pkazmierczak@users.noreply.github.com>
  • Loading branch information
3 people committed Nov 8, 2022
1 parent 5f6b167 commit 875efd9
Show file tree
Hide file tree
Showing 10 changed files with 187 additions and 13 deletions.
3 changes: 3 additions & 0 deletions .changelog/12916.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
event_stream: fixed a bug where dynamic port values would fail to serialize in the event stream
```
6 changes: 3 additions & 3 deletions api/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ func (r *Resources) Merge(other *Resources) {

type Port struct {
Label string `hcl:",label"`
Value int `mapstructure:"static" hcl:"static,optional"`
To int `mapstructure:"to" hcl:"to,optional"`
HostNetwork string `mapstructure:"host_network" hcl:"host_network,optional"`
Value int `hcl:"static,optional"`
To int `hcl:"to,optional"`
HostNetwork string `hcl:"host_network,optional"`
}

type DNSConfig struct {
Expand Down
1 change: 1 addition & 0 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,7 @@ func TestClient_Init(t *testing.T) {
ci.Parallel(t)

dir := t.TempDir()

allocDir := filepath.Join(dir, "alloc")

config := config.DefaultConfig()
Expand Down
102 changes: 102 additions & 0 deletions command/agent/event_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (
"testing"
"time"

"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -202,3 +204,103 @@ func TestEventStream_QueryParse(t *testing.T) {
})
}
}

func TestHTTP_Alloc_Port_Response(t *testing.T) {
ci.Parallel(t)

httpTest(t, nil, func(srv *TestAgent) {
client := srv.Client()
defer srv.Shutdown()
defer client.Close()

testutil.WaitForLeader(t, srv.Agent.RPC)
testutil.WaitForClient(t, srv.Agent.Client().RPC, srv.Agent.Client().NodeID(), srv.Agent.Client().Region())

job := MockRunnableJob()

resp, _, err := client.Jobs().Register(job, nil)
require.NoError(t, err)
require.NotEmpty(t, resp.EvalID)

alloc := mock.Alloc()
alloc.Job = ApiJobToStructJob(job)
alloc.JobID = *job.ID
alloc.NodeID = srv.client.NodeID()

require.Nil(t, srv.server.State().UpsertJobSummary(101, mock.JobSummary(alloc.JobID)))
require.Nil(t, srv.server.State().UpsertAllocs(structs.MsgTypeTestSetup, 102, []*structs.Allocation{alloc}))

running := false
testutil.WaitForResult(func() (bool, error) {
upsertResult, stateErr := srv.server.State().AllocByID(nil, alloc.ID)
if stateErr != nil {
return false, stateErr
}
if upsertResult.ClientStatus == structs.AllocClientStatusRunning {
running = true
return true, nil
}
return false, nil
}, func(err error) {
require.NoError(t, err, "allocation query failed")
})

require.True(t, running)

topics := map[api.Topic][]string{
api.TopicAllocation: {*job.ID},
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

events := client.EventStream()
streamCh, err := events.Stream(ctx, topics, 1, nil)
require.NoError(t, err)

var allocEvents []api.Event
// gather job alloc events
go func() {
for {
select {
case event, ok := <-streamCh:
if !ok {
return
}
if event.IsHeartbeat() {
continue
}
allocEvents = append(allocEvents, event.Events...)
case <-time.After(10 * time.Second):
require.Fail(t, "failed waiting for event stream event")
}
}
}()

var networkResource *api.NetworkResource
testutil.WaitForResult(func() (bool, error) {
for _, e := range allocEvents {
if e.Type == structs.TypeAllocationUpdated {
eventAlloc, err := e.Allocation()
if err != nil {
return false, err
}
if len(eventAlloc.AllocatedResources.Tasks["web"].Networks) == 0 {
return false, nil
}
networkResource = eventAlloc.AllocatedResources.Tasks["web"].Networks[0]
if networkResource.ReservedPorts[0].Value == 5000 {
return true, nil
}
}
}
return false, nil
}, func(e error) {
require.NoError(t, err)
})

require.NotNil(t, networkResource)
require.Equal(t, 5000, networkResource.ReservedPorts[0].Value)
require.NotEqual(t, 0, networkResource.DynamicPorts[0].Value)
})
}
18 changes: 18 additions & 0 deletions command/agent/testingutils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,21 @@ func MockRegionalJob() *api.Job {
j.Region = pointer.Of("north-america")
return j
}

// MockRunnableJob returns a mock job that has a configuration that allows it to be
// placed on a TestAgent.
func MockRunnableJob() *api.Job {
job := MockJob()

// Configure job so it can be run on a TestAgent
job.Constraints = nil
job.TaskGroups[0].Constraints = nil
job.TaskGroups[0].Count = pointer.Of(1)
job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
job.TaskGroups[0].Tasks[0].Services = nil
job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
"run_for": "10s",
}

return job
}
12 changes: 10 additions & 2 deletions e2e/events/input/deploy.nomad
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ job "deployment_auto.nomad" {
min_healthy_time = "1s"
}

network {
port "db" {
static = 9000
}
}

task "one" {
driver = "raw_exec"

Expand All @@ -24,10 +30,12 @@ job "deployment_auto.nomad" {
}

config {
command = "/bin/sleep"
image = "busybox:1"
command = "nc"

# change args to update the job, the only changes
args = ["1000000"]
args = ["-ll", "-p", "1234", "-e", "/bin/cat"]
ports = ["db"]
}

resources {
Expand Down
14 changes: 11 additions & 3 deletions e2e/events/input/initial.nomad
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,25 @@ job "deployment_auto.nomad" {
canary = 2
}

network {
port "db" {
static = 9000
}
}

task "one" {
driver = "raw_exec"
driver = "docker"

env {
version = "0"
}
config {
command = "/bin/sleep"
image = "busybox:1"
command = "nc"

# change args to update the job, the only changes
args = ["1000000"]
args = ["-ll", "-p", "1234", "-e", "/bin/cat"]
ports = ["db"]
}

resources {
Expand Down
8 changes: 3 additions & 5 deletions jobspec/parse_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,12 @@ func parsePorts(networkObj *ast.ObjectList, nw *api.NetworkResource) error {
if knownPortLabels[l] {
return fmt.Errorf("found a port label collision: %s", label)
}
var p map[string]interface{}

var res api.Port
if err := hcl.DecodeObject(&p, port.Val); err != nil {
return err
}
if err := mapstructure.WeakDecode(p, &res); err != nil {
if err := hcl.DecodeObject(&res, port.Val); err != nil {
return err
}

res.Label = label
if res.Value > 0 {
nw.ReservedPorts = append(nw.ReservedPorts, res)
Expand Down
25 changes: 25 additions & 0 deletions jobspec/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1898,3 +1898,28 @@ func TestIncorrectKey(t *testing.T) {
t.Fatalf("Expected key error; got %v", err)
}
}

// TestPortParsing validates that the removal of the mapstructure tags on the
// Port struct don't cause issues with HCL 1 parsing.
//
// TODO: in the future, see if we need `mapstructure` tags on any of the API
func TestPortParsing(t *testing.T) {
ci.Parallel(t)

var err error
var path string
var job *api.Job

path, err = filepath.Abs(filepath.Join("./test-fixtures", "parse-ports.hcl"))
require.NoError(t, err, "Can't get absolute path for file: parse-ports.hcl")

job, err = ParseFile(path)
require.NoError(t, err, "cannot parse job")
require.NotNil(t, job)
require.Len(t, job.TaskGroups, 1)
require.Len(t, job.TaskGroups[0].Networks, 1)
require.Len(t, job.TaskGroups[0].Networks[0].ReservedPorts, 1)
require.Len(t, job.TaskGroups[0].Networks[0].DynamicPorts, 1)
require.Equal(t, 9000, job.TaskGroups[0].Networks[0].ReservedPorts[0].Value)
require.Equal(t, 0, job.TaskGroups[0].Networks[0].DynamicPorts[0].Value)
}
11 changes: 11 additions & 0 deletions jobspec/test-fixtures/parse-ports.hcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
job "parse-ports" {
group "group" {
network {
port "static" {
static = 9000
}

port "dynamic" {}
}
}
}

0 comments on commit 875efd9

Please sign in to comment.