From 7e8ab115c6aa84c7251f0d3b5981d32c1592eab7 Mon Sep 17 00:00:00 2001 From: Derek Strickland Date: Fri, 9 Sep 2022 07:38:33 -0400 Subject: [PATCH 1/2] job_endpoint: check spec for all regions --- nomad/job_endpoint.go | 54 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 53 insertions(+), 1 deletion(-) diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index b94d7205420b..6e2a51ac01e2 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -349,7 +349,12 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis } // Check if the job has changed at all - if existingJob == nil || existingJob.SpecChanged(args.Job) { + specChanged, err := j.specChanged(existingJob, args) + if err != nil { + return err + } + + if existingJob == nil || specChanged { // COMPAT(1.1.0): Remove the ServerMeetMinimumVersion check to always set args.Eval // 0.12.1 introduced atomic eval job registration @@ -433,6 +438,53 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis return nil } +// specChanged checks to see if the job spec has changed. If the job is multiregion, +// it checks all regions. This handles the case of a multiregion job that has been +// stopped in one or more regions being restarted via the run command. +func (j *Job) specChanged(existingJob *structs.Job, args *structs.JobRegisterRequest) (bool, error) { + if existingJob == nil { + return false, nil + } + + if existingJob.SpecChanged(args.Job) { + return true, nil + } + + if !existingJob.IsMultiregion() { + return false, nil + } + + // If the spec hasn't changed, but this is a multiregion job, check other regions. + for _, region := range existingJob.Multiregion.Regions { + // Copy the job so we can mutate it by region and compare it with the response. + incomingJob := args.Job.Copy() + incomingJob.Region = region.Name + req := &structs.JobSpecificRequest{ + JobID: incomingJob.ID, + QueryOptions: structs.QueryOptions{ + Region: region.Name, + Namespace: incomingJob.Namespace, + AuthToken: args.AuthToken, + }, + } + resp := structs.SingleJobResponse{} + j.logger.Debug("checking spec for multiregion job") + err := j.GetJob(req, &resp) + if err != nil { + j.logger.Error("job lookup failed", "error", err) + return false, err + } + + // If the jobspec changed or is nil (purged) for that region, assume it's being re-registered + // and treat like a spec change. + if resp.Job.SpecChanged(incomingJob) || resp.Job == nil { + return true, nil + } + } + + return false, nil +} + // propagateScalingPolicyIDs propagates scaling policy IDs from existing job // to updated job, or generates random IDs in new job func propagateScalingPolicyIDs(old, new *structs.Job) error { From 5300ec56b887c93e87049ecbf1caf46f812319f9 Mon Sep 17 00:00:00 2001 From: Derek Strickland Date: Fri, 9 Sep 2022 23:06:24 -0400 Subject: [PATCH 2/2] Apply changes from code review; add changelog entry --- .changelog/14519.txt | 3 +++ nomad/job_endpoint.go | 49 +-------------------------------------- nomad/job_endpoint_oss.go | 9 +++++++ 3 files changed, 13 insertions(+), 48 deletions(-) create mode 100644 .changelog/14519.txt diff --git a/.changelog/14519.txt b/.changelog/14519.txt new file mode 100644 index 000000000000..8eae18caf7b7 --- /dev/null +++ b/.changelog/14519.txt @@ -0,0 +1,3 @@ +```release-note:bug +rpc: check for spec changes in all regions when registering multiregion jobs +``` diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 6e2a51ac01e2..50ac74e54151 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -349,7 +349,7 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis } // Check if the job has changed at all - specChanged, err := j.specChanged(existingJob, args) + specChanged, err := j.multiregionSpecChanged(existingJob, args) if err != nil { return err } @@ -438,53 +438,6 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis return nil } -// specChanged checks to see if the job spec has changed. If the job is multiregion, -// it checks all regions. This handles the case of a multiregion job that has been -// stopped in one or more regions being restarted via the run command. -func (j *Job) specChanged(existingJob *structs.Job, args *structs.JobRegisterRequest) (bool, error) { - if existingJob == nil { - return false, nil - } - - if existingJob.SpecChanged(args.Job) { - return true, nil - } - - if !existingJob.IsMultiregion() { - return false, nil - } - - // If the spec hasn't changed, but this is a multiregion job, check other regions. - for _, region := range existingJob.Multiregion.Regions { - // Copy the job so we can mutate it by region and compare it with the response. - incomingJob := args.Job.Copy() - incomingJob.Region = region.Name - req := &structs.JobSpecificRequest{ - JobID: incomingJob.ID, - QueryOptions: structs.QueryOptions{ - Region: region.Name, - Namespace: incomingJob.Namespace, - AuthToken: args.AuthToken, - }, - } - resp := structs.SingleJobResponse{} - j.logger.Debug("checking spec for multiregion job") - err := j.GetJob(req, &resp) - if err != nil { - j.logger.Error("job lookup failed", "error", err) - return false, err - } - - // If the jobspec changed or is nil (purged) for that region, assume it's being re-registered - // and treat like a spec change. - if resp.Job.SpecChanged(incomingJob) || resp.Job == nil { - return true, nil - } - } - - return false, nil -} - // propagateScalingPolicyIDs propagates scaling policy IDs from existing job // to updated job, or generates random IDs in new job func propagateScalingPolicyIDs(old, new *structs.Job) error { diff --git a/nomad/job_endpoint_oss.go b/nomad/job_endpoint_oss.go index d80281a3bcaa..a6836a554712 100644 --- a/nomad/job_endpoint_oss.go +++ b/nomad/job_endpoint_oss.go @@ -38,3 +38,12 @@ func (j *Job) multiregionStop(job *structs.Job, args *structs.JobDeregisterReque func (j *Job) interpolateMultiregionFields(args *structs.JobPlanRequest) error { return nil } + +// multiregionSpecChanged checks to see if the job spec has changed. If the job is multiregion, +// it checks all regions to determine if any deployed jobs instances have been stopped or +// otherwise differ from the incoming jobspec. Since multiregion jobs require coordinated +// deployments and synchronized job versions across all regions, a change in one requires +// redeployment of all. +func (j *Job) multiregionSpecChanged(existingJob *structs.Job, args *structs.JobRegisterRequest) (bool, error) { + return existingJob.SpecChanged(args.Job), nil +}