From 2cbca7bf45ccbd58ef9f64d32bd978f5702b85f3 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Tue, 17 Mar 2020 11:49:42 -0400 Subject: [PATCH] health: fail health if any task is pending Fixes a bug where an allocation is considered healthy if some of the tasks are being restarted and as such, their checks aren't tracked by consul agent client. Here, we fix the immediate case by ensuring that an alloc is healthy only if tasks are running and the registered checks at the time are healthy. Previously, the health tracker tracked task "health" independently from checks, which led to problems when a task restarts. Consider the following series of events: 1. all tasks start running -> `tracker.tasksHealthy` is true 2. one task has unhealthy checks and gets restarted 3. remaining checks are healthy -> `tracker.checksHealthy` is true 4. propagate health status now that `tracker.tasksHealthy` and `tracker.checksHealthy` are true. This change ensures that we accurately use the latest status of tasks and checks regardless of their status changes. Also, it ensures that we only consider check health after tasks are considered healthy, otherwise we risk trusting incomplete checks. This approach accommodates task dependencies well. Service jobs can have prestart short-lived tasks that will terminate before the main process runs. These dead tasks that complete successfully will not negate health status. 
--- client/allochealth/tracker.go | 44 +++-- client/allochealth/tracker_test.go | 229 +++++++++++++++++++++++++ client/allocrunner/health_hook_test.go | 97 +---------- 3 files changed, 263 insertions(+), 107 deletions(-) create mode 100644 client/allochealth/tracker_test.go diff --git a/client/allochealth/tracker.go b/client/allochealth/tracker.go index 9b158828c311..f27ec67d65ca 100644 --- a/client/allochealth/tracker.go +++ b/client/allochealth/tracker.go @@ -41,6 +41,10 @@ type Tracker struct { // considered healthy minHealthyTime time.Duration + // checkLookupInterval is the interval at which we check if the + // Consul checks are healthy or unhealthy. + checkLookupInterval time.Duration + // useChecks specifies whether to use Consul healh checks or not useChecks bool @@ -92,15 +96,16 @@ func NewTracker(parentCtx context.Context, logger hclog.Logger, alloc *structs.A // this struct should pass in an appropriately named // sub-logger. t := &Tracker{ - healthy: make(chan bool, 1), - allocStopped: make(chan struct{}), - alloc: alloc, - tg: alloc.Job.LookupTaskGroup(alloc.TaskGroup), - minHealthyTime: minHealthyTime, - useChecks: useChecks, - allocUpdates: allocUpdates, - consulClient: consulClient, - logger: logger, + healthy: make(chan bool, 1), + allocStopped: make(chan struct{}), + alloc: alloc, + tg: alloc.Job.LookupTaskGroup(alloc.TaskGroup), + minHealthyTime: minHealthyTime, + useChecks: useChecks, + allocUpdates: allocUpdates, + consulClient: consulClient, + checkLookupInterval: consulCheckLookupInterval, + logger: logger, } t.taskHealth = make(map[string]*taskHealthState, len(t.tg.Tasks)) @@ -171,6 +176,12 @@ func (t *Tracker) setTaskHealth(healthy, terminal bool) { defer t.l.Unlock() t.tasksHealthy = healthy + // if unhealthy, force waiting for new checks health status + if !terminal && !healthy { + t.checksHealthy = false + return + } + // If we are marked healthy but we also require Consul to be healthy and it // isn't yet, return, unless the task is 
terminal requireConsul := t.useChecks && t.consulCheckCount > 0 @@ -191,10 +202,13 @@ func (t *Tracker) setTaskHealth(healthy, terminal bool) { func (t *Tracker) setCheckHealth(healthy bool) { t.l.Lock() defer t.l.Unlock() - t.checksHealthy = healthy + + // check health should always be false if tasks are unhealthy + // as checks might be missing from unhealthy tasks + t.checksHealthy = healthy && t.tasksHealthy // Only signal if we are healthy and so is the tasks - if !healthy || !t.tasksHealthy { + if !t.checksHealthy { return } @@ -256,10 +270,11 @@ func (t *Tracker) watchTaskEvents() { return } - if state.State != structs.TaskStateRunning { + if state.State == structs.TaskStatePending { latestStartTime = time.Time{} break } else if state.StartedAt.After(latestStartTime) { + // task is either running or exited successfully latestStartTime = state.StartedAt } } @@ -276,6 +291,9 @@ func (t *Tracker) watchTaskEvents() { } if !latestStartTime.Equal(allStartedTime) { + // reset task health + t.setTaskHealth(false, false) + // Avoid the timer from firing at the old start time if !healthyTimer.Stop() { select { @@ -310,7 +328,7 @@ func (t *Tracker) watchTaskEvents() { func (t *Tracker) watchConsulEvents() { // checkTicker is the ticker that triggers us to look at the checks in // Consul - checkTicker := time.NewTicker(consulCheckLookupInterval) + checkTicker := time.NewTicker(t.checkLookupInterval) defer checkTicker.Stop() // healthyTimer fires when the checks have been healthy for the diff --git a/client/allochealth/tracker_test.go b/client/allochealth/tracker_test.go new file mode 100644 index 000000000000..6e50ef636468 --- /dev/null +++ b/client/allochealth/tracker_test.go @@ -0,0 +1,229 @@ +package allochealth + +import ( + "context" + "fmt" + "sync/atomic" + "testing" + "time" + + consulapi "github.com/hashicorp/consul/api" + "github.com/hashicorp/nomad/client/consul" + cstructs "github.com/hashicorp/nomad/client/structs" + agentconsul 
"github.com/hashicorp/nomad/command/agent/consul" + "github.com/hashicorp/nomad/helper/testlog" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" + "github.com/stretchr/testify/require" +) + +func TestTracker_Checks_Healthy(t *testing.T) { + t.Parallel() + + alloc := mock.Alloc() + alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up + task := alloc.Job.TaskGroups[0].Tasks[0] + + // Synthesize running alloc and tasks + alloc.ClientStatus = structs.AllocClientStatusRunning + alloc.TaskStates = map[string]*structs.TaskState{ + task.Name: { + State: structs.TaskStateRunning, + StartedAt: time.Now(), + }, + } + + // Make Consul response + check := &consulapi.AgentCheck{ + Name: task.Services[0].Checks[0].Name, + Status: consulapi.HealthPassing, + } + taskRegs := map[string]*agentconsul.ServiceRegistrations{ + task.Name: { + Services: map[string]*agentconsul.ServiceRegistration{ + task.Services[0].Name: { + Service: &consulapi.AgentService{ + ID: "foo", + Service: task.Services[0].Name, + }, + Checks: []*consulapi.AgentCheck{check}, + }, + }, + }, + } + + logger := testlog.HCLogger(t) + b := cstructs.NewAllocBroadcaster(logger) + defer b.Close() + + // Don't reply on the first call + var called uint64 + consul := consul.NewMockConsulServiceClient(t, logger) + consul.AllocRegistrationsFn = func(string) (*agentconsul.AllocRegistration, error) { + if atomic.AddUint64(&called, 1) == 1 { + return nil, nil + } + + reg := &agentconsul.AllocRegistration{ + Tasks: taskRegs, + } + + return reg, nil + } + + ctx, cancelFn := context.WithCancel(context.Background()) + defer cancelFn() + + checkInterval := 10 * time.Millisecond + tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, + time.Millisecond, true) + tracker.checkLookupInterval = checkInterval + tracker.Start() + + select { + case <-time.After(4 * checkInterval): + require.Fail(t, "timed out while waiting for health") 
+ case h := <-tracker.HealthyCh(): + require.True(t, h) + } +} + +func TestTracker_Checks_Unhealthy(t *testing.T) { + t.Parallel() + + alloc := mock.Alloc() + alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up + task := alloc.Job.TaskGroups[0].Tasks[0] + + newCheck := task.Services[0].Checks[0].Copy() + newCheck.Name = "failing-check" + task.Services[0].Checks = append(task.Services[0].Checks, newCheck) + + // Synthesize running alloc and tasks + alloc.ClientStatus = structs.AllocClientStatusRunning + alloc.TaskStates = map[string]*structs.TaskState{ + task.Name: { + State: structs.TaskStateRunning, + StartedAt: time.Now(), + }, + } + + // Make Consul response + checkHealthy := &consulapi.AgentCheck{ + Name: task.Services[0].Checks[0].Name, + Status: consulapi.HealthPassing, + } + checksUnhealthy := &consulapi.AgentCheck{ + Name: task.Services[0].Checks[1].Name, + Status: consulapi.HealthCritical, + } + taskRegs := map[string]*agentconsul.ServiceRegistrations{ + task.Name: { + Services: map[string]*agentconsul.ServiceRegistration{ + task.Services[0].Name: { + Service: &consulapi.AgentService{ + ID: "foo", + Service: task.Services[0].Name, + }, + Checks: []*consulapi.AgentCheck{checkHealthy, checksUnhealthy}, + }, + }, + }, + } + + logger := testlog.HCLogger(t) + b := cstructs.NewAllocBroadcaster(logger) + defer b.Close() + + // Don't reply on the first call + var called uint64 + consul := consul.NewMockConsulServiceClient(t, logger) + consul.AllocRegistrationsFn = func(string) (*agentconsul.AllocRegistration, error) { + if atomic.AddUint64(&called, 1) == 1 { + return nil, nil + } + + reg := &agentconsul.AllocRegistration{ + Tasks: taskRegs, + } + + return reg, nil + } + + ctx, cancelFn := context.WithCancel(context.Background()) + defer cancelFn() + + checkInterval := 10 * time.Millisecond + tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, + time.Millisecond, true) + tracker.checkLookupInterval = checkInterval + tracker.Start() 
+ + testutil.WaitForResult(func() (bool, error) { + lookup := atomic.LoadUint64(&called) + return lookup < 4, fmt.Errorf("wait to get more task registration lookups: %v", lookup) + }, func(err error) { + require.NoError(t, err) + }) + + tracker.l.Lock() + require.False(t, tracker.checksHealthy) + tracker.l.Unlock() + + select { + case v := <-tracker.HealthyCh(): + require.Failf(t, "expected no health value", " got %v", v) + default: + // good + } +} + +func TestTracker_Healthy_IfBothTasksAndConsulChecksAreHealthy(t *testing.T) { + t.Parallel() + + alloc := mock.Alloc() + logger := testlog.HCLogger(t) + + ctx, cancelFn := context.WithCancel(context.Background()) + defer cancelFn() + + tracker := NewTracker(ctx, logger, alloc, nil, nil, + time.Millisecond, true) + + assertNoHealth := func() { + require.NoError(t, tracker.ctx.Err()) + select { + case v := <-tracker.HealthyCh(): + require.Failf(t, "unexpected healthy event", "got %v", v) + default: + } + } + + // first set task health without checks + tracker.setTaskHealth(true, false) + assertNoHealth() + + // now fail task health again before checks are successful + tracker.setTaskHealth(false, false) + assertNoHealth() + + // now pass health checks - do not propagate health yet + tracker.setCheckHealth(true) + assertNoHealth() + + // set tasks to healthy - don't propagate health yet, wait for the next check + tracker.setTaskHealth(true, false) + assertNoHealth() + + // set checks to true, now propagate health status + tracker.setCheckHealth(true) + + require.Error(t, tracker.ctx.Err()) + select { + case v := <-tracker.HealthyCh(): + require.True(t, v) + default: + require.Fail(t, "expected a health status") + } +} diff --git a/client/allocrunner/health_hook_test.go b/client/allocrunner/health_hook_test.go index 454ea52b9150..62c14892560f 100644 --- a/client/allocrunner/health_hook_test.go +++ b/client/allocrunner/health_hook_test.go @@ -328,7 +328,7 @@ func TestHealthHook_SetHealth_unhealthy(t *testing.T) { Status: 
consulapi.HealthPassing, } checksUnhealthy := &consulapi.AgentCheck{ - Name :task.Services[0].Checks[1].Name, + Name: task.Services[0].Checks[1].Name, Status: consulapi.HealthCritical, } taskRegs := map[string]*agentconsul.ServiceRegistrations{ @@ -374,107 +374,16 @@ func TestHealthHook_SetHealth_unhealthy(t *testing.T) { // Wait to ensure we don't get a healthy status select { - case <-time.After(5 * time.Second): - // great no healthy status - case health := <-hs.healthCh: - require.False(health.healthy) - - // Unhealthy allocs shouldn't emit task events - ev := health.taskEvents[task.Name] - require.NotNilf(ev, "%#v", health.taskEvents) - } - - // Postrun - require.NoError(h.Postrun()) -} - -// TestHealthHook_SetHealth_missingchecks asserts SetHealth recovers from -// missing checks -func TestHealthHook_SetHealth_missingchecks(t *testing.T) { - t.Parallel() - require := require.New(t) - - alloc := mock.Alloc() - alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up - task := alloc.Job.TaskGroups[0].Tasks[0] - - newCheck := task.Services[0].Checks[0].Copy() - newCheck.Name = "failing-check" - task.Services[0].Checks = append(task.Services[0].Checks, newCheck) - - // Synthesize running alloc and tasks - alloc.ClientStatus = structs.AllocClientStatusRunning - alloc.TaskStates = map[string]*structs.TaskState{ - task.Name: { - State: structs.TaskStateRunning, - StartedAt: time.Now(), - }, - } - - // Make Consul response - checkHealthy := &consulapi.AgentCheck{ - Name: task.Services[0].Checks[0].Name, - Status: consulapi.HealthPassing, - } - taskRegs := map[string]*agentconsul.ServiceRegistrations{ - task.Name: { - Services: map[string]*agentconsul.ServiceRegistration{ - task.Services[0].Name: { - Service: &consulapi.AgentService{ - ID: "foo", - Service: task.Services[0].Name, - }, - // notice missing check - Checks: []*consulapi.AgentCheck{checkHealthy }, - }, - }, - }, - } - - logger := testlog.HCLogger(t) - b := 
cstructs.NewAllocBroadcaster(logger) - defer b.Close() - - // Don't reply on the first call - called := false - consul := consul.NewMockConsulServiceClient(t, logger) - consul.AllocRegistrationsFn = func(string) (*agentconsul.AllocRegistration, error) { - if !called { - called = true - return nil, nil - } - - reg := &agentconsul.AllocRegistration{ - Tasks: taskRegs, - } - - return reg, nil - } - - hs := newMockHealthSetter() - - h := newAllocHealthWatcherHook(logger, alloc.Copy(), hs, b.Listen(), consul).(*allocHealthWatcherHook) - - // Prerun - require.NoError(h.Prerun()) - - // Wait to ensure we don't get a healthy status - select { - case <-time.After(5 * time.Second): + case <-time.After(2 * time.Second): // great no healthy status case health := <-hs.healthCh: - require.False(health.healthy) - - // Unhealthy allocs shouldn't emit task events - ev := health.taskEvents[task.Name] - require.NotNilf(ev, "%#v", health.taskEvents) + require.Fail("expected no health event", "got %v", health) } // Postrun require.NoError(h.Postrun()) } - // TestHealthHook_SystemNoop asserts that system jobs return the noop tracker. func TestHealthHook_SystemNoop(t *testing.T) { t.Parallel()