Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api: remove mapstructure tags fromPort struct #12916

Merged
merged 12 commits into from
Nov 8, 2022
3 changes: 3 additions & 0 deletions .changelog/12916.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
event_stream: fixed a bug where dynamic port values would fail to serialize in the event stream
```
6 changes: 3 additions & 3 deletions api/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ func (r *Resources) Merge(other *Resources) {

type Port struct {
Label string `hcl:",label"`
Value int `mapstructure:"static" hcl:"static,optional"`
To int `mapstructure:"to" hcl:"to,optional"`
HostNetwork string `mapstructure:"host_network" hcl:"host_network,optional"`
Value int `hcl:"static,optional"`
To int `hcl:"to,optional"`
HostNetwork string `hcl:"host_network,optional"`
}

type DNSConfig struct {
Expand Down
24 changes: 24 additions & 0 deletions api/testutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package api

import (
"testing"

"github.com/hashicorp/nomad/api/internal/testutil"
)

// NewTestClient returns an API client useful for testing.
func NewTestClient(t *testing.T) (*Client, error) {
pkazmierczak marked this conversation as resolved.
Show resolved Hide resolved
// Make client config
conf := DefaultConfig()
// Create server
server := testutil.NewTestServer(t, nil)
conf.Address = "http://" + server.HTTPAddr

// Create client
client, err := NewClient(conf)
if err != nil {
return nil, err
}

return client, nil
}
125 changes: 125 additions & 0 deletions client/client_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package client

import (
"context"
"fmt"
"net"
"os"
Expand All @@ -11,6 +12,7 @@ import (
"time"

memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/ci"
trstate "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
"github.com/hashicorp/nomad/client/config"
Expand Down Expand Up @@ -727,6 +729,7 @@ func TestClient_Init(t *testing.T) {
ci.Parallel(t)

dir := t.TempDir()

allocDir := filepath.Join(dir, "alloc")

config := config.DefaultConfig()
Expand Down Expand Up @@ -1792,3 +1795,125 @@ func TestClient_ReconnectAllocs(t *testing.T) {
require.False(t, invalid, "expected alloc to not be marked invalid")
require.Equal(t, unknownAlloc.AllocModifyIndex, finalAlloc.AllocModifyIndex)
}

func TestClient_Alloc_Port_Response(t *testing.T) {
pkazmierczak marked this conversation as resolved.
Show resolved Hide resolved
t.Parallel()

srv, _, cleanupSrv := testServer(t, func(config *nomad.Config) {
config.DevMode = false
})
defer cleanupSrv()
testutil.WaitForLeader(t, srv.RPC)

client, cleanupClient := TestClient(t, func(c *config.Config) {
c.DevMode = false
c.RPCHandler = srv
})
defer cleanupClient()

waitTilNodeReady(client, t)

job := mock.Job()
job.TaskGroups[0].Count = 1
alloc := mock.Alloc()
alloc.NodeID = client.Node().ID
alloc.Job = job
alloc.JobID = job.ID
alloc.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
alloc.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
"run_for": "10s",
}
alloc.ClientStatus = structs.AllocClientStatusPending

state := srv.State()
err := state.UpsertJob(structs.MsgTypeTestSetup, 100, job)
require.NoError(t, err)

err = state.UpsertJobSummary(101, mock.JobSummary(alloc.JobID))
require.NoError(t, err)

err = state.UpsertAllocs(structs.MsgTypeTestSetup, 102, []*structs.Allocation{alloc})
require.NoError(t, err)

// Ensure allocation gets upserted with desired status.
running := false
testutil.WaitForResult(func() (bool, error) {
upsertResult, stateErr := state.AllocByID(nil, alloc.ID)
if stateErr != nil {
return false, stateErr
}
if upsertResult.ClientStatus == structs.AllocClientStatusRunning {
running = true
return true, nil
}
return false, nil
}, func(err error) {
require.NoError(t, err, "allocation query failed")
})

require.True(t, running)

topics := map[api.Topic][]string{
api.TopicJob: {job.ID},
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

conf := api.DefaultConfig()
conf.Address = "http://" + srv.GetConfig().RPCAddr.String()
apiClient, err := api.NewClient(conf)
defer apiClient.Close()
require.NoError(t, err)

events := apiClient.EventStream()
streamCh, err := events.Stream(ctx, topics, 102, nil)
require.NoError(t, err)

var allocEvents []api.Event
// gather job alloc events
go func() {
for {
select {
case <-ctx.Done():
return
case event, ok := <-streamCh:
if !ok {
return
}
if event.Err != nil {
return
}
if event.IsHeartbeat() {
continue
}
t.Logf("event_type: %s", event.Events[0].Type)
allocEvents = append(allocEvents, event.Events...)
case <-time.After(30 * time.Second):
require.Fail(t, "failed waiting for event stream event")
}
}
}()

var eventAlloc *api.Allocation
testutil.WaitForResult(func() (bool, error) {
var got string
for _, e := range allocEvents {
t.Logf("event_type: %s", e.Type)
if e.Type == structs.TypeAllocationCreated || e.Type == structs.TypeAllocationUpdated {
eventAlloc, err = e.Allocation()
return true, nil
}
got = e.Type
}
return false, fmt.Errorf("expected to receive allocation updated event, got: %#v", got)
}, func(e error) {
require.NoError(t, err)
})

require.NotNil(t, eventAlloc)

networkResource := eventAlloc.AllocatedResources.Tasks["web"].Networks[0]
require.Equal(t, 9000, networkResource.ReservedPorts[0].Value)
require.NotEqual(t, 0, networkResource.DynamicPorts[0].Value)
}
102 changes: 102 additions & 0 deletions command/agent/event_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (
"testing"
"time"

"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -202,3 +204,103 @@ func TestEventStream_QueryParse(t *testing.T) {
})
}
}

func TestHTTP_AllocPort_Parsing(t *testing.T) {
pkazmierczak marked this conversation as resolved.
Show resolved Hide resolved
ci.Parallel(t)

httpTest(t, nil, func(srv *TestAgent) {
client := srv.Client()
defer srv.Shutdown()
defer client.Close()

testutil.WaitForLeader(t, srv.Agent.RPC)
testutil.WaitForClient(t, srv.Agent.Client().RPC, srv.Agent.Client().NodeID(), srv.Agent.Client().Region())

job := MockRunnableJob()

resp, _, err := client.Jobs().Register(job, nil)
require.NoError(t, err)
require.NotEmpty(t, resp.EvalID)

alloc := mock.Alloc()
alloc.Job = ApiJobToStructJob(job)
alloc.JobID = *job.ID
alloc.NodeID = srv.client.NodeID()

require.Nil(t, srv.server.State().UpsertJobSummary(101, mock.JobSummary(alloc.JobID)))
require.Nil(t, srv.server.State().UpsertAllocs(structs.MsgTypeTestSetup, 102, []*structs.Allocation{alloc}))

running := false
testutil.WaitForResult(func() (bool, error) {
upsertResult, stateErr := srv.server.State().AllocByID(nil, alloc.ID)
if stateErr != nil {
return false, stateErr
}
if upsertResult.ClientStatus == structs.AllocClientStatusRunning {
running = true
return true, nil
}
return false, nil
}, func(err error) {
require.NoError(t, err, "allocation query failed")
})

require.True(t, running)

topics := map[api.Topic][]string{
api.TopicAllocation: {*job.ID},
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

events := client.EventStream()
streamCh, err := events.Stream(ctx, topics, 1, nil)
require.NoError(t, err)

var allocEvents []api.Event
// gather job alloc events
go func() {
for {
select {
case event, ok := <-streamCh:
if !ok {
return
}
if event.IsHeartbeat() {
continue
}
allocEvents = append(allocEvents, event.Events...)
case <-time.After(10 * time.Second):
require.Fail(t, "failed waiting for event stream event")
}
}
}()

var networkResource *api.NetworkResource
testutil.WaitForResult(func() (bool, error) {
for _, e := range allocEvents {
if e.Type == structs.TypeAllocationUpdated {
eventAlloc, err := e.Allocation()
if err != nil {
return false, err
}
if len(eventAlloc.AllocatedResources.Tasks["web"].Networks) == 0 {
return false, nil
}
networkResource = eventAlloc.AllocatedResources.Tasks["web"].Networks[0]
if networkResource.ReservedPorts[0].Value == 5000 {
return true, nil
}
}
}
return false, nil
}, func(e error) {
require.NoError(t, err)
})

require.NotNil(t, networkResource)
require.Equal(t, 5000, networkResource.ReservedPorts[0].Value)
require.NotEqual(t, 0, networkResource.DynamicPorts[0].Value)
})
}
32 changes: 32 additions & 0 deletions command/agent/test-resources/parse-ports.nomad
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
job "parse-ports" {
pkazmierczak marked this conversation as resolved.
Show resolved Hide resolved
datacenters = ["dc1"]
type = "service"

constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}

group "group" {
count = 1

network {
port "static" {
static = 9000
}

port "dynamic" {}
}

task "parse-port" {
driver = "docker"

config {
image = "busybox:1"
command = "nc"
args = ["-ll", "-p", "1234", "-e", "/bin/cat"]
ports = ["static", "dynamic"]
}
}
}
}
18 changes: 18 additions & 0 deletions command/agent/testingutils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,21 @@ func MockRegionalJob() *api.Job {
j.Region = pointer.Of("north-america")
return j
}

// MockRunnableJob returns a mock job that has a configuration that allows it to be
// placed on a TestAgent.
func MockRunnableJob() *api.Job {
job := MockJob()

// Configure job so it can be run on a TestAgent
job.Constraints = nil
job.TaskGroups[0].Constraints = nil
job.TaskGroups[0].Count = helper.IntToPtr(1)
job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
job.TaskGroups[0].Tasks[0].Services = nil
job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
"run_for": "10s",
}

return job
}
12 changes: 10 additions & 2 deletions e2e/events/input/deploy.nomad
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ job "deployment_auto.nomad" {
min_healthy_time = "1s"
}

network {
port "db" {
static = 9000
}
}

task "one" {
driver = "raw_exec"

Expand All @@ -24,10 +30,12 @@ job "deployment_auto.nomad" {
}

config {
command = "/bin/sleep"
image = "busybox:1"
command = "nc"

# change args to update the job, the only changes
args = ["1000000"]
args = ["-ll", "-p", "1234", "-e", "/bin/cat"]
ports = ["db"]
}

resources {
Expand Down
Loading