allocrunner: fix health check monitoring for Consul services
Services must be canonicalized and have their values interpolated before
being compared with the values returned by Consul.
lgfa29 committed Mar 9, 2023
1 parent 95359b8 commit 0765b15
Showing 6 changed files with 155 additions and 29 deletions.
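
The core of the fix lands in evaluateConsulChecks (diffed below): the task group's Consul services are interpolated against the allocation's task environment and canonicalized before their check names are counted and compared with Consul's registrations. The following is a condensed sketch of that step under the assumption that it is pulled out into a standalone helper; the name expectedCheckCounts is illustrative and not part of the Nomad codebase, but the calls to taskenv.InterpolateServices and Service.Canonicalize match the diff.

package allochealth

import (
	"github.com/hashicorp/nomad/client/taskenv"
	"github.com/hashicorp/nomad/nomad/structs"
)

// expectedCheckCounts builds the expected check-name counts for a task group,
// interpolating and canonicalizing services first so that the names line up
// with what was actually registered in Consul.
func expectedCheckCounts(alloc *structs.Allocation, tg *structs.TaskGroup, env *taskenv.TaskEnv) map[string]int {
	// Interpolate ${...} runtime variables (e.g. ${TASKGROUP}, ${NOMAD_REGION})
	// in both group- and task-level Consul services.
	services := taskenv.InterpolateServices(env, tg.ConsulServices())

	expChecks := make(map[string]int)
	for _, service := range services {
		// Canonicalize fills in the same defaults the registration path uses,
		// so derived check names compare apples to apples.
		service.Canonicalize(alloc.JobID, alloc.TaskGroup, service.TaskName, alloc.Namespace)
		for _, check := range service.Checks {
			expChecks[check.Name]++
		}
	}
	return expChecks
}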
18 changes: 15 additions & 3 deletions client/allochealth/tracker.go
@@ -12,6 +12,7 @@ import (
"github.com/hashicorp/nomad/client/serviceregistration"
"github.com/hashicorp/nomad/client/serviceregistration/checks/checkstore"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/taskenv"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
"golang.org/x/exp/maps"
@@ -96,6 +97,9 @@ type Tracker struct {
// name -> state
taskHealth map[string]*taskHealthState

// taskEnv is used to interpolate dynamic values in services.
taskEnv *taskenv.TaskEnv

// logger is for logging things
logger hclog.Logger
}
@@ -111,6 +115,7 @@ func NewTracker(
logger hclog.Logger,
alloc *structs.Allocation,
allocUpdates *cstructs.AllocListener,
taskEnv *taskenv.TaskEnv,
consulClient serviceregistration.Handler,
checkStore checkstore.Shim,
minHealthyTime time.Duration,
@@ -122,6 +127,7 @@
allocStopped: make(chan struct{}),
alloc: alloc,
tg: alloc.Job.LookupTaskGroup(alloc.TaskGroup),
taskEnv: taskEnv,
minHealthyTime: minHealthyTime,
useChecks: useChecks,
allocUpdates: allocUpdates,
@@ -504,7 +510,7 @@ OUTER:
passed := true

// scan for missing or unhealthy consul checks
if !evaluateConsulChecks(t.tg, allocReg) {
if !evaluateConsulChecks(t.alloc, t.tg, t.taskEnv, allocReg) {
t.setCheckHealth(false)
passed = false
}
@@ -523,14 +529,20 @@
}
}

func evaluateConsulChecks(tg *structs.TaskGroup, registrations *serviceregistration.AllocRegistration) bool {
func evaluateConsulChecks(
alloc *structs.Allocation,
tg *structs.TaskGroup,
taskEnv *taskenv.TaskEnv,
registrations *serviceregistration.AllocRegistration,
) bool {
// First, identify any case where a check definition is missing or outdated
// on the Consul side. Note that because check names are not unique, we must
// also keep track of the counts on each side and make sure those also match.
services := tg.ConsulServices()
services := taskenv.InterpolateServices(taskEnv, tg.ConsulServices())
expChecks := make(map[string]int)
regChecks := make(map[string]int)
for _, service := range services {
service.Canonicalize(alloc.JobID, alloc.TaskGroup, service.TaskName, alloc.Namespace)
for _, check := range service.Checks {
expChecks[check.Name]++
}
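
As the comment in the hunk above notes, Consul check names are not unique, so the evaluation compares per-name counts on both sides rather than doing a simple set-membership test. Below is a minimal sketch of that comparison, assuming a hypothetical helper named checksMatch; the real evaluateConsulChecks additionally verifies that every registered check is passing.

package allochealth

import (
	"github.com/hashicorp/nomad/client/serviceregistration"
)

// checksMatch reports whether the checks registered in Consul match the
// expected name -> count multiset built from the interpolated, canonicalized
// job specification.
func checksMatch(expChecks map[string]int, registrations *serviceregistration.AllocRegistration) bool {
	// Count the checks Consul actually has for this allocation.
	regChecks := make(map[string]int)
	for _, task := range registrations.Tasks {
		for _, service := range task.Services {
			for _, check := range service.Checks {
				regChecks[check.Name]++
			}
		}
	}

	// Any missing name or mismatched count means the registration is
	// incomplete or outdated.
	if len(expChecks) != len(regChecks) {
		return false
	}
	for name, count := range expChecks {
		if regChecks[name] != count {
			return false
		}
	}
	return true
}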
107 changes: 94 additions & 13 deletions client/allochealth/tracker_test.go
@@ -14,6 +14,7 @@ import (
regmock "github.com/hashicorp/nomad/client/serviceregistration/mock"
"github.com/hashicorp/nomad/client/state"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/taskenv"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
@@ -83,7 +84,9 @@ func TestTracker_ConsulChecks_Healthy(t *testing.T) {

checks := checkstore.NewStore(logger, state.NewMemDB(logger))
checkInterval := 10 * time.Millisecond
tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, time.Millisecond, true)
taskEnv := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()

tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnv, consul, checks, time.Millisecond, true)
tracker.checkLookupInterval = checkInterval
tracker.Start()

@@ -134,7 +137,9 @@ func TestTracker_NomadChecks_Healthy(t *testing.T) {

consul := regmock.NewServiceRegistrationHandler(logger)
checkInterval := 10 * time.Millisecond
tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, time.Millisecond, true)
taskEnv := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()

tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnv, consul, checks, time.Millisecond, true)
tracker.checkLookupInterval = checkInterval
tracker.Start()

@@ -201,7 +206,9 @@ func TestTracker_NomadChecks_Unhealthy(t *testing.T) {

consul := regmock.NewServiceRegistrationHandler(logger)
checkInterval := 10 * time.Millisecond
tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, time.Millisecond, true)
taskEnv := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()

tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnv, consul, checks, time.Millisecond, true)
tracker.checkLookupInterval = checkInterval
tracker.Start()

@@ -260,7 +267,9 @@ func TestTracker_Checks_PendingPostStop_Healthy(t *testing.T) {

checks := checkstore.NewStore(logger, state.NewMemDB(logger))
checkInterval := 10 * time.Millisecond
tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, time.Millisecond, true)
taskEnv := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()

tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnv, consul, checks, time.Millisecond, true)
tracker.checkLookupInterval = checkInterval
tracker.Start()

@@ -301,7 +310,9 @@ func TestTracker_Succeeded_PostStart_Healthy(t *testing.T) {

checks := checkstore.NewStore(logger, state.NewMemDB(logger))
checkInterval := 10 * time.Millisecond
tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, alloc.Job.TaskGroups[0].Migrate.MinHealthyTime, true)
taskEnv := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()

tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnv, consul, checks, alloc.Job.TaskGroups[0].Migrate.MinHealthyTime, true)
tracker.checkLookupInterval = checkInterval
tracker.Start()

@@ -380,7 +391,9 @@ func TestTracker_ConsulChecks_Unhealthy(t *testing.T) {

checks := checkstore.NewStore(logger, state.NewMemDB(logger))
checkInterval := 10 * time.Millisecond
tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, time.Millisecond, true)
taskEnv := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()

tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnv, consul, checks, time.Millisecond, true)
tracker.checkLookupInterval = checkInterval
tracker.Start()

Expand Down Expand Up @@ -459,7 +472,9 @@ func TestTracker_ConsulChecks_HealthyToUnhealthy(t *testing.T) {
checks := checkstore.NewStore(logger, state.NewMemDB(logger))
checkInterval := 10 * time.Millisecond
minHealthyTime := 2 * time.Second
tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, minHealthyTime, true)
taskEnv := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()

tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnv, consul, checks, minHealthyTime, true)
tracker.checkLookupInterval = checkInterval

assertChecksHealth := func(exp bool) {
@@ -548,7 +563,9 @@ func TestTracker_ConsulChecks_SlowCheckRegistration(t *testing.T) {
consul := regmock.NewServiceRegistrationHandler(logger)
checks := checkstore.NewStore(logger, state.NewMemDB(logger))
checkInterval := 10 * time.Millisecond
tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, time.Millisecond, true)
taskEnv := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()

tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnv, consul, checks, time.Millisecond, true)
tracker.checkLookupInterval = checkInterval

assertChecksHealth := func(exp bool) {
@@ -599,7 +616,8 @@ func TestTracker_Healthy_IfBothTasksAndConsulChecksAreHealthy(t *testing.T) {
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()

tracker := NewTracker(ctx, logger, alloc, nil, nil, nil, time.Millisecond, true)
taskEnv := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
tracker := NewTracker(ctx, logger, alloc, nil, taskEnv, nil, nil, time.Millisecond, true)

assertNoHealth := func() {
require.NoError(t, tracker.ctx.Err())
@@ -708,7 +726,9 @@ func TestTracker_Checks_Healthy_Before_TaskHealth(t *testing.T) {

checks := checkstore.NewStore(logger, state.NewMemDB(logger))
checkInterval := 10 * time.Millisecond
tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, time.Millisecond, true)
taskEnv := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()

tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnv, consul, checks, time.Millisecond, true)
tracker.checkLookupInterval = checkInterval
tracker.Start()

@@ -853,7 +873,9 @@ func TestTracker_ConsulChecks_OnUpdate(t *testing.T) {

checks := checkstore.NewStore(logger, state.NewMemDB(logger))
checkInterval := 10 * time.Millisecond
tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, time.Millisecond, true)
taskEnv := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()

tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnv, consul, checks, time.Millisecond, true)
tracker.checkLookupInterval = checkInterval
tracker.Start()

@@ -971,7 +993,9 @@ func TestTracker_NomadChecks_OnUpdate(t *testing.T) {

consul := regmock.NewServiceRegistrationHandler(logger)
minHealthyTime := 1 * time.Millisecond
tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul, checks, minHealthyTime, true)
taskEnv := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()

tracker := NewTracker(ctx, logger, alloc, b.Listen(), taskEnv, consul, checks, minHealthyTime, true)
tracker.checkLookupInterval = 10 * time.Millisecond
tracker.Start()

@@ -1273,11 +1297,68 @@ func TestTracker_evaluateConsulChecks(t *testing.T) {
},
},
},
{
name: "checks with variable interpolation",
exp: true,
tg: &structs.TaskGroup{
Services: []*structs.Service{{
Name: "${TASKGROUP}-group-s1-${NOMAD_REGION}",
Checks: []*structs.ServiceCheck{
{Name: ""},
},
}},
Tasks: []*structs.Task{
{
Services: []*structs.Service{
{
Name: "${TASK}-task-s2-${NOMAD_NAMESPACE}",
TaskName: "web",
Checks: []*structs.ServiceCheck{
{Name: ""},
},
},
},
},
},
},
registrations: &serviceregistration.AllocRegistration{
Tasks: map[string]*serviceregistration.ServiceRegistrations{
"group": {
Services: map[string]*serviceregistration.ServiceRegistration{
"abc123": {
ServiceID: "abc123",
Checks: []*consulapi.AgentCheck{
{
Name: `service: "web-group-s1-global" check`,
Status: consulapi.HealthPassing,
},
},
},
},
},
"task": {
Services: map[string]*serviceregistration.ServiceRegistration{
"def234": {
ServiceID: "def234",
Checks: []*consulapi.AgentCheck{
{
Name: `service: "web-task-s2-default" check`,
Status: consulapi.HealthPassing,
},
},
},
},
},
},
},
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
result := evaluateConsulChecks(tc.tg, tc.registrations)
alloc := mock.Alloc()
taskEnv := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()
result := evaluateConsulChecks(alloc, tc.tg, taskEnv, tc.registrations)
must.Eq(t, tc.exp, result)
})
}
4 changes: 2 additions & 2 deletions client/allocrunner/alloc_runner_hooks.go
@@ -138,7 +138,7 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
config.Node, ar.Alloc(), nil, config.Region).SetAllocDir(ar.allocDir.AllocDir)

// Create a taskenv.TaskEnv which is used for read only purposes by the
// newNetworkHook.
// hooks that need to interpolate dynamic values.
builtTaskEnv := envBuilder.Build()

// Create the alloc directory hook. This is run first to ensure the
@@ -149,7 +149,7 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
newCgroupHook(ar.Alloc(), ar.cpusetManager),
newUpstreamAllocsHook(hookLogger, ar.prevAllocWatcher),
newDiskMigrationHook(hookLogger, ar.prevAllocMigrator, ar.allocDir),
newAllocHealthWatcherHook(hookLogger, alloc, hs, ar.Listener(), ar.consulClient, ar.checkStore),
newAllocHealthWatcherHook(hookLogger, alloc, builtTaskEnv, hs, ar.Listener(), ar.consulClient, ar.checkStore),
newNetworkHook(hookLogger, ns, alloc, nm, nc, ar, builtTaskEnv),
newGroupServiceHook(groupServiceHookConfig{
alloc: alloc,
18 changes: 15 additions & 3 deletions client/allocrunner/health_hook.go
@@ -12,6 +12,7 @@ import (
"github.com/hashicorp/nomad/client/serviceregistration"
"github.com/hashicorp/nomad/client/serviceregistration/checks/checkstore"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/taskenv"
"github.com/hashicorp/nomad/nomad/structs"
)

@@ -64,15 +65,25 @@ type allocHealthWatcherHook struct {
// alloc set by new func or Update. Must hold hookLock to access.
alloc *structs.Allocation

// taskEnv is used to interpolate dynamic values in services.
taskEnv *taskenv.TaskEnv

// isDeploy is true if monitoring a deployment. Set in init(). Must
// hold hookLock to access.
isDeploy bool

logger hclog.Logger
}

func newAllocHealthWatcherHook(logger hclog.Logger, alloc *structs.Allocation, hs healthSetter,
listener *cstructs.AllocListener, consul serviceregistration.Handler, checkStore checkstore.Shim) interfaces.RunnerHook {
func newAllocHealthWatcherHook(
logger hclog.Logger,
alloc *structs.Allocation,
taskEnv *taskenv.TaskEnv,
hs healthSetter,
listener *cstructs.AllocListener,
consul serviceregistration.Handler,
checkStore checkstore.Shim,
) interfaces.RunnerHook {

// Neither deployments nor migrations care about the health of
// non-service jobs so never watch their health
@@ -86,6 +97,7 @@ func newAllocHealthWatcherHook(logger hclog.Logger, alloc *structs.Allocation, h

h := &allocHealthWatcherHook{
alloc: alloc,
taskEnv: taskEnv,
cancelFn: func() {}, // initialize to prevent nil func panics
watchDone: closedDone,
consul: consul,
@@ -138,7 +150,7 @@ func (h *allocHealthWatcherHook) init() error {
h.logger.Trace("watching", "deadline", deadline, "checks", useChecks, "min_healthy_time", minHealthyTime)
// Create a new tracker, start it, and watch for health results.
tracker := allochealth.NewTracker(
ctx, h.logger, h.alloc, h.listener, h.consul, h.checkStore, minHealthyTime, useChecks,
ctx, h.logger, h.alloc, h.listener, h.taskEnv, h.consul, h.checkStore, minHealthyTime, useChecks,
)
tracker.Start()

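
For reference, the new wiring can be exercised in isolation. This is only a sketch under test-style assumptions (mock node and allocation, nil Consul handler, nil check store, and a null logger), mirroring the pattern in TestTracker_Healthy_IfBothTasksAndConsulChecksAreHealthy above; in production the allocrunner builds the TaskEnv and threads it through newAllocHealthWatcherHook, as shown in alloc_runner_hooks.go and health_hook.go.

package main

import (
	"context"
	"time"

	"github.com/hashicorp/go-hclog"
	"github.com/hashicorp/nomad/client/allochealth"
	"github.com/hashicorp/nomad/client/taskenv"
	"github.com/hashicorp/nomad/nomad/mock"
)

func main() {
	alloc := mock.Alloc()

	// Build a read-only TaskEnv from the node and allocation; the tracker uses
	// it to interpolate ${...} variables in Consul service definitions.
	taskEnv := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build()

	// nil listener, Consul handler, and check store stand in for the real
	// dependencies; the tracker is only constructed here, not started.
	tracker := allochealth.NewTracker(
		context.Background(), hclog.NewNullLogger(), alloc, nil, taskEnv,
		nil, nil, time.Millisecond, true,
	)
	_ = tracker
}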