Skip to content

Commit

Permalink
Merge pull request #6922 from hashicorp/b-alloc-canoncalize
Browse files Browse the repository at this point in the history
Handle Upgrades and Alloc.TaskResources modification
  • Loading branch information
Mahmood Ali committed Jan 28, 2020
2 parents eb0acc3 + 99bc650 commit b789b50
Show file tree
Hide file tree
Showing 13 changed files with 214 additions and 105 deletions.
24 changes: 4 additions & 20 deletions client/allocrunner/taskrunner/service_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,8 @@ func newServiceHook(c serviceHookConfig) *serviceHook {
delay: c.task.ShutdownDelay,
}

// COMPAT(0.11): AllocatedResources was added in 0.9 so assume it's set
// in 0.11.
if c.alloc.AllocatedResources != nil {
if res := c.alloc.AllocatedResources.Tasks[c.task.Name]; res != nil {
h.networks = res.Networks
}
} else {
if res := c.alloc.TaskResources[c.task.Name]; res != nil {
h.networks = res.Networks
}
if res := c.alloc.AllocatedResources.Tasks[c.task.Name]; res != nil {
h.networks = res.Networks
}

if c.alloc.DeploymentStatus != nil && c.alloc.DeploymentStatus.Canary {
Expand Down Expand Up @@ -116,17 +108,9 @@ func (h *serviceHook) Update(ctx context.Context, req *interfaces.TaskUpdateRequ
canary = req.Alloc.DeploymentStatus.Canary
}

// COMPAT(0.11): AllocatedResources was added in 0.9 so assume it's set
// in 0.11.
var networks structs.Networks
if req.Alloc.AllocatedResources != nil {
if res := req.Alloc.AllocatedResources.Tasks[h.taskName]; res != nil {
networks = res.Networks
}
} else {
if res := req.Alloc.TaskResources[h.taskName]; res != nil {
networks = res.Networks
}
if res := req.Alloc.AllocatedResources.Tasks[h.taskName]; res != nil {
networks = res.Networks
}

task := req.Alloc.LookupTask(h.taskName)
Expand Down
51 changes: 12 additions & 39 deletions client/allocrunner/taskrunner/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,31 +294,15 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {

// Pull out the task's resources
ares := tr.alloc.AllocatedResources
if ares != nil {
tres, ok := ares.Tasks[tr.taskName]
if !ok {
return nil, fmt.Errorf("no task resources found on allocation")
}
tr.taskResources = tres
} else {
// COMPAT(0.11): Upgrade from 0.8 resources to 0.9+ resources
// Grab the old task resources
oldTr, ok := tr.alloc.TaskResources[tr.taskName]
if !ok {
return nil, fmt.Errorf("no task resources found on allocation")
}
if ares == nil {
return nil, fmt.Errorf("no task resources found on allocation")
}

// Convert the old to new
tr.taskResources = &structs.AllocatedTaskResources{
Cpu: structs.AllocatedCpuResources{
CpuShares: int64(oldTr.CPU),
},
Memory: structs.AllocatedMemoryResources{
MemoryMB: int64(oldTr.MemoryMB),
},
Networks: oldTr.Networks,
}
tres, ok := ares.Tasks[tr.taskName]
if !ok {
return nil, fmt.Errorf("no task resources found on allocation")
}
tr.taskResources = tres

// Build the restart tracker.
tg := tr.alloc.Job.LookupTaskGroup(tr.alloc.TaskGroup)
Expand Down Expand Up @@ -1253,15 +1237,9 @@ func (tr *TaskRunner) UpdateStats(ru *cstructs.TaskResourceUsage) {
func (tr *TaskRunner) setGaugeForMemory(ru *cstructs.TaskResourceUsage) {
alloc := tr.Alloc()
var allocatedMem float32
if alloc.AllocatedResources != nil {
if taskRes := alloc.AllocatedResources.Tasks[tr.taskName]; taskRes != nil {
// Convert to bytes to match other memory metrics
allocatedMem = float32(taskRes.Memory.MemoryMB) * 1024 * 1024
}
} else if taskRes := alloc.TaskResources[tr.taskName]; taskRes != nil {
// COMPAT(0.11) Remove in 0.11 when TaskResources is removed
allocatedMem = float32(taskRes.MemoryMB) * 1024 * 1024

if taskRes := alloc.AllocatedResources.Tasks[tr.taskName]; taskRes != nil {
// Convert to bytes to match other memory metrics
allocatedMem = float32(taskRes.Memory.MemoryMB) * 1024 * 1024
}

if !tr.clientConfig.DisableTaggedMetrics {
Expand Down Expand Up @@ -1303,13 +1281,8 @@ func (tr *TaskRunner) setGaugeForMemory(ru *cstructs.TaskResourceUsage) {
func (tr *TaskRunner) setGaugeForCPU(ru *cstructs.TaskResourceUsage) {
alloc := tr.Alloc()
var allocatedCPU float32
if alloc.AllocatedResources != nil {
if taskRes := alloc.AllocatedResources.Tasks[tr.taskName]; taskRes != nil {
allocatedCPU = float32(taskRes.Cpu.CpuShares)
}
} else if taskRes := alloc.TaskResources[tr.taskName]; taskRes != nil {
// COMPAT(0.11) Remove in 0.11 when TaskResources is removed
allocatedCPU = float32(taskRes.CPU)
if taskRes := alloc.AllocatedResources.Tasks[tr.taskName]; taskRes != nil {
allocatedCPU = float32(taskRes.Cpu.CpuShares)
}

if !tr.clientConfig.DisableTaggedMetrics {
Expand Down
4 changes: 4 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1995,6 +1995,10 @@ OUTER:
// Ensure that we received all the allocations we wanted
pulledAllocs = make(map[string]*structs.Allocation, len(allocsResp.Allocs))
for _, alloc := range allocsResp.Allocs {

// handle an old Server
alloc.Canonicalize()

pulledAllocs[alloc.ID] = alloc
}

Expand Down
6 changes: 0 additions & 6 deletions client/state/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,6 @@ func TestStateDB_Allocations(t *testing.T) {
alloc1 := mock.Alloc()
alloc2 := mock.BatchAlloc()

//XXX Sadly roundtripping allocs loses time.Duration type
// information from the Config map[string]interface{}. As
// the mock driver itself will unmarshal run_for into the
// proper type, we can safely ignore it here.
delete(alloc2.Job.TaskGroups[0].Tasks[0].Config, "run_for")

require.NoError(db.PutAllocation(alloc1))
require.NoError(db.PutAllocation(alloc2))

Expand Down
4 changes: 4 additions & 0 deletions client/state/state_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,10 @@ func (s *BoltStateDB) getAllAllocations(tx *boltdd.Tx) ([]*structs.Allocation, m
continue
}

// Handle upgrade path
ae.Alloc.Canonicalize()
ae.Alloc.Job.Canonicalize()

allocs = append(allocs, ae.Alloc)
}

Expand Down
28 changes: 3 additions & 25 deletions client/taskenv/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,8 +603,10 @@ func (b *Builder) setAlloc(alloc *structs.Allocation) *Builder {

tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)

// COMPAT(0.11): Remove in 0.11
b.otherPorts = make(map[string]string, len(tg.Tasks)*2)

// Protect against invalid allocs where AllocatedResources isn't set.
// TestClient_AddAllocError explicitly tests for this condition
if alloc.AllocatedResources != nil {
// Populate task resources
if tr, ok := alloc.AllocatedResources.Tasks[b.taskName]; ok {
Expand Down Expand Up @@ -645,30 +647,6 @@ func (b *Builder) setAlloc(alloc *structs.Allocation) *Builder {
addGroupPort(b.otherPorts, p)
}
}
} else if alloc.TaskResources != nil {
if tr, ok := alloc.TaskResources[b.taskName]; ok {
// Copy networks to prevent sharing
b.networks = make([]*structs.NetworkResource, len(tr.Networks))
for i, n := range tr.Networks {
b.networks[i] = n.Copy()
}

}

for taskName, resources := range alloc.TaskResources {
// Add ports from other tasks
if taskName == b.taskName {
continue
}
for _, nw := range resources.Networks {
for _, p := range nw.ReservedPorts {
addPort(b.otherPorts, taskName, nw.IP, p.Label, p.Value)
}
for _, p := range nw.DynamicPorts {
addPort(b.otherPorts, taskName, nw.IP, p.Label, p.Value)
}
}
}
}

upstreams := []structs.ConsulUpstream{}
Expand Down
4 changes: 4 additions & 0 deletions client/taskenv/env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,10 @@ func TestEnvironment_AsList_Old(t *testing.T) {
},
},
}

// simulate canonicalization on restore or fetch
a.Canonicalize()

task := a.Job.TaskGroups[0].Tasks[0]
task.Env = map[string]string{
"taskEnvKey": "taskEnvVal",
Expand Down
2 changes: 1 addition & 1 deletion command/alloc_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (c *AllocStatusCommand) Run(args []string) int {
}
c.Ui.Output(output)

if len(alloc.AllocatedResources.Shared.Networks) > 0 && alloc.AllocatedResources.Shared.Networks[0].HasPorts() {
if alloc.AllocatedResources != nil && len(alloc.AllocatedResources.Shared.Networks) > 0 && alloc.AllocatedResources.Shared.Networks[0].HasPorts() {
c.Ui.Output("")
c.Ui.Output(formatAllocNetworkInfo(alloc))
}
Expand Down
28 changes: 16 additions & 12 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -681,22 +681,23 @@ func (n *nomadFSM) applyAllocUpdate(buf []byte, index uint64) interface{} {
// prior to being inserted into MemDB.
structs.DenormalizeAllocationJobs(req.Job, req.Alloc)

// COMPAT(0.11): Remove in 0.11
// Calculate the total resources of allocations. It is pulled out in the
// payload to avoid encoding something that can be computed, but should be
// denormalized prior to being inserted into MemDB.
for _, alloc := range req.Alloc {
if alloc.Resources != nil {
continue
}
// COMPAT(0.11): Remove in 0.11
// Calculate the total resources of allocations. It is pulled out in the
// payload to avoid encoding something that can be computed, but should be
// denormalized prior to being inserted into MemDB.
if alloc.Resources == nil {
alloc.Resources = new(structs.Resources)
for _, task := range alloc.TaskResources {
alloc.Resources.Add(task)
}

alloc.Resources = new(structs.Resources)
for _, task := range alloc.TaskResources {
alloc.Resources.Add(task)
// Add the shared resources
alloc.Resources.Add(alloc.SharedResources)
}

// Add the shared resources
alloc.Resources.Add(alloc.SharedResources)
// Handle upgrade path
alloc.Canonicalize()
}

if err := n.state.UpsertAllocs(index, req.Alloc); err != nil {
Expand Down Expand Up @@ -1166,6 +1167,9 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
return err
}

// Handle upgrade path
alloc.Canonicalize()

if err := restore.AllocRestore(alloc); err != nil {
return err
}
Expand Down
66 changes: 66 additions & 0 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1379,6 +1379,45 @@ func TestFSM_UpsertAllocs_StrippedResources(t *testing.T) {
}
}

// TestFSM_UpsertAllocs_Canonicalize asserts that allocations are canonicalized
// when an AllocUpdateRequest is applied, to handle logs emitted by servers
// running old versions. The alloc is submitted with AllocatedResources unset
// and only the old TaskResources field populated; the stored alloc must come
// back with AllocatedResources filled in.
func TestFSM_UpsertAllocs_Canonicalize(t *testing.T) {
	t.Parallel()
	fsm := testFSM(t)

	alloc := mock.Alloc()
	alloc.Resources = &structs.Resources{} // COMPAT(0.11): Remove in 0.11, used to bypass resource creation in state store
	alloc.AllocatedResources = nil

	// pre-assert that our mock populates old field
	require.NotEmpty(t, alloc.TaskResources)

	// Fail fast if the setup write is rejected instead of letting a later
	// assertion fail for an unrelated-looking reason.
	require.NoError(t, fsm.State().UpsertJobSummary(1, mock.JobSummary(alloc.JobID)))

	req := structs.AllocUpdateRequest{
		Alloc: []*structs.Allocation{alloc},
	}
	buf, err := structs.Encode(structs.AllocUpdateRequestType, req)
	require.NoError(t, err)

	resp := fsm.Apply(makeLog(buf))
	require.Nil(t, resp)

	// Verify the alloc was stored and can be looked up by ID
	ws := memdb.NewWatchSet()
	out, err := fsm.State().AllocByID(ws, alloc.ID)
	require.NoError(t, err)

	require.NotNil(t, out.AllocatedResources)
	require.Contains(t, out.AllocatedResources.Tasks, "web")

	// The stored alloc should match a canonicalized copy of the input once
	// the indexes taken from the stored alloc are copied over.
	expected := alloc.Copy()
	expected.Canonicalize()
	expected.CreateIndex = out.CreateIndex
	expected.ModifyIndex = out.ModifyIndex
	expected.AllocModifyIndex = out.AllocModifyIndex
	require.Equal(t, expected, out)
}

func TestFSM_UpdateAllocFromClient_Unblock(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
Expand Down Expand Up @@ -2453,6 +2492,33 @@ func TestFSM_SnapshotRestore_Allocs(t *testing.T) {
}
}

// TestFSM_SnapshotRestore_Allocs_Canonicalize asserts that an allocation
// stored without AllocatedResources has that field populated after a
// snapshot/restore round trip, and that the restored alloc equals the
// canonicalized original.
func TestFSM_SnapshotRestore_Allocs_Canonicalize(t *testing.T) {
	t.Parallel()
	// Add some state
	fsm := testFSM(t)
	state := fsm.State()
	alloc := mock.Alloc()

	// Clear the new-style AllocatedResources field so that restore has to
	// take the migration path.
	alloc.AllocatedResources = nil

	// Check the setup writes: if either fails, the lookup below would fail
	// in a way that hides the real cause.
	require.NoError(t, state.UpsertJobSummary(998, mock.JobSummary(alloc.JobID)))
	require.NoError(t, state.UpsertAllocs(1000, []*structs.Allocation{alloc}))

	// Verify the contents
	fsm2 := testSnapshotRestore(t, fsm)
	state2 := fsm2.State()
	ws := memdb.NewWatchSet()
	out, err := state2.AllocByID(ws, alloc.ID)
	require.NoError(t, err)

	require.NotNil(t, out.AllocatedResources)
	require.Contains(t, out.AllocatedResources.Tasks, "web")

	// Canonicalize the original so it can be compared with the restored copy.
	alloc.Canonicalize()
	require.Equal(t, alloc, out)
}

func TestFSM_SnapshotRestore_Indexes(t *testing.T) {
t.Parallel()
// Add some state
Expand Down
5 changes: 5 additions & 0 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,11 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR
allocsToUpsert = append(allocsToUpsert, results.AllocsUpdated...)
allocsToUpsert = append(allocsToUpsert, allocsPreempted...)

// handle upgrade path
for _, alloc := range allocsToUpsert {
alloc.Canonicalize()
}

if err := s.upsertAllocsImpl(index, allocsToUpsert, txn); err != nil {
return err
}
Expand Down
Loading

0 comments on commit b789b50

Please sign in to comment.