Merge pull request #859 from hashicorp/f-driver-start-restarts
client: Driver starting is included in restart policy.
dadgar committed Feb 29, 2016
2 parents 559b787 + 343fddb commit 7007409
Showing 11 changed files with 308 additions and 153 deletions.
4 changes: 4 additions & 0 deletions api/tasks.go
@@ -148,9 +148,12 @@ type TaskState struct {

const (
TaskDriverFailure = "Driver Failure"
TaskReceived = "Received"
TaskStarted = "Started"
TaskTerminated = "Terminated"
TaskKilled = "Killed"
TaskRestarting = "Restarting"
TaskNotRestarting = "Restarts Exceeded"
)

// TaskEvent is an event that affects the state of a task and contains meta-data
@@ -163,4 +166,5 @@ type TaskEvent struct {
Signal int
Message string
KillError string
StartDelay int64
}
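
For illustration, a hedged sketch of how a client of this API might emit one of the new restart events: the TaskRestarting constant, Message and StartDelay come from the hunks above, while the Type field and the surrounding program are assumed for the example rather than shown in this diff.

package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/nomad/api"
)

func main() {
	// Sketch only: a restart event carrying the new StartDelay field,
	// expressed in nanoseconds until the next start attempt.
	ev := &api.TaskEvent{
		Type:       api.TaskRestarting,
		Message:    "task exited non-zero; restarting",
		StartDelay: int64(15 * time.Second),
	}
	fmt.Printf("%s in %s\n", ev.Type, time.Duration(ev.StartDelay))
}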
3 changes: 1 addition & 2 deletions client/alloc_runner.go
@@ -245,8 +245,7 @@ func (r *AllocRunner) Alloc() *structs.Allocation {
case structs.TaskStatePending:
pending = true
case structs.TaskStateDead:
last := len(state.Events) - 1
if state.Events[last].Type == structs.TaskDriverFailure {
if state.Failed() {
failed = true
} else {
dead = true
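
The Failed() helper used in this hunk is added elsewhere in the PR and its body is not shown on this page; the following standalone sketch is only a plausible reconstruction of what such a helper might check, using local stand-ins for the nomad/structs types.

package main

import "fmt"

// Local stand-ins for nomad/structs, for illustration only.
const (
	TaskStateDead     = "dead"
	TaskDriverFailure = "Driver Failure"
	TaskNotRestarting = "Restarts Exceeded"
)

type TaskEvent struct{ Type string }

type TaskState struct {
	State  string
	Events []*TaskEvent
}

// Failed is a guess at the new helper: a dead task counts as failed when its
// last event signals a driver failure or exhausted restarts.
func (s *TaskState) Failed() bool {
	n := len(s.Events)
	if s.State != TaskStateDead || n == 0 {
		return false
	}
	t := s.Events[n-1].Type
	return t == TaskDriverFailure || t == TaskNotRestarting
}

func main() {
	ts := &TaskState{State: TaskStateDead, Events: []*TaskEvent{{Type: TaskNotRestarting}}}
	fmt.Println(ts.Failed()) // true
}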
42 changes: 18 additions & 24 deletions client/driver/docker.go
@@ -8,6 +8,7 @@ import (
"os"
"os/exec"
"path/filepath"
"regexp"
"strconv"
"strings"
"sync"
@@ -209,32 +210,9 @@ func (d *DockerDriver) createContainer(ctx *ExecContext, task *structs.Task,

hostConfig := &docker.HostConfig{
// Convert MB to bytes. This is an absolute value.
//
// This value represents the total amount of memory a process can use.
// Swap is added to total memory and is managed by the OS, not docker.
// Since this may cause other processes to swap and cause system
// instability, we will simply not use swap.
//
// See: https://www.kernel.org/doc/Documentation/cgroups/memory.txt
Memory: int64(task.Resources.MemoryMB) * 1024 * 1024,
MemorySwap: -1,
// Convert Mhz to shares. This is a relative value.
//
// There are two types of CPU limiters available: Shares and Quotas. A
// Share allows a particular process to have a proportion of CPU time
// relative to other processes; 1024 by default. A CPU Quota is enforced
// over a Period of time and is a HARD limit on the amount of CPU time a
// process can use. Processes with quotas cannot burst, while processes
// with shares can, so we'll use shares.
//
// The simplest scale is 1 share to 1 MHz so 1024 = 1GHz. This means any
// given process will have at least that amount of resources, but likely
// more since it is (probably) rare that the machine will run at 100%
// CPU. This scale will cease to work if a node is overprovisioned.
//
// See:
// - https://www.kernel.org/doc/Documentation/scheduler/sched-bwc.txt
// - https://www.kernel.org/doc/Documentation/scheduler/sched-design-CFS.txt
CPUShares: int64(task.Resources.CPU),

// Binds are used to mount a host volume into the container. We mount a
@@ -403,6 +381,22 @@ func (d *DockerDriver) createContainer(ctx *ExecContext, task *structs.Task,
}, nil
}

var (
// imageNotFoundMatcher is a regular expression that matches the image not
// found error Docker returns.
imageNotFoundMatcher = regexp.MustCompile(`Error: image .+ not found`)
)

// recoverablePullError wraps the error received when trying to pull an image
// and marks whether it is recoverable.
func (d *DockerDriver) recoverablePullError(err error, image string) error {
recoverable := true
if imageNotFoundMatcher.MatchString(err.Error()) {
recoverable = false
}
return cstructs.NewRecoverableError(fmt.Errorf("Failed to pull `%s`: %s", image, err), recoverable)
}

func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
var driverConfig DockerDriverConfig
if err := mapstructure.WeakDecode(task.Config, &driverConfig); err != nil {
@@ -482,7 +476,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
err = client.PullImage(pullOptions, authOptions)
if err != nil {
d.logger.Printf("[ERR] driver.docker: failed pulling container %s:%s: %s", repo, tag, err)
return nil, fmt.Errorf("Failed to pull `%s`: %s", image, err)
return nil, d.recoverablePullError(err, image)
}
d.logger.Printf("[DEBUG] driver.docker: docker pull %s:%s succeeded", repo, tag)

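
As a standalone illustration of the classification added above (not driver code): a pull failure that matches Docker's "image ... not found" message is treated as permanent, while any other pull error is wrapped as recoverable so the restart policy can retry it.

package main

import (
	"errors"
	"fmt"
	"regexp"
)

// Same pattern as imageNotFoundMatcher in docker.go.
var imageNotFound = regexp.MustCompile(`Error: image .+ not found`)

// recoverable mirrors the intent of recoverablePullError: a missing image
// will not appear on retry, so restarting for it is pointless.
func recoverable(err error) bool {
	return !imageNotFound.MatchString(err.Error())
}

func main() {
	fmt.Println(recoverable(errors.New("Error: image redis:9.9 not found"))) // false
	fmt.Println(recoverable(errors.New("net/http: TLS handshake timeout")))  // true
}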
3 changes: 1 addition & 2 deletions client/driver/exec.go
@@ -277,8 +277,7 @@ func (h *execHandle) run() {
h.logger.Printf("[ERR] driver.exec: unmounting dev,proc and alloc dirs failed: %v", e)
}
}
h.waitCh <- &cstructs.WaitResult{ExitCode: ps.ExitCode, Signal: 0,
Err: err}
h.waitCh <- cstructs.NewWaitResult(ps.ExitCode, 0, err)
close(h.waitCh)
h.pluginClient.Kill()
}
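
NewWaitResult above and the Successful helper used later in client/restarts.go are not shown in this diff; the sketch below is a plausible shape for that type, with field names taken from the struct-literal form this hunk replaces.

package main

import "fmt"

// WaitResult here only mirrors the fields visible in this diff (ExitCode,
// Signal, Err); the real definition lives in client/driver/structs and may
// carry more.
type WaitResult struct {
	ExitCode int
	Signal   int
	Err      error
}

func NewWaitResult(code, signal int, err error) *WaitResult {
	return &WaitResult{ExitCode: code, Signal: signal, Err: err}
}

// Successful plausibly means a clean exit: code zero, no signal, no error.
func (r *WaitResult) Successful() bool {
	return r.ExitCode == 0 && r.Signal == 0 && r.Err == nil
}

func main() {
	fmt.Println(NewWaitResult(0, 0, nil).Successful())   // true
	fmt.Println(NewWaitResult(127, 0, nil).Successful()) // false
}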
21 changes: 21 additions & 0 deletions client/driver/structs/structs.go
@@ -2,6 +2,7 @@ package structs

import (
"fmt"

cgroupConfig "github.com/opencontainers/runc/libcontainer/configs"
)

@@ -34,3 +35,23 @@ func (r *WaitResult) String() string {
type IsolationConfig struct {
Cgroup *cgroupConfig.Cgroup
}

// RecoverableError wraps an error and marks whether it is recoverable and could
// be retried or it is fatal.
type RecoverableError struct {
Err error
Recoverable bool
}

// NewRecoverableError is used to wrap an error and mark it as recoverable or
// not.
func NewRecoverableError(e error, recoverable bool) *RecoverableError {
return &RecoverableError{
Err: e,
Recoverable: recoverable,
}
}

func (r *RecoverableError) Error() string {
return r.Err.Error()
}
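
A short usage sketch for RecoverableError, matching how handleStartError in client/restarts.go later unwraps it; the shouldRetry helper is illustrative and not part of the PR.

package main

import (
	"errors"
	"fmt"

	cstructs "github.com/hashicorp/nomad/client/driver/structs"
)

// shouldRetry reports whether an error was explicitly wrapped as recoverable;
// plain errors and non-recoverable wrappers both mean "do not retry".
func shouldRetry(err error) bool {
	rerr, ok := err.(*cstructs.RecoverableError)
	return ok && rerr.Recoverable
}

func main() {
	fmt.Println(shouldRetry(cstructs.NewRecoverableError(errors.New("pull timed out"), true)))   // true
	fmt.Println(shouldRetry(cstructs.NewRecoverableError(errors.New("image not found"), false))) // false
	fmt.Println(shouldRetry(errors.New("plain error")))                                          // false
}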
100 changes: 82 additions & 18 deletions client/restarts.go
@@ -5,6 +5,7 @@ import (
"sync"
"time"

cstructs "github.com/hashicorp/nomad/client/driver/structs"
"github.com/hashicorp/nomad/nomad/structs"
)

@@ -25,6 +26,8 @@ func newRestartTracker(policy *structs.RestartPolicy, jobType string) *RestartTracker {
}

type RestartTracker struct {
waitRes *cstructs.WaitResult
startErr error
count int // Current number of attempts.
onSuccess bool // Whether to restart on successful exit code.
startTime time.Time // When the interval began
@@ -40,46 +43,107 @@ func (r *RestartTracker) SetPolicy(policy *structs.RestartPolicy) {
r.policy = policy
}

// NextRestart takes the exit code from the last attempt and returns whether the
// task should be restarted and the duration to wait.
func (r *RestartTracker) NextRestart(exitCode int) (bool, time.Duration) {
// SetStartError is used to mark the most recent start error. If starting was
// successful the error should be nil.
func (r *RestartTracker) SetStartError(err error) *RestartTracker {
r.lock.Lock()
defer r.lock.Unlock()
r.startErr = err
return r
}

// SetWaitResult is used to mark the most recent wait result.
func (r *RestartTracker) SetWaitResult(res *cstructs.WaitResult) *RestartTracker {
r.lock.Lock()
defer r.lock.Unlock()
r.waitRes = res
return r
}

// GetState returns the task's next state given the set exit code and start
// error. One of the following states is returned:
// * TaskRestarting - Task should be restarted
// * TaskNotRestarting - Task should not be restarted and has exceeded its
// restart policy.
// * TaskTerminated - Task has terminated successfully and does not need a
// restart.
//
// If TaskRestarting is returned, the duration is how long to wait until
// starting the task again.
func (r *RestartTracker) GetState() (string, time.Duration) {
r.lock.Lock()
defer r.lock.Unlock()

// Hot path if no attempts are expected
if r.policy.Attempts == 0 {
return false, 0
if r.waitRes != nil && r.waitRes.Successful() {
return structs.TaskTerminated, 0
}

return structs.TaskNotRestarting, 0
}

r.count++

// Check if we have entered a new interval.
end := r.startTime.Add(r.policy.Interval)
now := time.Now()
if now.After(end) {
r.count = 0
r.startTime = now
return r.shouldRestart(exitCode), r.jitter()
}

r.count++
if r.startErr != nil {
return r.handleStartError()
} else if r.waitRes != nil {
return r.handleWaitResult()
} else {
return "", 0
}
}

// If we are under the attempts, restart with delay.
if r.count <= r.policy.Attempts {
return r.shouldRestart(exitCode), r.jitter()
// handleStartError returns the new state and potential wait duration for
// restarting the task after it was not successfully started. On start errors,
// the restart policy is always treated as fail mode to ensure we don't
// infinitely try to start a task.
func (r *RestartTracker) handleStartError() (string, time.Duration) {
// If the error is not recoverable, do not restart.
if rerr, ok := r.startErr.(*cstructs.RecoverableError); !(ok && rerr.Recoverable) {
return structs.TaskNotRestarting, 0
}

// Don't restart since mode is "fail"
if r.policy.Mode == structs.RestartPolicyModeFail {
return false, 0
if r.count > r.policy.Attempts {
return structs.TaskNotRestarting, 0
}

// Apply an artificial wait to enter the next interval
return r.shouldRestart(exitCode), end.Sub(now)
return structs.TaskRestarting, r.jitter()
}

// shouldRestart returns whether a restart should occur based on the exit code
// and job type.
func (r *RestartTracker) shouldRestart(exitCode int) bool {
return exitCode != 0 || r.onSuccess
// handleWaitResult returns the new state and potential wait duration for
// restarting the task after it has exited.
func (r *RestartTracker) handleWaitResult() (string, time.Duration) {
// If the task exited successfully and restart on success isn't specified,
// don't restart but don't mark as failed.
if r.waitRes.Successful() && !r.onSuccess {
return structs.TaskTerminated, 0
}

if r.count > r.policy.Attempts {
if r.policy.Mode == structs.RestartPolicyModeFail {
return structs.TaskNotRestarting, 0
} else {
return structs.TaskRestarting, r.getDelay()
}
}

return structs.TaskRestarting, r.jitter()
}

// getDelay returns the delay time to enter the next interval.
func (r *RestartTracker) getDelay() time.Duration {
end := r.startTime.Add(r.policy.Interval)
now := time.Now()
return end.Sub(now)
}

// jitter returns the delay time plus a jitter.
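
The task runner that drives this tracker is among the files not rendered on this page; judging only from the methods above, the calling pattern presumably chains a setter with GetState. The following self-contained sketch uses local stand-ins and models only the recoverable-start-error path, with a fixed delay in place of jitter.

package main

import (
	"errors"
	"fmt"
	"time"
)

// Stand-ins so the sketch compiles on its own; the real values are
// structs.TaskRestarting / structs.TaskNotRestarting and client.RestartTracker.
const (
	taskRestarting    = "Restarting"
	taskNotRestarting = "Restarts Exceeded"
)

type restartTracker struct {
	count, attempts int
	startErr        error
}

func (r *restartTracker) SetStartError(err error) *restartTracker {
	r.startErr = err
	return r
}

// GetState models the failing-start branch: each recoverable start error
// consumes one attempt, and once the budget is spent the task stops restarting.
func (r *restartTracker) GetState() (string, time.Duration) {
	r.count++
	if r.count > r.attempts {
		return taskNotRestarting, 0
	}
	return taskRestarting, 15 * time.Second
}

func main() {
	rt := &restartTracker{attempts: 2}
	for i := 0; i < 3; i++ {
		state, wait := rt.SetStartError(errors.New("image pull timed out")).GetState()
		fmt.Printf("attempt %d: %s (wait %s)\n", i+1, state, wait)
	}
}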
53 changes: 40 additions & 13 deletions client/restarts_test.go
@@ -1,9 +1,11 @@
package client

import (
"fmt"
"testing"
"time"

cstructs "github.com/hashicorp/nomad/client/driver/structs"
"github.com/hashicorp/nomad/nomad/structs"
)

@@ -23,14 +25,18 @@ func withinJitter(expected, actual time.Duration) bool {
expected.Nanoseconds()) <= jitter
}

func testWaitResult(exit int) *cstructs.WaitResult {
return cstructs.NewWaitResult(exit, 0, nil)
}

func TestClient_RestartTracker_ModeDelay(t *testing.T) {
t.Parallel()
p := testPolicy(true, structs.RestartPolicyModeDelay)
rt := newRestartTracker(p, structs.JobTypeService)
for i := 0; i < p.Attempts; i++ {
actual, when := rt.NextRestart(127)
if !actual {
t.Fatalf("NextRestart() returned %v, want %v", actual, true)
state, when := rt.SetWaitResult(testWaitResult(127)).GetState()
if state != structs.TaskRestarting {
t.Fatalf("NextRestart() returned %v, want %v", state, structs.TaskRestarting)
}
if !withinJitter(p.Delay, when) {
t.Fatalf("NextRestart() returned %v; want %v+jitter", when, p.Delay)
@@ -39,8 +45,8 @@ func TestClient_RestartTracker_ModeDelay(t *testing.T) {

// Follow up restarts should cause delay.
for i := 0; i < 3; i++ {
actual, when := rt.NextRestart(127)
if !actual {
state, when := rt.SetWaitResult(testWaitResult(127)).GetState()
if state != structs.TaskRestarting {
t.Fail()
}
if !(when > p.Delay && when <= p.Interval) {
@@ -54,27 +60,27 @@ func TestClient_RestartTracker_ModeFail(t *testing.T) {
p := testPolicy(true, structs.RestartPolicyModeFail)
rt := newRestartTracker(p, structs.JobTypeSystem)
for i := 0; i < p.Attempts; i++ {
actual, when := rt.NextRestart(127)
if !actual {
t.Fatalf("NextRestart() returned %v, want %v", actual, true)
state, when := rt.SetWaitResult(testWaitResult(127)).GetState()
if state != structs.TaskRestarting {
t.Fatalf("NextRestart() returned %v, want %v", state, structs.TaskRestarting)
}
if !withinJitter(p.Delay, when) {
t.Fatalf("NextRestart() returned %v; want %v+jitter", when, p.Delay)
}
}

// Next restart should cause fail
if actual, _ := rt.NextRestart(127); actual {
t.Fail()
if state, _ := rt.SetWaitResult(testWaitResult(127)).GetState(); state != structs.TaskNotRestarting {
t.Fatalf("NextRestart() returned %v; want %v", state, structs.TaskNotRestarting)
}
}

func TestClient_RestartTracker_NoRestartOnSuccess(t *testing.T) {
t.Parallel()
p := testPolicy(false, structs.RestartPolicyModeDelay)
rt := newRestartTracker(p, structs.JobTypeBatch)
if shouldRestart, _ := rt.NextRestart(0); shouldRestart {
t.Fatalf("NextRestart() returned %v, expected: %v", shouldRestart, false)
if state, _ := rt.SetWaitResult(testWaitResult(0)).GetState(); state != structs.TaskTerminated {
t.Fatalf("NextRestart() returned %v, expected: %v", state, structs.TaskTerminated)
}
}

@@ -83,7 +89,28 @@ func TestClient_RestartTracker_ZeroAttempts(t *testing.T) {
p := testPolicy(true, structs.RestartPolicyModeFail)
p.Attempts = 0
rt := newRestartTracker(p, structs.JobTypeService)
if actual, when := rt.NextRestart(1); actual {
if state, when := rt.SetWaitResult(testWaitResult(1)).GetState(); state != structs.TaskNotRestarting {
t.Fatalf("expect no restart, got restart/delay: %v", when)
}
}

func TestClient_RestartTracker_StartError_Recoverable(t *testing.T) {
t.Parallel()
p := testPolicy(true, structs.RestartPolicyModeDelay)
rt := newRestartTracker(p, structs.JobTypeSystem)
recErr := cstructs.NewRecoverableError(fmt.Errorf("foo"), true)
for i := 0; i < p.Attempts; i++ {
state, when := rt.SetStartError(recErr).GetState()
if state != structs.TaskRestarting {
t.Fatalf("NextRestart() returned %v, want %v", state, structs.TaskRestarting)
}
if !withinJitter(p.Delay, when) {
t.Fatalf("NextRestart() returned %v; want %v+jitter", when, p.Delay)
}
}

// Next restart should cause fail
if state, _ := rt.SetStartError(recErr).GetState(); state != structs.TaskNotRestarting {
t.Fatalf("NextRestart() returned %v; want %v", state, structs.TaskNotRestarting)
}
}