diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index e9efa120f15f..544f9ba6ffae 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -571,11 +571,11 @@ func (ar *allocRunner) clientAlloc(taskStates map[string]*structs.TaskState) *st // Make sure we have marked the finished at for every task. This is used // to calculate the reschedule time for failed allocations. now := time.Now() - for _, task := range alloc.Job.LookupTaskGroup(alloc.TaskGroup).Tasks { - ts, ok := a.TaskStates[task.Name] + for taskName := range ar.tasks { + ts, ok := a.TaskStates[taskName] if !ok { ts = &structs.TaskState{} - a.TaskStates[task.Name] = ts + a.TaskStates[taskName] = ts } if ts.FinishedAt.IsZero() { ts.FinishedAt = now diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go index f9b8d7c63a23..92873d0d52ba 100644 --- a/nomad/plan_apply.go +++ b/nomad/plan_apply.go @@ -339,12 +339,12 @@ func evaluatePlan(pool *EvaluatePool, snap *state.StateSnapshot, plan *structs.P defer metrics.MeasureSince([]string{"nomad", "plan", "evaluate"}, time.Now()) // Denormalize without the job - err := snap.DenormalizeAllocationsMap(plan.NodeUpdate, nil) + err := snap.DenormalizeAllocationsMap(plan.NodeUpdate) if err != nil { return nil, err } // Denormalize without the job - err = snap.DenormalizeAllocationsMap(plan.NodePreemptions, nil) + err = snap.DenormalizeAllocationsMap(plan.NodePreemptions) if err != nil { return nil, err } diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 6da9f6e8f014..e38099091f4e 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -230,18 +230,18 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR return err } - allocsStopped, err := snapshot.DenormalizeAllocationDiffSlice(results.AllocsStopped, results.Job) + allocsStopped, err := snapshot.DenormalizeAllocationDiffSlice(results.AllocsStopped) if err != nil { return err } - allocsPreempted, err := snapshot.DenormalizeAllocationDiffSlice(results.AllocsPreempted, results.Job) + allocsPreempted, err := snapshot.DenormalizeAllocationDiffSlice(results.AllocsPreempted) if err != nil { return err } // COMPAT 0.11: Remove this denormalization when NodePreemptions is removed - results.NodePreemptions, err = snapshot.DenormalizeAllocationSlice(results.NodePreemptions, results.Job) + results.NodePreemptions, err = snapshot.DenormalizeAllocationSlice(results.NodePreemptions) if err != nil { return err } @@ -4192,9 +4192,9 @@ type StateSnapshot struct { // DenormalizeAllocationsMap takes in a map of nodes to allocations, and queries the // Allocation for each of the Allocation diffs and merges the updated attributes with // the existing Allocation, and attaches the Job provided -func (s *StateSnapshot) DenormalizeAllocationsMap(nodeAllocations map[string][]*structs.Allocation, job *structs.Job) error { +func (s *StateSnapshot) DenormalizeAllocationsMap(nodeAllocations map[string][]*structs.Allocation) error { for nodeID, allocs := range nodeAllocations { - denormalizedAllocs, err := s.DenormalizeAllocationSlice(allocs, job) + denormalizedAllocs, err := s.DenormalizeAllocationSlice(allocs) if err != nil { return err } @@ -4207,18 +4207,18 @@ func (s *StateSnapshot) DenormalizeAllocationsMap(nodeAllocations map[string][]* // DenormalizeAllocationSlice queries the Allocation for each allocation diff // represented as an Allocation and merges the updated attributes with the existing // Allocation, and attaches the Job provided. -func (s *StateSnapshot) DenormalizeAllocationSlice(allocs []*structs.Allocation, job *structs.Job) ([]*structs.Allocation, error) { +func (s *StateSnapshot) DenormalizeAllocationSlice(allocs []*structs.Allocation) ([]*structs.Allocation, error) { allocDiffs := make([]*structs.AllocationDiff, len(allocs)) for i, alloc := range allocs { allocDiffs[i] = alloc.AllocationDiff() } - return s.DenormalizeAllocationDiffSlice(allocDiffs, job) + return s.DenormalizeAllocationDiffSlice(allocDiffs) } // DenormalizeAllocationDiffSlice queries the Allocation for each AllocationDiff and merges // the updated attributes with the existing Allocation, and attaches the Job provided -func (s *StateSnapshot) DenormalizeAllocationDiffSlice(allocDiffs []*structs.AllocationDiff, planJob *structs.Job) ([]*structs.Allocation, error) { +func (s *StateSnapshot) DenormalizeAllocationDiffSlice(allocDiffs []*structs.AllocationDiff) ([]*structs.Allocation, error) { // Output index for denormalized Allocations j := 0 @@ -4233,17 +4233,15 @@ func (s *StateSnapshot) DenormalizeAllocationDiffSlice(allocDiffs []*structs.All } // Merge the updates to the Allocation - allocCopy := alloc.CopySkipJob() + allocCopy := alloc.Copy() if allocDiff.PreemptedByAllocation != "" { // If alloc is a preemption set the job from the alloc read from the state store - allocCopy.Job = alloc.Job.Copy() allocCopy.PreemptedByAllocation = allocDiff.PreemptedByAllocation allocCopy.DesiredDescription = getPreemptedAllocDesiredDescription(allocDiff.PreemptedByAllocation) allocCopy.DesiredStatus = structs.AllocDesiredStatusEvict } else { // If alloc is a stopped alloc - allocCopy.Job = planJob allocCopy.DesiredDescription = allocDiff.DesiredDescription allocCopy.DesiredStatus = structs.AllocDesiredStatusStop if allocDiff.ClientStatus != "" { diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 7db7a2c7ff90..d21e4bbdefbc 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -144,6 +144,7 @@ func TestStateStore_UpsertPlanResults_AllocationsCreated_Denormalized(t *testing // This test checks that: // 1) The job is denormalized // 2) Allocations are denormalized and updated with the diff +// That stopped allocs Job is unmodified func TestStateStore_UpsertPlanResults_AllocationsDenormalized(t *testing.T) { state := testStateStore(t) alloc := mock.Alloc() @@ -168,6 +169,12 @@ func TestStateStore_UpsertPlanResults_AllocationsDenormalized(t *testing.T) { require.NoError(state.UpsertAllocs(900, []*structs.Allocation{stoppedAlloc, preemptedAlloc})) require.NoError(state.UpsertJob(999, job)) + // modify job and ensure that stopped and preempted alloc point to original Job + mJob := job.Copy() + mJob.TaskGroups[0].Name = "other" + + require.NoError(state.UpsertJob(1001, mJob)) + eval := mock.Eval() eval.JobID = job.ID @@ -179,7 +186,7 @@ func TestStateStore_UpsertPlanResults_AllocationsDenormalized(t *testing.T) { AllocUpdateRequest: structs.AllocUpdateRequest{ AllocsUpdated: []*structs.Allocation{alloc}, AllocsStopped: []*structs.AllocationDiff{stoppedAllocDiff}, - Job: job, + Job: mJob, }, EvalID: eval.ID, AllocsPreempted: []*structs.AllocationDiff{preemptedAllocDiff}, @@ -194,6 +201,11 @@ func TestStateStore_UpsertPlanResults_AllocationsDenormalized(t *testing.T) { require.NoError(err) assert.Equal(alloc, out) + outJob, err := state.JobByID(ws, job.Namespace, job.ID) + require.NoError(err) + require.Equal(mJob.TaskGroups, outJob.TaskGroups) + require.NotEmpty(job.TaskGroups, outJob.TaskGroups) + updatedStoppedAlloc, err := state.AllocByID(ws, stoppedAlloc.ID) require.NoError(err) assert.Equal(stoppedAllocDiff.DesiredDescription, updatedStoppedAlloc.DesiredDescription) @@ -201,6 +213,7 @@ func TestStateStore_UpsertPlanResults_AllocationsDenormalized(t *testing.T) { assert.Equal(stoppedAllocDiff.ClientStatus, updatedStoppedAlloc.ClientStatus) assert.Equal(planModifyIndex, updatedStoppedAlloc.AllocModifyIndex) assert.Equal(planModifyIndex, updatedStoppedAlloc.AllocModifyIndex) + assert.Equal(job.TaskGroups, updatedStoppedAlloc.Job.TaskGroups) updatedPreemptedAlloc, err := state.AllocByID(ws, preemptedAlloc.ID) require.NoError(err) @@ -208,6 +221,7 @@ func TestStateStore_UpsertPlanResults_AllocationsDenormalized(t *testing.T) { assert.Equal(preemptedAllocDiff.PreemptedByAllocation, updatedPreemptedAlloc.PreemptedByAllocation) assert.Equal(planModifyIndex, updatedPreemptedAlloc.AllocModifyIndex) assert.Equal(planModifyIndex, updatedPreemptedAlloc.AllocModifyIndex) + assert.Equal(job.TaskGroups, updatedPreemptedAlloc.Job.TaskGroups) index, err := state.Index("allocs") require.NoError(err) @@ -219,6 +233,7 @@ func TestStateStore_UpsertPlanResults_AllocationsDenormalized(t *testing.T) { require.NoError(err) require.NotNil(evalOut) assert.EqualValues(planModifyIndex, evalOut.ModifyIndex) + } // This test checks that the deployment is created and allocations count towards @@ -7117,7 +7132,7 @@ func TestStateSnapshot_DenormalizeAllocationDiffSlice_AllocDoesNotExist(t *testi snap, err := state.Snapshot() require.NoError(err) - denormalizedAllocs, err := snap.DenormalizeAllocationDiffSlice(allocDiffs, alloc.Job) + denormalizedAllocs, err := snap.DenormalizeAllocationDiffSlice(allocDiffs) require.EqualError(err, fmt.Sprintf("alloc %v doesn't exist", alloc.ID)) require.Nil(denormalizedAllocs)