From 8e453d0b8f5b22e3545cf198d4b5e303b9138a6d Mon Sep 17 00:00:00 2001 From: Nick Ethier Date: Mon, 11 Jun 2018 11:59:03 -0400 Subject: [PATCH 1/7] nomad: add 'Dispatch' field to Job New -bash: Dispatch: command not found field is used to denote if the Job is a child dispatched job of a parameterized job. --- api/jobs.go | 3 ++- nomad/job_endpoint.go | 18 +++++++++++++---- nomad/job_endpoint_test.go | 41 ++++++++++++++++++++++++++++++++++++++ nomad/structs/structs.go | 6 +++++- 4 files changed, 62 insertions(+), 6 deletions(-) diff --git a/api/jobs.go b/api/jobs.go index 2543229770da..45a7c180bfdf 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -614,6 +614,7 @@ type Job struct { Update *UpdateStrategy Periodic *PeriodicConfig ParameterizedJob *ParameterizedJobConfig + Dispatched bool Payload []byte Reschedule *ReschedulePolicy Migrate *MigrateStrategy @@ -636,7 +637,7 @@ func (j *Job) IsPeriodic() bool { // IsParameterized returns whether a job is parameterized job. func (j *Job) IsParameterized() bool { - return j.ParameterizedJob != nil + return j.ParameterizedJob != nil && !j.Dispatched } func (j *Job) Canonicalize() { diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 164899e5873d..9948b5a66504 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -124,10 +124,8 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis } // Validate job transitions if its an update - if existingJob != nil { - if err := validateJobUpdate(existingJob, args.Job); err != nil { - return err - } + if err := validateJobUpdate(existingJob, args.Job); err != nil { + return err } // Ensure that the job has permissions for the requested Vault tokens @@ -1327,6 +1325,13 @@ func validateJob(job *structs.Job) (invalid, warnings error) { // validateJobUpdate ensures updates to a job are valid. func validateJobUpdate(old, new *structs.Job) error { + if old == nil { + if new.Dispatched { + return fmt.Errorf("job can't be submitted with 'Dispatched' set") + } + return nil + } + // Type transitions are disallowed if old.Type != new.Type { return fmt.Errorf("cannot update job from type %q to %q", old.Type, new.Type) @@ -1348,6 +1353,10 @@ func validateJobUpdate(old, new *structs.Job) error { return fmt.Errorf("cannot update parameterized job to being non-parameterized") } + if old.Dispatched != new.Dispatched { + return fmt.Errorf("field 'Dispatched' is read-only") + } + return nil } @@ -1403,6 +1412,7 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa dispatchJob.ParentID = parameterizedJob.ID dispatchJob.Name = dispatchJob.ID dispatchJob.SetSubmitTime() + dispatchJob.Dispatched = true // Merge in the meta data for k, v := range args.Meta { diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index fe51fbabee2c..3aab1a14b3a7 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -458,6 +458,33 @@ func TestJobEndpoint_Register_ParameterizedJob(t *testing.T) { } } +func TestJobEndpoint_Register_Dispatched(t *testing.T) { + t.Parallel() + s1 := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request with a job with 'Dispatch' set to true + job := mock.Job() + job.Dispatched = true + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + + // Fetch the response + var resp structs.JobRegisterResponse + err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) + if err == nil || !strings.Contains(err.Error(), "'Dispatched' is read-only") { + t.Fatalf("expected dispatch read-only error: %v", err) + } +} func TestJobEndpoint_Register_EnforceIndex(t *testing.T) { t.Parallel() s1 := TestServer(t, func(c *Config) { @@ -3959,6 +3986,7 @@ func TestJobEndpoint_ValidateJob_KillSignal(t *testing.T) { func TestJobEndpoint_ValidateJobUpdate(t *testing.T) { t.Parallel() + require := require.New(t) old := mock.Job() new := mock.Job() @@ -3988,6 +4016,16 @@ func TestJobEndpoint_ValidateJobUpdate(t *testing.T) { } else { t.Log(err) } + + new = mock.Job() + new.Dispatched = true + require.Error(validateJobUpdate(old, new), + "expected err when setting new job to dispatched") + require.Error(validateJobUpdate(nil, new), + "expected err when setting new job to dispatched") + require.Error(validateJobUpdate(new, old), + "expected err when setting dispatched to false") + require.NoError(validateJobUpdate(nil, old)) } func TestJobEndpoint_ValidateJobUpdate_ACL(t *testing.T) { @@ -4343,6 +4381,9 @@ func TestJobEndpoint_Dispatch(t *testing.T) { if out.ParentID != tc.parameterizedJob.ID { t.Fatalf("bad parent ID") } + if !out.Dispatched { + t.Fatal("expected dispatched job") + } if tc.noEval { return diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 93ad1bdbfa96..969f1133887a 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -2018,6 +2018,10 @@ type Job struct { // for dispatching. ParameterizedJob *ParameterizedJobConfig + // Dispatched is used to identify if the Job has been dispatched from a + // parameterized job. + Dispatched bool + // Payload is the payload supplied when the job was dispatched. Payload []byte @@ -2328,7 +2332,7 @@ func (j *Job) IsPeriodicActive() bool { // IsParameterized returns whether a job is parameterized job. func (j *Job) IsParameterized() bool { - return j.ParameterizedJob != nil + return j.ParameterizedJob != nil && !j.Dispatched } // VaultPolicies returns the set of Vault policies per task group, per task From e8f9b40321b9613b5299048042ba1d9dd3fe3198 Mon Sep 17 00:00:00 2001 From: Nick Ethier Date: Mon, 11 Jun 2018 11:59:23 -0400 Subject: [PATCH 2/7] Revert "Revert "client/driver/env: interpolate empty optional meta params as empty strings"" This reverts commit c17e0fc9dc5fd288935ab2b68fb441b4d25ac189. --- CHANGELOG.md | 2 ++ client/driver/env/env.go | 18 +++++++++++++++++- client/driver/env/env_test.go | 16 ++++++++++++++++ 3 files changed, 35 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2a56e73b698b..03c5f28f0964 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,8 @@ IMPROVEMENTS: image pulls [[GH-4192](https://github.com/hashicorp/nomad/issues/4192)] * driver/raw_exec: Use cgroups to manage process tree for precise cleanup of launched processes [[GH-4350](https://github.com/hashicorp/nomad/issues/4350)] + * env: Default interpolation of optional meta fields of parameterized jobs to + an empty string rather than the field key. [[GH-3720](https://github.com/hashicorp/nomad/issues/3720)] * ui: Show node drain, node eligibility, and node drain strategy information in the Client list and Client detail pages [[GH-4353](https://github.com/hashicorp/nomad/issues/4353)] * ui: Show reschedule-event information for allocations that were server-side rescheduled [[GH-4254](https://github.com/hashicorp/nomad/issues/4254)] * ui: Show the running deployment Progress Deadlines on the Job Detail Page [[GH-4388](https://github.com/hashicorp/nomad/issues/4388)] diff --git a/client/driver/env/env.go b/client/driver/env/env.go index 923ebaa656dc..19f934967415 100644 --- a/client/driver/env/env.go +++ b/client/driver/env/env.go @@ -397,7 +397,23 @@ func (b *Builder) setAlloc(alloc *structs.Allocation) *Builder { // Set meta combined := alloc.Job.CombinedTaskMeta(alloc.TaskGroup, b.taskName) - b.taskMeta = make(map[string]string, len(combined)*2) + // taskMetaSize is double to total meta keys to account for given and upper + // cased values + taskMetaSize := len(combined) * 2 + + // if job is parameterized initialize optional meta to empty strings + if alloc.Job.IsParameterized() { + b.taskMeta = make(map[string]string, + taskMetaSize+(len(alloc.Job.ParameterizedJob.MetaOptional)*2)) + + for _, k := range alloc.Job.ParameterizedJob.MetaOptional { + b.taskMeta[fmt.Sprintf("%s%s", MetaPrefix, strings.ToUpper(k))] = "" + b.taskMeta[fmt.Sprintf("%s%s", MetaPrefix, k)] = "" + } + } else { + b.taskMeta = make(map[string]string, taskMetaSize) + } + for k, v := range combined { b.taskMeta[fmt.Sprintf("%s%s", MetaPrefix, strings.ToUpper(k))] = v b.taskMeta[fmt.Sprintf("%s%s", MetaPrefix, k)] = v diff --git a/client/driver/env/env_test.go b/client/driver/env/env_test.go index 6ea4e72e60d1..35a70329283a 100644 --- a/client/driver/env/env_test.go +++ b/client/driver/env/env_test.go @@ -11,6 +11,7 @@ import ( cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/require" ) const ( @@ -372,3 +373,18 @@ func TestEnvironment_UpdateTask(t *testing.T) { t.Errorf("Expected NOMAD_META_taskmeta to be unset but found: %q", v) } } + +// TestEnvironment_InterpolateEmptyOptionalMeta asserts that in a parameterized +// job, if an optional meta field is not set, it will get interpolated as an +// empty string. +func TestEnvironment_InterpolateEmptyOptionalMeta(t *testing.T) { + a := mock.Alloc() + a.Job.ParameterizedJob = &structs.ParameterizedJobConfig{ + MetaOptional: []string{"metaopt1", "metaopt2"}, + } + task := a.Job.TaskGroups[0].Tasks[0] + task.Meta = map[string]string{"metaopt1": "metaopt1val"} + env := NewBuilder(mock.Node(), a, task, "global").Build() + require.Equal(t, "metaopt1val", env.ReplaceEnv("${NOMAD_META_metaopt1}")) + require.Empty(t, env.ReplaceEnv("${NOMAD_META_metaopt2}")) +} From e325e85a19554e998114201d738a366641aa23bb Mon Sep 17 00:00:00 2001 From: Nick Ethier Date: Mon, 11 Jun 2018 12:01:19 -0400 Subject: [PATCH 3/7] client/driver/env: use 'job.Dispatch' to trigger optional meta logic --- client/driver/env/env.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/driver/env/env.go b/client/driver/env/env.go index 19f934967415..56cd89bf8f6d 100644 --- a/client/driver/env/env.go +++ b/client/driver/env/env.go @@ -402,7 +402,7 @@ func (b *Builder) setAlloc(alloc *structs.Allocation) *Builder { taskMetaSize := len(combined) * 2 // if job is parameterized initialize optional meta to empty strings - if alloc.Job.IsParameterized() { + if alloc.Job.Dispatched { b.taskMeta = make(map[string]string, taskMetaSize+(len(alloc.Job.ParameterizedJob.MetaOptional)*2)) From 81abf0a7d28424cde5a3a72a5c03330688ccf861 Mon Sep 17 00:00:00 2001 From: Nick Ethier Date: Mon, 11 Jun 2018 12:29:13 -0400 Subject: [PATCH 4/7] client/driver/env: fix optional meta test --- client/driver/env/env_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/client/driver/env/env_test.go b/client/driver/env/env_test.go index 35a70329283a..8214bacb2e3d 100644 --- a/client/driver/env/env_test.go +++ b/client/driver/env/env_test.go @@ -382,6 +382,7 @@ func TestEnvironment_InterpolateEmptyOptionalMeta(t *testing.T) { a.Job.ParameterizedJob = &structs.ParameterizedJobConfig{ MetaOptional: []string{"metaopt1", "metaopt2"}, } + a.Job.Dispatched = true task := a.Job.TaskGroups[0].Tasks[0] task.Meta = map[string]string{"metaopt1": "metaopt1val"} env := NewBuilder(mock.Node(), a, task, "global").Build() From 304b752a50a4d5216fc773bdaf28a79912735654 Mon Sep 17 00:00:00 2001 From: Nick Ethier Date: Mon, 11 Jun 2018 13:06:49 -0400 Subject: [PATCH 5/7] nomad/structs: fix job diff test --- nomad/structs/diff_test.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/nomad/structs/diff_test.go b/nomad/structs/diff_test.go index 879522ecdb69..30a59c7bdfa8 100644 --- a/nomad/structs/diff_test.go +++ b/nomad/structs/diff_test.go @@ -150,6 +150,12 @@ func TestJobDiff(t *testing.T) { Old: "true", New: "", }, + { + Type: DiffTypeDeleted, + Name: "Dispatched", + Old: "false", + New: "", + }, { Type: DiffTypeDeleted, Name: "Meta[foo]", @@ -213,6 +219,12 @@ func TestJobDiff(t *testing.T) { Old: "", New: "true", }, + { + Type: DiffTypeAdded, + Name: "Dispatched", + Old: "", + New: "false", + }, { Type: DiffTypeAdded, Name: "Meta[foo]", From e79d31480710afbba913b65bb6052d42788c9cb0 Mon Sep 17 00:00:00 2001 From: Nick Ethier Date: Mon, 11 Jun 2018 13:27:48 -0400 Subject: [PATCH 6/7] nomad: code review comments --- nomad/job_endpoint.go | 2 +- nomad/job_endpoint_test.go | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 9948b5a66504..d1e2af642ad6 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -1325,6 +1325,7 @@ func validateJob(job *structs.Job) (invalid, warnings error) { // validateJobUpdate ensures updates to a job are valid. func validateJobUpdate(old, new *structs.Job) error { + // Validate Dispatch not set on new Jobs if old == nil { if new.Dispatched { return fmt.Errorf("job can't be submitted with 'Dispatched' set") @@ -1407,7 +1408,6 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa // Derive the child job and commit it via Raft dispatchJob := parameterizedJob.Copy() - dispatchJob.ParameterizedJob = nil dispatchJob.ID = structs.DispatchedID(parameterizedJob.ID, time.Now()) dispatchJob.ParentID = parameterizedJob.ID dispatchJob.Name = dispatchJob.ID diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index 3aab1a14b3a7..a1d389bdce87 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -4384,6 +4384,12 @@ func TestJobEndpoint_Dispatch(t *testing.T) { if !out.Dispatched { t.Fatal("expected dispatched job") } + if out.IsParameterized() { + t.Fatal("dispatched job should not be parameterized") + } + if out.ParameterizedJob == nil { + t.Fatal("parameter job config should exist") + } if tc.noEval { return From f00875aecf0f4bcbab9a567189c45743fd44b976 Mon Sep 17 00:00:00 2001 From: Nick Ethier Date: Mon, 11 Jun 2018 13:50:50 -0400 Subject: [PATCH 7/7] nomad: use require pkg for tests --- client/driver/env/env.go | 4 ++-- client/driver/env/env_test.go | 5 +++-- nomad/job_endpoint_test.go | 6 +++--- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/client/driver/env/env.go b/client/driver/env/env.go index 56cd89bf8f6d..c59ef807ecdb 100644 --- a/client/driver/env/env.go +++ b/client/driver/env/env.go @@ -403,8 +403,8 @@ func (b *Builder) setAlloc(alloc *structs.Allocation) *Builder { // if job is parameterized initialize optional meta to empty strings if alloc.Job.Dispatched { - b.taskMeta = make(map[string]string, - taskMetaSize+(len(alloc.Job.ParameterizedJob.MetaOptional)*2)) + optionalMetaCount := len(alloc.Job.ParameterizedJob.MetaOptional) + b.taskMeta = make(map[string]string, taskMetaSize+optionalMetaCount*2) for _, k := range alloc.Job.ParameterizedJob.MetaOptional { b.taskMeta[fmt.Sprintf("%s%s", MetaPrefix, strings.ToUpper(k))] = "" diff --git a/client/driver/env/env_test.go b/client/driver/env/env_test.go index 8214bacb2e3d..01a712e57cd0 100644 --- a/client/driver/env/env_test.go +++ b/client/driver/env/env_test.go @@ -378,6 +378,7 @@ func TestEnvironment_UpdateTask(t *testing.T) { // job, if an optional meta field is not set, it will get interpolated as an // empty string. func TestEnvironment_InterpolateEmptyOptionalMeta(t *testing.T) { + require := require.New(t) a := mock.Alloc() a.Job.ParameterizedJob = &structs.ParameterizedJobConfig{ MetaOptional: []string{"metaopt1", "metaopt2"}, @@ -386,6 +387,6 @@ func TestEnvironment_InterpolateEmptyOptionalMeta(t *testing.T) { task := a.Job.TaskGroups[0].Tasks[0] task.Meta = map[string]string{"metaopt1": "metaopt1val"} env := NewBuilder(mock.Node(), a, task, "global").Build() - require.Equal(t, "metaopt1val", env.ReplaceEnv("${NOMAD_META_metaopt1}")) - require.Empty(t, env.ReplaceEnv("${NOMAD_META_metaopt2}")) + require.Equal("metaopt1val", env.ReplaceEnv("${NOMAD_META_metaopt1}")) + require.Empty(env.ReplaceEnv("${NOMAD_META_metaopt2}")) } diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index a1d389bdce87..3d70d281c37c 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -460,6 +460,7 @@ func TestJobEndpoint_Register_ParameterizedJob(t *testing.T) { func TestJobEndpoint_Register_Dispatched(t *testing.T) { t.Parallel() + require := require.New(t) s1 := TestServer(t, func(c *Config) { c.NumSchedulers = 0 // Prevent automatic dequeue }) @@ -481,9 +482,8 @@ func TestJobEndpoint_Register_Dispatched(t *testing.T) { // Fetch the response var resp structs.JobRegisterResponse err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) - if err == nil || !strings.Contains(err.Error(), "'Dispatched' is read-only") { - t.Fatalf("expected dispatch read-only error: %v", err) - } + require.Error(err) + require.Contains(err.Error(), "job can't be submitted with 'Dispatched'") } func TestJobEndpoint_Register_EnforceIndex(t *testing.T) { t.Parallel()