Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix dispatched optional meta correctly #4403

Merged
merged 7 commits into from
Jun 11, 2018
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ IMPROVEMENTS:
image pulls [[GH-4192](https://github.com/hashicorp/nomad/issues/4192)]
* driver/raw_exec: Use cgroups to manage process tree for precise cleanup of
launched processes [[GH-4350](https://github.com/hashicorp/nomad/issues/4350)]
* env: Default interpolation of optional meta fields of parameterized jobs to
an empty string rather than the field key. [[GH-3720](https://github.com/hashicorp/nomad/issues/3720)]
* ui: Show node drain, node eligibility, and node drain strategy information in the Client list and Client detail pages [[GH-4353](https://github.com/hashicorp/nomad/issues/4353)]
* ui: Show reschedule-event information for allocations that were server-side rescheduled [[GH-4254](https://github.com/hashicorp/nomad/issues/4254)]
* ui: Show the running deployment Progress Deadlines on the Job Detail Page [[GH-4388](https://github.com/hashicorp/nomad/issues/4388)]
Expand Down
3 changes: 2 additions & 1 deletion api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,7 @@ type Job struct {
Update *UpdateStrategy
Periodic *PeriodicConfig
ParameterizedJob *ParameterizedJobConfig
Dispatched bool
Payload []byte
Reschedule *ReschedulePolicy
Migrate *MigrateStrategy
Expand All @@ -636,7 +637,7 @@ func (j *Job) IsPeriodic() bool {

// IsParameterized returns whether a job is parameterized job.
func (j *Job) IsParameterized() bool {
return j.ParameterizedJob != nil
return j.ParameterizedJob != nil && !j.Dispatched
}

func (j *Job) Canonicalize() {
Expand Down
18 changes: 17 additions & 1 deletion client/driver/env/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,23 @@ func (b *Builder) setAlloc(alloc *structs.Allocation) *Builder {

// Set meta
combined := alloc.Job.CombinedTaskMeta(alloc.TaskGroup, b.taskName)
b.taskMeta = make(map[string]string, len(combined)*2)
// taskMetaSize is double to total meta keys to account for given and upper
// cased values
taskMetaSize := len(combined) * 2

// if job is parameterized initialize optional meta to empty strings
if alloc.Job.Dispatched {
b.taskMeta = make(map[string]string,
taskMetaSize+(len(alloc.Job.ParameterizedJob.MetaOptional)*2))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extract the size into a named variable so reading this is clearer.


for _, k := range alloc.Job.ParameterizedJob.MetaOptional {
b.taskMeta[fmt.Sprintf("%s%s", MetaPrefix, strings.ToUpper(k))] = ""
b.taskMeta[fmt.Sprintf("%s%s", MetaPrefix, k)] = ""
}
} else {
b.taskMeta = make(map[string]string, taskMetaSize)
}

for k, v := range combined {
b.taskMeta[fmt.Sprintf("%s%s", MetaPrefix, strings.ToUpper(k))] = v
b.taskMeta[fmt.Sprintf("%s%s", MetaPrefix, k)] = v
Expand Down
17 changes: 17 additions & 0 deletions client/driver/env/env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)

const (
Expand Down Expand Up @@ -372,3 +373,19 @@ func TestEnvironment_UpdateTask(t *testing.T) {
t.Errorf("Expected NOMAD_META_taskmeta to be unset but found: %q", v)
}
}

// TestEnvironment_InterpolateEmptyOptionalMeta asserts that in a parameterized
// job, if an optional meta field is not set, it will get interpolated as an
// empty string.
func TestEnvironment_InterpolateEmptyOptionalMeta(t *testing.T) {
a := mock.Alloc()
a.Job.ParameterizedJob = &structs.ParameterizedJobConfig{
MetaOptional: []string{"metaopt1", "metaopt2"},
}
a.Job.Dispatched = true
task := a.Job.TaskGroups[0].Tasks[0]
task.Meta = map[string]string{"metaopt1": "metaopt1val"}
env := NewBuilder(mock.Node(), a, task, "global").Build()
require.Equal(t, "metaopt1val", env.ReplaceEnv("${NOMAD_META_metaopt1}"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For consistency with other tests, initialize require := require.New(t)

require.Empty(t, env.ReplaceEnv("${NOMAD_META_metaopt2}"))
}
20 changes: 15 additions & 5 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,8 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
}

// Validate job transitions if its an update
if existingJob != nil {
if err := validateJobUpdate(existingJob, args.Job); err != nil {
return err
}
if err := validateJobUpdate(existingJob, args.Job); err != nil {
return err
}

// Ensure that the job has permissions for the requested Vault tokens
Expand Down Expand Up @@ -1327,6 +1325,14 @@ func validateJob(job *structs.Job) (invalid, warnings error) {

// validateJobUpdate ensures updates to a job are valid.
func validateJobUpdate(old, new *structs.Job) error {
// Validate Dispatch not set on new Jobs
if old == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a comment above saying we are validating a new job

if new.Dispatched {
return fmt.Errorf("job can't be submitted with 'Dispatched' set")
}
return nil
}

// Type transitions are disallowed
if old.Type != new.Type {
return fmt.Errorf("cannot update job from type %q to %q", old.Type, new.Type)
Expand All @@ -1348,6 +1354,10 @@ func validateJobUpdate(old, new *structs.Job) error {
return fmt.Errorf("cannot update parameterized job to being non-parameterized")
}

if old.Dispatched != new.Dispatched {
return fmt.Errorf("field 'Dispatched' is read-only")
}

return nil
}

Expand Down Expand Up @@ -1398,11 +1408,11 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa

// Derive the child job and commit it via Raft
dispatchJob := parameterizedJob.Copy()
dispatchJob.ParameterizedJob = nil
dispatchJob.ID = structs.DispatchedID(parameterizedJob.ID, time.Now())
dispatchJob.ParentID = parameterizedJob.ID
dispatchJob.Name = dispatchJob.ID
dispatchJob.SetSubmitTime()
dispatchJob.Dispatched = true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


// Merge in the meta data
for k, v := range args.Meta {
Expand Down
47 changes: 47 additions & 0 deletions nomad/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,33 @@ func TestJobEndpoint_Register_ParameterizedJob(t *testing.T) {
}
}

func TestJobEndpoint_Register_Dispatched(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use require for new tests

t.Parallel()
s1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Create the register request with a job with 'Dispatch' set to true
job := mock.Job()
job.Dispatched = true
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}

// Fetch the response
var resp structs.JobRegisterResponse
err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)
if err == nil || !strings.Contains(err.Error(), "'Dispatched' is read-only") {
t.Fatalf("expected dispatch read-only error: %v", err)
}
}
func TestJobEndpoint_Register_EnforceIndex(t *testing.T) {
t.Parallel()
s1 := TestServer(t, func(c *Config) {
Expand Down Expand Up @@ -3959,6 +3986,7 @@ func TestJobEndpoint_ValidateJob_KillSignal(t *testing.T) {

func TestJobEndpoint_ValidateJobUpdate(t *testing.T) {
t.Parallel()
require := require.New(t)
old := mock.Job()
new := mock.Job()

Expand Down Expand Up @@ -3988,6 +4016,16 @@ func TestJobEndpoint_ValidateJobUpdate(t *testing.T) {
} else {
t.Log(err)
}

new = mock.Job()
new.Dispatched = true
require.Error(validateJobUpdate(old, new),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally you want to assert the error you are expecting. require.Contains(err.Error(), "foo bar")

"expected err when setting new job to dispatched")
require.Error(validateJobUpdate(nil, new),
"expected err when setting new job to dispatched")
require.Error(validateJobUpdate(new, old),
"expected err when setting dispatched to false")
require.NoError(validateJobUpdate(nil, old))
}

func TestJobEndpoint_ValidateJobUpdate_ACL(t *testing.T) {
Expand Down Expand Up @@ -4343,6 +4381,15 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
if out.ParentID != tc.parameterizedJob.ID {
t.Fatalf("bad parent ID")
}
if !out.Dispatched {
t.Fatal("expected dispatched job")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a test that the parameterized stanza is on the job

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
if out.IsParameterized() {
t.Fatal("dispatched job should not be parameterized")
}
if out.ParameterizedJob == nil {
t.Fatal("parameter job config should exist")
}

if tc.noEval {
return
Expand Down
12 changes: 12 additions & 0 deletions nomad/structs/diff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,12 @@ func TestJobDiff(t *testing.T) {
Old: "true",
New: "",
},
{
Type: DiffTypeDeleted,
Name: "Dispatched",
Old: "false",
New: "",
},
{
Type: DiffTypeDeleted,
Name: "Meta[foo]",
Expand Down Expand Up @@ -213,6 +219,12 @@ func TestJobDiff(t *testing.T) {
Old: "",
New: "true",
},
{
Type: DiffTypeAdded,
Name: "Dispatched",
Old: "",
New: "false",
},
{
Type: DiffTypeAdded,
Name: "Meta[foo]",
Expand Down
6 changes: 5 additions & 1 deletion nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2018,6 +2018,10 @@ type Job struct {
// for dispatching.
ParameterizedJob *ParameterizedJobConfig

// Dispatched is used to identify if the Job has been dispatched from a
// parameterized job.
Dispatched bool

// Payload is the payload supplied when the job was dispatched.
Payload []byte

Expand Down Expand Up @@ -2328,7 +2332,7 @@ func (j *Job) IsPeriodicActive() bool {

// IsParameterized returns whether a job is parameterized job.
func (j *Job) IsParameterized() bool {
return j.ParameterizedJob != nil
return j.ParameterizedJob != nil && !j.Dispatched
}

// VaultPolicies returns the set of Vault policies per task group, per task
Expand Down