core: set proper nodes on downgrade #17653

Merged · 3 commits · Jun 22, 2023
Changes from 2 commits
3 changes: 3 additions & 0 deletions .changelog/17653.txt
@@ -0,0 +1,3 @@
```release-note:bug
scheduler: Fixed a bug that could cause replacements for failed allocations to be placed in the wrong datacenter during a canary deployment
```
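To frame the fix: the scheduler previously filtered candidate nodes only by the new job's datacenters, even when a placement was driven by the old (downgraded) job version, so a replacement for a failed old-version allocation could land in the new job's datacenters. A minimal standalone sketch of the node-filtering idea, using simplified stand-in types rather than Nomad's structs.Node, structs.Job, or readyNodesInDCs:

```go
package main

import "fmt"

// node and job are simplified stand-ins for the scheduler's real types,
// used only to illustrate datacenter-based node filtering.
type node struct {
	ID         string
	Datacenter string
}

type job struct {
	Datacenters []string
}

// eligibleNodes filters the node pool down to the job's datacenters, which is
// roughly what readyNodesInDCs does before the stack places allocations.
func eligibleNodes(pool []node, j job) []node {
	dcs := make(map[string]struct{}, len(j.Datacenters))
	for _, dc := range j.Datacenters {
		dcs[dc] = struct{}{}
	}
	var out []node
	for _, n := range pool {
		if _, ok := dcs[n.Datacenter]; ok {
			out = append(out, n)
		}
	}
	return out
}

func main() {
	pool := []node{{ID: "n1", Datacenter: "dc1"}, {ID: "n2", Datacenter: "dc2"}}
	oldJob := job{Datacenters: []string{"dc1"}} // job version being replaced
	newJob := job{Datacenters: []string{"dc2"}} // updated job version

	// Placements must be filtered against the job version that drives them:
	// replacements for old-version allocations use the old job's datacenters,
	// while new placements use the new job's.
	fmt.Println(eligibleNodes(pool, oldJob)) // [{n1 dc1}]
	fmt.Println(eligibleNodes(pool, newJob)) // [{n2 dc2}]
}
```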
45 changes: 38 additions & 7 deletions scheduler/generic_sched.go
@@ -10,6 +10,7 @@ import (
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -506,7 +507,7 @@ func (s *GenericScheduler) downgradedJobForPlacement(p placementResult) (string,
// destructive updates to place and the set of new placements to place.
func (s *GenericScheduler) computePlacements(destructive, place []placementResult) error {
// Get the base nodes
nodes, _, byDC, err := readyNodesInDCs(s.state, s.job.Datacenters)
byDC, err := s.setNodes(s.job)
if err != nil {
return err
}
@@ -516,9 +517,6 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
deploymentID = s.deployment.ID
}

// Update the set of placement nodes
s.stack.SetNodes(nodes)

// Capture current time to use as the start time for any rescheduled allocations
now := time.Now()

@@ -559,10 +557,17 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
continue
}

// Use downgraded job in scheduling stack to honor
// old job resources and constraints
// Use downgraded job in scheduling stack to honor old job
// resources, constraints, and datacenter.
if downgradedJob != nil {
s.stack.SetJob(downgradedJob)

if needsToSetNodes(downgradedJob, s.job) {
byDC, err = s.setNodes(downgradedJob)
if err != nil {
return err
}
}
}

// Find the preferred node
@@ -592,9 +597,17 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
// Compute top K scoring node metadata
s.ctx.Metrics().PopulateScoreMetaData()

// Restore stack job now that placement is done, to use plan job version
// Restore stack job and nodes now that placement is done, to use
// plan job version
if downgradedJob != nil {
s.stack.SetJob(s.job)

if needsToSetNodes(downgradedJob, s.job) {
byDC, err = s.setNodes(s.job)
if err != nil {
return err
}
}
}

// Set fields based on if we found an allocation option
@@ -686,6 +699,24 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
return nil
}

// setNodes updates the stack with the nodes that are ready for placement for
// the given job.
func (s *GenericScheduler) setNodes(job *structs.Job) (map[string]int, error) {
nodes, _, byDC, err := readyNodesInDCs(s.state, job.Datacenters)
if err != nil {
return nil, err
}

s.stack.SetNodes(nodes)
return byDC, nil
}

// needsToSetNodes returns true if jobs a and b changed in a way that requires
// the nodes to be reset.
func needsToSetNodes(a, b *structs.Job) bool {
return !helper.SliceSetEq(a.Datacenters, b.Datacenters)
}

// propagateTaskState copies task handles from previous allocations to
// replacement allocations when the previous allocation is being drained or was
// lost. Remote task drivers rely on this to reconnect to remote tasks when the
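For context on the check needsToSetNodes performs above: helper.SliceSetEq compares the two Datacenters slices as unordered sets, so the node pool is only recomputed when the downgraded job actually targets a different set of datacenters. A rough illustrative re-implementation of that kind of comparison follows; it is an assumption for clarity, not Nomad's actual helper code:

```go
package main

import "fmt"

// sliceSetEq reports whether a and b contain the same values regardless of
// order (assuming no duplicate entries). Illustrative only; Nomad's
// helper.SliceSetEq may differ in signature and edge-case handling.
func sliceSetEq(a, b []string) bool {
	if len(a) != len(b) {
		return false
	}
	seen := make(map[string]struct{}, len(a))
	for _, v := range a {
		seen[v] = struct{}{}
	}
	for _, v := range b {
		if _, ok := seen[v]; !ok {
			return false
		}
	}
	return true
}

func main() {
	// Reordering datacenters alone does not require resetting the node set...
	fmt.Println(sliceSetEq([]string{"dc1", "dc2"}, []string{"dc2", "dc1"})) // true
	// ...but adding or swapping a datacenter does.
	fmt.Println(sliceSetEq([]string{"dc1"}, []string{"dc2"})) // false
}
```

When the sets are equal, the nodes already selected for the current job version are valid for the downgraded job as well, so the extra SetNodes call is skipped.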
115 changes: 115 additions & 0 deletions scheduler/generic_sched_test.go
@@ -851,6 +851,121 @@ func TestServiceSched_Spread(t *testing.T) {
}
}

// TestServiceSched_JobRegister_Datacenter_Downgrade tests the case where an
// allocation fails during a deployment with canaries, and the job changes its
// datacenter. The failed alloc should be placed in the datacenter of the
// original job.
func TestServiceSched_JobRegister_Datacenter_Downgrade(t *testing.T) {
ci.Parallel(t)

h := NewHarness(t)

// Create 5 nodes in each datacenter.
// Use two loops so nodes are separated by datacenter.
nodes := []*structs.Node{}
for i := 0; i < 5; i++ {
node := mock.Node()
node.Name = fmt.Sprintf("node-dc1-%d", i)
node.Datacenter = "dc1"
nodes = append(nodes, node)
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
}
for i := 0; i < 5; i++ {
node := mock.Node()
node.Name = fmt.Sprintf("node-dc2-%d", i)
node.Datacenter = "dc2"
nodes = append(nodes, node)
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
}

// Create first version of the test job running in dc1.
job1 := mock.Job()
job1.Version = 1
job1.Datacenters = []string{"dc1"}
job1.Status = structs.JobStatusRunning
job1.TaskGroups[0].Count = 3
job1.TaskGroups[0].Update = &structs.UpdateStrategy{
Stagger: time.Duration(30 * time.Second),
MaxParallel: 1,
HealthCheck: "checks",
MinHealthyTime: time.Duration(30 * time.Second),
HealthyDeadline: time.Duration(9 * time.Minute),
ProgressDeadline: time.Duration(10 * time.Minute),
AutoRevert: true,
Canary: 1,
}
must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job1))

// Create allocs for this job version with one being a canary and another
// marked as failed.
allocs := []*structs.Allocation{}
for i := 0; i < 3; i++ {
alloc := mock.Alloc()
alloc.Job = job1
alloc.JobID = job1.ID
alloc.NodeID = nodes[i].ID
alloc.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: pointer.Of(true),
Timestamp: time.Now(),
Canary: false,
ModifyIndex: h.NextIndex(),
}
if i == 0 {
alloc.DeploymentStatus.Canary = true
}
if i == 1 {
alloc.ClientStatus = structs.AllocClientStatusFailed
}
allocs = append(allocs, alloc)
}
must.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), allocs))

// Update job to place it in dc2.
job2 := job1.Copy()
job2.Version = 2
job2.Datacenters = []string{"dc2"}
must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job2))

eval := &structs.Evaluation{
Namespace: job2.Namespace,
ID: uuid.Generate(),
Priority: job2.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job2.ID,
Status: structs.EvalStatusPending,
}
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))

processErr := h.Process(NewServiceScheduler, eval)
require.NoError(t, processErr, "failed to process eval")
require.Len(t, h.Plans, 1)

// Verify the plan places the new allocation in dc2 and the replacement
// for the failed allocation from the previous job version in dc1.
for nodeID, allocs := range h.Plans[0].NodeAllocation {
var node *structs.Node
for _, n := range nodes {
if n.ID == nodeID {
node = n
break
}
}

must.Len(t, 1, allocs)
alloc := allocs[0]
must.SliceContains(t, alloc.Job.Datacenters, node.Datacenter, must.Sprintf(
"alloc for job in datacenter %q placed in %q",
alloc.Job.Datacenters,
node.Datacenter,
))
}
}

// Test job registration with even spread across dc
func TestServiceSched_EvenSpread(t *testing.T) {
ci.Parallel(t)