From 2b8ecdaef83beb91bb10f1b01023940b531bd6ea Mon Sep 17 00:00:00 2001 From: Alex Munda Date: Wed, 23 Jun 2021 16:51:59 -0500 Subject: [PATCH] Enforce idempotency of dispatched jobs using special meta key --- nomad/job_endpoint.go | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 5a298fd9ac58..ac822d10ed8e 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -32,6 +32,11 @@ const ( // DispatchPayloadSizeLimit is the maximum size of the uncompressed input // data payload. DispatchPayloadSizeLimit = 16 * 1024 + + // MetaDispatchIdempotencyKey is the meta key that when provided, is used + // to perform an idempotency check to ensure only 1 child of a parameterized job + // with the supplied key may be running (or pending) at a time. + MetaDispatchIdempotencyKey = "nomad_dispatch_idempotency_key" ) // ErrMultipleNamespaces is send when multiple namespaces are used in the OSS setup @@ -1908,6 +1913,40 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa dispatchJob.Meta[k] = v } + // Check to see if an idempotency key was provided on the meta + if idempotencyKey, ok := dispatchJob.Meta[MetaDispatchIdempotencyKey]; ok { + // TODO: Do parameterized jobs maintain the same prefix as their parent? + // If yes, use the prefix index so we don't have to iterate over all jobs. + iter, err := snap.Jobs(ws) + if err != nil { + return fmt.Errorf("failed to retrieve jobs for idempotency check") + } + + // Iterate + for { + raw := iter.Next() + if raw == nil { + break + } + + // Ensure the parent ID is an exact match + existingJob := raw.(*structs.Job) + if existingJob.ParentID != dispatchJob.ParentID { + continue + } + + // Idempotency keys match. Ensure existing job is not currently running. + if ik, ok := existingJob.Meta[MetaDispatchIdempotencyKey]; ok && ik == idempotencyKey { + // The existing job is either pending or running. + // Registering a new job would violate the idempotency key. + if existingJob.Status != structs.JobStatusDead { + return fmt.Errorf("dispatch violates idempotency key of non-terminal child job: %s", existingJob.ID) + } + } + } + + } + // Compress the payload dispatchJob.Payload = snappy.Encode(nil, args.Payload)