Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle updates of Service and Check definitions #498

Merged
merged 13 commits into from
Nov 25, 2015
36 changes: 18 additions & 18 deletions client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ type AllocStateUpdater func(alloc *structs.Allocation) error

// AllocRunner is used to wrap an allocation and provide the execution context.
type AllocRunner struct {
config *config.Config
updater AllocStateUpdater
logger *log.Logger
consulClient *ConsulClient
config *config.Config
updater AllocStateUpdater
logger *log.Logger
consulService *ConsulService

alloc *structs.Allocation

Expand Down Expand Up @@ -68,19 +68,19 @@ type allocRunnerState struct {

// NewAllocRunner is used to create a new allocation context
func NewAllocRunner(logger *log.Logger, config *config.Config, updater AllocStateUpdater,
alloc *structs.Allocation, consulClient *ConsulClient) *AllocRunner {
alloc *structs.Allocation, consulService *ConsulService) *AllocRunner {
ar := &AllocRunner{
config: config,
updater: updater,
logger: logger,
alloc: alloc,
consulClient: consulClient,
dirtyCh: make(chan struct{}, 1),
tasks: make(map[string]*TaskRunner),
restored: make(map[string]struct{}),
updateCh: make(chan *structs.Allocation, 8),
destroyCh: make(chan struct{}),
waitCh: make(chan struct{}),
config: config,
updater: updater,
logger: logger,
alloc: alloc,
consulService: consulService,
dirtyCh: make(chan struct{}, 1),
tasks: make(map[string]*TaskRunner),
restored: make(map[string]struct{}),
updateCh: make(chan *structs.Allocation, 8),
destroyCh: make(chan struct{}),
waitCh: make(chan struct{}),
}
return ar
}
Expand Down Expand Up @@ -113,7 +113,7 @@ func (r *AllocRunner) RestoreState() error {
restartTracker := newRestartTracker(r.alloc.Job.Type, r.RestartPolicy)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx,
r.alloc.ID, task, r.alloc.TaskStates[task.Name], restartTracker,
r.consulClient)
r.consulService)
r.tasks[name] = tr

// Skip tasks in terminal states.
Expand Down Expand Up @@ -325,7 +325,7 @@ func (r *AllocRunner) Run() {
restartTracker := newRestartTracker(r.alloc.Job.Type, r.RestartPolicy)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx,
r.alloc.ID, task, r.alloc.TaskStates[task.Name], restartTracker,
r.consulClient)
r.consulService)
r.tasks[task.Name] = tr
go tr.Run()
}
Expand Down
4 changes: 2 additions & 2 deletions client/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func testAllocRunner(restarts bool) (*MockAllocStateUpdater, *AllocRunner) {
conf.AllocDir = os.TempDir()
upd := &MockAllocStateUpdater{}
alloc := mock.Alloc()
consulClient, _ := NewConsulClient(logger, "127.0.0.1:8500")
consulClient, _ := NewConsulService(logger, "127.0.0.1:8500")
if !restarts {
alloc.Job.Type = structs.JobTypeBatch
*alloc.Job.LookupTaskGroup(alloc.TaskGroup).RestartPolicy = structs.RestartPolicy{Attempts: 0}
Expand Down Expand Up @@ -142,7 +142,7 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) {
}

// Create a new alloc runner
consulClient, err := NewConsulClient(ar.logger, "127.0.0.1:8500")
consulClient, err := NewConsulService(ar.logger, "127.0.0.1:8500")
ar2 := NewAllocRunner(ar.logger, ar.config, upd.Update,
&structs.Allocation{ID: ar.alloc.ID}, consulClient)
err = ar2.RestoreState()
Expand Down
32 changes: 16 additions & 16 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type Client struct {

logger *log.Logger

consulClient *ConsulClient
consulService *ConsulService

lastServer net.Addr
lastRPCTime time.Time
Expand Down Expand Up @@ -99,22 +99,22 @@ func NewClient(cfg *config.Config) (*Client, error) {
// Create a logger
logger := log.New(cfg.LogOutput, "", log.LstdFlags)

// Create the consul client
// Create the consul service
consulAddr := cfg.ReadDefault("consul.address", "127.0.0.1:8500")
consulClient, err := NewConsulClient(logger, consulAddr)
consulService, err := NewConsulService(logger, consulAddr)
if err != nil {
return nil, fmt.Errorf("failed to create the consul client: %v", err)
}

// Create the client
c := &Client{
config: cfg,
start: time.Now(),
consulClient: consulClient,
connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, nil),
logger: logger,
allocs: make(map[string]*AllocRunner),
shutdownCh: make(chan struct{}),
config: cfg,
start: time.Now(),
consulService: consulService,
connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, nil),
logger: logger,
allocs: make(map[string]*AllocRunner),
shutdownCh: make(chan struct{}),
}

// Initialize the client
Expand Down Expand Up @@ -148,8 +148,8 @@ func NewClient(cfg *config.Config) (*Client, error) {
// Start the client!
go c.run()

// Start the consul client
go c.consulClient.SyncWithConsul()
// Start the consul service
go c.consulService.SyncWithConsul()
return c, nil
}

Expand Down Expand Up @@ -214,8 +214,8 @@ func (c *Client) Shutdown() error {
}
}

// Stop the consul client
c.consulClient.ShutDown()
// Stop the consul service
c.consulService.ShutDown()

c.shutdown = true
close(c.shutdownCh)
Expand Down Expand Up @@ -352,7 +352,7 @@ func (c *Client) restoreState() error {
for _, entry := range list {
id := entry.Name()
alloc := &structs.Allocation{ID: id}
ar := NewAllocRunner(c.logger, c.config, c.updateAllocStatus, alloc, c.consulClient)
ar := NewAllocRunner(c.logger, c.config, c.updateAllocStatus, alloc, c.consulService)
c.allocs[id] = ar
if err := ar.RestoreState(); err != nil {
c.logger.Printf("[ERR] client: failed to restore state for alloc %s: %v", id, err)
Expand Down Expand Up @@ -791,7 +791,7 @@ func (c *Client) updateAlloc(exist, update *structs.Allocation) error {
func (c *Client) addAlloc(alloc *structs.Allocation) error {
c.allocLock.Lock()
defer c.allocLock.Unlock()
ar := NewAllocRunner(c.logger, c.config, c.updateAllocStatus, alloc, c.consulClient)
ar := NewAllocRunner(c.logger, c.config, c.updateAllocStatus, alloc, c.consulService)
c.allocs[alloc.ID] = ar
go ar.Run()
return nil
Expand Down
Loading