diff --git a/.changelog/17653.txt b/.changelog/17653.txt
new file mode 100644
index 000000000000..3915a404349c
--- /dev/null
+++ b/.changelog/17653.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+scheduler: Fixed a bug that could cause replacements for failed allocations to be placed in the wrong datacenter during a canary deployment
+```
diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go
index 8180d3080bfd..320cfee13b4b 100644
--- a/scheduler/generic_sched.go
+++ b/scheduler/generic_sched.go
@@ -10,6 +10,7 @@ import (
 	"github.com/hashicorp/go-memdb"
 	"github.com/hashicorp/go-multierror"
 	"github.com/hashicorp/go-version"
+	"github.com/hashicorp/nomad/helper"
 	"github.com/hashicorp/nomad/helper/uuid"
 	"github.com/hashicorp/nomad/nomad/structs"
 )
@@ -506,7 +507,7 @@ func (s *GenericScheduler) downgradedJobForPlacement(p placementResult) (string,
 // destructive updates to place and the set of new placements to place.
 func (s *GenericScheduler) computePlacements(destructive, place []placementResult) error {
 	// Get the base nodes
-	nodes, _, byDC, err := readyNodesInDCs(s.state, s.job.Datacenters)
+	byDC, err := s.setNodes(s.job)
 	if err != nil {
 		return err
 	}
@@ -516,9 +517,6 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
 		deploymentID = s.deployment.ID
 	}

-	// Update the set of placement nodes
-	s.stack.SetNodes(nodes)
-
 	// Capture current time to use as the start time for any rescheduled allocations
 	now := time.Now()

@@ -559,10 +557,17 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
 			continue
 		}

-		// Use downgraded job in scheduling stack to honor
-		// old job resources and constraints
+		// Use downgraded job in scheduling stack to honor old job
+		// resources, constraints, and datacenter.
 		if downgradedJob != nil {
 			s.stack.SetJob(downgradedJob)
+
+			if needsToSetNodes(downgradedJob, s.job) {
+				byDC, err = s.setNodes(downgradedJob)
+				if err != nil {
+					return err
+				}
+			}
 		}

 		// Find the preferred node
@@ -592,9 +597,17 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
 		// Compute top K scoring node metadata
 		s.ctx.Metrics().PopulateScoreMetaData()

-		// Restore stack job now that placement is done, to use plan job version
+		// Restore stack job and nodes now that placement is done, to use
+		// plan job version
 		if downgradedJob != nil {
 			s.stack.SetJob(s.job)
+
+			if needsToSetNodes(downgradedJob, s.job) {
+				byDC, err = s.setNodes(s.job)
+				if err != nil {
+					return err
+				}
+			}
 		}

 		// Set fields based on if we found an allocation option
@@ -686,6 +699,24 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
 	return nil
 }

+// setNodes updates the stack with the nodes that are ready for placement for
+// the given job.
+func (s *GenericScheduler) setNodes(job *structs.Job) (map[string]int, error) {
+	nodes, _, byDC, err := readyNodesInDCs(s.state, job.Datacenters)
+	if err != nil {
+		return nil, err
+	}
+
+	s.stack.SetNodes(nodes)
+	return byDC, nil
+}
+
+// needsToSetNodes returns true if jobs a and b changed in a way that requires
+// the nodes to be reset.
+func needsToSetNodes(a, b *structs.Job) bool {
+	return !helper.SliceSetEq(a.Datacenters, b.Datacenters)
+}
+
 // propagateTaskState copies task handles from previous allocations to
 // replacement allocations when the previous allocation is being drained or was
 // lost. Remote task drivers rely on this to reconnect to remote tasks when the
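The core of the fix is the `needsToSetNodes` guard above: the candidate node set is only re-filtered when the old and new job versions target different datacenter *sets*, so a mere reordering of the `Datacenters` list does not force a node reset on every downgraded placement. The standalone sketch below illustrates that semantics; `sliceSetEq` here is a hypothetical stand-in for `helper.SliceSetEq`, whose exact implementation is assumed rather than shown in this diff.

```go
package main

import "fmt"

// sliceSetEq is a hypothetical stand-in for helper.SliceSetEq: it reports
// whether two slices contain the same set of elements, ignoring order and
// duplicates. The real helper may differ in details.
func sliceSetEq(a, b []string) bool {
	set := func(s []string) map[string]struct{} {
		m := make(map[string]struct{}, len(s))
		for _, v := range s {
			m[v] = struct{}{}
		}
		return m
	}
	sa, sb := set(a), set(b)
	if len(sa) != len(sb) {
		return false
	}
	for v := range sa {
		if _, ok := sb[v]; !ok {
			return false
		}
	}
	return true
}

// needsToSetNodes mirrors the check added in this diff: nodes only need to be
// re-filtered when the two job versions target different datacenter sets.
func needsToSetNodes(aDCs, bDCs []string) bool {
	return !sliceSetEq(aDCs, bDCs)
}

func main() {
	// Reordering datacenters should not force a node reset...
	fmt.Println(needsToSetNodes([]string{"dc1", "dc2"}, []string{"dc2", "dc1"})) // false
	// ...but changing the datacenter set should.
	fmt.Println(needsToSetNodes([]string{"dc1"}, []string{"dc2"})) // true
}
```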
diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go
index e3f2f46fbc31..b2c03a3eb61c 100644
--- a/scheduler/generic_sched_test.go
+++ b/scheduler/generic_sched_test.go
@@ -851,6 +851,116 @@ func TestServiceSched_Spread(t *testing.T) {
 	}
 }

+// TestServiceSched_JobRegister_Datacenter_Downgrade tests the case where an
+// allocation fails during a deployment with canaries, and the job changes its
+// datacenter. The replacement for the failed alloc should be placed in the
+// datacenter of the original job.
+func TestServiceSched_JobRegister_Datacenter_Downgrade(t *testing.T) {
+	ci.Parallel(t)
+
+	h := NewHarness(t)
+
+	// Create 5 nodes in each datacenter.
+	// Use two loops so nodes are separated by datacenter.
+	nodes := []*structs.Node{}
+	for i := 0; i < 5; i++ {
+		node := mock.Node()
+		node.Name = fmt.Sprintf("node-dc1-%d", i)
+		node.Datacenter = "dc1"
+		nodes = append(nodes, node)
+		must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
+	}
+	for i := 0; i < 5; i++ {
+		node := mock.Node()
+		node.Name = fmt.Sprintf("node-dc2-%d", i)
+		node.Datacenter = "dc2"
+		nodes = append(nodes, node)
+		must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
+	}
+
+	// Create first version of the test job running in dc1.
+	job1 := mock.Job()
+	job1.Version = 1
+	job1.Datacenters = []string{"dc1"}
+	job1.Status = structs.JobStatusRunning
+	job1.TaskGroups[0].Count = 3
+	job1.TaskGroups[0].Update = &structs.UpdateStrategy{
+		Stagger:          time.Duration(30 * time.Second),
+		MaxParallel:      1,
+		HealthCheck:      "checks",
+		MinHealthyTime:   time.Duration(30 * time.Second),
+		HealthyDeadline:  time.Duration(9 * time.Minute),
+		ProgressDeadline: time.Duration(10 * time.Minute),
+		AutoRevert:       true,
+		Canary:           1,
+	}
+	must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job1))
+
+	// Create allocs for this job version with one being a canary and another
+	// marked as failed.
+	allocs := []*structs.Allocation{}
+	for i := 0; i < 3; i++ {
+		alloc := mock.Alloc()
+		alloc.Job = job1
+		alloc.JobID = job1.ID
+		alloc.NodeID = nodes[i].ID
+		alloc.DeploymentStatus = &structs.AllocDeploymentStatus{
+			Healthy:     pointer.Of(true),
+			Timestamp:   time.Now(),
+			Canary:      false,
+			ModifyIndex: h.NextIndex(),
+		}
+		if i == 0 {
+			alloc.DeploymentStatus.Canary = true
+		}
+		if i == 1 {
+			alloc.ClientStatus = structs.AllocClientStatusFailed
+		}
+		allocs = append(allocs, alloc)
+	}
+	must.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), allocs))
+
+	// Update job to place it in dc2.
+	job2 := job1.Copy()
+	job2.Version = 2
+	job2.Datacenters = []string{"dc2"}
+	must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job2))
+
+	eval := &structs.Evaluation{
+		Namespace:   job2.Namespace,
+		ID:          uuid.Generate(),
+		Priority:    job2.Priority,
+		TriggeredBy: structs.EvalTriggerJobRegister,
+		JobID:       job2.ID,
+		Status:      structs.EvalStatusPending,
+	}
+	must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
+
+	processErr := h.Process(NewServiceScheduler, eval)
+	must.NoError(t, processErr, must.Sprint("failed to process eval"))
+	must.Len(t, 1, h.Plans)
+
+	// Verify the plan places the new allocation in dc2 and the replacement
+	// for the failed allocation from the previous job version in dc1.
+	for nodeID, allocs := range h.Plans[0].NodeAllocation {
+		var node *structs.Node
+		for _, n := range nodes {
+			if n.ID == nodeID {
+				node = n
+				break
+			}
+		}
+
+		must.Len(t, 1, allocs)
+		alloc := allocs[0]
+		must.SliceContains(t, alloc.Job.Datacenters, node.Datacenter, must.Sprintf(
+			"alloc for job in datacenter %q placed in %q",
+			alloc.Job.Datacenters,
+			node.Datacenter,
+		))
+	}
+}
+
 // Test job registration with even spread across dc
 func TestServiceSched_EvenSpread(t *testing.T) {
 	ci.Parallel(t)
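For context on why the test above can assert per-datacenter placement: during the downgraded placement the scheduler calls `setNodes` with the old job, which re-runs `readyNodesInDCs` against that job's datacenters, so the replacement for the failed allocation only ever sees dc1 nodes. The sketch below is a simplified, hypothetical illustration of that filtering step; the real `readyNodesInDCs` also checks node readiness and returns additional values (including the per-datacenter counts stored in `byDC`), so treat this as a sketch under those assumptions.

```go
package main

import "fmt"

// node is a trimmed-down stand-in for structs.Node, used only for illustration.
type node struct {
	ID         string
	Datacenter string
}

// filterByDatacenters is a hypothetical simplification of the filtering
// readyNodesInDCs is assumed to perform for this fix: restrict the candidate
// node set to the datacenters of the job version being placed.
func filterByDatacenters(nodes []node, dcs []string) []node {
	want := make(map[string]bool, len(dcs))
	for _, dc := range dcs {
		want[dc] = true
	}
	out := []node{}
	for _, n := range nodes {
		if want[n.Datacenter] {
			out = append(out, n)
		}
	}
	return out
}

func main() {
	nodes := []node{
		{ID: "n1", Datacenter: "dc1"},
		{ID: "n2", Datacenter: "dc2"},
	}

	// Replacement for a failed alloc from the old job version: only dc1 nodes
	// are eligible, which is the behavior the test above asserts.
	fmt.Println(filterByDatacenters(nodes, []string{"dc1"})) // [{n1 dc1}]

	// New placements for the updated job version: only dc2 nodes are eligible.
	fmt.Println(filterByDatacenters(nodes, []string{"dc2"})) // [{n2 dc2}]
}
```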