Skip to content

Commit

Permalink
Merge pull request #755 from hashicorp/b-alloc-runner-destroy
Browse files Browse the repository at this point in the history
Fix AllocRunner not capturing destroy signal and tests
  • Loading branch information
dadgar committed Feb 4, 2016
2 parents c87a4f9 + 3308841 commit 6711525
Show file tree
Hide file tree
Showing 4 changed files with 241 additions and 19 deletions.
35 changes: 23 additions & 12 deletions client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,15 +326,8 @@ func (r *AllocRunner) Run() {
defer close(r.waitCh)
go r.dirtySyncState()

// Check if the allocation is in a terminal status
alloc := r.alloc
if alloc.TerminalStatus() {
r.logger.Printf("[DEBUG] client: aborting runner for alloc '%s', terminal status", r.alloc.ID)
return
}
r.logger.Printf("[DEBUG] client: starting runner for alloc '%s'", r.alloc.ID)

// Find the task group to run in the allocation
alloc := r.alloc
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if tg == nil {
r.logger.Printf("[ERR] client: alloc '%s' for missing task group '%s'", alloc.ID, alloc.TaskGroup)
Expand All @@ -353,7 +346,18 @@ func (r *AllocRunner) Run() {
r.ctx = driver.NewExecContext(allocDir, r.alloc.ID)
}

// Check if the allocation is in a terminal status. In this case, we don't
// start any of the task runners and directly wait for the destroy signal to
// clean up the allocation.
if alloc.TerminalStatus() {
r.logger.Printf("[DEBUG] client: alloc %q in terminal status, waiting for destroy", r.alloc.ID)
r.handleDestroy()
r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.alloc.ID)
return
}

// Start the task runners
r.logger.Printf("[DEBUG] client: starting task runners for alloc '%s'", r.alloc.ID)
r.taskLock.Lock()
for _, task := range tg.Tasks {
if _, ok := r.restored[task.Name]; ok {
Expand Down Expand Up @@ -415,7 +419,6 @@ OUTER:

// Destroy each sub-task
r.taskLock.Lock()
defer r.taskLock.Unlock()
for _, tr := range r.tasks {
tr.Destroy()
}
Expand All @@ -424,12 +427,21 @@ OUTER:
for _, tr := range r.tasks {
<-tr.WaitCh()
}
r.taskLock.Unlock()

// Final state sync
r.retrySyncState(nil)

// Check if we should destroy our state
if r.destroy {
// Block until we should destroy the state of the alloc
r.handleDestroy()
r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.alloc.ID)
}

// handleDestroy blocks till the AllocRunner should be destroyed and does the
// necessary cleanup.
func (r *AllocRunner) handleDestroy() {
select {
case <-r.destroyCh:
if err := r.DestroyContext(); err != nil {
r.logger.Printf("[ERR] client: failed to destroy context for alloc '%s': %v",
r.alloc.ID, err)
Expand All @@ -439,7 +451,6 @@ OUTER:
r.alloc.ID, err)
}
}
r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.alloc.ID)
}

// Update is used to update the allocation of the context
Expand Down
217 changes: 214 additions & 3 deletions client/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,95 @@ func TestAllocRunner_SimpleRun(t *testing.T) {
})
}

// TestAllocRunner_TerminalUpdate_Destroy checks the lifecycle of a running
// allocation that is updated to a terminal desired status: the runner must
// report the alloc as dead while keeping its state file and alloc directory
// on disk, and must only remove them once Destroy is called.
func TestAllocRunner_TerminalUpdate_Destroy(t *testing.T) {
	ctestutil.ExecCompatible(t)
	upd, ar := testAllocRunner(false)

	// Ensure task takes some time
	task := ar.alloc.Job.TaskGroups[0].Tasks[0]
	task.Config["command"] = "/bin/sleep"
	task.Config["args"] = []string{"10"}
	go ar.Run()

	// Wait until the alloc is reported as running before sending the
	// terminal update, so the update is applied to a live allocation.
	testutil.WaitForResult(func() (bool, error) {
		if upd.Count == 0 {
			return false, fmt.Errorf("No updates")
		}
		last := upd.Allocs[upd.Count-1]
		if last.ClientStatus != structs.AllocClientStatusRunning {
			return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusRunning)
		}
		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})

	// Update the alloc to be terminal which should cause the alloc runner to
	// stop the tasks and wait for a destroy.
	update := ar.alloc.Copy()
	update.DesiredStatus = structs.AllocDesiredStatusStop
	ar.Update(update)

	testutil.WaitForResult(func() (bool, error) {
		if upd.Count == 0 {
			// Return a real error so a timeout does not report "err: <nil>".
			return false, fmt.Errorf("No updates")
		}

		// Check the status has changed.
		last := upd.Allocs[upd.Count-1]
		if last.ClientStatus != structs.AllocClientStatusDead {
			return false, fmt.Errorf("got client status %v; want %v", last.ClientStatus, structs.AllocClientStatusDead)
		}

		// Check the state still exists
		if _, err := os.Stat(ar.stateFilePath()); err != nil {
			return false, fmt.Errorf("state file destroyed: %v", err)
		}

		// Check the alloc directory still exists
		if _, err := os.Stat(ar.ctx.AllocDir.AllocDir); err != nil {
			return false, fmt.Errorf("alloc dir destroyed: %v", ar.ctx.AllocDir.AllocDir)
		}

		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})

	// Send the destroy signal and ensure the AllocRunner cleans up.
	ar.Destroy()

	testutil.WaitForResult(func() (bool, error) {
		if upd.Count == 0 {
			return false, fmt.Errorf("No updates")
		}

		// Check the status has changed.
		last := upd.Allocs[upd.Count-1]
		if last.ClientStatus != structs.AllocClientStatusDead {
			return false, fmt.Errorf("got client status %v; want %v", last.ClientStatus, structs.AllocClientStatusDead)
		}

		// Check the state was cleaned
		if _, err := os.Stat(ar.stateFilePath()); err == nil {
			return false, fmt.Errorf("state file still exists: %v", ar.stateFilePath())
		} else if !os.IsNotExist(err) {
			return false, fmt.Errorf("stat err: %v", err)
		}

		// Check the alloc directory was cleaned
		if _, err := os.Stat(ar.ctx.AllocDir.AllocDir); err == nil {
			return false, fmt.Errorf("alloc dir still exists: %v", ar.ctx.AllocDir.AllocDir)
		} else if !os.IsNotExist(err) {
			return false, fmt.Errorf("stat err: %v", err)
		}

		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})
}

func TestAllocRunner_Destroy(t *testing.T) {
ctestutil.ExecCompatible(t)
upd, ar := testAllocRunner(false)
Expand All @@ -83,10 +172,30 @@ func TestAllocRunner_Destroy(t *testing.T) {
if upd.Count == 0 {
return false, nil
}

// Check the status has changed.
last := upd.Allocs[upd.Count-1]
return last.ClientStatus == structs.AllocClientStatusDead, nil
if last.ClientStatus != structs.AllocClientStatusDead {
return false, fmt.Errorf("got client status %v; want %v", last.ClientStatus, structs.AllocClientStatusDead)
}

// Check the state was cleaned
if _, err := os.Stat(ar.stateFilePath()); err == nil {
return false, fmt.Errorf("state file still exists: %v", ar.stateFilePath())
} else if !os.IsNotExist(err) {
return false, fmt.Errorf("stat err: %v", err)
}

// Check the alloc directory was cleaned
if _, err := os.Stat(ar.ctx.AllocDir.AllocDir); err == nil {
return false, fmt.Errorf("alloc dir still exists: %v", ar.ctx.AllocDir.AllocDir)
} else if !os.IsNotExist(err) {
return false, fmt.Errorf("stat err: %v", err)
}

return true, nil
}, func(err error) {
t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.alloc.TaskStates)
t.Fatalf("err: %v", err)
})

if time.Since(start) > 15*time.Second {
Expand Down Expand Up @@ -129,7 +238,6 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) {
task.Config["command"] = "/bin/sleep"
task.Config["args"] = []string{"10"}
go ar.Run()
defer ar.Destroy()

// Snapshot state
testutil.WaitForResult(func() (bool, error) {
Expand Down Expand Up @@ -171,3 +279,106 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) {
t.Fatalf("took too long to terminate")
}
}

// TestAllocRunner_SaveRestoreState_TerminalAlloc checks that an AllocRunner
// restored from the saved state of a terminal allocation keeps the state file
// and alloc directory on disk while it runs, and removes them only after
// Destroy is called on the restored runner.
func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
	ctestutil.ExecCompatible(t)
	upd, ar := testAllocRunner(false)

	// Ensure task takes some time
	task := ar.alloc.Job.TaskGroups[0].Tasks[0]
	task.Config["command"] = "/bin/sleep"
	task.Config["args"] = []string{"10"}
	go ar.Run()

	// Wait until the alloc is reported as running before sending the
	// terminal update, so the update is applied to a live allocation.
	testutil.WaitForResult(func() (bool, error) {
		if upd.Count == 0 {
			return false, fmt.Errorf("No updates")
		}
		last := upd.Allocs[upd.Count-1]
		if last.ClientStatus != structs.AllocClientStatusRunning {
			return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusRunning)
		}
		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})

	// Update the alloc to be terminal which should cause the alloc runner to
	// stop the tasks and wait for a destroy.
	update := ar.alloc.Copy()
	update.DesiredStatus = structs.AllocDesiredStatusStop
	ar.Update(update)

	testutil.WaitForResult(func() (bool, error) {
		return ar.alloc.DesiredStatus == structs.AllocDesiredStatusStop, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})

	err := ar.SaveState()
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Ensure both alloc runners don't destroy
	ar.destroy = true

	// Create a new alloc runner
	consulClient, err := NewConsulService(&consulServiceConfig{ar.logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{}})
	if err != nil {
		// Fail here rather than letting RestoreState overwrite the error.
		t.Fatalf("err: %v", err)
	}
	ar2 := NewAllocRunner(ar.logger, ar.config, upd.Update,
		&structs.Allocation{ID: ar.alloc.ID}, consulClient)
	err = ar2.RestoreState()
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	go ar2.Run()

	testutil.WaitForResult(func() (bool, error) {
		// Check the state still exists
		if _, err := os.Stat(ar.stateFilePath()); err != nil {
			return false, fmt.Errorf("state file destroyed: %v", err)
		}

		// Check the alloc directory still exists
		if _, err := os.Stat(ar.ctx.AllocDir.AllocDir); err != nil {
			return false, fmt.Errorf("alloc dir destroyed: %v", ar.ctx.AllocDir.AllocDir)
		}

		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.alloc.TaskStates)
	})

	// Send the destroy signal and ensure the AllocRunner cleans up.
	ar2.Destroy()

	testutil.WaitForResult(func() (bool, error) {
		if upd.Count == 0 {
			// Return a real error so a timeout does not report "err: <nil>".
			return false, fmt.Errorf("No updates")
		}

		// Check the status has changed.
		last := upd.Allocs[upd.Count-1]
		if last.ClientStatus != structs.AllocClientStatusDead {
			return false, fmt.Errorf("got client status %v; want %v", last.ClientStatus, structs.AllocClientStatusDead)
		}

		// Check the state was cleaned
		if _, err := os.Stat(ar.stateFilePath()); err == nil {
			return false, fmt.Errorf("state file still exists: %v", ar.stateFilePath())
		} else if !os.IsNotExist(err) {
			return false, fmt.Errorf("stat err: %v", err)
		}

		// Check the alloc directory was cleaned
		if _, err := os.Stat(ar.ctx.AllocDir.AllocDir); err == nil {
			return false, fmt.Errorf("alloc dir still exists: %v", ar.ctx.AllocDir.AllocDir)
		} else if !os.IsNotExist(err) {
			return false, fmt.Errorf("stat err: %v", err)
		}

		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})
}
6 changes: 3 additions & 3 deletions client/allocdir/alloc_dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type AllocDir struct {
TaskDirs map[string]string

// A list of locations the shared alloc has been mounted to.
mounted []string
Mounted []string
}

// AllocFileInfo holds information about a file inside the AllocDir
Expand Down Expand Up @@ -64,7 +64,7 @@ func NewAllocDir(allocDir string) *AllocDir {
// Tears down previously built directory structure.
func (d *AllocDir) Destroy() error {
// Unmount all mounted shared alloc dirs.
for _, m := range d.mounted {
for _, m := range d.Mounted {
if err := d.unmountSharedDir(m); err != nil {
return fmt.Errorf("Failed to unmount shared directory: %v", err)
}
Expand Down Expand Up @@ -233,7 +233,7 @@ func (d *AllocDir) MountSharedDir(task string) error {
return fmt.Errorf("Failed to mount shared directory for task %v: %v", task, err)
}

d.mounted = append(d.mounted, taskLoc)
d.Mounted = append(d.Mounted, taskLoc)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion testutil/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type testFn func() (bool, error)
type errorFn func(error)

func WaitForResult(test testFn, error errorFn) {
WaitForResultRetries(1000*TestMultiplier(), test, error)
WaitForResultRetries(2000*TestMultiplier(), test, error)
}

func WaitForResultRetries(retries int64, test testFn, error errorFn) {
Expand Down

0 comments on commit 6711525

Please sign in to comment.