Skip to content

Commit

Permalink
Merge pull request #5536 from hashicorp/dani/consul
Browse files Browse the repository at this point in the history
Consul Catalog Integration Fixes
  • Loading branch information
endocrimes committed May 9, 2019
2 parents 7ba2378 + 3d3b2ca commit 62274e9
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 182 deletions.
70 changes: 53 additions & 17 deletions command/agent/consul/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"net"
"net/url"
"reflect"
"strconv"
"strings"
"sync"
Expand All @@ -29,6 +30,10 @@ const (
// for tasks.
nomadTaskPrefix = nomadServicePrefix + "-task-"

// nomadCheckPrefix is the prefix that scopes Nomad registered checks for
// services.
nomadCheckPrefix = nomadServicePrefix + "-check-"

// defaultRetryInterval is how quickly to retry syncing services and
// checks to Consul when an error occurs. Will backoff up to a max.
defaultRetryInterval = time.Second
Expand Down Expand Up @@ -83,6 +88,15 @@ type AgentAPI interface {
UpdateTTL(id, output, status string) error
}

func agentServiceUpdateRequired(reg *api.AgentServiceRegistration, svc *api.AgentService) bool {
return !(reg.Kind == svc.Kind &&
reg.ID == svc.ID &&
reg.Port == svc.Port &&
reg.Address == svc.Address &&
reg.Name == svc.Service &&
reflect.DeepEqual(reg.Tags, svc.Tags))
}

// operations are submitted to the main loop via commit() for synchronizing
// with Consul.
type operations struct {
Expand Down Expand Up @@ -466,16 +480,26 @@ func (c *ServiceClient) sync() error {
metrics.IncrCounter([]string{"client", "consul", "service_deregistrations"}, 1)
}

// Add Nomad services missing from Consul
// Add Nomad services missing from Consul, or where the service has been updated.
for id, locals := range c.services {
if _, ok := consulServices[id]; !ok {
if err = c.client.ServiceRegister(locals); err != nil {
metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
return err
existingSvc, ok := consulServices[id]

if ok {
// There is an existing registration of this service in Consul, so here
// we validate to see if the service has been invalidated to see if it
// should be updated.
if !agentServiceUpdateRequired(locals, existingSvc) {
// No Need to update services that have not changed
continue
}
sreg++
metrics.IncrCounter([]string{"client", "consul", "service_registrations"}, 1)
}

if err = c.client.ServiceRegister(locals); err != nil {
metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
return err
}
sreg++
metrics.IncrCounter([]string{"client", "consul", "service_registrations"}, 1)
}

// Remove Nomad checks in Consul but unknown locally
Expand All @@ -489,7 +513,7 @@ func (c *ServiceClient) sync() error {
// Nomad managed checks if this is not a client agent.
// This is to prevent server agents from removing checks
// registered by client agents
if !isNomadService(check.ServiceID) || !c.isClientAgent {
if !isNomadService(check.ServiceID) || !c.isClientAgent || !isNomadCheck(check.CheckID) {
// Service not managed by Nomad, skip
continue
}
Expand Down Expand Up @@ -809,10 +833,10 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error {
newIDs[makeTaskServiceID(newTask.AllocID, newTask.Name, s, newTask.Canary)] = s
}

// Loop over existing Service IDs to see if they have been removed or
// updated.
// Loop over existing Service IDs to see if they have been removed
for existingID, existingSvc := range existingIDs {
newSvc, ok := newIDs[existingID]

if !ok {
// Existing service entry removed
ops.deregServices = append(ops.deregServices, existingID)
Expand All @@ -828,8 +852,12 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error {
continue
}

// Service exists and hasn't changed, don't re-add it later
delete(newIDs, existingID)
oldHash := existingSvc.Hash(old.AllocID, old.Name, old.Canary)
newHash := newSvc.Hash(newTask.AllocID, newTask.Name, newTask.Canary)
if oldHash == newHash {
// Service exists and hasn't changed, don't re-add it later
delete(newIDs, existingID)
}

// Service still exists so add it to the task's registration
sreg := &ServiceRegistration{
Expand All @@ -848,7 +876,8 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error {
for _, check := range newSvc.Checks {
checkID := makeCheckID(existingID, check)
if _, exists := existingChecks[checkID]; exists {
// Check exists, so don't remove it
// Check is still required. Remove it from the map so it doesn't get
// deleted later.
delete(existingChecks, checkID)
sreg.checkIDs[checkID] = struct{}{}
}
Expand All @@ -861,7 +890,6 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error {

for _, checkID := range newCheckIDs {
sreg.checkIDs[checkID] = struct{}{}

}

// Update all watched checks as CheckRestart fields aren't part of ID
Expand Down Expand Up @@ -1082,14 +1110,16 @@ func makeAgentServiceID(role string, service *structs.Service) string {
// Consul. All structs.Service fields are included in the ID's hash except
// Checks. This allows updates to merely compare IDs.
//
// Example Service ID: _nomad-task-TNM333JKJPM5AK4FAS3VXQLXFDWOF4VH
// Example Service ID: _nomad-task-b4e61df9-b095-d64e-f241-23860da1375f-redis-http
func makeTaskServiceID(allocID, taskName string, service *structs.Service, canary bool) string {
return nomadTaskPrefix + service.Hash(allocID, taskName, canary)
return fmt.Sprintf("%s%s-%s-%s", nomadTaskPrefix, allocID, taskName, service.Name)
}

// makeCheckID creates a unique ID for a check.
//
// Example Check ID: _nomad-check-434ae42f9a57c5705344974ac38de2aee0ee089d
func makeCheckID(serviceID string, check *structs.ServiceCheck) string {
return check.Hash(serviceID)
return fmt.Sprintf("%s%s", nomadCheckPrefix, check.Hash(serviceID))
}

// createCheckReg creates a Check that can be registered with Consul.
Expand Down Expand Up @@ -1154,6 +1184,12 @@ func createCheckReg(serviceID, checkID string, check *structs.ServiceCheck, host
return &chkReg, nil
}

// isNomadCheck returns true if the ID matches the pattern of a Nomad managed
// check.
func isNomadCheck(id string) bool {
return strings.HasPrefix(id, nomadCheckPrefix)
}

// isNomadService returns true if the ID matches the pattern of a Nomad managed
// service (new or old formats). Agent services return false as independent
// client and server agents may be running on the same machine. #2827
Expand Down
Loading

0 comments on commit 62274e9

Please sign in to comment.