Handle Upgrades and Alloc.TaskResources modification #6922

Merged: 9 commits, Jan 28, 2020
Changes from 4 commits
24 changes: 4 additions & 20 deletions client/allocrunner/taskrunner/service_hook.go
@@ -63,16 +63,8 @@ func newServiceHook(c serviceHookConfig) *serviceHook {
delay: c.task.ShutdownDelay,
}

// COMPAT(0.11): AllocatedResources was added in 0.9 so assume its set
// in 0.11.
if c.alloc.AllocatedResources != nil {
if res := c.alloc.AllocatedResources.Tasks[c.task.Name]; res != nil {
h.networks = res.Networks
}
} else {
if res := c.alloc.TaskResources[c.task.Name]; res != nil {
h.networks = res.Networks
}
if res := c.alloc.AllocatedResources.Tasks[c.task.Name]; res != nil {
h.networks = res.Networks
}

if c.alloc.DeploymentStatus != nil && c.alloc.DeploymentStatus.Canary {
@@ -116,17 +108,9 @@ func (h *serviceHook) Update(ctx context.Context, req *interfaces.TaskUpdateRequ
canary = req.Alloc.DeploymentStatus.Canary
}

// COMPAT(0.11): AllocatedResources was added in 0.9 so assume its set
// in 0.11.
var networks structs.Networks
if req.Alloc.AllocatedResources != nil {
if res := req.Alloc.AllocatedResources.Tasks[h.taskName]; res != nil {
networks = res.Networks
}
} else {
if res := req.Alloc.TaskResources[h.taskName]; res != nil {
networks = res.Networks
}
if res := req.Alloc.AllocatedResources.Tasks[h.taskName]; res != nil {
networks = res.Networks
}

task := req.Alloc.LookupTask(h.taskName)
39 changes: 6 additions & 33 deletions client/allocrunner/taskrunner/task_runner.go
@@ -301,23 +301,7 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
}
tr.taskResources = tres
} else {
// COMPAT(0.11): Upgrade from 0.8 resources to 0.9+ resources
// Grab the old task resources
oldTr, ok := tr.alloc.TaskResources[tr.taskName]
if !ok {
return nil, fmt.Errorf("no task resources found on allocation")
}

// Convert the old to new
tr.taskResources = &structs.AllocatedTaskResources{
Cpu: structs.AllocatedCpuResources{
CpuShares: int64(oldTr.CPU),
},
Memory: structs.AllocatedMemoryResources{
MemoryMB: int64(oldTr.MemoryMB),
},
Networks: oldTr.Networks,
}
return nil, fmt.Errorf("no task resources found on allocation")
}

// Build the restart tracker.
@@ -1253,15 +1237,9 @@ func (tr *TaskRunner) UpdateStats(ru *cstructs.TaskResourceUsage) {
func (tr *TaskRunner) setGaugeForMemory(ru *cstructs.TaskResourceUsage) {
alloc := tr.Alloc()
var allocatedMem float32
if alloc.AllocatedResources != nil {
if taskRes := alloc.AllocatedResources.Tasks[tr.taskName]; taskRes != nil {
// Convert to bytes to match other memory metrics
allocatedMem = float32(taskRes.Memory.MemoryMB) * 1024 * 1024
}
} else if taskRes := alloc.TaskResources[tr.taskName]; taskRes != nil {
// COMPAT(0.11) Remove in 0.11 when TaskResources is removed
allocatedMem = float32(taskRes.MemoryMB) * 1024 * 1024

if taskRes := alloc.AllocatedResources.Tasks[tr.taskName]; taskRes != nil {
// Convert to bytes to match other memory metrics
allocatedMem = float32(taskRes.Memory.MemoryMB) * 1024 * 1024
}

if !tr.clientConfig.DisableTaggedMetrics {
@@ -1303,13 +1281,8 @@ func (tr *TaskRunner) setGaugeForMemory(ru *cstructs.TaskResourceUsage) {
func (tr *TaskRunner) setGaugeForCPU(ru *cstructs.TaskResourceUsage) {
alloc := tr.Alloc()
var allocatedCPU float32
if alloc.AllocatedResources != nil {
if taskRes := alloc.AllocatedResources.Tasks[tr.taskName]; taskRes != nil {
allocatedCPU = float32(taskRes.Cpu.CpuShares)
}
} else if taskRes := alloc.TaskResources[tr.taskName]; taskRes != nil {
// COMPAT(0.11) Remove in 0.11 when TaskResources is removed
allocatedCPU = float32(taskRes.CPU)
if taskRes := alloc.AllocatedResources.Tasks[tr.taskName]; taskRes != nil {
allocatedCPU = float32(taskRes.Cpu.CpuShares)
}

if !tr.clientConfig.DisableTaggedMetrics {
4 changes: 4 additions & 0 deletions client/client.go
@@ -1995,6 +1995,10 @@ OUTER:
// Ensure that we received all the allocations we wanted
pulledAllocs = make(map[string]*structs.Allocation, len(allocsResp.Allocs))
for _, alloc := range allocsResp.Allocs {

// handle an old Server
alloc.Canonicalize()

pulledAllocs[alloc.ID] = alloc
}

3 changes: 3 additions & 0 deletions client/state/state_database.go
@@ -207,6 +207,9 @@ func (s *BoltStateDB) getAllAllocations(tx *boltdd.Tx) ([]*structs.Allocation, m
continue
}

// Handle upgrade path
ae.Alloc.Canonicalize()

allocs = append(allocs, ae.Alloc)
}

25 changes: 0 additions & 25 deletions client/taskenv/env.go
@@ -603,7 +603,6 @@ func (b *Builder) setAlloc(alloc *structs.Allocation) *Builder {

tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)

// COMPAT(0.11): Remove in 0.11
b.otherPorts = make(map[string]string, len(tg.Tasks)*2)
if alloc.AllocatedResources != nil {
Member:

Do we need this check anymore? Seems like if AllocatedResources is somehow nil here we'll end up with an incomplete task environment which would be harder to detect in testing than a panic.

Sadly we don't have a logger or error return value here, so our options are limited. If you think risking an incomplete environment is better than risking a panic, let's just add that as a comment here as the code will look kind of strange down the road.

Contributor Author:

I kept this conditional because TestClient_AddAllocError asserts that when an invalid alloc is passed, taskenv doesn't panic and NewTaskRunner returns an error. Not sure exactly what conditions the test is meant to cover, though.
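
For illustration, the comment asked for above could read roughly like the sketch below, placed above the existing check; this is a sketch only, not part of the PR:

    // AllocatedResources should always be non-nil after Canonicalize, but keep
    // the nil check: with an invalid alloc we prefer an incomplete task
    // environment over a panic (TestClient_AddAllocError expects NewTaskRunner
    // to return an error rather than crash).
    if alloc.AllocatedResources != nil {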

// Populate task resources
Expand Down Expand Up @@ -645,30 +644,6 @@ func (b *Builder) setAlloc(alloc *structs.Allocation) *Builder {
addGroupPort(b.otherPorts, p)
}
}
} else if alloc.TaskResources != nil {
if tr, ok := alloc.TaskResources[b.taskName]; ok {
// Copy networks to prevent sharing
b.networks = make([]*structs.NetworkResource, len(tr.Networks))
for i, n := range tr.Networks {
b.networks[i] = n.Copy()
}

}

for taskName, resources := range alloc.TaskResources {
// Add ports from other tasks
if taskName == b.taskName {
continue
}
for _, nw := range resources.Networks {
for _, p := range nw.ReservedPorts {
addPort(b.otherPorts, taskName, nw.IP, p.Label, p.Value)
}
for _, p := range nw.DynamicPorts {
addPort(b.otherPorts, taskName, nw.IP, p.Label, p.Value)
}
}
}
}

upstreams := []structs.ConsulUpstream{}
4 changes: 4 additions & 0 deletions client/taskenv/env_test.go
@@ -266,6 +266,10 @@ func TestEnvironment_AsList_Old(t *testing.T) {
},
},
}

// simulate canonicalization on restore or fetch
a.Canonicalize()

task := a.Job.TaskGroups[0].Tasks[0]
task.Env = map[string]string{
"taskEnvKey": "taskEnvVal",
2 changes: 1 addition & 1 deletion command/alloc_status.go
@@ -191,7 +191,7 @@ func (c *AllocStatusCommand) Run(args []string) int {
}
c.Ui.Output(output)

if len(alloc.AllocatedResources.Shared.Networks) > 0 && alloc.AllocatedResources.Shared.Networks[0].HasPorts() {
if alloc.AllocatedResources != nil && len(alloc.AllocatedResources.Shared.Networks) > 0 && alloc.AllocatedResources.Shared.Networks[0].HasPorts() {
c.Ui.Output("")
c.Ui.Output(formatAllocNetworkInfo(alloc))
}
28 changes: 16 additions & 12 deletions nomad/fsm.go
@@ -681,22 +681,23 @@ func (n *nomadFSM) applyAllocUpdate(buf []byte, index uint64) interface{} {
// prior to being inserted into MemDB.
structs.DenormalizeAllocationJobs(req.Job, req.Alloc)

// COMPAT(0.11): Remove in 0.11
// Calculate the total resources of allocations. It is pulled out in the
// payload to avoid encoding something that can be computed, but should be
// denormalized prior to being inserted into MemDB.
for _, alloc := range req.Alloc {
if alloc.Resources != nil {
continue
}
// COMPAT(0.11): Remove in 0.11
// Calculate the total resources of allocations. It is pulled out in the
// payload to avoid encoding something that can be computed, but should be
// denormalized prior to being inserted into MemDB.
if alloc.Resources == nil {
alloc.Resources = new(structs.Resources)
for _, task := range alloc.TaskResources {
alloc.Resources.Add(task)
}

alloc.Resources = new(structs.Resources)
for _, task := range alloc.TaskResources {
alloc.Resources.Add(task)
// Add the shared resources
alloc.Resources.Add(alloc.SharedResources)
}

// Add the shared resources
alloc.Resources.Add(alloc.SharedResources)
// Handle upgrade path
alloc.Canonicalize()
}

if err := n.state.UpsertAllocs(index, req.Alloc); err != nil {
Expand Down Expand Up @@ -1166,6 +1167,9 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
return err
}

// Handle upgrade path
alloc.Canonicalize()

if err := restore.AllocRestore(alloc); err != nil {
return err
}
66 changes: 66 additions & 0 deletions nomad/fsm_test.go
@@ -1379,6 +1379,45 @@ func TestFSM_UpsertAllocs_StrippedResources(t *testing.T) {
}
}

// TestFSM_UpsertAllocs_Canonicalize asserts that allocations are Canonicalized
// to handle logs emitted by servers running old versions
func TestFSM_UpsertAllocs_Canonicalize(t *testing.T) {
t.Parallel()
fsm := testFSM(t)

alloc := mock.Alloc()
alloc.Resources = &structs.Resources{} // COMPAT(0.11): Remove in 0.11, used to bypass resource creation in state store
alloc.AllocatedResources = nil

// pre-assert that our mock populates old field
require.NotEmpty(t, alloc.TaskResources)

fsm.State().UpsertJobSummary(1, mock.JobSummary(alloc.JobID))
req := structs.AllocUpdateRequest{
Alloc: []*structs.Allocation{alloc},
}
buf, err := structs.Encode(structs.AllocUpdateRequestType, req)
require.NoError(t, err)

resp := fsm.Apply(makeLog(buf))
require.Nil(t, resp)

// Verify we are registered
ws := memdb.NewWatchSet()
out, err := fsm.State().AllocByID(ws, alloc.ID)
require.NoError(t, err)

require.NotNil(t, out.AllocatedResources)
require.Contains(t, out.AllocatedResources.Tasks, "web")

expected := alloc.Copy()
expected.Canonicalize()
expected.CreateIndex = out.CreateIndex
expected.ModifyIndex = out.ModifyIndex
expected.AllocModifyIndex = out.AllocModifyIndex
require.Equal(t, expected, out)
}

func TestFSM_UpdateAllocFromClient_Unblock(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
@@ -2453,6 +2492,33 @@ func TestFSM_SnapshotRestore_Allocs(t *testing.T) {
}
}

func TestFSM_SnapshotRestore_Allocs_Canonicalize(t *testing.T) {
t.Parallel()
// Add some state
fsm := testFSM(t)
state := fsm.State()
alloc := mock.Alloc()

// remove old versions to force migration path
alloc.AllocatedResources = nil

state.UpsertJobSummary(998, mock.JobSummary(alloc.JobID))
state.UpsertAllocs(1000, []*structs.Allocation{alloc})

// Verify the contents
fsm2 := testSnapshotRestore(t, fsm)
state2 := fsm2.State()
ws := memdb.NewWatchSet()
out, err := state2.AllocByID(ws, alloc.ID)
require.NoError(t, err)

require.NotNil(t, out.AllocatedResources)
require.Contains(t, out.AllocatedResources.Tasks, "web")

alloc.Canonicalize()
require.Equal(t, alloc, out)
}

func TestFSM_SnapshotRestore_Indexes(t *testing.T) {
t.Parallel()
// Add some state
39 changes: 37 additions & 2 deletions nomad/structs/structs.go
@@ -7527,15 +7527,17 @@ type Allocation struct {
// the scheduler.
Resources *Resources

// COMPAT(0.11): Remove in 0.11
Member:

Why can't this be COMPAT(0.12): Remove in 0.12? If both clients and servers upgrade-allocs-on-restore in 0.11, what could still have AllocatedResources==nil in 0.12 (and therefore need these fields to populate AllocRes)? It seems like the only problem would be if someone used 0.10 or earlier agents with 0.12 code.

(Not that we have to remove it in 0.12, I'm just curious if we could while maintaining our +/-1 Y version safety.)

Contributor Author:

If both clients and servers upgrade-allocs-on-restore in 0.11, what could still have AllocatedResources==nil in 0.12 (and therefore need these fields to populate AllocRes)?

If both clients and servers upgrade the on-disk representation, then yes. But we don't currently do that, and this PR doesn't change that. Consider the case where a cluster starts on Nomad 0.8 and the operator then upgrades in rapid succession through 0.9, 0.10 (with this PR), 0.11, and then to 0.12, so fast that Raft never generates a snapshot during these upgrades. In that case, Nomad 0.12 will read the representation that was persisted by 0.8, which lacks AllocatedResources.

To be able to fully remove it, we'd have to augment the recommended upgrade path to ensure the on-disk representation gets upgraded before a user can do a subsequent upgrade.

Member:

Should we take a Raft snapshot on server agent startup?

Contributor Author:

We can consider it, but we'll need to do some vetting and testing before going there. Agent startup is critical to cluster recovery and I'd be nervous about adding a blocking call that may fail there; if done asynchronously, we'd need to properly indicate to operators when it's safe to upgrade, and cope with operators who ignore the warning. Maybe consider it as part of 0.12?

Member:

Hm, a blocking nomad operator ... command might be sufficient + upgrade instructions to use it if "rapidly" upgrading from 0.N to 0.N+2.

Definitely seems best to leave it out of scope for this effort, but could you file an issue describing how it's currently impossible to safely remove deprecated fields that are persisted to Raft? Doesn't seem like anything we need to rush to fix, but I can see it mattering a lot more post-1.0 when people are much slower to upgrade (Consul struggles with this), and may want options to upgrade from 1.N to 1.N+x (where x > 1) quickly and easily.

// SharedResources are the resources that are shared by all the tasks in an
// allocation
// Deprecated: use AllocatedResources.Shared instead.
// Keep field to allow us to handle upgrade paths from old versions
SharedResources *Resources

// COMPAT(0.11): Remove in 0.11
// TaskResources is the set of resources allocated to each
// task. These should sum to the total Resources. Dynamic ports will be
// set by the scheduler.
// Deprecated: use AllocatedResources.Tasks instead.
// Keep field to allow us to handle upgrade paths from old versions
TaskResources map[string]*Resources

// AllocatedResources is the total resources allocated for the task group.
@@ -7632,6 +7634,39 @@ func (a *Allocation) CopySkipJob() *Allocation {
return a.copyImpl(false)
}

func (a *Allocation) Canonicalize() {
if a.AllocatedResources == nil && a.TaskResources != nil {
ar := AllocatedResources{}
ar.Tasks = toAllocatedResources(a.TaskResources)

if a.SharedResources != nil {
ar.Shared.DiskMB = int64(a.SharedResources.DiskMB)
ar.Shared.Networks = a.SharedResources.Networks.Copy()
}

a.AllocatedResources = &ar
}

// TODO: Investigate if we should canonicalize the job
// it may be out of sync with respect to the original job
// a.Job.Canonicalize()
Member:

Suggested change:
- // a.Job.Canonicalize()
+ a.Job.Canonicalize()

I think this is the right thing to do:

  • On clients they may have an old version of a job persisted that may not match the expectations of the code running in the agent.
  • On servers I actually don't remember off the top of my head if Jobs are persisted on Allocs (are allocs normalized only in raft messages on the wire, or in state store too 🤔 ?). Luckily this doesn't matter! Job.Canonicalize handles job being nil and Jobs are already canonicalized on statestore restore.

So I think we might waste a couple CPU instructions, but it seems necessary on clients at least.

}
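
A minimal sketch of how Canonicalize might end up if the suggested change above were applied, relying on the reviewer's note that Job.Canonicalize handles a nil job; this is hypothetical, not what the PR ships at this revision:

    func (a *Allocation) Canonicalize() {
        if a.AllocatedResources == nil && a.TaskResources != nil {
            ar := AllocatedResources{}
            ar.Tasks = toAllocatedResources(a.TaskResources)

            if a.SharedResources != nil {
                ar.Shared.DiskMB = int64(a.SharedResources.DiskMB)
                ar.Shared.Networks = a.SharedResources.Networks.Copy()
            }

            a.AllocatedResources = &ar
        }

        // Job.Canonicalize handles a nil job, so the unconditional call is safe
        // on both clients and servers and costs only a few extra instructions.
        a.Job.Canonicalize()
    }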

func toAllocatedResources(taskResources map[string]*Resources) map[string]*AllocatedTaskResources {
Member:

We may want to just inline this code into Canonicalize as it seems easy to get orphaned here if we are able to remove the canonicalization in the future.

tasks := make(map[string]*AllocatedTaskResources, len(taskResources))

for name, tr := range taskResources {
atr := AllocatedTaskResources{}
atr.Cpu.CpuShares = int64(tr.CPU)
atr.Memory.MemoryMB = int64(tr.MemoryMB)
atr.Networks = tr.Networks.Copy()

tasks[name] = &atr
}

return tasks
}
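
A sketch of the same function with the conversion inlined, as suggested above for toAllocatedResources; again hypothetical, shown only to illustrate how the helper could be folded in:

    func (a *Allocation) Canonicalize() {
        if a.AllocatedResources == nil && a.TaskResources != nil {
            ar := AllocatedResources{
                Tasks: make(map[string]*AllocatedTaskResources, len(a.TaskResources)),
            }
            for name, tr := range a.TaskResources {
                ar.Tasks[name] = &AllocatedTaskResources{
                    Cpu:      AllocatedCpuResources{CpuShares: int64(tr.CPU)},
                    Memory:   AllocatedMemoryResources{MemoryMB: int64(tr.MemoryMB)},
                    Networks: tr.Networks.Copy(),
                }
            }

            if a.SharedResources != nil {
                ar.Shared.DiskMB = int64(a.SharedResources.DiskMB)
                ar.Shared.Networks = a.SharedResources.Networks.Copy()
            }

            a.AllocatedResources = &ar
        }
    }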

func (a *Allocation) copyImpl(job bool) *Allocation {
if a == nil {
return nil