From 1cc927e885e9cbf95ba100ac5b99104c5f742e04 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 17 Feb 2016 11:32:17 -0800 Subject: [PATCH] Extract the heartbeat and saveState into their own go routines --- client/client.go | 69 ++++++++++++++++++++++++++++++++++-------------- 1 file changed, 49 insertions(+), 20 deletions(-) diff --git a/client/client.go b/client/client.go index bdcde6227aca..19f609809198 100644 --- a/client/client.go +++ b/client/client.go @@ -152,16 +152,23 @@ func NewClient(cfg *config.Config) (*Client, error) { // initialized. c.configCopy = c.config.Copy() + // Start the consul service + go c.consulService.SyncWithConsul() + // Restore the state if err := c.restoreState(); err != nil { return nil, fmt.Errorf("failed to restore state: %v", err) } + // Register and then start heartbeating to the servers. + go c.registerAndHeartbeat() + + // Begin periodic snapshotting of state. + go c.periodicSnapshot() + // Start the client! go c.run() - // Start the consul service - go c.consulService.SyncWithConsul() return c, nil } @@ -625,13 +632,12 @@ func (c *Client) retryIntv(base time.Duration) time.Duration { return base + randomStagger(base) } -// run is a long lived goroutine used to run the client -func (c *Client) run() { +// registerAndHeartbeat is a long lived goroutine used to register the client +// and then start heartbeatng to the server. +func (c *Client) registerAndHeartbeat() { + // Register the node c.retryRegisterNode() - // Watch for node changes - go c.watchNodeUpdates() - // Setup the heartbeat timer, for the initial registration // we want to do this quickly. We want to do it extra quickly // in development mode. @@ -642,14 +648,29 @@ func (c *Client) run() { heartbeat = time.After(randomStagger(initialHeartbeatStagger)) } - // Watch for changes in allocations - allocUpdates := make(chan *allocUpdates, 1) - go c.watchAllocations(allocUpdates) + for { + select { + case <-heartbeat: + if err := c.updateNodeStatus(); err != nil { + heartbeat = time.After(c.retryIntv(registerRetryIntv)) + } else { + c.heartbeatLock.Lock() + heartbeat = time.After(c.heartbeatTTL) + c.heartbeatLock.Unlock() + } + case <-c.shutdownCh: + return + } + } +} + +// periodicSnapshot is a long lived goroutine used to periodically snapshot the +// state of the client +func (c *Client) periodicSnapshot() { // Create a snapshot timer snapshot := time.After(stateSnapshotIntv) - // Periodically update our status and wait for termination for { select { case <-snapshot: @@ -658,18 +679,26 @@ func (c *Client) run() { c.logger.Printf("[ERR] client: failed to save state: %v", err) } + case <-c.shutdownCh: + return + } + } +} + +// run is a long lived goroutine used to run the client +func (c *Client) run() { + // Watch for node changes + go c.watchNodeUpdates() + + // Watch for changes in allocations + allocUpdates := make(chan *allocUpdates, 1) + go c.watchAllocations(allocUpdates) + + for { + select { case update := <-allocUpdates: c.runAllocs(update) - case <-heartbeat: - if err := c.updateNodeStatus(); err != nil { - heartbeat = time.After(c.retryIntv(registerRetryIntv)) - } else { - c.heartbeatLock.Lock() - heartbeat = time.After(c.heartbeatTTL) - c.heartbeatLock.Unlock() - } - case <-c.shutdownCh: return }