Skip to content

Commit

Permalink
consul: register checks along with service on initial registration (#…
Browse files Browse the repository at this point in the history
…14944)

* consul: register checks along with service on initial registration

This PR updates Nomad's Consul service client to include checks in
an initial service registration, so that the checks associated with
the service are registered "atomically" with the service. Before, we
would only register the checks after the service registration, which
causes problems where the service is deemed healthy, even if one or
more checks are unhealthy - especially problematic in the case where
SuccessBeforePassing is configured.

Fixes #3935

* cr: followup to fix cause of extra consul logging

* cr: fix another bug

* cr: fixup changelog
  • Loading branch information
shoenig committed Oct 19, 2022
1 parent d59dc3d commit faac908
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 65 deletions.
3 changes: 3 additions & 0 deletions .changelog/14944.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
consul: atomically register checks on initial service registration
```
2 changes: 1 addition & 1 deletion client/serviceregistration/service_registration.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ type ServiceRegistration struct {
// services/checks registered in Consul. It is used to materialize the other
// fields when queried.
ServiceID string
CheckIDs map[string]struct{}
CheckIDs map[string]struct{} // todo: use a Set?

// CheckOnUpdate is a map of checkIDs and the associated OnUpdate value
// from the ServiceCheck It is used to determine how a reported checks
Expand Down
36 changes: 35 additions & 1 deletion command/agent/consul/catalog_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,11 @@ func (c *MockAgent) CheckRegs() []*api.AgentCheckRegistration {
func (c *MockAgent) CheckRegister(check *api.AgentCheckRegistration) error {
c.mu.Lock()
defer c.mu.Unlock()
return c.checkRegister(check)
}

// checkRegister registers a check; c.mu must be held.
func (c *MockAgent) checkRegister(check *api.AgentCheckRegistration) error {
c.hits++

// Consul will set empty Namespace to default, do the same here
Expand All @@ -275,14 +280,29 @@ func (c *MockAgent) CheckRegister(check *api.AgentCheckRegistration) error {
if c.checks[check.Namespace] == nil {
c.checks[check.Namespace] = make(map[string]*api.AgentCheckRegistration)
}

c.checks[check.Namespace][check.ID] = check

// Be nice and make checks reachable-by-service
serviceCheck := check.AgentServiceCheck

if c.services[check.Namespace] == nil {
c.services[check.Namespace] = make(map[string]*api.AgentServiceRegistration)
}
c.services[check.Namespace][check.ServiceID].Checks = append(c.services[check.Namespace][check.ServiceID].Checks, &serviceCheck)

// replace existing check if one with same id already exists
replace := false
for i := 0; i < len(c.services[check.Namespace][check.ServiceID].Checks); i++ {
if c.services[check.Namespace][check.ServiceID].Checks[i].CheckID == check.CheckID {
c.services[check.Namespace][check.ServiceID].Checks[i] = &check.AgentServiceCheck
replace = true
break
}
}

if !replace {
c.services[check.Namespace][check.ServiceID].Checks = append(c.services[check.Namespace][check.ServiceID].Checks, &serviceCheck)
}
return nil
}

Expand Down Expand Up @@ -315,6 +335,20 @@ func (c *MockAgent) ServiceRegister(service *api.AgentServiceRegistration) error
c.services[service.Namespace] = make(map[string]*api.AgentServiceRegistration)
}
c.services[service.Namespace][service.ID] = service

// as of Nomad v1.4.x registering service now also registers its checks
for _, check := range service.Checks {
if err := c.checkRegister(&api.AgentCheckRegistration{
ID: check.CheckID,
Name: check.Name,
ServiceID: service.ID,
AgentServiceCheck: *check,
Namespace: service.Namespace,
}); err != nil {
return err
}
}

return nil
}

Expand Down
62 changes: 53 additions & 9 deletions command/agent/consul/service_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,12 +212,19 @@ func maybeTweakTags(wanted *api.AgentServiceRegistration, existing *api.AgentSer
}
}

// maybeTweakTaggedAddresses will remove the .TaggedAddresses fields from existing
// if wanted represents a Nomad agent (Client or Server). We do this because Consul
// sets the TaggedAddress on these legacy registrations for us
// maybeTweakTaggedAddresses will remove the Consul-injected .TaggedAddresses fields
// from existing if wanted represents a Nomad agent (Client or Server) or Nomad managed
// service, which do not themselves configure those tagged addresses. We do this
// because Consul will magically set the .TaggedAddress to values Nomad does not
// know about if they are submitted as unset.
func maybeTweakTaggedAddresses(wanted *api.AgentServiceRegistration, existing *api.AgentService) {
if isNomadAgent(wanted.ID) && len(wanted.TaggedAddresses) == 0 {
existing.TaggedAddresses = nil
if isNomadAgent(wanted.ID) || isNomadService(wanted.ID) {
if _, exists := wanted.TaggedAddresses["lan_ipv4"]; !exists {
delete(existing.TaggedAddresses, "lan_ipv4")
}
if _, exists := wanted.TaggedAddresses["wan_ipv4"]; !exists {
delete(existing.TaggedAddresses, "wan_ipv4")
}
}
}

Expand Down Expand Up @@ -973,8 +980,10 @@ func (c *ServiceClient) RegisterAgent(role string, services []*structs.Service)
// checks from a service. It returns a service registration object with the
// service and check IDs populated.
func (c *ServiceClient) serviceRegs(
ops *operations, service *structs.Service, workload *serviceregistration.WorkloadServices) (
*serviceregistration.ServiceRegistration, error) {
ops *operations,
service *structs.Service,
workload *serviceregistration.WorkloadServices,
) (*serviceregistration.ServiceRegistration, error) {

// Get the services ID
id := serviceregistration.MakeAllocServiceID(workload.AllocInfo.AllocID, workload.Name(), service)
Expand Down Expand Up @@ -1090,6 +1099,7 @@ func (c *ServiceClient) serviceRegs(
TaggedAddresses: taggedAddresses,
Connect: connect, // will be nil if no Connect stanza
Proxy: gateway, // will be nil if no Connect Gateway stanza
Checks: make([]*api.AgentServiceCheck, 0, len(service.Checks)),
}
ops.regServices = append(ops.regServices, serviceReg)

Expand All @@ -1098,17 +1108,51 @@ func (c *ServiceClient) serviceRegs(
if err != nil {
return nil, err
}

for _, registration := range checkRegs {
sreg.CheckIDs[registration.ID] = struct{}{}
ops.regChecks = append(ops.regChecks, registration)
serviceReg.Checks = append(
serviceReg.Checks,
apiCheckRegistrationToCheck(registration),
)
}

return sreg, nil
}

// apiCheckRegistrationToCheck converts a check registration to a check, so that
// we can include them in the initial service registration. It is expected the
// Nomad-conversion (e.g. turning script checks into ttl checks) has already been
// applied.
func apiCheckRegistrationToCheck(r *api.AgentCheckRegistration) *api.AgentServiceCheck {
return &api.AgentServiceCheck{
CheckID: r.ID,
Name: r.Name,
Interval: r.Interval,
Timeout: r.Timeout,
TTL: r.TTL,
HTTP: r.HTTP,
Header: maps.Clone(r.Header),
Method: r.Method,
Body: r.Body,
TCP: r.TCP,
Status: r.Status,
TLSSkipVerify: r.TLSSkipVerify,
GRPC: r.GRPC,
GRPCUseTLS: r.GRPCUseTLS,
SuccessBeforePassing: r.SuccessBeforePassing,
FailuresBeforeCritical: r.FailuresBeforeCritical,
}
}

// checkRegs creates check registrations for the given service
func (c *ServiceClient) checkRegs(serviceID string, service *structs.Service,
workload *serviceregistration.WorkloadServices, sreg *serviceregistration.ServiceRegistration) ([]*api.AgentCheckRegistration, error) {
func (c *ServiceClient) checkRegs(
serviceID string,
service *structs.Service,
workload *serviceregistration.WorkloadServices,
sreg *serviceregistration.ServiceRegistration,
) ([]*api.AgentCheckRegistration, error) {

registrations := make([]*api.AgentCheckRegistration, 0, len(service.Checks))
for _, check := range service.Checks {
Expand Down
100 changes: 46 additions & 54 deletions command/agent/consul/unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/kr/pretty"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -211,7 +212,6 @@ func TestConsul_ChangePorts(t *testing.T) {
ci.Parallel(t)

ctx := setupFake(t)
require := require.New(t)

ctx.Workload.Services[0].Checks = []*structs.ServiceCheck{
{
Expand All @@ -238,17 +238,17 @@ func TestConsul_ChangePorts(t *testing.T) {
},
}

require.NoError(ctx.ServiceClient.RegisterWorkload(ctx.Workload))
require.NoError(ctx.syncOnce(syncNewOps))
require.Equal(1, len(ctx.FakeConsul.services["default"]), "Expected 1 service to be registered with Consul")
must.NoError(t, ctx.ServiceClient.RegisterWorkload(ctx.Workload))
must.NoError(t, ctx.syncOnce(syncNewOps))
must.MapLen(t, 1, ctx.FakeConsul.services["default"])

for _, v := range ctx.FakeConsul.services["default"] {
require.Equal(ctx.Workload.Services[0].Name, v.Name)
require.Equal(ctx.Workload.Services[0].Tags, v.Tags)
require.Equal(xPort, v.Port)
must.Eq(t, ctx.Workload.Services[0].Name, v.Name)
must.Eq(t, ctx.Workload.Services[0].Tags, v.Tags)
must.Eq(t, xPort, v.Port)
}

require.Len(ctx.FakeConsul.checks["default"], 3)
must.MapLen(t, 3, ctx.FakeConsul.checks["default"], must.Sprintf("checks %#v", ctx.FakeConsul.checks))

origTCPKey := ""
origScriptKey := ""
Expand All @@ -257,20 +257,20 @@ func TestConsul_ChangePorts(t *testing.T) {
switch v.Name {
case "c1":
origTCPKey = k
require.Equal(fmt.Sprintf(":%d", xPort), v.TCP)
must.Eq(t, fmt.Sprintf(":%d", xPort), v.TCP)
case "c2":
origScriptKey = k
case "c3":
origHTTPKey = k
require.Equal(fmt.Sprintf("http://:%d/", yPort), v.HTTP)
must.Eq(t, fmt.Sprintf("http://:%d/", yPort), v.HTTP)
default:
t.Fatalf("unexpected check: %q", v.Name)
}
}

require.NotEmpty(origTCPKey)
require.NotEmpty(origScriptKey)
require.NotEmpty(origHTTPKey)
must.StrHasPrefix(t, origTCPKey, "_nomad-check-")
must.StrHasPrefix(t, origScriptKey, "_nomad-check-")
must.StrHasPrefix(t, origHTTPKey, "_nomad-check-")

// Now update the PortLabel on the Service and Check c3
origWorkload := ctx.Workload.Copy()
Expand Down Expand Up @@ -300,32 +300,31 @@ func TestConsul_ChangePorts(t *testing.T) {
},
}

require.NoError(ctx.ServiceClient.UpdateWorkload(origWorkload, ctx.Workload))
require.NoError(ctx.syncOnce(syncNewOps))
require.Equal(1, len(ctx.FakeConsul.services["default"]), "Expected 1 service to be registered with Consul")
must.NoError(t, ctx.ServiceClient.UpdateWorkload(origWorkload, ctx.Workload))
must.NoError(t, ctx.syncOnce(syncNewOps))
must.MapLen(t, 1, ctx.FakeConsul.services["default"])

for _, v := range ctx.FakeConsul.services["default"] {
require.Equal(ctx.Workload.Services[0].Name, v.Name)
require.Equal(ctx.Workload.Services[0].Tags, v.Tags)
require.Equal(yPort, v.Port)
must.Eq(t, ctx.Workload.Services[0].Name, v.Name)
must.Eq(t, ctx.Workload.Services[0].Tags, v.Tags)
must.Eq(t, yPort, v.Port)
}

require.Equal(3, len(ctx.FakeConsul.checks["default"]))
must.MapLen(t, 3, ctx.FakeConsul.checks["default"])

for k, v := range ctx.FakeConsul.checks["default"] {
switch v.Name {
case "c1":
// C1 is changed because the service was re-registered
require.NotEqual(origTCPKey, k)
require.Equal(fmt.Sprintf(":%d", xPort), v.TCP)
must.NotEq(t, origTCPKey, k)
must.Eq(t, fmt.Sprintf(":%d", xPort), v.TCP)
case "c2":
// C2 is changed because the service was re-registered
require.NotEqual(origScriptKey, k)
must.NotEq(t, origScriptKey, k)
case "c3":
require.NotEqual(origHTTPKey, k)
require.Equal(fmt.Sprintf("http://:%d/", yPort), v.HTTP)
must.NotEq(t, origHTTPKey, k)
must.Eq(t, fmt.Sprintf("http://:%d/", yPort), v.HTTP)
default:
t.Errorf("Unknown check: %q", k)
must.Unreachable(t, must.Sprintf("unknown check: %q", k))
}
}
}
Expand Down Expand Up @@ -981,7 +980,7 @@ func TestCreateCheckReg_GRPC(t *testing.T) {
expected := &api.AgentCheckRegistration{
Namespace: "",
ID: checkID,
Name: "name",
Name: check.Name,
ServiceID: serviceID,
AgentServiceCheck: api.AgentServiceCheck{
Timeout: "1s",
Expand All @@ -993,23 +992,19 @@ func TestCreateCheckReg_GRPC(t *testing.T) {
}

actual, err := createCheckReg(serviceID, checkID, check, "localhost", 8080, "default")
require.NoError(t, err)
require.Equal(t, expected, actual)
must.NoError(t, err)
must.Eq(t, expected, actual)
}

func TestConsul_ServiceName_Duplicates(t *testing.T) {
ci.Parallel(t)

ctx := setupFake(t)
require := require.New(t)

ctx.Workload.Services = []*structs.Service{
{
Name: "best-service",
PortLabel: "x",
Tags: []string{
"foo",
},
Tags: []string{"foo"},
Checks: []*structs.ServiceCheck{
{
Name: "check-a",
Expand All @@ -1022,12 +1017,10 @@ func TestConsul_ServiceName_Duplicates(t *testing.T) {
{
Name: "best-service",
PortLabel: "y",
Tags: []string{
"bar",
},
Tags: []string{"bar"},
Checks: []*structs.ServiceCheck{
{
Name: "checky-mccheckface",
Name: "check-b",
Type: "tcp",
Interval: time.Second,
Timeout: time.Second,
Expand All @@ -1040,21 +1033,20 @@ func TestConsul_ServiceName_Duplicates(t *testing.T) {
},
}

require.NoError(ctx.ServiceClient.RegisterWorkload(ctx.Workload))

require.NoError(ctx.syncOnce(syncNewOps))

require.Len(ctx.FakeConsul.services["default"], 3)

for _, v := range ctx.FakeConsul.services["default"] {
if v.Name == ctx.Workload.Services[0].Name && v.Port == xPort {
require.ElementsMatch(v.Tags, ctx.Workload.Services[0].Tags)
require.Len(v.Checks, 1)
} else if v.Name == ctx.Workload.Services[1].Name && v.Port == yPort {
require.ElementsMatch(v.Tags, ctx.Workload.Services[1].Tags)
require.Len(v.Checks, 1)
} else if v.Name == ctx.Workload.Services[2].Name {
require.Len(v.Checks, 0)
must.NoError(t, ctx.ServiceClient.RegisterWorkload(ctx.Workload))
must.NoError(t, ctx.syncOnce(syncNewOps))
must.MapLen(t, 3, ctx.FakeConsul.services["default"])

for _, s := range ctx.FakeConsul.services["default"] {
switch {
case s.Name == "best-service" && s.Port == xPort:
must.SliceContainsAll(t, s.Tags, ctx.Workload.Services[0].Tags)
must.SliceLen(t, 1, s.Checks)
case s.Name == "best-service" && s.Port == yPort:
must.SliceContainsAll(t, s.Tags, ctx.Workload.Services[1].Tags)
must.SliceLen(t, 1, s.Checks)
case s.Name == "worst-service":
must.SliceEmpty(t, s.Checks)
}
}
}
Expand Down

0 comments on commit faac908

Please sign in to comment.