Merge pull request #594 from hashicorp/f-restart-policy
More restart policy options and consolidate batch/service restart tracker
dadgar committed Dec 18, 2015
2 parents c72bbb7 + 659e822 commit e410b92
Showing 21 changed files with 267 additions and 228 deletions.
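
For orientation, a minimal sketch (not part of this commit) that prints the per-job-type restart defaults. It assumes only what the diffs below show: structs.NewRestartPolicy takes a job type, as called in client/task_runner_test.go, and structs.JobTypeService and structs.JobTypeBatch exist; the concrete default values are not shown on this page.

package main

import (
	"fmt"

	"github.com/hashicorp/nomad/nomad/structs"
)

func main() {
	// NewRestartPolicy is keyed by job type; the service and batch defaults
	// presumably differ, which is what lets the two per-type trackers below
	// be consolidated into a single policy-driven one.
	fmt.Printf("service: %+v\n", structs.NewRestartPolicy(structs.JobTypeService))
	fmt.Printf("batch:   %+v\n", structs.NewRestartPolicy(structs.JobTypeBatch))
}
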
1 change: 0 additions & 1 deletion api/compose_test.go
@@ -69,7 +69,6 @@ func TestCompose(t *testing.T) {
Operand: "=",
},
},
RestartPolicy: NewRestartPolicy(),
Tasks: []*Task{
&Task{
Name: "task1",
22 changes: 7 additions & 15 deletions api/tasks.go
@@ -7,17 +7,11 @@ import (
// RestartPolicy defines how the Nomad client restarts
// tasks in a taskgroup when they fail
type RestartPolicy struct {
Interval time.Duration
Attempts int
Delay time.Duration
}

func NewRestartPolicy() *RestartPolicy {
return &RestartPolicy{
Attempts: 10,
Interval: 3 * time.Minute,
Delay: 5 * time.Second,
}
Interval time.Duration
Attempts int
Delay time.Duration
RestartOnSuccess bool
Mode string
}

// The ServiceCheck data model represents the consul health check that
@@ -54,11 +48,9 @@ type TaskGroup struct {

// NewTaskGroup creates a new TaskGroup.
func NewTaskGroup(name string, count int) *TaskGroup {
restartPolicy := NewRestartPolicy()
return &TaskGroup{
Name: name,
Count: count,
RestartPolicy: restartPolicy,
Name: name,
Count: count,
}
}

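
Since NewTaskGroup no longer attaches a default RestartPolicy, API callers now set one themselves. A minimal sketch, assuming TaskGroup still carries a *RestartPolicy field (as the removed constructor lines imply) and that the mode string matches the structs.RestartPolicyMode* constants referenced in client/restarts.go:

package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/nomad/api"
)

func main() {
	tg := api.NewTaskGroup("cache", 1)
	// RestartOnSuccess and Mode are the new knobs added by this commit;
	// "delay" is assumed to match structs.RestartPolicyModeDelay.
	tg.RestartPolicy = &api.RestartPolicy{
		Interval:         3 * time.Minute,
		Attempts:         10,
		Delay:            5 * time.Second,
		RestartOnSuccess: true,
		Mode:             "delay",
	}
	fmt.Printf("%+v\n", tg.RestartPolicy)
}
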
5 changes: 2 additions & 3 deletions api/tasks_test.go
@@ -8,9 +8,8 @@ import (
func TestTaskGroup_NewTaskGroup(t *testing.T) {
grp := NewTaskGroup("grp1", 2)
expect := &TaskGroup{
Name: "grp1",
Count: 2,
RestartPolicy: NewRestartPolicy(),
Name: "grp1",
Count: 2,
}
if !reflect.DeepEqual(grp, expect) {
t.Fatalf("expect: %#v, got: %#v", expect, grp)
4 changes: 2 additions & 2 deletions client/alloc_runner.go
@@ -110,7 +110,7 @@ func (r *AllocRunner) RestoreState() error {
r.restored[name] = struct{}{}

task := &structs.Task{Name: name}
restartTracker := newRestartTracker(r.alloc.Job.Type, r.RestartPolicy)
restartTracker := newRestartTracker(r.RestartPolicy)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx,
r.alloc, task, r.alloc.TaskStates[task.Name], restartTracker,
r.consulService)
@@ -322,7 +322,7 @@ func (r *AllocRunner) Run() {

// Merge in the task resources
task.Resources = alloc.TaskResources[task.Name]
restartTracker := newRestartTracker(r.alloc.Job.Type, r.RestartPolicy)
restartTracker := newRestartTracker(r.RestartPolicy)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx,
r.alloc, task, r.alloc.TaskStates[task.Name], restartTracker,
r.consulService)
110 changes: 46 additions & 64 deletions client/restarts.go
@@ -1,89 +1,71 @@
package client

import (
"math/rand"
"time"

"github.com/hashicorp/nomad/nomad/structs"
)

// The errorCounter keeps track of the number of times a process has exited
// It returns the duration after which a task is restarted
// For batch jobs, the interval is set to the zero value since the tasks
// will be restarted only up to maxAttempts times
type restartTracker interface {
nextRestart(exitCode int) (bool, time.Duration)
}
// jitter is the percent of jitter added to restart delays.
const jitter = 0.25

func newRestartTracker(jobType string, restartPolicy *structs.RestartPolicy) restartTracker {
switch jobType {
case structs.JobTypeService:
return &serviceRestartTracker{
maxAttempts: restartPolicy.Attempts,
startTime: time.Now(),
interval: restartPolicy.Interval,
delay: restartPolicy.Delay,
}
default:
return &batchRestartTracker{
maxAttempts: restartPolicy.Attempts,
delay: restartPolicy.Delay,
}
func newRestartTracker(policy *structs.RestartPolicy) *RestartTracker {
return &RestartTracker{
startTime: time.Now(),
policy: policy,
rand: rand.New(rand.NewSource(time.Now().Unix())),
}
}

// noRestartsTracker returns a RestartTracker that never restarts.
func noRestartsTracker() restartTracker {
return &batchRestartTracker{maxAttempts: 0}
type RestartTracker struct {
count int // Current number of attempts.
startTime time.Time // When the interval began
policy *structs.RestartPolicy
rand *rand.Rand
}

type batchRestartTracker struct {
maxAttempts int
delay time.Duration

count int
}
func (r *RestartTracker) NextRestart(exitCode int) (bool, time.Duration) {
// Check if we have entered a new interval.
end := r.startTime.Add(r.policy.Interval)
now := time.Now()
if now.After(end) {
r.count = 0
r.startTime = now
return true, r.jitter()
}

func (b *batchRestartTracker) increment() {
b.count += 1
}
r.count++

func (b *batchRestartTracker) nextRestart(exitCode int) (bool, time.Duration) {
if b.count < b.maxAttempts && exitCode > 0 {
b.increment()
return true, b.delay
// If we are under the attempts, restart with delay.
if r.count <= r.policy.Attempts {
return r.shouldRestart(exitCode), r.jitter()
}
return false, 0
}

type serviceRestartTracker struct {
maxAttempts int
delay time.Duration
interval time.Duration
// Don't restart since mode is "fail"
if r.policy.Mode == structs.RestartPolicyModeFail {
return false, 0
}

count int
startTime time.Time
// Apply an artificial wait to enter the next interval
return r.shouldRestart(exitCode), end.Sub(now)
}

func (s *serviceRestartTracker) increment() {
s.count += 1
// shouldRestart returns whether a restart should occur based on the exit code
// and the RestartOnSuccess configuration.
func (r *RestartTracker) shouldRestart(exitCode int) bool {
return exitCode != 0 || r.policy.RestartOnSuccess
}

func (s *serviceRestartTracker) nextRestart(exitCode int) (bool, time.Duration) {
defer s.increment()
windowEndTime := s.startTime.Add(s.interval)
now := time.Now()
// If the window of restart is over we wait until the delay duration
if now.After(windowEndTime) {
s.count = 0
s.startTime = time.Now()
return true, s.delay
}

// If we are within the delay duration and didn't exhaust all retries
if s.count < s.maxAttempts {
return true, s.delay
}
// jitter returns the delay time plus a jitter.
func (r *RestartTracker) jitter() time.Duration {
d := r.policy.Delay.Nanoseconds()
j := float64(r.rand.Int63n(d)) * jitter
return time.Duration(d + int64(j))
}

// If we exhausted all the retries and are within the time window
return true, windowEndTime.Sub(now)
// Returns a tracker that never restarts.
func noRestartsTracker() *RestartTracker {
policy := &structs.RestartPolicy{Attempts: 0, Mode: structs.RestartPolicyModeFail}
return newRestartTracker(policy)
}
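
To make the 25% jitter bound concrete, here is a standalone restatement of the arithmetic in the jitter method above; jitterDelay is a hypothetical helper, not part of the client package:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// jitterDelay mirrors RestartTracker.jitter: take the configured delay and
// add a random amount of up to 25% of it, so the result falls in
// [delay, 1.25*delay).
func jitterDelay(delay time.Duration, r *rand.Rand) time.Duration {
	d := delay.Nanoseconds()
	j := float64(r.Int63n(d)) * 0.25
	return time.Duration(d + int64(j))
}

func main() {
	r := rand.New(rand.NewSource(time.Now().Unix()))
	for i := 0; i < 3; i++ {
		fmt.Println(jitterDelay(5*time.Second, r)) // 5s up to just under 6.25s
	}
}
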
99 changes: 51 additions & 48 deletions client/restarts_test.go
@@ -1,78 +1,81 @@
package client

import (
"github.com/hashicorp/nomad/nomad/structs"
"testing"
"time"

"github.com/hashicorp/nomad/nomad/structs"
)

func TestTaskRunner_ServiceRestartCounter(t *testing.T) {
interval := 2 * time.Minute
delay := 1 * time.Second
attempts := 3
rt := newRestartTracker(structs.JobTypeService, &structs.RestartPolicy{Attempts: attempts, Interval: interval, Delay: delay})
func testPolicy(success bool, mode string) *structs.RestartPolicy {
return &structs.RestartPolicy{
Interval: 2 * time.Minute,
Delay: 1 * time.Second,
Attempts: 3,
Mode: mode,
RestartOnSuccess: success,
}
}

// withinJitter is a helper that returns whether the returned delay is within
// the jitter.
func withinJitter(expected, actual time.Duration) bool {
return float64((actual.Nanoseconds()-expected.Nanoseconds())/
expected.Nanoseconds()) <= jitter
}

for i := 0; i < attempts; i++ {
actual, when := rt.nextRestart(127)
func TestClient_RestartTracker_ModeDelay(t *testing.T) {
t.Parallel()
p := testPolicy(true, structs.RestartPolicyModeDelay)
rt := newRestartTracker(p)
for i := 0; i < p.Attempts; i++ {
actual, when := rt.NextRestart(127)
if !actual {
t.Fatalf("should restart returned %v, actual %v", actual, true)
t.Fatalf("NextRestart() returned %v, want %v", actual, true)
}
if when != delay {
t.Fatalf("nextRestart() returned %v; want %v", when, delay)
if !withinJitter(p.Delay, when) {
t.Fatalf("NextRestart() returned %v; want %v+jitter", when, p.Delay)
}
}

time.Sleep(1 * time.Second)
// Follow up restarts should cause delay.
for i := 0; i < 3; i++ {
actual, when := rt.nextRestart(127)
actual, when := rt.NextRestart(127)
if !actual {
t.Fail()
}
if !(when > delay && when < interval) {
t.Fatalf("nextRestart() returned %v; want less than %v and more than %v", when, interval, delay)
if !(when > p.Delay && when < p.Interval) {
t.Fatalf("NextRestart() returned %v; want less than %v and more than %v", when, p.Interval, p.Delay)
}
}

}

func TestTaskRunner_BatchRestartCounter(t *testing.T) {
attempts := 2
interval := 1 * time.Second
delay := 1 * time.Second
rt := newRestartTracker(structs.JobTypeBatch,
&structs.RestartPolicy{Attempts: attempts,
Interval: interval,
Delay: delay,
},
)
for i := 0; i < attempts; i++ {
shouldRestart, when := rt.nextRestart(127)
if !shouldRestart {
t.Fatalf("should restart returned %v, actual %v", shouldRestart, true)
func TestClient_RestartTracker_ModeFail(t *testing.T) {
t.Parallel()
p := testPolicy(true, structs.RestartPolicyModeFail)
rt := newRestartTracker(p)
for i := 0; i < p.Attempts; i++ {
actual, when := rt.NextRestart(127)
if !actual {
t.Fatalf("NextRestart() returned %v, want %v", actual, true)
}
if when != delay {
t.Fatalf("Delay should be %v, actual: %v", delay, when)
if !withinJitter(p.Delay, when) {
t.Fatalf("NextRestart() returned %v; want %v+jitter", when, p.Delay)
}
}
actual, _ := rt.nextRestart(1)
if actual {
t.Fatalf("Expect %v, Actual: %v", false, actual)

// The next restart should not be allowed: attempts are exhausted and mode is "fail"
if actual, _ := rt.NextRestart(127); actual {
t.Fail()
}
}

func TestTaskRunner_BatchRestartOnSuccess(t *testing.T) {
attempts := 2
interval := 1 * time.Second
delay := 1 * time.Second
rt := newRestartTracker(structs.JobTypeBatch,
&structs.RestartPolicy{Attempts: attempts,
Interval: interval,
Delay: delay,
},
)
shouldRestart, _ := rt.nextRestart(0)
if shouldRestart {
t.Fatalf("should restart returned %v, expected: %v", shouldRestart, false)
func TestClient_RestartTracker_NoRestartOnSuccess(t *testing.T) {
t.Parallel()
p := testPolicy(false, structs.RestartPolicyModeDelay)
rt := newRestartTracker(p)
if shouldRestart, _ := rt.NextRestart(0); shouldRestart {
t.Fatalf("NextRestart() returned %v, expected: %v", shouldRestart, false)
}

}
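
The three tests above cover the delay mode, the fail mode, and the RestartOnSuccess flag. The exit-code half of that decision reduces to the one-line predicate below, a paraphrase of shouldRestart rather than the code under test:

package main

import "fmt"

// wouldRestart paraphrases RestartTracker.shouldRestart: restart whenever the
// task failed, and on success only when RestartOnSuccess is set.
func wouldRestart(exitCode int, restartOnSuccess bool) bool {
	return exitCode != 0 || restartOnSuccess
}

func main() {
	fmt.Println(wouldRestart(127, false)) // true: task failed
	fmt.Println(wouldRestart(0, true))    // true: restart even on success
	fmt.Println(wouldRestart(0, false))   // false: successful one-shot exit
}
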
6 changes: 3 additions & 3 deletions client/task_runner.go
@@ -24,7 +24,7 @@ type TaskRunner struct {
logger *log.Logger
ctx *driver.ExecContext
alloc *structs.Allocation
restartTracker restartTracker
restartTracker *RestartTracker
consulService *ConsulService

task *structs.Task
@@ -53,7 +53,7 @@ type TaskStateUpdater func(taskName string)
func NewTaskRunner(logger *log.Logger, config *config.Config,
updater TaskStateUpdater, ctx *driver.ExecContext,
alloc *structs.Allocation, task *structs.Task, state *structs.TaskState,
restartTracker restartTracker, consulService *ConsulService) *TaskRunner {
restartTracker *RestartTracker, consulService *ConsulService) *TaskRunner {

tc := &TaskRunner{
config: config,
@@ -280,7 +280,7 @@ func (r *TaskRunner) run() {
}

// Check if we should restart. If not mark task as dead and exit.
shouldRestart, when := r.restartTracker.nextRestart(waitRes.ExitCode)
shouldRestart, when := r.restartTracker.NextRestart(waitRes.ExitCode)
waitEvent := r.waitErrorToEvent(waitRes)
if !shouldRestart {
r.logger.Printf("[INFO] client: Not restarting task: %v for alloc: %v ", r.task.Name, r.alloc.ID)
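
The TaskRunner excerpt above is the only consumer of NextRestart. Stripped of state updates and events, the contract looks roughly like the hypothetical helper below (it would have to live in the client package, since newRestartTracker is unexported); runOnce stands in for whatever starts the task and blocks until it exits:

package client

import (
	"time"

	"github.com/hashicorp/nomad/nomad/structs"
)

// restartLoop is an illustrative sketch, not part of this commit: run the
// task, ask the tracker whether to restart and how long to wait, and stop
// once it says no.
func restartLoop(policy *structs.RestartPolicy, runOnce func() int) {
	tracker := newRestartTracker(policy)
	for {
		exitCode := runOnce()
		shouldRestart, when := tracker.NextRestart(exitCode)
		if !shouldRestart {
			return
		}
		time.Sleep(when)
	}
}
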
2 changes: 1 addition & 1 deletion client/task_runner_test.go
@@ -42,7 +42,7 @@ func testTaskRunner(restarts bool) (*MockTaskStateUpdater, *TaskRunner) {

ctx := driver.NewExecContext(allocDir, alloc.ID)
rp := structs.NewRestartPolicy(structs.JobTypeService)
restartTracker := newRestartTracker(structs.JobTypeService, rp)
restartTracker := newRestartTracker(rp)
if !restarts {
restartTracker = noRestartsTracker()
}