From 1a9dd5266f49c766b1231f6b92d86d8f02aff3f0 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Thu, 22 Jun 2023 13:25:38 -0400 Subject: [PATCH] core: set proper nodes on downgrade (#17652) When a non-canary allocation fails during a canary deployment it must be replaced according to the original job specification, which includes resources, constraints, datacenters etc. Prior to this change, only the job was being reset in the stack, which could lead to the replacement being placed in nodes in the wrong datacenter. --- .changelog/17652.txt | 3 + scheduler/generic_sched.go | 45 +++++++++++-- scheduler/generic_sched_test.go | 110 ++++++++++++++++++++++++++++++++ 3 files changed, 151 insertions(+), 7 deletions(-) create mode 100644 .changelog/17652.txt diff --git a/.changelog/17652.txt b/.changelog/17652.txt new file mode 100644 index 00000000000..3915a404349 --- /dev/null +++ b/.changelog/17652.txt @@ -0,0 +1,3 @@ +```release-note:bug +scheduler: Fixed a bug that could cause replacements for failed allocations to be placed in the wrong datacenter during a canary deployment +``` diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index 2ff2cb6ee8b..2dde5837f55 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -10,6 +10,7 @@ import ( "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-multierror" "github.com/hashicorp/go-version" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/structs" ) @@ -506,7 +507,7 @@ func (s *GenericScheduler) downgradedJobForPlacement(p placementResult) (string, // destructive updates to place and the set of new placements to place. func (s *GenericScheduler) computePlacements(destructive, place []placementResult) error { // Get the base nodes - nodes, _, byDC, err := readyNodesInDCs(s.state, s.job.Datacenters) + byDC, err := s.setNodes(s.job) if err != nil { return err } @@ -516,9 +517,6 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul deploymentID = s.deployment.ID } - // Update the set of placement nodes - s.stack.SetNodes(nodes) - // Capture current time to use as the start time for any rescheduled allocations now := time.Now() @@ -559,10 +557,17 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul continue } - // Use downgraded job in scheduling stack to honor - // old job resources and constraints + // Use downgraded job in scheduling stack to honor old job + // resources, constraints, and datacenter. if downgradedJob != nil { s.stack.SetJob(downgradedJob) + + if needsToSetNodes(downgradedJob, s.job) { + byDC, err = s.setNodes(downgradedJob) + if err != nil { + return err + } + } } // Find the preferred node @@ -592,9 +597,17 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul // Compute top K scoring node metadata s.ctx.Metrics().PopulateScoreMetaData() - // Restore stack job now that placement is done, to use plan job version + // Restore stack job and nodes now that placement is done, to use + // plan job version if downgradedJob != nil { s.stack.SetJob(s.job) + + if needsToSetNodes(downgradedJob, s.job) { + byDC, err = s.setNodes(s.job) + if err != nil { + return err + } + } } // Set fields based on if we found an allocation option @@ -686,6 +699,24 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul return nil } +// setnodes updates the stack with the nodes that are ready for placement for +// the given job. +func (s *GenericScheduler) setNodes(job *structs.Job) (map[string]int, error) { + nodes, _, byDC, err := readyNodesInDCs(s.state, job.Datacenters) + if err != nil { + return nil, err + } + + s.stack.SetNodes(nodes) + return byDC, nil +} + +// needsToSetNodes returns true if jobs a and b changed in a way that requires +// the nodes to be reset. +func needsToSetNodes(a, b *structs.Job) bool { + return !helper.SliceSetEq(a.Datacenters, b.Datacenters) +} + // propagateTaskState copies task handles from previous allocations to // replacement allocations when the previous allocation is being drained or was // lost. Remote task drivers rely on this to reconnect to remote tasks when the diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index 6be2d56c10e..7ccf9ebd384 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -851,6 +851,116 @@ func TestServiceSched_Spread(t *testing.T) { } } +// TestServiceSched_JobRegister_Datacenter_Downgrade tests the case where an +// allocation fails during a deployment with canaries, an the job changes its +// datacenter. The replacement for the failed alloc should be placed in the +// datacenter of the original job. +func TestServiceSched_JobRegister_Datacenter_Downgrade(t *testing.T) { + ci.Parallel(t) + + h := NewHarness(t) + + // Create 5 nodes in each datacenter. + // Use two loops so nodes are separated by datacenter. + nodes := []*structs.Node{} + for i := 0; i < 5; i++ { + node := mock.Node() + node.Name = fmt.Sprintf("node-dc1-%d", i) + node.Datacenter = "dc1" + nodes = append(nodes, node) + must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) + } + for i := 0; i < 5; i++ { + node := mock.Node() + node.Name = fmt.Sprintf("node-dc2-%d", i) + node.Datacenter = "dc2" + nodes = append(nodes, node) + must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) + } + + // Create first version of the test job running in dc1. + job1 := mock.Job() + job1.Version = 1 + job1.Datacenters = []string{"dc1"} + job1.Status = structs.JobStatusRunning + job1.TaskGroups[0].Count = 3 + job1.TaskGroups[0].Update = &structs.UpdateStrategy{ + Stagger: time.Duration(30 * time.Second), + MaxParallel: 1, + HealthCheck: "checks", + MinHealthyTime: time.Duration(30 * time.Second), + HealthyDeadline: time.Duration(9 * time.Minute), + ProgressDeadline: time.Duration(10 * time.Minute), + AutoRevert: true, + Canary: 1, + } + must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job1)) + + // Create allocs for this job version with one being a canary and another + // marked as failed. + allocs := []*structs.Allocation{} + for i := 0; i < 3; i++ { + alloc := mock.Alloc() + alloc.Job = job1 + alloc.JobID = job1.ID + alloc.NodeID = nodes[i].ID + alloc.DeploymentStatus = &structs.AllocDeploymentStatus{ + Healthy: pointer.Of(true), + Timestamp: time.Now(), + Canary: false, + ModifyIndex: h.NextIndex(), + } + if i == 0 { + alloc.DeploymentStatus.Canary = true + } + if i == 1 { + alloc.ClientStatus = structs.AllocClientStatusFailed + } + allocs = append(allocs, alloc) + } + must.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), allocs)) + + // Update job to place it in dc2. + job2 := job1.Copy() + job2.Version = 2 + job2.Datacenters = []string{"dc2"} + must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job2)) + + eval := &structs.Evaluation{ + Namespace: job2.Namespace, + ID: uuid.Generate(), + Priority: job2.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job2.ID, + Status: structs.EvalStatusPending, + } + must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval})) + + processErr := h.Process(NewServiceScheduler, eval) + must.NoError(t, processErr, must.Sprint("failed to process eval")) + must.Len(t, 1, h.Plans) + + // Verify the plan places the new allocation in dc2 and the replacement + // for the failed allocation from the previous job version in dc1. + for nodeID, allocs := range h.Plans[0].NodeAllocation { + var node *structs.Node + for _, n := range nodes { + if n.ID == nodeID { + node = n + break + } + } + + must.Len(t, 1, allocs) + alloc := allocs[0] + must.SliceContains(t, alloc.Job.Datacenters, node.Datacenter, must.Sprintf( + "alloc for job in datacenter %q placed in %q", + alloc.Job.Datacenters, + node.Datacenter, + )) + } +} + // Test job registration with even spread across dc func TestServiceSched_EvenSpread(t *testing.T) { ci.Parallel(t)