Skip to content

Commit

Permalink
address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
dadgar committed Feb 22, 2016
1 parent a6eb1ec commit 48eff00
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 11 deletions.
4 changes: 0 additions & 4 deletions client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@ import (
)

const (
// allocSyncRetryIntv is the interval on which we retry updating
// the status of the allocation
allocSyncRetryIntv = 15 * time.Second

// taskReceivedSyncLimit is how long the client will wait before sending
// that a task was received to the server. The client does not immediately
// send that the task was received to the server because another transistion
Expand Down
27 changes: 20 additions & 7 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ const (
// allocSyncIntv is the batching period of allocation updates before they
// are synced with the server.
allocSyncIntv = 200 * time.Millisecond

// allocSyncRetryIntv is the interval on which we retry updating
// the status of the allocation
allocSyncRetryIntv = 5 * time.Second
)

// DefaultConfig returns the default configuration
Expand Down Expand Up @@ -834,26 +838,27 @@ func (c *Client) updateAllocStatus(alloc *structs.Allocation) {
stripped.TaskStates = alloc.TaskStates
stripped.ClientStatus = alloc.ClientStatus
stripped.ClientDescription = alloc.ClientDescription
c.allocUpdates <- stripped
select {
case c.allocUpdates <- stripped:
case <-c.shutdownCh:
}
}

// allocSync is a long lived function that batches allocation updates to the
// server.
func (c *Client) allocSync() {
timeoutTimer := time.NewTimer(allocSyncIntv)
timeoutCh := timeoutTimer.C
staggered := false
syncTicker := time.NewTicker(allocSyncIntv)
updates := make(map[string]*structs.Allocation)
for {
select {
case <-c.shutdownCh:
syncTicker.Stop()
return
case alloc := <-c.allocUpdates:
// Batch the allocation updates until the timer triggers.
updates[alloc.ID] = alloc
case <-timeoutCh:
// Reset the timer
timeoutTimer.Reset(allocSyncIntv)

case <-syncTicker.C:
// Fast path if there are no updates
if len(updates) == 0 {
continue
Expand All @@ -873,8 +878,16 @@ func (c *Client) allocSync() {
var resp structs.GenericResponse
if err := c.RPC("Node.UpdateAlloc", &args, &resp); err != nil {
c.logger.Printf("[ERR] client: failed to update allocations: %v", err)
syncTicker.Stop()
syncTicker = time.NewTicker(c.retryIntv(allocSyncRetryIntv))
staggered = true
} else {
updates = make(map[string]*structs.Allocation)
if staggered {
syncTicker.Stop()
syncTicker = time.NewTicker(allocSyncIntv)
staggered = false
}
}
}
}
Expand Down

0 comments on commit 48eff00

Please sign in to comment.