diff --git a/CHANGELOG.md b/CHANGELOG.md index 5b2170ccfac9..a071c07946ac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ IMPROVEMENTS: * client: Use ec2 CPU perf data from AWS API [[GH-7830](https://github.com/hashicorp/nomad/issues/7830)] * client: Added support for Azure fingerprinting. [[GH-8979](https://github.com/hashicorp/nomad/issues/8979)] * client: Batch state store writes to reduce disk IO. [[GH-9093](https://github.com/hashicorp/nomad/issues/9093)] + * client: Reduce rate of sending allocation updates when servers are slow. [[GH-9435](https://github.com/hashicorp/nomad/issues/9435)] * client: Added support for fingerprinting the client node's Consul segment. [[GH-7214](https://github.com/hashicorp/nomad/issues/7214)] * client: Added `NOMAD_JOB_ID` and `NOMAD_PARENT_JOB_ID` environment variables to those made available to jobs. [[GH-8967](https://github.com/hashicorp/nomad/issues/8967)] * client: Updated consul-template to v0.25.0 - config `function_blacklist` deprecated and replaced with `function_denylist` [[GH-8988](https://github.com/hashicorp/nomad/pull/8988)] diff --git a/client/client.go b/client/client.go index 9a1e8e1d04e3..2674e02dd657 100644 --- a/client/client.go +++ b/client/client.go @@ -1904,7 +1904,6 @@ func (c *Client) AllocStateUpdated(alloc *structs.Allocation) { // allocSync is a long lived function that batches allocation updates to the // server. func (c *Client) allocSync() { - staggered := false syncTicker := time.NewTicker(allocSyncIntv) updates := make(map[string]*structs.Allocation) for { @@ -1933,19 +1932,24 @@ func (c *Client) allocSync() { } var resp structs.GenericResponse - if err := c.RPC("Node.UpdateAlloc", &args, &resp); err != nil { + err := c.RPC("Node.UpdateAlloc", &args, &resp) + if err != nil { + // Error updating allocations, do *not* clear + // updates and retry after backoff c.logger.Error("error updating allocations", "error", err) syncTicker.Stop() syncTicker = time.NewTicker(c.retryIntv(allocSyncRetryIntv)) - staggered = true - } else { - updates = make(map[string]*structs.Allocation) - if staggered { - syncTicker.Stop() - syncTicker = time.NewTicker(allocSyncIntv) - staggered = false - } + continue } + + // Successfully updated allocs, reset map and ticker. + // Always reset ticker to give loop time to receive + // alloc updates. If the RPC took the ticker interval + // we may call it in a tight loop before draining + // buffered updates. + updates = make(map[string]*structs.Allocation, len(updates)) + syncTicker.Stop() + syncTicker = time.NewTicker(allocSyncIntv) } } } diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index f795d1bd2c3a..36a18a26f640 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -1144,8 +1144,13 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene updates := n.updates evals := n.evals future := n.updateFuture - n.updates = nil - n.evals = nil + + // Assume future update patterns will be similar to + // current batch and set cap appropriately to avoid + // slice resizing. + n.updates = make([]*structs.Allocation, 0, len(updates)) + n.evals = make([]*structs.Evaluation, 0, len(evals)) + n.updateFuture = nil n.updateTimer = nil n.updatesLock.Unlock()