RestartDelay isn't needed as checks are re-added on restarts
@dadgar made the excellent observation in #3105 that TaskRunner removes
and re-registers checks on restarts. This means checkWatcher doesn't
need to do *any* internal restart tracking. Individual checks can just
remove themselves and be re-added when the task restarts.
schmichael committed Sep 14, 2017
1 parent 2312033 commit 006429e
Showing 4 changed files with 59 additions and 61 deletions.
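The interaction the commit message relies on can be summarized with a small self-contained sketch (toy code, not Nomad source; the names are illustrative). The point is that the watcher can simply forget a check once it triggers a restart, because the task's restart path re-registers the check:

package main

import "fmt"

// toyWatcher is an illustrative stand-in for checkWatcher: it only tracks
// which check IDs are currently watched.
type toyWatcher struct{ checks map[string]bool }

func (w *toyWatcher) Watch(checkID string)   { w.checks[checkID] = true }
func (w *toyWatcher) Unwatch(checkID string) { delete(w.checks, checkID) }

// restartTask models the cycle described above: when the watcher restarts a
// task it simply drops the check, and the task runner re-registers the check
// while bringing the task back up, which calls Watch again.
func restartTask(w *toyWatcher, checkID string) {
	w.Unwatch(checkID) // checkWatcher forgets the check after triggering a restart
	w.Watch(checkID)   // TaskRunner re-adds it on restart, so no delay tracking is needed
}

func main() {
	w := &toyWatcher{checks: map[string]bool{}}
	w.Watch("service-check-1")
	restartTask(w, "service-check-1")
	fmt.Println(len(w.checks)) // 1 -- the check is watched again
}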
7 changes: 0 additions & 7 deletions client/task_runner.go
@@ -1703,13 +1703,6 @@ func (r *TaskRunner) Restart(source, reason string, failure bool) {
     }
 }
 
-// RestartDelay returns the *max* value of the delay for this task's restart
-// policy for use by the healtcheck watcher.
-func (r *TaskRunner) RestartDelay() time.Duration {
-    delay := r.alloc.Job.LookupTaskGroup(r.alloc.TaskGroup).RestartPolicy.Delay
-    return delay + time.Duration(float64(delay)*jitter)
-}
-
 // Signal will send a signal to the task
 func (r *TaskRunner) Signal(source, reason string, s os.Signal) error {
 
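For reference, the deleted helper computed the task group's restart delay plus a jitter fraction, i.e. the maximum delay the watcher previously had to wait out after a restart. A standalone sketch of that arithmetic (the 0.25 jitter value below is illustrative; the real constant is defined elsewhere in the client package):

package main

import (
	"fmt"
	"time"
)

// maxRestartDelay mirrors the removed calculation: the base restart delay
// plus a fractional jitter gives the worst-case wait before the task is
// running again.
func maxRestartDelay(delay time.Duration, jitter float64) time.Duration {
	return delay + time.Duration(float64(delay)*jitter)
}

func main() {
	// With a 15s restart delay and 25% jitter, the watcher previously had to
	// assume up to 18.75s before the restarted task's checks were meaningful.
	fmt.Println(maxRestartDelay(15*time.Second, 0.25))
}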
48 changes: 25 additions & 23 deletions command/agent/consul/check_watcher.go
@@ -21,10 +21,8 @@ type ChecksAPI interface {
     Checks() (map[string]*api.AgentCheck, error)
 }
 
-// TaskRestarter allows the checkWatcher to restart tasks and how long the
-// grace period should be afterward.
+// TaskRestarter allows the checkWatcher to restart tasks.
 type TaskRestarter interface {
-    RestartDelay() time.Duration
     Restart(source, reason string, failure bool)
 }

@@ -36,7 +34,6 @@ type checkRestart struct {
     checkName string
 
     task TaskRestarter
-    restartDelay time.Duration
     grace time.Duration
     interval time.Duration
     timeLimit time.Duration
@@ -55,35 +52,37 @@ type checkRestart struct {
     logger *log.Logger
 }
 
-// update restart state for check and restart task if necessary. Currrent
+// apply restart state for check and restart task if necessary. Currrent
 // timestamp is passed in so all check updates have the same view of time (and
 // to ease testing).
-func (c *checkRestart) update(now time.Time, status string) {
+//
+// Returns true if a restart was triggered in which case this check should be
+// removed (checks are added on task startup).
+func (c *checkRestart) apply(now time.Time, status string) bool {
     healthy := func() {
         if !c.unhealthyStart.IsZero() {
            c.logger.Printf("[DEBUG] consul.health: alloc %q task %q check %q became healthy; canceling restart",
                c.allocID, c.taskName, c.checkName)
            c.unhealthyStart = time.Time{}
        }
-       return
     }
     switch status {
     case api.HealthCritical:
     case api.HealthWarning:
        if c.ignoreWarnings {
            // Warnings are ignored, reset state and exit
            healthy()
-           return
+           return false
        }
     default:
        // All other statuses are ok, reset state and exit
        healthy()
-       return
+       return false
     }
 
     if now.Before(c.graceUntil) {
-       // In grace period exit
-       return
+       // In grace period, exit
+       return false
     }
 
     if c.unhealthyStart.IsZero() {
@@ -106,11 +105,10 @@ func (c *checkRestart) update(now time.Time, status string) {
        // Tell TaskRunner to restart due to failure
        const failure = true
        c.task.Restart("healthcheck", fmt.Sprintf("check %q unhealthy", c.checkName), failure)
-
-       // Reset grace time to grace + restart.delay
-       c.graceUntil = now.Add(c.grace + c.restartDelay)
-       c.unhealthyStart = time.Time{}
+       return true
     }
+
+    return false
 }

// checkWatchUpdates add or remove checks from the watcher
@@ -179,17 +177,16 @@ func (w *checkWatcher) Run(ctx context.Context) {
 
     // Main watch loop
     for {
+       // disable polling if there are no checks
+       if len(checks) == 0 {
+           stopTimer()
+       }
+
        select {
        case update := <-w.checkUpdateCh:
            if update.remove {
                // Remove a check
                delete(checks, update.checkID)
-
-               // disable polling if last check was removed
-               if len(checks) == 0 {
-                   stopTimer()
-               }
-
                continue
            }
 
@@ -235,7 +232,13 @@ func (w *checkWatcher) Run(ctx context.Context) {
                    continue
                }
 
-               check.update(now, result.Status)
+               restarted := check.apply(now, result.Status)
+               if restarted {
+                   // Checks are registered+watched on
+                   // startup, so it's safe to remove them
+                   // whenever they're restarted
+                   delete(checks, cid)
+               }
            }
        }
    }
@@ -254,7 +257,6 @@ func (w *checkWatcher) Watch(allocID, taskName, checkID string, check *structs.S
        checkID: checkID,
        checkName: check.Name,
        task: restarter,
-       restartDelay: restarter.RestartDelay(),
        interval: check.Interval,
        grace: check.CheckRestart.Grace,
        graceUntil: time.Now().Add(check.CheckRestart.Grace),
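With RestartDelay gone, anything passed to Watch only has to satisfy the one-method TaskRestarter interface. A minimal sketch of a conforming implementation (illustrative only; this type does not exist in the repository):

package main

import (
	"log"
	"os"
)

// loggingRestarter satisfies the slimmed-down TaskRestarter interface
// (Restart only); the fields and log message are made up for illustration.
type loggingRestarter struct {
	allocID  string
	taskName string
	logger   *log.Logger
}

func (r *loggingRestarter) Restart(source, reason string, failure bool) {
	r.logger.Printf("[INFO] restarting %s/%s: %s (source=%s failure=%t)",
		r.allocID, r.taskName, reason, source, failure)
}

func main() {
	r := &loggingRestarter{
		allocID:  "alloc-1",
		taskName: "web",
		logger:   log.New(os.Stderr, "", log.LstdFlags),
	}
	r.Restart("healthcheck", `check "web-http" unhealthy`, true)
}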
28 changes: 13 additions & 15 deletions command/agent/consul/check_watcher_test.go
@@ -85,7 +85,7 @@ func TestCheckWatcher_Skip(t *testing.T) {
     check.CheckRestart = nil
 
     cw := newCheckWatcher(testLogger(), newFakeChecksAPI())
-    restarter1 := newFakeCheckRestarter()
+    restarter1 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck1", check)
     cw.Watch("testalloc1", "testtask1", "testcheck1", check, restarter1)
 
     // Check should have been dropped as it's not watched
@@ -101,13 +101,13 @@ func TestCheckWatcher_Healthy(t *testing.T) {
     fakeAPI, cw := testWatcherSetup()
 
     check1 := testCheck()
-    restarter1 := newFakeCheckRestarter()
+    restarter1 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck1", check1)
     cw.Watch("testalloc1", "testtask1", "testcheck1", check1, restarter1)
 
     check2 := testCheck()
     check2.CheckRestart.Limit = 1
     check2.CheckRestart.Grace = 0
-    restarter2 := newFakeCheckRestarter()
+    restarter2 := newFakeCheckRestarter(cw, "testalloc2", "testtask2", "testcheck2", check2)
     cw.Watch("testalloc2", "testtask2", "testcheck2", check2, restarter2)
 
     // Make both checks healthy from the beginning
@@ -135,22 +135,21 @@ func TestCheckWatcher_Unhealthy(t *testing.T) {
     fakeAPI, cw := testWatcherSetup()
 
     check1 := testCheck()
-    restarter1 := newFakeCheckRestarter()
+    restarter1 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck1", check1)
     cw.Watch("testalloc1", "testtask1", "testcheck1", check1, restarter1)
 
     check2 := testCheck()
     check2.CheckRestart.Limit = 1
-    check2.CheckRestart.Grace = 0
-    restarter2 := newFakeCheckRestarter()
-    restarter2.restartDelay = 600 * time.Millisecond
+    check2.CheckRestart.Grace = 200 * time.Millisecond
+    restarter2 := newFakeCheckRestarter(cw, "testalloc2", "testtask2", "testcheck2", check2)
     cw.Watch("testalloc2", "testtask2", "testcheck2", check2, restarter2)
 
     // Check 1 always passes, check 2 always fails
     fakeAPI.add("testcheck1", "passing", time.Time{})
     fakeAPI.add("testcheck2", "critical", time.Time{})
 
     // Run for 1 second
-    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+    ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
     defer cancel()
     cw.Run(ctx)
 
@@ -176,15 +175,14 @@ func TestCheckWatcher_HealthyWarning(t *testing.T) {
     check1.CheckRestart.Limit = 1
     check1.CheckRestart.Grace = 0
     check1.CheckRestart.IgnoreWarnings = true
-    restarter1 := newFakeCheckRestarter()
-    restarter1.restartDelay = 1100 * time.Millisecond
+    restarter1 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck1", check1)
     cw.Watch("testalloc1", "testtask1", "testcheck1", check1, restarter1)
 
     // Check is always in warning but that's ok
     fakeAPI.add("testcheck1", "warning", time.Time{})
 
     // Run for 1 second
-    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+    ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
     defer cancel()
     cw.Run(ctx)
 
@@ -203,7 +201,7 @@ func TestCheckWatcher_Flapping(t *testing.T) {
 
     check1 := testCheck()
     check1.CheckRestart.Grace = 0
-    restarter1 := newFakeCheckRestarter()
+    restarter1 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck1", check1)
     cw.Watch("testalloc1", "testtask1", "testcheck1", check1, restarter1)
 
     // Check flaps and is never failing for the full 200ms needed to restart
@@ -214,7 +212,7 @@ func TestCheckWatcher_Flapping(t *testing.T) {
     fakeAPI.add("testcheck1", "critical", now.Add(300*time.Millisecond))
     fakeAPI.add("testcheck1", "passing", now.Add(450*time.Millisecond))
 
-    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+    ctx, cancel := context.WithTimeout(context.Background(), 600*time.Millisecond)
     defer cancel()
     cw.Run(ctx)
 
@@ -234,14 +232,14 @@ func TestCheckWatcher_Unwatch(t *testing.T) {
     check1 := testCheck()
     check1.CheckRestart.Limit = 1
     check1.CheckRestart.Grace = 100 * time.Millisecond
-    restarter1 := newFakeCheckRestarter()
+    restarter1 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck1", check1)
     cw.Watch("testalloc1", "testtask1", "testcheck1", check1, restarter1)
     cw.Unwatch("testcheck1")
 
     // Always failing
     fakeAPI.add("testcheck1", "critical", time.Time{})
 
-    ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
+    ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
     defer cancel()
     cw.Run(ctx)
 
37 changes: 21 additions & 16 deletions command/agent/consul/unit_test.go
@@ -64,37 +64,43 @@ type checkRestartRecord struct {
 
 // fakeCheckRestarter is a test implementation of
 type fakeCheckRestarter struct {
-    // restartDelay is returned by RestartDelay to control the behavior of
-    // the checkWatcher
-    restartDelay time.Duration
 
     // restarts is a slice of all of the restarts triggered by the checkWatcher
     restarts []checkRestartRecord
-}
-
-func newFakeCheckRestarter() *fakeCheckRestarter {
-    return &fakeCheckRestarter{}
+    // need the checkWatcher to re-Watch restarted tasks like TaskRunner
+    watcher *checkWatcher
+
+    // check to re-Watch on restarts
+    check *structs.ServiceCheck
+    allocID string
+    taskName string
+    checkName string
 }
 
-// RestartDelay implements part of the TaskRestarter interface needed for check
-// watching and is normally fulfilled by a task runner.
-//
-// The return value is determined by the restartDelay field.
-func (c *fakeCheckRestarter) RestartDelay() time.Duration {
-    return c.restartDelay
+func newFakeCheckRestarter(w *checkWatcher, allocID, taskName, checkName string, c *structs.ServiceCheck) *fakeCheckRestarter {
+    return &fakeCheckRestarter{
+        watcher: w,
+        check: c,
+        allocID: allocID,
+        taskName: taskName,
+        checkName: checkName,
+    }
 }
 
 // Restart implements part of the TaskRestarter interface needed for check
 // watching and is normally fulfilled by a TaskRunner.
 //
-// Restarts are recorded in the []restarts field.
+// Restarts are recorded in the []restarts field and re-Watch the check.
 func (c *fakeCheckRestarter) Restart(source, reason string, failure bool) {
     c.restarts = append(c.restarts, checkRestartRecord{time.Now(), source, reason, failure})
+
+    // Re-Watch the check just like TaskRunner
+    c.watcher.Watch(c.allocID, c.taskName, c.checkName, c.check, c)
 }
 
 // String for debugging
 func (c *fakeCheckRestarter) String() string {
-    s := ""
+    s := fmt.Sprintf("%s %s %s restarts:\n", c.allocID, c.taskName, c.checkName)
     for _, r := range c.restarts {
        s += fmt.Sprintf("%s - %s: %s (failure: %t)\n", r.timestamp, r.source, r.reason, r.failure)
     }
@@ -152,7 +158,6 @@ func setupFake() *testFakeCtx {
        ServiceClient: NewServiceClient(fc, true, testLogger()),
        FakeConsul: fc,
        Task: testTask(),
-       Restarter: newFakeCheckRestarter(),
        execs: make(chan int, 100),
     }
 }
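A hypothetical test in the same style as the ones above could exercise the new wiring end to end. This sketch assumes the package's existing helpers (testWatcherSetup, testCheck, the fake Consul API) and is illustrative rather than part of the commit:

// TestCheckWatcher_RestartRewatches (hypothetical) asserts that a failing
// check triggers a restart, and that the fake restarter re-Watches the check
// the way TaskRunner would.
func TestCheckWatcher_RestartRewatches(t *testing.T) {
	fakeAPI, cw := testWatcherSetup()

	check1 := testCheck()
	check1.CheckRestart.Limit = 1
	check1.CheckRestart.Grace = 0
	restarter1 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck1", check1)
	cw.Watch("testalloc1", "testtask1", "testcheck1", check1, restarter1)

	// Always failing, so the watcher should restart the task and the fake
	// should immediately re-Watch the check.
	fakeAPI.add("testcheck1", "critical", time.Time{})

	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()
	cw.Run(ctx)

	if len(restarter1.restarts) == 0 {
		t.Fatalf("expected at least one restart, got none:\n%s", restarter1)
	}
}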
