Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client handles updates to KillTimeout and Restart Policy #751

Merged
merged 3 commits into from
Feb 4, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 10 additions & 19 deletions client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,11 @@ type AllocRunner struct {

dirtyCh chan struct{}

ctx *driver.ExecContext
tasks map[string]*TaskRunner
taskStates map[string]*structs.TaskState
restored map[string]struct{}
RestartPolicy *structs.RestartPolicy
taskLock sync.RWMutex
ctx *driver.ExecContext
tasks map[string]*TaskRunner
taskStates map[string]*structs.TaskState
restored map[string]struct{}
taskLock sync.RWMutex

taskStatusLock sync.RWMutex

Expand All @@ -62,7 +61,6 @@ type allocRunnerState struct {
Alloc *structs.Allocation
AllocClientStatus string
AllocClientDescription string
RestartPolicy *structs.RestartPolicy
TaskStates map[string]*structs.TaskState
Context *driver.ExecContext
}
Expand Down Expand Up @@ -102,7 +100,6 @@ func (r *AllocRunner) RestoreState() error {

// Restore fields
r.alloc = snap.Alloc
r.RestartPolicy = snap.RestartPolicy
r.ctx = snap.Context
r.allocClientStatus = snap.AllocClientStatus
r.allocClientDescription = snap.AllocClientDescription
Expand All @@ -115,9 +112,8 @@ func (r *AllocRunner) RestoreState() error {
r.restored[name] = struct{}{}

task := &structs.Task{Name: name}
restartTracker := newRestartTracker(r.RestartPolicy, r.alloc.Job.Type)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx,
r.alloc, task, restartTracker, r.consulService)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, r.alloc,
task, r.consulService)
r.tasks[name] = tr

// Skip tasks in terminal states.
Expand Down Expand Up @@ -163,7 +159,6 @@ func (r *AllocRunner) saveAllocRunnerState() error {
defer r.allocLock.Unlock()
snap := allocRunnerState{
Alloc: r.alloc,
RestartPolicy: r.RestartPolicy,
Context: r.ctx,
AllocClientStatus: r.allocClientStatus,
AllocClientDescription: r.allocClientDescription,
Expand Down Expand Up @@ -347,9 +342,6 @@ func (r *AllocRunner) Run() {
return
}

// Extract the RestartPolicy from the TG and set it on the alloc
r.RestartPolicy = tg.RestartPolicy

// Create the execution context
if r.ctx == nil {
allocDir := allocdir.NewAllocDir(filepath.Join(r.config.AllocDir, r.alloc.ID))
Expand All @@ -370,9 +362,8 @@ func (r *AllocRunner) Run() {

// Merge in the task resources
task.Resources = alloc.TaskResources[task.Name]
restartTracker := newRestartTracker(r.RestartPolicy, r.alloc.Job.Type)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx,
r.alloc, task, restartTracker, r.consulService)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, r.alloc,
task, r.consulService)
r.tasks[task.Name] = tr
go tr.Run()
}
Expand Down Expand Up @@ -413,7 +404,7 @@ OUTER:
break FOUND
}
}
tr.Update(task)
tr.Update(update)
}
r.taskLock.RUnlock()

Expand Down
8 changes: 6 additions & 2 deletions client/alloc_runner_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package client

import (
"fmt"
"os"
"testing"
"time"
Expand Down Expand Up @@ -49,10 +50,13 @@ func TestAllocRunner_SimpleRun(t *testing.T) {

testutil.WaitForResult(func() (bool, error) {
if upd.Count == 0 {
return false, nil
return false, fmt.Errorf("No updates")
}
last := upd.Allocs[upd.Count-1]
return last.ClientStatus == structs.AllocClientStatusDead, nil
if last.ClientStatus == structs.AllocClientStatusDead {
return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusDead)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
Expand Down
3 changes: 3 additions & 0 deletions client/driver/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,9 @@ func (h *DockerHandle) WaitCh() chan *cstructs.WaitResult {
}

func (h *DockerHandle) Update(task *structs.Task) error {
// Store the updated kill timeout.
h.killTimeout = task.KillTimeout

// Update is not possible
return nil
}
Expand Down
3 changes: 2 additions & 1 deletion client/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ type DriverHandle interface {
// WaitCh is used to return a channel used wait for task completion
WaitCh() chan *cstructs.WaitResult

// Update is used to update the task if possible
// Update is used to update the task if possible and update task related
// configurations.
Update(task *structs.Task) error

// Kill is used to stop the task
Expand Down
3 changes: 3 additions & 0 deletions client/driver/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ func (h *execHandle) WaitCh() chan *cstructs.WaitResult {
}

func (h *execHandle) Update(task *structs.Task) error {
// Store the updated kill timeout.
h.killTimeout = task.KillTimeout

// Update is not possible
return nil
}
Expand Down
3 changes: 3 additions & 0 deletions client/driver/java.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,9 @@ func (h *javaHandle) WaitCh() chan *cstructs.WaitResult {
}

func (h *javaHandle) Update(task *structs.Task) error {
// Store the updated kill timeout.
h.killTimeout = task.KillTimeout

// Update is not possible
return nil
}
Expand Down
3 changes: 3 additions & 0 deletions client/driver/qemu.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,9 @@ func (h *qemuHandle) WaitCh() chan *cstructs.WaitResult {
}

func (h *qemuHandle) Update(task *structs.Task) error {
// Store the updated kill timeout.
h.killTimeout = task.KillTimeout

// Update is not possible
return nil
}
Expand Down
3 changes: 3 additions & 0 deletions client/driver/raw_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ func (h *rawExecHandle) WaitCh() chan *cstructs.WaitResult {
}

func (h *rawExecHandle) Update(task *structs.Task) error {
// Store the updated kill timeout.
h.killTimeout = task.KillTimeout

// Update is not possible
return nil
}
Expand Down
3 changes: 3 additions & 0 deletions client/driver/rkt.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,9 @@ func (h *rktHandle) WaitCh() chan *cstructs.WaitResult {
}

func (h *rktHandle) Update(task *structs.Task) error {
// Store the updated kill timeout.
h.killTimeout = task.KillTimeout

// Update is not possible
return nil
}
Expand Down
14 changes: 14 additions & 0 deletions client/restarts.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package client

import (
"math/rand"
"sync"
"time"

"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -29,9 +30,22 @@ type RestartTracker struct {
startTime time.Time // When the interval began
policy *structs.RestartPolicy
rand *rand.Rand
lock sync.Mutex
}

// SetPolicy updates the policy used to determine restarts.
func (r *RestartTracker) SetPolicy(policy *structs.RestartPolicy) {
r.lock.Lock()
defer r.lock.Unlock()
r.policy = policy
}

// NextRestart takes the exit code from the last attempt and returns whether the
// task should be restarted and the duration to wait.
func (r *RestartTracker) NextRestart(exitCode int) (bool, time.Duration) {
r.lock.Lock()
defer r.lock.Unlock()

// Hot path if no attempts are expected
if r.policy.Attempts == 0 {
return false, 0
Expand Down
67 changes: 58 additions & 9 deletions client/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type TaskRunner struct {
consulService *ConsulService

task *structs.Task
updateCh chan *structs.Task
updateCh chan *structs.Allocation
handle driver.DriverHandle

destroy bool
Expand All @@ -52,7 +52,15 @@ type TaskStateUpdater func(taskName, state string, event *structs.TaskEvent)
func NewTaskRunner(logger *log.Logger, config *config.Config,
updater TaskStateUpdater, ctx *driver.ExecContext,
alloc *structs.Allocation, task *structs.Task,
restartTracker *RestartTracker, consulService *ConsulService) *TaskRunner {
consulService *ConsulService) *TaskRunner {

// Build the restart tracker.
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if tg == nil {
logger.Printf("[ERR] client: alloc '%s' for missing task group '%s'", alloc.ID, alloc.TaskGroup)
return nil
}
restartTracker := newRestartTracker(tg.RestartPolicy, alloc.Job.Type)

tc := &TaskRunner{
config: config,
Expand All @@ -63,7 +71,7 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
ctx: ctx,
alloc: alloc,
task: task,
updateCh: make(chan *structs.Task, 8),
updateCh: make(chan *structs.Allocation, 8),
destroyCh: make(chan struct{}),
waitCh: make(chan struct{}),
}
Expand Down Expand Up @@ -231,10 +239,8 @@ func (r *TaskRunner) run() {
case waitRes = <-r.handle.WaitCh():
break OUTER
case update := <-r.updateCh:
// Update
r.task = update
if err := r.handle.Update(update); err != nil {
r.logger.Printf("[ERR] client: failed to update task '%s' for alloc '%s': %v", r.task.Name, r.alloc.ID, err)
if err := r.handleUpdate(update); err != nil {
r.logger.Printf("[ERR] client: update to task %q failed: %v", r.task.Name, err)
}
case <-r.destroyCh:
// Avoid destroying twice
Expand Down Expand Up @@ -303,6 +309,49 @@ func (r *TaskRunner) run() {
return
}

// handleUpdate takes an updated allocation and updates internal state to
// reflect the new config for the task.
func (r *TaskRunner) handleUpdate(update *structs.Allocation) error {
// Extract the task group from the alloc.
tg := update.Job.LookupTaskGroup(update.TaskGroup)
if tg == nil {
return fmt.Errorf("alloc '%s' missing task group '%s'", update.ID, update.TaskGroup)
}

// Extract the task.
var task *structs.Task
for _, t := range tg.Tasks {
if t.Name == r.task.Name {
task = t
}
}
if task == nil {
return fmt.Errorf("task group %q doesn't contain task %q", tg.Name, r.task.Name)
}
r.task = task

// Update will update resources and store the new kill timeout.
if r.handle != nil {
if err := r.handle.Update(task); err != nil {
r.logger.Printf("[ERR] client: failed to update task '%s' for alloc '%s': %v", r.task.Name, r.alloc.ID, err)
}
}

// Update the restart policy.
if r.restartTracker != nil {
r.restartTracker.SetPolicy(tg.RestartPolicy)
}

/* TODO
// Re-register the task to consul and store the updated alloc.
r.consulService.Deregister(r.task, r.alloc)
r.alloc = update
r.consulService.Register(r.task, r.alloc)
*/

return nil
}

// Helper function for converting a WaitResult into a TaskTerminated event.
func (r *TaskRunner) waitErrorToEvent(res *cstructs.WaitResult) *structs.TaskEvent {
return structs.NewTaskEvent(structs.TaskTerminated).
Expand All @@ -312,12 +361,12 @@ func (r *TaskRunner) waitErrorToEvent(res *cstructs.WaitResult) *structs.TaskEve
}

// Update is used to update the task of the context
func (r *TaskRunner) Update(update *structs.Task) {
func (r *TaskRunner) Update(update *structs.Allocation) {
select {
case r.updateCh <- update:
default:
r.logger.Printf("[ERR] client: dropping task update '%s' (alloc '%s')",
update.Name, r.alloc.ID)
r.task.Name, r.alloc.ID)
}
}

Expand Down
Loading