Skip to content

Commit

Permalink
Merge pull request #504 from hashicorp/b-checks
Browse files Browse the repository at this point in the history
This simplifies the logic of tracking services and checks
  • Loading branch information
diptanu committed Nov 26, 2015
2 parents 9b2ba5e + 0f39fa5 commit 3d36591
Show file tree
Hide file tree
Showing 6 changed files with 262 additions and 171 deletions.
280 changes: 144 additions & 136 deletions client/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,43 +19,66 @@ const (
syncInterval = 5 * time.Second
)

type trackedService struct {
allocId string
task *structs.Task
serviceHash string
service *structs.Service
host string
port int
// consulApi is the interface which wraps the actual consul api client
type consulApi interface {
CheckRegister(check *consul.AgentCheckRegistration) error
CheckDeregister(checkID string) error
ServiceRegister(service *consul.AgentServiceRegistration) error
ServiceDeregister(ServiceID string) error
Services() (map[string]*consul.AgentService, error)
Checks() (map[string]*consul.AgentCheck, error)
}

type trackedTask struct {
allocID string
task *structs.Task
// consulApiClient is the actual implementation of the consulApi which
// talks to the consul agent
type consulApiClient struct {
client *consul.Client
}

func (t *trackedService) IsServiceValid() bool {
for _, service := range t.task.Services {
if service.Id == t.service.Id && service.Hash() == t.serviceHash {
return true
}
}
func (a *consulApiClient) CheckRegister(check *consul.AgentCheckRegistration) error {
return a.client.Agent().CheckRegister(check)
}

func (a *consulApiClient) CheckDeregister(checkID string) error {
return a.client.Agent().CheckDeregister(checkID)
}

func (a *consulApiClient) ServiceRegister(service *consul.AgentServiceRegistration) error {
return a.client.Agent().ServiceRegister(service)
}

func (a *consulApiClient) ServiceDeregister(serviceId string) error {
return a.client.Agent().ServiceDeregister(serviceId)
}

return false
func (a *consulApiClient) Services() (map[string]*consul.AgentService, error) {
return a.client.Agent().Services()
}

func (a *consulApiClient) Checks() (map[string]*consul.AgentCheck, error) {
return a.client.Agent().Checks()
}

// trackedTask is a Task that we are tracking for changes in service and check
// definitions and keep them sycned with Consul Agent
type trackedTask struct {
allocID string
task *structs.Task
}

// ConsulService is the service which tracks tasks and syncs the services and
// checks defined in them with Consul Agent
type ConsulService struct {
client *consul.Client
client consulApi
logger *log.Logger
shutdownCh chan struct{}

trackedServices map[string]*trackedService // Service ID to Tracked Service Map
trackedChecks map[string]*consul.AgentCheckRegistration // List of check ids that is being tracked
trackedTasks map[string]*trackedTask
trackedSrvLock sync.Mutex
trackedChkLock sync.Mutex
trackedTskLock sync.Mutex
trackedTasks map[string]*trackedTask
serviceStates map[string]string
trackedTskLock sync.Mutex
}

// A factory method to create new consul service
func NewConsulService(logger *log.Logger, consulAddr string, token string,
auth string, enableSSL bool, verifySSL bool) (*ConsulService, error) {
var err error
Expand Down Expand Up @@ -97,17 +120,18 @@ func NewConsulService(logger *log.Logger, consulAddr string, token string,
}

consulService := ConsulService{
client: c,
logger: logger,
trackedServices: make(map[string]*trackedService),
trackedTasks: make(map[string]*trackedTask),
trackedChecks: make(map[string]*consul.AgentCheckRegistration),
shutdownCh: make(chan struct{}),
client: &consulApiClient{client: c},
logger: logger,
trackedTasks: make(map[string]*trackedTask),
serviceStates: make(map[string]string),
shutdownCh: make(chan struct{}),
}

return &consulService, nil
}

// Register starts tracking a task for changes to it's services and tasks and
// adds/removes services and checks associated with it.
func (c *ConsulService) Register(task *structs.Task, allocID string) error {
var mErr multierror.Error
c.trackedTskLock.Lock()
Expand All @@ -124,6 +148,8 @@ func (c *ConsulService) Register(task *structs.Task, allocID string) error {
return mErr.ErrorOrNil()
}

// Deregister stops tracking a task for changes to it's services and checks and
// removes all the services and checks associated with the Task
func (c *ConsulService) Deregister(task *structs.Task, allocID string) error {
var mErr multierror.Error
c.trackedTskLock.Lock()
Expand All @@ -146,14 +172,15 @@ func (c *ConsulService) ShutDown() {
close(c.shutdownCh)
}

// SyncWithConsul is a long lived function that performs calls to sync
// checks and services periodically with Consul Agent
func (c *ConsulService) SyncWithConsul() {
sync := time.After(syncInterval)
agent := c.client.Agent()

for {
select {
case <-sync:
c.performSync(agent)
c.performSync()
sync = time.After(syncInterval)
case <-c.shutdownCh:
c.logger.Printf("[INFO] Shutting down Consul Client")
Expand All @@ -162,91 +189,78 @@ func (c *ConsulService) SyncWithConsul() {
}
}

func (c *ConsulService) performSync(agent *consul.Agent) {
var consulServices map[string]*consul.AgentService
var consulChecks map[string]*consul.AgentCheck
// performSync syncs checks and services with Consul and removed tracked
// services which are no longer present in tasks
func (c *ConsulService) performSync() {
// Get the list of the services and that Consul knows about
consulServices, _ := c.client.Services()
consulChecks, _ := c.client.Checks()
delete(consulServices, "consul")

// Remove the tracked services which tasks no longer references
for serviceId, ts := range c.trackedServices {
if !ts.IsServiceValid() {
c.logger.Printf("[DEBUG] consul: Removing service: %s since the task doesn't have it anymore", ts.service.Name)
c.deregisterService(serviceId)
}
}
knownChecks := make(map[string]struct{})
knownServices := make(map[string]struct{})

// Add additional services that we might not have added from tasks
// Add services and checks which Consul doesn't know about
for _, trackedTask := range c.trackedTasks {
for _, service := range trackedTask.task.Services {
if _, ok := c.trackedServices[service.Id]; !ok {

// Add new services which Consul agent isn't aware of
knownServices[service.Id] = struct{}{}
if _, ok := consulServices[service.Id]; !ok {
c.registerService(service, trackedTask.task, trackedTask.allocID)
continue
}
}
}

// Get the list of the services that Consul knows about
consulServices, _ = agent.Services()
// If a service has changed, re-register it with Consul agent
if service.Hash() != c.serviceStates[service.Id] {
c.registerService(service, trackedTask.task, trackedTask.allocID)
continue
}

// See if we have services that Consul doesn't know about yet.
// Register with Consul the services which are not registered
for serviceId := range c.trackedServices {
if _, ok := consulServices[serviceId]; !ok {
ts := c.trackedServices[serviceId]
c.registerService(ts.service, ts.task, ts.allocId)
// Add new checks that Consul isn't aware of
for _, check := range service.Checks {
knownChecks[check.Id] = struct{}{}
if _, ok := consulChecks[check.Id]; !ok {
host, port := trackedTask.task.FindHostAndPortFor(service.PortLabel)
cr := c.makeCheck(service, check, host, port)
c.registerCheck(cr)
}
}
}
}

// See if consul thinks we have some services which are not running
// anymore on the node. We de-register those services
for serviceId := range consulServices {
if serviceId == "consul" {
continue
}
if _, ok := c.trackedServices[serviceId]; !ok {
if err := c.deregisterService(serviceId); err != nil {
c.logger.Printf("[DEBUG] consul: Error while de-registering service with ID: %s", serviceId)
}
// Remove services from the service tracker which no longer exists
for serviceId := range c.serviceStates {
if _, ok := knownServices[serviceId]; !ok {
delete(c.serviceStates, serviceId)
}
}

consulChecks, _ = agent.Checks()

// Remove checks that Consul knows about but we don't
for checkID := range consulChecks {
if _, ok := c.trackedChecks[checkID]; !ok {
c.deregisterCheck(checkID)
// Remove services that are not present anymore
for _, consulService := range consulServices {
if _, ok := knownServices[consulService.ID]; !ok {
delete(c.serviceStates, consulService.ID)
c.deregisterService(consulService.ID)
}
}

// Add checks that might not be present
for _, ts := range c.trackedServices {
checks := c.makeChecks(ts.service, ts.host, ts.port)
for _, check := range checks {
if _, ok := consulChecks[check.ID]; !ok {
c.registerCheck(check)
}
// Remove checks that are not present anymore
for _, consulCheck := range consulChecks {
if _, ok := knownChecks[consulCheck.CheckID]; !ok {
c.deregisterCheck(consulCheck.CheckID)
}

}
}

// registerService registers a Service with Consul
func (c *ConsulService) registerService(service *structs.Service, task *structs.Task, allocID string) error {
var mErr multierror.Error
service.Id = fmt.Sprintf("%s-%s", allocID, service.Name)
host, port := task.FindHostAndPortFor(service.PortLabel)
if host == "" || port == 0 {
return fmt.Errorf("consul: The port:%s marked for registration of service: %s couldn't be found", service.PortLabel, service.Name)
}
ts := &trackedService{
allocId: allocID,
task: task,
serviceHash: service.Hash(),
service: service,
host: host,
port: port,
}
c.trackedSrvLock.Lock()
c.trackedServices[service.Id] = ts
c.trackedSrvLock.Unlock()
c.serviceStates[service.Id] = service.Hash()

asr := &consul.AgentServiceRegistration{
ID: service.Id,
Expand All @@ -256,78 +270,72 @@ func (c *ConsulService) registerService(service *structs.Service, task *structs.
Address: host,
}

if err := c.client.Agent().ServiceRegister(asr); err != nil {
if err := c.client.ServiceRegister(asr); err != nil {
c.logger.Printf("[DEBUG] consul: Error while registering service %v with Consul: %v", service.Name, err)
mErr.Errors = append(mErr.Errors, err)
}
checks := c.makeChecks(service, host, port)
for _, check := range checks {
if err := c.registerCheck(check); err != nil {
for _, check := range service.Checks {
cr := c.makeCheck(service, check, host, port)
if err := c.registerCheck(cr); err != nil {
c.logger.Printf("[ERROR] consul: Error while registerting check %v with Consul: %v", check.Name, err)
mErr.Errors = append(mErr.Errors, err)
}

}
return mErr.ErrorOrNil()
}

// registerCheck registers a check with Consul
func (c *ConsulService) registerCheck(check *consul.AgentCheckRegistration) error {
c.logger.Printf("[DEBUG] Registering Check with ID: %v for Service: %v", check.ID, check.ServiceID)
c.trackedChkLock.Lock()
c.trackedChecks[check.ID] = check
c.trackedChkLock.Unlock()
return c.client.Agent().CheckRegister(check)
return c.client.CheckRegister(check)
}

// deregisterCheck de-registers a check with a specific ID from Consul
func (c *ConsulService) deregisterCheck(checkID string) error {
c.logger.Printf("[DEBUG] Removing check with ID: %v", checkID)
c.trackedChkLock.Lock()
delete(c.trackedChecks, checkID)
c.trackedChkLock.Unlock()
return c.client.Agent().CheckDeregister(checkID)
return c.client.CheckDeregister(checkID)
}

// deregisterService de-registers a Service with a specific id from Consul
func (c *ConsulService) deregisterService(serviceId string) error {
c.trackedSrvLock.Lock()
delete(c.trackedServices, serviceId)
c.trackedSrvLock.Unlock()

if err := c.client.Agent().ServiceDeregister(serviceId); err != nil {
delete(c.serviceStates, serviceId)
if err := c.client.ServiceDeregister(serviceId); err != nil {
return err
}
return nil
}

func (c *ConsulService) makeChecks(service *structs.Service, ip string, port int) []*consul.AgentCheckRegistration {
var checks []*consul.AgentCheckRegistration
for _, check := range service.Checks {
if check.Name == "" {
check.Name = fmt.Sprintf("service: '%s' check", service.Name)
}
cr := &consul.AgentCheckRegistration{
ID: check.Hash(),
Name: check.Name,
ServiceID: service.Id,
// makeCheck creates a Consul Check Registration struct
func (c *ConsulService) makeCheck(service *structs.Service, check *structs.ServiceCheck, ip string, port int) *consul.AgentCheckRegistration {
if check.Name == "" {
check.Name = fmt.Sprintf("service: %q%s%q check", service.Name)
}
check.Id = check.Hash(service.Id)

cr := &consul.AgentCheckRegistration{
ID: check.Id,
Name: check.Name,
ServiceID: service.Id,
}
cr.Interval = check.Interval.String()
cr.Timeout = check.Timeout.String()

switch check.Type {
case structs.ServiceCheckHTTP:
if check.Protocol == "" {
check.Protocol = "http"
}
cr.Interval = check.Interval.String()
cr.Timeout = check.Timeout.String()
switch check.Type {
case structs.ServiceCheckHTTP:
if check.Protocol == "" {
check.Protocol = "http"
}
url := url.URL{
Scheme: check.Protocol,
Host: fmt.Sprintf("%s:%d", ip, port),
Path: check.Path,
}
cr.HTTP = url.String()
case structs.ServiceCheckTCP:
cr.TCP = fmt.Sprintf("%s:%d", ip, port)
case structs.ServiceCheckScript:
cr.Script = check.Script // TODO This needs to include the path of the alloc dir and based on driver types
url := url.URL{
Scheme: check.Protocol,
Host: fmt.Sprintf("%s:%d", ip, port),
Path: check.Path,
}

checks = append(checks, cr)
cr.HTTP = url.String()
case structs.ServiceCheckTCP:
cr.TCP = fmt.Sprintf("%s:%d", ip, port)
case structs.ServiceCheckScript:
cr.Script = check.Script // TODO This needs to include the path of the alloc dir and based on driver types
}
return checks
return cr
}
Loading

0 comments on commit 3d36591

Please sign in to comment.