Skip to content

Commit

Permalink
Merge pull request #5805 from hashicorp/b-tg-rename-panic
Browse files Browse the repository at this point in the history
Fix a panic related to updating alloc taskgroups
  • Loading branch information
Mahmood Ali committed Jun 11, 2019
2 parents d7edf9b + 41a7fe8 commit 7423b30
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 18 deletions.
6 changes: 3 additions & 3 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,11 +571,11 @@ func (ar *allocRunner) clientAlloc(taskStates map[string]*structs.TaskState) *st
// Make sure we have marked the finished at for every task. This is used
// to calculate the reschedule time for failed allocations.
now := time.Now()
for _, task := range alloc.Job.LookupTaskGroup(alloc.TaskGroup).Tasks {
ts, ok := a.TaskStates[task.Name]
for taskName := range ar.tasks {
ts, ok := a.TaskStates[taskName]
if !ok {
ts = &structs.TaskState{}
a.TaskStates[task.Name] = ts
a.TaskStates[taskName] = ts
}
if ts.FinishedAt.IsZero() {
ts.FinishedAt = now
Expand Down
4 changes: 2 additions & 2 deletions nomad/plan_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,12 +339,12 @@ func evaluatePlan(pool *EvaluatePool, snap *state.StateSnapshot, plan *structs.P
defer metrics.MeasureSince([]string{"nomad", "plan", "evaluate"}, time.Now())

// Denormalize without the job
err := snap.DenormalizeAllocationsMap(plan.NodeUpdate, nil)
err := snap.DenormalizeAllocationsMap(plan.NodeUpdate)
if err != nil {
return nil, err
}
// Denormalize without the job
err = snap.DenormalizeAllocationsMap(plan.NodePreemptions, nil)
err = snap.DenormalizeAllocationsMap(plan.NodePreemptions)
if err != nil {
return nil, err
}
Expand Down
20 changes: 9 additions & 11 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,18 +230,18 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR
return err
}

allocsStopped, err := snapshot.DenormalizeAllocationDiffSlice(results.AllocsStopped, results.Job)
allocsStopped, err := snapshot.DenormalizeAllocationDiffSlice(results.AllocsStopped)
if err != nil {
return err
}

allocsPreempted, err := snapshot.DenormalizeAllocationDiffSlice(results.AllocsPreempted, results.Job)
allocsPreempted, err := snapshot.DenormalizeAllocationDiffSlice(results.AllocsPreempted)
if err != nil {
return err
}

// COMPAT 0.11: Remove this denormalization when NodePreemptions is removed
results.NodePreemptions, err = snapshot.DenormalizeAllocationSlice(results.NodePreemptions, results.Job)
results.NodePreemptions, err = snapshot.DenormalizeAllocationSlice(results.NodePreemptions)
if err != nil {
return err
}
Expand Down Expand Up @@ -4192,9 +4192,9 @@ type StateSnapshot struct {
// DenormalizeAllocationsMap takes in a map of nodes to allocations, and queries the
// Allocation for each of the Allocation diffs and merges the updated attributes with
// the existing Allocation, and attaches the Job provided
func (s *StateSnapshot) DenormalizeAllocationsMap(nodeAllocations map[string][]*structs.Allocation, job *structs.Job) error {
func (s *StateSnapshot) DenormalizeAllocationsMap(nodeAllocations map[string][]*structs.Allocation) error {
for nodeID, allocs := range nodeAllocations {
denormalizedAllocs, err := s.DenormalizeAllocationSlice(allocs, job)
denormalizedAllocs, err := s.DenormalizeAllocationSlice(allocs)
if err != nil {
return err
}
Expand All @@ -4207,18 +4207,18 @@ func (s *StateSnapshot) DenormalizeAllocationsMap(nodeAllocations map[string][]*
// DenormalizeAllocationSlice queries the Allocation for each allocation diff
// represented as an Allocation and merges the updated attributes with the existing
// Allocation, and attaches the Job provided.
func (s *StateSnapshot) DenormalizeAllocationSlice(allocs []*structs.Allocation, job *structs.Job) ([]*structs.Allocation, error) {
func (s *StateSnapshot) DenormalizeAllocationSlice(allocs []*structs.Allocation) ([]*structs.Allocation, error) {
allocDiffs := make([]*structs.AllocationDiff, len(allocs))
for i, alloc := range allocs {
allocDiffs[i] = alloc.AllocationDiff()
}

return s.DenormalizeAllocationDiffSlice(allocDiffs, job)
return s.DenormalizeAllocationDiffSlice(allocDiffs)
}

// DenormalizeAllocationDiffSlice queries the Allocation for each AllocationDiff and merges
// the updated attributes with the existing Allocation, and attaches the Job provided
func (s *StateSnapshot) DenormalizeAllocationDiffSlice(allocDiffs []*structs.AllocationDiff, planJob *structs.Job) ([]*structs.Allocation, error) {
func (s *StateSnapshot) DenormalizeAllocationDiffSlice(allocDiffs []*structs.AllocationDiff) ([]*structs.Allocation, error) {
// Output index for denormalized Allocations
j := 0

Expand All @@ -4233,17 +4233,15 @@ func (s *StateSnapshot) DenormalizeAllocationDiffSlice(allocDiffs []*structs.All
}

// Merge the updates to the Allocation
allocCopy := alloc.CopySkipJob()
allocCopy := alloc.Copy()

if allocDiff.PreemptedByAllocation != "" {
// If alloc is a preemption set the job from the alloc read from the state store
allocCopy.Job = alloc.Job.Copy()
allocCopy.PreemptedByAllocation = allocDiff.PreemptedByAllocation
allocCopy.DesiredDescription = getPreemptedAllocDesiredDescription(allocDiff.PreemptedByAllocation)
allocCopy.DesiredStatus = structs.AllocDesiredStatusEvict
} else {
// If alloc is a stopped alloc
allocCopy.Job = planJob
allocCopy.DesiredDescription = allocDiff.DesiredDescription
allocCopy.DesiredStatus = structs.AllocDesiredStatusStop
if allocDiff.ClientStatus != "" {
Expand Down
19 changes: 17 additions & 2 deletions nomad/state/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ func TestStateStore_UpsertPlanResults_AllocationsCreated_Denormalized(t *testing
// This test checks that:
// 1) The job is denormalized
// 2) Allocations are denormalized and updated with the diff
// That stopped allocs Job is unmodified
func TestStateStore_UpsertPlanResults_AllocationsDenormalized(t *testing.T) {
state := testStateStore(t)
alloc := mock.Alloc()
Expand All @@ -168,6 +169,12 @@ func TestStateStore_UpsertPlanResults_AllocationsDenormalized(t *testing.T) {
require.NoError(state.UpsertAllocs(900, []*structs.Allocation{stoppedAlloc, preemptedAlloc}))
require.NoError(state.UpsertJob(999, job))

// modify job and ensure that stopped and preempted alloc point to original Job
mJob := job.Copy()
mJob.TaskGroups[0].Name = "other"

require.NoError(state.UpsertJob(1001, mJob))

eval := mock.Eval()
eval.JobID = job.ID

Expand All @@ -179,7 +186,7 @@ func TestStateStore_UpsertPlanResults_AllocationsDenormalized(t *testing.T) {
AllocUpdateRequest: structs.AllocUpdateRequest{
AllocsUpdated: []*structs.Allocation{alloc},
AllocsStopped: []*structs.AllocationDiff{stoppedAllocDiff},
Job: job,
Job: mJob,
},
EvalID: eval.ID,
AllocsPreempted: []*structs.AllocationDiff{preemptedAllocDiff},
Expand All @@ -194,20 +201,27 @@ func TestStateStore_UpsertPlanResults_AllocationsDenormalized(t *testing.T) {
require.NoError(err)
assert.Equal(alloc, out)

outJob, err := state.JobByID(ws, job.Namespace, job.ID)
require.NoError(err)
require.Equal(mJob.TaskGroups, outJob.TaskGroups)
require.NotEmpty(job.TaskGroups, outJob.TaskGroups)

updatedStoppedAlloc, err := state.AllocByID(ws, stoppedAlloc.ID)
require.NoError(err)
assert.Equal(stoppedAllocDiff.DesiredDescription, updatedStoppedAlloc.DesiredDescription)
assert.Equal(structs.AllocDesiredStatusStop, updatedStoppedAlloc.DesiredStatus)
assert.Equal(stoppedAllocDiff.ClientStatus, updatedStoppedAlloc.ClientStatus)
assert.Equal(planModifyIndex, updatedStoppedAlloc.AllocModifyIndex)
assert.Equal(planModifyIndex, updatedStoppedAlloc.AllocModifyIndex)
assert.Equal(job.TaskGroups, updatedStoppedAlloc.Job.TaskGroups)

updatedPreemptedAlloc, err := state.AllocByID(ws, preemptedAlloc.ID)
require.NoError(err)
assert.Equal(structs.AllocDesiredStatusEvict, updatedPreemptedAlloc.DesiredStatus)
assert.Equal(preemptedAllocDiff.PreemptedByAllocation, updatedPreemptedAlloc.PreemptedByAllocation)
assert.Equal(planModifyIndex, updatedPreemptedAlloc.AllocModifyIndex)
assert.Equal(planModifyIndex, updatedPreemptedAlloc.AllocModifyIndex)
assert.Equal(job.TaskGroups, updatedPreemptedAlloc.Job.TaskGroups)

index, err := state.Index("allocs")
require.NoError(err)
Expand All @@ -219,6 +233,7 @@ func TestStateStore_UpsertPlanResults_AllocationsDenormalized(t *testing.T) {
require.NoError(err)
require.NotNil(evalOut)
assert.EqualValues(planModifyIndex, evalOut.ModifyIndex)

}

// This test checks that the deployment is created and allocations count towards
Expand Down Expand Up @@ -7117,7 +7132,7 @@ func TestStateSnapshot_DenormalizeAllocationDiffSlice_AllocDoesNotExist(t *testi
snap, err := state.Snapshot()
require.NoError(err)

denormalizedAllocs, err := snap.DenormalizeAllocationDiffSlice(allocDiffs, alloc.Job)
denormalizedAllocs, err := snap.DenormalizeAllocationDiffSlice(allocDiffs)

require.EqualError(err, fmt.Sprintf("alloc %v doesn't exist", alloc.ID))
require.Nil(denormalizedAllocs)
Expand Down

0 comments on commit 7423b30

Please sign in to comment.