
scheduler: ensure dup alloc names are fixed before plan submit. #18873

Merged: 4 commits merged on Oct 27, 2023.
Changes from 2 commits.
3 changes: 3 additions & 0 deletions .changelog/18873.txt
@@ -0,0 +1,3 @@
```release-note:bug
scheduler: Ensure duplicate allocation IDs are tracked and fixed when performing job updates
```
12 changes: 9 additions & 3 deletions nomad/structs/structs.go
@@ -10679,13 +10679,19 @@ func (a *Allocation) JobNamespacedID() NamespacedID {
// Index returns the index of the allocation. If the allocation is from a task
// group with count greater than 1, there will be multiple allocations for it.
func (a *Allocation) Index() uint {
-	l := len(a.Name)
-	prefix := len(a.JobID) + len(a.TaskGroup) + 2
+	return AllocIndexFromName(a.Name, a.JobID, a.TaskGroup)
}

// AllocIndexFromName returns the index of an allocation given its name, the
// jobID and the task group name.
func AllocIndexFromName(allocName, jobID, taskGroup string) uint {
l := len(allocName)
prefix := len(jobID) + len(taskGroup) + 2
if l <= 3 || l <= prefix {
return uint(0)
}

-	strNum := a.Name[prefix : len(a.Name)-1]
+	strNum := allocName[prefix : len(allocName)-1]
num, _ := strconv.Atoi(strNum)
return uint(num)
}
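To illustrate the naming scheme this helper relies on, here is a self-contained sketch (a standalone reimplementation for illustration, not the exported `structs.AllocIndexFromName`): Nomad allocation names follow the pattern `<jobID>.<taskGroup>[<index>]`, so the index sits between a fixed-length prefix and the trailing `]`.

```go
package main

import (
	"fmt"
	"strconv"
)

// allocIndexFromName extracts the numeric index from an allocation name
// of the form "<jobID>.<taskGroup>[<index>]".
func allocIndexFromName(allocName, jobID, taskGroup string) uint {
	l := len(allocName)
	prefix := len(jobID) + len(taskGroup) + 2 // accounts for '.' and '['
	if l <= 3 || l <= prefix {
		return 0 // malformed or too-short name falls back to index 0
	}
	num, _ := strconv.Atoi(allocName[prefix : l-1])
	return uint(num)
}

func main() {
	fmt.Println(allocIndexFromName("my-job.web[3]", "my-job", "web")) // 3
	fmt.Println(allocIndexFromName("bad", "my-job", "web"))           // 0
}
```

Note that, as in the original, a parse failure is silently ignored and yields index 0.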
35 changes: 32 additions & 3 deletions scheduler/generic_sched.go
@@ -466,7 +466,7 @@ func (s *GenericScheduler) computeJobAllocs() error {
s.queuedAllocs[p.placeTaskGroup.Name] += 1
destructive = append(destructive, p)
}
-	return s.computePlacements(destructive, place)
+	return s.computePlacements(destructive, place, results.taskGroupAllocNameIndexes)
}

// downgradedJobForPlacement returns the job appropriate for non-canary placement replacement
@@ -508,7 +508,8 @@ func (s *GenericScheduler) downgradedJobForPlacement(p placementResult) (string,

// computePlacements computes placements for allocations. It is given the set of
// destructive updates to place and the set of new placements to place.
-func (s *GenericScheduler) computePlacements(destructive, place []placementResult) error {
+func (s *GenericScheduler) computePlacements(destructive, place []placementResult, nameIndex map[string]*allocNameIndex) error {

// Get the base nodes
nodes, byDC, err := s.setNodes(s.job)
if err != nil {
@@ -531,6 +532,12 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
// Get the task group
tg := missing.TaskGroup()

// This is populated from the reconciler via the compute results, so
// every allocation's task group has already generated and been through
// allocation name index tracking.
taskGroupNameIndex := nameIndex[tg.Name]

var downgradedJob *structs.Job

if missing.DowngradeNonCanary() {
@@ -628,12 +635,34 @@
resources.Shared.Ports = option.AllocResources.Ports
}

// Pull the allocation name into a new variable, so we can alter it as
// needed without changing the original object.
newAllocName := missing.Name()
[Review comment on lines +638 to +641 — Contributor] Maybe I'm misunderstanding it, but this comment seems a bit inaccurate. newAllocName doesn't seem to be modified, and even if it were, strings are constants, so it shouldn't affect missing? 🤔

[Member, Author] newAllocName is potentially modified on L654 if it is found to be a duplicate.


// Identify the index from the name, so we can check this
// against the allocation name index tracking for duplicates.
allocIndex := structs.AllocIndexFromName(newAllocName, s.job.ID, tg.Name)

// If the allocation index is a duplicate, we cannot simply
// create a new allocation with the same name. We need to
// generate a new index and use this. The log message is useful
// for debugging and development, but could be removed in a
// future version of Nomad.
if taskGroupNameIndex.IsDuplicate(allocIndex) {
oldAllocName := newAllocName
newAllocName = taskGroupNameIndex.Next(1)[0]
[Review comment — Contributor] It's not immediately obvious to me how this could cause any problems, but given that we're trying to avoid duplicate names it could be useful to further investigate whether this bit of code could cause problems:

	// We have exhausted the free set, now just pick overlapping indexes
	var i uint
	for i = 0; i < remainder; i++ {
		next = append(next, structs.AllocName(a.job, a.taskGroup, i))
		a.b.Set(i)
	}

Maybe if, somehow, the count value differs between the time the allocNameIndex is created and the call to Next() (like in a job update? or a version revert?) we could hit an overlap? 🤔

[Member, Author] That's a good thread to pull on and something I will take a look at once this PR has been merged. This resolves a reproducible manifestation of the bug, so I would like to get that fixed given the time pressures before opening up new investigations.

taskGroupNameIndex.UnsetIndex(allocIndex)
s.logger.Debug("duplicate alloc index found and changed",
"old_alloc_name", oldAllocName, "new_alloc_name", newAllocName)
}

// Create an allocation for this
alloc := &structs.Allocation{
ID: uuid.Generate(),
Namespace: s.job.Namespace,
EvalID: s.eval.ID,
-	Name: missing.Name(),
+	Name: newAllocName,
JobID: s.job.ID,
TaskGroup: tg.Name,
Metrics: s.ctx.Metrics(),
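The remap step above hinges on the reconciler's per-group name index. A minimal, self-contained sketch (using a hypothetical simplified nameIndex type, not the real allocNameIndex API) shows the flow: detect the clash, claim the next free index, and release the duplicate one.

```go
package main

import "fmt"

// nameIndex is a hypothetical stand-in for the scheduler's allocNameIndex:
// it records which allocation indexes are already in use for one task group.
type nameIndex struct {
	used map[uint]bool
}

// IsDuplicate reports whether index i is already taken.
func (n *nameIndex) IsDuplicate(i uint) bool { return n.used[i] }

// Next claims and returns the lowest unused index.
func (n *nameIndex) Next() uint {
	for i := uint(0); ; i++ {
		if !n.used[i] {
			n.used[i] = true
			return i
		}
	}
}

// Unset releases an index so later placements can reuse it.
func (n *nameIndex) Unset(i uint) { delete(n.used, i) }

func main() {
	// Indexes 0 and 1 are already held by existing allocations.
	idx := &nameIndex{used: map[uint]bool{0: true, 1: true}}

	proposed := uint(0) // a placement proposing an index that is taken
	if idx.IsDuplicate(proposed) {
		old := proposed
		proposed = idx.Next() // remapped to the first free index
		idx.Unset(old)
	}
	fmt.Println(proposed) // 2
}
```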
209 changes: 209 additions & 0 deletions scheduler/generic_sched_test.go
@@ -1970,6 +1970,215 @@ func TestServiceSched_JobModify(t *testing.T) {
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}

func TestServiceSched_JobModify_ExistingDuplicateAllocIndex(t *testing.T) {
ci.Parallel(t)

testHarness := NewHarness(t)

// Create some nodes
var nodes []*structs.Node
for i := 0; i < 10; i++ {
node := mock.Node()
nodes = append(nodes, node)
must.NoError(t, testHarness.State.UpsertNode(structs.MsgTypeTestSetup, testHarness.NextIndex(), node))
}

// Generate a fake job with allocations
mockJob := mock.Job()
must.NoError(t, testHarness.State.UpsertJob(structs.MsgTypeTestSetup, testHarness.NextIndex(), nil, mockJob))

// Generate some allocations which will represent our pre-existing
// allocations. These have aggressive duplicate names.
var allocs []*structs.Allocation
for i := 0; i < 10; i++ {
alloc := mock.Alloc()
alloc.Job = mockJob
alloc.JobID = mockJob.ID
alloc.NodeID = nodes[i].ID

alloc.Name = fmt.Sprintf("my-job.web[%d]", i)

if i%2 == 0 {
alloc.Name = "my-job.web[0]"
}
allocs = append(allocs, alloc)
}
must.NoError(t, testHarness.State.UpsertAllocs(structs.MsgTypeTestSetup, testHarness.NextIndex(), allocs))

// Generate a job modification which will force a destructive update.
mockJob2 := mock.Job()
mockJob2.ID = mockJob.ID
mockJob2.TaskGroups[0].Tasks[0].Config["command"] = "/bin/other"
must.NoError(t, testHarness.State.UpsertJob(structs.MsgTypeTestSetup, testHarness.NextIndex(), nil, mockJob2))

// Create a mock evaluation which represents work to reconcile the job
// update.
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: mockJob2.ID,
Status: structs.EvalStatusPending,
}
must.NoError(t, testHarness.State.UpsertEvals(structs.MsgTypeTestSetup, testHarness.NextIndex(), []*structs.Evaluation{eval}))

// Process the evaluation and ensure we get a single plan as a result.
must.NoError(t, testHarness.Process(NewServiceScheduler, eval))
must.Len(t, 1, testHarness.Plans)

// Iterate and track the node allocations to ensure we have the correct
// amount, and that there are now no duplicate names.
totalNodeAllocations := 0
allocIndexNames := make(map[string]int)

for _, planNodeAlloc := range testHarness.Plans[0].NodeAllocation {
for _, nodeAlloc := range planNodeAlloc {
totalNodeAllocations++
allocIndexNames[nodeAlloc.Name]++

if val, ok := allocIndexNames[nodeAlloc.Name]; ok && val > 1 {
t.Fatalf("found duplicate alloc name %q", nodeAlloc.Name)
}
}
}
must.Eq(t, 10, totalNodeAllocations)

testHarness.AssertEvalStatus(t, structs.EvalStatusComplete)
}

func TestServiceSched_JobModify_ProposedDuplicateAllocIndex(t *testing.T) {
ci.Parallel(t)

testHarness := NewHarness(t)

// Create some nodes
var nodes []*structs.Node
for i := 0; i < 10; i++ {
node := mock.Node()
nodes = append(nodes, node)
must.NoError(t, testHarness.State.UpsertNode(structs.MsgTypeTestSetup, testHarness.NextIndex(), node))
}

// Generate a job which includes a canary update strategy.
mockJob := mock.MinJob()
mockJob.TaskGroups[0].Count = 3
mockJob.Update = structs.UpdateStrategy{
Canary: 1,
MaxParallel: 3,
}
must.NoError(t, testHarness.State.UpsertJob(structs.MsgTypeTestSetup, testHarness.NextIndex(), nil, mockJob))

// Generate some allocations which will represent our pre-existing
// allocations.
var allocs []*structs.Allocation
for i := 0; i < 3; i++ {
alloc := mock.MinAlloc()
alloc.Namespace = structs.DefaultNamespace
alloc.Job = mockJob
alloc.JobID = mockJob.ID
alloc.NodeID = nodes[i].ID
alloc.Name = structs.AllocName(mockJob.ID, mockJob.TaskGroups[0].Name, uint(i))
allocs = append(allocs, alloc)
}
must.NoError(t, testHarness.State.UpsertAllocs(structs.MsgTypeTestSetup, testHarness.NextIndex(), allocs))

// Generate a job modification which will force a destructive update as
// well as a scaling.
mockJob2 := mockJob.Copy()
mockJob2.Version++
mockJob2.TaskGroups[0].Tasks[0].Config["command"] = "/bin/other"
mockJob2.TaskGroups[0].Count++
must.NoError(t, testHarness.State.UpsertJob(structs.MsgTypeTestSetup, testHarness.NextIndex(), nil, mockJob2))

nextRaftIndex := testHarness.NextIndex()
deploymentID := uuid.Generate()

// Upsert a canary into state, this represents the first stage of the
// deployment process and jumps us to the point where duplicate allocation
// indexes could be produced.
canaryAlloc := mock.MinAlloc()
canaryAlloc.Namespace = structs.DefaultNamespace
canaryAlloc.Job = mockJob2
canaryAlloc.JobID = mockJob2.ID
canaryAlloc.NodeID = nodes[1].ID
canaryAlloc.Name = structs.AllocName(mockJob2.ID, mockJob2.TaskGroups[0].Name, uint(0))
canaryAlloc.DeploymentID = deploymentID
canaryAlloc.ClientStatus = structs.AllocClientStatusRunning
must.NoError(t, testHarness.State.UpsertAllocs(structs.MsgTypeTestSetup, nextRaftIndex, []*structs.Allocation{
canaryAlloc,
}))

// Craft our deployment object which represents the post-canary state. This
// unblocks the rest of the deployment process, where we replace the old
// job version allocations.
canaryDeployment := structs.Deployment{
ID: deploymentID,
Namespace: mockJob2.Namespace,
JobID: mockJob2.ID,
JobVersion: mockJob2.Version,
TaskGroups: map[string]*structs.DeploymentState{
mockJob2.TaskGroups[0].Name: {
Promoted: true,
DesiredTotal: 4,
HealthyAllocs: 1,
PlacedAllocs: 1,
PlacedCanaries: []string{canaryAlloc.ID},
},
},
Status: structs.DeploymentStatusRunning,
StatusDescription: structs.DeploymentStatusDescriptionRunning,
EvalPriority: 50,
JobCreateIndex: mockJob2.CreateIndex,
}
must.NoError(t, testHarness.State.UpsertDeployment(nextRaftIndex, &canaryDeployment))

// Create a mock evaluation which represents work to reconcile the job
// update.
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: mockJob2.ID,
Status: structs.EvalStatusPending,
DeploymentID: deploymentID,
}
must.NoError(t, testHarness.State.UpsertEvals(structs.MsgTypeTestSetup, testHarness.NextIndex(), []*structs.Evaluation{eval}))

// Process the evaluation and ensure we get a single plan as a result.
must.NoError(t, testHarness.Process(NewServiceScheduler, eval))
must.Len(t, 1, testHarness.Plans)

// Iterate and track the node allocations to ensure we have the correct
// amount, and that there are now no duplicate names. Before the duplicate
// allocation name fix, this section of testing would fail.
totalNodeAllocations := 0
allocIndexNames := map[string]int{canaryAlloc.Name: 1}

for _, planNodeAlloc := range testHarness.Plans[0].NodeAllocation {
for _, nodeAlloc := range planNodeAlloc {
totalNodeAllocations++
allocIndexNames[nodeAlloc.Name]++

if val, ok := allocIndexNames[nodeAlloc.Name]; ok && val > 1 {
t.Fatalf("found duplicate alloc name %q", nodeAlloc.Name)
}
}
}
must.Eq(t, 3, totalNodeAllocations)

// Ensure the correct number of destructive node updates.
totalNodeUpdates := 0

for _, planNodeUpdate := range testHarness.Plans[0].NodeUpdate {
totalNodeUpdates += len(planNodeUpdate)
}
must.Eq(t, 3, totalNodeUpdates)

testHarness.AssertEvalStatus(t, structs.EvalStatusComplete)
}

func TestServiceSched_JobModify_Datacenters(t *testing.T) {
ci.Parallel(t)

30 changes: 24 additions & 6 deletions scheduler/reconcile.go
@@ -141,6 +141,13 @@ type reconcileResults struct {
// desiredFollowupEvals is the map of follow up evaluations to create per task group
// This is used to create a delayed evaluation for rescheduling failed allocations.
desiredFollowupEvals map[string][]*structs.Evaluation

// taskGroupAllocNameIndexes is a tracking of the allocation name index,
// keyed by the task group name. This is stored within the results, so the
// generic scheduler can use this to perform duplicate alloc index checks
// before submitting the plan. This is always non-nil and is handled within
// a single routine, so does not require a mutex.
taskGroupAllocNameIndexes map[string]*allocNameIndex
}

// delayedRescheduleInfo contains the allocation id and a time when its eligible to be rescheduled.
@@ -193,11 +200,12 @@ func NewAllocReconciler(logger log.Logger, allocUpdateFn allocUpdateType, batch
supportsDisconnectedClients: supportsDisconnectedClients,
now: time.Now(),
result: &reconcileResults{
-		attributeUpdates:     make(map[string]*structs.Allocation),
-		disconnectUpdates:    make(map[string]*structs.Allocation),
-		reconnectUpdates:     make(map[string]*structs.Allocation),
-		desiredTGUpdates:     make(map[string]*structs.DesiredUpdates),
-		desiredFollowupEvals: make(map[string][]*structs.Evaluation),
+		attributeUpdates:          make(map[string]*structs.Allocation),
+		disconnectUpdates:         make(map[string]*structs.Allocation),
+		reconnectUpdates:          make(map[string]*structs.Allocation),
+		desiredTGUpdates:          make(map[string]*structs.DesiredUpdates),
+		desiredFollowupEvals:      make(map[string][]*structs.Evaluation),
+		taskGroupAllocNameIndexes: make(map[string]*allocNameIndex),
},
}
}
@@ -481,6 +489,7 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
// which is the union of untainted, rescheduled, allocs on migrating
// nodes, and allocs on down nodes (includes canaries)
nameIndex := newAllocNameIndex(a.jobID, groupName, tg.Count, untainted.union(migrate, rescheduleNow, lost))
a.result.taskGroupAllocNameIndexes[groupName] = nameIndex

// Stop any unneeded allocations and update the untainted set to not
// include stopped allocations.
@@ -968,7 +977,16 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
untainted = untainted.difference(canaries)
}

-	// Hot path the nothing to do case
+	// Hot path the nothing to do case.
//
// Duplicate allocation indexes can be caused due to the way this piece of
// code works. The reproduction involved canaries, and performing both a
// destructive job change (image update) and increasing the group count by
// one. In this scenario, once the canary is placed and we compute the next
// set of stops, the following maths takes place compared to a canary
// deployment that does not generate duplicate indexes.
// NonBuggy: Untainted = "4" and GroupCount = "3" | remove = "1"
// YesBuggy: Untainted = "4" and GroupCount = "4" | remove = "0"

[Review comment — Contributor] Just to make sure I understood, this hot path can return early with duplicate alloc names? Is there a way to avoid that so that callers don't have to handle them?

[Member, Author] The comment has been clarified, hopefully that makes things clearer? I spent a while looking to see if I could fix the problem at the source, but I wasn't able to figure out a way.
remove := len(untainted) + len(migrate) - group.Count
if remove <= 0 {
return stop
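As a sanity check on the arithmetic in that comment, the two rows can be reproduced with a tiny sketch (removeCount is a hypothetical helper mirroring the `len(untainted) + len(migrate) - group.Count` expression, not a real scheduler function):

```go
package main

import "fmt"

// removeCount mirrors the computeStop arithmetic: stop whatever exceeds
// the desired group count after accounting for migrating allocations.
func removeCount(untainted, migrate, groupCount int) int {
	return untainted + migrate - groupCount
}

func main() {
	// Non-buggy canary deployment: 4 untainted vs count 3 -> one stop.
	fmt.Println(removeCount(4, 0, 3)) // 1
	// Buggy case: the count also grew to 4 -> zero stops, so the early
	// return fires and the duplicate-named allocation is never removed.
	fmt.Println(removeCount(4, 0, 4)) // 0
}
```

This is why the fix lands in the generic scheduler's placement path rather than here: the hot path legitimately has nothing to stop, so the duplicate has to be renamed before plan submit.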