Skip to content

Commit

Permalink
Enforce idempotency of dispatched jobs using special meta key
Browse files Browse the repository at this point in the history
  • Loading branch information
alexmunda committed Jun 23, 2021
1 parent 09bd336 commit 2b8ecda
Showing 1 changed file with 39 additions and 0 deletions.
39 changes: 39 additions & 0 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ const (
// DispatchPayloadSizeLimit is the maximum size of the uncompressed input
// data payload.
DispatchPayloadSizeLimit = 16 * 1024

// MetaDispatchIdempotencyKey is the meta key that when provided, is used
// to perform an idempotency check to ensure only 1 child of a parameterized job
// with the supplied key may be running (or pending) at a time.
MetaDispatchIdempotencyKey = "nomad_dispatch_idempotency_key"
)

// ErrMultipleNamespaces is send when multiple namespaces are used in the OSS setup
Expand Down Expand Up @@ -1908,6 +1913,40 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
dispatchJob.Meta[k] = v
}

// Check to see if an idempotency key was provided on the meta
if idempotencyKey, ok := dispatchJob.Meta[MetaDispatchIdempotencyKey]; ok {
// TODO: Do parameterized jobs maintain the same prefix as their parent?
// If yes, use the prefix index so we don't have to iterate over all jobs.
iter, err := snap.Jobs(ws)
if err != nil {
return fmt.Errorf("failed to retrieve jobs for idempotency check")
}

// Iterate
for {
raw := iter.Next()
if raw == nil {
break
}

// Ensure the parent ID is an exact match
existingJob := raw.(*structs.Job)
if existingJob.ParentID != dispatchJob.ParentID {
continue
}

// Idempotency keys match. Ensure existing job is not currently running.
if ik, ok := existingJob.Meta[MetaDispatchIdempotencyKey]; ok && ik == idempotencyKey {
// The existing job is either pending or running.
// Registering a new job would violate the idempotency key.
if existingJob.Status != structs.JobStatusDead {
return fmt.Errorf("dispatch violates idempotency key of non-terminal child job: %s", existingJob.ID)
}
}
}

}

// Compress the payload
dispatchJob.Payload = snappy.Encode(nil, args.Payload)

Expand Down

0 comments on commit 2b8ecda

Please sign in to comment.