Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a panic related to updating alloc taskgroups #5805

Merged
merged 3 commits into from
Jun 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,11 +571,11 @@ func (ar *allocRunner) clientAlloc(taskStates map[string]*structs.TaskState) *st
// Make sure we have marked the finished at for every task. This is used
// to calculate the reschedule time for failed allocations.
now := time.Now()
for _, task := range alloc.Job.LookupTaskGroup(alloc.TaskGroup).Tasks {
ts, ok := a.TaskStates[task.Name]
for taskName := range ar.tasks {
ts, ok := a.TaskStates[taskName]
if !ok {
ts = &structs.TaskState{}
a.TaskStates[task.Name] = ts
a.TaskStates[taskName] = ts
}
if ts.FinishedAt.IsZero() {
ts.FinishedAt = now
Expand Down
4 changes: 2 additions & 2 deletions nomad/plan_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,12 +339,12 @@ func evaluatePlan(pool *EvaluatePool, snap *state.StateSnapshot, plan *structs.P
defer metrics.MeasureSince([]string{"nomad", "plan", "evaluate"}, time.Now())

// Denormalize without the job
err := snap.DenormalizeAllocationsMap(plan.NodeUpdate, nil)
err := snap.DenormalizeAllocationsMap(plan.NodeUpdate)
if err != nil {
return nil, err
}
// Denormalize without the job
err = snap.DenormalizeAllocationsMap(plan.NodePreemptions, nil)
err = snap.DenormalizeAllocationsMap(plan.NodePreemptions)
if err != nil {
return nil, err
}
Expand Down
20 changes: 9 additions & 11 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,18 +230,18 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR
return err
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we used to update the Job field on all allocs (including stopped) in 0.9.1. The plan applier used to add both stopped and new allocs here [1] , and the state store would set the job from the plan to all allocs [2].

So something else must have changed in tandem to cause this panic in 0.9.2. Were you able to determine if any other additional changes besides the plan applier changes contributed to this?

[1] https://github.com/hashicorp/nomad/blob/v0.9.1/nomad/plan_apply.go#L168
[2] https://github.com/hashicorp/nomad/blob/v0.9.1/nomad/state/state_store.go#L191

Was there something in allocrunner that changed as well in 0.9.2?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In 0.9.1, DenormalizeAllocationJobs would set alloc.Job if it's not set already and it's not terminal: https://github.com/hashicorp/nomad/blob/v0.9.1/nomad/structs/funcs.go#L275-L286 .

I confirmed this behavior by adding some logging statements in various places. I can demo it offline.

allocsStopped, err := snapshot.DenormalizeAllocationDiffSlice(results.AllocsStopped, results.Job)
allocsStopped, err := snapshot.DenormalizeAllocationDiffSlice(results.AllocsStopped)
if err != nil {
return err
}

allocsPreempted, err := snapshot.DenormalizeAllocationDiffSlice(results.AllocsPreempted, results.Job)
allocsPreempted, err := snapshot.DenormalizeAllocationDiffSlice(results.AllocsPreempted)
if err != nil {
return err
}

// COMPAT 0.11: Remove this denormalization when NodePreemptions is removed
results.NodePreemptions, err = snapshot.DenormalizeAllocationSlice(results.NodePreemptions, results.Job)
results.NodePreemptions, err = snapshot.DenormalizeAllocationSlice(results.NodePreemptions)
if err != nil {
return err
}
Expand Down Expand Up @@ -4192,9 +4192,9 @@ type StateSnapshot struct {
// DenormalizeAllocationsMap takes in a map of nodes to allocations, and queries the
// Allocation for each of the Allocation diffs and merges the updated attributes with
// the existing Allocation, and attaches the Job provided
func (s *StateSnapshot) DenormalizeAllocationsMap(nodeAllocations map[string][]*structs.Allocation, job *structs.Job) error {
func (s *StateSnapshot) DenormalizeAllocationsMap(nodeAllocations map[string][]*structs.Allocation) error {
for nodeID, allocs := range nodeAllocations {
denormalizedAllocs, err := s.DenormalizeAllocationSlice(allocs, job)
denormalizedAllocs, err := s.DenormalizeAllocationSlice(allocs)
if err != nil {
return err
}
Expand All @@ -4207,18 +4207,18 @@ func (s *StateSnapshot) DenormalizeAllocationsMap(nodeAllocations map[string][]*
// DenormalizeAllocationSlice queries the Allocation for each allocation diff
// represented as an Allocation and merges the updated attributes with the existing
// Allocation, and attaches the Job provided.
func (s *StateSnapshot) DenormalizeAllocationSlice(allocs []*structs.Allocation, job *structs.Job) ([]*structs.Allocation, error) {
func (s *StateSnapshot) DenormalizeAllocationSlice(allocs []*structs.Allocation) ([]*structs.Allocation, error) {
allocDiffs := make([]*structs.AllocationDiff, len(allocs))
for i, alloc := range allocs {
allocDiffs[i] = alloc.AllocationDiff()
}

return s.DenormalizeAllocationDiffSlice(allocDiffs, job)
return s.DenormalizeAllocationDiffSlice(allocDiffs)
}

// DenormalizeAllocationDiffSlice queries the Allocation for each AllocationDiff and merges
// the updated attributes with the existing Allocation, and attaches the Job provided
func (s *StateSnapshot) DenormalizeAllocationDiffSlice(allocDiffs []*structs.AllocationDiff, planJob *structs.Job) ([]*structs.Allocation, error) {
func (s *StateSnapshot) DenormalizeAllocationDiffSlice(allocDiffs []*structs.AllocationDiff) ([]*structs.Allocation, error) {
// Output index for denormalized Allocations
j := 0

Expand All @@ -4233,17 +4233,15 @@ func (s *StateSnapshot) DenormalizeAllocationDiffSlice(allocDiffs []*structs.All
}

// Merge the updates to the Allocation
allocCopy := alloc.CopySkipJob()
allocCopy := alloc.Copy()

if allocDiff.PreemptedByAllocation != "" {
// If alloc is a preemption set the job from the alloc read from the state store
allocCopy.Job = alloc.Job.Copy()
allocCopy.PreemptedByAllocation = allocDiff.PreemptedByAllocation
allocCopy.DesiredDescription = getPreemptedAllocDesiredDescription(allocDiff.PreemptedByAllocation)
allocCopy.DesiredStatus = structs.AllocDesiredStatusEvict
} else {
// If alloc is a stopped alloc
allocCopy.Job = planJob
allocCopy.DesiredDescription = allocDiff.DesiredDescription
allocCopy.DesiredStatus = structs.AllocDesiredStatusStop
if allocDiff.ClientStatus != "" {
Expand Down
19 changes: 17 additions & 2 deletions nomad/state/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ func TestStateStore_UpsertPlanResults_AllocationsCreated_Denormalized(t *testing
// This test checks that:
// 1) The job is denormalized
// 2) Allocations are denormalized and updated with the diff
// That stopped allocs Job is unmodified
func TestStateStore_UpsertPlanResults_AllocationsDenormalized(t *testing.T) {
state := testStateStore(t)
alloc := mock.Alloc()
Expand All @@ -168,6 +169,12 @@ func TestStateStore_UpsertPlanResults_AllocationsDenormalized(t *testing.T) {
require.NoError(state.UpsertAllocs(900, []*structs.Allocation{stoppedAlloc, preemptedAlloc}))
require.NoError(state.UpsertJob(999, job))

// modify job and ensure that stopped and preempted alloc point to original Job
mJob := job.Copy()
mJob.TaskGroups[0].Name = "other"

require.NoError(state.UpsertJob(1001, mJob))

eval := mock.Eval()
eval.JobID = job.ID

Expand All @@ -179,7 +186,7 @@ func TestStateStore_UpsertPlanResults_AllocationsDenormalized(t *testing.T) {
AllocUpdateRequest: structs.AllocUpdateRequest{
AllocsUpdated: []*structs.Allocation{alloc},
AllocsStopped: []*structs.AllocationDiff{stoppedAllocDiff},
Job: job,
Job: mJob,
},
EvalID: eval.ID,
AllocsPreempted: []*structs.AllocationDiff{preemptedAllocDiff},
Expand All @@ -194,20 +201,27 @@ func TestStateStore_UpsertPlanResults_AllocationsDenormalized(t *testing.T) {
require.NoError(err)
assert.Equal(alloc, out)

outJob, err := state.JobByID(ws, job.Namespace, job.ID)
require.NoError(err)
require.Equal(mJob.TaskGroups, outJob.TaskGroups)
require.NotEmpty(job.TaskGroups, outJob.TaskGroups)

updatedStoppedAlloc, err := state.AllocByID(ws, stoppedAlloc.ID)
require.NoError(err)
assert.Equal(stoppedAllocDiff.DesiredDescription, updatedStoppedAlloc.DesiredDescription)
assert.Equal(structs.AllocDesiredStatusStop, updatedStoppedAlloc.DesiredStatus)
assert.Equal(stoppedAllocDiff.ClientStatus, updatedStoppedAlloc.ClientStatus)
assert.Equal(planModifyIndex, updatedStoppedAlloc.AllocModifyIndex)
assert.Equal(planModifyIndex, updatedStoppedAlloc.AllocModifyIndex)
assert.Equal(job.TaskGroups, updatedStoppedAlloc.Job.TaskGroups)

updatedPreemptedAlloc, err := state.AllocByID(ws, preemptedAlloc.ID)
require.NoError(err)
assert.Equal(structs.AllocDesiredStatusEvict, updatedPreemptedAlloc.DesiredStatus)
assert.Equal(preemptedAllocDiff.PreemptedByAllocation, updatedPreemptedAlloc.PreemptedByAllocation)
assert.Equal(planModifyIndex, updatedPreemptedAlloc.AllocModifyIndex)
assert.Equal(planModifyIndex, updatedPreemptedAlloc.AllocModifyIndex)
assert.Equal(job.TaskGroups, updatedPreemptedAlloc.Job.TaskGroups)

index, err := state.Index("allocs")
require.NoError(err)
Expand All @@ -219,6 +233,7 @@ func TestStateStore_UpsertPlanResults_AllocationsDenormalized(t *testing.T) {
require.NoError(err)
require.NotNil(evalOut)
assert.EqualValues(planModifyIndex, evalOut.ModifyIndex)

}

// This test checks that the deployment is created and allocations count towards
Expand Down Expand Up @@ -7117,7 +7132,7 @@ func TestStateSnapshot_DenormalizeAllocationDiffSlice_AllocDoesNotExist(t *testi
snap, err := state.Snapshot()
require.NoError(err)

denormalizedAllocs, err := snap.DenormalizeAllocationDiffSlice(allocDiffs, alloc.Job)
denormalizedAllocs, err := snap.DenormalizeAllocationDiffSlice(allocDiffs)

require.EqualError(err, fmt.Sprintf("alloc %v doesn't exist", alloc.ID))
require.Nil(denormalizedAllocs)
Expand Down