Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Making the allocs hold service ids #583

Merged
merged 11 commits into from
Dec 15, 2015
1 change: 1 addition & 0 deletions api/allocations.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type Allocation struct {
TaskGroup string
Resources *Resources
TaskResources map[string]*Resources
Services map[string]string
Metrics *AllocationMetric
DesiredStatus string
DesiredDescription string
Expand Down
4 changes: 2 additions & 2 deletions client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (r *AllocRunner) RestoreState() error {
task := &structs.Task{Name: name}
restartTracker := newRestartTracker(r.alloc.Job.Type, r.RestartPolicy)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx,
r.alloc.ID, task, r.alloc.TaskStates[task.Name], restartTracker,
r.alloc, task, r.alloc.TaskStates[task.Name], restartTracker,
r.consulService)
r.tasks[name] = tr

Expand Down Expand Up @@ -324,7 +324,7 @@ func (r *AllocRunner) Run() {
task.Resources = alloc.TaskResources[task.Name]
restartTracker := newRestartTracker(r.alloc.Job.Type, r.RestartPolicy)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx,
r.alloc.ID, task, r.alloc.TaskStates[task.Name], restartTracker,
r.alloc, task, r.alloc.TaskStates[task.Name], restartTracker,
r.consulService)
r.tasks[task.Name] = tr
go tr.Run()
Expand Down
72 changes: 39 additions & 33 deletions client/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ func (a *consulApiClient) ServiceRegister(service *consul.AgentServiceRegistrati
return a.client.Agent().ServiceRegister(service)
}

func (a *consulApiClient) ServiceDeregister(serviceId string) error {
return a.client.Agent().ServiceDeregister(serviceId)
func (a *consulApiClient) ServiceDeregister(serviceID string) error {
return a.client.Agent().ServiceDeregister(serviceID)
}

func (a *consulApiClient) Services() (map[string]*consul.AgentService, error) {
Expand All @@ -62,8 +62,8 @@ func (a *consulApiClient) Checks() (map[string]*consul.AgentCheck, error) {
// trackedTask is a Task that we are tracking for changes in service and check
// definitions and keep them sycned with Consul Agent
type trackedTask struct {
allocID string
task *structs.Task
task *structs.Task
alloc *structs.Allocation
}

// ConsulService is the service which tracks tasks and syncs the services and
Expand Down Expand Up @@ -143,15 +143,16 @@ func NewConsulService(config *consulServiceConfig) (*ConsulService, error) {

// Register starts tracking a task for changes to it's services and tasks and
// adds/removes services and checks associated with it.
func (c *ConsulService) Register(task *structs.Task, allocID string) error {
func (c *ConsulService) Register(task *structs.Task, alloc *structs.Allocation) error {
var mErr multierror.Error
c.trackedTskLock.Lock()
tt := &trackedTask{allocID: allocID, task: task}
c.trackedTasks[fmt.Sprintf("%s-%s", allocID, task.Name)] = tt
tt := &trackedTask{task: task, alloc: alloc}
c.trackedTasks[fmt.Sprintf("%s-%s", alloc.ID, task.Name)] = tt
c.trackedTskLock.Unlock()
for _, service := range task.Services {
c.logger.Printf("[INFO] consul: registering service %s with consul.", service.Name)
if err := c.registerService(service, task, allocID); err != nil {
if err := c.registerService(service, task, alloc); err != nil {
fmt.Printf("DIPTANU ERR %v\n", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Debug message? :)

mErr.Errors = append(mErr.Errors, err)
}
}
Expand All @@ -161,17 +162,18 @@ func (c *ConsulService) Register(task *structs.Task, allocID string) error {

// Deregister stops tracking a task for changes to it's services and checks and
// removes all the services and checks associated with the Task
func (c *ConsulService) Deregister(task *structs.Task, allocID string) error {
func (c *ConsulService) Deregister(task *structs.Task, alloc *structs.Allocation) error {
var mErr multierror.Error
c.trackedTskLock.Lock()
delete(c.trackedTasks, fmt.Sprintf("%s-%s", allocID, task.Name))
delete(c.trackedTasks, fmt.Sprintf("%s-%s", alloc.ID, task.Name))
c.trackedTskLock.Unlock()
for _, service := range task.Services {
if service.Id == "" {
serviceID := alloc.Services[service.Name]
if serviceID == "" {
continue
}
c.logger.Printf("[INFO] consul: deregistering service %v with consul", service.Name)
if err := c.deregisterService(service.Id); err != nil {
if err := c.deregisterService(serviceID); err != nil {
c.printLogMessage("[DEBUG] consul: error in deregistering service %v from consul", service.Name)
mErr.Errors = append(mErr.Errors, err)
}
Expand Down Expand Up @@ -223,38 +225,40 @@ func (c *ConsulService) performSync() {
// Add services and checks which Consul doesn't know about
for _, trackedTask := range c.trackedTasks {
for _, service := range trackedTask.task.Services {
serviceID := trackedTask.alloc.Services[service.Name]

// Add new services which Consul agent isn't aware of
knownServices[service.Id] = struct{}{}
if _, ok := consulServices[service.Id]; !ok {
knownServices[serviceID] = struct{}{}
if _, ok := consulServices[serviceID]; !ok {
c.printLogMessage("[INFO] consul: registering service %s with consul.", service.Name)
c.registerService(service, trackedTask.task, trackedTask.allocID)
c.registerService(service, trackedTask.task, trackedTask.alloc)
continue
}

// If a service has changed, re-register it with Consul agent
if service.Hash() != c.serviceStates[service.Id] {
if service.Hash() != c.serviceStates[serviceID] {
c.printLogMessage("[INFO] consul: reregistering service %s with consul.", service.Name)
c.registerService(service, trackedTask.task, trackedTask.allocID)
c.registerService(service, trackedTask.task, trackedTask.alloc)
continue
}

// Add new checks that Consul isn't aware of
for _, check := range service.Checks {
knownChecks[check.Id] = struct{}{}
if _, ok := consulChecks[check.Id]; !ok {
checkID := check.Hash(serviceID)
knownChecks[checkID] = struct{}{}
if _, ok := consulChecks[checkID]; !ok {
host, port := trackedTask.task.FindHostAndPortFor(service.PortLabel)
cr := c.makeCheck(service, check, host, port)
cr := c.makeCheck(serviceID, check, host, port)
c.registerCheck(cr)
}
}
}
}

// Remove services from the service tracker which no longer exists
for serviceId := range c.serviceStates {
if _, ok := knownServices[serviceId]; !ok {
delete(c.serviceStates, serviceId)
for serviceID := range c.serviceStates {
if _, ok := knownServices[serviceID]; !ok {
delete(c.serviceStates, serviceID)
}
}

Expand All @@ -276,16 +280,17 @@ func (c *ConsulService) performSync() {
}

// registerService registers a Service with Consul
func (c *ConsulService) registerService(service *structs.Service, task *structs.Task, allocID string) error {
func (c *ConsulService) registerService(service *structs.Service, task *structs.Task, alloc *structs.Allocation) error {
var mErr multierror.Error
host, port := task.FindHostAndPortFor(service.PortLabel)
if host == "" || port == 0 {
return fmt.Errorf("consul: the port:%q marked for registration of service: %q couldn't be found", service.PortLabel, service.Name)
}
c.serviceStates[service.Id] = service.Hash()
serviceID := alloc.Services[service.Name]
c.serviceStates[serviceID] = service.Hash()

asr := &consul.AgentServiceRegistration{
ID: service.Id,
ID: serviceID,
Name: service.Name,
Tags: service.Tags,
Port: port,
Expand All @@ -297,7 +302,7 @@ func (c *ConsulService) registerService(service *structs.Service, task *structs.
mErr.Errors = append(mErr.Errors, err)
}
for _, check := range service.Checks {
cr := c.makeCheck(service, check, host, port)
cr := c.makeCheck(serviceID, check, host, port)
if err := c.registerCheck(cr); err != nil {
c.printLogMessage("[DEBUG] consul: error while registerting check %v with consul: %v", check.Name, err)
mErr.Errors = append(mErr.Errors, err)
Expand All @@ -320,20 +325,21 @@ func (c *ConsulService) deregisterCheck(checkID string) error {
}

// deregisterService de-registers a Service with a specific id from Consul
func (c *ConsulService) deregisterService(serviceId string) error {
delete(c.serviceStates, serviceId)
if err := c.client.ServiceDeregister(serviceId); err != nil {
func (c *ConsulService) deregisterService(serviceID string) error {
delete(c.serviceStates, serviceID)
if err := c.client.ServiceDeregister(serviceID); err != nil {
return err
}
return nil
}

// makeCheck creates a Consul Check Registration struct
func (c *ConsulService) makeCheck(service *structs.Service, check *structs.ServiceCheck, ip string, port int) *consul.AgentCheckRegistration {
func (c *ConsulService) makeCheck(serviceID string, check *structs.ServiceCheck, ip string, port int) *consul.AgentCheckRegistration {
checkID := check.Hash(serviceID)
cr := &consul.AgentCheckRegistration{
ID: check.Id,
ID: checkID,
Name: check.Name,
ServiceID: service.Id,
ServiceID: serviceID,
}
cr.Interval = check.Interval.String()
cr.Timeout = check.Timeout.String()
Expand Down
32 changes: 16 additions & 16 deletions client/consul_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package client

import (
"fmt"
consul "github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"log"
"os"
Expand Down Expand Up @@ -32,7 +34,7 @@ func (a *mockConsulApiClient) ServiceRegister(service *consul.AgentServiceRegist
return nil
}

func (a *mockConsulApiClient) ServiceDeregister(serviceId string) error {
func (a *mockConsulApiClient) ServiceDeregister(serviceID string) error {
a.serviceDeregisterCallCount += 1
return nil
}
Expand Down Expand Up @@ -70,7 +72,6 @@ func newTask() *structs.Task {

func TestConsul_MakeChecks(t *testing.T) {
service := &structs.Service{
Id: "Foo",
Name: "Bar",
Checks: []*structs.ServiceCheck{
{
Expand All @@ -95,10 +96,11 @@ func TestConsul_MakeChecks(t *testing.T) {
}

c := newConsulService()
serviceID := fmt.Sprintf("%s-1234", structs.NomadConsulPrefix)

check1 := c.makeCheck(service, service.Checks[0], "10.10.0.1", 8090)
check2 := c.makeCheck(service, service.Checks[1], "10.10.0.1", 8090)
check3 := c.makeCheck(service, service.Checks[2], "10.10.0.1", 8090)
check1 := c.makeCheck(serviceID, service.Checks[0], "10.10.0.1", 8090)
check2 := c.makeCheck(serviceID, service.Checks[1], "10.10.0.1", 8090)
check3 := c.makeCheck(serviceID, service.Checks[2], "10.10.0.1", 8090)

if check1.HTTP != "http://10.10.0.1:8090/foo/bar" {
t.Fatalf("Invalid http url for check: %v", check1.HTTP)
Expand Down Expand Up @@ -142,15 +144,14 @@ func TestConsul_InvalidPortLabelForService(t *testing.T) {
},
}
service := &structs.Service{
Id: "service-id",
Name: "foo",
Tags: []string{"a", "b"},
PortLabel: "https",
Checks: make([]*structs.ServiceCheck, 0),
}

c := newConsulService()
if err := c.registerService(service, task, "allocid"); err == nil {
if err := c.registerService(service, task, mock.Alloc()); err == nil {
t.Fatalf("Service should be invalid")
}
}
Expand All @@ -175,7 +176,7 @@ func TestConsul_Services_Deleted_From_Task(t *testing.T) {
},
},
}
c.Register(&task, "1")
c.Register(&task, mock.Alloc())
if len(c.serviceStates) != 1 {
t.Fatalf("Expected tracked services: %v, Actual: %v", 1, len(c.serviceStates))
}
Expand All @@ -191,13 +192,14 @@ func TestConsul_Service_Should_Be_Re_Reregistered_On_Change(t *testing.T) {
c := newConsulService()
task := newTask()
s1 := structs.Service{
Id: "1-example-cache-redis",
Name: "example-cache-redis",
Tags: []string{"global"},
PortLabel: "db",
}
task.Services = append(task.Services, &s1)
c.Register(task, "1")
alloc := mock.Alloc()
serviceID := alloc.Services[s1.Name]
c.Register(task, alloc)

s1.Tags = []string{"frontcache"}

Expand All @@ -207,8 +209,8 @@ func TestConsul_Service_Should_Be_Re_Reregistered_On_Change(t *testing.T) {
t.Fatal("We should be tracking one service")
}

if c.serviceStates[s1.Id] != s1.Hash() {
t.Fatalf("Hash is %v, expected %v", c.serviceStates[s1.Id], s1.Hash())
if c.serviceStates[serviceID] != s1.Hash() {
t.Fatalf("Hash is %v, expected %v", c.serviceStates[serviceID], s1.Hash())
}
}

Expand All @@ -219,14 +221,13 @@ func TestConsul_AddCheck_To_Service(t *testing.T) {
task := newTask()
var checks []*structs.ServiceCheck
s1 := structs.Service{
Id: "1-example-cache-redis",
Name: "example-cache-redis",
Tags: []string{"global"},
PortLabel: "db",
Checks: checks,
}
task.Services = append(task.Services, &s1)
c.Register(task, "1")
c.Register(task, mock.Alloc())

check1 := structs.ServiceCheck{
Name: "alive",
Expand All @@ -250,14 +251,13 @@ func TestConsul_ModifyCheck(t *testing.T) {
task := newTask()
var checks []*structs.ServiceCheck
s1 := structs.Service{
Id: "1-example-cache-redis",
Name: "example-cache-redis",
Tags: []string{"global"},
PortLabel: "db",
Checks: checks,
}
task.Services = append(task.Services, &s1)
c.Register(task, "1")
c.Register(task, mock.Alloc())

check1 := structs.ServiceCheck{
Name: "alive",
Expand Down
Loading