Skip to content

Commit

Permalink
Update the consul service when the task/alloc changes
Browse files Browse the repository at this point in the history
  • Loading branch information
dadgar committed Feb 7, 2016
1 parent d81ace2 commit b290b8e
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 33 deletions.
18 changes: 0 additions & 18 deletions client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,6 @@ func (r *AllocRunner) Run() {
continue
}

// Merge in the task resources
task.Resources = alloc.TaskResources[task.Name]
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, r.alloc,
task, r.consulService)
r.tasks[task.Name] = tr
Expand All @@ -392,22 +390,6 @@ OUTER:
r.taskLock.RLock()
for _, task := range tg.Tasks {
tr := r.tasks[task.Name]

// Merge in the task resources
task.Resources = update.TaskResources[task.Name]
FOUND:
for _, updateGroup := range update.Job.TaskGroups {
if tg.Name != updateGroup.Name {
continue
}
for _, updateTask := range updateGroup.Tasks {
if updateTask.Name != task.Name {
continue
}
task.Services = updateTask.Services
break FOUND
}
}
tr.Update(update)
}
r.taskLock.RUnlock()
Expand Down
31 changes: 22 additions & 9 deletions client/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type ConsulService struct {

trackedTasks map[string]*trackedTask
serviceStates map[string]string
allocToService map[string][]string
trackedTskLock sync.Mutex
}

Expand Down Expand Up @@ -130,12 +131,13 @@ func NewConsulService(config *consulServiceConfig) (*ConsulService, error) {
}

consulService := ConsulService{
client: &consulApiClient{client: c},
logger: config.logger,
node: config.node,
trackedTasks: make(map[string]*trackedTask),
serviceStates: make(map[string]string),
shutdownCh: make(chan struct{}),
client: &consulApiClient{client: c},
logger: config.logger,
node: config.node,
trackedTasks: make(map[string]*trackedTask),
serviceStates: make(map[string]string),
allocToService: make(map[string][]string),
shutdownCh: make(chan struct{}),
}

return &consulService, nil
Expand All @@ -148,8 +150,18 @@ func (c *ConsulService) Register(task *structs.Task, alloc *structs.Allocation)
c.trackedTskLock.Lock()
tt := &trackedTask{task: task, alloc: alloc}
c.trackedTasks[fmt.Sprintf("%s-%s", alloc.ID, task.Name)] = tt

// Delete any previously registered service as the same alloc is being
// re-registered.
for _, service := range c.allocToService[alloc.ID] {
delete(c.serviceStates, service)
}
c.trackedTskLock.Unlock()

for _, service := range task.Services {
// Track the services this alloc is registering.
c.allocToService[alloc.ID] = append(c.allocToService[alloc.ID], service.Name)

c.logger.Printf("[INFO] consul: registering service %s with consul.", service.Name)
if err := c.registerService(service, task, alloc); err != nil {
mErr.Errors = append(mErr.Errors, err)
Expand All @@ -165,6 +177,7 @@ func (c *ConsulService) Deregister(task *structs.Task, alloc *structs.Allocation
var mErr multierror.Error
c.trackedTskLock.Lock()
delete(c.trackedTasks, fmt.Sprintf("%s-%s", alloc.ID, task.Name))
delete(c.allocToService, alloc.ID)
c.trackedTskLock.Unlock()
for _, service := range task.Services {
serviceID := alloc.Services[service.Name]
Expand Down Expand Up @@ -229,14 +242,14 @@ func (c *ConsulService) performSync() {
// Add new services which Consul agent isn't aware of
knownServices[serviceID] = struct{}{}
if _, ok := consulServices[serviceID]; !ok {
c.printLogMessage("[INFO] consul: registering service %s with consul.", service.Name)
c.printLogMessage("[INFO] consul: perform sync, registering service %s with consul.", service.Name)
c.registerService(service, trackedTask.task, trackedTask.alloc)
continue
}

// If a service has changed, re-register it with Consul agent
if service.Hash() != c.serviceStates[serviceID] {
c.printLogMessage("[INFO] consul: reregistering service %s with consul.", service.Name)
c.printLogMessage("[INFO] consul: perform sync hash change, reregistering service %s with consul.", service.Name)
c.registerService(service, trackedTask.task, trackedTask.alloc)
continue
}
Expand Down Expand Up @@ -268,7 +281,7 @@ func (c *ConsulService) performSync() {
for _, consulService := range consulServices {
if _, ok := knownServices[consulService.ID]; !ok {
delete(c.serviceStates, consulService.ID)
c.printLogMessage("[INFO] consul: deregistering service %v with consul", consulService.Service)
c.printLogMessage("[INFO] consul: perform sync, deregistering service %v with consul", consulService.Service)
c.deregisterService(consulService.ID)
}
}
Expand Down
13 changes: 7 additions & 6 deletions client/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
alloc *structs.Allocation, task *structs.Task,
consulService *ConsulService) *TaskRunner {

// Merge in the task resources
task.Resources = alloc.TaskResources[task.Name]

// Build the restart tracker.
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if tg == nil {
Expand Down Expand Up @@ -328,6 +331,9 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error {
if task == nil {
return fmt.Errorf("task group %q doesn't contain task %q", tg.Name, r.task.Name)
}

// Merge in the task resources
task.Resources = update.TaskResources[task.Name]
r.task = task

// Update will update resources and store the new kill timeout.
Expand All @@ -342,14 +348,9 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error {
r.restartTracker.SetPolicy(tg.RestartPolicy)
}

/* TODO
// Re-register the task to consul and store the updated alloc.
r.consulService.Deregister(r.task, r.alloc)
r.alloc = update
r.consulService.Register(r.task, r.alloc)
*/

return nil
return r.consulService.Register(r.task, r.alloc)
}

// Helper function for converting a WaitResult into a TaskTerminated event.
Expand Down

0 comments on commit b290b8e

Please sign in to comment.