Skip to content

Commit

Permalink
Merge pull request #568 from hashicorp/f-precise-consul-dereg
Browse files Browse the repository at this point in the history
Don't deregister services and checks which are not managed by Nomad
  • Loading branch information
diptanu committed Dec 11, 2015
2 parents 32c0177 + 6a1e615 commit 6df7f35
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 23 deletions.
4 changes: 2 additions & 2 deletions client/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func testAllocRunner(restarts bool) (*MockAllocStateUpdater, *AllocRunner) {
conf.AllocDir = os.TempDir()
upd := &MockAllocStateUpdater{}
alloc := mock.Alloc()
consulClient, _ := NewConsulService(logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{})
consulClient, _ := NewConsulService(&consulServiceConfig{logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{}})
if !restarts {
alloc.Job.Type = structs.JobTypeBatch
*alloc.Job.LookupTaskGroup(alloc.TaskGroup).RestartPolicy = structs.RestartPolicy{Attempts: 0}
Expand Down Expand Up @@ -142,7 +142,7 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) {
}

// Create a new alloc runner
consulClient, err := NewConsulService(ar.logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{})
consulClient, err := NewConsulService(&consulServiceConfig{ar.logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{}})
ar2 := NewAllocRunner(ar.logger, ar.config, upd.Update,
&structs.Allocation{ID: ar.alloc.ID}, consulClient)
err = ar2.RestoreState()
Expand Down
11 changes: 10 additions & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,16 @@ func (c *Client) setupConsulService() error {
auth := c.config.Read("consul.auth")
enableSSL := c.config.ReadBoolDefault("consul.ssl", false)
verifySSL := c.config.ReadBoolDefault("consul.verifyssl", true)
if consulService, err = NewConsulService(c.logger, addr, token, auth, enableSSL, verifySSL, c.config.Node); err != nil {
consulServiceCfg := &consulServiceConfig{
logger: c.logger,
consulAddr: addr,
token: token,
auth: auth,
enableSSL: enableSSL,
verifySSL: verifySSL,
node: c.config.Node,
}
if consulService, err = NewConsulService(consulServiceCfg); err != nil {
return err
}
c.consulService = consulService
Expand Down
73 changes: 57 additions & 16 deletions client/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,36 +79,45 @@ type ConsulService struct {
trackedTskLock sync.Mutex
}

type consulServiceConfig struct {
logger *log.Logger
consulAddr string
token string
auth string
enableSSL bool
verifySSL bool
node *structs.Node
}

// A factory method to create new consul service
func NewConsulService(logger *log.Logger, consulAddr string, token string,
auth string, enableSSL bool, verifySSL bool, node *structs.Node) (*ConsulService, error) {
func NewConsulService(config *consulServiceConfig) (*ConsulService, error) {
var err error
var c *consul.Client
cfg := consul.DefaultConfig()
cfg.Address = consulAddr
if token != "" {
cfg.Token = token
cfg.Address = config.consulAddr
if config.token != "" {
cfg.Token = config.token
}

if auth != "" {
if config.auth != "" {
var username, password string
if strings.Contains(auth, ":") {
split := strings.SplitN(auth, ":", 2)
if strings.Contains(config.auth, ":") {
split := strings.SplitN(config.auth, ":", 2)
username = split[0]
password = split[1]
} else {
username = auth
username = config.auth
}

cfg.HttpAuth = &consul.HttpBasicAuth{
Username: username,
Password: password,
}
}
if enableSSL {
if config.enableSSL {
cfg.Scheme = "https"
}
if enableSSL && !verifySSL {
if config.enableSSL && !config.verifySSL {
cfg.HttpClient.Transport = &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
Expand All @@ -122,8 +131,8 @@ func NewConsulService(logger *log.Logger, consulAddr string, token string,

consulService := ConsulService{
client: &consulApiClient{client: c},
logger: logger,
node: node,
logger: config.logger,
node: config.node,
trackedTasks: make(map[string]*trackedTask),
serviceStates: make(map[string]string),
shutdownCh: make(chan struct{}),
Expand Down Expand Up @@ -195,15 +204,18 @@ func (c *ConsulService) SyncWithConsul() {
// services which are no longer present in tasks
func (c *ConsulService) performSync() {
// Get the list of the services and that Consul knows about
consulServices, err := c.client.Services()
srvcs, err := c.client.Services()
if err != nil {
return
}
consulChecks, err := c.client.Checks()
chks, err := c.client.Checks()
if err != nil {
return
}
delete(consulServices, "consul")

// Filter the services and checks that isn't managed by consul
consulServices := c.filterConsulServices(srvcs)
consulChecks := c.filterConsulChecks(chks)

knownChecks := make(map[string]struct{})
knownServices := make(map[string]struct{})
Expand Down Expand Up @@ -345,6 +357,35 @@ func (c *ConsulService) makeCheck(service *structs.Service, check *structs.Servi
return cr
}

// filterConsulServices prunes out all the service whose ids are not prefixed
// with nomad-
func (c *ConsulService) filterConsulServices(srvcs map[string]*consul.AgentService) map[string]*consul.AgentService {
nomadServices := make(map[string]*consul.AgentService)
delete(srvcs, "consul")
for _, srv := range srvcs {
if strings.HasPrefix(srv.ID, structs.NomadConsulPrefix) {
nomadServices[srv.ID] = srv
}
}
return nomadServices

}

// filterConsulChecks prunes out all the consul checks which do not have
// services with id prefixed with noamd-
func (c *ConsulService) filterConsulChecks(chks map[string]*consul.AgentCheck) map[string]*consul.AgentCheck {
nomadChecks := make(map[string]*consul.AgentCheck)
for _, chk := range chks {
if strings.HasPrefix(chk.ServiceID, structs.NomadConsulPrefix) {
nomadChecks[chk.CheckID] = chk
}
}
return nomadChecks

}

// printLogMessage prints log messages only when the node attributes have consul
// related information
func (c *ConsulService) printLogMessage(message string, v ...interface{}) {
if _, ok := c.node.Attributes["consul.version"]; ok {
c.logger.Printf(message, v)
Expand Down
79 changes: 78 additions & 1 deletion client/consul_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
"log"
"os"
"reflect"
"testing"
"time"
)
Expand Down Expand Up @@ -46,7 +47,7 @@ func (a *mockConsulApiClient) Checks() (map[string]*consul.AgentCheck, error) {

func newConsulService() *ConsulService {
logger := log.New(os.Stdout, "logger: ", log.Lshortfile)
c, _ := NewConsulService(logger, "", "", "", false, false, &structs.Node{})
c, _ := NewConsulService(&consulServiceConfig{logger, "", "", "", false, false, &structs.Node{}})
c.client = &mockConsulApiClient{}
return c
}
Expand Down Expand Up @@ -278,3 +279,79 @@ func TestConsul_ModifyCheck(t *testing.T) {
t.Fatalf("Expected number of check registrations: %v, Actual: %v", 2, apiClient.checkRegisterCallCount)
}
}

func TestConsul_FilterNomadServicesAndChecks(t *testing.T) {
c := newConsulService()
srvs := map[string]*consul.AgentService{
"foo-bar": {
ID: "foo-bar",
Service: "http-frontend",
Tags: []string{"global"},
Port: 8080,
Address: "10.10.1.11",
},
"nomad-2121212": {
ID: "nomad-2121212",
Service: "identity-service",
Tags: []string{"global"},
Port: 8080,
Address: "10.10.1.11",
},
}

expSrvcs := map[string]*consul.AgentService{
"nomad-2121212": {
ID: "nomad-2121212",
Service: "identity-service",
Tags: []string{"global"},
Port: 8080,
Address: "10.10.1.11",
},
}

nomadServices := c.filterConsulServices(srvs)
if !reflect.DeepEqual(expSrvcs, nomadServices) {
t.Fatalf("Expected: %v, Actual: %v", expSrvcs, nomadServices)
}

nomadServices = c.filterConsulServices(nil)
if len(nomadServices) != 0 {
t.Fatalf("Expected number of services: %v, Actual: %v", 0, len(nomadServices))
}

chks := map[string]*consul.AgentCheck{
"foo-bar-chk": {
CheckID: "foo-bar-chk",
ServiceID: "foo-bar",
Name: "alive",
},
"212121212": {
CheckID: "212121212",
ServiceID: "nomad-2121212",
Name: "ping",
},
}

expChks := map[string]*consul.AgentCheck{
"212121212": {
CheckID: "212121212",
ServiceID: "nomad-2121212",
Name: "ping",
},
}

nomadChecks := c.filterConsulChecks(chks)
if !reflect.DeepEqual(expChks, nomadChecks) {
t.Fatalf("Expected: %v, Actual: %v", expChks, nomadChecks)
}

if len(nomadChecks) != 1 {
t.Fatalf("Expected number of checks: %v, Actual: %v", 1, len(nomadChecks))
}

nomadChecks = c.filterConsulChecks(nil)
if len(nomadChecks) != 0 {
t.Fatalf("Expected number of checks: %v, Actual: %v", 0, len(nomadChecks))
}

}
4 changes: 2 additions & 2 deletions client/task_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func testTaskRunner(restarts bool) (*MockTaskStateUpdater, *TaskRunner) {
upd := &MockTaskStateUpdater{}
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
consulClient, _ := NewConsulService(logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{})
consulClient, _ := NewConsulService(&consulServiceConfig{logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{}})
// Initialize the port listing. This should be done by the offer process but
// we have a mock so that doesn't happen.
task.Resources.Networks[0].ReservedPorts = []structs.Port{{"", 80}}
Expand Down Expand Up @@ -164,7 +164,7 @@ func TestTaskRunner_SaveRestoreState(t *testing.T) {
}

// Create a new task runner
consulClient, _ := NewConsulService(tr.logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{})
consulClient, _ := NewConsulService(&consulServiceConfig{tr.logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{}})
tr2 := NewTaskRunner(tr.logger, tr.config, upd.Update,
tr.ctx, tr.allocID, &structs.Task{Name: tr.task.Name}, tr.state, tr.restartTracker,
consulClient)
Expand Down
9 changes: 8 additions & 1 deletion nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1145,6 +1145,10 @@ func (sc *ServiceCheck) Hash(serviceId string) string {
return fmt.Sprintf("%x", h.Sum(nil))
}

const (
NomadConsulPrefix = "nomad"
)

// The Service model represents a Consul service defintion
type Service struct {
Id string // Id of the service, this needs to be unique on a local machine
Expand All @@ -1157,7 +1161,10 @@ type Service struct {
// InitFields interpolates values of Job, Task Group and Task in the Service
// Name. This also generates check names, service id and check ids.
func (s *Service) InitFields(job string, taskGroup string, task string) {
s.Id = GenerateUUID()
// We add a prefix to the Service ID so that we can know that this service
// is managed by Consul since Consul can also have service which are not
// managed by Nomad
s.Id = fmt.Sprintf("%s-%s", NomadConsulPrefix, GenerateUUID())
s.Name = args.ReplaceEnv(s.Name, map[string]string{
"JOB": job,
"TASKGROUP": taskGroup,
Expand Down

0 comments on commit 6df7f35

Please sign in to comment.