Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle Upgrades and Alloc.TaskResources modification #6922

Merged
merged 9 commits into from
Jan 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 4 additions & 20 deletions client/allocrunner/taskrunner/service_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,8 @@ func newServiceHook(c serviceHookConfig) *serviceHook {
delay: c.task.ShutdownDelay,
}

// COMPAT(0.11): AllocatedResources was added in 0.9 so assume its set
// in 0.11.
if c.alloc.AllocatedResources != nil {
if res := c.alloc.AllocatedResources.Tasks[c.task.Name]; res != nil {
h.networks = res.Networks
}
} else {
if res := c.alloc.TaskResources[c.task.Name]; res != nil {
h.networks = res.Networks
}
if res := c.alloc.AllocatedResources.Tasks[c.task.Name]; res != nil {
h.networks = res.Networks
}

if c.alloc.DeploymentStatus != nil && c.alloc.DeploymentStatus.Canary {
Expand Down Expand Up @@ -116,17 +108,9 @@ func (h *serviceHook) Update(ctx context.Context, req *interfaces.TaskUpdateRequ
canary = req.Alloc.DeploymentStatus.Canary
}

// COMPAT(0.11): AllocatedResources was added in 0.9 so assume its set
// in 0.11.
var networks structs.Networks
if req.Alloc.AllocatedResources != nil {
if res := req.Alloc.AllocatedResources.Tasks[h.taskName]; res != nil {
networks = res.Networks
}
} else {
if res := req.Alloc.TaskResources[h.taskName]; res != nil {
networks = res.Networks
}
if res := req.Alloc.AllocatedResources.Tasks[h.taskName]; res != nil {
networks = res.Networks
}

task := req.Alloc.LookupTask(h.taskName)
Expand Down
51 changes: 12 additions & 39 deletions client/allocrunner/taskrunner/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,31 +294,15 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {

// Pull out the task's resources
ares := tr.alloc.AllocatedResources
if ares != nil {
tres, ok := ares.Tasks[tr.taskName]
if !ok {
return nil, fmt.Errorf("no task resources found on allocation")
}
tr.taskResources = tres
} else {
// COMPAT(0.11): Upgrade from 0.8 resources to 0.9+ resources
// Grab the old task resources
oldTr, ok := tr.alloc.TaskResources[tr.taskName]
if !ok {
return nil, fmt.Errorf("no task resources found on allocation")
}
if ares == nil {
return nil, fmt.Errorf("no task resources found on allocation")
notnoop marked this conversation as resolved.
Show resolved Hide resolved
}

// Convert the old to new
tr.taskResources = &structs.AllocatedTaskResources{
Cpu: structs.AllocatedCpuResources{
CpuShares: int64(oldTr.CPU),
},
Memory: structs.AllocatedMemoryResources{
MemoryMB: int64(oldTr.MemoryMB),
},
Networks: oldTr.Networks,
}
tres, ok := ares.Tasks[tr.taskName]
if !ok {
return nil, fmt.Errorf("no task resources found on allocation")
}
tr.taskResources = tres

// Build the restart tracker.
tg := tr.alloc.Job.LookupTaskGroup(tr.alloc.TaskGroup)
Expand Down Expand Up @@ -1253,15 +1237,9 @@ func (tr *TaskRunner) UpdateStats(ru *cstructs.TaskResourceUsage) {
func (tr *TaskRunner) setGaugeForMemory(ru *cstructs.TaskResourceUsage) {
alloc := tr.Alloc()
var allocatedMem float32
if alloc.AllocatedResources != nil {
if taskRes := alloc.AllocatedResources.Tasks[tr.taskName]; taskRes != nil {
// Convert to bytes to match other memory metrics
allocatedMem = float32(taskRes.Memory.MemoryMB) * 1024 * 1024
}
} else if taskRes := alloc.TaskResources[tr.taskName]; taskRes != nil {
// COMPAT(0.11) Remove in 0.11 when TaskResources is removed
allocatedMem = float32(taskRes.MemoryMB) * 1024 * 1024

if taskRes := alloc.AllocatedResources.Tasks[tr.taskName]; taskRes != nil {
// Convert to bytes to match other memory metrics
allocatedMem = float32(taskRes.Memory.MemoryMB) * 1024 * 1024
}

if !tr.clientConfig.DisableTaggedMetrics {
Expand Down Expand Up @@ -1303,13 +1281,8 @@ func (tr *TaskRunner) setGaugeForMemory(ru *cstructs.TaskResourceUsage) {
func (tr *TaskRunner) setGaugeForCPU(ru *cstructs.TaskResourceUsage) {
alloc := tr.Alloc()
var allocatedCPU float32
if alloc.AllocatedResources != nil {
if taskRes := alloc.AllocatedResources.Tasks[tr.taskName]; taskRes != nil {
allocatedCPU = float32(taskRes.Cpu.CpuShares)
}
} else if taskRes := alloc.TaskResources[tr.taskName]; taskRes != nil {
// COMPAT(0.11) Remove in 0.11 when TaskResources is removed
allocatedCPU = float32(taskRes.CPU)
if taskRes := alloc.AllocatedResources.Tasks[tr.taskName]; taskRes != nil {
allocatedCPU = float32(taskRes.Cpu.CpuShares)
}

if !tr.clientConfig.DisableTaggedMetrics {
Expand Down
4 changes: 4 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1995,6 +1995,10 @@ OUTER:
// Ensure that we received all the allocations we wanted
pulledAllocs = make(map[string]*structs.Allocation, len(allocsResp.Allocs))
for _, alloc := range allocsResp.Allocs {

// handle an old Server
alloc.Canonicalize()

pulledAllocs[alloc.ID] = alloc
}

Expand Down
6 changes: 0 additions & 6 deletions client/state/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,6 @@ func TestStateDB_Allocations(t *testing.T) {
alloc1 := mock.Alloc()
alloc2 := mock.BatchAlloc()

//XXX Sadly roundtripping allocs loses time.Duration type
// information from the Config map[string]interface{}. As
// the mock driver itself with unmarshal run_for into the
// proper type, we can safely ignore it here.
delete(alloc2.Job.TaskGroups[0].Tasks[0].Config, "run_for")

require.NoError(db.PutAllocation(alloc1))
require.NoError(db.PutAllocation(alloc2))

Expand Down
4 changes: 4 additions & 0 deletions client/state/state_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,10 @@ func (s *BoltStateDB) getAllAllocations(tx *boltdd.Tx) ([]*structs.Allocation, m
continue
}

// Handle upgrade path
ae.Alloc.Canonicalize()
ae.Alloc.Job.Canonicalize()

allocs = append(allocs, ae.Alloc)
}

Expand Down
28 changes: 3 additions & 25 deletions client/taskenv/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,8 +603,10 @@ func (b *Builder) setAlloc(alloc *structs.Allocation) *Builder {

tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)

// COMPAT(0.11): Remove in 0.11
b.otherPorts = make(map[string]string, len(tg.Tasks)*2)

// Protect against invalid allocs where AllocatedResources isn't set.
// TestClient_AddAllocError explicitly tests for this condition
if alloc.AllocatedResources != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this check anymore? Seems like if AllocatedResources is somehow nil here we'll end up with an incomplete task environment which would be harder to detect in testing than a panic.

Sadly we don't have a logger or error return value here, so our options are limited. If you think risking an incomplete environment is better than risking a panic, let's just add that as a comment here as the code will look kind of strange down the road.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kept this conditional because TestClient_AddAllocError asserts that when an invalid alloc is passed, taskenv doesn't panic and NewTaskRunner returns an error. Not sure what the conditions the test actually tests for.

// Populate task resources
if tr, ok := alloc.AllocatedResources.Tasks[b.taskName]; ok {
Expand Down Expand Up @@ -645,30 +647,6 @@ func (b *Builder) setAlloc(alloc *structs.Allocation) *Builder {
addGroupPort(b.otherPorts, p)
}
}
} else if alloc.TaskResources != nil {
if tr, ok := alloc.TaskResources[b.taskName]; ok {
// Copy networks to prevent sharing
b.networks = make([]*structs.NetworkResource, len(tr.Networks))
for i, n := range tr.Networks {
b.networks[i] = n.Copy()
}

}

for taskName, resources := range alloc.TaskResources {
// Add ports from other tasks
if taskName == b.taskName {
continue
}
for _, nw := range resources.Networks {
for _, p := range nw.ReservedPorts {
addPort(b.otherPorts, taskName, nw.IP, p.Label, p.Value)
}
for _, p := range nw.DynamicPorts {
addPort(b.otherPorts, taskName, nw.IP, p.Label, p.Value)
}
}
}
}

upstreams := []structs.ConsulUpstream{}
Expand Down
4 changes: 4 additions & 0 deletions client/taskenv/env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,10 @@ func TestEnvironment_AsList_Old(t *testing.T) {
},
},
}

// simulate canonicalization on restore or fetch
a.Canonicalize()

task := a.Job.TaskGroups[0].Tasks[0]
task.Env = map[string]string{
"taskEnvKey": "taskEnvVal",
Expand Down
2 changes: 1 addition & 1 deletion command/alloc_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (c *AllocStatusCommand) Run(args []string) int {
}
c.Ui.Output(output)

if len(alloc.AllocatedResources.Shared.Networks) > 0 && alloc.AllocatedResources.Shared.Networks[0].HasPorts() {
schmichael marked this conversation as resolved.
Show resolved Hide resolved
if alloc.AllocatedResources != nil && len(alloc.AllocatedResources.Shared.Networks) > 0 && alloc.AllocatedResources.Shared.Networks[0].HasPorts() {
c.Ui.Output("")
c.Ui.Output(formatAllocNetworkInfo(alloc))
}
Expand Down
28 changes: 16 additions & 12 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -681,22 +681,23 @@ func (n *nomadFSM) applyAllocUpdate(buf []byte, index uint64) interface{} {
// prior to being inserted into MemDB.
structs.DenormalizeAllocationJobs(req.Job, req.Alloc)

// COMPAT(0.11): Remove in 0.11
// Calculate the total resources of allocations. It is pulled out in the
// payload to avoid encoding something that can be computed, but should be
// denormalized prior to being inserted into MemDB.
for _, alloc := range req.Alloc {
if alloc.Resources != nil {
continue
}
// COMPAT(0.11): Remove in 0.11
// Calculate the total resources of allocations. It is pulled out in the
// payload to avoid encoding something that can be computed, but should be
// denormalized prior to being inserted into MemDB.
if alloc.Resources == nil {
alloc.Resources = new(structs.Resources)
for _, task := range alloc.TaskResources {
alloc.Resources.Add(task)
}

alloc.Resources = new(structs.Resources)
for _, task := range alloc.TaskResources {
alloc.Resources.Add(task)
// Add the shared resources
alloc.Resources.Add(alloc.SharedResources)
}

// Add the shared resources
alloc.Resources.Add(alloc.SharedResources)
// Handle upgrade path
alloc.Canonicalize()
}

if err := n.state.UpsertAllocs(index, req.Alloc); err != nil {
Expand Down Expand Up @@ -1166,6 +1167,9 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
return err
}

// Handle upgrade path
alloc.Canonicalize()

if err := restore.AllocRestore(alloc); err != nil {
return err
}
Expand Down
66 changes: 66 additions & 0 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1379,6 +1379,45 @@ func TestFSM_UpsertAllocs_StrippedResources(t *testing.T) {
}
}

// TestFSM_UpsertAllocs_Canonicalize asserts that allocations are Canonicalized
// to handle logs emitted by servers running old versions
func TestFSM_UpsertAllocs_Canonicalize(t *testing.T) {
	t.Parallel()
	fsm := testFSM(t)

	alloc := mock.Alloc()
	alloc.Resources = &structs.Resources{} // COMPAT(0.11): Remove in 0.11, used to bypass resource creation in state store
	alloc.AllocatedResources = nil

	// pre-assert that our mock populates old field
	require.NotEmpty(t, alloc.TaskResources)

	require.NoError(t, fsm.State().UpsertJobSummary(1, mock.JobSummary(alloc.JobID)))

	req := structs.AllocUpdateRequest{
		Alloc: []*structs.Allocation{alloc},
	}
	buf, err := structs.Encode(structs.AllocUpdateRequestType, req)
	require.NoError(t, err)

	resp := fsm.Apply(makeLog(buf))
	require.Nil(t, resp)

	// Verify we are registered
	ws := memdb.NewWatchSet()
	out, err := fsm.State().AllocByID(ws, alloc.ID)
	require.NoError(t, err)

	// The FSM must have populated the new AllocatedResources field from the
	// legacy TaskResources via Canonicalize.
	require.NotNil(t, out.AllocatedResources)
	require.Contains(t, out.AllocatedResources.Tasks, "web")

	// The stored alloc should equal the canonicalized input, modulo the
	// indexes assigned by the state store.
	expected := alloc.Copy()
	expected.Canonicalize()
	expected.CreateIndex = out.CreateIndex
	expected.ModifyIndex = out.ModifyIndex
	expected.AllocModifyIndex = out.AllocModifyIndex
	require.Equal(t, expected, out)
}

func TestFSM_UpdateAllocFromClient_Unblock(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
Expand Down Expand Up @@ -2453,6 +2492,33 @@ func TestFSM_SnapshotRestore_Allocs(t *testing.T) {
}
}

// TestFSM_SnapshotRestore_Allocs_Canonicalize asserts that allocations
// restored from a snapshot are Canonicalized to handle state persisted by
// servers running old versions.
func TestFSM_SnapshotRestore_Allocs_Canonicalize(t *testing.T) {
	t.Parallel()

	// Add some state
	fsm := testFSM(t)
	state := fsm.State()
	alloc := mock.Alloc()

	// Clear the new-style resources field to force the upgrade/migration
	// path on restore.
	alloc.AllocatedResources = nil

	require.NoError(t, state.UpsertJobSummary(998, mock.JobSummary(alloc.JobID)))
	require.NoError(t, state.UpsertAllocs(1000, []*structs.Allocation{alloc}))

	// Verify the contents
	fsm2 := testSnapshotRestore(t, fsm)
	state2 := fsm2.State()
	ws := memdb.NewWatchSet()
	out, err := state2.AllocByID(ws, alloc.ID)
	require.NoError(t, err)

	// Restore must have populated AllocatedResources via Canonicalize.
	require.NotNil(t, out.AllocatedResources)
	require.Contains(t, out.AllocatedResources.Tasks, "web")

	alloc.Canonicalize()
	require.Equal(t, alloc, out)
}

func TestFSM_SnapshotRestore_Indexes(t *testing.T) {
t.Parallel()
// Add some state
Expand Down
5 changes: 5 additions & 0 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,11 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR
allocsToUpsert = append(allocsToUpsert, results.AllocsUpdated...)
allocsToUpsert = append(allocsToUpsert, allocsPreempted...)

// handle upgrade path
for _, alloc := range allocsToUpsert {
alloc.Canonicalize()
}

if err := s.upsertAllocsImpl(index, allocsToUpsert, txn); err != nil {
return err
}
Expand Down
Loading