Skip to content

Commit

Permalink
Merge pull request #811 from hashicorp/b-heartbeat-loop
Browse files Browse the repository at this point in the history
Extract the heartbeat and saveState into their own go routines
  • Loading branch information
dadgar committed Feb 17, 2016
2 parents da73f59 + 1cc927e commit daf1231
Showing 1 changed file with 49 additions and 20 deletions.
69 changes: 49 additions & 20 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,16 +152,23 @@ func NewClient(cfg *config.Config) (*Client, error) {
// initialized.
c.configCopy = c.config.Copy()

// Start the consul service
go c.consulService.SyncWithConsul()

// Restore the state
if err := c.restoreState(); err != nil {
return nil, fmt.Errorf("failed to restore state: %v", err)
}

// Register and then start heartbeating to the servers.
go c.registerAndHeartbeat()

// Begin periodic snapshotting of state.
go c.periodicSnapshot()

// Start the client!
go c.run()

// Start the consul service
go c.consulService.SyncWithConsul()
return c, nil
}

Expand Down Expand Up @@ -625,13 +632,12 @@ func (c *Client) retryIntv(base time.Duration) time.Duration {
return base + randomStagger(base)
}

// run is a long lived goroutine used to run the client
func (c *Client) run() {
// registerAndHeartbeat is a long lived goroutine used to register the client
// and then start heartbeatng to the server.
func (c *Client) registerAndHeartbeat() {
// Register the node
c.retryRegisterNode()

// Watch for node changes
go c.watchNodeUpdates()

// Setup the heartbeat timer, for the initial registration
// we want to do this quickly. We want to do it extra quickly
// in development mode.
Expand All @@ -642,14 +648,29 @@ func (c *Client) run() {
heartbeat = time.After(randomStagger(initialHeartbeatStagger))
}

// Watch for changes in allocations
allocUpdates := make(chan *allocUpdates, 1)
go c.watchAllocations(allocUpdates)
for {
select {
case <-heartbeat:
if err := c.updateNodeStatus(); err != nil {
heartbeat = time.After(c.retryIntv(registerRetryIntv))
} else {
c.heartbeatLock.Lock()
heartbeat = time.After(c.heartbeatTTL)
c.heartbeatLock.Unlock()
}

case <-c.shutdownCh:
return
}
}
}

// periodicSnapshot is a long lived goroutine used to periodically snapshot the
// state of the client
func (c *Client) periodicSnapshot() {
// Create a snapshot timer
snapshot := time.After(stateSnapshotIntv)

// Periodically update our status and wait for termination
for {
select {
case <-snapshot:
Expand All @@ -658,18 +679,26 @@ func (c *Client) run() {
c.logger.Printf("[ERR] client: failed to save state: %v", err)
}

case <-c.shutdownCh:
return
}
}
}

// run is a long lived goroutine used to run the client
func (c *Client) run() {
// Watch for node changes
go c.watchNodeUpdates()

// Watch for changes in allocations
allocUpdates := make(chan *allocUpdates, 1)
go c.watchAllocations(allocUpdates)

for {
select {
case update := <-allocUpdates:
c.runAllocs(update)

case <-heartbeat:
if err := c.updateNodeStatus(); err != nil {
heartbeat = time.After(c.retryIntv(registerRetryIntv))
} else {
c.heartbeatLock.Lock()
heartbeat = time.After(c.heartbeatTTL)
c.heartbeatLock.Unlock()
}

case <-c.shutdownCh:
return
}
Expand Down

0 comments on commit daf1231

Please sign in to comment.