Skip to content

Commit

Permalink
Add idempotency token to dispatch request instead of special meta key
Browse files Browse the repository at this point in the history
  • Loading branch information
alexmunda committed Jun 29, 2021
1 parent b5d21a9 commit 2a58824
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 30 deletions.
23 changes: 20 additions & 3 deletions api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,22 @@ func (j *Jobs) Dispatch(jobID string, meta map[string]string,
return &resp, wm, nil
}

func (j *Jobs) DispatchIdempotent(jobID string, meta map[string]string,
payload []byte, idempotencyToken string, q *WriteOptions) (*JobDispatchResponse, *WriteMeta, error) {
var resp JobDispatchResponse
req := &JobDispatchRequest{
JobID: jobID,
Meta: meta,
Payload: payload,
IdempotencyToken: idempotencyToken,
}
wm, err := j.client.write("/v1/job/"+url.PathEscape(jobID)+"/dispatch", req, &resp, q)
if err != nil {
return nil, nil, err
}
return &resp, wm, nil
}

// Revert is used to revert the given job to the passed version. If
// enforceVersion is set, the job is only reverted if the current version is at
// the passed version.
Expand Down Expand Up @@ -1262,9 +1278,10 @@ type DesiredUpdates struct {
}

type JobDispatchRequest struct {
JobID string
Payload []byte
Meta map[string]string
JobID string
Payload []byte
Meta map[string]string
IdempotencyToken string
}

type JobDispatchResponse struct {
Expand Down
21 changes: 8 additions & 13 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,6 @@ const (
// DispatchPayloadSizeLimit is the maximum size of the uncompressed input
// data payload.
DispatchPayloadSizeLimit = 16 * 1024

// MetaDispatchIdempotencyKey is the meta key that when provided, is used
// to perform an idempotency check to ensure only 1 child of a parameterized job
// with the supplied key may be running (or pending) at a time.
MetaDispatchIdempotencyKey = "nomad_dispatch_idempotency_key"
)

// ErrMultipleNamespaces is send when multiple namespaces are used in the OSS setup
Expand Down Expand Up @@ -1904,6 +1899,7 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
dispatchJob.Dispatched = true
dispatchJob.Status = ""
dispatchJob.StatusDescription = ""
dispatchJob.DispatchIdempotencyToken = args.IdempotencyToken

// Merge in the meta data
for k, v := range args.Meta {
Expand All @@ -1913,8 +1909,8 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
dispatchJob.Meta[k] = v
}

// Check to see if an idempotency key was provided on the meta
if idempotencyKey, ok := dispatchJob.Meta[MetaDispatchIdempotencyKey]; ok {
// Check to see if an idempotency token was specified on the request
if args.IdempotencyToken != "" {
// Fetch all jobs that match the parameterized job ID prefix
iter, err := snap.JobsByIDPrefix(ws, parameterizedJob.Namespace, parameterizedJob.ID)
if err != nil {
Expand All @@ -1934,12 +1930,12 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
continue
}

// Idempotency keys match. Ensure existing job is not currently running.
if ik, ok := existingJob.Meta[MetaDispatchIdempotencyKey]; ok && ik == idempotencyKey {
// Idempotency tokens match. Ensure existing job is terminal.
if existingJob.DispatchIdempotencyToken == args.IdempotencyToken {
// The existing job is either pending or running.
// Registering a new job would violate the idempotency key.
// Registering a new job would violate the idempotency token.
if existingJob.Status != structs.JobStatusDead {
return fmt.Errorf("dispatch violates idempotency key of non-terminal child job: %s", existingJob.ID)
return fmt.Errorf("dispatch violates idempotency token of non-terminal child job: %s", existingJob.ID)
}
}
}
Expand Down Expand Up @@ -2039,8 +2035,7 @@ func validateDispatchRequest(req *structs.JobDispatchRequest, job *structs.Job)
for k := range req.Meta {
_, req := required[k]
_, opt := optional[k]
// Always allow the idempotency key
if !req && !opt && k != MetaDispatchIdempotencyKey {
if !req && !opt {
unpermitted[k] = struct{}{}
}
}
Expand Down
20 changes: 9 additions & 11 deletions nomad/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6133,10 +6133,8 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
reqInputDataTooLarge := &structs.JobDispatchRequest{
Payload: make([]byte, DispatchPayloadSizeLimit+100),
}
reqIdempotentMeta := &structs.JobDispatchRequest{
Meta: map[string]string{
MetaDispatchIdempotencyKey: "foo",
},
reqIdempotentToken := &structs.JobDispatchRequest{
IdempotencyToken: "foo",
}

type existingIdempotentChildJob struct {
Expand Down Expand Up @@ -6244,26 +6242,26 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
errStr: "stopped",
},
{
name: "idempotent meta key, no existing child job",
name: "idempotency token, no existing child job",
parameterizedJob: d1,
dispatchReq: reqIdempotentMeta,
dispatchReq: reqIdempotentToken,
err: false,
existingIdempotentJob: nil,
},
{
name: "idempotent meta key, w/ existing non-terminal child job",
name: "idempotency token, w/ existing non-terminal child job",
parameterizedJob: d1,
dispatchReq: reqIdempotentMeta,
dispatchReq: reqIdempotentToken,
err: true,
errStr: "dispatch violates idempotency key of non-terminal child job",
errStr: "dispatch violates idempotency token of non-terminal child job",
existingIdempotentJob: &existingIdempotentChildJob{
isTerminal: false,
},
},
{
name: "idempotent meta key, w/ existing terminal job",
name: "idempotency token, w/ existing terminal job",
parameterizedJob: d1,
dispatchReq: reqIdempotentMeta,
dispatchReq: reqIdempotentToken,
err: false,
existingIdempotentJob: &existingIdempotentChildJob{
isTerminal: true,
Expand Down
11 changes: 8 additions & 3 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,9 +728,10 @@ type JobScaleStatusRequest struct {

// JobDispatchRequest is used to dispatch a job based on a parameterized job
type JobDispatchRequest struct {
JobID string
Payload []byte
Meta map[string]string
JobID string
Payload []byte
Meta map[string]string
IdempotencyToken string
WriteRequest
}

Expand Down Expand Up @@ -4016,6 +4017,10 @@ type Job struct {
// parameterized job.
Dispatched bool

// DispatchIdempotencyToken is optionally used to ensure that a dispatched job does not have any
// non-terminal siblings which have the same token value.
DispatchIdempotencyToken string

// Payload is the payload supplied when the job was dispatched.
Payload []byte

Expand Down

0 comments on commit 2a58824

Please sign in to comment.