From 69aeaf016fcf067bac5cecb79ae0c53c0f4b5d9a Mon Sep 17 00:00:00 2001 From: Derek Strickland <1111455+DerekStrickland@users.noreply.github.com> Date: Tue, 8 Nov 2022 05:26:28 -0500 Subject: [PATCH] api: remove `mapstructure` tags from`Port` struct (#12916) This PR solves a defect in the deserialization of api.Port structs when returning structs from theEventStream. Previously, the api.Port struct's fields were decorated with both mapstructure and hcl tags to support the network.port stanza's use of the keyword static when posting a static port value. This works fine when posting a job and when retrieving any struct that has an embedded api.Port instance as long as the value is deserialized using JSON decoding. The EventStream, however, uses mapstructure to decode event payloads in the api package. mapstructure expects an underlying field named static which does not exist. The result was that the Port.Value field would always be set to 0. Upon further inspection, a few things became apparent. The struct already has hcl tags that support the indirection during job submission. Serialization/deserialization with both the json and hcl packages produce the desired result. The use of of the mapstructure tags provided no value as the Port struct contains only fields with primitive types. This PR: Removes the mapstructure tags from the api.Port structs Updates the job parsing logic to use hcl instead of mapstructure when decoding Port instances. Closes #11044 Co-authored-by: DerekStrickland Co-authored-by: Piotr Kazmierczak <470696+pkazmierczak@users.noreply.github.com> --- .changelog/12916.txt | 3 + api/resources.go | 6 +- client/client_test.go | 8 +- command/agent/event_endpoint_test.go | 102 ++++++++++++++++++++++++++ command/agent/testingutils_test.go | 18 +++++ e2e/events/input/deploy.nomad | 12 ++- e2e/events/input/initial.nomad | 14 +++- jobspec/parse_network.go | 8 +- jobspec/parse_test.go | 25 +++++++ jobspec/test-fixtures/parse-ports.hcl | 11 +++ 10 files changed, 188 insertions(+), 19 deletions(-) create mode 100644 .changelog/12916.txt create mode 100644 jobspec/test-fixtures/parse-ports.hcl diff --git a/.changelog/12916.txt b/.changelog/12916.txt new file mode 100644 index 000000000000..83056fc4d3a5 --- /dev/null +++ b/.changelog/12916.txt @@ -0,0 +1,3 @@ +```release-note:bug +event_stream: fixed a bug where dynamic port values would fail to serialize in the event stream +``` diff --git a/api/resources.go b/api/resources.go index 43f6bbe86ad1..84d9d1905474 100644 --- a/api/resources.go +++ b/api/resources.go @@ -98,9 +98,9 @@ func (r *Resources) Merge(other *Resources) { type Port struct { Label string `hcl:",label"` - Value int `mapstructure:"static" hcl:"static,optional"` - To int `mapstructure:"to" hcl:"to,optional"` - HostNetwork string `mapstructure:"host_network" hcl:"host_network,optional"` + Value int `hcl:"static,optional"` + To int `hcl:"to,optional"` + HostNetwork string `hcl:"host_network,optional"` } type DNSConfig struct { diff --git a/client/client_test.go b/client/client_test.go index 7135218dd43f..ff07eab6769d 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -2,7 +2,6 @@ package client import ( "fmt" - "io/ioutil" "net" "os" "path/filepath" @@ -726,11 +725,8 @@ func TestClient_AddAllocError(t *testing.T) { func TestClient_Init(t *testing.T) { ci.Parallel(t) - dir, err := ioutil.TempDir("", "nomad") - if err != nil { - t.Fatalf("err: %s", err) - } - defer os.RemoveAll(dir) + dir := t.TempDir() + allocDir := filepath.Join(dir, "alloc") config := config.DefaultConfig() diff --git a/command/agent/event_endpoint_test.go b/command/agent/event_endpoint_test.go index acef9f08ecbb..209b45960338 100644 --- a/command/agent/event_endpoint_test.go +++ b/command/agent/event_endpoint_test.go @@ -10,8 +10,10 @@ import ( "testing" "time" + "github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/helper/uuid" + "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/assert" @@ -202,3 +204,103 @@ func TestEventStream_QueryParse(t *testing.T) { }) } } + +func TestHTTP_Alloc_Port_Response(t *testing.T) { + ci.Parallel(t) + + httpTest(t, nil, func(srv *TestAgent) { + client := srv.Client() + defer srv.Shutdown() + defer client.Close() + + testutil.WaitForLeader(t, srv.Agent.RPC) + testutil.WaitForClient(t, srv.Agent.Client().RPC, srv.Agent.Client().NodeID(), srv.Agent.Client().Region()) + + job := MockRunnableJob() + + resp, _, err := client.Jobs().Register(job, nil) + require.NoError(t, err) + require.NotEmpty(t, resp.EvalID) + + alloc := mock.Alloc() + alloc.Job = ApiJobToStructJob(job) + alloc.JobID = *job.ID + alloc.NodeID = srv.client.NodeID() + + require.Nil(t, srv.server.State().UpsertJobSummary(101, mock.JobSummary(alloc.JobID))) + require.Nil(t, srv.server.State().UpsertAllocs(structs.MsgTypeTestSetup, 102, []*structs.Allocation{alloc})) + + running := false + testutil.WaitForResult(func() (bool, error) { + upsertResult, stateErr := srv.server.State().AllocByID(nil, alloc.ID) + if stateErr != nil { + return false, stateErr + } + if upsertResult.ClientStatus == structs.AllocClientStatusRunning { + running = true + return true, nil + } + return false, nil + }, func(err error) { + require.NoError(t, err, "allocation query failed") + }) + + require.True(t, running) + + topics := map[api.Topic][]string{ + api.TopicAllocation: {*job.ID}, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + events := client.EventStream() + streamCh, err := events.Stream(ctx, topics, 1, nil) + require.NoError(t, err) + + var allocEvents []api.Event + // gather job alloc events + go func() { + for { + select { + case event, ok := <-streamCh: + if !ok { + return + } + if event.IsHeartbeat() { + continue + } + allocEvents = append(allocEvents, event.Events...) + case <-time.After(10 * time.Second): + require.Fail(t, "failed waiting for event stream event") + } + } + }() + + var networkResource *api.NetworkResource + testutil.WaitForResult(func() (bool, error) { + for _, e := range allocEvents { + if e.Type == structs.TypeAllocationUpdated { + eventAlloc, err := e.Allocation() + if err != nil { + return false, err + } + if len(eventAlloc.AllocatedResources.Tasks["web"].Networks) == 0 { + return false, nil + } + networkResource = eventAlloc.AllocatedResources.Tasks["web"].Networks[0] + if networkResource.ReservedPorts[0].Value == 5000 { + return true, nil + } + } + } + return false, nil + }, func(e error) { + require.NoError(t, err) + }) + + require.NotNil(t, networkResource) + require.Equal(t, 5000, networkResource.ReservedPorts[0].Value) + require.NotEqual(t, 0, networkResource.DynamicPorts[0].Value) + }) +} diff --git a/command/agent/testingutils_test.go b/command/agent/testingutils_test.go index 152effae365a..270621f73d31 100644 --- a/command/agent/testingutils_test.go +++ b/command/agent/testingutils_test.go @@ -104,3 +104,21 @@ func MockRegionalJob() *api.Job { j.Region = pointer.Of("north-america") return j } + +// MockRunnableJob returns a mock job that has a configuration that allows it to be +// placed on a TestAgent. +func MockRunnableJob() *api.Job { + job := MockJob() + + // Configure job so it can be run on a TestAgent + job.Constraints = nil + job.TaskGroups[0].Constraints = nil + job.TaskGroups[0].Count = pointer.Of(1) + job.TaskGroups[0].Tasks[0].Driver = "mock_driver" + job.TaskGroups[0].Tasks[0].Services = nil + job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{ + "run_for": "10s", + } + + return job +} diff --git a/e2e/events/input/deploy.nomad b/e2e/events/input/deploy.nomad index f889872d4443..d998d68d66dc 100644 --- a/e2e/events/input/deploy.nomad +++ b/e2e/events/input/deploy.nomad @@ -16,6 +16,12 @@ job "deployment_auto.nomad" { min_healthy_time = "1s" } + network { + port "db" { + static = 9000 + } + } + task "one" { driver = "raw_exec" @@ -24,10 +30,12 @@ job "deployment_auto.nomad" { } config { - command = "/bin/sleep" + image = "busybox:1" + command = "nc" # change args to update the job, the only changes - args = ["1000000"] + args = ["-ll", "-p", "1234", "-e", "/bin/cat"] + ports = ["db"] } resources { diff --git a/e2e/events/input/initial.nomad b/e2e/events/input/initial.nomad index 3384c64f9d59..418364ccd817 100644 --- a/e2e/events/input/initial.nomad +++ b/e2e/events/input/initial.nomad @@ -15,17 +15,25 @@ job "deployment_auto.nomad" { canary = 2 } + network { + port "db" { + static = 9000 + } + } + task "one" { - driver = "raw_exec" + driver = "docker" env { version = "0" } config { - command = "/bin/sleep" + image = "busybox:1" + command = "nc" # change args to update the job, the only changes - args = ["1000000"] + args = ["-ll", "-p", "1234", "-e", "/bin/cat"] + ports = ["db"] } resources { diff --git a/jobspec/parse_network.go b/jobspec/parse_network.go index 795f38a3eb99..a97bea38fae4 100644 --- a/jobspec/parse_network.go +++ b/jobspec/parse_network.go @@ -93,14 +93,12 @@ func parsePorts(networkObj *ast.ObjectList, nw *api.NetworkResource) error { if knownPortLabels[l] { return fmt.Errorf("found a port label collision: %s", label) } - var p map[string]interface{} + var res api.Port - if err := hcl.DecodeObject(&p, port.Val); err != nil { - return err - } - if err := mapstructure.WeakDecode(p, &res); err != nil { + if err := hcl.DecodeObject(&res, port.Val); err != nil { return err } + res.Label = label if res.Value > 0 { nw.ReservedPorts = append(nw.ReservedPorts, res) diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go index bbd183abcd15..38251a03ee39 100644 --- a/jobspec/parse_test.go +++ b/jobspec/parse_test.go @@ -1846,3 +1846,28 @@ func TestIncorrectKey(t *testing.T) { t.Fatalf("Expected key error; got %v", err) } } + +// TestPortParsing validates that the removal of the mapstructure tags on the +// Port struct don't cause issues with HCL 1 parsing. +// +// TODO: in the future, see if we need `mapstructure` tags on any of the API +func TestPortParsing(t *testing.T) { + ci.Parallel(t) + + var err error + var path string + var job *api.Job + + path, err = filepath.Abs(filepath.Join("./test-fixtures", "parse-ports.hcl")) + require.NoError(t, err, "Can't get absolute path for file: parse-ports.hcl") + + job, err = ParseFile(path) + require.NoError(t, err, "cannot parse job") + require.NotNil(t, job) + require.Len(t, job.TaskGroups, 1) + require.Len(t, job.TaskGroups[0].Networks, 1) + require.Len(t, job.TaskGroups[0].Networks[0].ReservedPorts, 1) + require.Len(t, job.TaskGroups[0].Networks[0].DynamicPorts, 1) + require.Equal(t, 9000, job.TaskGroups[0].Networks[0].ReservedPorts[0].Value) + require.Equal(t, 0, job.TaskGroups[0].Networks[0].DynamicPorts[0].Value) +} diff --git a/jobspec/test-fixtures/parse-ports.hcl b/jobspec/test-fixtures/parse-ports.hcl new file mode 100644 index 000000000000..e87bafbd72c2 --- /dev/null +++ b/jobspec/test-fixtures/parse-ports.hcl @@ -0,0 +1,11 @@ +job "parse-ports" { + group "group" { + network { + port "static" { + static = 9000 + } + + port "dynamic" {} + } + } +}