More restart policy options and consolidate batch/service restart tracker #594

Merged
10 commits merged on Dec 18, 2015
1 change: 0 additions & 1 deletion api/compose_test.go
@@ -69,7 +69,6 @@ func TestCompose(t *testing.T) {
Operand: "=",
},
},
RestartPolicy: NewRestartPolicy(),
Tasks: []*Task{
&Task{
Name: "task1",
22 changes: 7 additions & 15 deletions api/tasks.go
@@ -7,17 +7,11 @@ import (
// RestartPolicy defines how the Nomad client restarts
// tasks in a taskgroup when they fail
type RestartPolicy struct {
Interval time.Duration
Attempts int
Delay time.Duration
}

func NewRestartPolicy() *RestartPolicy {
return &RestartPolicy{
Attempts: 10,
Interval: 3 * time.Minute,
Delay: 5 * time.Second,
}
Interval time.Duration
Attempts int
Delay time.Duration
RestartOnSuccess bool
Mode string
}

// The ServiceCheck data model represents the consul health check that
@@ -54,11 +48,9 @@ type TaskGroup struct {

// NewTaskGroup creates a new TaskGroup.
func NewTaskGroup(name string, count int) *TaskGroup {
restartPolicy := NewRestartPolicy()
return &TaskGroup{
Name: name,
Count: count,
RestartPolicy: restartPolicy,
Name: name,
Count: count,
}
}

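Since NewRestartPolicy and the defaulting in NewTaskGroup are gone, api callers now attach a policy explicitly. A minimal sketch of what that could look like; the Mode string and the example values are assumptions, with "delay" assumed to mirror the structs.RestartPolicyModeDelay constant used on the server side:

package main

import (
	"time"

	"github.com/hashicorp/nomad/api"
)

func main() {
	// NewTaskGroup no longer sets a default policy, so the caller supplies one.
	grp := api.NewTaskGroup("cache", 3)
	grp.RestartPolicy = &api.RestartPolicy{
		Interval:         5 * time.Minute,
		Attempts:         10,
		Delay:            25 * time.Second,
		RestartOnSuccess: true,
		Mode:             "delay", // assumed to match structs.RestartPolicyModeDelay
	}
}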
5 changes: 2 additions & 3 deletions api/tasks_test.go
@@ -8,9 +8,8 @@ import (
func TestTaskGroup_NewTaskGroup(t *testing.T) {
grp := NewTaskGroup("grp1", 2)
expect := &TaskGroup{
Name: "grp1",
Count: 2,
RestartPolicy: NewRestartPolicy(),
Name: "grp1",
Count: 2,
}
if !reflect.DeepEqual(grp, expect) {
t.Fatalf("expect: %#v, got: %#v", expect, grp)
4 changes: 2 additions & 2 deletions client/alloc_runner.go
@@ -110,7 +110,7 @@ func (r *AllocRunner) RestoreState() error {
r.restored[name] = struct{}{}

task := &structs.Task{Name: name}
restartTracker := newRestartTracker(r.alloc.Job.Type, r.RestartPolicy)
restartTracker := newRestartTracker(r.RestartPolicy)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx,
r.alloc, task, r.alloc.TaskStates[task.Name], restartTracker,
r.consulService)
@@ -322,7 +322,7 @@ func (r *AllocRunner) Run() {

// Merge in the task resources
task.Resources = alloc.TaskResources[task.Name]
restartTracker := newRestartTracker(r.alloc.Job.Type, r.RestartPolicy)
restartTracker := newRestartTracker(r.RestartPolicy)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx,
r.alloc, task, r.alloc.TaskStates[task.Name], restartTracker,
r.consulService)
110 changes: 46 additions & 64 deletions client/restarts.go
@@ -1,89 +1,71 @@
package client

import (
"math/rand"
"time"

"github.com/hashicorp/nomad/nomad/structs"
)

// The errorCounter keeps track of the number of times a process has exited
// It returns the duration after which a task is restarted
// For Batch jobs, the interval is set to the zero value since the tasks
// will be restarted only up to maxAttempts times
type restartTracker interface {
nextRestart(exitCode int) (bool, time.Duration)
}
// jitter is the percent of jitter added to restart delays.
const jitter = 0.25

func newRestartTracker(jobType string, restartPolicy *structs.RestartPolicy) restartTracker {
switch jobType {
case structs.JobTypeService:
return &serviceRestartTracker{
maxAttempts: restartPolicy.Attempts,
startTime: time.Now(),
interval: restartPolicy.Interval,
delay: restartPolicy.Delay,
}
default:
return &batchRestartTracker{
maxAttempts: restartPolicy.Attempts,
delay: restartPolicy.Delay,
}
func newRestartTracker(policy *structs.RestartPolicy) *RestartTracker {
return &RestartTracker{
startTime: time.Now(),
policy: policy,
rand: rand.New(rand.NewSource(time.Now().Unix())),
}
}

// noRestartsTracker returns a RestartTracker that never restarts.
func noRestartsTracker() restartTracker {
return &batchRestartTracker{maxAttempts: 0}
type RestartTracker struct {
count int // Current number of attempts.
startTime time.Time // When the interval began
policy *structs.RestartPolicy
rand *rand.Rand
}

type batchRestartTracker struct {
maxAttempts int
delay time.Duration

count int
}
func (r *RestartTracker) NextRestart(exitCode int) (bool, time.Duration) {
// Check if we have entered a new interval.
end := r.startTime.Add(r.policy.Interval)
now := time.Now()
if now.After(end) {
r.count = 0
r.startTime = now
return true, r.jitter()
}

func (b *batchRestartTracker) increment() {
b.count += 1
}
r.count++

func (b *batchRestartTracker) nextRestart(exitCode int) (bool, time.Duration) {
if b.count < b.maxAttempts && exitCode > 0 {
b.increment()
return true, b.delay
// If we are under the attempts, restart with delay.
if r.count <= r.policy.Attempts {
return r.shouldRestart(exitCode), r.jitter()
}
return false, 0
}

type serviceRestartTracker struct {
maxAttempts int
delay time.Duration
interval time.Duration
// Don't restart since mode is "fail"
if r.policy.Mode == structs.RestartPolicyModeFail {
return false, 0
}

count int
startTime time.Time
// Apply an artificial wait to enter the next interval
return r.shouldRestart(exitCode), end.Sub(now)
}

func (s *serviceRestartTracker) increment() {
s.count += 1
// shouldRestart returns whether a restart should occur based on the exit code
// and the RestartOnSuccess configuration.
func (r *RestartTracker) shouldRestart(exitCode int) bool {
return exitCode != 0 || r.policy.RestartOnSuccess
}

func (s *serviceRestartTracker) nextRestart(exitCode int) (bool, time.Duration) {
defer s.increment()
windowEndTime := s.startTime.Add(s.interval)
now := time.Now()
// If the window of restart is over we wait until the delay duration
if now.After(windowEndTime) {
s.count = 0
s.startTime = time.Now()
return true, s.delay
}

// If we are within the delay duration and didn't exhaust all retries
if s.count < s.maxAttempts {
return true, s.delay
}
// jitter returns the delay time plus a jitter.
func (r *RestartTracker) jitter() time.Duration {
d := r.policy.Delay.Nanoseconds()
j := float64(r.rand.Int63n(d)) * jitter
return time.Duration(d + int64(j))
}

// If we exhausted all the retries and are within the time window
return true, windowEndTime.Sub(now)
// Returns a tracker that never restarts.
func noRestartsTracker() *RestartTracker {
policy := &structs.RestartPolicy{Attempts: 0, Mode: structs.RestartPolicyModeFail}
return newRestartTracker(policy)
}
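A rough usage sketch of the consolidated tracker (not part of the PR), written as if it sat in the client package next to restarts.go since newRestartTracker is unexported; the exit code and policy values are illustrative. With Delay = 1s and jitter = 0.25, each returned wait should fall roughly in the range [1s, 1.25s):

package client

import (
	"fmt"
	"testing"
	"time"

	"github.com/hashicorp/nomad/nomad/structs"
)

func TestRestartTrackerSketch(t *testing.T) {
	policy := &structs.RestartPolicy{
		Attempts:         3,
		Interval:         2 * time.Minute,
		Delay:            1 * time.Second,
		RestartOnSuccess: true,
		Mode:             structs.RestartPolicyModeFail,
	}
	rt := newRestartTracker(policy)

	// Exit code 137 stands in for a crashed task; the first three calls stay
	// inside the interval and under the attempt limit, so each returns true
	// plus a jittered delay.
	for i := 0; i < policy.Attempts; i++ {
		restart, wait := rt.NextRestart(137)
		fmt.Printf("attempt %d: restart=%v wait=%v\n", i+1, restart, wait)
	}

	// The attempts for this interval are used up; in "fail" mode the next
	// call reports that the task should not be restarted at all.
	if restart, _ := rt.NextRestart(137); restart {
		t.Fatalf("expected no further restarts in fail mode")
	}
}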
99 changes: 51 additions & 48 deletions client/restarts_test.go
@@ -1,78 +1,81 @@
package client

import (
"github.com/hashicorp/nomad/nomad/structs"
"testing"
"time"

"github.com/hashicorp/nomad/nomad/structs"
)

func TestTaskRunner_ServiceRestartCounter(t *testing.T) {
interval := 2 * time.Minute
delay := 1 * time.Second
attempts := 3
rt := newRestartTracker(structs.JobTypeService, &structs.RestartPolicy{Attempts: attempts, Interval: interval, Delay: delay})
func testPolicy(success bool, mode string) *structs.RestartPolicy {
return &structs.RestartPolicy{
Interval: 2 * time.Minute,
Delay: 1 * time.Second,
Attempts: 3,
Mode: mode,
RestartOnSuccess: success,
}
}

// withinJitter is a helper that returns whether the returned delay is within
// the jitter.
func withinJitter(expected, actual time.Duration) bool {
return float64((actual.Nanoseconds()-expected.Nanoseconds())/
expected.Nanoseconds()) <= jitter
}

for i := 0; i < attempts; i++ {
actual, when := rt.nextRestart(127)
func TestClient_RestartTracker_ModeDelay(t *testing.T) {
t.Parallel()
p := testPolicy(true, structs.RestartPolicyModeDelay)
rt := newRestartTracker(p)
for i := 0; i < p.Attempts; i++ {
actual, when := rt.NextRestart(127)
if !actual {
t.Fatalf("should restart returned %v, actual %v", actual, true)
t.Fatalf("NextRestart() returned %v, want %v", actual, true)
}
if when != delay {
t.Fatalf("nextRestart() returned %v; want %v", when, delay)
if !withinJitter(p.Delay, when) {
t.Fatalf("NextRestart() returned %v; want %v+jitter", when, p.Delay)
}
}

time.Sleep(1 * time.Second)
// Follow up restarts should cause delay.
for i := 0; i < 3; i++ {
actual, when := rt.nextRestart(127)
actual, when := rt.NextRestart(127)
if !actual {
t.Fail()
}
if !(when > delay && when < interval) {
t.Fatalf("nextRestart() returned %v; want less than %v and more than %v", when, interval, delay)
if !(when > p.Delay && when < p.Interval) {
t.Fatalf("NextRestart() returned %v; want less than %v and more than %v", when, p.Interval, p.Delay)
}
}

}

func TestTaskRunner_BatchRestartCounter(t *testing.T) {
attempts := 2
interval := 1 * time.Second
delay := 1 * time.Second
rt := newRestartTracker(structs.JobTypeBatch,
&structs.RestartPolicy{Attempts: attempts,
Interval: interval,
Delay: delay,
},
)
for i := 0; i < attempts; i++ {
shouldRestart, when := rt.nextRestart(127)
if !shouldRestart {
t.Fatalf("should restart returned %v, actual %v", shouldRestart, true)
func TestClient_RestartTracker_ModeFail(t *testing.T) {
t.Parallel()
p := testPolicy(true, structs.RestartPolicyModeFail)
rt := newRestartTracker(p)
for i := 0; i < p.Attempts; i++ {
actual, when := rt.NextRestart(127)
if !actual {
t.Fatalf("NextRestart() returned %v, want %v", actual, true)
}
if when != delay {
t.Fatalf("Delay should be %v, actual: %v", delay, when)
if !withinJitter(p.Delay, when) {
t.Fatalf("NextRestart() returned %v; want %v+jitter", when, p.Delay)
}
}
actual, _ := rt.nextRestart(1)
if actual {
t.Fatalf("Expect %v, Actual: %v", false, actual)

// Next restart should cause fail
if actual, _ := rt.NextRestart(127); actual {
t.Fail()
}
}

func TestTaskRunner_BatchRestartOnSuccess(t *testing.T) {
attempts := 2
interval := 1 * time.Second
delay := 1 * time.Second
rt := newRestartTracker(structs.JobTypeBatch,
&structs.RestartPolicy{Attempts: attempts,
Interval: interval,
Delay: delay,
},
)
shouldRestart, _ := rt.nextRestart(0)
if shouldRestart {
t.Fatalf("should restart returned %v, expected: %v", shouldRestart, false)
func TestClient_RestartTracker_NoRestartOnSuccess(t *testing.T) {
t.Parallel()
p := testPolicy(false, structs.RestartPolicyModeDelay)
rt := newRestartTracker(p)
if shouldRestart, _ := rt.NextRestart(0); shouldRestart {
t.Fatalf("NextRestart() returned %v, expected: %v", shouldRestart, false)
}

}
6 changes: 3 additions & 3 deletions client/task_runner.go
@@ -24,7 +24,7 @@ type TaskRunner struct {
logger *log.Logger
ctx *driver.ExecContext
alloc *structs.Allocation
restartTracker restartTracker
restartTracker *RestartTracker
consulService *ConsulService

task *structs.Task
@@ -53,7 +53,7 @@ type TaskStateUpdater func(taskName string)
func NewTaskRunner(logger *log.Logger, config *config.Config,
updater TaskStateUpdater, ctx *driver.ExecContext,
alloc *structs.Allocation, task *structs.Task, state *structs.TaskState,
restartTracker restartTracker, consulService *ConsulService) *TaskRunner {
restartTracker *RestartTracker, consulService *ConsulService) *TaskRunner {

tc := &TaskRunner{
config: config,
@@ -280,7 +280,7 @@ func (r *TaskRunner) run() {
}

// Check if we should restart. If not mark task as dead and exit.
shouldRestart, when := r.restartTracker.nextRestart(waitRes.ExitCode)
shouldRestart, when := r.restartTracker.NextRestart(waitRes.ExitCode)
waitEvent := r.waitErrorToEvent(waitRes)
if !shouldRestart {
r.logger.Printf("[INFO] client: Not restarting task: %v for alloc: %v ", r.task.Name, r.alloc.ID)
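The hunk above cuts off before the restart branch. A hedged sketch of how a runner could consume the (shouldRestart, when) pair from NextRestart; the helper name, signature, and log wording are illustrative and not copied from the PR, and it is assumed to live in the client package where RestartTracker is visible:

package client

import (
	"log"
	"time"
)

// handleWaitResult is an illustrative helper (not in the PR) showing how a
// runner could act on the tracker's answer after a task exits.
func handleWaitResult(logger *log.Logger, rt *RestartTracker, taskName, allocID string, exitCode int) bool {
	shouldRestart, when := rt.NextRestart(exitCode)
	if !shouldRestart {
		logger.Printf("[INFO] client: Not restarting task: %v for alloc: %v", taskName, allocID)
		return false
	}

	// The returned duration already includes jitter (or the wait until the
	// next interval), so the caller only needs to sleep and re-run the task.
	logger.Printf("[INFO] client: Restarting task %v for alloc %v in %v", taskName, allocID, when)
	time.Sleep(when)
	return true
}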
2 changes: 1 addition & 1 deletion client/task_runner_test.go
@@ -42,7 +42,7 @@ func testTaskRunner(restarts bool) (*MockTaskStateUpdater, *TaskRunner) {

ctx := driver.NewExecContext(allocDir, alloc.ID)
rp := structs.NewRestartPolicy(structs.JobTypeService)
restartTracker := newRestartTracker(structs.JobTypeService, rp)
restartTracker := newRestartTracker(rp)
if !restarts {
restartTracker = noRestartsTracker()
}