Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract the heartbeat and saveState into their own go routines #811

Merged
merged 1 commit into from
Feb 17, 2016
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 49 additions & 20 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,16 +152,23 @@ func NewClient(cfg *config.Config) (*Client, error) {
// initialized.
c.configCopy = c.config.Copy()

// Start the consul service
go c.consulService.SyncWithConsul()

// Restore the state
if err := c.restoreState(); err != nil {
return nil, fmt.Errorf("failed to restore state: %v", err)
}

// Register and then start heartbeating to the servers.
go c.registerAndHeartbeat()

// Begin periodic snapshotting of state.
go c.periodicSnapshot()

// Start the client!
go c.run()

// Start the consul service
go c.consulService.SyncWithConsul()
return c, nil
}

Expand Down Expand Up @@ -625,13 +632,12 @@ func (c *Client) retryIntv(base time.Duration) time.Duration {
return base + randomStagger(base)
}

// run is a long lived goroutine used to run the client
func (c *Client) run() {
// registerAndHeartbeat is a long lived goroutine used to register the client
// and then start heartbeatng to the server.
func (c *Client) registerAndHeartbeat() {
// Register the node
c.retryRegisterNode()

// Watch for node changes
go c.watchNodeUpdates()

// Setup the heartbeat timer, for the initial registration
// we want to do this quickly. We want to do it extra quickly
// in development mode.
Expand All @@ -642,14 +648,29 @@ func (c *Client) run() {
heartbeat = time.After(randomStagger(initialHeartbeatStagger))
}

// Watch for changes in allocations
allocUpdates := make(chan *allocUpdates, 1)
go c.watchAllocations(allocUpdates)
for {
select {
case <-heartbeat:
if err := c.updateNodeStatus(); err != nil {
heartbeat = time.After(c.retryIntv(registerRetryIntv))
} else {
c.heartbeatLock.Lock()
heartbeat = time.After(c.heartbeatTTL)
c.heartbeatLock.Unlock()
}

case <-c.shutdownCh:
return
}
}
}

// periodicSnapshot is a long lived goroutine used to periodically snapshot the
// state of the client
func (c *Client) periodicSnapshot() {
// Create a snapshot timer
snapshot := time.After(stateSnapshotIntv)

// Periodically update our status and wait for termination
for {
select {
case <-snapshot:
Expand All @@ -658,18 +679,26 @@ func (c *Client) run() {
c.logger.Printf("[ERR] client: failed to save state: %v", err)
}

case <-c.shutdownCh:
return
}
}
}

// run is a long lived goroutine used to run the client
func (c *Client) run() {
// Watch for node changes
go c.watchNodeUpdates()

// Watch for changes in allocations
allocUpdates := make(chan *allocUpdates, 1)
go c.watchAllocations(allocUpdates)

for {
select {
case update := <-allocUpdates:
c.runAllocs(update)

case <-heartbeat:
if err := c.updateNodeStatus(); err != nil {
heartbeat = time.After(c.retryIntv(registerRetryIntv))
} else {
c.heartbeatLock.Lock()
heartbeat = time.After(c.heartbeatTTL)
c.heartbeatLock.Unlock()
}

case <-c.shutdownCh:
return
}
Expand Down