From 48eff00427c438329810c41e3f300f01317be9d8 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Sun, 21 Feb 2016 21:32:32 -0800 Subject: [PATCH] address feedback --- client/alloc_runner.go | 4 ---- client/client.go | 27 ++++++++++++++++++++------- 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 5ec5873dfff4..dd0f1f8db933 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -16,10 +16,6 @@ import ( ) const ( - // allocSyncRetryIntv is the interval on which we retry updating - // the status of the allocation - allocSyncRetryIntv = 15 * time.Second - // taskReceivedSyncLimit is how long the client will wait before sending // that a task was received to the server. The client does not immediately // send that the task was received to the server because another transistion diff --git a/client/client.go b/client/client.go index c5879796369c..1295ed062409 100644 --- a/client/client.go +++ b/client/client.go @@ -62,6 +62,10 @@ const ( // allocSyncIntv is the batching period of allocation updates before they // are synced with the server. allocSyncIntv = 200 * time.Millisecond + + // allocSyncRetryIntv is the interval on which we retry updating + // the status of the allocation + allocSyncRetryIntv = 5 * time.Second ) // DefaultConfig returns the default configuration @@ -834,26 +838,27 @@ func (c *Client) updateAllocStatus(alloc *structs.Allocation) { stripped.TaskStates = alloc.TaskStates stripped.ClientStatus = alloc.ClientStatus stripped.ClientDescription = alloc.ClientDescription - c.allocUpdates <- stripped + select { + case c.allocUpdates <- stripped: + case <-c.shutdownCh: + } } // allocSync is a long lived function that batches allocation updates to the // server. func (c *Client) allocSync() { - timeoutTimer := time.NewTimer(allocSyncIntv) - timeoutCh := timeoutTimer.C + staggered := false + syncTicker := time.NewTicker(allocSyncIntv) updates := make(map[string]*structs.Allocation) for { select { case <-c.shutdownCh: + syncTicker.Stop() return case alloc := <-c.allocUpdates: // Batch the allocation updates until the timer triggers. updates[alloc.ID] = alloc - case <-timeoutCh: - // Reset the timer - timeoutTimer.Reset(allocSyncIntv) - + case <-syncTicker.C: // Fast path if there are no updates if len(updates) == 0 { continue @@ -873,8 +878,16 @@ func (c *Client) allocSync() { var resp structs.GenericResponse if err := c.RPC("Node.UpdateAlloc", &args, &resp); err != nil { c.logger.Printf("[ERR] client: failed to update allocations: %v", err) + syncTicker.Stop() + syncTicker = time.NewTicker(c.retryIntv(allocSyncRetryIntv)) + staggered = true } else { updates = make(map[string]*structs.Allocation) + if staggered { + syncTicker.Stop() + syncTicker = time.NewTicker(allocSyncIntv) + staggered = false + } } } }