From c7975700ed942797e71127c4f66689136bb189e4 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Thu, 22 Jul 2021 19:03:22 -0400 Subject: [PATCH 1/5] add dispatch idempotency token support in the CLI --- api/jobs.go | 66 +++++++++-------- command/job_dispatch.go | 29 +++++++- command/job_status.go | 11 ++- nomad/structs/structs.go | 74 ++++++++++--------- website/content/api-docs/jobs.mdx | 4 + .../content/docs/commands/job/dispatch.mdx | 29 ++++++++ 6 files changed, 138 insertions(+), 75 deletions(-) diff --git a/api/jobs.go b/api/jobs.go index 416fce1fa579..059c77a195fa 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -813,21 +813,22 @@ type Job struct { /* Fields set by server, not sourced from job config file */ - Stop *bool - ParentID *string - Dispatched bool - Payload []byte - ConsulNamespace *string `mapstructure:"consul_namespace"` - VaultNamespace *string `mapstructure:"vault_namespace"` - NomadTokenID *string `mapstructure:"nomad_token_id"` - Status *string - StatusDescription *string - Stable *bool - Version *uint64 - SubmitTime *int64 - CreateIndex *uint64 - ModifyIndex *uint64 - JobModifyIndex *uint64 + Stop *bool + ParentID *string + Dispatched bool + DispatchIdempotencyToken *string + Payload []byte + ConsulNamespace *string `mapstructure:"consul_namespace"` + VaultNamespace *string `mapstructure:"vault_namespace"` + NomadTokenID *string `mapstructure:"nomad_token_id"` + Status *string + StatusDescription *string + Stable *bool + Version *uint64 + SubmitTime *int64 + CreateIndex *uint64 + ModifyIndex *uint64 + JobModifyIndex *uint64 } // IsPeriodic returns whether a job is periodic. @@ -987,23 +988,24 @@ type TaskGroupSummary struct { // JobListStub is used to return a subset of information about // jobs during list operations. type JobListStub struct { - ID string - ParentID string - Name string - Namespace string `json:",omitempty"` - Datacenters []string - Type string - Priority int - Periodic bool - ParameterizedJob bool - Stop bool - Status string - StatusDescription string - JobSummary *JobSummary - CreateIndex uint64 - ModifyIndex uint64 - JobModifyIndex uint64 - SubmitTime int64 + ID string + ParentID string + Name string + Namespace string `json:",omitempty"` + Datacenters []string + Type string + Priority int + Periodic bool + ParameterizedJob bool + DispatchIdempotencyToken string + Stop bool + Status string + StatusDescription string + JobSummary *JobSummary + CreateIndex uint64 + ModifyIndex uint64 + JobModifyIndex uint64 + SubmitTime int64 } // JobIDSort is used to sort jobs by their job ID's. diff --git a/command/job_dispatch.go b/command/job_dispatch.go index 68e9863ab384..825a70a77af5 100644 --- a/command/job_dispatch.go +++ b/command/job_dispatch.go @@ -6,6 +6,7 @@ import ( "os" "strings" + "github.com/hashicorp/nomad/api" flaghelper "github.com/hashicorp/nomad/helper/flags" "github.com/posener/complete" ) @@ -23,6 +24,10 @@ Usage: nomad job dispatch [options] [input source] path to a file. Metadata can be supplied by using the meta flag one or more times. + An optional idempotency token can be used to prevent more than one instance + of the job to be dispatched. If an instance with the same token already + exists, the command returns without any action. + Upon successful creation, the dispatched job ID will be printed and the triggered evaluation will be monitored. This can be disabled by supplying the detach flag. @@ -48,6 +53,10 @@ Dispatch Options: the evaluation ID will be printed to the screen, which can be used to examine the evaluation using the eval-status command. + -idempotency-token + Optional identifier used to prevent more than one instance of the job from + being dispatched. + -verbose Display full information. ` @@ -61,9 +70,10 @@ func (c *JobDispatchCommand) Synopsis() string { func (c *JobDispatchCommand) AutocompleteFlags() complete.Flags { return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient), complete.Flags{ - "-meta": complete.PredictAnything, - "-detach": complete.PredictNothing, - "-verbose": complete.PredictNothing, + "-meta": complete.PredictAnything, + "-detach": complete.PredictNothing, + "-idempotency-token": complete.PredictAnything, + "-verbose": complete.PredictNothing, }) } @@ -95,12 +105,14 @@ func (c *JobDispatchCommand) Name() string { return "job dispatch" } func (c *JobDispatchCommand) Run(args []string) int { var detach, verbose bool + var idempotencyToken string var meta []string flags := c.Meta.FlagSet(c.Name(), FlagSetClient) flags.Usage = func() { c.Ui.Output(c.Help()) } flags.BoolVar(&detach, "detach", false, "") flags.BoolVar(&verbose, "verbose", false, "") + flags.StringVar(&idempotencyToken, "idempotency-token", "", "") flags.Var((*flaghelper.StringFlag)(&meta), "meta", "") if err := flags.Parse(args); err != nil { @@ -159,7 +171,10 @@ func (c *JobDispatchCommand) Run(args []string) int { } // Dispatch the job - resp, _, err := client.Jobs().Dispatch(job, metaMap, payload, nil) + w := &api.WriteOptions{ + IdempotencyToken: idempotencyToken, + } + resp, _, err := client.Jobs().Dispatch(job, metaMap, payload, w) if err != nil { c.Ui.Error(fmt.Sprintf("Failed to dispatch job: %s", err)) return 1 @@ -169,6 +184,12 @@ func (c *JobDispatchCommand) Run(args []string) int { // eval. evalCreated := resp.EvalID != "" + // See if dispatched job was skipped due to idempotency. + if !evalCreated && idempotencyToken != "" { + c.Ui.Output(fmt.Sprintf("Job %q already dispatched with idempotency token %q.", resp.DispatchedJobID, idempotencyToken)) + return 0 + } + basic := []string{ fmt.Sprintf("Dispatched Job ID|%s", resp.DispatchedJobID), } diff --git a/command/job_status.go b/command/job_status.go index 1637b6b58719..4c1d5826f2df 100644 --- a/command/job_status.go +++ b/command/job_status.go @@ -189,6 +189,10 @@ func (c *JobStatusCommand) Run(args []string) int { fmt.Sprintf("Parameterized|%v", parameterized), } + if job.DispatchIdempotencyToken != nil && *job.DispatchIdempotencyToken != "" { + basic = append(basic, fmt.Sprintf("Idempotency Token|%v", *job.DispatchIdempotencyToken)) + } + if periodic && !parameterized { if *job.Stop { basic = append(basic, "Next Periodic Launch|none (job stopped)") @@ -302,7 +306,7 @@ func (c *JobStatusCommand) outputParameterizedInfo(client *api.Client, job *api. } out := make([]string, 1) - out[0] = "ID|Status" + out[0] = "ID|Status|Idempotency Token" for _, child := range children { // Ensure that we are only showing jobs whose parent is the requested // job. @@ -310,9 +314,10 @@ func (c *JobStatusCommand) outputParameterizedInfo(client *api.Client, job *api. continue } - out = append(out, fmt.Sprintf("%s|%s", + out = append(out, fmt.Sprintf("%s|%s|%s", child.ID, - child.Status)) + child.Status, + child.DispatchIdempotencyToken)) } c.Ui.Output(c.Colorize().Color("\n[bold]Dispatched Jobs[reset]")) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index a773ba1071c4..86fff46340b8 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -4409,24 +4409,25 @@ func (j *Job) HasUpdateStrategy() bool { // Stub is used to return a summary of the job func (j *Job) Stub(summary *JobSummary) *JobListStub { return &JobListStub{ - ID: j.ID, - Namespace: j.Namespace, - ParentID: j.ParentID, - Name: j.Name, - Datacenters: j.Datacenters, - Multiregion: j.Multiregion, - Type: j.Type, - Priority: j.Priority, - Periodic: j.IsPeriodic(), - ParameterizedJob: j.IsParameterized(), - Stop: j.Stop, - Status: j.Status, - StatusDescription: j.StatusDescription, - CreateIndex: j.CreateIndex, - ModifyIndex: j.ModifyIndex, - JobModifyIndex: j.JobModifyIndex, - SubmitTime: j.SubmitTime, - JobSummary: summary, + ID: j.ID, + Namespace: j.Namespace, + ParentID: j.ParentID, + Name: j.Name, + Datacenters: j.Datacenters, + Multiregion: j.Multiregion, + Type: j.Type, + Priority: j.Priority, + Periodic: j.IsPeriodic(), + ParameterizedJob: j.IsParameterized(), + DispatchIdempotencyToken: j.DispatchIdempotencyToken, + Stop: j.Stop, + Status: j.Status, + StatusDescription: j.StatusDescription, + CreateIndex: j.CreateIndex, + ModifyIndex: j.ModifyIndex, + JobModifyIndex: j.JobModifyIndex, + SubmitTime: j.SubmitTime, + JobSummary: summary, } } @@ -4581,24 +4582,25 @@ func (j *Job) SetSubmitTime() { // JobListStub is used to return a subset of job information // for the job list type JobListStub struct { - ID string - ParentID string - Name string - Namespace string `json:",omitempty"` - Datacenters []string - Multiregion *Multiregion - Type string - Priority int - Periodic bool - ParameterizedJob bool - Stop bool - Status string - StatusDescription string - JobSummary *JobSummary - CreateIndex uint64 - ModifyIndex uint64 - JobModifyIndex uint64 - SubmitTime int64 + ID string + ParentID string + Name string + Namespace string `json:",omitempty"` + Datacenters []string + Multiregion *Multiregion + Type string + Priority int + Periodic bool + ParameterizedJob bool + DispatchIdempotencyToken string + Stop bool + Status string + StatusDescription string + JobSummary *JobSummary + CreateIndex uint64 + ModifyIndex uint64 + JobModifyIndex uint64 + SubmitTime int64 } // JobSummary summarizes the state of the allocations of a job diff --git a/website/content/api-docs/jobs.mdx b/website/content/api-docs/jobs.mdx index 1a2bdd85a0d4..3fffaf83fe93 100644 --- a/website/content/api-docs/jobs.mdx +++ b/website/content/api-docs/jobs.mdx @@ -1617,6 +1617,10 @@ The table below shows this endpoint's support for - `:job_id` `(string: )` - Specifies the ID of the job (as specified in the job file during submission). This is specified as part of the path. +- `idempotency_token` `(string: "")` - Optional identifier used to prevent more + than one instance of the job from being dispatched. This is specified as a + URL query parameter. + - `Payload` `(string: "")` - Specifies a base64 encoded string containing the payload. This is limited to 16384 bytes (16KiB). diff --git a/website/content/docs/commands/job/dispatch.mdx b/website/content/docs/commands/job/dispatch.mdx index 41f33be1884a..d87dd4c00c5e 100644 --- a/website/content/docs/commands/job/dispatch.mdx +++ b/website/content/docs/commands/job/dispatch.mdx @@ -30,6 +30,11 @@ flag one or more times. The payload has a **size limit of 16384 bytes (16KiB)**. +An optional idempotency token can be specified to prevent dispatching more than +one instance of the same job. The token can have any value and will be matched +with existing jobs. If an instance with the same token already exists, the job +will not be dispatched. + Upon successful creation, the dispatched job ID will be printed and the triggered evaluation will be monitored. This can be disabled by supplying the detach flag. @@ -58,6 +63,9 @@ capability for the job's namespace. will be output, which can be used to examine the evaluation using the [eval status] command +- `-idempotency-token`: Optional identifier used to prevent more than one + instance of the job from being dispatched. + - `-verbose`: Show full information. ## Examples @@ -109,5 +117,26 @@ Dispatched Job ID = example/dispatch-1485380684-c37b3dba Evaluation ID = d9034c4e ``` +Dispatch with an idempotency token for the first time: + +```shell-session +$ nomad job dispatch -idempotency-token=prod video-encode video-config.json +Dispatched Job ID = video-encode/dispatch-1485379325-cb38d00d +Evaluation ID = 31199841 + +==> Monitoring evaluation "31199841" + Evaluation triggered by job "example/dispatch-1485379325-cb38d00d" + Allocation "8254b85f" created: node "82ff9c50", group "cache" + Evaluation status changed: "pending" -> "complete" +==> Evaluation "31199841" finished with status "complete" +``` + +Dispatch with the same idempotency token: + +```shell-session +$ nomad job dispatch -idempotency-token=prod video-encode video-config.json +Job "video-encode/dispatch-1485379325-cb38d00d" already dispatched with idempotency token "prod". +``` + [eval status]: /docs/commands/eval-status [parameterized job]: /docs/job-specification/parameterized 'Nomad parameterized Job Specification' From 39333d9fc947dc3dfc946f4cab1ddfcf83478d4a Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Thu, 21 Oct 2021 18:26:52 -0400 Subject: [PATCH 2/5] remove `DispatchIdempotencyToken` from JobListStub --- api/jobs.go | 35 +++++++++---------- command/job_status.go | 11 ++---- nomad/structs/structs.go | 74 +++++++++++++++++++--------------------- 3 files changed, 56 insertions(+), 64 deletions(-) diff --git a/api/jobs.go b/api/jobs.go index 059c77a195fa..46a3fff74910 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -988,24 +988,23 @@ type TaskGroupSummary struct { // JobListStub is used to return a subset of information about // jobs during list operations. type JobListStub struct { - ID string - ParentID string - Name string - Namespace string `json:",omitempty"` - Datacenters []string - Type string - Priority int - Periodic bool - ParameterizedJob bool - DispatchIdempotencyToken string - Stop bool - Status string - StatusDescription string - JobSummary *JobSummary - CreateIndex uint64 - ModifyIndex uint64 - JobModifyIndex uint64 - SubmitTime int64 + ID string + ParentID string + Name string + Namespace string `json:",omitempty"` + Datacenters []string + Type string + Priority int + Periodic bool + ParameterizedJob bool + Stop bool + Status string + StatusDescription string + JobSummary *JobSummary + CreateIndex uint64 + ModifyIndex uint64 + JobModifyIndex uint64 + SubmitTime int64 } // JobIDSort is used to sort jobs by their job ID's. diff --git a/command/job_status.go b/command/job_status.go index 4c1d5826f2df..1637b6b58719 100644 --- a/command/job_status.go +++ b/command/job_status.go @@ -189,10 +189,6 @@ func (c *JobStatusCommand) Run(args []string) int { fmt.Sprintf("Parameterized|%v", parameterized), } - if job.DispatchIdempotencyToken != nil && *job.DispatchIdempotencyToken != "" { - basic = append(basic, fmt.Sprintf("Idempotency Token|%v", *job.DispatchIdempotencyToken)) - } - if periodic && !parameterized { if *job.Stop { basic = append(basic, "Next Periodic Launch|none (job stopped)") @@ -306,7 +302,7 @@ func (c *JobStatusCommand) outputParameterizedInfo(client *api.Client, job *api. } out := make([]string, 1) - out[0] = "ID|Status|Idempotency Token" + out[0] = "ID|Status" for _, child := range children { // Ensure that we are only showing jobs whose parent is the requested // job. @@ -314,10 +310,9 @@ func (c *JobStatusCommand) outputParameterizedInfo(client *api.Client, job *api. continue } - out = append(out, fmt.Sprintf("%s|%s|%s", + out = append(out, fmt.Sprintf("%s|%s", child.ID, - child.Status, - child.DispatchIdempotencyToken)) + child.Status)) } c.Ui.Output(c.Colorize().Color("\n[bold]Dispatched Jobs[reset]")) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 86fff46340b8..a773ba1071c4 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -4409,25 +4409,24 @@ func (j *Job) HasUpdateStrategy() bool { // Stub is used to return a summary of the job func (j *Job) Stub(summary *JobSummary) *JobListStub { return &JobListStub{ - ID: j.ID, - Namespace: j.Namespace, - ParentID: j.ParentID, - Name: j.Name, - Datacenters: j.Datacenters, - Multiregion: j.Multiregion, - Type: j.Type, - Priority: j.Priority, - Periodic: j.IsPeriodic(), - ParameterizedJob: j.IsParameterized(), - DispatchIdempotencyToken: j.DispatchIdempotencyToken, - Stop: j.Stop, - Status: j.Status, - StatusDescription: j.StatusDescription, - CreateIndex: j.CreateIndex, - ModifyIndex: j.ModifyIndex, - JobModifyIndex: j.JobModifyIndex, - SubmitTime: j.SubmitTime, - JobSummary: summary, + ID: j.ID, + Namespace: j.Namespace, + ParentID: j.ParentID, + Name: j.Name, + Datacenters: j.Datacenters, + Multiregion: j.Multiregion, + Type: j.Type, + Priority: j.Priority, + Periodic: j.IsPeriodic(), + ParameterizedJob: j.IsParameterized(), + Stop: j.Stop, + Status: j.Status, + StatusDescription: j.StatusDescription, + CreateIndex: j.CreateIndex, + ModifyIndex: j.ModifyIndex, + JobModifyIndex: j.JobModifyIndex, + SubmitTime: j.SubmitTime, + JobSummary: summary, } } @@ -4582,25 +4581,24 @@ func (j *Job) SetSubmitTime() { // JobListStub is used to return a subset of job information // for the job list type JobListStub struct { - ID string - ParentID string - Name string - Namespace string `json:",omitempty"` - Datacenters []string - Multiregion *Multiregion - Type string - Priority int - Periodic bool - ParameterizedJob bool - DispatchIdempotencyToken string - Stop bool - Status string - StatusDescription string - JobSummary *JobSummary - CreateIndex uint64 - ModifyIndex uint64 - JobModifyIndex uint64 - SubmitTime int64 + ID string + ParentID string + Name string + Namespace string `json:",omitempty"` + Datacenters []string + Multiregion *Multiregion + Type string + Priority int + Periodic bool + ParameterizedJob bool + Stop bool + Status string + StatusDescription string + JobSummary *JobSummary + CreateIndex uint64 + ModifyIndex uint64 + JobModifyIndex uint64 + SubmitTime int64 } // JobSummary summarizes the state of the allocations of a job From b8bc3e543263574eb0aabfc35b5d7e1e4ebf39ec Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Thu, 21 Oct 2021 18:35:28 -0400 Subject: [PATCH 3/5] remove special case for job dispatched with existing token --- command/job_dispatch.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/command/job_dispatch.go b/command/job_dispatch.go index 825a70a77af5..b7ce752128ff 100644 --- a/command/job_dispatch.go +++ b/command/job_dispatch.go @@ -184,12 +184,6 @@ func (c *JobDispatchCommand) Run(args []string) int { // eval. evalCreated := resp.EvalID != "" - // See if dispatched job was skipped due to idempotency. - if !evalCreated && idempotencyToken != "" { - c.Ui.Output(fmt.Sprintf("Job %q already dispatched with idempotency token %q.", resp.DispatchedJobID, idempotencyToken)) - return 0 - } - basic := []string{ fmt.Sprintf("Dispatched Job ID|%s", resp.DispatchedJobID), } From c57751baf62f3fe9d7b05dd12a4d529b3c5b50c3 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Thu, 21 Oct 2021 18:40:05 -0400 Subject: [PATCH 4/5] display idempotency token in job status if available --- command/job_status.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/command/job_status.go b/command/job_status.go index 1637b6b58719..5cb735cdfb02 100644 --- a/command/job_status.go +++ b/command/job_status.go @@ -189,6 +189,10 @@ func (c *JobStatusCommand) Run(args []string) int { fmt.Sprintf("Parameterized|%v", parameterized), } + if job.DispatchIdempotencyToken != nil && *job.DispatchIdempotencyToken != "" { + basic = append(basic, fmt.Sprintf("Idempotency Token|%v", *job.DispatchIdempotencyToken)) + } + if periodic && !parameterized { if *job.Stop { basic = append(basic, "Next Periodic Launch|none (job stopped)") From 91d89f65e90b8f322e2c3c29b1ff6b9a4abd4d60 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Thu, 21 Oct 2021 19:02:57 -0400 Subject: [PATCH 5/5] changelog: add entry for #10930 --- .changelog/10930.txt | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .changelog/10930.txt diff --git a/.changelog/10930.txt b/.changelog/10930.txt new file mode 100644 index 000000000000..ed8832285be3 --- /dev/null +++ b/.changelog/10930.txt @@ -0,0 +1,3 @@ +```release-note:improvement +cli: Add `-idempotency-token` option for the `nomad job dispatch` command +```