Merge pull request #751 from hashicorp/b-client-updates
Client handles updates to KillTimeout and Restart Policy
dadgar committed Feb 4, 2016
2 parents 099f5ad + cc90670 commit cef6bff
Showing 12 changed files with 145 additions and 43 deletions.
29 changes: 10 additions & 19 deletions client/alloc_runner.go
@@ -40,12 +40,11 @@ type AllocRunner struct {

dirtyCh chan struct{}

ctx *driver.ExecContext
tasks map[string]*TaskRunner
taskStates map[string]*structs.TaskState
restored map[string]struct{}
RestartPolicy *structs.RestartPolicy
taskLock sync.RWMutex
ctx *driver.ExecContext
tasks map[string]*TaskRunner
taskStates map[string]*structs.TaskState
restored map[string]struct{}
taskLock sync.RWMutex

taskStatusLock sync.RWMutex

@@ -62,7 +61,6 @@ type allocRunnerState struct {
Alloc *structs.Allocation
AllocClientStatus string
AllocClientDescription string
RestartPolicy *structs.RestartPolicy
TaskStates map[string]*structs.TaskState
Context *driver.ExecContext
}
@@ -102,7 +100,6 @@ func (r *AllocRunner) RestoreState() error {

// Restore fields
r.alloc = snap.Alloc
r.RestartPolicy = snap.RestartPolicy
r.ctx = snap.Context
r.allocClientStatus = snap.AllocClientStatus
r.allocClientDescription = snap.AllocClientDescription
@@ -115,9 +112,8 @@ func (r *AllocRunner) RestoreState() error {
r.restored[name] = struct{}{}

task := &structs.Task{Name: name}
restartTracker := newRestartTracker(r.RestartPolicy, r.alloc.Job.Type)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx,
r.alloc, task, restartTracker, r.consulService)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, r.alloc,
task, r.consulService)
r.tasks[name] = tr

// Skip tasks in terminal states.
@@ -163,7 +159,6 @@ func (r *AllocRunner) saveAllocRunnerState() error {
defer r.allocLock.Unlock()
snap := allocRunnerState{
Alloc: r.alloc,
RestartPolicy: r.RestartPolicy,
Context: r.ctx,
AllocClientStatus: r.allocClientStatus,
AllocClientDescription: r.allocClientDescription,
@@ -347,9 +342,6 @@ func (r *AllocRunner) Run() {
return
}

// Extract the RestartPolicy from the TG and set it on the alloc
r.RestartPolicy = tg.RestartPolicy

// Create the execution context
if r.ctx == nil {
allocDir := allocdir.NewAllocDir(filepath.Join(r.config.AllocDir, r.alloc.ID))
@@ -370,9 +362,8 @@

// Merge in the task resources
task.Resources = alloc.TaskResources[task.Name]
restartTracker := newRestartTracker(r.RestartPolicy, r.alloc.Job.Type)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx,
r.alloc, task, restartTracker, r.consulService)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, r.alloc,
task, r.consulService)
r.tasks[task.Name] = tr
go tr.Run()
}
@@ -413,7 +404,7 @@ OUTER:
break FOUND
}
}
tr.Update(task)
tr.Update(update)
}
r.taskLock.RUnlock()

8 changes: 6 additions & 2 deletions client/alloc_runner_test.go
@@ -1,6 +1,7 @@
package client

import (
"fmt"
"os"
"testing"
"time"
@@ -49,10 +50,13 @@ func TestAllocRunner_SimpleRun(t *testing.T) {

testutil.WaitForResult(func() (bool, error) {
if upd.Count == 0 {
return false, nil
return false, fmt.Errorf("No updates")
}
last := upd.Allocs[upd.Count-1]
return last.ClientStatus == structs.AllocClientStatusDead, nil
if last.ClientStatus != structs.AllocClientStatusDead {
return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusDead)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
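A note on the test change above: testutil.WaitForResult retries the check function until it returns true and, on timeout, hands the last error to the failure callback, so returning a descriptive error instead of nil is what makes a timed-out run explain itself. A minimal, self-contained sketch of that retry pattern — the helper below is illustrative, not Nomad's actual testutil implementation:

```go
package main

import (
	"fmt"
	"time"
)

// waitForResult is a tiny stand-in for testutil.WaitForResult: it retries the
// check until it reports true or the attempts run out, then passes the last
// error to the failure callback.
func waitForResult(check func() (bool, error), onFail func(error)) {
	var err error
	for i := 0; i < 50; i++ {
		var ok bool
		if ok, err = check(); ok {
			return
		}
		time.Sleep(10 * time.Millisecond)
	}
	onFail(err)
}

func main() {
	count := 0
	waitForResult(func() (bool, error) {
		count++
		if count < 3 {
			return false, fmt.Errorf("not ready yet (attempt %d)", count)
		}
		return true, nil
	}, func(err error) {
		fmt.Println("failed:", err)
	})
	fmt.Println("succeeded after", count, "attempts")
}
```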
3 changes: 3 additions & 0 deletions client/driver/docker.go
@@ -614,6 +614,9 @@ func (h *DockerHandle) WaitCh() chan *cstructs.WaitResult {
}

func (h *DockerHandle) Update(task *structs.Task) error {
// Store the updated kill timeout.
h.killTimeout = task.KillTimeout

// Update is not possible
return nil
}
3 changes: 2 additions & 1 deletion client/driver/driver.go
@@ -105,7 +105,8 @@ type DriverHandle interface {
// WaitCh is used to return a channel used wait for task completion
WaitCh() chan *cstructs.WaitResult

// Update is used to update the task if possible
// Update is used to update the task if possible and update task related
// configurations.
Update(task *structs.Task) error

// Kill is used to stop the task
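For context on the driver handle changes in this commit (docker above; exec, java, qemu, raw_exec, and rkt below): every handle's Update now stores the task's new KillTimeout, even though live updates of the running process still aren't supported. A stripped-down sketch of that contract, using illustrative local types rather than Nomad's structs and drivers:

```go
package main

import (
	"fmt"
	"time"
)

// Task carries only the field the handles act on here.
type Task struct {
	Name        string
	KillTimeout time.Duration
}

// fakeHandle stands in for a driver handle such as DockerHandle or execHandle.
type fakeHandle struct {
	killTimeout time.Duration
}

// Update stores the updated kill timeout; everything else about the running
// task is left alone because in-place updates are not supported.
func (h *fakeHandle) Update(task *Task) error {
	h.killTimeout = task.KillTimeout
	return nil
}

func main() {
	h := &fakeHandle{killTimeout: 5 * time.Second}
	_ = h.Update(&Task{Name: "web", KillTimeout: 30 * time.Second})
	fmt.Println(h.killTimeout) // 30s
}
```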
3 changes: 3 additions & 0 deletions client/driver/exec.go
@@ -173,6 +173,9 @@ func (h *execHandle) WaitCh() chan *cstructs.WaitResult {
}

func (h *execHandle) Update(task *structs.Task) error {
// Store the updated kill timeout.
h.killTimeout = task.KillTimeout

// Update is not possible
return nil
}
3 changes: 3 additions & 0 deletions client/driver/java.go
@@ -221,6 +221,9 @@ func (h *javaHandle) WaitCh() chan *cstructs.WaitResult {
}

func (h *javaHandle) Update(task *structs.Task) error {
// Store the updated kill timeout.
h.killTimeout = task.KillTimeout

// Update is not possible
return nil
}
3 changes: 3 additions & 0 deletions client/driver/qemu.go
@@ -262,6 +262,9 @@ func (h *qemuHandle) WaitCh() chan *cstructs.WaitResult {
}

func (h *qemuHandle) Update(task *structs.Task) error {
// Store the updated kill timeout.
h.killTimeout = task.KillTimeout

// Update is not possible
return nil
}
3 changes: 3 additions & 0 deletions client/driver/raw_exec.go
@@ -170,6 +170,9 @@ func (h *rawExecHandle) WaitCh() chan *cstructs.WaitResult {
}

func (h *rawExecHandle) Update(task *structs.Task) error {
// Store the updated kill timeout.
h.killTimeout = task.KillTimeout

// Update is not possible
return nil
}
3 changes: 3 additions & 0 deletions client/driver/rkt.go
@@ -278,6 +278,9 @@ func (h *rktHandle) WaitCh() chan *cstructs.WaitResult {
}

func (h *rktHandle) Update(task *structs.Task) error {
// Store the updated kill timeout.
h.killTimeout = task.KillTimeout

// Update is not possible
return nil
}
14 changes: 14 additions & 0 deletions client/restarts.go
@@ -2,6 +2,7 @@ package client

import (
"math/rand"
"sync"
"time"

"github.com/hashicorp/nomad/nomad/structs"
@@ -29,9 +30,22 @@ type RestartTracker struct {
startTime time.Time // When the interval began
policy *structs.RestartPolicy
rand *rand.Rand
lock sync.Mutex
}

// SetPolicy updates the policy used to determine restarts.
func (r *RestartTracker) SetPolicy(policy *structs.RestartPolicy) {
r.lock.Lock()
defer r.lock.Unlock()
r.policy = policy
}

// NextRestart takes the exit code from the last attempt and returns whether the
// task should be restarted and the duration to wait.
func (r *RestartTracker) NextRestart(exitCode int) (bool, time.Duration) {
r.lock.Lock()
defer r.lock.Unlock()

// Hot path if no attempts are expected
if r.policy.Attempts == 0 {
return false, 0
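The new lock matters because the allocation-update path can now call SetPolicy from one goroutine while the task runner's restart loop is calling NextRestart from another. A self-contained sketch of the locking pattern — simplified types, and the real NextRestart also tracks intervals, exit codes, and jittered delays:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// policy is a stand-in for structs.RestartPolicy.
type policy struct {
	Attempts int
	Delay    time.Duration
}

type tracker struct {
	lock   sync.Mutex
	policy *policy
	count  int
}

// SetPolicy swaps the policy under the lock so a concurrent NextRestart
// always sees a consistent value.
func (t *tracker) SetPolicy(p *policy) {
	t.lock.Lock()
	defer t.lock.Unlock()
	t.policy = p
}

// NextRestart reports whether another restart is allowed and how long to wait.
func (t *tracker) NextRestart() (bool, time.Duration) {
	t.lock.Lock()
	defer t.lock.Unlock()
	if t.count >= t.policy.Attempts {
		return false, 0
	}
	t.count++
	return true, t.policy.Delay
}

func main() {
	t := &tracker{policy: &policy{Attempts: 2, Delay: time.Second}}
	t.SetPolicy(&policy{Attempts: 5, Delay: 2 * time.Second}) // e.g. after a job update
	ok, delay := t.NextRestart()
	fmt.Println(ok, delay) // true 2s
}
```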
67 changes: 58 additions & 9 deletions client/task_runner.go
@@ -28,7 +28,7 @@ type TaskRunner struct {
consulService *ConsulService

task *structs.Task
updateCh chan *structs.Task
updateCh chan *structs.Allocation
handle driver.DriverHandle

destroy bool
@@ -52,7 +52,15 @@ type TaskStateUpdater func(taskName, state string, event *structs.TaskEvent)
func NewTaskRunner(logger *log.Logger, config *config.Config,
updater TaskStateUpdater, ctx *driver.ExecContext,
alloc *structs.Allocation, task *structs.Task,
restartTracker *RestartTracker, consulService *ConsulService) *TaskRunner {
consulService *ConsulService) *TaskRunner {

// Build the restart tracker.
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if tg == nil {
logger.Printf("[ERR] client: alloc '%s' for missing task group '%s'", alloc.ID, alloc.TaskGroup)
return nil
}
restartTracker := newRestartTracker(tg.RestartPolicy, alloc.Job.Type)

tc := &TaskRunner{
config: config,
@@ -63,7 +71,7 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
ctx: ctx,
alloc: alloc,
task: task,
updateCh: make(chan *structs.Task, 8),
updateCh: make(chan *structs.Allocation, 8),
destroyCh: make(chan struct{}),
waitCh: make(chan struct{}),
}
@@ -231,10 +239,8 @@ func (r *TaskRunner) run() {
case waitRes = <-r.handle.WaitCh():
break OUTER
case update := <-r.updateCh:
// Update
r.task = update
if err := r.handle.Update(update); err != nil {
r.logger.Printf("[ERR] client: failed to update task '%s' for alloc '%s': %v", r.task.Name, r.alloc.ID, err)
if err := r.handleUpdate(update); err != nil {
r.logger.Printf("[ERR] client: update to task %q failed: %v", r.task.Name, err)
}
case <-r.destroyCh:
// Avoid destroying twice
@@ -303,6 +309,49 @@ func (r *TaskRunner) run() {
return
}

// handleUpdate takes an updated allocation and updates internal state to
// reflect the new config for the task.
func (r *TaskRunner) handleUpdate(update *structs.Allocation) error {
// Extract the task group from the alloc.
tg := update.Job.LookupTaskGroup(update.TaskGroup)
if tg == nil {
return fmt.Errorf("alloc '%s' missing task group '%s'", update.ID, update.TaskGroup)
}

// Extract the task.
var task *structs.Task
for _, t := range tg.Tasks {
if t.Name == r.task.Name {
task = t
}
}
if task == nil {
return fmt.Errorf("task group %q doesn't contain task %q", tg.Name, r.task.Name)
}
r.task = task

// Update will update resources and store the new kill timeout.
if r.handle != nil {
if err := r.handle.Update(task); err != nil {
r.logger.Printf("[ERR] client: failed to update task '%s' for alloc '%s': %v", r.task.Name, r.alloc.ID, err)
}
}

// Update the restart policy.
if r.restartTracker != nil {
r.restartTracker.SetPolicy(tg.RestartPolicy)
}

/* TODO
// Re-register the task to consul and store the updated alloc.
r.consulService.Deregister(r.task, r.alloc)
r.alloc = update
r.consulService.Register(r.task, r.alloc)
*/

return nil
}

// Helper function for converting a WaitResult into a TaskTerminated event.
func (r *TaskRunner) waitErrorToEvent(res *cstructs.WaitResult) *structs.TaskEvent {
return structs.NewTaskEvent(structs.TaskTerminated).
@@ -312,12 +361,12 @@ func (r *TaskRunner) waitErrorToEvent(res *cstructs.WaitResult) *structs.TaskEvent {
}

// Update is used to update the task of the context
func (r *TaskRunner) Update(update *structs.Task) {
func (r *TaskRunner) Update(update *structs.Allocation) {
select {
case r.updateCh <- update:
default:
r.logger.Printf("[ERR] client: dropping task update '%s' (alloc '%s')",
update.Name, r.alloc.ID)
r.task.Name, r.alloc.ID)
}
}

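Update keeps its non-blocking send, just with the whole *structs.Allocation on the channel: the buffer holds up to 8 pending updates (per NewTaskRunner above), and anything beyond that is dropped and logged rather than stalling the alloc runner. A self-contained sketch of that pattern with illustrative types:

```go
package main

import "fmt"

// allocUpdate stands in for *structs.Allocation in the real code.
type allocUpdate struct{ id string }

// push mirrors TaskRunner.Update: a non-blocking send that drops the update
// and logs when the buffered channel is already full.
func push(ch chan *allocUpdate, u *allocUpdate) {
	select {
	case ch <- u:
	default:
		fmt.Printf("dropping update %s: channel full\n", u.id)
	}
}

func main() {
	ch := make(chan *allocUpdate, 8) // same capacity as updateCh in the diff
	for i := 0; i < 10; i++ {
		push(ch, &allocUpdate{id: fmt.Sprintf("alloc-%d", i)})
	}
	fmt.Println("buffered:", len(ch)) // 8; the last two were dropped
}
```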
