Skip to content

Commit

Permalink
Merge pull request #1679 from hashicorp/fix-upgrade-alloc
Browse files Browse the repository at this point in the history
Adding LocalDisk to alloc.Job
  • Loading branch information
diptanu authored Sep 2, 2016
2 parents 56b2b36 + 8dcbeee commit 41d5d63
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 35 deletions.
91 changes: 56 additions & 35 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,23 +357,9 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error {
return fmt.Errorf("unable to create job summary: %v", err)
}

// COMPAT 0.4.1 -> 0.5 Create the LocalDisk if it's nil by adding up DiskMB
// from task resources
for _, tg := range job.TaskGroups {
if tg.LocalDisk != nil {
continue
}
var diskMB int
for _, task := range tg.Tasks {
if task.Resources != nil {
diskMB += task.Resources.DiskMB
task.Resources.DiskMB = 0
}
}
tg.LocalDisk = &structs.LocalDisk{
DiskMB: diskMB,
}
}
// Create the LocalDisk if it's nil by adding up DiskMB from task resources.
// COMPAT 0.4.1 -> 0.5
s.addLocalDiskToTaskGroups(job)

// Insert the job
if err := txn.Insert("jobs", job); err != nil {
Expand Down Expand Up @@ -975,6 +961,12 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er
return fmt.Errorf("error updating job summary: %v", err)
}

// Create the LocalDisk if it's nil by adding up DiskMB from task resources.
// COMPAT 0.4.1 -> 0.5
if alloc.Job != nil {
s.addLocalDiskToTaskGroups(alloc.Job)
}

if err := txn.Insert("allocs", alloc); err != nil {
return fmt.Errorf("alloc insert failed: %v", err)
}
Expand Down Expand Up @@ -1664,6 +1656,25 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat
return nil
}

// addLocalDiskToTaskGroups adds missing LocalDisk objects to TaskGroups
func (s *StateStore) addLocalDiskToTaskGroups(job *structs.Job) {
for _, tg := range job.TaskGroups {
if tg.LocalDisk != nil {
continue
}
var diskMB int
for _, task := range tg.Tasks {
if task.Resources != nil {
diskMB += task.Resources.DiskMB
task.Resources.DiskMB = 0
}
}
tg.LocalDisk = &structs.LocalDisk{
DiskMB: diskMB,
}
}
}

// StateSnapshot is used to provide a point-in-time snapshot
type StateSnapshot struct {
StateStore
Expand Down Expand Up @@ -1704,23 +1715,9 @@ func (r *StateRestore) JobRestore(job *structs.Job) error {
r.items.Add(watch.Item{Table: "jobs"})
r.items.Add(watch.Item{Job: job.ID})

// COMPAT 0.4.1 -> 0.5 Create the LocalDisk if it's nil by adding up DiskMB
// from task resources
for _, tg := range job.TaskGroups {
if tg.LocalDisk != nil {
continue
}
var diskMB int
for _, task := range tg.Tasks {
if task.Resources != nil {
diskMB += task.Resources.DiskMB
task.Resources.DiskMB = 0
}
}
tg.LocalDisk = &structs.LocalDisk{
DiskMB: diskMB,
}
}
// Create the LocalDisk if it's nil by adding up DiskMB from task resources.
// COMPAT 0.4.1 -> 0.5
r.addLocalDiskToTaskGroups(job)

if err := r.txn.Insert("jobs", job); err != nil {
return fmt.Errorf("job insert failed: %v", err)
Expand All @@ -1746,14 +1743,19 @@ func (r *StateRestore) AllocRestore(alloc *structs.Allocation) error {
r.items.Add(watch.Item{AllocJob: alloc.JobID})
r.items.Add(watch.Item{AllocNode: alloc.NodeID})

//COMPAT 0.4.1 -> 0.5
// Set the shared resources if it's not present
// COMPAT 0.4.1 -> 0.5
if alloc.SharedResources == nil {
alloc.SharedResources = &structs.Resources{
DiskMB: alloc.Resources.DiskMB,
}
}

// Create the LocalDisk if it's nil by adding up DiskMB from task resources.
if alloc.Job != nil {
r.addLocalDiskToTaskGroups(alloc.Job)
}

if err := r.txn.Insert("allocs", alloc); err != nil {
return fmt.Errorf("alloc insert failed: %v", err)
}
Expand Down Expand Up @@ -1794,6 +1796,25 @@ func (r *StateRestore) VaultAccessorRestore(accessor *structs.VaultAccessor) err
return nil
}

// addLocalDiskToTaskGroups adds missing LocalDisk objects to TaskGroups
func (r *StateRestore) addLocalDiskToTaskGroups(job *structs.Job) {
for _, tg := range job.TaskGroups {
if tg.LocalDisk != nil {
continue
}
var diskMB int
for _, task := range tg.Tasks {
if task.Resources != nil {
diskMB += task.Resources.DiskMB
task.Resources.DiskMB = 0
}
}
tg.LocalDisk = &structs.LocalDisk{
DiskMB: diskMB,
}
}
}

// stateWatch holds shared state for watching updates. This is
// outside of StateStore so it can be shared with snapshots.
type stateWatch struct {
Expand Down
59 changes: 59 additions & 0 deletions nomad/state/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1792,6 +1792,33 @@ func TestStateStore_UpsertAlloc_Alloc(t *testing.T) {
notify.verify(t)
}

func TestStateStore_UpsertAlloc_NoLocalDisk(t *testing.T) {
state := testStateStore(t)
alloc := mock.Alloc()
alloc.Job.TaskGroups[0].LocalDisk = nil
alloc.Job.TaskGroups[0].Tasks[0].Resources.DiskMB = 120

if err := state.UpsertJob(999, alloc.Job); err != nil {
t.Fatalf("err: %v", err)
}

err := state.UpsertAllocs(1000, []*structs.Allocation{alloc})
if err != nil {
t.Fatalf("err: %v", err)
}

out, err := state.AllocByID(alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}

expected := alloc.Copy()
expected.Job.TaskGroups[0].LocalDisk = &structs.LocalDisk{DiskMB: 120}
if !reflect.DeepEqual(expected, out) {
t.Fatalf("bad: %#v %#v", expected, out)
}
}

func TestStateStore_UpdateAlloc_Alloc(t *testing.T) {
state := testStateStore(t)
alloc := mock.Alloc()
Expand Down Expand Up @@ -2489,6 +2516,38 @@ func TestStateStore_RestoreAlloc(t *testing.T) {
notify.verify(t)
}

func TestStateStore_RestoreAlloc_NoLocalDisk(t *testing.T) {
state := testStateStore(t)
alloc := mock.Alloc()
alloc.Job.TaskGroups[0].LocalDisk = nil
alloc.Job.TaskGroups[0].Tasks[0].Resources.DiskMB = 120

restore, err := state.Restore()
if err != nil {
t.Fatalf("err: %v", err)
}

err = restore.AllocRestore(alloc)
if err != nil {
t.Fatalf("err: %v", err)
}

restore.Commit()

out, err := state.AllocByID(alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}

expected := alloc.Copy()
expected.Job.TaskGroups[0].LocalDisk = &structs.LocalDisk{DiskMB: 120}
expected.Job.TaskGroups[0].Tasks[0].Resources.DiskMB = 0

if !reflect.DeepEqual(out, expected) {
t.Fatalf("Bad: %#v %#v", out, expected)
}
}

func TestStateStore_SetJobStatus_ForceStatus(t *testing.T) {
state := testStateStore(t)
watcher := watch.NewItems()
Expand Down

0 comments on commit 41d5d63

Please sign in to comment.