Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consul Catalog Integration Fixes #5536

Merged
merged 3 commits into from
May 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 53 additions & 17 deletions command/agent/consul/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"net"
"net/url"
"reflect"
"strconv"
"strings"
"sync"
Expand All @@ -29,6 +30,10 @@ const (
// for tasks.
nomadTaskPrefix = nomadServicePrefix + "-task-"

// nomadCheckPrefix is the prefix that scopes Nomad registered checks for
// services.
nomadCheckPrefix = nomadServicePrefix + "-check-"

// defaultRetryInterval is how quickly to retry syncing services and
// checks to Consul when an error occurs. Will backoff up to a max.
defaultRetryInterval = time.Second
Expand Down Expand Up @@ -83,6 +88,15 @@ type AgentAPI interface {
UpdateTTL(id, output, status string) error
}

func agentServiceUpdateRequired(reg *api.AgentServiceRegistration, svc *api.AgentService) bool {
return !(reg.Kind == svc.Kind &&
reg.ID == svc.ID &&
reg.Port == svc.Port &&
reg.Address == svc.Address &&
reg.Name == svc.Service &&
reflect.DeepEqual(reg.Tags, svc.Tags))
}

// operations are submitted to the main loop via commit() for synchronizing
// with Consul.
type operations struct {
Expand Down Expand Up @@ -466,16 +480,26 @@ func (c *ServiceClient) sync() error {
metrics.IncrCounter([]string{"client", "consul", "service_deregistrations"}, 1)
}

// Add Nomad services missing from Consul
// Add Nomad services missing from Consul, or where the service has been updated.
for id, locals := range c.services {
if _, ok := consulServices[id]; !ok {
if err = c.client.ServiceRegister(locals); err != nil {
metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
return err
existingSvc, ok := consulServices[id]

if ok {
// There is an existing registration of this service in Consul, so here
// we validate to see if the service has been invalidated to see if it
// should be updated.
if !agentServiceUpdateRequired(locals, existingSvc) {
// No Need to update services that have not changed
continue
}
sreg++
metrics.IncrCounter([]string{"client", "consul", "service_registrations"}, 1)
}

if err = c.client.ServiceRegister(locals); err != nil {
metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
return err
}
sreg++
metrics.IncrCounter([]string{"client", "consul", "service_registrations"}, 1)
}

// Remove Nomad checks in Consul but unknown locally
Expand All @@ -489,7 +513,7 @@ func (c *ServiceClient) sync() error {
// Nomad managed checks if this is not a client agent.
// This is to prevent server agents from removing checks
// registered by client agents
if !isNomadService(check.ServiceID) || !c.isClientAgent {
if !isNomadService(check.ServiceID) || !c.isClientAgent || !isNomadCheck(check.CheckID) {
// Service not managed by Nomad, skip
continue
}
Expand Down Expand Up @@ -809,10 +833,10 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error {
newIDs[makeTaskServiceID(newTask.AllocID, newTask.Name, s, newTask.Canary)] = s
}

// Loop over existing Service IDs to see if they have been removed or
// updated.
// Loop over existing Service IDs to see if they have been removed
for existingID, existingSvc := range existingIDs {
newSvc, ok := newIDs[existingID]

if !ok {
// Existing service entry removed
ops.deregServices = append(ops.deregServices, existingID)
Expand All @@ -828,8 +852,12 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error {
continue
}

// Service exists and hasn't changed, don't re-add it later
delete(newIDs, existingID)
oldHash := existingSvc.Hash(old.AllocID, old.Name, old.Canary)
newHash := newSvc.Hash(newTask.AllocID, newTask.Name, newTask.Canary)
if oldHash == newHash {
// Service exists and hasn't changed, don't re-add it later
delete(newIDs, existingID)
}

// Service still exists so add it to the task's registration
sreg := &ServiceRegistration{
Expand All @@ -848,7 +876,8 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error {
for _, check := range newSvc.Checks {
checkID := makeCheckID(existingID, check)
if _, exists := existingChecks[checkID]; exists {
// Check exists, so don't remove it
// Check is still required. Remove it from the map so it doesn't get
// deleted later.
delete(existingChecks, checkID)
sreg.checkIDs[checkID] = struct{}{}
}
Expand All @@ -861,7 +890,6 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error {

for _, checkID := range newCheckIDs {
sreg.checkIDs[checkID] = struct{}{}

}

// Update all watched checks as CheckRestart fields aren't part of ID
Expand Down Expand Up @@ -1082,14 +1110,16 @@ func makeAgentServiceID(role string, service *structs.Service) string {
// Consul. All structs.Service fields are included in the ID's hash except
// Checks. This allows updates to merely compare IDs.
//
// Example Service ID: _nomad-task-TNM333JKJPM5AK4FAS3VXQLXFDWOF4VH
// Example Service ID: _nomad-task-b4e61df9-b095-d64e-f241-23860da1375f-redis-http
func makeTaskServiceID(allocID, taskName string, service *structs.Service, canary bool) string {
return nomadTaskPrefix + service.Hash(allocID, taskName, canary)
return fmt.Sprintf("%s%s-%s-%s", nomadTaskPrefix, allocID, taskName, service.Name)
}

// makeCheckID creates a unique ID for a check.
endocrimes marked this conversation as resolved.
Show resolved Hide resolved
//
// Example Check ID: _nomad-check-434ae42f9a57c5705344974ac38de2aee0ee089d
func makeCheckID(serviceID string, check *structs.ServiceCheck) string {
return check.Hash(serviceID)
return fmt.Sprintf("%s%s", nomadCheckPrefix, check.Hash(serviceID))
}

// createCheckReg creates a Check that can be registered with Consul.
Expand Down Expand Up @@ -1154,6 +1184,12 @@ func createCheckReg(serviceID, checkID string, check *structs.ServiceCheck, host
return &chkReg, nil
}

// isNomadCheck returns true if the ID matches the pattern of a Nomad managed
// check.
func isNomadCheck(id string) bool {
return strings.HasPrefix(id, nomadCheckPrefix)
}

// isNomadService returns true if the ID matches the pattern of a Nomad managed
// service (new or old formats). Agent services return false as independent
// client and server agents may be running on the same machine. #2827
Expand Down
Loading