Skip to content

Commit

Permalink
Merge pull request #498 from hashicorp/b-consul-check
Browse files Browse the repository at this point in the history
Handle updates of Service and Check definitions
  • Loading branch information
diptanu committed Nov 25, 2015
2 parents 5babe23 + e9019d1 commit beebb7c
Show file tree
Hide file tree
Showing 9 changed files with 368 additions and 136 deletions.
36 changes: 18 additions & 18 deletions client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ type AllocStateUpdater func(alloc *structs.Allocation) error

// AllocRunner is used to wrap an allocation and provide the execution context.
type AllocRunner struct {
config *config.Config
updater AllocStateUpdater
logger *log.Logger
consulClient *ConsulClient
config *config.Config
updater AllocStateUpdater
logger *log.Logger
consulService *ConsulService

alloc *structs.Allocation

Expand Down Expand Up @@ -68,19 +68,19 @@ type allocRunnerState struct {

// NewAllocRunner is used to create a new allocation context
func NewAllocRunner(logger *log.Logger, config *config.Config, updater AllocStateUpdater,
alloc *structs.Allocation, consulClient *ConsulClient) *AllocRunner {
alloc *structs.Allocation, consulService *ConsulService) *AllocRunner {
ar := &AllocRunner{
config: config,
updater: updater,
logger: logger,
alloc: alloc,
consulClient: consulClient,
dirtyCh: make(chan struct{}, 1),
tasks: make(map[string]*TaskRunner),
restored: make(map[string]struct{}),
updateCh: make(chan *structs.Allocation, 8),
destroyCh: make(chan struct{}),
waitCh: make(chan struct{}),
config: config,
updater: updater,
logger: logger,
alloc: alloc,
consulService: consulService,
dirtyCh: make(chan struct{}, 1),
tasks: make(map[string]*TaskRunner),
restored: make(map[string]struct{}),
updateCh: make(chan *structs.Allocation, 8),
destroyCh: make(chan struct{}),
waitCh: make(chan struct{}),
}
return ar
}
Expand Down Expand Up @@ -113,7 +113,7 @@ func (r *AllocRunner) RestoreState() error {
restartTracker := newRestartTracker(r.alloc.Job.Type, r.RestartPolicy)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx,
r.alloc.ID, task, r.alloc.TaskStates[task.Name], restartTracker,
r.consulClient)
r.consulService)
r.tasks[name] = tr

// Skip tasks in terminal states.
Expand Down Expand Up @@ -325,7 +325,7 @@ func (r *AllocRunner) Run() {
restartTracker := newRestartTracker(r.alloc.Job.Type, r.RestartPolicy)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx,
r.alloc.ID, task, r.alloc.TaskStates[task.Name], restartTracker,
r.consulClient)
r.consulService)
r.tasks[task.Name] = tr
go tr.Run()
}
Expand Down
4 changes: 2 additions & 2 deletions client/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func testAllocRunner(restarts bool) (*MockAllocStateUpdater, *AllocRunner) {
conf.AllocDir = os.TempDir()
upd := &MockAllocStateUpdater{}
alloc := mock.Alloc()
consulClient, _ := NewConsulClient(logger, "127.0.0.1:8500")
consulClient, _ := NewConsulService(logger, "127.0.0.1:8500")
if !restarts {
alloc.Job.Type = structs.JobTypeBatch
*alloc.Job.LookupTaskGroup(alloc.TaskGroup).RestartPolicy = structs.RestartPolicy{Attempts: 0}
Expand Down Expand Up @@ -142,7 +142,7 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) {
}

// Create a new alloc runner
consulClient, err := NewConsulClient(ar.logger, "127.0.0.1:8500")
consulClient, err := NewConsulService(ar.logger, "127.0.0.1:8500")
ar2 := NewAllocRunner(ar.logger, ar.config, upd.Update,
&structs.Allocation{ID: ar.alloc.ID}, consulClient)
err = ar2.RestoreState()
Expand Down
32 changes: 16 additions & 16 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type Client struct {

logger *log.Logger

consulClient *ConsulClient
consulService *ConsulService

lastServer net.Addr
lastRPCTime time.Time
Expand Down Expand Up @@ -98,22 +98,22 @@ func NewClient(cfg *config.Config) (*Client, error) {
// Create a logger
logger := log.New(cfg.LogOutput, "", log.LstdFlags)

// Create the consul client
// Create the consul service
consulAddr := cfg.ReadDefault("consul.address", "127.0.0.1:8500")
consulClient, err := NewConsulClient(logger, consulAddr)
consulService, err := NewConsulService(logger, consulAddr)
if err != nil {
return nil, fmt.Errorf("failed to create the consul client: %v", err)
}

// Create the client
c := &Client{
config: cfg,
start: time.Now(),
consulClient: consulClient,
connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, nil),
logger: logger,
allocs: make(map[string]*AllocRunner),
shutdownCh: make(chan struct{}),
config: cfg,
start: time.Now(),
consulService: consulService,
connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, nil),
logger: logger,
allocs: make(map[string]*AllocRunner),
shutdownCh: make(chan struct{}),
}

// Initialize the client
Expand Down Expand Up @@ -147,8 +147,8 @@ func NewClient(cfg *config.Config) (*Client, error) {
// Start the client!
go c.run()

// Start the consul client
go c.consulClient.SyncWithConsul()
// Start the consul service
go c.consulService.SyncWithConsul()
return c, nil
}

Expand Down Expand Up @@ -213,8 +213,8 @@ func (c *Client) Shutdown() error {
}
}

// Stop the consul client
c.consulClient.ShutDown()
// Stop the consul service
c.consulService.ShutDown()

c.shutdown = true
close(c.shutdownCh)
Expand Down Expand Up @@ -351,7 +351,7 @@ func (c *Client) restoreState() error {
for _, entry := range list {
id := entry.Name()
alloc := &structs.Allocation{ID: id}
ar := NewAllocRunner(c.logger, c.config, c.updateAllocStatus, alloc, c.consulClient)
ar := NewAllocRunner(c.logger, c.config, c.updateAllocStatus, alloc, c.consulService)
c.allocs[id] = ar
if err := ar.RestoreState(); err != nil {
c.logger.Printf("[ERR] client: failed to restore state for alloc %s: %v", id, err)
Expand Down Expand Up @@ -795,7 +795,7 @@ func (c *Client) updateAlloc(exist, update *structs.Allocation) error {
func (c *Client) addAlloc(alloc *structs.Allocation) error {
c.allocLock.Lock()
defer c.allocLock.Unlock()
ar := NewAllocRunner(c.logger, c.config, c.updateAllocStatus, alloc, c.consulClient)
ar := NewAllocRunner(c.logger, c.config, c.updateAllocStatus, alloc, c.consulService)
c.allocs[alloc.ID] = ar
go ar.Run()
return nil
Expand Down
Loading

0 comments on commit beebb7c

Please sign in to comment.