diff --git a/.changelog/12916.txt b/.changelog/12916.txt new file mode 100644 index 000000000000..83056fc4d3a5 --- /dev/null +++ b/.changelog/12916.txt @@ -0,0 +1,3 @@ +```release-note:bug +event_stream: fixed a bug where dynamic port values would fail to serialize in the event stream +``` diff --git a/api/resources.go b/api/resources.go index 43f6bbe86ad1..84d9d1905474 100644 --- a/api/resources.go +++ b/api/resources.go @@ -98,9 +98,9 @@ func (r *Resources) Merge(other *Resources) { type Port struct { Label string `hcl:",label"` - Value int `mapstructure:"static" hcl:"static,optional"` - To int `mapstructure:"to" hcl:"to,optional"` - HostNetwork string `mapstructure:"host_network" hcl:"host_network,optional"` + Value int `hcl:"static,optional"` + To int `hcl:"to,optional"` + HostNetwork string `hcl:"host_network,optional"` } type DNSConfig struct { diff --git a/client/client_test.go b/client/client_test.go index 4040ec238a7f..c9b4b6fb98b9 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -727,6 +727,7 @@ func TestClient_Init(t *testing.T) { ci.Parallel(t) dir := t.TempDir() + allocDir := filepath.Join(dir, "alloc") config := config.DefaultConfig() diff --git a/command/agent/event_endpoint_test.go b/command/agent/event_endpoint_test.go index acef9f08ecbb..209b45960338 100644 --- a/command/agent/event_endpoint_test.go +++ b/command/agent/event_endpoint_test.go @@ -10,8 +10,10 @@ import ( "testing" "time" + "github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/helper/uuid" + "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/assert" @@ -202,3 +204,103 @@ func TestEventStream_QueryParse(t *testing.T) { }) } } + +func TestHTTP_Alloc_Port_Response(t *testing.T) { + ci.Parallel(t) + + httpTest(t, nil, func(srv *TestAgent) { + client := srv.Client() + defer srv.Shutdown() + defer client.Close() + + testutil.WaitForLeader(t, srv.Agent.RPC) + testutil.WaitForClient(t, srv.Agent.Client().RPC, srv.Agent.Client().NodeID(), srv.Agent.Client().Region()) + + job := MockRunnableJob() + + resp, _, err := client.Jobs().Register(job, nil) + require.NoError(t, err) + require.NotEmpty(t, resp.EvalID) + + alloc := mock.Alloc() + alloc.Job = ApiJobToStructJob(job) + alloc.JobID = *job.ID + alloc.NodeID = srv.client.NodeID() + + require.Nil(t, srv.server.State().UpsertJobSummary(101, mock.JobSummary(alloc.JobID))) + require.Nil(t, srv.server.State().UpsertAllocs(structs.MsgTypeTestSetup, 102, []*structs.Allocation{alloc})) + + running := false + testutil.WaitForResult(func() (bool, error) { + upsertResult, stateErr := srv.server.State().AllocByID(nil, alloc.ID) + if stateErr != nil { + return false, stateErr + } + if upsertResult.ClientStatus == structs.AllocClientStatusRunning { + running = true + return true, nil + } + return false, nil + }, func(err error) { + require.NoError(t, err, "allocation query failed") + }) + + require.True(t, running) + + topics := map[api.Topic][]string{ + api.TopicAllocation: {*job.ID}, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + events := client.EventStream() + streamCh, err := events.Stream(ctx, topics, 1, nil) + require.NoError(t, err) + + var allocEvents []api.Event + // gather job alloc events + go func() { + for { + select { + case event, ok := <-streamCh: + if !ok { + return + } + if event.IsHeartbeat() { + continue + } + allocEvents = append(allocEvents, event.Events...) + case <-time.After(10 * time.Second): + require.Fail(t, "failed waiting for event stream event") + } + } + }() + + var networkResource *api.NetworkResource + testutil.WaitForResult(func() (bool, error) { + for _, e := range allocEvents { + if e.Type == structs.TypeAllocationUpdated { + eventAlloc, err := e.Allocation() + if err != nil { + return false, err + } + if len(eventAlloc.AllocatedResources.Tasks["web"].Networks) == 0 { + return false, nil + } + networkResource = eventAlloc.AllocatedResources.Tasks["web"].Networks[0] + if networkResource.ReservedPorts[0].Value == 5000 { + return true, nil + } + } + } + return false, nil + }, func(e error) { + require.NoError(t, err) + }) + + require.NotNil(t, networkResource) + require.Equal(t, 5000, networkResource.ReservedPorts[0].Value) + require.NotEqual(t, 0, networkResource.DynamicPorts[0].Value) + }) +} diff --git a/command/agent/testingutils_test.go b/command/agent/testingutils_test.go index 152effae365a..270621f73d31 100644 --- a/command/agent/testingutils_test.go +++ b/command/agent/testingutils_test.go @@ -104,3 +104,21 @@ func MockRegionalJob() *api.Job { j.Region = pointer.Of("north-america") return j } + +// MockRunnableJob returns a mock job that has a configuration that allows it to be +// placed on a TestAgent. +func MockRunnableJob() *api.Job { + job := MockJob() + + // Configure job so it can be run on a TestAgent + job.Constraints = nil + job.TaskGroups[0].Constraints = nil + job.TaskGroups[0].Count = pointer.Of(1) + job.TaskGroups[0].Tasks[0].Driver = "mock_driver" + job.TaskGroups[0].Tasks[0].Services = nil + job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{ + "run_for": "10s", + } + + return job +} diff --git a/e2e/events/input/deploy.nomad b/e2e/events/input/deploy.nomad index f889872d4443..d998d68d66dc 100644 --- a/e2e/events/input/deploy.nomad +++ b/e2e/events/input/deploy.nomad @@ -16,6 +16,12 @@ job "deployment_auto.nomad" { min_healthy_time = "1s" } + network { + port "db" { + static = 9000 + } + } + task "one" { driver = "raw_exec" @@ -24,10 +30,12 @@ job "deployment_auto.nomad" { } config { - command = "/bin/sleep" + image = "busybox:1" + command = "nc" # change args to update the job, the only changes - args = ["1000000"] + args = ["-ll", "-p", "1234", "-e", "/bin/cat"] + ports = ["db"] } resources { diff --git a/e2e/events/input/initial.nomad b/e2e/events/input/initial.nomad index a583654cbba0..994b3afbb696 100644 --- a/e2e/events/input/initial.nomad +++ b/e2e/events/input/initial.nomad @@ -15,17 +15,25 @@ job "deployment_auto.nomad" { canary = 2 } + network { + port "db" { + static = 9000 + } + } + task "one" { - driver = "raw_exec" + driver = "docker" env { version = "0" } config { - command = "/bin/sleep" + image = "busybox:1" + command = "nc" # change args to update the job, the only changes - args = ["1000000"] + args = ["-ll", "-p", "1234", "-e", "/bin/cat"] + ports = ["db"] } resources { diff --git a/jobspec/parse_network.go b/jobspec/parse_network.go index 795f38a3eb99..a97bea38fae4 100644 --- a/jobspec/parse_network.go +++ b/jobspec/parse_network.go @@ -93,14 +93,12 @@ func parsePorts(networkObj *ast.ObjectList, nw *api.NetworkResource) error { if knownPortLabels[l] { return fmt.Errorf("found a port label collision: %s", label) } - var p map[string]interface{} + var res api.Port - if err := hcl.DecodeObject(&p, port.Val); err != nil { - return err - } - if err := mapstructure.WeakDecode(p, &res); err != nil { + if err := hcl.DecodeObject(&res, port.Val); err != nil { return err } + res.Label = label if res.Value > 0 { nw.ReservedPorts = append(nw.ReservedPorts, res) diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go index 44fb8c196782..1e5cd8bb2b3f 100644 --- a/jobspec/parse_test.go +++ b/jobspec/parse_test.go @@ -1924,3 +1924,28 @@ func TestIncorrectKey(t *testing.T) { t.Fatalf("Expected key error; got %v", err) } } + +// TestPortParsing validates that the removal of the mapstructure tags on the +// Port struct don't cause issues with HCL 1 parsing. +// +// TODO: in the future, see if we need `mapstructure` tags on any of the API +func TestPortParsing(t *testing.T) { + ci.Parallel(t) + + var err error + var path string + var job *api.Job + + path, err = filepath.Abs(filepath.Join("./test-fixtures", "parse-ports.hcl")) + require.NoError(t, err, "Can't get absolute path for file: parse-ports.hcl") + + job, err = ParseFile(path) + require.NoError(t, err, "cannot parse job") + require.NotNil(t, job) + require.Len(t, job.TaskGroups, 1) + require.Len(t, job.TaskGroups[0].Networks, 1) + require.Len(t, job.TaskGroups[0].Networks[0].ReservedPorts, 1) + require.Len(t, job.TaskGroups[0].Networks[0].DynamicPorts, 1) + require.Equal(t, 9000, job.TaskGroups[0].Networks[0].ReservedPorts[0].Value) + require.Equal(t, 0, job.TaskGroups[0].Networks[0].DynamicPorts[0].Value) +} diff --git a/jobspec/test-fixtures/parse-ports.hcl b/jobspec/test-fixtures/parse-ports.hcl new file mode 100644 index 000000000000..e87bafbd72c2 --- /dev/null +++ b/jobspec/test-fixtures/parse-ports.hcl @@ -0,0 +1,11 @@ +job "parse-ports" { + group "group" { + network { + port "static" { + static = 9000 + } + + port "dynamic" {} + } + } +}