diff --git a/client/allochealth/tracker.go b/client/allochealth/tracker.go
index 9b158828c311..fe5aac69f360 100644
--- a/client/allochealth/tracker.go
+++ b/client/allochealth/tracker.go
@@ -41,6 +41,10 @@ type Tracker struct {
 	// considered healthy
 	minHealthyTime time.Duration
 
+	// checkLookupInterval is the interval at which we check if the
+	// Consul checks are healthy or unhealthy.
+	checkLookupInterval time.Duration
+
 	// useChecks specifies whether to use Consul healh checks or not
 	useChecks bool
 
@@ -62,6 +66,10 @@ type Tracker struct {
 	// not needed
 	allocStopped chan struct{}
 
+	// lifecycleTasks is a set of tasks with a lifecycle hook that may
+	// terminate without affecting alloc health
+	lifecycleTasks map[string]bool
+
 	// l is used to lock shared fields listed below
 	l sync.Mutex
 
@@ -92,28 +100,36 @@ func NewTracker(parentCtx context.Context, logger hclog.Logger, alloc *structs.A
 	// this struct should pass in an appropriately named
 	// sub-logger.
 	t := &Tracker{
-		healthy:        make(chan bool, 1),
-		allocStopped:   make(chan struct{}),
-		alloc:          alloc,
-		tg:             alloc.Job.LookupTaskGroup(alloc.TaskGroup),
-		minHealthyTime: minHealthyTime,
-		useChecks:      useChecks,
-		allocUpdates:   allocUpdates,
-		consulClient:   consulClient,
-		logger:         logger,
+		healthy:             make(chan bool, 1),
+		allocStopped:        make(chan struct{}),
+		alloc:               alloc,
+		tg:                  alloc.Job.LookupTaskGroup(alloc.TaskGroup),
+		minHealthyTime:      minHealthyTime,
+		useChecks:           useChecks,
+		allocUpdates:        allocUpdates,
+		consulClient:        consulClient,
+		checkLookupInterval: consulCheckLookupInterval,
+		logger:              logger,
+		lifecycleTasks:      map[string]bool{},
 	}
 
 	t.taskHealth = make(map[string]*taskHealthState, len(t.tg.Tasks))
 	for _, task := range t.tg.Tasks {
 		t.taskHealth[task.Name] = &taskHealthState{task: task}
-	}
 
-	for _, task := range t.tg.Tasks {
+		if task.Lifecycle != nil && !task.Lifecycle.Sidecar {
+			t.lifecycleTasks[task.Name] = true
+		}
+
 		for _, s := range task.Services {
 			t.consulCheckCount += len(s.Checks)
 		}
 	}
 
+	for _, s := range t.tg.Services {
+		t.consulCheckCount += len(s.Checks)
+	}
+
 	t.ctx, t.cancelFn = context.WithCancel(parentCtx)
 	return t
 }
@@ -171,6 +187,12 @@ func (t *Tracker) setTaskHealth(healthy, terminal bool) {
 	defer t.l.Unlock()
 	t.tasksHealthy = healthy
 
+	// if unhealthy, force waiting for new checks health status
+	if !terminal && !healthy {
+		t.checksHealthy = false
+		return
+	}
+
 	// If we are marked healthy but we also require Consul to be healthy and it
 	// isn't yet, return, unless the task is terminal
 	requireConsul := t.useChecks && t.consulCheckCount > 0
@@ -191,10 +213,13 @@ func (t *Tracker) setTaskHealth(healthy, terminal bool) {
 func (t *Tracker) setCheckHealth(healthy bool) {
 	t.l.Lock()
 	defer t.l.Unlock()
-	t.checksHealthy = healthy
+
+	// check health should always be false if tasks are unhealthy
+	// as checks might be missing from unhealthy tasks
+	t.checksHealthy = healthy && t.tasksHealthy
 
 	// Only signal if we are healthy and so is the tasks
-	if !healthy || !t.tasksHealthy {
+	if !t.checksHealthy {
 		return
 	}
 
@@ -249,17 +274,18 @@ func (t *Tracker) watchTaskEvents() {
 
 		// Detect if the alloc is unhealthy or if all tasks have started yet
 		latestStartTime := time.Time{}
-		for _, state := range alloc.TaskStates {
+		for taskName, state := range alloc.TaskStates {
 			// One of the tasks has failed so we can exit watching
-			if state.Failed || !state.FinishedAt.IsZero() {
+			if state.Failed || (!state.FinishedAt.IsZero() && !t.lifecycleTasks[taskName]) {
 				t.setTaskHealth(false, true)
 				return
 			}
 
-			if state.State != structs.TaskStateRunning {
+			if state.State == structs.TaskStatePending {
 				latestStartTime = time.Time{}
 				break
 			} else if state.StartedAt.After(latestStartTime) {
+				// task is either running or exited successfully
 				latestStartTime = state.StartedAt
 			}
 		}
@@ -276,6 +302,9 @@
 		}
 
 		if !latestStartTime.Equal(allStartedTime) {
+			// reset task health
+			t.setTaskHealth(false, false)
+
 			// Avoid the timer from firing at the old start time
 			if !healthyTimer.Stop() {
 				select {
@@ -310,7 +339,7 @@ func (t *Tracker) watchConsulEvents() {
 
 	// checkTicker is the ticker that triggers us to look at the checks in
 	// Consul
-	checkTicker := time.NewTicker(consulCheckLookupInterval)
+	checkTicker := time.NewTicker(t.checkLookupInterval)
 	defer checkTicker.Stop()
 
 	// healthyTimer fires when the checks have been healthy for the
@@ -440,13 +469,20 @@ func (t *taskHealthState) event(deadline time.Time, minHealthyTime time.Duration
 		if t.state.Failed {
 			return "Unhealthy because of failed task", true
 		}
-		if t.state.State != structs.TaskStateRunning {
-			return "Task not running by deadline", true
-		}
 
-		// We are running so check if we have been running long enough
-		if t.state.StartedAt.Add(minHealthyTime).After(deadline) {
-			return fmt.Sprintf("Task not running for min_healthy_time of %v by deadline", minHealthyTime), true
+		switch t.state.State {
+		case structs.TaskStatePending:
+			return "Task not running by deadline", true
+		case structs.TaskStateDead:
+			// lifecycle hook tasks are healthy when they exit successfully
+			if t.task.Lifecycle == nil || t.task.Lifecycle.Sidecar {
+				return "Unhealthy because of dead task", true
+			}
+		case structs.TaskStateRunning:
+			// We are running so check if we have been running long enough
+			if t.state.StartedAt.Add(minHealthyTime).After(deadline) {
+				return fmt.Sprintf("Task not running for min_healthy_time of %v by deadline", minHealthyTime), true
+			}
 		}
 	}
 
diff --git a/client/allochealth/tracker_test.go b/client/allochealth/tracker_test.go
new file mode 100644
index 000000000000..6e50ef636468
--- /dev/null
+++ b/client/allochealth/tracker_test.go
@@ -0,0 +1,229 @@
+package allochealth
+
+import (
+	"context"
+	"fmt"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	consulapi "github.com/hashicorp/consul/api"
+	"github.com/hashicorp/nomad/client/consul"
+	cstructs "github.com/hashicorp/nomad/client/structs"
+	agentconsul "github.com/hashicorp/nomad/command/agent/consul"
+	"github.com/hashicorp/nomad/helper/testlog"
+	"github.com/hashicorp/nomad/nomad/mock"
+	"github.com/hashicorp/nomad/nomad/structs"
+	"github.com/hashicorp/nomad/testutil"
+	"github.com/stretchr/testify/require"
+)
+
+func TestTracker_Checks_Healthy(t *testing.T) {
+	t.Parallel()
+
+	alloc := mock.Alloc()
+	alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up
+	task := alloc.Job.TaskGroups[0].Tasks[0]
+
+	// Synthesize running alloc and tasks
+	alloc.ClientStatus = structs.AllocClientStatusRunning
+	alloc.TaskStates = map[string]*structs.TaskState{
+		task.Name: {
+			State:     structs.TaskStateRunning,
+			StartedAt: time.Now(),
+		},
+	}
+
+	// Make Consul response
+	check := &consulapi.AgentCheck{
+		Name:   task.Services[0].Checks[0].Name,
+		Status: consulapi.HealthPassing,
+	}
+	taskRegs := map[string]*agentconsul.ServiceRegistrations{
+		task.Name: {
+			Services: map[string]*agentconsul.ServiceRegistration{
+				task.Services[0].Name: {
+					Service: &consulapi.AgentService{
+						ID:      "foo",
+						Service: task.Services[0].Name,
+					},
+					Checks: []*consulapi.AgentCheck{check},
+				},
+			},
+		},
+	}
+
+	logger := testlog.HCLogger(t)
+	b := cstructs.NewAllocBroadcaster(logger)
+	defer b.Close()
+
+	// Don't reply on the first call
+	var called uint64
+	consul := consul.NewMockConsulServiceClient(t, logger)
+	consul.AllocRegistrationsFn = func(string) (*agentconsul.AllocRegistration, error) {
+		if atomic.AddUint64(&called, 1) == 1 {
+			return nil, nil
+		}
+
+		reg := &agentconsul.AllocRegistration{
+			Tasks: taskRegs,
+		}
+
+		return reg, nil
+	}
+
+	ctx, cancelFn := context.WithCancel(context.Background())
+	defer cancelFn()
+
+	checkInterval := 10 * time.Millisecond
+	tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul,
+		time.Millisecond, true)
+	tracker.checkLookupInterval = checkInterval
+	tracker.Start()
+
+	select {
+	case <-time.After(4 * checkInterval):
+		require.Fail(t, "timed out while waiting for health")
+	case h := <-tracker.HealthyCh():
+		require.True(t, h)
+	}
+}
+
+func TestTracker_Checks_Unhealthy(t *testing.T) {
+	t.Parallel()
+
+	alloc := mock.Alloc()
+	alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up
+	task := alloc.Job.TaskGroups[0].Tasks[0]
+
+	newCheck := task.Services[0].Checks[0].Copy()
+	newCheck.Name = "failing-check"
+	task.Services[0].Checks = append(task.Services[0].Checks, newCheck)
+
+	// Synthesize running alloc and tasks
+	alloc.ClientStatus = structs.AllocClientStatusRunning
+	alloc.TaskStates = map[string]*structs.TaskState{
+		task.Name: {
+			State:     structs.TaskStateRunning,
+			StartedAt: time.Now(),
+		},
+	}
+
+	// Make Consul response
+	checkHealthy := &consulapi.AgentCheck{
+		Name:   task.Services[0].Checks[0].Name,
+		Status: consulapi.HealthPassing,
+	}
+	checksUnhealthy := &consulapi.AgentCheck{
+		Name:   task.Services[0].Checks[1].Name,
+		Status: consulapi.HealthCritical,
+	}
+	taskRegs := map[string]*agentconsul.ServiceRegistrations{
+		task.Name: {
+			Services: map[string]*agentconsul.ServiceRegistration{
+				task.Services[0].Name: {
+					Service: &consulapi.AgentService{
+						ID:      "foo",
+						Service: task.Services[0].Name,
+					},
+					Checks: []*consulapi.AgentCheck{checkHealthy, checksUnhealthy},
+				},
+			},
+		},
+	}
+
+	logger := testlog.HCLogger(t)
+	b := cstructs.NewAllocBroadcaster(logger)
+	defer b.Close()
+
+	// Don't reply on the first call
+	var called uint64
+	consul := consul.NewMockConsulServiceClient(t, logger)
+	consul.AllocRegistrationsFn = func(string) (*agentconsul.AllocRegistration, error) {
+		if atomic.AddUint64(&called, 1) == 1 {
+			return nil, nil
+		}
+
+		reg := &agentconsul.AllocRegistration{
+			Tasks: taskRegs,
+		}
+
+		return reg, nil
+	}
+
+	ctx, cancelFn := context.WithCancel(context.Background())
+	defer cancelFn()
+
+	checkInterval := 10 * time.Millisecond
+	tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul,
+		time.Millisecond, true)
+	tracker.checkLookupInterval = checkInterval
+	tracker.Start()
+
+	testutil.WaitForResult(func() (bool, error) {
+		lookup := atomic.LoadUint64(&called)
+		return lookup >= 4, fmt.Errorf("wait to get more task registration lookups: %v", lookup)
+	}, func(err error) {
+		require.NoError(t, err)
+	})
+
+	tracker.l.Lock()
+	require.False(t, tracker.checksHealthy)
+	tracker.l.Unlock()
+
+	select {
+	case v := <-tracker.HealthyCh():
+		require.Failf(t, "expected no health value", "got %v", v)
+	default:
+		// good
+	}
+}
+
+func TestTracker_Healthy_IfBothTasksAndConsulChecksAreHealthy(t *testing.T) {
+	t.Parallel()
+
+	alloc := mock.Alloc()
+	logger := testlog.HCLogger(t)
+
+	ctx, cancelFn := context.WithCancel(context.Background())
+	defer cancelFn()
+
+	tracker := NewTracker(ctx, logger, alloc, nil, nil,
+		time.Millisecond, true)
+
+	assertNoHealth := func() {
+		require.NoError(t, tracker.ctx.Err())
+		select {
+		case v := <-tracker.HealthyCh():
+			require.Failf(t, "unexpected healthy event", "got %v", v)
+		default:
+		}
+	}
+
+	// first set task health without checks
+	tracker.setTaskHealth(true, false)
+	assertNoHealth()
+
+	// now fail task health again before checks are successful
+	tracker.setTaskHealth(false, false)
+	assertNoHealth()
+
+	// now pass health checks - do not propagate health yet
+	tracker.setCheckHealth(true)
+	assertNoHealth()
+
+	// set tasks to healthy - don't propagate health yet, wait for the next check
+	tracker.setTaskHealth(true, false)
+	assertNoHealth()
+
+	// set checks to true, now propagate health status
+	tracker.setCheckHealth(true)
+
+	require.Error(t, tracker.ctx.Err())
+	select {
+	case v := <-tracker.HealthyCh():
+		require.True(t, v)
+	default:
+		require.Fail(t, "expected a health status")
+	}
+}
diff --git a/client/allocrunner/health_hook_test.go b/client/allocrunner/health_hook_test.go
index e4e3cfd2cf3f..62c14892560f 100644
--- a/client/allocrunner/health_hook_test.go
+++ b/client/allocrunner/health_hook_test.go
@@ -219,9 +219,9 @@ func TestHealthHook_Postrun(t *testing.T) {
 	require.NoError(h.Postrun())
 }
 
-// TestHealthHook_SetHealth asserts SetHealth is called when health status is
+// TestHealthHook_SetHealth_healthy asserts SetHealth is called when health status is
 // set. Uses task state and health checks.
-func TestHealthHook_SetHealth(t *testing.T) {
+func TestHealthHook_SetHealth_healthy(t *testing.T) {
 	t.Parallel()
 
 	require := require.New(t)
@@ -300,6 +300,90 @@ func TestHealthHook_SetHealth_healthy(t *testing.T) {
 	require.NoError(h.Postrun())
 }
 
+// TestHealthHook_SetHealth_unhealthy asserts SetHealth notices unhealthy allocs
+func TestHealthHook_SetHealth_unhealthy(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+
+	alloc := mock.Alloc()
+	alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up
+	task := alloc.Job.TaskGroups[0].Tasks[0]
+
+	newCheck := task.Services[0].Checks[0].Copy()
+	newCheck.Name = "failing-check"
+	task.Services[0].Checks = append(task.Services[0].Checks, newCheck)
+
+	// Synthesize running alloc and tasks
+	alloc.ClientStatus = structs.AllocClientStatusRunning
+	alloc.TaskStates = map[string]*structs.TaskState{
+		task.Name: {
+			State:     structs.TaskStateRunning,
+			StartedAt: time.Now(),
+		},
+	}
+
+	// Make Consul response
+	checkHealthy := &consulapi.AgentCheck{
+		Name:   task.Services[0].Checks[0].Name,
+		Status: consulapi.HealthPassing,
+	}
+	checksUnhealthy := &consulapi.AgentCheck{
+		Name:   task.Services[0].Checks[1].Name,
+		Status: consulapi.HealthCritical,
+	}
+	taskRegs := map[string]*agentconsul.ServiceRegistrations{
+		task.Name: {
+			Services: map[string]*agentconsul.ServiceRegistration{
+				task.Services[0].Name: {
+					Service: &consulapi.AgentService{
+						ID:      "foo",
+						Service: task.Services[0].Name,
+					},
+					Checks: []*consulapi.AgentCheck{checkHealthy, checksUnhealthy},
+				},
+			},
+		},
+	}
+
+	logger := testlog.HCLogger(t)
+	b := cstructs.NewAllocBroadcaster(logger)
+	defer b.Close()
+
+	// Don't reply on the first call
+	called := false
+	consul := consul.NewMockConsulServiceClient(t, logger)
+	consul.AllocRegistrationsFn = func(string) (*agentconsul.AllocRegistration, error) {
+		if !called {
+			called = true
+			return nil, nil
+		}
+
+		reg := &agentconsul.AllocRegistration{
+			Tasks: taskRegs,
+		}
+
+		return reg, nil
+	}
+
+	hs := newMockHealthSetter()
+
+	h := newAllocHealthWatcherHook(logger, alloc.Copy(), hs, b.Listen(), consul).(*allocHealthWatcherHook)
+
+	// Prerun
+	require.NoError(h.Prerun())
+
+	// Wait to ensure we don't get a healthy status
+	select {
+	case <-time.After(2 * time.Second):
+		// great no healthy status
+	case health := <-hs.healthCh:
+		require.Fail("expected no health event", "got %v", health)
+	}
+
+	// Postrun
+	require.NoError(h.Postrun())
+}
+
 // TestHealthHook_SystemNoop asserts that system jobs return the noop tracker.
 func TestHealthHook_SystemNoop(t *testing.T) {
 	t.Parallel()
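
Reviewer sketch, not part of the patch: the new switch in taskHealthState.event encodes the rule that a failed or pending task blocks alloc health, a dead task blocks it unless it is a non-sidecar lifecycle hook that is expected to exit, and a running task must have been up for min_healthy_time by the deadline. The standalone Go program below mirrors that decision; the task, taskState, and lifecycle types and the unhealthyReason helper are simplified stand-ins for the Nomad structs, not real Nomad APIs.

package main

import (
	"fmt"
	"time"
)

// lifecycle is a stand-in for the task's lifecycle configuration.
type lifecycle struct {
	Sidecar bool
}

// task is a stand-in for structs.Task.
type task struct {
	Lifecycle *lifecycle // nil for ordinary tasks
}

// taskState is a stand-in for structs.TaskState.
type taskState struct {
	State     string // "pending", "running", or "dead"
	Failed    bool
	StartedAt time.Time
}

// unhealthyReason mirrors the switch added to event(): it reports a reason
// and true when the task should mark the alloc unhealthy at the deadline.
func unhealthyReason(tk task, s taskState, deadline time.Time, minHealthyTime time.Duration) (string, bool) {
	if s.Failed {
		return "Unhealthy because of failed task", true
	}

	switch s.State {
	case "pending":
		return "Task not running by deadline", true
	case "dead":
		// non-sidecar lifecycle hooks (e.g. prestart tasks) may exit
		// successfully without affecting alloc health
		if tk.Lifecycle == nil || tk.Lifecycle.Sidecar {
			return "Unhealthy because of dead task", true
		}
	case "running":
		if s.StartedAt.Add(minHealthyTime).After(deadline) {
			return "Task not running for min_healthy_time by deadline", true
		}
	}
	return "", false
}

func main() {
	deadline := time.Now()
	minHealthy := 10 * time.Second

	hook := task{Lifecycle: &lifecycle{Sidecar: false}} // e.g. a prestart task
	web := task{}                                       // ordinary service task

	// a hook task that ran and exited cleanly does not block health
	fmt.Println(unhealthyReason(hook, taskState{State: "dead"}, deadline, minHealthy))

	// an ordinary task that died does block health
	fmt.Println(unhealthyReason(web, taskState{State: "dead"}, deadline, minHealthy))

	// an ordinary task that has been running long enough is fine
	fmt.Println(unhealthyReason(web, taskState{State: "running", StartedAt: deadline.Add(-time.Minute)}, deadline, minHealthy))
}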
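A second sketch, again illustration rather than patch content: setCheckHealth now ands the check result with task health, and setTaskHealth clears check health on failure, so a check pass observed while tasks were unhealthy is discarded and overall health is only signalled once a fresh pass arrives while tasks are already healthy. This is the sequence TestTracker_Healthy_IfBothTasksAndConsulChecksAreHealthy steps through. The gate type is a hypothetical reduction of the Tracker's two flags, assuming Consul checks are required and ignoring the terminal case.

package main

import "fmt"

// gate reduces the Tracker's tasksHealthy/checksHealthy interplay to its core.
type gate struct {
	tasksHealthy  bool
	checksHealthy bool
	signalled     bool // stands in for sending on the healthy channel
}

func (g *gate) setTaskHealth(healthy bool) {
	g.tasksHealthy = healthy
	if !healthy {
		// mirrors the patch: going unhealthy forces waiting for a
		// fresh checks result
		g.checksHealthy = false
	}
}

func (g *gate) setCheckHealth(healthy bool) {
	// a check pass only sticks if tasks are healthy at the same time
	g.checksHealthy = healthy && g.tasksHealthy
	if g.checksHealthy {
		g.signalled = true
	}
}

func main() {
	g := &gate{}
	g.setTaskHealth(true)    // tasks come up
	g.setTaskHealth(false)   // tasks fail; any prior check state is reset
	g.setCheckHealth(true)   // pass seen while tasks unhealthy: discarded
	g.setTaskHealth(true)    // tasks recover
	fmt.Println(g.signalled) // false: still waiting for the next check pass
	g.setCheckHealth(true)   // next pass arrives while tasks are healthy
	fmt.Println(g.signalled) // true
}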