diff --git a/client/allocrunner/taskrunner/service_hook.go b/client/allocrunner/taskrunner/service_hook.go index 956033f3f315..02b8d75b71a0 100644 --- a/client/allocrunner/taskrunner/service_hook.go +++ b/client/allocrunner/taskrunner/service_hook.go @@ -63,16 +63,8 @@ func newServiceHook(c serviceHookConfig) *serviceHook { delay: c.task.ShutdownDelay, } - // COMPAT(0.11): AllocatedResources was added in 0.9 so assume its set - // in 0.11. - if c.alloc.AllocatedResources != nil { - if res := c.alloc.AllocatedResources.Tasks[c.task.Name]; res != nil { - h.networks = res.Networks - } - } else { - if res := c.alloc.TaskResources[c.task.Name]; res != nil { - h.networks = res.Networks - } + if res := c.alloc.AllocatedResources.Tasks[c.task.Name]; res != nil { + h.networks = res.Networks } if c.alloc.DeploymentStatus != nil && c.alloc.DeploymentStatus.Canary { @@ -116,17 +108,9 @@ func (h *serviceHook) Update(ctx context.Context, req *interfaces.TaskUpdateRequ canary = req.Alloc.DeploymentStatus.Canary } - // COMPAT(0.11): AllocatedResources was added in 0.9 so assume its set - // in 0.11. var networks structs.Networks - if req.Alloc.AllocatedResources != nil { - if res := req.Alloc.AllocatedResources.Tasks[h.taskName]; res != nil { - networks = res.Networks - } - } else { - if res := req.Alloc.TaskResources[h.taskName]; res != nil { - networks = res.Networks - } + if res := req.Alloc.AllocatedResources.Tasks[h.taskName]; res != nil { + networks = res.Networks } task := req.Alloc.LookupTask(h.taskName) diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index 9f6e5ac34cb6..bb6c6a445963 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -294,31 +294,15 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) { // Pull out the task's resources ares := tr.alloc.AllocatedResources - if ares != nil { - tres, ok := ares.Tasks[tr.taskName] - if !ok { - return nil, fmt.Errorf("no task resources found on allocation") - } - tr.taskResources = tres - } else { - // COMPAT(0.11): Upgrade from 0.8 resources to 0.9+ resources - // Grab the old task resources - oldTr, ok := tr.alloc.TaskResources[tr.taskName] - if !ok { - return nil, fmt.Errorf("no task resources found on allocation") - } + if ares == nil { + return nil, fmt.Errorf("no task resources found on allocation") + } - // Convert the old to new - tr.taskResources = &structs.AllocatedTaskResources{ - Cpu: structs.AllocatedCpuResources{ - CpuShares: int64(oldTr.CPU), - }, - Memory: structs.AllocatedMemoryResources{ - MemoryMB: int64(oldTr.MemoryMB), - }, - Networks: oldTr.Networks, - } + tres, ok := ares.Tasks[tr.taskName] + if !ok { + return nil, fmt.Errorf("no task resources found on allocation") } + tr.taskResources = tres // Build the restart tracker. tg := tr.alloc.Job.LookupTaskGroup(tr.alloc.TaskGroup) @@ -1253,15 +1237,9 @@ func (tr *TaskRunner) UpdateStats(ru *cstructs.TaskResourceUsage) { func (tr *TaskRunner) setGaugeForMemory(ru *cstructs.TaskResourceUsage) { alloc := tr.Alloc() var allocatedMem float32 - if alloc.AllocatedResources != nil { - if taskRes := alloc.AllocatedResources.Tasks[tr.taskName]; taskRes != nil { - // Convert to bytes to match other memory metrics - allocatedMem = float32(taskRes.Memory.MemoryMB) * 1024 * 1024 - } - } else if taskRes := alloc.TaskResources[tr.taskName]; taskRes != nil { - // COMPAT(0.11) Remove in 0.11 when TaskResources is removed - allocatedMem = float32(taskRes.MemoryMB) * 1024 * 1024 - + if taskRes := alloc.AllocatedResources.Tasks[tr.taskName]; taskRes != nil { + // Convert to bytes to match other memory metrics + allocatedMem = float32(taskRes.Memory.MemoryMB) * 1024 * 1024 } if !tr.clientConfig.DisableTaggedMetrics { @@ -1303,13 +1281,8 @@ func (tr *TaskRunner) setGaugeForMemory(ru *cstructs.TaskResourceUsage) { func (tr *TaskRunner) setGaugeForCPU(ru *cstructs.TaskResourceUsage) { alloc := tr.Alloc() var allocatedCPU float32 - if alloc.AllocatedResources != nil { - if taskRes := alloc.AllocatedResources.Tasks[tr.taskName]; taskRes != nil { - allocatedCPU = float32(taskRes.Cpu.CpuShares) - } - } else if taskRes := alloc.TaskResources[tr.taskName]; taskRes != nil { - // COMPAT(0.11) Remove in 0.11 when TaskResources is removed - allocatedCPU = float32(taskRes.CPU) + if taskRes := alloc.AllocatedResources.Tasks[tr.taskName]; taskRes != nil { + allocatedCPU = float32(taskRes.Cpu.CpuShares) } if !tr.clientConfig.DisableTaggedMetrics { diff --git a/client/client.go b/client/client.go index fe7447dcacf1..f23cc1efdc9c 100644 --- a/client/client.go +++ b/client/client.go @@ -1995,6 +1995,10 @@ OUTER: // Ensure that we received all the allocations we wanted pulledAllocs = make(map[string]*structs.Allocation, len(allocsResp.Allocs)) for _, alloc := range allocsResp.Allocs { + + // handle an old Server + alloc.Canonicalize() + pulledAllocs[alloc.ID] = alloc } diff --git a/client/state/db_test.go b/client/state/db_test.go index 230c7ad869eb..c37ed8bd27cc 100644 --- a/client/state/db_test.go +++ b/client/state/db_test.go @@ -76,12 +76,6 @@ func TestStateDB_Allocations(t *testing.T) { alloc1 := mock.Alloc() alloc2 := mock.BatchAlloc() - //XXX Sadly roundtripping allocs loses time.Duration type - // information from the Config map[string]interface{}. As - // the mock driver itself with unmarshal run_for into the - // proper type, we can safely ignore it here. - delete(alloc2.Job.TaskGroups[0].Tasks[0].Config, "run_for") - require.NoError(db.PutAllocation(alloc1)) require.NoError(db.PutAllocation(alloc2)) diff --git a/client/state/state_database.go b/client/state/state_database.go index d6784e98ee53..6d1e65fb2972 100644 --- a/client/state/state_database.go +++ b/client/state/state_database.go @@ -207,6 +207,10 @@ func (s *BoltStateDB) getAllAllocations(tx *boltdd.Tx) ([]*structs.Allocation, m continue } + // Handle upgrade path + ae.Alloc.Canonicalize() + ae.Alloc.Job.Canonicalize() + allocs = append(allocs, ae.Alloc) } diff --git a/client/taskenv/env.go b/client/taskenv/env.go index 57402b442cf1..8612651c3e46 100644 --- a/client/taskenv/env.go +++ b/client/taskenv/env.go @@ -603,8 +603,10 @@ func (b *Builder) setAlloc(alloc *structs.Allocation) *Builder { tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) - // COMPAT(0.11): Remove in 0.11 b.otherPorts = make(map[string]string, len(tg.Tasks)*2) + + // Protect against invalid allocs where AllocatedResources isn't set. + // TestClient_AddAllocError explicitly tests for this condition if alloc.AllocatedResources != nil { // Populate task resources if tr, ok := alloc.AllocatedResources.Tasks[b.taskName]; ok { @@ -645,30 +647,6 @@ func (b *Builder) setAlloc(alloc *structs.Allocation) *Builder { addGroupPort(b.otherPorts, p) } } - } else if alloc.TaskResources != nil { - if tr, ok := alloc.TaskResources[b.taskName]; ok { - // Copy networks to prevent sharing - b.networks = make([]*structs.NetworkResource, len(tr.Networks)) - for i, n := range tr.Networks { - b.networks[i] = n.Copy() - } - - } - - for taskName, resources := range alloc.TaskResources { - // Add ports from other tasks - if taskName == b.taskName { - continue - } - for _, nw := range resources.Networks { - for _, p := range nw.ReservedPorts { - addPort(b.otherPorts, taskName, nw.IP, p.Label, p.Value) - } - for _, p := range nw.DynamicPorts { - addPort(b.otherPorts, taskName, nw.IP, p.Label, p.Value) - } - } - } } upstreams := []structs.ConsulUpstream{} diff --git a/client/taskenv/env_test.go b/client/taskenv/env_test.go index f68bb05d7950..035291436281 100644 --- a/client/taskenv/env_test.go +++ b/client/taskenv/env_test.go @@ -266,6 +266,10 @@ func TestEnvironment_AsList_Old(t *testing.T) { }, }, } + + // simulate canonicalization on restore or fetch + a.Canonicalize() + task := a.Job.TaskGroups[0].Tasks[0] task.Env = map[string]string{ "taskEnvKey": "taskEnvVal", diff --git a/command/alloc_status.go b/command/alloc_status.go index 1184534c6a39..3ea543d90521 100644 --- a/command/alloc_status.go +++ b/command/alloc_status.go @@ -191,7 +191,7 @@ func (c *AllocStatusCommand) Run(args []string) int { } c.Ui.Output(output) - if len(alloc.AllocatedResources.Shared.Networks) > 0 && alloc.AllocatedResources.Shared.Networks[0].HasPorts() { + if alloc.AllocatedResources != nil && len(alloc.AllocatedResources.Shared.Networks) > 0 && alloc.AllocatedResources.Shared.Networks[0].HasPorts() { c.Ui.Output("") c.Ui.Output(formatAllocNetworkInfo(alloc)) } diff --git a/nomad/fsm.go b/nomad/fsm.go index c41e5b9d1b43..fbe9b2342be3 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -681,22 +681,23 @@ func (n *nomadFSM) applyAllocUpdate(buf []byte, index uint64) interface{} { // prior to being inserted into MemDB. structs.DenormalizeAllocationJobs(req.Job, req.Alloc) - // COMPAT(0.11): Remove in 0.11 - // Calculate the total resources of allocations. It is pulled out in the - // payload to avoid encoding something that can be computed, but should be - // denormalized prior to being inserted into MemDB. for _, alloc := range req.Alloc { - if alloc.Resources != nil { - continue - } + // COMPAT(0.11): Remove in 0.11 + // Calculate the total resources of allocations. It is pulled out in the + // payload to avoid encoding something that can be computed, but should be + // denormalized prior to being inserted into MemDB. + if alloc.Resources == nil { + alloc.Resources = new(structs.Resources) + for _, task := range alloc.TaskResources { + alloc.Resources.Add(task) + } - alloc.Resources = new(structs.Resources) - for _, task := range alloc.TaskResources { - alloc.Resources.Add(task) + // Add the shared resources + alloc.Resources.Add(alloc.SharedResources) } - // Add the shared resources - alloc.Resources.Add(alloc.SharedResources) + // Handle upgrade path + alloc.Canonicalize() } if err := n.state.UpsertAllocs(index, req.Alloc); err != nil { @@ -1166,6 +1167,9 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { return err } + // Handle upgrade path + alloc.Canonicalize() + if err := restore.AllocRestore(alloc); err != nil { return err } diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index d75ba1ee3965..a9d712c6e38d 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -1379,6 +1379,45 @@ func TestFSM_UpsertAllocs_StrippedResources(t *testing.T) { } } +// TestFSM_UpsertAllocs_Canonicalize asserts that allocations are Canonicalized +// to handle logs emited by servers running old versions +func TestFSM_UpsertAllocs_Canonicalize(t *testing.T) { + t.Parallel() + fsm := testFSM(t) + + alloc := mock.Alloc() + alloc.Resources = &structs.Resources{} // COMPAT(0.11): Remove in 0.11, used to bypass resource creation in state store + alloc.AllocatedResources = nil + + // pre-assert that our mock populates old field + require.NotEmpty(t, alloc.TaskResources) + + fsm.State().UpsertJobSummary(1, mock.JobSummary(alloc.JobID)) + req := structs.AllocUpdateRequest{ + Alloc: []*structs.Allocation{alloc}, + } + buf, err := structs.Encode(structs.AllocUpdateRequestType, req) + require.NoError(t, err) + + resp := fsm.Apply(makeLog(buf)) + require.Nil(t, resp) + + // Verify we are registered + ws := memdb.NewWatchSet() + out, err := fsm.State().AllocByID(ws, alloc.ID) + require.NoError(t, err) + + require.NotNil(t, out.AllocatedResources) + require.Contains(t, out.AllocatedResources.Tasks, "web") + + expected := alloc.Copy() + expected.Canonicalize() + expected.CreateIndex = out.CreateIndex + expected.ModifyIndex = out.ModifyIndex + expected.AllocModifyIndex = out.AllocModifyIndex + require.Equal(t, expected, out) +} + func TestFSM_UpdateAllocFromClient_Unblock(t *testing.T) { t.Parallel() fsm := testFSM(t) @@ -2453,6 +2492,33 @@ func TestFSM_SnapshotRestore_Allocs(t *testing.T) { } } +func TestFSM_SnapshotRestore_Allocs_Canonicalize(t *testing.T) { + t.Parallel() + // Add some state + fsm := testFSM(t) + state := fsm.State() + alloc := mock.Alloc() + + // remove old versions to force migration path + alloc.AllocatedResources = nil + + state.UpsertJobSummary(998, mock.JobSummary(alloc.JobID)) + state.UpsertAllocs(1000, []*structs.Allocation{alloc}) + + // Verify the contents + fsm2 := testSnapshotRestore(t, fsm) + state2 := fsm2.State() + ws := memdb.NewWatchSet() + out, err := state2.AllocByID(ws, alloc.ID) + require.NoError(t, err) + + require.NotNil(t, out.AllocatedResources) + require.Contains(t, out.AllocatedResources.Tasks, "web") + + alloc.Canonicalize() + require.Equal(t, alloc, out) +} + func TestFSM_SnapshotRestore_Indexes(t *testing.T) { t.Parallel() // Add some state diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 9d915536df97..1590a3fee3ef 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -294,6 +294,11 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR allocsToUpsert = append(allocsToUpsert, results.AllocsUpdated...) allocsToUpsert = append(allocsToUpsert, allocsPreempted...) + // handle upgrade path + for _, alloc := range allocsToUpsert { + alloc.Canonicalize() + } + if err := s.upsertAllocsImpl(index, allocsToUpsert, txn); err != nil { return err } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 4c6b7c4efbd3..9938b938b9db 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -7527,15 +7527,17 @@ type Allocation struct { // the scheduler. Resources *Resources - // COMPAT(0.11): Remove in 0.11 // SharedResources are the resources that are shared by all the tasks in an // allocation + // Deprecated: use AllocatedResources.Shared instead. + // Keep field to allow us to handle upgrade paths from old versions SharedResources *Resources - // COMPAT(0.11): Remove in 0.11 // TaskResources is the set of resources allocated to each // task. These should sum to the total Resources. Dynamic ports will be // set by the scheduler. + // Deprecated: use AllocatedResources.Tasks instead. + // Keep field to allow us to handle upgrade paths from old versions TaskResources map[string]*Resources // AllocatedResources is the total resources allocated for the task group. @@ -7632,6 +7634,36 @@ func (a *Allocation) CopySkipJob() *Allocation { return a.copyImpl(false) } +// Canonicalize Allocation to ensure fields are initialized to the expectations +// of this version of Nomad. Should be called when restoring persisted +// Allocations or receiving Allocations from Nomad agents potentially on an +// older version of Nomad. +func (a *Allocation) Canonicalize() { + if a.AllocatedResources == nil && a.TaskResources != nil { + ar := AllocatedResources{} + + tasks := make(map[string]*AllocatedTaskResources, len(a.TaskResources)) + for name, tr := range a.TaskResources { + atr := AllocatedTaskResources{} + atr.Cpu.CpuShares = int64(tr.CPU) + atr.Memory.MemoryMB = int64(tr.MemoryMB) + atr.Networks = tr.Networks.Copy() + + tasks[name] = &atr + } + ar.Tasks = tasks + + if a.SharedResources != nil { + ar.Shared.DiskMB = int64(a.SharedResources.DiskMB) + ar.Shared.Networks = a.SharedResources.Networks.Copy() + } + + a.AllocatedResources = &ar + } + + a.Job.Canonicalize() +} + func (a *Allocation) copyImpl(job bool) *Allocation { if a == nil { return nil diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index dcb2dfcc82b5..e67dc4b5230b 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -4089,6 +4089,67 @@ func TestAllocation_NextDelay(t *testing.T) { } +func TestAllocation_Canonicalize_Old(t *testing.T) { + alloc := MockAlloc() + alloc.AllocatedResources = nil + alloc.TaskResources = map[string]*Resources{ + "web": { + CPU: 500, + MemoryMB: 256, + Networks: []*NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + ReservedPorts: []Port{{Label: "admin", Value: 5000}}, + MBits: 50, + DynamicPorts: []Port{{Label: "http", Value: 9876}}, + }, + }, + }, + } + alloc.SharedResources = &Resources{ + DiskMB: 150, + } + alloc.Canonicalize() + + expected := &AllocatedResources{ + Tasks: map[string]*AllocatedTaskResources{ + "web": { + Cpu: AllocatedCpuResources{ + CpuShares: 500, + }, + Memory: AllocatedMemoryResources{ + MemoryMB: 256, + }, + Networks: []*NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + ReservedPorts: []Port{{Label: "admin", Value: 5000}}, + MBits: 50, + DynamicPorts: []Port{{Label: "http", Value: 9876}}, + }, + }, + }, + }, + Shared: AllocatedSharedResources{ + DiskMB: 150, + }, + } + + require.Equal(t, expected, alloc.AllocatedResources) +} + +// TestAllocation_Canonicalize_New asserts that an alloc with latest +// schema isn't modified with Canonicalize +func TestAllocation_Canonicalize_New(t *testing.T) { + alloc := MockAlloc() + copy := alloc.Copy() + + alloc.Canonicalize() + require.Equal(t, copy, alloc) +} + func TestRescheduleTracker_Copy(t *testing.T) { type testCase struct { original *RescheduleTracker